summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2015-12-30 19:58:28 (GMT)
committerChristian Pointner <equinox@helsinki.at>2015-12-30 19:58:28 (GMT)
commitb44f626200669a57a2bc012d2203c36f587b425b (patch)
treeebab1722aa041c75dd7535bb9841c28af8b3f2a4 /src
parente84dd58a96c80847a18b4674f3ebb600d94227e1 (diff)
session store and web socket interface now support to set a reference id
Diffstat (limited to 'src')
-rw-r--r--src/helsinki.at/rhimport/session_store.go35
-rw-r--r--src/helsinki.at/rhimportd/ctrlWebSocket.go46
2 files changed, 47 insertions, 34 deletions
diff --git a/src/helsinki.at/rhimport/session_store.go b/src/helsinki.at/rhimport/session_store.go
index 5025d8c..7fe6585 100644
--- a/src/helsinki.at/rhimport/session_store.go
+++ b/src/helsinki.at/rhimport/session_store.go
@@ -40,11 +40,13 @@ type newSessionResponse struct {
type newSessionRequest struct {
ctx *ImportContext
+ refId string
response chan newSessionResponse
}
type getSessionResponse struct {
session *SessionChan
+ refId string
responsecode int
errorstring string
}
@@ -52,6 +54,7 @@ type getSessionResponse struct {
type getSessionRequest struct {
user string
id string
+ refId string
response chan getSessionResponse
}
@@ -66,8 +69,13 @@ type removeSessionRequest struct {
response chan removeSessionResponse
}
+type SessionStoreElement struct {
+ s *Session
+ refId string
+}
+
type SessionStore struct {
- store map[string]map[string]*Session
+ store map[string]map[string]*SessionStoreElement
quit chan bool
done chan bool
newChan chan newSessionRequest
@@ -83,7 +91,7 @@ func generateSessionId() (string, error) {
return base64.RawStdEncoding.EncodeToString(b[:]), nil
}
-func (self *SessionStore) new(ctx *ImportContext) (resp newSessionResponse) {
+func (self *SessionStore) new(ctx *ImportContext, refId string) (resp newSessionResponse) {
resp.responsecode = http.StatusOK
resp.errorstring = "OK"
if !ctx.Trusted {
@@ -103,10 +111,11 @@ func (self *SessionStore) new(ctx *ImportContext) (resp newSessionResponse) {
} else {
resp.id = id
if _, exists := self.store[ctx.UserName]; !exists {
- self.store[ctx.UserName] = make(map[string]*Session)
+ self.store[ctx.UserName] = make(map[string]*SessionStoreElement)
}
- self.store[ctx.UserName][resp.id] = NewSession(ctx, func() { self.GetInterface().Remove(ctx.UserName, resp.id) })
- resp.session = self.store[ctx.UserName][resp.id].getInterface()
+ s := &SessionStoreElement{NewSession(ctx, func() { self.GetInterface().Remove(ctx.UserName, resp.id) }), refId}
+ self.store[ctx.UserName][resp.id] = s
+ resp.session = self.store[ctx.UserName][resp.id].s.getInterface()
rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id)
}
return
@@ -116,7 +125,8 @@ func (self *SessionStore) get(user, id string) (resp getSessionResponse) {
resp.responsecode = http.StatusOK
resp.errorstring = "OK"
if session, exists := self.store[user][id]; exists {
- resp.session = session.getInterface()
+ resp.session = session.s.getInterface()
+ resp.refId = session.refId
} else {
resp.responsecode = http.StatusNotFound
resp.errorstring = fmt.Sprintf("SessionStore: session '%s/%s' not found", user, id)
@@ -128,7 +138,7 @@ func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) {
resp.responsecode = http.StatusOK
resp.errorstring = "OK"
if session, exists := self.store[user][id]; exists {
- go session.Cleanup() // cleanup could take a while -> don't block all the other stuff
+ go session.s.Cleanup() // cleanup could take a while -> don't block all the other stuff
delete(self.store[user], id)
rhdl.Printf("SessionStore: removed session '%s/%s'", user, id)
if userstore, exists := self.store[user]; exists {
@@ -151,7 +161,7 @@ func (self *SessionStore) dispatchRequests() {
case <-self.quit:
return
case req := <-self.newChan:
- req.response <- self.new(req.ctx)
+ req.response <- self.new(req.ctx, req.refId)
case req := <-self.getChan:
req.response <- self.get(req.user, req.id)
case req := <-self.removeChan:
@@ -169,10 +179,11 @@ type SessionStoreChan struct {
removeChan chan<- removeSessionRequest
}
-func (self *SessionStoreChan) New(ctx *ImportContext) (string, *SessionChan, int, string) {
+func (self *SessionStoreChan) New(ctx *ImportContext, refId string) (string, *SessionChan, int, string) {
res_ch := make(chan newSessionResponse)
req := newSessionRequest{}
req.ctx = ctx
+ req.refId = refId
req.response = res_ch
self.newChan <- req
@@ -180,7 +191,7 @@ func (self *SessionStoreChan) New(ctx *ImportContext) (string, *SessionChan, int
return res.id, res.session, res.responsecode, res.errorstring
}
-func (self *SessionStoreChan) Get(user, id string) (*SessionChan, int, string) {
+func (self *SessionStoreChan) Get(user, id string) (*SessionChan, string, int, string) {
res_ch := make(chan getSessionResponse)
req := getSessionRequest{}
req.user = user
@@ -189,7 +200,7 @@ func (self *SessionStoreChan) Get(user, id string) (*SessionChan, int, string) {
self.getChan <- req
res := <-res_ch
- return res.session, res.responsecode, res.errorstring
+ return res.session, res.refId, res.responsecode, res.errorstring
}
func (self *SessionStoreChan) Remove(user, id string) (int, string) {
@@ -227,7 +238,7 @@ func NewSessionStore(conf *Config) (store *SessionStore, err error) {
store.quit = make(chan bool)
store.done = make(chan bool)
- store.store = make(map[string]map[string]*Session)
+ store.store = make(map[string]map[string]*SessionStoreElement)
store.newChan = make(chan newSessionRequest, 10)
store.getChan = make(chan getSessionRequest, 10)
store.removeChan = make(chan removeSessionRequest, 10)
diff --git a/src/helsinki.at/rhimportd/ctrlWebSocket.go b/src/helsinki.at/rhimportd/ctrlWebSocket.go
index 92e261a..ea7bf24 100644
--- a/src/helsinki.at/rhimportd/ctrlWebSocket.go
+++ b/src/helsinki.at/rhimportd/ctrlWebSocket.go
@@ -37,6 +37,7 @@ import (
type webSocketRequestData struct {
Command string `json:"COMMAND"`
Id string `json:"ID"`
+ RefId string `json:"REFERENCE_ID"`
UserName string `json:"LOGIN_NAME"`
Password string `json:"PASSWORD"`
ShowId uint `json:"SHOW_ID"`
@@ -79,6 +80,7 @@ type webSocketResponseData struct {
Type string `json:"TYPE"`
ErrorString string `json:"ERROR_STRING"`
Id string `json:"ID"`
+ RefId string `json:"REFERENCE_ID"`
ProgressStep int `json:"PROGRESS_STEP"`
ProgressStepName string `json:"PROGRESS_STEP_NAME"`
Progress float64 `json:"PROGRESS"`
@@ -98,6 +100,7 @@ func sendWebSocketErrorResponse(ws *websocket.Conn, id string, code int, err_str
type webSocketSession struct {
id string
+ refId string
session *rhimport.SessionChan
progresschan chan rhimport.ProgressData
donechan chan rhimport.ImportResult
@@ -131,7 +134,7 @@ func webSocketDone(res rhimport.ImportResult, userdata interface{}) bool {
func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) (int, string) {
ctx := rhimport.NewImportContext(conf, rddb, reqdata.UserName)
ctx.Password = reqdata.Password
- ctx.Trusted = true // set this to false as soon as the interface is working
+ ctx.Trusted = true // TODO: set this to false as soon as the interface is working
ctx.ShowId = reqdata.ShowId
ctx.ClearShowCarts = reqdata.ClearShowCarts
ctx.GroupName = reqdata.MusicPoolGroup
@@ -144,11 +147,12 @@ func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, con
ctx.UseMetaData = reqdata.UseMetaData
ctx.SourceUri = reqdata.SourceUri
- id, s, code, errstring := sessions.New(ctx)
+ id, s, code, errstring := sessions.New(ctx, reqdata.RefId)
if code != http.StatusOK {
return code, errstring
}
self.id = id
+ self.refId = reqdata.RefId
self.session = s
if err := s.AddDoneHandler((chan<- rhimport.ImportResult)(self.donechan), webSocketDone); err != nil {
return http.StatusInternalServerError, err.Error()
@@ -161,11 +165,12 @@ func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, con
}
func (self *webSocketSession) reconnectSession(reqdata *webSocketRequestData, sessions *rhimport.SessionStoreChan) (int, string) {
- s, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id)
+ s, refId, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id)
if code != http.StatusOK {
return code, errstring
}
self.id = reqdata.Id
+ self.refId = refId
self.session = s
if err := s.AddDoneHandler((chan<- rhimport.ImportResult)(self.donechan), webSocketDone); err != nil {
return http.StatusInternalServerError, err.Error()
@@ -191,41 +196,38 @@ func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.
case "new":
if session.id != "" {
sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session")
- return
- }
- code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions)
- if code != http.StatusOK {
- sendWebSocketErrorResponse(ws, "", code, errstring)
- return
} else {
- sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id})
+ code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions)
+ if code != http.StatusOK {
+ sendWebSocketErrorResponse(ws, "", code, errstring)
+ } else {
+ sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id, RefId: session.refId})
+ }
}
case "cancel":
if session.id == "" {
sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection doesn't handle any session")
- return
+ } else {
+ session.session.Cancel()
}
- session.session.Cancel()
case "reconnect":
if session.id != "" {
sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session")
- return
- }
- code, errstring := session.reconnectSession(&reqdata, sessions)
- if code != http.StatusOK {
- sendWebSocketErrorResponse(ws, "", code, errstring)
- return
} else {
- sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id})
+ code, errstring := session.reconnectSession(&reqdata, sessions)
+ if code != http.StatusOK {
+ sendWebSocketErrorResponse(ws, "", code, errstring)
+ } else {
+ sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id, RefId: session.refId})
+ }
}
default:
sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command))
- return
}
case p := <-session.progresschan:
- sendWebSocketResponse(ws, &webSocketResponseData{http.StatusOK, "PROGRESS", "", session.id, p.Step, p.StepName, p.Progress * 100, 0, 0})
+ sendWebSocketResponse(ws, &webSocketResponseData{http.StatusOK, "PROGRESS", "", session.id, session.refId, p.Step, p.StepName, p.Progress * 100, 0, 0})
case d := <-session.donechan:
- sendWebSocketResponse(ws, &webSocketResponseData{d.ResponseCode, "DONE", d.ErrorString, session.id, 0, "", 100.0, d.Cart, d.Cut})
+ sendWebSocketResponse(ws, &webSocketResponseData{d.ResponseCode, "DONE", d.ErrorString, session.id, session.refId, 0, "", 100.0, d.Cart, d.Cut})
// TODO: send close message at this point?
}
}