diff --git a/src/apify/_actor.py b/src/apify/_actor.py index d675f1bd..f829fa54 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -14,6 +14,7 @@ from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value from crawlee import service_locator +from crawlee._utils.context import ensure_context from crawlee.events import ( Event, EventAbortingData, @@ -97,7 +98,7 @@ def __init__( ) ) - self._is_initialized = False + self._active = False @ignore_docs async def __aenter__(self) -> Self: @@ -142,6 +143,11 @@ def __call__(self, configuration: Configuration | None = None, *, configure_logg """Make a new Actor instance with a non-default configuration.""" return self.__class__(configuration=configuration, configure_logging=configure_logging) + @property + def active(self) -> bool: + """Indicate whether the context is active.""" + return self._active + @property def apify_client(self) -> ApifyClientAsync: """The ApifyClientAsync instance the Actor instance uses.""" @@ -172,10 +178,6 @@ def _local_storage_client(self) -> BaseStorageClient: """The local storage client the Actor instance uses.""" return service_locator.get_storage_client() - def _raise_if_not_initialized(self) -> None: - if not self._is_initialized: - raise RuntimeError('The Actor was not initialized!') - def _raise_if_cloud_requested_but_not_configured(self, *, force_cloud: bool) -> None: if not force_cloud: return @@ -197,7 +199,7 @@ async def init(self) -> None: This method should be called immediately before performing any additional Actor actions, and it should be called only once. """ - if self._is_initialized: + if self._active: raise RuntimeError('The Actor was already initialized!') self._is_exiting = False @@ -221,9 +223,9 @@ async def init(self) -> None: # https://github.com/apify/apify-sdk-python/issues/146 await self._event_manager.__aenter__() + self._active = True - self._is_initialized = True - + @ensure_context async def exit( self, *, @@ -243,12 +245,8 @@ async def exit( status_message: The final status message that the Actor should display. cleanup_timeout: How long we should wait for event listeners. """ - self._raise_if_not_initialized() - self._is_exiting = True - exit_code = maybe_extract_enum_member_value(exit_code) - self.log.info('Exiting Actor', extra={'exit_code': exit_code}) async def finalize() -> None: @@ -264,7 +262,7 @@ async def finalize() -> None: await self._event_manager.__aexit__(None, None, None) await asyncio.wait_for(finalize(), cleanup_timeout.total_seconds()) - self._is_initialized = False + self._active = False if is_running_in_ipython(): self.log.debug(f'Not calling sys.exit({exit_code}) because Actor is running in IPython') @@ -275,6 +273,7 @@ async def finalize() -> None: else: sys.exit(exit_code) + @ensure_context async def fail( self, *, @@ -291,8 +290,6 @@ async def fail( exception: The exception with which the Actor failed. status_message: The final status message that the Actor should display. """ - self._raise_if_not_initialized() - # In IPython, we don't run `sys.exit()` during Actor exits, # so the exception traceback will be printed on its own if exception and not is_running_in_ipython(): @@ -338,6 +335,7 @@ def new_client( timeout_secs=int(timeout.total_seconds()) if timeout else None, ) + @ensure_context async def open_dataset( self, *, @@ -362,7 +360,6 @@ async def open_dataset( Returns: An instance of the `Dataset` class for the given ID or name. """ - self._raise_if_not_initialized() self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud) storage_client = self._cloud_storage_client if force_cloud else self._local_storage_client @@ -374,6 +371,7 @@ async def open_dataset( storage_client=storage_client, ) + @ensure_context async def open_key_value_store( self, *, @@ -397,7 +395,6 @@ async def open_key_value_store( Returns: An instance of the `KeyValueStore` class for the given ID or name. """ - self._raise_if_not_initialized() self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud) storage_client = self._cloud_storage_client if force_cloud else self._local_storage_client @@ -408,6 +405,7 @@ async def open_key_value_store( storage_client=storage_client, ) + @ensure_context async def open_request_queue( self, *, @@ -433,7 +431,6 @@ async def open_request_queue( Returns: An instance of the `RequestQueue` class for the given ID or name. """ - self._raise_if_not_initialized() self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud) storage_client = self._cloud_storage_client if force_cloud else self._local_storage_client @@ -445,24 +442,22 @@ async def open_request_queue( storage_client=storage_client, ) + @ensure_context async def push_data(self, data: dict | list[dict]) -> None: """Store an object or a list of objects to the default dataset of the current Actor run. Args: data: The data to push to the default dataset. """ - self._raise_if_not_initialized() - if not data: return dataset = await self.open_dataset() await dataset.push_data(data) + @ensure_context async def get_input(self) -> Any: """Get the Actor input value from the default key-value store associated with the current Actor run.""" - self._raise_if_not_initialized() - input_value = await self.get_value(self._configuration.input_key) input_secrets_private_key = self._configuration.input_secrets_private_key_file input_secrets_key_passphrase = self._configuration.input_secrets_private_key_passphrase @@ -475,6 +470,7 @@ async def get_input(self) -> Any: return input_value + @ensure_context async def get_value(self, key: str, default_value: Any = None) -> Any: """Get a value from the default key-value store associated with the current Actor run. @@ -482,11 +478,10 @@ async def get_value(self, key: str, default_value: Any = None) -> Any: key: The key of the record which to retrieve. default_value: Default value returned in case the record does not exist. """ - self._raise_if_not_initialized() - key_value_store = await self.open_key_value_store() return await key_value_store.get_value(key, default_value) + @ensure_context async def set_value( self, key: str, @@ -501,8 +496,6 @@ async def set_value( value: The value of the record which to set, or None, if the record should be deleted. content_type: The content type which should be set to the value. """ - self._raise_if_not_initialized() - key_value_store = await self.open_key_value_store() return await key_value_store.set_value(key, value, content_type=content_type) @@ -529,6 +522,7 @@ def on( @overload def on(self, event_name: Event, listener: EventListener[None]) -> EventListener[Any]: ... + @ensure_context def on(self, event_name: Event, listener: EventListener[Any]) -> EventListener[Any]: """Add an event listener to the Actor's event manager. @@ -553,8 +547,6 @@ def on(self, event_name: Event, listener: EventListener[Any]) -> EventListener[A event_name: The Actor event to listen for. listener: The function to be called when the event is emitted (can be async). """ - self._raise_if_not_initialized() - self._event_manager.on(event=event_name, listener=listener) return listener @@ -571,6 +563,7 @@ def off(self, event_name: Literal[Event.EXIT], listener: EventListener[EventExit @overload def off(self, event_name: Event, listener: EventListener[None]) -> None: ... + @ensure_context def off(self, event_name: Event, listener: Callable | None = None) -> None: """Remove a listener, or all listeners, from an Actor event. @@ -579,14 +572,13 @@ def off(self, event_name: Event, listener: Callable | None = None) -> None: listener: The listener which is supposed to be removed. If not passed, all listeners of this event are removed. """ - self._raise_if_not_initialized() - self._event_manager.off(event=event_name, listener=listener) def is_at_home(self) -> bool: """Return `True` when the Actor is running on the Apify platform, and `False` otherwise (e.g. local run).""" return self._configuration.is_at_home + @ensure_context def get_env(self) -> dict: """Return a dictionary with information parsed from all the `APIFY_XXX` environment variables. @@ -594,8 +586,6 @@ def get_env(self) -> dict: [Actor documentation](https://docs.apify.com/actors/development/environment-variables). If some variables are not defined or are invalid, the corresponding value in the resulting dictionary will be None. """ - self._raise_if_not_initialized() - config = dict[str, Any]() for field_name, field in Configuration.model_fields.items(): if field.deprecated: @@ -616,6 +606,7 @@ def get_env(self) -> dict: env_vars = {env_var.value.lower(): env_var.name.lower() for env_var in [*ActorEnvVars, *ApifyEnvVars]} return {option_name: config[env_var] for env_var, option_name in env_vars.items() if env_var in config} + @ensure_context async def start( self, actor_id: str, @@ -653,8 +644,6 @@ async def start( Returns: Info about the started Actor run """ - self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self._apify_client if webhooks: @@ -676,6 +665,7 @@ async def start( return ActorRun.model_validate(api_result) + @ensure_context async def abort( self, run_id: str, @@ -699,8 +689,6 @@ async def abort( Returns: Info about the aborted Actor run. """ - self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self._apify_client if status_message: @@ -710,6 +698,7 @@ async def abort( return ActorRun.model_validate(api_result) + @ensure_context async def call( self, actor_id: str, @@ -747,8 +736,6 @@ async def call( Returns: Info about the started Actor run. """ - self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self._apify_client if webhooks: @@ -770,6 +757,7 @@ async def call( return ActorRun.model_validate(api_result) + @ensure_context async def call_task( self, task_id: str, @@ -809,8 +797,6 @@ async def call_task( Returns: Info about the started Actor run. """ - self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self._apify_client if webhooks: @@ -831,6 +817,7 @@ async def call_task( return ActorRun.model_validate(api_result) + @ensure_context async def metamorph( self, target_actor_id: str, @@ -855,8 +842,6 @@ async def metamorph( content_type: The content type of the input. custom_after_sleep: How long to sleep for after the metamorph, to wait for the container to be stopped. """ - self._raise_if_not_initialized() - if not self.is_at_home(): self.log.error('Actor.metamorph() is only supported when running on the Apify platform.') return @@ -878,6 +863,7 @@ async def metamorph( if custom_after_sleep: await asyncio.sleep(custom_after_sleep.total_seconds()) + @ensure_context async def reboot( self, *, @@ -892,8 +878,6 @@ async def reboot( event_listeners_timeout: How long should the Actor wait for Actor event listeners to finish before exiting. custom_after_sleep: How long to sleep for after the reboot, to wait for the container to be stopped. """ - self._raise_if_not_initialized() - if not self.is_at_home(): self.log.error('Actor.reboot() is only supported when running on the Apify platform.') return @@ -933,6 +917,7 @@ async def reboot( if custom_after_sleep: await asyncio.sleep(custom_after_sleep.total_seconds()) + @ensure_context async def add_webhook( self, webhook: Webhook, @@ -960,8 +945,6 @@ async def add_webhook( Returns: The created webhook. """ - self._raise_if_not_initialized() - if not self.is_at_home(): self.log.error('Actor.add_webhook() is only supported when running on the Apify platform.') return @@ -980,6 +963,7 @@ async def add_webhook( idempotency_key=idempotency_key, ) + @ensure_context async def set_status_message( self, status_message: str, @@ -995,8 +979,6 @@ async def set_status_message( Returns: The updated Actor run object. """ - self._raise_if_not_initialized() - if not self.is_at_home(): title = 'Terminal status message' if is_terminal else 'Status message' self.log.info(f'[{title}]: {status_message}') @@ -1012,6 +994,7 @@ async def set_status_message( return ActorRun.model_validate(api_result) + @ensure_context async def create_proxy_configuration( self, *, @@ -1044,8 +1027,6 @@ async def create_proxy_configuration( ProxyConfiguration object with the passed configuration, or None, if no proxy should be used based on the configuration. """ - self._raise_if_not_initialized() - if actor_proxy_input is not None: if actor_proxy_input.get('useApifyProxy', False): country_code = country_code or actor_proxy_input.get('apifyProxyCountry') diff --git a/tests/integration/test_actor_lifecycle.py b/tests/integration/test_actor_lifecycle.py index 7a975c99..1f436440 100644 --- a/tests/integration/test_actor_lifecycle.py +++ b/tests/integration/test_actor_lifecycle.py @@ -15,7 +15,7 @@ async def test_actor_init_and_double_init_prevention( async def main() -> None: my_actor = Actor await my_actor.init() - assert my_actor._is_initialized is True + assert my_actor.active is True double_init = False try: await my_actor.init() @@ -33,7 +33,7 @@ async def main() -> None: raise await my_actor.exit() assert double_init is False - assert my_actor._is_initialized is False + assert my_actor.active is False actor = await make_actor(label='actor-init', main_func=main) run_result = await run_actor(actor) @@ -49,8 +49,8 @@ async def main() -> None: import apify._actor async with Actor: - assert apify._actor.Actor._is_initialized - assert apify._actor.Actor._is_initialized is False + assert apify._actor.Actor.active + assert apify._actor.Actor.active is False actor = await make_actor(label='with-actor-init', main_func=main) run_result = await run_actor(actor) diff --git a/tests/unit/actor/test_actor_lifecycle.py b/tests/unit/actor/test_actor_lifecycle.py index 33af45e6..dfdef40b 100644 --- a/tests/unit/actor/test_actor_lifecycle.py +++ b/tests/unit/actor/test_actor_lifecycle.py @@ -21,37 +21,37 @@ async def test_actor_properly_init_with_async() -> None: async with Actor: assert cast(Proxy, apify._actor.Actor).__wrapped__ is not None - assert cast(Proxy, apify._actor.Actor).__wrapped__._is_initialized - assert not cast(Proxy, apify._actor.Actor).__wrapped__._is_initialized + assert cast(Proxy, apify._actor.Actor).__wrapped__.active + assert not cast(Proxy, apify._actor.Actor).__wrapped__.active async def test_actor_init() -> None: my_actor = _ActorType() await my_actor.init() - assert my_actor._is_initialized is True + assert my_actor.active is True await my_actor.exit() - assert my_actor._is_initialized is False + assert my_actor.active is False async def test_double_init_raises_error(prepare_test_env: Callable) -> None: async with Actor: - assert Actor._is_initialized + assert Actor.active with pytest.raises(RuntimeError): await Actor.init() prepare_test_env() async with Actor() as actor: - assert actor._is_initialized + assert actor.active with pytest.raises(RuntimeError): await actor.init() prepare_test_env() async with _ActorType() as actor: - assert actor._is_initialized + assert actor.active with pytest.raises(RuntimeError): await actor.init() @@ -73,7 +73,7 @@ def on_event(event_type: Event) -> Callable: my_actor = _ActorType() async with my_actor: - assert my_actor._is_initialized + assert my_actor.active my_actor.on(Event.PERSIST_STATE, on_event(Event.PERSIST_STATE)) my_actor.on(Event.SYSTEM_INFO, on_event(Event.SYSTEM_INFO)) await asyncio.sleep(1) @@ -96,9 +96,9 @@ async def test_exit_without_init_raises_error() -> None: async def test_actor_fails_cleanly() -> None: async with _ActorType() as my_actor: - assert my_actor._is_initialized + assert my_actor.active await my_actor.fail() - assert my_actor._is_initialized is False + assert my_actor.active is False async def test_actor_handles_failure_gracefully() -> None: @@ -106,11 +106,11 @@ async def test_actor_handles_failure_gracefully() -> None: with contextlib.suppress(Exception): async with _ActorType() as my_actor: - assert my_actor._is_initialized + assert my_actor.active raise Exception('Failed') # noqa: TRY002 assert my_actor is not None - assert my_actor._is_initialized is False + assert my_actor.active is False async def test_fail_without_init_raises_error() -> None: