// // rhlibrary // // The Radio Helsinki Rivendell Library // // // Copyright (C) 2016 Christian Pointner // // This file is part of rhlibrary. // // rhlibrary is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhlibrary is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhlibrary. If not, see . // package player import ( "fmt" "github.com/gotk3/gotk3/glib" "github.com/spreadspace/go-gstreamer" "io/ioutil" "log" "os" "path" ) func init() { gst.Init(nil) } type loadResult struct { err error } type loadRequest struct { cart uint cut uint response chan<- loadResult } type playResult struct { err error } type playRequest struct { response chan<- playResult } type pauseResult struct { err error } type pauseRequest struct { response chan<- pauseResult } type stopResult struct { err error } type stopRequest struct { response chan<- stopResult } type State int const ( IDLE State = 1 + iota PAUSED PLAYING ) type Player struct { pipe *gst.Pipeline src *gst.Element volume *gst.Element bus *gst.Bus basepath string stdlog *log.Logger dbglog *log.Logger state State loadChan chan loadRequest playChan chan playRequest pauseChan chan pauseRequest stopChan chan stopRequest } func (p *Player) onMessage(bus *gst.Bus, msg *gst.Message) { switch msg.GetType() { case gst.MESSAGE_EOS: p.pipe.SetState(gst.STATE_NULL) p.state = IDLE p.stdlog.Printf("GStreamer Pipeline: EOS reached!\n") case gst.MESSAGE_WARNING: warn, _ := msg.ParseWarning() p.stdlog.Printf("GStreamer Pipeline Warning: %s\n", warn) case gst.MESSAGE_ERROR: p.pipe.SetState(gst.STATE_NULL) p.state = IDLE err, _ := msg.ParseError() p.stdlog.Printf("GStreamer Pipeline Error: %s\n", err) case gst.MESSAGE_ASYNC_DONE: len, ok := p.pipe.QueryDuration(gst.FORMAT_TIME) if ok { p.stdlog.Printf("GStreamer Pipeline: loaded file has length: %d.%d s\n", len/1000000000, len%1000000000) } else { p.stdlog.Printf("GStreamer Pipeline Error: unable to query duration of file\n") } case gst.MESSAGE_STATE_CHANGED: default: p.stdlog.Printf("GStreamer Message: unknown type '%s'\n", msg.GetTypeName()) } } func (p *Player) load(cart, cut uint) (resp loadResult) { filename := path.Join(p.basepath, fmt.Sprintf("%06d_%03d.wav", cart, cut)) var file *os.File if file, resp.err = os.Open(filename); resp.err != nil { resp.err = fmt.Errorf("player: %s", resp.err) return } if info, err := file.Stat(); err != nil { resp.err = fmt.Errorf("player: %s", err) return } else { if info.IsDir() { resp.err = fmt.Errorf("player error: '%s' is a directory", filename) return } } file.Close() if p.state != IDLE { p.pipe.SetState(gst.STATE_NULL) p.state = IDLE } p.src.SetProperty("uri", "file://"+filename) p.pipe.SetState(gst.STATE_PAUSED) p.state = PAUSED return } func (p *Player) play() (resp playResult) { if p.state != PAUSED { resp.err = fmt.Errorf("player: no file loaded") return } p.pipe.SetState(gst.STATE_PLAYING) p.state = PLAYING return } func (p *Player) pause() (resp pauseResult) { if p.state != PLAYING { resp.err = fmt.Errorf("player: not playing") return } p.pipe.SetState(gst.STATE_PAUSED) p.state = PAUSED return } func (p *Player) stop() (resp stopResult) { switch p.state { case IDLE: p.pipe.SetState(gst.STATE_NULL) p.state = IDLE case PLAYING: fallthrough case PAUSED: p.pipe.SetState(gst.STATE_NULL) p.pipe.SetState(gst.STATE_PAUSED) p.state = PAUSED } return } func (p *Player) dispatchRequests() { p.bus.AddSignalWatch() p.bus.Connect("message", func(bus *gst.Bus, msg *gst.Message) { p.onMessage(bus, msg) }) for { select { case req := <-p.loadChan: req.response <- p.load(req.cart, req.cut) case req := <-p.playChan: req.response <- p.play() case req := <-p.pauseChan: req.response <- p.pause() case req := <-p.stopChan: req.response <- p.stop() } } } func (p *Player) createPipeline() (err error) { if p.pipe, err = gst.PipelineNew("rhlibrary"); err != nil { return } var conv1, conv2, sink *gst.Element if p.src, err = gst.ElementFactoryMake("uridecodebin", "source"); err != nil { return } if conv1, err = gst.ElementFactoryMake("audioconvert", "conv1"); err != nil { return } if p.volume, err = gst.ElementFactoryMake("volume", "meter"); err != nil { return } if conv2, err = gst.ElementFactoryMake("audioconvert", "conv2"); err != nil { return } if sink, err = gst.ElementFactoryMake("autoaudiosink", "sink"); err != nil { return } p.pipe.Add(p.src) p.pipe.Add(conv1) p.pipe.Add(p.volume) p.pipe.Add(conv2) p.pipe.Add(sink) sinkpad, err := conv1.GetStaticPad("sink") if err != nil { return fmt.Errorf("player error getting sink pad from conv1: %s", err) } p.src.Connect("pad-added", func(_ *glib.Object, srcpad *gst.Pad) { if ret := srcpad.Link(sinkpad); ret != gst.PAD_LINK_OK { p.stdlog.Println("player error linking src with conv1 (code: %v)", ret) } p.dbglog.Println("player succesfully linked src with conv1") }) conv1.Link(p.volume) p.volume.Link(conv2) conv2.Link(sink) if p.bus, err = p.pipe.GetBus(); err != nil { return } return } // ********************************************************* // Public Interface type PlayerChan struct { load chan<- loadRequest play chan<- playRequest pause chan<- pauseRequest stop chan<- stopRequest } func (p *PlayerChan) Load(cart, cut uint) error { resCh := make(chan loadResult) req := loadRequest{} req.cart = cart req.cut = cut req.response = resCh p.load <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) Play() error { resCh := make(chan playResult) req := playRequest{} req.response = resCh p.play <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) Pause() error { resCh := make(chan pauseResult) req := pauseRequest{} req.response = resCh p.pause <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *PlayerChan) Stop() error { resCh := make(chan stopResult) req := stopRequest{} req.response = resCh p.stop <- req res := <-resCh if res.err != nil { return res.err } return nil } func (p *Player) GetInterface() *PlayerChan { ch := &PlayerChan{} ch.load = p.loadChan ch.play = p.playChan ch.pause = p.pauseChan ch.stop = p.stopChan return ch } func NewPlayer(basepath string, stdlog *log.Logger, dbglog *log.Logger) (p *Player, err error) { p = &Player{} p.basepath = path.Clean(basepath) if stdlog != nil { p.stdlog = stdlog } else { p.stdlog = log.New(ioutil.Discard, "autoplayrhrd-go.player", log.LstdFlags) } if dbglog != nil { p.dbglog = dbglog } else { p.dbglog = log.New(ioutil.Discard, "rhrd-go.player-dbg", log.LstdFlags) } p.loadChan = make(chan loadRequest) p.playChan = make(chan playRequest) p.pauseChan = make(chan pauseRequest) p.stopChan = make(chan stopRequest) if err = p.createPipeline(); err != nil { return } go p.dispatchRequests() return }