summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2015-12-22 03:18:46 (GMT)
committerChristian Pointner <equinox@helsinki.at>2015-12-22 03:18:46 (GMT)
commite40235206363f09a43723fd7af10ef66bcfa08a3 (patch)
treee00bd8861c3e460eaf64f98ade15d03006189c61
parentc0463be82447f269d9947d16a40881dbbe600827 (diff)
session based callbacks work now
-rw-r--r--src/helsinki.at/rhimport/fetcher.go16
-rw-r--r--src/helsinki.at/rhimport/importer.go10
-rw-r--r--src/helsinki.at/rhimport/session.go46
-rw-r--r--src/helsinki.at/rhimportd/ctrlTelnet.go3
-rw-r--r--src/helsinki.at/rhimportd/main.go57
5 files changed, 110 insertions, 22 deletions
diff --git a/src/helsinki.at/rhimport/fetcher.go b/src/helsinki.at/rhimport/fetcher.go
index 36d7055..acb6592 100644
--- a/src/helsinki.at/rhimport/fetcher.go
+++ b/src/helsinki.at/rhimport/fetcher.go
@@ -123,7 +123,9 @@ func FetchFileCurl(ctx *ImportContext, res *FetchResult, uri *url.URL) (err erro
}
if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(1, "downloading", dlnow/dltotal, ctx.ProgressCallBackData)
+ if keep := ctx.ProgressCallBack(1, "downloading", dlnow/dltotal, ctx.ProgressCallBackData); !keep {
+ ctx.ProgressCallBack = nil
+ }
}
return true
})
@@ -156,7 +158,9 @@ func FetchFileCurl(ctx *ImportContext, res *FetchResult, uri *url.URL) (err erro
func FetchFileLocal(ctx *ImportContext, res *FetchResult, uri *url.URL) (err error) {
rhl.Printf("Local fetcher called for '%s'", ctx.SourceUri)
if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(1, "fetching", 1.0, ctx.ProgressCallBackData)
+ if keep := ctx.ProgressCallBack(1, "fetching", 1.0, ctx.ProgressCallBackData); !keep {
+ ctx.ProgressCallBack = nil
+ }
}
ctx.SourceFile = filepath.Join(ctx.conf.LocalFetchDir, path.Clean("/"+uri.Path))
@@ -188,12 +192,16 @@ func FetchFileFake(ctx *ImportContext, res *FetchResult, uri *url.URL) error {
return nil
}
if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(1, "faking", float64(i)/float64(duration), ctx.ProgressCallBackData)
+ if keep := ctx.ProgressCallBack(1, "faking", float64(i)/float64(duration), ctx.ProgressCallBackData); !keep {
+ ctx.ProgressCallBack = nil
+ }
}
time.Sleep(100 * time.Millisecond)
}
if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData)
+ if keep := ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData); !keep {
+ ctx.ProgressCallBack = nil
+ }
}
ctx.SourceFile = "/nonexistend/fake.mp3"
ctx.DeleteSourceFile = false
diff --git a/src/helsinki.at/rhimport/importer.go b/src/helsinki.at/rhimport/importer.go
index a245270..575adc9 100644
--- a/src/helsinki.at/rhimport/importer.go
+++ b/src/helsinki.at/rhimport/importer.go
@@ -44,7 +44,7 @@ var (
bool2str = map[bool]string{false: "0", true: "1"}
)
-type ImportProgressCB func(step int, step_name string, progress float64, userdata interface{})
+type ImportProgressCB func(step int, step_name string, progress float64, userdata interface{}) bool
type ImportContext struct {
conf *Config
@@ -508,7 +508,9 @@ func import_audio(ctx *ImportContext, res *ImportResult) (err error) {
}
if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(2, "importing", ulnow/ultotal, ctx.ProgressCallBackData)
+ if keep := ctx.ProgressCallBack(2, "importing", ulnow/ultotal, ctx.ProgressCallBackData); !keep {
+ ctx.ProgressCallBack = nil
+ }
}
return true
})
@@ -630,7 +632,9 @@ func ImportFile(ctx *ImportContext) (res *ImportResult, err error) {
rhl.Printf("importer: ImportFile called with: show-id: %d, pool-name: '%s', cart/cut: %d/%d", ctx.ShowId, ctx.GroupName, ctx.Cart, ctx.Cut)
if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(2, "importing", 0.0, ctx.ProgressCallBackData)
+ if keep := ctx.ProgressCallBack(2, "importing", 0.0, ctx.ProgressCallBackData); !keep {
+ ctx.ProgressCallBack = nil
+ }
}
if ctx.Trusted {
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index 08a33ce..864aac6 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -53,7 +53,18 @@ type Session struct {
cancelChan chan bool
addProgressChan chan sessionAddProgressHandlerRequest
addDoneChan chan sessionAddDoneHandlerRequest
- // TODO: add pub/sub for progress and done
+ progressCBs []*SessionProgressCB
+ doneCBs []*SessionDoneCB
+}
+
+type SessionProgressCB struct {
+ cb ImportProgressCB
+ userdata interface{}
+}
+
+type SessionDoneCB struct {
+ cb func(ImportResult, interface{}) bool
+ userdata interface{}
}
type ProgressData struct {
@@ -78,13 +89,14 @@ type sessionAddDoneHandlerResponse struct {
type sessionAddDoneHandlerRequest struct {
userdata interface{}
- callback func(*ImportResult, interface{})
+ callback func(ImportResult, interface{}) bool
response chan<- sessionAddDoneHandlerResponse
}
-func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) {
+func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) bool {
out := userdata.(chan<- ProgressData)
out <- ProgressData{step, step_name, progress}
+ return true
}
func session_import_run(ctx ImportContext, done chan<- ImportResult) {
@@ -129,6 +141,7 @@ func (self *SessionChan) Cancel() {
func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) {
rhdl.Printf("Session: addProgressHandler called with: %+v", userdata)
+ self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata})
return
}
@@ -144,12 +157,13 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgr
return res.err
}
-func (self *Session) addDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) (resp sessionAddDoneHandlerResponse) {
+func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) {
rhdl.Printf("Session: addDoneHandler called with: %+v", userdata)
+ self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata})
return
}
-func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) error {
+func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) error {
res_ch := make(chan sessionAddDoneHandlerResponse)
req := sessionAddDoneHandlerRequest{}
req.userdata = userdata
@@ -183,13 +197,23 @@ func (self *Session) dispatchRequests() {
req.response <- self.addProgressHandler(req.userdata, req.callback)
case req := <-self.addDoneChan:
req.response <- self.addDoneHandler(req.userdata, req.callback)
- case progress := <-self.progressIntChan:
- rhdl.Printf("Session: got progress: %+v", progress)
- // TODO: call all subscribed progress handler
- case result := <-self.doneIntChan:
+ case p := <-self.progressIntChan:
+ for _, cb := range self.progressCBs {
+ if cb.cb != nil {
+ if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep {
+ cb.cb = nil
+ }
+ }
+ }
+ case r := <-self.doneIntChan:
self.state = SESSION_DONE
- rhdl.Printf("Session: import is done: %+v", result)
- // TODO: call all subscribed done handler
+ for _, cb := range self.doneCBs {
+ if cb.cb != nil {
+ if keep := cb.cb(r, cb.userdata); !keep {
+ cb.cb = nil
+ }
+ }
+ }
}
}
}
diff --git a/src/helsinki.at/rhimportd/ctrlTelnet.go b/src/helsinki.at/rhimportd/ctrlTelnet.go
index 687652c..74e396f 100644
--- a/src/helsinki.at/rhimportd/ctrlTelnet.go
+++ b/src/helsinki.at/rhimportd/ctrlTelnet.go
@@ -264,9 +264,10 @@ func (c *TelnetClient) handle_cmd_show(args []string) {
}
}
-func telnet_progress_callback(step int, step_name string, progress float64, userdata interface{}) {
+func telnet_progress_callback(step int, step_name string, progress float64, userdata interface{}) bool {
out := userdata.(chan<- string)
out <- fmt.Sprintf("%s: %3.2f%%\r", step_name, progress*100)
+ return true
}
func telnet_cmd_run(ctx rhimport.ImportContext, out chan<- string) {
diff --git a/src/helsinki.at/rhimportd/main.go b/src/helsinki.at/rhimportd/main.go
index b90df0a..aeb7194 100644
--- a/src/helsinki.at/rhimportd/main.go
+++ b/src/helsinki.at/rhimportd/main.go
@@ -26,6 +26,7 @@ package main
import (
"flag"
+ "fmt"
"log"
"os"
"os/signal"
@@ -42,6 +43,33 @@ var (
//rhdl = log.New(ioutil.Discard, "[rhimportd-dbg]\t", log.LstdFlags)
)
+func session_test_progress1(step int, step_name string, progress float64, userdata interface{}) bool {
+ out := userdata.(chan<- string)
+ select {
+ case out <- fmt.Sprintf("CB1 %d, %s: %3.2f%%", step, step_name, progress*100):
+ default:
+ }
+ if step > 1 {
+ return false
+ }
+ return true
+}
+
+func session_test_progress2(step int, step_name string, progress float64, userdata interface{}) bool {
+ out := userdata.(chan<- string)
+ select {
+ case out <- fmt.Sprintf("CB2 %d, %s: %3.2f%%", step, step_name, progress*100):
+ default:
+ }
+ return true
+}
+
+func session_test_done(res rhimport.ImportResult, userdata interface{}) bool {
+ done := userdata.(chan<- rhimport.ImportResult)
+ done <- res
+ return true
+}
+
func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) {
sessions, err := rhimport.NewSessionStore(conf)
if err != nil {
@@ -55,7 +83,8 @@ func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) {
ctx := rhimport.NewImportContext(conf, rddb, "heslinki")
ctx.Trusted = true
ctx.ShowId = 10002
- ctx.SourceUri = "fake://10"
+ ctx.ClearShowCarts = true
+ ctx.SourceUri = "http://www.tonycuffe.com/mp3/tail%20toddle.mp3"
id, session, err := store.New(ctx)
if err != nil {
@@ -63,11 +92,33 @@ func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) {
return
}
+ pch := make(chan string, 1000)
+ if err = session.AddProgressHandler((chan<- string)(pch), session_test_progress1); err != nil {
+ rhl.Printf("MAIN: Error Session.AddProgressHandler(): %s", err)
+ }
+ dch := make(chan rhimport.ImportResult)
+ if err = session.AddDoneHandler((chan<- rhimport.ImportResult)(dch), session_test_done); err != nil {
+ rhl.Printf("MAIN: Error Session.AddDoneHandler(): %s", err)
+ }
+
rhl.Printf("MAIN: calling run for heslinki/%s", id)
session.Run()
- rhl.Printf("MAIN: waiting for 2 secondes")
- time.Sleep(5 * time.Second)
+ time.Sleep(500 * time.Millisecond)
+
+ if err = session.AddProgressHandler((chan<- string)(pch), session_test_progress2); err != nil {
+ rhl.Printf("MAIN: Error Session.AddProgressHandler(): %s", err)
+ }
+
+ for {
+ select {
+ case p := <-pch:
+ fmt.Println(p)
+ case r := <-dch:
+ fmt.Printf("Import finished: %+v\n", r)
+ break
+ }
+ }
rhl.Printf("MAIN: calling remove for heslinki/%s", id)
if err = store.Remove("heslinki", id); err != nil {