// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package rhimport import ( "time" ) type SessionChan struct { runChan chan<- bool cancelChan chan<- bool addProgressChan chan<- sessionAddProgressHandlerRequest addDoneChan chan<- sessionAddDoneHandlerRequest } type Session struct { ctx ImportContext running bool done chan bool progressIntChan chan ProgressData doneIntChan chan ImportResult runChan chan bool cancelChan chan bool addProgressChan chan sessionAddProgressHandlerRequest addDoneChan chan sessionAddDoneHandlerRequest // TODO: add pub/sub for progress and done } type ProgressData struct { step int step_name string progress float64 } type sessionAddProgressHandlerResponse struct { err error } type sessionAddProgressHandlerRequest struct { userdata interface{} callback ImportProgressCB response chan<- sessionAddProgressHandlerResponse } type sessionAddDoneHandlerResponse struct { err error } type sessionAddDoneHandlerRequest struct { userdata interface{} callback func(*ImportResult, interface{}) response chan<- sessionAddDoneHandlerResponse } func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) { out := userdata.(chan<- ProgressData) out <- ProgressData{step, step_name, progress} } // TODO: actually call import here func session_import_run(ctx ImportContext, done chan<- ImportResult) { rhdl.Printf("faking import for: %+v", ctx) for i := 0; i < 100; i++ { if ctx.ProgressCallBack != nil { ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData) } time.Sleep(100 * time.Millisecond) } if ctx.ProgressCallBack != nil { ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData) } done <- ImportResult{200, "OK", 0, 0} } func (self *Session) run() { self.ctx.ProgressCallBack = session_progress_callback self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) go session_import_run(self.ctx, self.doneIntChan) self.running = true return } func (self *SessionChan) Run() { self.runChan <- true } func (self *SessionChan) Cancel() { self.cancelChan <- true } func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) { rhdl.Printf("Session: addProgressHandler called with: %+v", userdata) return } func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgressCB) error { res_ch := make(chan sessionAddProgressHandlerResponse) req := sessionAddProgressHandlerRequest{} req.userdata = userdata req.callback = cb req.response = res_ch self.addProgressChan <- req res := <-res_ch return res.err } func (self *Session) addDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) (resp sessionAddDoneHandlerResponse) { rhdl.Printf("Session: addDoneHandler called with: %+v", userdata) return } func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) error { res_ch := make(chan sessionAddDoneHandlerResponse) req := sessionAddDoneHandlerRequest{} req.userdata = userdata req.callback = cb req.response = res_ch self.addDoneChan <- req res := <-res_ch return res.err } func (self *Session) dispatchRequests() { defer func() { self.done <- true }() for { select { case <-self.runChan: if !self.running { self.run() } case <-self.cancelChan: if self.running { rhdl.Println("Session: canceling running imports is not yet implemented") // TODO: send cancel to import goroutine once this got implemented } else { return } case req := <-self.addProgressChan: req.response <- self.addProgressHandler(req.userdata, req.callback) case req := <-self.addDoneChan: req.response <- self.addDoneHandler(req.userdata, req.callback) case progress := <-self.progressIntChan: rhdl.Printf("Session: got progress: %+v", progress) // TODO: call all subscribed progress handler case result := <-self.doneIntChan: self.running = false rhdl.Printf("Session: import is done: %+v", result) // TODO: call all subscribed done handler } } } func (self *Session) getInterface() *SessionChan { ch := &SessionChan{} ch.runChan = self.runChan ch.cancelChan = self.cancelChan ch.addProgressChan = self.addProgressChan ch.addDoneChan = self.addDoneChan return ch } func (self *Session) Cleanup() { // TODO: this blocks if dispatchRequests has ended already... // or if cancel doesn't work... self.cancelChan <- true <-self.done close(self.done) close(self.progressIntChan) close(self.doneIntChan) close(self.runChan) close(self.cancelChan) close(self.addProgressChan) close(self.addDoneChan) } func NewSession(ctx *ImportContext) (session *Session) { session = new(Session) session.running = false session.ctx = *ctx session.done = make(chan bool) session.progressIntChan = make(chan ProgressData) session.doneIntChan = make(chan ImportResult) session.runChan = make(chan bool) session.cancelChan = make(chan bool) session.addProgressChan = make(chan sessionAddProgressHandlerRequest) session.addDoneChan = make(chan sessionAddDoneHandlerRequest) go session.dispatchRequests() return }