porting all github actions from bluesky-social/indigo to tangled CI
at main 4.6 kB view raw
1package backfill 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 "time" 8 9 "github.com/bluesky-social/indigo/repomgr" 10 "github.com/ipfs/go-cid" 11) 12 13// A BufferedOp is an operation buffered while a repo is being backfilled. 14type BufferedOp struct { 15 // Kind describes the type of operation. 16 Kind repomgr.EventKind 17 // Path contains the path the operation applies to. 18 Path string 19 // Record contains the serialized record for create and update operations. 20 Record *[]byte 21 // Cid is the CID of the record. 22 Cid *cid.Cid 23} 24 25type opSet struct { 26 since *string 27 rev string 28 ops []*BufferedOp 29} 30 31type Memjob struct { 32 repo string 33 state string 34 rev string 35 lk sync.Mutex 36 bufferedOps []*opSet 37 38 createdAt time.Time 39 updatedAt time.Time 40} 41 42// Memstore is a simple in-memory implementation of the Backfill Store interface 43type Memstore struct { 44 lk sync.RWMutex 45 jobs map[string]*Memjob 46} 47 48func NewMemstore() *Memstore { 49 return &Memstore{ 50 jobs: make(map[string]*Memjob), 51 } 52} 53 54func (s *Memstore) EnqueueJob(repo string) error { 55 s.lk.Lock() 56 defer s.lk.Unlock() 57 58 if _, ok := s.jobs[repo]; ok { 59 return fmt.Errorf("job already exists for repo %s", repo) 60 } 61 62 j := &Memjob{ 63 repo: repo, 64 createdAt: time.Now(), 65 updatedAt: time.Now(), 66 state: StateEnqueued, 67 } 68 s.jobs[repo] = j 69 return nil 70} 71 72func (s *Memstore) EnqueueJobWithState(repo, state string) error { 73 s.lk.Lock() 74 defer s.lk.Unlock() 75 76 if _, ok := s.jobs[repo]; ok { 77 return fmt.Errorf("job already exists for repo %s", repo) 78 } 79 80 j := &Memjob{ 81 repo: repo, 82 createdAt: time.Now(), 83 updatedAt: time.Now(), 84 state: state, 85 } 86 s.jobs[repo] = j 87 return nil 88} 89 90func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev string, kind repomgr.EventKind, path string, rec *[]byte, cid *cid.Cid) (bool, error) { 91 s.lk.Lock() 92 93 // If the job doesn't exist, we can't buffer an op for it 94 j, ok := s.jobs[repo] 95 s.lk.Unlock() 96 if !ok { 97 return false, ErrJobNotFound 98 } 99 100 j.lk.Lock() 101 defer j.lk.Unlock() 102 103 switch j.state { 104 case StateComplete: 105 return false, ErrJobComplete 106 case StateInProgress: 107 // keep going and buffer the op 108 default: 109 return false, nil 110 } 111 112 j.bufferedOps = append(j.bufferedOps, &opSet{ 113 since: since, 114 rev: rev, 115 ops: []*BufferedOp{&BufferedOp{ 116 Path: path, 117 Kind: kind, 118 Record: rec, 119 Cid: cid, 120 }}, 121 }) 122 j.updatedAt = time.Now() 123 return true, nil 124} 125 126func (j *Memjob) BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error) { 127 j.lk.Lock() 128 defer j.lk.Unlock() 129 130 switch j.state { 131 case StateComplete: 132 return false, ErrJobComplete 133 case StateInProgress: 134 // keep going and buffer the op 135 default: 136 return false, nil 137 } 138 139 j.bufferedOps = append(j.bufferedOps, &opSet{ 140 since: since, 141 rev: rev, 142 ops: ops, 143 }) 144 j.updatedAt = time.Now() 145 return true, nil 146} 147 148func (s *Memstore) GetJob(ctx context.Context, repo string) (Job, error) { 149 s.lk.RLock() 150 defer s.lk.RUnlock() 151 152 j, ok := s.jobs[repo] 153 if !ok || j == nil { 154 return nil, nil 155 } 156 return j, nil 157} 158 159func (s *Memstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) { 160 s.lk.RLock() 161 defer s.lk.RUnlock() 162 163 for _, j := range s.jobs { 164 if j.State() == StateEnqueued { 165 return j, nil 166 } 167 } 168 return nil, nil 169} 170 171func (s *Memstore) PurgeRepo(ctx context.Context, repo string) error { 172 s.lk.RLock() 173 defer s.lk.RUnlock() 174 175 delete(s.jobs, repo) 176 return nil 177} 178 179func (j *Memjob) Repo() string { 180 return j.repo 181} 182 183func (j *Memjob) State() string { 184 j.lk.Lock() 185 defer j.lk.Unlock() 186 187 return j.state 188} 189 190func (j *Memjob) SetState(ctx context.Context, state string) error { 191 j.lk.Lock() 192 defer j.lk.Unlock() 193 194 j.state = state 195 j.updatedAt = time.Now() 196 return nil 197} 198 199func (j *Memjob) Rev() string { 200 return j.rev 201} 202 203func (j *Memjob) SetRev(ctx context.Context, rev string) error { 204 j.rev = rev 205 return nil 206} 207 208func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error { 209 panic("TODO: copy what we end up doing from the gormstore") 210 /* 211 j.lk.Lock() 212 defer j.lk.Unlock() 213 214 for _, opset := range j.bufferedOps { 215 for _, op := range opset.ops { 216 if err := fn(op.Kind, op.Path, op.Record, op.Cid); err != nil { 217 return err 218 } 219 } 220 } 221 222 j.bufferedOps = map[string][]*BufferedOp{} 223 j.state = StateComplete 224 225 return nil 226 */ 227} 228 229func (j *Memjob) ClearBufferedOps(ctx context.Context) error { 230 j.lk.Lock() 231 defer j.lk.Unlock() 232 233 j.bufferedOps = []*opSet{} 234 j.updatedAt = time.Now() 235 return nil 236} 237 238func (j *Memjob) RetryCount() int { 239 j.lk.Lock() 240 defer j.lk.Unlock() 241 return 0 242}