summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2016-07-22 16:45:12 (GMT)
committerChristian Pointner <equinox@helsinki.at>2016-07-22 16:45:12 (GMT)
commit405e7fa98878a61185d72f4bc5c9dede3f45f336 (patch)
tree6efa0f50f617be52cf4311e84e1f0bf7d0c1cd90
parent928b513a2b2be04babb9d2348ea5e28e1cb8bd7e (diff)
session can now haven an attached uploader .. needs testing
-rw-r--r--rhimport/core.go9
-rw-r--r--rhimport/fetcher.go18
-rw-r--r--rhimport/session.go100
3 files changed, 93 insertions, 34 deletions
diff --git a/rhimport/core.go b/rhimport/core.go
index 9780485..f1bbe16 100644
--- a/rhimport/core.go
+++ b/rhimport/core.go
@@ -126,6 +126,11 @@ func getCBAApiKey() string {
return ""
}
+type AttachmentChunk struct {
+ Data []byte
+ Error error
+}
+
type Context struct {
conf *Config
db *rddb.DBChan
@@ -144,7 +149,7 @@ type Context struct {
AutotrimLevel int
UseMetaData bool
SourceUri string
- AttachmentChan <-chan []byte
+ AttachmentChan chan AttachmentChunk
FetchConverter string
OrigFilename string
Title string
@@ -175,7 +180,7 @@ func NewContext(conf *Config, db *rddb.DBChan) *Context {
ctx.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel
ctx.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel
ctx.UseMetaData = conf.ImportParamDefaults.UseMetaData
- ctx.AttachmentChan = nil
+ ctx.AttachmentChan = make(chan AttachmentChunk, 32)
ctx.FetchConverter = "ffmpeg-bs1770"
ctx.OrigFilename = ""
ctx.Title = ""
diff --git a/rhimport/fetcher.go b/rhimport/fetcher.go
index 0a72342..f6c8d28 100644
--- a/rhimport/fetcher.go
+++ b/rhimport/fetcher.go
@@ -595,21 +595,27 @@ func writeAttachmentFile(ctx *Context, res *Result, sizeTotal uint64, conv Fetch
res.ResponseCode = http.StatusNoContent
res.ErrorString = "canceled"
return nil
- case data, ok := <-ctx.AttachmentChan:
+ case chunk, ok := <-ctx.AttachmentChan:
if !ok {
- rhl.Printf("receiving attachment '%s' got canceled (channel has been closed)", ctx.SourceFile)
+ rhl.Printf("receiving attachment '%s' got canceled (channel has been closed prematurely)", ctx.SourceFile)
res.ResponseCode = http.StatusNoContent
res.ErrorString = "canceled"
return nil
}
+ if chunk.Error != nil {
+ rhl.Printf("receiving attachment '%s' failed: %v", ctx.SourceFile, chunk.Error)
+ res.ResponseCode = http.StatusInternalServerError
+ res.ErrorString = chunk.Error.Error()
+ return nil
+ }
left := sizeTotal - written
- if int(left) < len(data) {
- rhl.Printf("attachment fetcher: truncating %d byes of extra data", len(data)-int(left))
- data = data[0:left]
+ if int(left) < len(chunk.Data) {
+ rhl.Printf("attachment fetcher: truncating %d byes of extra data", len(chunk.Data)-int(left))
+ chunk.Data = chunk.Data[0:left]
}
- w, err := conv.Write(data)
+ w, err := conv.Write(chunk.Data)
if err != nil {
rhl.Printf("Unable to write to converter(%s): %s", ctx.SourceFile, err)
return err
diff --git a/rhimport/session.go b/rhimport/session.go
index e0d6efb..b8b82ad 100644
--- a/rhimport/session.go
+++ b/rhimport/session.go
@@ -38,25 +38,6 @@ const (
SESSION_TIMEOUT
)
-type Session struct {
- ctx Context
- state int
- removeFunc func()
- done chan bool
- quit chan bool
- timer *time.Timer
- cancelIntChan chan bool
- progressRateLimit time.Duration
- progressIntChan chan ProgressData
- doneIntChan chan Result
- runChan chan time.Duration
- cancelChan chan bool
- addProgressChan chan sessionAddProgressHandlerRequest
- addDoneChan chan sessionAddDoneHandlerRequest
- progressCBs []*SessionProgressCB
- doneCBs []*SessionDoneCB
-}
-
type SessionProgressCB struct {
cb ProgressCB
userdata interface{}
@@ -97,6 +78,36 @@ type sessionAddDoneHandlerRequest struct {
response chan<- sessionAddDoneHandlerResponse
}
+type attachUploaderResponse struct {
+ cancel <-chan bool
+ attachment chan<- AttachmentChunk
+}
+
+type attachUploaderRequest struct {
+ response chan<- attachUploaderResponse
+}
+
+type Session struct {
+ ctx Context
+ state int
+ removeFunc func()
+ done chan bool
+ quit chan bool
+ timer *time.Timer
+ cancelIntChan chan bool
+ progressRateLimit time.Duration
+ progressIntChan chan ProgressData
+ doneIntChan chan Result
+ runChan chan time.Duration
+ cancelChan chan bool
+ addProgressChan chan sessionAddProgressHandlerRequest
+ addDoneChan chan sessionAddDoneHandlerRequest
+ attachUploaderChan chan attachUploaderRequest
+ progressCBs []*SessionProgressCB
+ doneCBs []*SessionDoneCB
+ cancelUploader chan bool
+}
+
func sessionProgressCallback(step int, stepName string, current, total float64, title string, cart, cut uint, userdata interface{}) bool {
out := userdata.(chan<- ProgressData)
out <- ProgressData{step, stepName, current, total, title, cart, cut}
@@ -196,8 +207,23 @@ func (self *Session) callDoneHandler(r *Result) {
}
}
+func (self *Session) attachUploader() (resp attachUploaderResponse) {
+ if self.cancelUploader != nil {
+ return
+ }
+ self.cancelUploader = make(chan bool, 1)
+ resp.cancel = self.cancelUploader
+ resp.attachment = self.ctx.AttachmentChan
+ return
+}
+
func (self *Session) dispatchRequests() {
- defer func() { self.done <- true }()
+ defer func() {
+ if self.cancelUploader != nil {
+ close(self.cancelUploader)
+ }
+ self.done <- true
+ }()
var lastProgress *ProgressData
progressPending := 0
@@ -261,6 +287,8 @@ func (self *Session) dispatchRequests() {
self.removeFunc()
}
}
+ case req := <-self.attachUploaderChan:
+ req.response <- self.attachUploader()
}
}
}
@@ -269,10 +297,11 @@ func (self *Session) dispatchRequests() {
// Public Interface
type SessionChan struct {
- runChan chan<- time.Duration
- cancelChan chan<- bool
- addProgressChan chan<- sessionAddProgressHandlerRequest
- addDoneChan chan<- sessionAddDoneHandlerRequest
+ runChan chan<- time.Duration
+ cancelChan chan<- bool
+ addProgressChan chan<- sessionAddProgressHandlerRequest
+ addDoneChan chan<- sessionAddDoneHandlerRequest
+ attachUploaderChan chan<- attachUploaderRequest
}
func (self *SessionChan) Run(timeout time.Duration) {
@@ -321,6 +350,21 @@ func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error {
return res.err
}
+func (self *SessionChan) AttachUploader() (<-chan bool, chan<- AttachmentChunk) {
+ resCh := make(chan attachUploaderResponse)
+ req := attachUploaderRequest{}
+ req.response = resCh
+ select {
+ case self.attachUploaderChan <- req:
+ default:
+ // session is about to be closed/removed
+ return nil, nil
+ }
+
+ res := <-resCh
+ return res.cancel, res.attachment
+}
+
// *********************************************************
// Semi-Public Interface (only used by sessionStore)
@@ -330,12 +374,13 @@ func (self *Session) getInterface() *SessionChan {
ch.cancelChan = self.cancelChan
ch.addProgressChan = self.addProgressChan
ch.addDoneChan = self.addDoneChan
+ ch.attachUploaderChan = self.attachUploaderChan
return ch
}
func (self *Session) cleanup() {
self.quit <- true
- rhdl.Printf("waiting for session to close")
+ rhdl.Printf("Session: waiting for session to close")
<-self.done
close(self.quit)
close(self.done)
@@ -349,7 +394,8 @@ func (self *Session) cleanup() {
// close(self.cancelChan)
// close(self.addProgressChan)
// close(self.addDoneChan)
- rhdl.Printf("session is now cleaned up")
+ // close(self.attachUploader)
+ rhdl.Printf("Session: cleanup is now done")
}
func newSession(ctx *Context, removeFunc func()) (session *Session) {
@@ -357,6 +403,7 @@ func newSession(ctx *Context, removeFunc func()) (session *Session) {
session.state = SESSION_NEW
session.removeFunc = removeFunc
session.ctx = *ctx
+ session.quit = make(chan bool)
session.done = make(chan bool)
session.timer = time.NewTimer(10 * time.Second)
session.cancelIntChan = make(chan bool, 1)
@@ -367,6 +414,7 @@ func newSession(ctx *Context, removeFunc func()) (session *Session) {
session.cancelChan = make(chan bool, 1)
session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10)
session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10)
+ session.attachUploaderChan = make(chan attachUploaderRequest, 1)
go session.dispatchRequests()
return
}