summaryrefslogtreecommitdiff
path: root/rhimport/session.go
diff options
context:
space:
mode:
Diffstat (limited to 'rhimport/session.go')
-rw-r--r--rhimport/session.go272
1 files changed, 131 insertions, 141 deletions
diff --git a/rhimport/session.go b/rhimport/session.go
index a119285..e749020 100644
--- a/rhimport/session.go
+++ b/rhimport/session.go
@@ -31,33 +31,23 @@ import (
)
const (
- SESSION_NEW = iota
- SESSION_RUNNING
- SESSION_CANCELED
- SESSION_DONE
- SESSION_TIMEOUT
+ _SESSION_NEW = iota
+ _SESSION_RUNNING
+ _SESSION_CANCELED
+ _SESSION_DONE
+ _SESSION_TIMEOUT
)
-type SessionProgressCB struct {
+type sessionProgressCB struct {
cb ProgressCB
userdata interface{}
}
-type SessionDoneCB struct {
+type sessionDoneCB struct {
cb DoneCB
userdata interface{}
}
-type ProgressData struct {
- Step int
- StepName string
- Current float64
- Total float64
- Title string
- Cart uint
- Cut uint
-}
-
type sessionAddProgressHandlerResponse struct {
err error
}
@@ -87,7 +77,7 @@ type attachUploaderRequest struct {
response chan<- attachUploaderResponse
}
-type Session struct {
+type session struct {
ctx Context
state int
removeFunc func()
@@ -103,8 +93,8 @@ type Session struct {
addProgressChan chan sessionAddProgressHandlerRequest
addDoneChan chan sessionAddDoneHandlerRequest
attachUploaderChan chan attachUploaderRequest
- progressCBs []*SessionProgressCB
- doneCBs []*SessionDoneCB
+ progressCBs []*sessionProgressCB
+ doneCBs []*sessionDoneCB
cancelUploader chan bool
}
@@ -148,47 +138,47 @@ func sessionRun(ctx Context, done chan<- Result) {
done <- *res
}
-func (self *Session) run(timeout time.Duration) {
- self.ctx.ProgressCallBack = sessionProgressCallback
- self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
- self.ctx.Cancel = self.cancelIntChan
- go sessionRun(self.ctx, self.doneIntChan)
- self.state = SESSION_RUNNING
+func (s *session) run(timeout time.Duration) {
+ s.ctx.ProgressCallBack = sessionProgressCallback
+ s.ctx.ProgressCallBackData = (chan<- ProgressData)(s.progressIntChan)
+ s.ctx.Cancel = s.cancelIntChan
+ go sessionRun(s.ctx, s.doneIntChan)
+ s.state = _SESSION_RUNNING
if timeout > 3*time.Hour {
- self.ctx.stdlog.Printf("requested session timeout (%v) is to high - lowering to 3h", timeout)
+ s.ctx.stdlog.Printf("requested session timeout (%v) is to high - lowering to 3h", timeout)
timeout = 3 * time.Hour
}
- self.timer.Reset(timeout)
+ s.timer.Reset(timeout)
return
}
-func (self *Session) cancel() {
- self.ctx.dbglog.Println("Session: canceling running import")
+func (s *session) cancel() {
+ s.ctx.dbglog.Println("Session: canceling running import")
select {
- case self.cancelIntChan <- true:
+ case s.cancelIntChan <- true:
default: // session got canceled already??
}
- self.state = SESSION_CANCELED
+ s.state = _SESSION_CANCELED
}
-func (self *Session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) {
- if self.state != SESSION_NEW && self.state != SESSION_RUNNING {
+func (s *session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) {
+ if s.state != _SESSION_NEW && s.state != _SESSION_RUNNING {
resp.err = fmt.Errorf("session is already done/canceled")
}
- self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata})
+ s.progressCBs = append(s.progressCBs, &sessionProgressCB{cb, userdata})
return
}
-func (self *Session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) {
- if self.state != SESSION_NEW && self.state != SESSION_RUNNING {
+func (s *session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) {
+ if s.state != _SESSION_NEW && s.state != _SESSION_RUNNING {
resp.err = fmt.Errorf("session is already done/canceled")
}
- self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata})
+ s.doneCBs = append(s.doneCBs, &sessionDoneCB{cb, userdata})
return
}
-func (self *Session) callProgressHandler(p *ProgressData) {
- for _, cb := range self.progressCBs {
+func (s *session) callProgressHandler(p *ProgressData) {
+ for _, cb := range s.progressCBs {
if cb.cb != nil {
if keep := cb.cb(p.Step, p.StepName, p.Current, p.Total, p.Title, p.Cart, p.Cut, cb.userdata); !keep {
cb.cb = nil
@@ -197,8 +187,8 @@ func (self *Session) callProgressHandler(p *ProgressData) {
}
}
-func (self *Session) callDoneHandler(r *Result) {
- for _, cb := range self.doneCBs {
+func (s *session) callDoneHandler(r *Result) {
+ for _, cb := range s.doneCBs {
if cb.cb != nil {
if keep := cb.cb(*r, cb.userdata); !keep {
cb.cb = nil
@@ -207,88 +197,88 @@ func (self *Session) callDoneHandler(r *Result) {
}
}
-func (self *Session) attachUploader() (resp attachUploaderResponse) {
- if self.cancelUploader != nil {
+func (s *session) attachUploader() (resp attachUploaderResponse) {
+ if s.cancelUploader != nil {
return
}
- self.cancelUploader = make(chan bool, 1)
- resp.cancel = self.cancelUploader
- resp.attachment = self.ctx.AttachmentChan
+ s.cancelUploader = make(chan bool, 1)
+ resp.cancel = s.cancelUploader
+ resp.attachment = s.ctx.AttachmentChan
return
}
-func (self *Session) dispatchRequests() {
+func (s *session) dispatchRequests() {
defer func() {
- if self.cancelUploader != nil {
- close(self.cancelUploader)
+ if s.cancelUploader != nil {
+ close(s.cancelUploader)
}
- self.done <- true
+ s.done <- true
}()
var lastProgress *ProgressData
progressPending := 0
- pt := time.NewTimer(self.progressRateLimit)
+ pt := time.NewTimer(s.progressRateLimit)
pt.Stop()
for {
select {
- case <-self.quit:
- if self.state == SESSION_RUNNING {
- self.cancel()
+ case <-s.quit:
+ if s.state == _SESSION_RUNNING {
+ s.cancel()
}
return
- case <-self.timer.C:
- if self.state == SESSION_RUNNING {
- self.cancel()
+ case <-s.timer.C:
+ if s.state == _SESSION_RUNNING {
+ s.cancel()
}
- self.state = SESSION_TIMEOUT
+ s.state = _SESSION_TIMEOUT
r := &Result{ResponseCode: http.StatusInternalServerError, ErrorString: "session timed out"}
- self.callDoneHandler(r)
- if self.removeFunc != nil {
- self.removeFunc()
+ s.callDoneHandler(r)
+ if s.removeFunc != nil {
+ s.removeFunc()
}
- case t := <-self.runChan:
- if self.state == SESSION_NEW {
- self.run(t)
+ case t := <-s.runChan:
+ if s.state == _SESSION_NEW {
+ s.run(t)
}
- case <-self.cancelChan:
- if self.state == SESSION_RUNNING {
- self.cancel()
+ case <-s.cancelChan:
+ if s.state == _SESSION_RUNNING {
+ s.cancel()
}
- case req := <-self.addProgressChan:
- req.response <- self.addProgressHandler(req.userdata, req.callback)
- case req := <-self.addDoneChan:
- req.response <- self.addDoneHandler(req.userdata, req.callback)
+ case req := <-s.addProgressChan:
+ req.response <- s.addProgressHandler(req.userdata, req.callback)
+ case req := <-s.addDoneChan:
+ req.response <- s.addDoneHandler(req.userdata, req.callback)
case <-pt.C:
if progressPending > 1 && lastProgress != nil {
- self.callProgressHandler(lastProgress)
+ s.callProgressHandler(lastProgress)
}
progressPending = 0
lastProgress = nil
- case p := <-self.progressIntChan:
- if self.state == SESSION_RUNNING {
+ case p := <-s.progressIntChan:
+ if s.state == _SESSION_RUNNING {
if lastProgress == nil {
- self.callProgressHandler(&p)
- pt.Reset(self.progressRateLimit)
+ s.callProgressHandler(&p)
+ pt.Reset(s.progressRateLimit)
} else if lastProgress.Step != p.Step {
- self.callProgressHandler(lastProgress)
- self.callProgressHandler(&p)
- pt.Reset(self.progressRateLimit)
+ s.callProgressHandler(lastProgress)
+ s.callProgressHandler(&p)
+ pt.Reset(s.progressRateLimit)
}
lastProgress = &p
progressPending++
}
- case r := <-self.doneIntChan:
- if self.state != SESSION_TIMEOUT {
- self.timer.Stop()
- self.state = SESSION_DONE
- self.callDoneHandler(&r)
- if self.removeFunc != nil {
- self.removeFunc()
+ case r := <-s.doneIntChan:
+ if s.state != _SESSION_TIMEOUT {
+ s.timer.Stop()
+ s.state = _SESSION_DONE
+ s.callDoneHandler(&r)
+ if s.removeFunc != nil {
+ s.removeFunc()
}
}
- case req := <-self.attachUploaderChan:
- req.response <- self.attachUploader()
+ case req := <-s.attachUploaderChan:
+ req.response <- s.attachUploader()
}
}
}
@@ -296,7 +286,7 @@ func (self *Session) dispatchRequests() {
// *********************************************************
// Public Interface
-type SessionChan struct {
+type Session struct {
runChan chan<- time.Duration
cancelChan chan<- bool
addProgressChan chan<- sessionAddProgressHandlerRequest
@@ -304,28 +294,28 @@ type SessionChan struct {
attachUploaderChan chan<- attachUploaderRequest
}
-func (self *SessionChan) Run(timeout time.Duration) {
+func (s *Session) Run(timeout time.Duration) {
select {
- case self.runChan <- timeout:
+ case s.runChan <- timeout:
default: // command is already pending or session is about to be closed/removed
}
}
-func (self *SessionChan) Cancel() {
+func (s *Session) Cancel() {
select {
- case self.cancelChan <- true:
+ case s.cancelChan <- true:
default: // cancel is already pending or session is about to be closed/removed
}
}
-func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ProgressCB) error {
+func (s *Session) AddProgressHandler(userdata interface{}, cb ProgressCB) error {
resCh := make(chan sessionAddProgressHandlerResponse)
req := sessionAddProgressHandlerRequest{}
req.userdata = userdata
req.callback = cb
req.response = resCh
select {
- case self.addProgressChan <- req:
+ case s.addProgressChan <- req:
default:
return fmt.Errorf("session is about to be closed/removed")
}
@@ -334,14 +324,14 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ProgressCB)
return res.err
}
-func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error {
+func (s *Session) AddDoneHandler(userdata interface{}, cb DoneCB) error {
resCh := make(chan sessionAddDoneHandlerResponse)
req := sessionAddDoneHandlerRequest{}
req.userdata = userdata
req.callback = cb
req.response = resCh
select {
- case self.addDoneChan <- req:
+ case s.addDoneChan <- req:
default:
return fmt.Errorf("session is about to be closed/removed")
}
@@ -350,12 +340,12 @@ func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error {
return res.err
}
-func (self *SessionChan) AttachUploader() (<-chan bool, chan<- AttachmentChunk) {
+func (s *Session) AttachUploader() (<-chan bool, chan<- AttachmentChunk) {
resCh := make(chan attachUploaderResponse)
req := attachUploaderRequest{}
req.response = resCh
select {
- case self.attachUploaderChan <- req:
+ case s.attachUploaderChan <- req:
default:
// session is about to be closed/removed
return nil, nil
@@ -368,53 +358,53 @@ func (self *SessionChan) AttachUploader() (<-chan bool, chan<- AttachmentChunk)
// *********************************************************
// Semi-Public Interface (only used by sessionStore)
-func (self *Session) getInterface() *SessionChan {
- ch := &SessionChan{}
- ch.runChan = self.runChan
- ch.cancelChan = self.cancelChan
- ch.addProgressChan = self.addProgressChan
- ch.addDoneChan = self.addDoneChan
- ch.attachUploaderChan = self.attachUploaderChan
+func (s *session) getInterface() *Session {
+ ch := &Session{}
+ ch.runChan = s.runChan
+ ch.cancelChan = s.cancelChan
+ ch.addProgressChan = s.addProgressChan
+ ch.addDoneChan = s.addDoneChan
+ ch.attachUploaderChan = s.attachUploaderChan
return ch
}
-func (self *Session) cleanup() {
- self.quit <- true
- self.ctx.dbglog.Printf("Session: waiting for session to close")
- <-self.done
- close(self.quit)
- close(self.done)
- self.timer.Stop()
+func (s *session) cleanup() {
+ s.quit <- true
+ s.ctx.dbglog.Printf("Session: waiting for session to close")
+ <-s.done
+ close(s.quit)
+ close(s.done)
+ s.timer.Stop()
// don't close the channels we give out because this might lead to a panic if
// somebody wites to an already removed session
- // close(self.cancelIntChan)
- // close(self.progressIntChan)
- // close(self.doneIntChan)
- // close(self.runChan)
- // close(self.cancelChan)
- // close(self.addProgressChan)
- // close(self.addDoneChan)
- // close(self.attachUploader)
- self.ctx.dbglog.Printf("Session: cleanup is now done")
+ // close(s.cancelIntChan)
+ // close(s.progressIntChan)
+ // close(s.doneIntChan)
+ // close(s.runChan)
+ // close(s.cancelChan)
+ // close(s.addProgressChan)
+ // close(s.addDoneChan)
+ // close(s.attachUploader)
+ s.ctx.dbglog.Printf("Session: cleanup is now done")
}
-func newSession(ctx *Context, removeFunc func()) (session *Session) {
- session = new(Session)
- session.state = SESSION_NEW
- session.removeFunc = removeFunc
- session.ctx = *ctx
- session.quit = make(chan bool, 1)
- session.done = make(chan bool)
- session.timer = time.NewTimer(10 * time.Second)
- session.cancelIntChan = make(chan bool, 1)
- session.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value
- session.progressIntChan = make(chan ProgressData, 10)
- session.doneIntChan = make(chan Result, 1)
- session.runChan = make(chan time.Duration, 1)
- session.cancelChan = make(chan bool, 1)
- session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10)
- session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10)
- session.attachUploaderChan = make(chan attachUploaderRequest, 1)
- go session.dispatchRequests()
+func newSession(ctx *Context, removeFunc func()) (s *session) {
+ s = &session{}
+ s.state = _SESSION_NEW
+ s.removeFunc = removeFunc
+ s.ctx = *ctx
+ s.quit = make(chan bool, 1)
+ s.done = make(chan bool)
+ s.timer = time.NewTimer(10 * time.Second)
+ s.cancelIntChan = make(chan bool, 1)
+ s.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value
+ s.progressIntChan = make(chan ProgressData, 10)
+ s.doneIntChan = make(chan Result, 1)
+ s.runChan = make(chan time.Duration, 1)
+ s.cancelChan = make(chan bool, 1)
+ s.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10)
+ s.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10)
+ s.attachUploaderChan = make(chan attachUploaderRequest, 1)
+ go s.dispatchRequests()
return
}