Your locally hosted lumina server for IDAPro
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add async connection pool (#58)

authored by

Naim A and committed by
GitHub
e2da794d 226280b0

+153 -244
+93 -97
Cargo.lock
··· 13 13 14 14 [[package]] 15 15 name = "async-trait" 16 - version = "0.1.60" 16 + version = "0.1.61" 17 17 source = "registry+https://github.com/rust-lang/crates.io-index" 18 - checksum = "677d1d8ab452a3936018a687b20e6f7cf5363d713b732b8884001317b0e48aa3" 18 + checksum = "705339e0e4a9690e2908d2b3d049d85682cf19fbd5782494498fbf7003a6a282" 19 19 dependencies = [ 20 20 "proc-macro2", 21 21 "quote", ··· 102 102 103 103 [[package]] 104 104 name = "clap" 105 - version = "4.0.32" 105 + version = "4.1.1" 106 106 source = "registry+https://github.com/rust-lang/crates.io-index" 107 - checksum = "a7db700bc935f9e43e88d00b0850dae18a63773cfbec6d8e070fccf7fef89a39" 107 + checksum = "4ec7a4128863c188deefe750ac1d1dfe66c236909f845af04beed823638dc1b2" 108 108 dependencies = [ 109 109 "bitflags", 110 110 "clap_lex", ··· 115 115 116 116 [[package]] 117 117 name = "clap_lex" 118 - version = "0.3.0" 118 + version = "0.3.1" 119 119 source = "registry+https://github.com/rust-lang/crates.io-index" 120 - checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8" 120 + checksum = "783fe232adfca04f90f56201b26d79682d4cd2625e0bc7290b95123afe558ade" 121 121 dependencies = [ 122 122 "os_str_bytes", 123 123 ] ··· 127 127 version = "0.2.0" 128 128 dependencies = [ 129 129 "binascii", 130 + "deadpool-postgres", 130 131 "futures-util", 131 132 "log", 132 133 "native-tls", 133 134 "postgres-native-tls", 134 135 "serde", 135 136 "tokio", 136 - "tokio-postgres", 137 137 "toml", 138 138 "warp", 139 139 ] ··· 171 171 dependencies = [ 172 172 "generic-array", 173 173 "typenum", 174 + ] 175 + 176 + [[package]] 177 + name = "deadpool" 178 + version = "0.9.5" 179 + source = "registry+https://github.com/rust-lang/crates.io-index" 180 + checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e" 181 + dependencies = [ 182 + "async-trait", 183 + "deadpool-runtime", 184 + "num_cpus", 185 + "retain_mut", 186 + "tokio", 187 + ] 188 + 189 + [[package]] 190 + name = "deadpool-postgres" 191 + version = "0.10.4" 192 + source = "registry+https://github.com/rust-lang/crates.io-index" 193 + checksum = "051c50d234dab03bd29a537859bb7f50cca718c90b05e5c5746b9ae2a73f7278" 194 + dependencies = [ 195 + "deadpool", 196 + "log", 197 + "tokio", 198 + "tokio-postgres", 199 + ] 200 + 201 + [[package]] 202 + name = "deadpool-runtime" 203 + version = "0.1.2" 204 + source = "registry+https://github.com/rust-lang/crates.io-index" 205 + checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1" 206 + dependencies = [ 207 + "tokio", 174 208 ] 175 209 176 210 [[package]] ··· 548 582 549 583 [[package]] 550 584 name = "io-lifetimes" 551 - version = "1.0.3" 585 + version = "1.0.4" 552 586 source = "registry+https://github.com/rust-lang/crates.io-index" 553 - checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c" 587 + checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e" 554 588 dependencies = [ 555 589 "libc", 556 - "windows-sys 0.42.0", 590 + "windows-sys", 557 591 ] 558 592 559 593 [[package]] ··· 565 599 "hermit-abi 0.2.6", 566 600 "io-lifetimes", 567 601 "rustix", 568 - "windows-sys 0.42.0", 602 + "windows-sys", 569 603 ] 570 604 571 605 [[package]] ··· 665 699 "libc", 666 700 "log", 667 701 "wasi", 668 - "windows-sys 0.42.0", 702 + "windows-sys", 669 703 ] 670 704 671 705 [[package]] ··· 716 750 717 751 [[package]] 718 752 name = "once_cell" 719 - version = "1.16.0" 753 + version = "1.17.0" 720 754 source = "registry+https://github.com/rust-lang/crates.io-index" 721 - checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" 755 + checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66" 722 756 723 757 [[package]] 724 758 name = "openssl" ··· 783 817 784 818 [[package]] 785 819 name = "parking_lot_core" 786 - version = "0.9.5" 820 + version = "0.9.6" 787 821 source = "registry+https://github.com/rust-lang/crates.io-index" 788 - checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" 822 + checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf" 789 823 dependencies = [ 790 824 "cfg-if", 791 825 "libc", 792 826 "redox_syscall", 793 827 "smallvec", 794 - "windows-sys 0.42.0", 828 + "windows-sys", 795 829 ] 796 830 797 831 [[package]] ··· 916 950 917 951 [[package]] 918 952 name = "proc-macro2" 919 - version = "1.0.49" 953 + version = "1.0.50" 920 954 source = "registry+https://github.com/rust-lang/crates.io-index" 921 - checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" 955 + checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2" 922 956 dependencies = [ 923 957 "unicode-ident", 924 958 ] ··· 979 1013 980 1014 [[package]] 981 1015 name = "regex" 982 - version = "1.7.0" 1016 + version = "1.7.1" 983 1017 source = "registry+https://github.com/rust-lang/crates.io-index" 984 - checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a" 1018 + checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" 985 1019 dependencies = [ 986 1020 "aho-corasick", 987 1021 "memchr", ··· 1004 1038 ] 1005 1039 1006 1040 [[package]] 1041 + name = "retain_mut" 1042 + version = "0.1.9" 1043 + source = "registry+https://github.com/rust-lang/crates.io-index" 1044 + checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" 1045 + 1046 + [[package]] 1007 1047 name = "rustix" 1008 - version = "0.36.6" 1048 + version = "0.36.7" 1009 1049 source = "registry+https://github.com/rust-lang/crates.io-index" 1010 - checksum = "4feacf7db682c6c329c4ede12649cd36ecab0f3be5b7d74e6a20304725db4549" 1050 + checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03" 1011 1051 dependencies = [ 1012 1052 "bitflags", 1013 1053 "errno", 1014 1054 "io-lifetimes", 1015 1055 "libc", 1016 1056 "linux-raw-sys", 1017 - "windows-sys 0.42.0", 1057 + "windows-sys", 1018 1058 ] 1019 1059 1020 1060 [[package]] ··· 1040 1080 1041 1081 [[package]] 1042 1082 name = "schannel" 1043 - version = "0.1.20" 1083 + version = "0.1.21" 1044 1084 source = "registry+https://github.com/rust-lang/crates.io-index" 1045 - checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" 1085 + checksum = "713cfb06c7059f3588fb8044c0fad1d09e3c01d225e25b9220dbfdcf16dbb1b3" 1046 1086 dependencies = [ 1047 - "lazy_static", 1048 - "windows-sys 0.36.1", 1087 + "windows-sys", 1049 1088 ] 1050 1089 1051 1090 [[package]] ··· 1248 1287 1249 1288 [[package]] 1250 1289 name = "termcolor" 1251 - version = "1.1.3" 1290 + version = "1.2.0" 1252 1291 source = "registry+https://github.com/rust-lang/crates.io-index" 1253 - checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" 1292 + checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" 1254 1293 dependencies = [ 1255 1294 "winapi-util", 1256 1295 ] ··· 1292 1331 1293 1332 [[package]] 1294 1333 name = "tokio" 1295 - version = "1.23.1" 1334 + version = "1.24.2" 1296 1335 source = "registry+https://github.com/rust-lang/crates.io-index" 1297 - checksum = "38a54aca0c15d014013256222ba0ebed095673f89345dd79119d912eb561b7a8" 1336 + checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb" 1298 1337 dependencies = [ 1299 1338 "autocfg", 1300 1339 "bytes", ··· 1307 1346 "signal-hook-registry", 1308 1347 "socket2", 1309 1348 "tokio-macros", 1310 - "windows-sys 0.42.0", 1349 + "windows-sys", 1311 1350 ] 1312 1351 1313 1352 [[package]] ··· 1430 1469 1431 1470 [[package]] 1432 1471 name = "try-lock" 1433 - version = "0.2.3" 1472 + version = "0.2.4" 1434 1473 source = "registry+https://github.com/rust-lang/crates.io-index" 1435 - checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" 1474 + checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" 1436 1475 1437 1476 [[package]] 1438 1477 name = "tungstenite" ··· 1607 1646 1608 1647 [[package]] 1609 1648 name = "windows-sys" 1610 - version = "0.36.1" 1611 - source = "registry+https://github.com/rust-lang/crates.io-index" 1612 - checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" 1613 - dependencies = [ 1614 - "windows_aarch64_msvc 0.36.1", 1615 - "windows_i686_gnu 0.36.1", 1616 - "windows_i686_msvc 0.36.1", 1617 - "windows_x86_64_gnu 0.36.1", 1618 - "windows_x86_64_msvc 0.36.1", 1619 - ] 1620 - 1621 - [[package]] 1622 - name = "windows-sys" 1623 1649 version = "0.42.0" 1624 1650 source = "registry+https://github.com/rust-lang/crates.io-index" 1625 1651 checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" 1626 1652 dependencies = [ 1627 1653 "windows_aarch64_gnullvm", 1628 - "windows_aarch64_msvc 0.42.0", 1629 - "windows_i686_gnu 0.42.0", 1630 - "windows_i686_msvc 0.42.0", 1631 - "windows_x86_64_gnu 0.42.0", 1654 + "windows_aarch64_msvc", 1655 + "windows_i686_gnu", 1656 + "windows_i686_msvc", 1657 + "windows_x86_64_gnu", 1632 1658 "windows_x86_64_gnullvm", 1633 - "windows_x86_64_msvc 0.42.0", 1659 + "windows_x86_64_msvc", 1634 1660 ] 1635 1661 1636 1662 [[package]] 1637 1663 name = "windows_aarch64_gnullvm" 1638 - version = "0.42.0" 1664 + version = "0.42.1" 1639 1665 source = "registry+https://github.com/rust-lang/crates.io-index" 1640 - checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" 1666 + checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" 1641 1667 1642 1668 [[package]] 1643 1669 name = "windows_aarch64_msvc" 1644 - version = "0.36.1" 1670 + version = "0.42.1" 1645 1671 source = "registry+https://github.com/rust-lang/crates.io-index" 1646 - checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" 1647 - 1648 - [[package]] 1649 - name = "windows_aarch64_msvc" 1650 - version = "0.42.0" 1651 - source = "registry+https://github.com/rust-lang/crates.io-index" 1652 - checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" 1672 + checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" 1653 1673 1654 1674 [[package]] 1655 1675 name = "windows_i686_gnu" 1656 - version = "0.36.1" 1657 - source = "registry+https://github.com/rust-lang/crates.io-index" 1658 - checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" 1659 - 1660 - [[package]] 1661 - name = "windows_i686_gnu" 1662 - version = "0.42.0" 1663 - source = "registry+https://github.com/rust-lang/crates.io-index" 1664 - checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" 1665 - 1666 - [[package]] 1667 - name = "windows_i686_msvc" 1668 - version = "0.36.1" 1676 + version = "0.42.1" 1669 1677 source = "registry+https://github.com/rust-lang/crates.io-index" 1670 - checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" 1678 + checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" 1671 1679 1672 1680 [[package]] 1673 1681 name = "windows_i686_msvc" 1674 - version = "0.42.0" 1682 + version = "0.42.1" 1675 1683 source = "registry+https://github.com/rust-lang/crates.io-index" 1676 - checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" 1684 + checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" 1677 1685 1678 1686 [[package]] 1679 1687 name = "windows_x86_64_gnu" 1680 - version = "0.36.1" 1688 + version = "0.42.1" 1681 1689 source = "registry+https://github.com/rust-lang/crates.io-index" 1682 - checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" 1683 - 1684 - [[package]] 1685 - name = "windows_x86_64_gnu" 1686 - version = "0.42.0" 1687 - source = "registry+https://github.com/rust-lang/crates.io-index" 1688 - checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" 1690 + checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" 1689 1691 1690 1692 [[package]] 1691 1693 name = "windows_x86_64_gnullvm" 1692 - version = "0.42.0" 1694 + version = "0.42.1" 1693 1695 source = "registry+https://github.com/rust-lang/crates.io-index" 1694 - checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" 1696 + checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" 1695 1697 1696 1698 [[package]] 1697 1699 name = "windows_x86_64_msvc" 1698 - version = "0.36.1" 1700 + version = "0.42.1" 1699 1701 source = "registry+https://github.com/rust-lang/crates.io-index" 1700 - checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" 1701 - 1702 - [[package]] 1703 - name = "windows_x86_64_msvc" 1704 - version = "0.42.0" 1705 - source = "registry+https://github.com/rust-lang/crates.io-index" 1706 - checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" 1702 + checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
+2 -2
common/Cargo.toml
··· 10 10 tokio = {version = "1.23", features = ["full"], optional = true} 11 11 log = {version = "0.4", features = ["release_max_level_debug"]} 12 12 serde = {version = "1.0", features = ["derive"]} 13 - tokio-postgres = {version = "0.7", optional = true} 14 13 postgres-native-tls = {version = "0.5", optional = true} 15 14 native-tls = {version = "0.2", optional = true} 16 15 futures-util = "0.3" 17 16 toml = "0.5" 18 17 warp = {version = "0.3", optional = true} 19 18 binascii = "0.1" 19 + deadpool-postgres = {version = "0.10", optional = true} 20 20 21 21 [features] 22 22 default = ["web", "db"] 23 23 web = ["warp"] 24 - db = ["tokio", "tokio-postgres", "postgres-native-tls", "native-tls"] 24 + db = ["tokio", "postgres-native-tls", "native-tls", "deadpool-postgres"]
+58 -114
common/src/db/mod.rs
··· 1 1 use log::*; 2 - use tokio_postgres::{Client, NoTls}; 2 + use postgres_native_tls::MakeTlsConnector; 3 + use deadpool_postgres::{tokio_postgres::NoTls, Manager, Pool}; 3 4 use serde::Serialize; 4 - use std::{collections::HashMap, sync::Arc}; 5 - use tokio::sync::RwLock; 5 + use std::{collections::HashMap, sync::Arc, str::FromStr}; 6 6 use crate::config::Config; 7 7 8 8 pub type DynConfig = dyn crate::config::HasConfig + Send + Sync; 9 9 10 10 pub struct Database { 11 - config: Arc<DynConfig>, 12 - conn: RwLock<Client>, 13 - cache: RwLock<HashMap<String, tokio_postgres::Statement>>, 11 + pool: deadpool_postgres::Pool, 14 12 } 15 13 16 14 pub struct FunctionInfo { ··· 33 31 } 34 32 35 33 impl Database { 36 - pub async fn open(config: Arc<DynConfig>) -> Result<Self, tokio_postgres::Error> { 37 - let client = Self::connect(config.get_config()).await?; 34 + pub async fn open(config: Arc<DynConfig>) -> Result<Self, deadpool_postgres::tokio_postgres::Error> { 35 + let connection_string = config.get_config().database.connection_info.as_str(); 36 + let pg_config = deadpool_postgres::tokio_postgres::Config::from_str(connection_string)?; 37 + 38 + let mgr_config = deadpool_postgres::ManagerConfig { 39 + recycling_method: deadpool_postgres::RecyclingMethod::Verified 40 + }; 41 + 42 + let manager = if config.get_config().database.use_tls { 43 + let tls = Self::make_tls(config.get_config()).await; 44 + Manager::from_config(pg_config, tls, mgr_config) 45 + } else { 46 + Manager::from_config(pg_config, NoTls, mgr_config) 47 + }; 48 + 49 + let pool = Pool::builder(manager) 50 + .build() 51 + .expect("failed to build pool"); 38 52 39 53 Ok(Database{ 40 - config, 41 - conn: RwLock::new(client), 42 - cache: RwLock::new(HashMap::new()), 54 + pool, 43 55 }) 44 56 } 45 57 46 - pub async fn get_conn(&self) -> tokio::sync::RwLockReadGuard<'_, Client> { 47 - self.conn.read().await 48 - } 49 - 50 - async fn connect_tls(conn_info: &Config) -> Result<tokio_postgres::Client, tokio_postgres::Error> { 51 - use postgres_native_tls::MakeTlsConnector; 58 + async fn make_tls(conn_info: &Config) -> MakeTlsConnector { 52 59 use native_tls::{TlsConnector, Certificate, Identity}; 53 60 54 61 let mut tls_connector = TlsConnector::builder(); ··· 70 77 .build() 71 78 .expect("failed to build TlsConnector"); 72 79 73 - let connector = MakeTlsConnector::new(tls_connector); 74 - 75 - let (client, conn) = tokio_postgres::connect(&conn_info.database.connection_info, connector).await?; 76 - info!("database connected (tls)."); 77 - 78 - tokio::spawn(async { 79 - if let Err(e) = conn.await { 80 - error!("db connection error: {}", e); 81 - } 82 - }); 83 - 84 - Ok(client) 85 - } 86 - 87 - async fn connect_plain(conn_info: &str) -> Result<tokio_postgres::Client, tokio_postgres::Error> { 88 - let (client, conn) = tokio_postgres::connect(conn_info, NoTls).await?; 89 - info!("database connected."); 90 - 91 - tokio::spawn(async { 92 - if let Err(e) = conn.await { 93 - error!("db connection error: {}", e); 94 - } 95 - }); 96 - 97 - Ok(client) 98 - } 99 - 100 - async fn connect(conn_info: &Config) -> Result<tokio_postgres::Client, tokio_postgres::Error> { 101 - if conn_info.database.use_tls { 102 - info!("connecting with TLS..."); 103 - Self::connect_tls(conn_info).await 104 - } 105 - else { 106 - info!("connecting plain..."); 107 - Self::connect_plain(&conn_info.database.connection_info).await 108 - } 109 - } 110 - 111 - async fn prepare_cached<'a, 'b>(&'a self, sql: &'b str) -> Result<tokio_postgres::Statement, tokio_postgres::Error> { 112 - { 113 - let rd = self.cache.read().await; 114 - if let Some(v) = rd.get(sql) { 115 - return Ok(v.clone()); 116 - } 117 - } 118 - { 119 - let stmt = self.conn.read().await.prepare(sql).await?; 120 - let mut wr = self.cache.write().await; 121 - wr.insert(sql.to_string(), stmt); 122 - 123 - let v = wr.get(sql).expect("failed to get recently added value"); 124 - Ok(v.clone()) 125 - } 80 + MakeTlsConnector::new(tls_connector) 126 81 } 127 82 128 - pub async fn get_funcs(&self, funcs: &[crate::rpc::PullMetadataFunc<'_>]) -> Result<Vec<Option<FunctionInfo>>, tokio_postgres::Error> { 129 - let stmt = self.prepare_cached(r#" 83 + pub async fn get_funcs(&self, funcs: &[crate::rpc::PullMetadataFunc<'_>]) -> Result<Vec<Option<FunctionInfo>>, deadpool_postgres::PoolError> { 84 + let conn = self.pool.get().await?; 85 + let stmt = conn 86 + .prepare_cached(r#" 130 87 WITH best AS ( 131 88 select chksum,MAX(rank) as maxrank from funcs f1 132 89 WHERE chksum = ANY($1) ··· 135 92 SELECT f2.name,f2.len,f2.metadata,f2.chksum FROM best 136 93 LEFT JOIN funcs f2 ON (best.chksum=f2.chksum AND best.maxrank=f2.rank) 137 94 "#).await?; 138 - 139 - let conn = self.conn.read().await; 140 95 141 96 let chksums: Vec<&[u8]> = funcs.iter().map(|v| v.mb_hash).collect(); 142 97 ··· 167 122 Ok(res) 168 123 } 169 124 170 - pub async fn get_or_create_user<'a>(&self, user: &'a crate::rpc::RpcHello<'a>, funcs: Option<&'a crate::rpc::PushMetadata<'a>>) -> Result<i32, tokio_postgres::Error> { 171 - let stmt = self.prepare_cached( 125 + pub async fn get_or_create_user<'a>(&self, user: &'a crate::rpc::RpcHello<'a>, funcs: Option<&'a crate::rpc::PushMetadata<'a>>) -> Result<i32, deadpool_postgres::PoolError> { 126 + let conn = self.pool.get().await?; 127 + let stmt = conn.prepare_cached( 172 128 r#" 173 129 WITH ins AS ( 174 130 INSERT INTO users(lic_id, lic_data, hostname) ··· 185 141 let lic_data = user.license_data; 186 142 let hostname = funcs.map(|v| v.hostname); 187 143 188 - let row = self.conn.read().await.query(&stmt, &[&lic_id, &hostname, &lic_data]).await?; 144 + let row = conn.query(&stmt, &[&lic_id, &hostname, &lic_data]).await?; 189 145 if !row.is_empty() { 190 146 let id = row[0].get(0); 191 147 if row.len() > 1 { ··· 199 155 } 200 156 } 201 157 202 - async fn get_or_create_file<'a>(&self, funcs: &'a crate::rpc::PushMetadata<'a>) -> Result<i32, tokio_postgres::Error> { 203 - let stmt = self.prepare_cached(r#" 158 + async fn get_or_create_file<'a>(&self, funcs: &'a crate::rpc::PushMetadata<'a>) -> Result<i32, deadpool_postgres::PoolError> { 159 + let conn = self.pool.get().await?; 160 + let stmt = conn.prepare_cached(r#" 204 161 WITH ins AS ( 205 162 INSERT INTO files(chksum) 206 163 VALUES ($1) ··· 214 171 215 172 let hash = &funcs.md5[..]; 216 173 217 - let id: i32 = self.conn.read().await 218 - .query_one(&stmt, &[&hash]).await? 174 + let id: i32 = conn.query_one(&stmt, &[&hash]).await? 219 175 .get(0); 220 176 Ok(id) 221 177 } 222 178 223 - async fn get_or_create_db<'a>(&self, user: &'a crate::rpc::RpcHello<'a>, funcs: &'a crate::rpc::PushMetadata<'a>) -> Result<i32, tokio_postgres::Error> { 179 + async fn get_or_create_db<'a>(&self, user: &'a crate::rpc::RpcHello<'a>, funcs: &'a crate::rpc::PushMetadata<'a>) -> Result<i32, deadpool_postgres::PoolError> { 224 180 let file_id = self.get_or_create_file(funcs); 225 181 let user_id = self.get_or_create_user(user, Some(funcs)); 226 182 227 183 let (file_id, user_id): (i32, i32) = futures_util::try_join!(file_id, user_id)?; 228 184 229 - let stmt = self.prepare_cached(r#" 185 + let conn = self.pool.get().await?; 186 + let stmt = conn.prepare_cached(r#" 230 187 WITH ins AS ( 231 188 INSERT INTO dbs (user_id, file_id, file_path, idb_path) 232 189 VALUES ($1, $2, $3, $4) ··· 242 199 let file_path = funcs.file_path; 243 200 244 201 trace!("fid={}; uid={}", file_id, user_id); 245 - let row = self.conn.read().await 246 - .query_one(&stmt, &[&user_id, &file_id, &file_path, &idb_path]).await?; 202 + let row = conn.query_one(&stmt, &[&user_id, &file_id, &file_path, &idb_path]).await?; 247 203 248 204 let db_id = row.get(0); 249 205 250 206 Ok(db_id) 251 207 } 252 208 253 - pub async fn push_funcs<'a, 'b>(&'b self, user: &'a crate::rpc::RpcHello<'a>, funcs: &'a crate::rpc::PushMetadata<'a>, scores: &[u32]) -> Result<Vec<bool>, tokio_postgres::Error> { 209 + pub async fn push_funcs<'a, 'b>(&'b self, user: &'a crate::rpc::RpcHello<'a>, funcs: &'a crate::rpc::PushMetadata<'a>, scores: &[u32]) -> Result<Vec<bool>, deadpool_postgres::PoolError> { 254 210 let db_id = self.get_or_create_db(user, funcs).await?; 255 211 256 - let stmt = self.prepare_cached(r#" 212 + let mut conn = self.pool.get().await?; 213 + let stmt = conn.prepare_cached(r#" 257 214 INSERT INTO funcs AS f (name, len, chksum, metadata, db_id, rank) 258 215 VALUES ($1, $2, $3, $4, $5, $6) 259 216 ON CONFLICT (db_id,chksum) DO UPDATE ··· 268 225 269 226 // NOTE: Do not access self.conn/prepare_cached before dropping tx - it will deadlock! 270 227 { 271 - let mut tx = self.conn.write().await; 272 - let tx = tx.transaction().await?; 228 + let tx = conn.transaction().await?; 273 229 274 230 for (func, &score) in funcs.funcs.iter().zip(scores.iter()) { 275 231 let name = func.name; ··· 293 249 } 294 250 295 251 pub async fn is_online(&self) -> bool { 296 - let read = self.conn.read().await; 297 - !read.is_closed() 298 - } 299 - 300 - pub async fn reconnect(&self) -> Result<(), tokio_postgres::Error> { 301 - let connection = Self::connect(self.config.get_config()).await?; 302 - 303 - let conn = &mut *self.conn.write().await; 304 - 305 - self.cache.write().await.clear(); 306 - 307 - *conn = connection; 308 - Ok(()) 252 + !self.pool.is_closed() 309 253 } 310 254 311 - pub async fn get_stats(&self) -> Result<DbStats, tokio_postgres::Error> { 312 - let stmt = self.prepare_cached(r#" 255 + pub async fn get_stats(&self) -> Result<DbStats, deadpool_postgres::PoolError> { 256 + let conn = self.pool.get().await?; 257 + let stmt = conn.prepare_cached(r#" 313 258 SELECT 314 259 (SELECT COUNT(*)::int FROM users) as users, 315 260 (SELECT COUNT(distinct lic_id)::int FROM users) as hosts, ··· 318 263 (SELECT COUNT(*)::int FROM dbs) as dbs, 319 264 (SELECT COUNT(*)::int FROM files) as files 320 265 "#).await?; 321 - let db = self.conn.read().await; 322 - let row = db.query_one(&stmt, &[]).await?; 266 + let row = conn.query_one(&stmt, &[]).await?; 323 267 324 268 Ok(DbStats { 325 269 unique_lics: row.try_get(0)?, ··· 331 275 }) 332 276 } 333 277 334 - pub async fn get_file_funcs(&self, md5: &[u8], offset: i64, limit: i64) -> Result<Vec<(String, u32, [u8; 16])>, tokio_postgres::Error> { 335 - let stmt = self.prepare_cached(r#" 278 + pub async fn get_file_funcs(&self, md5: &[u8], offset: i64, limit: i64) -> Result<Vec<(String, u32, [u8; 16])>, deadpool_postgres::PoolError> { 279 + let conn = self.pool.get().await?; 280 + let stmt = conn.prepare_cached(r#" 336 281 SELECT fns.name, fns.len, fns.chksum FROM funcs AS fns 337 282 LEFT JOIN dbs AS d ON (d.id=fns.db_id) 338 283 LEFT JOIN files AS f ON (d.file_id=f.id) ··· 341 286 LIMIT $2 342 287 OFFSET $3 343 288 "#).await?; 344 - let db = self.conn.read().await; 345 - let rows = db.query(&stmt, &[&md5, &limit, &offset]).await?; 289 + let rows = conn.query(&stmt, &[&md5, &limit, &offset]).await?; 346 290 347 291 let res = rows.into_iter() 348 292 .map(|row| { ··· 359 303 Ok(res) 360 304 } 361 305 362 - pub async fn get_files_with_func(&self, func: &[u8]) -> Result<Vec<[u8; 16]>, tokio_postgres::Error> { 363 - let stmt = self.prepare_cached(r#" 306 + pub async fn get_files_with_func(&self, func: &[u8]) -> Result<Vec<[u8; 16]>, deadpool_postgres::PoolError> { 307 + let conn = self.pool.get().await?; 308 + let stmt = conn.prepare_cached(r#" 364 309 SELECT DISTINCT f.chksum FROM files f 365 310 LEFT JOIN dbs d ON (d.file_id = f.id) 366 311 LEFT JOIN funcs fns ON (fns.db_id = d.id) 367 312 WHERE 368 313 fns.chksum = $1 369 314 "#).await?; 370 - let db = self.conn.read().await; 371 - let rows = db.query(&stmt, &[&func]).await?; 315 + let rows = conn.query(&stmt, &[&func]).await?; 372 316 373 317 let res = rows 374 318 .into_iter()
-6
common/src/rpc/mod.rs
··· 14 14 Utf8Error(std::str::Utf8Error), 15 15 IOError(std::io::Error), 16 16 Serde(String), 17 - DbError(tokio_postgres::Error), 18 17 InvalidData, 19 18 OutOfMemory, 20 19 Todo, ··· 47 46 impl From<std::str::Utf8Error> for Error { 48 47 fn from(v: std::str::Utf8Error) -> Self { 49 48 Error::Utf8Error(v) 50 - } 51 - } 52 - impl From<tokio_postgres::Error> for Error { 53 - fn from(v: tokio_postgres::Error) -> Self { 54 - Error::DbError(v) 55 49 } 56 50 } 57 51 impl From<std::collections::TryReserveError> for Error {
-25
lumen/src/main.rs
··· 239 239 } 240 240 } 241 241 242 - async fn maintenance(state: std::sync::Weak<SharedState_>) { 243 - let mut timer = tokio::time::interval(std::time::Duration::from_secs(10)); 244 - 245 - loop { 246 - timer.tick().await; 247 - 248 - if let Some(state) = state.upgrade() { 249 - 250 - if !state.db.is_online().await { 251 - warn!("db is offline; attempting to reconnect..."); 252 - match state.db.reconnect().await { 253 - Ok(_) => info!("reconnected."), 254 - Err(err) => error!("failed to reconnect: {}", err), 255 - } 256 - } 257 - 258 - } else { 259 - warn!("shared state is not available"); 260 - break; 261 - } 262 - } 263 - } 264 - 265 242 fn main() { 266 243 setup_logger(); 267 244 let matches = clap::Command::new("lumen") ··· 311 288 config, 312 289 server_name, 313 290 }); 314 - 315 - rt.spawn(maintenance(Arc::downgrade(&state))); 316 291 317 292 let tls_acceptor; 318 293