From 0065196d2ea950052b0cde65d2f0334ddfddbb80 Mon Sep 17 00:00:00 2001
From: Christian Pointner <equinox@helsinki.at>
Date: Fri, 15 Jan 2016 03:07:10 +0100
Subject: player now constructs update messages


diff --git a/player/player.go b/player/player.go
index 3e20ffd..5fe631c 100644
--- a/player/player.go
+++ b/player/player.go
@@ -72,6 +72,28 @@ type stopRequest struct {
 	response chan<- stopResult
 }
 
+type addUpdateHandlerResult struct {
+	err error
+}
+
+type addUpdateHandlerRequest struct {
+	userdata interface{}
+	callback UpdateCB
+	response chan<- addUpdateHandlerResult
+}
+
+type MeterChannel struct {
+	peak  float64
+	decay float64
+}
+type Meter map[int]MeterChannel
+type updateData struct {
+	len   int64
+	pos   int64
+	meter Meter
+}
+type UpdateCB func(len int64, pos int64, meter Meter, userdata interface{}) bool
+
 type State int
 
 const (
@@ -81,18 +103,20 @@ const (
 )
 
 type Player struct {
-	pipe      *gst.Pipeline
-	src       *gst.Element
-	level     *gst.Element
-	bus       *gst.Bus
-	basepath  string
-	stdlog    *log.Logger
-	dbglog    *log.Logger
-	state     State
-	loadChan  chan loadRequest
-	playChan  chan playRequest
-	pauseChan chan pauseRequest
-	stopChan  chan stopRequest
+	pipe                 *gst.Pipeline
+	src                  *gst.Element
+	level                *gst.Element
+	bus                  *gst.Bus
+	basepath             string
+	stdlog               *log.Logger
+	dbglog               *log.Logger
+	state                State
+	loadChan             chan loadRequest
+	playChan             chan playRequest
+	pauseChan            chan pauseRequest
+	stopChan             chan stopRequest
+	updateChan           chan updateData
+	addUpdateHandlerChan chan addUpdateHandlerRequest
 }
 
 func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) {
@@ -100,7 +124,7 @@ func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) {
 	case gst.MESSAGE_EOS:
 		p.pipe.SetState(gst.STATE_NULL)
 		p.state = IDLE
-		p.stdlog.Printf("GStreamer Pipeline: EOS reached!")
+		p.updateChan <- updateData{}
 	case gst.MESSAGE_WARNING:
 		warn, _ := msg.ParseWarning()
 		p.stdlog.Printf("GStreamer Pipeline Warning: %s", warn)
@@ -112,17 +136,33 @@ func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) {
 	case gst.MESSAGE_ASYNC_DONE:
 		len, ok := p.pipe.QueryDuration(gst.FORMAT_TIME)
 		if ok {
-			p.stdlog.Printf("GStreamer Pipeline: loaded file has length: %d.%d s", len/1000000000, len%1000000000)
+			p.updateChan <- updateData{len: len}
 		} else {
 			p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file")
 		}
 	case gst.MESSAGE_ELEMENT:
-		src := msg.GetSourceName()
 		s := msg.GetStructure()
-		p.stdlog.Printf(">>> %s(%s): peak: %v, decay: %v", src, s.Name, s.Data["peak"], s.Data["decay"])
+		peak := s.Data["peak"].(gst.GValueArray)
+		decay := s.Data["decay"].(gst.GValueArray)
+
+		meter := make(map[int]MeterChannel, len(peak))
+		for i := 0; i < len(peak); i++ {
+			meter[i] = MeterChannel{peak: peak[i].(float64), decay: decay[i].(float64)}
+		}
+
+		len, ok := p.pipe.QueryDuration(gst.FORMAT_TIME)
+		if !ok {
+			p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file")
+		}
+		pos, ok := p.pipe.QueryPosition(gst.FORMAT_TIME)
+		if !ok {
+			p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file")
+		}
+
+		p.updateChan <- updateData{len: len, pos: pos, meter: meter}
 	case gst.MESSAGE_STATE_CHANGED:
 	default:
-		p.stdlog.Printf("GStreamer Message: unknown type '%s'", msg.GetTypeName())
+		p.dbglog.Printf("GStreamer Message: unknown type '%s'", msg.GetTypeName())
 	}
 }
 
@@ -190,6 +230,17 @@ func (p *Player) stop() (resp stopResult) {
 	return
 }
 
+func (p *Player) addUpdateHandler(callback UpdateCB, userdata interface{}) (resp addUpdateHandlerResult) {
+	resp.err = fmt.Errorf("not yet implemented!") // TODO: implement this
+	return
+}
+
+func (p *Player) sendUpdate(len int64, pos int64, meter Meter) {
+	p.dbglog.Printf("player update: len: %d, pos: %d, meter: %+v", len, pos, meter)
+	// TODO: send this to all subscribed handler
+	return
+}
+
 func (p *Player) dispatchRequests() {
 	p.bus.AddSignalWatch()
 	p.bus.Connect("message", func(bus *gst.Bus, msg *gst.Message) { p.onMessage(bus, msg) })
@@ -204,6 +255,10 @@ func (p *Player) dispatchRequests() {
 			req.response <- p.pause()
 		case req := <-p.stopChan:
 			req.response <- p.stop()
+		case req := <-p.addUpdateHandlerChan:
+			req.response <- p.addUpdateHandler(req.callback, req.userdata)
+		case u := <-p.updateChan:
+			p.sendUpdate(u.len, u.pos, u.meter)
 		}
 	}
 }
@@ -263,10 +318,11 @@ func (p *Player) createPipeline() (err error) {
 // Public Interface
 
 type PlayerChan struct {
-	load  chan<- loadRequest
-	play  chan<- playRequest
-	pause chan<- pauseRequest
-	stop  chan<- stopRequest
+	load             chan<- loadRequest
+	play             chan<- playRequest
+	pause            chan<- pauseRequest
+	stop             chan<- stopRequest
+	addUpdateHandler chan addUpdateHandlerRequest
 }
 
 func (p *PlayerChan) Load(cart, cut uint) error {
@@ -323,12 +379,28 @@ func (p *PlayerChan) Stop() error {
 	return nil
 }
 
+func (p *PlayerChan) AddUpdateHandler(callback UpdateCB, userdata interface{}) error {
+	resCh := make(chan addUpdateHandlerResult)
+	req := addUpdateHandlerRequest{}
+	req.callback = callback
+	req.userdata = userdata
+	req.response = resCh
+	p.addUpdateHandler <- req
+
+	res := <-resCh
+	if res.err != nil {
+		return res.err
+	}
+	return nil
+}
+
 func (p *Player) GetInterface() *PlayerChan {
 	ch := &PlayerChan{}
 	ch.load = p.loadChan
 	ch.play = p.playChan
 	ch.pause = p.pauseChan
 	ch.stop = p.stopChan
+	ch.addUpdateHandler = p.addUpdateHandlerChan
 	return ch
 }
 
@@ -349,6 +421,8 @@ func NewPlayer(basepath string, stdlog *log.Logger, dbglog *log.Logger) (p *Play
 	p.playChan = make(chan playRequest)
 	p.pauseChan = make(chan pauseRequest)
 	p.stopChan = make(chan stopRequest)
+	p.updateChan = make(chan updateData, 20)
+	p.addUpdateHandlerChan = make(chan addUpdateHandlerRequest)
 
 	if err = p.createPipeline(); err != nil {
 		return
diff --git a/rhimport/core.go b/rhimport/core.go
index 6876113..1ee8c45 100644
--- a/rhimport/core.go
+++ b/rhimport/core.go
@@ -53,7 +53,7 @@ func init() {
 }
 
 type ProgressCB func(step int, stepName string, progress float64, userdata interface{}) bool
-type DoneCB func(Result, interface{}) bool
+type DoneCB func(result Result, userdata interface{}) bool
 
 type Result struct {
 	ResponseCode int
-- 
cgit v0.10.2