summaryrefslogtreecommitdiff
path: root/src/helsinki.at/rhimport/session.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/helsinki.at/rhimport/session.go')
-rw-r--r--src/helsinki.at/rhimport/session.go328
1 files changed, 0 insertions, 328 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
deleted file mode 100644
index 66705ec..0000000
--- a/src/helsinki.at/rhimport/session.go
+++ /dev/null
@@ -1,328 +0,0 @@
-//
-// rhimportd
-//
-// The Radio Helsinki Rivendell Import Daemon
-//
-//
-// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at>
-//
-// This file is part of rhimportd.
-//
-// rhimportd is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// any later version.
-//
-// rhimportd is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with rhimportd. If not, see <http://www.gnu.org/licenses/>.
-//
-
-package rhimport
-
-import (
- "fmt"
- "net/http"
- "time"
-)
-
-const (
- SESSION_NEW = iota
- SESSION_RUNNING
- SESSION_CANCELED
- SESSION_DONE
- SESSION_TIMEOUT
-)
-
-type Session struct {
- ctx Context
- state int
- removeFunc func()
- done chan bool
- quit chan bool
- timer *time.Timer
- cancelIntChan chan bool
- progressIntChan chan ProgressData
- doneIntChan chan Result
- runChan chan time.Duration
- cancelChan chan bool
- addProgressChan chan sessionAddProgressHandlerRequest
- addDoneChan chan sessionAddDoneHandlerRequest
- progressCBs []*SessionProgressCB
- doneCBs []*SessionDoneCB
-}
-
-type SessionProgressCB struct {
- cb ProgressCB
- userdata interface{}
-}
-
-type SessionDoneCB struct {
- cb DoneCB
- userdata interface{}
-}
-
-type ProgressData struct {
- Step int
- StepName string
- Progress float64
-}
-
-type sessionAddProgressHandlerResponse struct {
- err error
-}
-
-type sessionAddProgressHandlerRequest struct {
- userdata interface{}
- callback ProgressCB
- response chan<- sessionAddProgressHandlerResponse
-}
-
-type sessionAddDoneHandlerResponse struct {
- err error
-}
-
-type sessionAddDoneHandlerRequest struct {
- userdata interface{}
- callback DoneCB
- response chan<- sessionAddDoneHandlerResponse
-}
-
-func sessionProgressCallback(step int, stepName string, progress float64, userdata interface{}) bool {
- out := userdata.(chan<- ProgressData)
- out <- ProgressData{step, stepName, progress}
- return true
-}
-
-func sessionRun(ctx Context, done chan<- Result) {
- if err := ctx.SanityCheck(); err != nil {
- done <- Result{http.StatusBadRequest, err.Error(), 0, 0}
- return
- }
-
- if res, err := FetchFile(&ctx); err != nil {
- done <- Result{http.StatusInternalServerError, err.Error(), 0, 0}
- return
- } else if res.ResponseCode != http.StatusOK {
- done <- *res
- return
- }
-
- if res, err := ImportFile(&ctx); err != nil {
- done <- Result{http.StatusInternalServerError, err.Error(), 0, 0}
- return
- } else {
- done <- *res
- return
- }
-}
-
-func (self *Session) run(timeout time.Duration) {
- self.ctx.ProgressCallBack = sessionProgressCallback
- self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
- self.ctx.Cancel = self.cancelIntChan
- go sessionRun(self.ctx, self.doneIntChan)
- self.state = SESSION_RUNNING
- self.timer.Reset(timeout)
- return
-}
-
-func (self *Session) cancel() {
- rhdl.Println("Session: canceling running import")
- select {
- case self.cancelIntChan <- true:
- default: // session got canceled already??
- }
- self.state = SESSION_CANCELED
-}
-
-func (self *Session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) {
- if self.state != SESSION_NEW && self.state != SESSION_RUNNING {
- resp.err = fmt.Errorf("session is already done/canceled")
- }
- self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata})
- return
-}
-
-func (self *Session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) {
- if self.state != SESSION_NEW && self.state != SESSION_RUNNING {
- resp.err = fmt.Errorf("session is already done/canceled")
- }
- self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata})
- return
-}
-
-func (self *Session) callProgressHandler(p *ProgressData) {
- for _, cb := range self.progressCBs {
- if cb.cb != nil {
- if keep := cb.cb(p.Step, p.StepName, p.Progress, cb.userdata); !keep {
- cb.cb = nil
- }
- }
- }
-}
-
-func (self *Session) callDoneHandler(r *Result) {
- for _, cb := range self.doneCBs {
- if cb.cb != nil {
- if keep := cb.cb(*r, cb.userdata); !keep {
- cb.cb = nil
- }
- }
- }
-}
-
-func (self *Session) dispatchRequests() {
- defer func() { self.done <- true }()
- for {
- select {
- case <-self.quit:
- if self.state == SESSION_RUNNING {
- self.cancel()
- }
- return
- case <-self.timer.C:
- if self.state == SESSION_RUNNING {
- self.cancel()
- }
- self.state = SESSION_TIMEOUT
- r := &Result{500, "session timed out", 0, 0}
- self.callDoneHandler(r)
- if self.removeFunc != nil {
- self.removeFunc()
- }
- case t := <-self.runChan:
- if self.state == SESSION_NEW {
- self.run(t)
- }
- case <-self.cancelChan:
- if self.state == SESSION_RUNNING {
- self.cancel()
- }
- case req := <-self.addProgressChan:
- req.response <- self.addProgressHandler(req.userdata, req.callback)
- case req := <-self.addDoneChan:
- req.response <- self.addDoneHandler(req.userdata, req.callback)
- case p := <-self.progressIntChan:
- self.callProgressHandler(&p)
- case r := <-self.doneIntChan:
- if self.state != SESSION_TIMEOUT {
- self.timer.Stop()
- self.state = SESSION_DONE
- self.callDoneHandler(&r)
- if self.removeFunc != nil {
- self.removeFunc()
- }
- }
- }
- }
-}
-
-// *********************************************************
-// Public Interface
-
-type SessionChan struct {
- runChan chan<- time.Duration
- cancelChan chan<- bool
- addProgressChan chan<- sessionAddProgressHandlerRequest
- addDoneChan chan<- sessionAddDoneHandlerRequest
-}
-
-func (self *SessionChan) Run(timeout time.Duration) {
- select {
- case self.runChan <- timeout:
- default: // command is already pending or session is about to be closed/removed
- }
-}
-
-func (self *SessionChan) Cancel() {
- select {
- case self.cancelChan <- true:
- default: // cancel is already pending or session is about to be closed/removed
- }
-}
-
-func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ProgressCB) error {
- resCh := make(chan sessionAddProgressHandlerResponse)
- req := sessionAddProgressHandlerRequest{}
- req.userdata = userdata
- req.callback = cb
- req.response = resCh
- select {
- case self.addProgressChan <- req:
- default:
- return fmt.Errorf("session is about to be closed/removed")
- }
-
- res := <-resCh
- return res.err
-}
-
-func (self *SessionChan) AddDoneHandler(userdata interface{}, cb DoneCB) error {
- resCh := make(chan sessionAddDoneHandlerResponse)
- req := sessionAddDoneHandlerRequest{}
- req.userdata = userdata
- req.callback = cb
- req.response = resCh
- select {
- case self.addDoneChan <- req:
- default:
- return fmt.Errorf("session is about to be closed/removed")
- }
-
- res := <-resCh
- return res.err
-}
-
-// *********************************************************
-// Semi-Public Interface (only used by sessionStore)
-
-func (self *Session) getInterface() *SessionChan {
- ch := &SessionChan{}
- ch.runChan = self.runChan
- ch.cancelChan = self.cancelChan
- ch.addProgressChan = self.addProgressChan
- ch.addDoneChan = self.addDoneChan
- return ch
-}
-
-func (self *Session) cleanup() {
- self.quit <- true
- rhdl.Printf("waiting for session to close")
- <-self.done
- close(self.quit)
- close(self.done)
- self.timer.Stop()
- // don't close the channels we give out because this might lead to a panic if
- // somebody wites to an already removed session
- // close(self.cancelIntChan)
- // close(self.progressIntChan)
- // close(self.doneIntChan)
- // close(self.runChan)
- // close(self.cancelChan)
- // close(self.addProgressChan)
- // close(self.addDoneChan)
- rhdl.Printf("session is now cleaned up")
-}
-
-func newSession(ctx *Context, removeFunc func()) (session *Session) {
- session = new(Session)
- session.state = SESSION_NEW
- session.removeFunc = removeFunc
- session.ctx = *ctx
- session.done = make(chan bool)
- session.timer = time.NewTimer(10 * time.Second)
- session.cancelIntChan = make(chan bool, 1)
- session.progressIntChan = make(chan ProgressData, 10)
- session.doneIntChan = make(chan Result, 1)
- session.runChan = make(chan time.Duration, 1)
- session.cancelChan = make(chan bool, 1)
- session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10)
- session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10)
- go session.dispatchRequests()
- return
-}