diff options
Diffstat (limited to 'rhimport/session.go')
-rw-r--r-- | rhimport/session.go | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/rhimport/session.go b/rhimport/session.go new file mode 100644 index 0000000..66705ec --- /dev/null +++ b/rhimport/session.go @@ -0,0 +1,328 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at> +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. +// + +package rhimport + +import ( + "fmt" + "net/http" + "time" +) + +const ( + SESSION_NEW = iota + SESSION_RUNNING + SESSION_CANCELED + SESSION_DONE + SESSION_TIMEOUT +) + +type Session struct { + ctx Context + state int + removeFunc func() + done chan bool + quit chan bool + timer *time.Timer + cancelIntChan chan bool + progressIntChan chan ProgressData + doneIntChan chan Result + runChan chan time.Duration + cancelChan chan bool + addProgressChan chan sessionAddProgressHandlerRequest + addDoneChan chan sessionAddDoneHandlerRequest + progressCBs []*SessionProgressCB + doneCBs []*SessionDoneCB +} + +type SessionProgressCB struct { + cb ProgressCB + userdata interface{} +} + +type SessionDoneCB struct { + cb DoneCB + userdata interface{} +} + +type ProgressData struct { + Step int + StepName string + Progress float64 +} + +type sessionAddProgressHandlerResponse struct { + err error +} + +type sessionAddProgressHandlerRequest struct { + userdata interface{} + callback ProgressCB + response chan<- sessionAddProgressHandlerResponse +} + +type sessionAddDoneHandlerResponse struct { + err error +} + +type sessionAddDoneHandlerRequest struct { + userdata interface{} + callback DoneCB + response chan<- sessionAddDoneHandlerResponse +} + +func sessionProgressCallback(step int, stepName string, progress float64, userdata interface{}) bool { + out := userdata.(chan<- ProgressData) + out <- ProgressData{step, stepName, progress} + return true +} + +func sessionRun(ctx Context, done chan<- Result) { + if err := ctx.SanityCheck(); err != nil { + done <- Result{http.StatusBadRequest, err.Error(), 0, 0} + return + } + + if res, err := FetchFile(&ctx); err != nil { + done <- Result{http.StatusInternalServerError, err.Error(), 0, 0} + return + } else if res.ResponseCode != http.StatusOK { + done <- *res + return + } + + if res, err := ImportFile(&ctx); err != nil { + done <- Result{http.StatusInternalServerError, err.Error(), 0, 0} + return + } else { + done <- *res + return + } +} + +func (self *Session) run(timeout time.Duration) { + self.ctx.ProgressCallBack = sessionProgressCallback + self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) + self.ctx.Cancel = self.cancelIntChan + go sessionRun(self.ctx, self.doneIntChan) + self.state = SESSION_RUNNING + self.timer.Reset(timeout) + return +} + +func (self *Session) cancel() { + rhdl.Println("Session: canceling running import") + select { + case self.cancelIntChan <- true: + default: // session got canceled already?? + } + self.state = SESSION_CANCELED +} + +func (self *Session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) { + if self.state != SESSION_NEW && self.state != SESSION_RUNNING { + resp.err = fmt.Errorf("session is already done/canceled") + } + self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata}) + return +} + +func (self *Session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) { + if self.state != SESSION_NEW && self.state != SESSION_RUNNING { + resp.err = fmt.Errorf("session is already done/canceled") + } + self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata}) + return +} + +func (self *Session) callProgressHandler(p *ProgressData) { + for _, cb := range self.progressCBs { + if cb.cb != nil { + if keep := cb.cb(p.Step, p.StepName, p.Progress, cb.userdata); !keep { + cb.cb = nil + } + } + } +} + +func (self *Session) callDoneHandler(r *Result) { + for _, cb := range self.doneCBs { + if cb.cb != nil { + if keep := cb.cb(*r, cb.userdata); !keep { + cb.cb = nil + } + } + } +} + +func (self *Session) dispatchRequests() { + defer func() { self.done <- true }() + for { + select { + case <-self.quit: + if self.state == SESSION_RUNNING { + self.cancel() + } + return + case <-self.timer.C: + if self.state == SESSION_RUNNING { + self.cancel() + } + self.state = SESSION_TIMEOUT + r := &Result{500, "session timed out", 0, 0} + self.callDoneHandler(r) + if self.removeFunc != nil { + self.removeFunc() + } + case t := <-self.runChan: + if self.state == SESSION_NEW { + self.run(t) + } + case <-self.cancelChan: + if self.state == SESSION_RUNNING { + self.cancel() + } + case req := <-self.addProgressChan: + req.response <- self.addProgressHandler(req.userdata, req.callback) + case req := <-self.addDoneChan: + req.response <- self.addDoneHandler(req.userdata, req.callback) + case p := <-self.progressIntChan: + self.callProgressHandler(&p) + case r := <-self.doneIntChan: + if self.state != SESSION_TIMEOUT { + self.timer.Stop() + self.state = SESSION_DONE + self.callDoneHandler(&r) + if self.removeFunc != nil { + self.removeFunc() + } + } + } + } +} + +// ********************************************************* +// Public Interface + +type SessionChan struct { + runChan chan<- time.Duration + cancelChan chan<- bool + addProgressChan chan<- sessionAddProgressHandlerRequest + addDoneChan chan<- sessionAddDoneHandlerRequest +} + +func (self *SessionChan) Run(timeout time.Duration) { + select { + case self.runChan <- timeout: + default: // command is already pending or session is about to be closed/removed + } +} + +func (self *SessionChan) Cancel() { + select { + case self.cancelChan <- true: + default: // cancel is already pending or session is about to be closed/removed + } +} + +func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ProgressCB) error { + resCh := make(chan sessionAddProgressHandlerResponse) + req := sessionAddProgressHandlerRequest{} + req.userdata = userdata + req.callback = cb + req.response = resCh + select { + case self.addProgressChan <- req: + default: + return fmt.Errorf("session is about to be closed/removed") + } + + res := <-resCh + return res.err +} + +func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error { + resCh := make(chan sessionAddDoneHandlerResponse) + req := sessionAddDoneHandlerRequest{} + req.userdata = userdata + req.callback = cb + req.response = resCh + select { + case self.addDoneChan <- req: + default: + return fmt.Errorf("session is about to be closed/removed") + } + + res := <-resCh + return res.err +} + +// ********************************************************* +// Semi-Public Interface (only used by sessionStore) + +func (self *Session) getInterface() *SessionChan { + ch := &SessionChan{} + ch.runChan = self.runChan + ch.cancelChan = self.cancelChan + ch.addProgressChan = self.addProgressChan + ch.addDoneChan = self.addDoneChan + return ch +} + +func (self *Session) cleanup() { + self.quit <- true + rhdl.Printf("waiting for session to close") + <-self.done + close(self.quit) + close(self.done) + self.timer.Stop() + // don't close the channels we give out because this might lead to a panic if + // somebody wites to an already removed session + // close(self.cancelIntChan) + // close(self.progressIntChan) + // close(self.doneIntChan) + // close(self.runChan) + // close(self.cancelChan) + // close(self.addProgressChan) + // close(self.addDoneChan) + rhdl.Printf("session is now cleaned up") +} + +func newSession(ctx *Context, removeFunc func()) (session *Session) { + session = new(Session) + session.state = SESSION_NEW + session.removeFunc = removeFunc + session.ctx = *ctx + session.done = make(chan bool) + session.timer = time.NewTimer(10 * time.Second) + session.cancelIntChan = make(chan bool, 1) + session.progressIntChan = make(chan ProgressData, 10) + session.doneIntChan = make(chan Result, 1) + session.runChan = make(chan time.Duration, 1) + session.cancelChan = make(chan bool, 1) + session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) + session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) + go session.dispatchRequests() + return +} |