From 1d0d0aea92c936bca8de0653b4c92a9bfd73f05c Mon Sep 17 00:00:00 2001
From: Christian Pointner <equinox@spreadspace.org>
Date: Wed, 30 Mar 2016 05:36:11 +0200
Subject: switched to different pubsub package


diff --git a/Makefile b/Makefile
index c394c02..d424e5d 100644
--- a/Makefile
+++ b/Makefile
@@ -26,7 +26,7 @@ EXECUTEABLE := rhctl
 
 LIBS := "github.com/schleibinger/sio" \
         "github.com/naoina/toml" \
-        "github.com/olebedev/emitter" \
+        "github.com/btittelbach/pubsub" \
         "github.com/spreadspace/telgo"
 
 .PHONY: getlibs updatelibs vet format build clean distclean
diff --git a/src/rhctl/switch_control.go b/src/rhctl/switch_control.go
index 82faec8..df3dce9 100644
--- a/src/rhctl/switch_control.go
+++ b/src/rhctl/switch_control.go
@@ -22,7 +22,7 @@
 package main
 
 import (
-	"github.com/olebedev/emitter"
+	"github.com/btittelbach/pubsub"
 )
 
 type CommandType uint8
@@ -53,7 +53,7 @@ type Command struct {
 type SwitchControl struct {
 	sw       *AudioSwitch
 	servers  []*PlayoutServer
-	Updates  *emitter.Emitter
+	Updates  *pubsub.PubSub
 	Commands chan *Command
 }
 
@@ -88,10 +88,10 @@ func (ctrl *SwitchControl) Run() {
 			for _, srv := range ctrl.servers {
 				srv.SwitchUpdates <- update
 			}
-			ctrl.Updates.Emit("switch:"+update.Type.String(), update)
+			ctrl.Updates.Pub(update, "switch:"+update.Type.String())
 		case status := <-serverUpdates:
 			rhdl.Printf("got server status update: %+v", status)
-			ctrl.Updates.Emit("server:status", status)
+			ctrl.Updates.Pub(status, "server:status")
 			// TODO: recalculate overall status and send out commands to switch
 		case cmd := <-ctrl.Commands:
 			ctrl.handleCommand(cmd)
@@ -103,7 +103,7 @@ func SwitchControlInit(conf *Config, sw *AudioSwitch, servers []*PlayoutServer)
 	ctrl = &SwitchControl{}
 	ctrl.sw = sw
 	ctrl.servers = servers
-	ctrl.Updates = emitter.New(32)
+	ctrl.Updates = pubsub.NewNonBlocking(32)
 	ctrl.Commands = make(chan *Command, 8)
 	return
 }
diff --git a/src/rhctl/telnet.go b/src/rhctl/telnet.go
index a07a820..4189fc8 100644
--- a/src/rhctl/telnet.go
+++ b/src/rhctl/telnet.go
@@ -22,7 +22,6 @@
 package main
 
 import (
-	"github.com/olebedev/emitter"
 	"github.com/spreadspace/telgo"
 )
 
@@ -36,44 +35,69 @@ func telnetStatus(c *telgo.Client, args []string, ctrl *SwitchControl) bool {
 	return false
 }
 
-func telnetListener(c *telgo.Client, ch <-chan emitter.Event) {
-	rhdl.Println("started telnetListener goroutine")
+func telnetUpdateListener(c *telgo.Client, ctrl *SwitchControl) {
+	ch := c.UserData.(chan interface{})
 	for {
-		event, ok := <-ch
+		data, ok := <-ch
 		if !ok {
 			return
 		}
-		c.Sayln("got event: %+v", event) // we need a way to find out that the client has stopped working
+		switch data.(type) {
+		case SwitchUpdate:
+			update := data.(SwitchUpdate)
+			if !c.Sayln("audio-switch status(%v): %s", update.Type, update.Data) {
+				ctrl.Updates.Unsub(ch)
+				return
+			}
+		case ServerStatus:
+			status := data.(ServerStatus)
+			if !c.Sayln("playout-server(%s): health=%s, channel=%s", status.Name, status.Health, status.Channel) {
+				ctrl.Updates.Unsub(ch)
+				return
+			}
+		default:
+			if !c.Sayln("unknown update of type: %T", data) {
+				ctrl.Updates.Unsub(ch)
+				return
+			}
+		}
 	}
-	rhdl.Println("stopped telnetListener goroutine")
 }
 
 func telnetListen(c *telgo.Client, args []string, ctrl *SwitchControl) bool {
-	var ch <-chan emitter.Event
 	if len(args) <= 1 {
-		ch = ctrl.Updates.On("*")
+		c.Sayln("missing argument: <type>")
+		return false
+	}
+
+	var ch chan interface{}
+	if c.UserData == nil {
+		ch = ctrl.Updates.Sub()
+		c.UserData = ch
 	} else {
-		switch args[1] {
-		case "status":
-			ch = ctrl.Updates.On("status")
-		case "server":
-			ch = ctrl.Updates.On("server:status")
-		case "audio":
-			fallthrough
-		case "gpi":
-			fallthrough
-		case "oc":
-			fallthrough
-		case "relay":
-			fallthrough
-		case "silence":
-			ch = ctrl.Updates.On("switch:" + args[1])
-		default:
-			c.Sayln("unknown message type")
-			return false
-		}
+		ch = c.UserData.(chan interface{})
+	}
+
+	switch args[1] {
+	case "status":
+		ctrl.Updates.AddSub(ch, "status")
+	case "server":
+		ctrl.Updates.AddSub(ch, "server:status")
+	case "audio":
+		fallthrough
+	case "gpi":
+		fallthrough
+	case "oc":
+		fallthrough
+	case "relay":
+		fallthrough
+	case "silence":
+		ctrl.Updates.AddSub(ch, "switch:"+args[1])
+	default:
+		c.Sayln("unknown message type")
+		return false
 	}
-	go telnetListener(c, ch)
+	go telnetUpdateListener(c, ctrl)
 	return false
 }
 
@@ -106,7 +130,7 @@ func telnetHelp(c *telgo.Client, args []string) bool {
 			c.Sayln("   show the status of the switch and servers")
 			return false
 		case "listen":
-			c.Sayln("usage: listen [ <type> ]")
+			c.Sayln("usage: listen <type>")
 			c.Sayln("   subscribe to messages of type <type>. The following types are allowed:")
 			c.Sayln("    - status    overall status changes")
 			c.Sayln("    - server    status/health of the playout server")
@@ -115,8 +139,6 @@ func telnetHelp(c *telgo.Client, args []string) bool {
 			c.Sayln("    - oc        open-collector status messages")
 			c.Sayln("    - relay     relay status messages")
 			c.Sayln("    - silence   status of the silence detector")
-			c.Sayln("")
-			c.Sayln("   if you omit the type this client you will be subscribed to all types.")
 			return false
 		case "server":
 			c.Sayln("usage: server <name>")
@@ -135,7 +157,7 @@ func telnetHelp(c *telgo.Client, args []string) bool {
 		c.Sayln("    quit                             close connection (or use Ctrl-D)")
 		c.Sayln("    help [ <cmd> ]                   print this, or help for specific command")
 		c.Sayln("    status                           show status of switch and all servers")
-		c.Sayln("    listen [ <type> ]                add listener for messages of type <type>")
+		c.Sayln("    listen <type>                    add listener for messages of type <type>")
 		c.Sayln("    server <name>                    switch to server <name>")
 		c.Sayln("    switch <cmd> [ [ <arg1> ] ... ]  send command to switch")
 	}
@@ -164,7 +186,7 @@ func TelnetInit(conf *Config, ctrl *SwitchControl) (telnet *TelnetInterface) {
 	cmdlist["help"] = telnetHelp
 	cmdlist["quit"] = telnetQuit
 
-	telnet.server = telgo.NewServer(conf.Clients.Telnet.Address, "rhctl> ", cmdlist, ctrl)
+	telnet.server = telgo.NewServer(conf.Clients.Telnet.Address, "rhctl> ", cmdlist, nil)
 
 	return
 }
-- 
cgit v0.10.2