fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package model
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "hash/fnv"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 lexutil "github.com/bluesky-social/indigo/lex/util"
12 "gorm.io/gorm"
13 "stream.place/streamplace/pkg/streamplace"
14)
15
16type ChatMessage struct {
17 CID string `json:"cid" gorm:"primaryKey;column:cid"`
18 URI string `json:"uri" gorm:"column:uri"`
19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_recent_messages,priority:2"`
20 ChatMessage *[]byte `json:"chatMessage" gorm:"column:chat_message"`
21 RepoDID string `json:"repoDID" gorm:"column:repo_did"`
22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
23 ChatProfile *ChatProfile `json:"chatProfile,omitempty" gorm:"foreignKey:RepoDID;references:RepoDID"`
24 IndexedAt *time.Time `json:"indexedAt,omitempty" gorm:"column:indexed_at"`
25 StreamerRepoDID string `json:"streamerRepoDID" gorm:"column:streamer_repo_did;idx_recent_messages,priority:1"`
26 StreamerRepo *Repo `json:"streamerRepo,omitempty" gorm:"foreignKey:DID;references:StreamerRepoDID"`
27 ReplyToCID *string `json:"replyToCID,omitempty" gorm:"column:reply_to_cid"`
28 ReplyTo *ChatMessage `json:"replyTo,omitempty" gorm:"foreignKey:ReplyToCID;references:CID"`
29 DeletedAt *time.Time `json:"deletedAt,omitempty" gorm:"column:deleted_at"`
30}
31
// hashString returns the 32-bit FNV-1a hash of s converted to an int. The
// result is deterministic for a given input, which is what makes it usable
// for stable per-user color selection.
func hashString(s string) int {
	hasher := fnv.New32a()
	// hash.Hash documents that Write never returns an error.
	_, _ = hasher.Write([]byte(s))
	sum := hasher.Sum32()
	return int(sum)
}
38
39func (m *ChatMessage) ToStreamplaceMessageView() (*streamplace.ChatDefs_MessageView, error) {
40 rec, err := lexutil.CborDecodeValue(*m.ChatMessage)
41 if err != nil {
42 return nil, fmt.Errorf("error decoding feed post: %w", err)
43 }
44 message := &streamplace.ChatDefs_MessageView{
45 LexiconTypeID: "place.stream.chat.defs#messageView",
46 }
47 message.Uri = m.URI
48 message.Cid = m.CID
49 message.Author = &bsky.ActorDefs_ProfileViewBasic{
50 Did: m.RepoDID,
51 }
52 if m.Repo != nil {
53 message.Author.Handle = m.Repo.Handle
54 }
55 message.Record = &lexutil.LexiconTypeDecoder{Val: rec}
56 message.IndexedAt = m.IndexedAt.UTC().Format(time.RFC3339Nano)
57 if m.ChatProfile != nil {
58 scp, err := m.ChatProfile.ToStreamplaceChatProfile()
59 if err != nil {
60 return nil, fmt.Errorf("error converting chat profile to streamplace chat profile: %w", err)
61 }
62 message.ChatProfile = scp
63 } else {
64 // If no chat profile exists, create a default one with a color based on the user's DID
65 defaultColor := DefaultColors[hashString(m.RepoDID)%len(DefaultColors)]
66 message.ChatProfile = &streamplace.ChatProfile{
67 Color: defaultColor,
68 }
69
70 }
71 if m.ReplyTo != nil {
72 replyTo, err := m.ReplyTo.ToStreamplaceMessageView()
73 if err != nil {
74 return nil, fmt.Errorf("error converting reply to to streamplace message view: %w", err)
75 }
76 message.ReplyTo = &streamplace.ChatDefs_MessageView_ReplyTo{
77 ChatDefs_MessageView: replyTo,
78 }
79 }
80 return message, nil
81}
82
83func (m *DBModel) CreateChatMessage(ctx context.Context, message *ChatMessage) error {
84 return m.DB.Create(message).Error
85}
86
87func (m *DBModel) DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error {
88 tx := m.DB.Model(&ChatMessage{}).Where("uri = ?", uri).Update("deleted_at", deletedAt)
89 if tx.Error != nil {
90 return tx.Error
91 }
92 if tx.RowsAffected == 0 {
93 return fmt.Errorf("no chat message found for uri: %s", uri)
94 }
95 return nil
96}
97
98func (m *DBModel) GetChatMessage(uri string) (*ChatMessage, error) {
99 var message ChatMessage
100 err := m.DB.
101 Preload("Repo").
102 Preload("ChatProfile").
103 Preload("ReplyTo").
104 Preload("ReplyTo.Repo").
105 Preload("ReplyTo.ChatProfile").
106 Where("uri = ?", uri).
107 Where("deleted_at IS NULL").
108 First(&message).
109 Error
110 if errors.Is(err, gorm.ErrRecordNotFound) {
111 return nil, nil
112 }
113 if err != nil {
114 return nil, fmt.Errorf("error retrieving chat message: %w", err)
115 }
116 return &message, nil
117}
118
119func (m *DBModel) MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) {
120 dbmessages := []ChatMessage{}
121 err := m.DB.
122 Preload("Repo").
123 Preload("ChatProfile").
124 Preload("ReplyTo").
125 Preload("ReplyTo.Repo").
126 Preload("ReplyTo.ChatProfile").
127 Where("streamer_repo_did = ?", repoDID).
128 // Exclude messages from users blocked by the streamer
129 Joins("LEFT JOIN blocks ON blocks.repo_did = chat_messages.streamer_repo_did AND blocks.subject_did = chat_messages.repo_did").
130 Where("blocks.rkey IS NULL"). // Only include messages where no block exists
131 // Exclude gated messages
132 Joins("LEFT JOIN gates ON gates.repo_did = chat_messages.streamer_repo_did AND gates.hidden_message = chat_messages.uri").
133 Where("gates.hidden_message IS NULL"). // Only include messages where no gate exists
134 // Exclude labeled messages
135 Joins("LEFT JOIN labels ON labels.uri = chat_messages.uri").
136 Where("labels.uri IS NULL"). // Only include messages where no label exists
137 // Exclude deleted messages
138 Where("chat_messages.deleted_at IS NULL").
139 Limit(100).
140 Order("chat_messages.created_at DESC").
141 Find(&dbmessages).Error
142 if err != nil {
143 return nil, fmt.Errorf("error retrieving replies: %w", err)
144 }
145 spmessages := []*streamplace.ChatDefs_MessageView{}
146 for _, m := range dbmessages {
147 spmessage, err := m.ToStreamplaceMessageView()
148 if err != nil {
149 return nil, fmt.Errorf("error converting feed post to bsky post view: %w", err)
150 }
151 spmessages = append(spmessages, spmessage)
152 }
153 return spmessages, nil
154}