summaryrefslogtreecommitdiff
path: root/src/helsinki.at/rhimport/session.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/helsinki.at/rhimport/session.go')
-rw-r--r--src/helsinki.at/rhimport/session.go46
1 files changed, 35 insertions, 11 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index 08a33ce..864aac6 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -53,7 +53,18 @@ type Session struct {
cancelChan chan bool
addProgressChan chan sessionAddProgressHandlerRequest
addDoneChan chan sessionAddDoneHandlerRequest
- // TODO: add pub/sub for progress and done
+ progressCBs []*SessionProgressCB
+ doneCBs []*SessionDoneCB
+}
+
+type SessionProgressCB struct {
+ cb ImportProgressCB
+ userdata interface{}
+}
+
+type SessionDoneCB struct {
+ cb func(ImportResult, interface{}) bool
+ userdata interface{}
}
type ProgressData struct {
@@ -78,13 +89,14 @@ type sessionAddDoneHandlerResponse struct {
type sessionAddDoneHandlerRequest struct {
userdata interface{}
- callback func(*ImportResult, interface{})
+ callback func(ImportResult, interface{}) bool
response chan<- sessionAddDoneHandlerResponse
}
-func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) {
+func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) bool {
out := userdata.(chan<- ProgressData)
out <- ProgressData{step, step_name, progress}
+ return true
}
func session_import_run(ctx ImportContext, done chan<- ImportResult) {
@@ -129,6 +141,7 @@ func (self *SessionChan) Cancel() {
func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) {
rhdl.Printf("Session: addProgressHandler called with: %+v", userdata)
+ self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata})
return
}
@@ -144,12 +157,13 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgr
return res.err
}
-func (self *Session) addDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) (resp sessionAddDoneHandlerResponse) {
+func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) {
rhdl.Printf("Session: addDoneHandler called with: %+v", userdata)
+ self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata})
return
}
-func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) error {
+func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) error {
res_ch := make(chan sessionAddDoneHandlerResponse)
req := sessionAddDoneHandlerRequest{}
req.userdata = userdata
@@ -183,13 +197,23 @@ func (self *Session) dispatchRequests() {
req.response <- self.addProgressHandler(req.userdata, req.callback)
case req := <-self.addDoneChan:
req.response <- self.addDoneHandler(req.userdata, req.callback)
- case progress := <-self.progressIntChan:
- rhdl.Printf("Session: got progress: %+v", progress)
- // TODO: call all subscribed progress handler
- case result := <-self.doneIntChan:
+ case p := <-self.progressIntChan:
+ for _, cb := range self.progressCBs {
+ if cb.cb != nil {
+ if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep {
+ cb.cb = nil
+ }
+ }
+ }
+ case r := <-self.doneIntChan:
self.state = SESSION_DONE
- rhdl.Printf("Session: import is done: %+v", result)
- // TODO: call all subscribed done handler
+ for _, cb := range self.doneCBs {
+ if cb.cb != nil {
+ if keep := cb.cb(r, cb.userdata); !keep {
+ cb.cb = nil
+ }
+ }
+ }
}
}
}