···11+# dev-env
22+33+Borrowed directly from
44+[atcute](https://github.com/mary-ext/atcute/tree/trunk/packages/internal/dev-env),
55+MIT licensed. Used primarily as a CI environment.
···11+export * from "./constants.js";
22+export * from "./network.js";
33+export * from "./pds.js";
44+export * from "./plc.js";
55+export * from "./utils.js";
···11-package resync
22-33-import (
44- "context"
55- "fmt"
66- "time"
77-88- "golang.org/x/sync/errgroup"
99- "stream.place/streamplace/pkg/atproto"
1010- "stream.place/streamplace/pkg/bus"
1111- "stream.place/streamplace/pkg/config"
1212- "stream.place/streamplace/pkg/log"
1313- "stream.place/streamplace/pkg/model"
1414-)
1515-1616-// resync a fresh database from the PDSses, copying over the few pieces of local state
1717-// that we have
1818-func Resync(ctx context.Context, cli *config.CLI) error {
1919- oldMod, err := model.MakeDB(cli.IndexDBPath)
2020- if err != nil {
2121- return err
2222- }
2323- tempDBPath := cli.IndexDBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano())
2424- newMod, err := model.MakeDB(tempDBPath)
2525- if err != nil {
2626- return err
2727- }
2828- repos, err := oldMod.GetAllRepos()
2929- if err != nil {
3030- return err
3131- }
3232-3333- atsync := &atproto.ATProtoSynchronizer{
3434- CLI: cli,
3535- Model: newMod,
3636- StatefulDB: nil, // TODO: Add StatefulDB for resync when migration is ready
3737- Noter: nil,
3838- Bus: bus.NewBus(),
3939- }
4040-4141- doneMap := make(map[string]bool)
4242-4343- g, ctx := errgroup.WithContext(ctx)
4444-4545- doneChan := make(chan string)
4646- go func() {
4747- for {
4848- select {
4949- case <-ctx.Done():
5050- return
5151- case did := <-doneChan:
5252- doneMap[did] = true
5353- case <-time.After(10 * time.Second):
5454- for _, repo := range repos {
5555- if !doneMap[repo.DID] {
5656- log.Warn(ctx, "remaining repos to sync", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS)
5757- }
5858- }
5959- }
6060- }
6161- }()
6262-6363- for _, repo := range repos {
6464- repo := repo // capture range variable
6565- doneMap[repo.DID] = false
6666- g.Go(func() error {
6767- log.Warn(ctx, "syncing repo", "did", repo.DID, "handle", repo.Handle)
6868- ctx := log.WithLogValues(ctx, "resyncDID", repo.DID, "resyncHandle", repo.Handle)
6969- _, err := atsync.SyncBlueskyRepoCached(ctx, repo.Handle, newMod)
7070- if err != nil {
7171- log.Error(ctx, "failed to sync repo", "did", repo.DID, "handle", repo.Handle, "err", err)
7272- return nil
7373- }
7474- log.Log(ctx, "synced repo", "did", repo.DID, "handle", repo.Handle)
7575- doneChan <- repo.DID
7676- return nil
7777- })
7878- }
7979-8080- if err := g.Wait(); err != nil {
8181- return err
8282- }
8383-8484- // TODO: Update OAuth session migration to use new statefulDB
8585- // oauthSessions, err := oldMod.ListOAuthSessions()
8686- // if err != nil {
8787- // return err
8888- // }
8989- // for _, session := range oauthSessions {
9090- // err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session)
9191- // if err != nil {
9292- // return fmt.Errorf("failed to create oauth session: %w", err)
9393- // }
9494- // }
9595- // log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions))
9696-9797- // TODO: Update notification migration to use new statefulDB
9898- // notificationTokens, err := oldMod.ListNotifications()
9999- // if err != nil {
100100- // return err
101101- // }
102102- // for _, token := range notificationTokens {
103103- // err := newMod.CreateNotification(token.Token, token.RepoDID)
104104- // if err != nil {
105105- // return fmt.Errorf("failed to create notification: %w", err)
106106- // }
107107- // }
108108- // log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens))
109109-110110- log.Log(ctx, "resync complete!", "newDBPath", tempDBPath)
111111-112112- return nil
113113-}
+1-1
pkg/spxrpc/com_atproto_moderation.go
···6565 did = aturi.Authority().String()
6666 // if it's chat, we want the clip from the streamer, not from the chatter
6767 if aturi.Collection() == "place.stream.chat.message" {
6868- msg, err := s.model.GetChatMessage(body.Subject.RepoStrongRef.Cid)
6868+ msg, err := s.model.GetChatMessage(body.Subject.RepoStrongRef.Uri)
6969 if err != nil {
7070 log.Error(ctx, "failed to get chat message for chat report", "error", err)
7171 } else {