// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package rhimport import ( "fmt" "net/http" ) const ( SESSION_NEW = iota SESSION_RUNNING SESSION_CANCELED SESSION_DONE ) type SessionChan struct { runChan chan<- bool cancelChan chan<- bool addProgressChan chan<- sessionAddProgressHandlerRequest addDoneChan chan<- sessionAddDoneHandlerRequest } type Session struct { ctx ImportContext state int removeFunc func() done chan bool quit chan bool cancelIntChan chan bool progressIntChan chan ProgressData doneIntChan chan ImportResult runChan chan bool cancelChan chan bool addProgressChan chan sessionAddProgressHandlerRequest addDoneChan chan sessionAddDoneHandlerRequest progressCBs []*SessionProgressCB doneCBs []*SessionDoneCB } type SessionProgressCB struct { cb ImportProgressCB userdata interface{} } type SessionDoneCB struct { cb func(ImportResult, interface{}) bool userdata interface{} } type ProgressData struct { step int step_name string progress float64 } type sessionAddProgressHandlerResponse struct { err error } type sessionAddProgressHandlerRequest struct { userdata interface{} callback ImportProgressCB response chan<- sessionAddProgressHandlerResponse } type sessionAddDoneHandlerResponse struct { err error } type sessionAddDoneHandlerRequest struct { userdata interface{} callback func(ImportResult, interface{}) bool response chan<- sessionAddDoneHandlerResponse } func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) bool { out := userdata.(chan<- ProgressData) out <- ProgressData{step, step_name, progress} return true } func session_import_run(ctx ImportContext, done chan<- ImportResult) { if err := ctx.SanityCheck(); err != nil { done <- ImportResult{http.StatusBadRequest, err.Error(), 0, 0} return } if res, err := FetchFile(&ctx); err != nil { done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0} return } else if res.ResponseCode != http.StatusOK { done <- ImportResult{res.ResponseCode, res.ErrorString, 0, 0} return } if res, err := ImportFile(&ctx); err != nil { done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0} return } else { done <- *res return } } func (self *Session) run() { self.ctx.ProgressCallBack = session_progress_callback self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) self.ctx.Cancel = self.cancelIntChan go session_import_run(self.ctx, self.doneIntChan) self.state = SESSION_RUNNING return } func (self *SessionChan) Run() { select { case self.runChan <- true: default: // command is already pending or session is about to be closed/removed } } func (self *Session) cancel() { rhdl.Println("Session: canceling running import") select { case self.cancelIntChan <- true: default: // session got canceled already?? } self.state = SESSION_CANCELED } func (self *SessionChan) Cancel() { select { case self.cancelChan <- true: default: // cancel is already pending or session is about to be closed/removed } } func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) { if self.state != SESSION_NEW && self.state != SESSION_RUNNING { resp.err = fmt.Errorf("session is already done/canceled") } self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata}) return } func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgressCB) error { res_ch := make(chan sessionAddProgressHandlerResponse) req := sessionAddProgressHandlerRequest{} req.userdata = userdata req.callback = cb req.response = res_ch select { case self.addProgressChan <- req: default: return fmt.Errorf("session is about to be closed/removed") } res := <-res_ch return res.err } func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) { if self.state == SESSION_NEW && self.state != SESSION_RUNNING { resp.err = fmt.Errorf("session is already done/canceled") } self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata}) return } func (self *Session) callProgressHandler(p *ProgressData) { for _, cb := range self.progressCBs { if cb.cb != nil { if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep { cb.cb = nil } } } } func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) error { res_ch := make(chan sessionAddDoneHandlerResponse) req := sessionAddDoneHandlerRequest{} req.userdata = userdata req.callback = cb req.response = res_ch select { case self.addDoneChan <- req: default: return fmt.Errorf("session is about to be closed/removed") } res := <-res_ch return res.err } func (self *Session) callDoneHandler(r *ImportResult) { for _, cb := range self.doneCBs { if cb.cb != nil { if keep := cb.cb(*r, cb.userdata); !keep { cb.cb = nil } } } } func (self *Session) dispatchRequests() { defer func() { self.done <- true }() for { select { case <-self.quit: if self.state == SESSION_RUNNING { self.cancel() } return case <-self.runChan: if self.state == SESSION_NEW { self.run() } case <-self.cancelChan: if self.state == SESSION_RUNNING { self.cancel() } case req := <-self.addProgressChan: req.response <- self.addProgressHandler(req.userdata, req.callback) case req := <-self.addDoneChan: req.response <- self.addDoneHandler(req.userdata, req.callback) case p := <-self.progressIntChan: self.callProgressHandler(&p) case r := <-self.doneIntChan: self.state = SESSION_DONE self.callDoneHandler(&r) if self.removeFunc != nil { self.removeFunc() } } } } func (self *Session) getInterface() *SessionChan { ch := &SessionChan{} ch.runChan = self.runChan ch.cancelChan = self.cancelChan ch.addProgressChan = self.addProgressChan ch.addDoneChan = self.addDoneChan return ch } func (self *Session) Cleanup() { self.quit <- true rhdl.Printf("waiting for session to close") <-self.done close(self.done) close(self.progressIntChan) close(self.doneIntChan) // don't close the channels we give out because this might lead to a panic if // somebody wites to an already removed session // close(self.runChan) // close(self.cancelChan) // close(self.addProgressChan) // close(self.addDoneChan) rhdl.Printf("session is now cleaned up") } func NewSession(ctx *ImportContext, removeFunc func()) (session *Session) { session = new(Session) session.state = SESSION_NEW session.removeFunc = removeFunc session.ctx = *ctx session.done = make(chan bool) session.cancelIntChan = make(chan bool, 1) session.progressIntChan = make(chan ProgressData, 10) session.doneIntChan = make(chan ImportResult, 1) session.runChan = make(chan bool, 1) session.cancelChan = make(chan bool, 1) session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) go session.dispatchRequests() return }