summaryrefslogtreecommitdiff
path: root/src/helsinki.at/rhimport
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2015-12-21 23:45:49 (GMT)
committerChristian Pointner <equinox@helsinki.at>2015-12-21 23:45:49 (GMT)
commitc0463be82447f269d9947d16a40881dbbe600827 (patch)
tree323dbe19236ee179bea46c9d40af54e2a3403eb3 /src/helsinki.at/rhimport
parentb917755aab166ebb59ebdb5356114d373ce5cb91 (diff)
basic session handling works now
Diffstat (limited to 'src/helsinki.at/rhimport')
-rw-r--r--src/helsinki.at/rhimport/fetcher.go2
-rw-r--r--src/helsinki.at/rhimport/importer.go16
-rw-r--r--src/helsinki.at/rhimport/session.go63
-rw-r--r--src/helsinki.at/rhimport/session_store.go2
4 files changed, 53 insertions, 30 deletions
diff --git a/src/helsinki.at/rhimport/fetcher.go b/src/helsinki.at/rhimport/fetcher.go
index 1f0b364..36d7055 100644
--- a/src/helsinki.at/rhimport/fetcher.go
+++ b/src/helsinki.at/rhimport/fetcher.go
@@ -193,7 +193,7 @@ func FetchFileFake(ctx *ImportContext, res *FetchResult, uri *url.URL) error {
time.Sleep(100 * time.Millisecond)
}
if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData)
+ ctx.ProgressCallBack(1, "faking", 1.0, ctx.ProgressCallBackData)
}
ctx.SourceFile = "/nonexistend/fake.mp3"
ctx.DeleteSourceFile = false
diff --git a/src/helsinki.at/rhimport/importer.go b/src/helsinki.at/rhimport/importer.go
index 1bfdf0c..a245270 100644
--- a/src/helsinki.at/rhimport/importer.go
+++ b/src/helsinki.at/rhimport/importer.go
@@ -693,18 +693,20 @@ func ImportFile(ctx *ImportContext) (res *ImportResult, err error) {
}
if ctx.Cart != 0 && ctx.Cut != 0 {
- if err = import_audio(ctx, res); err != nil {
- return
- }
- if res.ResponseCode != http.StatusOK {
- rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString)
+ if err = import_audio(ctx, res); err != nil || res.ResponseCode != http.StatusOK {
+ if err != nil {
+ rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", ctx.Cart, ctx.Cut, err)
+ } else {
+ rhl.Printf("Fileimport has failed (Cart/Cut %d/%d): %s", res.Cart, res.Cut, res.ErrorString)
+ }
+ // Try to clean up after failed import
rmres := ImportResult{ResponseCode: http.StatusOK}
if rmCartOnErr {
- if err = remove_cart(ctx, &rmres); err != nil {
+ if rerr := remove_cart(ctx, &rmres); rerr != nil {
return
}
} else if rmCutOnErr {
- if err = remove_cut(ctx, &rmres); err != nil {
+ if rerr := remove_cut(ctx, &rmres); rerr != nil {
return
}
}
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index 8f20958..08a33ce 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -25,7 +25,14 @@
package rhimport
import (
- "time"
+ "net/http"
+)
+
+const (
+ SESSION_NEW = iota
+ SESSION_RUNNING
+ SESSION_CANCELED
+ SESSION_DONE
)
type SessionChan struct {
@@ -37,8 +44,9 @@ type SessionChan struct {
type Session struct {
ctx ImportContext
- running bool
+ state int
done chan bool
+ cancelIntChan chan bool
progressIntChan chan ProgressData
doneIntChan chan ImportResult
runChan chan bool
@@ -79,26 +87,35 @@ func session_progress_callback(step int, step_name string, progress float64, use
out <- ProgressData{step, step_name, progress}
}
-// TODO: actually call import here
func session_import_run(ctx ImportContext, done chan<- ImportResult) {
- rhdl.Printf("faking import for: %+v", ctx)
- for i := 0; i < 100; i++ {
- if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData)
- }
- time.Sleep(100 * time.Millisecond)
+ if err := ctx.SanityCheck(); err != nil {
+ done <- ImportResult{http.StatusBadRequest, err.Error(), 0, 0}
+ return
}
- if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData)
+
+ if res, err := FetchFile(&ctx); err != nil {
+ done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0}
+ return
+ } else if res.ResponseCode != http.StatusOK {
+ done <- ImportResult{res.ResponseCode, res.ErrorString, 0, 0}
+ return
+ }
+
+ if res, err := ImportFile(&ctx); err != nil {
+ done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0}
+ return
+ } else {
+ done <- *res
+ return
}
- done <- ImportResult{200, "OK", 0, 0}
}
func (self *Session) run() {
self.ctx.ProgressCallBack = session_progress_callback
self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
+ self.ctx.Cancel = self.cancelIntChan
go session_import_run(self.ctx, self.doneIntChan)
- self.running = true
+ self.state = SESSION_RUNNING
return
}
@@ -149,13 +166,16 @@ func (self *Session) dispatchRequests() {
for {
select {
case <-self.runChan:
- if !self.running {
+ if self.state == SESSION_NEW {
self.run()
}
case <-self.cancelChan:
- if self.running {
- rhdl.Println("Session: canceling running imports is not yet implemented")
- // TODO: send cancel to import goroutine once this got implemented
+ if self.state == SESSION_RUNNING {
+ rhdl.Println("Session: canceling running import")
+ select {
+ case self.cancelIntChan <- true:
+ default: // session got canceled already
+ }
} else {
return
}
@@ -167,7 +187,7 @@ func (self *Session) dispatchRequests() {
rhdl.Printf("Session: got progress: %+v", progress)
// TODO: call all subscribed progress handler
case result := <-self.doneIntChan:
- self.running = false
+ self.state = SESSION_DONE
rhdl.Printf("Session: import is done: %+v", result)
// TODO: call all subscribed done handler
}
@@ -184,9 +204,8 @@ func (self *Session) getInterface() *SessionChan {
}
func (self *Session) Cleanup() {
- // TODO: this blocks if dispatchRequests has ended already...
- // or if cancel doesn't work...
self.cancelChan <- true
+ rhdl.Printf("waiting for session to close")
<-self.done
close(self.done)
close(self.progressIntChan)
@@ -195,13 +214,15 @@ func (self *Session) Cleanup() {
close(self.cancelChan)
close(self.addProgressChan)
close(self.addDoneChan)
+ rhdl.Printf("session is now cleaned up")
}
func NewSession(ctx *ImportContext) (session *Session) {
session = new(Session)
- session.running = false
+ session.state = SESSION_NEW
session.ctx = *ctx
session.done = make(chan bool)
+ session.cancelIntChan = make(chan bool, 1)
session.progressIntChan = make(chan ProgressData)
session.doneIntChan = make(chan ImportResult)
session.runChan = make(chan bool)
diff --git a/src/helsinki.at/rhimport/session_store.go b/src/helsinki.at/rhimport/session_store.go
index b2e1538..8dd74c7 100644
--- a/src/helsinki.at/rhimport/session_store.go
+++ b/src/helsinki.at/rhimport/session_store.go
@@ -130,7 +130,7 @@ func (self *SessionStoreChan) Get(user, id string) (*SessionChan, error) {
func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) {
if session, exists := self.store[user][id]; exists {
- session.Cleanup()
+ go session.Cleanup() // cleanup could take a while -> don't block all the other stuff
delete(self.store[user], id)
rhdl.Printf("SessionStore: removed session '%s/%s'", user, id)
if userstore, exists := self.store[user]; exists {