diff options
-rw-r--r-- | session.go | 92 |
1 files changed, 61 insertions, 31 deletions
@@ -25,6 +25,7 @@ package rhimport import ( + "fmt" "net/http" ) @@ -132,15 +133,23 @@ func (self *Session) run() { } func (self *SessionChan) Run() { - self.runChan <- true + select { + case self.runChan <- true: + default: // command is already pending or session is about to be closed/removed + } } func (self *SessionChan) Cancel() { - self.cancelChan <- true + select { + case self.cancelChan <- true: + default: // cancel is already pending or session is about to be closed/removed + } } func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) { - rhdl.Printf("Session: addProgressHandler called with: %+v", userdata) + if self.state != SESSION_NEW && self.state != SESSION_RUNNING { + resp.err = fmt.Errorf("session is already done/canceled") + } self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata}) return } @@ -151,14 +160,20 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgr req.userdata = userdata req.callback = cb req.response = res_ch - self.addProgressChan <- req + select { + case self.addProgressChan <- req: + default: + return fmt.Errorf("session is about to be closed/removed") + } res := <-res_ch return res.err } func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) { - rhdl.Printf("Session: addDoneHandler called with: %+v", userdata) + if self.state == SESSION_NEW && self.state != SESSION_RUNNING { + resp.err = fmt.Errorf("session is already done/canceled") + } self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata}) return } @@ -169,12 +184,36 @@ func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResu req.userdata = userdata req.callback = cb req.response = res_ch - self.addDoneChan <- req + select { + case self.addDoneChan <- req: + default: + return fmt.Errorf("session is about to be closed/removed") + } res := <-res_ch return res.err } +func (self *Session) callProgressHandler(p *ProgressData) { + for _, cb := range self.progressCBs { + if cb.cb != nil { + if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep { + cb.cb = nil + } + } + } +} + +func (self *Session) callDoneHandler(r *ImportResult) { + for _, cb := range self.doneCBs { + if cb.cb != nil { + if keep := cb.cb(*r, cb.userdata); !keep { + cb.cb = nil + } + } + } +} + func (self *Session) dispatchRequests() { defer func() { self.done <- true }() for { @@ -188,8 +227,9 @@ func (self *Session) dispatchRequests() { rhdl.Println("Session: canceling running import") select { case self.cancelIntChan <- true: - default: // session got canceled already + default: // session got canceled already?? } + self.state = SESSION_CANCELED } else { return } @@ -198,22 +238,10 @@ func (self *Session) dispatchRequests() { case req := <-self.addDoneChan: req.response <- self.addDoneHandler(req.userdata, req.callback) case p := <-self.progressIntChan: - for _, cb := range self.progressCBs { - if cb.cb != nil { - if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep { - cb.cb = nil - } - } - } + self.callProgressHandler(&p) case r := <-self.doneIntChan: self.state = SESSION_DONE - for _, cb := range self.doneCBs { - if cb.cb != nil { - if keep := cb.cb(r, cb.userdata); !keep { - cb.cb = nil - } - } - } + self.callDoneHandler(&r) } } } @@ -234,10 +262,12 @@ func (self *Session) Cleanup() { close(self.done) close(self.progressIntChan) close(self.doneIntChan) - close(self.runChan) - close(self.cancelChan) - close(self.addProgressChan) - close(self.addDoneChan) + // don't close the channels we give out because this might lead to a panic if + // somebody wites to an already removed session + // close(self.runChan) + // close(self.cancelChan) + // close(self.addProgressChan) + // close(self.addDoneChan) rhdl.Printf("session is now cleaned up") } @@ -247,12 +277,12 @@ func NewSession(ctx *ImportContext) (session *Session) { session.ctx = *ctx session.done = make(chan bool) session.cancelIntChan = make(chan bool, 1) - session.progressIntChan = make(chan ProgressData) - session.doneIntChan = make(chan ImportResult) - session.runChan = make(chan bool) - session.cancelChan = make(chan bool) - session.addProgressChan = make(chan sessionAddProgressHandlerRequest) - session.addDoneChan = make(chan sessionAddDoneHandlerRequest) + session.progressIntChan = make(chan ProgressData, 10) + session.doneIntChan = make(chan ImportResult, 1) + session.runChan = make(chan bool, 1) + session.cancelChan = make(chan bool, 1) + session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) + session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) go session.dispatchRequests() return } |