diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rhimportd/ctrlWebSocket.go | 51 |
1 files changed, 46 insertions, 5 deletions
diff --git a/src/rhimportd/ctrlWebSocket.go b/src/rhimportd/ctrlWebSocket.go index b55e634..f3e4438 100644 --- a/src/rhimportd/ctrlWebSocket.go +++ b/src/rhimportd/ctrlWebSocket.go @@ -89,7 +89,8 @@ type webSocketResponseBaseData struct { type webSocketResponseListData struct { webSocketResponseBaseData - Sessions map[string]string `json:"SESSIONS"` + Added map[string]string `json:"SESSIONS_ADDED"` + Removed map[string]string `json:"SESSIONS_REMOVED"` } type webSocketResponseProgressData struct { @@ -134,12 +135,21 @@ func sendWebSocketAckResponse(ws *websocket.Conn, code int, id, refid string) { sendWebSocketResponse(ws, rd) } -func sendWebSocketListResponse(ws *websocket.Conn, sessions map[string]string) { +func sendWebSocketListResponse(ws *websocket.Conn, added map[string]string, removed map[string]string) { rd := &webSocketResponseListData{} rd.ResponseCode = http.StatusOK rd.Type = "list" rd.ErrorString = "OK" - rd.Sessions = sessions + if added == nil { + rd.Added = make(map[string]string) + } else { + rd.Added = added + } + if removed == nil { + rd.Removed = make(map[string]string) + } else { + rd.Removed = removed + } sendWebSocketResponse(ws, rd) } @@ -267,9 +277,38 @@ func (self *webSocketSession) reconnectSession(reqdata *webSocketRequestData, se return http.StatusOK, "SUCCESS" } +type webSocketSessionListUpdate struct { + added map[string]string + removed map[string]string +} + +type webSocketSessionListUpdateCBArg struct { + closed <-chan bool + updates chan<- webSocketSessionListUpdate +} + +func webSocketListUpdate(added, removed map[string]string, userdata interface{}) bool { + arg := userdata.(webSocketSessionListUpdateCBArg) + if len(arg.closed) > 0 { + return false + } + select { + case arg.updates <- webSocketSessionListUpdate{added, removed}: + default: + } + return true +} + func webSocketSessionHandler(reqchan <-chan webSocketRequestData, binchan <-chan []byte, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) { defer ws.Close() + closed := make(chan bool, 1) + defer func() { + closed <- true + }() + listUpdates := make(chan webSocketSessionListUpdate, 5) + listUpdateCBArg := webSocketSessionListUpdateCBArg{closed, listUpdates} + session := newWebSocketSession() for { select { @@ -307,15 +346,17 @@ func webSocketSessionHandler(reqchan <-chan webSocketRequestData, binchan <-chan } } case "list": - list, code, errstring := sessions.List(reqdata.UserName, reqdata.Password, false) + sessions, code, errstring := sessions.List(reqdata.UserName, reqdata.Password, false, listUpdateCBArg, webSocketListUpdate) if code != http.StatusOK { sendWebSocketErrorResponse(ws, code, errstring) } else { - sendWebSocketListResponse(ws, list) + sendWebSocketListResponse(ws, sessions, nil) } default: sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) } + case u := <-listUpdates: + sendWebSocketListResponse(ws, u.added, u.removed) case p := <-session.progresschan: sendWebSocketProgressResponse(ws, session.id, session.refId, p.Step, p.StepName, p.Current, p.Total, p.Title, p.Cart, p.Cut) case d := <-session.donechan: |