forked from tangled.org/core
this repo has no description

knotserver: introduce notifier package

Notifier is a semaphore that can be used to indicate a change in a
resource.

Signed-off-by: oppiliappan <me@oppi.li>

authored by oppi.li and committed by Tangled e487d3f7 01cee5e8

Changed files
+67 -7
knotserver
+9 -2
knotserver/db/oplog.go
··· 2 3 import ( 4 "fmt" 5 ) 6 7 type Op struct { ··· 13 Ref string // the reference being updated 14 } 15 16 - func (d *DB) InsertOp(op Op) error { 17 _, err := d.db.Exec( 18 `insert into oplog (tid, did, repo, old_sha, new_sha, ref) values (?, ?, ?, ?, ?, ?)`, 19 op.Tid, ··· 23 op.NewSha, 24 op.Ref, 25 ) 26 - return err 27 } 28 29 func (d *DB) GetOps(cursor string) ([]Op, error) {
··· 2 3 import ( 4 "fmt" 5 + 6 + "tangled.sh/tangled.sh/core/knotserver/notifier" 7 ) 8 9 type Op struct { ··· 15 Ref string // the reference being updated 16 } 17 18 + func (d *DB) InsertOp(op Op, notifier *notifier.Notifier) error { 19 _, err := d.db.Exec( 20 `insert into oplog (tid, did, repo, old_sha, new_sha, ref) values (?, ?, ?, ?, ?, ?)`, 21 op.Tid, ··· 25 op.NewSha, 26 op.Ref, 27 ) 28 + if err != nil { 29 + return err 30 + } 31 + 32 + notifier.NotifyAll() 33 + return nil 34 } 35 36 func (d *DB) GetOps(cursor string) ([]Op, error) {
+4 -1
knotserver/handler.go
··· 11 "tangled.sh/tangled.sh/core/jetstream" 12 "tangled.sh/tangled.sh/core/knotserver/config" 13 "tangled.sh/tangled.sh/core/knotserver/db" 14 "tangled.sh/tangled.sh/core/rbac" 15 ) 16 ··· 24 jc *jetstream.JetstreamClient 25 e *rbac.Enforcer 26 l *slog.Logger 27 28 // init is a channel that is closed when the knot has been initailized 29 // i.e. when the first user (knot owner) has been added. ··· 31 knotInitialized bool 32 } 33 34 - func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, jc *jetstream.JetstreamClient, l *slog.Logger) (http.Handler, error) { 35 r := chi.NewRouter() 36 37 h := Handle{ ··· 40 e: e, 41 l: l, 42 jc: jc, 43 init: make(chan struct{}), 44 } 45
··· 11 "tangled.sh/tangled.sh/core/jetstream" 12 "tangled.sh/tangled.sh/core/knotserver/config" 13 "tangled.sh/tangled.sh/core/knotserver/db" 14 + "tangled.sh/tangled.sh/core/knotserver/notifier" 15 "tangled.sh/tangled.sh/core/rbac" 16 ) 17 ··· 25 jc *jetstream.JetstreamClient 26 e *rbac.Enforcer 27 l *slog.Logger 28 + n *notifier.Notifier 29 30 // init is a channel that is closed when the knot has been initailized 31 // i.e. when the first user (knot owner) has been added. ··· 33 knotInitialized bool 34 } 35 36 + func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, jc *jetstream.JetstreamClient, l *slog.Logger, n *notifier.Notifier) (http.Handler, error) { 37 r := chi.NewRouter() 38 39 h := Handle{ ··· 42 e: e, 43 l: l, 44 jc: jc, 45 + n: n, 46 init: make(chan struct{}), 47 } 48
+5 -2
knotserver/internal.go
··· 12 "github.com/go-chi/chi/v5/middleware" 13 "tangled.sh/tangled.sh/core/knotserver/config" 14 "tangled.sh/tangled.sh/core/knotserver/db" 15 "tangled.sh/tangled.sh/core/rbac" 16 ) 17 ··· 20 c *config.Config 21 e *rbac.Enforcer 22 l *slog.Logger 23 } 24 25 func (h *InternalHandle) PushAllowed(w http.ResponseWriter, r *http.Request) { ··· 99 } 100 101 for _, op := range ops { 102 - err := h.db.InsertOp(op) 103 if err != nil { 104 l.Error("failed to insert op", "err", err, "op", op) 105 continue ··· 109 return 110 } 111 112 - func Internal(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, l *slog.Logger) http.Handler { 113 r := chi.NewRouter() 114 115 h := InternalHandle{ ··· 117 c, 118 e, 119 l, 120 } 121 122 r.Get("/push-allowed", h.PushAllowed)
··· 12 "github.com/go-chi/chi/v5/middleware" 13 "tangled.sh/tangled.sh/core/knotserver/config" 14 "tangled.sh/tangled.sh/core/knotserver/db" 15 + "tangled.sh/tangled.sh/core/knotserver/notifier" 16 "tangled.sh/tangled.sh/core/rbac" 17 ) 18 ··· 21 c *config.Config 22 e *rbac.Enforcer 23 l *slog.Logger 24 + n *notifier.Notifier 25 } 26 27 func (h *InternalHandle) PushAllowed(w http.ResponseWriter, r *http.Request) { ··· 101 } 102 103 for _, op := range ops { 104 + err := h.db.InsertOp(op, h.n) 105 if err != nil { 106 l.Error("failed to insert op", "err", err, "op", op) 107 continue ··· 111 return 112 } 113 114 + func Internal(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, l *slog.Logger, n *notifier.Notifier) http.Handler { 115 r := chi.NewRouter() 116 117 h := InternalHandle{ ··· 119 c, 120 e, 121 l, 122 + n, 123 } 124 125 r.Get("/push-allowed", h.PushAllowed)
+43
knotserver/notifier/notifier.go
···
··· 1 + package notifier 2 + 3 + import ( 4 + "sync" 5 + ) 6 + 7 + type Notifier struct { 8 + subscribers map[chan struct{}]struct{} 9 + mu sync.Mutex 10 + } 11 + 12 + func New() Notifier { 13 + return Notifier{ 14 + subscribers: make(map[chan struct{}]struct{}), 15 + } 16 + } 17 + 18 + func (n *Notifier) Subscribe() chan struct{} { 19 + ch := make(chan struct{}, 1) 20 + n.mu.Lock() 21 + n.subscribers[ch] = struct{}{} 22 + n.mu.Unlock() 23 + return ch 24 + } 25 + 26 + func (n *Notifier) Unsubscribe(ch chan struct{}) { 27 + n.mu.Lock() 28 + delete(n.subscribers, ch) 29 + close(ch) 30 + n.mu.Unlock() 31 + } 32 + 33 + func (n *Notifier) NotifyAll() { 34 + n.mu.Lock() 35 + for ch := range n.subscribers { 36 + select { 37 + case ch <- struct{}{}: 38 + default: 39 + // avoid blocking if channel is full 40 + } 41 + } 42 + n.mu.Unlock() 43 + }
+6 -2
knotserver/server.go
··· 11 "tangled.sh/tangled.sh/core/jetstream" 12 "tangled.sh/tangled.sh/core/knotserver/config" 13 "tangled.sh/tangled.sh/core/knotserver/db" 14 "tangled.sh/tangled.sh/core/log" 15 "tangled.sh/tangled.sh/core/rbac" 16 ) ··· 79 logger.Error("failed to setup jetstream", "error", err) 80 } 81 82 - mux, err := Setup(ctx, c, db, e, jc, logger) 83 if err != nil { 84 return fmt.Errorf("failed to setup server: %w", err) 85 } 86 - imux := Internal(ctx, c, db, e, iLogger) 87 88 logger.Info("starting internal server", "address", c.Server.InternalListenAddr) 89 go http.ListenAndServe(c.Server.InternalListenAddr, imux)
··· 11 "tangled.sh/tangled.sh/core/jetstream" 12 "tangled.sh/tangled.sh/core/knotserver/config" 13 "tangled.sh/tangled.sh/core/knotserver/db" 14 + "tangled.sh/tangled.sh/core/knotserver/notifier" 15 "tangled.sh/tangled.sh/core/log" 16 "tangled.sh/tangled.sh/core/rbac" 17 ) ··· 80 logger.Error("failed to setup jetstream", "error", err) 81 } 82 83 + notifier := notifier.New() 84 + 85 + mux, err := Setup(ctx, c, db, e, jc, logger, &notifier) 86 if err != nil { 87 return fmt.Errorf("failed to setup server: %w", err) 88 } 89 + 90 + imux := Internal(ctx, c, db, e, iLogger, &notifier) 91 92 logger.Info("starting internal server", "address", c.Server.InternalListenAddr) 93 go http.ListenAndServe(c.Server.InternalListenAddr, imux)