From bb072813ee52a605cc44d424ec1d037cbaee76a8 Mon Sep 17 00:00:00 2001
From: Christian Pointner <equinox@helsinki.at>
Date: Sun, 17 Jul 2016 19:50:03 +0200
Subject: send add/remove updates from session_store


diff --git a/rhimport/session_store.go b/rhimport/session_store.go
index f744f50..f25a297 100644
--- a/rhimport/session_store.go
+++ b/rhimport/session_store.go
@@ -60,6 +60,13 @@ type getSessionRequest struct {
 	response chan getSessionResponse
 }
 
+type SessionsUpdateCB func(added, removed map[string]string, userdata interface{}) bool
+
+type SessionsListCB struct {
+	cb       SessionsUpdateCB
+	userdata interface{}
+}
+
 type listSessionsResponse struct {
 	sessions     map[string]string
 	responsecode int
@@ -70,6 +77,8 @@ type listSessionsRequest struct {
 	user     string
 	password string
 	trusted  bool
+	callback SessionsUpdateCB
+	userdata interface{}
 	response chan listSessionsResponse
 }
 
@@ -84,13 +93,42 @@ type removeSessionRequest struct {
 	response chan removeSessionResponse
 }
 
-type SessionStoreElement struct {
+type SessionStoreSessionElement struct {
 	s     *Session
 	refId string
 }
 
+type SessionStoreUserElement struct {
+	sessions  map[string]*SessionStoreSessionElement
+	updateCBs []SessionsListCB
+}
+
+func (user *SessionStoreUserElement) callUpdateHandler(added, removed map[string]string) {
+	var keptCBs []SessionsListCB
+	for _, cb := range user.updateCBs {
+		if cb.cb != nil {
+			if keep := cb.cb(added, removed, cb.userdata); keep {
+				keptCBs = append(keptCBs, cb)
+			}
+		}
+	}
+	user.updateCBs = keptCBs
+}
+
+func (user *SessionStoreUserElement) callUpdateHandlerAdd(id, refId string) {
+	added := make(map[string]string)
+	added[id] = refId
+	user.callUpdateHandler(added, nil)
+}
+
+func (user *SessionStoreUserElement) callUpdateHandlerRemove(id, refId string) {
+	removed := make(map[string]string)
+	removed[id] = refId
+	user.callUpdateHandler(nil, removed)
+}
+
 type SessionStore struct {
-	store      map[string]map[string]*SessionStoreElement
+	store      map[string]*SessionStoreUserElement
 	conf       *Config
 	db         *rddb.DBChan
 	quit       chan bool
@@ -109,11 +147,11 @@ func generateSessionId() (string, error) {
 	return base64.RawStdEncoding.EncodeToString(b[:]), nil
 }
 
-func (self *SessionStore) new(ctx *Context, refId string) (resp newSessionResponse) {
+func (store *SessionStore) new(ctx *Context, refId string) (resp newSessionResponse) {
 	resp.responsecode = http.StatusOK
 	resp.errorstring = "OK"
 	if !ctx.Trusted {
-		if ok, err := self.db.CheckPassword(ctx.UserName, ctx.Password); err != nil {
+		if ok, err := store.db.CheckPassword(ctx.UserName, ctx.Password); err != nil {
 			resp.responsecode = http.StatusInternalServerError
 			resp.errorstring = err.Error()
 			return
@@ -128,23 +166,33 @@ func (self *SessionStore) new(ctx *Context, refId string) (resp newSessionRespon
 		resp.errorstring = err.Error()
 	} else {
 		resp.id = id
-		if _, exists := self.store[ctx.UserName]; !exists {
-			self.store[ctx.UserName] = make(map[string]*SessionStoreElement)
+		if _, exists := store.store[ctx.UserName]; !exists {
+			newuser := &SessionStoreUserElement{}
+			newuser.sessions = make(map[string]*SessionStoreSessionElement)
+			store.store[ctx.UserName] = newuser
 		}
-		ctx.conf = self.conf
-		ctx.db = self.db
-		s := &SessionStoreElement{newSession(ctx, func() { self.GetInterface().Remove(ctx.UserName, resp.id) }), refId}
-		self.store[ctx.UserName][resp.id] = s
-		resp.session = self.store[ctx.UserName][resp.id].s.getInterface()
+		ctx.conf = store.conf
+		ctx.db = store.db
+		s := &SessionStoreSessionElement{newSession(ctx, func() { store.GetInterface().Remove(ctx.UserName, resp.id) }), refId}
+		store.store[ctx.UserName].sessions[resp.id] = s
+		resp.session = store.store[ctx.UserName].sessions[resp.id].s.getInterface()
 		rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id)
+		store.store[ctx.UserName].callUpdateHandlerAdd(resp.id, refId)
 	}
 	return
 }
 
-func (self *SessionStore) get(user, id string) (resp getSessionResponse) {
+func (store *SessionStore) get(username, id string) (resp getSessionResponse) {
 	resp.responsecode = http.StatusOK
 	resp.errorstring = "OK"
-	if session, exists := self.store[user][id]; exists {
+
+	user, exists := store.store[username]
+	if !exists {
+		resp.responsecode = http.StatusNotFound
+		resp.errorstring = fmt.Sprintf("SessionStore: session '%s/%s' not found", user, id)
+	}
+
+	if session, exists := user.sessions[id]; exists {
 		resp.session = session.s.getInterface()
 		resp.refId = session.refId
 	} else {
@@ -154,11 +202,11 @@ func (self *SessionStore) get(user, id string) (resp getSessionResponse) {
 	return
 }
 
-func (self *SessionStore) list(user, password string, trusted bool) (resp listSessionsResponse) {
+func (store *SessionStore) list(username, password string, trusted bool, userdata interface{}, cb SessionsUpdateCB) (resp listSessionsResponse) {
 	resp.responsecode = http.StatusOK
 	resp.errorstring = "OK"
 	if !trusted {
-		if ok, err := self.db.CheckPassword(user, password); err != nil {
+		if ok, err := store.db.CheckPassword(username, password); err != nil {
 			resp.responsecode = http.StatusInternalServerError
 			resp.errorstring = err.Error()
 			return
@@ -169,26 +217,42 @@ func (self *SessionStore) list(user, password string, trusted bool) (resp listSe
 		}
 	}
 	resp.sessions = make(map[string]string)
-	if sessions, exists := self.store[user]; exists {
-		for id, e := range sessions {
+	if user, exists := store.store[username]; exists {
+		for id, e := range user.sessions {
 			resp.sessions[id] = e.refId
 		}
+		if cb != nil {
+			user.updateCBs = append(user.updateCBs, SessionsListCB{cb, userdata})
+		}
+	} else if cb != nil {
+		newuser := &SessionStoreUserElement{}
+		newuser.sessions = make(map[string]*SessionStoreSessionElement)
+		newuser.updateCBs = []SessionsListCB{SessionsListCB{cb, userdata}}
+		store.store[username] = newuser
 	}
 	return
 }
 
-func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) {
+func (store *SessionStore) remove(username, id string) (resp removeSessionResponse) {
 	resp.responsecode = http.StatusOK
 	resp.errorstring = "OK"
-	if session, exists := self.store[user][id]; exists {
+
+	user, exists := store.store[username]
+	if !exists {
+		resp.responsecode = http.StatusNotFound
+		resp.errorstring = fmt.Sprintf("SessionStore: session '%s/%s' not found", user, id)
+	}
+
+	if session, exists := user.sessions[id]; exists {
 		go session.s.cleanup() // cleanup could take a while -> don't block all the other stuff
-		delete(self.store[user], id)
-		rhdl.Printf("SessionStore: removed session '%s/%s'", user, id)
-		if userstore, exists := self.store[user]; exists {
-			if len(userstore) == 0 {
-				delete(self.store, user)
-				rhdl.Printf("SessionStore: removed user '%s'", user)
-			}
+		refId := session.refId
+		delete(user.sessions, id)
+		rhdl.Printf("SessionStore: removed session '%s/%s'", username, id)
+		user.callUpdateHandlerRemove(id, refId)
+
+		if len(user.sessions) == 0 && len(user.updateCBs) == 0 {
+			delete(store.store, username)
+			rhdl.Printf("SessionStore: removed user '%s'", username)
 		}
 	} else {
 		resp.responsecode = http.StatusNotFound
@@ -197,20 +261,20 @@ func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) {
 	return
 }
 
-func (self *SessionStore) dispatchRequests() {
-	defer func() { self.done <- true }()
+func (store *SessionStore) dispatchRequests() {
+	defer func() { store.done <- true }()
 	for {
 		select {
-		case <-self.quit:
+		case <-store.quit:
 			return
-		case req := <-self.newChan:
-			req.response <- self.new(req.ctx, req.refId)
-		case req := <-self.getChan:
-			req.response <- self.get(req.user, req.id)
-		case req := <-self.listChan:
-			req.response <- self.list(req.user, req.password, req.trusted)
-		case req := <-self.removeChan:
-			req.response <- self.remove(req.user, req.id)
+		case req := <-store.newChan:
+			req.response <- store.new(req.ctx, req.refId)
+		case req := <-store.getChan:
+			req.response <- store.get(req.user, req.id)
+		case req := <-store.listChan:
+			req.response <- store.list(req.user, req.password, req.trusted, req.userdata, req.callback)
+		case req := <-store.removeChan:
+			req.response <- store.remove(req.user, req.id)
 		}
 	}
 }
@@ -225,73 +289,75 @@ type SessionStoreChan struct {
 	removeChan chan<- removeSessionRequest
 }
 
-func (self *SessionStoreChan) New(ctx *Context, refId string) (string, *SessionChan, int, string) {
+func (store *SessionStoreChan) New(ctx *Context, refId string) (string, *SessionChan, int, string) {
 	resCh := make(chan newSessionResponse)
 	req := newSessionRequest{}
 	req.ctx = ctx
 	req.refId = refId
 	req.response = resCh
-	self.newChan <- req
+	store.newChan <- req
 
 	res := <-resCh
 	return res.id, res.session, res.responsecode, res.errorstring
 }
 
-func (self *SessionStoreChan) Get(user, id string) (*SessionChan, string, int, string) {
+func (store *SessionStoreChan) Get(user, id string) (*SessionChan, string, int, string) {
 	resCh := make(chan getSessionResponse)
 	req := getSessionRequest{}
 	req.user = user
 	req.id = id
 	req.response = resCh
-	self.getChan <- req
+	store.getChan <- req
 
 	res := <-resCh
 	return res.session, res.refId, res.responsecode, res.errorstring
 }
 
-func (self *SessionStoreChan) List(user, password string, trusted bool) (map[string]string, int, string) {
+func (store *SessionStoreChan) List(user, password string, trusted bool, userdata interface{}, cb SessionsUpdateCB) (map[string]string, int, string) {
 	resCh := make(chan listSessionsResponse)
 	req := listSessionsRequest{}
 	req.user = user
 	req.password = password
 	req.trusted = trusted
 	req.response = resCh
-	self.listChan <- req
+	req.callback = cb
+	req.userdata = userdata
+	store.listChan <- req
 
 	res := <-resCh
 	return res.sessions, res.responsecode, res.errorstring
 }
 
-func (self *SessionStoreChan) Remove(user, id string) (int, string) {
+func (store *SessionStoreChan) Remove(user, id string) (int, string) {
 	resCh := make(chan removeSessionResponse)
 	req := removeSessionRequest{}
 	req.user = user
 	req.id = id
 	req.response = resCh
-	self.removeChan <- req
+	store.removeChan <- req
 
 	res := <-resCh
 	return res.responsecode, res.errorstring
 }
 
-func (self *SessionStore) GetInterface() *SessionStoreChan {
+func (store *SessionStore) GetInterface() *SessionStoreChan {
 	ch := &SessionStoreChan{}
-	ch.newChan = self.newChan
-	ch.getChan = self.getChan
-	ch.listChan = self.listChan
-	ch.removeChan = self.removeChan
+	ch.newChan = store.newChan
+	ch.getChan = store.getChan
+	ch.listChan = store.listChan
+	ch.removeChan = store.removeChan
 	return ch
 }
 
-func (self *SessionStore) Cleanup() {
-	self.quit <- true
-	<-self.done
-	close(self.quit)
-	close(self.done)
-	close(self.newChan)
-	close(self.getChan)
-	close(self.listChan)
-	close(self.removeChan)
+func (store *SessionStore) Cleanup() {
+	store.quit <- true
+	<-store.done
+	close(store.quit)
+	close(store.done)
+	close(store.newChan)
+	close(store.getChan)
+	close(store.listChan)
+	close(store.removeChan)
 }
 
 func NewSessionStore(conf *Config, db *rddb.DBChan) (store *SessionStore, err error) {
@@ -300,7 +366,7 @@ func NewSessionStore(conf *Config, db *rddb.DBChan) (store *SessionStore, err er
 	store.db = db
 	store.quit = make(chan bool)
 	store.done = make(chan bool)
-	store.store = make(map[string]map[string]*SessionStoreElement)
+	store.store = make(map[string]*SessionStoreUserElement)
 	store.newChan = make(chan newSessionRequest, 10)
 	store.getChan = make(chan getSessionRequest, 10)
 	store.listChan = make(chan listSessionsRequest, 10)
-- 
cgit v0.10.2