diff options
author | Christian Pointner <equinox@helsinki.at> | 2015-12-21 21:19:27 (GMT) |
---|---|---|
committer | Christian Pointner <equinox@helsinki.at> | 2015-12-21 21:19:27 (GMT) |
commit | 0912ca15a61bc9eba102fc967367f8c1665842dc (patch) | |
tree | 4a479f3d242020a2340d94effb5a54aa64bd9978 /src | |
parent | c06f4c3db13c69ec5ff8f3685aa5055c2645b144 (diff) |
canceling using works now
Diffstat (limited to 'src')
-rw-r--r-- | src/helsinki.at/rhimport/fetcher.go | 41 | ||||
-rw-r--r-- | src/helsinki.at/rhimport/importer.go | 11 | ||||
-rw-r--r-- | src/helsinki.at/rhimportd/ctrlTelnet.go | 24 |
3 files changed, 64 insertions, 12 deletions
diff --git a/src/helsinki.at/rhimport/fetcher.go b/src/helsinki.at/rhimport/fetcher.go index bff62bc..24635d1 100644 --- a/src/helsinki.at/rhimport/fetcher.go +++ b/src/helsinki.at/rhimport/fetcher.go @@ -34,7 +34,9 @@ import ( "os" "path" "path/filepath" + "strconv" "strings" + "time" ) type FetchResult struct { @@ -114,10 +116,11 @@ func FetchFileCurl(ctx *ImportContext, res *FetchResult, uri *url.URL) (err erro easy.Setopt(curl.OPT_NOPROGRESS, false) easy.Setopt(curl.OPT_PROGRESSFUNCTION, func(dltotal, dlnow, ultotal, ulnow float64, userdata interface{}) bool { if ctx.Cancel != nil && len(ctx.Cancel) > 0 { + res.ResponseCode = http.StatusNoContent + res.ErrorString = "canceled" return false } - ctx := userdata.(*ImportContext) if ctx.ProgressCallBack != nil { ctx.ProgressCallBack(1, "downloading", dlnow/dltotal, ctx.ProgressCallBackData) } @@ -126,7 +129,11 @@ func FetchFileCurl(ctx *ImportContext, res *FetchResult, uri *url.URL) (err erro easy.Setopt(curl.OPT_PROGRESSDATA, ctx) if err = easy.Perform(); err != nil { - err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err) + if res.ResponseCode == http.StatusNoContent { + err = nil + } else { + err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err) + } return } @@ -159,6 +166,35 @@ func FetchFileLocal(ctx *ImportContext, res *FetchResult, uri *url.URL) (err err return } +func FetchFileFake(ctx *ImportContext, res *FetchResult, uri *url.URL) error { + rhdl.Printf("Fake fetcher for '%s'", ctx.SourceUri) + + if duration, err := strconv.ParseUint(uri.Host, 10, 32); err != nil { + err = nil + res.ResponseCode = http.StatusBadRequest + res.ErrorString = "invalid duration (must be a positive integer)" + } else { + for i := uint(0); i < uint(duration); i++ { + if ctx.Cancel != nil && len(ctx.Cancel) > 0 { + res.ResponseCode = http.StatusNoContent + res.ErrorString = "canceled" + return nil + } + if ctx.ProgressCallBack != nil { + ctx.ProgressCallBack(1, "faking", float64(i)/float64(duration), ctx.ProgressCallBackData) + } + time.Sleep(100 * time.Millisecond) + } + if ctx.ProgressCallBack != nil { + ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData) + } + ctx.SourceFile = "/nonexistend/fake.mp3" + ctx.DeleteSourceFile = false + ctx.DeleteSourceDir = false + } + return nil +} + type FetchFunc func(*ImportContext, *FetchResult, *url.URL) (err error) // TODO: implement fetchers for: @@ -168,6 +204,7 @@ type FetchFunc func(*ImportContext, *FetchResult, *url.URL) (err error) var ( fetchers = map[string]FetchFunc{ "local": FetchFileLocal, + "fake": FetchFileFake, } curl_protos = map[string]bool{ "http": false, "https": false, diff --git a/src/helsinki.at/rhimport/importer.go b/src/helsinki.at/rhimport/importer.go index df491d7..512daf6 100644 --- a/src/helsinki.at/rhimport/importer.go +++ b/src/helsinki.at/rhimport/importer.go @@ -502,10 +502,11 @@ func import_audio(ctx *ImportContext, res *ImportResult) (err error) { easy.Setopt(curl.OPT_NOPROGRESS, false) easy.Setopt(curl.OPT_PROGRESSFUNCTION, func(dltotal, dlnow, ultotal, ulnow float64, userdata interface{}) bool { if ctx.Cancel != nil && len(ctx.Cancel) > 0 { + res.ResponseCode = http.StatusNoContent + res.ErrorString = "canceled" return false } - ctx := userdata.(*ImportContext) if ctx.ProgressCallBack != nil { ctx.ProgressCallBack(2, "importing", ulnow/ultotal, ctx.ProgressCallBackData) } @@ -514,7 +515,13 @@ func import_audio(ctx *ImportContext, res *ImportResult) (err error) { easy.Setopt(curl.OPT_PROGRESSDATA, ctx) if err = easy.Perform(); err != nil { - err = fmt.Errorf("importer: %s", err) + if res.ResponseCode == http.StatusNoContent { + res.Cart = ctx.Cart + res.Cut = ctx.Cut + err = nil + } else { + err = fmt.Errorf("importer: %s", err) + } return } diff --git a/src/helsinki.at/rhimportd/ctrlTelnet.go b/src/helsinki.at/rhimportd/ctrlTelnet.go index ff94d49..41ac7f2 100644 --- a/src/helsinki.at/rhimportd/ctrlTelnet.go +++ b/src/helsinki.at/rhimportd/ctrlTelnet.go @@ -296,7 +296,7 @@ func telnet_cmd_run(ctx rhimport.ImportContext, out chan<- string) { } } -func (c *TelnetClient) handle_cmd_run(args []string) { +func (c *TelnetClient) handle_cmd_run(args []string, cancel <-chan bool) { if c.ctx == nil { c.say("context is empty please set at least one option") return @@ -306,17 +306,22 @@ func (c *TelnetClient) handle_cmd_run(args []string) { c.say("sanity check for import context returned: %s", err) return } + select { + case <-cancel: // consume potentially pending cancel request + default: + } stdout := make(chan string) c.ctx.ProgressCallBack = telnet_progress_callback c.ctx.ProgressCallBackData = (chan<- string)(stdout) + c.ctx.Cancel = cancel go telnet_cmd_run(*c.ctx, stdout) for str := range stdout { c.write_string(str) } } -func (c *TelnetClient) handle_cmd(cmdstr string, done chan<- bool) { +func (c *TelnetClient) handle_cmd(cmdstr string, done chan<- bool, cancel <-chan bool) { cmdslice := strings.Fields(cmdstr) if len(cmdslice) == 0 || cmdslice[0] == "" { done <- false @@ -337,14 +342,14 @@ func (c *TelnetClient) handle_cmd(cmdstr string, done chan<- bool) { } else if cmd == "show" { c.handle_cmd_show(args) } else if cmd == "run" { - c.handle_cmd_run(args) + c.handle_cmd_run(args, cancel) } else { c.say("unknown command '%s'", cmd) } done <- false } -func (c *TelnetClient) handle_iac(iac []byte) bool { +func (c *TelnetClient) handle_iac(iac []byte, cancel chan<- bool) bool { if len(iac) < 2 { return false // this shouldn't happen } @@ -355,8 +360,10 @@ func (c *TelnetClient) handle_iac(iac []byte) bool { case DO, DONT: iac[1] = WONT case IP: - // TODO: cancel running command (if any) - rhdl.Printf("canceling running process - is not yet implemented!") + select { + case cancel <- true: + default: // process got canceled already + } return false default: rhdl.Printf("ignoring unimplemented telnet command: %X", iac[1]) @@ -446,6 +453,7 @@ func (c *TelnetClient) handle() { go c.recv(in) done := make(chan bool) + cancel := make(chan bool, 1) c.write_string(prompt) for { select { @@ -456,9 +464,9 @@ func (c *TelnetClient) handle() { if len(cmd) > 0 { switch cmd[0] { case IAC: - c.handle_iac([]byte(cmd)) + c.handle_iac([]byte(cmd), cancel) default: - go c.handle_cmd(cmd, done) + go c.handle_cmd(cmd, done, cancel) } } else { c.write_string(prompt) |