// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015-2016 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package rhimport import ( "fmt" "io/ioutil" "mime" "net/http" "net/url" "os" "os/user" "path" "path/filepath" "strconv" "strings" "time" "github.com/andelf/go-curl" ) type FetcherCurlCBData struct { ctx *Context res *Result basepath string filename string remotename string conv Converter writeError error } func (self *FetcherCurlCBData) Cleanup() { if self.conv != nil { self.conv.Close() } } func curlHeaderCallback(ptr []byte, userdata interface{}) bool { hdr := fmt.Sprintf("%s", ptr) data := userdata.(*FetcherCurlCBData) if strings.HasPrefix(hdr, "Content-Disposition:") { if mediatype, params, err := mime.ParseMediaType(strings.TrimPrefix(hdr, "Content-Disposition:")); err == nil { if mediatype == "attachment" { data.filename = filepath.Join(data.basepath, path.Clean("/"+params["filename"])) } } } return true } func curlWriteCallback(ptr []byte, userdata interface{}) bool { data := userdata.(*FetcherCurlCBData) if data.conv == nil { if data.filename == "" { name := path.Clean("/" + data.remotename) if name == "/" { rhdl.Printf("remotename('%s') is invalid, replacing it with 'unnamed'", data.remotename) name = "unnamed" } data.filename = filepath.Join(data.basepath, name) } nc, newFilename, err := NewFFMpegConverter(data.filename) if err != nil { rhl.Printf("Unable to create converter for file %s: %s", data.filename, err) data.writeError = err return false } data.filename = newFilename data.conv = nc } if _, err := data.conv.Write(ptr); err != nil { rhl.Printf("Unable to write file %s: %s", data.filename, err) data.writeError = err return false } return true } func curlProgressCallback(dltotal, dlnow, ultotal, ulnow float64, userdata interface{}) bool { data := userdata.(*FetcherCurlCBData) if data.writeError != nil { return false } if data.ctx.Cancel != nil && len(data.ctx.Cancel) > 0 { data.res.ResponseCode = http.StatusNoContent data.res.ErrorString = "canceled" return false } if dltotal > float64(FILESIZE_MAX) { data.res.ResponseCode = http.StatusRequestEntityTooLarge data.res.ErrorString = "file exceeds maximum file size" return false } if data.ctx.ProgressCallBack != nil { if keep := data.ctx.ProgressCallBack(1, "downloading", dlnow, dltotal, data.ctx.ProgressCallBackData); !keep { data.ctx.ProgressCallBack = nil } } return true } func fetchFileCurl(ctx *Context, res *Result, uri *url.URL) (err error) { rhl.Printf("curl-based fetcher called for '%s'", ctx.SourceUri) easy := curl.EasyInit() if easy == nil { err = fmt.Errorf("Error initializing libcurl") return } defer easy.Cleanup() easy.Setopt(curl.OPT_FOLLOWLOCATION, true) easy.Setopt(curl.OPT_URL, ctx.SourceUri) easy.Setopt(curl.OPT_USERAGENT, "Radio Helsinki Import") cbdata := &FetcherCurlCBData{ctx: ctx, res: res, remotename: path.Base(uri.Path)} defer cbdata.Cleanup() if cbdata.basepath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil { return } easy.Setopt(curl.OPT_HEADERFUNCTION, curlHeaderCallback) easy.Setopt(curl.OPT_HEADERDATA, cbdata) easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback) easy.Setopt(curl.OPT_WRITEDATA, cbdata) easy.Setopt(curl.OPT_NOPROGRESS, false) easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback) easy.Setopt(curl.OPT_PROGRESSDATA, cbdata) err = easy.Perform() cbdata.conv.Close() rhl.Printf("waiting for converter to finish...") convOut, convErr := cbdata.conv.GetResult() if err != nil || cbdata.writeError != nil || convErr != nil { if cbdata.conv != nil { rhdl.Printf("Removing stale file: %s", cbdata.filename) os.Remove(cbdata.filename) os.Remove(path.Dir(cbdata.filename)) } if res.ResponseCode == http.StatusNoContent { rhl.Printf("download of '%s' got canceled", ctx.SourceUri) return nil } if cbdata.writeError != nil { err = cbdata.writeError } if convErr != nil { err = convErr if convOut != "" { rhl.Printf("converter output: %s", convOut) } } err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err) rhl.Println(err) return } ctx.SourceFile = cbdata.filename if ctx.SourceFilePolicy == Auto { ctx.DeleteSourceFile = true ctx.DeleteSourceDir = true } return } var months = [...]string{ "Jänner", "Februar", "März", "April", "Mai", "Juni", "Juli", "August", "September", "Oktober", "November", "Dezember", } var weekdays = [...]string{ "Sonntag", "Montag", "Dienstag", "Mittwoch", "Donnerstag", "Freitag", "Samstag", } func generateArchivFilePath(uri *url.URL) (file, path string, err error) { var t time.Time if t, err = time.Parse("2006/01/02/15/04", fmt.Sprintf("%s%s", uri.Host, uri.Path)); err != nil { return } file = t.Format("2006-01-02-1504") + ".ogg" // TODO: make basepath configurable path = fmt.Sprintf("/srv/archiv/%04d/%02d-%s/%02d-%s", t.Year(), t.Month(), months[t.Month()-1], t.Day(), weekdays[t.Weekday()]) return } func fetchFileArchiv(ctx *Context, res *Result, uri *url.URL) (err error) { rhdl.Printf("archiv fetcher called for '%s'", ctx.SourceUri) var srcfile, srcpath string if srcfile, srcpath, err = generateArchivFilePath(uri); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("date/time is invalid: %s", err) return nil } easy := curl.EasyInit() if easy == nil { err = fmt.Errorf("Error initializing libcurl") return } defer easy.Cleanup() // TODO: make user and host configurable scpuri := fmt.Sprintf("sftp://archiv.helsinki.at%s/%s", srcpath, srcfile) easy.Setopt(curl.OPT_URL, scpuri) easy.Setopt(curl.OPT_USERNAME, "rhimport") u, _ := user.Current() easy.Setopt(curl.OPT_SSH_PUBLIC_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa.pub", u.HomeDir)) easy.Setopt(curl.OPT_SSH_PRIVATE_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa", u.HomeDir)) cbdata := &FetcherCurlCBData{ctx: ctx, res: res} defer cbdata.Cleanup() var destpath string if destpath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil { return } cbdata.filename = fmt.Sprintf("%s/%s", destpath, srcfile) easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback) easy.Setopt(curl.OPT_WRITEDATA, cbdata) easy.Setopt(curl.OPT_NOPROGRESS, false) easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback) easy.Setopt(curl.OPT_PROGRESSDATA, cbdata) rhdl.Printf("importing archiv file from %s", scpuri) err = easy.Perform() cbdata.conv.Close() rhl.Printf("waiting for converter to finish...") convOut, convErr := cbdata.conv.GetResult() if err != nil || cbdata.writeError != nil || convErr != nil { if cbdata.conv != nil { rhdl.Printf("Removing stale file: %s", cbdata.filename) os.Remove(cbdata.filename) os.Remove(path.Dir(cbdata.filename)) } if res.ResponseCode == http.StatusNoContent { rhl.Printf("download of '%s' got canceled", ctx.SourceUri) return nil } if cbdata.writeError != nil { err = cbdata.writeError } if convErr != nil { err = convErr if convOut != "" { rhl.Printf("converter output: %s", convOut) } } err = fmt.Errorf("archiv-fetcher('%s'): %s", ctx.SourceUri, err) rhl.Println(err) return } ctx.SourceFile = cbdata.filename if ctx.SourceFilePolicy == Auto { ctx.DeleteSourceFile = true ctx.DeleteSourceDir = true } return } func fetchFileLocal(ctx *Context, res *Result, uri *url.URL) (err error) { return fetchFileDir(ctx, res, uri, ctx.conf.LocalFetchDir) } func fetchFileTmp(ctx *Context, res *Result, uri *url.URL) (err error) { return fetchFileDir(ctx, res, uri, ctx.conf.TempDir) } func fetchFileDir(ctx *Context, res *Result, uri *url.URL, dir string) (err error) { rhl.Printf("Dir fetcher called for '%s'", ctx.SourceUri) ctx.SourceFile = filepath.Join(dir, path.Clean("/"+uri.Path)) var src *os.File if src, err = os.Open(ctx.SourceFile); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("local-file open(): %s", err) return nil } defer src.Close() size := int64(0) if info, err := src.Stat(); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("local-file stat(): %s", err) return nil } else { size = info.Size() if info.IsDir() { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("'%s' is a directory", ctx.SourceFile) return nil } } if size > FILESIZE_MAX { res.ResponseCode = http.StatusRequestEntityTooLarge res.ErrorString = "file exceeds maximum file size" return nil } if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "fetching", 0.0, float64(size), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "fetching", float64(size), float64(size), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } if ctx.SourceFilePolicy == Auto { ctx.DeleteSourceFile = false ctx.DeleteSourceDir = false } return } func fetchFileFake(ctx *Context, res *Result, uri *url.URL) error { rhdl.Printf("Fake fetcher for '%s'", ctx.SourceUri) duration, err := strconv.ParseUint(uri.Host, 10, 32) if err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = "invalid duration (must be a positive integer)" return nil } for i := uint(0); i < uint(duration); i++ { if ctx.Cancel != nil && len(ctx.Cancel) > 0 { rhl.Printf("faking got canceled") res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil } if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "faking", float64(i), float64(duration), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } time.Sleep(100 * time.Millisecond) } if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "faking", float64(duration), float64(duration), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } ctx.SourceFile = "/nonexistend/fake.mp3" if ctx.SourceFilePolicy == Auto { ctx.DeleteSourceFile = false ctx.DeleteSourceDir = false } return nil } func writeAttachmentFile(ctx *Context, res *Result, sizeTotal uint64, src *os.File) error { cancel := ctx.Cancel if cancel == nil { cancel = make(<-chan bool) } written := uint64(0) for { select { case <-cancel: rhl.Printf("receiving attachment '%s' got canceled", ctx.SourceFile) res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil case data, ok := <-ctx.AttachmentChan: if !ok { rhl.Printf("receiving attachment '%s' got canceled (channel has been closed)", ctx.SourceFile) res.ResponseCode = http.StatusNoContent res.ErrorString = "canceled" return nil } left := sizeTotal - written if int(left) < len(data) { rhl.Printf("attachment fetcher: truncating ") data = data[0:left] } w, err := src.Write(data) if err != nil { rhl.Printf("Unable to write file %s: %s", ctx.SourceFile, err) return err } written += uint64(w) if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "receiving", float64(written), float64(sizeTotal), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } if uint64(written) >= sizeTotal { return nil } } } } func fetchFileAttachment(ctx *Context, res *Result, uri *url.URL) error { rhdl.Printf("Attachment fetcher for '%s'", ctx.SourceUri) if ctx.AttachmentChan == nil { return fmt.Errorf("attachement channel is nil") } sizeTotal, err := strconv.ParseUint(uri.Host, 10, 64) if err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = "invalid attachment size (must be a positive integer)" return nil } if sizeTotal > FILESIZE_MAX { res.ResponseCode = http.StatusRequestEntityTooLarge res.ErrorString = "file exceeds maximum file size" return nil } basepath, err := ioutil.TempDir(ctx.conf.TempDir, "rhimportd-") if err != nil { return err } ctx.SourceFile = filepath.Join(basepath, path.Clean("/"+uri.Path)) var src *os.File if src, err = os.OpenFile(ctx.SourceFile, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600); err != nil { rhl.Printf("Unable to create file %s: %s", ctx.SourceFile, err) return err } defer src.Close() if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(1, "receiving", 0.0, float64(sizeTotal), ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } if err = writeAttachmentFile(ctx, res, sizeTotal, src); err != nil { return err } if ctx.SourceFilePolicy == Auto { ctx.DeleteSourceFile = true ctx.DeleteSourceDir = true } return nil } type FetchFunc func(*Context, *Result, *url.URL) (err error) // TODO: implement fetchers for: // public:// // home:// ????? var ( fetchers = map[string]FetchFunc{ "local": fetchFileLocal, "tmp": fetchFileTmp, "fake": fetchFileFake, "attachment": fetchFileAttachment, } curlProtos = map[string]bool{ "http": false, "https": false, "ftp": false, "ftps": false, } ) func fetcherInit() { archiveEnabled := false info := curl.VersionInfo(curl.VERSION_FIRST) protos := info.Protocols for _, proto := range protos { if proto == "sftp" { rhdl.Printf("curl: enabling protocol %s", proto) fetchers["archiv"] = fetchFileArchiv archiveEnabled = true } else if _, ok := curlProtos[proto]; ok { rhdl.Printf("curl: enabling protocol %s", proto) fetchers[proto] = fetchFileCurl curlProtos[proto] = true } else { rhdl.Printf("curl: ignoring protocol %s", proto) } } for proto, enabled := range curlProtos { if !enabled { rhl.Printf("curl: protocol %s is disabled because the installed library version doesn't support it!", proto) } } if !archiveEnabled { rhl.Printf("archiv: fetcher is disabled because the installed curl library version doesn't support sFTP!") } } func checkPassword(ctx *Context, res *Result) (err error) { ok := false if ok, err = ctx.CheckPassword(); err != nil { return } if !ok { res.ResponseCode = http.StatusUnauthorized res.ErrorString = "invalid username and/or password" } return } func FetchFile(ctx *Context) (res *Result, err error) { res = &Result{ResponseCode: http.StatusOK} var uri *url.URL if uri, err = url.Parse(ctx.SourceUri); err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("parsing uri: %s", err) return res, nil } if !ctx.Trusted { if err = checkPassword(ctx, res); err != nil || res.ResponseCode != http.StatusOK { return } } if fetcher, ok := fetchers[uri.Scheme]; ok { err = fetcher(ctx, res, uri) } else { res.ResponseCode = http.StatusBadRequest res.ErrorString = fmt.Sprintf("No fetcher for uri scheme '%s' found.", uri.Scheme) return } switch ctx.SourceFilePolicy { case Keep: ctx.DeleteSourceFile = false ctx.DeleteSourceDir = false case DeleteWithDir: ctx.DeleteSourceDir = true fallthrough case Delete: ctx.DeleteSourceFile = true } return }