package httpapi

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net"
	"net/http"
	"net/http/pprof"
	"slices"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/bluesky-social/indigo/atproto/atcrypto"
	"github.com/cometbft/cometbft/node"
	"github.com/did-method-plc/go-didplc"
	"github.com/google/uuid"
	cbornode "github.com/ipfs/go-ipld-cbor"
	"github.com/palantir/stacktrace"
	"github.com/rs/cors"
	"github.com/samber/lo"
	"tangled.org/gbl08ma.com/didplcbft/abciapp"
	"tangled.org/gbl08ma.com/didplcbft/plc"
	"tangled.org/gbl08ma.com/didplcbft/transaction"
)

// Server represents the HTTP server for the PLC directory.
type Server struct {
	// txFactory supplies the read transactions passed to every PLC query.
	txFactory *transaction.Factory
	// plc is the read-side PLC implementation the handlers query.
	plc plc.ReadPLC
	// router holds the route table built by setupRoutes.
	router *http.ServeMux
	// node is used to broadcast create-operation transactions; when it is nil,
	// POST requests are only validated.
	node *node.Node
	// srv is the underlying HTTP server; its Handler is router wrapped with
	// CORS and a per-request timeout.
	srv http.Server
	// handlerTimeout bounds each request and is also the Shutdown deadline.
	handlerTimeout time.Duration

	// proto and addr are the two halves of the "proto://addr" listen address.
	proto string
	addr  string

	// started guards against Start being called twice.
	started atomic.Bool
	// exitDone tracks the serving goroutine so that Wait can block on it.
	exitDone sync.WaitGroup
}

// NewServer creates a new instance of the Server.
//
// listenAddr must be fully formed, including its scheme (for example
// "tcp://127.0.0.1:8080" or "unix:///run/plc.sock"); an address without the
// "://" separator is rejected with an error. handlerTimeout is applied to
// every request through http.TimeoutHandler. The returned server is not yet
// listening: call Start for that.
func NewServer(txFactory *transaction.Factory, plc plc.ReadPLC, node *node.Node, listenAddr string, handlerTimeout time.Duration) (*Server, error) {
	proto, addr, found := strings.Cut(listenAddr, "://")
	if !found {
		return nil, stacktrace.NewError(
			"invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)",
			listenAddr,
		)
	}

	s := &Server{
		txFactory:      txFactory,
		plc:            plc,
		router:         http.NewServeMux(),
		node:           node,
		srv:            http.Server{Addr: listenAddr},
		handlerTimeout: handlerTimeout,
		proto:          proto,
		addr:           addr,
	}
	s.setupRoutes()

	// Marshalling a map[string]string cannot fail, so the error is
	// deliberately discarded.
	timeoutBody, _ := json.Marshal(map[string]string{"message": "Internal server timeout"})

	// CORS is applied innermost; the timeout wraps everything so that the
	// JSON body above is what clients receive when a handler runs too long.
	withCORS := cors.Default().Handler(s.router)
	s.srv.Handler = http.TimeoutHandler(withCORS, s.handlerTimeout, string(timeoutBody))

	return s, nil
}

