From b76cbf24f5880f8a070fb3111d2b268052bdf97d Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Fri, 24 Jun 2016 04:27:42 +0200 Subject: added attachment fetcher diff --git a/rhimport/core.go b/rhimport/core.go index e1ba735..6fb0bcb 100644 --- a/rhimport/core.go +++ b/rhimport/core.go @@ -120,6 +120,7 @@ type Context struct { AutotrimLevel int UseMetaData bool SourceUri string + AttachmentChan <-chan []byte SourceFile string DeleteSourceFile bool DeleteSourceDir bool @@ -146,6 +147,7 @@ func NewContext(conf *Config, db *rddb.DBChan) *Context { ctx.NormalizationLevel = conf.ImportParamDefaults.NormalizationLevel ctx.AutotrimLevel = conf.ImportParamDefaults.AutotrimLevel ctx.UseMetaData = conf.ImportParamDefaults.UseMetaData + ctx.AttachmentChan = nil ctx.SourceFile = "" ctx.DeleteSourceFile = false ctx.DeleteSourceDir = false diff --git a/rhimport/fetcher.go b/rhimport/fetcher.go index 4819e92..5779631 100644 --- a/rhimport/fetcher.go +++ b/rhimport/fetcher.go @@ -333,36 +333,117 @@ func fetchFileDir(ctx *Context, res *Result, uri *url.URL, dir string) (err erro func fetchFileFake(ctx *Context, res *Result, uri *url.URL) error { rhdl.Printf("Fake fetcher for '%s'", ctx.SourceUri) - if duration, err := strconv.ParseUint(uri.Host, 10, 32); err != nil { - err = nil + duration, err := strconv.ParseUint(uri.Host, 10, 32) + if err != nil { res.ResponseCode = http.StatusBadRequest res.ErrorString = "invalid duration (must be a positive integer)" - } else { - for i := uint(0); i < uint(duration); i++ { - if ctx.Cancel != nil && len(ctx.Cancel) > 0 { - rhl.Printf("faking got canceled") - res.ResponseCode = http.StatusNoContent - res.ErrorString = "canceled" - return nil + return nil + } + + for i := uint(0); i < uint(duration); i++ { + if ctx.Cancel != nil && len(ctx.Cancel) > 0 { + rhl.Printf("faking got canceled") + res.ResponseCode = http.StatusNoContent + res.ErrorString = "canceled" + return nil + } + if ctx.ProgressCallBack != nil { + if keep := ctx.ProgressCallBack(1, "faking", float64(i)/float64(duration), ctx.ProgressCallBackData); !keep { + ctx.ProgressCallBack = nil + } + } + time.Sleep(100 * time.Millisecond) + } + if ctx.ProgressCallBack != nil { + if keep := ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData); !keep { + ctx.ProgressCallBack = nil + } + } + ctx.SourceFile = "/nonexistend/fake.mp3" + if ctx.SourceFilePolicy == Auto { + ctx.DeleteSourceFile = false + ctx.DeleteSourceDir = false + } + return nil +} + +func writeAttachmentFile(ctx *Context, res *Result, sizeTotal uint64, src *os.File) error { + written := 0 + for { + select { + case <-ctx.Cancel: + rhl.Printf("receiving attachment '%s' got canceled", ctx.SourceFile) + res.ResponseCode = http.StatusNoContent + res.ErrorString = "canceled" + return nil + case data := <-ctx.AttachmentChan: + w, err := src.Write(data) + if err != nil { + rhl.Printf("Unable to write file %s: %s", ctx.SourceFile, err) + return err + } + written += w + + p := float64(written) / float64(sizeTotal) + if p > 1.0 { + p = 1.0 } + if ctx.ProgressCallBack != nil { - if keep := ctx.ProgressCallBack(1, "faking", float64(i)/float64(duration), ctx.ProgressCallBackData); !keep { + if keep := ctx.ProgressCallBack(1, "receiving", p, ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } - time.Sleep(100 * time.Millisecond) - } - if ctx.ProgressCallBack != nil { - if keep := ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData); !keep { - ctx.ProgressCallBack = nil + + if uint64(written) >= sizeTotal { + return nil } } - ctx.SourceFile = "/nonexistend/fake.mp3" - if ctx.SourceFilePolicy == Auto { - ctx.DeleteSourceFile = false - ctx.DeleteSourceDir = false + } +} + +func fetchFileAttachment(ctx *Context, res *Result, uri *url.URL) error { + rhdl.Printf("Attachment fetcher for '%s'", ctx.SourceUri) + + if ctx.AttachmentChan == nil { + return fmt.Errorf("attachement channel is nil") + } + + sizeTotal, err := strconv.ParseUint(uri.Host, 10, 32) + if err != nil { + res.ResponseCode = http.StatusBadRequest + res.ErrorString = "invalid attachment size (must be a positive integer)" + return nil + } + + basepath, err := ioutil.TempDir(ctx.conf.TempDir, "rhimportd-") + if err != nil { + return err + } + + ctx.SourceFile = filepath.Join(basepath, path.Clean("/"+uri.Path)) + + var src *os.File + if src, err = os.OpenFile(ctx.SourceFile, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600); err != nil { + rhl.Printf("Unable to create file %s: %s", ctx.SourceFile, err) + return err + } + defer src.Close() + + if ctx.ProgressCallBack != nil { + if keep := ctx.ProgressCallBack(1, "receiving", 0.0, ctx.ProgressCallBackData); !keep { + ctx.ProgressCallBack = nil } } + + if err = writeAttachmentFile(ctx, res, sizeTotal, src); err != nil { + return err + } + + if ctx.SourceFilePolicy == Auto { + ctx.DeleteSourceFile = true + ctx.DeleteSourceDir = true + } return nil } @@ -373,9 +454,10 @@ type FetchFunc func(*Context, *Result, *url.URL) (err error) // home:// ????? var ( fetchers = map[string]FetchFunc{ - "local": fetchFileLocal, - "tmp": fetchFileTmp, - "fake": fetchFileFake, + "local": fetchFileLocal, + "tmp": fetchFileTmp, + "fake": fetchFileFake, + "attachment": fetchFileAttachment, } curlProtos = map[string]bool{ "http": false, "https": false, @@ -441,7 +523,9 @@ func FetchFile(ctx *Context) (res *Result, err error) { if fetcher, ok := fetchers[uri.Scheme]; ok { err = fetcher(ctx, res, uri) } else { - err = fmt.Errorf("No fetcher for uri scheme '%s' found.", uri.Scheme) + res.ResponseCode = http.StatusBadRequest + res.ErrorString = fmt.Sprintf("No fetcher for uri scheme '%s' found.", uri.Scheme) + return } switch ctx.SourceFilePolicy { -- cgit v0.10.2