Skip to content

Commit 1f75b39

Browse files
authored
Merge pull request #20 from aiven/advancedmetricprovider
add metricsadvancedproducer
2 parents 5997a53 + 23a1783 commit 1f75b39

File tree

2 files changed

+165
-72
lines changed

2 files changed

+165
-72
lines changed

main.py

+135-72
Original file line numberDiff line numberDiff line change
@@ -12,92 +12,111 @@
1212
from metricproducer import MetricProvider
1313
from userbets import UserBetsProvider
1414
from rolling import RollingProvider
15+
from metricadvancedproducer import MetricAdvancedProvider
1516

1617

1718
MAX_NUMBER_PIZZAS_IN_ORDER = 10
1819
MAX_ADDITIONAL_TOPPINGS_IN_PIZZA = 5
1920

2021

21-
# Creating a Faker instance and seeding to have the same results every time we execute the script
22+
# Creating a Faker instance and seeding to have the
23+
# same results every time we execute the script
2224
fake = Faker()
2325
Faker.seed(4321)
2426

2527

2628
# function produce_msgs starts producing messages with Faker
27-
def produce_msgs(security_protocol='SSL',
28-
sasl_mechanism='SCRAM-SHA-256',
29-
cert_folder = '~/kafka-pizza/',
30-
username = '',
31-
password = '',
32-
hostname = 'hostname',
33-
port = '1234',
34-
topic_name = 'pizza-orders',
35-
nr_messages = -1,
36-
max_waiting_time_in_sec = 5,
37-
subject = 'pizza'):
38-
if security_protocol.upper() == 'PLAINTEXT':
29+
def produce_msgs(
30+
security_protocol="SSL",
31+
sasl_mechanism="SCRAM-SHA-256",
32+
cert_folder="~/kafka-pizza/",
33+
username="",
34+
password="",
35+
hostname="hostname",
36+
port="1234",
37+
topic_name="pizza-orders",
38+
nr_messages=-1,
39+
max_waiting_time_in_sec=5,
40+
subject="pizza",
41+
):
42+
if security_protocol.upper() == "PLAINTEXT":
3943
producer = KafkaProducer(
40-
bootstrap_servers=hostname + ':' + port,
41-
security_protocol='PLAINTEXT',
42-
value_serializer=lambda v: json.dumps(v).encode('ascii'),
43-
key_serializer=lambda v: json.dumps(v).encode('ascii')
44+
bootstrap_servers=hostname + ":" + port,
45+
security_protocol="PLAINTEXT",
46+
value_serializer=lambda v: json.dumps(v).encode("ascii"),
47+
key_serializer=lambda v: json.dumps(v).encode("ascii"),
4448
)
45-
elif security_protocol.upper() == 'SSL':
49+
elif security_protocol.upper() == "SSL":
4650
producer = KafkaProducer(
47-
bootstrap_servers=hostname + ':' + port,
48-
security_protocol='SSL',
49-
ssl_cafile=cert_folder+'/ca.pem',
50-
ssl_certfile=cert_folder+'/service.cert',
51-
ssl_keyfile=cert_folder+'/service.key',
52-
value_serializer=lambda v: json.dumps(v).encode('ascii'),
53-
key_serializer=lambda v: json.dumps(v).encode('ascii')
51+
bootstrap_servers=hostname + ":" + port,
52+
security_protocol="SSL",
53+
ssl_cafile=cert_folder + "/ca.pem",
54+
ssl_certfile=cert_folder + "/service.cert",
55+
ssl_keyfile=cert_folder + "/service.key",
56+
value_serializer=lambda v: json.dumps(v).encode("ascii"),
57+
key_serializer=lambda v: json.dumps(v).encode("ascii"),
5458
)
55-
elif security_protocol.upper() == 'SASL_SSL':
59+
elif security_protocol.upper() == "SASL_SSL":
5660
producer = KafkaProducer(
57-
bootstrap_servers=hostname + ':' + port,
58-
security_protocol='SASL_SSL',
61+
bootstrap_servers=hostname + ":" + port,
62+
security_protocol="SASL_SSL",
5963
sasl_mechanism=sasl_mechanism,
60-
ssl_cafile=cert_folder+'/ca.pem' if cert_folder else None,
64+
ssl_cafile=cert_folder + "/ca.pem" if cert_folder else None,
6165
sasl_plain_username=username,
6266
sasl_plain_password=password,
63-
value_serializer=lambda v: json.dumps(v).encode('ascii'),
64-
key_serializer=lambda v: json.dumps(v).encode('ascii')
67+
value_serializer=lambda v: json.dumps(v).encode("ascii"),
68+
key_serializer=lambda v: json.dumps(v).encode("ascii"),
6569
)
6670
else:
6771
sys.exit("This security protocol is not supported!")
6872

6973
if nr_messages <= 0:
70-
nr_messages = float('inf')
74+
nr_messages = float("inf")
7175
i = 0
72-
73-
if subject == 'stock':
76+
77+
if subject == "stock":
7478
fake.add_provider(StockProvider)
75-
elif subject == 'realstock':
79+
elif subject == "realstock":
7680
fake.add_provider(RealStockProvider)
77-
elif subject == 'metric':
81+
elif subject == "metric":
7882
fake.add_provider(MetricProvider)
79-
elif subject == 'userbehaviour':
83+
elif subject == "advancedmetric":
84+
fake.add_provider(MetricAdvancedProvider)
85+
elif subject == "userbehaviour":
8086
fake.add_provider(UserBehaviorProvider)
81-
elif subject == 'bet':
87+
elif subject == "bet":
8288
fake.add_provider(UserBetsProvider)
83-
elif subject == 'rolling':
89+
elif subject == "rolling":
8490
fake.add_provider(RollingProvider)
8591
else:
8692
fake.add_provider(PizzaProvider)
8793
while i < nr_messages:
88-
if subject in ['stock', 'userbehaviour', 'realstock', 'metric', 'bet', 'rolling']:
94+
if subject in [
95+
"stock",
96+
"userbehaviour",
97+
"realstock",
98+
"metric",
99+
"bet",
100+
"rolling",
101+
"advancedmetric",
102+
]:
89103
message, key = fake.produce_msg()
90104
else:
91-
message, key = fake.produce_msg(fake, i, MAX_NUMBER_PIZZAS_IN_ORDER, MAX_ADDITIONAL_TOPPINGS_IN_PIZZA)
105+
message, key = fake.produce_msg(
106+
fake,
107+
i,
108+
MAX_NUMBER_PIZZAS_IN_ORDER,
109+
MAX_ADDITIONAL_TOPPINGS_IN_PIZZA,
110+
)
92111

93-
print('Sending: {}'.format(message))
112+
print("Sending: {}".format(message))
94113
# sending the message to Kafka
95-
producer.send(topic_name,
96-
key=key,
97-
value=message)
114+
producer.send(topic_name, key=key, value=message)
98115
# Sleeping time
99-
sleep_time = random.randint(0, int(max_waiting_time_in_sec * 10000))/10000
100-
print('Sleeping for...'+str(sleep_time)+'s')
116+
sleep_time = (
117+
random.randint(0, int(max_waiting_time_in_sec * 10000)) / 10000
118+
)
119+
print("Sleeping for..." + str(sleep_time) + "s")
101120
time.sleep(sleep_time)
102121

