Skip to content

[ServiceBus] expand kwargs in public API #22353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jan 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import datetime
import uuid
import logging
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Any, Mapping, cast
from typing import Optional, Dict, List, Union, Iterable, TYPE_CHECKING, Any, Mapping, cast

import six

Expand Down Expand Up @@ -92,8 +92,24 @@ class ServiceBusMessage(

"""

def __init__(self, body, **kwargs):
# type: (Optional[Union[str, bytes]], Any) -> None
def __init__(
self,
body: Optional[Union[str, bytes]],
*,
application_properties: Optional[Dict[str, Any]] = None,
session_id: Optional[str] = None,
message_id: Optional[str] = None,
scheduled_enqueue_time_utc: Optional[datetime.datetime] = None,
time_to_live: Optional[datetime.timedelta] = None,
content_type: Optional[str] = None,
correlation_id: Optional[str] = None,
subject: Optional[str] = None,
partition_key: Optional[str] = None,
to: Optional[str] = None,
reply_to: Optional[str] = None,
reply_to_session_id: Optional[str] = None,
**kwargs: Any
) -> None:
# Although we might normally thread through **kwargs this causes
# problems as MessageProperties won't absorb spurious args.
self._encoding = kwargs.pop("encoding", "UTF-8")
Expand All @@ -108,20 +124,18 @@ def __init__(self, body, **kwargs):
self._raw_amqp_message = AmqpAnnotatedMessage(message=self.message)
else:
self._build_message(body)
self.application_properties = kwargs.pop("application_properties", None)
self.session_id = kwargs.pop("session_id", None)
self.message_id = kwargs.pop("message_id", None)
self.content_type = kwargs.pop("content_type", None)
self.correlation_id = kwargs.pop("correlation_id", None)
self.to = kwargs.pop("to", None)
self.reply_to = kwargs.pop("reply_to", None)
self.reply_to_session_id = kwargs.pop("reply_to_session_id", None)
self.subject = kwargs.pop("subject", None)
self.scheduled_enqueue_time_utc = kwargs.pop(
"scheduled_enqueue_time_utc", None
)
self.time_to_live = kwargs.pop("time_to_live", None)
self.partition_key = kwargs.pop("partition_key", None)
self.application_properties = application_properties
self.session_id = session_id
self.message_id = message_id
self.content_type = content_type
self.correlation_id = correlation_id
self.to = to
self.reply_to = reply_to
self.reply_to_session_id = reply_to_session_id
self.subject = subject
self.scheduled_enqueue_time_utc = scheduled_enqueue_time_utc
self.time_to_live = time_to_live
self.partition_key = partition_key

def __str__(self):
# type: () -> str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any, Union, TYPE_CHECKING
from typing import Any, Union, Optional, TYPE_CHECKING
import logging
from weakref import WeakSet

Expand All @@ -15,16 +15,25 @@
)
from ._servicebus_sender import ServiceBusSender
from ._servicebus_receiver import ServiceBusReceiver
from ._common.auto_lock_renewer import AutoLockRenewer
from ._common._configuration import Configuration
from ._common.utils import (
create_authentication,
generate_dead_letter_entity_name,
strip_protocol_from_uri,
)
from ._common.constants import ServiceBusSubQueue
from ._common.constants import (
ServiceBusSubQueue,
ServiceBusReceiveMode,
)

if TYPE_CHECKING:
from azure.core.credentials import TokenCredential, AzureSasCredential, AzureNamedKeyCredential
from azure.core.credentials import (
TokenCredential,
AzureSasCredential,
AzureNamedKeyCredential,
)


_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -75,15 +84,32 @@ class ServiceBusClient(object):

"""

def __init__(self, fully_qualified_namespace, credential, **kwargs):
# type: (str, Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential], Any) -> None
def __init__(
self,
fully_qualified_namespace: str,
credential: Union[
"TokenCredential", "AzureSasCredential", "AzureNamedKeyCredential"
],
*,
retry_total: int = 3,
retry_backoff_factor: float = 0.8,
retry_backoff_max: int = 120,
retry_mode: str = "exponential",
**kwargs: Any
) -> None:
# If the user provided http:// or sb://, let's be polite and strip that.
self.fully_qualified_namespace = strip_protocol_from_uri(
fully_qualified_namespace.strip()
)

self._credential = credential
self._config = Configuration(**kwargs)
self._config = Configuration(
retry_total=retry_total,
retry_backoff_factor=retry_backoff_factor,
retry_backoff_max=retry_backoff_max,
retry_mode=retry_mode,
**kwargs
)
self._connection = None
# Optional entity name, can be the name of Queue or Topic. Intentionally not advertised, typically be needed.
self._entity_name = kwargs.get("entity_name")
Expand Down Expand Up @@ -134,8 +160,16 @@ def close(self):
self._connection.destroy()

@classmethod
def from_connection_string(cls, conn_str, **kwargs):
# type: (str, Any) -> ServiceBusClient
def from_connection_string(
cls,
conn_str: str,
*,
retry_total: int = 3,
retry_backoff_factor: float = 0.8,
retry_backoff_max: int = 120,
retry_mode: str = "exponential",
**kwargs: Any
) -> "ServiceBusClient":
"""
Create a ServiceBusClient from a connection string.

Expand Down Expand Up @@ -181,6 +215,10 @@ def from_connection_string(cls, conn_str, **kwargs):
fully_qualified_namespace=host,
entity_name=entity_in_conn_str or kwargs.pop("entity_name", None),
credential=credential, # type: ignore
retry_total=retry_total,
retry_backoff_factor=retry_backoff_factor,
retry_backoff_max=retry_backoff_max,
retry_mode=retry_mode,
**kwargs
)

Expand Down Expand Up @@ -227,8 +265,20 @@ def get_queue_sender(self, queue_name, **kwargs):
self._handlers.add(handler)
return handler

def get_queue_receiver(self, queue_name, **kwargs):
# type: (str, Any) -> ServiceBusReceiver
def get_queue_receiver(
self,
queue_name: str,
*,
session_id: Optional[str] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[
ServiceBusReceiveMode, str
] = ServiceBusReceiveMode.PEEK_LOCK,
max_wait_time: Optional[float] = None,
auto_lock_renewer: Optional[AutoLockRenewer] = None,
prefetch_count: int = 0,
**kwargs: Any
) -> ServiceBusReceiver:
"""Get ServiceBusReceiver for the specific queue.

:param str queue_name: The path of specific Service Bus Queue the client connects to.
Expand Down Expand Up @@ -280,8 +330,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
"the connection string used to construct the ServiceBusClient."
)

