Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "hash/fnv"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/api/bsky"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/rivo/uniseg"
14 "gorm.io/gorm"
15 "stream.place/streamplace/pkg/streamplace"
16)
17
18type ChatMessage struct {
19 CID string `json:"cid" gorm:"primaryKey;column:cid"`
20 URI string `json:"uri" gorm:"column:uri"`
21 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_recent_messages,priority:2"`
22 ChatMessage *[]byte `json:"chatMessage" gorm:"column:chat_message"`
23 RepoDID string `json:"repoDID" gorm:"column:repo_did"`
24 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
25 ChatProfile *ChatProfile `json:"chatProfile,omitempty" gorm:"foreignKey:RepoDID;references:RepoDID"`
26 IndexedAt *time.Time `json:"indexedAt,omitempty" gorm:"column:indexed_at"`
27 StreamerRepoDID string `json:"streamerRepoDID" gorm:"column:streamer_repo_did;idx_recent_messages,priority:1"`
28 StreamerRepo *Repo `json:"streamerRepo,omitempty" gorm:"foreignKey:DID;references:StreamerRepoDID"`
29 ReplyToCID *string `json:"replyToCID,omitempty" gorm:"column:reply_to_cid"`
30 ReplyTo *ChatMessage `json:"replyTo,omitempty" gorm:"foreignKey:ReplyToCID;references:CID"`
31 DeletedAt *time.Time `json:"deletedAt,omitempty" gorm:"column:deleted_at"`
32}
33
34// hashString creates a hash from a string, used for deterministic color selection
35func hashString(s string) int {
36 h := fnv.New32a()
37 h.Write([]byte(s))
38 return int(h.Sum32())
39}
40
41func (m *ChatMessage) ToStreamplaceMessageView() (*streamplace.ChatDefs_MessageView, error) {
42 rec, err := lexutil.CborDecodeValue(*m.ChatMessage)
43 if err != nil {
44 return nil, fmt.Errorf("error decoding feed post: %w", err)
45 }
46 // Truncate message text if it is a ChatMessage
47 if msg, ok := rec.(*streamplace.ChatMessage); ok {
48 graphemeCount := uniseg.GraphemeClusterCount(msg.Text)
49 if graphemeCount > 300 {
50 gr := uniseg.NewGraphemes(msg.Text)
51 var result strings.Builder
52 for count := 0; count < 300 && gr.Next(); count++ {
53 result.WriteString(gr.Str())
54 }
55 msg.Text = result.String()
56 }
57 }
58
59 message := &streamplace.ChatDefs_MessageView{
60 LexiconTypeID: "place.stream.chat.defs#messageView",
61 }
62 message.Uri = m.URI
63 message.Cid = m.CID
64 message.Author = &bsky.ActorDefs_ProfileViewBasic{
65 Did: m.RepoDID,
66 }
67 if m.Repo != nil {
68 message.Author.Handle = m.Repo.Handle
69 }
70 message.Record = &lexutil.LexiconTypeDecoder{Val: rec}
71 message.IndexedAt = m.IndexedAt.UTC().Format(time.RFC3339Nano)
72 if m.ChatProfile != nil {
73 scp, err := m.ChatProfile.ToStreamplaceChatProfile()
74 if err != nil {
75 return nil, fmt.Errorf("error converting chat profile to streamplace chat profile: %w", err)
76 }
77 message.ChatProfile = scp
78 } else {
79 // If no chat profile exists, create a default one with a color based on the user's DID
80 defaultColor := DefaultColors[hashString(m.RepoDID)%len(DefaultColors)]
81 message.ChatProfile = &streamplace.ChatProfile{
82 Color: defaultColor,
83 }
84
85 }
86 if m.ReplyTo != nil {
87 replyTo, err := m.ReplyTo.ToStreamplaceMessageView()
88 if err != nil {
89 return nil, fmt.Errorf("error converting reply to to streamplace message view: %w", err)
90 }
91 message.ReplyTo = &streamplace.ChatDefs_MessageView_ReplyTo{
92 ChatDefs_MessageView: replyTo,
93 }
94 }
95 return message, nil
96}
97
98func (m *DBModel) CreateChatMessage(ctx context.Context, message *ChatMessage) error {
99 return m.DB.Create(message).Error
100}
101
102func (m *DBModel) DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error {
103 tx := m.DB.Model(&ChatMessage{}).Where("uri = ?", uri).Update("deleted_at", deletedAt)
104 if tx.Error != nil {
105 return tx.Error
106 }
107 if tx.RowsAffected == 0 {
108 return fmt.Errorf("no chat message found for uri: %s", uri)
109 }
110 return nil
111}
112
113func (m *DBModel) GetChatMessage(uri string) (*ChatMessage, error) {
114 var message ChatMessage
115 err := m.DB.
116 Preload("Repo").
117 Preload("ChatProfile").
118 Preload("ReplyTo").
119 Preload("ReplyTo.Repo").
120 Preload("ReplyTo.ChatProfile").
121 Where("uri = ?", uri).
122 Where("deleted_at IS NULL").
123 First(&message).
124 Error
125 if errors.Is(err, gorm.ErrRecordNotFound) {
126 return nil, nil
127 }
128 if err != nil {
129 return nil, fmt.Errorf("error retrieving chat message: %w", err)
130 }
131 return &message, nil
132}
133
134func (m *DBModel) MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) {
135 return m.MostRecentChatMessagesForViewer(repoDID, "")
136}
137
138func (m *DBModel) MostRecentChatMessagesForViewer(repoDID, viewerDID string) ([]*streamplace.ChatDefs_MessageView, error) {
139 dbmessages := []ChatMessage{}
140 q := m.DB.
141 Preload("Repo").
142 Preload("ChatProfile").
143 Preload("ReplyTo").
144 Preload("ReplyTo.Repo").
145 Preload("ReplyTo.ChatProfile").
146 Where("streamer_repo_did = ?", repoDID).
147 // Exclude messages from users blocked by the streamer
148 Joins("LEFT JOIN blocks ON blocks.repo_did = chat_messages.streamer_repo_did AND blocks.subject_did = chat_messages.repo_did").
149 Where("blocks.rkey IS NULL"). // Only include messages where no block exists
150 // Exclude gated messages
151 Joins("LEFT JOIN gates ON gates.repo_did = chat_messages.streamer_repo_did AND gates.hidden_message = chat_messages.uri").
152 Where("gates.hidden_message IS NULL"). // Only include messages where no gate exists
153 // Exclude labeled messages
154 Joins("LEFT JOIN labels ON labels.uri = chat_messages.uri").
155 Where("labels.uri IS NULL"). // Only include messages where no label exists
156 // Exclude deleted messages
157 Where("chat_messages.deleted_at IS NULL")
158
159 if viewerDID != "" {
160 q = q.Joins("LEFT JOIN blocks AS viewer_blocks ON viewer_blocks.repo_did = ? AND viewer_blocks.subject_did = chat_messages.repo_did", viewerDID).
161 Where("viewer_blocks.rkey IS NULL")
162 }
163
164 err := q.Limit(100).
165 Order("chat_messages.created_at DESC").
166 Find(&dbmessages).Error
167 if err != nil {
168 return nil, fmt.Errorf("error retrieving replies: %w", err)
169 }
170 spmessages := []*streamplace.ChatDefs_MessageView{}
171 for _, m := range dbmessages {
172 spmessage, err := m.ToStreamplaceMessageView()
173 if err != nil {
174 return nil, fmt.Errorf("error converting feed post to bsky post view: %w", err)
175 }
176 spmessages = append(spmessages, spmessage)
177 }
178 return spmessages, nil
179}