Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "hash/fnv"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/api/bsky"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/rivo/uniseg"
14 "gorm.io/gorm"
15 "stream.place/streamplace/pkg/streamplace"
16)
17
18type ChatMessage struct {
19 CID string `json:"cid" gorm:"primaryKey;column:cid"`
20 URI string `json:"uri" gorm:"column:uri"`
21 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_recent_messages,priority:2"`
22 ChatMessage *[]byte `json:"chatMessage" gorm:"column:chat_message"`
23 RepoDID string `json:"repoDID" gorm:"column:repo_did"`
24 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
25 ChatProfile *ChatProfile `json:"chatProfile,omitempty" gorm:"foreignKey:RepoDID;references:RepoDID"`
26 IndexedAt *time.Time `json:"indexedAt,omitempty" gorm:"column:indexed_at"`
27 StreamerRepoDID string `json:"streamerRepoDID" gorm:"column:streamer_repo_did;idx_recent_messages,priority:1"`
28 StreamerRepo *Repo `json:"streamerRepo,omitempty" gorm:"foreignKey:DID;references:StreamerRepoDID"`
29 ReplyToCID *string `json:"replyToCID,omitempty" gorm:"column:reply_to_cid"`
30 ReplyTo *ChatMessage `json:"replyTo,omitempty" gorm:"foreignKey:ReplyToCID;references:CID"`
31 DeletedAt *time.Time `json:"deletedAt,omitempty" gorm:"column:deleted_at"`
32}
33
34// hashString creates a hash from a string, used for deterministic color selection
35func hashString(s string) int {
36 h := fnv.New32a()
37 h.Write([]byte(s))
38 return int(h.Sum32())
39}
40
41func (m *ChatMessage) ToStreamplaceMessageView() (*streamplace.ChatDefs_MessageView, error) {
42 rec, err := lexutil.CborDecodeValue(*m.ChatMessage)
43 if err != nil {
44 return nil, fmt.Errorf("error decoding feed post: %w", err)
45 }
46 // Truncate message text if it is a ChatMessage
47 if msg, ok := rec.(*streamplace.ChatMessage); ok {
48 graphemeCount := uniseg.GraphemeClusterCount(msg.Text)
49 if graphemeCount > 300 {
50 gr := uniseg.NewGraphemes(msg.Text)
51 var result strings.Builder
52 for count := 0; count < 300 && gr.Next(); count++ {
53 result.WriteString(gr.Str())
54 }
55 msg.Text = result.String()
56 }
57 }
58
59 message := &streamplace.ChatDefs_MessageView{
60 LexiconTypeID: "place.stream.chat.defs#messageView",
61 }
62 message.Uri = m.URI
63 message.Cid = m.CID
64 message.Author = &bsky.ActorDefs_ProfileViewBasic{
65 Did: m.RepoDID,
66 }
67 if m.Repo != nil {
68 message.Author.Handle = m.Repo.Handle
69 }
70 message.Record = &lexutil.LexiconTypeDecoder{Val: rec}
71 message.IndexedAt = m.IndexedAt.UTC().Format(time.RFC3339Nano)
72 if m.ChatProfile != nil {
73 scp, err := m.ChatProfile.ToStreamplaceChatProfile()
74 if err != nil {
75 return nil, fmt.Errorf("error converting chat profile to streamplace chat profile: %w", err)
76 }
77 message.ChatProfile = scp
78 } else {
79 // If no chat profile exists, create a default one with a color based on the user's DID
80 defaultColor := DefaultColors[hashString(m.RepoDID)%len(DefaultColors)]
81 message.ChatProfile = &streamplace.ChatProfile{
82 Color: defaultColor,
83 }
84
85 }
86 if m.ReplyTo != nil {
87 replyTo, err := m.ReplyTo.ToStreamplaceMessageView()
88 if err != nil {
89 return nil, fmt.Errorf("error converting reply to to streamplace message view: %w", err)
90 }
91 message.ReplyTo = &streamplace.ChatDefs_MessageView_ReplyTo{
92 ChatDefs_MessageView: replyTo,
93 }
94 }
95 return message, nil
96}
97
98func (m *DBModel) CreateChatMessage(ctx context.Context, message *ChatMessage) error {
99 return m.DB.Create(message).Error
100}
101
102func (m *DBModel) DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error {
103 tx := m.DB.Model(&ChatMessage{}).Where("uri = ?", uri).Update("deleted_at", deletedAt)
104 if tx.Error != nil {
105 return tx.Error
106 }
107 if tx.RowsAffected == 0 {
108 return fmt.Errorf("no chat message found for uri: %s", uri)
109 }
110 return nil
111}
112
113func (m *DBModel) GetChatMessage(uri string) (*ChatMessage, error) {
114 var message ChatMessage
115 err := m.DB.
116 Preload("Repo").
117 Preload("ChatProfile").
118 Preload("ReplyTo").
119 Preload("ReplyTo.Repo").
120 Preload("ReplyTo.ChatProfile").
121 Where("uri = ?", uri).
122 Where("deleted_at IS NULL").
123 First(&message).
124 Error
125 if errors.Is(err, gorm.ErrRecordNotFound) {
126 return nil, nil
127 }
128 if err != nil {
129 return nil, fmt.Errorf("error retrieving chat message: %w", err)
130 }
131 return &message, nil
132}
133
134func (m *DBModel) MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) {
135 dbmessages := []ChatMessage{}
136 err := m.DB.
137 Preload("Repo").
138 Preload("ChatProfile").
139 Preload("ReplyTo").
140 Preload("ReplyTo.Repo").
141 Preload("ReplyTo.ChatProfile").
142 Where("streamer_repo_did = ?", repoDID).
143 // Exclude messages from users blocked by the streamer
144 Joins("LEFT JOIN blocks ON blocks.repo_did = chat_messages.streamer_repo_did AND blocks.subject_did = chat_messages.repo_did").
145 Where("blocks.rkey IS NULL"). // Only include messages where no block exists
146 // Exclude gated messages
147 Joins("LEFT JOIN gates ON gates.repo_did = chat_messages.streamer_repo_did AND gates.hidden_message = chat_messages.uri").
148 Where("gates.hidden_message IS NULL"). // Only include messages where no gate exists
149 // Exclude labeled messages
150 Joins("LEFT JOIN labels ON labels.uri = chat_messages.uri").
151 Where("labels.uri IS NULL"). // Only include messages where no label exists
152 // Exclude deleted messages
153 Where("chat_messages.deleted_at IS NULL").
154 Limit(100).
155 Order("chat_messages.created_at DESC").
156 Find(&dbmessages).Error
157 if err != nil {
158 return nil, fmt.Errorf("error retrieving replies: %w", err)
159 }
160 spmessages := []*streamplace.ChatDefs_MessageView{}
161 for _, m := range dbmessages {
162 spmessage, err := m.ToStreamplaceMessageView()
163 if err != nil {
164 return nil, fmt.Errorf("error converting feed post to bsky post view: %w", err)
165 }
166 spmessages = append(spmessages, spmessage)
167 }
168 return spmessages, nil
169}