Skip to content

Commit 6a53134

Browse files
author
Takashi Matsuo
authored
[tasks] testing: use fixtures for the queue (#4049)
fixes #4045 fixes #4044 I don't know why these tests started to fail, but anyways we'd better use fixtures and temporary queues.
1 parent 9446474 commit 6a53134

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

tasks/create_http_task_test.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,35 @@
1313
# limitations under the License.
1414

1515
import os
16+
import uuid
17+
18+
from google.cloud import tasks_v2
19+
import pytest
1620

1721
import create_http_task
1822

1923
TEST_PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
2024
TEST_LOCATION = os.getenv('TEST_QUEUE_LOCATION', 'us-central1')
21-
TEST_QUEUE_NAME = os.getenv('TEST_QUEUE_NAME', 'my-queue')
25+
TEST_QUEUE_NAME = f'my-queue-{uuid.uuid4().hex}'
26+
27+
28+
@pytest.fixture()
29+
def test_queue():
30+
client = tasks_v2.CloudTasksClient()
31+
parent = client.location_path(TEST_PROJECT_ID, TEST_LOCATION)
32+
queue = {
33+
# The fully qualified path to the queue
34+
'name': client.queue_path(
35+
TEST_PROJECT_ID, TEST_LOCATION, TEST_QUEUE_NAME),
36+
}
37+
q = client.create_queue(parent, queue)
38+
39+
yield q
40+
41+
client.delete_queue(q.name)
2242

2343

24-
def test_create_http_task():
44+
def test_create_http_task(test_queue):
2545
url = 'https://example.com/task_handler'
2646
result = create_http_task.create_http_task(
2747
TEST_PROJECT_ID, TEST_QUEUE_NAME, TEST_LOCATION, url)

tasks/create_http_task_with_token_test.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,37 @@
1313
# limitations under the License.
1414

1515
import os
16+
import uuid
17+
18+
from google.cloud import tasks_v2
19+
import pytest
1620

1721
import create_http_task_with_token
1822

1923
TEST_PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
2024
TEST_LOCATION = os.getenv('TEST_QUEUE_LOCATION', 'us-central1')
21-
TEST_QUEUE_NAME = os.getenv('TEST_QUEUE_NAME', 'my-queue')
25+
TEST_QUEUE_NAME = f'my-queue-{uuid.uuid4().hex}'
2226
TEST_SERVICE_ACCOUNT = (
2327
'test-run-invoker@python-docs-samples-tests.iam.gserviceaccount.com')
2428

2529

26-
def test_create_http_task_with_token():
30+
@pytest.fixture()
31+
def test_queue():
32+
client = tasks_v2.CloudTasksClient()
33+
parent = client.location_path(TEST_PROJECT_ID, TEST_LOCATION)
34+
queue = {
35+
# The fully qualified path to the queue
36+
'name': client.queue_path(
37+
TEST_PROJECT_ID, TEST_LOCATION, TEST_QUEUE_NAME),
38+
}
39+
q = client.create_queue(parent, queue)
40+
41+
yield q
42+
43+
client.delete_queue(q.name)
44+
45+
46+
def test_create_http_task_with_token(test_queue):
2747
url = 'https://example.com/task_handler'
2848
result = create_http_task_with_token.create_http_task(TEST_PROJECT_ID,
2949
TEST_QUEUE_NAME,

0 commit comments

Comments
 (0)