Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "hash/fnv"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 lexutil "github.com/bluesky-social/indigo/lex/util"
12 "gorm.io/gorm"
13 "stream.place/streamplace/pkg/streamplace"
14)
15
16type ChatMessage struct {
17 CID string `json:"cid" gorm:"primaryKey;column:cid"`
18 URI string `json:"uri" gorm:"column:uri"`
19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_recent_messages,priority:2"`
20 ChatMessage *[]byte `json:"chatMessage" gorm:"column:chat_message"`
21 RepoDID string `json:"repoDID" gorm:"column:repo_did"`
22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
23 ChatProfile *ChatProfile `json:"chatProfile,omitempty" gorm:"foreignKey:RepoDID;references:RepoDID"`
24 IndexedAt *time.Time `json:"indexedAt,omitempty" gorm:"column:indexed_at"`
25 StreamerRepoDID string `json:"streamerRepoDID" gorm:"column:streamer_repo_did;idx_recent_messages,priority:1"`
26 StreamerRepo *Repo `json:"streamerRepo,omitempty" gorm:"foreignKey:DID;references:StreamerRepoDID"`
27 ReplyToCID *string `json:"replyToCID,omitempty" gorm:"column:reply_to_cid"`
28 ReplyTo *ChatMessage `json:"replyTo,omitempty" gorm:"foreignKey:ReplyToCID;references:CID"`
29}
30
// hashString returns the 32-bit FNV-1a hash of s converted to an int. It is
// used to pick a color deterministically from a string such as a DID.
//
// Note: the uint32 sum is converted to int, so on platforms where int is
// 32 bits the result may be negative.
func hashString(s string) int {
	digest := fnv.New32a()
	// Writes to a hash.Hash never fail, so the result is deliberately ignored.
	_, _ = digest.Write([]byte(s))
	sum := digest.Sum32()
	return int(sum)
}
37
38func (m *ChatMessage) ToStreamplaceMessageView() (*streamplace.ChatDefs_MessageView, error) {
39 rec, err := lexutil.CborDecodeValue(*m.ChatMessage)
40 if err != nil {
41 return nil, fmt.Errorf("error decoding feed post: %w", err)
42 }
43 message := &streamplace.ChatDefs_MessageView{
44 LexiconTypeID: "place.stream.chat.defs#messageView",
45 }
46 message.Uri = m.URI
47 message.Cid = m.CID
48 message.Author = &bsky.ActorDefs_ProfileViewBasic{
49 Did: m.RepoDID,
50 }
51 if m.Repo != nil {
52 message.Author.Handle = m.Repo.Handle
53 }
54 message.Record = &lexutil.LexiconTypeDecoder{Val: rec}
55 message.IndexedAt = m.IndexedAt.UTC().Format(time.RFC3339Nano)
56 if m.ChatProfile != nil {
57 scp, err := m.ChatProfile.ToStreamplaceChatProfile()
58 if err != nil {
59 return nil, fmt.Errorf("error converting chat profile to streamplace chat profile: %w", err)
60 }
61 message.ChatProfile = scp
62 } else {
63 // If no chat profile exists, create a default one with a color based on the user's DID
64 defaultColor := defaultColors[hashString(m.RepoDID)%len(defaultColors)]
65 message.ChatProfile = &streamplace.ChatProfile{
66 Color: defaultColor,
67 }
68
69 }
70 if m.ReplyTo != nil {
71 replyTo, err := m.ReplyTo.ToStreamplaceMessageView()
72 if err != nil {
73 return nil, fmt.Errorf("error converting reply to to streamplace message view: %w", err)
74 }
75 message.ReplyTo = &streamplace.ChatDefs_MessageView_ReplyTo{
76 ChatDefs_MessageView: replyTo,
77 }
78 }
79 return message, nil
80}
81
82func (m *DBModel) CreateChatMessage(ctx context.Context, message *ChatMessage) error {
83 return m.DB.Create(message).Error
84}
85
86func (m *DBModel) GetChatMessage(cid string) (*ChatMessage, error) {
87 var message ChatMessage
88 err := m.DB.
89 Preload("Repo").
90 Preload("ChatProfile").
91 Preload("ReplyTo").
92 Preload("ReplyTo.Repo").
93 Preload("ReplyTo.ChatProfile").
94 Where("cid = ?", cid).
95 First(&message).
96 Error
97 if errors.Is(err, gorm.ErrRecordNotFound) {
98 return nil, nil
99 }
100 if err != nil {
101 return nil, fmt.Errorf("error retrieving chat message: %w", err)
102 }
103 return &message, nil
104}
105
106func (m *DBModel) MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) {
107 dbmessages := []ChatMessage{}
108 err := m.DB.
109 Preload("Repo").
110 Preload("ChatProfile").
111 Preload("ReplyTo").
112 Preload("ReplyTo.Repo").
113 Preload("ReplyTo.ChatProfile").
114 Where("streamer_repo_did = ?", repoDID).
115 // Exclude messages from users blocked by the streamer
116 Joins("LEFT JOIN blocks ON blocks.repo_did = chat_messages.streamer_repo_did AND blocks.subject_did = chat_messages.repo_did").
117 Where("blocks.rkey IS NULL"). // Only include messages where no block exists
118 Limit(100).
119 Order("chat_messages.created_at DESC").
120 Find(&dbmessages).Error
121 if err != nil {
122 return nil, fmt.Errorf("error retrieving replies: %w", err)
123 }
124 spmessages := []*streamplace.ChatDefs_MessageView{}
125 for _, m := range dbmessages {
126 spmessage, err := m.ToStreamplaceMessageView()
127 if err != nil {
128 return nil, fmt.Errorf("error converting feed post to bsky post view: %w", err)
129 }
130 spmessages = append(spmessages, spmessage)
131 }
132 return spmessages, nil
133}