1package backfill
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "github.com/bluesky-social/indigo/repomgr"
10 "github.com/ipfs/go-cid"
11)
12
13// A BufferedOp is an operation buffered while a repo is being backfilled.
14type BufferedOp struct {
15 // Kind describes the type of operation.
16 Kind repomgr.EventKind
17 // Path contains the path the operation applies to.
18 Path string
19 // Record contains the serialized record for create and update operations.
20 Record *[]byte
21 // Cid is the CID of the record.
22 Cid *cid.Cid
23}
24
25type opSet struct {
26 since *string
27 rev string
28 ops []*BufferedOp
29}
30
31type Memjob struct {
32 repo string
33 state string
34 rev string
35 lk sync.Mutex
36 bufferedOps []*opSet
37
38 createdAt time.Time
39 updatedAt time.Time
40}
41
42// Memstore is a simple in-memory implementation of the Backfill Store interface
43type Memstore struct {
44 lk sync.RWMutex
45 jobs map[string]*Memjob
46}
47
48func NewMemstore() *Memstore {
49 return &Memstore{
50 jobs: make(map[string]*Memjob),
51 }
52}
53
54func (s *Memstore) EnqueueJob(repo string) error {
55 s.lk.Lock()
56 defer s.lk.Unlock()
57
58 if _, ok := s.jobs[repo]; ok {
59 return fmt.Errorf("job already exists for repo %s", repo)
60 }
61
62 j := &Memjob{
63 repo: repo,
64 createdAt: time.Now(),
65 updatedAt: time.Now(),
66 state: StateEnqueued,
67 }
68 s.jobs[repo] = j
69 return nil
70}
71
72func (s *Memstore) EnqueueJobWithState(repo, state string) error {
73 s.lk.Lock()
74 defer s.lk.Unlock()
75
76 if _, ok := s.jobs[repo]; ok {
77 return fmt.Errorf("job already exists for repo %s", repo)
78 }
79
80 j := &Memjob{
81 repo: repo,
82 createdAt: time.Now(),
83 updatedAt: time.Now(),
84 state: state,
85 }
86 s.jobs[repo] = j
87 return nil
88}
89
90func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev string, kind repomgr.EventKind, path string, rec *[]byte, cid *cid.Cid) (bool, error) {
91 s.lk.Lock()
92
93 // If the job doesn't exist, we can't buffer an op for it
94 j, ok := s.jobs[repo]
95 s.lk.Unlock()
96 if !ok {
97 return false, ErrJobNotFound
98 }
99
100 j.lk.Lock()
101 defer j.lk.Unlock()
102
103 switch j.state {
104 case StateComplete:
105 return false, ErrJobComplete
106 case StateInProgress:
107 // keep going and buffer the op
108 default:
109 return false, nil
110 }
111
112 j.bufferedOps = append(j.bufferedOps, &opSet{
113 since: since,
114 rev: rev,
115 ops: []*BufferedOp{&BufferedOp{
116 Path: path,
117 Kind: kind,
118 Record: rec,
119 Cid: cid,
120 }},
121 })
122 j.updatedAt = time.Now()
123 return true, nil
124}
125
126func (j *Memjob) BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error) {
127 j.lk.Lock()
128 defer j.lk.Unlock()
129
130 switch j.state {
131 case StateComplete:
132 return false, ErrJobComplete
133 case StateInProgress:
134 // keep going and buffer the op
135 default:
136 return false, nil
137 }
138
139 j.bufferedOps = append(j.bufferedOps, &opSet{
140 since: since,
141 rev: rev,
142 ops: ops,
143 })
144 j.updatedAt = time.Now()
145 return true, nil
146}
147
148func (s *Memstore) GetJob(ctx context.Context, repo string) (Job, error) {
149 s.lk.RLock()
150 defer s.lk.RUnlock()
151
152 j, ok := s.jobs[repo]
153 if !ok || j == nil {
154 return nil, nil
155 }
156 return j, nil
157}
158
159func (s *Memstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) {
160 s.lk.RLock()
161 defer s.lk.RUnlock()
162
163 for _, j := range s.jobs {
164 if j.State() == StateEnqueued {
165 return j, nil
166 }
167 }
168 return nil, nil
169}
170
171func (s *Memstore) PurgeRepo(ctx context.Context, repo string) error {
172 s.lk.RLock()
173 defer s.lk.RUnlock()
174
175 delete(s.jobs, repo)
176 return nil
177}
178
179func (j *Memjob) Repo() string {
180 return j.repo
181}
182
183func (j *Memjob) State() string {
184 j.lk.Lock()
185 defer j.lk.Unlock()
186
187 return j.state
188}
189
190func (j *Memjob) SetState(ctx context.Context, state string) error {
191 j.lk.Lock()
192 defer j.lk.Unlock()
193
194 j.state = state
195 j.updatedAt = time.Now()
196 return nil
197}
198
199func (j *Memjob) Rev() string {
200 return j.rev
201}
202
203func (j *Memjob) SetRev(ctx context.Context, rev string) error {
204 j.rev = rev
205 return nil
206}
207
208func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error {
209 panic("TODO: copy what we end up doing from the gormstore")
210 /*
211 j.lk.Lock()
212 defer j.lk.Unlock()
213
214 for _, opset := range j.bufferedOps {
215 for _, op := range opset.ops {
216 if err := fn(op.Kind, op.Path, op.Record, op.Cid); err != nil {
217 return err
218 }
219 }
220 }
221
222 j.bufferedOps = map[string][]*BufferedOp{}
223 j.state = StateComplete
224
225 return nil
226 */
227}
228
229func (j *Memjob) ClearBufferedOps(ctx context.Context) error {
230 j.lk.Lock()
231 defer j.lk.Unlock()
232
233 j.bufferedOps = []*opSet{}
234 j.updatedAt = time.Now()
235 return nil
236}
237
238func (j *Memjob) RetryCount() int {
239 j.lk.Lock()
240 defer j.lk.Unlock()
241 return 0
242}