forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
this repo has no description
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/appview/cache"
14 "tangled.sh/tangled.sh/core/appview/config"
15 "tangled.sh/tangled.sh/core/appview/db"
16 ec "tangled.sh/tangled.sh/core/eventconsumer"
17 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
18 "tangled.sh/tangled.sh/core/log"
19 "tangled.sh/tangled.sh/core/rbac"
20 spindle "tangled.sh/tangled.sh/core/spindle/models"
21)
22
23func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
24 spindles, err := db.GetSpindles(
25 d,
26 db.FilterIsNot("verified", "null"),
27 )
28 if err != nil {
29 return nil, err
30 }
31
32 srcs := make(map[ec.Source]struct{})
33 for _, s := range spindles {
34 src := ec.NewSpindleSource(s.Instance)
35 srcs[src] = struct{}{}
36 }
37
38 logger := log.New("spindlestream")
39 cache := cache.New(c.Redis.Addr)
40 cursorStore := cursor.NewRedisCursorStore(cache)
41
42 cfg := ec.ConsumerConfig{
43 Sources: srcs,
44 ProcessFunc: spindleIngester(ctx, logger, d),
45 RetryInterval: c.Spindlestream.RetryInterval,
46 MaxRetryInterval: c.Spindlestream.MaxRetryInterval,
47 ConnectionTimeout: c.Spindlestream.ConnectionTimeout,
48 WorkerCount: c.Spindlestream.WorkerCount,
49 QueueSize: c.Spindlestream.QueueSize,
50 Logger: logger,
51 Dev: c.Core.Dev,
52 CursorStore: &cursorStore,
53 }
54
55 return ec.NewConsumer(cfg), nil
56}
57
58func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
59 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
60 switch msg.Nsid {
61 case tangled.PipelineStatusNSID:
62 return ingestPipelineStatus(ctx, logger, d, source, msg)
63 }
64
65 return nil
66 }
67}
68
69func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
70 var record tangled.PipelineStatus
71 err := json.Unmarshal(msg.EventJson, &record)
72 if err != nil {
73 return err
74 }
75
76 pipelineUri, err := syntax.ParseATURI(record.Pipeline)
77 if err != nil {
78 return err
79 }
80
81 exitCode := 0
82 if record.ExitCode != nil {
83 exitCode = int(*record.ExitCode)
84 }
85
86 // pick the record creation time if possible, or use time.Now
87 created := time.Now()
88 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) {
89 created = t
90 }
91
92 status := db.PipelineStatus{
93 Spindle: source.Key(),
94 Rkey: msg.Rkey,
95 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),
96 PipelineRkey: pipelineUri.RecordKey().String(),
97 Created: created,
98 Workflow: record.Workflow,
99 Status: spindle.StatusKind(record.Status),
100 Error: record.Error,
101 ExitCode: exitCode,
102 }
103
104 err = db.AddPipelineStatus(d, status)
105 if err != nil {
106 return fmt.Errorf("failed to add pipeline status: %w", err)
107 }
108
109 return nil
110}