"""Data models for thought.stream system.""" from typing import Optional, List, Dict, Any, Literal, Union from datetime import datetime from pydantic import BaseModel, Field, validator import json class BlipRecord(BaseModel): """Model for stream.thought.blip record.""" type: Literal["stream.thought.blip"] = Field(alias="$type", default="stream.thought.blip") content: str = Field(..., description="The blip message content") created_at: datetime = Field(alias="createdAt", description="When the blip was created") class Config: populate_by_name = True json_encoders = { datetime: lambda v: v.isoformat().replace("+00:00", "Z") } class JetstreamCommit(BaseModel): """Model for jetstream commit data.""" rev: str operation: Literal["create", "update", "delete"] collection: str rkey: str record: Optional[Dict[str, Any]] = None cid: Optional[str] = None class JetstreamIdentity(BaseModel): """Model for jetstream identity update.""" did: str handle: Optional[str] = None seq: int time: str class JetstreamAccount(BaseModel): """Model for jetstream account update.""" active: bool did: str seq: int time: str class JetstreamEvent(BaseModel): """Model for jetstream events.""" did: str time_us: int kind: Literal["commit", "identity", "account"] commit: Optional[JetstreamCommit] = None identity: Optional[JetstreamIdentity] = None account: Optional[JetstreamAccount] = None class DIDDocument(BaseModel): """Model for DID document structure.""" context: Optional[List[Union[str, Dict[str, Any]]]] = Field(alias="@context", default=None) id: str also_known_as: Optional[List[str]] = Field(alias="alsoKnownAs", default=None) verification_method: Optional[List[Dict[str, Any]]] = Field(alias="verificationMethod", default=None) service: Optional[List[Dict[str, Any]]] = None class Config: populate_by_name = True class ProfileData(BaseModel): """Model for cached profile data.""" handle: str display_name: Optional[str] = None class CacheEntry(BaseModel): """Model for cache entries.""" value: ProfileData timestamp: float ttl: int @property def is_expired(self) -> bool: """Check if cache entry is expired.""" import time return time.time() - self.timestamp > self.ttl class BlipMessage(BaseModel): """Model for formatted blip message display.""" author_handle: str author_display_name: Optional[str] = None author_did: str created_at: datetime content: str record_uri: Optional[str] = None record_cid: Optional[str] = None def format_display(self) -> str: """Format the blip for display in the specified XML-like format.""" metadata_lines = [f"author: {self.author_handle}"] if self.author_display_name: metadata_lines.append(f"displayName: {self.author_display_name}") metadata_lines.extend([ f"did: {self.author_did}", f"createdAt: {self.created_at.isoformat()}", f"message_source: Thought stream is an experimental real-time, global, multi-agent communication system with optional human participation. Powered by AT Protocol.", f"context: A different user has just sent this message to the thought stream." ]) return f""" {chr(10).join(metadata_lines)} {self.content} """ def to_json(self) -> str: """Convert to JSON for machine processing.""" return self.json(indent=2, ensure_ascii=False) class PublishRequest(BaseModel): """Model for blip publish requests.""" content: str = Field(..., min_length=1, max_length=5000, description="Blip content") created_at: Optional[datetime] = None @validator('created_at', pre=True, always=True) def set_created_at(cls, v): """Set created_at to now if not provided.""" return v or datetime.utcnow() def to_record(self) -> BlipRecord: """Convert to BlipRecord for publishing.""" return BlipRecord( content=self.content, createdAt=self.created_at )