summaryrefslogtreecommitdiff
path: root/src/helsinki.at/rhimport
diff options
context:
space:
mode:
Diffstat (limited to 'src/helsinki.at/rhimport')
-rw-r--r--src/helsinki.at/rhimport/session.go92
1 files changed, 61 insertions, 31 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index 864aac6..8843c81 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -25,6 +25,7 @@
package rhimport
import (
+ "fmt"
"net/http"
)
@@ -132,15 +133,23 @@ func (self *Session) run() {
}
func (self *SessionChan) Run() {
- self.runChan <- true
+ select {
+ case self.runChan <- true:
+ default: // command is already pending or session is about to be closed/removed
+ }
}
func (self *SessionChan) Cancel() {
- self.cancelChan <- true
+ select {
+ case self.cancelChan <- true:
+ default: // cancel is already pending or session is about to be closed/removed
+ }
}
func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) {
- rhdl.Printf("Session: addProgressHandler called with: %+v", userdata)
+ if self.state != SESSION_NEW && self.state != SESSION_RUNNING {
+ resp.err = fmt.Errorf("session is already done/canceled")
+ }
self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata})
return
}
@@ -151,14 +160,20 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgr
req.userdata = userdata
req.callback = cb
req.response = res_ch
- self.addProgressChan <- req
+ select {
+ case self.addProgressChan <- req:
+ default:
+ return fmt.Errorf("session is about to be closed/removed")
+ }
res := <-res_ch
return res.err
}
func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) {
- rhdl.Printf("Session: addDoneHandler called with: %+v", userdata)
+ if self.state == SESSION_NEW && self.state != SESSION_RUNNING {
+ resp.err = fmt.Errorf("session is already done/canceled")
+ }
self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata})
return
}
@@ -169,12 +184,36 @@ func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResu
req.userdata = userdata
req.callback = cb
req.response = res_ch
- self.addDoneChan <- req
+ select {
+ case self.addDoneChan <- req:
+ default:
+ return fmt.Errorf("session is about to be closed/removed")
+ }
res := <-res_ch
return res.err
}
+func (self *Session) callProgressHandler(p *ProgressData) {
+ for _, cb := range self.progressCBs {
+ if cb.cb != nil {
+ if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep {
+ cb.cb = nil
+ }
+ }
+ }
+}
+
+func (self *Session) callDoneHandler(r *ImportResult) {
+ for _, cb := range self.doneCBs {
+ if cb.cb != nil {
+ if keep := cb.cb(*r, cb.userdata); !keep {
+ cb.cb = nil
+ }
+ }
+ }
+}
+
func (self *Session) dispatchRequests() {
defer func() { self.done <- true }()
for {
@@ -188,8 +227,9 @@ func (self *Session) dispatchRequests() {
rhdl.Println("Session: canceling running import")
select {
case self.cancelIntChan <- true:
- default: // session got canceled already
+ default: // session got canceled already??
}
+ self.state = SESSION_CANCELED
} else {
return
}
@@ -198,22 +238,10 @@ func (self *Session) dispatchRequests() {
case req := <-self.addDoneChan:
req.response <- self.addDoneHandler(req.userdata, req.callback)
case p := <-self.progressIntChan:
- for _, cb := range self.progressCBs {
- if cb.cb != nil {
- if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep {
- cb.cb = nil
- }
- }
- }
+ self.callProgressHandler(&p)
case r := <-self.doneIntChan:
self.state = SESSION_DONE
- for _, cb := range self.doneCBs {
- if cb.cb != nil {
- if keep := cb.cb(r, cb.userdata); !keep {
- cb.cb = nil
- }
- }
- }
+ self.callDoneHandler(&r)
}
}
}
@@ -234,10 +262,12 @@ func (self *Session) Cleanup() {
close(self.done)
close(self.progressIntChan)
close(self.doneIntChan)
- close(self.runChan)
- close(self.cancelChan)
- close(self.addProgressChan)
- close(self.addDoneChan)
+ // don't close the channels we give out because this might lead to a panic if
+ // somebody wites to an already removed session
+ // close(self.runChan)
+ // close(self.cancelChan)
+ // close(self.addProgressChan)
+ // close(self.addDoneChan)
rhdl.Printf("session is now cleaned up")
}
@@ -247,12 +277,12 @@ func NewSession(ctx *ImportContext) (session *Session) {
session.ctx = *ctx
session.done = make(chan bool)
session.cancelIntChan = make(chan bool, 1)
- session.progressIntChan = make(chan ProgressData)
- session.doneIntChan = make(chan ImportResult)
- session.runChan = make(chan bool)
- session.cancelChan = make(chan bool)
- session.addProgressChan = make(chan sessionAddProgressHandlerRequest)
- session.addDoneChan = make(chan sessionAddDoneHandlerRequest)
+ session.progressIntChan = make(chan ProgressData, 10)
+ session.doneIntChan = make(chan ImportResult, 1)
+ session.runChan = make(chan bool, 1)
+ session.cancelChan = make(chan bool, 1)
+ session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10)
+ session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10)
go session.dispatchRequests()
return
}