diff options
Diffstat (limited to 'src/helsinki.at/rhimport/session.go')
-rw-r--r-- | src/helsinki.at/rhimport/session.go | 328 |
1 files changed, 0 insertions, 328 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go deleted file mode 100644 index 66705ec..0000000 --- a/src/helsinki.at/rhimport/session.go +++ /dev/null @@ -1,328 +0,0 @@ -// -// rhimportd -// -// The Radio Helsinki Rivendell Import Daemon -// -// -// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at> -// -// This file is part of rhimportd. -// -// rhimportd is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// any later version. -// -// rhimportd is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. -// - -package rhimport - -import ( - "fmt" - "net/http" - "time" -) - -const ( - SESSION_NEW = iota - SESSION_RUNNING - SESSION_CANCELED - SESSION_DONE - SESSION_TIMEOUT -) - -type Session struct { - ctx Context - state int - removeFunc func() - done chan bool - quit chan bool - timer *time.Timer - cancelIntChan chan bool - progressIntChan chan ProgressData - doneIntChan chan Result - runChan chan time.Duration - cancelChan chan bool - addProgressChan chan sessionAddProgressHandlerRequest - addDoneChan chan sessionAddDoneHandlerRequest - progressCBs []*SessionProgressCB - doneCBs []*SessionDoneCB -} - -type SessionProgressCB struct { - cb ProgressCB - userdata interface{} -} - -type SessionDoneCB struct { - cb DoneCB - userdata interface{} -} - -type ProgressData struct { - Step int - StepName string - Progress float64 -} - -type sessionAddProgressHandlerResponse struct { - err error -} - -type sessionAddProgressHandlerRequest struct { - userdata interface{} - callback ProgressCB - response chan<- sessionAddProgressHandlerResponse -} - -type sessionAddDoneHandlerResponse struct { - err error -} - -type sessionAddDoneHandlerRequest struct { - userdata interface{} - callback DoneCB - response chan<- sessionAddDoneHandlerResponse -} - -func sessionProgressCallback(step int, stepName string, progress float64, userdata interface{}) bool { - out := userdata.(chan<- ProgressData) - out <- ProgressData{step, stepName, progress} - return true -} - -func sessionRun(ctx Context, done chan<- Result) { - if err := ctx.SanityCheck(); err != nil { - done <- Result{http.StatusBadRequest, err.Error(), 0, 0} - return - } - - if res, err := FetchFile(&ctx); err != nil { - done <- Result{http.StatusInternalServerError, err.Error(), 0, 0} - return - } else if res.ResponseCode != http.StatusOK { - done <- *res - return - } - - if res, err := ImportFile(&ctx); err != nil { - done <- Result{http.StatusInternalServerError, err.Error(), 0, 0} - return - } else { - done <- *res - return - } -} - -func (self *Session) run(timeout time.Duration) { - self.ctx.ProgressCallBack = sessionProgressCallback - self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) - self.ctx.Cancel = self.cancelIntChan - go sessionRun(self.ctx, self.doneIntChan) - self.state = SESSION_RUNNING - self.timer.Reset(timeout) - return -} - -func (self *Session) cancel() { - rhdl.Println("Session: canceling running import") - select { - case self.cancelIntChan <- true: - default: // session got canceled already?? - } - self.state = SESSION_CANCELED -} - -func (self *Session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) { - if self.state != SESSION_NEW && self.state != SESSION_RUNNING { - resp.err = fmt.Errorf("session is already done/canceled") - } - self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata}) - return -} - -func (self *Session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) { - if self.state != SESSION_NEW && self.state != SESSION_RUNNING { - resp.err = fmt.Errorf("session is already done/canceled") - } - self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata}) - return -} - -func (self *Session) callProgressHandler(p *ProgressData) { - for _, cb := range self.progressCBs { - if cb.cb != nil { - if keep := cb.cb(p.Step, p.StepName, p.Progress, cb.userdata); !keep { - cb.cb = nil - } - } - } -} - -func (self *Session) callDoneHandler(r *Result) { - for _, cb := range self.doneCBs { - if cb.cb != nil { - if keep := cb.cb(*r, cb.userdata); !keep { - cb.cb = nil - } - } - } -} - -func (self *Session) dispatchRequests() { - defer func() { self.done <- true }() - for { - select { - case <-self.quit: - if self.state == SESSION_RUNNING { - self.cancel() - } - return - case <-self.timer.C: - if self.state == SESSION_RUNNING { - self.cancel() - } - self.state = SESSION_TIMEOUT - r := &Result{500, "session timed out", 0, 0} - self.callDoneHandler(r) - if self.removeFunc != nil { - self.removeFunc() - } - case t := <-self.runChan: - if self.state == SESSION_NEW { - self.run(t) - } - case <-self.cancelChan: - if self.state == SESSION_RUNNING { - self.cancel() - } - case req := <-self.addProgressChan: - req.response <- self.addProgressHandler(req.userdata, req.callback) - case req := <-self.addDoneChan: - req.response <- self.addDoneHandler(req.userdata, req.callback) - case p := <-self.progressIntChan: - self.callProgressHandler(&p) - case r := <-self.doneIntChan: - if self.state != SESSION_TIMEOUT { - self.timer.Stop() - self.state = SESSION_DONE - self.callDoneHandler(&r) - if self.removeFunc != nil { - self.removeFunc() - } - } - } - } -} - -// ********************************************************* -// Public Interface - -type SessionChan struct { - runChan chan<- time.Duration - cancelChan chan<- bool - addProgressChan chan<- sessionAddProgressHandlerRequest - addDoneChan chan<- sessionAddDoneHandlerRequest -} - -func (self *SessionChan) Run(timeout time.Duration) { - select { - case self.runChan <- timeout: - default: // command is already pending or session is about to be closed/removed - } -} - -func (self *SessionChan) Cancel() { - select { - case self.cancelChan <- true: - default: // cancel is already pending or session is about to be closed/removed - } -} - -func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ProgressCB) error { - resCh := make(chan sessionAddProgressHandlerResponse) - req := sessionAddProgressHandlerRequest{} - req.userdata = userdata - req.callback = cb - req.response = resCh - select { - case self.addProgressChan <- req: - default: - return fmt.Errorf("session is about to be closed/removed") - } - - res := <-resCh - return res.err -} - -func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error { - resCh := make(chan sessionAddDoneHandlerResponse) - req := sessionAddDoneHandlerRequest{} - req.userdata = userdata - req.callback = cb - req.response = resCh - select { - case self.addDoneChan <- req: - default: - return fmt.Errorf("session is about to be closed/removed") - } - - res := <-resCh - return res.err -} - -// ********************************************************* -// Semi-Public Interface (only used by sessionStore) - -func (self *Session) getInterface() *SessionChan { - ch := &SessionChan{} - ch.runChan = self.runChan - ch.cancelChan = self.cancelChan - ch.addProgressChan = self.addProgressChan - ch.addDoneChan = self.addDoneChan - return ch -} - -func (self *Session) cleanup() { - self.quit <- true - rhdl.Printf("waiting for session to close") - <-self.done - close(self.quit) - close(self.done) - self.timer.Stop() - // don't close the channels we give out because this might lead to a panic if - // somebody wites to an already removed session - // close(self.cancelIntChan) - // close(self.progressIntChan) - // close(self.doneIntChan) - // close(self.runChan) - // close(self.cancelChan) - // close(self.addProgressChan) - // close(self.addDoneChan) - rhdl.Printf("session is now cleaned up") -} - -func newSession(ctx *Context, removeFunc func()) (session *Session) { - session = new(Session) - session.state = SESSION_NEW - session.removeFunc = removeFunc - session.ctx = *ctx - session.done = make(chan bool) - session.timer = time.NewTimer(10 * time.Second) - session.cancelIntChan = make(chan bool, 1) - session.progressIntChan = make(chan ProgressData, 10) - session.doneIntChan = make(chan Result, 1) - session.runChan = make(chan time.Duration, 1) - session.cancelChan = make(chan bool, 1) - session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) - session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) - go session.dispatchRequests() - return -} |