diff options
Diffstat (limited to 'rhimport/session.go')
-rw-r--r-- | rhimport/session.go | 272 |
1 files changed, 131 insertions, 141 deletions
diff --git a/rhimport/session.go b/rhimport/session.go index a119285..e749020 100644 --- a/rhimport/session.go +++ b/rhimport/session.go @@ -31,33 +31,23 @@ import ( ) const ( - SESSION_NEW = iota - SESSION_RUNNING - SESSION_CANCELED - SESSION_DONE - SESSION_TIMEOUT + _SESSION_NEW = iota + _SESSION_RUNNING + _SESSION_CANCELED + _SESSION_DONE + _SESSION_TIMEOUT ) -type SessionProgressCB struct { +type sessionProgressCB struct { cb ProgressCB userdata interface{} } -type SessionDoneCB struct { +type sessionDoneCB struct { cb DoneCB userdata interface{} } -type ProgressData struct { - Step int - StepName string - Current float64 - Total float64 - Title string - Cart uint - Cut uint -} - type sessionAddProgressHandlerResponse struct { err error } @@ -87,7 +77,7 @@ type attachUploaderRequest struct { response chan<- attachUploaderResponse } -type Session struct { +type session struct { ctx Context state int removeFunc func() @@ -103,8 +93,8 @@ type Session struct { addProgressChan chan sessionAddProgressHandlerRequest addDoneChan chan sessionAddDoneHandlerRequest attachUploaderChan chan attachUploaderRequest - progressCBs []*SessionProgressCB - doneCBs []*SessionDoneCB + progressCBs []*sessionProgressCB + doneCBs []*sessionDoneCB cancelUploader chan bool } @@ -148,47 +138,47 @@ func sessionRun(ctx Context, done chan<- Result) { done <- *res } -func (self *Session) run(timeout time.Duration) { - self.ctx.ProgressCallBack = sessionProgressCallback - self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) - self.ctx.Cancel = self.cancelIntChan - go sessionRun(self.ctx, self.doneIntChan) - self.state = SESSION_RUNNING +func (s *session) run(timeout time.Duration) { + s.ctx.ProgressCallBack = sessionProgressCallback + s.ctx.ProgressCallBackData = (chan<- ProgressData)(s.progressIntChan) + s.ctx.Cancel = s.cancelIntChan + go sessionRun(s.ctx, s.doneIntChan) + s.state = _SESSION_RUNNING if timeout > 3*time.Hour { - self.ctx.stdlog.Printf("requested session timeout (%v) is to high - lowering to 3h", timeout) + s.ctx.stdlog.Printf("requested session timeout (%v) is to high - lowering to 3h", timeout) timeout = 3 * time.Hour } - self.timer.Reset(timeout) + s.timer.Reset(timeout) return } -func (self *Session) cancel() { - self.ctx.dbglog.Println("Session: canceling running import") +func (s *session) cancel() { + s.ctx.dbglog.Println("Session: canceling running import") select { - case self.cancelIntChan <- true: + case s.cancelIntChan <- true: default: // session got canceled already?? } - self.state = SESSION_CANCELED + s.state = _SESSION_CANCELED } -func (self *Session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) { - if self.state != SESSION_NEW && self.state != SESSION_RUNNING { +func (s *session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) { + if s.state != _SESSION_NEW && s.state != _SESSION_RUNNING { resp.err = fmt.Errorf("session is already done/canceled") } - self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata}) + s.progressCBs = append(s.progressCBs, &sessionProgressCB{cb, userdata}) return } -func (self *Session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) { - if self.state != SESSION_NEW && self.state != SESSION_RUNNING { +func (s *session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) { + if s.state != _SESSION_NEW && s.state != _SESSION_RUNNING { resp.err = fmt.Errorf("session is already done/canceled") } - self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata}) + s.doneCBs = append(s.doneCBs, &sessionDoneCB{cb, userdata}) return } -func (self *Session) callProgressHandler(p *ProgressData) { - for _, cb := range self.progressCBs { +func (s *session) callProgressHandler(p *ProgressData) { + for _, cb := range s.progressCBs { if cb.cb != nil { if keep := cb.cb(p.Step, p.StepName, p.Current, p.Total, p.Title, p.Cart, p.Cut, cb.userdata); !keep { cb.cb = nil @@ -197,8 +187,8 @@ func (self *Session) callProgressHandler(p *ProgressData) { } } -func (self *Session) callDoneHandler(r *Result) { - for _, cb := range self.doneCBs { +func (s *session) callDoneHandler(r *Result) { + for _, cb := range s.doneCBs { if cb.cb != nil { if keep := cb.cb(*r, cb.userdata); !keep { cb.cb = nil @@ -207,88 +197,88 @@ func (self *Session) callDoneHandler(r *Result) { } } -func (self *Session) attachUploader() (resp attachUploaderResponse) { - if self.cancelUploader != nil { +func (s *session) attachUploader() (resp attachUploaderResponse) { + if s.cancelUploader != nil { return } - self.cancelUploader = make(chan bool, 1) - resp.cancel = self.cancelUploader - resp.attachment = self.ctx.AttachmentChan + s.cancelUploader = make(chan bool, 1) + resp.cancel = s.cancelUploader + resp.attachment = s.ctx.AttachmentChan return } -func (self *Session) dispatchRequests() { +func (s *session) dispatchRequests() { defer func() { - if self.cancelUploader != nil { - close(self.cancelUploader) + if s.cancelUploader != nil { + close(s.cancelUploader) } - self.done <- true + s.done <- true }() var lastProgress *ProgressData progressPending := 0 - pt := time.NewTimer(self.progressRateLimit) + pt := time.NewTimer(s.progressRateLimit) pt.Stop() for { select { - case <-self.quit: - if self.state == SESSION_RUNNING { - self.cancel() + case <-s.quit: + if s.state == _SESSION_RUNNING { + s.cancel() } return - case <-self.timer.C: - if self.state == SESSION_RUNNING { - self.cancel() + case <-s.timer.C: + if s.state == _SESSION_RUNNING { + s.cancel() } - self.state = SESSION_TIMEOUT + s.state = _SESSION_TIMEOUT r := &Result{ResponseCode: http.StatusInternalServerError, ErrorString: "session timed out"} - self.callDoneHandler(r) - if self.removeFunc != nil { - self.removeFunc() + s.callDoneHandler(r) + if s.removeFunc != nil { + s.removeFunc() } - case t := <-self.runChan: - if self.state == SESSION_NEW { - self.run(t) + case t := <-s.runChan: + if s.state == _SESSION_NEW { + s.run(t) } - case <-self.cancelChan: - if self.state == SESSION_RUNNING { - self.cancel() + case <-s.cancelChan: + if s.state == _SESSION_RUNNING { + s.cancel() } - case req := <-self.addProgressChan: - req.response <- self.addProgressHandler(req.userdata, req.callback) - case req := <-self.addDoneChan: - req.response <- self.addDoneHandler(req.userdata, req.callback) + case req := <-s.addProgressChan: + req.response <- s.addProgressHandler(req.userdata, req.callback) + case req := <-s.addDoneChan: + req.response <- s.addDoneHandler(req.userdata, req.callback) case <-pt.C: if progressPending > 1 && lastProgress != nil { - self.callProgressHandler(lastProgress) + s.callProgressHandler(lastProgress) } progressPending = 0 lastProgress = nil - case p := <-self.progressIntChan: - if self.state == SESSION_RUNNING { + case p := <-s.progressIntChan: + if s.state == _SESSION_RUNNING { if lastProgress == nil { - self.callProgressHandler(&p) - pt.Reset(self.progressRateLimit) + s.callProgressHandler(&p) + pt.Reset(s.progressRateLimit) } else if lastProgress.Step != p.Step { - self.callProgressHandler(lastProgress) - self.callProgressHandler(&p) - pt.Reset(self.progressRateLimit) + s.callProgressHandler(lastProgress) + s.callProgressHandler(&p) + pt.Reset(s.progressRateLimit) } lastProgress = &p progressPending++ } - case r := <-self.doneIntChan: - if self.state != SESSION_TIMEOUT { - self.timer.Stop() - self.state = SESSION_DONE - self.callDoneHandler(&r) - if self.removeFunc != nil { - self.removeFunc() + case r := <-s.doneIntChan: + if s.state != _SESSION_TIMEOUT { + s.timer.Stop() + s.state = _SESSION_DONE + s.callDoneHandler(&r) + if s.removeFunc != nil { + s.removeFunc() } } - case req := <-self.attachUploaderChan: - req.response <- self.attachUploader() + case req := <-s.attachUploaderChan: + req.response <- s.attachUploader() } } } @@ -296,7 +286,7 @@ func (self *Session) dispatchRequests() { // ********************************************************* // Public Interface -type SessionChan struct { +type Session struct { runChan chan<- time.Duration cancelChan chan<- bool addProgressChan chan<- sessionAddProgressHandlerRequest @@ -304,28 +294,28 @@ type SessionChan struct { attachUploaderChan chan<- attachUploaderRequest } -func (self *SessionChan) Run(timeout time.Duration) { +func (s *Session) Run(timeout time.Duration) { select { - case self.runChan <- timeout: + case s.runChan <- timeout: default: // command is already pending or session is about to be closed/removed } } -func (self *SessionChan) Cancel() { +func (s *Session) Cancel() { select { - case self.cancelChan <- true: + case s.cancelChan <- true: default: // cancel is already pending or session is about to be closed/removed } } -func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ProgressCB) error { +func (s *Session) AddProgressHandler(userdata interface{}, cb ProgressCB) error { resCh := make(chan sessionAddProgressHandlerResponse) req := sessionAddProgressHandlerRequest{} req.userdata = userdata req.callback = cb req.response = resCh select { - case self.addProgressChan <- req: + case s.addProgressChan <- req: default: return fmt.Errorf("session is about to be closed/removed") } @@ -334,14 +324,14 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ProgressCB) return res.err } -func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error { +func (s *Session) AddDoneHandler(userdata interface{}, cb DoneCB) error { resCh := make(chan sessionAddDoneHandlerResponse) req := sessionAddDoneHandlerRequest{} req.userdata = userdata req.callback = cb req.response = resCh select { - case self.addDoneChan <- req: + case s.addDoneChan <- req: default: return fmt.Errorf("session is about to be closed/removed") } @@ -350,12 +340,12 @@ func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error { return res.err } -func (self *SessionChan) AttachUploader() (<-chan bool, chan<- AttachmentChunk) { +func (s *Session) AttachUploader() (<-chan bool, chan<- AttachmentChunk) { resCh := make(chan attachUploaderResponse) req := attachUploaderRequest{} req.response = resCh select { - case self.attachUploaderChan <- req: + case s.attachUploaderChan <- req: default: // session is about to be closed/removed return nil, nil @@ -368,53 +358,53 @@ func (self *SessionChan) AttachUploader() (<-chan bool, chan<- AttachmentChunk) // ********************************************************* // Semi-Public Interface (only used by sessionStore) -func (self *Session) getInterface() *SessionChan { - ch := &SessionChan{} - ch.runChan = self.runChan - ch.cancelChan = self.cancelChan - ch.addProgressChan = self.addProgressChan - ch.addDoneChan = self.addDoneChan - ch.attachUploaderChan = self.attachUploaderChan +func (s *session) getInterface() *Session { + ch := &Session{} + ch.runChan = s.runChan + ch.cancelChan = s.cancelChan + ch.addProgressChan = s.addProgressChan + ch.addDoneChan = s.addDoneChan + ch.attachUploaderChan = s.attachUploaderChan return ch } -func (self *Session) cleanup() { - self.quit <- true - self.ctx.dbglog.Printf("Session: waiting for session to close") - <-self.done - close(self.quit) - close(self.done) - self.timer.Stop() +func (s *session) cleanup() { + s.quit <- true + s.ctx.dbglog.Printf("Session: waiting for session to close") + <-s.done + close(s.quit) + close(s.done) + s.timer.Stop() // don't close the channels we give out because this might lead to a panic if // somebody wites to an already removed session - // close(self.cancelIntChan) - // close(self.progressIntChan) - // close(self.doneIntChan) - // close(self.runChan) - // close(self.cancelChan) - // close(self.addProgressChan) - // close(self.addDoneChan) - // close(self.attachUploader) - self.ctx.dbglog.Printf("Session: cleanup is now done") + // close(s.cancelIntChan) + // close(s.progressIntChan) + // close(s.doneIntChan) + // close(s.runChan) + // close(s.cancelChan) + // close(s.addProgressChan) + // close(s.addDoneChan) + // close(s.attachUploader) + s.ctx.dbglog.Printf("Session: cleanup is now done") } -func newSession(ctx *Context, removeFunc func()) (session *Session) { - session = new(Session) - session.state = SESSION_NEW - session.removeFunc = removeFunc - session.ctx = *ctx - session.quit = make(chan bool, 1) - session.done = make(chan bool) - session.timer = time.NewTimer(10 * time.Second) - session.cancelIntChan = make(chan bool, 1) - session.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value - session.progressIntChan = make(chan ProgressData, 10) - session.doneIntChan = make(chan Result, 1) - session.runChan = make(chan time.Duration, 1) - session.cancelChan = make(chan bool, 1) - session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) - session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) - session.attachUploaderChan = make(chan attachUploaderRequest, 1) - go session.dispatchRequests() +func newSession(ctx *Context, removeFunc func()) (s *session) { + s = &session{} + s.state = _SESSION_NEW + s.removeFunc = removeFunc + s.ctx = *ctx + s.quit = make(chan bool, 1) + s.done = make(chan bool) + s.timer = time.NewTimer(10 * time.Second) + s.cancelIntChan = make(chan bool, 1) + s.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value + s.progressIntChan = make(chan ProgressData, 10) + s.doneIntChan = make(chan Result, 1) + s.runChan = make(chan time.Duration, 1) + s.cancelChan = make(chan bool, 1) + s.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) + s.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) + s.attachUploaderChan = make(chan attachUploaderRequest, 1) + go s.dispatchRequests() return } |