From 87e17011f8c909856fd02a8de81adf9a7321c056 Mon Sep 17 00:00:00 2001
From: elronbandel
Date: Tue, 4 Mar 2025 09:52:04 +0200
Subject: [PATCH 1/3] Use elaborated cache key and use it for filelock

Signed-off-by: elronbandel
---
 src/unitxt/api.py       | 43 +++++++++++++++++++++++++++++++++--------
 src/unitxt/dataclass.py | 29 +++++++++++++++++++++++++++
 2 files changed, 64 insertions(+), 8 deletions(-)

diff --git a/src/unitxt/api.py b/src/unitxt/api.py
index 18daf6055..90932beb3 100644
--- a/src/unitxt/api.py
+++ b/src/unitxt/api.py
@@ -11,6 +11,7 @@
 from .artifact import fetch_artifact
 from .benchmark import Benchmark
 from .card import TaskCard
+from .dataclass import to_dict
 from .dataset_utils import get_dataset_artifact
 from .error_utils import UnitxtError
 from .inference import (
@@ -135,43 +136,69 @@ def create_dataset(
     card = TaskCard(loader=LoadFromDictionary(data=data, data_classification_policy=data_classification_policy), task=task)
     return load_dataset(card=card, split=split, **kwargs)
 
+def to_str(obj):
+    obj_str = str(obj)
+    if " at 0x" in obj_str:
+        obj_str = obj_str.split(" at 0x")[0] + ">"
+    return obj_str
+
 
 def _source_to_dataset(
     source: SourceOperator,
     split=None,
     use_cache=False,
     streaming=False,
+    lock_timeout=60,  # Timeout in seconds for acquiring the lock
 ):
+    import json
+    import os
+
+    import filelock
+
     from .dataset import Dataset as UnitxtDataset
 
+    # Generate a unique signature for the source
+    source_signature = json.dumps(to_dict(source, to_str), sort_keys=True)
+    config_name = "recipe-" + short_hex_hash(source_signature)
+
     stream = source()
 
     try:
         ds_builder = UnitxtDataset(
             dataset_name="unitxt",
-            config_name="recipe-" + short_hex_hash(repr(source)),
+            config_name=config_name,
             version=constants.version,
         )
+
         if split is not None:
             stream = {split: stream[split]}
+
         ds_builder._generators = stream
 
-        ds_builder.download_and_prepare(
-            verification_mode="no_checks",
-            download_mode=None if use_cache else "force_redownload",
-        )
+        # Create a lock file path based on the dataset configuration
+        lock_file = os.path.join(os.path.expanduser("~"), ".cache", "unitxt", f"{config_name}.lock")
+        os.makedirs(os.path.dirname(lock_file), exist_ok=True)
+
+        # Create a file lock
+        lock = filelock.FileLock(lock_file, timeout=lock_timeout)
+
+        # Only protect the download_and_prepare operation with the lock
+        try:
+            with lock:
+                ds_builder.download_and_prepare(
+                    verification_mode="no_checks",
+                    download_mode=None if use_cache else "force_redownload",
+                )
+        except filelock.Timeout:
+            raise TimeoutError(f"Could not acquire lock for {config_name} within {lock_timeout} seconds. Another process may be preparing the same dataset.")
 
         if streaming:
             return ds_builder.as_streaming_dataset(split=split)
-
         return ds_builder.as_dataset(
             split=split, run_post_process=False, verification_mode="no_checks"
         )
-
     except DatasetGenerationError as e:
         raise e.__cause__
 
-
 def load_dataset(
     dataset_query: Optional[str] = None,
     split: Optional[str] = None,
diff --git a/src/unitxt/dataclass.py b/src/unitxt/dataclass.py
index d77ebc6c6..cc452ce60 100644
--- a/src/unitxt/dataclass.py
+++ b/src/unitxt/dataclass.py
@@ -296,6 +296,35 @@ def _asdict_inner(obj):
     return copy.deepcopy(obj)
 
 
+def to_dict(obj, func=copy.deepcopy, _visited=None):
+    # Initialize visited set on first call
+    if _visited is None:
+        _visited = set()
+
+    # Get object ID to track visited objects
+    obj_id = id(obj)
+
+    # If we've seen this object before, return a placeholder to avoid infinite recursion
+    if obj_id in _visited:
+        return func(obj)
+
+    # For mutable objects, add to visited set before recursing
+    if isinstance(obj, (dict, list)) or is_dataclass(obj) or (isinstance(obj, tuple) and hasattr(obj, "_fields")):
+        _visited.add(obj_id)
+
+    if is_dataclass(obj):
+        return {field.name: to_dict(getattr(obj, field.name), func, _visited) for field in fields(obj)}
+
+    if isinstance(obj, tuple) and hasattr(obj, "_fields"):  # named tuple
+        return type(obj)(*[to_dict(v, func, _visited) for v in obj])
+
+    if isinstance(obj, (list, tuple)):
+        return type(obj)([to_dict(v, func, _visited) for v in obj])
+
+    if isinstance(obj, dict):
+        return type(obj)({to_dict(k, func, _visited): to_dict(v, func, _visited) for k, v in obj.items()})
+
+    return func(obj)
 
 class DataclassMeta(ABCMeta):
     """Metaclass for Dataclass.
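
The mechanics introduced by this first patch are easier to see outside the diff. The sketch below is illustrative only and not unitxt code: `Recipe`, `Template`, and the simplified `to_dict`, `to_str`, and `short_hex_hash` helpers are stand-ins for the real implementations in src/unitxt/dataclass.py and src/unitxt/api.py, but they show how an address-free string fallback plus a sorted JSON dump yields a cache key that stays stable across runs and processes.

```python
# Minimal, self-contained sketch of the cache-key idea (assumed helpers, not unitxt code).
import hashlib
import json
from dataclasses import dataclass, field, fields, is_dataclass


def to_str(obj) -> str:
    # Drop the " at 0x..." memory address so repeated runs produce the same string.
    obj_str = str(obj)
    if " at 0x" in obj_str:
        obj_str = obj_str.split(" at 0x")[0] + ">"
    return obj_str


def to_dict(obj, func=to_str):
    # Simplified stand-in for unitxt.dataclass.to_dict: recurse into dataclasses,
    # sequences, and dicts; fall back to func for everything else.
    if is_dataclass(obj):
        return {f.name: to_dict(getattr(obj, f.name), func) for f in fields(obj)}
    if isinstance(obj, (list, tuple)):
        return [to_dict(v, func) for v in obj]
    if isinstance(obj, dict):
        return {k: to_dict(v, func) for k, v in obj.items()}
    return func(obj)


def short_hex_hash(text: str, length: int = 8) -> str:
    # Stand-in for unitxt's short_hex_hash helper.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:length]


class Template:
    """Placeholder operator whose plain str() includes a memory address."""


@dataclass
class Recipe:
    card: str
    template: Template = field(default_factory=Template)


signature = json.dumps(to_dict(Recipe(card="cards.sst2")), sort_keys=True)
print("recipe-" + short_hex_hash(signature))  # same value on every run and in every process
```
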
From cc120a3c7bce2eda6f2511cad2881896e17f916b Mon Sep 17 00:00:00 2001
From: elronbandel
Date: Tue, 4 Mar 2025 13:06:49 +0200
Subject: [PATCH 2/3] Add documentation

Signed-off-by: elronbandel
---
 src/unitxt/api.py       | 27 +++++++++++++++++++++++++--
 src/unitxt/dataclass.py | 16 ++++++++++++++++
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/src/unitxt/api.py b/src/unitxt/api.py
index 90932beb3..c250b091c 100644
--- a/src/unitxt/api.py
+++ b/src/unitxt/api.py
@@ -136,7 +136,30 @@ def create_dataset(
     card = TaskCard(loader=LoadFromDictionary(data=data, data_classification_policy=data_classification_policy), task=task)
     return load_dataset(card=card, split=split, **kwargs)
 
-def to_str(obj):
+def object_to_str_without_addresses(obj):
+    """Generates a string representation of a Python object while removing memory address references.
+
+    This function is useful for creating consistent and comparable string representations of objects
+    that would otherwise include memory addresses (e.g., ``<__main__.MyClass object at 0x7f8b9d4d6e20>``),
+    which can vary between executions. By stripping the memory address, the function ensures that the
+    representation is stable and independent of the object's location in memory.
+
+    Args:
+        obj: Any Python object to be converted to a string representation.
+
+    Returns:
+        str: A string representation of the object with memory addresses removed if present.
+
+    Example:
+        ```python
+        class MyClass:
+            pass
+
+        obj = MyClass()
+        print(str(obj))  # "<__main__.MyClass object at 0x7f8b9d4d6e20>"
+        print(object_to_str_without_addresses(obj))  # "<__main__.MyClass object>"
+        ```
+    """
     obj_str = str(obj)
     if " at 0x" in obj_str:
         obj_str = obj_str.split(" at 0x")[0] + ">"
@@ -157,7 +180,7 @@ def _source_to_dataset(
     from .dataset import Dataset as UnitxtDataset
 
     # Generate a unique signature for the source
-    source_signature = json.dumps(to_dict(source, to_str), sort_keys=True)
+    source_signature = json.dumps(to_dict(source, object_to_str_without_addresses), sort_keys=True)
     config_name = "recipe-" + short_hex_hash(source_signature)
 
     stream = source()
diff --git a/src/unitxt/dataclass.py b/src/unitxt/dataclass.py
index cc452ce60..b526da454 100644
--- a/src/unitxt/dataclass.py
+++ b/src/unitxt/dataclass.py
@@ -297,6 +297,22 @@ def _asdict_inner(obj):
     return copy.deepcopy(obj)
 
 def to_dict(obj, func=copy.deepcopy, _visited=None):
+    """Recursively converts an object into a dictionary representation while avoiding infinite recursion due to circular references.
+
+    Args:
+        obj: Any Python object to be converted into a dictionary-like structure.
+        func (Callable, optional): A function applied to non-iterable objects. Defaults to `copy.deepcopy`.
+        _visited (set, optional): A set of object IDs used to track visited objects and prevent infinite recursion.
+
+    Returns:
+        dict: A dictionary representation of the input object, with supported collections and dataclasses
+        recursively processed.
+
+    Notes:
+        - Supports dataclasses, named tuples, lists, tuples, and dictionaries.
+        - Circular references are detected using object IDs and replaced by `func(obj)`.
+        - Named tuples retain their original type instead of being converted to dictionaries.
+    """
     # Initialize visited set on first call
     if _visited is None:
         _visited = set()
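
Assuming the patched package is importable, the documented behavior can be exercised directly. The snippet below illustrates the semantics described in the new docstrings (address stripping and the circular-reference guard); it is not part of the patch, and the `MyOperator` class and `recipe` dict are made-up inputs.

```python
# Quick illustration of the documented helpers (assumes this patched unitxt is installed).
from unitxt.api import object_to_str_without_addresses
from unitxt.dataclass import to_dict


class MyOperator:
    pass


# Memory addresses are stripped, so the representation is stable between runs.
print(object_to_str_without_addresses(MyOperator()))  # "<__main__.MyOperator object>"

# Circular references are detected via object IDs and replaced by func(obj)
# instead of recursing forever.
recipe = {"name": "my_recipe"}
recipe["self"] = recipe
print(to_dict(recipe, object_to_str_without_addresses))
```
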
From 615d8debf0ebf89406a890c32dc2aab00b77086f Mon Sep 17 00:00:00 2001
From: elronbandel
Date: Thu, 13 Mar 2025 16:38:13 +0200
Subject: [PATCH 3/3] Another try

Signed-off-by: elronbandel
---
 src/unitxt/api.py | 60 +++++++++++++++++++++++++++++++-----------------
 1 file changed, 37 insertions(+), 23 deletions(-)

diff --git a/src/unitxt/api.py b/src/unitxt/api.py
index c250b091c..b44b08428 100644
--- a/src/unitxt/api.py
+++ b/src/unitxt/api.py
@@ -1,12 +1,17 @@
 import hashlib
 import inspect
 import json
+import os
+import random
+import time
 from datetime import datetime
 from functools import lru_cache
 from typing import Any, Dict, List, Optional, Union
 
+import filelock
 from datasets import Dataset, DatasetDict, IterableDataset, IterableDatasetDict
 from datasets.exceptions import DatasetGenerationError
+from huggingface_hub import constants as hf_constants
 
 from .artifact import fetch_artifact
 from .benchmark import Benchmark
@@ -172,16 +177,21 @@ def _source_to_dataset(
     streaming=False,
     lock_timeout=60,  # Timeout in seconds for acquiring the lock
 ):
-    import json
-    import os
-
-    import filelock
-
     from .dataset import Dataset as UnitxtDataset
 
     # Generate a unique signature for the source
     source_signature = json.dumps(to_dict(source, object_to_str_without_addresses), sort_keys=True)
     config_name = "recipe-" + short_hex_hash(source_signature)
+    hf_cache_home = hf_constants.HF_HOME
+    lock_dir = os.path.join(hf_cache_home, "locks")
+    os.makedirs(lock_dir, exist_ok=True)
+
+    # Create a lock file path based on the dataset configuration
+    lock_file = os.path.join(lock_dir, f"unitxt_{config_name}.lock")
+
+    # Add retry logic
+    max_attempts = 5
+    base_wait = 5  # seconds
 
     stream = source()
 
         ds_builder._generators = stream
 
-        # Create a lock file path based on the dataset configuration
-        lock_file = os.path.join(os.path.expanduser("~"), ".cache", "unitxt", f"{config_name}.lock")
-        os.makedirs(os.path.dirname(lock_file), exist_ok=True)
-
-        # Create a file lock
-        lock = filelock.FileLock(lock_file, timeout=lock_timeout)
+        for attempt in range(max_attempts):
+            # Create a file lock with appropriate timeout
+            lock = filelock.FileLock(lock_file, timeout=300)  # 5 minutes
 
-        # Only protect the download_and_prepare operation with the lock
-        try:
-            with lock:
-                ds_builder.download_and_prepare(
-                    verification_mode="no_checks",
-                    download_mode=None if use_cache else "force_redownload",
-                )
-        except filelock.Timeout:
-            raise TimeoutError(f"Could not acquire lock for {config_name} within {lock_timeout} seconds. Another process may be preparing the same dataset.")
-        if streaming:
-            return ds_builder.as_streaming_dataset(split=split)
-        return ds_builder.as_dataset(
-            split=split, run_post_process=False, verification_mode="no_checks"
-        )
+            try:
+                with lock:
+                    ds_builder.download_and_prepare(
+                        verification_mode="no_checks",
+                        download_mode=None if use_cache else "force_redownload",
+                    )
+
+                # If we reach here, the lock was successfully acquired and released
+                if streaming:
+                    return ds_builder.as_streaming_dataset(split=split)
+                return ds_builder.as_dataset(
+                    split=split, run_post_process=False, verification_mode="no_checks"
+                )
+            except filelock.Timeout:
+                if attempt < max_attempts - 1:  # Not the last attempt
+                    wait_time = base_wait * (2 ** attempt) + random.uniform(0, 1)
+                    time.sleep(wait_time)
+                else:
+                    raise TimeoutError(f"Could not acquire lock for {config_name} after {max_attempts} attempts")
 
     except DatasetGenerationError as e:
         raise e.__cause__
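
The concurrency behavior of this final commit, read apart from the diff, is a file lock around dataset preparation with exponential backoff and jitter between acquisition attempts. The standalone sketch below uses placeholder names (`prepare_once`, the `/tmp` lock directory, and the `work` callable are illustrative, not unitxt API) but follows the same acquire-or-back-off pattern built on the `filelock` package.

```python
# Standalone sketch of the lock-plus-backoff pattern (placeholder names, not unitxt API).
import os
import random
import time

import filelock


def prepare_once(lock_dir, name, work, max_attempts=5, base_wait=5.0):
    """Run work() under a named file lock, retrying with exponential backoff plus jitter."""
    os.makedirs(lock_dir, exist_ok=True)
    lock_file = os.path.join(lock_dir, f"{name}.lock")

    for attempt in range(max_attempts):
        # Per-attempt timeout on acquiring the lock, in seconds.
        lock = filelock.FileLock(lock_file, timeout=300)
        try:
            with lock:
                # Only one process runs the protected work at a time.
                return work()
        except filelock.Timeout:
            if attempt < max_attempts - 1:
                # Wait longer after each failed attempt, with jitter to avoid thundering herds.
                time.sleep(base_wait * (2 ** attempt) + random.uniform(0, 1))
            else:
                raise TimeoutError(f"Could not acquire lock for {name} after {max_attempts} attempts")


if __name__ == "__main__":
    prepare_once("/tmp/unitxt-locks", "recipe-deadbeef", lambda: print("preparing dataset..."))
```
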