From 6c53471de2fa66ee4c5fd8826053feb209753617 Mon Sep 17 00:00:00 2001
From: Eric Tang <46737979+erictang000@users.noreply.github.com>
Date: Wed, 9 Apr 2025 20:31:35 -0700
Subject: [PATCH] [data] support for specifying a dataset in cloud storage
 (#7567)

* add support for loading datasets from s3/gcs
* add comments to readme
* run linter and address comments
* add option to pass in kwargs to ray init (i.e. runtime env)
* address comment
* revert mixed up changes
---
 README.md                           |  2 +-
 data/README.md                      |  7 +++--
 src/llamafactory/data/data_utils.py | 49 +++++++++++++++++++++++++++++
 src/llamafactory/data/loader.py     |  9 ++++--
 src/llamafactory/data/parser.py     |  2 ++
 5 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index 295c15d2..b836487e 100644
--- a/README.md
+++ b/README.md
@@ -554,7 +554,7 @@ pip install .
 
 ### Data Preparation
 
-Please refer to [data/README.md](data/README.md) for checking the details about the format of dataset files. You can either use datasets on HuggingFace / ModelScope / Modelers hub or load the dataset in local disk.
+Please refer to [data/README.md](data/README.md) for details on the dataset file format. You can use datasets on the HuggingFace / ModelScope / Modelers hub, load a dataset from local disk, or specify a path to S3/GCS cloud storage.
 
 > [!NOTE]
 > Please update `data/dataset_info.json` to use your custom dataset.
diff --git a/data/README.md b/data/README.md
index 3eeda044..de04335a 100644
--- a/data/README.md
+++ b/data/README.md
@@ -4,9 +4,10 @@ Currently we support datasets in **alpaca** and **sharegpt** format.
 
 ```json
 "dataset_name": {
-  "hf_hub_url": "the name of the dataset repository on the Hugging Face hub. (if specified, ignore script_url and file_name)",
-  "ms_hub_url": "the name of the dataset repository on the Model Scope hub. (if specified, ignore script_url and file_name)",
-  "script_url": "the name of the directory containing a dataset loading script. (if specified, ignore file_name)",
+  "hf_hub_url": "the name of the dataset repository on the Hugging Face hub. (if specified, ignore script_url, file_name and cloud_file_name)",
+  "ms_hub_url": "the name of the dataset repository on the Model Scope hub. (if specified, ignore script_url, file_name and cloud_file_name)",
+  "script_url": "the name of the directory containing a dataset loading script. (if specified, ignore file_name and cloud_file_name)",
+  "cloud_file_name": "the path to the dataset file in S3/GCS cloud storage. (if specified, ignore file_name)",
   "file_name": "the name of the dataset folder or dataset file in this directory. (required if above are not specified)",
   "formatting": "the format of the dataset. (optional, default: alpaca, can be chosen from {alpaca, sharegpt})",
   "ranking": "whether the dataset is a preference dataset or not. (default: False)",
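
For reference, a minimal `dataset_info.json` entry using the new `cloud_file_name` key could look like the sketch below. The bucket and file names are hypothetical, and the value is passed through unchanged to the cloud reader, so it must be a full `s3://` or `gs://` URI:

```json
"alpaca_cloud_demo": {
  "cloud_file_name": "s3://my-bucket/datasets/alpaca_train.jsonl",
  "formatting": "alpaca"
}
```
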
diff --git a/src/llamafactory/data/data_utils.py b/src/llamafactory/data/data_utils.py
index d3184fb6..3ea4aff7 100644
--- a/src/llamafactory/data/data_utils.py
+++ b/src/llamafactory/data/data_utils.py
@@ -12,9 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
 from enum import Enum, unique
 from typing import TYPE_CHECKING, Optional, TypedDict, Union
 
+import fsspec
 from datasets import DatasetDict, concatenate_datasets, interleave_datasets
 
 from ..extras import logging
@@ -138,3 +140,50 @@ def get_dataset_module(dataset: Union["Dataset", "DatasetDict"]) -> "DatasetModu
         dataset_module["train_dataset"] = dataset
 
     return dataset_module
+
+
+def setup_fs(path, anon=False):
+    """Set up a filesystem object based on the path protocol."""
+    storage_options = {"anon": anon} if anon else {}
+
+    if path.startswith("s3://"):
+        fs = fsspec.filesystem("s3", **storage_options)
+    elif path.startswith(("gs://", "gcs://")):
+        fs = fsspec.filesystem("gcs", **storage_options)
+    else:
+        raise ValueError(f"Unsupported protocol in path: {path}. Use 's3://' or 'gs://'")
+    return fs
+
+
+def read_cloud_json(cloud_path):
+    """Read a JSON/JSONL file from cloud storage (S3 or GCS).
+
+    Args:
+        cloud_path : str
+            Cloud path in the format:
+            - 's3://bucket-name/file.json' for AWS S3
+            - 'gs://bucket-name/file.jsonl' or 'gcs://bucket-name/file.jsonl' for Google Cloud Storage
+
+    Files ending in ".jsonl" are parsed as JSON Lines (one object per line); other files are parsed as regular JSON.
+    """
+    try:
+        # Try with anonymous access first
+        fs = setup_fs(cloud_path, anon=True)
+        return _read_json_with_fs(fs, cloud_path, lines=cloud_path.endswith(".jsonl"))
+    except Exception:
+        # Try again with credentials
+        fs = setup_fs(cloud_path)
+        return _read_json_with_fs(fs, cloud_path, lines=cloud_path.endswith(".jsonl"))
+
+
+def _read_json_with_fs(fs, path, lines=True):
+    """Helper function to read JSON/JSONL files using fsspec."""
+    with fs.open(path, "r") as f:
+        if lines:
+            # Read JSONL (JSON Lines) format - one JSON object per line
+            data = [json.loads(line) for line in f if line.strip()]
+        else:
+            # Read regular JSON format
+            data = json.load(f)
+
+    return data
diff --git a/src/llamafactory/data/loader.py b/src/llamafactory/data/loader.py
index 9bfc55e3..29eb5f17 100644
--- a/src/llamafactory/data/loader.py
+++ b/src/llamafactory/data/loader.py
@@ -16,13 +16,13 @@ import os
 from typing import TYPE_CHECKING, Literal, Optional, Union
 
 import numpy as np
-from datasets import load_dataset, load_from_disk
+from datasets import Dataset, load_dataset, load_from_disk
 
 from ..extras import logging
 from ..extras.constants import FILEEXT2TYPE
 from ..extras.misc import check_version, has_tokenized_data
 from .converter import align_dataset
-from .data_utils import get_dataset_module, merge_dataset, split_dataset
+from .data_utils import get_dataset_module, merge_dataset, read_cloud_json, split_dataset
 from .parser import get_dataset_list
 from .processor import (
     FeedbackDatasetProcessor,
@@ -67,6 +67,9 @@ def _load_single_dataset(
         data_name = dataset_attr.subset
         data_dir = dataset_attr.folder
 
+    elif dataset_attr.load_from == "cloud_file":
+        data_path = dataset_attr.dataset_name
+
     elif dataset_attr.load_from == "file":
         data_files = []
         local_path = os.path.join(data_args.dataset_dir, dataset_attr.dataset_name)
@@ -122,6 +125,8 @@ def _load_single_dataset(
             token=model_args.om_hub_token,
             streaming=data_args.streaming,
         )
+    elif dataset_attr.load_from == "cloud_file":
+        dataset = Dataset.from_list(read_cloud_json(data_path), split=dataset_attr.split)
     else:
         dataset = load_dataset(
             path=data_path,
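
To illustrate the new `cloud_file` branch in the loader, here is a minimal sketch of the flow, assuming `fsspec` plus the matching backend (`s3fs` for `s3://`, `gcsfs` for `gs://`) is installed; the bucket and file below are hypothetical:

```python
from datasets import Dataset

from llamafactory.data.data_utils import read_cloud_json

# read_cloud_json tries anonymous access first (public buckets) and falls back
# to the ambient AWS/GCP credentials if that fails. A ".jsonl" path is parsed
# line by line into a list of dicts; a ".json" path is parsed as one document.
records = read_cloud_json("s3://my-bucket/datasets/alpaca_train.jsonl")

# The loader then wraps the records like any other in-memory dataset.
dataset = Dataset.from_list(records, split="train")
print(dataset)
```
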
diff --git a/src/llamafactory/data/parser.py b/src/llamafactory/data/parser.py
index 27bff26a..d7220c7b 100644
--- a/src/llamafactory/data/parser.py
+++ b/src/llamafactory/data/parser.py
@@ -141,6 +141,8 @@ def get_dataset_list(dataset_names: Optional[list[str]], dataset_dir: str) -> li
                 dataset_attr = DatasetAttr("hf_hub", dataset_name=dataset_info[name]["hf_hub_url"])
         elif "script_url" in dataset_info[name]:
             dataset_attr = DatasetAttr("script", dataset_name=dataset_info[name]["script_url"])
+        elif "cloud_file_name" in dataset_info[name]:
+            dataset_attr = DatasetAttr("cloud_file", dataset_name=dataset_info[name]["cloud_file_name"])
         else:
             dataset_attr = DatasetAttr("file", dataset_name=dataset_info[name]["file_name"])
 
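
For clarity, the resolution order that `get_dataset_list` applies after this change can be sketched as follows. The entry is hypothetical and the ModelScope / OpenMind hub branches are elided; only the relative precedence of the keys matters here:

```python
# Hypothetical dataset_info.json entry: cloud_file_name wins over file_name,
# while a hub URL or script_url would still take precedence over both.
dataset_info = {
    "alpaca_cloud_demo": {
        "cloud_file_name": "s3://my-bucket/datasets/alpaca_train.jsonl",
        "file_name": "alpaca_train.jsonl",  # ignored because cloud_file_name is set
    }
}

name = "alpaca_cloud_demo"
info = dataset_info[name]
if "hf_hub_url" in info:  # hub repositories are checked first
    load_from, dataset_name = "hf_hub", info["hf_hub_url"]
elif "script_url" in info:  # then a local dataset loading script
    load_from, dataset_name = "script", info["script_url"]
elif "cloud_file_name" in info:  # then the new S3/GCS path
    load_from, dataset_name = "cloud_file", info["cloud_file_name"]
else:  # finally a file or folder under dataset_dir
    load_from, dataset_name = "file", info["file_name"]

print(load_from, dataset_name)  # -> cloud_file s3://my-bucket/datasets/alpaca_train.jsonl
```
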