From 3822025a7a4103f2c2de70f0c9199bc9a64cc3b4 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Wed, 23 Dec 2015 01:30:03 +0100 Subject: session now has a timeout diff --git a/session.go b/session.go index bce1072..88648f7 100644 --- a/session.go +++ b/session.go @@ -27,6 +27,7 @@ package rhimport import ( "fmt" "net/http" + "time" ) const ( @@ -34,10 +35,11 @@ const ( SESSION_RUNNING SESSION_CANCELED SESSION_DONE + SESSION_TIMEOUT ) type SessionChan struct { - runChan chan<- bool + runChan chan<- time.Duration cancelChan chan<- bool addProgressChan chan<- sessionAddProgressHandlerRequest addDoneChan chan<- sessionAddDoneHandlerRequest @@ -49,10 +51,11 @@ type Session struct { removeFunc func() done chan bool quit chan bool + timer *time.Timer cancelIntChan chan bool progressIntChan chan ProgressData doneIntChan chan ImportResult - runChan chan bool + runChan chan time.Duration cancelChan chan bool addProgressChan chan sessionAddProgressHandlerRequest addDoneChan chan sessionAddDoneHandlerRequest @@ -125,18 +128,19 @@ func session_import_run(ctx ImportContext, done chan<- ImportResult) { } } -func (self *Session) run() { +func (self *Session) run(timeout time.Duration) { self.ctx.ProgressCallBack = session_progress_callback self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) self.ctx.Cancel = self.cancelIntChan go session_import_run(self.ctx, self.doneIntChan) self.state = SESSION_RUNNING + self.timer.Reset(timeout) return } -func (self *SessionChan) Run() { +func (self *SessionChan) Run(timeout time.Duration) { select { - case self.runChan <- true: + case self.runChan <- timeout: default: // command is already pending or session is about to be closed/removed } } @@ -182,7 +186,7 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgr } func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) { - if self.state == SESSION_NEW && self.state != SESSION_RUNNING { + if self.state != SESSION_NEW && self.state != SESSION_RUNNING { resp.err = fmt.Errorf("session is already done/canceled") } self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata}) @@ -234,9 +238,19 @@ func (self *Session) dispatchRequests() { self.cancel() } return - case <-self.runChan: + case <-self.timer.C: + if self.state == SESSION_RUNNING { + self.cancel() + } + self.state = SESSION_TIMEOUT + r := &ImportResult{500, "timeout", 0, 0} + self.callDoneHandler(r) + if self.removeFunc != nil { + self.removeFunc() + } + case t := <-self.runChan: if self.state == SESSION_NEW { - self.run() + self.run(t) } case <-self.cancelChan: if self.state == SESSION_RUNNING { @@ -249,10 +263,13 @@ func (self *Session) dispatchRequests() { case p := <-self.progressIntChan: self.callProgressHandler(&p) case r := <-self.doneIntChan: - self.state = SESSION_DONE - self.callDoneHandler(&r) - if self.removeFunc != nil { - self.removeFunc() + if self.state != SESSION_TIMEOUT { + self.timer.Stop() + self.state = SESSION_DONE + self.callDoneHandler(&r) + if self.removeFunc != nil { + self.removeFunc() + } } } } @@ -271,11 +288,14 @@ func (self *Session) Cleanup() { self.quit <- true rhdl.Printf("waiting for session to close") <-self.done + close(self.quit) close(self.done) - close(self.progressIntChan) - close(self.doneIntChan) + self.timer.Stop() // don't close the channels we give out because this might lead to a panic if // somebody wites to an already removed session + // close(self.cancelIntChan) + // close(self.progressIntChan) + // close(self.doneIntChan) // close(self.runChan) // close(self.cancelChan) // close(self.addProgressChan) @@ -289,10 +309,11 @@ func NewSession(ctx *ImportContext, removeFunc func()) (session *Session) { session.removeFunc = removeFunc session.ctx = *ctx session.done = make(chan bool) + session.timer = time.NewTimer(10 * time.Second) session.cancelIntChan = make(chan bool, 1) session.progressIntChan = make(chan ProgressData, 10) session.doneIntChan = make(chan ImportResult, 1) - session.runChan = make(chan bool, 1) + session.runChan = make(chan time.Duration, 1) session.cancelChan = make(chan bool, 1) session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) -- cgit v0.10.2