diff options
Diffstat (limited to 'rhimport/session.go')
-rw-r--r-- | rhimport/session.go | 100 |
1 files changed, 74 insertions, 26 deletions
diff --git a/rhimport/session.go b/rhimport/session.go index e0d6efb..b8b82ad 100644 --- a/rhimport/session.go +++ b/rhimport/session.go @@ -38,25 +38,6 @@ const ( SESSION_TIMEOUT ) -type Session struct { - ctx Context - state int - removeFunc func() - done chan bool - quit chan bool - timer *time.Timer - cancelIntChan chan bool - progressRateLimit time.Duration - progressIntChan chan ProgressData - doneIntChan chan Result - runChan chan time.Duration - cancelChan chan bool - addProgressChan chan sessionAddProgressHandlerRequest - addDoneChan chan sessionAddDoneHandlerRequest - progressCBs []*SessionProgressCB - doneCBs []*SessionDoneCB -} - type SessionProgressCB struct { cb ProgressCB userdata interface{} @@ -97,6 +78,36 @@ type sessionAddDoneHandlerRequest struct { response chan<- sessionAddDoneHandlerResponse } +type attachUploaderResponse struct { + cancel <-chan bool + attachment chan<- AttachmentChunk +} + +type attachUploaderRequest struct { + response chan<- attachUploaderResponse +} + +type Session struct { + ctx Context + state int + removeFunc func() + done chan bool + quit chan bool + timer *time.Timer + cancelIntChan chan bool + progressRateLimit time.Duration + progressIntChan chan ProgressData + doneIntChan chan Result + runChan chan time.Duration + cancelChan chan bool + addProgressChan chan sessionAddProgressHandlerRequest + addDoneChan chan sessionAddDoneHandlerRequest + attachUploaderChan chan attachUploaderRequest + progressCBs []*SessionProgressCB + doneCBs []*SessionDoneCB + cancelUploader chan bool +} + func sessionProgressCallback(step int, stepName string, current, total float64, title string, cart, cut uint, userdata interface{}) bool { out := userdata.(chan<- ProgressData) out <- ProgressData{step, stepName, current, total, title, cart, cut} @@ -196,8 +207,23 @@ func (self *Session) callDoneHandler(r *Result) { } } +func (self *Session) attachUploader() (resp attachUploaderResponse) { + if self.cancelUploader != nil { + return + } + self.cancelUploader = make(chan bool, 1) + resp.cancel = self.cancelUploader + resp.attachment = self.ctx.AttachmentChan + return +} + func (self *Session) dispatchRequests() { - defer func() { self.done <- true }() + defer func() { + if self.cancelUploader != nil { + close(self.cancelUploader) + } + self.done <- true + }() var lastProgress *ProgressData progressPending := 0 @@ -261,6 +287,8 @@ func (self *Session) dispatchRequests() { self.removeFunc() } } + case req := <-self.attachUploaderChan: + req.response <- self.attachUploader() } } } @@ -269,10 +297,11 @@ func (self *Session) dispatchRequests() { // Public Interface type SessionChan struct { - runChan chan<- time.Duration - cancelChan chan<- bool - addProgressChan chan<- sessionAddProgressHandlerRequest - addDoneChan chan<- sessionAddDoneHandlerRequest + runChan chan<- time.Duration + cancelChan chan<- bool + addProgressChan chan<- sessionAddProgressHandlerRequest + addDoneChan chan<- sessionAddDoneHandlerRequest + attachUploaderChan chan<- attachUploaderRequest } func (self *SessionChan) Run(timeout time.Duration) { @@ -321,6 +350,21 @@ func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error { return res.err } +func (self *SessionChan) AttachUploader() (<-chan bool, chan<- AttachmentChunk) { + resCh := make(chan attachUploaderResponse) + req := attachUploaderRequest{} + req.response = resCh + select { + case self.attachUploaderChan <- req: + default: + // session is about to be closed/removed + return nil, nil + } + + res := <-resCh + return res.cancel, res.attachment +} + // ********************************************************* // Semi-Public Interface (only used by sessionStore) @@ -330,12 +374,13 @@ func (self *Session) getInterface() *SessionChan { ch.cancelChan = self.cancelChan ch.addProgressChan = self.addProgressChan ch.addDoneChan = self.addDoneChan + ch.attachUploaderChan = self.attachUploaderChan return ch } func (self *Session) cleanup() { self.quit <- true - rhdl.Printf("waiting for session to close") + rhdl.Printf("Session: waiting for session to close") <-self.done close(self.quit) close(self.done) @@ -349,7 +394,8 @@ func (self *Session) cleanup() { // close(self.cancelChan) // close(self.addProgressChan) // close(self.addDoneChan) - rhdl.Printf("session is now cleaned up") + // close(self.attachUploader) + rhdl.Printf("Session: cleanup is now done") } func newSession(ctx *Context, removeFunc func()) (session *Session) { @@ -357,6 +403,7 @@ func newSession(ctx *Context, removeFunc func()) (session *Session) { session.state = SESSION_NEW session.removeFunc = removeFunc session.ctx = *ctx + session.quit = make(chan bool) session.done = make(chan bool) session.timer = time.NewTimer(10 * time.Second) session.cancelIntChan = make(chan bool, 1) @@ -367,6 +414,7 @@ func newSession(ctx *Context, removeFunc func()) (session *Session) { session.cancelChan = make(chan bool, 1) session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) + session.attachUploaderChan = make(chan attachUploaderRequest, 1) go session.dispatchRequests() return } |