Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.1 154 lines 5.7 kB view raw
1package model 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "hash/fnv" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 lexutil "github.com/bluesky-social/indigo/lex/util" 12 "gorm.io/gorm" 13 "stream.place/streamplace/pkg/streamplace" 14) 15 16type ChatMessage struct { 17 CID string `json:"cid" gorm:"primaryKey;column:cid"` 18 URI string `json:"uri" gorm:"column:uri"` 19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_recent_messages,priority:2"` 20 ChatMessage *[]byte `json:"chatMessage" gorm:"column:chat_message"` 21 RepoDID string `json:"repoDID" gorm:"column:repo_did"` 22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 23 ChatProfile *ChatProfile `json:"chatProfile,omitempty" gorm:"foreignKey:RepoDID;references:RepoDID"` 24 IndexedAt *time.Time `json:"indexedAt,omitempty" gorm:"column:indexed_at"` 25 StreamerRepoDID string `json:"streamerRepoDID" gorm:"column:streamer_repo_did;idx_recent_messages,priority:1"` 26 StreamerRepo *Repo `json:"streamerRepo,omitempty" gorm:"foreignKey:DID;references:StreamerRepoDID"` 27 ReplyToCID *string `json:"replyToCID,omitempty" gorm:"column:reply_to_cid"` 28 ReplyTo *ChatMessage `json:"replyTo,omitempty" gorm:"foreignKey:ReplyToCID;references:CID"` 29 DeletedAt *time.Time `json:"deletedAt,omitempty" gorm:"column:deleted_at"` 30} 31 32// hashString creates a hash from a string, used for deterministic color selection 33func hashString(s string) int { 34 h := fnv.New32a() 35 h.Write([]byte(s)) 36 return int(h.Sum32()) 37} 38 39func (m *ChatMessage) ToStreamplaceMessageView() (*streamplace.ChatDefs_MessageView, error) { 40 rec, err := lexutil.CborDecodeValue(*m.ChatMessage) 41 if err != nil { 42 return nil, fmt.Errorf("error decoding feed post: %w", err) 43 } 44 message := &streamplace.ChatDefs_MessageView{ 45 LexiconTypeID: "place.stream.chat.defs#messageView", 46 } 47 message.Uri = m.URI 48 message.Cid = m.CID 49 message.Author = &bsky.ActorDefs_ProfileViewBasic{ 50 Did: m.RepoDID, 51 } 52 if m.Repo != nil { 53 message.Author.Handle = m.Repo.Handle 54 } 55 message.Record = &lexutil.LexiconTypeDecoder{Val: rec} 56 message.IndexedAt = m.IndexedAt.UTC().Format(time.RFC3339Nano) 57 if m.ChatProfile != nil { 58 scp, err := m.ChatProfile.ToStreamplaceChatProfile() 59 if err != nil { 60 return nil, fmt.Errorf("error converting chat profile to streamplace chat profile: %w", err) 61 } 62 message.ChatProfile = scp 63 } else { 64 // If no chat profile exists, create a default one with a color based on the user's DID 65 defaultColor := DefaultColors[hashString(m.RepoDID)%len(DefaultColors)] 66 message.ChatProfile = &streamplace.ChatProfile{ 67 Color: defaultColor, 68 } 69 70 } 71 if m.ReplyTo != nil { 72 replyTo, err := m.ReplyTo.ToStreamplaceMessageView() 73 if err != nil { 74 return nil, fmt.Errorf("error converting reply to to streamplace message view: %w", err) 75 } 76 message.ReplyTo = &streamplace.ChatDefs_MessageView_ReplyTo{ 77 ChatDefs_MessageView: replyTo, 78 } 79 } 80 return message, nil 81} 82 83func (m *DBModel) CreateChatMessage(ctx context.Context, message *ChatMessage) error { 84 return m.DB.Create(message).Error 85} 86 87func (m *DBModel) DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error { 88 tx := m.DB.Model(&ChatMessage{}).Where("uri = ?", uri).Update("deleted_at", deletedAt) 89 if tx.Error != nil { 90 return tx.Error 91 } 92 if tx.RowsAffected == 0 { 93 return fmt.Errorf("no chat message found for uri: %s", uri) 94 } 95 return nil 96} 97 98func (m *DBModel) GetChatMessage(uri string) (*ChatMessage, error) { 99 var message ChatMessage 100 err := m.DB. 101 Preload("Repo"). 102 Preload("ChatProfile"). 103 Preload("ReplyTo"). 104 Preload("ReplyTo.Repo"). 105 Preload("ReplyTo.ChatProfile"). 106 Where("uri = ?", uri). 107 Where("deleted_at IS NULL"). 108 First(&message). 109 Error 110 if errors.Is(err, gorm.ErrRecordNotFound) { 111 return nil, nil 112 } 113 if err != nil { 114 return nil, fmt.Errorf("error retrieving chat message: %w", err) 115 } 116 return &message, nil 117} 118 119func (m *DBModel) MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) { 120 dbmessages := []ChatMessage{} 121 err := m.DB. 122 Preload("Repo"). 123 Preload("ChatProfile"). 124 Preload("ReplyTo"). 125 Preload("ReplyTo.Repo"). 126 Preload("ReplyTo.ChatProfile"). 127 Where("streamer_repo_did = ?", repoDID). 128 // Exclude messages from users blocked by the streamer 129 Joins("LEFT JOIN blocks ON blocks.repo_did = chat_messages.streamer_repo_did AND blocks.subject_did = chat_messages.repo_did"). 130 Where("blocks.rkey IS NULL"). // Only include messages where no block exists 131 // Exclude gated messages 132 Joins("LEFT JOIN gates ON gates.repo_did = chat_messages.streamer_repo_did AND gates.hidden_message = chat_messages.uri"). 133 Where("gates.hidden_message IS NULL"). // Only include messages where no gate exists 134 // Exclude labeled messages 135 Joins("LEFT JOIN labels ON labels.uri = chat_messages.uri"). 136 Where("labels.uri IS NULL"). // Only include messages where no label exists 137 // Exclude deleted messages 138 Where("chat_messages.deleted_at IS NULL"). 139 Limit(100). 140 Order("chat_messages.created_at DESC"). 141 Find(&dbmessages).Error 142 if err != nil { 143 return nil, fmt.Errorf("error retrieving replies: %w", err) 144 } 145 spmessages := []*streamplace.ChatDefs_MessageView{} 146 for _, m := range dbmessages { 147 spmessage, err := m.ToStreamplaceMessageView() 148 if err != nil { 149 return nil, fmt.Errorf("error converting feed post to bsky post view: %w", err) 150 } 151 spmessages = append(spmessages, spmessage) 152 } 153 return spmessages, nil 154}