A very experimental PLC implementation which uses BFT consensus for decentralization
1package main
2
3import (
4 "flag"
5 "fmt"
6 "log"
7 "os"
8 "os/signal"
9 "path/filepath"
10 "sync"
11 "syscall"
12 "time"
13
14 "github.com/cometbft/cometbft/p2p"
15 "github.com/cometbft/cometbft/privval"
16 "github.com/cometbft/cometbft/proxy"
17 "github.com/samber/lo"
18 "tangled.org/gbl08ma.com/didplcbft/abciapp"
19 "tangled.org/gbl08ma.com/didplcbft/httpapi"
20
21 bftconfig "github.com/cometbft/cometbft/config"
22 cmtflags "github.com/cometbft/cometbft/libs/cli/flags"
23 cmtlog "github.com/cometbft/cometbft/libs/log"
24 nm "github.com/cometbft/cometbft/node"
25 "github.com/dgraph-io/badger/v4"
26 "github.com/dgraph-io/badger/v4/options"
27 "github.com/spf13/viper"
28)
29
30var homeDir string
31
32func init() {
33 flag.StringVar(&homeDir, "data-dir", "", "Path to the CometBFT config directory (if empty, uses ./didplcbft-data)")
34}
35
36func main() {
37 flag.Parse()
38 if homeDir == "" {
39 homeDir = filepath.Join(lo.Must(os.Getwd()), "didplcbft-data")
40 }
41
42 config := DefaultConfig()
43 config.SetRoot(homeDir)
44 viper.SetConfigFile(fmt.Sprintf("%s/%s", homeDir, "config/config.toml"))
45
46 if err := viper.ReadInConfig(); err != nil {
47 log.Fatalf("Reading config: %v", err)
48 }
49 if err := viper.Unmarshal(config); err != nil {
50 log.Fatalf("Decoding config: %v", err)
51 }
52 if err := config.ValidateBasic(); err != nil {
53 log.Fatalf("Invalid configuration data: %v", err)
54 }
55 badgerDBPath := filepath.Join(homeDir, "badger")
56 badgerDB, err := badger.Open(badger.
57 DefaultOptions(badgerDBPath).
58 WithBlockCacheSize(2 << 30).
59 WithBlockSize(8 * 1024).
60 WithNumMemtables(3).
61 WithNumLevelZeroTables(3).
62 WithCompression(options.Snappy))
63 if err != nil {
64 log.Fatalf("Opening badger database: %v", err)
65 }
66
67 for err == nil {
68 err = badgerDB.RunValueLogGC(0.5)
69 }
70
71 /*err = badgerDB.Flatten(4)
72 if err != nil {
73 log.Fatalf("Flattening badger database: %v", err)
74 }*/
75
76 var wg sync.WaitGroup
77 closeGoroutinesCh := make(chan struct{})
78 wg.Go(func() {
79 ticker := time.NewTicker(5 * time.Minute)
80 defer ticker.Stop()
81 for {
82 select {
83 case <-ticker.C:
84 var err error
85 for err == nil {
86 err = badgerDB.RunValueLogGC(0.5)
87 }
88 case <-closeGoroutinesCh:
89 return
90 }
91 }
92 })
93
94 defer func() {
95 if err := badgerDB.Close(); err != nil {
96 log.Printf("Closing badger database: %v", err)
97 }
98 }()
99
100 app, plc, cleanup, err := abciapp.NewDIDPLCApplication(badgerDB, filepath.Join(homeDir, "snapshots"))
101 if err != nil {
102 log.Fatalf("failed to create DIDPLC application: %v", err)
103 }
104 defer cleanup()
105
106 pv := privval.LoadFilePV(
107 config.PrivValidatorKeyFile(),
108 config.PrivValidatorStateFile(),
109 )
110
111 nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile())
112 if err != nil {
113 log.Fatalf("failed to load node's key: %v", err)
114 }
115
116 logger := cmtlog.NewTMLogger(cmtlog.NewSyncWriter(os.Stdout))
117 logger, err = cmtflags.ParseLogLevel(config.LogLevel, logger, bftconfig.DefaultLogLevel)
118
119 if err != nil {
120 log.Fatalf("failed to parse log level: %v", err)
121 }
122
123 node, err := nm.NewNode(
124 config.Config,
125 pv,
126 nodeKey,
127 proxy.NewLocalClientCreator(app),
128 nm.DefaultGenesisDocProviderFunc(config.Config),
129 bftconfig.DefaultDBProvider,
130 nm.DefaultMetricsProvider(config.Config.Instrumentation),
131 logger,
132 )
133
134 if err != nil {
135 log.Fatalf("Creating node: %v", err)
136 }
137
138 err = node.Start()
139 if err != nil {
140 log.Fatalf("Starting node: %v", err)
141 }
142 defer func() {
143 node.Stop()
144 node.Wait()
145 }()
146
147 if config.PLC.ListenAddress != "" {
148 plcAPIServer, err := httpapi.NewServer(plc, node, config.PLC.ListenAddress, 30*time.Second)
149 if err != nil {
150 log.Fatalf("Creating PLC API server: %v", err)
151 }
152
153 err = plcAPIServer.Start()
154 if err != nil {
155 log.Fatalf("Starting PLC API server: %v", err)
156 }
157 defer func() {
158 plcAPIServer.Stop()
159 plcAPIServer.Wait()
160 }()
161 }
162
163 c := make(chan os.Signal, 1)
164 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
165 <-c
166 close(closeGoroutinesCh)
167 wg.Wait()
168}