this repo has no description

feat(bsky-activity): use a queue

Changed files
+55 -8
cmd
bsky-activity
+55 -8
cmd/bsky-activity/main.go
··· 7 7 "os" 8 8 "os/signal" 9 9 "strings" 10 + "sync" 10 11 "syscall" 12 + "time" 11 13 12 14 appbsky "github.com/bluesky-social/indigo/api/bsky" 13 15 jetstream "github.com/bluesky-social/jetstream/pkg/models" ··· 15 17 "github.com/redis/go-redis/v9" 16 18 ) 17 19 20 + type Queue struct { 21 + lk sync.Mutex 22 + events []jetstream.Event 23 + } 24 + 25 + func NewQueue(capacity int) *Queue { 26 + return &Queue{ 27 + events: make([]jetstream.Event, 0, capacity), 28 + } 29 + } 30 + 31 + func (q *Queue) Enqueue(event jetstream.Event) { 32 + q.lk.Lock() 33 + defer q.lk.Unlock() 34 + 35 + q.events = append(q.events, event) 36 + } 37 + 38 + func (q *Queue) Dequeue() (jetstream.Event, bool) { 39 + q.lk.Lock() 40 + defer q.lk.Unlock() 41 + 42 + var event jetstream.Event 43 + 44 + if len(q.events) == 0 { 45 + return event, false 46 + } 47 + 48 + event = q.events[0] 49 + q.events = q.events[1:] 50 + return event, true 51 + } 52 + 53 + func (q *Queue) Size() int { 54 + q.lk.Lock() 55 + defer q.lk.Unlock() 56 + 57 + return len(q.events) 58 + } 59 + 18 60 const JetstreamUrl = `wss://jetstream1.us-west.bsky.network/subscribe` 19 61 20 62 var AppBskyAllowlist = map[string]bool{ ··· 59 101 return false 60 102 } 61 103 62 - func handler(ctx context.Context, events <-chan jetstream.Event) { 104 + func handler(ctx context.Context, queue *Queue) { 63 105 rdb := redis.NewClient(&redis.Options{ 64 106 Addr: "localhost:6379", 65 107 Password: "", ··· 69 111 var eventCount int 70 112 71 113 eventLoop: 72 - for event := range events { 114 + for { 73 115 select { 74 116 case <-ctx.Done(): 75 117 break eventLoop 76 118 default: 77 119 } 78 120 121 + event, ok := queue.Dequeue() 122 + if !ok { 123 + time.Sleep(100 * time.Millisecond) 124 + continue 125 + } 126 + 79 127 if event.Kind != jetstream.EventKindCommit { 80 128 continue 81 129 } ··· 145 193 if _, err := pipe.Exec(ctx); err != nil { 146 194 log.Printf("failed to exec pipe\n") 147 195 } 196 + log.Printf("queue size: %d\n", queue.Size()) 148 197 } 149 198 } 150 199 } ··· 164 213 log.Printf("websocket closed\n") 165 214 }() 166 215 167 - jetstreamEvents := make(chan jetstream.Event) 168 - go handler(ctx, jetstreamEvents) 216 + queue := NewQueue(100_000) 217 + go handler(ctx, queue) 169 218 170 219 log.Printf("starting up\n") 171 - var event jetstream.Event 172 220 go func() { 173 221 for { 174 - event = jetstream.Event{} 222 + var event jetstream.Event 175 223 err := conn.ReadJSON(&event) 176 224 if err != nil { 177 225 log.Printf("ReadJSON error: %v\n", err) 178 226 stop() 179 227 break 180 - } else { 181 - jetstreamEvents <- event 182 228 } 229 + queue.Enqueue(event) 183 230 } 184 231 }() 185 232