summaryrefslogtreecommitdiff
path: root/player
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2016-01-15 02:07:10 (GMT)
committerChristian Pointner <equinox@helsinki.at>2016-01-15 02:07:10 (GMT)
commit0065196d2ea950052b0cde65d2f0334ddfddbb80 (patch)
tree6781cbd23ca85b1f465b156b7ca4340bf621f710 /player
parent44f927c4ff2e2e310f6f86b1baf7e65206141064 (diff)
player now constructs update messages
Diffstat (limited to 'player')
-rw-r--r--player/player.go116
1 files changed, 95 insertions, 21 deletions
diff --git a/player/player.go b/player/player.go
index 3e20ffd..5fe631c 100644
--- a/player/player.go
+++ b/player/player.go
@@ -72,6 +72,28 @@ type stopRequest struct {
response chan<- stopResult
}
+type addUpdateHandlerResult struct {
+ err error
+}
+
+type addUpdateHandlerRequest struct {
+ userdata interface{}
+ callback UpdateCB
+ response chan<- addUpdateHandlerResult
+}
+
+type MeterChannel struct {
+ peak float64
+ decay float64
+}
+type Meter map[int]MeterChannel
+type updateData struct {
+ len int64
+ pos int64
+ meter Meter
+}
+type UpdateCB func(len int64, pos int64, meter Meter, userdata interface{}) bool
+
type State int
const (
@@ -81,18 +103,20 @@ const (
)
type Player struct {
- pipe *gst.Pipeline
- src *gst.Element
- level *gst.Element
- bus *gst.Bus
- basepath string
- stdlog *log.Logger
- dbglog *log.Logger
- state State
- loadChan chan loadRequest
- playChan chan playRequest
- pauseChan chan pauseRequest
- stopChan chan stopRequest
+ pipe *gst.Pipeline
+ src *gst.Element
+ level *gst.Element
+ bus *gst.Bus
+ basepath string
+ stdlog *log.Logger
+ dbglog *log.Logger
+ state State
+ loadChan chan loadRequest
+ playChan chan playRequest
+ pauseChan chan pauseRequest
+ stopChan chan stopRequest
+ updateChan chan updateData
+ addUpdateHandlerChan chan addUpdateHandlerRequest
}
func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) {
@@ -100,7 +124,7 @@ func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) {
case gst.MESSAGE_EOS:
p.pipe.SetState(gst.STATE_NULL)
p.state = IDLE
- p.stdlog.Printf("GStreamer Pipeline: EOS reached!")
+ p.updateChan <- updateData{}
case gst.MESSAGE_WARNING:
warn, _ := msg.ParseWarning()
p.stdlog.Printf("GStreamer Pipeline Warning: %s", warn)
@@ -112,17 +136,33 @@ func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) {
case gst.MESSAGE_ASYNC_DONE:
len, ok := p.pipe.QueryDuration(gst.FORMAT_TIME)
if ok {
- p.stdlog.Printf("GStreamer Pipeline: loaded file has length: %d.%d s", len/1000000000, len%1000000000)
+ p.updateChan <- updateData{len: len}
} else {
p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file")
}
case gst.MESSAGE_ELEMENT:
- src := msg.GetSourceName()
s := msg.GetStructure()
- p.stdlog.Printf(">>> %s(%s): peak: %v, decay: %v", src, s.Name, s.Data["peak"], s.Data["decay"])
+ peak := s.Data["peak"].(gst.GValueArray)
+ decay := s.Data["decay"].(gst.GValueArray)
+
+ meter := make(map[int]MeterChannel, len(peak))
+ for i := 0; i < len(peak); i++ {
+ meter[i] = MeterChannel{peak: peak[i].(float64), decay: decay[i].(float64)}
+ }
+
+ len, ok := p.pipe.QueryDuration(gst.FORMAT_TIME)
+ if !ok {
+ p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file")
+ }
+ pos, ok := p.pipe.QueryPosition(gst.FORMAT_TIME)
+ if !ok {
+ p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file")
+ }
+
+ p.updateChan <- updateData{len: len, pos: pos, meter: meter}
case gst.MESSAGE_STATE_CHANGED:
default:
- p.stdlog.Printf("GStreamer Message: unknown type '%s'", msg.GetTypeName())
+ p.dbglog.Printf("GStreamer Message: unknown type '%s'", msg.GetTypeName())
}
}
@@ -190,6 +230,17 @@ func (p *Player) stop() (resp stopResult) {
return
}
+func (p *Player) addUpdateHandler(callback UpdateCB, userdata interface{}) (resp addUpdateHandlerResult) {
+ resp.err = fmt.Errorf("not yet implemented!") // TODO: implement this
+ return
+}
+
+func (p *Player) sendUpdate(len int64, pos int64, meter Meter) {
+ p.dbglog.Printf("player update: len: %d, pos: %d, meter: %+v", len, pos, meter)
+ // TODO: send this to all subscribed handler
+ return
+}
+
func (p *Player) dispatchRequests() {
p.bus.AddSignalWatch()
p.bus.Connect("message", func(bus *gst.Bus, msg *gst.Message) { p.onMessage(bus, msg) })
@@ -204,6 +255,10 @@ func (p *Player) dispatchRequests() {
req.response <- p.pause()
case req := <-p.stopChan:
req.response <- p.stop()
+ case req := <-p.addUpdateHandlerChan:
+ req.response <- p.addUpdateHandler(req.callback, req.userdata)
+ case u := <-p.updateChan:
+ p.sendUpdate(u.len, u.pos, u.meter)
}
}
}
@@ -263,10 +318,11 @@ func (p *Player) createPipeline() (err error) {
// Public Interface
type PlayerChan struct {
- load chan<- loadRequest
- play chan<- playRequest
- pause chan<- pauseRequest
- stop chan<- stopRequest
+ load chan<- loadRequest
+ play chan<- playRequest
+ pause chan<- pauseRequest
+ stop chan<- stopRequest
+ addUpdateHandler chan addUpdateHandlerRequest
}
func (p *PlayerChan) Load(cart, cut uint) error {
@@ -323,12 +379,28 @@ func (p *PlayerChan) Stop() error {
return nil
}
+func (p *PlayerChan) AddUpdateHandler(callback UpdateCB, userdata interface{}) error {
+ resCh := make(chan addUpdateHandlerResult)
+ req := addUpdateHandlerRequest{}
+ req.callback = callback
+ req.userdata = userdata
+ req.response = resCh
+ p.addUpdateHandler <- req
+
+ res := <-resCh
+ if res.err != nil {
+ return res.err
+ }
+ return nil
+}
+
func (p *Player) GetInterface() *PlayerChan {
ch := &PlayerChan{}
ch.load = p.loadChan
ch.play = p.playChan
ch.pause = p.pauseChan
ch.stop = p.stopChan
+ ch.addUpdateHandler = p.addUpdateHandlerChan
return ch
}
@@ -349,6 +421,8 @@ func NewPlayer(basepath string, stdlog *log.Logger, dbglog *log.Logger) (p *Play
p.playChan = make(chan playRequest)
p.pauseChan = make(chan pauseRequest)
p.stopChan = make(chan stopRequest)
+ p.updateChan = make(chan updateData, 20)
+ p.addUpdateHandlerChan = make(chan addUpdateHandlerRequest)
if err = p.createPipeline(); err != nil {
return