diff options
Diffstat (limited to 'src/helsinki.at/rhimportd/ctrlWebSocket.go')
-rw-r--r-- | src/helsinki.at/rhimportd/ctrlWebSocket.go | 331 |
1 files changed, 0 insertions, 331 deletions
diff --git a/src/helsinki.at/rhimportd/ctrlWebSocket.go b/src/helsinki.at/rhimportd/ctrlWebSocket.go deleted file mode 100644 index a0386f1..0000000 --- a/src/helsinki.at/rhimportd/ctrlWebSocket.go +++ /dev/null @@ -1,331 +0,0 @@ -// -// rhimportd -// -// The Radio Helsinki Rivendell Import Daemon -// -// -// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at> -// -// This file is part of rhimportd. -// -// rhimportd is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// any later version. -// -// rhimportd is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. -// - -package main - -import ( - "fmt" - "github.com/gorilla/websocket" - "helsinki.at/rhimport" - "html" - "math" - "net/http" - "time" -) - -type webSocketRequestData struct { - Command string `json:"COMMAND"` - Id string `json:"ID"` - RefId string `json:"REFERENCE_ID"` - UserName string `json:"LOGIN_NAME"` - Password string `json:"PASSWORD"` - ShowId uint `json:"SHOW_ID"` - ClearShowCarts bool `json:"CLEAR_SHOW_CARTS"` - MusicPoolGroup string `json:"MUSIC_POOL_GROUP"` - Cart uint `json:"CART_NUMBER"` - ClearCart bool `json:"CLEAR_CART"` - Cut uint `json:"CUT_NUMBER"` - Channels uint `json:"CHANNELS"` - NormalizationLevel int `json:"NORMALIZATION_LEVEL"` - AutotrimLevel int `json:"AUTOTRIM_LEVEL"` - UseMetaData bool `json:"USE_METADATA"` - SourceUri string `json:"SOURCE_URI"` - Timeout uint `json:"TIMEOUT"` -} - -func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData { - rd := new(webSocketRequestData) - rd.Command = "" - rd.Id = "" - rd.UserName = "" - rd.Password = "" - rd.ShowId = 0 - rd.ClearShowCarts = false - rd.MusicPoolGroup = "" - rd.Cart = 0 - rd.ClearCart = false - rd.Cut = 0 - rd.Channels = conf.ImportParamDefaults.Channels - rd.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel - rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel - rd.UseMetaData = conf.ImportParamDefaults.UseMetaData - rd.SourceUri = "" - rd.Timeout = 0 - return rd -} - -type webSocketResponseBaseData struct { - ResponseCode int `json:"RESPONSE_CODE"` - Type string `json:"TYPE"` - ErrorString string `json:"ERROR_STRING"` - Id string `json:"ID"` - RefId string `json:"REFERENCE_ID"` -} - -type webSocketResponseListData struct { - webSocketResponseBaseData - Sessions map[string]string `json:"SESSIONS"` -} - -type webSocketResponseProgressData struct { - webSocketResponseBaseData - Step int `json:"PROGRESS_STEP"` - StepName string `json:"PROGRESS_STEP_NAME"` - Progress float64 `json:"PROGRESS"` -} - -type webSocketResponseDoneData struct { - webSocketResponseBaseData - Cart uint `json:"CART_NUMBER"` - Cut uint `json:"CUT_NUMBER"` -} - -func sendWebSocketResponse(ws *websocket.Conn, rd interface{}) { - if err := ws.WriteJSON(rd); err != nil { - rhdl.Println("WebScoket Client", ws.RemoteAddr(), "write error:", err) - } -} - -func sendWebSocketErrorResponse(ws *websocket.Conn, code int, errStr string) { - rd := &webSocketResponseBaseData{} - rd.ResponseCode = code - rd.Type = "error" - rd.ErrorString = errStr - sendWebSocketResponse(ws, rd) -} - -func sendWebSocketAckResponse(ws *websocket.Conn, code int, id, refid string) { - rd := &webSocketResponseBaseData{} - rd.ResponseCode = code - rd.Type = "ack" - rd.ErrorString = "OK" - rd.Id = id - rd.RefId = refid - sendWebSocketResponse(ws, rd) -} - -func sendWebSocketListResponse(ws *websocket.Conn, sessions map[string]string) { - rd := &webSocketResponseListData{} - rd.ResponseCode = http.StatusOK - rd.Type = "list" - rd.ErrorString = "OK" - rd.Sessions = sessions - sendWebSocketResponse(ws, rd) -} - -func sendWebSocketProgressResponse(ws *websocket.Conn, id, refid string, step int, stepName string, progress float64) { - rd := &webSocketResponseProgressData{} - rd.ResponseCode = http.StatusOK - rd.Type = "progress" - rd.ErrorString = "OK" - rd.Id = id - rd.RefId = refid - rd.Step = step - rd.StepName = stepName - rd.Progress = progress - sendWebSocketResponse(ws, rd) -} - -func sendWebSocketDoneResponse(ws *websocket.Conn, code int, errStr, id, refid string, cart, cut uint) { - rd := &webSocketResponseDoneData{} - rd.ResponseCode = code - rd.Type = "done" - rd.ErrorString = errStr - rd.Id = id - rd.RefId = refid - rd.Cart = cart - rd.Cut = cut - sendWebSocketResponse(ws, rd) -} - -type webSocketSession struct { - id string - refId string - session *rhimport.SessionChan - progresschan chan rhimport.ProgressData - donechan chan rhimport.Result -} - -func newWebSocketSession() *webSocketSession { - session := &webSocketSession{} - session.progresschan = make(chan rhimport.ProgressData, 10) - session.donechan = make(chan rhimport.Result, 1) - return session -} - -func webSocketProgress(step int, stepName string, progress float64, userdata interface{}) bool { - if math.IsNaN(progress) { - progress = 0.0 - } - c := userdata.(chan<- rhimport.ProgressData) - select { - case c <- rhimport.ProgressData{Step: step, StepName: stepName, Progress: progress}: - default: - } - return true -} - -func webSocketDone(res rhimport.Result, userdata interface{}) bool { - c := userdata.(chan<- rhimport.Result) - c <- res - return true -} - -func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) { - ctx := rhimport.NewContext(conf, nil) - ctx.UserName = reqdata.UserName - ctx.Password = reqdata.Password - ctx.Trusted = false - ctx.ShowId = reqdata.ShowId - ctx.ClearShowCarts = reqdata.ClearShowCarts - ctx.GroupName = reqdata.MusicPoolGroup - ctx.Cart = reqdata.Cart - ctx.ClearCart = reqdata.ClearCart - ctx.Cut = reqdata.Cut - ctx.Channels = reqdata.Channels - ctx.NormalizationLevel = reqdata.NormalizationLevel - ctx.AutotrimLevel = reqdata.AutotrimLevel - ctx.UseMetaData = reqdata.UseMetaData - ctx.SourceUri = reqdata.SourceUri - - id, s, code, errstring := sessions.New(ctx, reqdata.RefId) - if code != http.StatusOK { - return code, errstring - } - self.id = id - self.refId = reqdata.RefId - self.session = s - if err := s.AddDoneHandler((chan<- rhimport.Result)(self.donechan), webSocketDone); err != nil { - return http.StatusInternalServerError, err.Error() - } - if err := s.AddProgressHandler((chan<- rhimport.ProgressData)(self.progresschan), webSocketProgress); err != nil { - return http.StatusInternalServerError, err.Error() - } - s.Run(time.Duration(reqdata.Timeout) * time.Second) - return http.StatusOK, "SUCCESS" -} - -func (self *webSocketSession) reconnectSession(reqdata *webSocketRequestData, sessions *rhimport.SessionStoreChan) (int, string) { - s, refId, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id) - if code != http.StatusOK { - return code, errstring - } - self.id = reqdata.Id - self.refId = refId - self.session = s - if err := s.AddDoneHandler((chan<- rhimport.Result)(self.donechan), webSocketDone); err != nil { - return http.StatusInternalServerError, err.Error() - } - if err := s.AddProgressHandler((chan<- rhimport.ProgressData)(self.progresschan), webSocketProgress); err != nil { - return http.StatusInternalServerError, err.Error() - } - s.Run(time.Duration(reqdata.Timeout) * time.Second) - return http.StatusOK, "SUCCESS" -} - -func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) { - defer ws.Close() - - session := newWebSocketSession() - for { - select { - case reqdata, ok := <-reqchan: - if !ok { - return - } - switch reqdata.Command { - case "new": - if session.id != "" { - sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session") - } else { - code, errstring := session.startNewSession(&reqdata, conf, sessions) - if code != http.StatusOK { - sendWebSocketErrorResponse(ws, code, errstring) - } else { - sendWebSocketAckResponse(ws, code, session.id, session.refId) - } - } - case "cancel": - if session.id == "" { - sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection doesn't handle any session") - } else { - session.session.Cancel() - } - case "reconnect": - if session.id != "" { - sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session") - } else { - code, errstring := session.reconnectSession(&reqdata, sessions) - if code != http.StatusOK { - sendWebSocketErrorResponse(ws, code, errstring) - } else { - sendWebSocketAckResponse(ws, code, session.id, session.refId) - } - } - case "list": - list, code, errstring := sessions.List(reqdata.UserName, reqdata.Password, false) - if code != http.StatusOK { - sendWebSocketErrorResponse(ws, code, errstring) - } else { - sendWebSocketListResponse(ws, list) - } - default: - sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) - } - case p := <-session.progresschan: - sendWebSocketProgressResponse(ws, session.id, session.refId, p.Step, p.StepName, p.Progress*100) - case d := <-session.donechan: - sendWebSocketDoneResponse(ws, d.ResponseCode, d.ErrorString, session.id, session.refId, d.Cart, d.Cut) - // TODO: send close message at this point? - } - } -} - -func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan, trusted bool, w http.ResponseWriter, r *http.Request) { - rhdl.Printf("WebSocketHandler: request for '%s'", html.EscapeString(r.URL.Path)) - - ws, err := websocket.Upgrade(w, r, nil, 1024, 1024) - if _, ok := err.(websocket.HandshakeError); ok { - http.Error(w, "Not a websocket handshake", 400) - return - } else if err != nil { - rhdl.Println("WebSocket Client", ws.RemoteAddr(), "error:", err) - return - } - rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected") - reqchan := make(chan webSocketRequestData) - go webSocketSessionHandler(reqchan, ws, conf, sessions) - defer close(reqchan) - - for { - reqdata := newWebSocketRequestData(conf) - if err := ws.ReadJSON(&reqdata); err != nil { - rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err) - return - } else { - // rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata) - reqchan <- *reqdata - } - } -} |