7
7
import time
8
8
from datetime import datetime , timedelta
9
9
import concurrent
10
+ import sys
10
11
import uuid
11
12
12
13
from azure .servicebus import ServiceBusClient , Message , BatchMessage
@@ -18,22 +19,30 @@ class ReceiveType:
18
19
pull = "pull"
19
20
20
21
21
- class StressTestResults :
22
- total_sent = 0
23
- total_received = 0
24
- time_elapsed = None
25
- state_by_sender = {}
26
- state_by_receiver = {}
22
+ class StressTestResults (object ):
23
+ def __init__ (self ):
24
+ self .total_sent = 0
25
+ self .total_received = 0
26
+ self .time_elapsed = None
27
+ self .state_by_sender = {}
28
+ self .state_by_receiver = {}
27
29
30
+ def __repr__ (self ):
31
+ return str (vars (self ))
28
32
29
- class StressTestRunnerState :
33
+
34
+ class StressTestRunnerState (object ):
30
35
'''Per-runner state, e.g. if you spawn 3 senders each will have this as their state object,
31
36
which will be coalesced at completion into StressTestResults'''
32
- total_sent = 0
33
- total_received = 0
37
+ def __init__ (self ):
38
+ self .total_sent = 0
39
+ self .total_received = 0
34
40
35
41
36
42
class StressTestRunner :
43
+ '''Framework for running a service bus stress test.
44
+ Duration can be overriden via the --stress_test_duration flag from the command line'''
45
+
37
46
def __init__ (self ,
38
47
senders ,
39
48
receivers ,
@@ -58,6 +67,11 @@ def __init__(self,
58
67
# If we ever require multiple runs of this one after another, just make Run() reset this.
59
68
self ._state = StressTestRunnerState ()
60
69
70
+ self ._duration_override = None
71
+ for arg in sys .argv :
72
+ if arg .startswith ('--stress_test_duration_seconds=' ):
73
+ self ._duration_override = timedelta (seconds = int (arg .split ('=' )[1 ]))
74
+
61
75
62
76
# Plugin functions the caller can override to further tailor the test.
63
77
@staticmethod
@@ -103,7 +117,7 @@ def _ConstructMessage(self):
103
117
message = Message (self .PreProcessMessageBody ("a" * self .message_size ))
104
118
self .PreProcessMessage (message )
105
119
batch .add (message )
106
- self .PreProcessBatch (batch )
120
+ self .PreProcessMessageBatch (batch )
107
121
return batch
108
122
else :
109
123
message = Message (self .PreProcessMessageBody ("a" * self .message_size ))
@@ -147,7 +161,7 @@ def _Receive(self, receiver, end_time):
147
161
148
162
def Run (self ):
149
163
start_time = datetime .now ()
150
- end_time = start_time + self .duration
164
+ end_time = start_time + ( self ._duration_override or self . duration )
151
165
sent_messages = 0
152
166
received_messages = 0
153
167
with concurrent .futures .ProcessPoolExecutor (max_workers = 4 ) as proc_pool :
@@ -160,5 +174,6 @@ def Run(self):
160
174
result .total_sent = sum ([r .total_sent for r in result .state_by_sender .values ()])
161
175
result .total_received = sum ([r .total_received for r in result .state_by_receiver .values ()])
162
176
result .time_elapsed = end_time - start_time
177
+ print ("Stress test completed. Results:\n " , result )
163
178
return result
164
179
0 commit comments