// setupRoutes configures the routes for the server.
func (s *Server) setupRoutes() { s.router.HandleFunc("GET /{did}", s.makeDIDHandler(s.handleResolveDID)) s.router.HandleFunc("POST /{did}", s.makeDIDHandler(s.handleCreatePLC)) s.router.HandleFunc("GET /{did}/log", s.makeDIDHandler(s.handleGetPLCLog)) s.router.HandleFunc("GET /{did}/log/audit", s.makeDIDHandler(s.handleGetPLCAuditLog)) s.router.HandleFunc("GET /{did}/log/last", s.makeDIDHandler(s.handleGetLastOp)) s.router.HandleFunc("GET /{did}/data", s.makeDIDHandler(s.handleGetPLCData)) s.router.HandleFunc("GET /export", s.handleExport) s.router.HandleFunc("/debug/pprof/", pprof.Index) s.router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) s.router.HandleFunc("/debug/pprof/profile", pprof.Profile) s.router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) s.router.HandleFunc("/debug/pprof/trace", pprof.Trace) // Register handlers for specific profiles s.router.Handle("/debug/pprof/heap", pprof.Handler("heap")) s.router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) s.router.Handle("/debug/pprof/block", pprof.Handler("block")) s.router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) } // makeDIDHandler creates a wrapper handler that extracts DID from URL path func (s *Server) makeDIDHandler(handler func(http.ResponseWriter, *http.Request, string)) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { handler(w, r, r.PathValue("did")) } } func (s *Server) Start() error { if !s.started.CompareAndSwap(false, true) { return stacktrace.NewError("server already started") } s.exitDone.Add(1) listener, err := net.Listen(s.proto, s.addr) if err != nil { return stacktrace.Propagate(err, "failed to listen on %v", s.addr) } go func() { defer s.exitDone.Done() if err := s.srv.Serve(listener); !errors.Is(err, http.ErrServerClosed) { log.Fatalf("ListenAndServe(): %v", err) } }() return nil } func (s *Server) Stop() error { ctx, cancel := context.WithTimeout(context.Background(), s.handlerTimeout) defer cancel() if err := 
s.srv.Shutdown(ctx); err != nil { return stacktrace.Propagate(err, "") } return nil } func (s *Server) Wait() { s.exitDone.Wait() } // handleResolveDID handles the GET /{did} endpoint. func (s *Server) handleResolveDID(w http.ResponseWriter, r *http.Request, did string) { ctx := context.Background() doc, err := s.plc.Resolve(ctx, s.txFactory.ReadCommitted(), did) if handlePLCError(w, err, did) { return } wrapper := struct { Context []string `json:"@context"` didplc.Doc }{ Context: []string{ "https://www.w3.org/ns/did/v1", "https://w3id.org/security/multikey/v1", }, Doc: doc, } for _, method := range doc.VerificationMethod { pub, err := atcrypto.ParsePublicMultibase(method.PublicKeyMultibase) if err != nil { // not the time to be doubting what the PLC implementation is giving us, just ignore continue } jwk, err := pub.JWK() if err != nil { // not the time to be doubting what the PLC implementation is giving us, just ignore continue } addToContext := "" switch jwk.Curve { case "P-256": addToContext = "https://w3id.org/security/suites/ecdsa-2019/v1" case "secp256k1": addToContext = "https://w3id.org/security/suites/secp256k1-2019/v1" default: continue } if !slices.Contains(wrapper.Context, addToContext) { wrapper.Context = append(wrapper.Context, addToContext) } } w.Header().Set("Content-Type", "application/did+ld+json") json.NewEncoder(w).Encode(wrapper) } // handleCreatePLC handles the POST /{did} endpoint. 
func (s *Server) handleCreatePLC(w http.ResponseWriter, r *http.Request, did string) { var op didplc.OpEnum if err := json.NewDecoder(r.Body).Decode(&op); err != nil { sendErrorResponse(w, http.StatusBadRequest, "Invalid operation") return } if s.node == nil { // Validate only // Marshal the operation to JSON bytes for validation opBytes, err := json.Marshal(op) if err != nil { sendErrorResponse(w, http.StatusBadRequest, "Invalid operation") return } if err := s.plc.ValidateOperation(r.Context(), s.txFactory.ReadCommitted(), did, opBytes); err != nil { sendErrorResponse(w, http.StatusBadRequest, "Invalid operation") return } w.WriteHeader(http.StatusOK) return } uuid, err := uuid.NewRandom() if handlePLCError(w, err, "") { return } tx := abciapp.Transaction[abciapp.CreatePlcOpArguments]{ Action: abciapp.TransactionActionCreatePlcOp, Arguments: abciapp.CreatePlcOpArguments{ DID: did, Operation: &op, }, } txBytes, err := cbornode.DumpObject(tx) if handlePLCError(w, err, "") { return } // broadcastTxCommit will wait for inclusion, up until the TimeoutBroadcastTxCommit configured for the node, or until the context deadline expires // in practice we expect operations to be included in about one second result, err := broadcastTxCommit(r.Context(), s.node, uuid.String(), txBytes) // TODO more robust error handling if handlePLCError(w, err, "") { return } if result.CheckTx.Code != 0 { sendErrorResponse(w, http.StatusBadRequest, "Invalid operation") return } if result.TxResult.Code != 0 { sendErrorResponse(w, http.StatusBadRequest, "Invalid operation") return } w.WriteHeader(http.StatusOK) } // handleGetPLCLog handles the GET /{did}/log endpoint. 
func (s *Server) handleGetPLCLog(w http.ResponseWriter, r *http.Request, did string) { ops, err := s.plc.OperationLog(r.Context(), s.txFactory.ReadCommitted(), did) if handlePLCError(w, err, did) { return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(ops) } // handleGetPLCAuditLog handles the GET /{did}/log/audit endpoint. func (s *Server) handleGetPLCAuditLog(w http.ResponseWriter, r *http.Request, did string) { entries, err := s.plc.AuditLog(r.Context(), s.txFactory.ReadCommitted(), did) if handlePLCError(w, err, did) { return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(entries) } // handleGetLastOp handles the GET /{did}/log/last endpoint. func (s *Server) handleGetLastOp(w http.ResponseWriter, r *http.Request, did string) { op, err := s.plc.LastOperation(r.Context(), s.txFactory.ReadCommitted(), did) if handlePLCError(w, err, did) { return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(op) } // handleGetPLCData handles the GET /{did}/data endpoint. func (s *Server) handleGetPLCData(w http.ResponseWriter, r *http.Request, did string) { data, err := s.plc.Data(r.Context(), s.txFactory.ReadCommitted(), did) if handlePLCError(w, err, did) { return } resp := struct { DID string `json:"did"` RotationKeys []string `json:"rotationKeys"` VerificationMethods map[string]string `json:"verificationMethods"` AlsoKnownAs []string `json:"alsoKnownAs"` Services map[string]didplc.OpService `json:"services"` }{ DID: did, RotationKeys: data.RotationKeys, VerificationMethods: data.VerificationMethods, AlsoKnownAs: data.AlsoKnownAs, Services: data.Services, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(resp) } // handleExport handles the GET /export endpoint. 
func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() count := 1000 // The OpenAPI spec says the default is 10, but in practice the official server defaults to 1000 if c := query.Get("count"); c != "" { if _, err := fmt.Sscanf(c, "%d", &count); err != nil { sendErrorResponse(w, http.StatusBadRequest, "Invalid count parameter") return } if count > 1000 { sendErrorResponse(w, http.StatusBadRequest, "Invalid count parameter") return } } afterStr := query.Get("after") after := uint64(0) var err error if afterStr != "" { after, err = strconv.ParseUint(afterStr, 10, 64) if err != nil { sendErrorResponse(w, http.StatusBadRequest, "Only sequence-based pagination is supported") return } } entries, err := s.plc.Export(r.Context(), s.txFactory.ReadCommitted(), after, count) if handlePLCError(w, err, "") { return } w.Header().Set("Content-Type", "application/jsonlines") type jsonEntry struct { Seq uint64 `json:"seq"` Type string `json:"type"` *didplc.LogEntry } for _, entry := range entries { json.NewEncoder(w).Encode(jsonEntry{ Seq: entry.Seq, Type: "sequenced_op", LogEntry: lo.ToPtr(entry.ToDIDPLCLogEntry()), }) } } // handlePLCError handles errors from the PLC interface and sends the appropriate HTTP response. func handlePLCError(w http.ResponseWriter, err error, did string) bool { if err == nil { return false } switch { case errors.Is(err, plc.ErrDIDNotFound): sendErrorResponse(w, http.StatusNotFound, fmt.Sprintf("DID not registered: %s", did)) case errors.Is(err, plc.ErrDIDGone): sendErrorResponse(w, http.StatusGone, fmt.Sprintf("DID not available: %s", did)) default: sendErrorResponse(w, http.StatusInternalServerError, "Internal server error") } return true } // sendErrorResponse sends an error response with the specified status code and message. 
func sendErrorResponse(w http.ResponseWriter, statusCode int, message string) {
	// Headers have to be in place before the status line is written.
	header := w.Header()
	header.Set("Content-Type", "application/json")
	w.WriteHeader(statusCode)

	// The body is a single JSON object of the form {"message": "..."},
	// followed by the newline that Encode always appends.
	payload := map[string]string{"message": message}
	encoder := json.NewEncoder(w)
	encoder.Encode(payload)
}