diff options
Diffstat (limited to 'src/rhimportd')
-rw-r--r-- | src/rhimportd/ctrlTelnet.go | 292 | ||||
-rw-r--r-- | src/rhimportd/ctrlWatchDir.go | 219 | ||||
-rw-r--r-- | src/rhimportd/ctrlWeb.go | 55 | ||||
-rw-r--r-- | src/rhimportd/ctrlWebSimple.go | 161 | ||||
-rw-r--r-- | src/rhimportd/ctrlWebSocket.go | 331 | ||||
-rw-r--r-- | src/rhimportd/main.go | 159 |
6 files changed, 1217 insertions, 0 deletions
diff --git a/src/rhimportd/ctrlTelnet.go b/src/rhimportd/ctrlTelnet.go new file mode 100644 index 0000000..5d0aebe --- /dev/null +++ b/src/rhimportd/ctrlTelnet.go @@ -0,0 +1,292 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at> +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. +// + +package main + +import ( + "fmt" + "github.com/spreadspace/telgo" + "helsinki.at/rhimport" + "net/http" + "strconv" + "strings" +) + +func telnetQuit(c *telgo.Client, args []string, conf *rhimport.Config, rddb *rhimport.RdDbChan) bool { + return true +} + +func telnetHelp(c *telgo.Client, args []string, conf *rhimport.Config, rddb *rhimport.RdDbChan) bool { + switch len(args) { + case 2: + switch args[1] { + case "quit": + c.Sayln("usage: quit") + c.Sayln(" terminates the client connection. You may also use Ctrl-D to do this.") + return false + case "help": + c.Sayln("usage: help [ <cmd> ]") + c.Sayln(" prints command overview or detailed info to <cmd>.") + return false + case "set": + c.Sayln("usage: set <param> <value>") + c.Sayln(" this sets the import parameter <param> to <value>.") + c.Sayln("") + c.Sayln(" available parameters:") + c.Sayln(" UserName string username to use for rdxport interface") + c.Sayln(" Password string password to use for rdxport interface") + c.Sayln(" SourceUri string uri to the file to import") + c.Sayln(" ShowId uint the RHRD show id to import to") + c.Sayln(" ClearShowCarts bool clear all show-carts before importing?") + c.Sayln(" GroupName string name of music-pool group to import to") + c.Sayln(" Cart uint cart to import to") + c.Sayln(" ClearCart bool remove/add cart before import") + c.Sayln(" Cut uint cut to import to") + c.Sayln(" Channels uint number of audio channels (default: %v)", conf.ImportParamDefaults.Channels) + c.Sayln(" NormalizationLevel int normalization level in dB (default: %v)", conf.ImportParamDefaults.NormalizationLevel) + c.Sayln(" AutotrimLevel int autotrim level in dB (default: %v)", conf.ImportParamDefaults.AutotrimLevel) + c.Sayln(" UseMetaData bool extract meta data from file (default: %v)", conf.ImportParamDefaults.UseMetaData) + c.Sayln("") + c.Sayln(" UserName, Password and SourceUri are mandatory parameters.") + c.Sayln("") + c.Sayln(" If ShowId is supplied GroupName, Channels, NomalizationLevel, AutorimLevel,") + c.Sayln(" UseMetaData and Cut will be ignored. The values from the shows' dropbox will") + c.Sayln(" be used instead. Cart may be specified but must point to an empty cart within") + c.Sayln(" that show. If ClearCut is true the specified cart will get deleted before") + c.Sayln(" importing. If Cart is 0 the next free cart in the show will be used. Show") + c.Sayln(" carts will always be imported into cut 1.") + c.Sayln("") + c.Sayln(" If GroupName is supplied Channels, NomalizationLevel, AutorimLevel,") + c.Sayln(" UseMetaData, Cut, Cart and ClearCart will be ignored. The values from") + c.Sayln(" the music pools' dropbox will be used instead. The file will always be") + c.Sayln(" imported into cut 1 of the first free cart within the music pool.") + c.Sayln("") + c.Sayln(" If ShowId and GroupName are omitted a Cart must be specified. Cut may be") + c.Sayln(" supplied in which case both cart and cut must already exist. The import will") + c.Sayln(" then replace the contents of the current data stored in Cart/Cut. If only Cart") + c.Sayln(" and no Cut is supplied and ClearCut is false the file will either get imported") + c.Sayln(" into the next cut of an existing cart or the cart will be created and the file") + c.Sayln(" will be imported into cut 1 of this cart.") + c.Sayln("") + c.Sayln(" In case of an error carts/cuts which might got created will be removed. Carts") + c.Sayln(" which got deleted because of ClearShowCarts or ClearCart are however gone for") + c.Sayln(" good.") + return false + case "show": + c.Sayln("usage: show") + c.Sayln(" this prints the current values of all import parameters.") + return false + case "reset": + c.Sayln("usage: reset") + c.Sayln(" this resets all import parameters to default values.") + return false + case "run": + c.Sayln("usage: run") + c.Sayln(" this starts the fetch/import process according to the current") + c.Sayln(" import parameters.") + return false + } + fallthrough + default: + c.Sayln("usage: <cmd> [ [ <arg1> ] ... ]") + c.Sayln(" available commands:") + c.Sayln(" quit close connection (or use Ctrl-D)") + c.Sayln(" help [ <cmd> ] print this, or help for specific command") + c.Sayln(" set <param> <value> sets parameter <param> on current import context") + c.Sayln(" show shows current import context") + c.Sayln(" reset resets current import context") + c.Sayln(" run runs fetch/import using current import context") + } + return false +} + +func telnetSetInt(c *telgo.Client, param *int, val string) { + if vint, err := strconv.ParseInt(val, 10, 32); err != nil { + c.Sayln("invalid value (must be an integer)") + } else { + *param = int(vint) + } +} + +func telnetSetUint(c *telgo.Client, param *uint, val string) { + if vuint, err := strconv.ParseUint(val, 10, 32); err != nil { + c.Sayln("invalid value (must be a positive integer)") + } else { + *param = uint(vuint) + } +} + +func telnetSetBool(c *telgo.Client, param *bool, val string) { + if vbool, err := strconv.ParseBool(val); err != nil { + c.Sayln("invalid value (must be true or false)") + } else { + *param = vbool + } +} + +func telnetSet(c *telgo.Client, args []string, conf *rhimport.Config, rddb *rhimport.RdDbChan) bool { + if len(args) != 3 { + c.Sayln("wrong number of arguments") + return false + } + + var ctx *rhimport.Context + if c.UserData == nil { + c.UserData = rhimport.NewContext(conf, rddb) + ctx = c.UserData.(*rhimport.Context) + ctx.Trusted = false + } else { + ctx = c.UserData.(*rhimport.Context) + } + switch strings.ToLower(args[1]) { + case "username": + ctx.UserName = args[2] + case "password": + ctx.Password = args[2] + case "sourceuri": + ctx.SourceUri = args[2] + case "showid": + telnetSetUint(c, &ctx.ShowId, args[2]) + case "clearshowcarts": + telnetSetBool(c, &ctx.ClearShowCarts, args[2]) + case "groupname": + ctx.GroupName = args[2] + case "cart": + telnetSetUint(c, &ctx.Cart, args[2]) + case "clearcart": + telnetSetBool(c, &ctx.ClearCart, args[2]) + case "cut": + telnetSetUint(c, &ctx.Cut, args[2]) + case "channels": + telnetSetUint(c, &ctx.Channels, args[2]) + case "normalizationlevel": + telnetSetInt(c, &ctx.NormalizationLevel, args[2]) + case "autotrimlevel": + telnetSetInt(c, &ctx.AutotrimLevel, args[2]) + case "usemetadata": + telnetSetBool(c, &ctx.UseMetaData, args[2]) + default: + c.Sayln("unknown parameter, use 'help set' for a list of available parameters") + } + return false +} + +func telnetReset(c *telgo.Client, args []string, conf *rhimport.Config, rddb *rhimport.RdDbChan) bool { + if len(args) > 1 { + c.Sayln("too many arguments") + return false + } + + c.UserData = nil + return false +} + +func telnetShow(c *telgo.Client, args []string, conf *rhimport.Config, rddb *rhimport.RdDbChan) bool { + if len(args) > 1 { + c.Sayln("too many arguments") + return false + } + + if c.UserData != nil { + ctx := c.UserData.(*rhimport.Context) + c.Sayln(" UserName: %q", ctx.UserName) + c.Sayln(" Password: %q", ctx.Password) + c.Sayln(" SourceUri: %q", ctx.SourceUri) + c.Sayln(" ShowId: %v", ctx.ShowId) + c.Sayln(" ClearShowCarts: %v", ctx.ClearShowCarts) + c.Sayln(" GroupName: %q", ctx.GroupName) + c.Sayln(" Cart: %v", ctx.Cart) + c.Sayln(" ClearCart: %v", ctx.ClearCart) + c.Sayln(" Cut: %v", ctx.Cut) + c.Sayln(" Channels: %v", ctx.Channels) + c.Sayln(" NormalizationLevel: %v", ctx.NormalizationLevel) + c.Sayln(" AutotrimLevel: %v", ctx.AutotrimLevel) + c.Sayln(" UseMetaData: %v", ctx.UseMetaData) + } else { + c.Sayln("context is empty") + } + return false +} + +func telnetProgressCallback(step int, stepName string, progress float64, userdata interface{}) bool { + c := userdata.(*telgo.Client) + c.Say("%s: %3.2f%%\r", stepName, progress*100) + return true +} + +func telnetRun(c *telgo.Client, args []string, conf *rhimport.Config, rddb *rhimport.RdDbChan) bool { + if c.UserData == nil { + c.Sayln("context is empty please set at least one option") + return false + } + ctx := c.UserData.(*rhimport.Context) + if err := ctx.SanityCheck(); err != nil { + c.Sayln("sanity check for import context returned: %s", err) + return false + } + + ctx.ProgressCallBack = telnetProgressCallback + ctx.ProgressCallBackData = c + ctx.Cancel = c.Cancel + + c.Sayln("fetching file from '%s'", ctx.SourceUri) + if res, err := rhimport.FetchFile(ctx); err != nil { + c.Sayln("fetch file error: %s", err) + return false + } else if res.ResponseCode != http.StatusOK { + c.Sayln("fetch file error: %s", res.ErrorString) + return false + } + + c.Sayln("") + c.Sayln("importing file '%s'", ctx.SourceFile) + if res, err := rhimport.ImportFile(ctx); err != nil { + c.Sayln("") + c.Sayln("import file error: %s", err) + } else { + c.Sayln("") + if res.ResponseCode == http.StatusOK { + c.Sayln("File got succesfully imported into Cart/Cut %d/%d", res.Cart, res.Cut) + } else { + c.Sayln("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString) + } + } + return false +} + +func StartControlTelnet(addr string, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) { + cmdlist := make(telgo.CmdList) + cmdlist["quit"] = func(c *telgo.Client, args []string) bool { return telnetQuit(c, args, conf, rddb) } + cmdlist["help"] = func(c *telgo.Client, args []string) bool { return telnetHelp(c, args, conf, rddb) } + cmdlist["set"] = func(c *telgo.Client, args []string) bool { return telnetSet(c, args, conf, rddb) } + cmdlist["reset"] = func(c *telgo.Client, args []string) bool { return telnetReset(c, args, conf, rddb) } + cmdlist["show"] = func(c *telgo.Client, args []string) bool { return telnetShow(c, args, conf, rddb) } + cmdlist["run"] = func(c *telgo.Client, args []string) bool { return telnetRun(c, args, conf, rddb) } + + rhl.Println("telnet-ctrl: listening on", addr) + s := telgo.NewServer(addr, "rhimportd> ", cmdlist, nil) + if err := s.Run(); err != nil { + fmt.Printf("telnet server returned: %s", err) + } +} diff --git a/src/rhimportd/ctrlWatchDir.go b/src/rhimportd/ctrlWatchDir.go new file mode 100644 index 0000000..6f84c15 --- /dev/null +++ b/src/rhimportd/ctrlWatchDir.go @@ -0,0 +1,219 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at> +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. +// + +package main + +import ( + "encoding/json" + "fmt" + "helsinki.at/rhimport" + "net/http" + "os" + "path/filepath" + "strings" + "time" +) + +type watchDirRequestData struct { + UserName string `json:"LOGIN_NAME"` + ShowId uint `json:"SHOW_ID"` + ClearShowCarts bool `json:"CLEAR_SHOW_CARTS"` + MusicPoolGroup string `json:"MUSIC_POOL_GROUP"` + Cart uint `json:"CART_NUMBER"` + ClearCart bool `json:"CLEAR_CART"` + Cut uint `json:"CUT_NUMBER"` + Channels uint `json:"CHANNELS"` + NormalizationLevel int `json:"NORMALIZATION_LEVEL"` + AutotrimLevel int `json:"AUTOTRIM_LEVEL"` + UseMetaData bool `json:"USE_METADATA"` + SourceUri string `json:"SOURCE_URI"` +} + +func newWatchDirRequestData(conf *rhimport.Config) *watchDirRequestData { + rd := new(watchDirRequestData) + rd.UserName = "" + rd.ShowId = 0 + rd.ClearShowCarts = false + rd.MusicPoolGroup = "" + rd.Cart = 0 + rd.ClearCart = false + rd.Cut = 0 + rd.Channels = conf.ImportParamDefaults.Channels + rd.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel + rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel + rd.UseMetaData = conf.ImportParamDefaults.UseMetaData + rd.SourceUri = "" + + return rd +} + +type watchDirResponseData struct { + ResponseCode int `json:"REPONSE_CODE"` + ErrorString string `json:"ERROR_STRING"` + Cart uint `json:"CART_NUMBER"` + Cut uint `json:"CUT_NUMBER"` +} + +func watchDirWriteResponse(filename string, resp *watchDirResponseData) { + file, err := os.OpenFile(filename, os.O_WRONLY|os.O_TRUNC, 0) + if err != nil { + rhl.Printf("watch-dir-ctrl: writing response failed: %s", err) + return + } + encoder := json.NewEncoder(file) + encoder.Encode(resp) + file.Close() + + dstname := strings.TrimSuffix(filename, ".running") + ".done" + os.Rename(filename, dstname) +} + +func watchDirErrorResponse(filename string, code int, errStr string) { + watchDirWriteResponse(filename, &watchDirResponseData{code, errStr, 0, 0}) +} + +func watchDirResponse(filename string, result *rhimport.Result) { + watchDirWriteResponse(filename, &watchDirResponseData{result.ResponseCode, result.ErrorString, result.Cart, result.Cut}) +} + +func watchDirParseRequest(conf *rhimport.Config, rddb *rhimport.RdDbChan, req *os.File) (ctx *rhimport.Context, err error) { + + decoder := json.NewDecoder(req) + reqdata := newWatchDirRequestData(conf) + if jsonerr := decoder.Decode(reqdata); jsonerr != nil { + err = fmt.Errorf("Error parsing JSON response: %s", jsonerr) + return + } + + ctx = rhimport.NewContext(conf, rddb) + ctx.UserName = reqdata.UserName + ctx.Trusted = true + ctx.ShowId = reqdata.ShowId + ctx.ClearShowCarts = reqdata.ClearShowCarts + ctx.GroupName = reqdata.MusicPoolGroup + ctx.Cart = reqdata.Cart + ctx.ClearCart = reqdata.ClearCart + ctx.Cut = reqdata.Cut + ctx.Channels = reqdata.Channels + ctx.NormalizationLevel = reqdata.NormalizationLevel + ctx.AutotrimLevel = reqdata.AutotrimLevel + ctx.UseMetaData = reqdata.UseMetaData + ctx.SourceUri = reqdata.SourceUri + return +} + +func watchDirHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, ctx *rhimport.Context, filename string) { + rhdl.Printf("WatchDirHandler: request for '%s'", filename) + + var err error + if err = ctx.SanityCheck(); err != nil { + watchDirErrorResponse(filename, http.StatusBadRequest, err.Error()) + return + } + + var res *rhimport.Result + if res, err = rhimport.FetchFile(ctx); err != nil { + watchDirErrorResponse(filename, http.StatusInternalServerError, err.Error()) + return + } + if res.ResponseCode != http.StatusOK { + watchDirErrorResponse(filename, res.ResponseCode, res.ErrorString) + return + } + + if res, err = rhimport.ImportFile(ctx); err != nil { + watchDirErrorResponse(filename, http.StatusInternalServerError, err.Error()) + return + } + if res.ResponseCode == http.StatusOK { + rhl.Println("ImportFile succesfully imported", ctx.SourceFile) + } else { + rhl.Println("ImportFile import of", ctx.SourceFile, "was unsuccesful") + } + + watchDirResponse(filename, res) + return +} + +func watchDirRun(dir *os.File, conf *rhimport.Config, rddb *rhimport.RdDbChan) { + rhl.Printf("watch-dir-ctrl: watching for files in %s", dir.Name()) + for { + var err error + if _, err = dir.Seek(0, 0); err != nil { + rhl.Printf("watch-dir-ctrl: reading directory contents failed: %s", err) + return + } + + var names []string + if names, err = dir.Readdirnames(0); err != nil { + rhl.Printf("watch-dir-ctrl: reading directory contents failed: %s", err) + return + } + + for _, name := range names { + if strings.HasSuffix(name, ".new") { + srcname := filepath.Join(dir.Name(), name) + + rhdl.Printf("watch-dir-ctrl: found new file %s", srcname) + var file *os.File + if file, err = os.Open(srcname); err != nil { + rhl.Printf("watch-dir-ctrl: error reading new file: %s", err) + continue + } + if ctx, err := watchDirParseRequest(conf, rddb, file); err == nil { + file.Close() + dstname := strings.TrimSuffix(srcname, ".new") + ".running" + os.Rename(srcname, dstname) + go watchDirHandler(conf, rddb, ctx, dstname) + } else { // ignoring files with json errors -> maybe the file has not been written completely + file.Close() + rhdl.Printf("watch-dir-ctrl: new file %s parser error: %s, ignoring for now", srcname, err) + } + } + } + + time.Sleep(1 * time.Second) + } +} + +func StartWatchDir(dirname string, conf *rhimport.Config, rddb *rhimport.RdDbChan) { + for { + time.Sleep(5 * time.Second) + dir, err := os.Open(dirname) + if err != nil { + rhl.Printf("watch-dir-ctrl: %s", err) + continue + } + if i, err := dir.Stat(); err != nil { + rhl.Printf("watch-dir-ctrl: %s", err) + continue + } else { + if !i.IsDir() { + rhl.Printf("watch-dir-ctrl: %s is not a directory", dirname) + continue + } + } + watchDirRun(dir, conf, rddb) + } +} diff --git a/src/rhimportd/ctrlWeb.go b/src/rhimportd/ctrlWeb.go new file mode 100644 index 0000000..c1cd4b5 --- /dev/null +++ b/src/rhimportd/ctrlWeb.go @@ -0,0 +1,55 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at> +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. +// + +package main + +import ( + "helsinki.at/rhimport" + "net/http" + _ "net/http/pprof" + "time" +) + +type webHandler struct { + *rhimport.Config + *rhimport.RdDbChan + *rhimport.SessionStoreChan + trusted bool + H func(*rhimport.Config, *rhimport.RdDbChan, *rhimport.SessionStoreChan, bool, http.ResponseWriter, *http.Request) +} + +func (self webHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + self.H(self.Config, self.RdDbChan, self.SessionStoreChan, self.trusted, w, r) +} + +func StartControlWeb(addr string, conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan) { + http.Handle("/public/simple", webHandler{conf, rddb, sessions, false, webSimpleHandler}) + // http.Handle("/trusted/simple", webHandler{conf, rddb, sessions, true, webSimpleHandler}) + + http.Handle("/public/socket", webHandler{conf, rddb, sessions, false, webSocketHandler}) + + rhl.Println("web-ctrl: listening on", addr) + server := &http.Server{Addr: addr, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} + server.ListenAndServe() +} diff --git a/src/rhimportd/ctrlWebSimple.go b/src/rhimportd/ctrlWebSimple.go new file mode 100644 index 0000000..cd2c556 --- /dev/null +++ b/src/rhimportd/ctrlWebSimple.go @@ -0,0 +1,161 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at> +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. +// + +package main + +import ( + "encoding/json" + "fmt" + "helsinki.at/rhimport" + "html" + "net/http" +) + +type webSimpleRequestData struct { + UserName string `json:"LOGIN_NAME"` + Password string `json:"PASSWORD"` + ShowId uint `json:"SHOW_ID"` + ClearShowCarts bool `json:"CLEAR_SHOW_CARTS"` + MusicPoolGroup string `json:"MUSIC_POOL_GROUP"` + Cart uint `json:"CART_NUMBER"` + ClearCart bool `json:"CLEAR_CART"` + Cut uint `json:"CUT_NUMBER"` + Channels uint `json:"CHANNELS"` + NormalizationLevel int `json:"NORMALIZATION_LEVEL"` + AutotrimLevel int `json:"AUTOTRIM_LEVEL"` + UseMetaData bool `json:"USE_METADATA"` + SourceUri string `json:"SOURCE_URI"` +} + +func newWebSimpleRequestData(conf *rhimport.Config) *webSimpleRequestData { + rd := new(webSimpleRequestData) + rd.UserName = "" + rd.Password = "" + rd.ShowId = 0 + rd.ClearShowCarts = false + rd.MusicPoolGroup = "" + rd.Cart = 0 + rd.ClearCart = false + rd.Cut = 0 + rd.Channels = conf.ImportParamDefaults.Channels + rd.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel + rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel + rd.UseMetaData = conf.ImportParamDefaults.UseMetaData + rd.SourceUri = "" + + return rd +} + +type webSimpleResponseData struct { + ResponseCode int `json:"REPONSE_CODE"` + ErrorString string `json:"ERROR_STRING"` + Cart uint `json:"CART_NUMBER"` + Cut uint `json:"CUT_NUMBER"` +} + +func webSimpleErrorResponse(w http.ResponseWriter, code int, errStr string) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + encoder := json.NewEncoder(w) + respdata := webSimpleResponseData{code, errStr, 0, 0} + encoder.Encode(respdata) +} + +func webSimpleResponse(w http.ResponseWriter, result *rhimport.Result) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + encoder := json.NewEncoder(w) + respdata := webSimpleResponseData{result.ResponseCode, result.ErrorString, result.Cart, result.Cut} + encoder.Encode(respdata) +} + +func webSimpleParseRequest(conf *rhimport.Config, rddb *rhimport.RdDbChan, trusted bool, r *http.Request) (ctx *rhimport.Context, err error) { + + decoder := json.NewDecoder(r.Body) + reqdata := newWebSimpleRequestData(conf) + if jsonerr := decoder.Decode(reqdata); jsonerr != nil { + err = fmt.Errorf("Error parsing JSON response: %s", jsonerr) + return + } + + ctx = rhimport.NewContext(conf, rddb) + if trusted { + ctx.UserName = r.Header.Get("X-Forwarded-User") + } else { + ctx.UserName = reqdata.UserName + ctx.Password = reqdata.Password + } + ctx.Trusted = trusted + ctx.ShowId = reqdata.ShowId + ctx.ClearShowCarts = reqdata.ClearShowCarts + ctx.GroupName = reqdata.MusicPoolGroup + ctx.Cart = reqdata.Cart + ctx.ClearCart = reqdata.ClearCart + ctx.Cut = reqdata.Cut + ctx.Channels = reqdata.Channels + ctx.NormalizationLevel = reqdata.NormalizationLevel + ctx.AutotrimLevel = reqdata.AutotrimLevel + ctx.UseMetaData = reqdata.UseMetaData + ctx.SourceUri = reqdata.SourceUri + return +} + +func webSimpleHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan, trusted bool, w http.ResponseWriter, r *http.Request) { + rhdl.Printf("WebSimpleHandler: request for '%s'", html.EscapeString(r.URL.Path)) + + var ctx *rhimport.Context + var err error + if ctx, err = webSimpleParseRequest(conf, rddb, trusted, r); err != nil { + webSimpleErrorResponse(w, http.StatusBadRequest, err.Error()) + return + } + + if err = ctx.SanityCheck(); err != nil { + webSimpleErrorResponse(w, http.StatusBadRequest, err.Error()) + return + } + + var res *rhimport.Result + if res, err = rhimport.FetchFile(ctx); err != nil { + webSimpleErrorResponse(w, http.StatusInternalServerError, err.Error()) + return + } + if res.ResponseCode != http.StatusOK { + webSimpleErrorResponse(w, res.ResponseCode, res.ErrorString) + return + } + + if res, err = rhimport.ImportFile(ctx); err != nil { + webSimpleErrorResponse(w, http.StatusInternalServerError, err.Error()) + return + } + if res.ResponseCode == http.StatusOK { + rhl.Println("ImportFile succesfully imported", ctx.SourceFile) + } else { + rhl.Println("ImportFile import of", ctx.SourceFile, "was unsuccesful") + } + + webSimpleResponse(w, res) + return +} diff --git a/src/rhimportd/ctrlWebSocket.go b/src/rhimportd/ctrlWebSocket.go new file mode 100644 index 0000000..a0386f1 --- /dev/null +++ b/src/rhimportd/ctrlWebSocket.go @@ -0,0 +1,331 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at> +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. +// + +package main + +import ( + "fmt" + "github.com/gorilla/websocket" + "helsinki.at/rhimport" + "html" + "math" + "net/http" + "time" +) + +type webSocketRequestData struct { + Command string `json:"COMMAND"` + Id string `json:"ID"` + RefId string `json:"REFERENCE_ID"` + UserName string `json:"LOGIN_NAME"` + Password string `json:"PASSWORD"` + ShowId uint `json:"SHOW_ID"` + ClearShowCarts bool `json:"CLEAR_SHOW_CARTS"` + MusicPoolGroup string `json:"MUSIC_POOL_GROUP"` + Cart uint `json:"CART_NUMBER"` + ClearCart bool `json:"CLEAR_CART"` + Cut uint `json:"CUT_NUMBER"` + Channels uint `json:"CHANNELS"` + NormalizationLevel int `json:"NORMALIZATION_LEVEL"` + AutotrimLevel int `json:"AUTOTRIM_LEVEL"` + UseMetaData bool `json:"USE_METADATA"` + SourceUri string `json:"SOURCE_URI"` + Timeout uint `json:"TIMEOUT"` +} + +func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData { + rd := new(webSocketRequestData) + rd.Command = "" + rd.Id = "" + rd.UserName = "" + rd.Password = "" + rd.ShowId = 0 + rd.ClearShowCarts = false + rd.MusicPoolGroup = "" + rd.Cart = 0 + rd.ClearCart = false + rd.Cut = 0 + rd.Channels = conf.ImportParamDefaults.Channels + rd.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel + rd.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel + rd.UseMetaData = conf.ImportParamDefaults.UseMetaData + rd.SourceUri = "" + rd.Timeout = 0 + return rd +} + +type webSocketResponseBaseData struct { + ResponseCode int `json:"RESPONSE_CODE"` + Type string `json:"TYPE"` + ErrorString string `json:"ERROR_STRING"` + Id string `json:"ID"` + RefId string `json:"REFERENCE_ID"` +} + +type webSocketResponseListData struct { + webSocketResponseBaseData + Sessions map[string]string `json:"SESSIONS"` +} + +type webSocketResponseProgressData struct { + webSocketResponseBaseData + Step int `json:"PROGRESS_STEP"` + StepName string `json:"PROGRESS_STEP_NAME"` + Progress float64 `json:"PROGRESS"` +} + +type webSocketResponseDoneData struct { + webSocketResponseBaseData + Cart uint `json:"CART_NUMBER"` + Cut uint `json:"CUT_NUMBER"` +} + +func sendWebSocketResponse(ws *websocket.Conn, rd interface{}) { + if err := ws.WriteJSON(rd); err != nil { + rhdl.Println("WebScoket Client", ws.RemoteAddr(), "write error:", err) + } +} + +func sendWebSocketErrorResponse(ws *websocket.Conn, code int, errStr string) { + rd := &webSocketResponseBaseData{} + rd.ResponseCode = code + rd.Type = "error" + rd.ErrorString = errStr + sendWebSocketResponse(ws, rd) +} + +func sendWebSocketAckResponse(ws *websocket.Conn, code int, id, refid string) { + rd := &webSocketResponseBaseData{} + rd.ResponseCode = code + rd.Type = "ack" + rd.ErrorString = "OK" + rd.Id = id + rd.RefId = refid + sendWebSocketResponse(ws, rd) +} + +func sendWebSocketListResponse(ws *websocket.Conn, sessions map[string]string) { + rd := &webSocketResponseListData{} + rd.ResponseCode = http.StatusOK + rd.Type = "list" + rd.ErrorString = "OK" + rd.Sessions = sessions + sendWebSocketResponse(ws, rd) +} + +func sendWebSocketProgressResponse(ws *websocket.Conn, id, refid string, step int, stepName string, progress float64) { + rd := &webSocketResponseProgressData{} + rd.ResponseCode = http.StatusOK + rd.Type = "progress" + rd.ErrorString = "OK" + rd.Id = id + rd.RefId = refid + rd.Step = step + rd.StepName = stepName + rd.Progress = progress + sendWebSocketResponse(ws, rd) +} + +func sendWebSocketDoneResponse(ws *websocket.Conn, code int, errStr, id, refid string, cart, cut uint) { + rd := &webSocketResponseDoneData{} + rd.ResponseCode = code + rd.Type = "done" + rd.ErrorString = errStr + rd.Id = id + rd.RefId = refid + rd.Cart = cart + rd.Cut = cut + sendWebSocketResponse(ws, rd) +} + +type webSocketSession struct { + id string + refId string + session *rhimport.SessionChan + progresschan chan rhimport.ProgressData + donechan chan rhimport.Result +} + +func newWebSocketSession() *webSocketSession { + session := &webSocketSession{} + session.progresschan = make(chan rhimport.ProgressData, 10) + session.donechan = make(chan rhimport.Result, 1) + return session +} + +func webSocketProgress(step int, stepName string, progress float64, userdata interface{}) bool { + if math.IsNaN(progress) { + progress = 0.0 + } + c := userdata.(chan<- rhimport.ProgressData) + select { + case c <- rhimport.ProgressData{Step: step, StepName: stepName, Progress: progress}: + default: + } + return true +} + +func webSocketDone(res rhimport.Result, userdata interface{}) bool { + c := userdata.(chan<- rhimport.Result) + c <- res + return true +} + +func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) { + ctx := rhimport.NewContext(conf, nil) + ctx.UserName = reqdata.UserName + ctx.Password = reqdata.Password + ctx.Trusted = false + ctx.ShowId = reqdata.ShowId + ctx.ClearShowCarts = reqdata.ClearShowCarts + ctx.GroupName = reqdata.MusicPoolGroup + ctx.Cart = reqdata.Cart + ctx.ClearCart = reqdata.ClearCart + ctx.Cut = reqdata.Cut + ctx.Channels = reqdata.Channels + ctx.NormalizationLevel = reqdata.NormalizationLevel + ctx.AutotrimLevel = reqdata.AutotrimLevel + ctx.UseMetaData = reqdata.UseMetaData + ctx.SourceUri = reqdata.SourceUri + + id, s, code, errstring := sessions.New(ctx, reqdata.RefId) + if code != http.StatusOK { + return code, errstring + } + self.id = id + self.refId = reqdata.RefId + self.session = s + if err := s.AddDoneHandler((chan<- rhimport.Result)(self.donechan), webSocketDone); err != nil { + return http.StatusInternalServerError, err.Error() + } + if err := s.AddProgressHandler((chan<- rhimport.ProgressData)(self.progresschan), webSocketProgress); err != nil { + return http.StatusInternalServerError, err.Error() + } + s.Run(time.Duration(reqdata.Timeout) * time.Second) + return http.StatusOK, "SUCCESS" +} + +func (self *webSocketSession) reconnectSession(reqdata *webSocketRequestData, sessions *rhimport.SessionStoreChan) (int, string) { + s, refId, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id) + if code != http.StatusOK { + return code, errstring + } + self.id = reqdata.Id + self.refId = refId + self.session = s + if err := s.AddDoneHandler((chan<- rhimport.Result)(self.donechan), webSocketDone); err != nil { + return http.StatusInternalServerError, err.Error() + } + if err := s.AddProgressHandler((chan<- rhimport.ProgressData)(self.progresschan), webSocketProgress); err != nil { + return http.StatusInternalServerError, err.Error() + } + s.Run(time.Duration(reqdata.Timeout) * time.Second) + return http.StatusOK, "SUCCESS" +} + +func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) { + defer ws.Close() + + session := newWebSocketSession() + for { + select { + case reqdata, ok := <-reqchan: + if !ok { + return + } + switch reqdata.Command { + case "new": + if session.id != "" { + sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session") + } else { + code, errstring := session.startNewSession(&reqdata, conf, sessions) + if code != http.StatusOK { + sendWebSocketErrorResponse(ws, code, errstring) + } else { + sendWebSocketAckResponse(ws, code, session.id, session.refId) + } + } + case "cancel": + if session.id == "" { + sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection doesn't handle any session") + } else { + session.session.Cancel() + } + case "reconnect": + if session.id != "" { + sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session") + } else { + code, errstring := session.reconnectSession(&reqdata, sessions) + if code != http.StatusOK { + sendWebSocketErrorResponse(ws, code, errstring) + } else { + sendWebSocketAckResponse(ws, code, session.id, session.refId) + } + } + case "list": + list, code, errstring := sessions.List(reqdata.UserName, reqdata.Password, false) + if code != http.StatusOK { + sendWebSocketErrorResponse(ws, code, errstring) + } else { + sendWebSocketListResponse(ws, list) + } + default: + sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command)) + } + case p := <-session.progresschan: + sendWebSocketProgressResponse(ws, session.id, session.refId, p.Step, p.StepName, p.Progress*100) + case d := <-session.donechan: + sendWebSocketDoneResponse(ws, d.ResponseCode, d.ErrorString, session.id, session.refId, d.Cart, d.Cut) + // TODO: send close message at this point? + } + } +} + +func webSocketHandler(conf *rhimport.Config, rddb *rhimport.RdDbChan, sessions *rhimport.SessionStoreChan, trusted bool, w http.ResponseWriter, r *http.Request) { + rhdl.Printf("WebSocketHandler: request for '%s'", html.EscapeString(r.URL.Path)) + + ws, err := websocket.Upgrade(w, r, nil, 1024, 1024) + if _, ok := err.(websocket.HandshakeError); ok { + http.Error(w, "Not a websocket handshake", 400) + return + } else if err != nil { + rhdl.Println("WebSocket Client", ws.RemoteAddr(), "error:", err) + return + } + rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected") + reqchan := make(chan webSocketRequestData) + go webSocketSessionHandler(reqchan, ws, conf, sessions) + defer close(reqchan) + + for { + reqdata := newWebSocketRequestData(conf) + if err := ws.ReadJSON(&reqdata); err != nil { + rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err) + return + } else { + // rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata) + reqchan <- *reqdata + } + } +} diff --git a/src/rhimportd/main.go b/src/rhimportd/main.go new file mode 100644 index 0000000..d941b05 --- /dev/null +++ b/src/rhimportd/main.go @@ -0,0 +1,159 @@ +// +// rhimportd +// +// The Radio Helsinki Rivendell Import Daemon +// +// +// Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at> +// +// This file is part of rhimportd. +// +// rhimportd is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// rhimportd is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with rhimportd. If not, see <http://www.gnu.org/licenses/>. +// + +package main + +import ( + "flag" + "fmt" + "helsinki.at/rhimport" + "io/ioutil" + "log" + "os" + "os/signal" + "sync" +) + +var ( + rhl = log.New(os.Stderr, "[rhimportd]\t", log.LstdFlags) + rhdl = log.New(ioutil.Discard, "[rhimportd-dbg]\t", log.LstdFlags) +) + +func init() { + if _, exists := os.LookupEnv("RHIMPORTD_DEBUG"); exists { + rhdl.SetOutput(os.Stderr) + } +} + +type envStringValue string + +func newEnvStringValue(key, dflt string) *envStringValue { + if envval, exists := os.LookupEnv(key); exists { + return (*envStringValue)(&envval) + } else { + return (*envStringValue)(&dflt) + } +} + +func (s *envStringValue) Set(val string) error { + *s = envStringValue(val) + return nil +} + +func (s *envStringValue) Get() interface{} { return string(*s) } + +func (s *envStringValue) String() string { return fmt.Sprintf("%s", *s) } + +func main() { + webAddr := newEnvStringValue("RHIMPORTD_WEB_ADDR", "localhost:4080") + flag.Var(webAddr, "web-addr", "addr:port to listen on (environment: RHIMPORTD_WEB_ADDR)") + telnetAddr := newEnvStringValue("RHIMPORTD_TELNET_ADDR", "localhost:4023") + flag.Var(telnetAddr, "telnet-addr", "addr:port to listen on (environment: RHIMPORTD_TELNET_ADDR)") + watchDir := newEnvStringValue("RHIMPORTD_WATCH_DIR", "") + flag.Var(watchDir, "watch-dir", "directory to look for file based requests (environment: RHIMPORTD_WATCH_DIR)") + rdconf := newEnvStringValue("RHIMPORTD_RD_CONF", "/etc/rd.conf") + flag.Var(rdconf, "rdconf", "path to the Rivendell config file (environment: RHIMPORTD_RD_CONF)") + rdxportUrl := newEnvStringValue("RHIMPORTD_RDXPORT_URL", "http://localhost/rd-bin/rdxport.cgi") + flag.Var(rdxportUrl, "rdxport-url", "the url to the Rivendell web-api (environment: RHIMPORTD_RDXPORT_URL)") + tempDir := newEnvStringValue("RHIMPORTD_TEMP_DIR", os.TempDir()) + flag.Var(tempDir, "tmp-dir", "path to temporary files (environment: RHIMPORTD_TEMP_DIR)") + localFetchDir := newEnvStringValue("RHIMPORTD_LOCAL_FETCH_DIR", os.TempDir()) + flag.Var(localFetchDir, "local-fetch-dir", "base path for local:// urls (environment: RHIMPORTD_LOCAL_FETCH_DIR)") + help := flag.Bool("help", false, "show usage") + + flag.Parse() + if *help { + flag.Usage() + return + } + + conf, err := rhimport.NewConfig(rdconf.Get().(string), rdxportUrl.Get().(string), tempDir.Get().(string), localFetchDir.Get().(string)) + if err != nil { + rhl.Println("Error reading configuration:", err) + return + } + + rddb, err := rhimport.NewRdDb(conf) + if err != nil { + rhl.Println("Error initializing Rivdenll DB:", err) + return + } + defer rddb.Cleanup() + + sessions, err := rhimport.NewSessionStore(conf, rddb.GetInterface()) + if err != nil { + rhl.Println("Error initializing Session Store:", err) + return + } + defer sessions.Cleanup() + + var wg sync.WaitGroup + + if webAddr.Get().(string) != "" { + wg.Add(1) + go func() { + defer wg.Done() + rhl.Println("starting web-ctrl") + StartControlWeb(webAddr.Get().(string), conf, rddb.GetInterface(), sessions.GetInterface()) + rhl.Println("web-ctrl finished") + }() + } + + if telnetAddr.Get().(string) != "" { + wg.Add(1) + go func() { + defer wg.Done() + rhl.Println("starting telnet-ctrl") + StartControlTelnet(telnetAddr.Get().(string), conf, rddb.GetInterface(), sessions.GetInterface()) + rhl.Println("telnet-ctrl finished") + }() + } + + if watchDir.Get().(string) != "" { + wg.Add(1) + go func() { + defer wg.Done() + rhl.Println("starting watch-dir-ctrl") + StartWatchDir(watchDir.Get().(string), conf, rddb.GetInterface()) + rhl.Println("watch-dir-ctrl finished") + }() + } + + alldone := make(chan bool) + go func() { + defer func() { alldone <- true }() + wg.Wait() + }() + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + + select { + case <-c: + rhl.Println("received interrupt, shutdown") + return + case <-alldone: + return + } +} |