Live video on the AT Protocol
at issue-784 179 lines 6.5 kB view raw
1package model 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "hash/fnv" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/api/bsky" 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 "github.com/rivo/uniseg" 14 "gorm.io/gorm" 15 "stream.place/streamplace/pkg/streamplace" 16) 17 18type ChatMessage struct { 19 CID string `json:"cid" gorm:"primaryKey;column:cid"` 20 URI string `json:"uri" gorm:"column:uri"` 21 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_recent_messages,priority:2"` 22 ChatMessage *[]byte `json:"chatMessage" gorm:"column:chat_message"` 23 RepoDID string `json:"repoDID" gorm:"column:repo_did"` 24 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 25 ChatProfile *ChatProfile `json:"chatProfile,omitempty" gorm:"foreignKey:RepoDID;references:RepoDID"` 26 IndexedAt *time.Time `json:"indexedAt,omitempty" gorm:"column:indexed_at"` 27 StreamerRepoDID string `json:"streamerRepoDID" gorm:"column:streamer_repo_did;idx_recent_messages,priority:1"` 28 StreamerRepo *Repo `json:"streamerRepo,omitempty" gorm:"foreignKey:DID;references:StreamerRepoDID"` 29 ReplyToCID *string `json:"replyToCID,omitempty" gorm:"column:reply_to_cid"` 30 ReplyTo *ChatMessage `json:"replyTo,omitempty" gorm:"foreignKey:ReplyToCID;references:CID"` 31 DeletedAt *time.Time `json:"deletedAt,omitempty" gorm:"column:deleted_at"` 32} 33 34// hashString creates a hash from a string, used for deterministic color selection 35func hashString(s string) int { 36 h := fnv.New32a() 37 h.Write([]byte(s)) 38 return int(h.Sum32()) 39} 40 41func (m *ChatMessage) ToStreamplaceMessageView() (*streamplace.ChatDefs_MessageView, error) { 42 rec, err := lexutil.CborDecodeValue(*m.ChatMessage) 43 if err != nil { 44 return nil, fmt.Errorf("error decoding feed post: %w", err) 45 } 46 // Truncate message text if it is a ChatMessage 47 if msg, ok := rec.(*streamplace.ChatMessage); ok { 48 graphemeCount := uniseg.GraphemeClusterCount(msg.Text) 49 if graphemeCount > 300 { 50 gr := uniseg.NewGraphemes(msg.Text) 51 var result strings.Builder 52 for count := 0; count < 300 && gr.Next(); count++ { 53 result.WriteString(gr.Str()) 54 } 55 msg.Text = result.String() 56 } 57 } 58 59 message := &streamplace.ChatDefs_MessageView{ 60 LexiconTypeID: "place.stream.chat.defs#messageView", 61 } 62 message.Uri = m.URI 63 message.Cid = m.CID 64 message.Author = &bsky.ActorDefs_ProfileViewBasic{ 65 Did: m.RepoDID, 66 } 67 if m.Repo != nil { 68 message.Author.Handle = m.Repo.Handle 69 } 70 message.Record = &lexutil.LexiconTypeDecoder{Val: rec} 71 message.IndexedAt = m.IndexedAt.UTC().Format(time.RFC3339Nano) 72 if m.ChatProfile != nil { 73 scp, err := m.ChatProfile.ToStreamplaceChatProfile() 74 if err != nil { 75 return nil, fmt.Errorf("error converting chat profile to streamplace chat profile: %w", err) 76 } 77 message.ChatProfile = scp 78 } else { 79 // If no chat profile exists, create a default one with a color based on the user's DID 80 defaultColor := DefaultColors[hashString(m.RepoDID)%len(DefaultColors)] 81 message.ChatProfile = &streamplace.ChatProfile{ 82 Color: defaultColor, 83 } 84 85 } 86 if m.ReplyTo != nil { 87 replyTo, err := m.ReplyTo.ToStreamplaceMessageView() 88 if err != nil { 89 return nil, fmt.Errorf("error converting reply to to streamplace message view: %w", err) 90 } 91 message.ReplyTo = &streamplace.ChatDefs_MessageView_ReplyTo{ 92 ChatDefs_MessageView: replyTo, 93 } 94 } 95 return message, nil 96} 97 98func (m *DBModel) CreateChatMessage(ctx context.Context, message *ChatMessage) error { 99 return m.DB.Create(message).Error 100} 101 102func (m *DBModel) DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error { 103 tx := m.DB.Model(&ChatMessage{}).Where("uri = ?", uri).Update("deleted_at", deletedAt) 104 if tx.Error != nil { 105 return tx.Error 106 } 107 if tx.RowsAffected == 0 { 108 return fmt.Errorf("no chat message found for uri: %s", uri) 109 } 110 return nil 111} 112 113func (m *DBModel) GetChatMessage(uri string) (*ChatMessage, error) { 114 var message ChatMessage 115 err := m.DB. 116 Preload("Repo"). 117 Preload("ChatProfile"). 118 Preload("ReplyTo"). 119 Preload("ReplyTo.Repo"). 120 Preload("ReplyTo.ChatProfile"). 121 Where("uri = ?", uri). 122 Where("deleted_at IS NULL"). 123 First(&message). 124 Error 125 if errors.Is(err, gorm.ErrRecordNotFound) { 126 return nil, nil 127 } 128 if err != nil { 129 return nil, fmt.Errorf("error retrieving chat message: %w", err) 130 } 131 return &message, nil 132} 133 134func (m *DBModel) MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) { 135 return m.MostRecentChatMessagesForViewer(repoDID, "") 136} 137 138func (m *DBModel) MostRecentChatMessagesForViewer(repoDID, viewerDID string) ([]*streamplace.ChatDefs_MessageView, error) { 139 dbmessages := []ChatMessage{} 140 q := m.DB. 141 Preload("Repo"). 142 Preload("ChatProfile"). 143 Preload("ReplyTo"). 144 Preload("ReplyTo.Repo"). 145 Preload("ReplyTo.ChatProfile"). 146 Where("streamer_repo_did = ?", repoDID). 147 // Exclude messages from users blocked by the streamer 148 Joins("LEFT JOIN blocks ON blocks.repo_did = chat_messages.streamer_repo_did AND blocks.subject_did = chat_messages.repo_did"). 149 Where("blocks.rkey IS NULL"). // Only include messages where no block exists 150 // Exclude gated messages 151 Joins("LEFT JOIN gates ON gates.repo_did = chat_messages.streamer_repo_did AND gates.hidden_message = chat_messages.uri"). 152 Where("gates.hidden_message IS NULL"). // Only include messages where no gate exists 153 // Exclude labeled messages 154 Joins("LEFT JOIN labels ON labels.uri = chat_messages.uri"). 155 Where("labels.uri IS NULL"). // Only include messages where no label exists 156 // Exclude deleted messages 157 Where("chat_messages.deleted_at IS NULL") 158 159 if viewerDID != "" { 160 q = q.Joins("LEFT JOIN blocks AS viewer_blocks ON viewer_blocks.repo_did = ? AND viewer_blocks.subject_did = chat_messages.repo_did", viewerDID). 161 Where("viewer_blocks.rkey IS NULL") 162 } 163 164 err := q.Limit(100). 165 Order("chat_messages.created_at DESC"). 166 Find(&dbmessages).Error 167 if err != nil { 168 return nil, fmt.Errorf("error retrieving replies: %w", err) 169 } 170 spmessages := []*streamplace.ChatDefs_MessageView{} 171 for _, m := range dbmessages { 172 spmessage, err := m.ToStreamplaceMessageView() 173 if err != nil { 174 return nil, fmt.Errorf("error converting feed post to bsky post view: %w", err) 175 } 176 spmessages = append(spmessages, spmessage) 177 } 178 return spmessages, nil 179}