A very experimental PLC implementation which uses BFT consensus for decentralization
24
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 96 lines 2.9 kB view raw
1package main 2 3import ( 4 "context" 5 6 abci "github.com/cometbft/cometbft/abci/types" 7 mempl "github.com/cometbft/cometbft/mempool" 8 "github.com/cometbft/cometbft/node" 9 "github.com/cometbft/cometbft/rpc/core" 10 coretypes "github.com/cometbft/cometbft/rpc/core/types" 11 cmttypes "github.com/cometbft/cometbft/types" 12 "github.com/gbl08ma/stacktrace" 13 "github.com/google/uuid" 14 "tangled.org/gbl08ma.com/didplcbft/types" 15) 16 17type txSubmitter struct { 18 node *node.Node 19} 20 21var _ types.MempoolSubmitter = (*txSubmitter)(nil) 22 23// BroadcastTx implements [types.MempoolSubmitter]. 24func (t *txSubmitter) BroadcastTx(ctx context.Context, tx cmttypes.Tx, waitForInclusion bool) (*coretypes.ResultBroadcastTxCommit, error) { 25 uuid, err := uuid.NewRandom() 26 if err != nil { 27 return nil, stacktrace.Propagate(err) 28 } 29 subscriber := uuid.String() 30 eventBus := t.node.EventBus() 31 mempool := t.node.Mempool() 32 // Subscribe to tx being committed in block. 33 34 var txSub cmttypes.Subscription 35 if waitForInclusion { 36 subCtx, cancel := context.WithTimeout(ctx, core.SubscribeTimeout) 37 defer cancel() 38 q := cmttypes.EventQueryTxFor(tx) 39 txSub, err = eventBus.Subscribe(subCtx, subscriber, q) 40 if err != nil { 41 return nil, stacktrace.Propagate(err, "failed to subscribe to tx") 42 } 43 defer func() { 44 err := eventBus.Unsubscribe(context.Background(), subscriber, q) 45 _ = err 46 }() 47 } 48 49 // Broadcast tx and wait for CheckTx result 50 checkTxResCh := make(chan *abci.ResponseCheckTx, 1) 51 err = mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) { 52 select { 53 case <-ctx.Done(): 54 case checkTxResCh <- res: 55 } 56 }, mempl.TxInfo{}) 57 if err != nil { 58 return nil, stacktrace.Propagate(err, "error on broadcastTxCommit") 59 } 60 select { 61 case <-ctx.Done(): 62 return nil, stacktrace.Propagate(ctx.Err(), "broadcast confirmation not received") 63 case checkTxRes := <-checkTxResCh: 64 if !waitForInclusion || checkTxRes.Code != abci.CodeTypeOK { 65 return &coretypes.ResultBroadcastTxCommit{ 66 CheckTx: *checkTxRes, 67 TxResult: abci.ExecTxResult{}, 68 Hash: tx.Hash(), 69 }, nil 70 } 71 72 // Wait for the tx to be included in a block or timeout. 73 select { 74 case <-ctx.Done(): 75 return nil, stacktrace.Propagate(ctx.Err(), "inclusion confirmation not received") 76 case msg := <-txSub.Out(): // The tx was included in a block. 77 txResultEvent := msg.Data().(cmttypes.EventDataTx) 78 return &coretypes.ResultBroadcastTxCommit{ 79 CheckTx: *checkTxRes, 80 TxResult: txResultEvent.Result, 81 Hash: tx.Hash(), 82 Height: txResultEvent.Height, 83 }, nil 84 case <-txSub.Canceled(): 85 err := txSub.Err() 86 if err == nil { 87 err = stacktrace.NewError("CometBFT exited") 88 } 89 return &coretypes.ResultBroadcastTxCommit{ 90 CheckTx: *checkTxRes, 91 TxResult: abci.ExecTxResult{}, 92 Hash: tx.Hash(), 93 }, stacktrace.Propagate(err, "txSub was canceled") 94 } 95 } 96}