diff options
Diffstat (limited to 'src/helsinki.at/rhimport/session.go')
-rw-r--r-- | src/helsinki.at/rhimport/session.go | 270 |
1 files changed, 100 insertions, 170 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go index 718d13e..ce84053 100644 --- a/src/helsinki.at/rhimport/session.go +++ b/src/helsinki.at/rhimport/session.go @@ -25,43 +25,33 @@ package rhimport import ( - "encoding/base32" - "fmt" - "github.com/satori/go.uuid" - "strings" + "time" ) +type SessionChan struct { + runChan chan<- bool + cancelChan chan<- bool + addProgressChan chan<- sessionAddProgressHandlerRequest + addDoneChan chan<- sessionAddDoneHandlerRequest +} + type Session struct { - ImportContext - ImportResult - // TODO: add creation time for timeout + ctx ImportContext + running bool + done chan bool + progressIntChan chan ProgressData + doneIntChan chan ImportResult + runChan chan bool + cancelChan chan bool + addProgressChan chan sessionAddProgressHandlerRequest + addDoneChan chan sessionAddDoneHandlerRequest + // TODO: add pub/sub for progress and done } type ProgressData struct { step int step_name string progress float64 - userdata interface{} -} - -type newSessionResponse struct { - id string - err error -} - -type newSessionRequest struct { - ctx ImportContext - response chan newSessionResponse -} - -type sessionRunResponse struct { - err error -} - -type sessionRunRequest struct { - user string - id string - response chan<- sessionRunResponse } type sessionAddProgressHandlerResponse struct { @@ -69,10 +59,8 @@ type sessionAddProgressHandlerResponse struct { } type sessionAddProgressHandlerRequest struct { - user string - id string userdata interface{} - handler chan<- ProgressData + callback ImportProgressCB response chan<- sessionAddProgressHandlerResponse } @@ -81,100 +69,57 @@ type sessionAddDoneHandlerResponse struct { } type sessionAddDoneHandlerRequest struct { - user string - id string userdata interface{} - handler chan<- ImportResult + callback func(*ImportResult, interface{}) response chan<- sessionAddDoneHandlerResponse } -type sessionRemoveResponse struct { - err error -} - -type sessionRemoveRequest struct { - user string - id string - response chan sessionRemoveResponse -} - -type SessionStore struct { - store map[string]map[string]Session - quit chan bool - done chan bool - newChan chan newSessionRequest - runChan chan sessionRunRequest - addProgressChan chan sessionAddProgressHandlerRequest - addDoneChan chan sessionAddDoneHandlerRequest - removeChan chan sessionRemoveRequest +func session_progress_callback(step int, step_name string, progress float64, userdata interface{}) { + out := userdata.(chan<- ProgressData) + out <- ProgressData{step, step_name, progress} } -func (self *SessionStore) Init() (err error) { - return -} - -func (self *SessionStore) newSession(ctx ImportContext) (resp newSessionResponse) { - b := uuid.NewV4().Bytes() - resp.id = strings.ToLower(strings.TrimRight(base32.StdEncoding.EncodeToString(b), "=")) - if _, exists := self.store[ctx.UserName]; !exists { - self.store[ctx.UserName] = make(map[string]Session) +// TODO: actually call import here +func session_import_run(ctx ImportContext, done chan<- ImportResult) { + rhdl.Printf("faking import for: %+v", ctx) + for i := 0; i < 100; i++ { + if ctx.ProgressCallBack != nil { + ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData) + } + time.Sleep(100 * time.Millisecond) } - self.store[ctx.UserName][resp.id] = Session{ImportContext: ctx} - rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id) - return -} - -func (self *SessionStore) NewSession(ctx ImportContext) (string, error) { - res_ch := make(chan newSessionResponse) - req := newSessionRequest{} - req.ctx = ctx - req.response = res_ch - self.newChan <- req - - res := <-res_ch - if res.err != nil { - return "", res.err + if ctx.ProgressCallBack != nil { + ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData) } - return res.id, nil + done <- ImportResult{200, "OK", 0, 0} } -func (self *SessionStore) run(user, id string) (resp sessionRunResponse) { - if _, exists := self.store[user][id]; exists { - rhdl.Printf("SessionStore: running session '%s/%s'", user, id) - } else { - resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) - } +func (self *Session) run() { + self.ctx.ProgressCallBack = session_progress_callback + self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan) + go session_import_run(self.ctx, self.doneIntChan) + self.running = true return } -func (self *SessionStore) Run(user, id string) error { - res_ch := make(chan sessionRunResponse) - req := sessionRunRequest{} - req.user = user - req.id = id - req.response = res_ch - self.runChan <- req +func (self *SessionChan) Run() { + self.runChan <- true +} - res := <-res_ch - return res.err +func (self *SessionChan) Cancel() { + self.cancelChan <- true } -func (self *SessionStore) addProgressHandler(user, id string, userdata interface{}, handler chan<- ProgressData) (resp sessionAddProgressHandlerResponse) { - if _, exists := self.store[user][id]; exists { - rhdl.Printf("SessionStore: adding progress handler to '%s/%s'", user, id) - } else { - resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) - } +func (self *Session) addProgressHandler(userdata interface{}, cb ImportProgressCB) (resp sessionAddProgressHandlerResponse) { + rhdl.Printf("Session: addProgressHandler called with: %+v", userdata) return } -func (self *SessionStore) AddProgressHandler(user, id string, userdata interface{}, handler chan<- ProgressData) error { +func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgressCB) error { res_ch := make(chan sessionAddProgressHandlerResponse) req := sessionAddProgressHandlerRequest{} - req.user = user - req.id = id req.userdata = userdata - req.handler = handler + req.callback = cb req.response = res_ch self.addProgressChan <- req @@ -182,22 +127,16 @@ func (self *SessionStore) AddProgressHandler(user, id string, userdata interface return res.err } -func (self *SessionStore) addDoneHandler(user, id string, userdata interface{}, handler chan<- ImportResult) (resp sessionAddDoneHandlerResponse) { - if _, exists := self.store[user][id]; exists { - rhdl.Printf("SessionStore: adding done handler to '%s/%s'", user, id) - } else { - resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) - } +func (self *Session) addDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) (resp sessionAddDoneHandlerResponse) { + rhdl.Printf("Session: addDoneHandler called with: %+v", userdata) return } -func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{}, handler chan<- ImportResult) error { +func (self *SessionChan) AddDoneHandler(userdata interface{}, cb func(*ImportResult, interface{})) error { res_ch := make(chan sessionAddDoneHandlerResponse) req := sessionAddDoneHandlerRequest{} - req.user = user - req.id = id req.userdata = userdata - req.handler = handler + req.callback = cb req.response = res_ch self.addDoneChan <- req @@ -205,78 +144,69 @@ func (self *SessionStore) AddDoneHandler(user, id string, userdata interface{}, return res.err } -func (self *SessionStore) remove(user, id string) (resp sessionRemoveResponse) { - if _, exists := self.store[user][id]; exists { - delete(self.store[user], id) - rhdl.Printf("SessionStore: removed session '%s/%s'", user, id) - if _, exists := self.store[user]; exists { - if len(self.store[user]) == 0 { - delete(self.store, user) - rhdl.Printf("SessionStore: removed user '%s'", user) - } - } - } else { - resp.err = fmt.Errorf("SessionStore: session '%s/%s' not found", user, id) - } - return -} - -func (self *SessionStore) Remove(user, id string) error { - res_ch := make(chan sessionRemoveResponse) - req := sessionRemoveRequest{} - req.user = user - req.id = id - req.response = res_ch - self.removeChan <- req - - res := <-res_ch - return res.err -} - -func (self *SessionStore) dispatchRequests() { +func (self *Session) dispatchRequests() { defer func() { self.done <- true }() for { select { - case <-self.quit: - return - case req := <-self.newChan: - req.response <- self.newSession(req.ctx) - case req := <-self.runChan: - req.response <- self.run(req.user, req.id) + case <-self.runChan: + self.run() + case <-self.cancelChan: + if self.running { + rhdl.Println("Session: canceling running imports is not yet implemented") + // TODO: send cancel to import goroutine once this got implemented + } else { + return + } case req := <-self.addProgressChan: - req.response <- self.addProgressHandler(req.user, req.id, req.userdata, req.handler) + req.response <- self.addProgressHandler(req.userdata, req.callback) case req := <-self.addDoneChan: - req.response <- self.addDoneHandler(req.user, req.id, req.userdata, req.handler) - case req := <-self.removeChan: - req.response <- self.remove(req.user, req.id) + req.response <- self.addDoneHandler(req.userdata, req.callback) + case progress := <-self.progressIntChan: + rhdl.Printf("Session: got progress: %+v", progress) + // TODO: call all subscribed progress handler + case result := <-self.doneIntChan: + self.running = false + rhdl.Printf("Session: import is done: %+v", result) + // TODO: call all subscribed done handler + // TODO: send remove request to session store? + return } } } -func (self *SessionStore) Cleanup() { - self.quit <- true +func (self *Session) getInterface() *SessionChan { + ch := &SessionChan{} + ch.runChan = self.runChan + ch.cancelChan = self.cancelChan + ch.addProgressChan = self.addProgressChan + ch.addDoneChan = self.addDoneChan + return ch +} + +func (self *Session) Cleanup() { + // TODO: this blocks if dispatchRequests has ended already... + self.cancelChan <- true <-self.done - close(self.quit) close(self.done) - close(self.newChan) + close(self.progressIntChan) + close(self.doneIntChan) close(self.runChan) + close(self.cancelChan) close(self.addProgressChan) close(self.addDoneChan) - close(self.removeChan) } -func NewSessionStore(conf *Config) (store *SessionStore, err error) { - store = new(SessionStore) - - store.quit = make(chan bool) - store.done = make(chan bool) - store.store = make(map[string]map[string]Session) - store.newChan = make(chan newSessionRequest) - store.runChan = make(chan sessionRunRequest) - store.addProgressChan = make(chan sessionAddProgressHandlerRequest) - store.addDoneChan = make(chan sessionAddDoneHandlerRequest) - store.removeChan = make(chan sessionRemoveRequest) - - go store.dispatchRequests() +func NewSession(ctx *ImportContext) (session *Session) { + session = new(Session) + session.running = false + session.ctx = *ctx + session.done = make(chan bool) + session.progressIntChan = make(chan ProgressData) + session.doneIntChan = make(chan ImportResult) + session.runChan = make(chan bool) + session.cancelChan = make(chan bool) + session.addProgressChan = make(chan sessionAddProgressHandlerRequest) + session.addDoneChan = make(chan sessionAddDoneHandlerRequest) + go session.dispatchRequests() return } |