@@ -14,57 +14,103 @@ async fn run_unified() {
14
14
}
15
15
16
16
#[ tokio:: test]
17
- async fn command_batching ( ) {
17
+ async fn max_write_batch_size_batching ( ) {
18
18
let handler = Arc :: new ( EventHandler :: new ( ) ) ;
19
19
let client = Client :: test_builder ( )
20
20
. event_handler ( handler. clone ( ) )
21
21
. build ( )
22
22
. await ;
23
23
let mut subscriber = handler. subscribe ( ) ;
24
24
25
- let max_object_size = client. server_info . max_bson_object_size as usize ;
26
- let max_message_size = client. server_info . max_message_size_bytes as usize ;
25
+ let max_write_batch_size = client. server_info . max_write_batch_size . unwrap ( ) as usize ;
27
26
28
- let namespace = Namespace :: new ( "command_batching" , "command_batching" ) ;
29
- let large_doc = doc ! { "a" : "b" . repeat( max_object_size / 2 ) } ;
30
- let models = vec ! [
31
- WriteModel :: InsertOne {
32
- namespace: namespace. clone( ) ,
33
- document: large_doc,
34
- } ;
35
- 3
36
- ] ;
37
- client. bulk_write ( models) . await . unwrap ( ) ;
27
+ let model = WriteModel :: InsertOne {
28
+ namespace : Namespace :: new ( "db" , "coll" ) ,
29
+ document : doc ! { "a" : "b" } ,
30
+ } ;
31
+ let models = vec ! [ model; max_write_batch_size + 1 ] ;
32
+
33
+ let result = client. bulk_write ( models) . await . unwrap ( ) ;
34
+ assert_eq ! ( result. inserted_count as usize , max_write_batch_size + 1 ) ;
35
+
36
+ let ( first_started, _) = subscriber
37
+ . wait_for_successful_command_execution ( Duration :: from_millis ( 500 ) , "bulkWrite" )
38
+ . await
39
+ . expect ( "no events observed" ) ;
40
+ let first_len = first_started. command . get_array ( "ops" ) . unwrap ( ) . len ( ) ;
41
+ assert_eq ! ( first_len, max_write_batch_size) ;
42
+
43
+ let ( second_started, _) = subscriber
44
+ . wait_for_successful_command_execution ( Duration :: from_millis ( 500 ) , "bulkWrite" )
45
+ . await
46
+ . expect ( "no events observed" ) ;
47
+ let second_len = second_started. command . get_array ( "ops" ) . unwrap ( ) . len ( ) ;
48
+ assert_eq ! ( second_len, 1 ) ;
49
+ }
50
+
51
+ #[ tokio:: test]
52
+ async fn max_bson_object_size_with_document_sequences ( ) {
53
+ let handler = Arc :: new ( EventHandler :: new ( ) ) ;
54
+ let client = Client :: test_builder ( )
55
+ . event_handler ( handler. clone ( ) )
56
+ . build ( )
57
+ . await ;
58
+ let mut subscriber = handler. subscribe ( ) ;
59
+
60
+ let max_bson_object_size = client. server_info . max_bson_object_size as usize ;
61
+
62
+ let document = doc ! { "a" : "b" . repeat( max_bson_object_size / 2 ) } ;
63
+ let model = WriteModel :: InsertOne {
64
+ namespace : Namespace :: new ( "db" , "coll" ) ,
65
+ document,
66
+ } ;
67
+ let models = vec ! [ model; 2 ] ;
68
+
69
+ let result = client. bulk_write ( models) . await . unwrap ( ) ;
70
+ assert_eq ! ( result. inserted_count as usize , 2 ) ;
38
71
39
72
let ( started, _) = subscriber
40
73
. wait_for_successful_command_execution ( Duration :: from_millis ( 500 ) , "bulkWrite" )
41
74
. await
42
75
. expect ( "no events observed" ) ;
43
- let ops = started. command . get_array ( "ops" ) . unwrap ( ) ;
44
- assert_eq ! ( ops. len( ) , 3 ) ;
45
-
46
- let large_doc = doc ! { "a" : "b" . repeat( max_object_size - 5000 ) } ;
47
- let num_models = max_message_size / max_object_size + 1 ;
48
- let models = vec ! [
49
- WriteModel :: InsertOne {
50
- namespace: namespace. clone( ) ,
51
- document: large_doc
52
- } ;
53
- num_models
54
- ] ;
55
- client. bulk_write ( models) . await . unwrap ( ) ;
76
+ let len = started. command . get_array ( "ops" ) . unwrap ( ) . len ( ) ;
77
+ assert_eq ! ( len, 2 ) ;
78
+ }
79
+
80
+ #[ tokio:: test]
81
+ async fn max_message_size_bytes_batching ( ) {
82
+ let handler = Arc :: new ( EventHandler :: new ( ) ) ;
83
+ let client = Client :: test_builder ( )
84
+ . event_handler ( handler. clone ( ) )
85
+ . build ( )
86
+ . await ;
87
+ let mut subscriber = handler. subscribe ( ) ;
88
+
89
+ let max_bson_object_size = client. server_info . max_bson_object_size as usize ;
90
+ let max_message_size_bytes = client. server_info . max_message_size_bytes as usize ;
91
+
92
+ let document = doc ! { "a" : "b" . repeat( max_bson_object_size - 500 ) } ;
93
+ let model = WriteModel :: InsertOne {
94
+ namespace : Namespace :: new ( "db" , "coll" ) ,
95
+ document,
96
+ } ;
97
+ let num_models = max_message_size_bytes / max_bson_object_size + 1 ;
98
+ let models = vec ! [ model; num_models] ;
99
+
100
+ let result = client. bulk_write ( models) . await . unwrap ( ) ;
101
+ assert_eq ! ( result. inserted_count as usize , num_models) ;
56
102
57
103
let ( first_started, _) = subscriber
58
104
. wait_for_successful_command_execution ( Duration :: from_millis ( 500 ) , "bulkWrite" )
59
105
. await
60
106
. expect ( "no events observed" ) ;
61
- let first_len = first_started. command . get_array ( "ops" ) . unwrap ( ) . len ( ) ;
62
- assert ! ( first_len < num_models) ;
107
+ let first_ops_len = first_started. command . get_array ( "ops" ) . unwrap ( ) . len ( ) ;
63
108
64
109
let ( second_started, _) = subscriber
65
110
. wait_for_successful_command_execution ( Duration :: from_millis ( 500 ) , "bulkWrite" )
66
111
. await
67
112
. expect ( "no events observed" ) ;
68
- let second_len = second_started. command . get_array ( "ops" ) . unwrap ( ) . len ( ) ;
69
- assert_eq ! ( first_len + second_len, num_models) ;
113
+ let second_ops_len = second_started. command . get_array ( "ops" ) . unwrap ( ) . len ( ) ;
114
+
115
+ assert_eq ! ( first_ops_len + second_ops_len, num_models) ;
70
116
}
0 commit comments