7
7
import time
8
8
from datetime import datetime , timedelta
9
9
import concurrent
10
+ import sys
10
11
import uuid
11
12
12
13
from azure .servicebus import ServiceBusClient , Message , BatchMessage
@@ -18,22 +19,33 @@ class ReceiveType:
18
19
pull = "pull"
19
20
20
21
21
- class StressTestResults :
22
- total_sent = 0
23
- total_received = 0
24
- time_elapsed = None
25
- state_by_sender = {}
26
- state_by_receiver = {}
22
+ class StressTestResults (object ):
23
+ def __init__ (self ):
24
+ self .total_sent = 0
25
+ self .total_received = 0
26
+ self .time_elapsed = None
27
+ self .state_by_sender = {}
28
+ self .state_by_receiver = {}
27
29
30
+ def __repr__ (self ):
31
+ return str (vars (self ))
28
32
29
- class StressTestRunnerState :
33
+
34
+ class StressTestRunnerState (object ):
30
35
'''Per-runner state, e.g. if you spawn 3 senders each will have this as their state object,
31
36
which will be coalesced at completion into StressTestResults'''
32
- total_sent = 0
33
- total_received = 0
37
+ def __init__ (self ):
38
+ self .total_sent = 0
39
+ self .total_received = 0
40
+
41
+ def __repr__ (self ):
42
+ return str (vars (self ))
34
43
35
44
36
45
class StressTestRunner :
46
+ '''Framework for running a service bus stress test.
47
+ Duration can be overriden via the --stress_test_duration flag from the command line'''
48
+
37
49
def __init__ (self ,
38
50
senders ,
39
51
receivers ,
@@ -58,6 +70,11 @@ def __init__(self,
58
70
# If we ever require multiple runs of this one after another, just make Run() reset this.
59
71
self ._state = StressTestRunnerState ()
60
72
73
+ self ._duration_override = None
74
+ for arg in sys .argv :
75
+ if arg .startswith ('--stress_test_duration_seconds=' ):
76
+ self ._duration_override = timedelta (seconds = int (arg .split ('=' )[1 ]))
77
+
61
78
62
79
# Plugin functions the caller can override to further tailor the test.
63
80
@staticmethod
@@ -103,7 +120,7 @@ def _ConstructMessage(self):
103
120
message = Message (self .PreProcessMessageBody ("a" * self .message_size ))
104
121
self .PreProcessMessage (message )
105
122
batch .add (message )
106
- self .PreProcessBatch (batch )
123
+ self .PreProcessMessageBatch (batch )
107
124
return batch
108
125
else :
109
126
message = Message (self .PreProcessMessageBody ("a" * self .message_size ))
@@ -147,7 +164,7 @@ def _Receive(self, receiver, end_time):
147
164
148
165
def Run (self ):
149
166
start_time = datetime .now ()
150
- end_time = start_time + self .duration
167
+ end_time = start_time + ( self ._duration_override or self . duration )
151
168
sent_messages = 0
152
169
received_messages = 0
153
170
with concurrent .futures .ProcessPoolExecutor (max_workers = 4 ) as proc_pool :
@@ -160,5 +177,6 @@ def Run(self):
160
177
result .total_sent = sum ([r .total_sent for r in result .state_by_sender .values ()])
161
178
result .total_received = sum ([r .total_received for r in result .state_by_receiver .values ()])
162
179
result .time_elapsed = end_time - start_time
180
+ print ("Stress test completed. Results:\n " , result )
163
181
return result
164
182
0 commit comments