From 0065196d2ea950052b0cde65d2f0334ddfddbb80 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Fri, 15 Jan 2016 03:07:10 +0100 Subject: player now constructs update messages diff --git a/player/player.go b/player/player.go index 3e20ffd..5fe631c 100644 --- a/player/player.go +++ b/player/player.go @@ -72,6 +72,28 @@ type stopRequest struct { response chan<- stopResult } +type addUpdateHandlerResult struct { + err error +} + +type addUpdateHandlerRequest struct { + userdata interface{} + callback UpdateCB + response chan<- addUpdateHandlerResult +} + +type MeterChannel struct { + peak float64 + decay float64 +} +type Meter map[int]MeterChannel +type updateData struct { + len int64 + pos int64 + meter Meter +} +type UpdateCB func(len int64, pos int64, meter Meter, userdata interface{}) bool + type State int const ( @@ -81,18 +103,20 @@ const ( ) type Player struct { - pipe *gst.Pipeline - src *gst.Element - level *gst.Element - bus *gst.Bus - basepath string - stdlog *log.Logger - dbglog *log.Logger - state State - loadChan chan loadRequest - playChan chan playRequest - pauseChan chan pauseRequest - stopChan chan stopRequest + pipe *gst.Pipeline + src *gst.Element + level *gst.Element + bus *gst.Bus + basepath string + stdlog *log.Logger + dbglog *log.Logger + state State + loadChan chan loadRequest + playChan chan playRequest + pauseChan chan pauseRequest + stopChan chan stopRequest + updateChan chan updateData + addUpdateHandlerChan chan addUpdateHandlerRequest } func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) { @@ -100,7 +124,7 @@ func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) { case gst.MESSAGE_EOS: p.pipe.SetState(gst.STATE_NULL) p.state = IDLE - p.stdlog.Printf("GStreamer Pipeline: EOS reached!") + p.updateChan <- updateData{} case gst.MESSAGE_WARNING: warn, _ := msg.ParseWarning() p.stdlog.Printf("GStreamer Pipeline Warning: %s", warn) @@ -112,17 +136,33 @@ func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) { case gst.MESSAGE_ASYNC_DONE: len, ok := p.pipe.QueryDuration(gst.FORMAT_TIME) if ok { - p.stdlog.Printf("GStreamer Pipeline: loaded file has length: %d.%d s", len/1000000000, len%1000000000) + p.updateChan <- updateData{len: len} } else { p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file") } case gst.MESSAGE_ELEMENT: - src := msg.GetSourceName() s := msg.GetStructure() - p.stdlog.Printf(">>> %s(%s): peak: %v, decay: %v", src, s.Name, s.Data["peak"], s.Data["decay"]) + peak := s.Data["peak"].(gst.GValueArray) + decay := s.Data["decay"].(gst.GValueArray) + + meter := make(map[int]MeterChannel, len(peak)) + for i := 0; i < len(peak); i++ { + meter[i] = MeterChannel{peak: peak[i].(float64), decay: decay[i].(float64)} + } + + len, ok := p.pipe.QueryDuration(gst.FORMAT_TIME) + if !ok { + p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file") + } + pos, ok := p.pipe.QueryPosition(gst.FORMAT_TIME) + if !ok { + p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file") + } + + p.updateChan <- updateData{len: len, pos: pos, meter: meter} case gst.MESSAGE_STATE_CHANGED: default: - p.stdlog.Printf("GStreamer Message: unknown type '%s'", msg.GetTypeName()) + p.dbglog.Printf("GStreamer Message: unknown type '%s'", msg.GetTypeName()) } } @@ -190,6 +230,17 @@ func (p *Player) stop() (resp stopResult) { return } +func (p *Player) addUpdateHandler(callback UpdateCB, userdata interface{}) (resp addUpdateHandlerResult) { + resp.err = fmt.Errorf("not yet implemented!") // TODO: implement this + return +} + +func (p *Player) sendUpdate(len int64, pos int64, meter Meter) { + p.dbglog.Printf("player update: len: %d, pos: %d, meter: %+v", len, pos, meter) + // TODO: send this to all subscribed handler + return +} + func (p *Player) dispatchRequests() { p.bus.AddSignalWatch() p.bus.Connect("message", func(bus *gst.Bus, msg *gst.Message) { p.onMessage(bus, msg) }) @@ -204,6 +255,10 @@ func (p *Player) dispatchRequests() { req.response <- p.pause() case req := <-p.stopChan: req.response <- p.stop() + case req := <-p.addUpdateHandlerChan: + req.response <- p.addUpdateHandler(req.callback, req.userdata) + case u := <-p.updateChan: + p.sendUpdate(u.len, u.pos, u.meter) } } } @@ -263,10 +318,11 @@ func (p *Player) createPipeline() (err error) { // Public Interface type PlayerChan struct { - load chan<- loadRequest - play chan<- playRequest - pause chan<- pauseRequest - stop chan<- stopRequest + load chan<- loadRequest + play chan<- playRequest + pause chan<- pauseRequest + stop chan<- stopRequest + addUpdateHandler chan addUpdateHandlerRequest } func (p *PlayerChan) Load(cart, cut uint) error { @@ -323,12 +379,28 @@ func (p *PlayerChan) Stop() error { return nil } +func (p *PlayerChan) AddUpdateHandler(callback UpdateCB, userdata interface{}) error { + resCh := make(chan addUpdateHandlerResult) + req := addUpdateHandlerRequest{} + req.callback = callback + req.userdata = userdata + req.response = resCh + p.addUpdateHandler <- req + + res := <-resCh + if res.err != nil { + return res.err + } + return nil +} + func (p *Player) GetInterface() *PlayerChan { ch := &PlayerChan{} ch.load = p.loadChan ch.play = p.playChan ch.pause = p.pauseChan ch.stop = p.stopChan + ch.addUpdateHandler = p.addUpdateHandlerChan return ch } @@ -349,6 +421,8 @@ func NewPlayer(basepath string, stdlog *log.Logger, dbglog *log.Logger) (p *Play p.playChan = make(chan playRequest) p.pauseChan = make(chan pauseRequest) p.stopChan = make(chan stopRequest) + p.updateChan = make(chan updateData, 20) + p.addUpdateHandlerChan = make(chan addUpdateHandlerRequest) if err = p.createPipeline(); err != nil { return diff --git a/rhimport/core.go b/rhimport/core.go index 6876113..1ee8c45 100644 --- a/rhimport/core.go +++ b/rhimport/core.go @@ -53,7 +53,7 @@ func init() { } type ProgressCB func(step int, stepName string, progress float64, userdata interface{}) bool -type DoneCB func(Result, interface{}) bool +type DoneCB func(result Result, userdata interface{}) bool type Result struct { ResponseCode int -- cgit v0.10.2