Skip to content

Commit 4927b70

Browse files
anguillanneufplamut
authored andcommitted
Pub/Sub: update how subscriber client listens to StreamingPullFuture [(#2475)](GoogleCloudPlatform/python-docs-samples#2475)
* update sub.py & requirements.txt * fix flaky subscriber test with separate subscriptions
1 parent 3c38d9e commit 4927b70

File tree

9 files changed

+148
-164
lines changed

9 files changed

+148
-164
lines changed

samples/snippets/iam_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019 Google Inc. All Rights Reserved.
1+
# Copyright 2016 Google Inc. All Rights Reserved.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.

samples/snippets/publisher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22

3-
# Copyright 2019 Google LLC. All Rights Reserved.
3+
# Copyright 2016 Google LLC. All Rights Reserved.
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.

samples/snippets/publisher_test.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019 Google Inc. All Rights Reserved.
1+
# Copyright 2016 Google Inc. All Rights Reserved.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -36,13 +36,11 @@ def topic(client):
3636
topic_path = client.topic_path(PROJECT, TOPIC)
3737

3838
try:
39-
client.delete_topic(topic_path)
40-
except Exception:
41-
pass
42-
43-
client.create_topic(topic_path)
39+
response = client.get_topic(topic_path)
40+
except: # noqa
41+
response = client.create_topic(topic_path)
4442

45-
yield topic_path
43+
yield response.name
4644

4745

4846
def _make_sleep_patch():

samples/snippets/quickstart.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22

3-
# Copyright 2019 Google Inc. All Rights Reserved.
3+
# Copyright 2016 Google Inc. All Rights Reserved.
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.

samples/snippets/quickstart/sub.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
# [START pubsub_quickstart_sub_all]
1818
import argparse
19-
import time
2019
# [START pubsub_quickstart_sub_deps]
2120
from google.cloud import pubsub_v1
2221
# [END pubsub_quickstart_sub_deps]
@@ -34,20 +33,22 @@ def sub(project_id, subscription_name):
3433
project_id, subscription_name)
3534

3635
def callback(message):
37-
print('Received message {} of message ID {}'.format(
36+
print('Received message {} of message ID {}\n'.format(
3837
message, message.message_id))
3938
# Acknowledge the message. Unack'ed messages will be redelivered.
4039
message.ack()
41-
print('Acknowledged message of message ID {}\n'.format(
42-
message.message_id))
40+
print('Acknowledged message {}\n'.format(message.message_id))
4341

44-
client.subscribe(subscription_path, callback=callback)
42+
streaming_pull_future = client.subscribe(
43+
subscription_path, callback=callback)
4544
print('Listening for messages on {}..\n'.format(subscription_path))
4645

47-
# Keep the main thread from exiting so the subscriber can
48-
# process messages in the background.
49-
while True:
50-
time.sleep(60)
46+
# Calling result() on StreamingPullFuture keeps the main thread from
47+
# exiting while messages get processed in the callbacks.
48+
try:
49+
streaming_pull_future.result()
50+
except: # noqa
51+
streaming_pull_future.cancel()
5152

5253

5354
if __name__ == '__main__':

samples/snippets/quickstart/sub_test.py

Lines changed: 42 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
import mock
1817
import os
1918
import pytest
20-
import time
2119

2220
from google.api_core.exceptions import AlreadyExists
2321
from google.cloud import pubsub_v1
@@ -29,84 +27,79 @@
2927
TOPIC = 'quickstart-sub-test-topic'
3028
SUBSCRIPTION = 'quickstart-sub-test-topic-sub'
3129

32-
33-
@pytest.fixture(scope='module')
34-
def publisher_client():
35-
yield pubsub_v1.PublisherClient()
30+
publisher_client = pubsub_v1.PublisherClient()
31+
subscriber_client = pubsub_v1.SubscriberClient()
3632

3733

3834
@pytest.fixture(scope='module')
39-
def topic_path(publisher_client):
35+
def topic_path():
4036
topic_path = publisher_client.topic_path(PROJECT, TOPIC)
4137

4238
try:
43-
publisher_client.create_topic(topic_path)
39+
topic = publisher_client.create_topic(topic_path)
40+
return topic.name
4441
except AlreadyExists:
45-
pass
46-
47-
yield topic_path
48-
49-
50-
@pytest.fixture(scope='module')
51-
def subscriber_client():
52-
yield pubsub_v1.SubscriberClient()
42+
return topic_path
5343

5444

5545
@pytest.fixture(scope='module')
56-
def subscription(subscriber_client, topic_path):
46+
def subscription_path(topic_path):
5747
subscription_path = subscriber_client.subscription_path(
5848
PROJECT, SUBSCRIPTION)
5949

6050
try:
61-
subscriber_client.create_subscription(subscription_path, topic_path)
51+
subscription = subscriber_client.create_subscription(
52+
subscription_path, topic_path)
53+
return subscription.name
6254
except AlreadyExists:
63-
pass
64-
65-
yield SUBSCRIPTION
55+
return subscription_path
6656

6757

68-
@pytest.fixture
69-
def to_delete(publisher_client, subscriber_client):
70-
doomed = []
71-
yield doomed
72-
for client, item in doomed:
58+
def _to_delete(resource_paths):
59+
for item in resource_paths:
7360
if 'topics' in item:
7461
publisher_client.delete_topic(item)
7562
if 'subscriptions' in item:
7663
subscriber_client.delete_subscription(item)
7764

7865

79-
def _make_sleep_patch():
80-
real_sleep = time.sleep
66+
def _publish_messages(topic_path):
67+
publish_future = publisher_client.publish(topic_path, data=b'Hello World!')
68+
publish_future.result()
69+
8170

82-
def new_sleep(period):
83-
if period == 60:
84-
real_sleep(10)
85-
raise RuntimeError('sigil')
86-
else:
87-
real_sleep(period)
71+
def _sub_timeout(project_id, subscription_name):
72+
# This is an exactly copy of `sub.py` except
73+
# StreamingPullFuture.result() will time out after 10s.
74+
client = pubsub_v1.SubscriberClient()
75+
subscription_path = client.subscription_path(
76+
project_id, subscription_name)
8877

89-
return mock.patch('time.sleep', new=new_sleep)
78+
def callback(message):
79+
print('Received message {} of message ID {}\n'.format(
80+
message, message.message_id))
81+
message.ack()
82+
print('Acknowledged message {}\n'.format(message.message_id))
9083

84+
streaming_pull_future = client.subscribe(
85+
subscription_path, callback=callback)
86+
print('Listening for messages on {}..\n'.format(subscription_path))
87+
88+
try:
89+
streaming_pull_future.result(timeout=10)
90+
except: # noqa
91+
streaming_pull_future.cancel()
9192

92-
def test_sub(publisher_client,
93-
topic_path,
94-
subscriber_client,
95-
subscription,
96-
to_delete,
97-
capsys):
9893

99-
publisher_client.publish(topic_path, data=b'Hello, World!')
94+
def test_sub(monkeypatch, topic_path, subscription_path, capsys):
95+
monkeypatch.setattr(sub, 'sub', _sub_timeout)
10096

101-
to_delete.append((publisher_client, topic_path))
97+
_publish_messages(topic_path)
10298

103-
with _make_sleep_patch():
104-
with pytest.raises(RuntimeError, match='sigil'):
105-
sub.sub(PROJECT, subscription)
99+
sub.sub(PROJECT, SUBSCRIPTION)
106100

107-
to_delete.append((subscriber_client,
108-
'projects/{}/subscriptions/{}'.format(PROJECT,
109-
SUBSCRIPTION)))
101+
# Clean up resources.
102+
_to_delete([topic_path, subscription_path])
110103

111104
out, _ = capsys.readouterr()
112105
assert "Received message" in out

samples/snippets/quickstart_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22

3-
# Copyright 2019 Google Inc. All Rights Reserved.
3+
# Copyright 2016 Google Inc. All Rights Reserved.
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.

samples/snippets/subscriber.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22

3-
# Copyright 2019 Google Inc. All Rights Reserved.
3+
# Copyright 2016 Google Inc. All Rights Reserved.
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.

0 commit comments

Comments
 (0)