diff options
author | Christian Pointner <equinox@helsinki.at> | 2016-05-06 23:13:47 (GMT) |
---|---|---|
committer | Christian Pointner <equinox@helsinki.at> | 2016-05-06 23:13:47 (GMT) |
commit | f500c36ebcdbe77503e1c71fcac26b44be174039 (patch) | |
tree | 66141540b8e5b5490b14fab45ff9a23ead9d3805 /rhimport | |
parent | 10f69ccb656abe346b73448408a0e5c3a962decf (diff) |
refactored curl/archiv fetcher to handle write errors correctly
Diffstat (limited to 'rhimport')
-rw-r--r-- | rhimport/fetcher.go | 172 | ||||
-rw-r--r-- | rhimport/session.go | 2 |
2 files changed, 92 insertions, 82 deletions
diff --git a/rhimport/fetcher.go b/rhimport/fetcher.go index 2ff5a9c..ff25ae3 100644 --- a/rhimport/fetcher.go +++ b/rhimport/fetcher.go @@ -48,6 +48,7 @@ type FetcherCurlCBData struct { filename string remotename string file *os.File + writeError error } func (self *FetcherCurlCBData) Cleanup() { @@ -79,12 +80,14 @@ func curlWriteCallback(ptr []byte, userdata interface{}) bool { fp, err := os.OpenFile(data.filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600) if err != nil { rhl.Printf("Unable to create file %s: %s", data.filename, err) + data.writeError = err return false } data.file = fp } if _, err := data.file.Write(ptr); err != nil { rhl.Printf("Unable to write file %s: %s", data.filename, err) + data.writeError = err return false } return true @@ -93,6 +96,10 @@ func curlWriteCallback(ptr []byte, userdata interface{}) bool { func curlProgressCallback(dltotal, dlnow, ultotal, ulnow float64, userdata interface{}) bool { data := userdata.(*FetcherCurlCBData) + if data.writeError != nil { + return false + } + if data.ctx.Cancel != nil && len(data.ctx.Cancel) > 0 { rhl.Printf("downloading '%s' got canceled", data.ctx.SourceUri) data.res.ResponseCode = http.StatusNoContent @@ -112,52 +119,53 @@ func fetchFileCurl(ctx *Context, res *Result, uri *url.URL) (err error) { rhl.Printf("curl-based fetcher called for '%s'", ctx.SourceUri) easy := curl.EasyInit() - if easy != nil { - defer easy.Cleanup() + if easy == nil { + err = fmt.Errorf("Error initializing libcurl") + return + } + defer easy.Cleanup() - easy.Setopt(curl.OPT_FOLLOWLOCATION, true) - easy.Setopt(curl.OPT_URL, ctx.SourceUri) - easy.Setopt(curl.OPT_USERAGENT, "Radio Helsinki Import") + easy.Setopt(curl.OPT_FOLLOWLOCATION, true) + easy.Setopt(curl.OPT_URL, ctx.SourceUri) + easy.Setopt(curl.OPT_USERAGENT, "Radio Helsinki Import") - cbdata := &FetcherCurlCBData{ctx: ctx, res: res, remotename: path.Base(uri.Path)} - defer cbdata.Cleanup() - if cbdata.basepath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil { - return - } + cbdata := &FetcherCurlCBData{ctx: ctx, res: res, remotename: path.Base(uri.Path)} + defer cbdata.Cleanup() + if cbdata.basepath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil { + return + } - easy.Setopt(curl.OPT_HEADERFUNCTION, curlHeaderCallback) - easy.Setopt(curl.OPT_HEADERDATA, cbdata) + easy.Setopt(curl.OPT_HEADERFUNCTION, curlHeaderCallback) + easy.Setopt(curl.OPT_HEADERDATA, cbdata) - easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback) - easy.Setopt(curl.OPT_WRITEDATA, cbdata) + easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback) + easy.Setopt(curl.OPT_WRITEDATA, cbdata) - easy.Setopt(curl.OPT_NOPROGRESS, false) - easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback) - easy.Setopt(curl.OPT_PROGRESSDATA, cbdata) + easy.Setopt(curl.OPT_NOPROGRESS, false) + easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback) + easy.Setopt(curl.OPT_PROGRESSDATA, cbdata) - if err = easy.Perform(); err != nil { - if cbdata.file != nil { - rhdl.Printf("Removing stale file: %s", cbdata.filename) - os.Remove(cbdata.filename) - os.Remove(path.Dir(cbdata.filename)) - } - if res.ResponseCode == http.StatusNoContent { - err = nil - } else { - err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err) - } - return + if err = easy.Perform(); err != nil { + if cbdata.file != nil { + rhdl.Printf("Removing stale file: %s", cbdata.filename) + os.Remove(cbdata.filename) + os.Remove(path.Dir(cbdata.filename)) } - - ctx.SourceFile = cbdata.filename - if ctx.SourceFilePolicy == Auto { - ctx.DeleteSourceFile = true - ctx.DeleteSourceDir = true + if res.ResponseCode == http.StatusNoContent { + return nil } - } else { - err = fmt.Errorf("Error initializing libcurl") + if cbdata.writeError != nil { + err = cbdata.writeError + } + err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err) + return } + ctx.SourceFile = cbdata.filename + if ctx.SourceFilePolicy == Auto { + ctx.DeleteSourceFile = true + ctx.DeleteSourceDir = true + } return } @@ -211,58 +219,60 @@ func fetchFileArchiv(ctx *Context, res *Result, uri *url.URL) (err error) { } easy := curl.EasyInit() - if easy != nil { - defer easy.Cleanup() - - // TODO: make user and host configurable - scpuri := fmt.Sprintf("sftp://archiv.helsinki.at%s/%s", srcpath, srcfile) - easy.Setopt(curl.OPT_URL, scpuri) - easy.Setopt(curl.OPT_USERNAME, "rhimport") - - u, _ := user.Current() - easy.Setopt(curl.OPT_SSH_PUBLIC_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa.pub", u.HomeDir)) - easy.Setopt(curl.OPT_SSH_PRIVATE_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa", u.HomeDir)) - - cbdata := &FetcherCurlCBData{ctx: ctx, res: res} - defer cbdata.Cleanup() - var destpath string - if destpath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil { - return - } - cbdata.filename = fmt.Sprintf("%s/%s", destpath, srcfile) + if easy == nil { + err = fmt.Errorf("Error initializing libcurl") + return + } + defer easy.Cleanup() - easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback) - easy.Setopt(curl.OPT_WRITEDATA, cbdata) + // TODO: make user and host configurable + scpuri := fmt.Sprintf("sftp://archiv.helsinki.at%s/%s", srcpath, srcfile) + easy.Setopt(curl.OPT_URL, scpuri) + easy.Setopt(curl.OPT_USERNAME, "rhimport") - easy.Setopt(curl.OPT_NOPROGRESS, false) - easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback) - easy.Setopt(curl.OPT_PROGRESSDATA, cbdata) + u, _ := user.Current() + easy.Setopt(curl.OPT_SSH_PUBLIC_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa.pub", u.HomeDir)) + easy.Setopt(curl.OPT_SSH_PRIVATE_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa", u.HomeDir)) - rhdl.Printf("importing archiv file from %s", scpuri) - if err = easy.Perform(); err != nil { - if cbdata.file != nil { - rhdl.Printf("Removing stale file: %s", cbdata.filename) - os.Remove(cbdata.filename) - os.Remove(path.Dir(cbdata.filename)) - } - if res.ResponseCode == http.StatusNoContent { - err = nil - } else { - err = fmt.Errorf("archiv fetcher('%s'): %s", ctx.SourceUri, err) - } - return - } + cbdata := &FetcherCurlCBData{ctx: ctx, res: res} + defer cbdata.Cleanup() + var destpath string + if destpath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil { + return + } + cbdata.filename = fmt.Sprintf("%s/%s", destpath, srcfile) - ctx.SourceFile = cbdata.filename - if ctx.SourceFilePolicy == Auto { - ctx.DeleteSourceFile = true - ctx.DeleteSourceDir = true + easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback) + easy.Setopt(curl.OPT_WRITEDATA, cbdata) + + easy.Setopt(curl.OPT_NOPROGRESS, false) + easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback) + easy.Setopt(curl.OPT_PROGRESSDATA, cbdata) + + rhdl.Printf("importing archiv file from %s", scpuri) + if err = easy.Perform(); err != nil { + if cbdata.file != nil { + rhdl.Printf("Removing stale file: %s", cbdata.filename) + os.Remove(cbdata.filename) + os.Remove(path.Dir(cbdata.filename)) } - } else { - err = fmt.Errorf("Error initializing libcurl") + if res.ResponseCode == http.StatusNoContent { + return nil + } + if cbdata.writeError != nil { + err = cbdata.writeError + } + err = fmt.Errorf("archiv-fetcher('%s'): %s", ctx.SourceUri, err) + return } - return nil + ctx.SourceFile = cbdata.filename + if ctx.SourceFilePolicy == Auto { + ctx.DeleteSourceFile = true + ctx.DeleteSourceDir = true + } + + return } func fetchFileLocal(ctx *Context, res *Result, uri *url.URL) (err error) { diff --git a/rhimport/session.go b/rhimport/session.go index 450c818..9232bc1 100644 --- a/rhimport/session.go +++ b/rhimport/session.go @@ -211,7 +211,7 @@ func (self *Session) dispatchRequests() { case req := <-self.addDoneChan: req.response <- self.addDoneHandler(req.userdata, req.callback) case p := <-self.progressIntChan: - if self.state != SESSION_RUNNING { + if self.state == SESSION_RUNNING { self.callProgressHandler(&p) } case r := <-self.doneIntChan: |