summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2015-12-20 04:46:01 (GMT)
committerChristian Pointner <equinox@helsinki.at>2015-12-20 04:46:01 (GMT)
commit75dd88f110c117b7fc73b83b13efd2385347f1b9 (patch)
treea0f17c7d8a60b98f894c3222824ee3cd92fe85ae
parent8fc2a9c93d566feae576c1421b3c64dabc4b4976 (diff)
session store and session kind of work now
still a lot to be done and a lot of testing is needed!
-rw-r--r--src/helsinki.at/rhimport/importer.go4
-rw-r--r--src/helsinki.at/rhimport/rddb.go2
-rw-r--r--src/helsinki.at/rhimport/session.go270
-rw-r--r--src/helsinki.at/rhimport/session_store.go206
-rw-r--r--src/helsinki.at/rhimportd/main.go57
5 files changed, 355 insertions, 184 deletions
diff --git a/src/helsinki.at/rhimport/importer.go b/src/helsinki.at/rhimport/importer.go
index f65515b..292d376 100644
--- a/src/helsinki.at/rhimport/importer.go
+++ b/src/helsinki.at/rhimport/importer.go
@@ -39,6 +39,8 @@ var (
bool2str = map[bool]string{false: "0", true: "1"}
)
+type ImportProgressCB func(step int, step_name string, progress float64, userdata interface{})
+
type ImportContext struct {
conf *Config
rddb *RdDbChan
@@ -59,7 +61,7 @@ type ImportContext struct {
SourceFile string
DeleteSourceFile bool
DeleteSourceDir bool
- ProgressCallBack func(step int, step_name string, progress float64, userdata interface{})
+ ProgressCallBack ImportProgressCB
ProgressCallBackData interface{}
}
diff --git a/src/helsinki.at/rhimport/rddb.go b/src/helsinki.at/rhimport/rddb.go
index f596171..bb481f4 100644
--- a/src/helsinki.at/rhimport/rddb.go
+++ b/src/helsinki.at/rhimport/rddb.go
@@ -297,7 +297,7 @@ func (self *RdDb) dispatchRequests() {
}
}
-func (self *RdDb) GetChannels() *RdDbChan {
+func (self *RdDb) GetInterface() *RdDbChan {
ch := &RdDbChan{}
ch.getPasswordChan = self.getPasswordChan
ch.getGroupOfCartChan = self.getGroupOfCartChan
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index 718d13e..ce84053 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -25,43 +25,33 @@
package rhimport
import (
- "encoding/base32"
- "fmt"
- "github.com/satori/go.uuid"
- "strings"
+ "time"
)
+type SessionChan struct {
+ runChan chan<- bool
+ cancelChan chan<- bool
+ addProgressChan chan<- sessionAddProgressHandlerRequest
+ addDoneChan chan<- sessionAddDoneHandlerRequest
+}
+
type Session struct {
- ImportContext
- ImportResult
- // TODO: add creation time for timeout
+ ctx ImportContext
+ running bool
+ done chan bool
+ progressIntChan chan ProgressData
+ doneIntChan chan ImportResult
+ runChan chan bool
+ cancelChan chan bool
+ addProgressChan chan sessionAddProgressHandlerRequest
+ addDoneChan chan sessionAddDoneHandlerRequest
+ // TODO: add pub/sub for progress and done
}
type ProgressData struct {
step int
step_name string
progress float64
- userdata interface{}
-}
-
-type newSessionResponse struct {
- id string
- err error
-}
-
-type newSessionRequest struct {
- ctx ImportContext
- response chan newSessionResponse
-}
-
-type sessionRunResponse struct {
- err error
-}
-
-type sessionRunRequest struct {
- user string
- id string
- response chan<- sessionRunResponse
}
type sessionAddProgressHandlerResponse struct {
@@ -69,10 +59,8 @@ type sessionAddProgressHandlerResponse struct {
}
type sessionAddProgressHandlerRequest struct {
- user string
- id string
userdata interface{}
- handler chan<- ProgressData
+ callback ImportProgressCB
response chan<- sessionAddProgressHandlerResponse
}
@@ -81,100 +69,57 @@ type sessionAddDoneHandlerResponse struct {
}
type sessionAddDoneHandlerRequest struct {
- user string
- id string
userdata interface{}
- handler chan<- ImportResult
+ callback func(*ImportResult, interface{})
response chan<- sessionAddDoneHandlerResponse
}
-type sessionRemoveResponse struct {
- err error
-}
-
-type sessionRemoveRequest struct {
- user string
- id string
- response chan sessionRemoveResponse
-}
-
-type SessionStore struct {
- store map[string]map[string]Session
- quit chan bool
- done chan bool
- newChan chan newSessionRequest
- runChan chan sessionRunRequest
- addProgressChan chan sessionAddProgressHandlerRequest
- addDoneChan chan sessionAddDoneHandlerRequest
- removeChan chan sessionRemoveRequest
+func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) {
+ out := userdata.(chan<- ProgressData)
+ out <- ProgressData{step, step_name, progress}
}
-func (self *SessionStore) Init() (err error) {
- return
-}
-
-func (self *SessionStore) newSession(ctx ImportContext) (resp newSessionResponse) {
- b := uuid.NewV4().Bytes()
- resp.id = strings.ToLower(strings.TrimRight(base32.StdEncoding.EncodeToString(b), "="))
- if _, exists := self.store[ctx.UserName]; !exists {
- self.store[ctx.UserName] = make(map[string]Session)
+// TODO: actually call import here
+func session_import_run(ctx ImportContext, done chan<- ImportResult) {
+ rhdl.Printf("faking import for: %+v", ctx)
+ for i := 0; i < 100; i++ {
+ if ctx.ProgressCallBack != nil {
+ ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData)
+ }
+ time.Sleep(100 * time.Millisecond)
}
- self.store[ctx.UserName][resp.id] = Session{ImportContext: ctx}
- rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id)
- return
-}
-
-func (self *SessionStore) NewSession(ctx ImportContext) (string, error) {
- res_ch := make(chan newSessionResponse)
- req := newSessionRequest{}
- req.ctx = ctx
- req.response = res_ch
- self.newChan <- req
-
- res := <-res_ch
- if res.err != nil {
- return "", res.err
+ if ctx.ProgressCallBack != nil {
+ ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData)
}
- return res.id, nil
+ done <- ImportResult{200, "OK", 0, 0}
}
-func (self *SessionStore) run(user, id string) (resp sessionRunResponse) {
- if _, exists := self.store[user][id]; exists {
- rhdl.Printf("SessionStore: running session '%s/%s'", user, id)
- } else {
- resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id)
- }
+func (self *Session) run() {
+ self.ctx.ProgressCallBack = session_progress_callback
+ self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
+ go session_import_run(self.ctx, self.doneIntChan)
+ self.running = true
return
}
-func (self *SessionStore) Run(user, id string) error {
- res_ch := make(chan sessionRunResponse)
- req := sessionRunRequest{}
- req.user = user
- req.id = id
- req.response = res_ch
- self.runChan <- req
+func (self *SessionChan) Run() {
+ self.runChan <- true
+}
- res := <-res_ch
- return res.err
+func (self *SessionChan) Cancel() {
+ self.cancelChan <- true
}
-func (self *SessionStore) addProgressHandler(user, id string, userdata interface{}, handler chan<- ProgressData) (resp sessionAddProgressHandlerResponse) {
- if _, exists := self.store[user][id]; exists {
- rhdl.Printf("SessionStore: adding progress handler to '%s/%s'", user, id)
- } else {
- resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id)
- }
+func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) {
+ rhdl.Printf("Session: addProgressHandler called with: %+v", userdata)
return
}
-func (self *SessionStore) AddProgressHandler(user, id string, userdata interface{}, handler chan<- ProgressData) error {
+func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgressCB) error {
res_ch := make(chan sessionAddProgressHandlerResponse)
req := sessionAddProgressHandlerRequest{}
- req.user = user
- req.id = id
req.userdata = userdata
- req.handler = handler
+ req.callback = cb
req.response = res_ch
self.addProgressChan <- req
@@ -182,22 +127,16 @@ func (self *SessionStore) AddProgressHandler(user, id string, userdata interface
return res.err
}
-func (self *SessionStore) addDoneHandler(user, id string, userdata interface{}, handler chan<- ImportResult) (resp sessionAddDoneHandlerResponse) {
- if _, exists := self.store[user][id]; exists {
- rhdl.Printf("SessionStore: adding done handler to '%s/%s'", user, id)
- } else {
- resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id)
- }
+func (self *Session) addDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) (resp sessionAddDoneHandlerResponse) {
+ rhdl.Printf("Session: addDoneHandler called with: %+v", userdata)
return
}
-func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{}, handler chan<- ImportResult) error {
+func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) error {
res_ch := make(chan sessionAddDoneHandlerResponse)
req := sessionAddDoneHandlerRequest{}
- req.user = user
- req.id = id
req.userdata = userdata
- req.handler = handler
+ req.callback = cb
req.response = res_ch
self.addDoneChan <- req
@@ -205,78 +144,69 @@ func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{},
return res.err
}
-func (self *SessionStore) remove(user, id string) (resp sessionRemoveResponse) {
- if _, exists := self.store[user][id]; exists {
- delete(self.store[user], id)
- rhdl.Printf("SessionStore: removed session '%s/%s'", user, id)
- if _, exists := self.store[user]; exists {
- if len(self.store[user]) == 0 {
- delete(self.store, user)
- rhdl.Printf("SessionStore: removed user '%s'", user)
- }
- }
- } else {
- resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id)
- }
- return
-}
-
-func (self *SessionStore) Remove(user, id string) error {
- res_ch := make(chan sessionRemoveResponse)
- req := sessionRemoveRequest{}
- req.user = user
- req.id = id
- req.response = res_ch
- self.removeChan <- req
-
- res := <-res_ch
- return res.err
-}
-
-func (self *SessionStore) dispatchRequests() {
+func (self *Session) dispatchRequests() {
defer func() { self.done <- true }()
for {
select {
- case <-self.quit:
- return
- case req := <-self.newChan:
- req.response <- self.newSession(req.ctx)
- case req := <-self.runChan:
- req.response <- self.run(req.user, req.id)
+ case <-self.runChan:
+ self.run()
+ case <-self.cancelChan:
+ if self.running {
+ rhdl.Println("Session: canceling running imports is not yet implemented")
+ // TODO: send cancel to import goroutine once this got implemented
+ } else {
+ return
+ }
case req := <-self.addProgressChan:
- req.response <- self.addProgressHandler(req.user, req.id, req.userdata, req.handler)
+ req.response <- self.addProgressHandler(req.userdata, req.callback)
case req := <-self.addDoneChan:
- req.response <- self.addDoneHandler(req.user, req.id, req.userdata, req.handler)
- case req := <-self.removeChan:
- req.response <- self.remove(req.user, req.id)
+ req.response <- self.addDoneHandler(req.userdata, req.callback)
+ case progress := <-self.progressIntChan:
+ rhdl.Printf("Session: got progress: %+v", progress)
+ // TODO: call all subscribed progress handler
+ case result := <-self.doneIntChan:
+ self.running = false
+ rhdl.Printf("Session: import is done: %+v", result)
+ // TODO: call all subscribed done handler
+ // TODO: send remove request to session store?
+ return
}
}
}
-func (self *SessionStore) Cleanup() {
- self.quit <- true
+func (self *Session) getInterface() *SessionChan {
+ ch := &SessionChan{}
+ ch.runChan = self.runChan
+ ch.cancelChan = self.cancelChan
+ ch.addProgressChan = self.addProgressChan
+ ch.addDoneChan = self.addDoneChan
+ return ch
+}
+
+func (self *Session) Cleanup() {
+ // TODO: this blocks if dispatchRequests has ended already...
+ self.cancelChan <- true
<-self.done
- close(self.quit)
close(self.done)
- close(self.newChan)
+ close(self.progressIntChan)
+ close(self.doneIntChan)
close(self.runChan)
+ close(self.cancelChan)
close(self.addProgressChan)
close(self.addDoneChan)
- close(self.removeChan)
}
-func NewSessionStore(conf *Config) (store *SessionStore, err error) {
- store = new(SessionStore)
-
- store.quit = make(chan bool)
- store.done = make(chan bool)
- store.store = make(map[string]map[string]Session)
- store.newChan = make(chan newSessionRequest)
- store.runChan = make(chan sessionRunRequest)
- store.addProgressChan = make(chan sessionAddProgressHandlerRequest)
- store.addDoneChan = make(chan sessionAddDoneHandlerRequest)
- store.removeChan = make(chan sessionRemoveRequest)
-
- go store.dispatchRequests()
+func NewSession(ctx *ImportContext) (session *Session) {
+ session = new(Session)
+ session.running = false
+ session.ctx = *ctx
+ session.done = make(chan bool)
+ session.progressIntChan = make(chan ProgressData)
+ session.doneIntChan = make(chan ImportResult)
+ session.runChan = make(chan bool)
+ session.cancelChan = make(chan bool)
+ session.addProgressChan = make(chan sessionAddProgressHandlerRequest)
+ session.addDoneChan = make(chan sessionAddDoneHandlerRequest)
+ go session.dispatchRequests()
return
}
diff --git a/src/helsinki.at/rhimport/session_store.go b/src/helsinki.at/rhimport/session_store.go
new file mode 100644
index 0000000..b2e1538
--- /dev/null
+++ b/src/helsinki.at/rhimport/session_store.go
@@ -0,0 +1,206 @@
+//
+// rhimportd
+//
+// The Radio Helsinki Rivendell Import Daemon
+//
+//
+// Copyright (C) 2015 Christian Pointner <equinox@helsinki.at>
+//
+// This file is part of rhimportd.
+//
+// rhimportd is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// any later version.
+//
+// rhimportd is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with rhimportd. If not, see <http://www.gnu.org/licenses/>.
+//
+
+package rhimport
+
+import (
+ "encoding/base32"
+ "fmt"
+ "github.com/satori/go.uuid"
+ "strings"
+)
+
+type newSessionResponse struct {
+ id string
+ session *SessionChan
+ err error
+}
+
+type newSessionRequest struct {
+ ctx *ImportContext
+ response chan newSessionResponse
+}
+
+type getSessionResponse struct {
+ session *SessionChan
+ err error
+}
+
+type getSessionRequest struct {
+ user string
+ id string
+ response chan getSessionResponse
+}
+
+type removeSessionResponse struct {
+ err error
+}
+
+type removeSessionRequest struct {
+ user string
+ id string
+ response chan removeSessionResponse
+}
+
+type SessionStoreChan struct {
+ newChan chan<- newSessionRequest
+ getChan chan<- getSessionRequest
+ removeChan chan<- removeSessionRequest
+}
+
+type SessionStore struct {
+ store map[string]map[string]*Session
+ quit chan bool
+ done chan bool
+ newChan chan newSessionRequest
+ getChan chan getSessionRequest
+ removeChan chan removeSessionRequest
+}
+
+func (self *SessionStore) new(ctx *ImportContext) (resp newSessionResponse) {
+ b := uuid.NewV4().Bytes()
+ resp.id = strings.ToLower(strings.TrimRight(base32.StdEncoding.EncodeToString(b), "="))
+ if _, exists := self.store[ctx.UserName]; !exists {
+ self.store[ctx.UserName] = make(map[string]*Session)
+ }
+ self.store[ctx.UserName][resp.id] = NewSession(ctx)
+ resp.session = self.store[ctx.UserName][resp.id].getInterface()
+ rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id)
+ return
+}
+
+func (self *SessionStoreChan) New(ctx *ImportContext) (string, *SessionChan, error) {
+ res_ch := make(chan newSessionResponse)
+ req := newSessionRequest{}
+ req.ctx = ctx
+ req.response = res_ch
+ self.newChan <- req
+
+ res := <-res_ch
+ if res.err != nil {
+ return "", nil, res.err
+ }
+ return res.id, res.session, nil
+}
+
+func (self *SessionStore) get(user, id string) (resp getSessionResponse) {
+ if session, exists := self.store[user][id]; exists {
+ resp.session = session.getInterface()
+ } else {
+ resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id)
+ }
+ return
+}
+
+func (self *SessionStoreChan) Get(user, id string) (*SessionChan, error) {
+ res_ch := make(chan getSessionResponse)
+ req := getSessionRequest{}
+ req.user = user
+ req.id = id
+ req.response = res_ch
+ self.getChan <- req
+
+ res := <-res_ch
+ if res.err != nil {
+ return nil, res.err
+ }
+ return res.session, nil
+}
+
+func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) {
+ if session, exists := self.store[user][id]; exists {
+ session.Cleanup()
+ delete(self.store[user], id)
+ rhdl.Printf("SessionStore: removed session '%s/%s'", user, id)
+ if userstore, exists := self.store[user]; exists {
+ if len(userstore) == 0 {
+ delete(self.store, user)
+ rhdl.Printf("SessionStore: removed user '%s'", user)
+ }
+ }
+ } else {
+ resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id)
+ }
+ return
+}
+
+func (self *SessionStoreChan) Remove(user, id string) error {
+ res_ch := make(chan removeSessionResponse)
+ req := removeSessionRequest{}
+ req.user = user
+ req.id = id
+ req.response = res_ch
+ self.removeChan <- req
+
+ res := <-res_ch
+ return res.err
+}
+
+func (self *SessionStore) dispatchRequests() {
+ defer func() { self.done <- true }()
+ for {
+ select {
+ case <-self.quit:
+ return
+ case req := <-self.newChan:
+ req.response <- self.new(req.ctx)
+ case req := <-self.getChan:
+ req.response <- self.get(req.user, req.id)
+ case req := <-self.removeChan:
+ req.response <- self.remove(req.user, req.id)
+ }
+ }
+}
+
+func (self *SessionStore) GetInterface() *SessionStoreChan {
+ ch := &SessionStoreChan{}
+ ch.newChan = self.newChan
+ ch.getChan = self.getChan
+ ch.removeChan = self.removeChan
+ return ch
+}
+
+func (self *SessionStore) Cleanup() {
+ self.quit <- true
+ <-self.done
+ close(self.quit)
+ close(self.done)
+ close(self.newChan)
+ close(self.getChan)
+ close(self.removeChan)
+}
+
+func NewSessionStore(conf *Config) (store *SessionStore, err error) {
+ store = new(SessionStore)
+
+ store.quit = make(chan bool)
+ store.done = make(chan bool)
+ store.store = make(map[string]map[string]*Session)
+ store.newChan = make(chan newSessionRequest)
+ store.getChan = make(chan getSessionRequest)
+ store.removeChan = make(chan removeSessionRequest)
+
+ go store.dispatchRequests()
+ return
+}
diff --git a/src/helsinki.at/rhimportd/main.go b/src/helsinki.at/rhimportd/main.go
index d599cd5..51a406e 100644
--- a/src/helsinki.at/rhimportd/main.go
+++ b/src/helsinki.at/rhimportd/main.go
@@ -26,10 +26,12 @@ package main
import (
"flag"
+ "fmt"
"log"
"os"
"os/signal"
"sync"
+ "time"
"helsinki.at/rhimport"
// "io/ioutil"
@@ -42,8 +44,8 @@ var (
)
func main() {
- web_addr_s := flag.String("web-addr", ":4080", "addr:port to listen on")
- telnet_addr_s := flag.String("telnet-addr", ":4023", "addr:port to listen on")
+ // web_addr_s := flag.String("web-addr", ":4080", "addr:port to listen on")
+ // telnet_addr_s := flag.String("telnet-addr", ":4023", "addr:port to listen on")
rdconf_s := flag.String("rdconf", "/etc/rd.conf", "path to the Rivendell config file")
rdxport_url_s := flag.String("rdxport-url", "http://localhost/rd-bin/rdxport.cgi", "the url to the Rivendell web-api")
temp_dir_s := flag.String("tmp-dir", os.TempDir(), "path to temporary files")
@@ -76,23 +78,54 @@ func main() {
}
defer sessions.Cleanup()
+ store := sessions.GetInterface()
+
+ ctx := rhimport.NewImportContext(conf, rddb.GetInterface(), "hugo")
+ id, session, err := store.New(ctx)
+ if err != nil {
+ rhl.Printf("Error SessionStore.New(): %s", err)
+ return
+ }
+
+ fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
+
+ session.Run()
+
+ fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
+
+ time.Sleep(12 * time.Second)
+
+ fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
+
+ store.Remove("hugo", id)
+
+ fmt.Printf("\n\nSESSION_STORE: %+v\n\n", sessions)
+
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
- rhl.Println("starting web-ctrl")
- StartControlWeb(*web_addr_s, conf, rddb.GetChannels())
- rhl.Println("web-ctrl finished")
+ for {
+ time.Sleep(10 * time.Second)
+ }
}()
- wg.Add(1)
- go func() {
- defer wg.Done()
- rhl.Println("starting telnet-ctrl")
- StartControlTelnet(*telnet_addr_s, conf, rddb.GetChannels())
- rhl.Println("telnet-ctrl finished")
- }()
+ // wg.Add(1)
+ // go func() {
+ // defer wg.Done()
+ // rhl.Println("starting web-ctrl")
+ // StartControlWeb(*web_addr_s, conf, rddb.GetInterface())
+ // rhl.Println("web-ctrl finished")
+ // }()
+
+ // wg.Add(1)
+ // go func() {
+ // defer wg.Done()
+ // rhl.Println("starting telnet-ctrl")
+ // StartControlTelnet(*telnet_addr_s, conf, rddb.GetInterface())
+ // rhl.Println("telnet-ctrl finished")
+ // }()
alldone := make(chan bool)
go func() {