summaryrefslogtreecommitdiff
path: root/rhimport/session.go
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2016-01-08 01:06:30 (GMT)
committerChristian Pointner <equinox@helsinki.at>2016-01-08 01:06:30 (GMT)
commite88ef9d360843541bd348b35099dda1b15c6c896 (patch)
treed53b90301fbf535d076a56b10f44851c0572ec82 /rhimport/session.go
parent9cd0b1783c0c90c68c4840b5d317e9135e07774e (diff)
prepare export rhimport package to rhrd-go repo
Diffstat (limited to 'rhimport/session.go')
-rw-r--r--rhimport/session.go328
1 files changed, 328 insertions, 0 deletions
diff --git a/rhimport/session.go b/rhimport/session.go
new file mode 100644
index 0000000..66705ec
--- /dev/null
+++ b/rhimport/session.go
@@ -0,0 +1,328 @@
+//
+// rhimportd
+//
+// The Radio Helsinki Rivendell Import Daemon
+//
+//
+// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at>
+//
+// This file is part of rhimportd.
+//
+// rhimportd is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// any later version.
+//
+// rhimportd is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with rhimportd. If not, see <http://www.gnu.org/licenses/>.
+//
+
+package rhimport
+
+import (
+ "fmt"
+ "net/http"
+ "time"
+)
+
+const (
+ SESSION_NEW = iota
+ SESSION_RUNNING
+ SESSION_CANCELED
+ SESSION_DONE
+ SESSION_TIMEOUT
+)
+
+type Session struct {
+ ctx Context
+ state int
+ removeFunc func()
+ done chan bool
+ quit chan bool
+ timer *time.Timer
+ cancelIntChan chan bool
+ progressIntChan chan ProgressData
+ doneIntChan chan Result
+ runChan chan time.Duration
+ cancelChan chan bool
+ addProgressChan chan sessionAddProgressHandlerRequest
+ addDoneChan chan sessionAddDoneHandlerRequest
+ progressCBs []*SessionProgressCB
+ doneCBs []*SessionDoneCB
+}
+
+type SessionProgressCB struct {
+ cb ProgressCB
+ userdata interface{}
+}
+
+type SessionDoneCB struct {
+ cb DoneCB
+ userdata interface{}
+}
+
+type ProgressData struct {
+ Step int
+ StepName string
+ Progress float64
+}
+
+type sessionAddProgressHandlerResponse struct {
+ err error
+}
+
+type sessionAddProgressHandlerRequest struct {
+ userdata interface{}
+ callback ProgressCB
+ response chan<- sessionAddProgressHandlerResponse
+}
+
+type sessionAddDoneHandlerResponse struct {
+ err error
+}
+
+type sessionAddDoneHandlerRequest struct {
+ userdata interface{}
+ callback DoneCB
+ response chan<- sessionAddDoneHandlerResponse
+}
+
+func sessionProgressCallback(step int, stepName string, progress float64, userdata interface{}) bool {
+ out := userdata.(chan<- ProgressData)
+ out <- ProgressData{step, stepName, progress}
+ return true
+}
+
+func sessionRun(ctx Context, done chan<- Result) {
+ if err := ctx.SanityCheck(); err != nil {
+ done <- Result{http.StatusBadRequest, err.Error(), 0, 0}
+ return
+ }
+
+ if res, err := FetchFile(&ctx); err != nil {
+ done <- Result{http.StatusInternalServerError, err.Error(), 0, 0}
+ return
+ } else if res.ResponseCode != http.StatusOK {
+ done <- *res
+ return
+ }
+
+ if res, err := ImportFile(&ctx); err != nil {
+ done <- Result{http.StatusInternalServerError, err.Error(), 0, 0}
+ return
+ } else {
+ done <- *res
+ return
+ }
+}
+
+func (self *Session) run(timeout time.Duration) {
+ self.ctx.ProgressCallBack = sessionProgressCallback
+ self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
+ self.ctx.Cancel = self.cancelIntChan
+ go sessionRun(self.ctx, self.doneIntChan)
+ self.state = SESSION_RUNNING
+ self.timer.Reset(timeout)
+ return
+}
+
+func (self *Session) cancel() {
+ rhdl.Println("Session: canceling running import")
+ select {
+ case self.cancelIntChan <- true:
+ default: // session got canceled already??
+ }
+ self.state = SESSION_CANCELED
+}
+
+func (self *Session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) {
+ if self.state != SESSION_NEW && self.state != SESSION_RUNNING {
+ resp.err = fmt.Errorf("session is already done/canceled")
+ }
+ self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata})
+ return
+}
+
+func (self *Session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) {
+ if self.state != SESSION_NEW && self.state != SESSION_RUNNING {
+ resp.err = fmt.Errorf("session is already done/canceled")
+ }
+ self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata})
+ return
+}
+
+func (self *Session) callProgressHandler(p *ProgressData) {
+ for _, cb := range self.progressCBs {
+ if cb.cb != nil {
+ if keep := cb.cb(p.Step, p.StepName, p.Progress, cb.userdata); !keep {
+ cb.cb = nil
+ }
+ }
+ }
+}
+
+func (self *Session) callDoneHandler(r *Result) {
+ for _, cb := range self.doneCBs {
+ if cb.cb != nil {
+ if keep := cb.cb(*r, cb.userdata); !keep {
+ cb.cb = nil
+ }
+ }
+ }
+}
+
+func (self *Session) dispatchRequests() {
+ defer func() { self.done <- true }()
+ for {
+ select {
+ case <-self.quit:
+ if self.state == SESSION_RUNNING {
+ self.cancel()
+ }
+ return
+ case <-self.timer.C:
+ if self.state == SESSION_RUNNING {
+ self.cancel()
+ }
+ self.state = SESSION_TIMEOUT
+ r := &Result{500, "session timed out", 0, 0}
+ self.callDoneHandler(r)
+ if self.removeFunc != nil {
+ self.removeFunc()
+ }
+ case t := <-self.runChan:
+ if self.state == SESSION_NEW {
+ self.run(t)
+ }
+ case <-self.cancelChan:
+ if self.state == SESSION_RUNNING {
+ self.cancel()
+ }
+ case req := <-self.addProgressChan:
+ req.response <- self.addProgressHandler(req.userdata, req.callback)
+ case req := <-self.addDoneChan:
+ req.response <- self.addDoneHandler(req.userdata, req.callback)
+ case p := <-self.progressIntChan:
+ self.callProgressHandler(&p)
+ case r := <-self.doneIntChan:
+ if self.state != SESSION_TIMEOUT {
+ self.timer.Stop()
+ self.state = SESSION_DONE
+ self.callDoneHandler(&r)
+ if self.removeFunc != nil {
+ self.removeFunc()
+ }
+ }
+ }
+ }
+}
+
+// *********************************************************
+// Public Interface
+
+type SessionChan struct {
+ runChan chan<- time.Duration
+ cancelChan chan<- bool
+ addProgressChan chan<- sessionAddProgressHandlerRequest
+ addDoneChan chan<- sessionAddDoneHandlerRequest
+}
+
+func (self *SessionChan) Run(timeout time.Duration) {
+ select {
+ case self.runChan <- timeout:
+ default: // command is already pending or session is about to be closed/removed
+ }
+}
+
+func (self *SessionChan) Cancel() {
+ select {
+ case self.cancelChan <- true:
+ default: // cancel is already pending or session is about to be closed/removed
+ }
+}
+
+func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ProgressCB) error {
+ resCh := make(chan sessionAddProgressHandlerResponse)
+ req := sessionAddProgressHandlerRequest{}
+ req.userdata = userdata
+ req.callback = cb
+ req.response = resCh
+ select {
+ case self.addProgressChan <- req:
+ default:
+ return fmt.Errorf("session is about to be closed/removed")
+ }
+
+ res := <-resCh
+ return res.err
+}
+
+func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error {
+ resCh := make(chan sessionAddDoneHandlerResponse)
+ req := sessionAddDoneHandlerRequest{}
+ req.userdata = userdata
+ req.callback = cb
+ req.response = resCh
+ select {
+ case self.addDoneChan <- req:
+ default:
+ return fmt.Errorf("session is about to be closed/removed")
+ }
+
+ res := <-resCh
+ return res.err
+}
+
+// *********************************************************
+// Semi-Public Interface (only used by sessionStore)
+
+func (self *Session) getInterface() *SessionChan {
+ ch := &SessionChan{}
+ ch.runChan = self.runChan
+ ch.cancelChan = self.cancelChan
+ ch.addProgressChan = self.addProgressChan
+ ch.addDoneChan = self.addDoneChan
+ return ch
+}
+
+func (self *Session) cleanup() {
+ self.quit <- true
+ rhdl.Printf("waiting for session to close")
+ <-self.done
+ close(self.quit)
+ close(self.done)
+ self.timer.Stop()
+ // don't close the channels we give out because this might lead to a panic if
+ // somebody wites to an already removed session
+ // close(self.cancelIntChan)
+ // close(self.progressIntChan)
+ // close(self.doneIntChan)
+ // close(self.runChan)
+ // close(self.cancelChan)
+ // close(self.addProgressChan)
+ // close(self.addDoneChan)
+ rhdl.Printf("session is now cleaned up")
+}
+
+func newSession(ctx *Context, removeFunc func()) (session *Session) {
+ session = new(Session)
+ session.state = SESSION_NEW
+ session.removeFunc = removeFunc
+ session.ctx = *ctx
+ session.done = make(chan bool)
+ session.timer = time.NewTimer(10 * time.Second)
+ session.cancelIntChan = make(chan bool, 1)
+ session.progressIntChan = make(chan ProgressData, 10)
+ session.doneIntChan = make(chan Result, 1)
+ session.runChan = make(chan time.Duration, 1)
+ session.cancelChan = make(chan bool, 1)
+ session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10)
+ session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10)
+ go session.dispatchRequests()
+ return
+}