From cbf3de41eadc1d3a79c195f1a8f88bd523814048 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 3 Jun 2026 12:05:56 +0200 Subject: [PATCH 1/6] test: add polling helpers for eventually-consistent integration tests Add a wall-clock-deadline `poll_until_condition` helper, generalize `maybe_await` to any awaitable, and refactor `collect_iterate_until_present` to reuse a shared drain step. --- tests/integration/_utils.py | 86 +++++++++++++++++++++++++++---------- 1 file changed, 64 insertions(+), 22 deletions(-) diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py index 4412808a..186c1f7c 100644 --- a/tests/integration/_utils.py +++ b/tests/integration/_utils.py @@ -1,17 +1,18 @@ from __future__ import annotations import asyncio +import inspect import secrets import string import time from collections.abc import AsyncIterator, Iterator from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Protocol, TypeVar, overload +from typing import TYPE_CHECKING, Any, Protocol, TypeVar, cast, overload import pytest if TYPE_CHECKING: - from collections.abc import Callable, Coroutine + from collections.abc import Awaitable, Callable # Environment variable names for test configuration TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN' @@ -92,22 +93,14 @@ def get_random_resource_name(label: str) -> str: return name_template.format(label, get_crypto_random_object_id(random_id_length)) -@overload -async def maybe_await(value: Coroutine[Any, Any, T]) -> T: ... - - -@overload -async def maybe_await(value: T) -> T: ... - - -async def maybe_await(value: T | Coroutine[Any, Any, T]) -> T: - """Await coroutines, pass through other values. +async def maybe_await(value: Awaitable[T] | T) -> T: + """Await `value` if it is awaitable, otherwise return it unchanged. Enables unified test code for both sync and async clients: result = await maybe_await(client.datasets().list()) """ - if hasattr(value, '__await__'): - return await value # ty: ignore[invalid-await] + if inspect.isawaitable(value): + return await cast('Awaitable[T]', value) return value @@ -119,6 +112,49 @@ async def maybe_sleep(seconds: float, *, is_async: bool) -> None: time.sleep(seconds) # noqa: ASYNC251 +@overload +async def poll_until_condition( + fn: Callable[[], Awaitable[T]], + condition: Callable[[T], bool] = ..., + *, + timeout: float = ..., + poll_interval: float = ..., +) -> T: ... +@overload +async def poll_until_condition( + fn: Callable[[], T], + condition: Callable[[T], bool] = ..., + *, + timeout: float = ..., + poll_interval: float = ..., +) -> T: ... +async def poll_until_condition( + fn: Callable[[], Awaitable[T] | T], + condition: Callable[[T], bool] = bool, + *, + timeout: float = 5, + poll_interval: float = 1, +) -> T: + """Poll `fn` until `condition(result)` is True or the timeout expires. + + Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed. + Returns the last polled result regardless of whether the condition was met, so the caller can run its own + assertion. The default condition checks for a truthy result. + + Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. a freshly + created resource appearing in a listing) that may take a variable amount of time to propagate. + """ + deadline = time.monotonic() + timeout + result = await maybe_await(fn()) + while not condition(result): + remaining = deadline - time.monotonic() + if remaining <= 0: + break + await asyncio.sleep(min(poll_interval, remaining)) + result = await maybe_await(fn()) + return result + + async def collect_iterate_until_present( iterator_factory: Callable[[], Iterator[_HasIdT] | AsyncIterator[_HasIdT]], expected_ids: set[str], @@ -132,7 +168,7 @@ async def collect_iterate_until_present( Handles eventual consistency on listing endpoints: under parallel load a freshly created resource may not appear in the listing for a short window. Each attempt - builds a fresh iterator via `iterator_factory`, drains it, and breaks early once + builds a fresh iterator via `iterator_factory`, drains it, and stops early once `expected_ids` is a subset of the collected items' `.id` values. The most recent collection is returned regardless of whether the condition was met, so the caller can run its own assertion with a helpful failure message. @@ -142,18 +178,16 @@ async def collect_iterate_until_present( expected_ids: IDs that must all appear in the collected items. item_type: Asserted to match the runtime type of each yielded item. is_async: Whether the iterator is async (and so are sleeps). - max_attempts: Maximum number of polling rounds. - interval: Seconds to sleep before each attempt. + max_attempts: Maximum number of polling rounds, guaranteed regardless of how long each drain takes. + interval: Seconds to sleep between attempts. Returns: The most recently collected items. """ - collected: list[_HasIdT] = [] - for attempt in range(max_attempts): - if attempt > 0: - await maybe_sleep(interval, is_async=is_async) + + async def drain() -> list[_HasIdT]: iterator = iterator_factory() - collected = [] + collected: list[_HasIdT] = [] if is_async: assert isinstance(iterator, AsyncIterator) async for item in iterator: @@ -164,8 +198,16 @@ async def collect_iterate_until_present( for item in iterator: assert isinstance(item, item_type) collected.append(item) + return collected + + # Loop on attempt count rather than a wall-clock deadline: drains take HTTP time, and charging it + # against a deadline would mean fewer retries under load — exactly when they are needed most. + collected = await drain() + for _ in range(max_attempts - 1): if expected_ids.issubset(item.id for item in collected): break + await maybe_sleep(interval, is_async=is_async) + collected = await drain() return collected From e23e54d97468715b68adb7019e8b67ef542b6fa5 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 3 Jun 2026 12:06:02 +0200 Subject: [PATCH 2/6] test: fix flaky test_request_queue_unlock_requests Lock writes propagate asynchronously after `list_and_lock_head` returns, so unlocking immediately could see fewer locks than acquired; poll `list_head` until the locked IDs disappear from the queue head before unlocking. --- tests/integration/test_request_queue.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index d0986903..1f773d20 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -12,6 +12,7 @@ get_random_string, maybe_await, maybe_sleep, + poll_until_condition, ) from apify_client._models import ( BatchAddResult, @@ -560,6 +561,17 @@ async def test_request_queue_unlock_requests(client: ApifyClient | ApifyClientAs assert isinstance(result, LockedRequestQueueHead) lock_response = result assert len(lock_response.items) == 3 + locked_ids = {item.id for item in lock_response.items} + + # Locks are acknowledged before they are visible to subsequent reads, so unlocking immediately can + # see fewer locks than were just acquired. Since locked requests are excluded from the queue head, + # poll `list_head` until the locked IDs disappear from it (best-effort mitigation of the race). + async def all_locks_visible() -> bool: + head = await maybe_await(rq_client.list_head(limit=5)) + assert isinstance(head, RequestQueueHead) + return locked_ids.isdisjoint(item.id for item in head.items) + + await poll_until_condition(all_locks_visible, timeout=30, poll_interval=1) # Unlock all requests unlock_response = await maybe_await(rq_client.unlock_requests()) From c5e26795de4e13febee02fb15e90fc0df9b77fd9 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 3 Jun 2026 12:23:07 +0200 Subject: [PATCH 3/6] test: bring back call_with_exp_backoff polling helper --- tests/integration/_utils.py | 51 ++++++++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py index 186c1f7c..2d099bf8 100644 --- a/tests/integration/_utils.py +++ b/tests/integration/_utils.py @@ -2,6 +2,7 @@ import asyncio import inspect +import logging import secrets import string import time @@ -14,6 +15,8 @@ if TYPE_CHECKING: from collections.abc import Awaitable, Callable +logger = logging.getLogger(__name__) + # Environment variable names for test configuration TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN' TOKEN_ENV_VAR_2 = 'APIFY_TEST_USER_2_API_TOKEN' @@ -112,6 +115,51 @@ async def maybe_sleep(seconds: float, *, is_async: bool) -> None: time.sleep(seconds) # noqa: ASYNC251 +@overload +async def call_with_exp_backoff( + fn: Callable[[], Awaitable[T]], + condition: Callable[[T], bool] = ..., + *, + max_retries: int = ..., + base_delay: float = ..., +) -> T: ... +@overload +async def call_with_exp_backoff( + fn: Callable[[], T], + condition: Callable[[T], bool] = ..., + *, + max_retries: int = ..., + base_delay: float = ..., +) -> T: ... +async def call_with_exp_backoff( + fn: Callable[[], Awaitable[T] | T], + condition: Callable[[T], bool] = bool, + *, + max_retries: int = 5, + base_delay: float = 1.0, +) -> T: + """Call `fn`, retrying with exponential backoff until `condition(result)` is True. + + Calls `fn` and checks whether `condition` holds for its result. If it does not, `fn` is retried up to + `max_retries` times, sleeping `base_delay * 2 ** attempt` seconds before each retry. The last result is + returned regardless of whether the condition was ever satisfied, so the caller can run its own assertion. + + Use this instead of `poll_until_condition` when the wait time is highly variable (e.g. an Actor run + container starting up): the growing delay covers a long horizon with few calls. + """ + result = await maybe_await(fn()) + for attempt in range(max_retries): + if condition(result): + return result + delay = base_delay * 2**attempt + logger.info( + 'Condition not met for %r, retrying in %ss (attempt %d/%d).', result, delay, attempt + 1, max_retries + ) + await asyncio.sleep(delay) + result = await maybe_await(fn()) + return result + + @overload async def poll_until_condition( fn: Callable[[], Awaitable[T]], @@ -142,7 +190,8 @@ async def poll_until_condition( assertion. The default condition checks for a truthy result. Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. a freshly - created resource appearing in a listing) that may take a variable amount of time to propagate. + created resource appearing in a listing) that may take a variable amount of time to propagate. Unlike + `call_with_exp_backoff`, the interval between polls stays constant. """ deadline = time.monotonic() + timeout result = await maybe_await(fn()) From a071daceebfbc31b294406e75900783b2a0a789e Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 3 Jun 2026 12:23:15 +0200 Subject: [PATCH 4/6] test: replace fixed sleeps and hand-rolled loops with polling helpers --- tests/integration/test_dataset.py | 136 ++++++++++---- tests/integration/test_key_value_store.py | 135 ++++++++------ tests/integration/test_request_queue.py | 215 +++++++++------------- tests/integration/test_run.py | 21 ++- 4 files changed, 272 insertions(+), 235 deletions(-) diff --git a/tests/integration/test_dataset.py b/tests/integration/test_dataset.py index c628ac99..8d522689 100644 --- a/tests/integration/test_dataset.py +++ b/tests/integration/test_dataset.py @@ -16,7 +16,7 @@ collect_iterate_until_present, get_random_resource_name, maybe_await, - maybe_sleep, + poll_until_condition, ) from apify_client._models import Dataset, DatasetListItem, DatasetStatistics, ListOfDatasets from apify_client._resource_clients.dataset import DatasetItemsPage @@ -238,7 +238,7 @@ async def test_dataset_update(client: ApifyClient | ApifyClientAsync) -> None: await maybe_await(dataset_client.delete()) -async def test_dataset_push_and_list_items(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_dataset_push_and_list_items(client: ApifyClient | ApifyClientAsync) -> None: """Test pushing items to dataset and listing them.""" dataset_name = get_random_resource_name('dataset') @@ -255,12 +255,13 @@ async def test_dataset_push_and_list_items(client: ApifyClient | ApifyClientAsyn ] await maybe_await(dataset_client.push_items(items_to_push)) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until all items are visible (eventual consistency) + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items()) + assert isinstance(page, DatasetItemsPage) + return page - # List items - items_page = await maybe_await(dataset_client.list_items()) - assert isinstance(items_page, DatasetItemsPage) + items_page = await poll_until_condition(get_items, lambda page: len(page.items) == 3) assert len(items_page.items) == 3 assert items_page.count == 3 # Note: items_page.total may be 0 immediately after push due to eventual consistency @@ -274,7 +275,7 @@ async def test_dataset_push_and_list_items(client: ApifyClient | ApifyClientAsyn await maybe_await(dataset_client.delete()) -async def test_dataset_list_items_with_pagination(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_dataset_list_items_with_pagination(client: ApifyClient | ApifyClientAsync) -> None: """Test listing items with pagination parameters.""" dataset_name = get_random_resource_name('dataset') @@ -287,8 +288,13 @@ async def test_dataset_list_items_with_pagination(client: ApifyClient | ApifyCli items_to_push = [{'index': i, 'value': i * 10} for i in range(10)] await maybe_await(dataset_client.push_items(items_to_push)) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until all items are visible (eventual consistency) + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items()) + assert isinstance(page, DatasetItemsPage) + return page + + await poll_until_condition(get_items, lambda page: len(page.items) == 10) # List with limit items_page = await maybe_await(dataset_client.list_items(limit=5)) @@ -311,7 +317,7 @@ async def test_dataset_list_items_with_pagination(client: ApifyClient | ApifyCli await maybe_await(dataset_client.delete()) -async def test_dataset_list_items_with_fields(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_dataset_list_items_with_fields(client: ApifyClient | ApifyClientAsync) -> None: """Test listing items with field filtering.""" dataset_name = get_random_resource_name('dataset') @@ -327,12 +333,13 @@ async def test_dataset_list_items_with_fields(client: ApifyClient | ApifyClientA ] await maybe_await(dataset_client.push_items(items_to_push)) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until all items are visible (eventual consistency), listing with the fields filter + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items(fields=['id', 'name'])) + assert isinstance(page, DatasetItemsPage) + return page - # List with fields filter - items_page = await maybe_await(dataset_client.list_items(fields=['id', 'name'])) - assert isinstance(items_page, DatasetItemsPage) + items_page = await poll_until_condition(get_items, lambda page: len(page.items) == 2) assert len(items_page.items) == 2 # Verify only specified fields are returned @@ -358,8 +365,13 @@ async def test_dataset_iterate_items(client: ApifyClient | ApifyClientAsync, *, items_to_push = [{'index': i} for i in range(5)] await maybe_await(dataset_client.push_items(items_to_push)) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until all items are visible (eventual consistency) + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items()) + assert isinstance(page, DatasetItemsPage) + return page + + await poll_until_condition(get_items, lambda page: len(page.items) == 5) # Iterate over items iterator = dataset_client.iterate_items() @@ -398,7 +410,7 @@ async def test_dataset_delete_nonexistent(client: ApifyClient | ApifyClientAsync assert retrieved_dataset is None -async def test_dataset_get_statistics(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_dataset_get_statistics(client: ApifyClient | ApifyClientAsync) -> None: """Test getting dataset statistics.""" dataset_name = get_random_resource_name('dataset') @@ -414,8 +426,13 @@ async def test_dataset_get_statistics(client: ApifyClient | ApifyClientAsync, *, ] await maybe_await(dataset_client.push_items(items_to_push)) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until all items are visible (eventual consistency) + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items()) + assert isinstance(page, DatasetItemsPage) + return page + + await poll_until_condition(get_items, lambda page: len(page.items) == 2) # Get statistics statistics = await maybe_await(dataset_client.get_statistics()) @@ -452,7 +469,7 @@ async def test_dataset_collection_iterate(client: ApifyClient | ApifyClientAsync await maybe_await(client.dataset(ds_id).delete()) -async def test_dataset_list_items_desc(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_dataset_list_items_desc(client: ApifyClient | ApifyClientAsync) -> None: """Test listing items in descending order.""" dataset_name = get_random_resource_name('dataset') created_dataset = await maybe_await(client.datasets().get_or_create(name=dataset_name)) @@ -462,7 +479,14 @@ async def test_dataset_list_items_desc(client: ApifyClient | ApifyClientAsync, * try: items_to_push = [{'idx': i} for i in range(5)] await maybe_await(dataset_client.push_items(items_to_push)) - await maybe_sleep(1, is_async=is_async) + + # Poll until all items are visible (eventual consistency) + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items()) + assert isinstance(page, DatasetItemsPage) + return page + + await poll_until_condition(get_items, lambda page: len(page.items) == 5) # Default ordering - ascending page_asc = await maybe_await(dataset_client.list_items()) @@ -478,7 +502,7 @@ async def test_dataset_list_items_desc(client: ApifyClient | ApifyClientAsync, * await maybe_await(dataset_client.delete()) -async def test_dataset_list_items_omit_and_clean(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_dataset_list_items_omit_and_clean(client: ApifyClient | ApifyClientAsync) -> None: """Test list_items with `omit`, `clean`, `skip_hidden`, and `skip_empty` filters.""" dataset_name = get_random_resource_name('dataset') created_dataset = await maybe_await(client.datasets().get_or_create(name=dataset_name)) @@ -493,7 +517,14 @@ async def test_dataset_list_items_omit_and_clean(client: ApifyClient | ApifyClie {'id': 2, 'name': 'also visible', '#secret': 'shh', 'extra': 'Y'}, ] await maybe_await(dataset_client.push_items(items_to_push)) - await maybe_sleep(1, is_async=is_async) + + # Poll until all items are visible (eventual consistency) + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items()) + assert isinstance(page, DatasetItemsPage) + return page + + await poll_until_condition(get_items, lambda page: len(page.items) == len(items_to_push)) # `omit` should remove the `extra` field omit_page = await maybe_await(dataset_client.list_items(omit=['extra'])) @@ -534,14 +565,13 @@ async def test_dataset_iterate_items_chunked(client: ApifyClient | ApifyClientAs items_to_push = [{'idx': i} for i in range(12)] await maybe_await(dataset_client.push_items(items_to_push)) - # Poll until all items are visible (eventual consistency); 12 items + 3 paginated reads - # is more demanding than other dataset tests, so a single 1s sleep is not safe. - for _ in range(5): - await maybe_sleep(1, is_async=is_async) - head = await maybe_await(dataset_client.list_items(limit=12)) - assert isinstance(head, DatasetItemsPage) - if len(head.items) == 12: - break + # Poll until all 12 items are visible (eventual consistency) so the chunked iteration sees every page + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items(limit=12)) + assert isinstance(page, DatasetItemsPage) + return page + + await poll_until_condition(get_items, lambda page: len(page.items) == 12) # chunk_size=5 forces 3 underlying pages for 12 items iterator = dataset_client.iterate_items(chunk_size=5) @@ -575,7 +605,14 @@ async def test_dataset_iterate_items_with_fields(client: ApifyClient | ApifyClie try: items_to_push = [{'id': i, 'name': f'item-{i}', 'extra': 'drop-me'} for i in range(3)] await maybe_await(dataset_client.push_items(items_to_push)) - await maybe_sleep(1, is_async=is_async) + + # Poll until all items are visible (eventual consistency) + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items()) + assert isinstance(page, DatasetItemsPage) + return page + + await poll_until_condition(get_items, lambda page: len(page.items) == 3) iterator = dataset_client.iterate_items(fields=['id', 'name']) collected: list[dict] = [] @@ -597,7 +634,7 @@ async def test_dataset_iterate_items_with_fields(client: ApifyClient | ApifyClie await maybe_await(dataset_client.delete()) -async def test_dataset_create_items_public_url(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_dataset_create_items_public_url(client: ApifyClient | ApifyClientAsync) -> None: """Test generating a signed public URL for dataset items and fetching from it.""" dataset_name = get_random_resource_name('dataset') created_dataset = await maybe_await(client.datasets().get_or_create(name=dataset_name)) @@ -607,7 +644,14 @@ async def test_dataset_create_items_public_url(client: ApifyClient | ApifyClient try: items = [{'id': i, 'value': i * 10} for i in range(3)] await maybe_await(dataset_client.push_items(items)) - await maybe_sleep(1, is_async=is_async) + + # Poll until all items are visible (eventual consistency) so the public URL serves all of them + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items()) + assert isinstance(page, DatasetItemsPage) + return page + + await poll_until_condition(get_items, lambda page: len(page.items) == 3) public_url = await maybe_await(dataset_client.create_items_public_url(expires_in=timedelta(minutes=5))) assert isinstance(public_url, str) @@ -623,7 +667,7 @@ async def test_dataset_create_items_public_url(client: ApifyClient | ApifyClient await maybe_await(dataset_client.delete()) -async def test_dataset_get_items_as_bytes_csv(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_dataset_get_items_as_bytes_csv(client: ApifyClient | ApifyClientAsync) -> None: """Test get_items_as_bytes with non-JSON item_format (csv).""" dataset_name = get_random_resource_name('dataset') created_dataset = await maybe_await(client.datasets().get_or_create(name=dataset_name)) @@ -633,7 +677,14 @@ async def test_dataset_get_items_as_bytes_csv(client: ApifyClient | ApifyClientA try: items = [{'id': 1, 'name': 'first'}, {'id': 2, 'name': 'second'}] await maybe_await(dataset_client.push_items(items)) - await maybe_sleep(1, is_async=is_async) + + # Poll until all items are visible (eventual consistency) + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items()) + assert isinstance(page, DatasetItemsPage) + return page + + await poll_until_condition(get_items, lambda page: len(page.items) == 2) raw = await maybe_await(dataset_client.get_items_as_bytes(item_format='csv')) assert isinstance(raw, bytes) @@ -663,8 +714,13 @@ async def test_dataset_stream_items(client: ApifyClient | ApifyClientAsync, *, i ] await maybe_await(dataset_client.push_items(items_to_push)) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until all items are visible (eventual consistency) + async def get_items() -> DatasetItemsPage: + page = await maybe_await(dataset_client.list_items()) + assert isinstance(page, DatasetItemsPage) + return page + + await poll_until_condition(get_items, lambda page: len(page.items) == 3) # Stream items using context manager stream_ctx = dataset_client.stream_items(item_format='json') diff --git a/tests/integration/test_key_value_store.py b/tests/integration/test_key_value_store.py index 37557ecb..712c6eeb 100644 --- a/tests/integration/test_key_value_store.py +++ b/tests/integration/test_key_value_store.py @@ -16,6 +16,7 @@ get_random_resource_name, maybe_await, maybe_sleep, + poll_until_condition, ) from apify_client._models import KeyValueStore, KeyValueStoreKey, ListOfKeys, ListOfKeyValueStores from apify_client.errors import ApifyApiError @@ -254,7 +255,7 @@ async def test_key_value_store_update(client: ApifyClient | ApifyClientAsync) -> await maybe_await(store_client.delete()) -async def test_key_value_store_set_and_get_record(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_key_value_store_set_and_get_record(client: ApifyClient | ApifyClientAsync) -> None: """Test setting and getting records from key-value store.""" store_name = get_random_resource_name('kvs') @@ -267,11 +268,11 @@ async def test_key_value_store_set_and_get_record(client: ApifyClient | ApifyCli test_value = {'name': 'Test Item', 'value': 123, 'nested': {'data': 'value'}} await maybe_await(store_client.set_record('test-key', test_value)) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until the record is visible (eventual consistency) + async def get_record() -> dict | None: + return await maybe_await(store_client.get_record('test-key')) - # Get the record - record = await maybe_await(store_client.get_record('test-key')) + record = await poll_until_condition(get_record, lambda record: record is not None) assert isinstance(record, dict) assert record['key'] == 'test-key' assert record['value'] == test_value @@ -280,9 +281,7 @@ async def test_key_value_store_set_and_get_record(client: ApifyClient | ApifyCli await maybe_await(store_client.delete()) -async def test_key_value_store_set_and_get_text_record( - client: ApifyClient | ApifyClientAsync, *, is_async: bool -) -> None: +async def test_key_value_store_set_and_get_text_record(client: ApifyClient | ApifyClientAsync) -> None: """Test setting and getting text records.""" store_name = get_random_resource_name('kvs') @@ -295,11 +294,11 @@ async def test_key_value_store_set_and_get_text_record( test_text = 'Hello, this is a test text!' await maybe_await(store_client.set_record('text-key', test_text, content_type='text/plain')) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until the record is visible (eventual consistency) + async def get_record() -> dict | None: + return await maybe_await(store_client.get_record('text-key')) - # Get the record - record = await maybe_await(store_client.get_record('text-key')) + record = await poll_until_condition(get_record, lambda record: record is not None) assert isinstance(record, dict) assert record['key'] == 'text-key' assert record['value'] == test_text @@ -308,7 +307,7 @@ async def test_key_value_store_set_and_get_text_record( await maybe_await(store_client.delete()) -async def test_key_value_store_list_keys(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_key_value_store_list_keys(client: ApifyClient | ApifyClientAsync) -> None: """Test listing keys in the key-value store.""" store_name = get_random_resource_name('kvs') @@ -321,12 +320,13 @@ async def test_key_value_store_list_keys(client: ApifyClient | ApifyClientAsync, for i in range(5): await maybe_await(store_client.set_record(f'key-{i}', {'index': i})) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until all keys are visible (eventual consistency) + async def get_keys() -> ListOfKeys: + keys = await maybe_await(store_client.list_keys()) + assert isinstance(keys, ListOfKeys) + return keys - # List keys - keys_response = await maybe_await(store_client.list_keys()) - assert isinstance(keys_response, ListOfKeys) + keys_response = await poll_until_condition(get_keys, lambda keys: len(keys.items) == 5) assert len(keys_response.items) == 5 # Verify key names @@ -337,7 +337,7 @@ async def test_key_value_store_list_keys(client: ApifyClient | ApifyClientAsync, await maybe_await(store_client.delete()) -async def test_key_value_store_list_keys_with_limit(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_key_value_store_list_keys_with_limit(client: ApifyClient | ApifyClientAsync) -> None: """Test listing keys with limit parameter.""" store_name = get_random_resource_name('kvs') @@ -350,18 +350,19 @@ async def test_key_value_store_list_keys_with_limit(client: ApifyClient | ApifyC for i in range(10): await maybe_await(store_client.set_record(f'item-{i:02d}', {'index': i})) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until enough keys are visible (eventual consistency), listing with the limit applied + async def get_keys() -> ListOfKeys: + keys = await maybe_await(store_client.list_keys(limit=5)) + assert isinstance(keys, ListOfKeys) + return keys - # List with limit - keys_response = await maybe_await(store_client.list_keys(limit=5)) - assert isinstance(keys_response, ListOfKeys) + keys_response = await poll_until_condition(get_keys, lambda keys: len(keys.items) == 5) assert len(keys_response.items) == 5 finally: await maybe_await(store_client.delete()) -async def test_key_value_store_record_exists(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_key_value_store_record_exists(client: ApifyClient | ApifyClientAsync) -> None: """Test checking if a record exists.""" store_name = get_random_resource_name('kvs') @@ -373,11 +374,13 @@ async def test_key_value_store_record_exists(client: ApifyClient | ApifyClientAs # Set a record await maybe_await(store_client.set_record('exists-key', {'data': 'value'})) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until the record is visible (eventual consistency) + async def added_record_exists() -> bool: + exists = await maybe_await(store_client.record_exists('exists-key')) + assert isinstance(exists, bool) + return exists - # Check existence - exists = await maybe_await(store_client.record_exists('exists-key')) + exists = await poll_until_condition(added_record_exists) assert exists is True exists = await maybe_await(store_client.record_exists('non-existent-key')) assert exists is False @@ -385,7 +388,7 @@ async def test_key_value_store_record_exists(client: ApifyClient | ApifyClientAs await maybe_await(store_client.delete()) -async def test_key_value_store_delete_record(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_key_value_store_delete_record(client: ApifyClient | ApifyClientAsync) -> None: """Test deleting a record from the store.""" store_name = get_random_resource_name('kvs') @@ -397,21 +400,18 @@ async def test_key_value_store_delete_record(client: ApifyClient | ApifyClientAs # Set a record await maybe_await(store_client.set_record('delete-me', {'data': 'value'})) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until the record is visible (eventual consistency) + async def get_record() -> dict | None: + return await maybe_await(store_client.get_record('delete-me')) - # Verify it exists - record = await maybe_await(store_client.get_record('delete-me')) + record = await poll_until_condition(get_record, lambda record: record is not None) assert record is not None # Delete the record await maybe_await(store_client.delete_record('delete-me')) - # Wait briefly - await maybe_sleep(1, is_async=is_async) - - # Verify it's gone - record = await maybe_await(store_client.get_record('delete-me')) + # Poll until the deletion is reflected (eventual consistency) + record = await poll_until_condition(get_record, lambda record: record is None) assert record is None finally: await maybe_await(store_client.delete()) @@ -575,9 +575,7 @@ async def test_key_value_store_collection_iterate(client: ApifyClient | ApifyCli await maybe_await(client.key_value_store(kvs_id).delete()) -async def test_key_value_store_set_and_get_binary_record( - client: ApifyClient | ApifyClientAsync, *, is_async: bool -) -> None: +async def test_key_value_store_set_and_get_binary_record(client: ApifyClient | ApifyClientAsync) -> None: """Test setting and retrieving a binary (bytes) record.""" store_name = get_random_resource_name('kvs') created_store = await maybe_await(client.key_value_stores().get_or_create(name=store_name)) @@ -588,10 +586,13 @@ async def test_key_value_store_set_and_get_binary_record( # Store an explicit bytes value with a binary content type binary_value = b'\x89PNG\r\n\x1a\n' + b'fake-png-bytes' await maybe_await(store_client.set_record('image.png', binary_value, content_type='image/png')) - await maybe_sleep(1, is_async=is_async) + # Poll until the record is visible (eventual consistency); # get_record_as_bytes returns raw bytes (no auto-decoding) - record = await maybe_await(store_client.get_record_as_bytes('image.png')) + async def get_record() -> dict | None: + return await maybe_await(store_client.get_record_as_bytes('image.png')) + + record = await poll_until_condition(get_record, lambda record: record is not None) assert isinstance(record, dict) assert record['key'] == 'image.png' assert record['value'] == binary_value @@ -600,7 +601,7 @@ async def test_key_value_store_set_and_get_binary_record( await maybe_await(store_client.delete()) -async def test_key_value_store_get_record_public_url(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_key_value_store_get_record_public_url(client: ApifyClient | ApifyClientAsync) -> None: """Test get_record_public_url returns a working signed URL.""" store_name = get_random_resource_name('kvs') created_store = await maybe_await(client.key_value_stores().get_or_create(name=store_name)) @@ -609,7 +610,14 @@ async def test_key_value_store_get_record_public_url(client: ApifyClient | Apify try: await maybe_await(store_client.set_record('my-record', {'hello': 'world'})) - await maybe_sleep(1, is_async=is_async) + + # Poll until the record is visible (eventual consistency) so the public URL serves it + async def added_record_exists() -> bool: + exists = await maybe_await(store_client.record_exists('my-record')) + assert isinstance(exists, bool) + return exists + + await poll_until_condition(added_record_exists) public_url = await maybe_await(store_client.get_record_public_url('my-record')) assert isinstance(public_url, str) @@ -624,9 +632,7 @@ async def test_key_value_store_get_record_public_url(client: ApifyClient | Apify await maybe_await(store_client.delete()) -async def test_key_value_store_create_keys_public_url( - client: ApifyClient | ApifyClientAsync, *, is_async: bool -) -> None: +async def test_key_value_store_create_keys_public_url(client: ApifyClient | ApifyClientAsync) -> None: """Test create_keys_public_url returns a working signed URL for listing keys.""" store_name = get_random_resource_name('kvs') created_store = await maybe_await(client.key_value_stores().get_or_create(name=store_name)) @@ -636,7 +642,14 @@ async def test_key_value_store_create_keys_public_url( try: for i in range(3): await maybe_await(store_client.set_record(f'key-{i}', {'idx': i})) - await maybe_sleep(1, is_async=is_async) + + # Poll until all keys are visible (eventual consistency) so the public URL lists all of them + async def get_keys() -> ListOfKeys: + keys = await maybe_await(store_client.list_keys()) + assert isinstance(keys, ListOfKeys) + return keys + + await poll_until_condition(get_keys, lambda keys: len(keys.items) == 3) public_url = await maybe_await(store_client.create_keys_public_url(limit=10, expires_in=timedelta(minutes=5))) assert isinstance(public_url, str) @@ -664,7 +677,14 @@ async def test_key_value_store_stream_record_own(client: ApifyClient | ApifyClie try: await maybe_await(store_client.set_record('stream-key', {'data': 'streamed'})) - await maybe_sleep(1, is_async=is_async) + + # Poll until the record is visible (eventual consistency) before streaming it + async def added_record_exists() -> bool: + exists = await maybe_await(store_client.record_exists('stream-key')) + assert isinstance(exists, bool) + return exists + + await poll_until_condition(added_record_exists) if is_async: async with store_client.stream_record('stream-key') as stream: # ty: ignore[invalid-context-manager] @@ -680,9 +700,7 @@ async def test_key_value_store_stream_record_own(client: ApifyClient | ApifyClie await maybe_await(store_client.delete()) -async def test_key_value_store_list_keys_with_exclusive_start_key( - client: ApifyClient | ApifyClientAsync, *, is_async: bool -) -> None: +async def test_key_value_store_list_keys_with_exclusive_start_key(client: ApifyClient | ApifyClientAsync) -> None: """Test listing keys with the exclusive_start_key cursor parameter.""" store_name = get_random_resource_name('kvs') created_store = await maybe_await(client.key_value_stores().get_or_create(name=store_name)) @@ -693,7 +711,14 @@ async def test_key_value_store_list_keys_with_exclusive_start_key( # Use zero-padded names so lexicographic order is predictable for i in range(5): await maybe_await(store_client.set_record(f'key-{i:02d}', {'idx': i})) - await maybe_sleep(1, is_async=is_async) + + # Poll until all keys are visible (eventual consistency) so pagination is exercised, not truncated + async def get_keys() -> ListOfKeys: + keys = await maybe_await(store_client.list_keys()) + assert isinstance(keys, ListOfKeys) + return keys + + await poll_until_condition(get_keys, lambda keys: len(keys.items) == 5) # First page first_page = await maybe_await(store_client.list_keys(limit=2)) diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 1f773d20..9d7c79ae 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -11,7 +11,6 @@ get_random_resource_name, get_random_string, maybe_await, - maybe_sleep, poll_until_condition, ) from apify_client._models import ( @@ -44,23 +43,19 @@ async def ensure_queue_is_populated( rq_client: RequestQueueClient | RequestQueueClientAsync, *, expected_count: int, - is_async: bool, ) -> None: """Poll the queue until `expected_count` requests are visible. Uses `list_head` (without side effects) so polling does not lock items, which would otherwise lead to an ambiguous count of actually-locked requests in tests that exercise locking. """ - head_response: RequestQueueHead | None = None - for _ in range(5): - await maybe_sleep(1, is_async=is_async) - result = await maybe_await(rq_client.list_head(limit=expected_count)) - assert isinstance(result, RequestQueueHead) - head_response = result - if len(head_response.items) == expected_count: - break - - assert head_response is not None + + async def get_head() -> RequestQueueHead: + head = await maybe_await(rq_client.list_head(limit=expected_count)) + assert isinstance(head, RequestQueueHead) + return head + + head_response = await poll_until_condition(get_head, lambda head: len(head.items) == expected_count) assert len(head_response.items) == expected_count @@ -99,7 +94,7 @@ async def test_request_queue_collection_get_or_create(client: ApifyClient | Apif await maybe_await(client.request_queue(rq.id).delete()) -async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync) -> None: created_rq = await maybe_await(client.request_queues().get_or_create(name=get_random_resource_name('queue'))) assert isinstance(created_rq, RequestQueue) rq = client.request_queue(created_rq.id, client_key=get_random_string(10)) @@ -112,16 +107,12 @@ async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync, *, is_ ) # Poll until all requests are available for locking (eventual consistency) - get_head_and_lock_response: LockedRequestQueueHead | None = None - for _ in range(5): - await maybe_sleep(1, is_async=is_async) - result = await maybe_await(rq.list_and_lock_head(limit=10, lock_duration=timedelta(seconds=10))) - assert isinstance(result, LockedRequestQueueHead) - get_head_and_lock_response = result - if len(get_head_and_lock_response.items) == 10: - break - - assert get_head_and_lock_response is not None + async def lock_head() -> LockedRequestQueueHead: + head = await maybe_await(rq.list_and_lock_head(limit=10, lock_duration=timedelta(seconds=10))) + assert isinstance(head, LockedRequestQueueHead) + return head + + get_head_and_lock_response = await poll_until_condition(lock_head, lambda head: len(head.items) == 10) assert len(get_head_and_lock_response.items) == 10 for locked_request in get_head_and_lock_response.items: @@ -203,7 +194,7 @@ async def test_request_queue_update(client: ApifyClient | ApifyClientAsync) -> N await maybe_await(rq_client.delete()) -async def test_request_queue_add_and_get_request(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_add_and_get_request(client: ApifyClient | ApifyClientAsync) -> None: """Test adding and getting a request from the queue.""" rq_name = get_random_resource_name('queue') @@ -223,11 +214,11 @@ async def test_request_queue_add_and_get_request(client: ApifyClient | ApifyClie assert add_result.request_id is not None assert add_result.was_already_present is False - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until the request is visible (eventual consistency) + async def get_added_request() -> Request | None: + return await maybe_await(rq_client.get_request(add_result.request_id)) - # Get the request - request = await maybe_await(rq_client.get_request(add_result.request_id)) + request = await poll_until_condition(get_added_request, lambda request: request is not None) assert isinstance(request, Request) assert str(request.url) == 'https://example.com/test' assert request.unique_key == 'test-key-1' @@ -235,7 +226,7 @@ async def test_request_queue_add_and_get_request(client: ApifyClient | ApifyClie await maybe_await(rq_client.delete()) -async def test_request_queue_list_head(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_list_head(client: ApifyClient | ApifyClientAsync) -> None: """Test listing requests from the head of the queue.""" rq_name = get_random_resource_name('queue') @@ -251,22 +242,18 @@ async def test_request_queue_list_head(client: ApifyClient | ApifyClientAsync, * ) # Poll until requests are available (eventual consistency) - head_response: RequestQueueHead | None = None - for _ in range(5): - await maybe_sleep(1, is_async=is_async) - result = await maybe_await(rq_client.list_head(limit=3)) - assert isinstance(result, RequestQueueHead) - head_response = result - if len(head_response.items) == 3: - break - - assert head_response is not None + async def get_head() -> RequestQueueHead: + head = await maybe_await(rq_client.list_head(limit=3)) + assert isinstance(head, RequestQueueHead) + return head + + head_response = await poll_until_condition(get_head, lambda head: len(head.items) == 3) assert len(head_response.items) == 3 finally: await maybe_await(rq_client.delete()) -async def test_request_queue_list_requests(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_list_requests(client: ApifyClient | ApifyClientAsync) -> None: """Test listing all requests in the queue.""" rq_name = get_random_resource_name('queue') @@ -282,22 +269,18 @@ async def test_request_queue_list_requests(client: ApifyClient | ApifyClientAsyn ) # Poll until all requests are available (eventual consistency) - list_response: ListOfRequests | None = None - for _ in range(5): - await maybe_sleep(1, is_async=is_async) - result = await maybe_await(rq_client.list_requests()) - assert isinstance(result, ListOfRequests) - list_response = result - if len(list_response.items) == 5: - break - - assert list_response is not None + async def get_requests() -> ListOfRequests: + response = await maybe_await(rq_client.list_requests()) + assert isinstance(response, ListOfRequests) + return response + + list_response = await poll_until_condition(get_requests, lambda response: len(response.items) == 5) assert len(list_response.items) == 5 finally: await maybe_await(rq_client.delete()) -async def test_request_queue_delete_request(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_delete_request(client: ApifyClient | ApifyClientAsync) -> None: """Test deleting a request from the queue.""" rq_name = get_random_resource_name('queue') @@ -312,27 +295,24 @@ async def test_request_queue_delete_request(client: ApifyClient | ApifyClientAsy ) assert isinstance(add_result, RequestRegistration) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until the request is visible (eventual consistency) + async def get_added_request() -> Request | None: + return await maybe_await(rq_client.get_request(add_result.request_id)) - # Verify it exists - request = await maybe_await(rq_client.get_request(add_result.request_id)) + request = await poll_until_condition(get_added_request, lambda request: request is not None) assert request is not None # Delete the request await maybe_await(rq_client.delete_request(add_result.request_id)) - # Wait briefly - await maybe_sleep(1, is_async=is_async) - - # Verify it's gone - deleted_request = await maybe_await(rq_client.get_request(add_result.request_id)) + # Poll until the deletion is reflected (eventual consistency) + deleted_request = await poll_until_condition(get_added_request, lambda request: request is None) assert deleted_request is None finally: await maybe_await(rq_client.delete()) -async def test_request_queue_batch_add_requests(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_batch_add_requests(client: ApifyClient | ApifyClientAsync) -> None: """Test adding multiple requests in batch.""" rq_name = get_random_resource_name('queue') @@ -351,22 +331,18 @@ async def test_request_queue_batch_add_requests(client: ApifyClient | ApifyClien assert len(batch_response.unprocessed_requests) == 0 # Poll until all requests are available (eventual consistency) - list_response: ListOfRequests | None = None - for _ in range(5): - await maybe_sleep(1, is_async=is_async) - result = await maybe_await(rq_client.list_requests()) - assert isinstance(result, ListOfRequests) - list_response = result - if len(list_response.items) == 10: - break - - assert list_response is not None + async def get_requests() -> ListOfRequests: + response = await maybe_await(rq_client.list_requests()) + assert isinstance(response, ListOfRequests) + return response + + list_response = await poll_until_condition(get_requests, lambda response: len(response.items) == 10) assert len(list_response.items) == 10 finally: await maybe_await(rq_client.delete()) -async def test_request_queue_batch_delete_requests(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_batch_delete_requests(client: ApifyClient | ApifyClientAsync) -> None: """Test deleting multiple requests in batch.""" rq_name = get_random_resource_name('queue') @@ -382,16 +358,12 @@ async def test_request_queue_batch_delete_requests(client: ApifyClient | ApifyCl ) # Poll until all requests are available (eventual consistency) - list_response: ListOfRequests | None = None - for _ in range(5): - await maybe_sleep(1, is_async=is_async) - result = await maybe_await(rq_client.list_requests()) - assert isinstance(result, ListOfRequests) - list_response = result - if len(list_response.items) == 10: - break - - assert list_response is not None + async def get_requests() -> ListOfRequests: + response = await maybe_await(rq_client.list_requests()) + assert isinstance(response, ListOfRequests) + return response + + list_response = await poll_until_condition(get_requests, lambda response: len(response.items) == 10) assert len(list_response.items) == 10 requests_to_delete: list[RequestDraftDeleteDict] = [] for item in list_response.items[:5]: @@ -404,16 +376,7 @@ async def test_request_queue_batch_delete_requests(client: ApifyClient | ApifyCl assert len(delete_response.processed_requests) == 5 # Poll until deletions are reflected (eventual consistency) - remaining: ListOfRequests | None = None - for _ in range(5): - await maybe_sleep(1, is_async=is_async) - result = await maybe_await(rq_client.list_requests()) - assert isinstance(result, ListOfRequests) - remaining = result - if len(remaining.items) == 5: - break - - assert remaining is not None + remaining = await poll_until_condition(get_requests, lambda response: len(response.items) == 5) assert len(remaining.items) == 5 finally: await maybe_await(rq_client.delete()) @@ -435,7 +398,7 @@ async def test_request_queue_delete_nonexistent(client: ApifyClient | ApifyClien assert retrieved_rq is None -async def test_request_queue_list_and_lock_head(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_list_and_lock_head(client: ApifyClient | ApifyClientAsync) -> None: """Test locking requests from the head of the queue.""" rq_name = get_random_resource_name('queue') @@ -450,7 +413,7 @@ async def test_request_queue_list_and_lock_head(client: ApifyClient | ApifyClien rq_client.add_request({'url': f'https://example.com/lock-{i}', 'unique_key': f'lock-{i}'}) ) - await ensure_queue_is_populated(rq_client, expected_count=5, is_async=is_async) + await ensure_queue_is_populated(rq_client, expected_count=5) result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60))) assert isinstance(result, LockedRequestQueueHead) @@ -465,7 +428,7 @@ async def test_request_queue_list_and_lock_head(client: ApifyClient | ApifyClien await maybe_await(rq_client.delete()) -async def test_request_queue_prolong_request_lock(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_prolong_request_lock(client: ApifyClient | ApifyClientAsync) -> None: """Test prolonging a request lock.""" rq_name = get_random_resource_name('queue') @@ -478,16 +441,12 @@ async def test_request_queue_prolong_request_lock(client: ApifyClient | ApifyCli await maybe_await(rq_client.add_request({'url': 'https://example.com/prolong', 'unique_key': 'prolong-test'})) # Poll until the request is available for locking (eventual consistency) - lock_response: LockedRequestQueueHead | None = None - for _ in range(5): - await maybe_sleep(1, is_async=is_async) - result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60))) - assert isinstance(result, LockedRequestQueueHead) - lock_response = result - if len(lock_response.items) == 1: - break - - assert lock_response is not None + async def lock_head() -> LockedRequestQueueHead: + head = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60))) + assert isinstance(head, LockedRequestQueueHead) + return head + + lock_response = await poll_until_condition(lock_head, lambda head: len(head.items) == 1) assert len(lock_response.items) == 1 locked_request = lock_response.items[0] original_lock_expires = locked_request.lock_expires_at @@ -503,7 +462,7 @@ async def test_request_queue_prolong_request_lock(client: ApifyClient | ApifyCli await maybe_await(rq_client.delete()) -async def test_request_queue_delete_request_lock(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_delete_request_lock(client: ApifyClient | ApifyClientAsync) -> None: """Test deleting a request lock.""" rq_name = get_random_resource_name('queue') @@ -516,16 +475,12 @@ async def test_request_queue_delete_request_lock(client: ApifyClient | ApifyClie await maybe_await(rq_client.add_request({'url': 'https://example.com/unlock', 'unique_key': 'unlock-test'})) # Poll until the request is available for locking (eventual consistency) - lock_response: LockedRequestQueueHead | None = None - for _ in range(5): - await maybe_sleep(1, is_async=is_async) - result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60))) - assert isinstance(result, LockedRequestQueueHead) - lock_response = result - if len(lock_response.items) == 1: - break - - assert lock_response is not None + async def lock_head() -> LockedRequestQueueHead: + head = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60))) + assert isinstance(head, LockedRequestQueueHead) + return head + + lock_response = await poll_until_condition(lock_head, lambda head: len(head.items) == 1) assert len(lock_response.items) == 1 locked_request = lock_response.items[0] @@ -540,7 +495,7 @@ async def test_request_queue_delete_request_lock(client: ApifyClient | ApifyClie await maybe_await(rq_client.delete()) -async def test_request_queue_unlock_requests(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_unlock_requests(client: ApifyClient | ApifyClientAsync) -> None: """Test unlocking all requests locked by the client.""" rq_name = get_random_resource_name('queue') @@ -555,7 +510,7 @@ async def test_request_queue_unlock_requests(client: ApifyClient | ApifyClientAs rq_client.add_request({'url': f'https://example.com/unlock-{i}', 'unique_key': f'unlock-{i}'}) ) - await ensure_queue_is_populated(rq_client, expected_count=5, is_async=is_async) + await ensure_queue_is_populated(rq_client, expected_count=5) result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60))) assert isinstance(result, LockedRequestQueueHead) @@ -581,7 +536,7 @@ async def all_locks_visible() -> bool: await maybe_await(rq_client.delete()) -async def test_request_queue_update_request(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_request_queue_update_request(client: ApifyClient | ApifyClientAsync) -> None: """Test updating a request in the queue.""" rq_name = get_random_resource_name('queue') @@ -600,11 +555,11 @@ async def test_request_queue_update_request(client: ApifyClient | ApifyClientAsy assert isinstance(add_result, RequestRegistration) assert add_result.request_id is not None - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until the request is visible (eventual consistency), then use its full data + async def get_added_request() -> Request | None: + return await maybe_await(rq_client.get_request(add_result.request_id)) - # Get the request to get its full data - original_request = await maybe_await(rq_client.get_request(add_result.request_id)) + original_request = await poll_until_condition(get_added_request, lambda request: request is not None) assert isinstance(original_request, Request) assert original_request.unique_key is not None @@ -662,7 +617,7 @@ async def test_request_queue_iterate_requests(client: ApifyClient | ApifyClientA added_urls.append(request_draft.url) # Wait until all 7 requests are indexed (eventual consistency) - await ensure_queue_is_populated(rq_client, expected_count=7, is_async=is_async) + await ensure_queue_is_populated(rq_client, expected_count=7) # Iterate with a small chunk so multiple pages are fetched iterator = rq_client.iterate_requests(chunk_size=3) @@ -686,9 +641,7 @@ async def test_request_queue_iterate_requests(client: ApifyClient | ApifyClientA await maybe_await(rq_client.delete()) -async def test_request_queue_list_requests_with_cursor( - client: ApifyClient | ApifyClientAsync, *, is_async: bool -) -> None: +async def test_request_queue_list_requests_with_cursor(client: ApifyClient | ApifyClientAsync) -> None: """Test list_requests pagination via limit and the opaque cursor token.""" rq = await maybe_await(client.request_queues().get_or_create(name=get_random_resource_name('rq'))) assert isinstance(rq, RequestQueue) @@ -701,7 +654,7 @@ async def test_request_queue_list_requests_with_cursor( ) # Wait for all 5 requests to be indexed so pagination is exercised, not truncated - await ensure_queue_is_populated(rq_client, expected_count=5, is_async=is_async) + await ensure_queue_is_populated(rq_client, expected_count=5) # First page first_page = await maybe_await(rq_client.list_requests(limit=2)) @@ -720,9 +673,7 @@ async def test_request_queue_list_requests_with_cursor( await maybe_await(rq_client.delete()) -async def test_request_queue_list_requests_with_filter( - client: ApifyClient | ApifyClientAsync, *, is_async: bool -) -> None: +async def test_request_queue_list_requests_with_filter(client: ApifyClient | ApifyClientAsync) -> None: """Test list_requests with the `filter` parameter (pending only).""" rq = await maybe_await(client.request_queues().get_or_create(name=get_random_resource_name('rq'))) assert isinstance(rq, RequestQueue) @@ -735,7 +686,7 @@ async def test_request_queue_list_requests_with_filter( ) # Wait for all 3 requests to be indexed before filtering - await ensure_queue_is_populated(rq_client, expected_count=3, is_async=is_async) + await ensure_queue_is_populated(rq_client, expected_count=3) # All three requests are pending - filter=['pending'] should return all of them. pending_page = await maybe_await(rq_client.list_requests(filter=['pending'])) diff --git a/tests/integration/test_run.py b/tests/integration/test_run.py index 44f1771e..612a249f 100644 --- a/tests/integration/test_run.py +++ b/tests/integration/test_run.py @@ -6,7 +6,7 @@ from datetime import UTC, datetime, timedelta from typing import TYPE_CHECKING -from ._utils import maybe_await, maybe_sleep +from ._utils import call_with_exp_backoff, maybe_await from apify_client._models import Dataset, KeyValueStore, ListOfRuns, RequestQueue, Run, RunShort from apify_client.errors import ApifyApiError @@ -279,7 +279,7 @@ async def test_run_runs_client(client: ApifyClient | ApifyClientAsync) -> None: assert first_run.act_id is not None -async def test_run_metamorph(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_run_metamorph(client: ApifyClient | ApifyClientAsync) -> None: """Test metamorphing a run into another Actor.""" # Start an actor that will run long enough to metamorph. We use hello-world and try to metamorph it into itself actor = client.actor(HELLO_WORLD_ACTOR) @@ -290,8 +290,11 @@ async def test_run_metamorph(client: ApifyClient | ApifyClientAsync, *, is_async run_client = client.run(run.id) try: - # Wait a bit for the run to start properly - await maybe_sleep(2, is_async=is_async) + # Wait until the run starts (leaves READY), with exponential backoff: container startup time varies widely + async def get_run() -> Run | None: + return await maybe_await(run_client.get()) + + await call_with_exp_backoff(get_run, lambda run: isinstance(run, Run) and run.status != 'READY') # Metamorph the run into the same actor (allowed) with new input # Note: hello-world may finish before we can metamorph, so we handle that case @@ -318,7 +321,7 @@ async def test_run_metamorph(client: ApifyClient | ApifyClientAsync, *, is_async await maybe_await(run_client.delete()) -async def test_run_reboot(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: +async def test_run_reboot(client: ApifyClient | ApifyClientAsync) -> None: """Test rebooting a running Actor.""" # Start an actor actor = client.actor(HELLO_WORLD_ACTOR) @@ -329,9 +332,11 @@ async def test_run_reboot(client: ApifyClient | ApifyClientAsync, *, is_async: b run_client = client.run(run.id) try: - # Wait a bit and check if the run is still running - await maybe_sleep(1, is_async=is_async) - current_run = await maybe_await(run_client.get()) + # Wait until the run starts (leaves READY), with exponential backoff: container startup time varies widely + async def get_run() -> Run | None: + return await maybe_await(run_client.get()) + + current_run = await call_with_exp_backoff(get_run, lambda run: isinstance(run, Run) and run.status != 'READY') # Only try to reboot if the run is still running # Note: There's a race condition - run may finish between check and reboot call From be4f57a7ff120e1436c056b840d0af19b05d2d13 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 3 Jun 2026 12:39:00 +0200 Subject: [PATCH 5/6] test: merge call_with_exp_backoff into poll_until_condition via backoff_factor --- tests/integration/_utils.py | 60 ++++++----------------------------- tests/integration/test_run.py | 10 ++++-- 2 files changed, 16 insertions(+), 54 deletions(-) diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py index 2d099bf8..e17c21ae 100644 --- a/tests/integration/_utils.py +++ b/tests/integration/_utils.py @@ -2,7 +2,6 @@ import asyncio import inspect -import logging import secrets import string import time @@ -15,8 +14,6 @@ if TYPE_CHECKING: from collections.abc import Awaitable, Callable -logger = logging.getLogger(__name__) - # Environment variable names for test configuration TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN' TOKEN_ENV_VAR_2 = 'APIFY_TEST_USER_2_API_TOKEN' @@ -115,51 +112,6 @@ async def maybe_sleep(seconds: float, *, is_async: bool) -> None: time.sleep(seconds) # noqa: ASYNC251 -@overload -async def call_with_exp_backoff( - fn: Callable[[], Awaitable[T]], - condition: Callable[[T], bool] = ..., - *, - max_retries: int = ..., - base_delay: float = ..., -) -> T: ... -@overload -async def call_with_exp_backoff( - fn: Callable[[], T], - condition: Callable[[T], bool] = ..., - *, - max_retries: int = ..., - base_delay: float = ..., -) -> T: ... -async def call_with_exp_backoff( - fn: Callable[[], Awaitable[T] | T], - condition: Callable[[T], bool] = bool, - *, - max_retries: int = 5, - base_delay: float = 1.0, -) -> T: - """Call `fn`, retrying with exponential backoff until `condition(result)` is True. - - Calls `fn` and checks whether `condition` holds for its result. If it does not, `fn` is retried up to - `max_retries` times, sleeping `base_delay * 2 ** attempt` seconds before each retry. The last result is - returned regardless of whether the condition was ever satisfied, so the caller can run its own assertion. - - Use this instead of `poll_until_condition` when the wait time is highly variable (e.g. an Actor run - container starting up): the growing delay covers a long horizon with few calls. - """ - result = await maybe_await(fn()) - for attempt in range(max_retries): - if condition(result): - return result - delay = base_delay * 2**attempt - logger.info( - 'Condition not met for %r, retrying in %ss (attempt %d/%d).', result, delay, attempt + 1, max_retries - ) - await asyncio.sleep(delay) - result = await maybe_await(fn()) - return result - - @overload async def poll_until_condition( fn: Callable[[], Awaitable[T]], @@ -167,6 +119,7 @@ async def poll_until_condition( *, timeout: float = ..., poll_interval: float = ..., + backoff_factor: float = ..., ) -> T: ... @overload async def poll_until_condition( @@ -175,6 +128,7 @@ async def poll_until_condition( *, timeout: float = ..., poll_interval: float = ..., + backoff_factor: float = ..., ) -> T: ... async def poll_until_condition( fn: Callable[[], Awaitable[T] | T], @@ -182,6 +136,7 @@ async def poll_until_condition( *, timeout: float = 5, poll_interval: float = 1, + backoff_factor: float = 1, ) -> T: """Poll `fn` until `condition(result)` is True or the timeout expires. @@ -190,16 +145,19 @@ async def poll_until_condition( assertion. The default condition checks for a truthy result. Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. a freshly - created resource appearing in a listing) that may take a variable amount of time to propagate. Unlike - `call_with_exp_backoff`, the interval between polls stays constant. + created resource appearing in a listing) that may take a variable amount of time to propagate. For highly + variable wait times (e.g. an Actor run container starting up), pass `backoff_factor` > 1 to multiply the + interval after each poll, covering a long timeout with few calls. """ deadline = time.monotonic() + timeout + delay = poll_interval result = await maybe_await(fn()) while not condition(result): remaining = deadline - time.monotonic() if remaining <= 0: break - await asyncio.sleep(min(poll_interval, remaining)) + await asyncio.sleep(min(delay, remaining)) + delay *= backoff_factor result = await maybe_await(fn()) return result diff --git a/tests/integration/test_run.py b/tests/integration/test_run.py index 612a249f..c8ae2c26 100644 --- a/tests/integration/test_run.py +++ b/tests/integration/test_run.py @@ -6,7 +6,7 @@ from datetime import UTC, datetime, timedelta from typing import TYPE_CHECKING -from ._utils import call_with_exp_backoff, maybe_await +from ._utils import maybe_await, poll_until_condition from apify_client._models import Dataset, KeyValueStore, ListOfRuns, RequestQueue, Run, RunShort from apify_client.errors import ApifyApiError @@ -294,7 +294,9 @@ async def test_run_metamorph(client: ApifyClient | ApifyClientAsync) -> None: async def get_run() -> Run | None: return await maybe_await(run_client.get()) - await call_with_exp_backoff(get_run, lambda run: isinstance(run, Run) and run.status != 'READY') + await poll_until_condition( + get_run, lambda run: isinstance(run, Run) and run.status != 'READY', timeout=30, backoff_factor=2 + ) # Metamorph the run into the same actor (allowed) with new input # Note: hello-world may finish before we can metamorph, so we handle that case @@ -336,7 +338,9 @@ async def test_run_reboot(client: ApifyClient | ApifyClientAsync) -> None: async def get_run() -> Run | None: return await maybe_await(run_client.get()) - current_run = await call_with_exp_backoff(get_run, lambda run: isinstance(run, Run) and run.status != 'READY') + current_run = await poll_until_condition( + get_run, lambda run: isinstance(run, Run) and run.status != 'READY', timeout=30, backoff_factor=2 + ) # Only try to reboot if the run is still running # Note: There's a race condition - run may finish between check and reboot call From c856ab132b3c736d31b5b46c02a16d1f8eceb3ca Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 3 Jun 2026 12:50:43 +0200 Subject: [PATCH 6/6] test: move integration _utils.py to shared tests/_utils.py --- tests/__init__.py | 0 tests/{integration => }/_utils.py | 0 tests/integration/conftest.py | 2 +- tests/integration/test_actor.py | 2 +- tests/integration/test_actor_env_var.py | 2 +- tests/integration/test_actor_version.py | 2 +- tests/integration/test_apify_client.py | 2 +- tests/integration/test_build.py | 2 +- tests/integration/test_dataset.py | 2 +- tests/integration/test_key_value_store.py | 2 +- tests/integration/test_log.py | 2 +- tests/integration/test_request_queue.py | 2 +- tests/integration/test_run.py | 2 +- tests/integration/test_schedule.py | 2 +- tests/integration/test_store.py | 2 +- tests/integration/test_task.py | 2 +- tests/integration/test_user.py | 2 +- tests/integration/test_webhook.py | 2 +- tests/integration/test_webhook_dispatch.py | 2 +- 19 files changed, 17 insertions(+), 17 deletions(-) create mode 100644 tests/__init__.py rename tests/{integration => }/_utils.py (100%) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/_utils.py b/tests/_utils.py similarity index 100% rename from tests/integration/_utils.py rename to tests/_utils.py diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f6bd2053..1cdb3689 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -6,7 +6,7 @@ import pytest -from ._utils import ( +from .._utils import ( API_URL_ENV_VAR, TOKEN_ENV_VAR, TOKEN_ENV_VAR_2, diff --git a/tests/integration/test_actor.py b/tests/integration/test_actor.py index ebe97479..ab271334 100644 --- a/tests/integration/test_actor.py +++ b/tests/integration/test_actor.py @@ -6,7 +6,7 @@ from datetime import UTC, datetime, timedelta from typing import TYPE_CHECKING -from ._utils import get_random_resource_name, maybe_await +from .._utils import get_random_resource_name, maybe_await from apify_client._models import ( Actor, ActorChargeEvent, diff --git a/tests/integration/test_actor_env_var.py b/tests/integration/test_actor_env_var.py index 50be746d..bd9cb6c9 100644 --- a/tests/integration/test_actor_env_var.py +++ b/tests/integration/test_actor_env_var.py @@ -5,7 +5,7 @@ from collections.abc import AsyncIterator, Iterator from typing import TYPE_CHECKING -from ._utils import get_random_resource_name, maybe_await +from .._utils import get_random_resource_name, maybe_await from apify_client._models import Actor, EnvVar, ListOfEnvVars if TYPE_CHECKING: diff --git a/tests/integration/test_actor_version.py b/tests/integration/test_actor_version.py index 98fe8f77..ed3a1bb5 100644 --- a/tests/integration/test_actor_version.py +++ b/tests/integration/test_actor_version.py @@ -5,7 +5,7 @@ from collections.abc import AsyncIterator, Iterator from typing import TYPE_CHECKING -from ._utils import get_random_resource_name, maybe_await +from .._utils import get_random_resource_name, maybe_await from apify_client._models import Actor, ListOfVersions, Version if TYPE_CHECKING: diff --git a/tests/integration/test_apify_client.py b/tests/integration/test_apify_client.py index 5ec8e940..126f40b3 100644 --- a/tests/integration/test_apify_client.py +++ b/tests/integration/test_apify_client.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING -from ._utils import maybe_await +from .._utils import maybe_await from apify_client._models import UserPrivateInfo, UserPublicInfo if TYPE_CHECKING: diff --git a/tests/integration/test_build.py b/tests/integration/test_build.py index 6d1ebe89..2f1bd1ce 100644 --- a/tests/integration/test_build.py +++ b/tests/integration/test_build.py @@ -6,7 +6,7 @@ from datetime import timedelta from typing import TYPE_CHECKING -from ._utils import get_random_resource_name, maybe_await +from .._utils import get_random_resource_name, maybe_await from apify_client._models import Actor, Build, BuildShort, ListOfBuilds if TYPE_CHECKING: diff --git a/tests/integration/test_dataset.py b/tests/integration/test_dataset.py index 8d522689..1bd9429b 100644 --- a/tests/integration/test_dataset.py +++ b/tests/integration/test_dataset.py @@ -11,7 +11,7 @@ import impit import pytest -from ._utils import ( +from .._utils import ( DatasetFixture, collect_iterate_until_present, get_random_resource_name, diff --git a/tests/integration/test_key_value_store.py b/tests/integration/test_key_value_store.py index 712c6eeb..6f4a561d 100644 --- a/tests/integration/test_key_value_store.py +++ b/tests/integration/test_key_value_store.py @@ -10,7 +10,7 @@ import impit import pytest -from ._utils import ( +from .._utils import ( KvsFixture, collect_iterate_until_present, get_random_resource_name, diff --git a/tests/integration/test_log.py b/tests/integration/test_log.py index 9b61ab3b..df687e2f 100644 --- a/tests/integration/test_log.py +++ b/tests/integration/test_log.py @@ -5,7 +5,7 @@ from contextlib import AbstractAsyncContextManager, AbstractContextManager from typing import TYPE_CHECKING -from ._utils import maybe_await +from .._utils import maybe_await from apify_client._models import ListOfBuilds, Run from apify_client.http_clients import HttpResponse diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 9d7c79ae..4ad0105f 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -6,7 +6,7 @@ from datetime import timedelta from typing import TYPE_CHECKING -from ._utils import ( +from .._utils import ( collect_iterate_until_present, get_random_resource_name, get_random_string, diff --git a/tests/integration/test_run.py b/tests/integration/test_run.py index c8ae2c26..5a3ecfac 100644 --- a/tests/integration/test_run.py +++ b/tests/integration/test_run.py @@ -6,7 +6,7 @@ from datetime import UTC, datetime, timedelta from typing import TYPE_CHECKING -from ._utils import maybe_await, poll_until_condition +from .._utils import maybe_await, poll_until_condition from apify_client._models import Dataset, KeyValueStore, ListOfRuns, RequestQueue, Run, RunShort from apify_client.errors import ApifyApiError diff --git a/tests/integration/test_schedule.py b/tests/integration/test_schedule.py index 11cfed8b..0d136a59 100644 --- a/tests/integration/test_schedule.py +++ b/tests/integration/test_schedule.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING -from ._utils import collect_iterate_until_present, get_random_resource_name, maybe_await +from .._utils import collect_iterate_until_present, get_random_resource_name, maybe_await from apify_client._models import Actor, ListOfSchedules, Schedule, ScheduleActionRunActor, ScheduleShort if TYPE_CHECKING: diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py index 137b5552..dfacc87d 100644 --- a/tests/integration/test_store.py +++ b/tests/integration/test_store.py @@ -7,7 +7,7 @@ import pytest -from ._utils import maybe_await +from .._utils import maybe_await from apify_client._models import ListOfStoreActors, StoreListActor if TYPE_CHECKING: diff --git a/tests/integration/test_task.py b/tests/integration/test_task.py index d73845ec..78c3880b 100644 --- a/tests/integration/test_task.py +++ b/tests/integration/test_task.py @@ -6,7 +6,7 @@ from datetime import timedelta from typing import TYPE_CHECKING -from ._utils import collect_iterate_until_present, get_random_resource_name, maybe_await +from .._utils import collect_iterate_until_present, get_random_resource_name, maybe_await from apify_client._models import Actor, ListOfRuns, ListOfTasks, ListOfWebhooks, Run, RunShort, Task, TaskShort if TYPE_CHECKING: diff --git a/tests/integration/test_user.py b/tests/integration/test_user.py index 5abde190..1abd9a78 100644 --- a/tests/integration/test_user.py +++ b/tests/integration/test_user.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING -from ._utils import maybe_await +from .._utils import maybe_await from apify_client._models import AccountLimits, MonthlyUsage, UserPrivateInfo, UserPublicInfo from apify_client.errors import ApifyApiError diff --git a/tests/integration/test_webhook.py b/tests/integration/test_webhook.py index 044f65d2..35fd30e0 100644 --- a/tests/integration/test_webhook.py +++ b/tests/integration/test_webhook.py @@ -9,7 +9,7 @@ from apify_client import ApifyClient, ApifyClientAsync -from ._utils import collect_iterate_until_present, maybe_await +from .._utils import collect_iterate_until_present, maybe_await from apify_client._models import ( ListOfRuns, ListOfWebhookDispatches, diff --git a/tests/integration/test_webhook_dispatch.py b/tests/integration/test_webhook_dispatch.py index 4101267d..8b46aaff 100644 --- a/tests/integration/test_webhook_dispatch.py +++ b/tests/integration/test_webhook_dispatch.py @@ -5,7 +5,7 @@ from collections.abc import AsyncIterator, Iterator from typing import TYPE_CHECKING -from ._utils import maybe_await +from .._utils import maybe_await from apify_client._models import ListOfWebhookDispatches, WebhookDispatch if TYPE_CHECKING: