From c0463be82447f269d9947d16a40881dbbe600827 Mon Sep 17 00:00:00 2001
From: Christian Pointner <equinox@helsinki.at>
Date: Tue, 22 Dec 2015 00:45:49 +0100
Subject: basic session handling works now


diff --git a/src/helsinki.at/rhimport/fetcher.go b/src/helsinki.at/rhimport/fetcher.go
index 1f0b364..36d7055 100644
--- a/src/helsinki.at/rhimport/fetcher.go
+++ b/src/helsinki.at/rhimport/fetcher.go
@@ -193,7 +193,7 @@ func FetchFileFake(ctx *ImportContext, res *FetchResult, uri *url.URL) error {
 			time.Sleep(100 * time.Millisecond)
 		}
 		if ctx.ProgressCallBack != nil {
-			ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData)
+			ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData)
 		}
 		ctx.SourceFile = "/nonexistend/fake.mp3"
 		ctx.DeleteSourceFile = false
diff --git a/src/helsinki.at/rhimport/importer.go b/src/helsinki.at/rhimport/importer.go
index 1bfdf0c..a245270 100644
--- a/src/helsinki.at/rhimport/importer.go
+++ b/src/helsinki.at/rhimport/importer.go
@@ -693,18 +693,20 @@ func ImportFile(ctx *ImportContext) (res *ImportResult, err error) {
 	}
 
 	if ctx.Cart != 0 && ctx.Cut != 0 {
-		if err = import_audio(ctx, res); err != nil {
-			return
-		}
-		if res.ResponseCode != http.StatusOK {
-			rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString)
+		if err = import_audio(ctx, res); err != nil || res.ResponseCode != http.StatusOK {
+			if err != nil {
+				rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", ctx.Cart, ctx.Cut, err)
+			} else {
+				rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString)
+			}
+			// Try to clean up after failed import
 			rmres := ImportResult{ResponseCode: http.StatusOK}
 			if rmCartOnErr {
-				if err = remove_cart(ctx, &rmres); err != nil {
+				if rerr := remove_cart(ctx, &rmres); rerr != nil {
 					return
 				}
 			} else if rmCutOnErr {
-				if err = remove_cut(ctx, &rmres); err != nil {
+				if rerr := remove_cut(ctx, &rmres); rerr != nil {
 					return
 				}
 			}
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index 8f20958..08a33ce 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -25,7 +25,14 @@
 package rhimport
 
 import (
-	"time"
+	"net/http"
+)
+
+const (
+	SESSION_NEW = iota
+	SESSION_RUNNING
+	SESSION_CANCELED
+	SESSION_DONE
 )
 
 type SessionChan struct {
@@ -37,8 +44,9 @@ type SessionChan struct {
 
 type Session struct {
 	ctx             ImportContext
-	running         bool
+	state           int
 	done            chan bool
+	cancelIntChan   chan bool
 	progressIntChan chan ProgressData
 	doneIntChan     chan ImportResult
 	runChan         chan bool
@@ -79,26 +87,35 @@ func session_progress_callback(step int, step_name string, progress float64, use
 	out <- ProgressData{step, step_name, progress}
 }
 
-// TODO: actually call import here
 func session_import_run(ctx ImportContext, done chan<- ImportResult) {
-	rhdl.Printf("faking import for: %+v", ctx)
-	for i := 0; i < 100; i++ {
-		if ctx.ProgressCallBack != nil {
-			ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData)
-		}
-		time.Sleep(100 * time.Millisecond)
+	if err := ctx.SanityCheck(); err != nil {
+		done <- ImportResult{http.StatusBadRequest, err.Error(), 0, 0}
+		return
 	}
-	if ctx.ProgressCallBack != nil {
-		ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData)
+
+	if res, err := FetchFile(&ctx); err != nil {
+		done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0}
+		return
+	} else if res.ResponseCode != http.StatusOK {
+		done <- ImportResult{res.ResponseCode, res.ErrorString, 0, 0}
+		return
+	}
+
+	if res, err := ImportFile(&ctx); err != nil {
+		done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0}
+		return
+	} else {
+		done <- *res
+		return
 	}
-	done <- ImportResult{200, "OK", 0, 0}
 }
 
 func (self *Session) run() {
 	self.ctx.ProgressCallBack = session_progress_callback
 	self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
+	self.ctx.Cancel = self.cancelIntChan
 	go session_import_run(self.ctx, self.doneIntChan)
-	self.running = true
+	self.state = SESSION_RUNNING
 	return
 }
 
@@ -149,13 +166,16 @@ func (self *Session) dispatchRequests() {
 	for {
 		select {
 		case <-self.runChan:
-			if !self.running {
+			if self.state == SESSION_NEW {
 				self.run()
 			}
 		case <-self.cancelChan:
-			if self.running {
-				rhdl.Println("Session: canceling running imports is not yet implemented")
-				// TODO: send cancel to import goroutine once this got implemented
+			if self.state == SESSION_RUNNING {
+				rhdl.Println("Session: canceling running import")
+				select {
+				case self.cancelIntChan <- true:
+				default: // session got canceled already
+				}
 			} else {
 				return
 			}
@@ -167,7 +187,7 @@ func (self *Session) dispatchRequests() {
 			rhdl.Printf("Session: got progress: %+v", progress)
 			// TODO: call all subscribed progress handler
 		case result := <-self.doneIntChan:
-			self.running = false
+			self.state = SESSION_DONE
 			rhdl.Printf("Session: import is done: %+v", result)
 			// TODO: call all subscribed done handler
 		}
@@ -184,9 +204,8 @@ func (self *Session) getInterface() *SessionChan {
 }
 
 func (self *Session) Cleanup() {
-	// TODO: this blocks if dispatchRequests has ended already...
-	//       or if cancel doesn't work...
 	self.cancelChan <- true
+	rhdl.Printf("waiting for session to close")
 	<-self.done
 	close(self.done)
 	close(self.progressIntChan)
@@ -195,13 +214,15 @@ func (self *Session) Cleanup() {
 	close(self.cancelChan)
 	close(self.addProgressChan)
 	close(self.addDoneChan)
+	rhdl.Printf("session is now cleaned up")
 }
 
 func NewSession(ctx *ImportContext) (session *Session) {
 	session = new(Session)
-	session.running = false
+	session.state = SESSION_NEW
 	session.ctx = *ctx
 	session.done = make(chan bool)
+	session.cancelIntChan = make(chan bool, 1)
 	session.progressIntChan = make(chan ProgressData)
 	session.doneIntChan = make(chan ImportResult)
 	session.runChan = make(chan bool)
diff --git a/src/helsinki.at/rhimport/session_store.go b/src/helsinki.at/rhimport/session_store.go
index b2e1538..8dd74c7 100644
--- a/src/helsinki.at/rhimport/session_store.go
+++ b/src/helsinki.at/rhimport/session_store.go
@@ -130,7 +130,7 @@ func (self *SessionStoreChan) Get(user, id string) (*SessionChan, error) {
 
 func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) {
 	if session, exists := self.store[user][id]; exists {
-		session.Cleanup()
+		go session.Cleanup() // cleanup could take a while -> don't block all the other stuff
 		delete(self.store[user], id)
 		rhdl.Printf("SessionStore: removed session '%s/%s'", user, id)
 		if userstore, exists := self.store[user]; exists {
diff --git a/src/helsinki.at/rhimportd/ctrlTelnet.go b/src/helsinki.at/rhimportd/ctrlTelnet.go
index b47d2a1..687652c 100644
--- a/src/helsinki.at/rhimportd/ctrlTelnet.go
+++ b/src/helsinki.at/rhimportd/ctrlTelnet.go
@@ -198,7 +198,7 @@ func (c *TelnetClient) handle_cmd_set(args []string) {
 	}
 	if c.ctx == nil {
 		c.ctx = rhimport.NewImportContext(c.conf, c.rddb, "")
-		c.ctx.Trusted = true
+		c.ctx.Trusted = false
 	}
 	switch strings.ToLower(args[0]) {
 	case "username":
diff --git a/src/helsinki.at/rhimportd/ctrlWebSimple.go b/src/helsinki.at/rhimportd/ctrlWebSimple.go
index 8ea5ac3..f0e9f29 100644
--- a/src/helsinki.at/rhimportd/ctrlWebSimple.go
+++ b/src/helsinki.at/rhimportd/ctrlWebSimple.go
@@ -143,6 +143,7 @@ func webSimpleHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, trusted bo
 	}
 	if fres.ResponseCode != http.StatusOK {
 		webSimpleErrorResponse(w, fres.ResponseCode, fres.ErrorString)
+		return
 	}
 
 	var ires *rhimport.ImportResult
diff --git a/src/helsinki.at/rhimportd/main.go b/src/helsinki.at/rhimportd/main.go
index 4b6ddf4..b90df0a 100644
--- a/src/helsinki.at/rhimportd/main.go
+++ b/src/helsinki.at/rhimportd/main.go
@@ -26,7 +26,6 @@ package main
 
 import (
 	"flag"
-	"fmt"
 	"log"
 	"os"
 	"os/signal"
@@ -53,31 +52,29 @@ func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) {
 
 	store := sessions.GetInterface()
 
-	ctx := rhimport.NewImportContext(conf, rddb, "hugo")
+	ctx := rhimport.NewImportContext(conf, rddb, "heslinki")
+	ctx.Trusted = true
+	ctx.ShowId = 10002
+	ctx.SourceUri = "fake://10"
+
 	id, session, err := store.New(ctx)
 	if err != nil {
-		rhl.Printf("Error SessionStore.New(): %s", err)
+		rhl.Printf("MAIN: Error SessionStore.New(): %s", err)
 		return
 	}
 
-	fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
-
+	rhl.Printf("MAIN: calling run for heslinki/%s", id)
 	session.Run()
 
-	fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
-
+	rhl.Printf("MAIN: waiting for 2 secondes")
 	time.Sleep(5 * time.Second)
 
-	store.Remove("hugo", id)
-	fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
-
-	time.Sleep(6 * time.Second)
-
-	fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
-
-	store.Remove("hugo", id)
+	rhl.Printf("MAIN: calling remove for heslinki/%s", id)
+	if err = store.Remove("heslinki", id); err != nil {
+		rhl.Printf("MAIN: Error SessionStore.Remove(): %s", err)
+	}
 
-	fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
+	rhl.Printf("MAIN: remove done")
 }
 
 func main() {
@@ -108,7 +105,7 @@ func main() {
 	}
 	defer rddb.Cleanup()
 
-	//	go session_test(conf, rddb.GetInterface())
+	go session_test(conf, rddb.GetInterface())
 
 	var wg sync.WaitGroup
 
-- 
cgit v0.10.2