diff options
-rw-r--r-- | fetcher.go | 2 | ||||
-rw-r--r-- | importer.go | 16 | ||||
-rw-r--r-- | session.go | 63 | ||||
-rw-r--r-- | session_store.go | 2 |
4 files changed, 53 insertions, 30 deletions
@@ -193,7 +193,7 @@ func FetchFileFake(ctx *ImportContext, res *FetchResult, uri *url.URL) error { time.Sleep(100 * time.Millisecond) } if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData) + ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData) } ctx.SourceFile = "/nonexistend/fake.mp3" ctx.DeleteSourceFile = false diff --git a/importer.go b/importer.go index 1bfdf0c..a245270 100644 --- a/importer.go +++ b/importer.go @@ -693,18 +693,20 @@ func ImportFile(ctx *ImportContext) (res *ImportResult, err error) { } if ctx.Cart != 0 && ctx.Cut != 0 { - if err = import_audio(ctx, res); err != nil { - return - } - if res.ResponseCode != http.StatusOK { - rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString) + if err = import_audio(ctx, res); err != nil || res.ResponseCode != http.StatusOK { + if err != nil { + rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", ctx.Cart, ctx.Cut, err) + } else { + rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString) + } + // Try to clean up after failed import rmres := ImportResult{ResponseCode: http.StatusOK} if rmCartOnErr { - if err = remove_cart(ctx, &rmres); err != nil { + if rerr := remove_cart(ctx, &rmres); rerr != nil { return } } else if rmCutOnErr { - if err = remove_cut(ctx, &rmres); err != nil { + if rerr := remove_cut(ctx, &rmres); rerr != nil { return } } @@ -25,7 +25,14 @@ package rhimport import ( - "time" + "net/http" +) + +const ( + SESSION_NEW = iota + SESSION_RUNNING + SESSION_CANCELED + SESSION_DONE ) type SessionChan struct { @@ -37,8 +44,9 @@ type SessionChan struct { type Session struct { ctx ImportContext - running bool + state int done chan bool + cancelIntChan chan bool progressIntChan chan ProgressData doneIntChan chan ImportResult runChan chan bool @@ -79,26 +87,35 @@ func session_progress_callback(step int, step_name string, progress float64, use out <- ProgressData{step, step_name, progress} } -// TODO: actually call import here func session_import_run(ctx ImportContext, done chan<- ImportResult) { - rhdl.Printf("faking import for: %+v", ctx) - for i := 0; i < 100; i++ { - if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData) - } - time.Sleep(100 * time.Millisecond) + if err := ctx.SanityCheck(); err != nil { + done <- ImportResult{http.StatusBadRequest, err.Error(), 0, 0} + return } - if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData) + + if res, err := FetchFile(&ctx); err != nil { + done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0} + return + } else if res.ResponseCode != http.StatusOK { + done <- ImportResult{res.ResponseCode, res.ErrorString, 0, 0} + return + } + + if res, err := ImportFile(&ctx); err != nil { + done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0} + return + } else { + done <- *res + return } - done <- ImportResult{200, "OK", 0, 0} } func (self *Session) run() { self.ctx.ProgressCallBack = session_progress_callback self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) + self.ctx.Cancel = self.cancelIntChan go session_import_run(self.ctx, self.doneIntChan) - self.running = true + self.state = SESSION_RUNNING return } @@ -149,13 +166,16 @@ func (self *Session) dispatchRequests() { for { select { case <-self.runChan: - if !self.running { + if self.state == SESSION_NEW { self.run() } case <-self.cancelChan: - if self.running { - rhdl.Println("Session: canceling running imports is not yet implemented") - // TODO: send cancel to import goroutine once this got implemented + if self.state == SESSION_RUNNING { + rhdl.Println("Session: canceling running import") + select { + case self.cancelIntChan <- true: + default: // session got canceled already + } } else { return } @@ -167,7 +187,7 @@ func (self *Session) dispatchRequests() { rhdl.Printf("Session: got progress: %+v", progress) // TODO: call all subscribed progress handler case result := <-self.doneIntChan: - self.running = false + self.state = SESSION_DONE rhdl.Printf("Session: import is done: %+v", result) // TODO: call all subscribed done handler } @@ -184,9 +204,8 @@ func (self *Session) getInterface() *SessionChan { } func (self *Session) Cleanup() { - // TODO: this blocks if dispatchRequests has ended already... - // or if cancel doesn't work... self.cancelChan <- true + rhdl.Printf("waiting for session to close") <-self.done close(self.done) close(self.progressIntChan) @@ -195,13 +214,15 @@ func (self *Session) Cleanup() { close(self.cancelChan) close(self.addProgressChan) close(self.addDoneChan) + rhdl.Printf("session is now cleaned up") } func NewSession(ctx *ImportContext) (session *Session) { session = new(Session) - session.running = false + session.state = SESSION_NEW session.ctx = *ctx session.done = make(chan bool) + session.cancelIntChan = make(chan bool, 1) session.progressIntChan = make(chan ProgressData) session.doneIntChan = make(chan ImportResult) session.runChan = make(chan bool) diff --git a/session_store.go b/session_store.go index b2e1538..8dd74c7 100644 --- a/session_store.go +++ b/session_store.go @@ -130,7 +130,7 @@ func (self *SessionStoreChan) Get(user, id string) (*SessionChan, error) { func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) { if session, exists := self.store[user][id]; exists { - session.Cleanup() + go session.Cleanup() // cleanup could take a while -> don't block all the other stuff delete(self.store[user], id) rhdl.Printf("SessionStore: removed session '%s/%s'", user, id) if userstore, exists := self.store[user]; exists { |