diff options
author | Christian Pointner <equinox@helsinki.at> | 2016-07-22 16:45:12 (GMT) |
---|---|---|
committer | Christian Pointner <equinox@helsinki.at> | 2016-07-22 16:45:12 (GMT) |
commit | 405e7fa98878a61185d72f4bc5c9dede3f45f336 (patch) | |
tree | 6efa0f50f617be52cf4311e84e1f0bf7d0c1cd90 /rhimport | |
parent | 928b513a2b2be04babb9d2348ea5e28e1cb8bd7e (diff) |
session can now haven an attached uploader .. needs testing
Diffstat (limited to 'rhimport')
-rw-r--r-- | rhimport/core.go | 9 | ||||
-rw-r--r-- | rhimport/fetcher.go | 18 | ||||
-rw-r--r-- | rhimport/session.go | 100 |
3 files changed, 93 insertions, 34 deletions
diff --git a/rhimport/core.go b/rhimport/core.go index 9780485..f1bbe16 100644 --- a/rhimport/core.go +++ b/rhimport/core.go @@ -126,6 +126,11 @@ func getCBAApiKey() string { return "" } +type AttachmentChunk struct { + Data []byte + Error error +} + type Context struct { conf *Config db *rddb.DBChan @@ -144,7 +149,7 @@ type Context struct { AutotrimLevel int UseMetaData bool SourceUri string - AttachmentChan <-chan []byte + AttachmentChan chan AttachmentChunk FetchConverter string OrigFilename string Title string @@ -175,7 +180,7 @@ func NewContext(conf *Config, db *rddb.DBChan) *Context { ctx.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel ctx.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel ctx.UseMetaData = conf.ImportParamDefaults.UseMetaData - ctx.AttachmentChan = nil + ctx.AttachmentChan = make(chan AttachmentChunk, 32) ctx.FetchConverter = "ffmpeg-bs1770" ctx.OrigFilename = "" ctx.Title = "" diff --git a/rhimport/fetcher.go b/rhimport/fetcher.go index 0a72342..f6c8d28 100644 --- a/rhimport/fetcher.go +++ b/rhimport/fetcher.go @@ -595,21 +595,27 @@ func writeAttachmentFile(ctx *Context, res *Result, sizeTotal uint64, conv Fetch res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil - case data, ok := <-ctx.AttachmentChan: + case chunk, ok := <-ctx.AttachmentChan: if !ok { - rhl.Printf("receiving attachment '%s' got canceled (channel has been closed)", ctx.SourceFile) + rhl.Printf("receiving attachment '%s' got canceled (channel has been closed prematurely)", ctx.SourceFile) res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil } + if chunk.Error != nil { + rhl.Printf("receiving attachment '%s' failed: %v", ctx.SourceFile, chunk.Error) + res.ResponseCode = http.StatusInternalServerError + res.ErrorString = chunk.Error.Error() + return nil + } left := sizeTotal - written - if int(left) < len(data) { - rhl.Printf("attachment fetcher: truncating %d byes of extra data", len(data)-int(left)) - data = data[0:left] + if int(left) < len(chunk.Data) { + rhl.Printf("attachment fetcher: truncating %d byes of extra data", len(chunk.Data)-int(left)) + chunk.Data = chunk.Data[0:left] } - w, err := conv.Write(data) + w, err := conv.Write(chunk.Data) if err != nil { rhl.Printf("Unable to write to converter(%s): %s", ctx.SourceFile, err) return err diff --git a/rhimport/session.go b/rhimport/session.go index e0d6efb..b8b82ad 100644 --- a/rhimport/session.go +++ b/rhimport/session.go @@ -38,25 +38,6 @@ const ( SESSION_TIMEOUT ) -type Session struct { - ctx Context - state int - removeFunc func() - done chan bool - quit chan bool - timer *time.Timer - cancelIntChan chan bool - progressRateLimit time.Duration - progressIntChan chan ProgressData - doneIntChan chan Result - runChan chan time.Duration - cancelChan chan bool - addProgressChan chan sessionAddProgressHandlerRequest - addDoneChan chan sessionAddDoneHandlerRequest - progressCBs []*SessionProgressCB - doneCBs []*SessionDoneCB -} - type SessionProgressCB struct { cb ProgressCB userdata interface{} @@ -97,6 +78,36 @@ type sessionAddDoneHandlerRequest struct { response chan<- sessionAddDoneHandlerResponse } +type attachUploaderResponse struct { + cancel <-chan bool + attachment chan<- AttachmentChunk +} + +type attachUploaderRequest struct { + response chan<- attachUploaderResponse +} + +type Session struct { + ctx Context + state int + removeFunc func() + done chan bool + quit chan bool + timer *time.Timer + cancelIntChan chan bool + progressRateLimit time.Duration + progressIntChan chan ProgressData + doneIntChan chan Result + runChan chan time.Duration + cancelChan chan bool + addProgressChan chan sessionAddProgressHandlerRequest + addDoneChan chan sessionAddDoneHandlerRequest + attachUploaderChan chan attachUploaderRequest + progressCBs []*SessionProgressCB + doneCBs []*SessionDoneCB + cancelUploader chan bool +} + func sessionProgressCallback(step int, stepName string, current, total float64, title string, cart, cut uint, userdata interface{}) bool { out := userdata.(chan<- ProgressData) out <- ProgressData{step, stepName, current, total, title, cart, cut} @@ -196,8 +207,23 @@ func (self *Session) callDoneHandler(r *Result) { } } +func (self *Session) attachUploader() (resp attachUploaderResponse) { + if self.cancelUploader != nil { + return + } + self.cancelUploader = make(chan bool, 1) + resp.cancel = self.cancelUploader + resp.attachment = self.ctx.AttachmentChan + return +} + func (self *Session) dispatchRequests() { - defer func() { self.done <- true }() + defer func() { + if self.cancelUploader != nil { + close(self.cancelUploader) + } + self.done <- true + }() var lastProgress *ProgressData progressPending := 0 @@ -261,6 +287,8 @@ func (self *Session) dispatchRequests() { self.removeFunc() } } + case req := <-self.attachUploaderChan: + req.response <- self.attachUploader() } } } @@ -269,10 +297,11 @@ func (self *Session) dispatchRequests() { // Public Interface type SessionChan struct { - runChan chan<- time.Duration - cancelChan chan<- bool - addProgressChan chan<- sessionAddProgressHandlerRequest - addDoneChan chan<- sessionAddDoneHandlerRequest + runChan chan<- time.Duration + cancelChan chan<- bool + addProgressChan chan<- sessionAddProgressHandlerRequest + addDoneChan chan<- sessionAddDoneHandlerRequest + attachUploaderChan chan<- attachUploaderRequest } func (self *SessionChan) Run(timeout time.Duration) { @@ -321,6 +350,21 @@ func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error { return res.err } +func (self *SessionChan) AttachUploader() (<-chan bool, chan<- AttachmentChunk) { + resCh := make(chan attachUploaderResponse) + req := attachUploaderRequest{} + req.response = resCh + select { + case self.attachUploaderChan <- req: + default: + // session is about to be closed/removed + return nil, nil + } + + res := <-resCh + return res.cancel, res.attachment +} + // ********************************************************* // Semi-Public Interface (only used by sessionStore) @@ -330,12 +374,13 @@ func (self *Session) getInterface() *SessionChan { ch.cancelChan = self.cancelChan ch.addProgressChan = self.addProgressChan ch.addDoneChan = self.addDoneChan + ch.attachUploaderChan = self.attachUploaderChan return ch } func (self *Session) cleanup() { self.quit <- true - rhdl.Printf("waiting for session to close") + rhdl.Printf("Session: waiting for session to close") <-self.done close(self.quit) close(self.done) @@ -349,7 +394,8 @@ func (self *Session) cleanup() { // close(self.cancelChan) // close(self.addProgressChan) // close(self.addDoneChan) - rhdl.Printf("session is now cleaned up") + // close(self.attachUploader) + rhdl.Printf("Session: cleanup is now done") } func newSession(ctx *Context, removeFunc func()) (session *Session) { @@ -357,6 +403,7 @@ func newSession(ctx *Context, removeFunc func()) (session *Session) { session.state = SESSION_NEW session.removeFunc = removeFunc session.ctx = *ctx + session.quit = make(chan bool) session.done = make(chan bool) session.timer = time.NewTimer(10 * time.Second) session.cancelIntChan = make(chan bool, 1) @@ -367,6 +414,7 @@ func newSession(ctx *Context, removeFunc func()) (session *Session) { session.cancelChan = make(chan bool, 1) session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) + session.attachUploaderChan = make(chan attachUploaderRequest, 1) go session.dispatchRequests() return } |