summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2015-12-21 23:45:49 (GMT)
committerChristian Pointner <equinox@helsinki.at>2015-12-21 23:45:49 (GMT)
commitc0463be82447f269d9947d16a40881dbbe600827 (patch)
tree323dbe19236ee179bea46c9d40af54e2a3403eb3
parentb917755aab166ebb59ebdb5356114d373ce5cb91 (diff)
basic session handling works now
-rw-r--r--src/helsinki.at/rhimport/fetcher.go2
-rw-r--r--src/helsinki.at/rhimport/importer.go16
-rw-r--r--src/helsinki.at/rhimport/session.go63
-rw-r--r--src/helsinki.at/rhimport/session_store.go2
-rw-r--r--src/helsinki.at/rhimportd/ctrlTelnet.go2
-rw-r--r--src/helsinki.at/rhimportd/ctrlWebSimple.go1
-rw-r--r--src/helsinki.at/rhimportd/main.go31
7 files changed, 69 insertions, 48 deletions
diff --git a/src/helsinki.at/rhimport/fetcher.go b/src/helsinki.at/rhimport/fetcher.go
index 1f0b364..36d7055 100644
--- a/src/helsinki.at/rhimport/fetcher.go
+++ b/src/helsinki.at/rhimport/fetcher.go
@@ -193,7 +193,7 @@ func FetchFileFake(ctx *ImportContext, res *FetchResult, uri *url.URL) error {
time.Sleep(100 * time.Millisecond)
}
if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData)
+ ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData)
}
ctx.SourceFile = "/nonexistend/fake.mp3"
ctx.DeleteSourceFile = false
diff --git a/src/helsinki.at/rhimport/importer.go b/src/helsinki.at/rhimport/importer.go
index 1bfdf0c..a245270 100644
--- a/src/helsinki.at/rhimport/importer.go
+++ b/src/helsinki.at/rhimport/importer.go
@@ -693,18 +693,20 @@ func ImportFile(ctx *ImportContext) (res *ImportResult, err error) {
}
if ctx.Cart != 0 && ctx.Cut != 0 {
- if err = import_audio(ctx, res); err != nil {
- return
- }
- if res.ResponseCode != http.StatusOK {
- rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString)
+ if err = import_audio(ctx, res); err != nil || res.ResponseCode != http.StatusOK {
+ if err != nil {
+ rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", ctx.Cart, ctx.Cut, err)
+ } else {
+ rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString)
+ }
+ // Try to clean up after failed import
rmres := ImportResult{ResponseCode: http.StatusOK}
if rmCartOnErr {
- if err = remove_cart(ctx, &rmres); err != nil {
+ if rerr := remove_cart(ctx, &rmres); rerr != nil {
return
}
} else if rmCutOnErr {
- if err = remove_cut(ctx, &rmres); err != nil {
+ if rerr := remove_cut(ctx, &rmres); rerr != nil {
return
}
}
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index 8f20958..08a33ce 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -25,7 +25,14 @@
package rhimport
import (
- "time"
+ "net/http"
+)
+
+const (
+ SESSION_NEW = iota
+ SESSION_RUNNING
+ SESSION_CANCELED
+ SESSION_DONE
)
type SessionChan struct {
@@ -37,8 +44,9 @@ type SessionChan struct {
type Session struct {
ctx ImportContext
- running bool
+ state int
done chan bool
+ cancelIntChan chan bool
progressIntChan chan ProgressData
doneIntChan chan ImportResult
runChan chan bool
@@ -79,26 +87,35 @@ func session_progress_callback(step int, step_name string, progress float64, use
out <- ProgressData{step, step_name, progress}
}
-// TODO: actually call import here
func session_import_run(ctx ImportContext, done chan<- ImportResult) {
- rhdl.Printf("faking import for: %+v", ctx)
- for i := 0; i < 100; i++ {
- if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData)
- }
- time.Sleep(100 * time.Millisecond)
+ if err := ctx.SanityCheck(); err != nil {
+ done <- ImportResult{http.StatusBadRequest, err.Error(), 0, 0}
+ return
}
- if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData)
+
+ if res, err := FetchFile(&ctx); err != nil {
+ done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0}
+ return
+ } else if res.ResponseCode != http.StatusOK {
+ done <- ImportResult{res.ResponseCode, res.ErrorString, 0, 0}
+ return
+ }
+
+ if res, err := ImportFile(&ctx); err != nil {
+ done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0}
+ return
+ } else {
+ done <- *res
+ return
}
- done <- ImportResult{200, "OK", 0, 0}
}
func (self *Session) run() {
self.ctx.ProgressCallBack = session_progress_callback
self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
+ self.ctx.Cancel = self.cancelIntChan
go session_import_run(self.ctx, self.doneIntChan)
- self.running = true
+ self.state = SESSION_RUNNING
return
}
@@ -149,13 +166,16 @@ func (self *Session) dispatchRequests() {
for {
select {
case <-self.runChan:
- if !self.running {
+ if self.state == SESSION_NEW {
self.run()
}
case <-self.cancelChan:
- if self.running {
- rhdl.Println("Session: canceling running imports is not yet implemented")
- // TODO: send cancel to import goroutine once this got implemented
+ if self.state == SESSION_RUNNING {
+ rhdl.Println("Session: canceling running import")
+ select {
+ case self.cancelIntChan <- true:
+ default: // session got canceled already
+ }
} else {
return
}
@@ -167,7 +187,7 @@ func (self *Session) dispatchRequests() {
rhdl.Printf("Session: got progress: %+v", progress)
// TODO: call all subscribed progress handler
case result := <-self.doneIntChan:
- self.running = false
+ self.state = SESSION_DONE
rhdl.Printf("Session: import is done: %+v", result)
// TODO: call all subscribed done handler
}
@@ -184,9 +204,8 @@ func (self *Session) getInterface() *SessionChan {
}
func (self *Session) Cleanup() {
- // TODO: this blocks if dispatchRequests has ended already...
- // or if cancel doesn't work...
self.cancelChan <- true
+ rhdl.Printf("waiting for session to close")
<-self.done
close(self.done)
close(self.progressIntChan)
@@ -195,13 +214,15 @@ func (self *Session) Cleanup() {
close(self.cancelChan)
close(self.addProgressChan)
close(self.addDoneChan)
+ rhdl.Printf("session is now cleaned up")
}
func NewSession(ctx *ImportContext) (session *Session) {
session = new(Session)
- session.running = false
+ session.state = SESSION_NEW
session.ctx = *ctx
session.done = make(chan bool)
+ session.cancelIntChan = make(chan bool, 1)
session.progressIntChan = make(chan ProgressData)
session.doneIntChan = make(chan ImportResult)
session.runChan = make(chan bool)
diff --git a/src/helsinki.at/rhimport/session_store.go b/src/helsinki.at/rhimport/session_store.go
index b2e1538..8dd74c7 100644
--- a/src/helsinki.at/rhimport/session_store.go
+++ b/src/helsinki.at/rhimport/session_store.go
@@ -130,7 +130,7 @@ func (self *SessionStoreChan) Get(user, id string) (*SessionChan, error) {
func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) {
if session, exists := self.store[user][id]; exists {
- session.Cleanup()
+ go session.Cleanup() // cleanup could take a while -> don't block all the other stuff
delete(self.store[user], id)
rhdl.Printf("SessionStore: removed session '%s/%s'", user, id)
if userstore, exists := self.store[user]; exists {
diff --git a/src/helsinki.at/rhimportd/ctrlTelnet.go b/src/helsinki.at/rhimportd/ctrlTelnet.go
index b47d2a1..687652c 100644
--- a/src/helsinki.at/rhimportd/ctrlTelnet.go
+++ b/src/helsinki.at/rhimportd/ctrlTelnet.go
@@ -198,7 +198,7 @@ func (c *TelnetClient) handle_cmd_set(args []string) {
}
if c.ctx == nil {
c.ctx = rhimport.NewImportContext(c.conf, c.rddb, "")
- c.ctx.Trusted = true
+ c.ctx.Trusted = false
}
switch strings.ToLower(args[0]) {
case "username":
diff --git a/src/helsinki.at/rhimportd/ctrlWebSimple.go b/src/helsinki.at/rhimportd/ctrlWebSimple.go
index 8ea5ac3..f0e9f29 100644
--- a/src/helsinki.at/rhimportd/ctrlWebSimple.go
+++ b/src/helsinki.at/rhimportd/ctrlWebSimple.go
@@ -143,6 +143,7 @@ func webSimpleHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, trusted bo
}
if fres.ResponseCode != http.StatusOK {
webSimpleErrorResponse(w, fres.ResponseCode, fres.ErrorString)
+ return
}
var ires *rhimport.ImportResult
diff --git a/src/helsinki.at/rhimportd/main.go b/src/helsinki.at/rhimportd/main.go
index 4b6ddf4..b90df0a 100644
--- a/src/helsinki.at/rhimportd/main.go
+++ b/src/helsinki.at/rhimportd/main.go
@@ -26,7 +26,6 @@ package main
import (
"flag"
- "fmt"
"log"
"os"
"os/signal"
@@ -53,31 +52,29 @@ func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) {
store := sessions.GetInterface()
- ctx := rhimport.NewImportContext(conf, rddb, "hugo")
+ ctx := rhimport.NewImportContext(conf, rddb, "heslinki")
+ ctx.Trusted = true
+ ctx.ShowId = 10002
+ ctx.SourceUri = "fake://10"
+
id, session, err := store.New(ctx)
if err != nil {
- rhl.Printf("Error SessionStore.New(): %s", err)
+ rhl.Printf("MAIN: Error SessionStore.New(): %s", err)
return
}
- fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
-
+ rhl.Printf("MAIN: calling run for heslinki/%s", id)
session.Run()
- fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
-
+ rhl.Printf("MAIN: waiting for 2 secondes")
time.Sleep(5 * time.Second)
- store.Remove("hugo", id)
- fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
-
- time.Sleep(6 * time.Second)
-
- fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
-
- store.Remove("hugo", id)
+ rhl.Printf("MAIN: calling remove for heslinki/%s", id)
+ if err = store.Remove("heslinki", id); err != nil {
+ rhl.Printf("MAIN: Error SessionStore.Remove(): %s", err)
+ }
- fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
+ rhl.Printf("MAIN: remove done")
}
func main() {
@@ -108,7 +105,7 @@ func main() {
}
defer rddb.Cleanup()
- // go session_test(conf, rddb.GetInterface())
+ go session_test(conf, rddb.GetInterface())
var wg sync.WaitGroup