From 795d37b3aa2ee04f9a56a9a75284b0a4ac8a4b0f Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 20 Dec 2015 05:46:01 +0100 Subject: session store and session kind of work now still a lot to be done and a lot of testing is needed! diff --git a/importer.go b/importer.go index f65515b..292d376 100644 --- a/importer.go +++ b/importer.go @@ -39,6 +39,8 @@ var ( bool2str = map[bool]string{false: "0", true: "1"} ) +type ImportProgressCB func(step int, step_name string, progress float64, userdata interface{}) + type ImportContext struct { conf *Config rddb *RdDbChan @@ -59,7 +61,7 @@ type ImportContext struct { SourceFile string DeleteSourceFile bool DeleteSourceDir bool - ProgressCallBack func(step int, step_name string, progress float64, userdata interface{}) + ProgressCallBack ImportProgressCB ProgressCallBackData interface{} } diff --git a/rddb.go b/rddb.go index f596171..bb481f4 100644 --- a/rddb.go +++ b/rddb.go @@ -297,7 +297,7 @@ func (self *RdDb) dispatchRequests() { } } -func (self *RdDb) GetChannels() *RdDbChan { +func (self *RdDb) GetInterface() *RdDbChan { ch := &RdDbChan{} ch.getPasswordChan = self.getPasswordChan ch.getGroupOfCartChan = self.getGroupOfCartChan diff --git a/session.go b/session.go index 718d13e..ce84053 100644 --- a/session.go +++ b/session.go @@ -25,43 +25,33 @@ package rhimport import ( - "encoding/base32" - "fmt" - "github.com/satori/go.uuid" - "strings" + "time" ) +type SessionChan struct { + runChan chan<- bool + cancelChan chan<- bool + addProgressChan chan<- sessionAddProgressHandlerRequest + addDoneChan chan<- sessionAddDoneHandlerRequest +} + type Session struct { - ImportContext - ImportResult - // TODO: add creation time for timeout + ctx ImportContext + running bool + done chan bool + progressIntChan chan ProgressData + doneIntChan chan ImportResult + runChan chan bool + cancelChan chan bool + addProgressChan chan sessionAddProgressHandlerRequest + addDoneChan chan sessionAddDoneHandlerRequest + // TODO: add pub/sub for progress and done } type ProgressData struct { step int step_name string progress float64 - userdata interface{} -} - -type newSessionResponse struct { - id string - err error -} - -type newSessionRequest struct { - ctx ImportContext - response chan newSessionResponse -} - -type sessionRunResponse struct { - err error -} - -type sessionRunRequest struct { - user string - id string - response chan<- sessionRunResponse } type sessionAddProgressHandlerResponse struct { @@ -69,10 +59,8 @@ type sessionAddProgressHandlerResponse struct { } type sessionAddProgressHandlerRequest struct { - user string - id string userdata interface{} - handler chan<- ProgressData + callback ImportProgressCB response chan<- sessionAddProgressHandlerResponse } @@ -81,100 +69,57 @@ type sessionAddDoneHandlerResponse struct { } type sessionAddDoneHandlerRequest struct { - user string - id string userdata interface{} - handler chan<- ImportResult + callback func(*ImportResult, interface{}) response chan<- sessionAddDoneHandlerResponse } -type sessionRemoveResponse struct { - err error -} - -type sessionRemoveRequest struct { - user string - id string - response chan sessionRemoveResponse -} - -type SessionStore struct { - store map[string]map[string]Session - quit chan bool - done chan bool - newChan chan newSessionRequest - runChan chan sessionRunRequest - addProgressChan chan sessionAddProgressHandlerRequest - addDoneChan chan sessionAddDoneHandlerRequest - removeChan chan sessionRemoveRequest +func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) { + out := userdata.(chan<- ProgressData) + out <- ProgressData{step, step_name, progress} } -func (self *SessionStore) Init() (err error) { - return -} - -func (self *SessionStore) newSession(ctx ImportContext) (resp newSessionResponse) { - b := uuid.NewV4().Bytes() - resp.id = strings.ToLower(strings.TrimRight(base32.StdEncoding.EncodeToString(b), "=")) - if _, exists := self.store[ctx.UserName]; !exists { - self.store[ctx.UserName] = make(map[string]Session) +// TODO: actually call import here +func session_import_run(ctx ImportContext, done chan<- ImportResult) { + rhdl.Printf("faking import for: %+v", ctx) + for i := 0; i < 100; i++ { + if ctx.ProgressCallBack != nil { + ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData) + } + time.Sleep(100 * time.Millisecond) } - self.store[ctx.UserName][resp.id] = Session{ImportContext: ctx} - rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id) - return -} - -func (self *SessionStore) NewSession(ctx ImportContext) (string, error) { - res_ch := make(chan newSessionResponse) - req := newSessionRequest{} - req.ctx = ctx - req.response = res_ch - self.newChan <- req - - res := <-res_ch - if res.err != nil { - return "", res.err + if ctx.ProgressCallBack != nil { + ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData) } - return res.id, nil + done <- ImportResult{200, "OK", 0, 0} } -func (self *SessionStore) run(user, id string) (resp sessionRunResponse) { - if _, exists := self.store[user][id]; exists { - rhdl.Printf("SessionStore: running session '%s/%s'", user, id) - } else { - resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) - } +func (self *Session) run() { + self.ctx.ProgressCallBack = session_progress_callback + self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) + go session_import_run(self.ctx, self.doneIntChan) + self.running = true return } -func (self *SessionStore) Run(user, id string) error { - res_ch := make(chan sessionRunResponse) - req := sessionRunRequest{} - req.user = user - req.id = id - req.response = res_ch - self.runChan <- req +func (self *SessionChan) Run() { + self.runChan <- true +} - res := <-res_ch - return res.err +func (self *SessionChan) Cancel() { + self.cancelChan <- true } -func (self *SessionStore) addProgressHandler(user, id string, userdata interface{}, handler chan<- ProgressData) (resp sessionAddProgressHandlerResponse) { - if _, exists := self.store[user][id]; exists { - rhdl.Printf("SessionStore: adding progress handler to '%s/%s'", user, id) - } else { - resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) - } +func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) { + rhdl.Printf("Session: addProgressHandler called with: %+v", userdata) return } -func (self *SessionStore) AddProgressHandler(user, id string, userdata interface{}, handler chan<- ProgressData) error { +func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgressCB) error { res_ch := make(chan sessionAddProgressHandlerResponse) req := sessionAddProgressHandlerRequest{} - req.user = user - req.id = id req.userdata = userdata - req.handler = handler + req.callback = cb req.response = res_ch self.addProgressChan <- req @@ -182,22 +127,16 @@ func (self *SessionStore) AddProgressHandler(user, id string, userdata interface return res.err } -func (self *SessionStore) addDoneHandler(user, id string, userdata interface{}, handler chan<- ImportResult) (resp sessionAddDoneHandlerResponse) { - if _, exists := self.store[user][id]; exists { - rhdl.Printf("SessionStore: adding done handler to '%s/%s'", user, id) - } else { - resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) - } +func (self *Session) addDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) (resp sessionAddDoneHandlerResponse) { + rhdl.Printf("Session: addDoneHandler called with: %+v", userdata) return } -func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{}, handler chan<- ImportResult) error { +func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) error { res_ch := make(chan sessionAddDoneHandlerResponse) req := sessionAddDoneHandlerRequest{} - req.user = user - req.id = id req.userdata = userdata - req.handler = handler + req.callback = cb req.response = res_ch self.addDoneChan <- req @@ -205,78 +144,69 @@ func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{}, return res.err } -func (self *SessionStore) remove(user, id string) (resp sessionRemoveResponse) { - if _, exists := self.store[user][id]; exists { - delete(self.store[user], id) - rhdl.Printf("SessionStore: removed session '%s/%s'", user, id) - if _, exists := self.store[user]; exists { - if len(self.store[user]) == 0 { - delete(self.store, user) - rhdl.Printf("SessionStore: removed user '%s'", user) - } - } - } else { - resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) - } - return -} - -func (self *SessionStore) Remove(user, id string) error { - res_ch := make(chan sessionRemoveResponse) - req := sessionRemoveRequest{} - req.user = user - req.id = id - req.response = res_ch - self.removeChan <- req - - res := <-res_ch - return res.err -} - -func (self *SessionStore) dispatchRequests() { +func (self *Session) dispatchRequests() { defer func() { self.done <- true }() for { select { - case <-self.quit: - return - case req := <-self.newChan: - req.response <- self.newSession(req.ctx) - case req := <-self.runChan: - req.response <- self.run(req.user, req.id) + case <-self.runChan: + self.run() + case <-self.cancelChan: + if self.running { + rhdl.Println("Session: canceling running imports is not yet implemented") + // TODO: send cancel to import goroutine once this got implemented + } else { + return + } case req := <-self.addProgressChan: - req.response <- self.addProgressHandler(req.user, req.id, req.userdata, req.handler) + req.response <- self.addProgressHandler(req.userdata, req.callback) case req := <-self.addDoneChan: - req.response <- self.addDoneHandler(req.user, req.id, req.userdata, req.handler) - case req := <-self.removeChan: - req.response <- self.remove(req.user, req.id) + req.response <- self.addDoneHandler(req.userdata, req.callback) + case progress := <-self.progressIntChan: + rhdl.Printf("Session: got progress: %+v", progress) + // TODO: call all subscribed progress handler + case result := <-self.doneIntChan: + self.running = false + rhdl.Printf("Session: import is done: %+v", result) + // TODO: call all subscribed done handler + // TODO: send remove request to session store? + return } } } -func (self *SessionStore) Cleanup() { - self.quit <- true +func (self *Session) getInterface() *SessionChan { + ch := &SessionChan{} + ch.runChan = self.runChan + ch.cancelChan = self.cancelChan + ch.addProgressChan = self.addProgressChan + ch.addDoneChan = self.addDoneChan + return ch +} + +func (self *Session) Cleanup() { + // TODO: this blocks if dispatchRequests has ended already... + self.cancelChan <- true <-self.done - close(self.quit) close(self.done) - close(self.newChan) + close(self.progressIntChan) + close(self.doneIntChan) close(self.runChan) + close(self.cancelChan) close(self.addProgressChan) close(self.addDoneChan) - close(self.removeChan) } -func NewSessionStore(conf *Config) (store *SessionStore, err error) { - store = new(SessionStore) - - store.quit = make(chan bool) - store.done = make(chan bool) - store.store = make(map[string]map[string]Session) - store.newChan = make(chan newSessionRequest) - store.runChan = make(chan sessionRunRequest) - store.addProgressChan = make(chan sessionAddProgressHandlerRequest) - store.addDoneChan = make(chan sessionAddDoneHandlerRequest) - store.removeChan = make(chan sessionRemoveRequest) - - go store.dispatchRequests() +func NewSession(ctx *ImportContext) (session *Session) { + session = new(Session) + session.running = false + session.ctx = *ctx + session.done = make(chan bool) + session.progressIntChan = make(chan ProgressData) + session.doneIntChan = make(chan ImportResult) + session.runChan = make(chan bool) + session.cancelChan = make(chan bool) + session.addProgressChan = make(chan sessionAddProgressHandlerRequest) + session.addDoneChan = make(chan sessionAddDoneHandlerRequest) + go session.dispatchRequests() return } diff --git a/session_store.go b/session_store.go new file mode 100644 index 0000000..b2e1538 --- /dev/null +++ b/session_store.go @@ -0,0 +1,206 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015 Christian Pointner +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see . +// + +package rhimport + +import ( + "encoding/base32" + "fmt" + "github.com/satori/go.uuid" + "strings" +) + +type newSessionResponse struct { + id string + session *SessionChan + err error +} + +type newSessionRequest struct { + ctx *ImportContext + response chan newSessionResponse +} + +type getSessionResponse struct { + session *SessionChan + err error +} + +type getSessionRequest struct { + user string + id string + response chan getSessionResponse +} + +type removeSessionResponse struct { + err error +} + +type removeSessionRequest struct { + user string + id string + response chan removeSessionResponse +} + +type SessionStoreChan struct { + newChan chan<- newSessionRequest + getChan chan<- getSessionRequest + removeChan chan<- removeSessionRequest +} + +type SessionStore struct { + store map[string]map[string]*Session + quit chan bool + done chan bool + newChan chan newSessionRequest + getChan chan getSessionRequest + removeChan chan removeSessionRequest +} + +func (self *SessionStore) new(ctx *ImportContext) (resp newSessionResponse) { + b := uuid.NewV4().Bytes() + resp.id = strings.ToLower(strings.TrimRight(base32.StdEncoding.EncodeToString(b), "=")) + if _, exists := self.store[ctx.UserName]; !exists { + self.store[ctx.UserName] = make(map[string]*Session) + } + self.store[ctx.UserName][resp.id] = NewSession(ctx) + resp.session = self.store[ctx.UserName][resp.id].getInterface() + rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id) + return +} + +func (self *SessionStoreChan) New(ctx *ImportContext) (string, *SessionChan, error) { + res_ch := make(chan newSessionResponse) + req := newSessionRequest{} + req.ctx = ctx + req.response = res_ch + self.newChan <- req + + res := <-res_ch + if res.err != nil { + return "", nil, res.err + } + return res.id, res.session, nil +} + +func (self *SessionStore) get(user, id string) (resp getSessionResponse) { + if session, exists := self.store[user][id]; exists { + resp.session = session.getInterface() + } else { + resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) + } + return +} + +func (self *SessionStoreChan) Get(user, id string) (*SessionChan, error) { + res_ch := make(chan getSessionResponse) + req := getSessionRequest{} + req.user = user + req.id = id + req.response = res_ch + self.getChan <- req + + res := <-res_ch + if res.err != nil { + return nil, res.err + } + return res.session, nil +} + +func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) { + if session, exists := self.store[user][id]; exists { + session.Cleanup() + delete(self.store[user], id) + rhdl.Printf("SessionStore: removed session '%s/%s'", user, id) + if userstore, exists := self.store[user]; exists { + if len(userstore) == 0 { + delete(self.store, user) + rhdl.Printf("SessionStore: removed user '%s'", user) + } + } + } else { + resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) + } + return +} + +func (self *SessionStoreChan) Remove(user, id string) error { + res_ch := make(chan removeSessionResponse) + req := removeSessionRequest{} + req.user = user + req.id = id + req.response = res_ch + self.removeChan <- req + + res := <-res_ch + return res.err +} + +func (self *SessionStore) dispatchRequests() { + defer func() { self.done <- true }() + for { + select { + case <-self.quit: + return + case req := <-self.newChan: + req.response <- self.new(req.ctx) + case req := <-self.getChan: + req.response <- self.get(req.user, req.id) + case req := <-self.removeChan: + req.response <- self.remove(req.user, req.id) + } + } +} + +func (self *SessionStore) GetInterface() *SessionStoreChan { + ch := &SessionStoreChan{} + ch.newChan = self.newChan + ch.getChan = self.getChan + ch.removeChan = self.removeChan + return ch +} + +func (self *SessionStore) Cleanup() { + self.quit <- true + <-self.done + close(self.quit) + close(self.done) + close(self.newChan) + close(self.getChan) + close(self.removeChan) +} + +func NewSessionStore(conf *Config) (store *SessionStore, err error) { + store = new(SessionStore) + + store.quit = make(chan bool) + store.done = make(chan bool) + store.store = make(map[string]map[string]*Session) + store.newChan = make(chan newSessionRequest) + store.getChan = make(chan getSessionRequest) + store.removeChan = make(chan removeSessionRequest) + + go store.dispatchRequests() + return +} -- cgit v0.10.2