summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2015-12-29 19:13:44 (GMT)
committerChristian Pointner <equinox@helsinki.at>2015-12-29 19:13:44 (GMT)
commit6467923f8caaa02829fc53666b32df594da3ea68 (patch)
treebb3cdcc4f93dfee929b2c991e8bcbc1bfc1ed6e9 /src
parent4911fe3fd94019fcca730710e0b411e8e5a66847 (diff)
websocket handler new/run works now (needs testing!)
Diffstat (limited to 'src')
-rw-r--r--src/helsinki.at/rhimportd/ctrlWeb.go1
-rw-r--r--src/helsinki.at/rhimportd/ctrlWebSocket.go133
2 files changed, 104 insertions, 30 deletions
diff --git a/src/helsinki.at/rhimportd/ctrlWeb.go b/src/helsinki.at/rhimportd/ctrlWeb.go
index 2a95676..794f8aa 100644
--- a/src/helsinki.at/rhimportd/ctrlWeb.go
+++ b/src/helsinki.at/rhimportd/ctrlWeb.go
@@ -48,7 +48,6 @@ func StartControlWeb(addr_s string, conf *rhimport.Config, rddb *rhimport.RdDbCh
http.Handle("/trusted/simple", webHandler{conf, rddb, sessions, true, webSimpleHandler})
http.Handle("/public/socket", webHandler{conf, rddb, sessions, false, webSocketHandler})
- http.Handle("/trusted/socket", webHandler{conf, rddb, sessions, true, webSocketHandler})
rhl.Println("web-ctrl: listening on", addr_s)
server := &http.Server{Addr: addr_s, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second}
diff --git a/src/helsinki.at/rhimportd/ctrlWebSocket.go b/src/helsinki.at/rhimportd/ctrlWebSocket.go
index b3f10f6..d14dd90 100644
--- a/src/helsinki.at/rhimportd/ctrlWebSocket.go
+++ b/src/helsinki.at/rhimportd/ctrlWebSocket.go
@@ -25,12 +25,13 @@
package main
import (
- // "encoding/json"
"fmt"
"github.com/gorilla/websocket"
"helsinki.at/rhimport"
"html"
+ "math"
"net/http"
+ "time"
)
type webSocketRequestData struct {
@@ -49,6 +50,7 @@ type webSocketRequestData struct {
AutotrimLevel int `json:"AUTOTRIM_LEVEL"`
UseMetaData bool `json:"USE_METADATA"`
SourceUri string `json:"SOURCE_URI"`
+ Timeout uint `json:"TIMEOUT"`
}
func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData {
@@ -68,44 +70,119 @@ func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData {
rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel
rd.UseMetaData = conf.ImportParamDefaults.UseMetaData
rd.SourceUri = ""
-
+ rd.Timeout = 0
return rd
}
-type webSocketErrorResponseData struct {
- ResponseCode int `json:"RESPONSE_CODE"`
- ErrorString string `json:"ERROR_STRING"`
+type webSocketResponseData struct {
+ ResponseCode int `json:"RESPONSE_CODE"`
+ Type string `json:"TYPE"`
+ ErrorString string `json:"ERROR_STRING"`
+ Id string `json:"ID"`
+ ProgressStep int `json:"PROGRESS_STEP"`
+ ProgressStepName string `json:"PROGRESS_STEP_NAME"`
+ Progress float64 `json:"PROGRESS"`
}
-func sendWebSocketErrorResponse(ws *websocket.Conn, code int, err_str string) {
- if err := ws.WriteJSON(webSocketErrorResponseData{code, err_str}); err != nil {
+func sendWebSocketResponse(ws *websocket.Conn, rd *webSocketResponseData) {
+ if err := ws.WriteJSON(*rd); err != nil {
rhdl.Println("WebScoket Client", ws.RemoteAddr(), "write error:", err)
}
}
-func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn) {
- defer ws.Close()
+func sendWebSocketErrorResponse(ws *websocket.Conn, code int, err_str string) {
+ sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ERROR", ErrorString: err_str})
+}
+
+type webSocketSession struct {
+ id string
+ session *rhimport.SessionChan
+ respchan chan webSocketResponseData
+}
+func newWebSocketSession() *webSocketSession {
+ session := &webSocketSession{}
+ session.respchan = make(chan webSocketResponseData, 10)
+ return session
+}
+
+func webSocketProgress(step int, step_name string, progress float64, userdata interface{}) bool {
+ out := userdata.(chan<- webSocketResponseData)
+ if math.IsNaN(progress) {
+ progress = 0.0
+ }
select {
- case reqdata := <-reqchan:
- switch reqdata.Command {
- case "new":
- sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "new session - not yet implemented")
- return
- case "reconnect":
- if reqdata.UserName == "" {
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, "missing mandotary field LOGIN_NAME")
+ case out <- webSocketResponseData{http.StatusOK, "PROGRESS", "", "", step, step_name, progress * 100}:
+ default:
+ }
+ return true
+}
+
+func webSocketDone(res rhimport.ImportResult, userdata interface{}) bool {
+ respchan := userdata.(chan<- webSocketResponseData)
+ respchan <- webSocketResponseData{res.ResponseCode, "DONE", res.ErrorString, "", 0, "", 100.0}
+ return true
+}
+
+func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) (int, string) {
+ ctx := rhimport.NewImportContext(conf, rddb, reqdata.UserName)
+ ctx.Password = reqdata.Password
+ ctx.Trusted = true // set this to false as soon as the interface is working
+ ctx.ShowId = reqdata.ShowId
+ ctx.ClearShowCarts = reqdata.ClearShowCarts
+ ctx.GroupName = reqdata.MusicPoolGroup
+ ctx.Cart = reqdata.Cart
+ ctx.ClearCart = reqdata.ClearCart
+ ctx.Cut = reqdata.Cut
+ ctx.Channels = reqdata.Channels
+ ctx.NormalizationLevel = reqdata.NormalizationLevel
+ ctx.AutotrimLevel = reqdata.AutotrimLevel
+ ctx.UseMetaData = reqdata.UseMetaData
+ ctx.SourceUri = reqdata.SourceUri
+
+ id, s, code, errstring := sessions.New(ctx)
+ if code != http.StatusOK {
+ return code, errstring
+ }
+ self.id = id
+ self.session = s
+ if err := s.AddDoneHandler((chan<- webSocketResponseData)(self.respchan), webSocketDone); err != nil {
+ return http.StatusInternalServerError, err.Error()
+ }
+ if err := s.AddProgressHandler((chan<- webSocketResponseData)(self.respchan), webSocketProgress); err != nil {
+ return http.StatusInternalServerError, err.Error()
+ }
+ s.Run(time.Duration(reqdata.Timeout) * time.Second)
+ return http.StatusOK, "SUCCESS"
+}
+
+func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) {
+ defer ws.Close()
+
+ session := newWebSocketSession()
+ for {
+ select {
+ case reqdata, ok := <-reqchan:
+ if !ok {
return
}
- if reqdata.Id == "" {
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, "missing mandotary field ID")
+ switch reqdata.Command {
+ case "new":
+ code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions)
+ if code != http.StatusOK {
+ sendWebSocketErrorResponse(ws, code, errstring)
+ } else {
+ sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id})
+ }
+ case "reconnect":
+ sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "reconnect session - not yet implemented")
+ return
+ default:
+ sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command))
return
}
- sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "reconnect session - not yet implemented")
- return
- default:
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command))
- return
+ case respdata := <-session.respchan:
+ sendWebSocketResponse(ws, &respdata)
}
}
}
@@ -123,7 +200,8 @@ func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *
}
rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected")
reqchan := make(chan webSocketRequestData)
- go webSocketSessionHandler(reqchan, ws)
+ go webSocketSessionHandler(reqchan, ws, conf, rddb, sessions)
+ defer close(reqchan)
for {
reqdata := newWebSocketRequestData(conf)
@@ -131,10 +209,7 @@ func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *
rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err)
return
} else {
- rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata)
- if trusted {
- reqdata.UserName = r.Header.Get("X-Forwarded-User")
- }
+ // rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata)
reqchan <- *reqdata
}
}