diff options
Diffstat (limited to 'src/rhimportd/ctrlWebSocket.go')
-rw-r--r-- | src/rhimportd/ctrlWebSocket.go | 331 |
1 files changed, 331 insertions, 0 deletions
diff --git a/src/rhimportd/ctrlWebSocket.go b/src/rhimportd/ctrlWebSocket.go new file mode 100644 index 0000000..a0386f1 --- /dev/null +++ b/src/rhimportd/ctrlWebSocket.go @@ -0,0 +1,331 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at> +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. +// + +package main + +import ( + "fmt" + "github.com/gorilla/websocket" + "helsinki.at/rhimport" + "html" + "math" + "net/http" + "time" +) + +type webSocketRequestData struct { + Command string `json:"COMMAND"` + Id string `json:"ID"` + RefId string `json:"REFERENCE_ID"` + UserName string `json:"LOGIN_NAME"` + Password string `json:"PASSWORD"` + ShowId uint `json:"SHOW_ID"` + ClearShowCarts bool `json:"CLEAR_SHOW_CARTS"` + MusicPoolGroup string `json:"MUSIC_POOL_GROUP"` + Cart uint `json:"CART_NUMBER"` + ClearCart bool `json:"CLEAR_CART"` + Cut uint `json:"CUT_NUMBER"` + Channels uint `json:"CHANNELS"` + NormalizationLevel int `json:"NORMALIZATION_LEVEL"` + AutotrimLevel int `json:"AUTOTRIM_LEVEL"` + UseMetaData bool `json:"USE_METADATA"` + SourceUri string `json:"SOURCE_URI"` + Timeout uint `json:"TIMEOUT"` +} + +func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData { + rd := new(webSocketRequestData) + rd.Command = "" + rd.Id = "" + rd.UserName = "" + rd.Password = "" + rd.ShowId = 0 + rd.ClearShowCarts = false + rd.MusicPoolGroup = "" + rd.Cart = 0 + rd.ClearCart = false + rd.Cut = 0 + rd.Channels = conf.ImportParamDefaults.Channels + rd.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel + rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel + rd.UseMetaData = conf.ImportParamDefaults.UseMetaData + rd.SourceUri = "" + rd.Timeout = 0 + return rd +} + +type webSocketResponseBaseData struct { + ResponseCode int `json:"RESPONSE_CODE"` + Type string `json:"TYPE"` + ErrorString string `json:"ERROR_STRING"` + Id string `json:"ID"` + RefId string `json:"REFERENCE_ID"` +} + +type webSocketResponseListData struct { + webSocketResponseBaseData + Sessions map[string]string `json:"SESSIONS"` +} + +type webSocketResponseProgressData struct { + webSocketResponseBaseData + Step int `json:"PROGRESS_STEP"` + StepName string `json:"PROGRESS_STEP_NAME"` + Progress float64 `json:"PROGRESS"` +} + +type webSocketResponseDoneData struct { + webSocketResponseBaseData + Cart uint `json:"CART_NUMBER"` + Cut uint `json:"CUT_NUMBER"` +} + +func sendWebSocketResponse(ws *websocket.Conn, rd interface{}) { + if err := ws.WriteJSON(rd); err != nil { + rhdl.Println("WebScoket Client", ws.RemoteAddr(), "write error:", err) + } +} + +func sendWebSocketErrorResponse(ws *websocket.Conn, code int, errStr string) { + rd := &webSocketResponseBaseData{} + rd.ResponseCode = code + rd.Type = "error" + rd.ErrorString = errStr + sendWebSocketResponse(ws, rd) +} + +func sendWebSocketAckResponse(ws *websocket.Conn, code int, id, refid string) { + rd := &webSocketResponseBaseData{} + rd.ResponseCode = code + rd.Type = "ack" + rd.ErrorString = "OK" + rd.Id = id + rd.RefId = refid + sendWebSocketResponse(ws, rd) +} + +func sendWebSocketListResponse(ws *websocket.Conn, sessions map[string]string) { + rd := &webSocketResponseListData{} + rd.ResponseCode = http.StatusOK + rd.Type = "list" + rd.ErrorString = "OK" + rd.Sessions = sessions + sendWebSocketResponse(ws, rd) +} + +func sendWebSocketProgressResponse(ws *websocket.Conn, id, refid string, step int, stepName string, progress float64) { + rd := &webSocketResponseProgressData{} + rd.ResponseCode = http.StatusOK + rd.Type = "progress" + rd.ErrorString = "OK" + rd.Id = id + rd.RefId = refid + rd.Step = step + rd.StepName = stepName + rd.Progress = progress + sendWebSocketResponse(ws, rd) +} + +func sendWebSocketDoneResponse(ws *websocket.Conn, code int, errStr, id, refid string, cart, cut uint) { + rd := &webSocketResponseDoneData{} + rd.ResponseCode = code + rd.Type = "done" + rd.ErrorString = errStr + rd.Id = id + rd.RefId = refid + rd.Cart = cart + rd.Cut = cut + sendWebSocketResponse(ws, rd) +} + +type webSocketSession struct { + id string + refId string + session *rhimport.SessionChan + progresschan chan rhimport.ProgressData + donechan chan rhimport.Result +} + +func newWebSocketSession() *webSocketSession { + session := &webSocketSession{} + session.progresschan = make(chan rhimport.ProgressData, 10) + session.donechan = make(chan rhimport.Result, 1) + return session +} + +func webSocketProgress(step int, stepName string, progress float64, userdata interface{}) bool { + if math.IsNaN(progress) { + progress = 0.0 + } + c := userdata.(chan<- rhimport.ProgressData) + select { + case c <- rhimport.ProgressData{Step: step, StepName: stepName, Progress: progress}: + default: + } + return true +} + +func webSocketDone(res rhimport.Result, userdata interface{}) bool { + c := userdata.(chan<- rhimport.Result) + c <- res + return true +} + +func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) { + ctx := rhimport.NewContext(conf, nil) + ctx.UserName = reqdata.UserName + ctx.Password = reqdata.Password + ctx.Trusted = false + ctx.ShowId = reqdata.ShowId + ctx.ClearShowCarts = reqdata.ClearShowCarts + ctx.GroupName = reqdata.MusicPoolGroup + ctx.Cart = reqdata.Cart + ctx.ClearCart = reqdata.ClearCart + ctx.Cut = reqdata.Cut + ctx.Channels = reqdata.Channels + ctx.NormalizationLevel = reqdata.NormalizationLevel + ctx.AutotrimLevel = reqdata.AutotrimLevel + ctx.UseMetaData = reqdata.UseMetaData + ctx.SourceUri = reqdata.SourceUri + + id, s, code, errstring := sessions.New(ctx, reqdata.RefId) + if code != http.StatusOK { + return code, errstring + } + self.id = id + self.refId = reqdata.RefId + self.session = s + if err := s.AddDoneHandler((chan<- rhimport.Result)(self.donechan), webSocketDone); err != nil { + return http.StatusInternalServerError, err.Error() + } + if err := s.AddProgressHandler((chan<- rhimport.ProgressData)(self.progresschan), webSocketProgress); err != nil { + return http.StatusInternalServerError, err.Error() + } + s.Run(time.Duration(reqdata.Timeout) * time.Second) + return http.StatusOK, "SUCCESS" +} + +func (self *webSocketSession) reconnectSession(reqdata *webSocketRequestData, sessions *rhimport.SessionStoreChan) (int, string) { + s, refId, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id) + if code != http.StatusOK { + return code, errstring + } + self.id = reqdata.Id + self.refId = refId + self.session = s + if err := s.AddDoneHandler((chan<- rhimport.Result)(self.donechan), webSocketDone); err != nil { + return http.StatusInternalServerError, err.Error() + } + if err := s.AddProgressHandler((chan<- rhimport.ProgressData)(self.progresschan), webSocketProgress); err != nil { + return http.StatusInternalServerError, err.Error() + } + s.Run(time.Duration(reqdata.Timeout) * time.Second) + return http.StatusOK, "SUCCESS" +} + +func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) { + defer ws.Close() + + session := newWebSocketSession() + for { + select { + case reqdata, ok := <-reqchan: + if !ok { + return + } + switch reqdata.Command { + case "new": + if session.id != "" { + sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session") + } else { + code, errstring := session.startNewSession(&reqdata, conf, sessions) + if code != http.StatusOK { + sendWebSocketErrorResponse(ws, code, errstring) + } else { + sendWebSocketAckResponse(ws, code, session.id, session.refId) + } + } + case "cancel": + if session.id == "" { + sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection doesn't handle any session") + } else { + session.session.Cancel() + } + case "reconnect": + if session.id != "" { + sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session") + } else { + code, errstring := session.reconnectSession(&reqdata, sessions) + if code != http.StatusOK { + sendWebSocketErrorResponse(ws, code, errstring) + } else { + sendWebSocketAckResponse(ws, code, session.id, session.refId) + } + } + case "list": + list, code, errstring := sessions.List(reqdata.UserName, reqdata.Password, false) + if code != http.StatusOK { + sendWebSocketErrorResponse(ws, code, errstring) + } else { + sendWebSocketListResponse(ws, list) + } + default: + sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) + } + case p := <-session.progresschan: + sendWebSocketProgressResponse(ws, session.id, session.refId, p.Step, p.StepName, p.Progress*100) + case d := <-session.donechan: + sendWebSocketDoneResponse(ws, d.ResponseCode, d.ErrorString, session.id, session.refId, d.Cart, d.Cut) + // TODO: send close message at this point? + } + } +} + +func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan, trusted bool, w http.ResponseWriter, r *http.Request) { + rhdl.Printf("WebSocketHandler: request for '%s'", html.EscapeString(r.URL.Path)) + + ws, err := websocket.Upgrade(w, r, nil, 1024, 1024) + if _, ok := err.(websocket.HandshakeError); ok { + http.Error(w, "Not a websocket handshake", 400) + return + } else if err != nil { + rhdl.Println("WebSocket Client", ws.RemoteAddr(), "error:", err) + return + } + rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected") + reqchan := make(chan webSocketRequestData) + go webSocketSessionHandler(reqchan, ws, conf, sessions) + defer close(reqchan) + + for { + reqdata := newWebSocketRequestData(conf) + if err := ws.ReadJSON(&reqdata); err != nil { + rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err) + return + } else { + // rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata) + reqchan <- *reqdata + } + } +} |