// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015-2016 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package rhimport import ( "bytes" "fmt" "io" "io/ioutil" "mime" "net/http" "net/url" "os" "os/exec" "os/user" "path" "path/filepath" "strconv" "strings" "time" "github.com/andelf/go-curl" ) type FetcherCurlCBData struct { ctx *Context res *Result basepath string filename string remotename string metadata map[string]string conv FetchConverter totalSize float64 written uint64 writeError error } func curlHeaderCallback(ptr []byte, userdata interface{}) bool { hdr := fmt.Sprintf("%s", ptr) data := userdata.(*FetcherCurlCBData) if strings.HasPrefix(hdr, "Content-Disposition:") { if mediatype, params, err := mime.ParseMediaType(strings.TrimPrefix(hdr, "Content-Disposition:")); err == nil { if mediatype == "attachment" { data.filename = filepath.Join(data.basepath, path.Clean("/"+params["filename"])) } } } return true } func curlWriteCallback(ptr []byte, userdata interface{}) bool { data := userdata.(*FetcherCurlCBData) if data.conv == nil { if data.filename == "" { name := path.Clean("/" + data.remotename) if name == "/" { rhdl.Printf("remotename('%s') is invalid, replacing it with 'unnamed'", data.remotename) name = "unnamed" } data.filename = filepath.Join(data.basepath, name) } data.ctx.OrigFilename = data.filename conv, newFilename, err := NewFetchConverter(data.ctx.FetchConverter, data.filename, data.metadata, data.ctx.Channels) if err != nil { rhl.Printf("Unable to create converter for file %s: %s", data.filename, err) data.writeError = err return false } data.filename = newFilename data.conv = conv } w, err := data.conv.Write(ptr) if err != nil { rhl.Printf("Unable to write to converter(%s): %s", data.filename, err) data.writeError = err return false } data.written += uint64(w) if data.written > FILESIZE_MAX { data.writeError = fmt.Errorf("file exceeds maximum file size") return false } if data.ctx.ProgressCallBack != nil { if keep := data.ctx.ProgressCallBack(1, "downloading", float64(data.written), data.totalSize, data.ctx.ProgressCallBackData); !keep { data.ctx.ProgressCallBack = nil } } return true } func curlProgressCallback(dltotal, dlnow, ultotal, ulnow float64, userdata interface{}) bool { data := userdata.(*FetcherCurlCBData) if data.writeError != nil { return false } if data.ctx.Cancel != nil && len(data.ctx.Cancel) > 0 { data.res.ResponseCode = http.StatusNoContent data.res.ErrorString = "canceled" return false } if dltotal > float64(FILESIZE_MAX) { data.res.ResponseCode = http.StatusRequestEntityTooLarge data.res.ErrorString = "file exceeds maximum file size" return false } data.totalSize = dltotal return true } func checkYoutubeDL(ctx *Context, res *Result, uri *url.URL) *YoutubeDLInfo { cmd := exec.Command("youtube-dl", "--no-playlist", "-f", "bestaudio/best", "--prefer-free-formats", "-J", ctx.SourceUri) var stderr, stdout bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr done := make(chan *YoutubeDLInfo) go func() { defer func() { done <- nil }() if err := cmd.Run(); err != nil { rhdl.Printf("youtube-dl: %v, stderr: %s", err, strings.TrimSpace(stderr.String())) return } info, err := NewYoutubeDLInfoFromJSON(&stdout) if err != nil { rhdl.Printf("youtube-dl: %v, stderr: %s", err, strings.TrimSpace(stderr.String())) return } rhl.Printf("youtube-dl: extractor: %s -> %s", info.Extractor, info.URL) ctx.SourceUri = info.URL done <- info }() select { case info := <-done: return info case <-ctx.Cancel: cmd.Process.Kill() res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil } } func fetchFileCurl(ctx *Context, res *Result, uri *url.URL) (err error) { rhl.Printf("curl-based fetcher called for '%s'", ctx.SourceUri) info := checkYoutubeDL(ctx, res, uri) if res.ResponseCode == http.StatusNoContent { rhl.Printf("download of '%s' got canceled", ctx.SourceUri) return nil } easy := curl.EasyInit() if easy == nil { err = fmt.Errorf("Error initializing libcurl") return } defer easy.Cleanup() easy.Setopt(curl.OPT_FOLLOWLOCATION, true) easy.Setopt(curl.OPT_URL, ctx.SourceUri) easy.Setopt(curl.OPT_USERAGENT, "Radio Helsinki Import") if info != nil && (info.Protocol == "http" || info.Protocol == "https") { if len(info.HTTPHeaders) > 0 { var h []string for key, value := range info.HTTPHeaders { h = append(h, key+": "+value) } easy.Setopt(curl.OPT_HTTPHEADER, h) rhdl.Printf("added HTTP header: %q", h) } } cbdata := &FetcherCurlCBData{ctx: ctx, res: res, remotename: path.Base(uri.Path)} if cbdata.basepath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil { return } if info != nil { if info.Title == "" { cbdata.remotename = info.ID + "." + info.Ext } else { cbdata.remotename = info.Title + "." + info.Ext } } easy.Setopt(curl.OPT_HEADERFUNCTION, curlHeaderCallback) easy.Setopt(curl.OPT_HEADERDATA, cbdata) easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback) easy.Setopt(curl.OPT_WRITEDATA, cbdata) easy.Setopt(curl.OPT_NOPROGRESS, false) easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback) easy.Setopt(curl.OPT_PROGRESSDATA, cbdata) err = easy.Perform() statusCode := 0 if curlResponseCode, err := easy.Getinfo(curl.INFO_RESPONSE_CODE); err == nil { if code, ok := curlResponseCode.(int); ok { statusCode = code } } var convOut string var convErr error if cbdata.conv != nil { cbdata.conv.Close() rhl.Printf("waiting for converter to finish...") convOut, convErr = cbdata.conv.GetResult(ctx, res) } if err != nil || cbdata.writeError != nil || convErr != nil { if cbdata.conv != nil { rhdl.Printf("Removing stale file: %s", cbdata.filename) os.Remove(cbdata.filename) os.Remove(path.Dir(cbdata.filename)) } if res.ResponseCode == http.StatusNoContent { rhl.Printf("download of '%s' got canceled", ctx.SourceUri) return nil } if statusCode > 0 && statusCode != http.StatusOK { res.ResponseCode = statusCode res.ErrorString = fmt.Sprintf("non-OK response code (%d) while fetching the resource", statusCode) return nil } if cbdata.writeError != nil { err = cbdata.writeError } if convErr != nil { err = fmt.Errorf("converter error: %v; converter output: %s", convErr, convOut) rhl.Printf("%v", err) } err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err) rhl.Println(err) return } rhdl.Printf("converter: loudness correction = %.2f dB", ctx.LoudnessCorr) ctx.SourceFile = cbdata.filename if ctx.SourceFilePolicy == Auto { ctx.DeleteSourceFile = true ctx.DeleteSourceDir = true } return } var months = [...]string{ "Jänner", "Februar", "März", "April", "Mai", "Juni", "Juli", "August", "September", "Oktober", "November", "Dezember", } var weekdays = [...]string{ "Sonntag", "Montag", "Dienstag", "Mittwoch", "Donnerstag", "Freitag", "Samstag", } func generateArchivFilePath(uri *url.URL) (file, path string, t time.Time, err error) { if t, err = time.Parse("2006/01/02/15/04", fmt.Sprintf("%s%s", uri.Host, uri.Path)); err != nil { return } file = t.Format("2006-01-02-1504") + ".ogg" path = fmt.Sprintf("%s/%04d/%02d-%s/%02d-%s", ARCHIV_BASE_PATH, t.Year(), t.Month(), months[t.Month()-1], t.Day(), weekdays[t.Weekday()]) return } func fetchFileArchiv(ctx *Context, res *Result, uri *url.URL) (err error) { rhdl.Printf("archiv fetcher called for '%s'", ctx.SourceUri) var srcfile, srcpath string var start time.Time if srcfile, srcpath, start, err = generateArchivFilePath(uri); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("date/time is invalid: %s", err) return nil } end := start.Add(time.Hour) easy := curl.EasyInit() if easy == nil { err = fmt.Errorf("Error initializing libcurl") return } defer easy.Cleanup() scpuri := fmt.Sprintf("sftp://%s%s/%s", ARCHIV_HOST, srcpath, srcfile) easy.Setopt(curl.OPT_URL, scpuri) easy.Setopt(curl.OPT_USERNAME, ARCHIV_USER) u, _ := user.Current() easy.Setopt(curl.OPT_SSH_PUBLIC_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa.pub", u.HomeDir)) easy.Setopt(curl.OPT_SSH_PRIVATE_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa", u.HomeDir)) var destpath string if destpath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil { return } cbdata := &FetcherCurlCBData{ctx: ctx, res: res} cbdata.filename = fmt.Sprintf("%s/%s", destpath, srcfile) easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback) easy.Setopt(curl.OPT_WRITEDATA, cbdata) easy.Setopt(curl.OPT_NOPROGRESS, false) easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback) easy.Setopt(curl.OPT_PROGRESSDATA, cbdata) easy.Setopt(curl.OPT_BUFFERSIZE, 1024*1024) cbdata.metadata = make(map[string]string) cbdata.metadata["TITLE"] = fmt.Sprintf("Archiv vom %s - %s Uhr", start.Format("2.1.2006 15:04"), end.Format("15:04")) cbdata.metadata["ALBUM"] = "Radio Helsinki Archiv" cbdata.metadata["ORGANIZATION"] = "Radio Helsinki" cbdata.metadata["DATE"] = start.Format("2.1.2006") rhdl.Printf("importing archiv file from %s", scpuri) err = easy.Perform() var convOut string var convErr error if cbdata.conv != nil { cbdata.conv.Close() rhl.Printf("waiting for converter to finish...") convOut, convErr = cbdata.conv.GetResult(ctx, res) } if err != nil || cbdata.writeError != nil || convErr != nil { if cbdata.conv != nil { rhdl.Printf("Removing stale file: %s", cbdata.filename) os.Remove(cbdata.filename) os.Remove(path.Dir(cbdata.filename)) } if res.ResponseCode == http.StatusNoContent { rhl.Printf("download of '%s' got canceled", ctx.SourceUri) return nil } if cbdata.writeError != nil { err = cbdata.writeError } if convErr != nil { err = fmt.Errorf("converter error: %v; converter output: %s", convErr, convOut) rhl.Printf("%v", err) } err = fmt.Errorf("archiv-fetcher('%s'): %s", ctx.SourceUri, err) rhl.Println(err) return } rhdl.Printf("converter: loudness correction = %.2f dB", ctx.LoudnessCorr) ctx.SourceFile = cbdata.filename if ctx.SourceFilePolicy == Auto { ctx.DeleteSourceFile = true ctx.DeleteSourceDir = true } return } func fetchFileLocal(ctx *Context, res *Result, uri *url.URL) (err error) { return fetchFileDir(ctx, res, uri, ctx.conf.LocalFetchDir, true) } func fetchFileTmp(ctx *Context, res *Result, uri *url.URL) (err error) { return fetchFileDir(ctx, res, uri, ctx.conf.TempDir, false) } func fetchFileDirConvert(ctx *Context, res *Result, origSrc *os.File, sizeTotal int64) (err error) { basepath, err := ioutil.TempDir(ctx.conf.TempDir, "rhimportd-") if err != nil { return err } origDir, origFile := path.Split(ctx.SourceFile) ctx.OrigFilename = ctx.SourceFile var conv FetchConverter if conv, ctx.SourceFile, err = NewFetchConverter(ctx.FetchConverter, filepath.Join(basepath, origFile), nil, ctx.Channels); err != nil { rhl.Printf("Unable to create converter for file %s: %s", origDir+origFile, err) return } var buffer [1 * 1024 * 1024]byte written := uint64(0) for { var r, w int r, err = origSrc.Read(buffer[:]) if err == io.EOF { err = nil break } if err != nil { rhl.Printf("Unable to read from source file %s: %s", origDir+origFile, err) break } w, err = conv.Write(buffer[0:r]) if err != nil { rhl.Printf("Unable to write to converter(%s): %s", ctx.SourceFile, err) break } written += uint64(w) if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "fetching", float64(written), float64(sizeTotal), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } } conv.Close() rhl.Printf("waiting for converter to finish...") if convOut, convErr := conv.GetResult(ctx, res); convErr != nil { if convOut != "" { rhl.Printf("converter error: %v; converter output: %s", convErr, convOut) } return fmt.Errorf("converter error: %v; converter output: %s", convErr, convOut) } rhdl.Printf("converter: loudness correction = %.2f dB", ctx.LoudnessCorr) if err != nil { return err } switch ctx.SourceFilePolicy { case Auto: ctx.DeleteSourceFile = true ctx.DeleteSourceDir = true case Keep: ctx.DeleteSourceFile = false ctx.DeleteSourceDir = false case Delete: ctx.DeleteSourceDir = true ctx.DeleteSourceFile = true os.Remove(origDir + origFile) case DeleteWithDir: ctx.DeleteSourceDir = true ctx.DeleteSourceFile = true os.Remove(origDir + origFile) os.Remove(origDir) } return } func fetchFileDir(ctx *Context, res *Result, uri *url.URL, dir string, convert bool) (err error) { rhl.Printf("Dir fetcher called for '%s'", ctx.SourceUri) ctx.SourceFile = filepath.Join(dir, path.Clean("/"+uri.Path)) var src *os.File if src, err = os.Open(ctx.SourceFile); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("local-file open(): %s", err) return nil } defer src.Close() size := int64(0) if info, err := src.Stat(); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("local-file stat(): %s", err) return nil } else { size = info.Size() if info.IsDir() { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("'%s' is a directory", ctx.SourceFile) return nil } } if size > FILESIZE_MAX { res.ResponseCode = http.StatusRequestEntityTooLarge res.ErrorString = "file exceeds maximum file size" return nil } if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "fetching", 0.0, float64(size), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } if convert { if err = fetchFileDirConvert(ctx, res, src, size); err != nil { return } } else { if ctx.SourceFilePolicy == Auto { ctx.DeleteSourceFile = false ctx.DeleteSourceDir = false } } if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "fetching", float64(size), float64(size), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } return } func fetchFileFake(ctx *Context, res *Result, uri *url.URL) error { rhdl.Printf("Fake fetcher for '%s'", ctx.SourceUri) duration, err := strconv.ParseUint(uri.Host, 10, 32) if err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = "invalid duration (must be a positive integer)" return nil } for i := uint(0); i < uint(duration); i++ { if ctx.Cancel != nil && len(ctx.Cancel) > 0 { rhl.Printf("faking got canceled") res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil } if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "faking", float64(i), float64(duration), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } time.Sleep(100 * time.Millisecond) } if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "faking", float64(duration), float64(duration), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } ctx.SourceFile = "/nonexistend/fake.mp3" ctx.OrigFilename = ctx.SourceFile if ctx.SourceFilePolicy == Auto { ctx.DeleteSourceFile = false ctx.DeleteSourceDir = false } return nil } func writeAttachmentFile(ctx *Context, res *Result, sizeTotal uint64, conv FetchConverter) error { cancel := ctx.Cancel if cancel == nil { cancel = make(<-chan bool) } written := uint64(0) for { select { case <-cancel: rhl.Printf("receiving attachment '%s' got canceled", ctx.SourceFile) res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil case data, ok := <-ctx.AttachmentChan: if !ok { rhl.Printf("receiving attachment '%s' got canceled (channel has been closed)", ctx.SourceFile) res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil } left := sizeTotal - written if int(left) < len(data) { rhl.Printf("attachment fetcher: truncating %d byes of extra data", len(data)-int(left)) data = data[0:left] } w, err := conv.Write(data) if err != nil { rhl.Printf("Unable to write to converter(%s): %s", ctx.SourceFile, err) return err } written += uint64(w) if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "receiving", float64(written), float64(sizeTotal), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } if uint64(written) >= sizeTotal { return nil } } } } func fetchFileAttachment(ctx *Context, res *Result, uri *url.URL) error { rhdl.Printf("Attachment fetcher for '%s'", ctx.SourceUri) if ctx.AttachmentChan == nil { return fmt.Errorf("attachement channel is nil") } sizeTotal, err := strconv.ParseUint(uri.Host, 10, 64) if err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = "invalid attachment size (must be a positive integer)" return nil } if sizeTotal > FILESIZE_MAX { res.ResponseCode = http.StatusRequestEntityTooLarge res.ErrorString = "file exceeds maximum file size" return nil } basepath, err := ioutil.TempDir(ctx.conf.TempDir, "rhimportd-") if err != nil { return err } ctx.SourceFile = filepath.Join(basepath, path.Clean("/"+uri.Path)) var conv FetchConverter ctx.OrigFilename = ctx.SourceFile if conv, ctx.SourceFile, err = NewFetchConverter(ctx.FetchConverter, ctx.SourceFile, nil, ctx.Channels); err != nil { rhl.Printf("Unable to create converter for file %s: %s", ctx.OrigFilename, err) return err } if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "receiving", 0.0, float64(sizeTotal), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } err = writeAttachmentFile(ctx, res, sizeTotal, conv) conv.Close() rhl.Printf("waiting for converter to finish...") convOut, convErr := conv.GetResult(ctx, res) if err != nil { return err } if convErr != nil { if convOut != "" { rhl.Printf("converter error: %v; converter output: %s", convErr, convOut) } return fmt.Errorf("converter error: %v; converter output: %s", convErr, convOut) } rhdl.Printf("converter: loudness correction = %.2f dB", ctx.LoudnessCorr) if ctx.SourceFilePolicy == Auto { ctx.DeleteSourceFile = true ctx.DeleteSourceDir = true } return nil } type FetchFunc func(*Context, *Result, *url.URL) (err error) var ( fetchers = map[string]FetchFunc{ "local": fetchFileLocal, "tmp": fetchFileTmp, "fake": fetchFileFake, "attachment": fetchFileAttachment, } curlProtos = map[string]bool{ "http": false, "https": false, "ftp": false, "ftps": false, } ) func fetcherInit() { archiveEnabled := false info := curl.VersionInfo(curl.VERSION_FIRST) protos := info.Protocols for _, proto := range protos { if proto == "sftp" { rhdl.Printf("curl: enabling protocol %s", proto) fetchers["archiv"] = fetchFileArchiv archiveEnabled = true } else if _, ok := curlProtos[proto]; ok { rhdl.Printf("curl: enabling protocol %s", proto) fetchers[proto] = fetchFileCurl curlProtos[proto] = true } else { rhdl.Printf("curl: ignoring protocol %s", proto) } } for proto, enabled := range curlProtos { if !enabled { rhl.Printf("curl: protocol %s is disabled because the installed library version doesn't support it!", proto) } } if !archiveEnabled { rhl.Printf("archiv: fetcher is disabled because the installed curl library version doesn't support sFTP!") } } func checkPassword(ctx *Context, res *Result) (err error) { ok := false if ok, err = ctx.CheckPassword(); err != nil { return } if !ok { res.ResponseCode = http.StatusUnauthorized res.ErrorString = "invalid username and/or password" } return } func FetchFile(ctx *Context) (res *Result, err error) { res = &Result{ResponseCode: http.StatusOK} var uri *url.URL if uri, err = url.Parse(ctx.SourceUri); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("parsing uri: %s", err) return res, nil } if !ctx.Trusted { if err = checkPassword(ctx, res); err != nil || res.ResponseCode != http.StatusOK { return } } if fetcher, ok := fetchers[uri.Scheme]; ok { err = fetcher(ctx, res, uri) } else { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("No fetcher for uri scheme '%s' found.", uri.Scheme) return } switch ctx.SourceFilePolicy { case Keep: ctx.DeleteSourceFile = false ctx.DeleteSourceDir = false case DeleteWithDir: ctx.DeleteSourceDir = true fallthrough case Delete: ctx.DeleteSourceFile = true } return }