summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--src/rhctl/switch_control.go10
-rw-r--r--src/rhctl/telnet.go88
3 files changed, 61 insertions, 39 deletions
diff --git a/Makefile b/Makefile
index c394c02..d424e5d 100644
--- a/Makefile
+++ b/Makefile
@@ -26,7 +26,7 @@ EXECUTEABLE := rhctl
LIBS := "github.com/schleibinger/sio" \
"github.com/naoina/toml" \
- "github.com/olebedev/emitter" \
+ "github.com/btittelbach/pubsub" \
"github.com/spreadspace/telgo"
.PHONY: getlibs updatelibs vet format build clean distclean
diff --git a/src/rhctl/switch_control.go b/src/rhctl/switch_control.go
index 82faec8..df3dce9 100644
--- a/src/rhctl/switch_control.go
+++ b/src/rhctl/switch_control.go
@@ -22,7 +22,7 @@
package main
import (
- "github.com/olebedev/emitter"
+ "github.com/btittelbach/pubsub"
)
type CommandType uint8
@@ -53,7 +53,7 @@ type Command struct {
type SwitchControl struct {
sw *AudioSwitch
servers []*PlayoutServer
- Updates *emitter.Emitter
+ Updates *pubsub.PubSub
Commands chan *Command
}
@@ -88,10 +88,10 @@ func (ctrl *SwitchControl) Run() {
for _, srv := range ctrl.servers {
srv.SwitchUpdates <- update
}
- ctrl.Updates.Emit("switch:"+update.Type.String(), update)
+ ctrl.Updates.Pub(update, "switch:"+update.Type.String())
case status := <-serverUpdates:
rhdl.Printf("got server status update: %+v", status)
- ctrl.Updates.Emit("server:status", status)
+ ctrl.Updates.Pub(status, "server:status")
// TODO: recalculate overall status and send out commands to switch
case cmd := <-ctrl.Commands:
ctrl.handleCommand(cmd)
@@ -103,7 +103,7 @@ func SwitchControlInit(conf *Config, sw *AudioSwitch, servers []*PlayoutServer)
ctrl = &SwitchControl{}
ctrl.sw = sw
ctrl.servers = servers
- ctrl.Updates = emitter.New(32)
+ ctrl.Updates = pubsub.NewNonBlocking(32)
ctrl.Commands = make(chan *Command, 8)
return
}
diff --git a/src/rhctl/telnet.go b/src/rhctl/telnet.go
index a07a820..4189fc8 100644
--- a/src/rhctl/telnet.go
+++ b/src/rhctl/telnet.go
@@ -22,7 +22,6 @@
package main
import (
- "github.com/olebedev/emitter"
"github.com/spreadspace/telgo"
)
@@ -36,44 +35,69 @@ func telnetStatus(c *telgo.Client, args []string, ctrl *SwitchControl) bool {
return false
}
-func telnetListener(c *telgo.Client, ch <-chan emitter.Event) {
- rhdl.Println("started telnetListener goroutine")
+func telnetUpdateListener(c *telgo.Client, ctrl *SwitchControl) {
+ ch := c.UserData.(chan interface{})
for {
- event, ok := <-ch
+ data, ok := <-ch
if !ok {
return
}
- c.Sayln("got event: %+v", event) // we need a way to find out that the client has stopped working
+ switch data.(type) {
+ case SwitchUpdate:
+ update := data.(SwitchUpdate)
+ if !c.Sayln("audio-switch status(%v): %s", update.Type, update.Data) {
+ ctrl.Updates.Unsub(ch)
+ return
+ }
+ case ServerStatus:
+ status := data.(ServerStatus)
+ if !c.Sayln("playout-server(%s): health=%s, channel=%s", status.Name, status.Health, status.Channel) {
+ ctrl.Updates.Unsub(ch)
+ return
+ }
+ default:
+ if !c.Sayln("unknown update of type: %T", data) {
+ ctrl.Updates.Unsub(ch)
+ return
+ }
+ }
}
- rhdl.Println("stopped telnetListener goroutine")
}
func telnetListen(c *telgo.Client, args []string, ctrl *SwitchControl) bool {
- var ch <-chan emitter.Event
if len(args) <= 1 {
- ch = ctrl.Updates.On("*")
+ c.Sayln("missing argument: <type>")
+ return false
+ }
+
+ var ch chan interface{}
+ if c.UserData == nil {
+ ch = ctrl.Updates.Sub()
+ c.UserData = ch
} else {
- switch args[1] {
- case "status":
- ch = ctrl.Updates.On("status")
- case "server":
- ch = ctrl.Updates.On("server:status")
- case "audio":
- fallthrough
- case "gpi":
- fallthrough
- case "oc":
- fallthrough
- case "relay":
- fallthrough
- case "silence":
- ch = ctrl.Updates.On("switch:" + args[1])
- default:
- c.Sayln("unknown message type")
- return false
- }
+ ch = c.UserData.(chan interface{})
+ }
+
+ switch args[1] {
+ case "status":
+ ctrl.Updates.AddSub(ch, "status")
+ case "server":
+ ctrl.Updates.AddSub(ch, "server:status")
+ case "audio":
+ fallthrough
+ case "gpi":
+ fallthrough
+ case "oc":
+ fallthrough
+ case "relay":
+ fallthrough
+ case "silence":
+ ctrl.Updates.AddSub(ch, "switch:"+args[1])
+ default:
+ c.Sayln("unknown message type")
+ return false
}
- go telnetListener(c, ch)
+ go telnetUpdateListener(c, ctrl)
return false
}
@@ -106,7 +130,7 @@ func telnetHelp(c *telgo.Client, args []string) bool {
c.Sayln(" show the status of the switch and servers")
return false
case "listen":
- c.Sayln("usage: listen [ <type> ]")
+ c.Sayln("usage: listen <type>")
c.Sayln(" subscribe to messages of type <type>. The following types are allowed:")
c.Sayln(" - status overall status changes")
c.Sayln(" - server status/health of the playout server")
@@ -115,8 +139,6 @@ func telnetHelp(c *telgo.Client, args []string) bool {
c.Sayln(" - oc open-collector status messages")
c.Sayln(" - relay relay status messages")
c.Sayln(" - silence status of the silence detector")
- c.Sayln("")
- c.Sayln(" if you omit the type this client you will be subscribed to all types.")
return false
case "server":
c.Sayln("usage: server <name>")
@@ -135,7 +157,7 @@ func telnetHelp(c *telgo.Client, args []string) bool {
c.Sayln(" quit close connection (or use Ctrl-D)")
c.Sayln(" help [ <cmd> ] print this, or help for specific command")
c.Sayln(" status show status of switch and all servers")
- c.Sayln(" listen [ <type> ] add listener for messages of type <type>")
+ c.Sayln(" listen <type> add listener for messages of type <type>")
c.Sayln(" server <name> switch to server <name>")
c.Sayln(" switch <cmd> [ [ <arg1> ] ... ] send command to switch")
}
@@ -164,7 +186,7 @@ func TelnetInit(conf *Config, ctrl *SwitchControl) (telnet *TelnetInterface) {
cmdlist["help"] = telnetHelp
cmdlist["quit"] = telnetQuit
- telnet.server = telgo.NewServer(conf.Clients.Telnet.Address, "rhctl> ", cmdlist, ctrl)
+ telnet.server = telgo.NewServer(conf.Clients.Telnet.Address, "rhctl> ", cmdlist, nil)
return
}