From c0463be82447f269d9947d16a40881dbbe600827 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 22 Dec 2015 00:45:49 +0100 Subject: basic session handling works now diff --git a/src/helsinki.at/rhimport/fetcher.go b/src/helsinki.at/rhimport/fetcher.go index 1f0b364..36d7055 100644 --- a/src/helsinki.at/rhimport/fetcher.go +++ b/src/helsinki.at/rhimport/fetcher.go @@ -193,7 +193,7 @@ func FetchFileFake(ctx *ImportContext, res *FetchResult, uri *url.URL) error { time.Sleep(100 * time.Millisecond) } if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData) + ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData) } ctx.SourceFile = "/nonexistend/fake.mp3" ctx.DeleteSourceFile = false diff --git a/src/helsinki.at/rhimport/importer.go b/src/helsinki.at/rhimport/importer.go index 1bfdf0c..a245270 100644 --- a/src/helsinki.at/rhimport/importer.go +++ b/src/helsinki.at/rhimport/importer.go @@ -693,18 +693,20 @@ func ImportFile(ctx *ImportContext) (res *ImportResult, err error) { } if ctx.Cart != 0 && ctx.Cut != 0 { - if err = import_audio(ctx, res); err != nil { - return - } - if res.ResponseCode != http.StatusOK { - rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString) + if err = import_audio(ctx, res); err != nil || res.ResponseCode != http.StatusOK { + if err != nil { + rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", ctx.Cart, ctx.Cut, err) + } else { + rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString) + } + // Try to clean up after failed import rmres := ImportResult{ResponseCode: http.StatusOK} if rmCartOnErr { - if err = remove_cart(ctx, &rmres); err != nil { + if rerr := remove_cart(ctx, &rmres); rerr != nil { return } } else if rmCutOnErr { - if err = remove_cut(ctx, &rmres); err != nil { + if rerr := remove_cut(ctx, &rmres); rerr != nil { return } } diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go index 8f20958..08a33ce 100644 --- a/src/helsinki.at/rhimport/session.go +++ b/src/helsinki.at/rhimport/session.go @@ -25,7 +25,14 @@ package rhimport import ( - "time" + "net/http" +) + +const ( + SESSION_NEW = iota + SESSION_RUNNING + SESSION_CANCELED + SESSION_DONE ) type SessionChan struct { @@ -37,8 +44,9 @@ type SessionChan struct { type Session struct { ctx ImportContext - running bool + state int done chan bool + cancelIntChan chan bool progressIntChan chan ProgressData doneIntChan chan ImportResult runChan chan bool @@ -79,26 +87,35 @@ func session_progress_callback(step int, step_name string, progress float64, use out <- ProgressData{step, step_name, progress} } -// TODO: actually call import here func session_import_run(ctx ImportContext, done chan<- ImportResult) { - rhdl.Printf("faking import for: %+v", ctx) - for i := 0; i < 100; i++ { - if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData) - } - time.Sleep(100 * time.Millisecond) + if err := ctx.SanityCheck(); err != nil { + done <- ImportResult{http.StatusBadRequest, err.Error(), 0, 0} + return } - if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData) + + if res, err := FetchFile(&ctx); err != nil { + done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0} + return + } else if res.ResponseCode != http.StatusOK { + done <- ImportResult{res.ResponseCode, res.ErrorString, 0, 0} + return + } + + if res, err := ImportFile(&ctx); err != nil { + done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0} + return + } else { + done <- *res + return } - done <- ImportResult{200, "OK", 0, 0} } func (self *Session) run() { self.ctx.ProgressCallBack = session_progress_callback self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) + self.ctx.Cancel = self.cancelIntChan go session_import_run(self.ctx, self.doneIntChan) - self.running = true + self.state = SESSION_RUNNING return } @@ -149,13 +166,16 @@ func (self *Session) dispatchRequests() { for { select { case <-self.runChan: - if !self.running { + if self.state == SESSION_NEW { self.run() } case <-self.cancelChan: - if self.running { - rhdl.Println("Session: canceling running imports is not yet implemented") - // TODO: send cancel to import goroutine once this got implemented + if self.state == SESSION_RUNNING { + rhdl.Println("Session: canceling running import") + select { + case self.cancelIntChan <- true: + default: // session got canceled already + } } else { return } @@ -167,7 +187,7 @@ func (self *Session) dispatchRequests() { rhdl.Printf("Session: got progress: %+v", progress) // TODO: call all subscribed progress handler case result := <-self.doneIntChan: - self.running = false + self.state = SESSION_DONE rhdl.Printf("Session: import is done: %+v", result) // TODO: call all subscribed done handler } @@ -184,9 +204,8 @@ func (self *Session) getInterface() *SessionChan { } func (self *Session) Cleanup() { - // TODO: this blocks if dispatchRequests has ended already... - // or if cancel doesn't work... self.cancelChan <- true + rhdl.Printf("waiting for session to close") <-self.done close(self.done) close(self.progressIntChan) @@ -195,13 +214,15 @@ func (self *Session) Cleanup() { close(self.cancelChan) close(self.addProgressChan) close(self.addDoneChan) + rhdl.Printf("session is now cleaned up") } func NewSession(ctx *ImportContext) (session *Session) { session = new(Session) - session.running = false + session.state = SESSION_NEW session.ctx = *ctx session.done = make(chan bool) + session.cancelIntChan = make(chan bool, 1) session.progressIntChan = make(chan ProgressData) session.doneIntChan = make(chan ImportResult) session.runChan = make(chan bool) diff --git a/src/helsinki.at/rhimport/session_store.go b/src/helsinki.at/rhimport/session_store.go index b2e1538..8dd74c7 100644 --- a/src/helsinki.at/rhimport/session_store.go +++ b/src/helsinki.at/rhimport/session_store.go @@ -130,7 +130,7 @@ func (self *SessionStoreChan) Get(user, id string) (*SessionChan, error) { func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) { if session, exists := self.store[user][id]; exists { - session.Cleanup() + go session.Cleanup() // cleanup could take a while -> don't block all the other stuff delete(self.store[user], id) rhdl.Printf("SessionStore: removed session '%s/%s'", user, id) if userstore, exists := self.store[user]; exists { diff --git a/src/helsinki.at/rhimportd/ctrlTelnet.go b/src/helsinki.at/rhimportd/ctrlTelnet.go index b47d2a1..687652c 100644 --- a/src/helsinki.at/rhimportd/ctrlTelnet.go +++ b/src/helsinki.at/rhimportd/ctrlTelnet.go @@ -198,7 +198,7 @@ func (c *TelnetClient) handle_cmd_set(args []string) { } if c.ctx == nil { c.ctx = rhimport.NewImportContext(c.conf, c.rddb, "") - c.ctx.Trusted = true + c.ctx.Trusted = false } switch strings.ToLower(args[0]) { case "username": diff --git a/src/helsinki.at/rhimportd/ctrlWebSimple.go b/src/helsinki.at/rhimportd/ctrlWebSimple.go index 8ea5ac3..f0e9f29 100644 --- a/src/helsinki.at/rhimportd/ctrlWebSimple.go +++ b/src/helsinki.at/rhimportd/ctrlWebSimple.go @@ -143,6 +143,7 @@ func webSimpleHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, trusted bo } if fres.ResponseCode != http.StatusOK { webSimpleErrorResponse(w, fres.ResponseCode, fres.ErrorString) + return } var ires *rhimport.ImportResult diff --git a/src/helsinki.at/rhimportd/main.go b/src/helsinki.at/rhimportd/main.go index 4b6ddf4..b90df0a 100644 --- a/src/helsinki.at/rhimportd/main.go +++ b/src/helsinki.at/rhimportd/main.go @@ -26,7 +26,6 @@ package main import ( "flag" - "fmt" "log" "os" "os/signal" @@ -53,31 +52,29 @@ func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) { store := sessions.GetInterface() - ctx := rhimport.NewImportContext(conf, rddb, "hugo") + ctx := rhimport.NewImportContext(conf, rddb, "heslinki") + ctx.Trusted = true + ctx.ShowId = 10002 + ctx.SourceUri = "fake://10" + id, session, err := store.New(ctx) if err != nil { - rhl.Printf("Error SessionStore.New(): %s", err) + rhl.Printf("MAIN: Error SessionStore.New(): %s", err) return } - fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions) - + rhl.Printf("MAIN: calling run for heslinki/%s", id) session.Run() - fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions) - + rhl.Printf("MAIN: waiting for 2 secondes") time.Sleep(5 * time.Second) - store.Remove("hugo", id) - fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions) - - time.Sleep(6 * time.Second) - - fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions) - - store.Remove("hugo", id) + rhl.Printf("MAIN: calling remove for heslinki/%s", id) + if err = store.Remove("heslinki", id); err != nil { + rhl.Printf("MAIN: Error SessionStore.Remove(): %s", err) + } - fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions) + rhl.Printf("MAIN: remove done") } func main() { @@ -108,7 +105,7 @@ func main() { } defer rddb.Cleanup() - // go session_test(conf, rddb.GetInterface()) + go session_test(conf, rddb.GetInterface()) var wg sync.WaitGroup -- cgit v0.10.2