summaryrefslogtreecommitdiff
path: root/rhimport
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2016-05-06 23:13:47 (GMT)
committerChristian Pointner <equinox@helsinki.at>2016-05-06 23:13:47 (GMT)
commitf500c36ebcdbe77503e1c71fcac26b44be174039 (patch)
tree66141540b8e5b5490b14fab45ff9a23ead9d3805 /rhimport
parent10f69ccb656abe346b73448408a0e5c3a962decf (diff)
refactored curl/archiv fetcher to handle write errors correctly
Diffstat (limited to 'rhimport')
-rw-r--r--rhimport/fetcher.go172
-rw-r--r--rhimport/session.go2
2 files changed, 92 insertions, 82 deletions
diff --git a/rhimport/fetcher.go b/rhimport/fetcher.go
index 2ff5a9c..ff25ae3 100644
--- a/rhimport/fetcher.go
+++ b/rhimport/fetcher.go
@@ -48,6 +48,7 @@ type FetcherCurlCBData struct {
filename string
remotename string
file *os.File
+ writeError error
}
func (self *FetcherCurlCBData) Cleanup() {
@@ -79,12 +80,14 @@ func curlWriteCallback(ptr []byte, userdata interface{}) bool {
fp, err := os.OpenFile(data.filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600)
if err != nil {
rhl.Printf("Unable to create file %s: %s", data.filename, err)
+ data.writeError = err
return false
}
data.file = fp
}
if _, err := data.file.Write(ptr); err != nil {
rhl.Printf("Unable to write file %s: %s", data.filename, err)
+ data.writeError = err
return false
}
return true
@@ -93,6 +96,10 @@ func curlWriteCallback(ptr []byte, userdata interface{}) bool {
func curlProgressCallback(dltotal, dlnow, ultotal, ulnow float64, userdata interface{}) bool {
data := userdata.(*FetcherCurlCBData)
+ if data.writeError != nil {
+ return false
+ }
+
if data.ctx.Cancel != nil && len(data.ctx.Cancel) > 0 {
rhl.Printf("downloading '%s' got canceled", data.ctx.SourceUri)
data.res.ResponseCode = http.StatusNoContent
@@ -112,52 +119,53 @@ func fetchFileCurl(ctx *Context, res *Result, uri *url.URL) (err error) {
rhl.Printf("curl-based fetcher called for '%s'", ctx.SourceUri)
easy := curl.EasyInit()
- if easy != nil {
- defer easy.Cleanup()
+ if easy == nil {
+ err = fmt.Errorf("Error initializing libcurl")
+ return
+ }
+ defer easy.Cleanup()
- easy.Setopt(curl.OPT_FOLLOWLOCATION, true)
- easy.Setopt(curl.OPT_URL, ctx.SourceUri)
- easy.Setopt(curl.OPT_USERAGENT, "Radio Helsinki Import")
+ easy.Setopt(curl.OPT_FOLLOWLOCATION, true)
+ easy.Setopt(curl.OPT_URL, ctx.SourceUri)
+ easy.Setopt(curl.OPT_USERAGENT, "Radio Helsinki Import")
- cbdata := &FetcherCurlCBData{ctx: ctx, res: res, remotename: path.Base(uri.Path)}
- defer cbdata.Cleanup()
- if cbdata.basepath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil {
- return
- }
+ cbdata := &FetcherCurlCBData{ctx: ctx, res: res, remotename: path.Base(uri.Path)}
+ defer cbdata.Cleanup()
+ if cbdata.basepath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil {
+ return
+ }
- easy.Setopt(curl.OPT_HEADERFUNCTION, curlHeaderCallback)
- easy.Setopt(curl.OPT_HEADERDATA, cbdata)
+ easy.Setopt(curl.OPT_HEADERFUNCTION, curlHeaderCallback)
+ easy.Setopt(curl.OPT_HEADERDATA, cbdata)
- easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback)
- easy.Setopt(curl.OPT_WRITEDATA, cbdata)
+ easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback)
+ easy.Setopt(curl.OPT_WRITEDATA, cbdata)
- easy.Setopt(curl.OPT_NOPROGRESS, false)
- easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback)
- easy.Setopt(curl.OPT_PROGRESSDATA, cbdata)
+ easy.Setopt(curl.OPT_NOPROGRESS, false)
+ easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback)
+ easy.Setopt(curl.OPT_PROGRESSDATA, cbdata)
- if err = easy.Perform(); err != nil {
- if cbdata.file != nil {
- rhdl.Printf("Removing stale file: %s", cbdata.filename)
- os.Remove(cbdata.filename)
- os.Remove(path.Dir(cbdata.filename))
- }
- if res.ResponseCode == http.StatusNoContent {
- err = nil
- } else {
- err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err)
- }
- return
+ if err = easy.Perform(); err != nil {
+ if cbdata.file != nil {
+ rhdl.Printf("Removing stale file: %s", cbdata.filename)
+ os.Remove(cbdata.filename)
+ os.Remove(path.Dir(cbdata.filename))
}
-
- ctx.SourceFile = cbdata.filename
- if ctx.SourceFilePolicy == Auto {
- ctx.DeleteSourceFile = true
- ctx.DeleteSourceDir = true
+ if res.ResponseCode == http.StatusNoContent {
+ return nil
}
- } else {
- err = fmt.Errorf("Error initializing libcurl")
+ if cbdata.writeError != nil {
+ err = cbdata.writeError
+ }
+ err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err)
+ return
}
+ ctx.SourceFile = cbdata.filename
+ if ctx.SourceFilePolicy == Auto {
+ ctx.DeleteSourceFile = true
+ ctx.DeleteSourceDir = true
+ }
return
}
@@ -211,58 +219,60 @@ func fetchFileArchiv(ctx *Context, res *Result, uri *url.URL) (err error) {
}
easy := curl.EasyInit()
- if easy != nil {
- defer easy.Cleanup()
-
- // TODO: make user and host configurable
- scpuri := fmt.Sprintf("sftp://archiv.helsinki.at%s/%s", srcpath, srcfile)
- easy.Setopt(curl.OPT_URL, scpuri)
- easy.Setopt(curl.OPT_USERNAME, "rhimport")
-
- u, _ := user.Current()
- easy.Setopt(curl.OPT_SSH_PUBLIC_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa.pub", u.HomeDir))
- easy.Setopt(curl.OPT_SSH_PRIVATE_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa", u.HomeDir))
-
- cbdata := &FetcherCurlCBData{ctx: ctx, res: res}
- defer cbdata.Cleanup()
- var destpath string
- if destpath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil {
- return
- }
- cbdata.filename = fmt.Sprintf("%s/%s", destpath, srcfile)
+ if easy == nil {
+ err = fmt.Errorf("Error initializing libcurl")
+ return
+ }
+ defer easy.Cleanup()
- easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback)
- easy.Setopt(curl.OPT_WRITEDATA, cbdata)
+ // TODO: make user and host configurable
+ scpuri := fmt.Sprintf("sftp://archiv.helsinki.at%s/%s", srcpath, srcfile)
+ easy.Setopt(curl.OPT_URL, scpuri)
+ easy.Setopt(curl.OPT_USERNAME, "rhimport")
- easy.Setopt(curl.OPT_NOPROGRESS, false)
- easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback)
- easy.Setopt(curl.OPT_PROGRESSDATA, cbdata)
+ u, _ := user.Current()
+ easy.Setopt(curl.OPT_SSH_PUBLIC_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa.pub", u.HomeDir))
+ easy.Setopt(curl.OPT_SSH_PRIVATE_KEYFILE, fmt.Sprintf("%s/.ssh/id_rsa", u.HomeDir))
- rhdl.Printf("importing archiv file from %s", scpuri)
- if err = easy.Perform(); err != nil {
- if cbdata.file != nil {
- rhdl.Printf("Removing stale file: %s", cbdata.filename)
- os.Remove(cbdata.filename)
- os.Remove(path.Dir(cbdata.filename))
- }
- if res.ResponseCode == http.StatusNoContent {
- err = nil
- } else {
- err = fmt.Errorf("archiv fetcher('%s'): %s", ctx.SourceUri, err)
- }
- return
- }
+ cbdata := &FetcherCurlCBData{ctx: ctx, res: res}
+ defer cbdata.Cleanup()
+ var destpath string
+ if destpath, err = ioutil.TempDir(ctx.conf.TempDir, "rhimportd-"); err != nil {
+ return
+ }
+ cbdata.filename = fmt.Sprintf("%s/%s", destpath, srcfile)
- ctx.SourceFile = cbdata.filename
- if ctx.SourceFilePolicy == Auto {
- ctx.DeleteSourceFile = true
- ctx.DeleteSourceDir = true
+ easy.Setopt(curl.OPT_WRITEFUNCTION, curlWriteCallback)
+ easy.Setopt(curl.OPT_WRITEDATA, cbdata)
+
+ easy.Setopt(curl.OPT_NOPROGRESS, false)
+ easy.Setopt(curl.OPT_PROGRESSFUNCTION, curlProgressCallback)
+ easy.Setopt(curl.OPT_PROGRESSDATA, cbdata)
+
+ rhdl.Printf("importing archiv file from %s", scpuri)
+ if err = easy.Perform(); err != nil {
+ if cbdata.file != nil {
+ rhdl.Printf("Removing stale file: %s", cbdata.filename)
+ os.Remove(cbdata.filename)
+ os.Remove(path.Dir(cbdata.filename))
}
- } else {
- err = fmt.Errorf("Error initializing libcurl")
+ if res.ResponseCode == http.StatusNoContent {
+ return nil
+ }
+ if cbdata.writeError != nil {
+ err = cbdata.writeError
+ }
+ err = fmt.Errorf("archiv-fetcher('%s'): %s", ctx.SourceUri, err)
+ return
}
- return nil
+ ctx.SourceFile = cbdata.filename
+ if ctx.SourceFilePolicy == Auto {
+ ctx.DeleteSourceFile = true
+ ctx.DeleteSourceDir = true
+ }
+
+ return
}
func fetchFileLocal(ctx *Context, res *Result, uri *url.URL) (err error) {
diff --git a/rhimport/session.go b/rhimport/session.go
index 450c818..9232bc1 100644
--- a/rhimport/session.go
+++ b/rhimport/session.go
@@ -211,7 +211,7 @@ func (self *Session) dispatchRequests() {
case req := <-self.addDoneChan:
req.response <- self.addDoneHandler(req.userdata, req.callback)
case p := <-self.progressIntChan:
- if self.state != SESSION_RUNNING {
+ if self.state == SESSION_RUNNING {
self.callProgressHandler(&p)
}
case r := <-self.doneIntChan: