From e40235206363f09a43723fd7af10ef66bcfa08a3 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 22 Dec 2015 04:18:46 +0100 Subject: session based callbacks work now diff --git a/src/helsinki.at/rhimport/fetcher.go b/src/helsinki.at/rhimport/fetcher.go index 36d7055..acb6592 100644 --- a/src/helsinki.at/rhimport/fetcher.go +++ b/src/helsinki.at/rhimport/fetcher.go @@ -123,7 +123,9 @@ func FetchFileCurl(ctx *ImportContext, res *FetchResult, uri *url.URL) (err erro } if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(1, "downloading", dlnow/dltotal, ctx.ProgressCallBackData) + if keep := ctx.ProgressCallBack(1, "downloading", dlnow/dltotal, ctx.ProgressCallBackData); !keep { + ctx.ProgressCallBack = nil + } } return true }) @@ -156,7 +158,9 @@ func FetchFileCurl(ctx *ImportContext, res *FetchResult, uri *url.URL) (err erro func FetchFileLocal(ctx *ImportContext, res *FetchResult, uri *url.URL) (err error) { rhl.Printf("Local fetcher called for '%s'", ctx.SourceUri) if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(1, "fetching", 1.0, ctx.ProgressCallBackData) + if keep := ctx.ProgressCallBack(1, "fetching", 1.0, ctx.ProgressCallBackData); !keep { + ctx.ProgressCallBack = nil + } } ctx.SourceFile = filepath.Join(ctx.conf.LocalFetchDir, path.Clean("/"+uri.Path)) @@ -188,12 +192,16 @@ func FetchFileFake(ctx *ImportContext, res *FetchResult, uri *url.URL) error { return nil } if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(1, "faking", float64(i)/float64(duration), ctx.ProgressCallBackData) + if keep := ctx.ProgressCallBack(1, "faking", float64(i)/float64(duration), ctx.ProgressCallBackData); !keep { + ctx.ProgressCallBack = nil + } } time.Sleep(100 * time.Millisecond) } if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData) + if keep := ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData); !keep { + ctx.ProgressCallBack = nil + } } ctx.SourceFile = "/nonexistend/fake.mp3" ctx.DeleteSourceFile = false diff --git a/src/helsinki.at/rhimport/importer.go b/src/helsinki.at/rhimport/importer.go index a245270..575adc9 100644 --- a/src/helsinki.at/rhimport/importer.go +++ b/src/helsinki.at/rhimport/importer.go @@ -44,7 +44,7 @@ var ( bool2str = map[bool]string{false: "0", true: "1"} ) -type ImportProgressCB func(step int, step_name string, progress float64, userdata interface{}) +type ImportProgressCB func(step int, step_name string, progress float64, userdata interface{}) bool type ImportContext struct { conf *Config @@ -508,7 +508,9 @@ func import_audio(ctx *ImportContext, res *ImportResult) (err error) { } if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(2, "importing", ulnow/ultotal, ctx.ProgressCallBackData) + if keep := ctx.ProgressCallBack(2, "importing", ulnow/ultotal, ctx.ProgressCallBackData); !keep { + ctx.ProgressCallBack = nil + } } return true }) @@ -630,7 +632,9 @@ func ImportFile(ctx *ImportContext) (res *ImportResult, err error) { rhl.Printf("importer: ImportFile called with: show-id: %d, pool-name: '%s', cart/cut: %d/%d", ctx.ShowId, ctx.GroupName, ctx.Cart, ctx.Cut) if ctx.ProgressCallBack != nil { - ctx.ProgressCallBack(2, "importing", 0.0, ctx.ProgressCallBackData) + if keep := ctx.ProgressCallBack(2, "importing", 0.0, ctx.ProgressCallBackData); !keep { + ctx.ProgressCallBack = nil + } } if ctx.Trusted { diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go index 08a33ce..864aac6 100644 --- a/src/helsinki.at/rhimport/session.go +++ b/src/helsinki.at/rhimport/session.go @@ -53,7 +53,18 @@ type Session struct { cancelChan chan bool addProgressChan chan sessionAddProgressHandlerRequest addDoneChan chan sessionAddDoneHandlerRequest - // TODO: add pub/sub for progress and done + progressCBs []*SessionProgressCB + doneCBs []*SessionDoneCB +} + +type SessionProgressCB struct { + cb ImportProgressCB + userdata interface{} +} + +type SessionDoneCB struct { + cb func(ImportResult, interface{}) bool + userdata interface{} } type ProgressData struct { @@ -78,13 +89,14 @@ type sessionAddDoneHandlerResponse struct { type sessionAddDoneHandlerRequest struct { userdata interface{} - callback func(*ImportResult, interface{}) + callback func(ImportResult, interface{}) bool response chan<- sessionAddDoneHandlerResponse } -func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) { +func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) bool { out := userdata.(chan<- ProgressData) out <- ProgressData{step, step_name, progress} + return true } func session_import_run(ctx ImportContext, done chan<- ImportResult) { @@ -129,6 +141,7 @@ func (self *SessionChan) Cancel() { func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) { rhdl.Printf("Session: addProgressHandler called with: %+v", userdata) + self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata}) return } @@ -144,12 +157,13 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgr return res.err } -func (self *Session) addDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) (resp sessionAddDoneHandlerResponse) { +func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) { rhdl.Printf("Session: addDoneHandler called with: %+v", userdata) + self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata}) return } -func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) error { +func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) error { res_ch := make(chan sessionAddDoneHandlerResponse) req := sessionAddDoneHandlerRequest{} req.userdata = userdata @@ -183,13 +197,23 @@ func (self *Session) dispatchRequests() { req.response <- self.addProgressHandler(req.userdata, req.callback) case req := <-self.addDoneChan: req.response <- self.addDoneHandler(req.userdata, req.callback) - case progress := <-self.progressIntChan: - rhdl.Printf("Session: got progress: %+v", progress) - // TODO: call all subscribed progress handler - case result := <-self.doneIntChan: + case p := <-self.progressIntChan: + for _, cb := range self.progressCBs { + if cb.cb != nil { + if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep { + cb.cb = nil + } + } + } + case r := <-self.doneIntChan: self.state = SESSION_DONE - rhdl.Printf("Session: import is done: %+v", result) - // TODO: call all subscribed done handler + for _, cb := range self.doneCBs { + if cb.cb != nil { + if keep := cb.cb(r, cb.userdata); !keep { + cb.cb = nil + } + } + } } } } diff --git a/src/helsinki.at/rhimportd/ctrlTelnet.go b/src/helsinki.at/rhimportd/ctrlTelnet.go index 687652c..74e396f 100644 --- a/src/helsinki.at/rhimportd/ctrlTelnet.go +++ b/src/helsinki.at/rhimportd/ctrlTelnet.go @@ -264,9 +264,10 @@ func (c *TelnetClient) handle_cmd_show(args []string) { } } -func telnet_progress_callback(step int, step_name string, progress float64, userdata interface{}) { +func telnet_progress_callback(step int, step_name string, progress float64, userdata interface{}) bool { out := userdata.(chan<- string) out <- fmt.Sprintf("%s: %3.2f%%\r", step_name, progress*100) + return true } func telnet_cmd_run(ctx rhimport.ImportContext, out chan<- string) { diff --git a/src/helsinki.at/rhimportd/main.go b/src/helsinki.at/rhimportd/main.go index b90df0a..aeb7194 100644 --- a/src/helsinki.at/rhimportd/main.go +++ b/src/helsinki.at/rhimportd/main.go @@ -26,6 +26,7 @@ package main import ( "flag" + "fmt" "log" "os" "os/signal" @@ -42,6 +43,33 @@ var ( //rhdl = log.New(ioutil.Discard, "[rhimportd-dbg]\t", log.LstdFlags) ) +func session_test_progress1(step int, step_name string, progress float64, userdata interface{}) bool { + out := userdata.(chan<- string) + select { + case out <- fmt.Sprintf("CB1 %d, %s: %3.2f%%", step, step_name, progress*100): + default: + } + if step > 1 { + return false + } + return true +} + +func session_test_progress2(step int, step_name string, progress float64, userdata interface{}) bool { + out := userdata.(chan<- string) + select { + case out <- fmt.Sprintf("CB2 %d, %s: %3.2f%%", step, step_name, progress*100): + default: + } + return true +} + +func session_test_done(res rhimport.ImportResult, userdata interface{}) bool { + done := userdata.(chan<- rhimport.ImportResult) + done <- res + return true +} + func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) { sessions, err := rhimport.NewSessionStore(conf) if err != nil { @@ -55,7 +83,8 @@ func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) { ctx := rhimport.NewImportContext(conf, rddb, "heslinki") ctx.Trusted = true ctx.ShowId = 10002 - ctx.SourceUri = "fake://10" + ctx.ClearShowCarts = true + ctx.SourceUri = "http://www.tonycuffe.com/mp3/tail%20toddle.mp3" id, session, err := store.New(ctx) if err != nil { @@ -63,11 +92,33 @@ func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) { return } + pch := make(chan string, 1000) + if err = session.AddProgressHandler((chan<- string)(pch), session_test_progress1); err != nil { + rhl.Printf("MAIN: Error Session.AddProgressHandler(): %s", err) + } + dch := make(chan rhimport.ImportResult) + if err = session.AddDoneHandler((chan<- rhimport.ImportResult)(dch), session_test_done); err != nil { + rhl.Printf("MAIN: Error Session.AddDoneHandler(): %s", err) + } + rhl.Printf("MAIN: calling run for heslinki/%s", id) session.Run() - rhl.Printf("MAIN: waiting for 2 secondes") - time.Sleep(5 * time.Second) + time.Sleep(500 * time.Millisecond) + + if err = session.AddProgressHandler((chan<- string)(pch), session_test_progress2); err != nil { + rhl.Printf("MAIN: Error Session.AddProgressHandler(): %s", err) + } + + for { + select { + case p := <-pch: + fmt.Println(p) + case r := <-dch: + fmt.Printf("Import finished: %+v\n", r) + break + } + } rhl.Printf("MAIN: calling remove for heslinki/%s", id) if err = store.Remove("heslinki", id); err != nil { -- cgit v0.10.2