Live video on the AT Protocol
at eli/rtmp-rec 154 lines 5.7 kB view raw
1package model 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "hash/fnv" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 lexutil "github.com/bluesky-social/indigo/lex/util" 12 "gorm.io/gorm" 13 "stream.place/streamplace/pkg/streamplace" 14) 15 16type ChatMessage struct { 17 CID string `json:"cid" gorm:"primaryKey;column:cid"` 18 URI string `json:"uri" gorm:"column:uri"` 19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_recent_messages,priority:2"` 20 ChatMessage *[]byte `json:"chatMessage" gorm:"column:chat_message"` 21 RepoDID string `json:"repoDID" gorm:"column:repo_did"` 22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 23 ChatProfile *ChatProfile `json:"chatProfile,omitempty" gorm:"foreignKey:RepoDID;references:RepoDID"` 24 IndexedAt *time.Time `json:"indexedAt,omitempty" gorm:"column:indexed_at"` 25 StreamerRepoDID string `json:"streamerRepoDID" gorm:"column:streamer_repo_did;idx_recent_messages,priority:1"` 26 StreamerRepo *Repo `json:"streamerRepo,omitempty" gorm:"foreignKey:DID;references:StreamerRepoDID"` 27 ReplyToCID *string `json:"replyToCID,omitempty" gorm:"column:reply_to_cid"` 28 ReplyTo *ChatMessage `json:"replyTo,omitempty" gorm:"foreignKey:ReplyToCID;references:CID"` 29 DeletedAt *time.Time `json:"deletedAt,omitempty" gorm:"column:deleted_at"` 30} 31 32// hashString creates a hash from a string, used for deterministic color selection 33func hashString(s string) int { 34 h := fnv.New32a() 35 h.Write([]byte(s)) 36 return int(h.Sum32()) 37} 38 39func (m *ChatMessage) ToStreamplaceMessageView() (*streamplace.ChatDefs_MessageView, error) { 40 rec, err := lexutil.CborDecodeValue(*m.ChatMessage) 41 if err != nil { 42 return nil, fmt.Errorf("error decoding feed post: %w", err) 43 } 44 message := &streamplace.ChatDefs_MessageView{ 45 LexiconTypeID: "place.stream.chat.defs#messageView", 46 } 47 message.Uri = m.URI 48 message.Cid = m.CID 49 message.Author = &bsky.ActorDefs_ProfileViewBasic{ 50 Did: m.RepoDID, 51 } 52 if m.Repo != nil { 53 message.Author.Handle = m.Repo.Handle 54 } 55 message.Record = &lexutil.LexiconTypeDecoder{Val: rec} 56 message.IndexedAt = m.IndexedAt.UTC().Format(time.RFC3339Nano) 57 if m.ChatProfile != nil { 58 scp, err := m.ChatProfile.ToStreamplaceChatProfile() 59 if err != nil { 60 return nil, fmt.Errorf("error converting chat profile to streamplace chat profile: %w", err) 61 } 62 message.ChatProfile = scp 63 } else { 64 // If no chat profile exists, create a default one with a color based on the user's DID 65 defaultColor := DefaultColors[hashString(m.RepoDID)%len(DefaultColors)] 66 message.ChatProfile = &streamplace.ChatProfile{ 67 Color: defaultColor, 68 } 69 70 } 71 if m.ReplyTo != nil { 72 replyTo, err := m.ReplyTo.ToStreamplaceMessageView() 73 if err != nil { 74 return nil, fmt.Errorf("error converting reply to to streamplace message view: %w", err) 75 } 76 message.ReplyTo = &streamplace.ChatDefs_MessageView_ReplyTo{ 77 ChatDefs_MessageView: replyTo, 78 } 79 } 80 return message, nil 81} 82 83func (m *DBModel) CreateChatMessage(ctx context.Context, message *ChatMessage) error { 84 return m.DB.Create(message).Error 85} 86 87func (m *DBModel) DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error { 88 tx := m.DB.Model(&ChatMessage{}).Where("uri = ?", uri).Update("deleted_at", deletedAt) 89 if tx.Error != nil { 90 return tx.Error 91 } 92 if tx.RowsAffected == 0 { 93 return fmt.Errorf("no chat message found for uri: %s", uri) 94 } 95 return nil 96} 97 98func (m *DBModel) GetChatMessage(uri string) (*ChatMessage, error) { 99 var message ChatMessage 100 err := m.DB. 101 Preload("Repo"). 102 Preload("ChatProfile"). 103 Preload("ReplyTo"). 104 Preload("ReplyTo.Repo"). 105 Preload("ReplyTo.ChatProfile"). 106 Where("uri = ?", uri). 107 Where("deleted_at IS NULL"). 108 First(&message). 109 Error 110 if errors.Is(err, gorm.ErrRecordNotFound) { 111 return nil, nil 112 } 113 if err != nil { 114 return nil, fmt.Errorf("error retrieving chat message: %w", err) 115 } 116 return &message, nil 117} 118 119func (m *DBModel) MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) { 120 dbmessages := []ChatMessage{} 121 err := m.DB. 122 Preload("Repo"). 123 Preload("ChatProfile"). 124 Preload("ReplyTo"). 125 Preload("ReplyTo.Repo"). 126 Preload("ReplyTo.ChatProfile"). 127 Where("streamer_repo_did = ?", repoDID). 128 // Exclude messages from users blocked by the streamer 129 Joins("LEFT JOIN blocks ON blocks.repo_did = chat_messages.streamer_repo_did AND blocks.subject_did = chat_messages.repo_did"). 130 Where("blocks.rkey IS NULL"). // Only include messages where no block exists 131 // Exclude gated messages 132 Joins("LEFT JOIN gates ON gates.repo_did = chat_messages.streamer_repo_did AND gates.hidden_message = chat_messages.uri"). 133 Where("gates.hidden_message IS NULL"). // Only include messages where no gate exists 134 // Exclude labeled messages 135 Joins("LEFT JOIN labels ON labels.uri = chat_messages.uri"). 136 Where("labels.uri IS NULL"). // Only include messages where no label exists 137 // Exclude deleted messages 138 Where("chat_messages.deleted_at IS NULL"). 139 Limit(100). 140 Order("chat_messages.created_at DESC"). 141 Find(&dbmessages).Error 142 if err != nil { 143 return nil, fmt.Errorf("error retrieving replies: %w", err) 144 } 145 spmessages := []*streamplace.ChatDefs_MessageView{} 146 for _, m := range dbmessages { 147 spmessage, err := m.ToStreamplaceMessageView() 148 if err != nil { 149 return nil, fmt.Errorf("error converting feed post to bsky post view: %w", err) 150 } 151 spmessages = append(spmessages, spmessage) 152 } 153 return spmessages, nil 154}