A very experimental PLC implementation which uses BFT consensus for decentralization
at main 11 kB view raw
1package httpapi 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "log" 9 "net" 10 "net/http" 11 "net/http/pprof" 12 "slices" 13 "strconv" 14 "strings" 15 "sync" 16 "sync/atomic" 17 "time" 18 19 "github.com/bluesky-social/indigo/atproto/atcrypto" 20 "github.com/cometbft/cometbft/node" 21 "github.com/did-method-plc/go-didplc" 22 "github.com/google/uuid" 23 cbornode "github.com/ipfs/go-ipld-cbor" 24 "github.com/palantir/stacktrace" 25 "github.com/rs/cors" 26 "github.com/samber/lo" 27 28 "tangled.org/gbl08ma.com/didplcbft/abciapp" 29 "tangled.org/gbl08ma.com/didplcbft/plc" 30) 31 32// Server represents the HTTP server for the PLC directory. 33type Server struct { 34 plc plc.ReadPLC 35 router *http.ServeMux 36 node *node.Node 37 srv http.Server 38 handlerTimeout time.Duration 39 proto string 40 addr string 41 42 started atomic.Bool 43 exitDone sync.WaitGroup 44} 45 46// NewServer creates a new instance of the Server. 47func NewServer(plc plc.ReadPLC, node *node.Node, listenAddr string, handlerTimeout time.Duration) (*Server, error) { 48 s := &Server{ 49 plc: plc, 50 router: http.NewServeMux(), 51 node: node, 52 srv: http.Server{Addr: listenAddr}, 53 handlerTimeout: handlerTimeout, 54 } 55 s.setupRoutes() 56 57 handler := cors.Default().Handler(s.router) 58 59 timeoutMsg, _ := json.Marshal(map[string]string{"message": "Internal server timeout"}) 60 61 handler = http.TimeoutHandler(handler, s.handlerTimeout, string(timeoutMsg)) 62 63 s.srv.Handler = handler 64 65 parts := strings.SplitN(listenAddr, "://", 2) 66 if len(parts) != 2 { 67 return nil, stacktrace.NewError( 68 "invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", 69 listenAddr, 70 ) 71 } 72 s.proto, s.addr = parts[0], parts[1] 73 74 return s, nil 75} 76 77// setupRoutes configures the routes for the server. 78func (s *Server) setupRoutes() { 79 s.router.HandleFunc("GET /{did}", s.makeDIDHandler(s.handleResolveDID)) 80 s.router.HandleFunc("POST /{did}", s.makeDIDHandler(s.handleCreatePLC)) 81 s.router.HandleFunc("GET /{did}/log", s.makeDIDHandler(s.handleGetPLCLog)) 82 s.router.HandleFunc("GET /{did}/log/audit", s.makeDIDHandler(s.handleGetPLCAuditLog)) 83 s.router.HandleFunc("GET /{did}/log/last", s.makeDIDHandler(s.handleGetLastOp)) 84 s.router.HandleFunc("GET /{did}/data", s.makeDIDHandler(s.handleGetPLCData)) 85 s.router.HandleFunc("GET /export", s.handleExport) 86 87 s.router.HandleFunc("/debug/pprof/", pprof.Index) 88 s.router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) 89 s.router.HandleFunc("/debug/pprof/profile", pprof.Profile) 90 s.router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) 91 s.router.HandleFunc("/debug/pprof/trace", pprof.Trace) 92 93 // Register handlers for specific profiles 94 s.router.Handle("/debug/pprof/heap", pprof.Handler("heap")) 95 s.router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) 96 s.router.Handle("/debug/pprof/block", pprof.Handler("block")) 97 s.router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 98} 99 100// makeDIDHandler creates a wrapper handler that extracts DID from URL path 101func (s *Server) makeDIDHandler(handler func(http.ResponseWriter, *http.Request, string)) http.HandlerFunc { 102 return func(w http.ResponseWriter, r *http.Request) { 103 handler(w, r, r.PathValue("did")) 104 } 105} 106 107func (s *Server) Start() error { 108 if !s.started.CompareAndSwap(false, true) { 109 return stacktrace.NewError("server already started") 110 } 111 112 s.exitDone.Add(1) 113 114 listener, err := net.Listen(s.proto, s.addr) 115 if err != nil { 116 return stacktrace.Propagate(err, "failed to listen on %v", s.addr) 117 } 118 119 go func() { 120 defer s.exitDone.Done() 121 122 if err := s.srv.Serve(listener); !errors.Is(err, http.ErrServerClosed) { 123 log.Fatalf("ListenAndServe(): %v", err) 124 } 125 }() 126 return nil 127} 128 129func (s *Server) Stop() error { 130 ctx, cancel := context.WithTimeout(context.Background(), s.handlerTimeout) 131 defer cancel() 132 if err := s.srv.Shutdown(ctx); err != nil { 133 return stacktrace.Propagate(err, "") 134 } 135 return nil 136} 137 138func (s *Server) Wait() { 139 s.exitDone.Wait() 140} 141 142// handleResolveDID handles the GET /{did} endpoint. 143func (s *Server) handleResolveDID(w http.ResponseWriter, r *http.Request, did string) { 144 ctx := context.Background() 145 doc, err := s.plc.Resolve(ctx, plc.CommittedTreeVersion, did) 146 if handlePLCError(w, err, did) { 147 return 148 } 149 150 wrapper := struct { 151 Context []string `json:"@context"` 152 didplc.Doc 153 }{ 154 Context: []string{ 155 "https://www.w3.org/ns/did/v1", 156 "https://w3id.org/security/multikey/v1", 157 }, 158 Doc: doc, 159 } 160 161 for _, method := range doc.VerificationMethod { 162 pub, err := atcrypto.ParsePublicMultibase(method.PublicKeyMultibase) 163 if err != nil { 164 // not the time to be doubting what the PLC implementation is giving us, just ignore 165 continue 166 } 167 jwk, err := pub.JWK() 168 if err != nil { 169 // not the time to be doubting what the PLC implementation is giving us, just ignore 170 continue 171 } 172 addToContext := "" 173 switch jwk.Curve { 174 case "P-256": 175 addToContext = "https://w3id.org/security/suites/ecdsa-2019/v1" 176 case "secp256k1": 177 addToContext = "https://w3id.org/security/suites/secp256k1-2019/v1" 178 default: 179 continue 180 } 181 if !slices.Contains(wrapper.Context, addToContext) { 182 wrapper.Context = append(wrapper.Context, addToContext) 183 } 184 } 185 186 w.Header().Set("Content-Type", "application/did+ld+json") 187 json.NewEncoder(w).Encode(wrapper) 188} 189 190// handleCreatePLC handles the POST /{did} endpoint. 191func (s *Server) handleCreatePLC(w http.ResponseWriter, r *http.Request, did string) { 192 var op didplc.OpEnum 193 if err := json.NewDecoder(r.Body).Decode(&op); err != nil { 194 sendErrorResponse(w, http.StatusBadRequest, "Invalid operation") 195 return 196 } 197 198 if s.node == nil { 199 // Validate only 200 // Marshal the operation to JSON bytes for validation 201 opBytes, err := json.Marshal(op) 202 if err != nil { 203 sendErrorResponse(w, http.StatusBadRequest, "Invalid operation") 204 return 205 } 206 207 if err := s.plc.ValidateOperation(r.Context(), plc.CommittedTreeVersion, time.Now(), did, opBytes); err != nil { 208 sendErrorResponse(w, http.StatusBadRequest, "Invalid operation") 209 return 210 } 211 w.WriteHeader(http.StatusOK) 212 return 213 } 214 215 uuid, err := uuid.NewRandom() 216 if handlePLCError(w, err, "") { 217 return 218 } 219 220 tx := abciapp.Transaction[abciapp.CreatePlcOpArguments]{ 221 Action: abciapp.TransactionActionCreatePlcOp, 222 Arguments: abciapp.CreatePlcOpArguments{ 223 DID: did, 224 Operation: &op, 225 }, 226 } 227 228 txBytes, err := cbornode.DumpObject(tx) 229 if handlePLCError(w, err, "") { 230 return 231 } 232 233 // broadcastTxCommit will wait for inclusion, up until the TimeoutBroadcastTxCommit configured for the node, or until the context deadline expires 234 // in practice we expect operations to be included in about one second 235 result, err := broadcastTxCommit(r.Context(), s.node, uuid.String(), txBytes) 236 // TODO more robust error handling 237 if handlePLCError(w, err, "") { 238 return 239 } 240 241 if result.CheckTx.Code != 0 { 242 sendErrorResponse(w, http.StatusBadRequest, "Invalid operation") 243 return 244 } 245 246 if result.TxResult.Code != 0 { 247 sendErrorResponse(w, http.StatusBadRequest, "Invalid operation") 248 return 249 } 250 251 w.WriteHeader(http.StatusOK) 252} 253 254// handleGetPLCLog handles the GET /{did}/log endpoint. 255func (s *Server) handleGetPLCLog(w http.ResponseWriter, r *http.Request, did string) { 256 ops, err := s.plc.OperationLog(r.Context(), plc.CommittedTreeVersion, did) 257 if handlePLCError(w, err, did) { 258 return 259 } 260 261 w.Header().Set("Content-Type", "application/json") 262 json.NewEncoder(w).Encode(ops) 263} 264 265// handleGetPLCAuditLog handles the GET /{did}/log/audit endpoint. 266func (s *Server) handleGetPLCAuditLog(w http.ResponseWriter, r *http.Request, did string) { 267 entries, err := s.plc.AuditLog(r.Context(), plc.CommittedTreeVersion, did) 268 if handlePLCError(w, err, did) { 269 return 270 } 271 272 w.Header().Set("Content-Type", "application/json") 273 json.NewEncoder(w).Encode(entries) 274} 275 276// handleGetLastOp handles the GET /{did}/log/last endpoint. 277func (s *Server) handleGetLastOp(w http.ResponseWriter, r *http.Request, did string) { 278 op, err := s.plc.LastOperation(r.Context(), plc.CommittedTreeVersion, did) 279 if handlePLCError(w, err, did) { 280 return 281 } 282 283 w.Header().Set("Content-Type", "application/json") 284 json.NewEncoder(w).Encode(op) 285} 286 287// handleGetPLCData handles the GET /{did}/data endpoint. 288func (s *Server) handleGetPLCData(w http.ResponseWriter, r *http.Request, did string) { 289 data, err := s.plc.Data(r.Context(), plc.CommittedTreeVersion, did) 290 if handlePLCError(w, err, did) { 291 return 292 } 293 294 resp := struct { 295 DID string `json:"did"` 296 RotationKeys []string `json:"rotationKeys"` 297 VerificationMethods map[string]string `json:"verificationMethods"` 298 AlsoKnownAs []string `json:"alsoKnownAs"` 299 Services map[string]didplc.OpService `json:"services"` 300 }{ 301 DID: did, 302 RotationKeys: data.RotationKeys, 303 VerificationMethods: data.VerificationMethods, 304 AlsoKnownAs: data.AlsoKnownAs, 305 Services: data.Services, 306 } 307 308 w.Header().Set("Content-Type", "application/json") 309 json.NewEncoder(w).Encode(resp) 310} 311 312// handleExport handles the GET /export endpoint. 313func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) { 314 query := r.URL.Query() 315 count := 1000 // The OpenAPI spec says the default is 10, but in practice the official server defaults to 1000 316 if c := query.Get("count"); c != "" { 317 if _, err := fmt.Sscanf(c, "%d", &count); err != nil { 318 sendErrorResponse(w, http.StatusBadRequest, "Invalid count parameter") 319 return 320 } 321 322 if count > 1000 { 323 sendErrorResponse(w, http.StatusBadRequest, "Invalid count parameter") 324 return 325 } 326 } 327 328 afterStr := query.Get("after") 329 after := uint64(0) 330 var err error 331 if afterStr != "" { 332 after, err = strconv.ParseUint(afterStr, 10, 64) 333 if err != nil { 334 sendErrorResponse(w, http.StatusBadRequest, "Only sequence-based pagination is supported") 335 return 336 } 337 } 338 339 entries, err := s.plc.Export(r.Context(), plc.CommittedTreeVersion, after, count) 340 if handlePLCError(w, err, "") { 341 return 342 } 343 344 w.Header().Set("Content-Type", "application/jsonlines") 345 346 type jsonEntry struct { 347 Seq uint64 `json:"seq"` 348 Type string `json:"type"` 349 *didplc.LogEntry 350 } 351 for _, entry := range entries { 352 json.NewEncoder(w).Encode(jsonEntry{ 353 Seq: entry.Seq, 354 Type: "sequenced_op", 355 LogEntry: lo.ToPtr(entry.ToDIDPLCLogEntry()), 356 }) 357 } 358} 359 360// handlePLCError handles errors from the PLC interface and sends the appropriate HTTP response. 361func handlePLCError(w http.ResponseWriter, err error, did string) bool { 362 if err == nil { 363 return false 364 } 365 switch { 366 case errors.Is(err, plc.ErrDIDNotFound): 367 sendErrorResponse(w, http.StatusNotFound, fmt.Sprintf("DID not registered: %s", did)) 368 case errors.Is(err, plc.ErrDIDGone): 369 sendErrorResponse(w, http.StatusGone, fmt.Sprintf("DID not available: %s", did)) 370 default: 371 sendErrorResponse(w, http.StatusInternalServerError, "Internal server error") 372 } 373 return true 374} 375 376// sendErrorResponse sends an error response with the specified status code and message. 377func sendErrorResponse(w http.ResponseWriter, statusCode int, message string) { 378 w.Header().Set("Content-Type", "application/json") 379 w.WriteHeader(statusCode) 380 json.NewEncoder(w).Encode(map[string]string{"message": message}) 381}