From 6ba9969bad891fbcf376cc1d28000cc68bb80f76 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 4 Nov 2022 21:52:52 +0800 Subject: [PATCH 01/12] Improve apidoc with pydoctor tool Signed-off-by: tison --- pulsar/__init__.py | 221 +++++++++++++++++++++------------------------ 1 file changed, 103 insertions(+), 118 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 6fd38d1..a4eb3e3 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -23,81 +23,23 @@ Currently, the supported Python versions are 3.7, 3.8, 3.9 and 3.10. -## Install from PyPI +================= +Install from PyPI +================= -Download Python wheel binary files for MacOS and Linux -directly from the PyPI archive. +Download Python wheel binary files for macOS and Linux directly from +the PyPI archive: - #!shell - $ sudo pip install pulsar-client +.. code-block:: shell -## Install from sources + sudo pip install pulsar-client -Follow the instructions to compile the Pulsar C++ client library. This method -will also build the Python binding for the library. 
+======================== +Install from source code +======================== -To install the Python bindings: - - #!shell - $ cd pulsar-client-cpp/python - $ sudo python setup.py install - -## Examples - -### [Producer](#pulsar.Producer) example - - #!python - import pulsar - - client = pulsar.Client('pulsar://localhost:6650') - - producer = client.create_producer('my-topic') - - for i in range(10): - producer.send(('Hello-%d' % i).encode('utf-8')) - - client.close() - -#### [Consumer](#pulsar.Consumer) Example - - #!python - import pulsar - - client = pulsar.Client('pulsar://localhost:6650') - - consumer = client.subscribe('my-topic', 'my-subscription') - - while True: - msg = consumer.receive() - try: - print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) - consumer.acknowledge(msg) - except Exception: - consumer.negative_acknowledge(msg) - - client.close() - -### [Async producer](#pulsar.Producer.send_async) example - - #!python - import pulsar - - client = pulsar.Client('pulsar://localhost:6650') - - producer = client.create_producer( - 'my-topic', - block_if_queue_full=True, - batching_enabled=True, - batching_max_publish_delay_ms=10 - ) - - def send_callback(res, msg_id): - print('Message published res=%s', res) - - while True: - producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback) - - client.close() +Read the instructions on `source code repository +`_. """ import logging @@ -149,7 +91,7 @@ def partition(self): def serialize(self): """ Returns a bytes representation of the message id. - This bytes sequence can be stored and later deserialized. + This byte sequence can be stored and later deserialized. """ return self._msg_id.serialize() @@ -208,7 +150,7 @@ def event_timestamp(self): def message_id(self): """ - The message ID that can be used to refere to this particular message. + The message ID that can be used to refer to this particular message. 
""" return self._message.message_id() @@ -355,10 +297,10 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str For example, if you want to create a basic authentication instance whose username is "my-user" and password is "my-pass", there are two ways: - ``` - auth = AuthenticationBasic('my-user', 'my-pass') - auth = AuthenticationBasic(auth_params_string='{"username": "my-user", "password": "my-pass"}') - ``` + .. code-block:: python + + auth = AuthenticationBasic('my-user', 'my-pass') + auth = AuthenticationBasic(auth_params_string='{"username": "my-user", "password": "my-pass"}') **Args** * username : str, optional @@ -455,7 +397,7 @@ def __init__(self, service_url, * `listener_name`: Listener name for lookup. Clients can use listenerName to choose one of the listeners as the service URL to create a connection to the broker as long as the network is accessible. - advertisedListeners must enabled in broker side. + advertisedListeners must be enabled in broker side. """ _check_type(str, service_url, 'service_url') _check_type_or_none(Authentication, authentication, 'authentication') @@ -779,7 +721,7 @@ def my_listener(consumer, message): Default: `False`. * max_pending_chunked_message: Consumer buffers chunk messages into memory until it receives all the chunks of the original message. - While consuming chunk-messages, chunks from same message might not be contiguous in the stream and they + While consuming chunk-messages, chunks from same message might not be contiguous in the stream, and they might be mixed with other messages' chunks. so, consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or publisher failed to publish all chunks of the messages. @@ -788,7 +730,7 @@ def my_listener(consumer, message): Default: `10`. 
* auto_ack_oldest_chunked_message_on_queue_full: - Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it + Buffering large number of outstanding uncompleted chunked messages can create memory pressure, and it can be guarded by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage. Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery. @@ -988,6 +930,19 @@ def close(self): class Producer: """ The Pulsar message producer, used to publish messages on a topic. + + Examples + -------- + + .. code-block:: python + + import pulsar + + client = pulsar.Client('pulsar://localhost:6650') + producer = client.create_producer('my-topic') + for i in range(10): + producer.send(('Hello-%d' % i).encode('utf-8')) + client.close() """ def topic(self): @@ -1007,12 +962,12 @@ def last_sequence_id(self): """ Get the last sequence id that was published by this producer. - This represent either the automatically assigned or custom sequence id + This represents either the automatically assigned or custom sequence id (set on the `MessageBuilder`) that was published and acknowledged by the broker. After recreating a producer with the same producer name, this will return the last message that was published in the previous producer session, or -1 if - there no message was ever published. + there was no message ever published. """ return self._producer.last_sequence_id() @@ -1055,7 +1010,7 @@ def send(self, content, * `event_timestamp`: Timestamp in millis of the timestamp of event creation * `deliver_at`: - Specify the this message should not be delivered earlier than the + Specify the message should not be delivered earlier than the specified timestamp. 
The timestamp is milliseconds and based on UTC * `deliver_after`: @@ -1080,49 +1035,61 @@ def send_async(self, content, callback, """ Send a message asynchronously. - The `callback` will be invoked once the message has been acknowledged - by the broker. + Examples + -------- + + The `callback` will be invoked once the message has been acknowledged by the broker. - Example: + .. code-block:: python + + import pulsar + + client = pulsar.Client('pulsar://localhost:6650') + producer = client.create_producer( + 'my-topic', + block_if_queue_full=True, + batching_enabled=True, + batching_max_publish_delay_ms=10) - #!python def callback(res, msg_id): - print('Message published: %s' % res) + print('Message published res=%s', res) - producer.send_async(msg, callback) + while True: + producer.send_async(('Hello-%d' % i).encode('utf-8'), callback) - When the producer queue is full, by default the message will be rejected - and the callback invoked with an error code. + client.close() - **Args** - * `content`: - A `bytes` object with the message payload. + When the producer queue is full, by default the message will be rejected + and the callback invoked with an error code. - **Options** - * `properties`: - A dict of application0-defined string properties. - * `partition_key`: - Sets the partition key for the message routing. A hash of this key is - used to determine the message's topic partition. - * `sequence_id`: - Specify a custom sequence id for the message being published. - * `replication_clusters`: Override namespace replication clusters. Note - that it is the caller's responsibility to provide valid cluster names - and that all clusters have been previously configured as topics. - Given an empty list, the message will replicate per the namespace - configuration. - * `disable_replication`: - Do not replicate this message. 
- * `event_timestamp`: - Timestamp in millis of the timestamp of event creation - * `deliver_at`: - Specify the this message should not be delivered earlier than the - specified timestamp. - The timestamp is milliseconds and based on UTC - * `deliver_after`: - Specify a delay in timedelta for the delivery of the messages. + Parameters + ---------- + + content + A `bytes` object with the message payload. + callback + A callback that is invoked once the message has been acknowledged by the broker. + properties: optional + A dict of application-defined string properties. + partition_key: optional + Sets the partition key for the message routing. A hash of this key is + used to determine the message's topic partition. + sequence_id: optional + Specify a custom sequence id for the message being published. + replication_clusters: optional + Override namespace replication clusters. Note that it is the caller's responsibility + to provide valid cluster names and that all clusters have been previously configured + as topics. Given an empty list, the message will replicate per the namespace configuration. + disable_replication: optional + Do not replicate this message. + event_timestamp: optional + Timestamp in millis of the timestamp of event creation + deliver_at: optional + Specify the message should not be delivered earlier than the specified timestamp. + deliver_after: optional + Specify a delay in timedelta for the delivery of the messages. """ msg = self._build_msg(content, properties, partition_key, sequence_id, replication_clusters, disable_replication, event_timestamp, @@ -1191,6 +1158,24 @@ def is_connected(self): class Consumer: """ Pulsar consumer. + + Examples + -------- + + .. 
code-block:: python + + import pulsar + + client = pulsar.Client('pulsar://localhost:6650') + consumer = client.subscribe('my-topic', 'my-subscription') + while True: + msg = consumer.receive() + try: + print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) + consumer.acknowledge(msg) + except Exception: + consumer.negative_acknowledge(msg) + client.close() """ def topic(self): @@ -1227,7 +1212,7 @@ def receive(self, timeout_millis=None): **Options** * `timeout_millis`: - If specified, the receive will raise an exception if a message is not + If specified, the receiver will raise an exception if a message is not available within the timeout. """ if timeout_millis is None: @@ -1377,7 +1362,7 @@ def read_next(self, timeout_millis=None): **Options** * `timeout_millis`: - If specified, the receive will raise an exception if a message is not + If specified, the receiver will raise an exception if a message is not available within the timeout. """ if timeout_millis is None: From 3fd9691a3215db1becd0e5c16e61e7590a15288f Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 4 Nov 2022 22:20:03 +0800 Subject: [PATCH 02/12] more args to parameters Signed-off-by: tison --- pulsar/__init__.py | 279 +++++++++++++++++++++++---------------------- 1 file changed, 141 insertions(+), 138 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index a4eb3e3..62345e4 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -208,12 +208,13 @@ def __init__(self, dynamicLibPath, authParamsString): """ Create the authentication provider instance. 
- **Args** + Parameters + ---------- - * `dynamicLibPath`: Path to the authentication provider shared library - (such as `tls.so`) - * `authParamsString`: Comma-separated list of provider-specific - configuration params + dynamicLibPath: str + Path to the authentication provider shared library (such as `tls.so`) + authParamsString: str + Comma-separated list of provider-specific configuration params """ _check_type(str, dynamicLibPath, 'dynamicLibPath') _check_type(str, authParamsString, 'authParamsString') @@ -228,10 +229,13 @@ def __init__(self, certificate_path, private_key_path): """ Create the TLS authentication provider instance. - **Args** + Parameters + ---------- - * `certificatePath`: Path to the public certificate - * `privateKeyPath`: Path to private TLS key + certificate_path: str + Path to the public certificate + private_key_path: str + Path to private TLS key """ _check_type(str, certificate_path, 'certificate_path') _check_type(str, private_key_path, 'private_key_path') @@ -246,10 +250,11 @@ def __init__(self, token): """ Create the token authentication provider instance. - **Args** + Parameters + ---------- - * `token`: A string containing the token or a functions that provides a - string with the token + token + A string containing the token or a function that provides a string with the token """ if not (isinstance(token, str) or callable(token)): raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'") @@ -264,9 +269,11 @@ def __init__(self, auth_params_string): """ Create the Athenz authentication provider instance. 
- **Args** + Parameters + ---------- - * `auth_params_string`: JSON encoded configuration for Athenz client + auth_params_string: str + JSON encoded configuration for Athenz client """ _check_type(str, auth_params_string, 'auth_params_string') self.auth = _pulsar.AuthenticationAthenz(auth_params_string) @@ -279,9 +286,11 @@ def __init__(self, auth_params_string): """ Create the Oauth2 authentication provider instance. - **Args** + Parameters + ---------- - * `auth_params_string`: JSON encoded configuration for Oauth2 client + auth_params_string: str + JSON encoded configuration for Oauth2 client """ _check_type(str, auth_params_string, 'auth_params_string') self.auth = _pulsar.AuthenticationOauth2(auth_params_string) @@ -351,53 +360,46 @@ def __init__(self, service_url, """ Create a new Pulsar client instance. - **Args** - - * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/ - - **Options** + Parameters + ---------- - * `authentication`: - Set the authentication provider to be used with the broker. For example: - `AuthenticationTls`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2` - * `operation_timeout_seconds`: - Set timeout on client operations (subscribe, create producer, close, - unsubscribe). - * `io_threads`: - Set the number of IO threads to be used by the Pulsar client. - * `message_listener_threads`: - Set the number of threads to be used by the Pulsar client when - delivering messages through message listener. The default is 1 thread - per Pulsar client. If using more than 1 thread, messages for distinct - `message_listener`s will be delivered in different threads, however a - single `MessageListener` will always be assigned to the same thread. - * `concurrent_lookup_requests`: - Number of concurrent lookup-requests allowed on each broker connection - to prevent overload on the broker. - * `log_conf_file_path`: - Initialize log4cxx from a configuration file. 
- * `use_tls`: - Configure whether to use TLS encryption on the connection. This setting - is deprecated. TLS will be automatically enabled if the `serviceUrl` is - set to `pulsar+ssl://` or `https://` - * `tls_trust_certs_file_path`: - Set the path to the trusted TLS certificate file. If empty defaults to - certifi. - * `tls_allow_insecure_connection`: - Configure whether the Pulsar client accepts untrusted TLS certificates - from the broker. - * `tls_validate_hostname`: - Configure whether the Pulsar client validates that the hostname of the - endpoint, matches the common name on the TLS certificate presented by - the endpoint. - * `logger`: - Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`. - * `connection_timeout_ms`: - Set timeout in milliseconds on TCP connections. - * `listener_name`: - Listener name for lookup. Clients can use listenerName to choose one of the listeners - as the service URL to create a connection to the broker as long as the network is accessible. - advertisedListeners must be enabled in broker side. + service_url: str + The Pulsar service url eg: pulsar://my-broker.com:6650/ + authentication: Authentication, optional + Set the authentication provider to be used with the broker. For example: + `AuthenticationTls`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2` + operation_timeout_seconds: int, default=30 + Set timeout on client operations (subscribe, create producer, close, unsubscribe). + io_threads: int, default=1 + Set the number of IO threads to be used by the Pulsar client. + message_listener_threads: int, default=1 + Set the number of threads to be used by the Pulsar client when delivering messages through + message listener. The default is 1 thread per Pulsar client. If using more than 1 thread, + messages for distinct `message_listener`s will be delivered in different threads, however a + single `MessageListener` will always be assigned to the same thread. 
+ concurrent_lookup_requests: int, default=50000 + Number of concurrent lookup-requests allowed on each broker connection to prevent overload + on the broker. + log_conf_file_path: str, optional + Initialize log4cxx from a configuration file. + use_tls: bool, default=False + Configure whether to use TLS encryption on the connection. This setting is deprecated. + TLS will be automatically enabled if the `serviceUrl` is set to `pulsar+ssl://` or `https://` + tls_trust_certs_file_path: str, optional + Set the path to the trusted TLS certificate file. If empty defaults to certifi. + tls_allow_insecure_connection: bool, default=False + Configure whether the Pulsar client accepts untrusted TLS certificates from the broker. + tls_validate_hostname: bool, default=False + Configure whether the Pulsar client validates that the hostname of the endpoint, + matches the common name on the TLS certificate presented by the endpoint. + logger: optional + Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`. + connection_timeout_ms: int, default=10000 + Set timeout in milliseconds on TCP connections. + listener_name: str, optional + Listener name for lookup. Clients can use listenerName to choose one of the listeners as + the service URL to create a connection to the broker as long as the network is accessible. + `advertisedListeners` must be enabled in broker side. """ _check_type(str, service_url, 'service_url') _check_type_or_none(Authentication, authentication, 'authentication') @@ -481,89 +483,90 @@ def create_producer(self, topic, """ Create a new producer on a given topic. - **Args** - - * `topic`: - The topic name - - **Options** + Parameters + ---------- - * `producer_name`: - Specify a name for the producer. If not assigned, - the system will generate a globally unique name which can be accessed - with `Producer.producer_name()`. 
When specifying a name, it is app to - the user to ensure that, for a given topic, the producer name is unique - across all Pulsar's clusters. - * `schema`: - Define the schema of the data that will be published by this producer. - The schema will be used for two purposes: - - Validate the data format against the topic defined schema - - Perform serialization/deserialization between data and objects - An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`. - * `initial_sequence_id`: - Set the baseline for the sequence ids for messages - published by the producer. First message will be using - `(initialSequenceId + 1)`` as its sequence id and subsequent messages will - be assigned incremental sequence ids, if not otherwise specified. - * `send_timeout_millis`: - If a message is not acknowledged by the server before the - `send_timeout` expires, an error will be reported. - * `compression_type`: - Set the compression type for the producer. By default, message - payloads are not compressed. Supported compression types are - `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`. - ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that - release in order to be able to receive messages compressed with ZSTD. - SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that - release in order to be able to receive messages compressed with SNAPPY. - * `max_pending_messages`: - Set the max size of the queue holding the messages pending to receive - an acknowledgment from the broker. - * `max_pending_messages_across_partitions`: - Set the max size of the queue holding the messages pending to receive - an acknowledgment across partitions from the broker. - * `block_if_queue_full`: Set whether `send_async` operations should - block when the outgoing message queue is full. - * `message_routing_mode`: - Set the message routing mode for the partitioned producer. 
Default is `PartitionsRoutingMode.RoundRobinDistribution`, - other option is `PartitionsRoutingMode.UseSinglePartition` - * `lazy_start_partitioned_producers`: - This config affects producers of partitioned topics only. It controls whether - producers register and connect immediately to the owner broker of each partition - or start lazily on demand. The internal producer of one partition is always - started eagerly, chosen by the routing policy, but the internal producers of - any additional partitions are started on demand, upon receiving their first - message. - Using this mode can reduce the strain on brokers for topics with large numbers of - partitions and when the SinglePartition routing policy is used without keyed messages. - Because producer connection can be on demand, this can produce extra send latency - for the first messages of a given partition. - * `properties`: - Sets the properties for the producer. The properties associated with a producer - can be used for identify a producer at broker side. - * `batching_type`: - Sets the batching type for the producer. - There are two batching type: DefaultBatching and KeyBasedBatching. - - Default batching + topic: str + The topic name + producer_name: str, optional + Specify a name for the producer. If not assigned, the system will generate a globally unique name + which can be accessed with `Producer.producer_name()`. When specifying a name, it is up to the user + to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters. + schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema + Define the schema of the data that will be published by this producer, e.g, + `schema=JsonSchema(MyRecordClass)`. 
+ + The schema will be used for two purposes: + * Validate the data format against the topic defined schema + * Perform serialization/deserialization between data and objects + initial_sequence_id: int, optional + Set the baseline for the sequence ids for messages published by the producer. First message will be + using `(initialSequenceId + 1)` as its sequence id and subsequent messages will be assigned + incremental sequence ids, if not otherwise specified. + send_timeout_millis: int, default=30000 + If a message is not acknowledged by the server before the `send_timeout` expires, an error will be reported. + compression_type: CompressionType, default=CompressionType.NONE + Set the compression type for the producer. By default, message payloads are not compressed. + + Supported compression types are `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` + and `CompressionType.SNAPPY`. + + ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that release in order to + be able to receive messages compressed with ZSTD. + + SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that release in order to + be able to receive messages compressed with SNAPPY. + max_pending_messages: int, default=1000 + Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. + max_pending_messages_across_partitions: int, default=50000 + Set the max size of the queue holding the messages pending to receive an acknowledgment across partitions + from the broker. + block_if_queue_full: bool, default=False + Set whether `send_async` operations should block when the outgoing message queue is full. + message_routing_mode: PartitionsRoutingMode, default=PartitionsRoutingMode.RoundRobinDistribution + Set the message routing mode for the partitioned producer. + + Supported modes: + + * `PartitionsRoutingMode.RoundRobinDistribution` + * `PartitionsRoutingMode.UseSinglePartition`. 
+ lazy_start_partitioned_producers: bool, default=False + This config affects producers of partitioned topics only. It controls whether producers register + and connect immediately to the owner broker of each partition or start lazily on demand. The internal + producer of one partition is always started eagerly, chosen by the routing policy, but the internal + producers of any additional partitions are started on demand, upon receiving their first message. + + Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when + the SinglePartition routing policy is used without keyed messages. Because producer connection can be + on demand, this can produce extra send latency for the first messages of a given partition. + properties: dict, optional + Sets the properties for the producer. The properties associated with a producer can be used for identify + a producer at broker side. + batching_type: BatchingType, default=BatchingType.Default + Sets the batching type for the producer. + + There are two batching type: DefaultBatching and KeyBasedBatching. 
+ + * Default batching incoming single messages: - (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) + (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) batched into single batch message: - [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)] + [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)] - - KeyBasedBatching + * KeyBasedBatching incoming single messages: - (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) + (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) batched into single batch message: - [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)] - * `chunking_enabled`: - If message size is higher than allowed max publish-payload size by broker then chunking_enabled - helps producer to split message into multiple chunks and publish them to broker separately and in - order. So, it allows client to successfully publish large size of messages in pulsar. - * encryption_key: - The key used for symmetric encryption, configured on the producer side - * crypto_key_reader: - Symmetric encryption class implementation, configuring public key encryption messages for the producer - and private key decryption messages for the consumer + [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)] + chunking_enabled: bool, default=False + If message size is higher than allowed max publish-payload size by broker then chunking_enabled helps + producer to split message into multiple chunks and publish them to broker separately and in order. + So, it allows client to successfully publish large size of messages in pulsar. 
+ encryption_key: str, optional + The key used for symmetric encryption, configured on the producer side + crypto_key_reader: CryptoKeyReader, optional + Symmetric encryption class implementation, configuring public key encryption messages for the producer + and private key decryption messages for the consumer """ _check_type(str, topic, 'topic') _check_type_or_none(str, producer_name, 'producer_name') From 59bd48aa663308285a53f251dff230a7ce3c9b33 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 4 Nov 2022 22:31:29 +0800 Subject: [PATCH 03/12] fix syntax Signed-off-by: tison --- pulsar/__init__.py | 51 +++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 62345e4..34dc93a 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -212,7 +212,7 @@ def __init__(self, dynamicLibPath, authParamsString): ---------- dynamicLibPath: str - Path to the authentication provider shared library (such as `tls.so`) + Path to the authentication provider shared library (such as ``tls.so``) authParamsString: str Comma-separated list of provider-specific configuration params """ @@ -311,18 +311,21 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str auth = AuthenticationBasic('my-user', 'my-pass') auth = AuthenticationBasic(auth_params_string='{"username": "my-user", "password": "my-pass"}') - **Args** - * username : str, optional - * password : str, optional - * method : str, optional - The authentication method name (default is 'basic') - * auth_params_string : str, optional - The JSON presentation of all fields above (default is None) - If it's not None, the other parameters will be ignored. + + Parameters + ---------- + username : str, optional + password : str, optional + method : str, default='basic' + The authentication method name + auth_params_string : str, optional + The JSON presentation of all fields above. 
If it's not None, the other parameters will be ignored. Here is an example JSON presentation: - {"username": "my-user", "password": "my-pass", "method": "oms3.0"} - The `username` and `password` fields are required. If the "method" field is not set, - it will be "basic" by default. + + {"username": "my-user", "password": "my-pass", "method": "oms3.0"} + + The ``username`` and ``password`` fields are required. If the "method" field is not set, it will be + "basic" by default. """ if auth_params_string is not None: _check_type(str, auth_params_string, 'auth_params_string') @@ -384,7 +387,7 @@ def __init__(self, service_url, Initialize log4cxx from a configuration file. use_tls: bool, default=False Configure whether to use TLS encryption on the connection. This setting is deprecated. - TLS will be automatically enabled if the `serviceUrl` is set to `pulsar+ssl://` or `https://` + TLS will be automatically enabled if the ``serviceUrl`` is set to ``pulsar+ssl://`` or ``https://`` tls_trust_certs_file_path: str, optional Set the path to the trusted TLS certificate file. If empty defaults to certifi. tls_allow_insecure_connection: bool, default=False @@ -494,22 +497,26 @@ def create_producer(self, topic, to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters. schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema Define the schema of the data that will be published by this producer, e.g, - `schema=JsonSchema(MyRecordClass)`. + ``schema=JsonSchema(MyRecordClass)``. The schema will be used for two purposes: * Validate the data format against the topic defined schema * Perform serialization/deserialization between data and objects initial_sequence_id: int, optional Set the baseline for the sequence ids for messages published by the producer. 
First message will be - using `(initialSequenceId + 1)` as its sequence id and subsequent messages will be assigned + using ``(initialSequenceId + 1)`` as its sequence id and subsequent messages will be assigned incremental sequence ids, if not otherwise specified. send_timeout_millis: int, default=30000 If a message is not acknowledged by the server before the `send_timeout` expires, an error will be reported. compression_type: CompressionType, default=CompressionType.NONE Set the compression type for the producer. By default, message payloads are not compressed. - Supported compression types are `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` - and `CompressionType.SNAPPY`. + Supported compression types: + + * CompressionType.LZ4 + * CompressionType.ZLib + * CompressionType.ZSTD + * CompressionType.SNAPPY ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that release in order to be able to receive messages compressed with ZSTD. @@ -547,16 +554,14 @@ def create_producer(self, topic, There are two batching type: DefaultBatching and KeyBasedBatching. - * Default batching - incoming single messages: + DefaultBatching will batch single messages: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) - batched into single batch message: + ... into single batch message: [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)] - * KeyBasedBatching - incoming single messages: + KeyBasedBatching will batch incoming single messages: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) - batched into single batch message: + ... 
into single batch message: [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)] chunking_enabled: bool, default=False If message size is higher than allowed max publish-payload size by broker then chunking_enabled helps From 3e4b25de9d5d7a475e4142d3a9eee8395667da6f Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 4 Nov 2022 22:47:25 +0800 Subject: [PATCH 04/12] more args to parameters Signed-off-by: tison --- pulsar/__init__.py | 212 +++++++++++++++++++++------------------------ 1 file changed, 101 insertions(+), 111 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 34dc93a..8215d02 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -653,81 +653,70 @@ def subscribe(self, topic, subscription_name, """ Subscribe to the given topic and subscription combination. - **Args** + Parameters + ---------- - * `topic`: The name of the topic, list of topics or regex pattern. - This method will accept these forms: - - `topic='my-topic'` - - `topic=['topic-1', 'topic-2', 'topic-3']` - - `topic=re.compile('persistent://public/default/topic-*')` - * `subscription`: The name of the subscription. + topic: + The name of the topic, list of topics or regex pattern. This method will accept these forms: + * ``topic='my-topic'`` + * ``topic=['topic-1', 'topic-2', 'topic-3']`` + * ``topic=re.compile('persistent://public/default/topic-*')`` + subscription_name: str + The name of the subscription. + consumer_type: ConsumerType, default=ConsumerType.Exclusive + Select the subscription type to be used when subscribing to the topic. + schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema + Define the schema of the data that will be received by this consumer. + message_listener: optional + Sets a message listener for the consumer. When the listener is set, the application will + receive messages through it. Calls to ``consumer.receive()`` will not be allowed. 
+ The listener function needs to accept (consumer, message), for example: - **Options** + .. code-block:: python - * `consumer_type`: - Select the subscription type to be used when subscribing to the topic. - * `schema`: - Define the schema of the data that will be received by this consumer. - * `message_listener`: - Sets a message listener for the consumer. When the listener is set, - the application will receive messages through it. Calls to - `consumer.receive()` will not be allowed. The listener function needs - to accept (consumer, message), for example: - - #!python def my_listener(consumer, message): # process message consumer.acknowledge(message) - - * `receiver_queue_size`: - Sets the size of the consumer receive queue. The consumer receive - queue controls how many messages can be accumulated by the consumer - before the application calls `receive()`. Using a higher value could - potentially increase the consumer throughput at the expense of higher - memory utilization. Setting the consumer queue size to zero decreases - the throughput of the consumer by disabling pre-fetching of messages. - This approach improves the message distribution on shared subscription - by pushing messages only to those consumers that are ready to process - them. Neither receive with timeout nor partitioned topics can be used - if the consumer queue size is zero. The `receive()` function call - should not be interrupted when the consumer queue size is zero. The - default value is 1000 messages and should work well for most use - cases. - * `max_total_receiver_queue_size_across_partitions` - Set the max total receiver queue size across partitions. - This setting will be used to reduce the receiver queue size for individual partitions - * `consumer_name`: - Sets the consumer name. - * `unacked_messages_timeout_ms`: - Sets the timeout in milliseconds for unacknowledged messages. The - timeout needs to be greater than 10 seconds. 
An exception is thrown if - the given value is less than 10 seconds. If a successful - acknowledgement is not sent within the timeout, all the unacknowledged - messages are redelivered. - * `negative_ack_redelivery_delay_ms`: - The delay after which to redeliver the messages that failed to be - processed (with the `consumer.negative_acknowledge()`) - * `broker_consumer_stats_cache_time_ms`: - Sets the time duration for which the broker-side consumer stats will - be cached in the client. - * `is_read_compacted`: - Selects whether to read the compacted version of the topic - * `properties`: - Sets the properties for the consumer. The properties associated with a consumer - can be used for identify a consumer at broker side. - * `pattern_auto_discovery_period`: - Periods of seconds for consumer to auto discover match topics. - * `initial_position`: - Set the initial position of a consumer when subscribing to the topic. + receiver_queue_size: int, default=1000 + Sets the size of the consumer receive queue. The consumer receive queue controls how many messages can be + accumulated by the consumer before the application calls `receive()`. Using a higher value could potentially + increase the consumer throughput at the expense of higher memory utilization. Setting the consumer queue + size to zero decreases the throughput of the consumer by disabling pre-fetching of messages. + + This approach improves the message distribution on shared subscription by pushing messages only to those + consumers that are ready to process them. Neither receive with timeout nor partitioned topics can be used + if the consumer queue size is zero. The `receive()` function call should not be interrupted when the + consumer queue size is zero. The default value is 1000 messages and should work well for most use cases. + max_total_receiver_queue_size_across_partitions: int, default=50000 + Set the max total receiver queue size across partitions. 
This setting will be used to reduce the + receiver queue size for individual partitions + consumer_name: str, optional + Sets the consumer name. + unacked_messages_timeout_ms: int, optional + Sets the timeout in milliseconds for unacknowledged messages. The timeout needs to be greater than + 10 seconds. An exception is thrown if the given value is less than 10 seconds. If a successful + acknowledgement is not sent within the timeout, all the unacknowledged messages are redelivered. + negative_ack_redelivery_delay_ms: int, default=60000 + The delay after which to redeliver the messages that failed to be processed + (with the ``consumer.negative_acknowledge()``) + broker_consumer_stats_cache_time_ms: int, default=30000 + Sets the time duration for which the broker-side consumer stats will be cached in the client. + is_read_compacted: bool, default=False + Selects whether to read the compacted version of the topic + properties: dict, optional + Sets the properties for the consumer. The properties associated with a consumer can be used for + identify a consumer at broker side. + pattern_auto_discovery_period: int, default=60 + Periods of seconds for consumer to auto discover match topics. + initial_position: InitialPosition, default=InitialPosition.Latest + Set the initial position of a consumer when subscribing to the topic. It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`. - Default: `Latest`. - * crypto_key_reader: - Symmetric encryption class implementation, configuring public key encryption messages for the producer - and private key decryption messages for the consumer - * replicate_subscription_state_enabled: - Set whether the subscription status should be replicated. - Default: `False`. 
- * max_pending_chunked_message: + crypto_key_reader: CryptoKeyReader, optional + Symmetric encryption class implementation, configuring public key encryption messages for the producer + and private key decryption messages for the consumer + replicate_subscription_state_enabled: bool, default=False + Set whether the subscription status should be replicated. + max_pending_chunked_message: int, default=10 Consumer buffers chunk messages into memory until it receives all the chunks of the original message. While consuming chunk-messages, chunks from same message might not be contiguous in the stream, and they might be mixed with other messages' chunks. so, consumer has to maintain multiple buffers to manage @@ -735,18 +724,13 @@ def my_listener(consumer, message): messages on the topic concurrently or publisher failed to publish all chunks of the messages. If it's zero, the pending chunked messages will not be limited. - - Default: `10`. - * auto_ack_oldest_chunked_message_on_queue_full: + auto_ack_oldest_chunked_message_on_queue_full: bool, default=False Buffering large number of outstanding uncompleted chunked messages can create memory pressure, and it can be guarded by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage. Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery. - Default: `False`. - * start_message_id_inclusive: + start_message_id_inclusive: bool, default=False Set the consumer to include the given position of any reset operation like Consumer::seek. - - Default: `False`. """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -826,54 +810,60 @@ def create_reader(self, topic, start_message_id, """ Create a reader on a particular topic - **Args** + Parameters + ---------- + + topic: + The name of the topic. 
+ start_message_id: + The initial reader positioning is done by specifying a message id. The options are: + + * ``MessageId.earliest``: + + Start reading from the earliest message available in the topic + + * ``MessageId.latest``: + + Start reading from the end topic, only getting messages published after the reader was created + + * ``MessageId``: + + When passing a particular message id, the reader will position itself on that specific position. + The first message to be read will be the message next to the specified messageId. + Message id can be serialized into a string and deserialized back into a `MessageId` object: - * `topic`: The name of the topic. - * `start_message_id`: The initial reader positioning is done by specifying a message id. - The options are: - * `MessageId.earliest`: Start reading from the earliest message available in the topic - * `MessageId.latest`: Start reading from the end topic, only getting messages published - after the reader was created - * `MessageId`: When passing a particular message id, the reader will position itself on - that specific position. The first message to be read will be the message next to the - specified messageId. Message id can be serialized into a string and deserialized - back into a `MessageId` object: + .. code-block:: python # Serialize to string s = msg.message_id().serialize() # Deserialize from string msg_id = MessageId.deserialize(s) + schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema + Define the schema of the data that will be received by this reader. + reader_listener: optional + Sets a message listener for the reader. When the listener is set, the application will receive messages + through it. Calls to ``reader.read_next()`` will not be allowed. The listener function needs to accept + (reader, message), for example: - **Options** - - * `schema`: - Define the schema of the data that will be received by this reader. - * `reader_listener`: - Sets a message listener for the reader. 
When the listener is set, - the application will receive messages through it. Calls to - `reader.read_next()` will not be allowed. The listener function needs - to accept (reader, message), for example: + .. code-block:: python def my_listener(reader, message): # process message pass - - * `receiver_queue_size`: - Sets the size of the reader receive queue. The reader receive - queue controls how many messages can be accumulated by the reader - before the application calls `read_next()`. Using a higher value could - potentially increase the reader throughput at the expense of higher - memory utilization. - * `reader_name`: - Sets the reader name. - * `subscription_role_prefix`: - Sets the subscription role prefix. - * `is_read_compacted`: - Selects whether to read the compacted version of the topic - * crypto_key_reader: - Symmetric encryption class implementation, configuring public key encryption messages for the producer - and private key decryption messages for the consumer + receiver_queue_size: int, default=1000 + Sets the size of the reader receive queue. The reader receive queue controls how many messages can be + accumulated by the reader before the application calls `read_next()`. Using a higher value could + potentially increase the reader throughput at the expense of higher memory utilization. + reader_name: str, optional + Sets the reader name. + subscription_role_prefix: str, optional + Sets the subscription role prefix. 
+ is_read_compacted: bool, default=False + Selects whether to read the compacted version of the topic + crypto_key_reader: CryptoKeyReader, optional + Symmetric encryption class implementation, configuring public key encryption messages for the producer + and private key decryption messages for the consumer """ _check_type(str, topic, 'topic') _check_type(_pulsar.MessageId, start_message_id, 'start_message_id') From 281d7fe0526a08565891fb8b64de1f5608dd97d8 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 4 Nov 2022 22:53:42 +0800 Subject: [PATCH 05/12] more args to parameters Signed-off-by: tison --- pulsar/__init__.py | 123 ++++++++++++++++++++++++++------------------- 1 file changed, 70 insertions(+), 53 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 8215d02..7fcb109 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -903,8 +903,17 @@ def get_topic_partitions(self, topic): This can be used to discover the partitions and create Reader, Consumer or Producer instances directly on a particular partition. - :param topic: the topic name to lookup - :return: a list of partition name + + Parameters + ---------- + + topic: str + the topic name to lookup + + Returns + ------- + list + a list of partition name """ _check_type(str, topic, 'topic') return self._client.get_topic_partitions(topic) @@ -984,36 +993,31 @@ def send(self, content, Returns a `MessageId` object that represents where the message is persisted. - **Args** - - * `content`: - A `bytes` object with the message payload. - - **Options** + Parameters + ---------- - * `properties`: - A dict of application-defined string properties. - * `partition_key`: - Sets the partition key for message routing. A hash of this key is used - to determine the message's topic partition. - * `sequence_id`: - Specify a custom sequence id for the message being published. - * `replication_clusters`: - Override namespace replication clusters. 
Note that it is the caller's - responsibility to provide valid cluster names and that all clusters - have been previously configured as topics. Given an empty list, + content: + A ``bytes`` object with the message payload. + properties: optional + A dict of application-defined string properties. + partition_key: optional + Sets the partition key for message routing. A hash of this key is used + to determine the message's topic partition. + sequence_id: optional + Specify a custom sequence id for the message being published. + replication_clusters: optional + Override namespace replication clusters. Note that it is the caller's responsibility to provide valid + cluster names and that all clusters have been previously configured as topics. Given an empty list, the message will replicate according to the namespace configuration. - * `disable_replication`: - Do not replicate this message. - * `event_timestamp`: - Timestamp in millis of the timestamp of event creation - * `deliver_at`: - Specify the message should not be delivered earlier than the - specified timestamp. - The timestamp is milliseconds and based on UTC - * `deliver_after`: - Specify a delay in timedelta for the delivery of the messages. - + disable_replication: bool, default=False + Do not replicate this message. + event_timestamp: optional + Timestamp in millis of the timestamp of event creation + deliver_at: optional + Specify the message should not be delivered earlier than the specified timestamp. + The timestamp is milliseconds and based on UTC + deliver_after: optional + Specify a delay in timedelta for the delivery of the messages. """ msg = self._build_msg(content, properties, partition_key, sequence_id, replication_clusters, disable_replication, event_timestamp, @@ -1231,10 +1235,11 @@ def acknowledge(self, message): This method will block until an acknowledgement is sent to the broker. After that, the message will not be re-delivered to this consumer. 
- **Args** + Parameters + ---------- - * `message`: - The received message or message id. + message: + The received message or message id. """ if isinstance(message, Message): self._consumer.acknowledge(message._message) @@ -1249,10 +1254,11 @@ def acknowledge_cumulative(self, message): This method will block until an acknowledgement is sent to the broker. After that, the messages will not be re-delivered to this consumer. - **Args** + Parameters + ---------- - * `message`: - The received message or message id. + message: + The received message or message id. """ if isinstance(message, Message): self._consumer.acknowledge_cumulative(message._message) @@ -1269,10 +1275,11 @@ def negative_acknowledge(self, message): This call is not blocking. - **Args** + Parameters + ---------- - * `message`: - The received message or message id. + message: + The received message or message id. """ if isinstance(message, Message): self._consumer.negative_acknowledge(message._message) @@ -1312,10 +1319,11 @@ def seek(self, messageid): Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on the individual partitions. - **Args** + Parameters + ---------- - * `message`: - The message id for seek, OR an integer event time to seek to + message: + The message id for seek, OR an integer event time to seek to """ self._consumer.seek(messageid) @@ -1387,10 +1395,11 @@ def seek(self, messageid): Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on the individual partitions. - **Args** + Parameters + ---------- - * `message`: - The message id for seek, OR an integer event time to seek to + message: + The message id for seek, OR an integer event time to seek to """ self._reader.seek(messageid) @@ -1416,10 +1425,13 @@ def __init__(self, public_key_path, private_key_path): """ Create crypto key reader. 
- **Args** + Parameters + ---------- - * `public_key_path`: Path to the public key - * `private_key_path`: Path to private key + public_key_path: str + Path to the public key + private_key_path: str + Path to private key """ _check_type(str, public_key_path, 'public_key_path') _check_type(str, private_key_path, 'private_key_path') @@ -1430,9 +1442,11 @@ class ConsoleLogger: """ Logger that writes on standard output - **Args** + Attributes + ---------- - * `log_level`: The logging level. eg: `pulsar.LoggerLevel.Info` + log_level: + The logging level, eg: ``pulsar.LoggerLevel.Info`` """ def __init__(self, log_level=_pulsar.LoggerLevel.Info): _check_type(_pulsar.LoggerLevel, log_level, 'log_level') @@ -1443,10 +1457,13 @@ class FileLogger: """ Logger that writes into a file - **Args** + Attributes + ---------- - * `log_level`: The logging level. eg: `pulsar.LoggerLevel.Info` - * `log_file`: The file where to write the logs + log_level: + The logging level, eg: `pulsar.LoggerLevel.Info` + log_file: + The file where to write the logs """ def __init__(self, log_level, log_file): _check_type(_pulsar.LoggerLevel, log_level, 'log_level') From cd41a13944b964cc951a822565f15aacfdd7d692 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 4 Nov 2022 22:55:26 +0800 Subject: [PATCH 06/12] more args to parameters Signed-off-by: tison --- pulsar/__init__.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 7fcb109..d13796e 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1322,7 +1322,7 @@ def seek(self, messageid): Parameters ---------- - message: + messageid: The message id for seek, OR an integer event time to seek to """ self._consumer.seek(messageid) @@ -1398,7 +1398,7 @@ def seek(self, messageid): Parameters ---------- - message: + messageid: The message id for seek, OR an integer event time to seek to """ self._reader.seek(messageid) @@ -1442,11 +1442,11 @@ class ConsoleLogger: """ 
Logger that writes on standard output - Attributes - ---------- + Attributes + ---------- - log_level: - The logging level, eg: ``pulsar.LoggerLevel.Info`` + log_level: + The logging level, eg: ``pulsar.LoggerLevel.Info`` """ def __init__(self, log_level=_pulsar.LoggerLevel.Info): _check_type(_pulsar.LoggerLevel, log_level, 'log_level') @@ -1457,13 +1457,13 @@ class FileLogger: """ Logger that writes into a file - Attributes - ---------- + Attributes + ---------- - log_level: - The logging level, eg: `pulsar.LoggerLevel.Info` - log_file: - The file where to write the logs + log_level: + The logging level, eg: `pulsar.LoggerLevel.Info` + log_file: + The file where to write the logs """ def __init__(self, log_level, log_file): _check_type(_pulsar.LoggerLevel, log_level, 'log_level') From 776be9d26cf1ca7853e3f3b47a0c7644e92e7745 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 4 Nov 2022 22:57:53 +0800 Subject: [PATCH 07/12] format function docstrings Signed-off-by: tison --- pulsar/functions/context.py | 301 ++++++++++++++++++----------------- pulsar/functions/function.py | 22 +-- pulsar/functions/serde.py | 83 +++++----- 3 files changed, 210 insertions(+), 196 deletions(-) diff --git a/pulsar/functions/context.py b/pulsar/functions/context.py index c1f6801..51b86f0 100644 --- a/pulsar/functions/context.py +++ b/pulsar/functions/context.py @@ -36,156 +36,159 @@ # specific language governing permissions and limitations # under the License. # -"""context.py: Context defines context information available during -# processing of a request. +""" +Context defines context information available during processing of a request. 
""" from abc import abstractmethod + class Context(object): - """Interface defining information available at process time""" - @abstractmethod - def get_message_id(self): - """Return the messageid of the current message that we are processing""" - pass - - @abstractmethod - def get_message_key(self): - """Return the key of the current message that we are processing""" - pass - - @abstractmethod - def get_message_eventtime(self): - """Return the event time of the current message that we are processing""" - pass - - @abstractmethod - def get_message_properties(self): - """Return the message properties kv map of the current message that we are processing""" - pass - - @abstractmethod - def get_current_message_topic_name(self): - """Returns the topic name of the message that we are processing""" - pass - - @abstractmethod - def get_function_tenant(self): - """Returns the tenant of the message that's being processed""" - pass - - @abstractmethod - def get_function_namespace(self): - """Returns the namespace of the message that's being processed""" - - @abstractmethod - def get_function_name(self): - """Returns the function name that we are a part of""" - pass - - @abstractmethod - def get_function_id(self): - """Returns the function id that we are a part of""" - pass - - @abstractmethod - def get_instance_id(self): - """Returns the instance id that is executing the function""" - pass - - @abstractmethod - def get_function_version(self): - """Returns the version of function that we are executing""" - pass - - @abstractmethod - def get_logger(self): - """Returns the logger object that can be used to do logging""" - pass - - @abstractmethod - def get_user_config_value(self, key): - """Returns the value of the user-defined config. 
If the key doesn't exist, None is returned""" - pass - - @abstractmethod - def get_user_config_map(self): - """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)""" - pass - - @abstractmethod - def get_secret(self, secret_name): - """Returns the secret value associated with the name. None if nothing was found""" - pass - - @abstractmethod - def get_partition_key(self): - """Returns partition key of the input message is one exists""" - pass - - - @abstractmethod - def record_metric(self, metric_name, metric_value): - """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)""" - pass - - @abstractmethod - def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None): - """Publishes message to topic_name by first serializing the message using serde_class_name serde - The message will have properties specified if any - - The available options for message_conf: - - properties, - partition_key, - sequence_id, - replication_clusters, - disable_replication, - event_timestamp - - """ - pass - - @abstractmethod - def get_input_topics(self): - """Returns the input topics of function""" - pass - - @abstractmethod - def get_output_topic(self): - """Returns the output topic of function""" - pass - - @abstractmethod - def get_output_serde_class_name(self): - """return output Serde class""" - pass - - @abstractmethod - def ack(self, msgid, topic): - """ack this message id""" - pass - - @abstractmethod - def incr_counter(self, key, amount): - """incr the counter of a given key in the managed state""" - pass - - @abstractmethod - def get_counter(self, key): - """get the counter of a given key in the managed state""" - pass - - @abstractmethod - def del_counter(self, key): - """delete the counter of a given key in the managed state""" - pass - - @abstractmethod - def put_state(self, key, 
value): - """update the value of a given key in the managed state""" - pass - - @abstractmethod - def get_state(self, key): - """get the value of a given key in the managed state""" - pass + """Interface defining information available at process time""" + + @abstractmethod + def get_message_id(self): + """Return the messageid of the current message that we are processing""" + pass + + @abstractmethod + def get_message_key(self): + """Return the key of the current message that we are processing""" + pass + + @abstractmethod + def get_message_eventtime(self): + """Return the event time of the current message that we are processing""" + pass + + @abstractmethod + def get_message_properties(self): + """Return the message properties kv map of the current message that we are processing""" + pass + + @abstractmethod + def get_current_message_topic_name(self): + """Returns the topic name of the message that we are processing""" + pass + + @abstractmethod + def get_function_tenant(self): + """Returns the tenant of the message that's being processed""" + pass + + @abstractmethod + def get_function_namespace(self): + """Returns the namespace of the message that's being processed""" + + @abstractmethod + def get_function_name(self): + """Returns the function name that we are a part of""" + pass + + @abstractmethod + def get_function_id(self): + """Returns the function id that we are a part of""" + pass + + @abstractmethod + def get_instance_id(self): + """Returns the instance id that is executing the function""" + pass + + @abstractmethod + def get_function_version(self): + """Returns the version of function that we are executing""" + pass + + @abstractmethod + def get_logger(self): + """Returns the logger object that can be used to do logging""" + pass + + @abstractmethod + def get_user_config_value(self, key): + """Returns the value of the user-defined config. 
If the key doesn't exist, None is returned""" + pass + + @abstractmethod + def get_user_config_map(self): + """Returns the entire user-defined config as a dict + (the dict will be empty if no user-defined config is supplied)""" + pass + + @abstractmethod + def get_secret(self, secret_name): + """Returns the secret value associated with the name. None if nothing was found""" + pass + + @abstractmethod + def get_partition_key(self): + """Returns partition key of the input message is one exists""" + pass + + @abstractmethod + def record_metric(self, metric_name, metric_value): + """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)""" + pass + + @abstractmethod + def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, + compression_type=None, callback=None, message_conf=None): + """Publishes message to topic_name by first serializing the message using serde_class_name serde + The message will have properties specified if any + + The available options for message_conf: + + properties, + partition_key, + sequence_id, + replication_clusters, + disable_replication, + event_timestamp + + """ + pass + + @abstractmethod + def get_input_topics(self): + """Returns the input topics of function""" + pass + + @abstractmethod + def get_output_topic(self): + """Returns the output topic of function""" + pass + + @abstractmethod + def get_output_serde_class_name(self): + """return output Serde class""" + pass + + @abstractmethod + def ack(self, msgid, topic): + """ack this message id""" + pass + + @abstractmethod + def incr_counter(self, key, amount): + """incr the counter of a given key in the managed state""" + pass + + @abstractmethod + def get_counter(self, key): + """get the counter of a given key in the managed state""" + pass + + @abstractmethod + def del_counter(self, key): + """delete the counter of a given key in the managed state""" + pass + + @abstractmethod + def put_state(self, key, 
value): + """update the value of a given key in the managed state""" + pass + + @abstractmethod + def get_state(self, key): + """get the value of a given key in the managed state""" + pass diff --git a/pulsar/functions/function.py b/pulsar/functions/function.py index ce2919d..107d25d 100644 --- a/pulsar/functions/function.py +++ b/pulsar/functions/function.py @@ -36,16 +36,20 @@ # specific language governing permissions and limitations # under the License. # -"""function.py: This is the core interface of the function api. -# The process method is called for every message of the input topic of the -# function. The incoming input bytes are deserialized using the serde. -# The process function can optionally emit an output +""" +This is the core interface of the function api. + +The process method is called for every message of the input topic of the +function. The incoming input bytes are deserialized using the serde. +The process function can optionally emit an output """ from abc import abstractmethod + class Function(object): - """Interface for Pulsar Function""" - @abstractmethod - def process(self, input, context): - """Process input message""" - pass \ No newline at end of file + """Interface for Pulsar Function""" + + @abstractmethod + def process(self, input, context): + """Process input message""" + pass diff --git a/pulsar/functions/serde.py b/pulsar/functions/serde.py index 7b07673..367d55c 100644 --- a/pulsar/functions/serde.py +++ b/pulsar/functions/serde.py @@ -36,52 +36,59 @@ # specific language governing permissions and limitations # under the License. # -"""serde.py: SerDe defines the interface for serialization/deserialization. -# Everytime a message is read from pulsar topic, the serde is invoked to -# serialize the bytes into an object before invoking the process method. -# Anytime a python object needs to be written back to pulsar, it is -# serialized into bytes before writing. 
""" -from abc import abstractmethod +SerDe defines the interface for serialization/deserialization. +Every time a message is read from a Pulsar topic, the serde is invoked to +serialize the bytes into an object before invoking the process method. +Anytime a Python object needs to be written back to Pulsar, it is +serialized into bytes before writing. +""" import pickle +from abc import abstractmethod + + class SerDe(object): - """Interface for Serialization/Deserialization""" - @abstractmethod - def serialize(self, input): - """Serialize input message into bytes""" - pass + """Interface for Serialization/Deserialization""" + + @abstractmethod + def serialize(self, input): + """Serialize input message into bytes""" + pass + + @abstractmethod + def deserialize(self, input_bytes): + """Deserialize input_bytes into an object""" + pass - @abstractmethod - def deserialize(self, input_bytes): - """Serialize input_bytes into an object""" - pass + class PickleSerDe(SerDe): - """Pickle based serializer""" - def serialize(self, input): - return pickle.dumps(input) + """Pickle based serializer""" + + def serialize(self, input): + return pickle.dumps(input) + + def deserialize(self, input_bytes): + return pickle.loads(input_bytes) - def deserialize(self, input_bytes): - return pickle.loads(input_bytes) + class IdentitySerDe(SerDe): - """Simple Serde that just conversion to string and back""" - def __init__(self): - self._types = [int, float, complex, str] - - def serialize(self, input): - if type(input) in self._types: - return str(input).encode('utf-8') - if type(input) == bytes: - return input - raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input)) - - def deserialize(self, input_bytes): - for typ in self._types: - try: - return typ(input_bytes.decode('utf-8')) - except: - pass - return input_bytes + """Simple SerDe that just converts to string and back""" + + def __init__(self): + self._types = [int, float, complex, str] + + def serialize(self, input): + 
if type(input) in self._types: + return str(input).encode('utf-8') + if type(input) == bytes: + return input + raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input)) + + def deserialize(self, input_bytes): + for typ in self._types: + try: + return typ(input_bytes.decode('utf-8')) + except: + pass + return input_bytes From 5152dcb6617e0ec57f619899fe310660af0dced4 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 4 Nov 2022 23:48:57 +0800 Subject: [PATCH 08/12] more attributes Signed-off-by: tison --- pulsar/__init__.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index d13796e..027132f 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -64,16 +64,21 @@ class MessageId: """ - Represents a message id + Represents a message id. + + Attributes + ---------- + + earliest: + Represents the earliest message stored in a topic + latest: + Represents the latest message published on a topic """ def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1): self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index) - 'Represents the earliest message stored in a topic' earliest = _pulsar.MessageId.earliest - - 'Represents the latest message published on a topic' latest = _pulsar.MessageId.latest def ledger_id(self): From 4d6a7954f5f76b9f62540ba43c1afe9f9689ccc6 Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 5 Nov 2022 09:40:21 +0800 Subject: [PATCH 09/12] fix format Signed-off-by: tison --- pulsar/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 027132f..bf717d2 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -512,7 +512,7 @@ def create_producer(self, topic, using ``(initialSequenceId + 1)`` as its sequence id and subsequent messages will be assigned incremental sequence ids, if not otherwise specified. 
send_timeout_millis: int, default=30000 - If a message is not acknowledged by the server before the `send_timeout` expires, an error will be reported. + If a message is not acknowledged by the server before the ``send_timeout`` expires, an error will be reported. compression_type: CompressionType, default=CompressionType.NONE Set the compression type for the producer. By default, message payloads are not compressed. From 29d3df4f5911b100e5c0ae098653987cbbc67ab8 Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 5 Nov 2022 23:56:31 +0800 Subject: [PATCH 10/12] more args to parameters Signed-off-by: tison --- pulsar/__init__.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index bf717d2..080c3a5 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1216,11 +1216,11 @@ def receive(self, timeout_millis=None): If a message is not immediately available, this method will block until a new message is available. - **Options** + Parameters + ---------- - * `timeout_millis`: - If specified, the receiver will raise an exception if a message is not - available within the timeout. + timeout_millis: int, optional + If specified, the receiver will raise an exception if a message is not available within the timeout. """ if timeout_millis is None: msg = self._consumer.receive() @@ -1370,11 +1370,11 @@ def read_next(self, timeout_millis=None): If a message is not immediately available, this method will block until a new message is available. - **Options** + Parameters + ---------- - * `timeout_millis`: - If specified, the receiver will raise an exception if a message is not - available within the timeout. + timeout_millis: int, optional + If specified, the receiver will raise an exception if a message is not available within the timeout. 
""" if timeout_millis is None: msg = self._reader.read_next() From 17439a7c5893466d90baf45173ef122d84d89ce3 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 6 Nov 2022 00:05:03 +0800 Subject: [PATCH 11/12] fix links Signed-off-by: tison --- pulsar/__init__.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 080c3a5..c1195de 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -374,8 +374,12 @@ def __init__(self, service_url, service_url: str The Pulsar service url eg: pulsar://my-broker.com:6650/ authentication: Authentication, optional - Set the authentication provider to be used with the broker. For example: - `AuthenticationTls`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2` + Set the authentication provider to be used with the broker. Supported methods: + + * `AuthenticationTLS` + * `AuthenticationToken` + * `AuthenticationAthenz` + * `AuthenticationOauth2` operation_timeout_seconds: int, default=30 Set timeout on client operations (subscribe, create producer, close, unsubscribe). io_threads: int, default=1 @@ -383,8 +387,8 @@ def __init__(self, service_url, message_listener_threads: int, default=1 Set the number of threads to be used by the Pulsar client when delivering messages through message listener. The default is 1 thread per Pulsar client. If using more than 1 thread, - messages for distinct `message_listener`s will be delivered in different threads, however a - single `MessageListener` will always be assigned to the same thread. + messages for distinct ``message_listener``s will be delivered in different threads, however a + single ``MessageListener`` will always be assigned to the same thread. concurrent_lookup_requests: int, default=50000 Number of concurrent lookup-requests allowed on each broker connection to prevent overload on the broker. 
@@ -407,7 +411,7 @@ def __init__(self, service_url, listener_name: str, optional Listener name for lookup. Clients can use listenerName to choose one of the listeners as the service URL to create a connection to the broker as long as the network is accessible. - `advertisedListeners` must be enabled in broker side. + ``advertisedListeners`` must be enabled in broker side. """ _check_type(str, service_url, 'service_url') _check_type_or_none(Authentication, authentication, 'authentication') @@ -975,7 +979,7 @@ def last_sequence_id(self): Get the last sequence id that was published by this producer. This represents either the automatically assigned or custom sequence id - (set on the `MessageBuilder`) that was published and acknowledged by the broker. + (set on the ``MessageBuilder``) that was published and acknowledged by the broker. After recreating a producer with the same producer name, this will return the last message that was published in the previous producer session, or -1 if @@ -1045,7 +1049,7 @@ def send_async(self, content, callback, Examples -------- - The `callback` will be invoked once the message has been acknowledged by the broker. + The ``callback`` will be invoked once the message has been acknowledged by the broker. .. code-block:: python @@ -1293,8 +1297,7 @@ def negative_acknowledge(self, message): def pause_message_listener(self): """ - Pause receiving messages via the `message_listener` until - `resume_message_listener()` is called. + Pause receiving messages via the ``message_listener`` until `resume_message_listener()` is called. 
""" self._consumer.pause_message_listener() @@ -1466,7 +1469,7 @@ class FileLogger: ---------- log_level: - The logging level, eg: `pulsar.LoggerLevel.Info` + The logging level, e.g. ``pulsar.LoggerLevel.Info`` log_file: The file where to write the logs """ From 2648d5f57d7f06739f321347a588f82daa6c7a0e Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 6 Nov 2022 00:13:25 +0800 Subject: [PATCH 12/12] update README.md Signed-off-by: tison --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index b0857a0..c25c98f 100644 --- a/README.md +++ b/README.md @@ -110,3 +110,17 @@ Run a single unit test (e.g. `PulsarTest.test_tls_auth`): ```bash python3 ./tests/pulsar_test.py 'PulsarTest.test_tls_auth' ``` + +## Generate API docs + +Pulsar Python Client uses [pydoctor](https://github.com/twisted/pydoctor) to generate API docs. To generate the docs by yourself, run the following command in the root path of this repository: + +```bash +pip3 install pydoctor +pydoctor --make-html \ + --html-viewsource-base=https://github.com/apache/pulsar-client-python/tree/<commit-id> \ + --docformat=numpy --theme=readthedocs \ + --intersphinx=https://docs.python.org/3/objects.inv \ + --html-output=<path-to-apidocs> \ + pulsar +```