summaryrefslogtreecommitdiff
path: root/src/helsinki.at
diff options
context:
space:
mode:
Diffstat (limited to 'src/helsinki.at')
-rw-r--r--src/helsinki.at/rhimport/session.go51
-rw-r--r--src/helsinki.at/rhimportd/main.go33
2 files changed, 52 insertions, 32 deletions
diff --git a/src/helsinki.at/rhimport/session.go b/src/helsinki.at/rhimport/session.go
index bce1072..88648f7 100644
--- a/src/helsinki.at/rhimport/session.go
+++ b/src/helsinki.at/rhimport/session.go
@@ -27,6 +27,7 @@ package rhimport
import (
"fmt"
"net/http"
+ "time"
)
const (
@@ -34,10 +35,11 @@ const (
SESSION_RUNNING
SESSION_CANCELED
SESSION_DONE
+ SESSION_TIMEOUT
)
type SessionChan struct {
- runChan chan<- bool
+ runChan chan<- time.Duration
cancelChan chan<- bool
addProgressChan chan<- sessionAddProgressHandlerRequest
addDoneChan chan<- sessionAddDoneHandlerRequest
@@ -49,10 +51,11 @@ type Session struct {
removeFunc func()
done chan bool
quit chan bool
+ timer *time.Timer
cancelIntChan chan bool
progressIntChan chan ProgressData
doneIntChan chan ImportResult
- runChan chan bool
+ runChan chan time.Duration
cancelChan chan bool
addProgressChan chan sessionAddProgressHandlerRequest
addDoneChan chan sessionAddDoneHandlerRequest
@@ -125,18 +128,19 @@ func session_import_run(ctx ImportContext, done chan<- ImportResult) {
}
}
-func (self *Session) run() {
+func (self *Session) run(timeout time.Duration) {
self.ctx.ProgressCallBack = session_progress_callback
self.ctx.ProgressCallBackData = (chan<- ProgressData)(self.progressIntChan)
self.ctx.Cancel = self.cancelIntChan
go session_import_run(self.ctx, self.doneIntChan)
self.state = SESSION_RUNNING
+ self.timer.Reset(timeout)
return
}
-func (self *SessionChan) Run() {
+func (self *SessionChan) Run(timeout time.Duration) {
select {
- case self.runChan <- true:
+ case self.runChan <- timeout:
default: // command is already pending or session is about to be closed/removed
}
}
@@ -182,7 +186,7 @@ func (self *SessionChan) AddProgressHandler(userdata interface{}, cb ImportProgr
}
func (self *Session) addDoneHandler(userdata interface{}, cb func(ImportResult, interface{}) bool) (resp sessionAddDoneHandlerResponse) {
- if self.state == SESSION_NEW && self.state != SESSION_RUNNING {
+ if self.state != SESSION_NEW && self.state != SESSION_RUNNING {
resp.err = fmt.Errorf("session is already done/canceled")
}
self.doneCBs = append(self.doneCBs, &SessionDoneCB{cb, userdata})
@@ -234,9 +238,19 @@ func (self *Session) dispatchRequests() {
self.cancel()
}
return
- case <-self.runChan:
+ case <-self.timer.C:
+ if self.state == SESSION_RUNNING {
+ self.cancel()
+ }
+ self.state = SESSION_TIMEOUT
+ r := &ImportResult{500, "timeout", 0, 0}
+ self.callDoneHandler(r)
+ if self.removeFunc != nil {
+ self.removeFunc()
+ }
+ case t := <-self.runChan:
if self.state == SESSION_NEW {
- self.run()
+ self.run(t)
}
case <-self.cancelChan:
if self.state == SESSION_RUNNING {
@@ -249,10 +263,13 @@ func (self *Session) dispatchRequests() {
case p := <-self.progressIntChan:
self.callProgressHandler(&p)
case r := <-self.doneIntChan:
- self.state = SESSION_DONE
- self.callDoneHandler(&r)
- if self.removeFunc != nil {
- self.removeFunc()
+ if self.state != SESSION_TIMEOUT {
+ self.timer.Stop()
+ self.state = SESSION_DONE
+ self.callDoneHandler(&r)
+ if self.removeFunc != nil {
+ self.removeFunc()
+ }
}
}
}
@@ -271,11 +288,14 @@ func (self *Session) Cleanup() {
self.quit <- true
rhdl.Printf("waiting for session to close")
<-self.done
+ close(self.quit)
close(self.done)
- close(self.progressIntChan)
- close(self.doneIntChan)
+ self.timer.Stop()
// don't close the channels we give out because this might lead to a panic if
// somebody wites to an already removed session
+ // close(self.cancelIntChan)
+ // close(self.progressIntChan)
+ // close(self.doneIntChan)
// close(self.runChan)
// close(self.cancelChan)
// close(self.addProgressChan)
@@ -289,10 +309,11 @@ func NewSession(ctx *ImportContext, removeFunc func()) (session *Session) {
session.removeFunc = removeFunc
session.ctx = *ctx
session.done = make(chan bool)
+ session.timer = time.NewTimer(10 * time.Second)
session.cancelIntChan = make(chan bool, 1)
session.progressIntChan = make(chan ProgressData, 10)
session.doneIntChan = make(chan ImportResult, 1)
- session.runChan = make(chan bool, 1)
+ session.runChan = make(chan time.Duration, 1)
session.cancelChan = make(chan bool, 1)
session.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10)
session.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10)
diff --git a/src/helsinki.at/rhimportd/main.go b/src/helsinki.at/rhimportd/main.go
index dee6b37..56fe612 100644
--- a/src/helsinki.at/rhimportd/main.go
+++ b/src/helsinki.at/rhimportd/main.go
@@ -93,36 +93,35 @@ func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) {
}
var session *rhimport.SessionChan
- if session, err = store.Get(ctx.UserName, "<test>"); err != nil {
- rhl.Printf("MAIN: (Expected) Error SessionStore.Get(): %s", err)
- }
if session, err = store.Get(ctx.UserName, id); err != nil {
rhl.Printf("MAIN: Error SessionStore.Get(): %s", err)
return
}
- pch := make(chan string, 1000)
- if err = session.AddProgressHandler((chan<- string)(pch), session_test_progress1); err != nil {
- rhl.Printf("MAIN: Error Session.AddProgressHandler(): %s", err)
- }
- dch := make(chan rhimport.ImportResult)
+ dch := make(chan rhimport.ImportResult, 1)
if err = session.AddDoneHandler((chan<- rhimport.ImportResult)(dch), session_test_done); err != nil {
rhl.Printf("MAIN: Error Session.AddDoneHandler(): %s", err)
+ return
}
- rhl.Printf("MAIN: calling run for %s/%s", ctx.UserName, id)
- session.Run()
+ // pch := make(chan string, 10)
+ // if err = session.AddProgressHandler((chan<- string)(pch), session_test_progress1); err != nil {
+ // rhl.Printf("MAIN: Error Session.AddProgressHandler(): %s", err)
+ // }
+ // go func() {
+ // time.Sleep(3 * time.Second)
+ // if err = session.AddProgressHandler((chan<- string)(pch), session_test_progress2); err != nil {
+ // rhl.Printf("MAIN: Error Session.AddProgressHandler(): %s", err)
+ // }
+ // }()
- time.Sleep(500 * time.Millisecond)
-
- if err = session.AddProgressHandler((chan<- string)(pch), session_test_progress2); err != nil {
- rhl.Printf("MAIN: Error Session.AddProgressHandler(): %s", err)
- }
+ rhl.Printf("MAIN: calling run for %s/%s", ctx.UserName, id)
+ session.Run(10 * time.Second)
for {
select {
- case p := <-pch:
- fmt.Println(p)
+ // case p := <-pch:
+ // fmt.Println(p)
case r := <-dch:
fmt.Printf("Import finished: %+v\n", r)
break