summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2015-12-22 23:00:55 (GMT)
committerChristian Pointner <equinox@helsinki.at>2015-12-22 23:00:55 (GMT)
commit7444aeb57313fd28ad04182d50252a76e394c261 (patch)
treeb9bcaf38525a099b179959f230d2ce3975817ca5
parent979b0567a605826906d22ad5e0a1d79f9d4ab946 (diff)
sessions now call remove when done
-rw-r--r--src/helsinki.at/rhimport/session.go53
-rw-r--r--src/helsinki.at/rhimport/session_store.go8
2 files changed, 37 insertions, 24 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index 8843c81..bce1072 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -46,7 +46,9 @@ type SessionChan struct {
type Session struct {
ctx ImportContext
state int
+ removeFunc func()
done chan bool
+ quit chan bool
cancelIntChan chan bool
progressIntChan chan ProgressData
doneIntChan chan ImportResult
@@ -139,6 +141,15 @@ func (self *SessionChan) Run() {
}
}
+func (self *Session) cancel() {
+ rhdl.Println("Session: canceling running import")
+ select {
+ case self.cancelIntChan <- true:
+ default: // session got canceled already??
+ }
+ self.state = SESSION_CANCELED
+}
+
func (self *SessionChan) Cancel() {
select {
case self.cancelChan <- true:
@@ -178,6 +189,16 @@ func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult,
return
}
+func (self *Session) callProgressHandler(p *ProgressData) {
+ for _, cb := range self.progressCBs {
+ if cb.cb != nil {
+ if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep {
+ cb.cb = nil
+ }
+ }
+ }
+}
+
func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) error {
res_ch := make(chan sessionAddDoneHandlerResponse)
req := sessionAddDoneHandlerRequest{}
@@ -194,16 +215,6 @@ func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResu
return res.err
}
-func (self *Session) callProgressHandler(p *ProgressData) {
- for _, cb := range self.progressCBs {
- if cb.cb != nil {
- if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep {
- cb.cb = nil
- }
- }
- }
-}
-
func (self *Session) callDoneHandler(r *ImportResult) {
for _, cb := range self.doneCBs {
if cb.cb != nil {
@@ -218,20 +229,18 @@ func (self *Session) dispatchRequests() {
defer func() { self.done <- true }()
for {
select {
+ case <-self.quit:
+ if self.state == SESSION_RUNNING {
+ self.cancel()
+ }
+ return
case <-self.runChan:
if self.state == SESSION_NEW {
self.run()
}
case <-self.cancelChan:
if self.state == SESSION_RUNNING {
- rhdl.Println("Session: canceling running import")
- select {
- case self.cancelIntChan <- true:
- default: // session got canceled already??
- }
- self.state = SESSION_CANCELED
- } else {
- return
+ self.cancel()
}
case req := <-self.addProgressChan:
req.response <- self.addProgressHandler(req.userdata, req.callback)
@@ -242,6 +251,9 @@ func (self *Session) dispatchRequests() {
case r := <-self.doneIntChan:
self.state = SESSION_DONE
self.callDoneHandler(&r)
+ if self.removeFunc != nil {
+ self.removeFunc()
+ }
}
}
}
@@ -256,7 +268,7 @@ func (self *Session) getInterface() *SessionChan {
}
func (self *Session) Cleanup() {
- self.cancelChan <- true
+ self.quit <- true
rhdl.Printf("waiting for session to close")
<-self.done
close(self.done)
@@ -271,9 +283,10 @@ func (self *Session) Cleanup() {
rhdl.Printf("session is now cleaned up")
}
-func NewSession(ctx *ImportContext) (session *Session) {
+func NewSession(ctx *ImportContext, removeFunc func()) (session *Session) {
session = new(Session)
session.state = SESSION_NEW
+ session.removeFunc = removeFunc
session.ctx = *ctx
session.done = make(chan bool)
session.cancelIntChan = make(chan bool, 1)
diff --git a/src/helsinki.at/rhimport/session_store.go b/src/helsinki.at/rhimport/session_store.go
index 8dd74c7..505bd75 100644
--- a/src/helsinki.at/rhimport/session_store.go
+++ b/src/helsinki.at/rhimport/session_store.go
@@ -84,7 +84,7 @@ func (self *SessionStore) new(ctx *ImportContext) (resp newSessionResponse) {
if _, exists := self.store[ctx.UserName]; !exists {
self.store[ctx.UserName] = make(map[string]*Session)
}
- self.store[ctx.UserName][resp.id] = NewSession(ctx)
+ self.store[ctx.UserName][resp.id] = NewSession(ctx, func() { self.GetInterface().Remove(ctx.UserName, resp.id) })
resp.session = self.store[ctx.UserName][resp.id].getInterface()
rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id)
return
@@ -197,9 +197,9 @@ func NewSessionStore(conf *Config) (store *SessionStore, err error) {
store.quit = make(chan bool)
store.done = make(chan bool)
store.store = make(map[string]map[string]*Session)
- store.newChan = make(chan newSessionRequest)
- store.getChan = make(chan getSessionRequest)
- store.removeChan = make(chan removeSessionRequest)
+ store.newChan = make(chan newSessionRequest, 10)
+ store.getChan = make(chan getSessionRequest, 10)
+ store.removeChan = make(chan removeSessionRequest, 10)
go store.dispatchRequests()
return