diff options
author | Christian Pointner <equinox@helsinki.at> | 2015-12-30 13:44:32 (GMT) |
---|---|---|
committer | Christian Pointner <equinox@helsinki.at> | 2015-12-30 13:44:32 (GMT) |
commit | e9a1f32139ea729d0cd57a99132adc837d62690b (patch) | |
tree | 8fdc8ae790890c09f416109b7211de4484f36dfc /src/helsinki.at/rhimportd | |
parent | 6467923f8caaa02829fc53666b32df594da3ea68 (diff) |
improved session handling at websocket interface
Diffstat (limited to 'src/helsinki.at/rhimportd')
-rw-r--r-- | src/helsinki.at/rhimportd/ctrlWebSocket.go | 38 |
1 files changed, 27 insertions, 11 deletions
diff --git a/src/helsinki.at/rhimportd/ctrlWebSocket.go b/src/helsinki.at/rhimportd/ctrlWebSocket.go index d14dd90..1e8cfff 100644 --- a/src/helsinki.at/rhimportd/ctrlWebSocket.go +++ b/src/helsinki.at/rhimportd/ctrlWebSocket.go @@ -82,6 +82,8 @@ type webSocketResponseData struct { ProgressStep int `json:"PROGRESS_STEP"` ProgressStepName string `json:"PROGRESS_STEP_NAME"` Progress float64 `json:"PROGRESS"` + Cart uint `json:"CART_NUMBER"` + Cut uint `json:"CUT_NUMBER"` } func sendWebSocketResponse(ws *websocket.Conn, rd *webSocketResponseData) { @@ -90,37 +92,39 @@ func sendWebSocketResponse(ws *websocket.Conn, rd *webSocketResponseData) { } } -func sendWebSocketErrorResponse(ws *websocket.Conn, code int, err_str string) { - sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ERROR", ErrorString: err_str}) +func sendWebSocketErrorResponse(ws *websocket.Conn, id string, code int, err_str string) { + sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ERROR", ErrorString: err_str, Id: id}) } type webSocketSession struct { id string session *rhimport.SessionChan respchan chan webSocketResponseData + donechan chan rhimport.ImportResult } func newWebSocketSession() *webSocketSession { session := &webSocketSession{} session.respchan = make(chan webSocketResponseData, 10) + session.donechan = make(chan rhimport.ImportResult, 1) return session } func webSocketProgress(step int, step_name string, progress float64, userdata interface{}) bool { - out := userdata.(chan<- webSocketResponseData) if math.IsNaN(progress) { progress = 0.0 } + session := userdata.(*webSocketSession) select { - case out <- webSocketResponseData{http.StatusOK, "PROGRESS", "", "", step, step_name, progress * 100}: + case session.respchan <- webSocketResponseData{http.StatusOK, "PROGRESS", "", session.id, step, step_name, progress * 100, 0, 0}: default: } return true } func webSocketDone(res rhimport.ImportResult, userdata interface{}) bool { - respchan := userdata.(chan<- webSocketResponseData) - respchan <- webSocketResponseData{res.ResponseCode, "DONE", res.ErrorString, "", 0, "", 100.0} + session := userdata.(*webSocketSession) + session.donechan <- res return true } @@ -146,10 +150,10 @@ func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, con } self.id = id self.session = s - if err := s.AddDoneHandler((chan<- webSocketResponseData)(self.respchan), webSocketDone); err != nil { + if err := s.AddDoneHandler(self, webSocketDone); err != nil { return http.StatusInternalServerError, err.Error() } - if err := s.AddProgressHandler((chan<- webSocketResponseData)(self.respchan), webSocketProgress); err != nil { + if err := s.AddProgressHandler(self, webSocketProgress); err != nil { return http.StatusInternalServerError, err.Error() } s.Run(time.Duration(reqdata.Timeout) * time.Second) @@ -168,21 +172,33 @@ func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket. } switch reqdata.Command { case "new": + if session.id != "" { + sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session") + return + } code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions) if code != http.StatusOK { - sendWebSocketErrorResponse(ws, code, errstring) + sendWebSocketErrorResponse(ws, "", code, errstring) + return } else { sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id}) } case "reconnect": - sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "reconnect session - not yet implemented") + if session.id != "" { + sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session") + return + } + sendWebSocketErrorResponse(ws, "", http.StatusNotImplemented, "reconnect session - not yet implemented") return default: - sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) + sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) return } case respdata := <-session.respchan: sendWebSocketResponse(ws, &respdata) + case donedata := <-session.donechan: + sendWebSocketResponse(ws, &webSocketResponseData{donedata.ResponseCode, "DONE", donedata.ErrorString, session.id, 0, "", 100.0, donedata.Cart, donedata.Cut}) + // TODO: send close message at this point? } } } |