From bb072813ee52a605cc44d424ec1d037cbaee76a8 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 17 Jul 2016 19:50:03 +0200 Subject: send add/remove updates from session_store diff --git a/rhimport/session_store.go b/rhimport/session_store.go index f744f50..f25a297 100644 --- a/rhimport/session_store.go +++ b/rhimport/session_store.go @@ -60,6 +60,13 @@ type getSessionRequest struct { response chan getSessionResponse } +type SessionsUpdateCB func(added, removed map[string]string, userdata interface{}) bool + +type SessionsListCB struct { + cb SessionsUpdateCB + userdata interface{} +} + type listSessionsResponse struct { sessions map[string]string responsecode int @@ -70,6 +77,8 @@ type listSessionsRequest struct { user string password string trusted bool + callback SessionsUpdateCB + userdata interface{} response chan listSessionsResponse } @@ -84,13 +93,42 @@ type removeSessionRequest struct { response chan removeSessionResponse } -type SessionStoreElement struct { +type SessionStoreSessionElement struct { s *Session refId string } +type SessionStoreUserElement struct { + sessions map[string]*SessionStoreSessionElement + updateCBs []SessionsListCB +} + +func (user *SessionStoreUserElement) callUpdateHandler(added, removed map[string]string) { + var keptCBs []SessionsListCB + for _, cb := range user.updateCBs { + if cb.cb != nil { + if keep := cb.cb(added, removed, cb.userdata); keep { + keptCBs = append(keptCBs, cb) + } + } + } + user.updateCBs = keptCBs +} + +func (user *SessionStoreUserElement) callUpdateHandlerAdd(id, refId string) { + added := make(map[string]string) + added[id] = refId + user.callUpdateHandler(added, nil) +} + +func (user *SessionStoreUserElement) callUpdateHandlerRemove(id, refId string) { + removed := make(map[string]string) + removed[id] = refId + user.callUpdateHandler(nil, removed) +} + type SessionStore struct { - store map[string]map[string]*SessionStoreElement + store map[string]*SessionStoreUserElement conf *Config db *rddb.DBChan quit chan bool @@ -109,11 +147,11 @@ func generateSessionId() (string, error) { return base64.RawStdEncoding.EncodeToString(b[:]), nil } -func (self *SessionStore) new(ctx *Context, refId string) (resp newSessionResponse) { +func (store *SessionStore) new(ctx *Context, refId string) (resp newSessionResponse) { resp.responsecode = http.StatusOK resp.errorstring = "OK" if !ctx.Trusted { - if ok, err := self.db.CheckPassword(ctx.UserName, ctx.Password); err != nil { + if ok, err := store.db.CheckPassword(ctx.UserName, ctx.Password); err != nil { resp.responsecode = http.StatusInternalServerError resp.errorstring = err.Error() return @@ -128,23 +166,33 @@ func (self *SessionStore) new(ctx *Context, refId string) (resp newSessionRespon resp.errorstring = err.Error() } else { resp.id = id - if _, exists := self.store[ctx.UserName]; !exists { - self.store[ctx.UserName] = make(map[string]*SessionStoreElement) + if _, exists := store.store[ctx.UserName]; !exists { + newuser := &SessionStoreUserElement{} + newuser.sessions = make(map[string]*SessionStoreSessionElement) + store.store[ctx.UserName] = newuser } - ctx.conf = self.conf - ctx.db = self.db - s := &SessionStoreElement{newSession(ctx, func() { self.GetInterface().Remove(ctx.UserName, resp.id) }), refId} - self.store[ctx.UserName][resp.id] = s - resp.session = self.store[ctx.UserName][resp.id].s.getInterface() + ctx.conf = store.conf + ctx.db = store.db + s := &SessionStoreSessionElement{newSession(ctx, func() { store.GetInterface().Remove(ctx.UserName, resp.id) }), refId} + store.store[ctx.UserName].sessions[resp.id] = s + resp.session = store.store[ctx.UserName].sessions[resp.id].s.getInterface() rhdl.Printf("SessionStore: created session for '%s' -> %s", ctx.UserName, resp.id) + store.store[ctx.UserName].callUpdateHandlerAdd(resp.id, refId) } return } -func (self *SessionStore) get(user, id string) (resp getSessionResponse) { +func (store *SessionStore) get(username, id string) (resp getSessionResponse) { resp.responsecode = http.StatusOK resp.errorstring = "OK" - if session, exists := self.store[user][id]; exists { + + user, exists := store.store[username] + if !exists { + resp.responsecode = http.StatusNotFound + resp.errorstring = fmt.Sprintf("SessionStore: session '%s/%s' not found", user, id) + } + + if session, exists := user.sessions[id]; exists { resp.session = session.s.getInterface() resp.refId = session.refId } else { @@ -154,11 +202,11 @@ func (self *SessionStore) get(user, id string) (resp getSessionResponse) { return } -func (self *SessionStore) list(user, password string, trusted bool) (resp listSessionsResponse) { +func (store *SessionStore) list(username, password string, trusted bool, userdata interface{}, cb SessionsUpdateCB) (resp listSessionsResponse) { resp.responsecode = http.StatusOK resp.errorstring = "OK" if !trusted { - if ok, err := self.db.CheckPassword(user, password); err != nil { + if ok, err := store.db.CheckPassword(username, password); err != nil { resp.responsecode = http.StatusInternalServerError resp.errorstring = err.Error() return @@ -169,26 +217,42 @@ func (self *SessionStore) list(user, password string, trusted bool) (resp listSe } } resp.sessions = make(map[string]string) - if sessions, exists := self.store[user]; exists { - for id, e := range sessions { + if user, exists := store.store[username]; exists { + for id, e := range user.sessions { resp.sessions[id] = e.refId } + if cb != nil { + user.updateCBs = append(user.updateCBs, SessionsListCB{cb, userdata}) + } + } else if cb != nil { + newuser := &SessionStoreUserElement{} + newuser.sessions = make(map[string]*SessionStoreSessionElement) + newuser.updateCBs = []SessionsListCB{SessionsListCB{cb, userdata}} + store.store[username] = newuser } return } -func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) { +func (store *SessionStore) remove(username, id string) (resp removeSessionResponse) { resp.responsecode = http.StatusOK resp.errorstring = "OK" - if session, exists := self.store[user][id]; exists { + + user, exists := store.store[username] + if !exists { + resp.responsecode = http.StatusNotFound + resp.errorstring = fmt.Sprintf("SessionStore: session '%s/%s' not found", user, id) + } + + if session, exists := user.sessions[id]; exists { go session.s.cleanup() // cleanup could take a while -> don't block all the other stuff - delete(self.store[user], id) - rhdl.Printf("SessionStore: removed session '%s/%s'", user, id) - if userstore, exists := self.store[user]; exists { - if len(userstore) == 0 { - delete(self.store, user) - rhdl.Printf("SessionStore: removed user '%s'", user) - } + refId := session.refId + delete(user.sessions, id) + rhdl.Printf("SessionStore: removed session '%s/%s'", username, id) + user.callUpdateHandlerRemove(id, refId) + + if len(user.sessions) == 0 && len(user.updateCBs) == 0 { + delete(store.store, username) + rhdl.Printf("SessionStore: removed user '%s'", username) } } else { resp.responsecode = http.StatusNotFound @@ -197,20 +261,20 @@ func (self *SessionStore) remove(user, id string) (resp removeSessionResponse) { return } -func (self *SessionStore) dispatchRequests() { - defer func() { self.done <- true }() +func (store *SessionStore) dispatchRequests() { + defer func() { store.done <- true }() for { select { - case <-self.quit: + case <-store.quit: return - case req := <-self.newChan: - req.response <- self.new(req.ctx, req.refId) - case req := <-self.getChan: - req.response <- self.get(req.user, req.id) - case req := <-self.listChan: - req.response <- self.list(req.user, req.password, req.trusted) - case req := <-self.removeChan: - req.response <- self.remove(req.user, req.id) + case req := <-store.newChan: + req.response <- store.new(req.ctx, req.refId) + case req := <-store.getChan: + req.response <- store.get(req.user, req.id) + case req := <-store.listChan: + req.response <- store.list(req.user, req.password, req.trusted, req.userdata, req.callback) + case req := <-store.removeChan: + req.response <- store.remove(req.user, req.id) } } } @@ -225,73 +289,75 @@ type SessionStoreChan struct { removeChan chan<- removeSessionRequest } -func (self *SessionStoreChan) New(ctx *Context, refId string) (string, *SessionChan, int, string) { +func (store *SessionStoreChan) New(ctx *Context, refId string) (string, *SessionChan, int, string) { resCh := make(chan newSessionResponse) req := newSessionRequest{} req.ctx = ctx req.refId = refId req.response = resCh - self.newChan <- req + store.newChan <- req res := <-resCh return res.id, res.session, res.responsecode, res.errorstring } -func (self *SessionStoreChan) Get(user, id string) (*SessionChan, string, int, string) { +func (store *SessionStoreChan) Get(user, id string) (*SessionChan, string, int, string) { resCh := make(chan getSessionResponse) req := getSessionRequest{} req.user = user req.id = id req.response = resCh - self.getChan <- req + store.getChan <- req res := <-resCh return res.session, res.refId, res.responsecode, res.errorstring } -func (self *SessionStoreChan) List(user, password string, trusted bool) (map[string]string, int, string) { +func (store *SessionStoreChan) List(user, password string, trusted bool, userdata interface{}, cb SessionsUpdateCB) (map[string]string, int, string) { resCh := make(chan listSessionsResponse) req := listSessionsRequest{} req.user = user req.password = password req.trusted = trusted req.response = resCh - self.listChan <- req + req.callback = cb + req.userdata = userdata + store.listChan <- req res := <-resCh return res.sessions, res.responsecode, res.errorstring } -func (self *SessionStoreChan) Remove(user, id string) (int, string) { +func (store *SessionStoreChan) Remove(user, id string) (int, string) { resCh := make(chan removeSessionResponse) req := removeSessionRequest{} req.user = user req.id = id req.response = resCh - self.removeChan <- req + store.removeChan <- req res := <-resCh return res.responsecode, res.errorstring } -func (self *SessionStore) GetInterface() *SessionStoreChan { +func (store *SessionStore) GetInterface() *SessionStoreChan { ch := &SessionStoreChan{} - ch.newChan = self.newChan - ch.getChan = self.getChan - ch.listChan = self.listChan - ch.removeChan = self.removeChan + ch.newChan = store.newChan + ch.getChan = store.getChan + ch.listChan = store.listChan + ch.removeChan = store.removeChan return ch } -func (self *SessionStore) Cleanup() { - self.quit <- true - <-self.done - close(self.quit) - close(self.done) - close(self.newChan) - close(self.getChan) - close(self.listChan) - close(self.removeChan) +func (store *SessionStore) Cleanup() { + store.quit <- true + <-store.done + close(store.quit) + close(store.done) + close(store.newChan) + close(store.getChan) + close(store.listChan) + close(store.removeChan) } func NewSessionStore(conf *Config, db *rddb.DBChan) (store *SessionStore, err error) { @@ -300,7 +366,7 @@ func NewSessionStore(conf *Config, db *rddb.DBChan) (store *SessionStore, err er store.db = db store.quit = make(chan bool) store.done = make(chan bool) - store.store = make(map[string]map[string]*SessionStoreElement) + store.store = make(map[string]*SessionStoreUserElement) store.newChan = make(chan newSessionRequest, 10) store.getChan = make(chan getSessionRequest, 10) store.listChan = make(chan listSessionsRequest, 10) -- cgit v0.10.2