A very experimental PLC implementation which uses BFT consensus for decentralization
1package httpapi
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "log"
9 "net"
10 "net/http"
11 "net/http/pprof"
12 "slices"
13 "strconv"
14 "strings"
15 "sync"
16 "sync/atomic"
17 "time"
18
19 "github.com/bluesky-social/indigo/atproto/atcrypto"
20 "github.com/cometbft/cometbft/node"
21 "github.com/did-method-plc/go-didplc"
22 "github.com/google/uuid"
23 cbornode "github.com/ipfs/go-ipld-cbor"
24 "github.com/palantir/stacktrace"
25 "github.com/rs/cors"
26 "github.com/samber/lo"
27
28 "tangled.org/gbl08ma.com/didplcbft/abciapp"
29 "tangled.org/gbl08ma.com/didplcbft/plc"
30)
31
32// Server represents the HTTP server for the PLC directory.
33type Server struct {
34 plc plc.ReadPLC
35 router *http.ServeMux
36 node *node.Node
37 srv http.Server
38 handlerTimeout time.Duration
39 proto string
40 addr string
41
42 started atomic.Bool
43 exitDone sync.WaitGroup
44}
45
46// NewServer creates a new instance of the Server.
47func NewServer(plc plc.ReadPLC, node *node.Node, listenAddr string, handlerTimeout time.Duration) (*Server, error) {
48 s := &Server{
49 plc: plc,
50 router: http.NewServeMux(),
51 node: node,
52 srv: http.Server{Addr: listenAddr},
53 handlerTimeout: handlerTimeout,
54 }
55 s.setupRoutes()
56
57 handler := cors.Default().Handler(s.router)
58
59 timeoutMsg, _ := json.Marshal(map[string]string{"message": "Internal server timeout"})
60
61 handler = http.TimeoutHandler(handler, s.handlerTimeout, string(timeoutMsg))
62
63 s.srv.Handler = handler
64
65 parts := strings.SplitN(listenAddr, "://", 2)
66 if len(parts) != 2 {
67 return nil, stacktrace.NewError(
68 "invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)",
69 listenAddr,
70 )
71 }
72 s.proto, s.addr = parts[0], parts[1]
73
74 return s, nil
75}
76
77// setupRoutes configures the routes for the server.
78func (s *Server) setupRoutes() {
79 s.router.HandleFunc("GET /{did}", s.makeDIDHandler(s.handleResolveDID))
80 s.router.HandleFunc("POST /{did}", s.makeDIDHandler(s.handleCreatePLC))
81 s.router.HandleFunc("GET /{did}/log", s.makeDIDHandler(s.handleGetPLCLog))
82 s.router.HandleFunc("GET /{did}/log/audit", s.makeDIDHandler(s.handleGetPLCAuditLog))
83 s.router.HandleFunc("GET /{did}/log/last", s.makeDIDHandler(s.handleGetLastOp))
84 s.router.HandleFunc("GET /{did}/data", s.makeDIDHandler(s.handleGetPLCData))
85 s.router.HandleFunc("GET /export", s.handleExport)
86
87 s.router.HandleFunc("/debug/pprof/", pprof.Index)
88 s.router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
89 s.router.HandleFunc("/debug/pprof/profile", pprof.Profile)
90 s.router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
91 s.router.HandleFunc("/debug/pprof/trace", pprof.Trace)
92
93 // Register handlers for specific profiles
94 s.router.Handle("/debug/pprof/heap", pprof.Handler("heap"))
95 s.router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
96 s.router.Handle("/debug/pprof/block", pprof.Handler("block"))
97 s.router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
98}
99
100// makeDIDHandler creates a wrapper handler that extracts DID from URL path
101func (s *Server) makeDIDHandler(handler func(http.ResponseWriter, *http.Request, string)) http.HandlerFunc {
102 return func(w http.ResponseWriter, r *http.Request) {
103 handler(w, r, r.PathValue("did"))
104 }
105}
106
107func (s *Server) Start() error {
108 if !s.started.CompareAndSwap(false, true) {
109 return stacktrace.NewError("server already started")
110 }
111
112 s.exitDone.Add(1)
113
114 listener, err := net.Listen(s.proto, s.addr)
115 if err != nil {
116 return stacktrace.Propagate(err, "failed to listen on %v", s.addr)
117 }
118
119 go func() {
120 defer s.exitDone.Done()
121
122 if err := s.srv.Serve(listener); !errors.Is(err, http.ErrServerClosed) {
123 log.Fatalf("ListenAndServe(): %v", err)
124 }
125 }()
126 return nil
127}
128
129func (s *Server) Stop() error {
130 ctx, cancel := context.WithTimeout(context.Background(), s.handlerTimeout)
131 defer cancel()
132 if err := s.srv.Shutdown(ctx); err != nil {
133 return stacktrace.Propagate(err, "")
134 }
135 return nil
136}
137
138func (s *Server) Wait() {
139 s.exitDone.Wait()
140}
141
142// handleResolveDID handles the GET /{did} endpoint.
143func (s *Server) handleResolveDID(w http.ResponseWriter, r *http.Request, did string) {
144 ctx := context.Background()
145 doc, err := s.plc.Resolve(ctx, plc.CommittedTreeVersion, did)
146 if handlePLCError(w, err, did) {
147 return
148 }
149
150 wrapper := struct {
151 Context []string `json:"@context"`
152 didplc.Doc
153 }{
154 Context: []string{
155 "https://www.w3.org/ns/did/v1",
156 "https://w3id.org/security/multikey/v1",
157 },
158 Doc: doc,
159 }
160
161 for _, method := range doc.VerificationMethod {
162 pub, err := atcrypto.ParsePublicMultibase(method.PublicKeyMultibase)
163 if err != nil {
164 // not the time to be doubting what the PLC implementation is giving us, just ignore
165 continue
166 }
167 jwk, err := pub.JWK()
168 if err != nil {
169 // not the time to be doubting what the PLC implementation is giving us, just ignore
170 continue
171 }
172 addToContext := ""
173 switch jwk.Curve {
174 case "P-256":
175 addToContext = "https://w3id.org/security/suites/ecdsa-2019/v1"
176 case "secp256k1":
177 addToContext = "https://w3id.org/security/suites/secp256k1-2019/v1"
178 default:
179 continue
180 }
181 if !slices.Contains(wrapper.Context, addToContext) {
182 wrapper.Context = append(wrapper.Context, addToContext)
183 }
184 }
185
186 w.Header().Set("Content-Type", "application/did+ld+json")
187 json.NewEncoder(w).Encode(wrapper)
188}
189
190// handleCreatePLC handles the POST /{did} endpoint.
191func (s *Server) handleCreatePLC(w http.ResponseWriter, r *http.Request, did string) {
192 var op didplc.OpEnum
193 if err := json.NewDecoder(r.Body).Decode(&op); err != nil {
194 sendErrorResponse(w, http.StatusBadRequest, "Invalid operation")
195 return
196 }
197
198 if s.node == nil {
199 // Validate only
200 // Marshal the operation to JSON bytes for validation
201 opBytes, err := json.Marshal(op)
202 if err != nil {
203 sendErrorResponse(w, http.StatusBadRequest, "Invalid operation")
204 return
205 }
206
207 if err := s.plc.ValidateOperation(r.Context(), plc.CommittedTreeVersion, time.Now(), did, opBytes); err != nil {
208 sendErrorResponse(w, http.StatusBadRequest, "Invalid operation")
209 return
210 }
211 w.WriteHeader(http.StatusOK)
212 return
213 }
214
215 uuid, err := uuid.NewRandom()
216 if handlePLCError(w, err, "") {
217 return
218 }
219
220 tx := abciapp.Transaction[abciapp.CreatePlcOpArguments]{
221 Action: abciapp.TransactionActionCreatePlcOp,
222 Arguments: abciapp.CreatePlcOpArguments{
223 DID: did,
224 Operation: &op,
225 },
226 }
227
228 txBytes, err := cbornode.DumpObject(tx)
229 if handlePLCError(w, err, "") {
230 return
231 }
232
233 // broadcastTxCommit will wait for inclusion, up until the TimeoutBroadcastTxCommit configured for the node, or until the context deadline expires
234 // in practice we expect operations to be included in about one second
235 result, err := broadcastTxCommit(r.Context(), s.node, uuid.String(), txBytes)
236 // TODO more robust error handling
237 if handlePLCError(w, err, "") {
238 return
239 }
240
241 if result.CheckTx.Code != 0 {
242 sendErrorResponse(w, http.StatusBadRequest, "Invalid operation")
243 return
244 }
245
246 if result.TxResult.Code != 0 {
247 sendErrorResponse(w, http.StatusBadRequest, "Invalid operation")
248 return
249 }
250
251 w.WriteHeader(http.StatusOK)
252}
253
254// handleGetPLCLog handles the GET /{did}/log endpoint.
255func (s *Server) handleGetPLCLog(w http.ResponseWriter, r *http.Request, did string) {
256 ops, err := s.plc.OperationLog(r.Context(), plc.CommittedTreeVersion, did)
257 if handlePLCError(w, err, did) {
258 return
259 }
260
261 w.Header().Set("Content-Type", "application/json")
262 json.NewEncoder(w).Encode(ops)
263}
264
265// handleGetPLCAuditLog handles the GET /{did}/log/audit endpoint.
266func (s *Server) handleGetPLCAuditLog(w http.ResponseWriter, r *http.Request, did string) {
267 entries, err := s.plc.AuditLog(r.Context(), plc.CommittedTreeVersion, did)
268 if handlePLCError(w, err, did) {
269 return
270 }
271
272 w.Header().Set("Content-Type", "application/json")
273 json.NewEncoder(w).Encode(entries)
274}
275
276// handleGetLastOp handles the GET /{did}/log/last endpoint.
277func (s *Server) handleGetLastOp(w http.ResponseWriter, r *http.Request, did string) {
278 op, err := s.plc.LastOperation(r.Context(), plc.CommittedTreeVersion, did)
279 if handlePLCError(w, err, did) {
280 return
281 }
282
283 w.Header().Set("Content-Type", "application/json")
284 json.NewEncoder(w).Encode(op)
285}
286
287// handleGetPLCData handles the GET /{did}/data endpoint.
288func (s *Server) handleGetPLCData(w http.ResponseWriter, r *http.Request, did string) {
289 data, err := s.plc.Data(r.Context(), plc.CommittedTreeVersion, did)
290 if handlePLCError(w, err, did) {
291 return
292 }
293
294 resp := struct {
295 DID string `json:"did"`
296 RotationKeys []string `json:"rotationKeys"`
297 VerificationMethods map[string]string `json:"verificationMethods"`
298 AlsoKnownAs []string `json:"alsoKnownAs"`
299 Services map[string]didplc.OpService `json:"services"`
300 }{
301 DID: did,
302 RotationKeys: data.RotationKeys,
303 VerificationMethods: data.VerificationMethods,
304 AlsoKnownAs: data.AlsoKnownAs,
305 Services: data.Services,
306 }
307
308 w.Header().Set("Content-Type", "application/json")
309 json.NewEncoder(w).Encode(resp)
310}
311
312// handleExport handles the GET /export endpoint.
313func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) {
314 query := r.URL.Query()
315 count := 1000 // The OpenAPI spec says the default is 10, but in practice the official server defaults to 1000
316 if c := query.Get("count"); c != "" {
317 if _, err := fmt.Sscanf(c, "%d", &count); err != nil {
318 sendErrorResponse(w, http.StatusBadRequest, "Invalid count parameter")
319 return
320 }
321
322 if count > 1000 {
323 sendErrorResponse(w, http.StatusBadRequest, "Invalid count parameter")
324 return
325 }
326 }
327
328 afterStr := query.Get("after")
329 after := uint64(0)
330 var err error
331 if afterStr != "" {
332 after, err = strconv.ParseUint(afterStr, 10, 64)
333 if err != nil {
334 sendErrorResponse(w, http.StatusBadRequest, "Only sequence-based pagination is supported")
335 return
336 }
337 }
338
339 entries, err := s.plc.Export(r.Context(), plc.CommittedTreeVersion, after, count)
340 if handlePLCError(w, err, "") {
341 return
342 }
343
344 w.Header().Set("Content-Type", "application/jsonlines")
345
346 type jsonEntry struct {
347 Seq uint64 `json:"seq"`
348 Type string `json:"type"`
349 *didplc.LogEntry
350 }
351 for _, entry := range entries {
352 json.NewEncoder(w).Encode(jsonEntry{
353 Seq: entry.Seq,
354 Type: "sequenced_op",
355 LogEntry: lo.ToPtr(entry.ToDIDPLCLogEntry()),
356 })
357 }
358}
359
360// handlePLCError handles errors from the PLC interface and sends the appropriate HTTP response.
361func handlePLCError(w http.ResponseWriter, err error, did string) bool {
362 if err == nil {
363 return false
364 }
365 switch {
366 case errors.Is(err, plc.ErrDIDNotFound):
367 sendErrorResponse(w, http.StatusNotFound, fmt.Sprintf("DID not registered: %s", did))
368 case errors.Is(err, plc.ErrDIDGone):
369 sendErrorResponse(w, http.StatusGone, fmt.Sprintf("DID not available: %s", did))
370 default:
371 sendErrorResponse(w, http.StatusInternalServerError, "Internal server error")
372 }
373 return true
374}
375
// sendErrorResponse writes statusCode together with a JSON body of the form
// {"message": <message>} and an application/json content type.
func sendErrorResponse(w http.ResponseWriter, statusCode int, message string) {
	payload := struct {
		Message string `json:"message"`
	}{Message: message}

	// Headers must be set before the status line is written.
	headers := w.Header()
	headers.Set("Content-Type", "application/json")
	w.WriteHeader(statusCode)

	json.NewEncoder(w).Encode(payload)
}