// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015-2016 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package main import ( "code.helsinki.at/rhrd-go/rddb" "code.helsinki.at/rhrd-go/rhimport" "encoding/json" "fmt" "github.com/gorilla/websocket" "html" "io" "io/ioutil" "math" "net/http" "time" ) type webSocketRequestData struct { Command string `json:"COMMAND"` Id string `json:"ID"` RefId string `json:"REFERENCE_ID"` UserName string `json:"LOGIN_NAME"` Password string `json:"PASSWORD"` ShowId uint `json:"SHOW_ID"` ClearShowCarts bool `json:"CLEAR_SHOW_CARTS"` MusicPoolGroup string `json:"MUSIC_POOL_GROUP"` Cart uint `json:"CART_NUMBER"` ClearCart bool `json:"CLEAR_CART"` Cut uint `json:"CUT_NUMBER"` Channels uint `json:"CHANNELS"` NormalizationLevel int `json:"NORMALIZATION_LEVEL"` AutotrimLevel int `json:"AUTOTRIM_LEVEL"` UseMetaData bool `json:"USE_METADATA"` SourceUri string `json:"SOURCE_URI"` Timeout uint `json:"TIMEOUT"` } func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData { rd := new(webSocketRequestData) rd.Command = "" rd.Id = "" rd.UserName = "" rd.Password = "" rd.ShowId = 0 rd.ClearShowCarts = false rd.MusicPoolGroup = "" rd.Cart = 0 rd.ClearCart = false rd.Cut = 0 rd.Channels = conf.ImportParamDefaults.Channels rd.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel rd.UseMetaData = conf.ImportParamDefaults.UseMetaData rd.SourceUri = "" rd.Timeout = 0 return rd } type webSocketResponseBaseData struct { ResponseCode int `json:"RESPONSE_CODE"` Type string `json:"TYPE"` ErrorString string `json:"ERROR_STRING"` Id string `json:"ID"` RefId string `json:"REFERENCE_ID"` } type webSocketResponseListData struct { webSocketResponseBaseData Added map[string]string `json:"SESSIONS_ADDED"` Removed map[string]string `json:"SESSIONS_REMOVED"` } type webSocketResponseProgressData struct { webSocketResponseBaseData Step int `json:"PROGRESS_STEP"` StepName string `json:"PROGRESS_STEP_NAME"` Current float64 `json:"CURRENT"` Total float64 `json:"TOTAL"` Progress float64 `json:"PROGRESS"` Title string `json:"TITLE"` Cart uint `json:"CART_NUMBER"` Cut uint `json:"CUT_NUMBER"` } type webSocketResponseDoneData struct { webSocketResponseBaseData Cart uint `json:"CART_NUMBER"` Cut uint `json:"CUT_NUMBER"` } func sendWebSocketResponse(ws *websocket.Conn, rd interface{}) { if err := ws.WriteJSON(rd); err != nil { rhdl.Println("WebScoket Client", ws.RemoteAddr(), "write error:", err) } } func sendWebSocketErrorResponse(ws *websocket.Conn, code int, errStr string) { rd := &webSocketResponseBaseData{} rd.ResponseCode = code rd.Type = "error" rd.ErrorString = errStr sendWebSocketResponse(ws, rd) } func sendWebSocketAckResponse(ws *websocket.Conn, code int, id, refid string) { rd := &webSocketResponseBaseData{} rd.ResponseCode = code rd.Type = "ack" rd.ErrorString = "OK" rd.Id = id rd.RefId = refid sendWebSocketResponse(ws, rd) } func sendWebSocketListResponse(ws *websocket.Conn, added map[string]string, removed map[string]string) { rd := &webSocketResponseListData{} rd.ResponseCode = http.StatusOK rd.Type = "list" rd.ErrorString = "OK" if added == nil { rd.Added = make(map[string]string) } else { rd.Added = added } if removed == nil { rd.Removed = make(map[string]string) } else { rd.Removed = removed } sendWebSocketResponse(ws, rd) } func sendWebSocketProgressResponse(ws *websocket.Conn, id, refid string, step int, stepName string, current, total float64, title string, cart, cut uint) { progress := current / total if math.IsNaN(progress) || math.IsInf(progress, 0) { progress = 0.0 } rd := &webSocketResponseProgressData{} rd.ResponseCode = http.StatusOK rd.Type = "progress" rd.ErrorString = "OK" rd.Id = id rd.RefId = refid rd.Step = step rd.StepName = stepName rd.Current = current rd.Total = total rd.Progress = progress * 100 rd.Title = title rd.Cart = cart rd.Cut = cut sendWebSocketResponse(ws, rd) } func sendWebSocketDoneResponse(ws *websocket.Conn, code int, errStr, id, refid string, cart, cut uint) { rd := &webSocketResponseDoneData{} rd.ResponseCode = code rd.Type = "done" rd.ErrorString = errStr rd.Id = id rd.RefId = refid rd.Cart = cart rd.Cut = cut sendWebSocketResponse(ws, rd) } type webSocketSession struct { id string refId string session *rhimport.SessionChan progresschan chan rhimport.ProgressData donechan chan rhimport.Result } func newWebSocketSession() *webSocketSession { session := &webSocketSession{} session.progresschan = make(chan rhimport.ProgressData, 10) session.donechan = make(chan rhimport.Result, 1) return session } func webSocketProgress(step int, stepName string, current, total float64, title string, cart, cut uint, userdata interface{}) bool { if math.IsNaN(current) || math.IsInf(current, 0) { current = 0.0 } if math.IsNaN(total) || math.IsInf(total, 0) { total = 0.0 } c := userdata.(chan<- rhimport.ProgressData) select { case c <- rhimport.ProgressData{Step: step, StepName: stepName, Current: current, Total: total, Title: title, Cart: cart, Cut: cut}: default: } return true } func webSocketDone(res rhimport.Result, userdata interface{}) bool { c := userdata.(chan<- rhimport.Result) c <- res return true } func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, binchan <-chan []byte, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) { ctx := rhimport.NewContext(conf, nil) ctx.UserName = reqdata.UserName ctx.Password = reqdata.Password ctx.Trusted = false ctx.ShowId = reqdata.ShowId ctx.ClearShowCarts = reqdata.ClearShowCarts ctx.GroupName = reqdata.MusicPoolGroup ctx.Cart = reqdata.Cart ctx.ClearCart = reqdata.ClearCart ctx.Cut = reqdata.Cut ctx.Channels = reqdata.Channels ctx.NormalizationLevel = reqdata.NormalizationLevel ctx.AutotrimLevel = reqdata.AutotrimLevel ctx.UseMetaData = reqdata.UseMetaData ctx.SourceUri = reqdata.SourceUri ctx.AttachmentChan = binchan id, s, code, errstring := sessions.New(ctx, reqdata.RefId) if code != http.StatusOK { return code, errstring } self.id = id self.refId = reqdata.RefId self.session = s if err := s.AddDoneHandler((chan<- rhimport.Result)(self.donechan), webSocketDone); err != nil { return http.StatusInternalServerError, err.Error() } if err := s.AddProgressHandler((chan<- rhimport.ProgressData)(self.progresschan), webSocketProgress); err != nil { return http.StatusInternalServerError, err.Error() } s.Run(time.Duration(reqdata.Timeout) * time.Second) return http.StatusOK, "SUCCESS" } func (self *webSocketSession) reconnectSession(reqdata *webSocketRequestData, sessions *rhimport.SessionStoreChan) (int, string) { s, refId, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id) if code != http.StatusOK { return code, errstring } self.id = reqdata.Id self.refId = refId self.session = s if err := s.AddDoneHandler((chan<- rhimport.Result)(self.donechan), webSocketDone); err != nil { return http.StatusInternalServerError, err.Error() } if err := s.AddProgressHandler((chan<- rhimport.ProgressData)(self.progresschan), webSocketProgress); err != nil { return http.StatusInternalServerError, err.Error() } s.Run(time.Duration(reqdata.Timeout) * time.Second) return http.StatusOK, "SUCCESS" } type webSocketSessionListUpdate struct { added map[string]string removed map[string]string } type webSocketSessionListUpdateCBArg struct { closed <-chan bool updates chan<- webSocketSessionListUpdate } func webSocketListUpdate(added, removed map[string]string, userdata interface{}) bool { arg := userdata.(webSocketSessionListUpdateCBArg) if len(arg.closed) > 0 { return false } select { case arg.updates <- webSocketSessionListUpdate{added, removed}: default: } return true } func webSocketSessionHandler(reqchan <-chan webSocketRequestData, binchan <-chan []byte, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) { defer ws.Close() closed := make(chan bool, 1) defer func() { closed <- true }() listUpdates := make(chan webSocketSessionListUpdate, 5) listUpdateCBArg := webSocketSessionListUpdateCBArg{closed, listUpdates} session := newWebSocketSession() for { select { case reqdata, ok := <-reqchan: if !ok { return } switch reqdata.Command { case "new": if session.id != "" { sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session") } else { code, errstring := session.startNewSession(&reqdata, binchan, conf, sessions) if code != http.StatusOK { sendWebSocketErrorResponse(ws, code, errstring) } else { sendWebSocketAckResponse(ws, code, session.id, session.refId) } } case "cancel": if session.id == "" { sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection doesn't handle any session") } else { session.session.Cancel() } case "reconnect": if session.id != "" { sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session") } else { code, errstring := session.reconnectSession(&reqdata, sessions) if code != http.StatusOK { sendWebSocketErrorResponse(ws, code, errstring) } else { sendWebSocketAckResponse(ws, code, session.id, session.refId) } } case "list": sessions, code, errstring := sessions.List(reqdata.UserName, reqdata.Password, false, listUpdateCBArg, webSocketListUpdate) if code != http.StatusOK { sendWebSocketErrorResponse(ws, code, errstring) } else { sendWebSocketListResponse(ws, sessions, nil) } default: sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) } case u := <-listUpdates: sendWebSocketListResponse(ws, u.added, u.removed) case p := <-session.progresschan: sendWebSocketProgressResponse(ws, session.id, session.refId, p.Step, p.StepName, p.Current, p.Total, p.Title, p.Cart, p.Cut) case d := <-session.donechan: sendWebSocketDoneResponse(ws, d.ResponseCode, d.ErrorString, session.id, session.refId, d.Cart, d.Cut) rhdl.Println("WebSocket Client", ws.RemoteAddr(), "done sent: sending close message.") ws.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Minute)) } } } func webSocketHandler(conf *rhimport.Config, db *rddb.DBChan, sessions *rhimport.SessionStoreChan, trusted bool, w http.ResponseWriter, r *http.Request) { rhdl.Printf("WebSocketHandler: request for '%s'", html.EscapeString(r.URL.Path)) ws, err := websocket.Upgrade(w, r, nil, 64*1024, 64*1024) if _, ok := err.(websocket.HandshakeError); ok { http.Error(w, "Not a websocket handshake", 400) return } else if err != nil { rhdl.Println("WebSocket Client", ws.RemoteAddr(), "error:", err) return } rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected") reqchan := make(chan webSocketRequestData) binchan := make(chan []byte) go webSocketSessionHandler(reqchan, binchan, ws, conf, sessions) defer close(reqchan) defer close(binchan) for { t, r, err := ws.NextReader() if err != nil { rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err) return } switch t { case websocket.TextMessage: reqdata := newWebSocketRequestData(conf) if err := json.NewDecoder(r).Decode(&reqdata); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF } rhdl.Println("WebSocket Client", ws.RemoteAddr(), "request error:", err) sendWebSocketErrorResponse(ws, http.StatusBadRequest, err.Error()) return } // rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata) reqchan <- *reqdata case websocket.BinaryMessage: data, err := ioutil.ReadAll(r) if err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF } rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err) sendWebSocketErrorResponse(ws, http.StatusInternalServerError, err.Error()) return } // rhdl.Printf("WebSocket Client %s: got binary message (%d bytes)", ws.RemoteAddr(), len(data)) binchan <- data } } }