summaryrefslogtreecommitdiff
path: root/src/helsinki.at/rhimport/session.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/helsinki.at/rhimport/session.go')
-rw-r--r--src/helsinki.at/rhimport/session.go270
1 files changed, 100 insertions, 170 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index 718d13e..ce84053 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -25,43 +25,33 @@
package rhimport
import (
- "encoding/base32"
- "fmt"
- "github.com/satori/go.uuid"
- "strings"
+ "time"
)
+type SessionChan struct {
+ runChan chan<- bool
+ cancelChan chan<- bool
+ addProgressChan chan<- sessionAddProgressHandlerRequest
+ addDoneChan chan<- sessionAddDoneHandlerRequest
+}
+
type Session struct {
- ImportContext
- ImportResult
- // TODO: add creation time for timeout
+ ctx ImportContext
+ running bool
+ done chan bool
+ progressIntChan chan ProgressData
+ doneIntChan chan ImportResult
+ runChan chan bool
+ cancelChan chan bool
+ addProgressChan chan sessionAddProgressHandlerRequest
+ addDoneChan chan sessionAddDoneHandlerRequest
+ // TODO: add pub/sub for progress and done
}
type ProgressData struct {
step int
step_name string
progress float64
- userdata interface{}
-}
-
-type newSessionResponse struct {
- id string
- err error
-}
-
-type newSessionRequest struct {
- ctx ImportContext
- response chan newSessionResponse
-}
-
-type sessionRunResponse struct {
- err error
-}
-
-type sessionRunRequest struct {
- user string
- id string
- response chan<- sessionRunResponse
}
type sessionAddProgressHandlerResponse struct {
@@ -69,10 +59,8 @@ type sessionAddProgressHandlerResponse struct {
}
type sessionAddProgressHandlerRequest struct {
- user string
- id string
userdata interface{}
- handler chan<- ProgressData
+ callback ImportProgressCB
response chan<- sessionAddProgressHandlerResponse
}
@@ -81,100 +69,57 @@ type sessionAddDoneHandlerResponse struct {
}
type sessionAddDoneHandlerRequest struct {
- user string
- id string
userdata interface{}
- handler chan<- ImportResult
+ callback func(*ImportResult, interface{})
response chan<- sessionAddDoneHandlerResponse
}
-type sessionRemoveResponse struct {
- err error
-}
-
-type sessionRemoveRequest struct {
- user string
- id string
- response chan sessionRemoveResponse
-}
-
-type SessionStore struct {
- store map[string]map[string]Session
- quit chan bool
- done chan bool
- newChan chan newSessionRequest
- runChan chan sessionRunRequest
- addProgressChan chan sessionAddProgressHandlerRequest
- addDoneChan chan sessionAddDoneHandlerRequest
- removeChan chan sessionRemoveRequest
+func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) {
+ out := userdata.(chan<- ProgressData)
+ out <- ProgressData{step, step_name, progress}
}
-func (self *SessionStore) Init() (err error) {
- return
-}
-
-func (self *SessionStore) newSession(ctx ImportContext) (resp newSessionResponse) {
- b := uuid.NewV4().Bytes()
- resp.id = strings.ToLower(strings.TrimRight(base32.StdEncoding.EncodeToString(b), "="))
- if _, exists := self.store[ctx.UserName]; !exists {
- self.store[ctx.UserName] = make(map[string]Session)
+// TODO: actually call import here
+func session_import_run(ctx ImportContext, done chan<- ImportResult) {
+ rhdl.Printf("faking import for: %+v", ctx)
+ for i := 0; i < 100; i++ {
+ if ctx.ProgressCallBack != nil {
+ ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData)
+ }
+ time.Sleep(100 * time.Millisecond)
}
- self.store[ctx.UserName][resp.id] = Session{ImportContext: ctx}
- rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id)
- return
-}
-
-func (self *SessionStore) NewSession(ctx ImportContext) (string, error) {
- res_ch := make(chan newSessionResponse)
- req := newSessionRequest{}
- req.ctx = ctx
- req.response = res_ch
- self.newChan <- req
-
- res := <-res_ch
- if res.err != nil {
- return "", res.err
+ if ctx.ProgressCallBack != nil {
+ ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData)
}
- return res.id, nil
+ done <- ImportResult{200, "OK", 0, 0}
}
-func (self *SessionStore) run(user, id string) (resp sessionRunResponse) {
- if _, exists := self.store[user][id]; exists {
- rhdl.Printf("SessionStore: running session '%s/%s'", user, id)
- } else {
- resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id)
- }
+func (self *Session) run() {
+ self.ctx.ProgressCallBack = session_progress_callback
+ self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
+ go session_import_run(self.ctx, self.doneIntChan)
+ self.running = true
return
}
-func (self *SessionStore) Run(user, id string) error {
- res_ch := make(chan sessionRunResponse)
- req := sessionRunRequest{}
- req.user = user
- req.id = id
- req.response = res_ch
- self.runChan <- req
+func (self *SessionChan) Run() {
+ self.runChan <- true
+}
- res := <-res_ch
- return res.err
+func (self *SessionChan) Cancel() {
+ self.cancelChan <- true
}
-func (self *SessionStore) addProgressHandler(user, id string, userdata interface{}, handler chan<- ProgressData) (resp sessionAddProgressHandlerResponse) {
- if _, exists := self.store[user][id]; exists {
- rhdl.Printf("SessionStore: adding progress handler to '%s/%s'", user, id)
- } else {
- resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id)
- }
+func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) {
+ rhdl.Printf("Session: addProgressHandler called with: %+v", userdata)
return
}
-func (self *SessionStore) AddProgressHandler(user, id string, userdata interface{}, handler chan<- ProgressData) error {
+func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgressCB) error {
res_ch := make(chan sessionAddProgressHandlerResponse)
req := sessionAddProgressHandlerRequest{}
- req.user = user
- req.id = id
req.userdata = userdata
- req.handler = handler
+ req.callback = cb
req.response = res_ch
self.addProgressChan <- req
@@ -182,22 +127,16 @@ func (self *SessionStore) AddProgressHandler(user, id string, userdata interface
return res.err
}
-func (self *SessionStore) addDoneHandler(user, id string, userdata interface{}, handler chan<- ImportResult) (resp sessionAddDoneHandlerResponse) {
- if _, exists := self.store[user][id]; exists {
- rhdl.Printf("SessionStore: adding done handler to '%s/%s'", user, id)
- } else {
- resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id)
- }
+func (self *Session) addDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) (resp sessionAddDoneHandlerResponse) {
+ rhdl.Printf("Session: addDoneHandler called with: %+v", userdata)
return
}
-func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{}, handler chan<- ImportResult) error {
+func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) error {
res_ch := make(chan sessionAddDoneHandlerResponse)
req := sessionAddDoneHandlerRequest{}
- req.user = user
- req.id = id
req.userdata = userdata
- req.handler = handler
+ req.callback = cb
req.response = res_ch
self.addDoneChan <- req
@@ -205,78 +144,69 @@ func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{},
return res.err
}
-func (self *SessionStore) remove(user, id string) (resp sessionRemoveResponse) {
- if _, exists := self.store[user][id]; exists {
- delete(self.store[user], id)
- rhdl.Printf("SessionStore: removed session '%s/%s'", user, id)
- if _, exists := self.store[user]; exists {
- if len(self.store[user]) == 0 {
- delete(self.store, user)
- rhdl.Printf("SessionStore: removed user '%s'", user)
- }
- }
- } else {
- resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id)
- }
- return
-}
-
-func (self *SessionStore) Remove(user, id string) error {
- res_ch := make(chan sessionRemoveResponse)
- req := sessionRemoveRequest{}
- req.user = user
- req.id = id
- req.response = res_ch
- self.removeChan <- req
-
- res := <-res_ch
- return res.err
-}
-
-func (self *SessionStore) dispatchRequests() {
+func (self *Session) dispatchRequests() {
defer func() { self.done <- true }()
for {
select {
- case <-self.quit:
- return
- case req := <-self.newChan:
- req.response <- self.newSession(req.ctx)
- case req := <-self.runChan:
- req.response <- self.run(req.user, req.id)
+ case <-self.runChan:
+ self.run()
+ case <-self.cancelChan:
+ if self.running {
+ rhdl.Println("Session: canceling running imports is not yet implemented")
+ // TODO: send cancel to import goroutine once this got implemented
+ } else {
+ return
+ }
case req := <-self.addProgressChan:
- req.response <- self.addProgressHandler(req.user, req.id, req.userdata, req.handler)
+ req.response <- self.addProgressHandler(req.userdata, req.callback)
case req := <-self.addDoneChan:
- req.response <- self.addDoneHandler(req.user, req.id, req.userdata, req.handler)
- case req := <-self.removeChan:
- req.response <- self.remove(req.user, req.id)
+ req.response <- self.addDoneHandler(req.userdata, req.callback)
+ case progress := <-self.progressIntChan:
+ rhdl.Printf("Session: got progress: %+v", progress)
+ // TODO: call all subscribed progress handler
+ case result := <-self.doneIntChan:
+ self.running = false
+ rhdl.Printf("Session: import is done: %+v", result)
+ // TODO: call all subscribed done handler
+ // TODO: send remove request to session store?
+ return
}
}
}
-func (self *SessionStore) Cleanup() {
- self.quit <- true
+func (self *Session) getInterface() *SessionChan {
+ ch := &SessionChan{}
+ ch.runChan = self.runChan
+ ch.cancelChan = self.cancelChan
+ ch.addProgressChan = self.addProgressChan
+ ch.addDoneChan = self.addDoneChan
+ return ch
+}
+
+func (self *Session) Cleanup() {
+ // TODO: this blocks if dispatchRequests has ended already...
+ self.cancelChan <- true
<-self.done
- close(self.quit)
close(self.done)
- close(self.newChan)
+ close(self.progressIntChan)
+ close(self.doneIntChan)
close(self.runChan)
+ close(self.cancelChan)
close(self.addProgressChan)
close(self.addDoneChan)
- close(self.removeChan)
}
-func NewSessionStore(conf *Config) (store *SessionStore, err error) {
- store = new(SessionStore)
-
- store.quit = make(chan bool)
- store.done = make(chan bool)
- store.store = make(map[string]map[string]Session)
- store.newChan = make(chan newSessionRequest)
- store.runChan = make(chan sessionRunRequest)
- store.addProgressChan = make(chan sessionAddProgressHandlerRequest)
- store.addDoneChan = make(chan sessionAddDoneHandlerRequest)
- store.removeChan = make(chan sessionRemoveRequest)
-
- go store.dispatchRequests()
+func NewSession(ctx *ImportContext) (session *Session) {
+ session = new(Session)
+ session.running = false
+ session.ctx = *ctx
+ session.done = make(chan bool)
+ session.progressIntChan = make(chan ProgressData)
+ session.doneIntChan = make(chan ImportResult)
+ session.runChan = make(chan bool)
+ session.cancelChan = make(chan bool)
+ session.addProgressChan = make(chan sessionAddProgressHandlerRequest)
+ session.addDoneChan = make(chan sessionAddDoneHandlerRequest)
+ go session.dispatchRequests()
return
}