summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2016-05-09 17:57:10 (GMT)
committerChristian Pointner <equinox@helsinki.at>2016-05-09 17:57:10 (GMT)
commit2006f6c26b324416dae2be63f736ae4f520ef0f8 (patch)
tree7b82183b861788b99786f81aaa808bb08b276e50
parentf88f22efa5f336cbd3dad60c4c4aaa71f39b0333 (diff)
implemented rate limiting for progress callbacks in session
-rw-r--r--rhimport/session.go50
1 files changed, 34 insertions, 16 deletions
diff --git a/rhimport/session.go b/rhimport/session.go
index 9232bc1..e04066a 100644
--- a/rhimport/session.go
+++ b/rhimport/session.go
@@ -39,21 +39,22 @@ const (
)
type Session struct {
- ctx Context
- state int
- removeFunc func()
- done chan bool
- quit chan bool
- timer *time.Timer
- cancelIntChan chan bool
- progressIntChan chan ProgressData
- doneIntChan chan Result
- runChan chan time.Duration
- cancelChan chan bool
- addProgressChan chan sessionAddProgressHandlerRequest
- addDoneChan chan sessionAddDoneHandlerRequest
- progressCBs []*SessionProgressCB
- doneCBs []*SessionDoneCB
+ ctx Context
+ state int
+ removeFunc func()
+ done chan bool
+ quit chan bool
+ timer *time.Timer
+ cancelIntChan chan bool
+ progressRateLimit time.Duration
+ progressIntChan chan ProgressData
+ doneIntChan chan Result
+ runChan chan time.Duration
+ cancelChan chan bool
+ addProgressChan chan sessionAddProgressHandlerRequest
+ addDoneChan chan sessionAddDoneHandlerRequest
+ progressCBs []*SessionProgressCB
+ doneCBs []*SessionDoneCB
}
type SessionProgressCB struct {
@@ -181,6 +182,12 @@ func (self *Session) callDoneHandler(r *Result) {
func (self *Session) dispatchRequests() {
defer func() { self.done <- true }()
+
+ var lastProgress *ProgressData
+ progressPending := 0
+ pt := time.NewTimer(self.progressRateLimit)
+ pt.Stop()
+
for {
select {
case <-self.quit:
@@ -210,9 +217,19 @@ func (self *Session) dispatchRequests() {
req.response <- self.addProgressHandler(req.userdata, req.callback)
case req := <-self.addDoneChan:
req.response <- self.addDoneHandler(req.userdata, req.callback)
+ case <-pt.C:
+ if progressPending > 1 && lastProgress != nil {
+ self.callProgressHandler(lastProgress)
+ }
+ lastProgress = nil
case p := <-self.progressIntChan:
if self.state == SESSION_RUNNING {
- self.callProgressHandler(&p)
+ if lastProgress == nil {
+ self.callProgressHandler(&p)
+ pt.Reset(self.progressRateLimit)
+ }
+ lastProgress = &p
+ progressPending++
}
case r := <-self.doneIntChan:
if self.state != SESSION_TIMEOUT {
@@ -322,6 +339,7 @@ func newSession(ctx *Context, removeFunc func()) (session *Session) {
session.done = make(chan bool)
session.timer = time.NewTimer(10 * time.Second)
session.cancelIntChan = make(chan bool, 1)
+ session.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value
session.progressIntChan = make(chan ProgressData, 10)
session.doneIntChan = make(chan Result, 1)
session.runChan = make(chan time.Duration, 1)