diff options
author | Christian Pointner <equinox@helsinki.at> | 2015-12-18 02:36:46 (GMT) |
---|---|---|
committer | Christian Pointner <equinox@helsinki.at> | 2015-12-18 02:36:46 (GMT) |
commit | a8e3fbd06a614559a2bf470f03a07173406b0740 (patch) | |
tree | fcb0189516b081559bd66213f8740ed3d0a78765 /src | |
parent | 06dbfacd0b4f77587d08c566fa8166e54ff9d426 (diff) |
added basic version of a session store
Diffstat (limited to 'src')
-rw-r--r-- | src/helsinki.at/rhimport/session.go | 265 | ||||
-rw-r--r-- | src/helsinki.at/rhimportd/main.go | 92 |
2 files changed, 333 insertions, 24 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go new file mode 100644 index 0000000..37a4b54 --- /dev/null +++ b/src/helsinki.at/rhimport/session.go @@ -0,0 +1,265 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015 Christian Pointner <equinox@helsinki.at> +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. +// + +package rhimport + +import ( + "encoding/base32" + "fmt" + "github.com/satori/go.uuid" + "strings" +) + +type Session struct { + ImportContext + ImportResult + // TODO: add creation time for timeout +} + +type ProgressData struct { + step int + step_name string + progress float64 + userdata interface{} +} + +type newSessionResponse struct { + id string + err error +} + +type newSessionRequest struct { + ctx ImportContext + response chan newSessionResponse +} + +type sessionRunResponse struct { + err error +} + +type sessionRunRequest struct { + user string + id string + response chan sessionRunResponse +} + +type sessionAddProgressHandlerResponse struct { + err error +} + +type sessionAddProgressHandlerRequest struct { + user string + id string + userdata interface{} + handler chan ProgressData + response chan sessionAddProgressHandlerResponse +} + +type sessionAddDoneHandlerResponse struct { + err error +} + +type sessionAddDoneHandlerRequest struct { + user string + id string + userdata interface{} + handler chan ImportResult + response chan sessionAddDoneHandlerResponse +} + +type sessionRemoveResponse struct { + err error +} + +type sessionRemoveRequest struct { + user string + id string + response chan sessionRemoveResponse +} + +type SessionStore struct { + store map[string]map[string]Session + quit chan bool + done chan bool + newChan chan newSessionRequest + runChan chan sessionRunRequest + addProgressChan chan sessionAddProgressHandlerRequest + addDoneChan chan sessionAddDoneHandlerRequest + removeChan chan sessionRemoveRequest +} + +func (self *SessionStore) Init() (err error) { + return +} + +func (self *SessionStore) newSession(ctx ImportContext) (resp newSessionResponse) { + b := uuid.NewV4().Bytes() + resp.id = strings.ToLower(strings.TrimRight(base32.StdEncoding.EncodeToString(b), "=")) + if _, exists := self.store[ctx.UserName]; !exists { + self.store[ctx.UserName] = make(map[string]Session) + } + self.store[ctx.UserName][resp.id] = Session{ImportContext: ctx} + rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id) + return +} + +func (self *SessionStore) NewSession(ctx ImportContext) (string, error) { + req := newSessionRequest{} + req.ctx = ctx + req.response = make(chan newSessionResponse) + self.newChan <- req + + res := <-req.response + if res.err != nil { + return "", res.err + } + return res.id, nil +} + +func (self *SessionStore) run(user, id string) (resp sessionRunResponse) { + rhdl.Printf("SessionStore: running session '%s/%s'", user, id) + return +} + +func (self *SessionStore) Run(user, id string) error { + req := sessionRunRequest{} + req.user = user + req.id = id + req.response = make(chan sessionRunResponse) + self.runChan <- req + + res := <-req.response + return res.err +} + +func (self *SessionStore) addProgressHandler(user, id string, userdata interface{}, handler chan ProgressData) (resp sessionAddProgressHandlerResponse) { + rhdl.Printf("SessionStore: adding progress handler to '%s/%s'", user, id) + return +} + +func (self *SessionStore) AddProgressHandler(user, id string, userdata interface{}, handler chan ProgressData) error { + req := sessionAddProgressHandlerRequest{} + req.user = user + req.id = id + req.userdata = userdata + req.handler = handler + req.response = make(chan sessionAddProgressHandlerResponse) + self.addProgressChan <- req + + res := <-req.response + return res.err +} + +func (self *SessionStore) addDoneHandler(user, id string, userdata interface{}, handler chan ImportResult) (resp sessionAddDoneHandlerResponse) { + rhdl.Printf("SessionStore: adding done handler to '%s/%s'", user, id) + return +} + +func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{}, handler chan ImportResult) error { + req := sessionAddDoneHandlerRequest{} + req.user = user + req.id = id + req.userdata = userdata + req.handler = handler + req.response = make(chan sessionAddDoneHandlerResponse) + self.addDoneChan <- req + + res := <-req.response + return res.err +} + +func (self *SessionStore) remove(user, id string) (resp sessionRemoveResponse) { + if _, exists := self.store[user][id]; exists { + delete(self.store[user], id) + rhdl.Printf("SessionStore: removed session '%s/%s'", user, id) + if _, exists := self.store[user]; exists { + if len(self.store[user]) == 0 { + delete(self.store, user) + rhdl.Printf("SessionStore: removed user '%s'", user) + } + } + } else { + resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) + } + return +} + +func (self *SessionStore) Remove(user, id string) error { + req := sessionRemoveRequest{} + req.user = user + req.id = id + req.response = make(chan sessionRemoveResponse) + self.removeChan <- req + + res := <-req.response + return res.err +} + +func (self *SessionStore) dispatchRequests() { + defer func() { self.done <- true }() + for { + select { + case <-self.quit: + return + case req := <-self.newChan: + req.response <- self.newSession(req.ctx) + case req := <-self.runChan: + req.response <- self.run(req.user, req.id) + case req := <-self.addProgressChan: + req.response <- self.addProgressHandler(req.user, req.id, req.userdata, req.handler) + case req := <-self.addDoneChan: + req.response <- self.addDoneHandler(req.user, req.id, req.userdata, req.handler) + case req := <-self.removeChan: + req.response <- self.remove(req.user, req.id) + } + } +} + +func (self *SessionStore) Cleanup() { + self.quit <- true + <-self.done + close(self.quit) + close(self.done) + close(self.newChan) + close(self.runChan) + close(self.addProgressChan) + close(self.addDoneChan) + close(self.removeChan) +} + +func NewSessionStore(conf *Config) (store *SessionStore, err error) { + store = new(SessionStore) + + store.quit = make(chan bool) + store.done = make(chan bool) + store.store = make(map[string]map[string]Session) + store.newChan = make(chan newSessionRequest) + store.runChan = make(chan sessionRunRequest) + store.addProgressChan = make(chan sessionAddProgressHandlerRequest) + store.addDoneChan = make(chan sessionAddDoneHandlerRequest) + store.removeChan = make(chan sessionRemoveRequest) + + go store.dispatchRequests() + return +} diff --git a/src/helsinki.at/rhimportd/main.go b/src/helsinki.at/rhimportd/main.go index 32b81d7..8f1c65d 100644 --- a/src/helsinki.at/rhimportd/main.go +++ b/src/helsinki.at/rhimportd/main.go @@ -28,8 +28,8 @@ import ( "flag" "log" "os" - "os/signal" - "sync" + // "os/signal" + // "sync" "helsinki.at/rhimport" // "io/ioutil" @@ -42,7 +42,7 @@ var ( ) func main() { - web_addr_s := flag.String("web-addr", ":4000", "addr:port to listen on") + // web_addr_s := flag.String("web-addr", ":4000", "addr:port to listen on") rdconf_s := flag.String("rdconf", "/etc/rd.conf", "path to the Rivendell config file") rdxport_url_s := flag.String("rdxport-url", "http://localhost/rd-bin/rdxport.cgi", "the url to the Rivendell web-api") temp_dir_s := flag.String("tmp-dir", os.TempDir(), "path to temporary files") @@ -68,30 +68,74 @@ func main() { } defer rddb.Cleanup() - var wg sync.WaitGroup + sessions, err := rhimport.NewSessionStore(conf) + if err != nil { + rhl.Println("Error initializing Session Store:", err) + return + } + defer sessions.Cleanup() - wg.Add(1) - go func() { - defer wg.Done() - rhl.Println("start web-ctrl") - StartControlWeb(*web_addr_s, conf, rddb) - rhl.Println("web-ctrl finished") - }() + ctx := rhimport.ImportContext{UserName: "hugo"} + var id string + if id, err = sessions.NewSession(ctx); err != nil { + rhl.Println("NewSession:", err) + } - alldone := make(chan bool) - go func() { - defer func() { alldone <- true }() - wg.Wait() - }() + pchan := make(chan rhimport.ProgressData) + if err = sessions.AddProgressHandler(ctx.UserName, id, "mydata", pchan); err != nil { + rhl.Println("AddProgressHanlder:", err) + } - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) + dchan := make(chan rhimport.ImportResult) + if err = sessions.AddDoneHandler(ctx.UserName, id, "mydata", dchan); err != nil { + rhl.Println("AddDoneHanlder:", err) + } - select { - case <-c: - rhl.Println("received interrupt, shutdown") - return - case <-alldone: - return + if err = sessions.Run(ctx.UserName, id); err != nil { + rhl.Println("Run:", err) } + + var id2 string + if id2, err = sessions.NewSession(ctx); err != nil { + rhl.Println("NewSession:", err) + } + + if err = sessions.Remove(ctx.UserName, id); err != nil { + rhl.Println("Remove:", err) + } + + if err = sessions.Remove(ctx.UserName, id); err != nil { + rhl.Println("Remove:", err) + } + + if err = sessions.Remove(ctx.UserName, id2); err != nil { + rhl.Println("Remove:", err) + } + + // var wg sync.WaitGroup + + // wg.Add(1) + // go func() { + // defer wg.Done() + // rhl.Println("start web-ctrl") + // StartControlWeb(*web_addr_s, conf, rddb) + // rhl.Println("web-ctrl finished") + // }() + + // alldone := make(chan bool) + // go func() { + // defer func() { alldone <- true }() + // wg.Wait() + // }() + + // c := make(chan os.Signal, 1) + // signal.Notify(c, os.Interrupt) + + // select { + // case <-c: + // rhl.Println("received interrupt, shutdown") + // return + // case <-alldone: + // return + // } } |