From 1d0d0aea92c936bca8de0653b4c92a9bfd73f05c Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Wed, 30 Mar 2016 05:36:11 +0200 Subject: switched to different pubsub package diff --git a/Makefile b/Makefile index c394c02..d424e5d 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ EXECUTEABLE := rhctl LIBS := "github.com/schleibinger/sio" \ "github.com/naoina/toml" \ - "github.com/olebedev/emitter" \ + "github.com/btittelbach/pubsub" \ "github.com/spreadspace/telgo" .PHONY: getlibs updatelibs vet format build clean distclean diff --git a/src/rhctl/switch_control.go b/src/rhctl/switch_control.go index 82faec8..df3dce9 100644 --- a/src/rhctl/switch_control.go +++ b/src/rhctl/switch_control.go @@ -22,7 +22,7 @@ package main import ( - "github.com/olebedev/emitter" + "github.com/btittelbach/pubsub" ) type CommandType uint8 @@ -53,7 +53,7 @@ type Command struct { type SwitchControl struct { sw *AudioSwitch servers []*PlayoutServer - Updates *emitter.Emitter + Updates *pubsub.PubSub Commands chan *Command } @@ -88,10 +88,10 @@ func (ctrl *SwitchControl) Run() { for _, srv := range ctrl.servers { srv.SwitchUpdates <- update } - ctrl.Updates.Emit("switch:"+update.Type.String(), update) + ctrl.Updates.Pub(update, "switch:"+update.Type.String()) case status := <-serverUpdates: rhdl.Printf("got server status update: %+v", status) - ctrl.Updates.Emit("server:status", status) + ctrl.Updates.Pub(status, "server:status") // TODO: recalculate overall status and send out commands to switch case cmd := <-ctrl.Commands: ctrl.handleCommand(cmd) @@ -103,7 +103,7 @@ func SwitchControlInit(conf *Config, sw *AudioSwitch, servers []*PlayoutServer) ctrl = &SwitchControl{} ctrl.sw = sw ctrl.servers = servers - ctrl.Updates = emitter.New(32) + ctrl.Updates = pubsub.NewNonBlocking(32) ctrl.Commands = make(chan *Command, 8) return } diff --git a/src/rhctl/telnet.go b/src/rhctl/telnet.go index a07a820..4189fc8 100644 --- a/src/rhctl/telnet.go +++ b/src/rhctl/telnet.go @@ -22,7 +22,6 @@ package main import ( - "github.com/olebedev/emitter" "github.com/spreadspace/telgo" ) @@ -36,44 +35,69 @@ func telnetStatus(c *telgo.Client, args []string, ctrl *SwitchControl) bool { return false } -func telnetListener(c *telgo.Client, ch <-chan emitter.Event) { - rhdl.Println("started telnetListener goroutine") +func telnetUpdateListener(c *telgo.Client, ctrl *SwitchControl) { + ch := c.UserData.(chan interface{}) for { - event, ok := <-ch + data, ok := <-ch if !ok { return } - c.Sayln("got event: %+v", event) // we need a way to find out that the client has stopped working + switch data.(type) { + case SwitchUpdate: + update := data.(SwitchUpdate) + if !c.Sayln("audio-switch status(%v): %s", update.Type, update.Data) { + ctrl.Updates.Unsub(ch) + return + } + case ServerStatus: + status := data.(ServerStatus) + if !c.Sayln("playout-server(%s): health=%s, channel=%s", status.Name, status.Health, status.Channel) { + ctrl.Updates.Unsub(ch) + return + } + default: + if !c.Sayln("unknown update of type: %T", data) { + ctrl.Updates.Unsub(ch) + return + } + } } - rhdl.Println("stopped telnetListener goroutine") } func telnetListen(c *telgo.Client, args []string, ctrl *SwitchControl) bool { - var ch <-chan emitter.Event if len(args) <= 1 { - ch = ctrl.Updates.On("*") + c.Sayln("missing argument: ") + return false + } + + var ch chan interface{} + if c.UserData == nil { + ch = ctrl.Updates.Sub() + c.UserData = ch } else { - switch args[1] { - case "status": - ch = ctrl.Updates.On("status") - case "server": - ch = ctrl.Updates.On("server:status") - case "audio": - fallthrough - case "gpi": - fallthrough - case "oc": - fallthrough - case "relay": - fallthrough - case "silence": - ch = ctrl.Updates.On("switch:" + args[1]) - default: - c.Sayln("unknown message type") - return false - } + ch = c.UserData.(chan interface{}) + } + + switch args[1] { + case "status": + ctrl.Updates.AddSub(ch, "status") + case "server": + ctrl.Updates.AddSub(ch, "server:status") + case "audio": + fallthrough + case "gpi": + fallthrough + case "oc": + fallthrough + case "relay": + fallthrough + case "silence": + ctrl.Updates.AddSub(ch, "switch:"+args[1]) + default: + c.Sayln("unknown message type") + return false } - go telnetListener(c, ch) + go telnetUpdateListener(c, ctrl) return false } @@ -106,7 +130,7 @@ func telnetHelp(c *telgo.Client, args []string) bool { c.Sayln(" show the status of the switch and servers") return false case "listen": - c.Sayln("usage: listen [ ]") + c.Sayln("usage: listen ") c.Sayln(" subscribe to messages of type . The following types are allowed:") c.Sayln(" - status overall status changes") c.Sayln(" - server status/health of the playout server") @@ -115,8 +139,6 @@ func telnetHelp(c *telgo.Client, args []string) bool { c.Sayln(" - oc open-collector status messages") c.Sayln(" - relay relay status messages") c.Sayln(" - silence status of the silence detector") - c.Sayln("") - c.Sayln(" if you omit the type this client you will be subscribed to all types.") return false case "server": c.Sayln("usage: server ") @@ -135,7 +157,7 @@ func telnetHelp(c *telgo.Client, args []string) bool { c.Sayln(" quit close connection (or use Ctrl-D)") c.Sayln(" help [ ] print this, or help for specific command") c.Sayln(" status show status of switch and all servers") - c.Sayln(" listen [ ] add listener for messages of type ") + c.Sayln(" listen add listener for messages of type ") c.Sayln(" server switch to server ") c.Sayln(" switch [ [ ] ... ] send command to switch") } @@ -164,7 +186,7 @@ func TelnetInit(conf *Config, ctrl *SwitchControl) (telnet *TelnetInterface) { cmdlist["help"] = telnetHelp cmdlist["quit"] = telnetQuit - telnet.server = telgo.NewServer(conf.Clients.Telnet.Address, "rhctl> ", cmdlist, ctrl) + telnet.server = telgo.NewServer(conf.Clients.Telnet.Address, "rhctl> ", cmdlist, nil) return } -- cgit v0.10.2