diff options
author | Christian Pointner <equinox@helsinki.at> | 2015-12-22 23:00:55 (GMT) |
---|---|---|
committer | Christian Pointner <equinox@helsinki.at> | 2015-12-22 23:00:55 (GMT) |
commit | 7444aeb57313fd28ad04182d50252a76e394c261 (patch) | |
tree | b9bcaf38525a099b179959f230d2ce3975817ca5 /src/helsinki.at/rhimport | |
parent | 979b0567a605826906d22ad5e0a1d79f9d4ab946 (diff) |
sessions now call remove when done
Diffstat (limited to 'src/helsinki.at/rhimport')
-rw-r--r-- | src/helsinki.at/rhimport/session.go | 53 | ||||
-rw-r--r-- | src/helsinki.at/rhimport/session_store.go | 8 |
2 files changed, 37 insertions, 24 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go index 8843c81..bce1072 100644 --- a/src/helsinki.at/rhimport/session.go +++ b/src/helsinki.at/rhimport/session.go @@ -46,7 +46,9 @@ type SessionChan struct { type Session struct { ctx ImportContext state int + removeFunc func() done chan bool + quit chan bool cancelIntChan chan bool progressIntChan chan ProgressData doneIntChan chan ImportResult @@ -139,6 +141,15 @@ func (self *SessionChan) Run() { } } +func (self *Session) cancel() { + rhdl.Println("Session: canceling running import") + select { + case self.cancelIntChan <- true: + default: // session got canceled already?? + } + self.state = SESSION_CANCELED +} + func (self *SessionChan) Cancel() { select { case self.cancelChan <- true: @@ -178,6 +189,16 @@ func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, return } +func (self *Session) callProgressHandler(p *ProgressData) { + for _, cb := range self.progressCBs { + if cb.cb != nil { + if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep { + cb.cb = nil + } + } + } +} + func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) error { res_ch := make(chan sessionAddDoneHandlerResponse) req := sessionAddDoneHandlerRequest{} @@ -194,16 +215,6 @@ func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResu return res.err } -func (self *Session) callProgressHandler(p *ProgressData) { - for _, cb := range self.progressCBs { - if cb.cb != nil { - if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep { - cb.cb = nil - } - } - } -} - func (self *Session) callDoneHandler(r *ImportResult) { for _, cb := range self.doneCBs { if cb.cb != nil { @@ -218,20 +229,18 @@ func (self *Session) dispatchRequests() { defer func() { self.done <- true }() for { select { + case <-self.quit: + if self.state == SESSION_RUNNING { + self.cancel() + } + return case <-self.runChan: if self.state == SESSION_NEW { self.run() } case <-self.cancelChan: if self.state == SESSION_RUNNING { - rhdl.Println("Session: canceling running import") - select { - case self.cancelIntChan <- true: - default: // session got canceled already?? - } - self.state = SESSION_CANCELED - } else { - return + self.cancel() } case req := <-self.addProgressChan: req.response <- self.addProgressHandler(req.userdata, req.callback) @@ -242,6 +251,9 @@ func (self *Session) dispatchRequests() { case r := <-self.doneIntChan: self.state = SESSION_DONE self.callDoneHandler(&r) + if self.removeFunc != nil { + self.removeFunc() + } } } } @@ -256,7 +268,7 @@ func (self *Session) getInterface() *SessionChan { } func (self *Session) Cleanup() { - self.cancelChan <- true + self.quit <- true rhdl.Printf("waiting for session to close") <-self.done close(self.done) @@ -271,9 +283,10 @@ func (self *Session) Cleanup() { rhdl.Printf("session is now cleaned up") } -func NewSession(ctx *ImportContext) (session *Session) { +func NewSession(ctx *ImportContext, removeFunc func()) (session *Session) { session = new(Session) session.state = SESSION_NEW + session.removeFunc = removeFunc session.ctx = *ctx session.done = make(chan bool) session.cancelIntChan = make(chan bool, 1) diff --git a/src/helsinki.at/rhimport/session_store.go b/src/helsinki.at/rhimport/session_store.go index 8dd74c7..505bd75 100644 --- a/src/helsinki.at/rhimport/session_store.go +++ b/src/helsinki.at/rhimport/session_store.go @@ -84,7 +84,7 @@ func (self *SessionStore) new(ctx *ImportContext) (resp newSessionResponse) { if _, exists := self.store[ctx.UserName]; !exists { self.store[ctx.UserName] = make(map[string]*Session) } - self.store[ctx.UserName][resp.id] = NewSession(ctx) + self.store[ctx.UserName][resp.id] = NewSession(ctx, func() { self.GetInterface().Remove(ctx.UserName, resp.id) }) resp.session = self.store[ctx.UserName][resp.id].getInterface() rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id) return @@ -197,9 +197,9 @@ func NewSessionStore(conf *Config) (store *SessionStore, err error) { store.quit = make(chan bool) store.done = make(chan bool) store.store = make(map[string]map[string]*Session) - store.newChan = make(chan newSessionRequest) - store.getChan = make(chan getSessionRequest) - store.removeChan = make(chan removeSessionRequest) + store.newChan = make(chan newSessionRequest, 10) + store.getChan = make(chan getSessionRequest, 10) + store.removeChan = make(chan removeSessionRequest, 10) go store.dispatchRequests() return |