// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015-2016 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package rhimport import ( "fmt" "net/http" "time" ) const ( _SESSION_NEW = iota _SESSION_RUNNING _SESSION_CANCELED _SESSION_DONE _SESSION_TIMEOUT ) type sessionProgressCB struct { cb ProgressCB userdata interface{} } type sessionDoneCB struct { cb DoneCB userdata interface{} } type sessionAddProgressHandlerResponse struct { err error } type sessionAddProgressHandlerRequest struct { userdata interface{} callback ProgressCB response chan<- sessionAddProgressHandlerResponse } type sessionAddDoneHandlerResponse struct { err error } type sessionAddDoneHandlerRequest struct { userdata interface{} callback DoneCB response chan<- sessionAddDoneHandlerResponse } type attachUploaderResponse struct { cancel <-chan bool attachment chan<- AttachmentChunk } type attachUploaderRequest struct { response chan<- attachUploaderResponse } type session struct { ctx Context state int removeFunc func() done chan bool quit chan bool timer *time.Timer cancelIntChan chan bool progressRateLimit time.Duration progressIntChan chan ProgressData doneIntChan chan Result runChan chan time.Duration cancelChan chan bool addProgressChan chan sessionAddProgressHandlerRequest addDoneChan chan sessionAddDoneHandlerRequest attachUploaderChan chan attachUploaderRequest progressCBs []*sessionProgressCB doneCBs []*sessionDoneCB cancelUploader chan bool } func sessionProgressCallback(step int, stepName string, current, total float64, title string, cart, cut uint, userdata interface{}) bool { out := userdata.(chan<- ProgressData) out <- ProgressData{step, stepName, current, total, title, cart, cut} return true } func sessionRun(ctx Context, done chan<- Result) { err := ctx.SanityCheck() if err != nil { done <- Result{ResponseCode: http.StatusBadRequest, ErrorString: err.Error()} return } var res *Result res, err = FetchFile(&ctx) if err != nil { done <- Result{ResponseCode: http.StatusInternalServerError, ErrorString: err.Error()} return } if res.ResponseCode != http.StatusOK { done <- *res return } if res, err = NormalizeFile(&ctx); err != nil { done <- Result{ResponseCode: http.StatusInternalServerError, ErrorString: err.Error()} return } if res.ResponseCode != http.StatusOK { done <- *res return } if res, err = ImportFile(&ctx); err != nil { res.ResponseCode = http.StatusInternalServerError res.ErrorString = err.Error() } done <- *res } func (s *session) run(timeout time.Duration) { s.ctx.ProgressCallBack = sessionProgressCallback s.ctx.ProgressCallBackData = (chan<- ProgressData)(s.progressIntChan) s.ctx.Cancel = s.cancelIntChan go sessionRun(s.ctx, s.doneIntChan) s.state = _SESSION_RUNNING if timeout > 3*time.Hour { s.ctx.stdlog.Printf("requested session timeout (%v) is to high - lowering to 3h", timeout) timeout = 3 * time.Hour } s.timer.Reset(timeout) return } func (s *session) cancel() { s.ctx.dbglog.Println("Session: canceling running import") select { case s.cancelIntChan <- true: default: // session got canceled already?? } s.state = _SESSION_CANCELED } func (s *session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) { if s.state != _SESSION_NEW && s.state != _SESSION_RUNNING { resp.err = fmt.Errorf("session is already done/canceled") } s.progressCBs = append(s.progressCBs, &sessionProgressCB{cb, userdata}) return } func (s *session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) { if s.state != _SESSION_NEW && s.state != _SESSION_RUNNING { resp.err = fmt.Errorf("session is already done/canceled") } s.doneCBs = append(s.doneCBs, &sessionDoneCB{cb, userdata}) return } func (s *session) callProgressHandler(p *ProgressData) { for _, cb := range s.progressCBs { if cb.cb != nil { if keep := cb.cb(p.Step, p.StepName, p.Current, p.Total, p.Title, p.Cart, p.Cut, cb.userdata); !keep { cb.cb = nil } } } } func (s *session) callDoneHandler(r *Result) { for _, cb := range s.doneCBs { if cb.cb != nil { if keep := cb.cb(*r, cb.userdata); !keep { cb.cb = nil } } } } func (s *session) attachUploader() (resp attachUploaderResponse) { if s.cancelUploader != nil { return } s.cancelUploader = make(chan bool, 1) resp.cancel = s.cancelUploader resp.attachment = s.ctx.AttachmentChan return } func (s *session) dispatchRequests() { defer func() { if s.cancelUploader != nil { close(s.cancelUploader) } s.done <- true }() var lastProgress *ProgressData progressPending := 0 pt := time.NewTimer(s.progressRateLimit) pt.Stop() for { select { case <-s.quit: if s.state == _SESSION_RUNNING { s.cancel() } return case <-s.timer.C: if s.state == _SESSION_RUNNING { s.cancel() } s.state = _SESSION_TIMEOUT r := &Result{ResponseCode: http.StatusInternalServerError, ErrorString: "session timed out"} s.callDoneHandler(r) if s.removeFunc != nil { s.removeFunc() } case t := <-s.runChan: if s.state == _SESSION_NEW { s.run(t) } case <-s.cancelChan: if s.state == _SESSION_RUNNING { s.cancel() } case req := <-s.addProgressChan: req.response <- s.addProgressHandler(req.userdata, req.callback) case req := <-s.addDoneChan: req.response <- s.addDoneHandler(req.userdata, req.callback) case <-pt.C: if progressPending > 1 && lastProgress != nil { s.callProgressHandler(lastProgress) } progressPending = 0 lastProgress = nil case p := <-s.progressIntChan: if s.state == _SESSION_RUNNING { if lastProgress == nil { s.callProgressHandler(&p) pt.Reset(s.progressRateLimit) } else if lastProgress.Step != p.Step { s.callProgressHandler(lastProgress) s.callProgressHandler(&p) pt.Reset(s.progressRateLimit) } lastProgress = &p progressPending++ } case r := <-s.doneIntChan: if s.state != _SESSION_TIMEOUT { s.timer.Stop() s.state = _SESSION_DONE s.callDoneHandler(&r) if s.removeFunc != nil { s.removeFunc() } } case req := <-s.attachUploaderChan: req.response <- s.attachUploader() } } } // ********************************************************* // Public Interface type Session struct { runChan chan<- time.Duration cancelChan chan<- bool addProgressChan chan<- sessionAddProgressHandlerRequest addDoneChan chan<- sessionAddDoneHandlerRequest attachUploaderChan chan<- attachUploaderRequest } func (s *Session) Run(timeout time.Duration) { select { case s.runChan <- timeout: default: // command is already pending or session is about to be closed/removed } } func (s *Session) Cancel() { select { case s.cancelChan <- true: default: // cancel is already pending or session is about to be closed/removed } } func (s *Session) AddProgressHandler(userdata interface{}, cb ProgressCB) error { resCh := make(chan sessionAddProgressHandlerResponse) req := sessionAddProgressHandlerRequest{} req.userdata = userdata req.callback = cb req.response = resCh select { case s.addProgressChan <- req: default: return fmt.Errorf("session is about to be closed/removed") } res := <-resCh return res.err } func (s *Session) AddDoneHandler(userdata interface{}, cb DoneCB) error { resCh := make(chan sessionAddDoneHandlerResponse) req := sessionAddDoneHandlerRequest{} req.userdata = userdata req.callback = cb req.response = resCh select { case s.addDoneChan <- req: default: return fmt.Errorf("session is about to be closed/removed") } res := <-resCh return res.err } func (s *Session) AttachUploader() (<-chan bool, chan<- AttachmentChunk) { resCh := make(chan attachUploaderResponse) req := attachUploaderRequest{} req.response = resCh select { case s.attachUploaderChan <- req: default: // session is about to be closed/removed return nil, nil } res := <-resCh return res.cancel, res.attachment } // ********************************************************* // Semi-Public Interface (only used by sessionStore) func (s *session) getInterface() *Session { ch := &Session{} ch.runChan = s.runChan ch.cancelChan = s.cancelChan ch.addProgressChan = s.addProgressChan ch.addDoneChan = s.addDoneChan ch.attachUploaderChan = s.attachUploaderChan return ch } func (s *session) cleanup() { s.quit <- true s.ctx.dbglog.Printf("Session: waiting for session to close") <-s.done close(s.quit) close(s.done) s.timer.Stop() // don't close the channels we give out because this might lead to a panic if // somebody wites to an already removed session // close(s.cancelIntChan) // close(s.progressIntChan) // close(s.doneIntChan) // close(s.runChan) // close(s.cancelChan) // close(s.addProgressChan) // close(s.addDoneChan) // close(s.attachUploader) s.ctx.dbglog.Printf("Session: cleanup is now done") } func newSession(ctx *Context, removeFunc func()) (s *session) { s = &session{} s.state = _SESSION_NEW s.removeFunc = removeFunc s.ctx = *ctx s.quit = make(chan bool, 1) s.done = make(chan bool) s.timer = time.NewTimer(10 * time.Second) s.cancelIntChan = make(chan bool, 1) s.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value s.progressIntChan = make(chan ProgressData, 10) s.doneIntChan = make(chan Result, 1) s.runChan = make(chan time.Duration, 1) s.cancelChan = make(chan bool, 1) s.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) s.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) s.attachUploaderChan = make(chan attachUploaderRequest, 1) go s.dispatchRequests() return }