# Collaboration Feature Implementation Plan > **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Implement real-time collaboration for Markdown documents with up to 5 users, paragraph-level comments, and invite-based access control. **Architecture:** Server-side collaboration hub with WebSocket connections. Server maintains canonical document state, broadcasts edits, debounces ATProto persistence. Comments stored in separate ATProto collection. **Tech Stack:** Go (stdlib, gorilla/websocket), ATProto XRPC, SQLite --- ## Chunk 1: Database Migration and Models ### Task 1.1: Create invites migration **Files:** - Create: `migrations/005_create_invites.sql` - [ ] **Step 1: Write the migration** ```sql CREATE TABLE IF NOT EXISTS invites ( id TEXT PRIMARY KEY, document_rkey TEXT NOT NULL, token TEXT NOT NULL UNIQUE, created_by_did TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, expires_at DATETIME NOT NULL ); CREATE INDEX idx_invites_document ON invites(document_rkey); CREATE INDEX idx_invites_token ON invites(token); ``` - [ ] **Step 2: Commit** ```bash git add migrations/005_create_invites.sql git commit -m "feat: add invites table migration" ``` ### Task 1.2: Update models for collaboration **Files:** - Modify: `internal/model/models.go` - [ ] **Step 1: Write the failing test (skip - no existing tests)** - [ ] **Step 2: Add Invite and Comment types** ```go type Invite struct { ID string DocumentRKey string Token string CreatedBy string CreatedAt time.Time ExpiresAt time.Time } type Comment struct { URI string DocumentURI string ParagraphID string Text string AuthorDID string AuthorName string CreatedAt string } ``` - [ ] **Step 3: Add Collaborators field to Document** In the `Document` struct, add: ```go Collaborators []string `json:"collaborators,omitempty"` ``` - [ ] **Step 4: Commit** ```bash git add internal/model/models.go git commit -m "feat: add Invite, Comment models and Document.collaborators" ``` --- ## Chunk 2: Collaboration Package (Core Logic) ### Task 2.1: Create collaboration hub **Files:** - Create: `internal/collaboration/hub.go` - [ ] **Step 1: Write the hub with WebSocket room management** ```go package collaboration import ( "log" "sync" ) type Hub struct { rooms map[string]*Room mu sync.RWMutex } type Room struct { documentRKey string clients map[*Client]bool broadcast chan []byte register chan *Client unregister chan *Client mu sync.RWMutex } func NewHub() *Hub { return &Hub{ rooms: make(map[string]*Room), } } func (h *Hub) GetOrCreateRoom(rkey string) *Room { h.mu.Lock() defer h.mu.Unlock() if room, exists := h.rooms[rkey]; exists { return room } room := &Room{ documentRKey: rkey, clients: make(map[*Client]bool), broadcast: make(chan []byte, 256), register: make(chan *Client), unregister: make(chan *Client), } h.rooms[rkey] = room go room.run() return room } func (r *Room) run() { for { select { case client := <-r.register: r.mu.Lock() r.clients[client] = true r.mu.Unlock() r.broadcastPresence() case client := <-r.unregister: r.mu.Lock() if _, ok := r.clients[client]; ok { delete(r.clients, client) close(client.send) } r.mu.Unlock() r.broadcastPresence() case message := <-r.broadcast: r.mu.RLock() for client := range r.clients { select { case client.send <- message: default: close(client.send) delete(r.clients, client) } } r.mu.RUnlock() } } } func (r *Room) Broadcast(message []byte) { r.broadcast <- message } func (r *Room) broadcastPresence() { // Implementation in Task 2.2 } ``` - [ ] **Step 2: Commit** ```bash git add internal/collaboration/hub.go git commit -m "feat: add collaboration hub with room management" ``` ### Task 2.2: Create client representation **Files:** - Create: `internal/collaboration/client.go` - [ ] **Step 1: Write the client struct** ```go package collaboration import ( "github.com/gorilla/websocket" ) type Client struct { hub *Hub conn *websocket.Conn send chan []byte DID string Name string Color string roomKey string } type ClientMessage struct { Type string `json:"type"` RKey string `json:"rkey,omitempty"` DID string `json:"did,omitempty"` Delta json.RawMessage `json:"delta,omitempty"` Cursor *CursorPos `json:"cursor,omitempty"` Comment *CommentMsg `json:"comment,omitempty"` } type CursorPos struct { Position int `json:"position"` SelectionEnd int `json:"selectionEnd"` } type CommentMsg struct { ParagraphID string `json:"paragraphId"` Text string `json:"text"` } type PresenceUser struct { DID string `json:"did"` Name string `json:"name"` Color string `json:"color"` } type PresenceMessage struct { Type string `json:"type"` Users []PresenceUser `json:"users"` } func NewClient(hub *Hub, conn *websocket.Conn, did, name, color, roomKey string) *Client { return &Client{ hub: hub, conn: conn, send: make(chan []byte, 256), DID: did, Name: name, Color: color, roomKey: roomKey, } } func (c *Client) ReadPump() { defer func() { c.hub.unregister <- c c.conn.Close() }() for { _, message, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("WebSocket error: %v", err) } break } // Handle message - dispatch to appropriate handler } } func (c *Client) WritePump() { defer c.conn.Close() for { message, ok := <-c.send if !ok { c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil { return } } } ``` - [ ] **Step 2: Implement presence broadcasting in hub** Add to `hub.go`: ```go func (r *Room) GetPresence() []PresenceUser { r.mu.RLock() defer r.mu.RUnlock() users := make([]PresenceUser, 0, len(r.clients)) for client := range r.clients { users = append(users, PresenceUser{ DID: client.DID, Name: client.Name, Color: client.Color, }) } return users } func (r *Room) broadcastPresence() { presence := PresenceMessage{ Type: "presence", Users: r.GetPresence(), } data, _ := json.Marshal(presence) r.Broadcast(data) } ``` - [ ] **Step 3: Commit** ```bash git add internal/collaboration/client.go internal/collaboration/hub.go git commit -m "feat: add client representation and presence broadcasting" ``` ### Task 2.3: Create invite system **Files:** - Create: `internal/collaboration/invite.go` - [ ] **Step 1: Write the invite logic** ```go package collaboration import ( "crypto/rand" "crypto/sha256" "encoding/hex" "time" "github.com/limeleaf/diffdown/internal/db" ) func GenerateInviteToken() (string, error) { bytes := make([]byte, 32) if _, err := rand.Read(bytes); err != nil { return "", err } hash := sha256.Sum256(bytes) return hex.EncodeToString(hash[:]), nil } func CreateInvite(db *db.DB, documentRKey, createdByDID string) (*model.Invite, error) { token, err := GenerateInviteToken() if err != nil { return nil, err } invite := &model.Invite{ ID: db.NewID(), DocumentRKey: documentRKey, Token: token, CreatedBy: createdByDID, CreatedAt: time.Now(), ExpiresAt: time.Now().Add(7 * 24 * time.Hour), } err = db.CreateInvite(invite) return invite, err } func ValidateInvite(db *db.DB, token, documentRKey string) (*model.Invite, error) { invite, err := db.GetInviteByToken(token) if err != nil { return nil, err } if invite.DocumentRKey != documentRKey { return nil, fmt.Errorf("invite does not match document") } if time.Now().After(invite.ExpiresAt) { return nil, fmt.Errorf("invite expired") } return invite, nil } ``` - [ ] **Step 2: Add DB methods** In `internal/db/db.go`, add: ```go func (db *DB) CreateInvite(invite *model.Invite) error { _, err := db.Exec(` INSERT INTO invites (id, document_rkey, token, created_by_did, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?)`, invite.ID, invite.DocumentRKey, invite.Token, invite.CreatedBy, invite.CreatedAt, invite.ExpiresAt) return err } func (db *DB) GetInviteByToken(token string) (*model.Invite, error) { row := db.QueryRow(`SELECT id, document_rkey, token, created_by_did, created_at, expires_at FROM invites WHERE token = ?`, token) var invite model.Invite err := row.Scan(&invite.ID, &invite.DocumentRKey, &invite.Token, &invite.CreatedBy, &invite.CreatedAt, &invite.ExpiresAt) if err != nil { return nil, err } return &invite, nil } ``` - [ ] **Step 3: Commit** ```bash git add internal/collaboration/invite.go internal/db/db.go git commit -m "feat: add invite generation and validation" ``` ### Task 2.4: Create OT helpers **Files:** - Create: `internal/collaboration/ot.go` - [ ] **Step 1: Write simplified OT logic** ```go package collaboration import "sync" type OTEngine struct { mu sync.Mutex documentText string version int } func NewOTEngine(initialText string) *OTEngine { return &OTEngine{ documentText: initialText, version: 0, } } type Operation struct { From int `json:"from"` To int `json:"to"` Insert string `json:"insert"` Author string `json:"author"` } func (ot *OTEngine) Apply(op Operation) string { ot.mu.Lock() defer ot.mu.Unlock() // Simple last-write-wins if op.To > len(ot.documentText) { op.To = len(ot.documentText) } if op.From > len(ot.documentText) { op.From = len(ot.documentText) } newText := ot.documentText[:op.From] + op.Insert + ot.documentText[op.To:] ot.documentText = newText ot.version++ return ot.documentText } func (ot *OTEngine) GetText() string { ot.mu.Lock() defer ot.mu.Unlock() return ot.documentText } func (ot *OTEngine) GetVersion() int { ot.mu.Lock() defer ot.mu.Unlock() return ot.version } ``` - [ ] **Step 2: Commit** ```bash git add internal/collaboration/ot.go git commit -m "feat: add simplified OT engine" ``` --- ## Chunk 3: HTTP Handlers ### Task 3.1: Document invite handler **Files:** - Modify: `internal/handler/handler.go` - [ ] **Step 1: Add DocumentInvite handler** ```go func (h *Handler) DocumentInvite(w http.ResponseWriter, r *http.Request) { user := h.currentUser(r) if user == nil { http.Redirect(w, r, "/auth/login", http.StatusSeeOther) return } rKey := model.RKeyFromURI(r.URL.Path) if rKey == "" { http.Error(w, "Invalid document", http.StatusBadRequest) return } // Get document to verify ownership client := h.xrpcClient(r) if client == nil { h.render(w, "document_edit.html", PageData{Error: "Not authenticated with ATProto"}) return } doc, err := client.GetDocument(rKey) if err != nil { http.Error(w, "Document not found", http.StatusNotFound) return } // Verify user is creator (DID matches) session, _ := h.db.GetATProtoSession(user.ID) if session == nil || session.DID != doc.URI { http.Error(w, "Unauthorized", http.StatusForbidden) return } // Check collaborator limit (5 max) if len(doc.Collaborators) >= 5 { http.Error(w, "Maximum collaborators reached", http.StatusBadRequest) return } // Create invite invite, err := collaboration.CreateInvite(h.db, rKey, session.DID) if err != nil { log.Printf("DocumentInvite: create invite: %v", err) http.Error(w, "Failed to create invite", http.StatusInternalServerError) return } inviteLink := fmt.Sprintf("%s/doc/%s?invite=%s", os.Getenv("BASE_URL"), rKey, invite.Token) h.render(w, "document_edit.html", PageData{ Content: map[string]interface{}{ "document": doc, "inviteLink": inviteLink, }, }) } ``` - [ ] **Step 2: Register route in main.go** ```go mux.HandleFunc("POST /api/docs/{rkey}/invite", handler.DocumentInvite) ``` - [ ] **Step 3: Commit** ```bash git add internal/handler/handler.go cmd/server/main.go git commit -m "feat: add document invite handler" ``` ### Task 3.2: Accept invite handler **Files:** - Modify: `internal/handler/handler.go` - [ ] **Step 1: Add AcceptInvite handler** ```go func (h *Handler) AcceptInvite(w http.ResponseWriter, r *http.Request) { user := h.currentUser(r) if user == nil { http.Redirect(w, r, "/auth/login", http.StatusSeeOther) return } rKey := model.RKeyFromURI(r.URL.Path) inviteToken := r.URL.Query().Get("invite") if inviteToken == "" { http.Error(w, "Invalid invite", http.StatusBadRequest) return } // Validate invite invite, err := collaboration.ValidateInvite(h.db, inviteToken, rKey) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } // Get ATProto session session, err := h.db.GetATProtoSession(user.ID) if err != nil || session == nil { http.Redirect(w, r, "/auth/atproto", http.StatusSeeOther) return } // Add user to collaborators via ATProto client, err := h.newXRPCClient(session) if err != nil { http.Error(w, "Failed to connect to ATProto", http.StatusInternalServerError) return } // Get current document doc, err := client.GetDocument(rKey) if err != nil { http.Error(w, "Document not found", http.StatusNotFound) return } // Check if already collaborator for _, c := range doc.Collaborators { if c == session.DID { http.Redirect(w, r, "/doc/"+rKey, http.StatusSeeOther) return } } // Add to collaborators doc.Collaborators = append(doc.Collaborators, session.DID) err = client.PutDocument(rKey, doc) if err != nil { log.Printf("AcceptInvite: add collaborator: %v", err) http.Error(w, "Failed to add collaborator", http.StatusInternalServerError) return } // Delete invite token after use h.db.DeleteInvite(invite.Token) http.Redirect(w, r, "/doc/"+rKey, http.StatusSeeOther) } ``` - [ ] **Step 2: Register route** ```go mux.HandleFunc("GET /doc/{rkey}/accept", handler.AcceptInvite) ``` - [ ] **Step 3: Commit** ```bash git add internal/handler/handler.go cmd/server/main.go git commit -m "feat: add accept invite handler" ``` ### Task 3.3: Comment handlers **Files:** - Modify: `internal/handler/handler.go` - [ ] **Step 1: Add CommentCreate handler** ```go func (h *Handler) CommentCreate(w http.ResponseWriter, r *http.Request) { user := h.currentUser(r) if user == nil { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } rKey := model.RKeyFromURI(r.URL.Path) if rKey == "" { http.Error(w, "Invalid document", http.StatusBadRequest) return } // Parse request body var req struct { ParagraphID string `json:"paragraphId"` Text string `json:"text"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "Invalid request", http.StatusBadRequest) return } if req.Text == "" { http.Error(w, "Comment text required", http.StatusBadRequest) return } // Get ATProto session session, err := h.db.GetATProtoSession(user.ID) if err != nil || session == nil { http.Error(w, "Not authenticated with ATProto", http.StatusUnauthorized) return } client, err := h.newXRPCClient(session) if err != nil { http.Error(w, "Failed to connect to ATProto", http.StatusInternalServerError) return } // Create comment record comment := &model.Comment{ DocumentURI: fmt.Sprintf("at://%s/com.diffdown.document/%s", session.DID, rKey), ParagraphID: req.ParagraphID, Text: req.Text, AuthorDID: session.DID, } uri, err := client.CreateComment(comment) if err != nil { log.Printf("CommentCreate: %v", err) http.Error(w, "Failed to create comment", http.StatusInternalServerError) return } h.jsonResponse(w, map[string]string{"uri": uri}, http.StatusCreated) } ``` - [ ] **Step 2: Add CommentList handler** ```go func (h *Handler) CommentList(w http.ResponseWriter, r *http.Request) { rKey := model.RKeyFromURI(r.URL.Path) if rKey == "" { http.Error(w, "Invalid document", http.StatusBadRequest) return } user := h.currentUser(r) if user == nil { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } session, err := h.db.GetATProtoSession(user.ID) if err != nil || session == nil { http.Error(w, "Not authenticated with ATProto", http.StatusUnauthorized) return } client, err := h.newXRPCClient(session) if err != nil { http.Error(w, "Failed to connect to ATProto", http.StatusInternalServerError) return } comments, err := client.ListComments(rKey) if err != nil { log.Printf("CommentList: %v", err) http.Error(w, "Failed to list comments", http.StatusInternalServerError) return } h.jsonResponse(w, comments, http.StatusOK) } ``` - [ ] **Step 3: Register routes** ```go mux.HandleFunc("POST /api/docs/{rkey}/comments", handler.CommentCreate) mux.HandleFunc("GET /api/docs/{rkey}/comments", handler.CommentList) ``` - [ ] **Step 4: Commit** ```bash git add internal/handler/handler.go cmd/server/main.go git commit -m "feat: add comment handlers" ``` --- ## Chunk 4: WebSocket Handler ### Task 4.1: WebSocket upgrade handler **Files:** - Modify: `internal/handler/handler.go`, `cmd/server/main.go` - [ ] **Step 1: Add CollaboratorWebSocket handler** ```go var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } func (h *Handler) CollaboratorWebSocket(w http.ResponseWriter, r *http.Request) { rKey := model.RKeyFromURI(r.URL.Path) if rKey == "" { http.Error(w, "Invalid document", http.StatusBadRequest) return } // Get access token and DPoP proof from query params accessToken := r.URL.Query().Get("access_token") dpopProof := r.URL.Query().Get("dpop_proof") if accessToken == "" || dpopProof == "" { http.Error(w, "Missing auth tokens", http.StatusUnauthorized) return } // Validate tokens and get DID did, name, err := h.validateWSToken(accessToken, dpopProof) if err != nil { http.Error(w, "Invalid tokens", http.StatusUnauthorized) return } // Get document and verify collaborator access session, _ := h.db.GetATProtoSessionByDID(did) if session == nil { http.Error(w, "No ATProto session", http.StatusUnauthorized) return } client, err := h.newXRPCClient(session) if err != nil { http.Error(w, "Failed to connect to ATProto", http.StatusInternalServerError) return } doc, err := client.GetDocument(rKey) if err != nil { http.Error(w, "Document not found", http.StatusNotFound) return } // Check if user is collaborator isCollaborator := false for _, c := range doc.Collaborators { if c == did { isCollaborator = true break } } if !isCollaborator { http.Error(w, "Not a collaborator", http.StatusForbidden) return } // Generate color based on DID color := colorFromDID(did) // Upgrade connection conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket upgrade failed: %v", err) return } // Get room and register client room := h.CollaborationHub.GetOrCreateRoom(rKey) wsClient := collaboration.NewClient(h.CollaborationHub, conn, did, name, color, rKey) room.Register <- wsClient // Start pumps go wsClient.WritePump() wsClient.ReadPump() } func (h *Handler) validateWSToken(accessToken, dpopProof string) (string, string, error) { // Validate JWT and DPoP proof, extract DID and name // Use existing ATProto token validation return "", "", nil } func colorFromDID(did string) string { colors := []string{"#e74c3c", "#3498db", "#2ecc71", "#9b59b6", "#f39c12"} hash := 0 for _, c := range did { hash += int(c) } return colors[hash%len(colors)] } ``` - [ ] **Step 2: Wire up Hub in main.go** ```go // In main.go, add to Handler struct or global var collaborationHub = collaboration.NewHub() // Pass to handler handler := &handler.Handler{ DB: db, Store: store, Render: r, CollaborationHub: collaborationHub, } ``` - [ ] **Step 3: Register WebSocket route** ```go mux.HandleFunc("GET /ws/doc/{rkey}", handler.CollaboratorWebSocket) ``` - [ ] **Step 4: Commit** ```bash git add internal/handler/handler.go cmd/server/main.go git commit -m "feat: add WebSocket collaboration handler" ``` --- ## Chunk 5: Frontend Updates ### Task 5.1: WebSocket client and presence **Files:** - Modify: `templates/document_edit.html` - [ ] **Step 1: Add WebSocket connection** ```javascript // Add to document_edit.html let ws = null; let collaborators = []; function connectWebSocket(rkey) { const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; const wsUrl = `${protocol}//${window.location.host}/ws/doc/${rkey}?access_token=${encodeURIComponent(getAccessToken())}&dpop_proof=${encodeURIComponent(getDPoPProof())}`; ws = new WebSocket(wsUrl); ws.onopen = () => { console.log('WebSocket connected'); ws.send(JSON.stringify({ type: 'join', rkey: rkey, did: getCurrentDID() })); }; ws.onmessage = (event) => { const msg = JSON.parse(event.data); handleWSMessage(msg); }; ws.onclose = () => { console.log('WebSocket disconnected'); setTimeout(() => connectWebSocket(rkey), 3000); }; } function handleWSMessage(msg) { switch (msg.type) { case 'presence': updatePresenceSidebar(msg.users); break; case 'edit': applyRemoteEdit(msg.delta); break; case 'sync': setEditorContent(msg.content); break; } } function updatePresenceSidebar(users) { collaborators = users; const sidebar = document.getElementById('presence-sidebar'); if (!sidebar) return; sidebar.innerHTML = users.map(u => `