+20
-14
knotclient/events.go
+20
-14
knotclient/events.go
···
7
"log/slog"
8
"math/rand"
9
"net/url"
10
"sync"
11
"time"
12
···
62
}
63
64
type CursorStore interface {
65
-
Set(knot, cursor string)
66
-
Get(knot string) (cursor string)
67
}
68
69
type RedisCursorStore struct {
···
80
cursorKey = "cursor:%s"
81
)
82
83
-
func (r *RedisCursorStore) Set(knot, cursor string) {
84
key := fmt.Sprintf(cursorKey, knot)
85
r.rdb.Set(context.Background(), key, cursor, 0)
86
}
87
88
-
func (r *RedisCursorStore) Get(knot string) (cursor string) {
89
key := fmt.Sprintf(cursorKey, knot)
90
val, err := r.rdb.Get(context.Background(), key).Result()
91
if err != nil {
92
-
return ""
93
}
94
95
-
return val
96
}
97
98
type MemoryCursorStore struct {
99
store sync.Map
100
}
101
102
-
func (m *MemoryCursorStore) Set(knot, cursor string) {
103
m.store.Store(knot, cursor)
104
}
105
106
-
func (m *MemoryCursorStore) Get(knot string) (cursor string) {
107
if result, ok := m.store.Load(knot); ok {
108
-
if val, ok := result.(string); ok {
109
return val
110
}
111
}
112
113
-
return ""
114
}
115
116
-
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
117
scheme := "wss"
118
if e.cfg.Dev {
119
scheme = "ws"
···
124
return nil, err
125
}
126
127
-
if cursor != "" {
128
query := url.Values{}
129
-
query.Add("cursor", cursor)
130
u.RawQuery = query.Encode()
131
}
132
return u, nil
···
222
}
223
224
// update cursor
225
-
c.cfg.CursorStore.Set(j.source.Knot, msg.Rkey)
226
227
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
228
c.logger.Error("error processing message", "source", j.source, "err", err)
···
7
"log/slog"
8
"math/rand"
9
"net/url"
10
+
"strconv"
11
"sync"
12
"time"
13
···
63
}
64
65
type CursorStore interface {
66
+
Set(knot string, cursor int64)
67
+
Get(knot string) (cursor int64)
68
}
69
70
type RedisCursorStore struct {
···
81
cursorKey = "cursor:%s"
82
)
83
84
+
func (r *RedisCursorStore) Set(knot string, cursor int64) {
85
key := fmt.Sprintf(cursorKey, knot)
86
r.rdb.Set(context.Background(), key, cursor, 0)
87
}
88
89
+
func (r *RedisCursorStore) Get(knot string) (cursor int64) {
90
key := fmt.Sprintf(cursorKey, knot)
91
val, err := r.rdb.Get(context.Background(), key).Result()
92
if err != nil {
93
+
return 0
94
}
95
96
+
cursor, err = strconv.ParseInt(val, 10, 64)
97
+
if err != nil {
98
+
return 0 // optionally log parsing error
99
+
}
100
+
101
+
return cursor
102
}
103
104
type MemoryCursorStore struct {
105
store sync.Map
106
}
107
108
+
func (m *MemoryCursorStore) Set(knot string, cursor int64) {
109
m.store.Store(knot, cursor)
110
}
111
112
+
func (m *MemoryCursorStore) Get(knot string) (cursor int64) {
113
if result, ok := m.store.Load(knot); ok {
114
+
if val, ok := result.(int64); ok {
115
return val
116
}
117
}
118
119
+
return 0
120
}
121
122
+
func (e *EventConsumer) buildUrl(s EventSource, cursor int64) (*url.URL, error) {
123
scheme := "wss"
124
if e.cfg.Dev {
125
scheme = "ws"
···
130
return nil, err
131
}
132
133
+
if cursor != 0 {
134
query := url.Values{}
135
+
query.Add("cursor", fmt.Sprintf("%d", cursor))
136
u.RawQuery = query.Encode()
137
}
138
return u, nil
···
228
}
229
230
// update cursor
231
+
c.cfg.CursorStore.Set(j.source.Knot, time.Now().Unix())
232
233
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
234
c.logger.Error("error processing message", "source", j.source, "err", err)
+10
-6
knotserver/db/events.go
+10
-6
knotserver/db/events.go
···
10
Rkey string `json:"rkey"`
11
Nsid string `json:"nsid"`
12
EventJson string `json:"event"`
13
}
14
15
func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error {
16
_, err := d.db.Exec(
17
`insert into events (rkey, nsid, event) values (?, ?, ?)`,
18
event.Rkey,
···
25
return err
26
}
27
28
-
func (d *DB) GetEvents(cursor string) ([]Event, error) {
29
whereClause := ""
30
args := []any{}
31
-
if cursor != "" {
32
-
whereClause = "where rkey > ?"
33
args = append(args, cursor)
34
}
35
36
query := fmt.Sprintf(`
37
-
select rkey, nsid, event
38
from events
39
%s
40
-
order by rkey asc
41
limit 100
42
`, whereClause)
43
···
50
var evts []Event
51
for rows.Next() {
52
var ev Event
53
-
rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson)
54
evts = append(evts, ev)
55
}
56
···
10
Rkey string `json:"rkey"`
11
Nsid string `json:"nsid"`
12
EventJson string `json:"event"`
13
+
Created int64 `json:"created"`
14
}
15
16
func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error {
17
+
18
_, err := d.db.Exec(
19
`insert into events (rkey, nsid, event) values (?, ?, ?)`,
20
event.Rkey,
···
27
return err
28
}
29
30
+
func (d *DB) GetEvents(cursor int64) ([]Event, error) {
31
whereClause := ""
32
args := []any{}
33
+
if cursor > 0 {
34
+
whereClause = "where created > ?"
35
args = append(args, cursor)
36
}
37
38
query := fmt.Sprintf(`
39
+
select rkey, nsid, event, created
40
from events
41
%s
42
+
order by created asc
43
limit 100
44
`, whereClause)
45
···
52
var evts []Event
53
for rows.Next() {
54
var ev Event
55
+
if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil {
56
+
return nil, err
57
+
}
58
evts = append(evts, ev)
59
}
60
+1
knotserver/db/init.go
+1
knotserver/db/init.go
+8
-3
knotserver/events.go
+8
-3
knotserver/events.go
···
4
"context"
5
"encoding/json"
6
"net/http"
7
"time"
8
9
"github.com/gorilla/websocket"
···
42
}
43
}()
44
45
-
cursor := r.URL.Query().Get("cursor")
46
47
// complete backfill first before going to live data
48
l.Debug("going through backfill", "cursor", cursor)
···
74
}
75
}
76
77
-
func (h *Handle) streamOps(conn *websocket.Conn, cursor *string) error {
78
events, err := h.db.GetEvents(*cursor)
79
if err != nil {
80
h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
···
105
h.l.Debug("err", "err", err)
106
return err
107
}
108
-
*cursor = event.Rkey
109
}
110
111
return nil
···
4
"context"
5
"encoding/json"
6
"net/http"
7
+
"strconv"
8
"time"
9
10
"github.com/gorilla/websocket"
···
43
}
44
}()
45
46
+
cursorStr := r.URL.Query().Get("cursor")
47
+
cursor, err := strconv.ParseInt(cursorStr, 10, 64)
48
+
if err != nil {
49
+
l.Error("empty or invalid cursor, defaulting to zero", "invalidCursor", cursorStr)
50
+
}
51
52
// complete backfill first before going to live data
53
l.Debug("going through backfill", "cursor", cursor)
···
79
}
80
}
81
82
+
func (h *Handle) streamOps(conn *websocket.Conn, cursor *int64) error {
83
events, err := h.db.GetEvents(*cursor)
84
if err != nil {
85
h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
···
110
h.l.Debug("err", "err", err)
111
return err
112
}
113
+
*cursor = event.Created
114
}
115
116
return nil
+1
-1
nix/vm.nix
+1
-1
nix/vm.nix
···
21
g = config.services.tangled-knot.gitUser;
22
in [
23
"d /var/lib/knot 0770 ${u} ${g} - -" # Create the directory first
24
-
"f+ /var/lib/knot/secret 0660 ${u} ${g} - KNOT_SERVER_SECRET=40b4db20544e37a12ba3ed7353d4d4421a30e0593385068d2ef85263495794d8"
25
];
26
services.tangled-knot = {
27
enable = true;
···
21
g = config.services.tangled-knot.gitUser;
22
in [
23
"d /var/lib/knot 0770 ${u} ${g} - -" # Create the directory first
24
+
"f+ /var/lib/knot/secret 0660 ${u} ${g} - KNOT_SERVER_SECRET=16154910ef55fe48121082c0b51fc0e360a8b15eb7bda7991d88dc9f7684427a"
25
];
26
services.tangled-knot = {
27
enable = true;