Live video on the AT Protocol
at eli/docs-url-fix 133 lines 4.7 kB view raw
1package model 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "hash/fnv" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 lexutil "github.com/bluesky-social/indigo/lex/util" 12 "gorm.io/gorm" 13 "stream.place/streamplace/pkg/streamplace" 14) 15 16type ChatMessage struct { 17 CID string `json:"cid" gorm:"primaryKey;column:cid"` 18 URI string `json:"uri" gorm:"column:uri"` 19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_recent_messages,priority:2"` 20 ChatMessage *[]byte `json:"chatMessage" gorm:"column:chat_message"` 21 RepoDID string `json:"repoDID" gorm:"column:repo_did"` 22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 23 ChatProfile *ChatProfile `json:"chatProfile,omitempty" gorm:"foreignKey:RepoDID;references:RepoDID"` 24 IndexedAt *time.Time `json:"indexedAt,omitempty" gorm:"column:indexed_at"` 25 StreamerRepoDID string `json:"streamerRepoDID" gorm:"column:streamer_repo_did;idx_recent_messages,priority:1"` 26 StreamerRepo *Repo `json:"streamerRepo,omitempty" gorm:"foreignKey:DID;references:StreamerRepoDID"` 27 ReplyToCID *string `json:"replyToCID,omitempty" gorm:"column:reply_to_cid"` 28 ReplyTo *ChatMessage `json:"replyTo,omitempty" gorm:"foreignKey:ReplyToCID;references:CID"` 29} 30 31// hashString creates a hash from a string, used for deterministic color selection 32func hashString(s string) int { 33 h := fnv.New32a() 34 h.Write([]byte(s)) 35 return int(h.Sum32()) 36} 37 38func (m *ChatMessage) ToStreamplaceMessageView() (*streamplace.ChatDefs_MessageView, error) { 39 rec, err := lexutil.CborDecodeValue(*m.ChatMessage) 40 if err != nil { 41 return nil, fmt.Errorf("error decoding feed post: %w", err) 42 } 43 message := &streamplace.ChatDefs_MessageView{ 44 LexiconTypeID: "place.stream.chat.defs#messageView", 45 } 46 message.Uri = m.URI 47 message.Cid = m.CID 48 message.Author = &bsky.ActorDefs_ProfileViewBasic{ 49 Did: m.RepoDID, 50 } 51 if m.Repo != nil { 52 message.Author.Handle = m.Repo.Handle 53 } 54 message.Record = &lexutil.LexiconTypeDecoder{Val: rec} 55 message.IndexedAt = m.IndexedAt.UTC().Format(time.RFC3339Nano) 56 if m.ChatProfile != nil { 57 scp, err := m.ChatProfile.ToStreamplaceChatProfile() 58 if err != nil { 59 return nil, fmt.Errorf("error converting chat profile to streamplace chat profile: %w", err) 60 } 61 message.ChatProfile = scp 62 } else { 63 // If no chat profile exists, create a default one with a color based on the user's DID 64 defaultColor := defaultColors[hashString(m.RepoDID)%len(defaultColors)] 65 message.ChatProfile = &streamplace.ChatProfile{ 66 Color: defaultColor, 67 } 68 69 } 70 if m.ReplyTo != nil { 71 replyTo, err := m.ReplyTo.ToStreamplaceMessageView() 72 if err != nil { 73 return nil, fmt.Errorf("error converting reply to to streamplace message view: %w", err) 74 } 75 message.ReplyTo = &streamplace.ChatDefs_MessageView_ReplyTo{ 76 ChatDefs_MessageView: replyTo, 77 } 78 } 79 return message, nil 80} 81 82func (m *DBModel) CreateChatMessage(ctx context.Context, message *ChatMessage) error { 83 return m.DB.Create(message).Error 84} 85 86func (m *DBModel) GetChatMessage(cid string) (*ChatMessage, error) { 87 var message ChatMessage 88 err := m.DB. 89 Preload("Repo"). 90 Preload("ChatProfile"). 91 Preload("ReplyTo"). 92 Preload("ReplyTo.Repo"). 93 Preload("ReplyTo.ChatProfile"). 94 Where("cid = ?", cid). 95 First(&message). 96 Error 97 if errors.Is(err, gorm.ErrRecordNotFound) { 98 return nil, nil 99 } 100 if err != nil { 101 return nil, fmt.Errorf("error retrieving chat message: %w", err) 102 } 103 return &message, nil 104} 105 106func (m *DBModel) MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) { 107 dbmessages := []ChatMessage{} 108 err := m.DB. 109 Preload("Repo"). 110 Preload("ChatProfile"). 111 Preload("ReplyTo"). 112 Preload("ReplyTo.Repo"). 113 Preload("ReplyTo.ChatProfile"). 114 Where("streamer_repo_did = ?", repoDID). 115 // Exclude messages from users blocked by the streamer 116 Joins("LEFT JOIN blocks ON blocks.repo_did = chat_messages.streamer_repo_did AND blocks.subject_did = chat_messages.repo_did"). 117 Where("blocks.rkey IS NULL"). // Only include messages where no block exists 118 Limit(100). 119 Order("chat_messages.created_at DESC"). 120 Find(&dbmessages).Error 121 if err != nil { 122 return nil, fmt.Errorf("error retrieving replies: %w", err) 123 } 124 spmessages := []*streamplace.ChatDefs_MessageView{} 125 for _, m := range dbmessages { 126 spmessage, err := m.ToStreamplaceMessageView() 127 if err != nil { 128 return nil, fmt.Errorf("error converting feed post to bsky post view: %w", err) 129 } 130 spmessages = append(spmessages, spmessage) 131 } 132 return spmessages, nil 133}