summaryrefslogtreecommitdiff
path: root/src/rhimportd/ctrlWebSocket.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/rhimportd/ctrlWebSocket.go')
-rw-r--r--src/rhimportd/ctrlWebSocket.go331
1 files changed, 331 insertions, 0 deletions
diff --git a/src/rhimportd/ctrlWebSocket.go b/src/rhimportd/ctrlWebSocket.go
new file mode 100644
index 0000000..a0386f1
--- /dev/null
+++ b/src/rhimportd/ctrlWebSocket.go
@@ -0,0 +1,331 @@
+//
+// rhimportd
+//
+// The Radio Helsinki Rivendell Import Daemon
+//
+//
+// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at>
+//
+// This file is part of rhimportd.
+//
+// rhimportd is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// any later version.
+//
+// rhimportd is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with rhimportd. If not, see <http://www.gnu.org/licenses/>.
+//
+
+package main
+
+import (
+ "fmt"
+ "github.com/gorilla/websocket"
+ "helsinki.at/rhimport"
+ "html"
+ "math"
+ "net/http"
+ "time"
+)
+
+type webSocketRequestData struct {
+ Command string `json:"COMMAND"`
+ Id string `json:"ID"`
+ RefId string `json:"REFERENCE_ID"`
+ UserName string `json:"LOGIN_NAME"`
+ Password string `json:"PASSWORD"`
+ ShowId uint `json:"SHOW_ID"`
+ ClearShowCarts bool `json:"CLEAR_SHOW_CARTS"`
+ MusicPoolGroup string `json:"MUSIC_POOL_GROUP"`
+ Cart uint `json:"CART_NUMBER"`
+ ClearCart bool `json:"CLEAR_CART"`
+ Cut uint `json:"CUT_NUMBER"`
+ Channels uint `json:"CHANNELS"`
+ NormalizationLevel int `json:"NORMALIZATION_LEVEL"`
+ AutotrimLevel int `json:"AUTOTRIM_LEVEL"`
+ UseMetaData bool `json:"USE_METADATA"`
+ SourceUri string `json:"SOURCE_URI"`
+ Timeout uint `json:"TIMEOUT"`
+}
+
+func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData {
+ rd := new(webSocketRequestData)
+ rd.Command = ""
+ rd.Id = ""
+ rd.UserName = ""
+ rd.Password = ""
+ rd.ShowId = 0
+ rd.ClearShowCarts = false
+ rd.MusicPoolGroup = ""
+ rd.Cart = 0
+ rd.ClearCart = false
+ rd.Cut = 0
+ rd.Channels = conf.ImportParamDefaults.Channels
+ rd.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel
+ rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel
+ rd.UseMetaData = conf.ImportParamDefaults.UseMetaData
+ rd.SourceUri = ""
+ rd.Timeout = 0
+ return rd
+}
+
+type webSocketResponseBaseData struct {
+ ResponseCode int `json:"RESPONSE_CODE"`
+ Type string `json:"TYPE"`
+ ErrorString string `json:"ERROR_STRING"`
+ Id string `json:"ID"`
+ RefId string `json:"REFERENCE_ID"`
+}
+
+type webSocketResponseListData struct {
+ webSocketResponseBaseData
+ Sessions map[string]string `json:"SESSIONS"`
+}
+
+type webSocketResponseProgressData struct {
+ webSocketResponseBaseData
+ Step int `json:"PROGRESS_STEP"`
+ StepName string `json:"PROGRESS_STEP_NAME"`
+ Progress float64 `json:"PROGRESS"`
+}
+
+type webSocketResponseDoneData struct {
+ webSocketResponseBaseData
+ Cart uint `json:"CART_NUMBER"`
+ Cut uint `json:"CUT_NUMBER"`
+}
+
+func sendWebSocketResponse(ws *websocket.Conn, rd interface{}) {
+ if err := ws.WriteJSON(rd); err != nil {
+ rhdl.Println("WebScoket Client", ws.RemoteAddr(), "write error:", err)
+ }
+}
+
+func sendWebSocketErrorResponse(ws *websocket.Conn, code int, errStr string) {
+ rd := &webSocketResponseBaseData{}
+ rd.ResponseCode = code
+ rd.Type = "error"
+ rd.ErrorString = errStr
+ sendWebSocketResponse(ws, rd)
+}
+
+func sendWebSocketAckResponse(ws *websocket.Conn, code int, id, refid string) {
+ rd := &webSocketResponseBaseData{}
+ rd.ResponseCode = code
+ rd.Type = "ack"
+ rd.ErrorString = "OK"
+ rd.Id = id
+ rd.RefId = refid
+ sendWebSocketResponse(ws, rd)
+}
+
+func sendWebSocketListResponse(ws *websocket.Conn, sessions map[string]string) {
+ rd := &webSocketResponseListData{}
+ rd.ResponseCode = http.StatusOK
+ rd.Type = "list"
+ rd.ErrorString = "OK"
+ rd.Sessions = sessions
+ sendWebSocketResponse(ws, rd)
+}
+
+func sendWebSocketProgressResponse(ws *websocket.Conn, id, refid string, step int, stepName string, progress float64) {
+ rd := &webSocketResponseProgressData{}
+ rd.ResponseCode = http.StatusOK
+ rd.Type = "progress"
+ rd.ErrorString = "OK"
+ rd.Id = id
+ rd.RefId = refid
+ rd.Step = step
+ rd.StepName = stepName
+ rd.Progress = progress
+ sendWebSocketResponse(ws, rd)
+}
+
+func sendWebSocketDoneResponse(ws *websocket.Conn, code int, errStr, id, refid string, cart, cut uint) {
+ rd := &webSocketResponseDoneData{}
+ rd.ResponseCode = code
+ rd.Type = "done"
+ rd.ErrorString = errStr
+ rd.Id = id
+ rd.RefId = refid
+ rd.Cart = cart
+ rd.Cut = cut
+ sendWebSocketResponse(ws, rd)
+}
+
+type webSocketSession struct {
+ id string
+ refId string
+ session *rhimport.SessionChan
+ progresschan chan rhimport.ProgressData
+ donechan chan rhimport.Result
+}
+
+func newWebSocketSession() *webSocketSession {
+ session := &webSocketSession{}
+ session.progresschan = make(chan rhimport.ProgressData, 10)
+ session.donechan = make(chan rhimport.Result, 1)
+ return session
+}
+
+func webSocketProgress(step int, stepName string, progress float64, userdata interface{}) bool {
+ if math.IsNaN(progress) {
+ progress = 0.0
+ }
+ c := userdata.(chan<- rhimport.ProgressData)
+ select {
+ case c <- rhimport.ProgressData{Step: step, StepName: stepName, Progress: progress}:
+ default:
+ }
+ return true
+}
+
+func webSocketDone(res rhimport.Result, userdata interface{}) bool {
+ c := userdata.(chan<- rhimport.Result)
+ c <- res
+ return true
+}
+
+func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) {
+ ctx := rhimport.NewContext(conf, nil)
+ ctx.UserName = reqdata.UserName
+ ctx.Password = reqdata.Password
+ ctx.Trusted = false
+ ctx.ShowId = reqdata.ShowId
+ ctx.ClearShowCarts = reqdata.ClearShowCarts
+ ctx.GroupName = reqdata.MusicPoolGroup
+ ctx.Cart = reqdata.Cart
+ ctx.ClearCart = reqdata.ClearCart
+ ctx.Cut = reqdata.Cut
+ ctx.Channels = reqdata.Channels
+ ctx.NormalizationLevel = reqdata.NormalizationLevel
+ ctx.AutotrimLevel = reqdata.AutotrimLevel
+ ctx.UseMetaData = reqdata.UseMetaData
+ ctx.SourceUri = reqdata.SourceUri
+
+ id, s, code, errstring := sessions.New(ctx, reqdata.RefId)
+ if code != http.StatusOK {
+ return code, errstring
+ }
+ self.id = id
+ self.refId = reqdata.RefId
+ self.session = s
+ if err := s.AddDoneHandler((chan<- rhimport.Result)(self.donechan), webSocketDone); err != nil {
+ return http.StatusInternalServerError, err.Error()
+ }
+ if err := s.AddProgressHandler((chan<- rhimport.ProgressData)(self.progresschan), webSocketProgress); err != nil {
+ return http.StatusInternalServerError, err.Error()
+ }
+ s.Run(time.Duration(reqdata.Timeout) * time.Second)
+ return http.StatusOK, "SUCCESS"
+}
+
+func (self *webSocketSession) reconnectSession(reqdata *webSocketRequestData, sessions *rhimport.SessionStoreChan) (int, string) {
+ s, refId, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id)
+ if code != http.StatusOK {
+ return code, errstring
+ }
+ self.id = reqdata.Id
+ self.refId = refId
+ self.session = s
+ if err := s.AddDoneHandler((chan<- rhimport.Result)(self.donechan), webSocketDone); err != nil {
+ return http.StatusInternalServerError, err.Error()
+ }
+ if err := s.AddProgressHandler((chan<- rhimport.ProgressData)(self.progresschan), webSocketProgress); err != nil {
+ return http.StatusInternalServerError, err.Error()
+ }
+ s.Run(time.Duration(reqdata.Timeout) * time.Second)
+ return http.StatusOK, "SUCCESS"
+}
+
+func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) {
+ defer ws.Close()
+
+ session := newWebSocketSession()
+ for {
+ select {
+ case reqdata, ok := <-reqchan:
+ if !ok {
+ return
+ }
+ switch reqdata.Command {
+ case "new":
+ if session.id != "" {
+ sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session")
+ } else {
+ code, errstring := session.startNewSession(&reqdata, conf, sessions)
+ if code != http.StatusOK {
+ sendWebSocketErrorResponse(ws, code, errstring)
+ } else {
+ sendWebSocketAckResponse(ws, code, session.id, session.refId)
+ }
+ }
+ case "cancel":
+ if session.id == "" {
+ sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection doesn't handle any session")
+ } else {
+ session.session.Cancel()
+ }
+ case "reconnect":
+ if session.id != "" {
+ sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session")
+ } else {
+ code, errstring := session.reconnectSession(&reqdata, sessions)
+ if code != http.StatusOK {
+ sendWebSocketErrorResponse(ws, code, errstring)
+ } else {
+ sendWebSocketAckResponse(ws, code, session.id, session.refId)
+ }
+ }
+ case "list":
+ list, code, errstring := sessions.List(reqdata.UserName, reqdata.Password, false)
+ if code != http.StatusOK {
+ sendWebSocketErrorResponse(ws, code, errstring)
+ } else {
+ sendWebSocketListResponse(ws, list)
+ }
+ default:
+ sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command))
+ }
+ case p := <-session.progresschan:
+ sendWebSocketProgressResponse(ws, session.id, session.refId, p.Step, p.StepName, p.Progress*100)
+ case d := <-session.donechan:
+ sendWebSocketDoneResponse(ws, d.ResponseCode, d.ErrorString, session.id, session.refId, d.Cart, d.Cut)
+ // TODO: send close message at this point?
+ }
+ }
+}
+
+func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan, trusted bool, w http.ResponseWriter, r *http.Request) {
+ rhdl.Printf("WebSocketHandler: request for '%s'", html.EscapeString(r.URL.Path))
+
+ ws, err := websocket.Upgrade(w, r, nil, 1024, 1024)
+ if _, ok := err.(websocket.HandshakeError); ok {
+ http.Error(w, "Not a websocket handshake", 400)
+ return
+ } else if err != nil {
+ rhdl.Println("WebSocket Client", ws.RemoteAddr(), "error:", err)
+ return
+ }
+ rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected")
+ reqchan := make(chan webSocketRequestData)
+ go webSocketSessionHandler(reqchan, ws, conf, sessions)
+ defer close(reqchan)
+
+ for {
+ reqdata := newWebSocketRequestData(conf)
+ if err := ws.ReadJSON(&reqdata); err != nil {
+ rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err)
+ return
+ } else {
+ // rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata)
+ reqchan <- *reqdata
+ }
+ }
+}