// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package rhimport import ( "encoding/base32" "fmt" "github.com/satori/go.uuid" "strings" ) type Session struct { ImportContext ImportResult // TODO: add creation time for timeout } type ProgressData struct { step int step_name string progress float64 userdata interface{} } type newSessionResponse struct { id string err error } type newSessionRequest struct { ctx ImportContext response chan newSessionResponse } type sessionRunResponse struct { err error } type sessionRunRequest struct { user string id string response chan sessionRunResponse } type sessionAddProgressHandlerResponse struct { err error } type sessionAddProgressHandlerRequest struct { user string id string userdata interface{} handler chan ProgressData response chan sessionAddProgressHandlerResponse } type sessionAddDoneHandlerResponse struct { err error } type sessionAddDoneHandlerRequest struct { user string id string userdata interface{} handler chan ImportResult response chan sessionAddDoneHandlerResponse } type sessionRemoveResponse struct { err error } type sessionRemoveRequest struct { user string id string response chan sessionRemoveResponse } type SessionStore struct { store map[string]map[string]Session quit chan bool done chan bool newChan chan newSessionRequest runChan chan sessionRunRequest addProgressChan chan sessionAddProgressHandlerRequest addDoneChan chan sessionAddDoneHandlerRequest removeChan chan sessionRemoveRequest } func (self *SessionStore) Init() (err error) { return } func (self *SessionStore) newSession(ctx ImportContext) (resp newSessionResponse) { b := uuid.NewV4().Bytes() resp.id = strings.ToLower(strings.TrimRight(base32.StdEncoding.EncodeToString(b), "=")) if _, exists := self.store[ctx.UserName]; !exists { self.store[ctx.UserName] = make(map[string]Session) } self.store[ctx.UserName][resp.id] = Session{ImportContext: ctx} rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id) return } func (self *SessionStore) NewSession(ctx ImportContext) (string, error) { req := newSessionRequest{} req.ctx = ctx req.response = make(chan newSessionResponse) self.newChan <- req res := <-req.response if res.err != nil { return "", res.err } return res.id, nil } func (self *SessionStore) run(user, id string) (resp sessionRunResponse) { if _, exists := self.store[user][id]; exists { rhdl.Printf("SessionStore: running session '%s/%s'", user, id) } else { resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) } return } func (self *SessionStore) Run(user, id string) error { req := sessionRunRequest{} req.user = user req.id = id req.response = make(chan sessionRunResponse) self.runChan <- req res := <-req.response return res.err } func (self *SessionStore) addProgressHandler(user, id string, userdata interface{}, handler chan ProgressData) (resp sessionAddProgressHandlerResponse) { if _, exists := self.store[user][id]; exists { rhdl.Printf("SessionStore: adding progress handler to '%s/%s'", user, id) } else { resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) } return } func (self *SessionStore) AddProgressHandler(user, id string, userdata interface{}, handler chan ProgressData) error { req := sessionAddProgressHandlerRequest{} req.user = user req.id = id req.userdata = userdata req.handler = handler req.response = make(chan sessionAddProgressHandlerResponse) self.addProgressChan <- req res := <-req.response return res.err } func (self *SessionStore) addDoneHandler(user, id string, userdata interface{}, handler chan ImportResult) (resp sessionAddDoneHandlerResponse) { if _, exists := self.store[user][id]; exists { rhdl.Printf("SessionStore: adding done handler to '%s/%s'", user, id) } else { resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) } return } func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{}, handler chan ImportResult) error { req := sessionAddDoneHandlerRequest{} req.user = user req.id = id req.userdata = userdata req.handler = handler req.response = make(chan sessionAddDoneHandlerResponse) self.addDoneChan <- req res := <-req.response return res.err } func (self *SessionStore) remove(user, id string) (resp sessionRemoveResponse) { if _, exists := self.store[user][id]; exists { delete(self.store[user], id) rhdl.Printf("SessionStore: removed session '%s/%s'", user, id) if _, exists := self.store[user]; exists { if len(self.store[user]) == 0 { delete(self.store, user) rhdl.Printf("SessionStore: removed user '%s'", user) } } } else { resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) } return } func (self *SessionStore) Remove(user, id string) error { req := sessionRemoveRequest{} req.user = user req.id = id req.response = make(chan sessionRemoveResponse) self.removeChan <- req res := <-req.response return res.err } func (self *SessionStore) dispatchRequests() { defer func() { self.done <- true }() for { select { case <-self.quit: return case req := <-self.newChan: req.response <- self.newSession(req.ctx) case req := <-self.runChan: req.response <- self.run(req.user, req.id) case req := <-self.addProgressChan: req.response <- self.addProgressHandler(req.user, req.id, req.userdata, req.handler) case req := <-self.addDoneChan: req.response <- self.addDoneHandler(req.user, req.id, req.userdata, req.handler) case req := <-self.removeChan: req.response <- self.remove(req.user, req.id) } } } func (self *SessionStore) Cleanup() { self.quit <- true <-self.done close(self.quit) close(self.done) close(self.newChan) close(self.runChan) close(self.addProgressChan) close(self.addDoneChan) close(self.removeChan) } func NewSessionStore(conf *Config) (store *SessionStore, err error) { store = new(SessionStore) store.quit = make(chan bool) store.done = make(chan bool) store.store = make(map[string]map[string]Session) store.newChan = make(chan newSessionRequest) store.runChan = make(chan sessionRunRequest) store.addProgressChan = make(chan sessionAddProgressHandlerRequest) store.addDoneChan = make(chan sessionAddDoneHandlerRequest) store.removeChan = make(chan sessionRemoveRequest) go store.dispatchRequests() return }