103122
# Force flushing of all messages
@@ -106,23 +125,65 @@ def produce_msgs(security_protocol='SSL',
106125
i = i + 1
107126
producer.flush()
108127

128+
109129
# calling the main produce_msgs function: parameters are:
110130
# * nr_messages: number of messages to produce
111131
# * max_waiting_time_in_sec: maximum waiting time in sec between messages
112132

133+
113134
def main():
114135
parser = argparse.ArgumentParser()
115-
parser.add_argument('--security-protocol', help='Security protocol for Kafka (PLAINTEXT, SSL, SASL_SSL)', required=True)
116-
parser.add_argument('--sasl-mechanism', help='SASL mechanism for Kafka (PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512)', required=False)
117-
parser.add_argument('--cert-folder', help='Path to folder containing required Kafka certificates. Required --security-protocol equal SSL or SASL_SSL', required=False)
118-
parser.add_argument('--username', help='Username. Required if security-protocol is SASL_SSL', required=False)
119-
parser.add_argument('--password', help='Password. Required if security-protocol is SASL_SSL', required=False)
120-
parser.add_argument('--host', help='Kafka Host (obtained from Aiven console)', required=True)
121-
parser.add_argument('--port', help='Kafka Port (obtained from Aiven console)', required=True)
122-
parser.add_argument('--topic-name', help='Topic Name', required=True)
123-
parser.add_argument('--nr-messages', help='Number of messages to produce (0 for unlimited)', required=True)
124-
parser.add_argument('--max-waiting-time', help='Max waiting time between messages (0 for none)', required=True)
125-
parser.add_argument('--subject', help='What type of content to produce (possible choices are [pizza, userbehaviour, stock, realstock, metric] pizza is the default', required=False)
136+
parser.add_argument(
137+
"--security-protocol",
138+
help="Security protocol for Kafka (PLAINTEXT, SSL, SASL_SSL)",
139+
required=True,
140+
)
141+
parser.add_argument(
142+
"--sasl-mechanism",
143+
help="SASL mechanism for Kafka (PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512)",
144+
required=False,
145+
)
146+
parser.add_argument(
147+
"--cert-folder",
148+
help="Path to folder containing required Kafka certificates. Required --security-protocol equal SSL or SASL_SSL",
149+
required=False,
150+
)
151+
parser.add_argument(
152+
"--username",
153+
help="Username. Required if security-protocol is SASL_SSL",
154+
required=False,
155+
)
156+
parser.add_argument(
157+
"--password",
158+
help="Password. Required if security-protocol is SASL_SSL",
159+
required=False,
160+
)
161+
parser.add_argument(
162+
"--host",
163+
help="Kafka Host (obtained from Aiven console)",
164+
required=True,
165+
)
166+
parser.add_argument(
167+
"--port",
168+
help="Kafka Port (obtained from Aiven console)",
169+
required=True,
170+
)
171+
parser.add_argument("--topic-name", help="Topic Name", required=True)
172+
parser.add_argument(
173+
"--nr-messages",
174+
help="Number of messages to produce (0 for unlimited)",
175+
required=True,
176+
)
177+
parser.add_argument(
178+
"--max-waiting-time",
179+
help="Max waiting time between messages (0 for none)",
180+
required=True,
181+
)
182+
parser.add_argument(
183+
"--subject",
184+
help="What type of content to produce (possible choices are [pizza, userbehaviour, stock, realstock, metric, advancedmetric] pizza is the default",
185+
required=False,
186+
)
126187
args = parser.parse_args()
127188
p_security_protocol = args.security_protocol
128189
p_cert_folder = args.cert_folder
@@ -131,21 +192,23 @@ def main():
131192
p_sasl_mechanism = args.sasl_mechanism
132193
p_hostname = args.host
133194
p_port = args.port
134-
p_topic_name= args.topic_name
195+
p_topic_name = args.topic_name
135196
p_subject = args.subject
136-
produce_msgs(security_protocol=p_security_protocol,
137-
cert_folder=p_cert_folder,
138-
username=p_username,
139-
password=p_password,
140-
hostname=p_hostname,
141-
port=p_port,
142-
topic_name=p_topic_name,
143-
nr_messages=int(args.nr_messages),
144-
max_waiting_time_in_sec=float(args.max_waiting_time),
145-
subject=p_subject,
146-
sasl_mechanism=p_sasl_mechanism,
147-
)
197+
produce_msgs(
198+
security_protocol=p_security_protocol,
199+
cert_folder=p_cert_folder,
200+
username=p_username,
201+
password=p_password,
202+
hostname=p_hostname,
203+
port=p_port,
204+
topic_name=p_topic_name,
205+
nr_messages=int(args.nr_messages),
206+
max_waiting_time_in_sec=float(args.max_waiting_time),
207+
subject=p_subject,
208+
sasl_mechanism=p_sasl_mechanism,
209+
)
148210
print(args.nr_messages)
149211

150-
if __name__ == '__main__':
212+
213+
if __name__ == "__main__":
151214
main()

metricadvancedproducer.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from faker.providers import BaseProvider
2+
import random
3+
import time
4+
5+
NUMBER_HOSTS = 100000
6+
NUMBER_CPU = 30
7+
8+
9+
class MetricAdvancedProvider(BaseProvider):
10+
def hostname(self):
11+
return "hostname" + random.randint(0, NUMBER_HOSTS)
12+
13+
def cpu_id(self):
14+
15+
return "cpu" + random.randint(0, NUMBER_CPU)
16+
17+
def usage(self):
18+
return random.random() * 30 + 70
19+
20+
def produce_msg(self):
21+
hostname = self.hostname()
22+
ts = time.time()
23+
message = {
24+
"hostname": hostname,
25+
"cpu": self.cpu_id(),
26+
"usage": self.usage(),
27+
"occurred_at": int(ts * 1000),
28+
}
29+
key = {"hostname": hostname}
30+
return message, key

0 commit comments

Comments
 (0)