// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015-2016 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package rhimport import ( "bytes" "fmt" "io" "log" "mime" "net/http" "net/url" "os" "os/exec" "os/user" "path" "path/filepath" "strconv" "strings" "time" "github.com/andelf/go-curl" ) type fetcherCurlCBData struct { ctx *Context res *Result filename string remotename string conv fetchConverter totalSize float64 written uint64 writeError error } func curlHeaderCallback(ptr []byte, userdata interface{}) bool { hdr := fmt.Sprintf("%s", ptr) data := userdata.(*fetcherCurlCBData) if strings.HasPrefix(hdr, "Content-Disposition:") { if mediatype, params, err := mime.ParseMediaType(strings.TrimPrefix(hdr, "Content-Disposition:")); err == nil { if mediatype == "attachment" { data.filename = filepath.Join(data.ctx.WorkDir, path.Clean("/"+params["filename"])) } } } return true } func curlWriteCallback(ptr []byte, userdata interface{}) bool { data := userdata.(*fetcherCurlCBData) if data.conv == nil { if data.filename == "" { name := path.Clean("/" + data.remotename) if name == "/" { data.ctx.dbglog.Printf("remotename('%s') is invalid, replacing it with 'unnamed'", data.remotename) name = "unnamed" } data.filename = filepath.Join(data.ctx.WorkDir, name) } data.ctx.OrigFilename = data.filename conv, newFilename, err := newFetchConverter(data.ctx, data.filename) if err != nil { data.ctx.stdlog.Printf("Unable to create converter for file %s: %s", data.filename, err) data.writeError = err return false } data.filename = newFilename data.conv = conv } w, err := data.conv.Write(ptr) if err != nil { data.ctx.stdlog.Printf("Unable to write to converter(%s): %s", data.filename, err) data.writeError = err return false } data.written += uint64(w) if data.written > FILESIZE_MAX { data.writeError = fmt.Errorf("file exceeds maximum file size") return false } data.ctx.reportProgress(1, "downloading", float64(data.written), data.totalSize) return true } func curlProgressCallback(dltotal, dlnow, ultotal, ulnow float64, userdata interface{}) bool { data := userdata.(*fetcherCurlCBData) if data.writeError != nil { return false } if data.ctx.isCanceled() { data.res.ResponseCode = http.StatusNoContent data.res.ErrorString = "canceled" return false } if dltotal > float64(FILESIZE_MAX) { data.res.ResponseCode = http.StatusRequestEntityTooLarge data.res.ErrorString = "file exceeds maximum file size" return false } data.totalSize = dltotal return true } func checkYoutubeDL(ctx *Context, res *Result, uri *url.URL) *youtubeDLInfo { cmd := exec.Command("youtube-dl", "--no-playlist", "-f", "bestaudio/best", "--prefer-free-formats", "-J", ctx.SourceUri) var stderr, stdout bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr if cba_api_key := getCBAApiKey(); cba_api_key != "" { cmd.Env = append(os.Environ(), "CBA_API_KEY="+cba_api_key) } ctx.stdlog.Printf("running youtube-dl for '%s'", ctx.SourceUri) done := make(chan *youtubeDLInfo) go func() { defer func() { done <- nil }() if err := cmd.Run(); err != nil { ctx.dbglog.Printf("youtube-dl: %v, stderr: %s", err, strings.TrimSpace(stderr.String())) return } info, err := newYoutubeDLInfoFromJSON(&stdout) if err != nil { ctx.dbglog.Printf("youtube-dl: %v, stderr: %s", err, strings.TrimSpace(stderr.String())) return } ctx.dbglog.Printf("youtube-dl: extractor: %s -> %s", info.Extractor, info.URL) ctx.SourceUri = info.URL done <- info }() select { case info := <-done: return info case <-ctx.Cancel: cmd.Process.Kill() res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil } } func fetchFileCurl(ctx *Context, res *Result, uri *url.URL) (err error) { ctx.stdlog.Printf("curl-based fetcher called for '%s'", ctx.SourceUri) info := checkYoutubeDL(ctx, res, uri) if res.ResponseCode == http.StatusNoContent { ctx.stdlog.Printf("download of '%s' got canceled", ctx.SourceUri) return nil } easy := curl.EasyInit() if easy == nil { err = fmt.Errorf("Error initializing libcurl") return } defer easy.Cleanup() easy.Setopt(curl.OPT_FOLLOWLOCATION, true) easy.Setopt(curl.OPT_URL, ctx.SourceUri) easy.Setopt(curl.OPT_USERAGENT, "Radio Helsinki Import") if info != nil && (info.Protocol == "http" || info.Protocol == "https") { if len(info.HTTPHeaders) > 0 { var h []string for key, value := range info.HTTPHeaders { h = append(h, key+": "+value) } easy.Setopt(curl.OPT_HTTPHEADER, h) ctx.dbglog.Printf("added HTTP header: %q", h) } } cbdata := &fetcherCurlCBData{ctx: ctx, res: res, remotename: path.Base(uri.Path)} if info != nil { if info.Title == "" { cbdata.remotename = info.ID + "." + info.Ext } else { cbdata.remotename = info.Title + "." + info.Ext } cbdata.remotename = strings.NewReplacer("/", "_").Replace(cbdata.remotename) switch strings.ToLower(info.Extractor) { case "generic": case "dropbox": default: ctx.Title = info.ExtractorKey + ": " + info.Title if info.Title == "" { ctx.Title += info.ID } ctx.ExtraMetaData["TITLE"] = ctx.Title } } easy.Setopt(curl.OPT_HEADERFUNCTION, curlHeaderCallback) easy.Setopt(curl.OPT_HEADERDATA, cbdata) easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback) easy.Setopt(curl.OPT_WRITEDATA, cbdata) easy.Setopt(curl.OPT_NOPROGRESS, false) easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback) easy.Setopt(curl.OPT_PROGRESSDATA, cbdata) err = easy.Perform() statusCode := 0 if curlResponseCode, err := easy.Getinfo(curl.INFO_RESPONSE_CODE); err == nil { if code, ok := curlResponseCode.(int); ok { statusCode = code } } var convOut string var convErr error if cbdata.conv != nil { cbdata.conv.Close() ctx.dbglog.Printf("waiting for converter to finish...") convOut, convErr = cbdata.conv.GetResult(ctx, res) } if err != nil || cbdata.writeError != nil || convErr != nil { if res.ResponseCode == http.StatusNoContent { ctx.stdlog.Printf("download of '%s' got canceled", ctx.SourceUri) return nil } if statusCode > 0 && statusCode != http.StatusOK { res.ResponseCode = statusCode res.ErrorString = fmt.Sprintf("non-OK response code (%d) while fetching the resource", statusCode) return nil } if cbdata.writeError != nil { err = cbdata.writeError } if convErr != nil { err = fmt.Errorf("converter error: %v; converter output: %s", convErr, convOut) ctx.stdlog.Printf("%v", err) } err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err) ctx.stdlog.Println(err) return } ctx.dbglog.Printf("converter: loudness correction = %.2f dB", ctx.LoudnessCorr) ctx.SourceFile = cbdata.filename return } var months = [...]string{ "Jänner", "Februar", "März", "April", "Mai", "Juni", "Juli", "August", "September", "Oktober", "November", "Dezember", } var weekdays = [...]string{ "Sonntag", "Montag", "Dienstag", "Mittwoch", "Donnerstag", "Freitag", "Samstag", } func generateArchivFilePath(uri *url.URL) (file, path string, t time.Time, err error) { if t, err = time.Parse("2006/01/02/15/04", fmt.Sprintf("%s%s", uri.Host, uri.Path)); err != nil { return } file = t.Format("2006-01-02-1504") + ".ogg" path = fmt.Sprintf("%s/%04d/%02d-%s/%02d-%s", ARCHIV_BASE_PATH, t.Year(), t.Month(), months[t.Month()-1], t.Day(), weekdays[t.Weekday()]) return } func fetchFileArchiv(ctx *Context, res *Result, uri *url.URL) (err error) { ctx.dbglog.Printf("archiv fetcher called for '%s'", ctx.SourceUri) var srcfile, srcpath string var start time.Time if srcfile, srcpath, start, err = generateArchivFilePath(uri); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("date/time is invalid: %s", err) return nil } end := start.Add(time.Hour) easy := curl.EasyInit() if easy == nil { err = fmt.Errorf("Error initializing libcurl") return } defer easy.Cleanup() scpuri := fmt.Sprintf("sftp://%s%s/%s", ARCHIV_HOST, srcpath, srcfile) easy.Setopt(curl.OPT_URL, scpuri) easy.Setopt(curl.OPT_USERNAME, ARCHIV_USER) u, _ := user.Current() easy.Setopt(curl.OPT_SSH_PUBLIC_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa.pub", u.HomeDir)) easy.Setopt(curl.OPT_SSH_PRIVATE_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa", u.HomeDir)) cbdata := &fetcherCurlCBData{ctx: ctx, res: res} cbdata.filename = filepath.Join(ctx.WorkDir, srcfile) easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback) easy.Setopt(curl.OPT_WRITEDATA, cbdata) easy.Setopt(curl.OPT_NOPROGRESS, false) easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback) easy.Setopt(curl.OPT_PROGRESSDATA, cbdata) easy.Setopt(curl.OPT_BUFFERSIZE, 1024*1024) ctx.Title = fmt.Sprintf("Archiv vom %s - %s Uhr", start.Format("2.1.2006 15:04"), end.Format("15:04")) ctx.ExtraMetaData["TITLE"] = ctx.Title ctx.ExtraMetaData["ALBUM"] = "Radio Helsinki Archiv" ctx.ExtraMetaData["ORGANIZATION"] = "Radio Helsinki" ctx.ExtraMetaData["DATE"] = start.Format("2.1.2006") ctx.dbglog.Printf("importing archiv file from %s", scpuri) err = easy.Perform() var convOut string var convErr error if cbdata.conv != nil { cbdata.conv.Close() ctx.dbglog.Printf("waiting for converter to finish...") convOut, convErr = cbdata.conv.GetResult(ctx, res) } if err != nil || cbdata.writeError != nil || convErr != nil { if res.ResponseCode == http.StatusNoContent { ctx.stdlog.Printf("download of '%s' got canceled", ctx.SourceUri) return nil } if cbdata.writeError != nil { err = cbdata.writeError } if convErr != nil { err = fmt.Errorf("converter error: %v; converter output: %s", convErr, convOut) ctx.stdlog.Printf("%v", err) } err = fmt.Errorf("archiv-fetcher('%s'): %s", ctx.SourceUri, err) ctx.stdlog.Println(err) return } ctx.dbglog.Printf("converter: loudness correction = %.2f dB", ctx.LoudnessCorr) ctx.SourceFile = cbdata.filename return } func fetchFileTmp(ctx *Context, res *Result, uri *url.URL) (err error) { ctx.stdlog.Printf("tmp fetcher called for '%s'", ctx.SourceUri) ctx.SourceFile = filepath.Join(ctx.conf.TempDir, path.Clean("/"+uri.Path)) size := int64(0) if info, err := os.Stat(ctx.SourceFile); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("local-file stat(): %s", err) return nil } else { size = info.Size() if info.IsDir() { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("'%s' is a directory", ctx.SourceFile) return nil } } ctx.dbglog.Printf("1: Title = '%s', OrigName = '%s', SourceFile = '%s'", ctx.Title, ctx.OrigFilename, ctx.SourceFile) ctx.reportProgress(1, "fetching", 0.0, float64(size)) oldWorkDir := filepath.Dir(ctx.SourceFile) ctx.dbglog.Printf("switching over to old workdir: %s", oldWorkDir) ctx.SwitchTempWorkDir(oldWorkDir) ctx.reportProgress(1, "fetching", float64(size), float64(size)) ctx.dbglog.Printf("2: Title = '%s', OrigName = '%s', SourceFile = '%s'", ctx.Title, ctx.OrigFilename, ctx.SourceFile) return } func fetchFileConvert(ctx *Context, res *Result, origSrc io.Reader, sizeTotal int64) (err error) { origDir, origFile := path.Split(ctx.SourceFile) ctx.OrigFilename = ctx.SourceFile var conv fetchConverter if conv, ctx.SourceFile, err = newFetchConverter(ctx, filepath.Join(ctx.WorkDir, origFile)); err != nil { ctx.stdlog.Printf("Unable to create converter for file %s: %s", origDir+origFile, err) return } var buffer [1 * 1024 * 1024]byte written := uint64(0) for { var r, w int r, err = origSrc.Read(buffer[:]) if err == io.EOF { err = nil if r <= 0 { break } } if err != nil { ctx.stdlog.Printf("Unable to read from source file %s: %s", origDir+origFile, err) break } w, err = conv.Write(buffer[0:r]) if err != nil { ctx.stdlog.Printf("Unable to write to converter(%s): %s", ctx.SourceFile, err) break } written += uint64(w) ctx.reportProgress(1, "fetching", float64(written), float64(sizeTotal)) if ctx.isCanceled() { res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" break } } conv.Close() if res.ResponseCode == http.StatusNoContent { ctx.stdlog.Printf("converting of '%s' got canceled", ctx.SourceUri) return nil } ctx.dbglog.Printf("waiting for converter to finish...") convOut, convErr := conv.GetResult(ctx, res) if convErr != nil { if res.ResponseCode == http.StatusNoContent { ctx.stdlog.Printf("converting of '%s' got canceled", ctx.SourceUri) return nil } ctx.stdlog.Printf("converter error: %v; converter output: %s", convErr, convOut) return fmt.Errorf("converter error: %v; converter output: %s", convErr, convOut) } ctx.dbglog.Printf("converter: loudness correction = %.2f dB", ctx.LoudnessCorr) if err != nil { return err } ctx.reportProgress(1, "fetching", float64(sizeTotal), float64(sizeTotal)) return } func fetchFileLocal(ctx *Context, res *Result, uri *url.URL) (err error) { ctx.stdlog.Printf("local fetcher called for '%s'", ctx.SourceUri) ctx.SourceFile = filepath.Join(ctx.conf.LocalFetchDir, path.Clean("/"+uri.Path)) var src *os.File if src, err = os.Open(ctx.SourceFile); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("local-file open(): %s", err) return nil } defer src.Close() size := int64(0) if info, err := src.Stat(); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("local-file stat(): %s", err) return nil } else { size = info.Size() if info.IsDir() { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("'%s' is a directory", ctx.SourceFile) return nil } } if size > FILESIZE_MAX { res.ResponseCode = http.StatusRequestEntityTooLarge res.ErrorString = "file exceeds maximum file size" return nil } ctx.reportProgress(1, "fetching", 0.0, float64(size)) return fetchFileConvert(ctx, res, src, size) } func fetchFileSilence(ctx *Context, res *Result, uri *url.URL) error { ctx.dbglog.Printf("Silence fetcher for '%s'", ctx.SourceUri) d, err := strconv.ParseUint(uri.Host, 10, 32) if err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = "invalid duration (must be a positive integer)" return nil } duration := time.Duration(d) * 100 * time.Millisecond ctx.SourceFile = "silence.wav" ctx.Title = fmt.Sprintf("%v of total silence...", duration) ctx.ExtraMetaData["TITLE"] = ctx.Title wav, err := newPCMWavFile(uint32(ctx.conf.SampleRate), 16, uint16(ctx.Channels), duration) if err != nil { return err } size := wav.GetFileSize() wav.generator = NewSilenceGenerator() ctx.reportProgress(1, "fetching", 0.0, float64(size)) return fetchFileConvert(ctx, res, wav, int64(size)) } func writeAttachmentFile(ctx *Context, res *Result, sizeTotal uint64, conv fetchConverter) error { written := uint64(0) for { select { case <-ctx.Cancel: ctx.stdlog.Printf("receiving attachment '%s' got canceled", ctx.SourceFile) res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil case chunk, ok := <-ctx.AttachmentChan: if !ok { ctx.stdlog.Printf("receiving attachment '%s' failed: channel has been closed prematurely, after %d Bytes", ctx.SourceFile, written) res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("file upload stopped prematurely (after %d Bytes)", written) return nil } if chunk.Error != nil { ctx.stdlog.Printf("receiving attachment '%s' failed: %v", ctx.SourceFile, chunk.Error) res.ResponseCode = http.StatusInternalServerError res.ErrorString = chunk.Error.Error() return nil } left := sizeTotal - written if int(left) < len(chunk.Data) { ctx.stdlog.Printf("attachment fetcher: truncating %d byes of extra data", len(chunk.Data)-int(left)) chunk.Data = chunk.Data[0:left] } w, err := conv.Write(chunk.Data) if err != nil { ctx.stdlog.Printf("Unable to write to converter(%s): %s", ctx.SourceFile, err) return err } written += uint64(w) ctx.reportProgress(1, "receiving", float64(written), float64(sizeTotal)) if uint64(written) >= sizeTotal { return nil } } } } func fetchFileAttachment(ctx *Context, res *Result, uri *url.URL) error { ctx.dbglog.Printf("Attachment fetcher for '%s'", uri.String()) sizeTotal, err := strconv.ParseUint(uri.Host, 10, 64) if err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = "invalid attachment size (must be a positive integer)" return nil } if sizeTotal > FILESIZE_MAX { res.ResponseCode = http.StatusRequestEntityTooLarge res.ErrorString = "file exceeds maximum file size" return nil } ctx.SourceFile = filepath.Join(ctx.WorkDir, path.Clean("/"+uri.Path)) var conv fetchConverter ctx.OrigFilename = ctx.SourceFile if conv, ctx.SourceFile, err = newFetchConverter(ctx, ctx.SourceFile); err != nil { ctx.stdlog.Printf("Unable to create converter for file %s: %s", ctx.OrigFilename, err) return err } ctx.reportProgress(1, "receiving", 0.0, float64(sizeTotal)) err = writeAttachmentFile(ctx, res, sizeTotal, conv) conv.Close() if res.ResponseCode == http.StatusNoContent { ctx.stdlog.Printf("download of '%s' got canceled", ctx.SourceUri) return nil } ctx.dbglog.Printf("waiting for converter to finish...") convOut, convErr := conv.GetResult(ctx, res) if err != nil { return err } if convErr != nil { if res.ResponseCode == http.StatusNoContent { ctx.stdlog.Printf("download of '%s' got canceled", ctx.SourceUri) return nil } ctx.stdlog.Printf("converter error: %v; converter output: %s", convErr, convOut) return fmt.Errorf("converter error: %v; converter output: %s", convErr, convOut) } ctx.dbglog.Printf("converter: loudness correction = %.2f dB", ctx.LoudnessCorr) return nil } type fetchFunc func(*Context, *Result, *url.URL) (err error) var ( fetchers = map[string]fetchFunc{ "local": fetchFileLocal, "tmp": fetchFileTmp, "silence": fetchFileSilence, "attachment": fetchFileAttachment, } curlProtos = map[string]bool{ "http": false, "https": false, "ftp": false, "ftps": false, } ) func fetcherInit(stdlog, dbglog *log.Logger) { archiveEnabled := false info := curl.VersionInfo(curl.VERSION_FIRST) protos := info.Protocols for _, proto := range protos { if proto == "sftp" { dbglog.Printf("curl: * enabling protocol %s", proto) fetchers["archiv"] = fetchFileArchiv archiveEnabled = true } else if _, ok := curlProtos[proto]; ok { dbglog.Printf("curl: * enabling protocol %s", proto) fetchers[proto] = fetchFileCurl curlProtos[proto] = true } else { dbglog.Printf("curl: ignoring protocol %s", proto) } } for proto, enabled := range curlProtos { if !enabled { stdlog.Printf("curl: protocol %s is disabled because the installed library version doesn't support it!", proto) } } if !archiveEnabled { stdlog.Printf("archiv: fetcher is disabled because the installed curl library version doesn't support sFTP!") } } func checkPassword(ctx *Context, res *Result) (err error) { ok := false if ok, err = ctx.CheckPassword(); err != nil { return } if !ok { res.ResponseCode = http.StatusUnauthorized res.ErrorString = "invalid username and/or password" } return } func FetchFile(ctx *Context) (res *Result, err error) { res = &Result{ResponseCode: http.StatusOK} ctx.stdlog.Printf("FetchFile: called for '%s'", ctx.SourceUri) var uri *url.URL if uri, err = url.Parse(ctx.SourceUri); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("parsing uri: %s", err) return res, nil } if !ctx.Trusted { if err = checkPassword(ctx, res); err != nil || res.ResponseCode != http.StatusOK { return } } if fetcher, ok := fetchers[uri.Scheme]; ok { err = fetcher(ctx, res, uri) } else { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("No fetcher for uri scheme '%s' found.", uri.Scheme) return } return }