Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "hash/fnv"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 lexutil "github.com/bluesky-social/indigo/lex/util"
12 "gorm.io/gorm"
13 "stream.place/streamplace/pkg/streamplace"
14)
15
16type ChatMessage struct {
17 CID string `json:"cid" gorm:"primaryKey;column:cid"`
18 URI string `json:"uri" gorm:"column:uri"`
19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_recent_messages,priority:2"`
20 ChatMessage *[]byte `json:"chatMessage" gorm:"column:chat_message"`
21 RepoDID string `json:"repoDID" gorm:"column:repo_did"`
22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
23 ChatProfile *ChatProfile `json:"chatProfile,omitempty" gorm:"foreignKey:RepoDID;references:RepoDID"`
24 IndexedAt *time.Time `json:"indexedAt,omitempty" gorm:"column:indexed_at"`
25 StreamerRepoDID string `json:"streamerRepoDID" gorm:"column:streamer_repo_did;idx_recent_messages,priority:1"`
26 StreamerRepo *Repo `json:"streamerRepo,omitempty" gorm:"foreignKey:DID;references:StreamerRepoDID"`
27 ReplyToCID *string `json:"replyToCID,omitempty" gorm:"column:reply_to_cid"`
28 ReplyTo *ChatMessage `json:"replyTo,omitempty" gorm:"foreignKey:ReplyToCID;references:CID"`
29 DeletedAt *time.Time `json:"deletedAt,omitempty" gorm:"column:deleted_at"`
30}
31
// hashString maps s to an integer by feeding its bytes through the 32-bit
// FNV-1a hash. The result depends only on s, which is what keeps the default
// chat color chosen from it deterministic.
func hashString(s string) int {
	hasher := fnv.New32a()
	// hash.Hash documents that Write never returns an error.
	_, _ = hasher.Write([]byte(s))
	return int(hasher.Sum32())
}
38
39func (m *ChatMessage) ToStreamplaceMessageView() (*streamplace.ChatDefs_MessageView, error) {
40 rec, err := lexutil.CborDecodeValue(*m.ChatMessage)
41 if err != nil {
42 return nil, fmt.Errorf("error decoding feed post: %w", err)
43 }
44 message := &streamplace.ChatDefs_MessageView{
45 LexiconTypeID: "place.stream.chat.defs#messageView",
46 }
47 message.Uri = m.URI
48 message.Cid = m.CID
49 message.Author = &bsky.ActorDefs_ProfileViewBasic{
50 Did: m.RepoDID,
51 }
52 if m.Repo != nil {
53 message.Author.Handle = m.Repo.Handle
54 }
55 message.Record = &lexutil.LexiconTypeDecoder{Val: rec}
56 message.IndexedAt = m.IndexedAt.UTC().Format(time.RFC3339Nano)
57 if m.ChatProfile != nil {
58 scp, err := m.ChatProfile.ToStreamplaceChatProfile()
59 if err != nil {
60 return nil, fmt.Errorf("error converting chat profile to streamplace chat profile: %w", err)
61 }
62 message.ChatProfile = scp
63 } else {
64 // If no chat profile exists, create a default one with a color based on the user's DID
65 defaultColor := DefaultColors[hashString(m.RepoDID)%len(DefaultColors)]
66 message.ChatProfile = &streamplace.ChatProfile{
67 Color: defaultColor,
68 }
69
70 }
71 if m.ReplyTo != nil {
72 replyTo, err := m.ReplyTo.ToStreamplaceMessageView()
73 if err != nil {
74 return nil, fmt.Errorf("error converting reply to to streamplace message view: %w", err)
75 }
76 message.ReplyTo = &streamplace.ChatDefs_MessageView_ReplyTo{
77 ChatDefs_MessageView: replyTo,
78 }
79 }
80 return message, nil
81}
82
83func (m *DBModel) CreateChatMessage(ctx context.Context, message *ChatMessage) error {
84 return m.DB.Create(message).Error
85}
86
87func (m *DBModel) DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error {
88 tx := m.DB.Model(&ChatMessage{}).Where("uri = ?", uri).Update("deleted_at", deletedAt)
89 if tx.Error != nil {
90 return tx.Error
91 }
92 if tx.RowsAffected == 0 {
93 return fmt.Errorf("no chat message found for uri: %s", uri)
94 }
95 return nil
96}
97
98func (m *DBModel) GetChatMessage(uri string) (*ChatMessage, error) {
99 var message ChatMessage
100 err := m.DB.
101 Preload("Repo").
102 Preload("ChatProfile").
103 Preload("ReplyTo").
104 Preload("ReplyTo.Repo").
105 Preload("ReplyTo.ChatProfile").
106 Where("uri = ?", uri).
107 Where("deleted_at IS NULL").
108 First(&message).
109 Error
110 if errors.Is(err, gorm.ErrRecordNotFound) {
111 return nil, nil
112 }
113 if err != nil {
114 return nil, fmt.Errorf("error retrieving chat message: %w", err)
115 }
116 return &message, nil
117}
118
119func (m *DBModel) MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) {
120 dbmessages := []ChatMessage{}
121 err := m.DB.
122 Preload("Repo").
123 Preload("ChatProfile").
124 Preload("ReplyTo").
125 Preload("ReplyTo.Repo").
126 Preload("ReplyTo.ChatProfile").
127 Where("streamer_repo_did = ?", repoDID).
128 // Exclude messages from users blocked by the streamer
129 Joins("LEFT JOIN blocks ON blocks.repo_did = chat_messages.streamer_repo_did AND blocks.subject_did = chat_messages.repo_did").
130 Where("blocks.rkey IS NULL"). // Only include messages where no block exists
131 // Exclude gated messages
132 Joins("LEFT JOIN gates ON gates.repo_did = chat_messages.streamer_repo_did AND gates.hidden_message = chat_messages.uri").
133 Where("gates.hidden_message IS NULL"). // Only include messages where no gate exists
134 // Exclude labeled messages
135 Joins("LEFT JOIN labels ON labels.uri = chat_messages.uri").
136 Where("labels.uri IS NULL"). // Only include messages where no label exists
137 // Exclude deleted messages
138 Where("chat_messages.deleted_at IS NULL").
139 Limit(100).
140 Order("chat_messages.created_at DESC").
141 Find(&dbmessages).Error
142 if err != nil {
143 return nil, fmt.Errorf("error retrieving replies: %w", err)
144 }
145 spmessages := []*streamplace.ChatDefs_MessageView{}
146 for _, m := range dbmessages {
147 spmessage, err := m.ToStreamplaceMessageView()
148 if err != nil {
149 return nil, fmt.Errorf("error converting feed post to bsky post view: %w", err)
150 }
151 spmessages = append(spmessages, spmessage)
152 }
153 return spmessages, nil
154}