summaryrefslogtreecommitdiff
path: root/src/helsinki.at/rhimport/session.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/helsinki.at/rhimport/session.go')
-rw-r--r--src/helsinki.at/rhimport/session.go63
1 files changed, 42 insertions, 21 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index 8f20958..08a33ce 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -25,7 +25,14 @@
package rhimport
import (
- "time"
+ "net/http"
+)
+
+const (
+ SESSION_NEW = iota
+ SESSION_RUNNING
+ SESSION_CANCELED
+ SESSION_DONE
)
type SessionChan struct {
@@ -37,8 +44,9 @@ type SessionChan struct {
type Session struct {
ctx ImportContext
- running bool
+ state int
done chan bool
+ cancelIntChan chan bool
progressIntChan chan ProgressData
doneIntChan chan ImportResult
runChan chan bool
@@ -79,26 +87,35 @@ func session_progress_callback(step int, step_name string, progress float64, use
out <- ProgressData{step, step_name, progress}
}
-// TODO: actually call import here
func session_import_run(ctx ImportContext, done chan<- ImportResult) {
- rhdl.Printf("faking import for: %+v", ctx)
- for i := 0; i < 100; i++ {
- if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(42, "faking", float64(i)/100.0, ctx.ProgressCallBackData)
- }
- time.Sleep(100 * time.Millisecond)
+ if err := ctx.SanityCheck(); err != nil {
+ done <- ImportResult{http.StatusBadRequest, err.Error(), 0, 0}
+ return
}
- if ctx.ProgressCallBack != nil {
- ctx.ProgressCallBack(42, "faking", 1.0, ctx.ProgressCallBackData)
+
+ if res, err := FetchFile(&ctx); err != nil {
+ done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0}
+ return
+ } else if res.ResponseCode != http.StatusOK {
+ done <- ImportResult{res.ResponseCode, res.ErrorString, 0, 0}
+ return
+ }
+
+ if res, err := ImportFile(&ctx); err != nil {
+ done <- ImportResult{http.StatusInternalServerError, err.Error(), 0, 0}
+ return
+ } else {
+ done <- *res
+ return
}
- done <- ImportResult{200, "OK", 0, 0}
}
func (self *Session) run() {
self.ctx.ProgressCallBack = session_progress_callback
self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
+ self.ctx.Cancel = self.cancelIntChan
go session_import_run(self.ctx, self.doneIntChan)
- self.running = true
+ self.state = SESSION_RUNNING
return
}
@@ -149,13 +166,16 @@ func (self *Session) dispatchRequests() {
for {
select {
case <-self.runChan:
- if !self.running {
+ if self.state == SESSION_NEW {
self.run()
}
case <-self.cancelChan:
- if self.running {
- rhdl.Println("Session: canceling running imports is not yet implemented")
- // TODO: send cancel to import goroutine once this got implemented
+ if self.state == SESSION_RUNNING {
+ rhdl.Println("Session: canceling running import")
+ select {
+ case self.cancelIntChan <- true:
+ default: // session got canceled already
+ }
} else {
return
}
@@ -167,7 +187,7 @@ func (self *Session) dispatchRequests() {
rhdl.Printf("Session: got progress: %+v", progress)
// TODO: call all subscribed progress handler
case result := <-self.doneIntChan:
- self.running = false
+ self.state = SESSION_DONE
rhdl.Printf("Session: import is done: %+v", result)
// TODO: call all subscribed done handler
}
@@ -184,9 +204,8 @@ func (self *Session) getInterface() *SessionChan {
}
func (self *Session) Cleanup() {
- // TODO: this blocks if dispatchRequests has ended already...
- // or if cancel doesn't work...
self.cancelChan <- true
+ rhdl.Printf("waiting for session to close")
<-self.done
close(self.done)
close(self.progressIntChan)
@@ -195,13 +214,15 @@ func (self *Session) Cleanup() {
close(self.cancelChan)
close(self.addProgressChan)
close(self.addDoneChan)
+ rhdl.Printf("session is now cleaned up")
}
func NewSession(ctx *ImportContext) (session *Session) {
session = new(Session)
- session.running = false
+ session.state = SESSION_NEW
session.ctx = *ctx
session.done = make(chan bool)
+ session.cancelIntChan = make(chan bool, 1)
session.progressIntChan = make(chan ProgressData)
session.doneIntChan = make(chan ImportResult)
session.runChan = make(chan bool)