a recursive dns resolver
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request 'add clickhouse metrics' (#4) from feature/metrics into main

Reviewed-on: https://code.kiri.systems/kiri/alky/pulls/4

+1062 -155
+3
.env
··· 1 + GOOSE_DRIVER="clickhouse" 2 + GOOSE_MIGRATION_DIR="./migrations/" 3 + GOOSE_DBSTRING="clickhouse://default:clickhouse@localhost:9000/default?dial_timeout=10s&compress=true"
+21
Justfile
··· 1 + init: 2 + docker-compose up -d 3 + goose up 4 + 5 + status: 6 + goose status 7 + 8 + format: 9 + go fmt ./... 10 + fd .go . | xargs gofumpt -l -w 11 + go mod tidy 12 + 13 + version: 14 + #!/usr/bin/env sh 15 + VERSION=$(git describe --tags 2>/dev/null || git rev-parse --short HEAD) 16 + echo "$VERSION" 17 + 18 + build: format 19 + #!/usr/bin/env sh 20 + VERSION=$(just version) 21 + go build -ldflags "-X code.kiri.systems/kiri/alky/pkg/metrics.version=$VERSION" .
+25
docker-compose.yml
··· 1 + services: 2 + clickhouse: 3 + image: clickhouse/clickhouse-server:latest 4 + container_name: clickhouse 5 + network_mode: host 6 + environment: 7 + - CLICKHOUSE_DB=default 8 + - CLICKHOUSE_USER=default 9 + - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 10 + - CLICKHOUSE_PASSWORD=clickhouse 11 + ports: 12 + - "8123:8123" 13 + - "9000:9000" 14 + - "9009:9009" 15 + ulimits: 16 + nofile: 17 + soft: 262144 18 + hard: 262144 19 + healthcheck: 20 + test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1 21 + interval: 30s 22 + timeout: 5s 23 + retries: 3 24 + start_period: 30s 25 + restart: unless-stopped
+21
docs/alky.toml
··· 33 33 # Type: Integer 34 34 expiration_time = 300 35 35 36 + [metrics] 37 + # ClickHouse connection string 38 + dsn = "clickhouse://default:clickhouse@localhost:9000/default" 39 + 40 + # Number of metricsto buffer before sending to ClickHouse 41 + batch_size = 1000 42 + 43 + # How often to flush metrics to ClickHouse 44 + flush_interval = "10s" 45 + 46 + # How long to retain metrics data 47 + # This uses time.ParseDuration semantics 48 + retention_period = "720h" 49 + 50 + [cache] 51 + # The maximum number of items to store in the cache. 52 + max_items = 5000 53 + 54 + # How often the cache will evict items. 55 + cleanup_interval = "5m" 56 + 36 57 [advanced] 37 58 # Timeout (in milliseconds) for outgoing queries before being cancelled. 38 59 query_timeout = 100
+20 -1
go.mod
··· 3 3 go 1.22.5 4 4 5 5 require ( 6 - code.kiri.systems/kiri/magna v0.0.0-20240922043826-2c2a1c508469 6 + code.kiri.systems/kiri/magna v0.0.0-20250211050847-abb0522bcd30 7 7 github.com/BurntSushi/toml v1.4.0 8 + github.com/ClickHouse/clickhouse-go/v2 v2.31.0 9 + ) 10 + 11 + require ( 12 + github.com/ClickHouse/ch-go v0.64.1 // indirect 13 + github.com/andybalholm/brotli v1.1.1 // indirect 14 + github.com/go-faster/city v1.0.1 // indirect 15 + github.com/go-faster/errors v0.7.1 // indirect 16 + github.com/google/uuid v1.6.0 // indirect 17 + github.com/klauspost/compress v1.17.11 // indirect 18 + github.com/paulmach/orb v0.11.1 // indirect 19 + github.com/pierrec/lz4/v4 v4.1.22 // indirect 20 + github.com/pkg/errors v0.9.1 // indirect 21 + github.com/segmentio/asm v1.2.0 // indirect 22 + github.com/shopspring/decimal v1.4.0 // indirect 23 + go.opentelemetry.io/otel v1.34.0 // indirect 24 + go.opentelemetry.io/otel/trace v1.34.0 // indirect 25 + golang.org/x/sys v0.30.0 // indirect 26 + gopkg.in/yaml.v3 v3.0.1 // indirect 8 27 )
+105 -6
go.sum
··· 1 - code.kiri.systems/kiri/magna v0.0.0-20240721214902-8d0a079dbd84 h1:igzBX4k3REg0WZExjGLWW7/wu/X+U6QlbMc8aeO2030= 2 - code.kiri.systems/kiri/magna v0.0.0-20240721214902-8d0a079dbd84/go.mod h1:gSzCiTKyKlUEjGgl/qTb8rxF0QUVuWOEORAsTXA0qyI= 3 - code.kiri.systems/kiri/magna v0.0.0-20240922043826-2c2a1c508469 h1:LUvvGcJ7DuW3eo7yblNH2igCJzYsbWJQ08iZEXBWplc= 4 - code.kiri.systems/kiri/magna v0.0.0-20240922043826-2c2a1c508469/go.mod h1:gSzCiTKyKlUEjGgl/qTb8rxF0QUVuWOEORAsTXA0qyI= 1 + code.kiri.systems/kiri/magna v0.0.0-20250211050847-abb0522bcd30 h1:ORu6TXli7rdqczAOE3Mi+Xc4IlzcgEpNXjUWeNLoqxg= 2 + code.kiri.systems/kiri/magna v0.0.0-20250211050847-abb0522bcd30/go.mod h1:gSzCiTKyKlUEjGgl/qTb8rxF0QUVuWOEORAsTXA0qyI= 5 3 github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= 6 4 github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= 5 + github.com/ClickHouse/ch-go v0.64.1 h1:FWpP+QU4KchgzpEekuv8YoI/fUc4H2r6Bwc5WwrzvcI= 6 + github.com/ClickHouse/ch-go v0.64.1/go.mod h1:RBUynvczWwVzhS6Up9lPKlH1mrk4UAmle6uzCiW4Pkc= 7 + github.com/ClickHouse/clickhouse-go/v2 v2.31.0 h1:9MNHRDYXjFTJizGEJM1DfYAqdra/ohprPoZ+LPiuHXQ= 8 + github.com/ClickHouse/clickhouse-go/v2 v2.31.0/go.mod h1:V1aZaG0ctMbd8KVi+D4loXi97duWYtHiQHMCgipKJcI= 9 + github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= 10 + github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= 11 + github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 7 12 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= 8 13 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 14 + github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= 15 + github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= 16 + github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= 17 + github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= 18 + github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= 19 + github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= 20 + github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= 21 + github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= 22 + github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= 23 + github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= 24 + github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= 25 + github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= 26 + github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= 27 + github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= 28 + github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= 29 + github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= 30 + github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= 31 + github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= 32 + github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= 33 + github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= 34 + github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= 35 + github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= 36 + github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= 37 + github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= 38 + github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= 39 + github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU= 40 + github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= 41 + github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= 42 + github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= 43 + github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= 44 + github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= 45 + github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 9 46 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 10 47 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 11 - github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= 12 - github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 48 + github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= 49 + github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= 50 + github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= 51 + github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= 52 + github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= 53 + github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= 54 + github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= 55 + github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 56 + github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= 57 + github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= 58 + github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= 59 + github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= 60 + github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= 61 + github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= 62 + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= 63 + github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= 64 + github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= 65 + go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= 66 + go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= 67 + go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= 68 + go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= 69 + go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= 70 + golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= 71 + golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= 72 + golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= 73 + golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= 74 + golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= 75 + golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= 76 + golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= 77 + golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= 78 + golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= 79 + golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= 80 + golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= 81 + golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 82 + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 83 + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 84 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 85 + golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= 86 + golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 87 + golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 88 + golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 89 + golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 90 + golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 91 + golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= 92 + golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 93 + golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= 94 + golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= 95 + golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 96 + golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 97 + golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= 98 + golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= 99 + golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= 100 + golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= 101 + golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= 102 + golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 103 + golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 104 + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 105 + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 106 + google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= 107 + google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= 108 + gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 109 + gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= 110 + gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 111 + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= 13 112 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 14 113 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+53 -21
main.go
··· 9 9 10 10 "code.kiri.systems/kiri/alky/pkg/config" 11 11 "code.kiri.systems/kiri/alky/pkg/dns" 12 + "code.kiri.systems/kiri/alky/pkg/metrics" 12 13 "code.kiri.systems/kiri/alky/pkg/rootservers" 13 14 ) 14 15 ··· 26 27 log.Fatal(err) 27 28 } 28 29 29 - rootServers, err := rootservers.DecodeRootHints(cfg.Server.RootHintsFile) 30 + logger := setupLogger(&cfg) 31 + 32 + metricsClient, err := metrics.NewClickHouseMetrics(&cfg.Metrics, logger) 30 33 if err != nil { 31 34 log.Fatal(err) 32 35 } 33 - 34 - var logger *slog.Logger 35 - switch cfg.Logging.Output { 36 - case "file": 37 - f, err := os.OpenFile(cfg.Logging.FilePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o644) 38 - if err != nil { 39 - log.Fatal(err) 40 - } 36 + defer metricsClient.Close() 41 37 42 - logger = slog.New(slog.NewJSONHandler(f, nil)) 43 - case "stdout": 44 - fallthrough 45 - default: 46 - logger = slog.New(slog.NewJSONHandler(os.Stdout, nil)) 38 + rootServers, err := rootservers.DecodeRootHints(cfg.Server.RootHintsFile) 39 + if err != nil { 40 + log.Fatal(err) 47 41 } 48 42 49 - memCache := dns.NewMemoryCache() 50 - var cache dns.Cache = memCache 43 + cache := dns.NewMemoryCache(cfg.Cache.MaxItems, cfg.Cache.CleanupInterval.Duration) 51 44 52 45 handler := &dns.QueryHandler{ 53 46 RootServers: rootServers, 54 47 Timeout: time.Duration(cfg.Advanced.QueryTimeout) * time.Second, 55 - Cache: &cache, 48 + Cache: cache, 49 + Logger: logger, 56 50 } 57 51 58 - logConfig := &dns.LogConfig{Logger: logger} 52 + go monitorCacheMetrics(cache, metricsClient, logger) 59 53 60 54 rateLimitHandler := dns.RateLimitMiddleware(&dns.RateLimitConfig{ 61 55 Rate: float64(cfg.Ratelimit.Rate), ··· 63 57 WindowLength: time.Duration(cfg.Ratelimit.Window) * time.Second, 64 58 ExpirationTime: time.Duration(cfg.Ratelimit.ExpirationTime) * time.Second, 65 59 })(handler) 66 - loggingHandler := dns.LoggingMiddleware(logConfig)(rateLimitHandler) 60 + 61 + metricsHandler := metrics.MetricsMiddleware(metricsClient)(rateLimitHandler) 62 + 63 + loggingHandler := dns.LoggingMiddleware(&dns.LogConfig{ 64 + Logger: logger, 65 + Level: slog.LevelInfo, 66 + })(metricsHandler) 67 67 68 68 s := dns.Server{ 69 69 Address: cfg.Server.Address, ··· 72 72 UDPSize: 512, 73 73 ReadTimeout: 2 * time.Second, 74 74 WriteTimeout: 2 * time.Second, 75 - 76 - Logger: logger, 75 + Logger: logger, 77 76 } 78 77 79 78 if err := s.ListenAndServe(); err != nil { 80 79 slog.Error("Failed to start server", "error", err) 81 80 } 82 81 } 82 + 83 + func monitorCacheMetrics(cache *dns.MemoryCache, metricsClient *metrics.ClickHouseMetrics, logger *slog.Logger) { 84 + ticker := time.NewTicker(1 * time.Minute) 85 + defer ticker.Stop() 86 + 87 + for range ticker.C { 88 + stats := cache.GetStats() 89 + metricsClient.RecordCacheStats(stats) 90 + logger.Info("Cache metrics recorded to ClickHouse") 91 + } 92 + } 93 + 94 + func setupLogger(cfg *config.Config) *slog.Logger { 95 + var logger *slog.Logger 96 + 97 + handlerOpts := &slog.HandlerOptions{ 98 + Level: slog.LevelDebug, 99 + } 100 + 101 + switch cfg.Logging.Output { 102 + case "file": 103 + f, err := os.OpenFile(cfg.Logging.FilePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o644) 104 + if err != nil { 105 + log.Fatal(err) 106 + } 107 + 108 + logger = slog.New(slog.NewJSONHandler(f, handlerOpts)) 109 + default: 110 + logger = slog.New(slog.NewJSONHandler(os.Stdout, handlerOpts)) 111 + } 112 + 113 + return logger 114 + }
+34
migrations/00001_initial.sql
··· 1 + -- +goose Up 2 + CREATE TABLE IF NOT EXISTS alky_dns_queries ( 3 + timestamp DateTime, 4 + instance_id String, 5 + query_name String, 6 + query_type String, 7 + query_class String, 8 + remote_addr String, 9 + response_code String, 10 + duration Int64, 11 + cache_hit Bool 12 + ) ENGINE = MergeTree() 13 + PARTITION BY toYYYYMM(timestamp) 14 + ORDER BY (timestamp, instance_id, query_name) 15 + TTL timestamp + toIntervalDay(30); 16 + 17 + CREATE TABLE IF NOT EXISTS alky_dns_cache_metrics ( 18 + timestamp DateTime, 19 + instance_id String, 20 + total_queries Int64, 21 + cache_hits Int64, 22 + cache_misses Int64, 23 + negative_hits Int64, 24 + positive_hits Int64, 25 + evictions Int64, 26 + size Int, 27 + ) ENGINE = MergeTree() 28 + PARTITION BY toYYYYMM(timestamp) 29 + ORDER BY (timestamp, instance_id) 30 + TTL timestamp + toIntervalDay(30); 31 + 32 + -- +goose Down 33 + DROP TABLE IF EXISTS alky_dns_queries; 34 + DROP TABLE IF EXISTS alky_dns_cache_metrics;
+49
pkg/config/config.go
··· 2 2 3 3 import ( 4 4 "fmt" 5 + "time" 5 6 6 7 "github.com/BurntSushi/toml" 7 8 ) ··· 24 25 ExpirationTime int `toml:"expiration_time"` 25 26 } 26 27 28 + type MetricsConfig struct { 29 + DSN string `toml:"dsn"` 30 + BatchSize int `toml:"batch_size"` 31 + FlushInterval duration `toml:"flush_interval"` 32 + RetentionPeriod duration `toml:"retention_period"` 33 + } 34 + 27 35 type AdvancedConfig struct { 28 36 QueryTimeout int `toml:"query_timeout"` 37 + } 38 + 39 + type CacheConfig struct { 40 + MaxItems int `toml:"max_items"` 41 + CleanupInterval duration `toml:"cleanup_interval"` 29 42 } 30 43 31 44 type Config struct { 32 45 Server ServerConfig `toml:"server"` 33 46 Logging LoggingConfig `toml:"logging"` 34 47 Ratelimit RatelimitConfig `toml:"ratelimit"` 48 + Metrics MetricsConfig `toml:"metrics"` 49 + Cache CacheConfig `toml:"cache"` 35 50 Advanced AdvancedConfig `toml:"advanced"` 51 + } 52 + 53 + type duration struct { 54 + time.Duration 55 + } 56 + 57 + func (d *duration) UnmarshalText(text []byte) error { 58 + var err error 59 + d.Duration, err = time.ParseDuration(string(text)) 60 + return err 36 61 } 37 62 38 63 func LoadConfig(path string) (Config, error) { ··· 55 80 56 81 if cfg.Logging.Output == "file" && cfg.Logging.FilePath == "" { 57 82 return cfg, fmt.Errorf("If `[logging.output]` is `file` then `file_path` must be set.") 83 + } 84 + 85 + if cfg.Metrics.DSN == "" { 86 + cfg.Metrics.DSN = "clickhouse://localhost:9000/default" 87 + } 88 + 89 + if cfg.Metrics.BatchSize == 0 { 90 + cfg.Metrics.BatchSize = 1000 91 + } 92 + 93 + if cfg.Metrics.FlushInterval.Duration == 0 { 94 + cfg.Metrics.FlushInterval.Duration = 10 * time.Second 95 + } 96 + 97 + if cfg.Metrics.RetentionPeriod.Duration == 0 { 98 + cfg.Metrics.RetentionPeriod.Duration = 30 * 24 * time.Hour 99 + } 100 + 101 + if cfg.Cache.MaxItems == 0 { 102 + cfg.Cache.MaxItems = 5000 103 + } 104 + 105 + if cfg.Cache.CleanupInterval.Duration == 0 { 106 + cfg.Cache.CleanupInterval.Duration = 5 * time.Minute 58 107 } 59 108 60 109 if cfg.Advanced.QueryTimeout == 0 {
+222 -12
pkg/dns/cache.go
··· 1 1 package dns 2 2 3 3 import ( 4 + "strings" 4 5 "sync" 5 6 "time" 6 7 7 8 "code.kiri.systems/kiri/magna" 8 9 ) 9 10 11 + type BailiwickRule int 12 + 13 + const ( 14 + BailiwickSame BailiwickRule = iota 15 + BailiwickChild 16 + BailiwickOutside 17 + ) 18 + 19 + type CacheEntry struct { 20 + Answer []CachedResourceRecord 21 + Authority []CachedResourceRecord 22 + Additional []CachedResourceRecord 23 + NegativeTTL time.Duration 24 + ExpireAt time.Time 25 + CreateTime time.Time 26 + IsNegative bool 27 + } 28 + 10 29 type CachedResourceRecord struct { 11 - Record magna.ResourceRecord 12 - ExpireAt time.Time 30 + Record magna.ResourceRecord 31 + ExpireAt time.Time 32 + BailiwickRule BailiwickRule 33 + } 34 + 35 + type MemoryCache struct { 36 + entries map[string]*CacheEntry 37 + mu sync.RWMutex 38 + stats CacheStats 39 + maxSize int 40 + cleaupInterval time.Duration 13 41 } 14 42 15 - type CacheEntry struct { 16 - Answer []CachedResourceRecord 43 + type CacheStats struct { 44 + TotalQueries int64 45 + CacheHits int64 46 + CacheMisses int64 47 + NegativeHits int64 48 + PositiveHits int64 49 + Evictions int64 50 + Size int64 17 51 } 18 52 19 53 type Cache interface { 20 54 Get(key string) (*CacheEntry, bool) 21 55 Set(key string, entry *CacheEntry) 56 + GetStats() *CacheStats 22 57 } 23 58 24 - type MemoryCache struct { 25 - entries map[string]*CacheEntry 26 - mu sync.RWMutex 27 - } 28 - 29 - func NewMemoryCache() *MemoryCache { 30 - return &MemoryCache{ 31 - entries: make(map[string]*CacheEntry), 59 + func NewMemoryCache(maxSize int, cleanupInterval time.Duration) *MemoryCache { 60 + cache := &MemoryCache{ 61 + entries: make(map[string]*CacheEntry), 62 + maxSize: maxSize, 63 + cleaupInterval: cleanupInterval, 32 64 } 65 + 66 + go cache.periodicCleanup() 67 + return cache 33 68 } 34 69 35 70 func (c *MemoryCache) Get(key string) (*CacheEntry, bool) { 36 71 c.mu.RLock() 37 72 c.mu.RUnlock() 38 73 74 + c.stats.TotalQueries++ 75 + 39 76 entry, exists := c.entries[key] 40 77 if !exists { 78 + c.stats.CacheMisses++ 79 + return nil, false 80 + } 81 + 82 + if time.Now().After(entry.ExpireAt) { 83 + c.stats.CacheMisses++ 41 84 return nil, false 42 85 } 86 + 87 + if entry.IsNegative { 88 + c.stats.NegativeHits++ 89 + } else { 90 + c.stats.PositiveHits++ 91 + } 92 + c.stats.CacheHits++ 43 93 44 94 return entry, true 45 95 } ··· 48 98 c.mu.Lock() 49 99 defer c.mu.Unlock() 50 100 101 + if len(c.entries) >= c.maxSize { 102 + c.evictOldest() 103 + } 104 + 51 105 c.entries[key] = entry 106 + c.stats.Size = int64(len(c.entries)) 107 + } 108 + 109 + func (c *MemoryCache) GetStats() *CacheStats { 110 + c.mu.RLock() 111 + defer c.mu.RUnlock() 112 + 113 + stats := c.stats 114 + return &stats 115 + } 116 + 117 + func (c *MemoryCache) evictOldest() { 118 + var oldestKey string 119 + var oldestTime time.Time 120 + 121 + first := true 122 + for k, e := range c.entries { 123 + if first || e.CreateTime.Before(oldestTime) { 124 + oldestKey = k 125 + oldestTime = e.CreateTime 126 + first = false 127 + } 128 + } 129 + 130 + if oldestKey != "" { 131 + delete(c.entries, oldestKey) 132 + c.stats.Evictions++ 133 + } 134 + } 135 + 136 + func (c *MemoryCache) periodicCleanup() { 137 + ticker := time.NewTicker(c.cleaupInterval) 138 + defer ticker.Stop() 139 + 140 + for range ticker.C { 141 + c.cleanup() 142 + } 143 + } 144 + 145 + func (c *MemoryCache) cleanup() { 146 + c.mu.Lock() 147 + defer c.mu.Unlock() 148 + 149 + now := time.Now() 150 + for k, e := range c.entries { 151 + if now.After(e.ExpireAt) { 152 + delete(c.entries, k) 153 + } 154 + } 155 + 156 + c.stats.Size = int64(len(c.entries)) 157 + } 158 + 159 + func getMinTTL(records []magna.ResourceRecord) uint32 { 160 + if len(records) == 0 { 161 + return 0 162 + } 163 + 164 + minTTL := records[0].TTL 165 + for _, rr := range records[1:] { 166 + if rr.TTL < minTTL { 167 + minTTL = rr.TTL 168 + } 169 + } 170 + return minTTL 171 + } 172 + 173 + func CreateCacheEntry(msg *magna.Message, zone string) *CacheEntry { 174 + now := time.Now() 175 + entry := &CacheEntry{ 176 + CreateTime: now, 177 + IsNegative: msg.Header.RCode == magna.NXDOMAIN, 178 + } 179 + 180 + if entry.IsNegative { 181 + var soaTTL uint32 182 + for _, auth := range msg.Authority { 183 + if auth.RType == magna.SOAType { 184 + soaTTL = auth.TTL 185 + break 186 + } 187 + } 188 + 189 + if soaTTL == 0 { 190 + soaTTL = 900 191 + } 192 + 193 + entry.NegativeTTL = time.Duration(soaTTL) * time.Second 194 + entry.ExpireAt = now.Add(entry.NegativeTTL) 195 + entry.Authority = make([]CachedResourceRecord, len(msg.Authority)) 196 + for i, rr := range msg.Authority { 197 + rule := determineBailiwickRule(zone, rr.Name) 198 + entry.Authority[i] = CachedResourceRecord{ 199 + Record: rr, 200 + ExpireAt: now.Add(time.Duration(rr.TTL) * time.Second), 201 + BailiwickRule: rule, 202 + } 203 + } 204 + 205 + return entry 206 + } 207 + 208 + minTTL := getMinTTL(msg.Answer) 209 + entry.ExpireAt = now.Add(time.Duration(minTTL) * time.Second) 210 + 211 + entry.Answer = make([]CachedResourceRecord, len(msg.Answer)) 212 + for i, rr := range msg.Answer { 213 + rule := determineBailiwickRule(zone, rr.Name) 214 + entry.Answer[i] = CachedResourceRecord{ 215 + Record: rr, 216 + ExpireAt: now.Add(time.Duration(rr.TTL) * time.Second), 217 + BailiwickRule: rule, 218 + } 219 + } 220 + 221 + entry.Authority = make([]CachedResourceRecord, len(msg.Authority)) 222 + for i, rr := range msg.Authority { 223 + rule := determineBailiwickRule(zone, rr.Name) 224 + entry.Authority[i] = CachedResourceRecord{ 225 + Record: rr, 226 + ExpireAt: now.Add(time.Duration(rr.TTL) * time.Second), 227 + BailiwickRule: rule, 228 + } 229 + } 230 + 231 + entry.Additional = make([]CachedResourceRecord, len(msg.Additional)) 232 + for i, rr := range msg.Additional { 233 + rule := determineBailiwickRule(zone, rr.Name) 234 + entry.Additional[i] = CachedResourceRecord{ 235 + Record: rr, 236 + ExpireAt: now.Add(time.Duration(rr.TTL) * time.Second), 237 + BailiwickRule: rule, 238 + } 239 + } 240 + 241 + return entry 242 + } 243 + 244 + func isSubdomainOf(a, b string) bool { 245 + if a == b { 246 + return true 247 + } 248 + 249 + return strings.HasSuffix(b, "."+a) 250 + } 251 + 252 + func determineBailiwickRule(zone, name string) BailiwickRule { 253 + if zone == name { 254 + return BailiwickSame 255 + } 256 + 257 + if isSubdomainOf(zone, name) { 258 + return BailiwickChild 259 + } 260 + 261 + return BailiwickOutside 52 262 }
+179 -110
pkg/dns/resolve.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "log/slog" 6 7 "net" 7 - "strings" 8 8 "time" 9 9 10 10 "code.kiri.systems/kiri/magna" ··· 13 13 type QueryHandler struct { 14 14 RootServers []string 15 15 Timeout time.Duration 16 - Cache *Cache 16 + Cache *MemoryCache 17 + Logger *slog.Logger 17 18 } 18 19 19 20 type queryResponse struct { ··· 23 24 } 24 25 25 26 func (h *QueryHandler) ServeDNS(w ResponseWriter, r *Request) { 26 - msg := h.processQuery(r.Message.Encode()) 27 - w.WriteMsg(msg) 28 - } 29 - 30 - func (h *QueryHandler) processQuery(messageBuffer []byte) *magna.Message { 31 - var query magna.Message 32 - if err := query.Decode(messageBuffer); err != nil { 33 - return nil 27 + if len(r.Message.Question) < 1 { 28 + h.Logger.Debug("received query with no questions") 29 + msg := r.Message.CreateReply(r.Message) 30 + msg = msg.SetRCode(magna.FORMERR) 31 + w.WriteMsg(msg) 32 + return 34 33 } 35 34 36 - msg := new(magna.Message) 37 - msg = msg.CreateReply(&query) 38 - 39 - if len(query.Question) < 1 { 40 - return msg.SetRCode(magna.FORMERR) 41 - } 42 - 43 - question := query.Question[0] 44 - msg = msg.AddQuestion(question) 35 + question := r.Message.Question[0] 36 + answers, authority, err := h.resolveQuestion(r.Context, question, h.RootServers) 45 37 46 - if question.QClass != magna.IN { 47 - return msg.SetRCode(magna.NOTIMP) 48 - } 38 + msg := r.Message.CreateReply(r.Message) 39 + msg.Header.RA = true 49 40 50 - answer, err := h.resolveWithCache(question) 51 41 if err != nil { 52 - return msg.SetRCode(magna.SERVFAIL) 42 + if err.Error() == "nxdomain" { 43 + msg = msg.SetRCode(magna.NXDOMAIN) 44 + msg.Authority = authority 45 + msg.Header.NSCount = uint16(len(authority)) 46 + } else { 47 + msg = msg.SetRCode(magna.SERVFAIL) 48 + } 49 + } else { 50 + msg.Answer = answers 51 + msg.Header.ANCount = uint16(len(answers)) 52 + msg = msg.SetRCode(magna.NOERROR) 53 53 } 54 54 55 - if len(answer) == 0 { 56 - return msg.SetRCode(magna.NXDOMAIN) 57 - } 55 + w.WriteMsg(msg) 56 + } 58 57 59 - msg.Header.ANCount = uint16(len(answer)) 60 - msg.Answer = answer 61 - return msg.SetRCode(magna.NOERROR) 58 + func (h *QueryHandler) resolveQuestion(ctx context.Context, question magna.Question, servers []string) ([]magna.ResourceRecord, []magna.ResourceRecord, error) { 59 + return h.resolveQuestionWithZone(ctx, question, servers, ".") 62 60 } 63 61 64 - func (h *QueryHandler) resolveWithCache(question magna.Question) ([]magna.ResourceRecord, error) { 65 - cacheKey := fmt.Sprintf("%s:%s:%s", strings.ToLower(question.QName), question.QType.String(), question.QClass.String()) 62 + func (h *QueryHandler) resolveQuestionWithZone(ctx context.Context, question magna.Question, servers []string, currentZone string) ([]magna.ResourceRecord, []magna.ResourceRecord, error) { 63 + cacheKey := fmt.Sprintf("%s:%s:%s", question.QName, question.QType.String(), question.QClass.String()) 66 64 67 - if e, found := (*h.Cache).Get(cacheKey); found { 68 - now := time.Now() 69 - var updatedAnswer []magna.ResourceRecord 70 - var cname *magna.ResourceRecord 71 - hasAddressRecord := false 65 + if h.Cache != nil { 66 + if entry, hit := h.Cache.Get(cacheKey); hit { 67 + now := time.Now() 72 68 73 - for _, cachedRR := range e.Answer { 74 - if now.Before(cachedRR.ExpireAt) { 75 - updatedRR := cachedRR.Record 76 - updatedRR.TTL = uint32(cachedRR.ExpireAt.Sub(now).Seconds()) 77 - updatedAnswer = append(updatedAnswer, updatedRR) 69 + if r := getCurrentRequest(ctx); r != nil { 70 + r.Context = setCacheHit(r.Context) 71 + } 78 72 79 - if updatedRR.RType == magna.CNAMEType && cname == nil { 80 - cname = &updatedRR 81 - } else if updatedRR.RType == question.QType { 82 - hasAddressRecord = true 83 - } 73 + if entry.IsNegative { 74 + return nil, convertCachedToMagna(entry.Authority, now), fmt.Errorf("nxdomain") 84 75 } 85 - } 86 76 87 - if len(updatedAnswer) > 0 { 88 - // add AAAA types when magna supports those record types 89 - if cname != nil && !hasAddressRecord && (question.QType == magna.AType) { 90 - cnameTarget := cname.RData.String() 91 - aRecords, err := h.resolveWithCache(magna.Question{QName: cnameTarget, QType: question.QType, QClass: question.QClass}) 92 - if err == nil && len(aRecords) > 0 { 93 - updatedAnswer = append(updatedAnswer, aRecords...) 94 - } 77 + validAnswers := convertCachedToMagna(entry.Answer, now) 78 + if len(validAnswers) > 0 { 79 + return validAnswers, nil, nil 95 80 } 96 - return updatedAnswer, nil 97 - } 98 - } 99 - 100 - answer, err := h.resolveQuestion(question, h.RootServers) 101 - if err != nil { 102 - return nil, err 103 - } 104 - 105 - now := time.Now() 106 - cachedAnswer := make([]CachedResourceRecord, len(answer)) 107 - for i, rr := range answer { 108 - cachedAnswer[i] = CachedResourceRecord{ 109 - Record: rr, 110 - ExpireAt: now.Add(time.Duration(rr.TTL) * time.Second), 111 - } 112 - } 113 - 114 - entry := &CacheEntry{ 115 - Answer: cachedAnswer, 116 - } 117 - (*h.Cache).Set(cacheKey, entry) 118 - 119 - if len(answer) > 0 && answer[0].RType == magna.CNAMEType && question.QType == magna.AType { 120 - cnameTarget := answer[len(answer)-1].RData.String() 121 - addressRecords, err := h.resolveWithCache(magna.Question{QName: cnameTarget, QType: question.QType, QClass: question.QClass}) 122 - if err == nil && len(addressRecords) > 0 { 123 - answer = append(answer, addressRecords...) 81 + } else { 82 + if r := getCurrentRequest(ctx); r != nil { 83 + r.Context = setCacheMiss(r.Context) 84 + } 124 85 } 125 86 } 126 87 127 - return answer, nil 128 - } 129 - 130 - func (h *QueryHandler) resolveQuestion(question magna.Question, servers []string) ([]magna.ResourceRecord, error) { 131 - ctx, cancel := context.WithCancel(context.Background()) 88 + ctx, cancel := context.WithTimeout(ctx, h.Timeout) 132 89 defer cancel() 133 90 134 91 ch := make(chan queryResponse, len(servers)) 135 - 136 92 for _, s := range servers { 137 93 go queryServer(ctx, question, s, ch, h.Timeout) 138 94 } ··· 141 97 select { 142 98 case res := <-ch: 143 99 if res.Error != nil { 144 - break 100 + h.Logger.Debug("server query failed", 101 + "server", res.Server, 102 + "error", res.Error) 103 + continue 145 104 } 146 105 147 106 msg := res.MSG 107 + zone := extractZone(&msg) 108 + if zone == "" { 109 + zone = currentZone 110 + } 111 + 112 + if msg.Header.RCode == magna.NXDOMAIN { 113 + entry := CreateCacheEntry(&msg, zone) 114 + h.Cache.Set(cacheKey, entry) 115 + now := time.Now() 116 + return nil, convertCachedToMagna(entry.Authority, now), fmt.Errorf("nxdomain") 117 + } 118 + 148 119 if msg.Header.ANCount > 0 { 149 120 if msg.Answer[0].RType == magna.CNAMEType { 150 - cname_answers, err := h.resolveQuestion(magna.Question{QName: msg.Answer[0].RData.String(), QType: question.QType, QClass: question.QClass}, h.RootServers) 151 - if err != nil { 152 - continue 121 + h.Logger.Debug("following CNAME", 122 + "cname", msg.Answer[0].RData.String()) 123 + 124 + entry := CreateCacheEntry(&msg, zone) 125 + h.Cache.Set(cacheKey, entry) 126 + 127 + answers, auth, err := h.resolveQuestionWithZone(ctx, magna.Question{QName: msg.Answer[0].RData.String(), QType: question.QType, QClass: question.QClass}, h.RootServers, zone) 128 + if err == nil { 129 + return append(msg.Answer, answers...), auth, nil 153 130 } 154 - msg.Answer = append(msg.Answer, cname_answers...) 131 + 132 + return nil, auth, err 155 133 } 156 134 157 - return msg.Answer, nil 135 + entry := CreateCacheEntry(&msg, zone) 136 + h.Cache.Set(cacheKey, entry) 137 + return msg.Answer, nil, nil 158 138 } 159 139 160 140 if msg.Header.ARCount > 0 { ··· 165 145 } 166 146 } 167 147 168 - return h.resolveQuestion(question, nextZone) 148 + if len(nextZone) > 0 { 149 + return h.resolveQuestion(ctx, question, nextZone) 150 + } 169 151 } 170 152 171 153 if msg.Header.NSCount > 0 { 172 - var ns []string 173 - for _, a := range msg.Authority { 174 - if a.RType == magna.NSType { 175 - ans, err := h.resolveQuestion(magna.Question{QName: a.RData.String(), QType: magna.AType, QClass: magna.IN}, h.RootServers) 176 - if err != nil { 177 - break 154 + nsRecords := make(map[string]string) 155 + glueRecords := make(map[string]string) 156 + 157 + if msg.Header.ARCount > 0 { 158 + for _, additional := range msg.Additional { 159 + if additional.RType == magna.AType { 160 + rule := determineBailiwickRule(zone, additional.Name) 161 + if rule != BailiwickOutside { 162 + glueRecords[additional.Name] = additional.RData.String() 163 + } 178 164 } 179 - for _, x := range ans { 180 - ns = append(ns, x.RData.String()) 165 + } 166 + } 167 + 168 + var nextServers []string 169 + var needResolution []string 170 + 171 + for _, auth := range msg.Authority { 172 + if auth.RType == magna.NSType { 173 + rule := determineBailiwickRule(zone, auth.Name) 174 + if rule != BailiwickOutside { 175 + nsName := auth.RData.String() 176 + nsRecords[nsName] = "" 177 + 178 + if ip, exists := glueRecords[nsName]; exists { 179 + nextServers = append(nextServers, ip) 180 + } else { 181 + needResolution = append(needResolution, nsName) 182 + } 181 183 } 182 184 } 183 185 } 184 186 185 - return h.resolveQuestion(question, ns) 187 + if len(nextServers) > 0 { 188 + h.Logger.Debug("using glue records for resolution", 189 + "servers", nextServers) 190 + return h.resolveQuestionWithZone(ctx, question, nextServers, zone) 191 + } 192 + 193 + for _, ns := range needResolution { 194 + answers, _, err := h.resolveQuestionWithZone( 195 + ctx, 196 + magna.Question{ 197 + QName: ns, 198 + QType: magna.AType, 199 + QClass: magna.IN, 200 + }, 201 + h.RootServers, 202 + zone, 203 + ) 204 + if err == nil { 205 + for _, ans := range answers { 206 + nextServers = append(nextServers, ans.RData.String()) 207 + } 208 + } 209 + } 210 + 211 + if len(nextServers) > 0 { 212 + return h.resolveQuestionWithZone(ctx, question, nextServers, zone) 213 + } 186 214 } 187 215 188 - return []magna.ResourceRecord{}, nil 189 - case <-time.After(h.Timeout): 190 - cancel() 216 + case <-ctx.Done(): 217 + return nil, nil, ctx.Err() 191 218 } 192 219 } 193 220 194 - return []magna.ResourceRecord{}, nil 221 + return nil, nil, fmt.Errorf("all queries failed") 195 222 } 196 223 197 224 func queryServer(ctx context.Context, question magna.Question, server string, ch chan<- queryResponse, timeout time.Duration) { ··· 238 265 ch <- queryResponse{Server: server, Error: fmt.Errorf("timeout")} 239 266 } 240 267 } 268 + 269 + func convertCachedToMagna(cached []CachedResourceRecord, now time.Time) []magna.ResourceRecord { 270 + result := make([]magna.ResourceRecord, 0, len(cached)) 271 + for _, record := range cached { 272 + if now.Before(record.ExpireAt) { 273 + rr := record.Record 274 + rr.TTL = uint32(record.ExpireAt.Sub(now).Seconds()) 275 + result = append(result, rr) 276 + } 277 + } 278 + return result 279 + } 280 + 281 + func getCurrentRequest(ctx context.Context) *Request { 282 + if ctx == nil { 283 + return nil 284 + } 285 + 286 + if r, ok := ctx.Value(contextKey("request")).(*Request); ok { 287 + return r 288 + } 289 + 290 + return nil 291 + } 292 + 293 + func extractZone(msg *magna.Message) string { 294 + for _, auth := range msg.Authority { 295 + if auth.RType == magna.SOAType { 296 + return auth.Name 297 + } 298 + 299 + if auth.RType == magna.NSType { 300 + return auth.Name 301 + } 302 + } 303 + 304 + if len(msg.Question) > 0 { 305 + return msg.Question[0].QName 306 + } 307 + 308 + return "" 309 + }
+32 -5
pkg/dns/server.go
··· 1 1 package dns 2 2 3 3 import ( 4 + "context" 4 5 "encoding/binary" 5 6 "fmt" 6 7 "io" ··· 12 13 "code.kiri.systems/kiri/magna" 13 14 ) 14 15 16 + type contextKey string 17 + 18 + const ( 19 + cacheHitKey contextKey = "cache_hit" 20 + ) 21 + 22 + func setCacheHit(ctx context.Context) context.Context { 23 + return context.WithValue(ctx, cacheHitKey, true) 24 + } 25 + 26 + func setCacheMiss(ctx context.Context) context.Context { 27 + return context.WithValue(ctx, cacheHitKey, false) 28 + } 29 + 30 + func GetCacheHit(ctx context.Context) bool { 31 + v := ctx.Value(cacheHitKey) 32 + if v == nil { 33 + return false 34 + } 35 + 36 + return v.(bool) 37 + } 38 + 15 39 type Handler interface { 16 40 ServeDNS(ResponseWriter, *Request) 17 41 } ··· 23 47 } 24 48 25 49 type Request struct { 50 + Context context.Context 26 51 RemoteAddr net.Addr 27 52 Message *magna.Message 28 53 } ··· 89 114 ReadTimeout time.Duration 90 115 WriteTimeout time.Duration 91 116 Logger *slog.Logger 92 - Cache Cache 117 + Cache *MemoryCache 93 118 } 94 119 95 120 func (srv *Server) ListenAndServe() error { ··· 184 209 for { 185 210 conn, err := listener.Accept() 186 211 if err != nil { 187 - srv.Logger.Warn("tcp accept error:", err) 212 + srv.Logger.Warn("tcp accept error:", "error", err) 188 213 continue 189 214 } 190 215 ··· 203 228 204 229 sizeBuffer := make([]byte, 2) 205 230 if _, err := io.ReadFull(conn, sizeBuffer); err != nil { 206 - srv.Logger.Warn("tcp-error", err) 231 + srv.Logger.Warn("tcp error occurred", "error", err) 207 232 return 208 233 } 209 234 210 235 size := binary.BigEndian.Uint16(sizeBuffer) 211 236 data := make([]byte, size) 212 237 if _, err := io.ReadFull(conn, data); err != nil { 213 - srv.Logger.Warn("tcp-error", err) 238 + srv.Logger.Warn("tcp error occurred", "error", err) 214 239 return 215 240 } 216 241 ··· 226 251 func (srv *Server) handleQuery(messageBuffer []byte, w ResponseWriter, remoteAddr net.Addr) { 227 252 var query magna.Message 228 253 if err := query.Decode(messageBuffer); err != nil { 229 - srv.Logger.Warn("decode error", err) 254 + srv.Logger.Warn("message decode error", "error", err) 230 255 return 231 256 } 232 257 ··· 234 259 Message: &query, 235 260 RemoteAddr: remoteAddr, 236 261 } 262 + 263 + r.Context = context.WithValue(context.Background(), contextKey("request"), r) 237 264 238 265 srv.Handler.ServeDNS(w, r) 239 266 }
+238
pkg/metrics/clickhouse.go
··· 1 + package metrics 2 + 3 + import ( 4 + "database/sql" 5 + "fmt" 6 + "log/slog" 7 + "sync" 8 + "time" 9 + 10 + _ "github.com/ClickHouse/clickhouse-go/v2" 11 + 12 + "code.kiri.systems/kiri/alky/pkg/config" 13 + "code.kiri.systems/kiri/alky/pkg/dns" 14 + ) 15 + 16 + type ClickHouseMetrics struct { 17 + db *sql.DB 18 + config *config.MetricsConfig 19 + queryBuffer []QueryMetric 20 + cacheBuffer []CacheMetric 21 + mu sync.Mutex 22 + stopChan chan struct{} 23 + logger *slog.Logger 24 + } 25 + 26 + type QueryMetric struct { 27 + Timestamp time.Time 28 + InstanceID string 29 + QueryName string 30 + QueryType string 31 + QueryClass string 32 + RemoteAddr string 33 + ResponseCode string 34 + Duration int64 35 + CacheHit bool 36 + } 37 + 38 + type CacheMetric struct { 39 + Timestamp time.Time 40 + TotalQueries int64 41 + CacheHits int64 42 + CacheMisses int64 43 + NegativeHits int64 44 + PositiveHits int64 45 + Evictions int64 46 + Size int64 47 + } 48 + 49 + func NewClickHouseMetrics(config *config.MetricsConfig, logger *slog.Logger) (*ClickHouseMetrics, error) { 50 + db, err := sql.Open("clickhouse", config.DSN) 51 + if err != nil { 52 + return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err) 53 + } 54 + 55 + m := &ClickHouseMetrics{ 56 + db: db, 57 + config: config, 58 + queryBuffer: make([]QueryMetric, 0, config.BatchSize), 59 + cacheBuffer: make([]CacheMetric, 0, config.BatchSize), 60 + stopChan: make(chan struct{}), 61 + logger: logger, 62 + } 63 + 64 + if err := m.changeTTL(); err != nil { 65 + db.Close() 66 + return nil, fmt.Errorf("failed to initialize tables: %w", err) 67 + } 68 + 69 + go m.flushLoop() 70 + return m, nil 71 + } 72 + 73 + func (m *ClickHouseMetrics) RecordQuery(metric QueryMetric) { 74 + m.mu.Lock() 75 + defer m.mu.Unlock() 76 + 77 + m.queryBuffer = append(m.queryBuffer, metric) 78 + if len(m.queryBuffer) >= m.config.BatchSize { 79 + m.flush() 80 + } 81 + } 82 + 83 + func (m *ClickHouseMetrics) RecordCacheMetrics(metric CacheMetric) { 84 + m.mu.Lock() 85 + defer m.mu.Unlock() 86 + 87 + m.cacheBuffer = append(m.cacheBuffer, metric) 88 + if len(m.cacheBuffer) >= m.config.BatchSize { 89 + m.flush() 90 + } 91 + } 92 + 93 + func (m *ClickHouseMetrics) RecordCacheStats(stats *dns.CacheStats) { 94 + m.RecordCacheMetrics(CacheMetric{ 95 + Timestamp: time.Now(), 96 + TotalQueries: stats.TotalQueries, 97 + CacheHits: stats.CacheHits, 98 + CacheMisses: stats.CacheMisses, 99 + NegativeHits: stats.NegativeHits, 100 + PositiveHits: stats.PositiveHits, 101 + Evictions: stats.Evictions, 102 + Size: stats.Size, 103 + }) 104 + } 105 + 106 + func (m *ClickHouseMetrics) flushLoop() { 107 + ticker := time.NewTicker(m.config.FlushInterval.Duration) 108 + defer ticker.Stop() 109 + 110 + for { 111 + select { 112 + case <-ticker.C: 113 + m.mu.Lock() 114 + m.flush() 115 + m.mu.Unlock() 116 + case <-m.stopChan: 117 + return 118 + } 119 + } 120 + } 121 + 122 + func (m *ClickHouseMetrics) flush() { 123 + if len(m.queryBuffer) > 0 { 124 + if err := m.flushQueries(); err != nil { 125 + m.logger.Error("Failed to flush query metrics", "error", err) 126 + } 127 + m.queryBuffer = m.queryBuffer[:0] 128 + } 129 + 130 + if len(m.cacheBuffer) > 0 { 131 + if err := m.flushCacheMetrics(); err != nil { 132 + m.logger.Error("Failed to flush cache metrics", "error", err) 133 + } 134 + m.cacheBuffer = m.cacheBuffer[:0] 135 + } 136 + } 137 + 138 + func (m *ClickHouseMetrics) changeTTL() error { 139 + if _, err := m.db.Exec( 140 + "ALTER TABLE alky_dns_queries MODIFY TTL timestamp + toIntervalSecond(?)", 141 + int(m.config.RetentionPeriod.Seconds()), 142 + ); err != nil { 143 + return fmt.Errorf("failed to update alky_dns_queries TTL: %w", err) 144 + } 145 + 146 + if _, err := m.db.Exec( 147 + "ALTER TABLE alky_dns_cache_metrics MODIFY TTL timestamp + toIntervalSecond(?)", 148 + int(m.config.RetentionPeriod.Seconds()), 149 + ); err != nil { 150 + return fmt.Errorf("failed to update alky_dns_cache_metrics TTL: %w", err) 151 + } 152 + 153 + return nil 154 + } 155 + 156 + func (m *ClickHouseMetrics) flushQueries() error { 157 + tx, err := m.db.Begin() 158 + if err != nil { 159 + return err 160 + } 161 + defer tx.Rollback() 162 + 163 + stmt, err := tx.Prepare(` 164 + INSERT INTO alky_dns_queries ( 165 + timestamp, instance_id, query_name, query_type, query_class, 166 + remote_addr, response_code, duration, cache_hit 167 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 168 + `) 169 + if err != nil { 170 + return err 171 + } 172 + defer stmt.Close() 173 + 174 + for _, metric := range m.queryBuffer { 175 + _, err := stmt.Exec( 176 + metric.Timestamp, 177 + metric.InstanceID, 178 + metric.QueryName, 179 + metric.QueryType, 180 + metric.QueryClass, 181 + metric.RemoteAddr, 182 + metric.ResponseCode, 183 + metric.Duration, 184 + metric.CacheHit, 185 + ) 186 + if err != nil { 187 + return err 188 + } 189 + } 190 + 191 + return tx.Commit() 192 + } 193 + 194 + func (m *ClickHouseMetrics) flushCacheMetrics() error { 195 + tx, err := m.db.Begin() 196 + if err != nil { 197 + return err 198 + } 199 + defer tx.Rollback() 200 + 201 + stmt, err := tx.Prepare(` 202 + INSERT INTO alky_dns_cache_metrics ( 203 + timestamp, instance_id, total_queries, cache_hits, cache_misses, 204 + negative_hits, positive_hits, evictions, size 205 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 206 + `) 207 + if err != nil { 208 + return err 209 + } 210 + defer stmt.Close() 211 + 212 + for _, metric := range m.cacheBuffer { 213 + _, err := stmt.Exec( 214 + metric.Timestamp, 215 + GetInstanceID(), 216 + metric.TotalQueries, 217 + metric.CacheHits, 218 + metric.CacheMisses, 219 + metric.NegativeHits, 220 + metric.PositiveHits, 221 + metric.Evictions, 222 + metric.Size, 223 + ) 224 + if err != nil { 225 + return err 226 + } 227 + } 228 + 229 + return tx.Commit() 230 + } 231 + 232 + func (m *ClickHouseMetrics) Close() error { 233 + close(m.stopChan) 234 + m.mu.Lock() 235 + defer m.mu.Unlock() 236 + m.flush() 237 + return m.db.Close() 238 + }
+60
pkg/metrics/middleware.go
··· 1 + package metrics 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "os" 7 + "time" 8 + 9 + "code.kiri.systems/kiri/alky/pkg/dns" 10 + ) 11 + 12 + var ( 13 + instanceID string 14 + version string 15 + ) 16 + 17 + func init() { 18 + var err error 19 + hostname, err := os.Hostname() 20 + if err != nil { 21 + hostname = "unknown" 22 + } 23 + 24 + if version != "" { 25 + instanceID = fmt.Sprintf("%s-%s", hostname, version) 26 + } else { 27 + instanceID = hostname 28 + } 29 + } 30 + 31 + func MetricsMiddleware(metrics *ClickHouseMetrics) func(dns.Handler) dns.Handler { 32 + return func(next dns.Handler) dns.Handler { 33 + return dns.HandlerFunc(func(w dns.ResponseWriter, r *dns.Request) { 34 + if r.Context == nil { 35 + r.Context = context.Background() 36 + } 37 + 38 + start := time.Now() 39 + next.ServeDNS(w, r) 40 + duration := time.Since(start) 41 + 42 + question := r.Message.Question[0] 43 + metrics.RecordQuery(QueryMetric{ 44 + Timestamp: time.Now(), 45 + InstanceID: instanceID, 46 + QueryName: question.QName, 47 + QueryType: question.QType.String(), 48 + QueryClass: question.QClass.String(), 49 + RemoteAddr: r.RemoteAddr.String(), 50 + ResponseCode: r.Message.Header.RCode.String(), 51 + Duration: duration.Nanoseconds(), 52 + CacheHit: dns.GetCacheHit(r.Context), 53 + }) 54 + }) 55 + } 56 + } 57 + 58 + func GetInstanceID() string { 59 + return instanceID 60 + }