summaryrefslogtreecommitdiff
path: root/src/helsinki.at/rhimport/session.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/helsinki.at/rhimport/session.go')
-rw-r--r--src/helsinki.at/rhimport/session.go51
1 files changed, 36 insertions, 15 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index bce1072..88648f7 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -27,6 +27,7 @@ package rhimport
import (
"fmt"
"net/http"
+ "time"
)
const (
@@ -34,10 +35,11 @@ const (
SESSION_RUNNING
SESSION_CANCELED
SESSION_DONE
+ SESSION_TIMEOUT
)
type SessionChan struct {
- runChan chan<- bool
+ runChan chan<- time.Duration
cancelChan chan<- bool
addProgressChan chan<- sessionAddProgressHandlerRequest
addDoneChan chan<- sessionAddDoneHandlerRequest
@@ -49,10 +51,11 @@ type Session struct {
removeFunc func()
done chan bool
quit chan bool
+ timer *time.Timer
cancelIntChan chan bool
progressIntChan chan ProgressData
doneIntChan chan ImportResult
- runChan chan bool
+ runChan chan time.Duration
cancelChan chan bool
addProgressChan chan sessionAddProgressHandlerRequest
addDoneChan chan sessionAddDoneHandlerRequest
@@ -125,18 +128,19 @@ func session_import_run(ctx ImportContext, done chan<- ImportResult) {
}
}
-func (self *Session) run() {
+func (self *Session) run(timeout time.Duration) {
self.ctx.ProgressCallBack = session_progress_callback
self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
self.ctx.Cancel = self.cancelIntChan
go session_import_run(self.ctx, self.doneIntChan)
self.state = SESSION_RUNNING
+ self.timer.Reset(timeout)
return
}
-func (self *SessionChan) Run() {
+func (self *SessionChan) Run(timeout time.Duration) {
select {
- case self.runChan <- true:
+ case self.runChan <- timeout:
default: // command is already pending or session is about to be closed/removed
}
}
@@ -182,7 +186,7 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgr
}
func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) {
- if self.state == SESSION_NEW && self.state != SESSION_RUNNING {
+ if self.state != SESSION_NEW && self.state != SESSION_RUNNING {
resp.err = fmt.Errorf("session is already done/canceled")
}
self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata})
@@ -234,9 +238,19 @@ func (self *Session) dispatchRequests() {
self.cancel()
}
return
- case <-self.runChan:
+ case <-self.timer.C:
+ if self.state == SESSION_RUNNING {
+ self.cancel()
+ }
+ self.state = SESSION_TIMEOUT
+ r := &ImportResult{500, "timeout", 0, 0}
+ self.callDoneHandler(r)
+ if self.removeFunc != nil {
+ self.removeFunc()
+ }
+ case t := <-self.runChan:
if self.state == SESSION_NEW {
- self.run()
+ self.run(t)
}
case <-self.cancelChan:
if self.state == SESSION_RUNNING {
@@ -249,10 +263,13 @@ func (self *Session) dispatchRequests() {
case p := <-self.progressIntChan:
self.callProgressHandler(&p)
case r := <-self.doneIntChan:
- self.state = SESSION_DONE
- self.callDoneHandler(&r)
- if self.removeFunc != nil {
- self.removeFunc()
+ if self.state != SESSION_TIMEOUT {
+ self.timer.Stop()
+ self.state = SESSION_DONE
+ self.callDoneHandler(&r)
+ if self.removeFunc != nil {
+ self.removeFunc()
+ }
}
}
}
@@ -271,11 +288,14 @@ func (self *Session) Cleanup() {
self.quit <- true
rhdl.Printf("waiting for session to close")
<-self.done
+ close(self.quit)
close(self.done)
- close(self.progressIntChan)
- close(self.doneIntChan)
+ self.timer.Stop()
// don't close the channels we give out because this might lead to a panic if
// somebody wites to an already removed session
+ // close(self.cancelIntChan)
+ // close(self.progressIntChan)
+ // close(self.doneIntChan)
// close(self.runChan)
// close(self.cancelChan)
// close(self.addProgressChan)
@@ -289,10 +309,11 @@ func NewSession(ctx *ImportContext, removeFunc func()) (session *Session) {
session.removeFunc = removeFunc
session.ctx = *ctx
session.done = make(chan bool)
+ session.timer = time.NewTimer(10 * time.Second)
session.cancelIntChan = make(chan bool, 1)
session.progressIntChan = make(chan ProgressData, 10)
session.doneIntChan = make(chan ImportResult, 1)
- session.runChan = make(chan bool, 1)
+ session.runChan = make(chan time.Duration, 1)
session.cancelChan = make(chan bool, 1)
session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10)
session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10)