+4
server/message_store.go
+4
server/message_store.go
···
5
"sync"
6
)
7
8
type MemoryStore struct {
9
mu sync.Mutex
10
msgs map[int]message
11
offset int
12
}
13
14
func NewMemoryStore() *MemoryStore {
15
return &MemoryStore{
16
msgs: make(map[int]message),
17
}
18
}
19
20
func (m *MemoryStore) Write(msg message) error {
21
m.mu.Lock()
22
defer m.mu.Unlock()
···
28
return nil
29
}
30
31
func (m *MemoryStore) ReadFrom(offset int, handleFunc func(msg message)) error {
32
if offset < 0 || offset > m.offset {
33
return fmt.Errorf("invalid offset provided")
···
5
"sync"
6
)
7
8
+
// Memory store allows messages to be stored in memory
9
type MemoryStore struct {
10
mu sync.Mutex
11
msgs map[int]message
12
offset int
13
}
14
15
+
// New memory store initializes a new in memory store
16
func NewMemoryStore() *MemoryStore {
17
return &MemoryStore{
18
msgs: make(map[int]message),
19
}
20
}
21
22
+
// Write will write the provided message to the in memory store
23
func (m *MemoryStore) Write(msg message) error {
24
m.mu.Lock()
25
defer m.mu.Unlock()
···
31
return nil
32
}
33
34
+
// ReadFrom will read messages from (and including) the provided offset and pass them to the provided handler
35
func (m *MemoryStore) ReadFrom(offset int, handleFunc func(msg message)) error {
36
if offset < 0 || offset > m.offset {
37
return fmt.Errorf("invalid offset provided")
+4
server/subscriber.go
+4
server/subscriber.go