// // rhlibrary // // The Radio Helsinki Rivendell Library // // // Copyright (C) 2016 Christian Pointner // // This file is part of rhlibrary. // // rhlibrary is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhlibrary is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhlibrary. If not, see . // package player import ( "fmt" "io/ioutil" "log" "os" "path" "time" "github.com/spreadspace/go-gstreamer" ) func init() { gst.Init(nil) } type loadResult struct { err error } type loadRequest struct { cart uint cut uint response chan<- loadResult } type unloadResult struct { err error } type unloadRequest struct { response chan<- unloadResult } type playResult struct { err error } type playRequest struct { response chan<- playResult } type pauseResult struct { err error } type pauseRequest struct { response chan<- pauseResult } type playPauseResult struct { err error } type playPauseRequest struct { response chan<- playPauseResult } type stopResult struct { err error } type stopRequest struct { response chan<- stopResult } type seekResult struct { err error } type seekRequest struct { pos float64 response chan<- seekResult } type addUpdateHandlerResult struct { err error } type addUpdateHandlerRequest struct { userdata interface{} callback UpdateCB response chan<- addUpdateHandlerResult } type addStateChangeHandlerResult struct { err error } type addStateChangeHandlerRequest struct { userdata interface{} callback StateChangeCB response chan<- addStateChangeHandlerResult } type MeterChannel struct { Peak float64 Decay float64 } type Meter []MeterChannel type updateData struct { duration time.Duration pos time.Duration meter Meter } type UpdateCB func(duration time.Duration, pos time.Duration, meter Meter, userdata interface{}) bool type pUpdateCB struct { cb UpdateCB userdata interface{} } type State int const ( IDLE State = 1 + iota PAUSED PLAYING ) type StateChangeCB func(state State, userdata interface{}) bool type pStateChangeCB struct { cb StateChangeCB userdata interface{} } type Player struct { pipe *gst.Pipeline src *gst.Element level *gst.Element bus *gst.Bus basepath string stdlog *log.Logger dbglog *log.Logger state State duration time.Duration loadChan chan loadRequest unloadChan chan unloadRequest playChan chan playRequest pauseChan chan pauseRequest playPauseChan chan playPauseRequest stopChan chan stopRequest seekChan chan seekRequest addUpdateHandlerChan chan addUpdateHandlerRequest updateCBs []*pUpdateCB updateChan chan updateData addStateChangeHandlerChan chan addStateChangeHandlerRequest stateChangeCBs []*pStateChangeCB stateChangeChan chan State } func (p *Player) changeState(new State) { p.state = new p.stateChangeChan <- new if new == IDLE { p.updateChan <- updateData{} } } func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) { switch msg.GetType() { case gst.MESSAGE_EOS: p.pipe.SetState(gst.STATE_NULL) p.pipe.SetState(gst.STATE_PAUSED) p.changeState(PAUSED) p.duration = 0 case gst.MESSAGE_WARNING: warn, _ := msg.ParseWarning() p.stdlog.Printf("GStreamer Pipeline Warning: %s", warn) case gst.MESSAGE_ERROR: p.pipe.SetState(gst.STATE_NULL) p.changeState(IDLE) p.duration = 0 p.updateChan <- updateData{duration: p.duration} err, _ := msg.ParseError() p.stdlog.Printf("GStreamer Pipeline Error: %s", err) case gst.MESSAGE_DURATION_CHANGED, gst.MESSAGE_ASYNC_DONE: duration, ok := p.pipe.QueryDuration(gst.FORMAT_TIME) if ok { p.duration = time.Duration(duration) pos, ok := p.pipe.QueryPosition(gst.FORMAT_TIME) if !ok { p.stdlog.Printf("GStreamer Pipeline Error: unable to query playback position") } p.updateChan <- updateData{duration: p.duration, pos: time.Duration(pos)} } else { p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file") } case gst.MESSAGE_ELEMENT: s := msg.GetStructure() peak := s.Data["peak"].(gst.GValueArray) decay := s.Data["decay"].(gst.GValueArray) meter := make([]MeterChannel, len(peak)) for i := 0; i < len(peak); i++ { meter[i] = MeterChannel{Peak: peak[i].(float64), Decay: decay[i].(float64)} } pos := time.Duration(s.Data["stream-time"].(uint64)) p.updateChan <- updateData{duration: p.duration, pos: pos, meter: meter} case gst.MESSAGE_STATE_CHANGED: case gst.MESSAGE_STREAM_STATUS: case gst.MESSAGE_STREAM_START: case gst.MESSAGE_TAG: case gst.MESSAGE_NEW_CLOCK: case gst.MESSAGE_RESET_TIME: default: p.dbglog.Printf("GStreamer Message: unknown type '%s'", msg.GetTypeName()) } } func (p *Player) load(cart, cut uint) (resp loadResult) { filename := path.Join(p.basepath, fmt.Sprintf("%06d_%03d.wav", cart, cut)) var file *os.File if file, resp.err = os.Open(filename); resp.err != nil { resp.err = fmt.Errorf("player: %s", resp.err) return } if info, err := file.Stat(); err != nil { resp.err = fmt.Errorf("player: %s", err) return } else { if info.IsDir() { resp.err = fmt.Errorf("player error: '%s' is a directory", filename) return } } file.Close() if p.state != IDLE { p.pipe.SetState(gst.STATE_NULL) p.changeState(IDLE) } p.src.SetProperty("uri", "file://"+filename) p.pipe.SetState(gst.STATE_PAUSED) p.changeState(PAUSED) return } func (p *Player) unload() (resp unloadResult) { p.pipe.SetState(gst.STATE_NULL) p.changeState(IDLE) return } func (p *Player) play() (resp playResult) { if p.state != PAUSED { resp.err = fmt.Errorf("player: no file loaded") return } p.pipe.SetState(gst.STATE_PLAYING) p.changeState(PLAYING) return } func (p *Player) pause() (resp pauseResult) { if p.state != PLAYING { resp.err = fmt.Errorf("player: not playing") return } p.pipe.SetState(gst.STATE_PAUSED) p.changeState(PAUSED) return } func (p *Player) playPause() (resp playPauseResult) { switch p.state { case IDLE, PAUSED: p.pipe.SetState(gst.STATE_PLAYING) p.changeState(PLAYING) case PLAYING: p.pipe.SetState(gst.STATE_PAUSED) p.changeState(PAUSED) } return } func (p *Player) stop() (resp stopResult) { switch p.state { case IDLE: p.pipe.SetState(gst.STATE_NULL) p.changeState(IDLE) case PLAYING, PAUSED: p.pipe.SetState(gst.STATE_NULL) p.pipe.SetState(gst.STATE_PAUSED) p.changeState(PAUSED) } return } func (p *Player) seek(pos float64) (resp seekResult) { postime := int64(float64(p.duration) * pos) switch p.state { case PLAYING, PAUSED: p.pipe.SeekSimple(gst.FORMAT_TIME, gst.SEEK_FLAG_FLUSH|gst.SEEK_FLAG_KEY_UNIT, postime) } return } func (p *Player) addUpdateHandler(callback UpdateCB, userdata interface{}) (resp addUpdateHandlerResult) { p.updateCBs = append(p.updateCBs, &pUpdateCB{callback, userdata}) return } func (p *Player) sendUpdate(duration time.Duration, pos time.Duration, meter Meter) { for _, cb := range p.updateCBs { if cb.cb != nil { if keep := cb.cb(duration, pos, meter, cb.userdata); !keep { cb.cb = nil } } } return } func (p *Player) addStateChangeHandler(callback StateChangeCB, userdata interface{}) (resp addStateChangeHandlerResult) { p.stateChangeCBs = append(p.stateChangeCBs, &pStateChangeCB{callback, userdata}) return } func (p *Player) sendStateChange(s State) { for _, cb := range p.stateChangeCBs { if cb.cb != nil { if keep := cb.cb(s, cb.userdata); !keep { cb.cb = nil } } } return } func (p *Player) dispatchRequests() { p.bus.AddSignalWatch() p.bus.Connect("message", func(bus *gst.Bus, msg *gst.Message) { p.onMessage(bus, msg) }) for { select { case req := <-p.loadChan: req.response <- p.load(req.cart, req.cut) case req := <-p.unloadChan: req.response <- p.unload() case req := <-p.playChan: req.response <- p.play() case req := <-p.pauseChan: req.response <- p.pause() case req := <-p.playPauseChan: req.response <- p.playPause() case req := <-p.stopChan: req.response <- p.stop() case req := <-p.seekChan: req.response <- p.seek(req.pos) case req := <-p.addUpdateHandlerChan: req.response <- p.addUpdateHandler(req.callback, req.userdata) case u := <-p.updateChan: p.sendUpdate(u.duration, u.pos, u.meter) case req := <-p.addStateChangeHandlerChan: req.response <- p.addStateChangeHandler(req.callback, req.userdata) case s := <-p.stateChangeChan: p.sendStateChange(s) } } } func (p *Player) createPipeline() (err error) { if p.pipe, err = gst.PipelineNew("rhlibrary"); err != nil { return } var conv1, conv2, sink *gst.Element if p.src, err = gst.ElementFactoryMake("uridecodebin", "source"); err != nil { return } if conv1, err = gst.ElementFactoryMake("audioconvert", "conv1"); err != nil { return } if p.level, err = gst.ElementFactoryMake("level", "meter"); err != nil { return } p.level.SetProperty("message", true) p.level.SetProperty("interval", 25000000) p.level.SetProperty("peak-ttl", 300000000) p.level.SetProperty("peak-falloff", 25) if conv2, err = gst.ElementFactoryMake("audioconvert", "conv2"); err != nil { return } if sink, err = gst.ElementFactoryMake("autoaudiosink", "sink"); err != nil { return } p.pipe.Add(p.src) p.pipe.Add(conv1) p.pipe.Add(p.level) p.pipe.Add(conv2) p.pipe.Add(sink) sinkpad, err := conv1.GetStaticPad("sink") if err != nil { return fmt.Errorf("player error getting sink pad from conv1: %s", err) } p.src.Connect("pad-added", func(_ interface{}, srcpad *gst.Pad) { if ret := srcpad.Link(sinkpad); ret != gst.PAD_LINK_OK { p.stdlog.Printf("player error linking src with conv1 (code: %v)", ret) } p.dbglog.Println("player succesfully linked src with conv1") }) conv1.Link(p.level) p.level.Link(conv2) conv2.Link(sink) if p.bus, err = p.pipe.GetBus(); err != nil { return } return } // ********************************************************* // Public Interface type PlayerChan struct { load chan<- loadRequest unload chan<- unloadRequest play chan<- playRequest playPause chan<- playPauseRequest pause chan<- pauseRequest stop chan<- stopRequest seek chan<- seekRequest addUpdateHandler chan<- addUpdateHandlerRequest addStateChangeHandler chan<- addStateChangeHandlerRequest } func (p *PlayerChan) Load(cart, cut uint) error { resCh := make(chan loadResult) req := loadRequest{} req.cart = cart req.cut = cut req.response = resCh p.load <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) Unload() error { resCh := make(chan unloadResult) req := unloadRequest{} req.response = resCh p.unload <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) Play() error { resCh := make(chan playResult) req := playRequest{} req.response = resCh p.play <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) Pause() error { resCh := make(chan pauseResult) req := pauseRequest{} req.response = resCh p.pause <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) PlayPause() error { resCh := make(chan playPauseResult) req := playPauseRequest{} req.response = resCh p.playPause <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) Stop() error { resCh := make(chan stopResult) req := stopRequest{} req.response = resCh p.stop <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) Seek(pos float64) error { resCh := make(chan seekResult) req := seekRequest{} req.pos = pos req.response = resCh p.seek <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) AddUpdateHandler(callback UpdateCB, userdata interface{}) error { resCh := make(chan addUpdateHandlerResult) req := addUpdateHandlerRequest{} req.callback = callback req.userdata = userdata req.response = resCh p.addUpdateHandler <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) AddStateChangeHandler(callback StateChangeCB, userdata interface{}) error { resCh := make(chan addStateChangeHandlerResult) req := addStateChangeHandlerRequest{} req.callback = callback req.userdata = userdata req.response = resCh p.addStateChangeHandler <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *Player) GetInterface() *PlayerChan { ch := &PlayerChan{} ch.load = p.loadChan ch.unload = p.unloadChan ch.play = p.playChan ch.pause = p.pauseChan ch.playPause = p.playPauseChan ch.stop = p.stopChan ch.seek = p.seekChan ch.addUpdateHandler = p.addUpdateHandlerChan ch.addStateChangeHandler = p.addStateChangeHandlerChan return ch } func NewPlayer(basepath string, stdlog *log.Logger, dbglog *log.Logger) (p *Player, err error) { p = &Player{} p.basepath = path.Clean(basepath) p.state = IDLE p.duration = 0 if stdlog != nil { p.stdlog = stdlog } else { p.stdlog = log.New(ioutil.Discard, "rhrd-go.player", log.LstdFlags) } if dbglog != nil { p.dbglog = dbglog } else { p.dbglog = log.New(ioutil.Discard, "rhrd-go.player-dbg", log.LstdFlags) } p.loadChan = make(chan loadRequest) p.unloadChan = make(chan unloadRequest) p.playChan = make(chan playRequest) p.pauseChan = make(chan pauseRequest) p.playPauseChan = make(chan playPauseRequest) p.stopChan = make(chan stopRequest) p.seekChan = make(chan seekRequest) p.addUpdateHandlerChan = make(chan addUpdateHandlerRequest) p.updateChan = make(chan updateData, 20) p.addStateChangeHandlerChan = make(chan addStateChangeHandlerRequest) p.stateChangeChan = make(chan State, 10) if err = p.createPipeline(); err != nil { return } go p.dispatchRequests() return }