diff options
author | Christian Pointner <equinox@helsinki.at> | 2015-12-29 19:13:44 (GMT) |
---|---|---|
committer | Christian Pointner <equinox@helsinki.at> | 2015-12-29 19:13:44 (GMT) |
commit | 6467923f8caaa02829fc53666b32df594da3ea68 (patch) | |
tree | bb3cdcc4f93dfee929b2c991e8bcbc1bfc1ed6e9 /src | |
parent | 4911fe3fd94019fcca730710e0b411e8e5a66847 (diff) |
websocket handler new/run works now (needs testing!)
Diffstat (limited to 'src')
-rw-r--r-- | src/helsinki.at/rhimportd/ctrlWeb.go | 1 | ||||
-rw-r--r-- | src/helsinki.at/rhimportd/ctrlWebSocket.go | 133 |
2 files changed, 104 insertions, 30 deletions
diff --git a/src/helsinki.at/rhimportd/ctrlWeb.go b/src/helsinki.at/rhimportd/ctrlWeb.go index 2a95676..794f8aa 100644 --- a/src/helsinki.at/rhimportd/ctrlWeb.go +++ b/src/helsinki.at/rhimportd/ctrlWeb.go @@ -48,7 +48,6 @@ func StartControlWeb(addr_s string, conf *rhimport.Config, rddb *rhimport.RdDbCh http.Handle("/trusted/simple", webHandler{conf, rddb, sessions, true, webSimpleHandler}) http.Handle("/public/socket", webHandler{conf, rddb, sessions, false, webSocketHandler}) - http.Handle("/trusted/socket", webHandler{conf, rddb, sessions, true, webSocketHandler}) rhl.Println("web-ctrl: listening on", addr_s) server := &http.Server{Addr: addr_s, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} diff --git a/src/helsinki.at/rhimportd/ctrlWebSocket.go b/src/helsinki.at/rhimportd/ctrlWebSocket.go index b3f10f6..d14dd90 100644 --- a/src/helsinki.at/rhimportd/ctrlWebSocket.go +++ b/src/helsinki.at/rhimportd/ctrlWebSocket.go @@ -25,12 +25,13 @@ package main import ( - // "encoding/json" "fmt" "github.com/gorilla/websocket" "helsinki.at/rhimport" "html" + "math" "net/http" + "time" ) type webSocketRequestData struct { @@ -49,6 +50,7 @@ type webSocketRequestData struct { AutotrimLevel int `json:"AUTOTRIM_LEVEL"` UseMetaData bool `json:"USE_METADATA"` SourceUri string `json:"SOURCE_URI"` + Timeout uint `json:"TIMEOUT"` } func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData { @@ -68,44 +70,119 @@ func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData { rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel rd.UseMetaData = conf.ImportParamDefaults.UseMetaData rd.SourceUri = "" - + rd.Timeout = 0 return rd } -type webSocketErrorResponseData struct { - ResponseCode int `json:"RESPONSE_CODE"` - ErrorString string `json:"ERROR_STRING"` +type webSocketResponseData struct { + ResponseCode int `json:"RESPONSE_CODE"` + Type string `json:"TYPE"` + ErrorString string `json:"ERROR_STRING"` + Id string `json:"ID"` + ProgressStep int `json:"PROGRESS_STEP"` + ProgressStepName string `json:"PROGRESS_STEP_NAME"` + Progress float64 `json:"PROGRESS"` } -func sendWebSocketErrorResponse(ws *websocket.Conn, code int, err_str string) { - if err := ws.WriteJSON(webSocketErrorResponseData{code, err_str}); err != nil { +func sendWebSocketResponse(ws *websocket.Conn, rd *webSocketResponseData) { + if err := ws.WriteJSON(*rd); err != nil { rhdl.Println("WebScoket Client", ws.RemoteAddr(), "write error:", err) } } -func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn) { - defer ws.Close() +func sendWebSocketErrorResponse(ws *websocket.Conn, code int, err_str string) { + sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ERROR", ErrorString: err_str}) +} + +type webSocketSession struct { + id string + session *rhimport.SessionChan + respchan chan webSocketResponseData +} +func newWebSocketSession() *webSocketSession { + session := &webSocketSession{} + session.respchan = make(chan webSocketResponseData, 10) + return session +} + +func webSocketProgress(step int, step_name string, progress float64, userdata interface{}) bool { + out := userdata.(chan<- webSocketResponseData) + if math.IsNaN(progress) { + progress = 0.0 + } select { - case reqdata := <-reqchan: - switch reqdata.Command { - case "new": - sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "new session - not yet implemented") - return - case "reconnect": - if reqdata.UserName == "" { - sendWebSocketErrorResponse(ws, http.StatusBadRequest, "missing mandotary field LOGIN_NAME") + case out <- webSocketResponseData{http.StatusOK, "PROGRESS", "", "", step, step_name, progress * 100}: + default: + } + return true +} + +func webSocketDone(res rhimport.ImportResult, userdata interface{}) bool { + respchan := userdata.(chan<- webSocketResponseData) + respchan <- webSocketResponseData{res.ResponseCode, "DONE", res.ErrorString, "", 0, "", 100.0} + return true +} + +func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) (int, string) { + ctx := rhimport.NewImportContext(conf, rddb, reqdata.UserName) + ctx.Password = reqdata.Password + ctx.Trusted = true // set this to false as soon as the interface is working + ctx.ShowId = reqdata.ShowId + ctx.ClearShowCarts = reqdata.ClearShowCarts + ctx.GroupName = reqdata.MusicPoolGroup + ctx.Cart = reqdata.Cart + ctx.ClearCart = reqdata.ClearCart + ctx.Cut = reqdata.Cut + ctx.Channels = reqdata.Channels + ctx.NormalizationLevel = reqdata.NormalizationLevel + ctx.AutotrimLevel = reqdata.AutotrimLevel + ctx.UseMetaData = reqdata.UseMetaData + ctx.SourceUri = reqdata.SourceUri + + id, s, code, errstring := sessions.New(ctx) + if code != http.StatusOK { + return code, errstring + } + self.id = id + self.session = s + if err := s.AddDoneHandler((chan<- webSocketResponseData)(self.respchan), webSocketDone); err != nil { + return http.StatusInternalServerError, err.Error() + } + if err := s.AddProgressHandler((chan<- webSocketResponseData)(self.respchan), webSocketProgress); err != nil { + return http.StatusInternalServerError, err.Error() + } + s.Run(time.Duration(reqdata.Timeout) * time.Second) + return http.StatusOK, "SUCCESS" +} + +func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) { + defer ws.Close() + + session := newWebSocketSession() + for { + select { + case reqdata, ok := <-reqchan: + if !ok { return } - if reqdata.Id == "" { - sendWebSocketErrorResponse(ws, http.StatusBadRequest, "missing mandotary field ID") + switch reqdata.Command { + case "new": + code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions) + if code != http.StatusOK { + sendWebSocketErrorResponse(ws, code, errstring) + } else { + sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id}) + } + case "reconnect": + sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "reconnect session - not yet implemented") + return + default: + sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) return } - sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "reconnect session - not yet implemented") - return - default: - sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) - return + case respdata := <-session.respchan: + sendWebSocketResponse(ws, &respdata) } } } @@ -123,7 +200,8 @@ func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions * } rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected") reqchan := make(chan webSocketRequestData) - go webSocketSessionHandler(reqchan, ws) + go webSocketSessionHandler(reqchan, ws, conf, rddb, sessions) + defer close(reqchan) for { reqdata := newWebSocketRequestData(conf) @@ -131,10 +209,7 @@ func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions * rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err) return } else { - rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata) - if trusted { - reqdata.UserName = r.Header.Get("X-Forwarded-User") - } + // rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata) reqchan <- *reqdata } } |