From e9a1f32139ea729d0cd57a99132adc837d62690b Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Wed, 30 Dec 2015 14:44:32 +0100 Subject: improved session handling at websocket interface diff --git a/src/helsinki.at/rhimportd/ctrlWebSocket.go b/src/helsinki.at/rhimportd/ctrlWebSocket.go index d14dd90..1e8cfff 100644 --- a/src/helsinki.at/rhimportd/ctrlWebSocket.go +++ b/src/helsinki.at/rhimportd/ctrlWebSocket.go @@ -82,6 +82,8 @@ type webSocketResponseData struct { ProgressStep int `json:"PROGRESS_STEP"` ProgressStepName string `json:"PROGRESS_STEP_NAME"` Progress float64 `json:"PROGRESS"` + Cart uint `json:"CART_NUMBER"` + Cut uint `json:"CUT_NUMBER"` } func sendWebSocketResponse(ws *websocket.Conn, rd *webSocketResponseData) { @@ -90,37 +92,39 @@ func sendWebSocketResponse(ws *websocket.Conn, rd *webSocketResponseData) { } } -func sendWebSocketErrorResponse(ws *websocket.Conn, code int, err_str string) { - sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ERROR", ErrorString: err_str}) +func sendWebSocketErrorResponse(ws *websocket.Conn, id string, code int, err_str string) { + sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ERROR", ErrorString: err_str, Id: id}) } type webSocketSession struct { id string session *rhimport.SessionChan respchan chan webSocketResponseData + donechan chan rhimport.ImportResult } func newWebSocketSession() *webSocketSession { session := &webSocketSession{} session.respchan = make(chan webSocketResponseData, 10) + session.donechan = make(chan rhimport.ImportResult, 1) return session } func webSocketProgress(step int, step_name string, progress float64, userdata interface{}) bool { - out := userdata.(chan<- webSocketResponseData) if math.IsNaN(progress) { progress = 0.0 } + session := userdata.(*webSocketSession) select { - case out <- webSocketResponseData{http.StatusOK, "PROGRESS", "", "", step, step_name, progress * 100}: + case session.respchan <- webSocketResponseData{http.StatusOK, "PROGRESS", "", session.id, step, step_name, progress * 100, 0, 0}: default: } return true } func webSocketDone(res rhimport.ImportResult, userdata interface{}) bool { - respchan := userdata.(chan<- webSocketResponseData) - respchan <- webSocketResponseData{res.ResponseCode, "DONE", res.ErrorString, "", 0, "", 100.0} + session := userdata.(*webSocketSession) + session.donechan <- res return true } @@ -146,10 +150,10 @@ func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, con } self.id = id self.session = s - if err := s.AddDoneHandler((chan<- webSocketResponseData)(self.respchan), webSocketDone); err != nil { + if err := s.AddDoneHandler(self, webSocketDone); err != nil { return http.StatusInternalServerError, err.Error() } - if err := s.AddProgressHandler((chan<- webSocketResponseData)(self.respchan), webSocketProgress); err != nil { + if err := s.AddProgressHandler(self, webSocketProgress); err != nil { return http.StatusInternalServerError, err.Error() } s.Run(time.Duration(reqdata.Timeout) * time.Second) @@ -168,21 +172,33 @@ func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket. } switch reqdata.Command { case "new": + if session.id != "" { + sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session") + return + } code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions) if code != http.StatusOK { - sendWebSocketErrorResponse(ws, code, errstring) + sendWebSocketErrorResponse(ws, "", code, errstring) + return } else { sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id}) } case "reconnect": - sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "reconnect session - not yet implemented") + if session.id != "" { + sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session") + return + } + sendWebSocketErrorResponse(ws, "", http.StatusNotImplemented, "reconnect session - not yet implemented") return default: - sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) + sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) return } case respdata := <-session.respchan: sendWebSocketResponse(ws, &respdata) + case donedata := <-session.donechan: + sendWebSocketResponse(ws, &webSocketResponseData{donedata.ResponseCode, "DONE", donedata.ErrorString, session.id, 0, "", 100.0, donedata.Cart, donedata.Cut}) + // TODO: send close message at this point? } } } diff --git a/test/socket.html b/test/socket.html index 06fce06..e2f7b85 100644 --- a/test/socket.html +++ b/test/socket.html @@ -38,7 +38,7 @@ PASSWORD: "12423", SHOW_ID: 10002, CLEAR_SHOW_CARTS: true, - SOURCE_URI: "fake://100" }; + SOURCE_URI: "http://www.tonycuffe.com/mp3/tail%20toddle.mp3" }; s = new Session(req); } -- cgit v0.10.2