diff --git a/Cargo.lock b/Cargo.lock index 489ae3b..f1453c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -174,7 +174,7 @@ dependencies = [ "borsh-derive-internal", "borsh-schema-derive-internal", "proc-macro-crate", - "proc-macro2 1.0.56", + "proc-macro2", "syn 1.0.109", ] @@ -184,8 +184,8 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afb438156919598d2c7bad7e1c0adf3d26ed3840dbc010db1a882a65583ca2fb" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", ] @@ -195,8 +195,8 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "634205cc43f74a1b9046ef87c4540ebda95696ec0f315024860cad7c5b0f5ccd" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", ] @@ -235,8 +235,8 @@ version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e31225543cb46f81a7e224762764f4a6a0f097b1db0b175f69e8065efaa42de5" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", ] @@ -246,16 +246,6 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" -[[package]] -name = "bytes" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" -dependencies = [ - "byteorder", - "iovec", -] - [[package]] name = "bytes" version = "1.4.0" @@ -293,7 +283,7 @@ dependencies = [ "js-sys", "num-integer", "num-traits", - "time", + "time 0.1.45", "wasm-bindgen", "winapi", ] @@ -426,8 +416,8 @@ dependencies = [ "cc", "codespan-reporting", "once_cell", - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "scratch", "syn 2.0.15", ] @@ -444,11 +434,24 @@ version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 2.0.15", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if 1.0.0", + "hashbrown 0.12.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "digest" version = "0.8.1" @@ -559,6 +562,21 @@ version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -566,6 +584,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -580,10 +599,21 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" dependencies = [ - "futures", + "futures 0.1.31", "num_cpus", ] +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.28" @@ -608,8 +638,10 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -660,7 +692,7 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66b91535aa35fea1523ad1b86cb6b53c28e0ae566ba4a460f4457e936cad7c6f" dependencies = [ - "bytes 1.4.0", + "bytes", "fnv", "futures-core", "futures-sink", @@ -730,7 +762,7 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ - "bytes 1.4.0", + "bytes", "fnv", "itoa", ] @@ -741,7 +773,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes 1.4.0", + "bytes", "http", "pin-project-lite", ] @@ -770,7 +802,7 @@ version = "0.14.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4" dependencies = [ - "bytes 1.4.0", + "bytes", "futures-channel", "futures-core", "futures-util", @@ -865,15 +897,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "iovec" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" -dependencies = [ - "libc", -] - [[package]] name = "ipnet" version = "2.7.2" @@ -1082,7 +1105,7 @@ dependencies = [ "serde_json", "sha1", "sha2", - "time", + "time 0.1.45", "twox-hash", "uuid 0.7.4", ] @@ -1172,11 +1195,11 @@ version = "0.1.0" dependencies = [ "anyhow", "byteorder", - "bytes 0.4.12", + "bytes", "chrono", "crossbeam", "env_logger", - "futures", + "futures 0.1.31", "futures-cpupool", "lazy_static", "log", @@ -1197,11 +1220,12 @@ dependencies = [ "serde_json", "serial_test", "serial_test_derive", - "spin", + "spin 0.9.8", "tempfile", "test-log", - "time", - "tokio-codec", + "time 0.3.21", + "tokio", + "tokio-util", "uuid 1.3.1", "zstd", ] @@ -1302,8 +1326,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ "proc-macro-error-attr", - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", "version_check", ] @@ -1314,20 +1338,11 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "version_check", ] -[[package]] -name = "proc-macro2" -version = "0.4.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" -dependencies = [ - "unicode-xid", -] - [[package]] name = "proc-macro2" version = "1.0.56" @@ -1339,9 +1354,9 @@ dependencies = [ [[package]] name = "prometheus-client" -version = "0.20.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e227aeb6c2cfec819e999c4773b35f8c7fa37298a203ff46420095458eee567e" +checksum = "78c2f43e8969d51935d2a7284878ae053ba30034cd563f673cde37ba5205685e" dependencies = [ "dtoa", "itoa", @@ -1355,8 +1370,8 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b6a5217beb0ad503ee7fa752d451c905113d70721b937126158f3106a48cc1" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", ] @@ -1375,8 +1390,8 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", ] @@ -1386,22 +1401,13 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" -[[package]] -name = "quote" -version = "0.6.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce23b6b870e8f94f81fb0a363d65d86675884b34a09043c81e5562f11c1f8e1" -dependencies = [ - "proc-macro2 0.4.30", -] - [[package]] name = "quote" version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" dependencies = [ - "proc-macro2 1.0.56", + "proc-macro2", ] [[package]] @@ -1584,7 +1590,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27b71749df584b7f4cac2c426c127a7c785a5106cc98f7a8feb044115f0fa254" dependencies = [ "base64 0.21.0", - "bytes 1.4.0", + "bytes", "encoding_rs", "futures-core", "futures-util", @@ -1625,7 +1631,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -1651,8 +1657,8 @@ version = "0.7.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac1c672430eb41556291981f45ca900a0239ad007242d1cb4b4167af842db666" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", ] @@ -1666,7 +1672,7 @@ dependencies = [ "libc", "rand 0.3.23", "rustc-serialize", - "time", + "time 0.1.45", ] [[package]] @@ -1679,7 +1685,7 @@ dependencies = [ "borsh", "bytecheck", "byteorder", - "bytes 1.4.0", + "bytes", "num-traits", "rand 0.8.5", "rkyv", @@ -1812,9 +1818,9 @@ dependencies = [ [[package]] name = "serde_bytes" -version = "0.10.5" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defbb8a83d7f34cc8380751eeb892b825944222888aff18996ea7901f24aec88" +checksum = "416bda436f9aab92e02c8e10d49a15ddd339cea90b6e340fe51ed97abb548294" dependencies = [ "serde", ] @@ -1825,8 +1831,8 @@ version = "1.0.160" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 2.0.15", ] @@ -1864,21 +1870,27 @@ dependencies = [ [[package]] name = "serial_test" -version = "0.2.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50bfbc39343545618d97869d77f38ed43e48dd77432717dbc7ed39d797f3ecbe" +checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d" dependencies = [ + "dashmap", + "futures 0.3.28", "lazy_static", + "log", + "parking_lot", + "serial_test_derive", ] [[package]] name = "serial_test_derive" -version = "0.2.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89dd85be2e2ad75b041c9df2892ac078fa6e0b90024028b2b9fb4125b7530f01" +checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ - "quote 0.6.13", - "syn 0.15.44", + "proc-macro2", + "quote", + "syn 2.0.15", ] [[package]] @@ -1908,6 +1920,15 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "simdutf8" version = "0.1.4" @@ -1945,6 +1966,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "sql-builder" version = "3.1.1" @@ -2022,30 +2052,19 @@ checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" dependencies = [ "heck", "proc-macro-error", - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", ] -[[package]] -name = "syn" -version = "0.15.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca4b3b69a77cbe1ffc9e198781b7acb0c7365a883670e8f1c1bc66fba79a5c5" -dependencies = [ - "proc-macro2 0.4.30", - "quote 0.6.13", - "unicode-xid", -] - [[package]] name = "syn" version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "unicode-ident", ] @@ -2055,8 +2074,8 @@ version = "2.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "unicode-ident", ] @@ -2088,8 +2107,8 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38f0c854faeb68a048f0f2dc410c5ddae3bf83854ef0e4977d58306a5edef50e" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", ] @@ -2117,8 +2136,8 @@ version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 2.0.15", ] @@ -2133,6 +2152,22 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc" +dependencies = [ + "serde", + "time-core", +] + +[[package]] +name = "time-core" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" + [[package]] name = "tinyvec" version = "1.6.0" @@ -2150,40 +2185,32 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.27.0" +version = "1.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" +checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" dependencies = [ "autocfg", - "bytes 1.4.0", + "bytes", "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", - "windows-sys 0.45.0", -] - -[[package]] -name = "tokio-codec" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" -dependencies = [ - "bytes 0.4.12", - "futures", - "tokio-io", + "tokio-macros", + "windows-sys 0.48.0", ] [[package]] -name = "tokio-io" -version = "0.1.13" +name = "tokio-macros" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ - "bytes 0.4.12", - "futures", - "log", + "proc-macro2", + "quote", + "syn 2.0.15", ] [[package]] @@ -2203,7 +2230,7 @@ version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" dependencies = [ - "bytes 1.4.0", + "bytes", "futures-core", "futures-sink", "pin-project-lite", @@ -2336,12 +2363,6 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" -[[package]] -name = "unicode-xid" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" - [[package]] name = "untrusted" version = "0.7.1" @@ -2382,8 +2403,8 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20e8a505384e9309dc842520c6c9348f4b141dee06aaa845522727b1b99ca235" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 2.0.15", ] @@ -2452,8 +2473,8 @@ dependencies = [ "bumpalo", "log", "once_cell", - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", "wasm-bindgen-shared", ] @@ -2476,7 +2497,7 @@ version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5" dependencies = [ - "quote 1.0.26", + "quote", "wasm-bindgen-macro-support", ] @@ -2486,8 +2507,8 @@ version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.26", + "proc-macro2", + "quote", "syn 1.0.109", "wasm-bindgen-backend", "wasm-bindgen-shared", @@ -2720,18 +2741,18 @@ dependencies = [ [[package]] name = "zstd" -version = "0.11.2+zstd.1.5.2" +version = "0.12.3+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "5.0.2+zstd.1.5.2" +version = "6.0.5+zstd.1.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +checksum = "d56d9e60b4b1758206c238a10165fbcae3ca37b01744e394c463463f6529d23b" dependencies = [ "libc", "zstd-sys", diff --git a/Cargo.toml b/Cargo.toml index b67c5a4..2f9f587 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,12 +26,13 @@ path = "benches/concurrent_insert/mod.rs" anyhow = "1.0" log = "0.4" toml = "0.7.3" -prometheus-client = "0.20" +prometheus-client = "0.21" +tokio = { version = "1", features = ["full"] } [dependencies] anyhow = { workspace = true } byteorder = "1.2" -bytes = "0.4" +bytes = "1.4" chrono = "0.4" crossbeam = "0.8.2" futures = "0.1" @@ -49,22 +50,23 @@ reqwest = { version = "0.11.13", default-features = false, features = ["rustls-t rust-crypto = "0.2" scheduled-thread-pool = "0.2" serde = "1.0" -serde_bytes = "0.10" +serde_bytes = "0.11" serde_derive = "1.0" serde_json = "1.0" -spin = "0.5.0" -tokio-codec = "0.1" +spin = "0.9" +tokio = { workspace = true } +tokio-util = "0.7" uuid = { version = "1.3.0", default-features = false, features = ["v4", "fast-rng", "macro-diagnostics"] } -zstd = "0.11" +zstd = "0.12" [dev-dependencies] env_logger = "0.10" scoped_threadpool = "0.1" -serial_test = "0.2" -serial_test_derive = "0.2" +serial_test = "2.0" +serial_test_derive = "2.0" tempfile = "3.0" test-log = "0.2" -time = "0.1" +time = "0.3" [profile.release] debug = true diff --git a/src/lib.rs b/src/lib.rs index 3ba45e3..b147674 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,7 @@ extern crate mysql; #[macro_use] extern crate serde; extern crate serde_bytes; -extern crate tokio_codec; +extern crate tokio_util; #[macro_use] extern crate serde_derive; #[macro_use] @@ -51,6 +51,7 @@ pub mod error; mod location; pub mod monitors; mod rpc; +mod runtime; pub mod serde_obkv; mod util; pub use self::{ diff --git a/src/monitors/mod.rs b/src/monitors/mod.rs index 341639a..6aca0b3 100644 --- a/src/monitors/mod.rs +++ b/src/monitors/mod.rs @@ -19,3 +19,4 @@ pub mod client_metrics; pub mod prometheus; pub mod proxy_metrics; pub mod rpc_metrics; +pub mod runtime_metrics; diff --git a/src/monitors/runtime_metrics.rs b/src/monitors/runtime_metrics.rs new file mode 100644 index 0000000..1e4ff1f --- /dev/null +++ b/src/monitors/runtime_metrics.rs @@ -0,0 +1,88 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the + * Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY + * KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +use prometheus_client::{ + encoding::{EncodeLabelSet, EncodeLabelValue}, + metrics::{family::Family, gauge}, + registry::Registry, +}; + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)] +pub enum ObClientRuntimeGaugeType { + Default = 0, + ConnWriter = 1, + ConnReader = 2, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct RuntimeThreadLabels { + pub rt_type: ObClientRuntimeGaugeType, +} + +#[derive(Default)] +pub struct RuntimeGaugeMetrics { + runtime_thread_alive_gauges: Family, + runtime_thread_idle_gauges: Family, +} + +impl RuntimeGaugeMetrics { + pub fn register(&self, registry: &mut Registry) { + let sub_registry = registry.sub_registry_with_prefix("runtime"); + sub_registry.register( + "Runtime Alive Threads Gauges ", + "Client alive threads num.", + self.runtime_thread_alive_gauges.clone(), + ); + sub_registry.register( + "Runtime Idle Threads Gauges ", + "Client idle threads num.", + self.runtime_thread_idle_gauges.clone(), + ); + } + + pub fn on_thread_start(&self, rt_type: ObClientRuntimeGaugeType) { + self.runtime_thread_alive_gauges + .get_or_create(&RuntimeThreadLabels { rt_type }) + .inc(); + } + + pub fn on_thread_stop(&self, rt_type: ObClientRuntimeGaugeType) { + self.runtime_thread_alive_gauges + .get_or_create(&RuntimeThreadLabels { rt_type }) + .dec(); + } + + pub fn on_thread_park(&self, rt_type: ObClientRuntimeGaugeType) { + self.runtime_thread_alive_gauges + .get_or_create(&RuntimeThreadLabels { rt_type }) + .inc(); + } + + pub fn on_thread_unpark(&self, rt_type: ObClientRuntimeGaugeType) { + self.runtime_thread_alive_gauges + .get_or_create(&RuntimeThreadLabels { rt_type }) + .dec(); + } + + pub fn get_runtime_thread_alive_gauges(&self) -> &Family { + &self.runtime_thread_alive_gauges + } + + pub fn get_runtime_thread_idle_gauges(&self) -> &Family { + &self.runtime_thread_idle_gauges + } +} diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index b1c8dc0..e7263c7 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -38,7 +38,7 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::BytesMut; use crossbeam::channel::{bounded, unbounded, Receiver, Sender}; use net2::{TcpBuilder, TcpStreamExt}; -use tokio_codec::{Decoder, Encoder}; +use tokio_util::codec::{Decoder, Encoder}; use uuid::Uuid; use self::protocol::{ diff --git a/src/rpc/protocol/mod.rs b/src/rpc/protocol/mod.rs index e288957..ef7f1c8 100644 --- a/src/rpc/protocol/mod.rs +++ b/src/rpc/protocol/mod.rs @@ -21,8 +21,8 @@ use std::{ sync::atomic::{AtomicI32, Ordering}, }; -use bytes::{Buf, BufMut, BytesMut, IntoBuf}; -use tokio_codec::{Decoder, Encoder}; +use bytes::{Buf, BufMut, BytesMut}; +use tokio_util::codec::{Decoder, Encoder}; use crate::{error::Error, serde_obkv::util, util as u}; @@ -167,14 +167,14 @@ impl ProtoEncoder for ObRpcCostTime { fn encode(&self, buf: &mut BytesMut) -> Result<()> { buf.reserve(COST_TIME_ENCODE_SIZE); - buf.put_i32_be(self.len); - buf.put_i32_be(self.arrival_push_diff); - buf.put_i32_be(self.push_pop_diff); - buf.put_i32_be(self.pop_process_start_diff); - buf.put_i32_be(self.process_start_end_diff); - buf.put_i32_be(self.process_end_response_diff); - buf.put_i64_be(self.packet_id); - buf.put_i64_be(self.request_arrive_time); + buf.put_i32(self.len); + buf.put_i32(self.arrival_push_diff); + buf.put_i32(self.push_pop_diff); + buf.put_i32(self.pop_process_start_diff); + buf.put_i32(self.process_start_end_diff); + buf.put_i32(self.process_end_response_diff); + buf.put_i64(self.packet_id); + buf.put_i64(self.request_arrive_time); Ok(()) } @@ -195,14 +195,14 @@ impl ProtoDecoder for ObRpcCostTime { { let mut buf = Cursor::new(&mut *buf); - self.len = buf.get_i32_be(); - self.arrival_push_diff = buf.get_i32_be(); - self.push_pop_diff = buf.get_i32_be(); - self.pop_process_start_diff = buf.get_i32_be(); - self.process_start_end_diff = buf.get_i32_be(); - self.process_end_response_diff = buf.get_i32_be(); - self.packet_id = buf.get_i64_be(); - self.request_arrive_time = buf.get_i64_be(); + self.len = buf.get_i32(); + self.arrival_push_diff = buf.get_i32(); + self.push_pop_diff = buf.get_i32(); + self.pop_process_start_diff = buf.get_i32(); + self.process_start_end_diff = buf.get_i32(); + self.process_end_response_diff = buf.get_i32(); + self.packet_id = buf.get_i64(); + self.request_arrive_time = buf.get_i64(); } buf.advance(COST_TIME_ENCODE_SIZE); Ok(()) @@ -451,31 +451,31 @@ impl ProtoEncoder for ObRpcPacket { impl ProtoEncoder for ObRpcPacketHeader { fn encode(&self, buf: &mut BytesMut) -> Result<()> { buf.reserve(RPC_PACKET_HEADER_SIZE_V4); - buf.put_u32_be(self.pcode); + buf.put_u32(self.pcode); buf.put_u8(self.hlen); buf.put_u8(self.priority); - buf.put_u16_be(self.flag); - buf.put_i64_be(self.checksum); - buf.put_u64_be(self.tenant_id); - buf.put_u64_be(self.prev_tenant_id); - buf.put_u64_be(self.session_id); - buf.put_u64_be(self.trace_id0); - buf.put_u64_be(self.trace_id1); - buf.put_i64_be(self.timeout); - buf.put_i64_be(self.timestamp); + buf.put_u16(self.flag); + buf.put_i64(self.checksum); + buf.put_u64(self.tenant_id); + buf.put_u64(self.prev_tenant_id); + buf.put_u64(self.session_id); + buf.put_u64(self.trace_id0); + buf.put_u64(self.trace_id1); + buf.put_i64(self.timeout); + buf.put_i64(self.timestamp); self.rpc_cost_time.encode(buf)?; - buf.put_i64_be(self.cluster_id); - buf.put_i32_be(self.compress_type.clone() as i32); - buf.put_i32_be(self.original_len); + buf.put_i64(self.cluster_id); + buf.put_i32(self.compress_type.clone() as i32); + buf.put_i32(self.original_len); // TODO: check observer 4.x is ok - buf.put_i64_be(self.src_cluster_id); - buf.put_i64_be(self.unis_version); - buf.put_i32_be(self.request_level); - buf.put_i64_be(self.seq_no); - buf.put_i32_be(self.group_id); - buf.put_i64_be(self.trace_id2); - buf.put_i64_be(self.trace_id3); - buf.put_i64_be(self.cluster_name_hash); + buf.put_i64(self.src_cluster_id); + buf.put_i64(self.unis_version); + buf.put_i32(self.request_level); + buf.put_i64(self.seq_no); + buf.put_i32(self.group_id); + buf.put_i64(self.trace_id2); + buf.put_i64(self.trace_id3); + buf.put_i64(self.cluster_name_hash); Ok(()) } @@ -483,28 +483,26 @@ impl ProtoEncoder for ObRpcPacketHeader { impl ProtoDecoder for ObRpcPacketHeader { fn decode(&mut self, buf: &mut BytesMut) -> Result<()> { - let mut src = util::split_buf_to(buf, HEADER_SIZE) - .map_err(|e| { - error!( - "ObRpcPacketHeader::decode fail to split basic header, err:{}", - e - ); + let mut src = util::split_buf_to(buf, HEADER_SIZE).map_err(|e| { + error!( + "ObRpcPacketHeader::decode fail to split basic header, err:{}", e - })? - .into_buf(); + ); + e + })?; - self.pcode = src.get_u32_be(); + self.pcode = src.get_u32(); self.hlen = src.get_u8(); self.priority = src.get_u8(); - self.flag = src.get_u16_be(); - self.checksum = src.get_i64_be(); - self.tenant_id = src.get_u64_be(); - self.prev_tenant_id = src.get_u64_be(); - self.session_id = src.get_u64_be(); - self.trace_id0 = src.get_u64_be(); - self.trace_id1 = src.get_u64_be(); - self.timeout = src.get_i64_be(); - self.timestamp = src.get_i64_be(); + self.flag = src.get_u16(); + self.checksum = src.get_i64(); + self.tenant_id = src.get_u64(); + self.prev_tenant_id = src.get_u64(); + self.session_id = src.get_u64(); + self.trace_id0 = src.get_u64(); + self.trace_id1 = src.get_u64(); + self.timeout = src.get_i64(); + self.timestamp = src.get_i64(); let hlen = self.hlen as usize; @@ -518,20 +516,19 @@ impl ProtoDecoder for ObRpcPacketHeader { let mut src = util::split_buf_to( buf, self.get_encoded_size_with_newserver() - self.get_encoded_size_with_cost_time(), - )? - .into_buf(); - self.cluster_id = src.get_i64_be(); - self.compress_type = ObCompressType::from_i32(src.get_i32_be())?; - self.original_len = src.get_i32_be(); + )?; + self.cluster_id = src.get_i64(); + self.compress_type = ObCompressType::from_i32(src.get_i32())?; + self.original_len = src.get_i32(); // decode for version:V4 - self.src_cluster_id = src.get_i64_be(); - self.unis_version = src.get_i64_be(); - self.request_level = src.get_i32_be(); - self.seq_no = src.get_i64_be(); - self.group_id = src.get_i32_be(); - self.trace_id2 = src.get_i64_be(); - self.trace_id3 = src.get_i64_be(); - self.cluster_name_hash = src.get_i64_be(); + self.src_cluster_id = src.get_i64(); + self.unis_version = src.get_i64(); + self.request_level = src.get_i32(); + self.seq_no = src.get_i64(); + self.group_id = src.get_i32(); + self.trace_id2 = src.get_i64(); + self.trace_id3 = src.get_i64(); + self.cluster_name_hash = src.get_i64(); hlen - self.get_encoded_size_with_newserver() } else if hlen >= self.get_encoded_size() { // header with cost_time, cluster_id, compress_type, original_len @@ -539,11 +536,10 @@ impl ProtoDecoder for ObRpcPacketHeader { let mut src = util::split_buf_to( buf, self.get_encoded_size() - self.get_encoded_size_with_cost_time(), - )? - .into_buf(); - self.cluster_id = src.get_i64_be(); - self.compress_type = ObCompressType::from_i32(src.get_i32_be())?; - self.original_len = src.get_i32_be(); + )?; + self.cluster_id = src.get_i64(); + self.compress_type = ObCompressType::from_i32(src.get_i32())?; + self.original_len = src.get_i32(); hlen - self.get_encoded_size() } else if hlen >= self.get_encoded_size_with_cost_time_and_cluster_id() { // header with cost_time, cluster_id @@ -552,9 +548,8 @@ impl ProtoDecoder for ObRpcPacketHeader { buf, self.get_encoded_size_with_cost_time_and_cluster_id() - self.get_encoded_size_with_cost_time(), - )? - .into_buf(); - self.cluster_id = src.get_i64_be(); + )?; + self.cluster_id = src.get_i64(); hlen - self.get_encoded_size_with_cost_time_and_cluster_id() } else if hlen >= self.get_encoded_size_with_cost_time() { self.rpc_cost_time.decode(buf)?; @@ -778,9 +773,8 @@ impl ObTablePacketCodec { } } -impl Encoder for ObTablePacketCodec { +impl Encoder for ObTablePacketCodec { type Error = io::Error; - type Item = ObTablePacket; fn encode(&mut self, packet: ObTablePacket, buf: &mut BytesMut) -> Result<()> { match packet { @@ -794,8 +788,8 @@ impl Encoder for ObTablePacketCodec { let content_len = content.len(); buf.reserve(4 + 4 + 4 + 4 + content_len); buf.put_slice(MAGIC_HEADER_FLAG); - buf.put_i32_be(content_len as i32); - buf.put_i32_be(id); + buf.put_i32(content_len as i32); + buf.put_i32(id); buf.put_slice(RESERVED); buf.extend_from_slice(&content[..]); @@ -879,10 +873,10 @@ impl Decoder for ObTablePacketCodec { { let mut src = Cursor::new(&mut *buf); - let dlen = src.get_i32_be(); - let chid = src.get_i32_be(); + let dlen = src.get_i32(); + let chid = src.get_i32(); //reserved - let _reserved = src.get_i32_be(); + let _reserved = src.get_i32(); self.dlen = dlen; self.chid = chid; trace!("ObTablePacketCodec::decode chid={}, dlen={}", chid, dlen); diff --git a/src/rpc/protocol/payloads.rs b/src/rpc/protocol/payloads.rs index b89f41a..93486d7 100644 --- a/src/rpc/protocol/payloads.rs +++ b/src/rpc/protocol/payloads.rs @@ -23,7 +23,7 @@ use std::{ time::Duration, }; -use bytes::{Buf, BufMut, BytesMut, IntoBuf}; +use bytes::{Buf, BufMut, BytesMut}; use super::{BasePayLoad, ObPayload, ObTablePacketCode, ProtoDecoder, ProtoEncoder, Result}; use crate::{ @@ -1272,8 +1272,7 @@ impl ProtoDecoder for ObTableOperationResult { self.decode_base(src)?; self.header.decode(src)?; - self.operation_type = - ObTableOperationType::from_i8(util::split_buf_to(src, 1)?.into_buf().get_i8())?; + self.operation_type = ObTableOperationType::from_i8(util::split_buf_to(src, 1)?.get_i8())?; self.entity.decode(src)?; self.affected_rows = util::decode_vi64(src)?; Ok(()) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs new file mode 100644 index 0000000..45882ab --- /dev/null +++ b/src/runtime/mod.rs @@ -0,0 +1,16 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the + * Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY + * KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ diff --git a/src/serde_obkv/util.rs b/src/serde_obkv/util.rs index d59f430..c1a8fbf 100644 --- a/src/serde_obkv/util.rs +++ b/src/serde_obkv/util.rs @@ -17,7 +17,7 @@ use std::{f32, f64, io::Cursor, str}; -use bytes::{Buf, BufMut, BytesMut, IntoBuf}; +use bytes::{Buf, BufMut, BytesMut}; use super::error::{Error, Result}; @@ -222,7 +222,7 @@ pub fn encode_bytes_string(v: &[u8], buf: &mut BytesMut) -> Result<()> { buf.reserve(encoded_length_bytes_string(v)); encode_vi32(v.len() as i32, buf)?; buf.put_slice(v); - buf.put(END); + buf.put_u8(END); Ok(()) } @@ -283,14 +283,14 @@ pub fn decode_i8(buf: &mut BytesMut) -> Result { if buf.is_empty() { return Err(Error::Custom("buf EOF".into())); } - Ok(split_buf_to(buf, 1)?.into_buf().get_i8()) + Ok(split_buf_to(buf, 1)?.get_i8()) } pub fn decode_u8(buf: &mut BytesMut) -> Result { if buf.is_empty() { return Err(Error::Custom("buf EOF".into())); } - Ok(split_buf_to(buf, 1)?.into_buf().get_u8()) + Ok(split_buf_to(buf, 1)?.get_u8()) } #[cfg(test)] diff --git a/src/serde_obkv/value/mod.rs b/src/serde_obkv/value/mod.rs index a15c6e6..8bb0e2a 100644 --- a/src/serde_obkv/value/mod.rs +++ b/src/serde_obkv/value/mod.rs @@ -24,7 +24,7 @@ pub mod from; use std::hash::{Hash, Hasher}; -use bytes::{Buf, BufMut, BytesMut, IntoBuf}; +use bytes::{Buf, BufMut, BytesMut}; use serde::ser::{Serialize, Serializer}; use super::{ @@ -214,7 +214,7 @@ impl ObjMeta { pub fn decode(buf: &mut BytesMut) -> Result { if buf.len() >= 4 { - let mut buf = split_buf_to(buf, 4)?.into_buf(); + let mut buf = split_buf_to(buf, 4)?; let obj_type = ObjType::from_u8(buf.get_u8())?; let cs_level = CollationLevel::from_u8(buf.get_u8())?; let cs_type = CollationType::from_u8(buf.get_u8())?; diff --git a/tests/test_table_client_base.rs b/tests/test_table_client_base.rs index 7de2e88..7d94351 100644 --- a/tests/test_table_client_base.rs +++ b/tests/test_table_client_base.rs @@ -28,7 +28,6 @@ use std::{ }; use obkv::{error::CommonErrCode, ObTableClient, ResultCodes, Table, TableQuery, Value}; -use time::PreciseTime; pub struct BaseTest { client: Arc, @@ -46,7 +45,8 @@ impl BaseTest { pub fn test_varchar_concurrent(&self, table_name: &'static str) { let mut handles = vec![]; - let start = PreciseTime::now(); + let start = SystemTime::now(); + let counter = Arc::new(AtomicUsize::new(0)); for _ in 0..BaseTest::THREAD_NUM { let client = self.client.clone(); @@ -89,21 +89,20 @@ impl BaseTest { for handle in handles { handle.join().expect("should succeed to join"); } - let end = PreciseTime::now(); assert_eq!( BaseTest::ROW_NUM * BaseTest::THREAD_NUM, counter.load(Ordering::SeqCst) ); println!( "{} seconds for insert_or_update {} rows.", - start.to(end), + start.elapsed().unwrap().as_secs(), BaseTest::ROW_NUM * BaseTest::THREAD_NUM ); } pub fn test_bigint_concurrent(&self, table_name: &'static str) { let mut handles = vec![]; - let start = PreciseTime::now(); + let start = SystemTime::now(); let counter = Arc::new(AtomicUsize::new(0)); for _ in 0..10 { let client = self.client.clone(); @@ -138,11 +137,10 @@ impl BaseTest { for handle in handles { handle.join().expect("should succeed to join"); } - let end = PreciseTime::now(); assert_eq!(1000, counter.load(Ordering::SeqCst)); println!( "{} seconds for insert_or_update {} rows.", - start.to(end), + start.elapsed().unwrap().as_secs(), 1000 ); }