Skip to content

Commit e17f6e2

Browse files
Oauth OIDC CCloud producer example (#1769)
Added an example for OAUTH OIDC producer with support for confluent cloud --------- Co-authored-by: Sarwar Bhuiyan <[email protected]>
1 parent f354a7a commit e17f6e2

File tree

2 files changed

+139
-0
lines changed

2 files changed

+139
-0
lines changed

CHANGELOG.md

+11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# Confluent's Python client for Apache Kafka
22

3+
## v2.6.1
4+
5+
v2.6.1 is a maintenance release with the following fixes and enhancements:
6+
7+
- Added an example for OAUTH OIDC producer with support for confluent cloud (#1769, @sarwarbhuiyan)
8+
9+
confluent-kafka-python is based on librdkafka v2.6.1, see the
10+
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.1)
11+
for a complete list of changes, enhancements, fixes and upgrade considerations.
12+
13+
314
## v2.6.0
415

516
v2.6.0 is a feature release with the following features, fixes and enhancements:
+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
19+
# This uses OAuth client credentials grant:
20+
# https://www.oauth.com/oauth2-servers/access-tokens/client-credentials/
21+
# where client_id and client_secret are passed as HTTP Authorization header
22+
# with the OIDC method and doesn't need to set a `oauth_cb` to obtain the token,
23+
# but, given it's using cURL for http requests, when using bundled librdkafka on non RHEL based distros,
24+
# currently it needs to provide CA bundle at a specific path, that is `/etc/pki/tls/certs/ca-bundle.crt`.
25+
26+
import logging
27+
import argparse
28+
from confluent_kafka import Producer
29+
from confluent_kafka.serialization import StringSerializer
30+
31+
32+
def producer_config(args):
33+
logger = logging.getLogger(__name__)
34+
logger.setLevel(logging.DEBUG)
35+
params = {
36+
'bootstrap.servers': args.bootstrap_servers,
37+
'security.protocol': 'SASL_SSL',
38+
'sasl.mechanisms': 'OAUTHBEARER',
39+
'sasl.oauthbearer.method': 'oidc',
40+
'sasl.oauthbearer.client.id': args.client_id,
41+
'sasl.oauthbearer.client.secret': args.client_secret,
42+
'sasl.oauthbearer.token.endpoint.url': args.token_url,
43+
'sasl.oauthbearer.scope': ' '.join(args.scopes)
44+
}
45+
# These two parameters are only applicable when producing to
46+
# confluent cloud where some sasl extensions are required.
47+
if args.logical_cluster and args.identity_pool_id:
48+
params['sasl.oauthbearer.extensions'] = 'logicalCluster=' + args.logical_cluster + \
49+
',identityPoolId=' + args.identity_pool_id
50+
51+
return params
52+
53+
54+
def delivery_report(err, msg):
55+
"""
56+
Reports the failure or success of a message delivery.
57+
58+
Args:
59+
err (KafkaError): The error that occurred on None on success.
60+
61+
msg (Message): The message that was produced or failed.
62+
63+
Note:
64+
In the delivery report callback the Message.key() and Message.value()
65+
will be the binary format as encoded by any configured Serializers and
66+
not the same object that was passed to produce().
67+
If you wish to pass the original object(s) for key and value to delivery
68+
report callback we recommend a bound callback or lambda where you pass
69+
the objects along.
70+
71+
"""
72+
if err is not None:
73+
print('Delivery failed for User record {}: {}'.format(msg.key(), err))
74+
return
75+
print('User record {} successfully produced to {} [{}] at offset {}'.format(
76+
msg.key(), msg.topic(), msg.partition(), msg.offset()))
77+
78+
79+
def main(args):
80+
topic = args.topic
81+
delimiter = args.delimiter
82+
producer_conf = producer_config(args)
83+
producer = Producer(producer_conf)
84+
serializer = StringSerializer('utf_8')
85+
86+
print('Producing records to topic {}. ^C to exit.'.format(topic))
87+
while True:
88+
# Serve on_delivery callbacks from previous calls to produce()
89+
producer.poll(0.0)
90+
try:
91+
msg_data = input(">")
92+
msg = msg_data.split(delimiter)
93+
if len(msg) == 2:
94+
producer.produce(topic=topic,
95+
key=serializer(msg[0]),
96+
value=serializer(msg[1]),
97+
on_delivery=delivery_report)
98+
else:
99+
producer.produce(topic=topic,
100+
value=serializer(msg[0]),
101+
on_delivery=delivery_report)
102+
except KeyboardInterrupt:
103+
break
104+
105+
print('\nFlushing {} records...'.format(len(producer)))
106+
producer.flush()
107+
108+
109+
if __name__ == '__main__':
110+
parser = argparse.ArgumentParser(description="OAUTH example with client credentials grant")
111+
parser.add_argument('-b', dest="bootstrap_servers", required=True,
112+
help="Bootstrap broker(s) (host[:port])")
113+
parser.add_argument('-t', dest="topic", default="example_producer_oauth",
114+
help="Topic name")
115+
parser.add_argument('-d', dest="delimiter", default="|",
116+
help="Key-Value delimiter. Defaults to '|'"),
117+
parser.add_argument('--client', dest="client_id", required=True,
118+
help="Client ID for client credentials flow")
119+
parser.add_argument('--secret', dest="client_secret", required=True,
120+
help="Client secret for client credentials flow.")
121+
parser.add_argument('--token-url', dest="token_url", required=True,
122+
help="Token URL.")
123+
parser.add_argument('--scopes', dest="scopes", required=True, nargs='+',
124+
help="Scopes requested from OAuth server.")
125+
parser.add_argument('--logical-cluster', dest="logical_cluster", required=False, help="Logical Cluster.")
126+
parser.add_argument('--identity-pool-id', dest="identity_pool_id", required=False, help="Identity Pool ID.")
127+
128+
main(parser.parse_args())

0 commit comments

Comments
 (0)