// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package main import ( "fmt" "github.com/gorilla/websocket" "helsinki.at/rhimport" "html" "math" "net/http" "time" ) type webSocketRequestData struct { Command string `json:"COMMAND"` Id string `json:"ID"` UserName string `json:"LOGIN_NAME"` Password string `json:"PASSWORD"` ShowId uint `json:"SHOW_ID"` ClearShowCarts bool `json:"CLEAR_SHOW_CARTS"` MusicPoolGroup string `json:"MUSIC_POOL_GROUP"` Cart uint `json:"CART_NUMBER"` ClearCart bool `json:"CLEAR_CART"` Cut uint `json:"CUT_NUMBER"` Channels uint `json:"CHANNELS"` NormalizationLevel int `json:"NORMALIZATION_LEVEL"` AutotrimLevel int `json:"AUTOTRIM_LEVEL"` UseMetaData bool `json:"USE_METADATA"` SourceUri string `json:"SOURCE_URI"` Timeout uint `json:"TIMEOUT"` } func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData { rd := new(webSocketRequestData) rd.Command = "" rd.Id = "" rd.UserName = "" rd.Password = "" rd.ShowId = 0 rd.ClearShowCarts = false rd.MusicPoolGroup = "" rd.Cart = 0 rd.ClearCart = false rd.Cut = 0 rd.Channels = conf.ImportParamDefaults.Channels rd.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel rd.UseMetaData = conf.ImportParamDefaults.UseMetaData rd.SourceUri = "" rd.Timeout = 0 return rd } type webSocketResponseData struct { ResponseCode int `json:"RESPONSE_CODE"` Type string `json:"TYPE"` ErrorString string `json:"ERROR_STRING"` Id string `json:"ID"` ProgressStep int `json:"PROGRESS_STEP"` ProgressStepName string `json:"PROGRESS_STEP_NAME"` Progress float64 `json:"PROGRESS"` Cart uint `json:"CART_NUMBER"` Cut uint `json:"CUT_NUMBER"` } func sendWebSocketResponse(ws *websocket.Conn, rd *webSocketResponseData) { if err := ws.WriteJSON(*rd); err != nil { rhdl.Println("WebScoket Client", ws.RemoteAddr(), "write error:", err) } } func sendWebSocketErrorResponse(ws *websocket.Conn, id string, code int, err_str string) { sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ERROR", ErrorString: err_str, Id: id}) } type webSocketSession struct { id string session *rhimport.SessionChan respchan chan webSocketResponseData donechan chan rhimport.ImportResult } func newWebSocketSession() *webSocketSession { session := &webSocketSession{} session.respchan = make(chan webSocketResponseData, 10) session.donechan = make(chan rhimport.ImportResult, 1) return session } func webSocketProgress(step int, step_name string, progress float64, userdata interface{}) bool { if math.IsNaN(progress) { progress = 0.0 } session := userdata.(*webSocketSession) select { case session.respchan <- webSocketResponseData{http.StatusOK, "PROGRESS", "", session.id, step, step_name, progress * 100, 0, 0}: default: } return true } func webSocketDone(res rhimport.ImportResult, userdata interface{}) bool { session := userdata.(*webSocketSession) session.donechan <- res return true } func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) (int, string) { ctx := rhimport.NewImportContext(conf, rddb, reqdata.UserName) ctx.Password = reqdata.Password ctx.Trusted = true // set this to false as soon as the interface is working ctx.ShowId = reqdata.ShowId ctx.ClearShowCarts = reqdata.ClearShowCarts ctx.GroupName = reqdata.MusicPoolGroup ctx.Cart = reqdata.Cart ctx.ClearCart = reqdata.ClearCart ctx.Cut = reqdata.Cut ctx.Channels = reqdata.Channels ctx.NormalizationLevel = reqdata.NormalizationLevel ctx.AutotrimLevel = reqdata.AutotrimLevel ctx.UseMetaData = reqdata.UseMetaData ctx.SourceUri = reqdata.SourceUri id, s, code, errstring := sessions.New(ctx) if code != http.StatusOK { return code, errstring } self.id = id self.session = s if err := s.AddDoneHandler(self, webSocketDone); err != nil { return http.StatusInternalServerError, err.Error() } if err := s.AddProgressHandler(self, webSocketProgress); err != nil { return http.StatusInternalServerError, err.Error() } s.Run(time.Duration(reqdata.Timeout) * time.Second) return http.StatusOK, "SUCCESS" } func (self *webSocketSession) reconnectSession(reqdata *webSocketRequestData, sessions *rhimport.SessionStoreChan) (int, string) { s, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id) if code != http.StatusOK { return code, errstring } self.id = reqdata.Id self.session = s if err := s.AddDoneHandler(self, webSocketDone); err != nil { return http.StatusInternalServerError, err.Error() } if err := s.AddProgressHandler(self, webSocketProgress); err != nil { return http.StatusInternalServerError, err.Error() } s.Run(time.Duration(reqdata.Timeout) * time.Second) return http.StatusOK, "SUCCESS" } func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) { defer ws.Close() session := newWebSocketSession() for { select { case reqdata, ok := <-reqchan: if !ok { return } switch reqdata.Command { case "new": if session.id != "" { sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session") return } code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions) if code != http.StatusOK { sendWebSocketErrorResponse(ws, "", code, errstring) return } else { sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id}) } case "cancel": if session.id == "" { sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection doesn't handle any session") return } session.session.Cancel() case "reconnect": if session.id != "" { sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, "This connection already handles a session") return } code, errstring := session.reconnectSession(&reqdata, sessions) if code != http.StatusOK { sendWebSocketErrorResponse(ws, "", code, errstring) return } else { sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id}) } default: sendWebSocketErrorResponse(ws, "", http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) return } case respdata := <-session.respchan: sendWebSocketResponse(ws, &respdata) case donedata := <-session.donechan: sendWebSocketResponse(ws, &webSocketResponseData{donedata.ResponseCode, "DONE", donedata.ErrorString, session.id, 0, "", 100.0, donedata.Cart, donedata.Cut}) // TODO: send close message at this point? } } } func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan, trusted bool, w http.ResponseWriter, r *http.Request) { rhdl.Printf("WebSocketHandler: request for '%s'", html.EscapeString(r.URL.Path)) ws, err := websocket.Upgrade(w, r, nil, 1024, 1024) if _, ok := err.(websocket.HandshakeError); ok { http.Error(w, "Not a websocket handshake", 400) return } else if err != nil { rhdl.Println("WebSocket Client", ws.RemoteAddr(), "error:", err) return } rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected") reqchan := make(chan webSocketRequestData) go webSocketSessionHandler(reqchan, ws, conf, rddb, sessions) defer close(reqchan) for { reqdata := newWebSocketRequestData(conf) if err := ws.ReadJSON(&reqdata); err != nil { rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err) return } else { // rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata) reqchan <- *reqdata } } return }