summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2015-12-21 21:19:27 (GMT)
committerChristian Pointner <equinox@helsinki.at>2015-12-21 21:19:27 (GMT)
commit0912ca15a61bc9eba102fc967367f8c1665842dc (patch)
tree4a479f3d242020a2340d94effb5a54aa64bd9978
parentc06f4c3db13c69ec5ff8f3685aa5055c2645b144 (diff)
canceling using works now
-rw-r--r--src/helsinki.at/rhimport/fetcher.go41
-rw-r--r--src/helsinki.at/rhimport/importer.go11
-rw-r--r--src/helsinki.at/rhimportd/ctrlTelnet.go24
3 files changed, 64 insertions, 12 deletions
diff --git a/src/helsinki.at/rhimport/fetcher.go b/src/helsinki.at/rhimport/fetcher.go
index bff62bc..24635d1 100644
--- a/src/helsinki.at/rhimport/fetcher.go
+++ b/src/helsinki.at/rhimport/fetcher.go
@@ -34,7 +34,9 @@ import (
"os"
"path"
"path/filepath"
+ "strconv"
"strings"
+ "time"
)
type FetchResult struct {
@@ -114,10 +116,11 @@ func FetchFileCurl(ctx *ImportContext, res *FetchResult, uri *url.URL) (err erro
easy.Setopt(curl.OPT_NOPROGRESS, false)
easy.Setopt(curl.OPT_PROGRESSFUNCTION, func(dltotal, dlnow, ultotal, ulnow float64, userdata interface{}) bool {
if ctx.Cancel != nil && len(ctx.Cancel) > 0 {
+ res.ResponseCode = http.StatusNoContent
+ res.ErrorString = "canceled"
return false
}
- ctx := userdata.(*ImportContext)
if ctx.ProgressCallBack != nil {
ctx.ProgressCallBack(1, "downloading", dlnow/dltotal, ctx.ProgressCallBackData)
}
@@ -126,7 +129,11 @@ func FetchFileCurl(ctx *ImportContext, res *FetchResult, uri *url.URL) (err erro
easy.Setopt(curl.OPT_PROGRESSDATA, ctx)
if err = easy.Perform(); err != nil {
- err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err)
+ if res.ResponseCode == http.StatusNoContent {
+ err = nil
+ } else {
+ err = fmt.Errorf("curl-fetcher('%s'): %s", ctx.SourceUri, err)
+ }
return
}
@@ -159,6 +166,35 @@ func FetchFileLocal(ctx *ImportContext, res *FetchResult, uri *url.URL) (err err
return
}
+func FetchFileFake(ctx *ImportContext, res *FetchResult, uri *url.URL) error {
+ rhdl.Printf("Fake fetcher for '%s'", ctx.SourceUri)
+
+ if duration, err := strconv.ParseUint(uri.Host, 10, 32); err != nil {
+ err = nil
+ res.ResponseCode = http.StatusBadRequest
+ res.ErrorString = "invalid duration (must be a positive integer)"
+ } else {
+ for i := uint(0); i < uint(duration); i++ {
+ if ctx.Cancel != nil && len(ctx.Cancel) > 0 {
+ res.ResponseCode = http.StatusNoContent
+ res.ErrorString = "canceled"
+ return nil
+ }
+ if ctx.ProgressCallBack != nil {
+ ctx.ProgressCallBack(1, "faking", float64(i)/float64(duration), ctx.ProgressCallBackData)
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ if ctx.ProgressCallBack != nil {
+ ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData)
+ }
+ ctx.SourceFile = "/nonexistend/fake.mp3"
+ ctx.DeleteSourceFile = false
+ ctx.DeleteSourceDir = false
+ }
+ return nil
+}
+
type FetchFunc func(*ImportContext, *FetchResult, *url.URL) (err error)
// TODO: implement fetchers for:
@@ -168,6 +204,7 @@ type FetchFunc func(*ImportContext, *FetchResult, *url.URL) (err error)
var (
fetchers = map[string]FetchFunc{
"local": FetchFileLocal,
+ "fake": FetchFileFake,
}
curl_protos = map[string]bool{
"http": false, "https": false,
diff --git a/src/helsinki.at/rhimport/importer.go b/src/helsinki.at/rhimport/importer.go
index df491d7..512daf6 100644
--- a/src/helsinki.at/rhimport/importer.go
+++ b/src/helsinki.at/rhimport/importer.go
@@ -502,10 +502,11 @@ func import_audio(ctx *ImportContext, res *ImportResult) (err error) {
easy.Setopt(curl.OPT_NOPROGRESS, false)
easy.Setopt(curl.OPT_PROGRESSFUNCTION, func(dltotal, dlnow, ultotal, ulnow float64, userdata interface{}) bool {
if ctx.Cancel != nil && len(ctx.Cancel) > 0 {
+ res.ResponseCode = http.StatusNoContent
+ res.ErrorString = "canceled"
return false
}
- ctx := userdata.(*ImportContext)
if ctx.ProgressCallBack != nil {
ctx.ProgressCallBack(2, "importing", ulnow/ultotal, ctx.ProgressCallBackData)
}
@@ -514,7 +515,13 @@ func import_audio(ctx *ImportContext, res *ImportResult) (err error) {
easy.Setopt(curl.OPT_PROGRESSDATA, ctx)
if err = easy.Perform(); err != nil {
- err = fmt.Errorf("importer: %s", err)
+ if res.ResponseCode == http.StatusNoContent {
+ res.Cart = ctx.Cart
+ res.Cut = ctx.Cut
+ err = nil
+ } else {
+ err = fmt.Errorf("importer: %s", err)
+ }
return
}
diff --git a/src/helsinki.at/rhimportd/ctrlTelnet.go b/src/helsinki.at/rhimportd/ctrlTelnet.go
index ff94d49..41ac7f2 100644
--- a/src/helsinki.at/rhimportd/ctrlTelnet.go
+++ b/src/helsinki.at/rhimportd/ctrlTelnet.go
@@ -296,7 +296,7 @@ func telnet_cmd_run(ctx rhimport.ImportContext, out chan<- string) {
}
}
-func (c *TelnetClient) handle_cmd_run(args []string) {
+func (c *TelnetClient) handle_cmd_run(args []string, cancel <-chan bool) {
if c.ctx == nil {
c.say("context is empty please set at least one option")
return
@@ -306,17 +306,22 @@ func (c *TelnetClient) handle_cmd_run(args []string) {
c.say("sanity check for import context returned: %s", err)
return
}
+ select {
+ case <-cancel: // consume potentially pending cancel request
+ default:
+ }
stdout := make(chan string)
c.ctx.ProgressCallBack = telnet_progress_callback
c.ctx.ProgressCallBackData = (chan<- string)(stdout)
+ c.ctx.Cancel = cancel
go telnet_cmd_run(*c.ctx, stdout)
for str := range stdout {
c.write_string(str)
}
}
-func (c *TelnetClient) handle_cmd(cmdstr string, done chan<- bool) {
+func (c *TelnetClient) handle_cmd(cmdstr string, done chan<- bool, cancel <-chan bool) {
cmdslice := strings.Fields(cmdstr)
if len(cmdslice) == 0 || cmdslice[0] == "" {
done <- false
@@ -337,14 +342,14 @@ func (c *TelnetClient) handle_cmd(cmdstr string, done chan<- bool) {
} else if cmd == "show" {
c.handle_cmd_show(args)
} else if cmd == "run" {
- c.handle_cmd_run(args)
+ c.handle_cmd_run(args, cancel)
} else {
c.say("unknown command '%s'", cmd)
}
done <- false
}
-func (c *TelnetClient) handle_iac(iac []byte) bool {
+func (c *TelnetClient) handle_iac(iac []byte, cancel chan<- bool) bool {
if len(iac) < 2 {
return false // this shouldn't happen
}
@@ -355,8 +360,10 @@ func (c *TelnetClient) handle_iac(iac []byte) bool {
case DO, DONT:
iac[1] = WONT
case IP:
- // TODO: cancel running command (if any)
- rhdl.Printf("canceling running process - is not yet implemented!")
+ select {
+ case cancel <- true:
+ default: // process got canceled already
+ }
return false
default:
rhdl.Printf("ignoring unimplemented telnet command: %X", iac[1])
@@ -446,6 +453,7 @@ func (c *TelnetClient) handle() {
go c.recv(in)
done := make(chan bool)
+ cancel := make(chan bool, 1)
c.write_string(prompt)
for {
select {
@@ -456,9 +464,9 @@ func (c *TelnetClient) handle() {
if len(cmd) > 0 {
switch cmd[0] {
case IAC:
- c.handle_iac([]byte(cmd))
+ c.handle_iac([]byte(cmd), cancel)
default:
- go c.handle_cmd(cmd, done)
+ go c.handle_cmd(cmd, done, cancel)
}
} else {
c.write_string(prompt)