summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2015-12-29 19:13:44 (GMT)
committerChristian Pointner <equinox@helsinki.at>2015-12-29 19:13:44 (GMT)
commit6467923f8caaa02829fc53666b32df594da3ea68 (patch)
treebb3cdcc4f93dfee929b2c991e8bcbc1bfc1ed6e9
parent4911fe3fd94019fcca730710e0b411e8e5a66847 (diff)
websocket handler new/run works now (needs testing!)
-rw-r--r--src/helsinki.at/rhimportd/ctrlWeb.go1
-rw-r--r--src/helsinki.at/rhimportd/ctrlWebSocket.go133
-rw-r--r--test/socket.html25
3 files changed, 122 insertions, 37 deletions
diff --git a/src/helsinki.at/rhimportd/ctrlWeb.go b/src/helsinki.at/rhimportd/ctrlWeb.go
index 2a95676..794f8aa 100644
--- a/src/helsinki.at/rhimportd/ctrlWeb.go
+++ b/src/helsinki.at/rhimportd/ctrlWeb.go
@@ -48,7 +48,6 @@ func StartControlWeb(addr_s string, conf *rhimport.Config, rddb *rhimport.RdDbCh
http.Handle("/trusted/simple", webHandler{conf, rddb, sessions, true, webSimpleHandler})
http.Handle("/public/socket", webHandler{conf, rddb, sessions, false, webSocketHandler})
- http.Handle("/trusted/socket", webHandler{conf, rddb, sessions, true, webSocketHandler})
rhl.Println("web-ctrl: listening on", addr_s)
server := &http.Server{Addr: addr_s, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second}
diff --git a/src/helsinki.at/rhimportd/ctrlWebSocket.go b/src/helsinki.at/rhimportd/ctrlWebSocket.go
index b3f10f6..d14dd90 100644
--- a/src/helsinki.at/rhimportd/ctrlWebSocket.go
+++ b/src/helsinki.at/rhimportd/ctrlWebSocket.go
@@ -25,12 +25,13 @@
package main
import (
- // "encoding/json"
"fmt"
"github.com/gorilla/websocket"
"helsinki.at/rhimport"
"html"
+ "math"
"net/http"
+ "time"
)
type webSocketRequestData struct {
@@ -49,6 +50,7 @@ type webSocketRequestData struct {
AutotrimLevel int `json:"AUTOTRIM_LEVEL"`
UseMetaData bool `json:"USE_METADATA"`
SourceUri string `json:"SOURCE_URI"`
+ Timeout uint `json:"TIMEOUT"`
}
func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData {
@@ -68,44 +70,119 @@ func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData {
rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel
rd.UseMetaData = conf.ImportParamDefaults.UseMetaData
rd.SourceUri = ""
-
+ rd.Timeout = 0
return rd
}
-type webSocketErrorResponseData struct {
- ResponseCode int `json:"RESPONSE_CODE"`
- ErrorString string `json:"ERROR_STRING"`
+type webSocketResponseData struct {
+ ResponseCode int `json:"RESPONSE_CODE"`
+ Type string `json:"TYPE"`
+ ErrorString string `json:"ERROR_STRING"`
+ Id string `json:"ID"`
+ ProgressStep int `json:"PROGRESS_STEP"`
+ ProgressStepName string `json:"PROGRESS_STEP_NAME"`
+ Progress float64 `json:"PROGRESS"`
}
-func sendWebSocketErrorResponse(ws *websocket.Conn, code int, err_str string) {
- if err := ws.WriteJSON(webSocketErrorResponseData{code, err_str}); err != nil {
+func sendWebSocketResponse(ws *websocket.Conn, rd *webSocketResponseData) {
+ if err := ws.WriteJSON(*rd); err != nil {
rhdl.Println("WebScoket Client", ws.RemoteAddr(), "write error:", err)
}
}
-func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn) {
- defer ws.Close()
+func sendWebSocketErrorResponse(ws *websocket.Conn, code int, err_str string) {
+ sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ERROR", ErrorString: err_str})
+}
+
+type webSocketSession struct {
+ id string
+ session *rhimport.SessionChan
+ respchan chan webSocketResponseData
+}
+func newWebSocketSession() *webSocketSession {
+ session := &webSocketSession{}
+ session.respchan = make(chan webSocketResponseData, 10)
+ return session
+}
+
+func webSocketProgress(step int, step_name string, progress float64, userdata interface{}) bool {
+ out := userdata.(chan<- webSocketResponseData)
+ if math.IsNaN(progress) {
+ progress = 0.0
+ }
select {
- case reqdata := <-reqchan:
- switch reqdata.Command {
- case "new":
- sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "new session - not yet implemented")
- return
- case "reconnect":
- if reqdata.UserName == "" {
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, "missing mandotary field LOGIN_NAME")
+ case out <- webSocketResponseData{http.StatusOK, "PROGRESS", "", "", step, step_name, progress * 100}:
+ default:
+ }
+ return true
+}
+
+func webSocketDone(res rhimport.ImportResult, userdata interface{}) bool {
+ respchan := userdata.(chan<- webSocketResponseData)
+ respchan <- webSocketResponseData{res.ResponseCode, "DONE", res.ErrorString, "", 0, "", 100.0}
+ return true
+}
+
+func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) (int, string) {
+ ctx := rhimport.NewImportContext(conf, rddb, reqdata.UserName)
+ ctx.Password = reqdata.Password
+ ctx.Trusted = true // set this to false as soon as the interface is working
+ ctx.ShowId = reqdata.ShowId
+ ctx.ClearShowCarts = reqdata.ClearShowCarts
+ ctx.GroupName = reqdata.MusicPoolGroup
+ ctx.Cart = reqdata.Cart
+ ctx.ClearCart = reqdata.ClearCart
+ ctx.Cut = reqdata.Cut
+ ctx.Channels = reqdata.Channels
+ ctx.NormalizationLevel = reqdata.NormalizationLevel
+ ctx.AutotrimLevel = reqdata.AutotrimLevel
+ ctx.UseMetaData = reqdata.UseMetaData
+ ctx.SourceUri = reqdata.SourceUri
+
+ id, s, code, errstring := sessions.New(ctx)
+ if code != http.StatusOK {
+ return code, errstring
+ }
+ self.id = id
+ self.session = s
+ if err := s.AddDoneHandler((chan<- webSocketResponseData)(self.respchan), webSocketDone); err != nil {
+ return http.StatusInternalServerError, err.Error()
+ }
+ if err := s.AddProgressHandler((chan<- webSocketResponseData)(self.respchan), webSocketProgress); err != nil {
+ return http.StatusInternalServerError, err.Error()
+ }
+ s.Run(time.Duration(reqdata.Timeout) * time.Second)
+ return http.StatusOK, "SUCCESS"
+}
+
+func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) {
+ defer ws.Close()
+
+ session := newWebSocketSession()
+ for {
+ select {
+ case reqdata, ok := <-reqchan:
+ if !ok {
return
}
- if reqdata.Id == "" {
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, "missing mandotary field ID")
+ switch reqdata.Command {
+ case "new":
+ code, errstring := session.startNewSession(&reqdata, conf, rddb, sessions)
+ if code != http.StatusOK {
+ sendWebSocketErrorResponse(ws, code, errstring)
+ } else {
+ sendWebSocketResponse(ws, &webSocketResponseData{ResponseCode: code, Type: "ACK", Id: session.id})
+ }
+ case "reconnect":
+ sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "reconnect session - not yet implemented")
+ return
+ default:
+ sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command))
return
}
- sendWebSocketErrorResponse(ws, http.StatusNotImplemented, "reconnect session - not yet implemented")
- return
- default:
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command))
- return
+ case respdata := <-session.respchan:
+ sendWebSocketResponse(ws, &respdata)
}
}
}
@@ -123,7 +200,8 @@ func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *
}
rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected")
reqchan := make(chan webSocketRequestData)
- go webSocketSessionHandler(reqchan, ws)
+ go webSocketSessionHandler(reqchan, ws, conf, rddb, sessions)
+ defer close(reqchan)
for {
reqdata := newWebSocketRequestData(conf)
@@ -131,10 +209,7 @@ func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *
rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err)
return
} else {
- rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata)
- if trusted {
- reqdata.UserName = r.Header.Get("X-Forwarded-User")
- }
+ // rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata)
reqchan <- *reqdata
}
}
diff --git a/test/socket.html b/test/socket.html
index 9ee91b6..06fce06 100644
--- a/test/socket.html
+++ b/test/socket.html
@@ -1,6 +1,8 @@
+<!DOCTYPE HTML>
<html>
<head>
<title>rhimportd - Testclient</title>
+ <meta charset="utf-8">
<style type="text/css">
body {
background-color: #555;
@@ -15,20 +17,29 @@
</style>
<script src="jquery.min.js"></script>
<script type="text/javascript">
- function Test() {
- this.sock = new WebSocket("ws://localhost:4080/trusted/socket");
+ function Session(req) {
+ this.req = req
+ this.sock = new WebSocket("ws://localhost:4080/public/socket");
this.sock.onmessage = function (event) {
$('#rawmsg').text(event.data);
}
- this.update = function() {
- this.sock.send(JSON.stringify({ COMMAND: "new" }));
+ this.sock_onopen = function() {
+ this.req.COMMAND = "new";
+ this.req.TIMEOUT = 200;
+ this.sock.send(JSON.stringify(this.req));
}
+ this.sock.onopen = this.sock_onopen.bind(this);
}
+ var s;
+
function init() {
- test = new Test();
- setInterval(test.update.bind(test), 1000);
- // test.update();
+ req = { LOGIN_NAME: "heslinki",
+ PASSWORD: "12423",
+ SHOW_ID: 10002,
+ CLEAR_SHOW_CARTS: true,
+ SOURCE_URI: "fake://100" };
+ s = new Session(req);
}
</script>
</head>