forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/cache"
14 "tangled.org/core/appview/config"
15 "tangled.org/core/appview/db"
16 "tangled.org/core/appview/models"
17 ec "tangled.org/core/eventconsumer"
18 "tangled.org/core/eventconsumer/cursor"
19 "tangled.org/core/log"
20 "tangled.org/core/rbac"
21 spindle "tangled.org/core/spindle/models"
22)
23
24func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
25 logger := log.FromContext(ctx)
26 logger = log.SubLogger(logger, "spindlestream")
27
28 spindles, err := db.GetSpindles(
29 d,
30 db.FilterIsNot("verified", "null"),
31 )
32 if err != nil {
33 return nil, err
34 }
35
36 srcs := make(map[ec.Source]struct{})
37 for _, s := range spindles {
38 src := ec.NewSpindleSource(s.Instance)
39 srcs[src] = struct{}{}
40 }
41
42 cache := cache.New(c.Redis.Addr)
43 cursorStore := cursor.NewRedisCursorStore(cache)
44
45 cfg := ec.ConsumerConfig{
46 Sources: srcs,
47 ProcessFunc: spindleIngester(ctx, logger, d),
48 RetryInterval: c.Spindlestream.RetryInterval,
49 MaxRetryInterval: c.Spindlestream.MaxRetryInterval,
50 ConnectionTimeout: c.Spindlestream.ConnectionTimeout,
51 WorkerCount: c.Spindlestream.WorkerCount,
52 QueueSize: c.Spindlestream.QueueSize,
53 Logger: logger,
54 Dev: c.Core.Dev,
55 CursorStore: &cursorStore,
56 }
57
58 return ec.NewConsumer(cfg), nil
59}
60
61func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
62 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
63 switch msg.Nsid {
64 case tangled.PipelineStatusNSID:
65 return ingestPipelineStatus(ctx, logger, d, source, msg)
66 }
67
68 return nil
69 }
70}
71
72func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
73 var record tangled.PipelineStatus
74 err := json.Unmarshal(msg.EventJson, &record)
75 if err != nil {
76 return err
77 }
78
79 pipelineUri, err := syntax.ParseATURI(record.Pipeline)
80 if err != nil {
81 return err
82 }
83
84 exitCode := 0
85 if record.ExitCode != nil {
86 exitCode = int(*record.ExitCode)
87 }
88
89 // pick the record creation time if possible, or use time.Now
90 created := time.Now()
91 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) {
92 created = t
93 }
94
95 status := models.PipelineStatus{
96 Spindle: source.Key(),
97 Rkey: msg.Rkey,
98 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),
99 PipelineRkey: pipelineUri.RecordKey().String(),
100 Created: created,
101 Workflow: record.Workflow,
102 Status: spindle.StatusKind(record.Status),
103 Error: record.Error,
104 ExitCode: exitCode,
105 }
106
107 err = db.AddPipelineStatus(d, status)
108 if err != nil {
109 return fmt.Errorf("failed to add pipeline status: %w", err)
110 }
111
112 return nil
113}