From 2006f6c26b324416dae2be63f736ae4f520ef0f8 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Mon, 9 May 2016 19:57:10 +0200 Subject: implemented rate limiting for progress callbacks in session diff --git a/rhimport/session.go b/rhimport/session.go index 9232bc1..e04066a 100644 --- a/rhimport/session.go +++ b/rhimport/session.go @@ -39,21 +39,22 @@ const ( ) type Session struct { - ctx Context - state int - removeFunc func() - done chan bool - quit chan bool - timer *time.Timer - cancelIntChan chan bool - progressIntChan chan ProgressData - doneIntChan chan Result - runChan chan time.Duration - cancelChan chan bool - addProgressChan chan sessionAddProgressHandlerRequest - addDoneChan chan sessionAddDoneHandlerRequest - progressCBs []*SessionProgressCB - doneCBs []*SessionDoneCB + ctx Context + state int + removeFunc func() + done chan bool + quit chan bool + timer *time.Timer + cancelIntChan chan bool + progressRateLimit time.Duration + progressIntChan chan ProgressData + doneIntChan chan Result + runChan chan time.Duration + cancelChan chan bool + addProgressChan chan sessionAddProgressHandlerRequest + addDoneChan chan sessionAddDoneHandlerRequest + progressCBs []*SessionProgressCB + doneCBs []*SessionDoneCB } type SessionProgressCB struct { @@ -181,6 +182,12 @@ func (self *Session) callDoneHandler(r *Result) { func (self *Session) dispatchRequests() { defer func() { self.done <- true }() + + var lastProgress *ProgressData + progressPending := 0 + pt := time.NewTimer(self.progressRateLimit) + pt.Stop() + for { select { case <-self.quit: @@ -210,9 +217,19 @@ func (self *Session) dispatchRequests() { req.response <- self.addProgressHandler(req.userdata, req.callback) case req := <-self.addDoneChan: req.response <- self.addDoneHandler(req.userdata, req.callback) + case <-pt.C: + if progressPending > 1 && lastProgress != nil { + self.callProgressHandler(lastProgress) + } + lastProgress = nil case p := <-self.progressIntChan: if self.state == SESSION_RUNNING { - self.callProgressHandler(&p) + if lastProgress == nil { + self.callProgressHandler(&p) + pt.Reset(self.progressRateLimit) + } + lastProgress = &p + progressPending++ } case r := <-self.doneIntChan: if self.state != SESSION_TIMEOUT { @@ -322,6 +339,7 @@ func newSession(ctx *Context, removeFunc func()) (session *Session) { session.done = make(chan bool) session.timer = time.NewTimer(10 * time.Second) session.cancelIntChan = make(chan bool, 1) + session.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value session.progressIntChan = make(chan ProgressData, 10) session.doneIntChan = make(chan Result, 1) session.runChan = make(chan time.Duration, 1) -- cgit v0.10.2