diff options
Diffstat (limited to 'session.go')
-rw-r--r-- | session.go | 46 |
1 files changed, 35 insertions, 11 deletions
@@ -53,7 +53,18 @@ type Session struct { cancelChan chan bool addProgressChan chan sessionAddProgressHandlerRequest addDoneChan chan sessionAddDoneHandlerRequest - // TODO: add pub/sub for progress and done + progressCBs []*SessionProgressCB + doneCBs []*SessionDoneCB +} + +type SessionProgressCB struct { + cb ImportProgressCB + userdata interface{} +} + +type SessionDoneCB struct { + cb func(ImportResult, interface{}) bool + userdata interface{} } type ProgressData struct { @@ -78,13 +89,14 @@ type sessionAddDoneHandlerResponse struct { type sessionAddDoneHandlerRequest struct { userdata interface{} - callback func(*ImportResult, interface{}) + callback func(ImportResult, interface{}) bool response chan<- sessionAddDoneHandlerResponse } -func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) { +func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) bool { out := userdata.(chan<- ProgressData) out <- ProgressData{step, step_name, progress} + return true } func session_import_run(ctx ImportContext, done chan<- ImportResult) { @@ -129,6 +141,7 @@ func (self *SessionChan) Cancel() { func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) { rhdl.Printf("Session: addProgressHandler called with: %+v", userdata) + self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata}) return } @@ -144,12 +157,13 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgr return res.err } -func (self *Session) addDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) (resp sessionAddDoneHandlerResponse) { +func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) { rhdl.Printf("Session: addDoneHandler called with: %+v", userdata) + self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata}) return } -func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) error { +func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) error { res_ch := make(chan sessionAddDoneHandlerResponse) req := sessionAddDoneHandlerRequest{} req.userdata = userdata @@ -183,13 +197,23 @@ func (self *Session) dispatchRequests() { req.response <- self.addProgressHandler(req.userdata, req.callback) case req := <-self.addDoneChan: req.response <- self.addDoneHandler(req.userdata, req.callback) - case progress := <-self.progressIntChan: - rhdl.Printf("Session: got progress: %+v", progress) - // TODO: call all subscribed progress handler - case result := <-self.doneIntChan: + case p := <-self.progressIntChan: + for _, cb := range self.progressCBs { + if cb.cb != nil { + if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep { + cb.cb = nil + } + } + } + case r := <-self.doneIntChan: self.state = SESSION_DONE - rhdl.Printf("Session: import is done: %+v", result) - // TODO: call all subscribed done handler + for _, cb := range self.doneCBs { + if cb.cb != nil { + if keep := cb.cb(r, cb.userdata); !keep { + cb.cb = nil + } + } + } } } } |