sub_queue = kwargs.get("sub_queue", None)
if sub_queue and kwargs.get("session_id"):
if sub_queue and session_id:
raise ValueError(
"session_id and sub_queue can not be specified simultaneously. "
"To connect to the sub queue of a sessionful queue, "
Expand Down Expand Up @@ -314,6 +363,12 @@ def get_queue_receiver(self, queue_name, **kwargs):
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
session_id=session_id,
sub_queue=sub_queue,
receive_mode=receive_mode,
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
**kwargs
)
self._handlers.add(handler)
Expand Down Expand Up @@ -361,8 +416,21 @@ def get_topic_sender(self, topic_name, **kwargs):
self._handlers.add(handler)
return handler

def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
# type: (str, str, Any) -> ServiceBusReceiver
def get_subscription_receiver(
self,
topic_name: str,
subscription_name: str,
*,
session_id: Optional[str] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[
ServiceBusReceiveMode, str
] = ServiceBusReceiveMode.PEEK_LOCK,
max_wait_time: Optional[float] = None,
auto_lock_renewer: Optional[AutoLockRenewer] = None,
prefetch_count: int = 0,
**kwargs: Any
) -> ServiceBusReceiver:
"""Get ServiceBusReceiver for the specific subscription under the topic.

:param str topic_name: The name of specific Service Bus Topic the client connects to.
Expand Down Expand Up @@ -417,8 +485,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
"the connection string used to construct the ServiceBusClient."
)

sub_queue = kwargs.get("sub_queue", None)
if sub_queue and kwargs.get("session_id"):
if sub_queue and session_id:
raise ValueError(
"session_id and sub_queue can not be specified simultaneously. "
"To connect to the sub queue of a sessionful subscription, "
Expand Down Expand Up @@ -446,6 +513,12 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
session_id=session_id,
sub_queue=sub_queue,
receive_mode=receive_mode,
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
**kwargs
)
except ValueError:
Expand All @@ -467,6 +540,12 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
session_id=session_id,
sub_queue=sub_queue,
receive_mode=receive_mode,
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
**kwargs
)
self._handlers.add(handler)
Expand Down
Loading