tangled
alpha
login
or
join now
pdewey.com
/
tangled-core
forked from
tangled.org/core
Monorepo for Tangled
0
fork
atom
overview
issues
pulls
pipelines
Compare changes
Choose any two refs to compare.
base:
txns
two-way-comms
tracing
three-easy-steps
test-pr-link
test-ci
tags-and-releases
summarize-rounds
stars
stacked-prs
spindle
sl/zkqpstkxovqx
sl/wznxxmtqvxwk
sl/wnrvrwyvrlzo
sl/uvpzuszrulvq
sl/sqkrqopzkvoo
sl/spindle-rewrite
sl/spindle-adapters
sl/rbac2test
sl/okmkyytolvko
sl/kzmmroxoztll
sl/knotmirror
sl/git-objects
settings-router
session-refresh
revert-telemetry
repotab-icons
repo-deletion
render-markup
remove-set-repo-at
release-artifacts
refactor-ks-client
reduce-dids
push-zxovstvplnok
push-zwoymssunvsp
push-zvuqkxuqzmlx
push-zvtqtworuppk
push-zvkrywwskknq
push-zunxqvqoqmoq
push-zrvypwzlqtvz
push-zrkqwwrtxvzn
push-zqrmrxxvrylu
push-zqqvxroypxvy
push-zomkuzmzokvl
push-znrkktvllqsp
push-znlsowpxwvvq
push-zlzkyrrnylxr
push-zklmrltkvxkq
push-yytstxoqlwmx
push-yynslowxntql
push-ywynwzosnrox
push-yuwoytzsmkvm
push-yulzrzunkznl
push-ytzxyplmvunn
push-ytpxvpknlxun
push-ytkuzknmmrmn
push-yssxzpkyorwv
push-yqnqquktxqpx
push-ylvkxyvlmkyq
push-ykxtvrlpxwrl
push-ykwytywspowp
push-xwxnuorpzwsu
push-xwotmtuuvokm
push-xvuvxolnykpl
push-xvrvnuvsmslv
push-xvnwyvuwwylt
push-xtzkpqpkmvvr
push-xtuxqqlytkws
push-xttmuzqzktwv
push-xsmwulupylxx
push-xpolynpvotzt
push-xopyykmxuqzw
push-xmplzvpqtnru
push-xlupzwtslltp
push-wzlvlnywrxrs
push-wzkqymznotmx
push-wyonpyyzurwl
push-wykupplnxytq
push-wxwpnnytwxsq
push-wxvwuxxzswmv
push-wvywrqlppxqo
push-wvwxxlyxrxlm
push-wukyptkosoux
push-wtmrprmomuus
push-wsxnywmtsnrp
push-wqzwpukkvnuw
push-wpkykovtqxnx
push-wnwpznvpzwqu
push-wnotmtoqlnvl
push-wmkmzrsvlkmk
push-wkwyolovuxvq
push-vyyrpsmynrwo
push-vyusnwqnmxwy
push-vynsusnqpmus
push-vwyomovpppwp
push-vwuuzxwwsvow
push-vvluoqoywsvp
push-vuzywsvmkwqn
push-vtunvupqotuo
push-vssvkowylwvx
push-vquoltwpkuny
push-vnxxyxursomy
push-vmxtqnzznxvk
push-vkywmxzkrnqx
push-uylxxnsupvlk
push-uwuymmkuwsrs
push-uwotwontksrz
push-uuyqkqmkmlsn
push-uruyyqupsmro
push-upnknovvnoqr
push-uotxnqrqnlyl
push-ummlkpsrvrvn
push-ukznmmplmlul
push-ukmxnzrlrpuo
push-tzyltlmtsznq
push-tyxzqwosmzuv
push-twskywooslyl
push-twnpklmstzor
push-twlxnytrpyuk
push-tvvnxqyykkwr
push-tvtwutsrtoxt
push-tultwuoxrmun
push-ttyzwxqtkpsl
push-tnwwtqxpoqpk
push-tlxunysxvxwk
push-tkxmuvxkylmy
push-tkusxvwknltw
push-synlttxvkzox
push-svxqqzsxxkor
push-svtkmrzmrwky
push-stktrzvlltpw
push-sssuxsytslts
push-spvnpqlqqpkw
push-snyqozvrosmk
push-snktzuwttuvu
push-smsrusztuvwt
push-rvtqynpmozzy
push-rurrtsvmnvku
push-ruoqnsmttnxx
push-rtwrkkyllvkm
push-rtvxspsprvuv
push-rsrowlxunurk
push-rrsulrtnsmtw
push-rqzvvtnzkzvq
push-rqxyyurmpkps
push-rnztkppsvskw
push-rlorkkyzokzr
push-rlktrmqtttnn
push-qwnqkqnmovyn
push-qtrkvornpykm
push-qsmrouptzqkq
push-qruszvywputp
push-qrltzqmlrlln
push-qqwquvlysrxt
push-qqsspmnuxqwk
push-qqrowoolzppw
push-qoplqnlvlqqo
push-qnstkrpltlqu
push-qlzpkvltqlzm
push-qkpqsrknozxs
push-qkkmntrmxqly
push-pzwzzlwkkpnw
push-pzrysszkwwtk
push-pymywyqyumlo
push-pyllqzrprrny
push-pwqwlvnsqtqr
push-pvpnsqvvzuky
push-ptrrwwvnkmxq
push-psqwruuwvwlo
push-prszwlupsoqz
push-ppstsspzpxrx
push-potvrpwlpwsl
push-pnknzwntuuwx
push-pmtpswykplzr
push-plpuqtyrprou
push-plmznxvyqrqw
push-pkyzqzmqunnl
push-pkqzkqmxotyz
push-pkklqwlqwoml
push-owwtwvxyzowl
push-ovsuzpqkpmlt
push-ovrqrxnpvroz
push-ossmlnsnvstq
push-ormxrmmzomqu
push-oovokvlkywly
push-okymwlupkook
push-nwslswprzvmx
push-nwrnkztxkovl
push-nupxprsykpzq
push-nsolyvvsyvzw
push-nqrqutkzxskl
push-nozqtwvsrvkx
push-nnuvlyptxsqy
push-nmwononozzxz
push-nlvzxwtrmryl
push-nlnnyywnokyn
push-mzupsvxpvqvx
push-mzsvpkottnnt
push-myqmppunmplu
push-mwkwusmyymno
push-mvmrzuxwmzvs
push-mutuoxwuokqw
push-mtsxyxnkznyy
push-mtqkyzxrqnnn
push-msvlvnlwuyxy
push-mrwusmywonor
push-mqkxnymrqmzz
push-mozrrovxmlou
push-mozonwyomvmy
push-mopllyvuuuls
push-monplorvolvl
push-momltwttmuyq
push-mntstlmzqrvs
push-mmxkmyvyuzlk
push-mmrtnyoskwoo
push-mmnuoyuqplyk
push-mlxxvvqpzvlw
push-mllunlokmuxp
push-lzkzortpuyyq
push-lyrpkknpnrus
push-lxxtrqtnnoxy
push-lvuknrupsyox
push-lstutzylzylk
push-lrzzmtxokrxw
push-lnvnxmxlqkux
push-lnupvsypkuow
push-llqkvunvvzyv
push-ktsnmppqsnls
push-kstnoynmspqp
push-ksrsmmytwuul
push-kslplloylmlp
push-kpmvvnzwyosv
push-kltomzxpyoxq
pull-from-fork-pill
profile-timeline
profile-customizations
pr-actions
patch-requests
paginate-issues
packages-small
packages
oplog
opengraph
op/zxpquyouulmx
op/ymtommrxssvo
op/vyrymqtwolsn
op/uuutpuzrlosr
op/tywxsxvptvss
op/pwztvmkoslrp
op/lyvszuuqvnzs
op/lrpyxormllvp
op/lpxlqqoskzzw
oauth
more-diff-stats
master
local-fragments
knot-xrpc
knot-local-clone
knot-cli
issues-edit
interdiff
improve-styling
improve-repo-name-checks
icy/tolqpt
icy/pwvyvo
icy/nmoxmq
group-profile-timeline
format-patch
fork-repo
fork-pulls
fix-tw-dark
fix-resubmit
fix-refresh-issues
fix-knot-forks
fix-jetstream-bugs
fix-env-test
file-tree
fast-dev
enable-html
drop-at
diff-improvements
did-email-assoc
crash-repro-knot
consolidate-syntax-styles
compare-merge-base
commits-page
commit-verification
ci-statuses
ci
camo
cache-busting
branch-prs-2
branch-prs
branch-fmt-patch
atprotate-pubkeys
all-repositories-page
add-migrations
v1.11.0-alpha
v1.10.0-alpha
v1.9.1-alpha
v1.9.0-alpha
v1.8.1-alpha
v1.8.0-alpha
v1.7.0-alpha
v1.6.0-alpha
v1.5.0-alpha
v1.4.0-alpha
v1.3.0-alpha
v1.2.2-alpha
v1.2.1-alpha
v1.2.0-alpha
v1.1.2-alpha
v1.1.1-alpha
v1.1.0-alpha
v1.0.6-alpha
v1.0.5-alpha
v1.0.4-alpha
v1.0.3-alpha
v1.0.2-alpha
v1.0.1-alpha
v1.0.0-alpha
compare:
txns
two-way-comms
tracing
three-easy-steps
test-pr-link
test-ci
tags-and-releases
summarize-rounds
stars
stacked-prs
spindle
sl/zkqpstkxovqx
sl/wznxxmtqvxwk
sl/wnrvrwyvrlzo
sl/uvpzuszrulvq
sl/sqkrqopzkvoo
sl/spindle-rewrite
sl/spindle-adapters
sl/rbac2test
sl/okmkyytolvko
sl/kzmmroxoztll
sl/knotmirror
sl/git-objects
settings-router
session-refresh
revert-telemetry
repotab-icons
repo-deletion
render-markup
remove-set-repo-at
release-artifacts
refactor-ks-client
reduce-dids
push-zxovstvplnok
push-zwoymssunvsp
push-zvuqkxuqzmlx
push-zvtqtworuppk
push-zvkrywwskknq
push-zunxqvqoqmoq
push-zrvypwzlqtvz
push-zrkqwwrtxvzn
push-zqrmrxxvrylu
push-zqqvxroypxvy
push-zomkuzmzokvl
push-znrkktvllqsp
push-znlsowpxwvvq
push-zlzkyrrnylxr
push-zklmrltkvxkq
push-yytstxoqlwmx
push-yynslowxntql
push-ywynwzosnrox
push-yuwoytzsmkvm
push-yulzrzunkznl
push-ytzxyplmvunn
push-ytpxvpknlxun
push-ytkuzknmmrmn
push-yssxzpkyorwv
push-yqnqquktxqpx
push-ylvkxyvlmkyq
push-ykxtvrlpxwrl
push-ykwytywspowp
push-xwxnuorpzwsu
push-xwotmtuuvokm
push-xvuvxolnykpl
push-xvrvnuvsmslv
push-xvnwyvuwwylt
push-xtzkpqpkmvvr
push-xtuxqqlytkws
push-xttmuzqzktwv
push-xsmwulupylxx
push-xpolynpvotzt
push-xopyykmxuqzw
push-xmplzvpqtnru
push-xlupzwtslltp
push-wzlvlnywrxrs
push-wzkqymznotmx
push-wyonpyyzurwl
push-wykupplnxytq
push-wxwpnnytwxsq
push-wxvwuxxzswmv
push-wvywrqlppxqo
push-wvwxxlyxrxlm
push-wukyptkosoux
push-wtmrprmomuus
push-wsxnywmtsnrp
push-wqzwpukkvnuw
push-wpkykovtqxnx
push-wnwpznvpzwqu
push-wnotmtoqlnvl
push-wmkmzrsvlkmk
push-wkwyolovuxvq
push-vyyrpsmynrwo
push-vyusnwqnmxwy
push-vynsusnqpmus
push-vwyomovpppwp
push-vwuuzxwwsvow
push-vvluoqoywsvp
push-vuzywsvmkwqn
push-vtunvupqotuo
push-vssvkowylwvx
push-vquoltwpkuny
push-vnxxyxursomy
push-vmxtqnzznxvk
push-vkywmxzkrnqx
push-uylxxnsupvlk
push-uwuymmkuwsrs
push-uwotwontksrz
push-uuyqkqmkmlsn
push-uruyyqupsmro
push-upnknovvnoqr
push-uotxnqrqnlyl
push-ummlkpsrvrvn
push-ukznmmplmlul
push-ukmxnzrlrpuo
push-tzyltlmtsznq
push-tyxzqwosmzuv
push-twskywooslyl
push-twnpklmstzor
push-twlxnytrpyuk
push-tvvnxqyykkwr
push-tvtwutsrtoxt
push-tultwuoxrmun
push-ttyzwxqtkpsl
push-tnwwtqxpoqpk
push-tlxunysxvxwk
push-tkxmuvxkylmy
push-tkusxvwknltw
push-synlttxvkzox
push-svxqqzsxxkor
push-svtkmrzmrwky
push-stktrzvlltpw
push-sssuxsytslts
push-spvnpqlqqpkw
push-snyqozvrosmk
push-snktzuwttuvu
push-smsrusztuvwt
push-rvtqynpmozzy
push-rurrtsvmnvku
push-ruoqnsmttnxx
push-rtwrkkyllvkm
push-rtvxspsprvuv
push-rsrowlxunurk
push-rrsulrtnsmtw
push-rqzvvtnzkzvq
push-rqxyyurmpkps
push-rnztkppsvskw
push-rlorkkyzokzr
push-rlktrmqtttnn
push-qwnqkqnmovyn
push-qtrkvornpykm
push-qsmrouptzqkq
push-qruszvywputp
push-qrltzqmlrlln
push-qqwquvlysrxt
push-qqsspmnuxqwk
push-qqrowoolzppw
push-qoplqnlvlqqo
push-qnstkrpltlqu
push-qlzpkvltqlzm
push-qkpqsrknozxs
push-qkkmntrmxqly
push-pzwzzlwkkpnw
push-pzrysszkwwtk
push-pymywyqyumlo
push-pyllqzrprrny
push-pwqwlvnsqtqr
push-pvpnsqvvzuky
push-ptrrwwvnkmxq
push-psqwruuwvwlo
push-prszwlupsoqz
push-ppstsspzpxrx
push-potvrpwlpwsl
push-pnknzwntuuwx
push-pmtpswykplzr
push-plpuqtyrprou
push-plmznxvyqrqw
push-pkyzqzmqunnl
push-pkqzkqmxotyz
push-pkklqwlqwoml
push-owwtwvxyzowl
push-ovsuzpqkpmlt
push-ovrqrxnpvroz
push-ossmlnsnvstq
push-ormxrmmzomqu
push-oovokvlkywly
push-okymwlupkook
push-nwslswprzvmx
push-nwrnkztxkovl
push-nupxprsykpzq
push-nsolyvvsyvzw
push-nqrqutkzxskl
push-nozqtwvsrvkx
push-nnuvlyptxsqy
push-nmwononozzxz
push-nlvzxwtrmryl
push-nlnnyywnokyn
push-mzupsvxpvqvx
push-mzsvpkottnnt
push-myqmppunmplu
push-mwkwusmyymno
push-mvmrzuxwmzvs
push-mutuoxwuokqw
push-mtsxyxnkznyy
push-mtqkyzxrqnnn
push-msvlvnlwuyxy
push-mrwusmywonor
push-mqkxnymrqmzz
push-mozrrovxmlou
push-mozonwyomvmy
push-mopllyvuuuls
push-monplorvolvl
push-momltwttmuyq
push-mntstlmzqrvs
push-mmxkmyvyuzlk
push-mmrtnyoskwoo
push-mmnuoyuqplyk
push-mlxxvvqpzvlw
push-mllunlokmuxp
push-lzkzortpuyyq
push-lyrpkknpnrus
push-lxxtrqtnnoxy
push-lvuknrupsyox
push-lstutzylzylk
push-lrzzmtxokrxw
push-lnvnxmxlqkux
push-lnupvsypkuow
push-llqkvunvvzyv
push-ktsnmppqsnls
push-kstnoynmspqp
push-ksrsmmytwuul
push-kslplloylmlp
push-kpmvvnzwyosv
push-kltomzxpyoxq
pull-from-fork-pill
profile-timeline
profile-customizations
pr-actions
patch-requests
paginate-issues
packages-small
packages
oplog
opengraph
op/zxpquyouulmx
op/ymtommrxssvo
op/vyrymqtwolsn
op/uuutpuzrlosr
op/tywxsxvptvss
op/pwztvmkoslrp
op/lyvszuuqvnzs
op/lrpyxormllvp
op/lpxlqqoskzzw
oauth
more-diff-stats
master
local-fragments
knot-xrpc
knot-local-clone
knot-cli
issues-edit
interdiff
improve-styling
improve-repo-name-checks
icy/tolqpt
icy/pwvyvo
icy/nmoxmq
group-profile-timeline
format-patch
fork-repo
fork-pulls
fix-tw-dark
fix-resubmit
fix-refresh-issues
fix-knot-forks
fix-jetstream-bugs
fix-env-test
file-tree
fast-dev
enable-html
drop-at
diff-improvements
did-email-assoc
crash-repro-knot
consolidate-syntax-styles
compare-merge-base
commits-page
commit-verification
ci-statuses
ci
camo
cache-busting
branch-prs-2
branch-prs
branch-fmt-patch
atprotate-pubkeys
all-repositories-page
add-migrations
v1.11.0-alpha
v1.10.0-alpha
v1.9.1-alpha
v1.9.0-alpha
v1.8.1-alpha
v1.8.0-alpha
v1.7.0-alpha
v1.6.0-alpha
v1.5.0-alpha
v1.4.0-alpha
v1.3.0-alpha
v1.2.2-alpha
v1.2.1-alpha
v1.2.0-alpha
v1.1.2-alpha
v1.1.1-alpha
v1.1.0-alpha
v1.0.6-alpha
v1.0.5-alpha
v1.0.4-alpha
v1.0.3-alpha
v1.0.2-alpha
v1.0.1-alpha
v1.0.0-alpha
go
+169
-46
4 changed files
expand all
collapse all
unified
split
appview
notify
db
db.go
logging_notifier.go
merged_notifier.go
state
state.go
+20
-31
appview/notify/merged_notifier.go
···
2
3
import (
4
"context"
5
-
"log/slog"
6
-
"reflect"
7
"sync"
8
9
"github.com/bluesky-social/indigo/atproto/syntax"
10
"tangled.org/core/appview/models"
11
-
"tangled.org/core/log"
12
)
13
14
type mergedNotifier struct {
15
notifiers []Notifier
16
-
logger *slog.Logger
17
}
18
19
-
func NewMergedNotifier(notifiers []Notifier, logger *slog.Logger) Notifier {
20
-
return &mergedNotifier{notifiers, logger}
21
}
22
23
var _ Notifier = &mergedNotifier{}
24
25
// fanout calls the same method on all notifiers concurrently
26
-
func (m *mergedNotifier) fanout(method string, ctx context.Context, args ...any) {
27
-
ctx = log.IntoContext(ctx, m.logger.With("method", method))
28
var wg sync.WaitGroup
29
for _, n := range m.notifiers {
30
wg.Add(1)
31
go func(notifier Notifier) {
32
defer wg.Done()
33
-
v := reflect.ValueOf(notifier).MethodByName(method)
34
-
in := make([]reflect.Value, len(args)+1)
35
-
in[0] = reflect.ValueOf(ctx)
36
-
for i, arg := range args {
37
-
in[i+1] = reflect.ValueOf(arg)
38
-
}
39
-
v.Call(in)
40
}(n)
41
}
42
}
43
44
func (m *mergedNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
45
-
m.fanout("NewRepo", ctx, repo)
46
}
47
48
func (m *mergedNotifier) NewStar(ctx context.Context, star *models.Star) {
49
-
m.fanout("NewStar", ctx, star)
50
}
51
52
func (m *mergedNotifier) DeleteStar(ctx context.Context, star *models.Star) {
53
-
m.fanout("DeleteStar", ctx, star)
54
}
55
56
func (m *mergedNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
57
-
m.fanout("NewIssue", ctx, issue, mentions)
58
}
59
60
func (m *mergedNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
61
-
m.fanout("NewIssueComment", ctx, comment, mentions)
62
}
63
64
func (m *mergedNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
65
-
m.fanout("NewIssueState", ctx, actor, issue)
66
}
67
68
func (m *mergedNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
69
-
m.fanout("DeleteIssue", ctx, issue)
70
}
71
72
func (m *mergedNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
73
-
m.fanout("NewFollow", ctx, follow)
74
}
75
76
func (m *mergedNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
77
-
m.fanout("DeleteFollow", ctx, follow)
78
}
79
80
func (m *mergedNotifier) NewPull(ctx context.Context, pull *models.Pull) {
81
-
m.fanout("NewPull", ctx, pull)
82
}
83
84
func (m *mergedNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
85
-
m.fanout("NewPullComment", ctx, comment, mentions)
86
}
87
88
func (m *mergedNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
89
-
m.fanout("NewPullState", ctx, actor, pull)
90
}
91
92
func (m *mergedNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
93
-
m.fanout("UpdateProfile", ctx, profile)
94
}
95
96
func (m *mergedNotifier) NewString(ctx context.Context, s *models.String) {
97
-
m.fanout("NewString", ctx, s)
98
}
99
100
func (m *mergedNotifier) EditString(ctx context.Context, s *models.String) {
101
-
m.fanout("EditString", ctx, s)
102
}
103
104
func (m *mergedNotifier) DeleteString(ctx context.Context, did, rkey string) {
105
-
m.fanout("DeleteString", ctx, did, rkey)
106
}
···
2
3
import (
4
"context"
0
0
5
"sync"
6
7
"github.com/bluesky-social/indigo/atproto/syntax"
8
"tangled.org/core/appview/models"
0
9
)
10
11
type mergedNotifier struct {
12
notifiers []Notifier
0
13
}
14
15
+
func NewMergedNotifier(notifiers []Notifier) Notifier {
16
+
return &mergedNotifier{notifiers}
17
}
18
19
var _ Notifier = &mergedNotifier{}
20
21
// fanout calls the same method on all notifiers concurrently
22
+
func (m *mergedNotifier) fanout(callback func(Notifier)) {
0
23
var wg sync.WaitGroup
24
for _, n := range m.notifiers {
25
wg.Add(1)
26
go func(notifier Notifier) {
27
defer wg.Done()
28
+
callback(n)
0
0
0
0
0
0
29
}(n)
30
}
31
}
32
33
func (m *mergedNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
34
+
m.fanout(func(n Notifier) { n.NewRepo(ctx, repo) })
35
}
36
37
func (m *mergedNotifier) NewStar(ctx context.Context, star *models.Star) {
38
+
m.fanout(func(n Notifier) { n.NewStar(ctx, star) })
39
}
40
41
func (m *mergedNotifier) DeleteStar(ctx context.Context, star *models.Star) {
42
+
m.fanout(func(n Notifier) { n.DeleteStar(ctx, star) })
43
}
44
45
func (m *mergedNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
46
+
m.fanout(func(n Notifier) { n.NewIssue(ctx, issue, mentions) })
47
}
48
49
func (m *mergedNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
50
+
m.fanout(func(n Notifier) { n.NewIssueComment(ctx, comment, mentions) })
51
}
52
53
func (m *mergedNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
54
+
m.fanout(func(n Notifier) { n.NewIssueState(ctx, actor, issue) })
55
}
56
57
func (m *mergedNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
58
+
m.fanout(func(n Notifier) { n.DeleteIssue(ctx, issue) })
59
}
60
61
func (m *mergedNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
62
+
m.fanout(func(n Notifier) { n.NewFollow(ctx, follow) })
63
}
64
65
func (m *mergedNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
66
+
m.fanout(func(n Notifier) { n.DeleteFollow(ctx, follow) })
67
}
68
69
func (m *mergedNotifier) NewPull(ctx context.Context, pull *models.Pull) {
70
+
m.fanout(func(n Notifier) { n.NewPull(ctx, pull) })
71
}
72
73
func (m *mergedNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
74
+
m.fanout(func(n Notifier) { n.NewPullComment(ctx, comment, mentions) })
75
}
76
77
func (m *mergedNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
78
+
m.fanout(func(n Notifier) { n.NewPullState(ctx, actor, pull) })
79
}
80
81
func (m *mergedNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
82
+
m.fanout(func(n Notifier) { n.UpdateProfile(ctx, profile) })
83
}
84
85
func (m *mergedNotifier) NewString(ctx context.Context, s *models.String) {
86
+
m.fanout(func(n Notifier) { n.NewString(ctx, s) })
87
}
88
89
func (m *mergedNotifier) EditString(ctx context.Context, s *models.String) {
90
+
m.fanout(func(n Notifier) { n.EditString(ctx, s) })
91
}
92
93
func (m *mergedNotifier) DeleteString(ctx context.Context, did, rkey string) {
94
+
m.fanout(func(n Notifier) { n.DeleteString(ctx, did, rkey) })
95
}
+42
-14
appview/notify/db/db.go
···
2
3
import (
4
"context"
5
-
"log"
6
"slices"
7
8
"github.com/bluesky-social/indigo/atproto/syntax"
···
11
"tangled.org/core/appview/models"
12
"tangled.org/core/appview/notify"
13
"tangled.org/core/idresolver"
0
14
"tangled.org/core/orm"
15
"tangled.org/core/sets"
16
)
···
38
}
39
40
func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
0
0
41
if star.RepoAt.Collection().String() != tangled.RepoNSID {
42
// skip string stars for now
43
return
···
45
var err error
46
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt)))
47
if err != nil {
48
-
log.Printf("NewStar: failed to get repos: %v", err)
49
return
50
}
51
···
59
var pullId *int64
60
61
n.notifyEvent(
0
62
actorDid,
63
recipients,
64
eventType,
···
75
}
76
77
func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
0
0
78
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
79
if err != nil {
80
-
log.Printf("failed to fetch collaborators: %v", err)
81
return
82
}
83
···
101
var pullId *int64
102
103
n.notifyEvent(
0
104
actorDid,
105
recipients,
106
models.NotificationTypeIssueCreated,
···
111
pullId,
112
)
113
n.notifyEvent(
0
114
actorDid,
115
sets.Collect(slices.Values(mentions)),
116
models.NotificationTypeUserMentioned,
···
123
}
124
125
func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
0
0
126
issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.IssueAt))
127
if err != nil {
128
-
log.Printf("NewIssueComment: failed to get issues: %v", err)
129
return
130
}
131
if len(issues) == 0 {
132
-
log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt)
133
return
134
}
135
issue := issues[0]
···
170
var pullId *int64
171
172
n.notifyEvent(
0
173
actorDid,
174
recipients,
175
models.NotificationTypeIssueCommented,
···
180
pullId,
181
)
182
n.notifyEvent(
0
183
actorDid,
184
sets.Collect(slices.Values(mentions)),
185
models.NotificationTypeUserMentioned,
···
204
var repoId, issueId, pullId *int64
205
206
n.notifyEvent(
0
207
actorDid,
208
recipients,
209
eventType,
···
220
}
221
222
func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
0
0
223
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
224
if err != nil {
225
-
log.Printf("NewPull: failed to get repos: %v", err)
226
return
227
}
228
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
229
if err != nil {
230
-
log.Printf("failed to fetch collaborators: %v", err)
231
return
232
}
233
···
249
pullId := &p
250
251
n.notifyEvent(
0
252
actorDid,
253
recipients,
254
eventType,
···
261
}
262
263
func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
0
0
264
pull, err := db.GetPull(n.db,
265
syntax.ATURI(comment.RepoAt),
266
comment.PullId,
267
)
268
if err != nil {
269
-
log.Printf("NewPullComment: failed to get pulls: %v", err)
270
return
271
}
272
273
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt))
274
if err != nil {
275
-
log.Printf("NewPullComment: failed to get repos: %v", err)
276
return
277
}
278
···
298
pullId := &p
299
300
n.notifyEvent(
0
301
actorDid,
302
recipients,
303
eventType,
···
308
pullId,
309
)
310
n.notifyEvent(
0
311
actorDid,
312
sets.Collect(slices.Values(mentions)),
313
models.NotificationTypeUserMentioned,
···
336
}
337
338
func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
0
0
339
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
340
if err != nil {
341
-
log.Printf("failed to fetch collaborators: %v", err)
342
return
343
}
344
···
368
}
369
370
n.notifyEvent(
0
371
actor,
372
recipients,
373
eventType,
···
380
}
381
382
func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
0
0
383
// Get repo details
384
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
385
if err != nil {
386
-
log.Printf("NewPullState: failed to get repos: %v", err)
387
return
388
}
389
390
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
391
if err != nil {
392
-
log.Printf("failed to fetch collaborators: %v", err)
393
return
394
}
395
···
417
case models.PullMerged:
418
eventType = models.NotificationTypePullMerged
419
default:
420
-
log.Println("NewPullState: unexpected new PR state:", pull.State)
421
return
422
}
423
p := int64(pull.ID)
424
pullId := &p
425
426
n.notifyEvent(
0
427
actor,
428
recipients,
429
eventType,
···
436
}
437
438
func (n *databaseNotifier) notifyEvent(
0
439
actorDid syntax.DID,
440
recipients sets.Set[syntax.DID],
441
eventType models.NotificationType,
···
445
issueId *int64,
446
pullId *int64,
447
) {
0
0
448
// if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
449
if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
450
return
···
494
}
495
496
if err := db.CreateNotification(tx, notif); err != nil {
497
-
log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err)
498
}
499
}
500
···
2
3
import (
4
"context"
0
5
"slices"
6
7
"github.com/bluesky-social/indigo/atproto/syntax"
···
10
"tangled.org/core/appview/models"
11
"tangled.org/core/appview/notify"
12
"tangled.org/core/idresolver"
13
+
"tangled.org/core/log"
14
"tangled.org/core/orm"
15
"tangled.org/core/sets"
16
)
···
38
}
39
40
func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
41
+
l := log.FromContext(ctx)
42
+
43
if star.RepoAt.Collection().String() != tangled.RepoNSID {
44
// skip string stars for now
45
return
···
47
var err error
48
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt)))
49
if err != nil {
50
+
l.Error("failed to get repos", "err", err)
51
return
52
}
53
···
61
var pullId *int64
62
63
n.notifyEvent(
64
+
ctx,
65
actorDid,
66
recipients,
67
eventType,
···
78
}
79
80
func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
81
+
l := log.FromContext(ctx)
82
+
83
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
84
if err != nil {
85
+
l.Error("failed to fetch collaborators", "err", err)
86
return
87
}
88
···
106
var pullId *int64
107
108
n.notifyEvent(
109
+
ctx,
110
actorDid,
111
recipients,
112
models.NotificationTypeIssueCreated,
···
117
pullId,
118
)
119
n.notifyEvent(
120
+
ctx,
121
actorDid,
122
sets.Collect(slices.Values(mentions)),
123
models.NotificationTypeUserMentioned,
···
130
}
131
132
func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
133
+
l := log.FromContext(ctx)
134
+
135
issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.IssueAt))
136
if err != nil {
137
+
l.Error("failed to get issues", "err", err)
138
return
139
}
140
if len(issues) == 0 {
141
+
l.Error("no issue found for", "err", comment.IssueAt)
142
return
143
}
144
issue := issues[0]
···
179
var pullId *int64
180
181
n.notifyEvent(
182
+
ctx,
183
actorDid,
184
recipients,
185
models.NotificationTypeIssueCommented,
···
190
pullId,
191
)
192
n.notifyEvent(
193
+
ctx,
194
actorDid,
195
sets.Collect(slices.Values(mentions)),
196
models.NotificationTypeUserMentioned,
···
215
var repoId, issueId, pullId *int64
216
217
n.notifyEvent(
218
+
ctx,
219
actorDid,
220
recipients,
221
eventType,
···
232
}
233
234
func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
235
+
l := log.FromContext(ctx)
236
+
237
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
238
if err != nil {
239
+
l.Error("failed to get repos", "err", err)
240
return
241
}
242
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
243
if err != nil {
244
+
l.Error("failed to fetch collaborators", "err", err)
245
return
246
}
247
···
263
pullId := &p
264
265
n.notifyEvent(
266
+
ctx,
267
actorDid,
268
recipients,
269
eventType,
···
276
}
277
278
func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
279
+
l := log.FromContext(ctx)
280
+
281
pull, err := db.GetPull(n.db,
282
syntax.ATURI(comment.RepoAt),
283
comment.PullId,
284
)
285
if err != nil {
286
+
l.Error("failed to get pulls", "err", err)
287
return
288
}
289
290
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt))
291
if err != nil {
292
+
l.Error("failed to get repos", "err", err)
293
return
294
}
295
···
315
pullId := &p
316
317
n.notifyEvent(
318
+
ctx,
319
actorDid,
320
recipients,
321
eventType,
···
326
pullId,
327
)
328
n.notifyEvent(
329
+
ctx,
330
actorDid,
331
sets.Collect(slices.Values(mentions)),
332
models.NotificationTypeUserMentioned,
···
355
}
356
357
func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
358
+
l := log.FromContext(ctx)
359
+
360
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
361
if err != nil {
362
+
l.Error("failed to fetch collaborators", "err", err)
363
return
364
}
365
···
389
}
390
391
n.notifyEvent(
392
+
ctx,
393
actor,
394
recipients,
395
eventType,
···
402
}
403
404
func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
405
+
l := log.FromContext(ctx)
406
+
407
// Get repo details
408
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
409
if err != nil {
410
+
l.Error("failed to get repos", "err", err)
411
return
412
}
413
414
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
415
if err != nil {
416
+
l.Error("failed to fetch collaborators", "err", err)
417
return
418
}
419
···
441
case models.PullMerged:
442
eventType = models.NotificationTypePullMerged
443
default:
444
+
l.Error("unexpected new PR state", "state", pull.State)
445
return
446
}
447
p := int64(pull.ID)
448
pullId := &p
449
450
n.notifyEvent(
451
+
ctx,
452
actor,
453
recipients,
454
eventType,
···
461
}
462
463
func (n *databaseNotifier) notifyEvent(
464
+
ctx context.Context,
465
actorDid syntax.DID,
466
recipients sets.Set[syntax.DID],
467
eventType models.NotificationType,
···
471
issueId *int64,
472
pullId *int64,
473
) {
474
+
l := log.FromContext(ctx)
475
+
476
// if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
477
if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
478
return
···
522
}
523
524
if err := db.CreateNotification(tx, notif); err != nil {
525
+
l.Error("failed to create notification", "recipientDid", recipientDid, "err", err)
526
}
527
}
528
+105
appview/notify/logging_notifier.go
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
package notify
2
+
3
+
import (
4
+
"context"
5
+
"log/slog"
6
+
7
+
"tangled.org/core/appview/models"
8
+
tlog "tangled.org/core/log"
9
+
10
+
"github.com/bluesky-social/indigo/atproto/syntax"
11
+
)
12
+
13
+
type loggingNotifier struct {
14
+
inner Notifier
15
+
logger *slog.Logger
16
+
}
17
+
18
+
func NewLoggingNotifier(inner Notifier, logger *slog.Logger) Notifier {
19
+
return &loggingNotifier{
20
+
inner,
21
+
logger,
22
+
}
23
+
}
24
+
25
+
var _ Notifier = &loggingNotifier{}
26
+
27
+
func (l *loggingNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
28
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewRepo"))
29
+
l.inner.NewRepo(ctx, repo)
30
+
}
31
+
32
+
func (l *loggingNotifier) NewStar(ctx context.Context, star *models.Star) {
33
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewStar"))
34
+
l.inner.NewStar(ctx, star)
35
+
}
36
+
37
+
func (l *loggingNotifier) DeleteStar(ctx context.Context, star *models.Star) {
38
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "DeleteStar"))
39
+
l.inner.DeleteStar(ctx, star)
40
+
}
41
+
42
+
func (l *loggingNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
43
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewIssue"))
44
+
l.inner.NewIssue(ctx, issue, mentions)
45
+
}
46
+
47
+
func (l *loggingNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
48
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewIssueComment"))
49
+
l.inner.NewIssueComment(ctx, comment, mentions)
50
+
}
51
+
52
+
func (l *loggingNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
53
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewIssueState"))
54
+
l.inner.NewIssueState(ctx, actor, issue)
55
+
}
56
+
57
+
func (l *loggingNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
58
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "DeleteIssue"))
59
+
l.inner.DeleteIssue(ctx, issue)
60
+
}
61
+
62
+
func (l *loggingNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
63
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewFollow"))
64
+
l.inner.NewFollow(ctx, follow)
65
+
}
66
+
67
+
func (l *loggingNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
68
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "DeleteFollow"))
69
+
l.inner.DeleteFollow(ctx, follow)
70
+
}
71
+
72
+
func (l *loggingNotifier) NewPull(ctx context.Context, pull *models.Pull) {
73
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewPull"))
74
+
l.inner.NewPull(ctx, pull)
75
+
}
76
+
77
+
func (l *loggingNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
78
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewPullComment"))
79
+
l.inner.NewPullComment(ctx, comment, mentions)
80
+
}
81
+
82
+
func (l *loggingNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
83
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewPullState"))
84
+
l.inner.NewPullState(ctx, actor, pull)
85
+
}
86
+
87
+
func (l *loggingNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
88
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "UpdateProfile"))
89
+
l.inner.UpdateProfile(ctx, profile)
90
+
}
91
+
92
+
func (l *loggingNotifier) NewString(ctx context.Context, s *models.String) {
93
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewString"))
94
+
l.inner.NewString(ctx, s)
95
+
}
96
+
97
+
func (l *loggingNotifier) EditString(ctx context.Context, s *models.String) {
98
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "EditString"))
99
+
l.inner.EditString(ctx, s)
100
+
}
101
+
102
+
func (l *loggingNotifier) DeleteString(ctx context.Context, did, rkey string) {
103
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "DeleteString"))
104
+
l.inner.DeleteString(ctx, did, rkey)
105
+
}
+2
-1
appview/state/state.go
···
173
notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
174
}
175
notifiers = append(notifiers, indexer)
176
-
notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify"))
0
177
178
state := &State{
179
d,
···
173
notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
174
}
175
notifiers = append(notifiers, indexer)
176
+
notifier := notify.NewMergedNotifier(notifiers)
177
+
notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
178
179
state := &State{
180
d,