Skip to content

Commit 6c7677e

Browse files
author
Gurov Ilya
authored
refactor: incorporate will_accept() checks into publish() (#108)
1 parent 0132a46 commit 6c7677e

File tree

5 files changed

+47
-69
lines changed

5 files changed

+47
-69
lines changed

google/cloud/pubsub_v1/publisher/_batch/base.py

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -109,32 +109,6 @@ def status(self):
109109
"""
110110
raise NotImplementedError
111111

112-
def will_accept(self, message):
113-
"""Return True if the batch is able to accept the message.
114-
115-
In concurrent implementations, the attributes on the current batch
116-
may be modified by other workers. With this in mind, the caller will
117-
likely want to hold a lock that will make sure the state remains
118-
the same after the "will accept?" question is answered.
119-
120-
Args:
121-
message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.
122-
123-
Returns:
124-
bool: Whether this batch can accept the message.
125-
"""
126-
# If this batch is not accepting messages generally, return False.
127-
if self.status != BatchStatus.ACCEPTING_MESSAGES:
128-
return False
129-
130-
# If this message will make the batch exceed the ``max_messages``
131-
# setting, return False.
132-
if len(self.messages) >= self.settings.max_messages:
133-
return False
134-
135-
# Okay, everything is good.
136-
return True
137-
138112
def cancel(self, cancellation_reason):
139113
"""Complete pending futures with an exception.
140114

google/cloud/pubsub_v1/publisher/_batch/thread.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,8 @@ def publish(self, message):
333333
self._status != base.BatchStatus.ERROR
334334
), "Publish after stop() or publish error."
335335

336-
if not self.will_accept(message):
337-
return future
336+
if self.status != base.BatchStatus.ACCEPTING_MESSAGES:
337+
return
338338

339339
size_increase = types.PublishRequest(messages=[message]).ByteSize()
340340

tests/unit/pubsub_v1/publisher/batch/test_base.py

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -46,33 +46,3 @@ def test_len():
4646
assert len(batch) == 0
4747
batch.publish(types.PubsubMessage(data=b"foo"))
4848
assert len(batch) == 1
49-
50-
51-
def test_will_accept():
52-
batch = create_batch(status=BatchStatus.ACCEPTING_MESSAGES)
53-
message = types.PubsubMessage()
54-
assert batch.will_accept(message) is True
55-
56-
57-
def test_will_accept_oversize():
58-
batch = create_batch(
59-
settings=types.BatchSettings(max_bytes=10),
60-
status=BatchStatus.ACCEPTING_MESSAGES,
61-
)
62-
message = types.PubsubMessage(data=b"abcdefghijklmnopqrstuvwxyz")
63-
assert batch.will_accept(message) is True
64-
65-
66-
def test_will_not_accept_status():
67-
batch = create_batch(status="talk to the hand")
68-
message = types.PubsubMessage()
69-
assert batch.will_accept(message) is False
70-
71-
72-
def test_will_not_accept_number():
73-
batch = create_batch(
74-
settings=types.BatchSettings(max_messages=-1),
75-
status=BatchStatus.ACCEPTING_MESSAGES,
76-
)
77-
message = types.PubsubMessage(data=b"abc")
78-
assert batch.will_accept(message) is False

tests/unit/pubsub_v1/publisher/batch/test_thread.py

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -287,18 +287,56 @@ def test_publish_updating_batch_size():
287287
assert batch.size > 0 # I do not always trust protobuf.
288288

289289

290-
def test_publish_not_will_accept():
290+
def test_publish():
291+
batch = create_batch()
292+
message = types.PubsubMessage()
293+
future = batch.publish(message)
294+
295+
assert len(batch.messages) == 1
296+
assert batch._futures == [future]
297+
298+
299+
def test_publish_max_messages_zero():
291300
batch = create_batch(topic="topic_foo", max_messages=0)
292-
base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()
293301

294-
# Publish the message.
295302
message = types.PubsubMessage(data=b"foobarbaz")
303+
with mock.patch.object(batch, "commit") as commit:
304+
future = batch.publish(message)
305+
306+
assert future is not None
307+
assert len(batch.messages) == 1
308+
assert batch._futures == [future]
309+
commit.assert_called_once()
310+
311+
312+
def test_publish_max_messages_enforced():
313+
batch = create_batch(topic="topic_foo", max_messages=1)
314+
315+
message = types.PubsubMessage(data=b"foobarbaz")
316+
message2 = types.PubsubMessage(data=b"foobarbaz2")
317+
318+
future = batch.publish(message)
319+
future2 = batch.publish(message2)
320+
321+
assert future is not None
322+
assert future2 is None
323+
assert len(batch.messages) == 1
324+
assert len(batch._futures) == 1
325+
326+
327+
def test_publish_max_bytes_enforced():
328+
batch = create_batch(topic="topic_foo", max_bytes=15)
329+
330+
message = types.PubsubMessage(data=b"foobarbaz")
331+
message2 = types.PubsubMessage(data=b"foobarbaz2")
332+
296333
future = batch.publish(message)
334+
future2 = batch.publish(message2)
297335

298-
assert future is None
299-
assert batch.size == base_request_size
300-
assert batch.messages == []
301-
assert batch._futures == []
336+
assert future is not None
337+
assert future2 is None
338+
assert len(batch.messages) == 1
339+
assert len(batch._futures) == 1
302340

303341

304342
def test_publish_exceed_max_messages():

tests/unit/pubsub_v1/publisher/test_publisher_client.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ def test_publish():
135135
batch = mock.Mock(spec=client._batch_class)
136136

137137
# Set the mock up to claim indiscriminately that it accepts all messages.
138-
batch.will_accept.return_value = True
139138
batch.publish.side_effect = (future1, future2)
140139

141140
topic = "topic/path"
@@ -169,7 +168,6 @@ def test_publish_error_exceeding_flow_control_limits():
169168
client = publisher.Client(credentials=creds, publisher_options=publisher_options)
170169

171170
mock_batch = mock.Mock(spec=client._batch_class)
172-
mock_batch.will_accept.return_value = True
173171
topic = "topic/path"
174172
client._set_batch(topic, mock_batch)
175173

@@ -216,7 +214,6 @@ def test_publish_attrs_bytestring():
216214
# Use a mock in lieu of the actual batch class.
217215
batch = mock.Mock(spec=client._batch_class)
218216
# Set the mock up to claim indiscriminately that it accepts all messages.
219-
batch.will_accept.return_value = True
220217

221218
topic = "topic/path"
222219
client._set_batch(topic, batch)
@@ -431,7 +428,6 @@ def test_publish_with_ordering_key():
431428
future1.add_done_callback = mock.Mock(spec=["__call__"])
432429
future2.add_done_callback = mock.Mock(spec=["__call__"])
433430

434-
batch.will_accept.return_value = True
435431
batch.publish.side_effect = (future1, future2)
436432

437433
topic = "topic/path"

0 commit comments

Comments
 (0)