diff options
author | Christian Pointner <equinox@helsinki.at> | 2015-12-30 19:58:28 (GMT) |
---|---|---|
committer | Christian Pointner <equinox@helsinki.at> | 2015-12-30 19:58:28 (GMT) |
commit | b44f626200669a57a2bc012d2203c36f587b425b (patch) | |
tree | ebab1722aa041c75dd7535bb9841c28af8b3f2a4 /src | |
parent | e84dd58a96c80847a18b4674f3ebb600d94227e1 (diff) |
session store and web socket interface now support to set a reference id
Diffstat (limited to 'src')
-rw-r--r-- | src/helsinki.at/rhimport/session_store.go | 35 | ||||
-rw-r--r-- | src/helsinki.at/rhimportd/ctrlWebSocket.go | 46 |
2 files changed, 47 insertions, 34 deletions
diff --git a/src/helsinki.at/rhimport/session_store.go b/src/helsinki.at/rhimport/session_store.go index 5025d8c..7fe6585 100644 --- a/src/helsinki.at/rhimport/session_store.go +++ b/src/helsinki.at/rhimport/session_store.go @@ -40,11 +40,13 @@ type newSessionResponse struct { type newSessionRequest struct { ctx *ImportContext + refId string response chan newSessionResponse } type getSessionResponse struct { session *SessionChan + refId string responsecode int errorstring string } @@ -52,6 +54,7 @@ type getSessionResponse struct { type getSessionRequest struct { user string id string + refId string response chan getSessionResponse } @@ -66,8 +69,13 @@ type removeSessionRequest struct { response chan removeSessionResponse } +type SessionStoreElement struct { + s *Session + refId string +} + type SessionStore struct { - store map[string]map[string]*Session + store map[string]map[string]*SessionStoreElement quit chan bool done chan bool newChan chan newSessionRequest @@ -83,7 +91,7 @@ func generateSessionId() (string, error) { return base64.RawStdEncoding.EncodeToString(b[:]), nil } -func (self *SessionStore) new(ctx *ImportContext) (resp newSessionResponse) { +func (self *SessionStore) new(ctx *ImportContext, refId string) (resp newSessionResponse) { resp.responsecode = http.StatusOK resp.errorstring = "OK" if !ctx.Trusted { @@ -103,10 +111,11 @@ func (self *SessionStore) new(ctx *ImportContext) (resp newSessionResponse) { } else { resp.id = id if _, exists := self.store[ctx.UserName]; !exists { - self.store[ctx.UserName] = make(map[string]*Session) + self.store[ctx.UserName] = make(map[string]*SessionStoreElement) } - self.store[ctx.UserName][resp.id] = NewSession(ctx, func() { self.GetInterface().Remove(ctx.UserName, resp.id) }) - resp.session = self.store[ctx.UserName][resp.id].getInterface() + s := &SessionStoreElement{NewSession(ctx, func() { self.GetInterface().Remove(ctx.UserName, resp.id) }), refId} + self.store[ctx.UserName][resp.id] = s + resp.session = self.store[ctx.UserName][resp.id].s.getInterface() rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id) } return @@ -116,7 +125,8 @@ func (self *SessionStore) get(user, id string) (resp getSessionResponse) { resp.responsecode = http.StatusOK resp.errorstring = "OK" if session, exists := self.store[user][id]; exists { - resp.session = session.getInterface() + resp.session = session.s.getInterface() + resp.refId = session.refId } else { resp.responsecode = http.StatusNotFound resp.errorstring = fmt.Sprintf("SessionStore: session '%s/%s' not found", user, id) @@ -128,7 +138,7 @@ func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) { resp.responsecode = http.StatusOK resp.errorstring = "OK" if session, exists := self.store[user][id]; exists { - go session.Cleanup() // cleanup could take a while -> don't block all the other stuff + go session.s.Cleanup() // cleanup could take a while -> don't block all the other stuff delete(self.store[user], id) rhdl.Printf("SessionStore: removed session '%s/%s'", user, id) if userstore, exists := self.store[user]; exists { @@ -151,7 +161,7 @@ func (self *SessionStore) dispatchRequests() { case <-self.quit: return case req := <-self.newChan: - req.response <- self.new(req.ctx) + req.response <- self.new(req.ctx, req.refId) case req := <-self.getChan: req.response <- self.get(req.user, req.id) case req := <-self.removeChan: @@ -169,10 +179,11 @@ type SessionStoreChan struct { removeChan chan<- removeSessionRequest } -func (self *SessionStoreChan) New(ctx *ImportContext) (string, *SessionChan, int, string) { +func (self *SessionStoreChan) New(ctx *ImportContext, refId string) (string, *SessionChan, int, string) { res_ch := make(chan newSessionResponse) req := newSessionRequest{} req.ctx = ctx + req.refId = refId req.response = res_ch self.newChan <- req @@ -180,7 +191,7 @@ func (self *SessionStoreChan) New(ctx *ImportContext) (string, *SessionChan, int return res.id, res.session, res.responsecode, res.errorstring } -func (self *SessionStoreChan) Get(user, id string) (*SessionChan, int, string) { +func (self *SessionStoreChan) Get(user, id string) (*SessionChan, string, int, string) { res_ch := make(chan getSessionResponse) req := getSessionRequest{} req.user = user @@ -189,7 +200,7 @@ func (self *SessionStoreChan) Get(user, id string) (*SessionChan, int, string) { self.getChan <- req res := <-res_ch - return res.session, res.responsecode, res.errorstring + return res.session, res.refId, res.responsecode, res.errorstring } func (self *SessionStoreChan) Remove(user, id string) (int, string) { @@ -227,7 +238,7 @@ func NewSessionStore(conf *Config) (store *SessionStore, err error) { store.quit = make(chan bool) store.done = make(chan bool) - store.store = make(map[string]map[string]*Session) + store.store = make(map[string]map[string]*SessionStoreElement) store.newChan = make(chan newSessionRequest, 10) store.getChan = make(chan getSessionRequest, 10) store.removeChan = make(chan removeSessionRequest, 10) diff --git a/src/helsinki.at/rhimportd/ctrlWebSocket.go b/src/helsinki.at/rhimportd/ctrlWebSocket.go index 92e261a..ea7bf24 100644 --- a/src/helsinki.at/rhimportd/ctrlWebSocket.go +++ b/src/helsinki.at/rhimportd/ctrlWebSocket.go @@ -37,6 +37,7 @@ import ( type webSocketRequestData struct { Command string `json:"COMMAND"` Id string `json:"ID"` + RefId string `json:"REFERENCE_ID"` UserName string `json:"LOGIN_NAME"` Password string `json:"PASSWORD"` ShowId uint `json:"SHOW_ID"` @@ -79,6 +80,7 @@ type webSocketResponseData struct { Type string `json:"TYPE"` ErrorString string `json:"ERROR_STRING"` Id string `json:"ID"` + RefId string `json:"REFERENCE_ID"` ProgressStep int `json:"PROGRESS_STEP"` ProgressStepName string `json:"PROGRESS_STEP_NAME"` Progress float64 `json:"PROGRESS"` @@ -98,6 +100,7 @@ func sendWebSocketErrorResponse(ws *websocket.Conn, id string, code int, err_str type webSocketSession struct { id string + refId string session *rhimport.SessionChan progresschan chan rhimport.ProgressData donechan chan rhimport.ImportResult @@ -131,7 +134,7 @@ func webSocketDone(res rhimport.ImportResult, userdata interface{}) bool { func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) (int, string) { ctx := rhimport.NewImportContext(conf, rddb, reqdata.UserName) ctx.Password = reqdata.Password - ctx.Trusted = true // set this to false as soon as the interface is working + ctx.Trusted = true // TODO: set this to false as soon as the interface is working ctx.ShowId = reqdata.ShowId ctx.ClearShowCarts = reqdata.ClearShowCarts ctx.GroupName = reqdata.MusicPoolGroup @@ -144,11 +147,12 @@ func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, con ctx.UseMetaData = reqdata.UseMetaData ctx.SourceUri = reqdata.SourceUri - id, s, code, errstring := sessions.New(ctx) + id, s, code, errstring := sessions.New(ctx, reqdata.RefId) if code != http.StatusOK { return code, errstring } self.id = id + self.refId = reqdata.RefId self.session = s if err := s.AddDoneHandler((chan<- rhimport.ImportResult)(self.donechan), webSocketDone); err != nil { return http.StatusInternalServerError, err.Error() @@ -161,11 +165,12 @@ func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, con } func (self *webSocketSession) reconnectSession(reqdata *webSocketRequestData, sessions *rhimport.SessionStoreChan) (int, string) { - s, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id) + s, refId, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id) if code != http.StatusOK { return code, errstring } self.id = reqdata.Id + self.refId = refId self.session = s if err := s.AddDoneHandler((chan<- rhimport.ImportResult)(self.donechan), webSocketDone); err != nil { return http.StatusInternalServerError, err.Error() @@ -191,41 +196,38 @@ func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket. case "new": if session.id != "" { sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session") - return - } - code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions) - if code != http.StatusOK { - sendWebSocketErrorResponse(ws, "", code, errstring) - return } else { - sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id}) + code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions) + if code != http.StatusOK { + sendWebSocketErrorResponse(ws, "", code, errstring) + } else { + sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id, RefId: session.refId}) + } } case "cancel": if session.id == "" { sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection doesn't handle any session") - return + } else { + session.session.Cancel() } - session.session.Cancel() case "reconnect": if session.id != "" { sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session") - return - } - code, errstring := session.reconnectSession(&reqdata, sessions) - if code != http.StatusOK { - sendWebSocketErrorResponse(ws, "", code, errstring) - return } else { - sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id}) + code, errstring := session.reconnectSession(&reqdata, sessions) + if code != http.StatusOK { + sendWebSocketErrorResponse(ws, "", code, errstring) + } else { + sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id, RefId: session.refId}) + } } default: sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) - return } case p := <-session.progresschan: - sendWebSocketResponse(ws, &webSocketResponseData{http.StatusOK, "PROGRESS", "", session.id, p.Step, p.StepName, p.Progress * 100, 0, 0}) + sendWebSocketResponse(ws, &webSocketResponseData{http.StatusOK, "PROGRESS", "", session.id, session.refId, p.Step, p.StepName, p.Progress * 100, 0, 0}) case d := <-session.donechan: - sendWebSocketResponse(ws, &webSocketResponseData{d.ResponseCode, "DONE", d.ErrorString, session.id, 0, "", 100.0, d.Cart, d.Cut}) + sendWebSocketResponse(ws, &webSocketResponseData{d.ResponseCode, "DONE", d.ErrorString, session.id, session.refId, 0, "", 100.0, d.Cart, d.Cut}) // TODO: send close message at this point? } } |