summaryrefslogtreecommitdiff
path: root/src/helsinki.at/rhimportd
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2015-12-30 13:44:32 (GMT)
committerChristian Pointner <equinox@helsinki.at>2015-12-30 13:44:32 (GMT)
commite9a1f32139ea729d0cd57a99132adc837d62690b (patch)
tree8fdc8ae790890c09f416109b7211de4484f36dfc /src/helsinki.at/rhimportd
parent6467923f8caaa02829fc53666b32df594da3ea68 (diff)
improved session handling at websocket interface
Diffstat (limited to 'src/helsinki.at/rhimportd')
-rw-r--r--src/helsinki.at/rhimportd/ctrlWebSocket.go38
1 files changed, 27 insertions, 11 deletions
diff --git a/src/helsinki.at/rhimportd/ctrlWebSocket.go b/src/helsinki.at/rhimportd/ctrlWebSocket.go
index d14dd90..1e8cfff 100644
--- a/src/helsinki.at/rhimportd/ctrlWebSocket.go
+++ b/src/helsinki.at/rhimportd/ctrlWebSocket.go
@@ -82,6 +82,8 @@ type webSocketResponseData struct {
ProgressStep int `json:"PROGRESS_STEP"`
ProgressStepName string `json:"PROGRESS_STEP_NAME"`
Progress float64 `json:"PROGRESS"`
+ Cart uint `json:"CART_NUMBER"`
+ Cut uint `json:"CUT_NUMBER"`
}
func sendWebSocketResponse(ws *websocket.Conn, rd *webSocketResponseData) {
@@ -90,37 +92,39 @@ func sendWebSocketResponse(ws *websocket.Conn, rd *webSocketResponseData) {
}
}
-func sendWebSocketErrorResponse(ws *websocket.Conn, code int, err_str string) {
- sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ERROR", ErrorString: err_str})
+func sendWebSocketErrorResponse(ws *websocket.Conn, id string, code int, err_str string) {
+ sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ERROR", ErrorString: err_str, Id: id})
}
type webSocketSession struct {
id string
session *rhimport.SessionChan
respchan chan webSocketResponseData
+ donechan chan rhimport.ImportResult
}
func newWebSocketSession() *webSocketSession {
session := &webSocketSession{}
session.respchan = make(chan webSocketResponseData, 10)
+ session.donechan = make(chan rhimport.ImportResult, 1)
return session
}
func webSocketProgress(step int, step_name string, progress float64, userdata interface{}) bool {
- out := userdata.(chan<- webSocketResponseData)
if math.IsNaN(progress) {
progress = 0.0
}
+ session := userdata.(*webSocketSession)
select {
- case out <- webSocketResponseData{http.StatusOK, "PROGRESS", "", "", step, step_name, progress * 100}:
+ case session.respchan <- webSocketResponseData{http.StatusOK, "PROGRESS", "", session.id, step, step_name, progress * 100, 0, 0}:
default:
}
return true
}
func webSocketDone(res rhimport.ImportResult, userdata interface{}) bool {
- respchan := userdata.(chan<- webSocketResponseData)
- respchan <- webSocketResponseData{res.ResponseCode, "DONE", res.ErrorString, "", 0, "", 100.0}
+ session := userdata.(*webSocketSession)
+ session.donechan <- res
return true
}
@@ -146,10 +150,10 @@ func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, con
}
self.id = id
self.session = s
- if err := s.AddDoneHandler((chan<- webSocketResponseData)(self.respchan), webSocketDone); err != nil {
+ if err := s.AddDoneHandler(self, webSocketDone); err != nil {
return http.StatusInternalServerError, err.Error()
}
- if err := s.AddProgressHandler((chan<- webSocketResponseData)(self.respchan), webSocketProgress); err != nil {
+ if err := s.AddProgressHandler(self, webSocketProgress); err != nil {
return http.StatusInternalServerError, err.Error()
}
s.Run(time.Duration(reqdata.Timeout) * time.Second)
@@ -168,21 +172,33 @@ func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.
}
switch reqdata.Command {
case "new":
+ if session.id != "" {
+ sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session")
+ return
+ }
code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions)
if code != http.StatusOK {
- sendWebSocketErrorResponse(ws, code, errstring)
+ sendWebSocketErrorResponse(ws, "", code, errstring)
+ return
} else {
sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id})
}
case "reconnect":
- sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "reconnect session - not yet implemented")
+ if session.id != "" {
+ sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session")
+ return
+ }
+ sendWebSocketErrorResponse(ws, "", http.StatusNotImplemented, "reconnect session - not yet implemented")
return
default:
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command))
+ sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command))
return
}
case respdata := <-session.respchan:
sendWebSocketResponse(ws, &respdata)
+ case donedata := <-session.donechan:
+ sendWebSocketResponse(ws, &webSocketResponseData{donedata.ResponseCode, "DONE", donedata.ErrorString, session.id, 0, "", 100.0, donedata.Cart, donedata.Cut})
+ // TODO: send close message at this point?
}
}
}