summaryrefslogtreecommitdiff
path: root/src/rhimportd/ctrlWebSocket.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/rhimportd/ctrlWebSocket.go')
-rw-r--r--src/rhimportd/ctrlWebSocket.go17
1 files changed, 11 insertions, 6 deletions
diff --git a/src/rhimportd/ctrlWebSocket.go b/src/rhimportd/ctrlWebSocket.go
index d71f567..6095250 100644
--- a/src/rhimportd/ctrlWebSocket.go
+++ b/src/rhimportd/ctrlWebSocket.go
@@ -196,7 +196,7 @@ func webSocketDone(res rhimport.Result, userdata interface{}) bool {
return true
}
-func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) {
+func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, binchan <-chan []byte, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) {
ctx := rhimport.NewContext(conf, nil)
ctx.UserName = reqdata.UserName
ctx.Password = reqdata.Password
@@ -212,6 +212,7 @@ func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, con
ctx.AutotrimLevel = reqdata.AutotrimLevel
ctx.UseMetaData = reqdata.UseMetaData
ctx.SourceUri = reqdata.SourceUri
+ ctx.AttachmentChan = binchan
id, s, code, errstring := sessions.New(ctx, reqdata.RefId)
if code != http.StatusOK {
@@ -248,7 +249,7 @@ func (self *webSocketSession) reconnectSession(reqdata *webSocketRequestData, se
return http.StatusOK, "SUCCESS"
}
-func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) {
+func webSocketSessionHandler(reqchan <-chan webSocketRequestData, binchan <-chan []byte, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) {
defer ws.Close()
session := newWebSocketSession()
@@ -263,7 +264,7 @@ func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.
if session.id != "" {
sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session")
} else {
- code, errstring := session.startNewSession(&reqdata, conf, sessions)
+ code, errstring := session.startNewSession(&reqdata, binchan, conf, sessions)
if code != http.StatusOK {
sendWebSocketErrorResponse(ws, code, errstring)
} else {
@@ -319,8 +320,10 @@ func webSocketHandler(conf *rhimport.Config, db *rddb.DBChan, sessions *rhimport
}
rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected")
reqchan := make(chan webSocketRequestData)
- go webSocketSessionHandler(reqchan, ws, conf, sessions)
+ binchan := make(chan []byte)
+ go webSocketSessionHandler(reqchan, binchan, ws, conf, sessions)
defer close(reqchan)
+ defer close(binchan)
for {
t, r, err := ws.NextReader()
@@ -345,12 +348,14 @@ func webSocketHandler(conf *rhimport.Config, db *rddb.DBChan, sessions *rhimport
reqchan <- *reqdata
}
case websocket.BinaryMessage:
- len, err := io.Copy(ioutil.Discard, r)
+ data, err := ioutil.ReadAll(r)
if err != nil {
rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err)
+ // sendWebSocketErrorResponse(ws, http.StatusBadRequest, err.Error())
return
}
- rhdl.Printf("WebSocket Client %s: got binary message (%d bytes)", ws.RemoteAddr(), len)
+ // rhdl.Printf("WebSocket Client %s: got binary message (%d bytes)", ws.RemoteAddr(), len(data))
+ binchan <- data
}
}
}