Skip to content

Client streaming #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ dist
**/*.egg-info
output
.idea
.DS_Store
.tox
109 changes: 9 additions & 100 deletions betterproto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,30 @@
import struct
import sys
from abc import ABC
from base64 import b64encode, b64decode
from base64 import b64decode, b64encode
from datetime import datetime, timedelta, timezone
import stringcase
from typing import (
Any,
AsyncGenerator,
Callable,
Collection,
Dict,
Generator,
Iterator,
List,
Mapping,
Optional,
Set,
SupportsBytes,
Tuple,
Type,
TypeVar,
Union,
get_type_hints,
TYPE_CHECKING,
)


import grpclib.const
import stringcase

from ._types import ST, T
from .casing import safe_snake_case

if TYPE_CHECKING:
from grpclib._protocols import IProtoMessage
from grpclib.client import Channel
from grpclib.metadata import Deadline
from .grpc.grpclib_client import ServiceStub

if not (sys.version_info.major == 3 and sys.version_info.minor >= 7):
# Apply backport of datetime.fromisoformat from 3.7
Expand Down Expand Up @@ -429,10 +422,6 @@ def parse_fields(value: bytes) -> Generator[ParsedField, None, None]:
)


# Bound type variable to allow methods to return `self` of subclasses
T = TypeVar("T", bound="Message")


class ProtoClassMetadata:
oneof_group_by_field: Dict[str, str]
oneof_field_by_group: Dict[str, Set[dataclasses.Field]]
Expand All @@ -451,7 +440,7 @@ class ProtoClassMetadata:

def __init__(self, cls: Type["Message"]):
by_field = {}
by_group = {}
by_group: Dict[str, Set] = {}
by_field_name = {}
by_field_number = {}

Expand Down Expand Up @@ -604,7 +593,7 @@ def __bytes__(self) -> bytes:
serialize_empty = False
if isinstance(value, Message) and value._serialized_on_wire:
# Empty messages can still be sent on the wire if they were
# set (or received empty).
# set (or recieved empty).
serialize_empty = True

if value == self._get_field_default(field_name) and not (
Expand Down Expand Up @@ -791,7 +780,7 @@ def FromString(cls: Type[T], data: bytes) -> T:

def to_dict(
self, casing: Casing = Casing.CAMEL, include_default_values: bool = False
) -> dict:
) -> Dict[str, Any]:
"""
Returns a dict representation of this message instance which can be
used to serialize to e.g. JSON. Defaults to camel casing for
Expand Down Expand Up @@ -1024,83 +1013,3 @@ def _get_wrapper(proto_type: str) -> Type:
TYPE_STRING: StringValue,
TYPE_BYTES: BytesValue,
}[proto_type]


_Value = Union[str, bytes]
_MetadataLike = Union[Mapping[str, _Value], Collection[Tuple[str, _Value]]]


class ServiceStub(ABC):
"""
Base class for async gRPC service stubs.
"""

def __init__(
self,
channel: "Channel",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional[_MetadataLike] = None,
) -> None:
self.channel = channel
self.timeout = timeout
self.deadline = deadline
self.metadata = metadata

def __resolve_request_kwargs(
self,
timeout: Optional[float],
deadline: Optional["Deadline"],
metadata: Optional[_MetadataLike],
):
return {
"timeout": self.timeout if timeout is None else timeout,
"deadline": self.deadline if deadline is None else deadline,
"metadata": self.metadata if metadata is None else metadata,
}

async def _unary_unary(
self,
route: str,
request: "IProtoMessage",
response_type: Type[T],
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional[_MetadataLike] = None,
) -> T:
"""Make a unary request and return the response."""
async with self.channel.request(
route,
grpclib.const.Cardinality.UNARY_UNARY,
type(request),
response_type,
**self.__resolve_request_kwargs(timeout, deadline, metadata),
) as stream:
await stream.send_message(request, end=True)
response = await stream.recv_message()
assert response is not None
return response

async def _unary_stream(
self,
route: str,
request: "IProtoMessage",
response_type: Type[T],
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional[_MetadataLike] = None,
) -> AsyncGenerator[T, None]:
"""Make a unary request and return the stream response iterator."""
async with self.channel.request(
route,
grpclib.const.Cardinality.UNARY_STREAM,
type(request),
response_type,
**self.__resolve_request_kwargs(timeout, deadline, metadata),
) as stream:
await stream.send_message(request, end=True)
async for message in stream:
yield message
9 changes: 9 additions & 0 deletions betterproto/_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import TYPE_CHECKING, TypeVar

if TYPE_CHECKING:
from . import Message
from grpclib._protocols import IProtoMessage

# Bound type variable to allow methods to return `self` of subclasses
T = TypeVar("T", bound="Message")
ST = TypeVar("ST", bound="IProtoMessage")
Empty file added betterproto/grpc/__init__.py
Empty file.
170 changes: 170 additions & 0 deletions betterproto/grpc/grpclib_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
from abc import ABC
import asyncio
import grpclib.const
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Collection,
Iterable,
Mapping,
Optional,
Tuple,
TYPE_CHECKING,
Type,
Union,
)
from .._types import ST, T

if TYPE_CHECKING:
from grpclib._protocols import IProtoMessage
from grpclib.client import Channel, Stream
from grpclib.metadata import Deadline


_Value = Union[str, bytes]
_MetadataLike = Union[Mapping[str, _Value], Collection[Tuple[str, _Value]]]
_MessageSource = Union[Iterable["IProtoMessage"], AsyncIterable["IProtoMessage"]]


class ServiceStub(ABC):
"""
Base class for async gRPC clients.
"""

def __init__(
self,
channel: "Channel",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional[_MetadataLike] = None,
) -> None:
self.channel = channel
self.timeout = timeout
self.deadline = deadline
self.metadata = metadata

def __resolve_request_kwargs(
self,
timeout: Optional[float],
deadline: Optional["Deadline"],
metadata: Optional[_MetadataLike],
):
return {
"timeout": self.timeout if timeout is None else timeout,
"deadline": self.deadline if deadline is None else deadline,
"metadata": self.metadata if metadata is None else metadata,
}

async def _unary_unary(
self,
route: str,
request: "IProtoMessage",
response_type: Type[T],
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional[_MetadataLike] = None,
) -> T:
"""Make a unary request and return the response."""
async with self.channel.request(
route,
grpclib.const.Cardinality.UNARY_UNARY,
type(request),
response_type,
**self.__resolve_request_kwargs(timeout, deadline, metadata),
) as stream:
await stream.send_message(request, end=True)
response = await stream.recv_message()
assert response is not None
return response

async def _unary_stream(
self,
route: str,
request: "IProtoMessage",
response_type: Type[T],
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional[_MetadataLike] = None,
) -> AsyncIterator[T]:
"""Make a unary request and return the stream response iterator."""
async with self.channel.request(
route,
grpclib.const.Cardinality.UNARY_STREAM,
type(request),
response_type,
**self.__resolve_request_kwargs(timeout, deadline, metadata),
) as stream:
await stream.send_message(request, end=True)
async for message in stream:
yield message

async def _stream_unary(
self,
route: str,
request_iterator: _MessageSource,
request_type: Type[ST],
response_type: Type[T],
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional[_MetadataLike] = None,
) -> T:
"""Make a stream request and return the response."""
async with self.channel.request(
route,
grpclib.const.Cardinality.STREAM_UNARY,
request_type,
response_type,
**self.__resolve_request_kwargs(timeout, deadline, metadata),
) as stream:
await self._send_messages(stream, request_iterator)
response = await stream.recv_message()
assert response is not None
return response

async def _stream_stream(
self,
route: str,
request_iterator: _MessageSource,
request_type: Type[ST],
response_type: Type[T],
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional[_MetadataLike] = None,
) -> AsyncIterator[T]:
"""
Make a stream request and return an AsyncIterator to iterate over response
messages.
"""
async with self.channel.request(
route,
grpclib.const.Cardinality.STREAM_STREAM,
request_type,
response_type,
**self.__resolve_request_kwargs(timeout, deadline, metadata),
) as stream:
await stream.send_request()
sending_task = asyncio.ensure_future(
self._send_messages(stream, request_iterator)
)
try:
async for response in stream:
yield response
except:
sending_task.cancel()
raise

@staticmethod
async def _send_messages(stream, messages: _MessageSource):
if isinstance(messages, AsyncIterable):
async for message in messages:
await stream.send_message(message)
else:
for message in messages:
await stream.send_message(message)
await stream.end()
Empty file.
Loading