From 21027f52089faf514ecd2c5c1e8bfa4c5f8fe8d3 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Fri, 18 Dec 2015 03:36:46 +0100 Subject: added basic version of a session store diff --git a/session.go b/session.go new file mode 100644 index 0000000..37a4b54 --- /dev/null +++ b/session.go @@ -0,0 +1,265 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015 Christian Pointner +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see . +// + +package rhimport + +import ( + "encoding/base32" + "fmt" + "github.com/satori/go.uuid" + "strings" +) + +type Session struct { + ImportContext + ImportResult + // TODO: add creation time for timeout +} + +type ProgressData struct { + step int + step_name string + progress float64 + userdata interface{} +} + +type newSessionResponse struct { + id string + err error +} + +type newSessionRequest struct { + ctx ImportContext + response chan newSessionResponse +} + +type sessionRunResponse struct { + err error +} + +type sessionRunRequest struct { + user string + id string + response chan sessionRunResponse +} + +type sessionAddProgressHandlerResponse struct { + err error +} + +type sessionAddProgressHandlerRequest struct { + user string + id string + userdata interface{} + handler chan ProgressData + response chan sessionAddProgressHandlerResponse +} + +type sessionAddDoneHandlerResponse struct { + err error +} + +type sessionAddDoneHandlerRequest struct { + user string + id string + userdata interface{} + handler chan ImportResult + response chan sessionAddDoneHandlerResponse +} + +type sessionRemoveResponse struct { + err error +} + +type sessionRemoveRequest struct { + user string + id string + response chan sessionRemoveResponse +} + +type SessionStore struct { + store map[string]map[string]Session + quit chan bool + done chan bool + newChan chan newSessionRequest + runChan chan sessionRunRequest + addProgressChan chan sessionAddProgressHandlerRequest + addDoneChan chan sessionAddDoneHandlerRequest + removeChan chan sessionRemoveRequest +} + +func (self *SessionStore) Init() (err error) { + return +} + +func (self *SessionStore) newSession(ctx ImportContext) (resp newSessionResponse) { + b := uuid.NewV4().Bytes() + resp.id = strings.ToLower(strings.TrimRight(base32.StdEncoding.EncodeToString(b), "=")) + if _, exists := self.store[ctx.UserName]; !exists { + self.store[ctx.UserName] = make(map[string]Session) + } + self.store[ctx.UserName][resp.id] = Session{ImportContext: ctx} + rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id) + return +} + +func (self *SessionStore) NewSession(ctx ImportContext) (string, error) { + req := newSessionRequest{} + req.ctx = ctx + req.response = make(chan newSessionResponse) + self.newChan <- req + + res := <-req.response + if res.err != nil { + return "", res.err + } + return res.id, nil +} + +func (self *SessionStore) run(user, id string) (resp sessionRunResponse) { + rhdl.Printf("SessionStore: running session '%s/%s'", user, id) + return +} + +func (self *SessionStore) Run(user, id string) error { + req := sessionRunRequest{} + req.user = user + req.id = id + req.response = make(chan sessionRunResponse) + self.runChan <- req + + res := <-req.response + return res.err +} + +func (self *SessionStore) addProgressHandler(user, id string, userdata interface{}, handler chan ProgressData) (resp sessionAddProgressHandlerResponse) { + rhdl.Printf("SessionStore: adding progress handler to '%s/%s'", user, id) + return +} + +func (self *SessionStore) AddProgressHandler(user, id string, userdata interface{}, handler chan ProgressData) error { + req := sessionAddProgressHandlerRequest{} + req.user = user + req.id = id + req.userdata = userdata + req.handler = handler + req.response = make(chan sessionAddProgressHandlerResponse) + self.addProgressChan <- req + + res := <-req.response + return res.err +} + +func (self *SessionStore) addDoneHandler(user, id string, userdata interface{}, handler chan ImportResult) (resp sessionAddDoneHandlerResponse) { + rhdl.Printf("SessionStore: adding done handler to '%s/%s'", user, id) + return +} + +func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{}, handler chan ImportResult) error { + req := sessionAddDoneHandlerRequest{} + req.user = user + req.id = id + req.userdata = userdata + req.handler = handler + req.response = make(chan sessionAddDoneHandlerResponse) + self.addDoneChan <- req + + res := <-req.response + return res.err +} + +func (self *SessionStore) remove(user, id string) (resp sessionRemoveResponse) { + if _, exists := self.store[user][id]; exists { + delete(self.store[user], id) + rhdl.Printf("SessionStore: removed session '%s/%s'", user, id) + if _, exists := self.store[user]; exists { + if len(self.store[user]) == 0 { + delete(self.store, user) + rhdl.Printf("SessionStore: removed user '%s'", user) + } + } + } else { + resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) + } + return +} + +func (self *SessionStore) Remove(user, id string) error { + req := sessionRemoveRequest{} + req.user = user + req.id = id + req.response = make(chan sessionRemoveResponse) + self.removeChan <- req + + res := <-req.response + return res.err +} + +func (self *SessionStore) dispatchRequests() { + defer func() { self.done <- true }() + for { + select { + case <-self.quit: + return + case req := <-self.newChan: + req.response <- self.newSession(req.ctx) + case req := <-self.runChan: + req.response <- self.run(req.user, req.id) + case req := <-self.addProgressChan: + req.response <- self.addProgressHandler(req.user, req.id, req.userdata, req.handler) + case req := <-self.addDoneChan: + req.response <- self.addDoneHandler(req.user, req.id, req.userdata, req.handler) + case req := <-self.removeChan: + req.response <- self.remove(req.user, req.id) + } + } +} + +func (self *SessionStore) Cleanup() { + self.quit <- true + <-self.done + close(self.quit) + close(self.done) + close(self.newChan) + close(self.runChan) + close(self.addProgressChan) + close(self.addDoneChan) + close(self.removeChan) +} + +func NewSessionStore(conf *Config) (store *SessionStore, err error) { + store = new(SessionStore) + + store.quit = make(chan bool) + store.done = make(chan bool) + store.store = make(map[string]map[string]Session) + store.newChan = make(chan newSessionRequest) + store.runChan = make(chan sessionRunRequest) + store.addProgressChan = make(chan sessionAddProgressHandlerRequest) + store.addDoneChan = make(chan sessionAddDoneHandlerRequest) + store.removeChan = make(chan sessionRemoveRequest) + + go store.dispatchRequests() + return +} -- cgit v0.10.2