// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package rhimport import ( "fmt" "net/http" ) const ( SESSION_NEW = iota SESSION_RUNNING SESSION_CANCELED SESSION_DONE ) type SessionChan struct { runChan chan<- bool cancelChan chan<- bool addProgressChan chan<- sessionAddProgressHandlerRequest addDoneChan chan<- sessionAddDoneHandlerRequest } type Session struct { ctx ImportContext state int done chan bool cancelIntChan chan bool progressIntChan chan ProgressData doneIntChan chan ImportResult runChan chan bool cancelChan chan bool addProgressChan chan sessionAddProgressHandlerRequest addDoneChan chan sessionAddDoneHandlerRequest progressCBs []*SessionProgressCB doneCBs []*SessionDoneCB } type SessionProgressCB struct { cb ImportProgressCB userdata interface{} } type SessionDoneCB struct { cb func(ImportResult, interface{}) bool userdata interface{} } type ProgressData struct { step int step_name string progress float64 } type sessionAddProgressHandlerResponse struct { err error } type sessionAddProgressHandlerRequest struct { userdata interface{} callback ImportProgressCB response chan<- sessionAddProgressHandlerResponse } type sessionAddDoneHandlerResponse struct { err error } type sessionAddDoneHandlerRequest struct { userdata interface{} callback func(ImportResult, interface{}) bool response chan<- sessionAddDoneHandlerResponse } func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) bool { out := userdata.(chan<- ProgressData) out <- ProgressData{step, step_name, progress} return true } func session_import_run(ctx ImportContext, done chan<- ImportResult) { if err := ctx.SanityCheck(); err != nil { done <- ImportResult{http.StatusBadRequest, err.Error(), 0, 0} return } if res, err := FetchFile(&ctx); err != nil { done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0} return } else if res.ResponseCode != http.StatusOK { done <- ImportResult{res.ResponseCode, res.ErrorString, 0, 0} return } if res, err := ImportFile(&ctx); err != nil { done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0} return } else { done <- *res return } } func (self *Session) run() { self.ctx.ProgressCallBack = session_progress_callback self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) self.ctx.Cancel = self.cancelIntChan go session_import_run(self.ctx, self.doneIntChan) self.state = SESSION_RUNNING return } func (self *SessionChan) Run() { select { case self.runChan <- true: default: // command is already pending or session is about to be closed/removed } } func (self *SessionChan) Cancel() { select { case self.cancelChan <- true: default: // cancel is already pending or session is about to be closed/removed } } func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) { if self.state != SESSION_NEW && self.state != SESSION_RUNNING { resp.err = fmt.Errorf("session is already done/canceled") } self.progressCBs = append(self.progressCBs, &SessionProgressCB{cb, userdata}) return } func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgressCB) error { res_ch := make(chan sessionAddProgressHandlerResponse) req := sessionAddProgressHandlerRequest{} req.userdata = userdata req.callback = cb req.response = res_ch select { case self.addProgressChan <- req: default: return fmt.Errorf("session is about to be closed/removed") } res := <-res_ch return res.err } func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) { if self.state == SESSION_NEW && self.state != SESSION_RUNNING { resp.err = fmt.Errorf("session is already done/canceled") } self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata}) return } func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) error { res_ch := make(chan sessionAddDoneHandlerResponse) req := sessionAddDoneHandlerRequest{} req.userdata = userdata req.callback = cb req.response = res_ch select { case self.addDoneChan <- req: default: return fmt.Errorf("session is about to be closed/removed") } res := <-res_ch return res.err } func (self *Session) callProgressHandler(p *ProgressData) { for _, cb := range self.progressCBs { if cb.cb != nil { if keep := cb.cb(p.step, p.step_name, p.progress, cb.userdata); !keep { cb.cb = nil } } } } func (self *Session) callDoneHandler(r *ImportResult) { for _, cb := range self.doneCBs { if cb.cb != nil { if keep := cb.cb(*r, cb.userdata); !keep { cb.cb = nil } } } } func (self *Session) dispatchRequests() { defer func() { self.done <- true }() for { select { case <-self.runChan: if self.state == SESSION_NEW { self.run() } case <-self.cancelChan: if self.state == SESSION_RUNNING { rhdl.Println("Session: canceling running import") select { case self.cancelIntChan <- true: default: // session got canceled already?? } self.state = SESSION_CANCELED } else { return } case req := <-self.addProgressChan: req.response <- self.addProgressHandler(req.userdata, req.callback) case req := <-self.addDoneChan: req.response <- self.addDoneHandler(req.userdata, req.callback) case p := <-self.progressIntChan: self.callProgressHandler(&p) case r := <-self.doneIntChan: self.state = SESSION_DONE self.callDoneHandler(&r) } } } func (self *Session) getInterface() *SessionChan { ch := &SessionChan{} ch.runChan = self.runChan ch.cancelChan = self.cancelChan ch.addProgressChan = self.addProgressChan ch.addDoneChan = self.addDoneChan return ch } func (self *Session) Cleanup() { self.cancelChan <- true rhdl.Printf("waiting for session to close") <-self.done close(self.done) close(self.progressIntChan) close(self.doneIntChan) // don't close the channels we give out because this might lead to a panic if // somebody wites to an already removed session // close(self.runChan) // close(self.cancelChan) // close(self.addProgressChan) // close(self.addDoneChan) rhdl.Printf("session is now cleaned up") } func NewSession(ctx *ImportContext) (session *Session) { session = new(Session) session.state = SESSION_NEW session.ctx = *ctx session.done = make(chan bool) session.cancelIntChan = make(chan bool, 1) session.progressIntChan = make(chan ProgressData, 10) session.doneIntChan = make(chan ImportResult, 1) session.runChan = make(chan bool, 1) session.cancelChan = make(chan bool, 1) session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) go session.dispatchRequests() return }