forked from
tangled.org/core
Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/cache"
14 "tangled.org/core/appview/config"
15 "tangled.org/core/appview/db"
16 "tangled.org/core/appview/models"
17 ec "tangled.org/core/eventconsumer"
18 "tangled.org/core/eventconsumer/cursor"
19 "tangled.org/core/log"
20 "tangled.org/core/orm"
21 "tangled.org/core/rbac"
22 spindle "tangled.org/core/spindle/models"
23)
24
25func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
26 logger := log.FromContext(ctx)
27 logger = log.SubLogger(logger, "spindlestream")
28
29 spindles, err := db.GetSpindles(
30 ctx,
31 d,
32 orm.FilterIsNot("verified", "null"),
33 )
34 if err != nil {
35 return nil, err
36 }
37
38 srcs := make(map[ec.Source]struct{})
39 for _, s := range spindles {
40 src := ec.NewSpindleSource(s.Instance)
41 srcs[src] = struct{}{}
42 }
43
44 cache := cache.New(c.Redis.Addr)
45 cursorStore := cursor.NewRedisCursorStore(cache)
46
47 cfg := ec.ConsumerConfig{
48 Sources: srcs,
49 ProcessFunc: spindleIngester(ctx, logger, d),
50 RetryInterval: c.Spindlestream.RetryInterval,
51 MaxRetryInterval: c.Spindlestream.MaxRetryInterval,
52 ConnectionTimeout: c.Spindlestream.ConnectionTimeout,
53 WorkerCount: c.Spindlestream.WorkerCount,
54 QueueSize: c.Spindlestream.QueueSize,
55 Logger: logger,
56 Dev: c.Core.Dev,
57 CursorStore: &cursorStore,
58 }
59
60 return ec.NewConsumer(cfg), nil
61}
62
63func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
64 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
65 switch msg.Nsid {
66 case tangled.PipelineStatusNSID:
67 return ingestPipelineStatus(ctx, logger, d, source, msg)
68 }
69
70 return nil
71 }
72}
73
74func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
75 var record tangled.PipelineStatus
76 err := json.Unmarshal(msg.EventJson, &record)
77 if err != nil {
78 return err
79 }
80
81 pipelineUri, err := syntax.ParseATURI(record.Pipeline)
82 if err != nil {
83 return err
84 }
85
86 exitCode := 0
87 if record.ExitCode != nil {
88 exitCode = int(*record.ExitCode)
89 }
90
91 // pick the record creation time if possible, or use time.Now
92 created := time.Now()
93 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) {
94 created = t
95 }
96
97 status := models.PipelineStatus{
98 Spindle: source.Key(),
99 Rkey: msg.Rkey,
100 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),
101 PipelineRkey: pipelineUri.RecordKey().String(),
102 Created: created,
103 Workflow: record.Workflow,
104 Status: spindle.StatusKind(record.Status),
105 Error: record.Error,
106 ExitCode: exitCode,
107 }
108
109 err = db.AddPipelineStatus(ctx, d, status)
110 if err != nil {
111 return fmt.Errorf("failed to add pipeline status: %w", err)
112 }
113
114 return nil
115}