Mirror of https://github.com/hiyouga/LLaMA-Factory.git (synced 2025-08-02 11:42:49 +08:00)
[data] support for specifying a dataset in cloud storage (#7567)
* add support for loading datasets from s3/gcs
* add comments to readme
* run linter and address comments
* add option to pass in kwargs to ray init (i.e. runtime env)
* address comment
* revert mixed up changes
Parent: 39c1e29ed7
Commit: 6c53471de2
@@ -554,7 +554,7 @@ pip install .

 ### Data Preparation

-Please refer to [data/README.md](data/README.md) for checking the details about the format of dataset files. You can either use datasets on HuggingFace / ModelScope / Modelers hub or load the dataset in local disk.
+Please refer to [data/README.md](data/README.md) for checking the details about the format of dataset files. You can use datasets on HuggingFace / ModelScope / Modelers hub, load the dataset in local disk, or specify a path to s3/gcs cloud storage.

 > [!NOTE]
 > Please update `data/dataset_info.json` to use your custom dataset.
@@ -4,9 +4,10 @@ Currently we support datasets in **alpaca** and **sharegpt** format.

 ```json
 "dataset_name": {
-  "hf_hub_url": "the name of the dataset repository on the Hugging Face hub. (if specified, ignore script_url and file_name)",
+  "hf_hub_url": "the name of the dataset repository on the Hugging Face hub. (if specified, ignore script_url, file_name and cloud_file_name)",
-  "ms_hub_url": "the name of the dataset repository on the Model Scope hub. (if specified, ignore script_url and file_name)",
+  "ms_hub_url": "the name of the dataset repository on the Model Scope hub. (if specified, ignore script_url, file_name and cloud_file_name)",
-  "script_url": "the name of the directory containing a dataset loading script. (if specified, ignore file_name)",
+  "script_url": "the name of the directory containing a dataset loading script. (if specified, ignore file_name and cloud_file_name)",
+  "cloud_file_name": "the name of the dataset file in s3/gcs cloud storage. (if specified, ignore file_name)",
   "file_name": "the name of the dataset folder or dataset file in this directory. (required if above are not specified)",
   "formatting": "the format of the dataset. (optional, default: alpaca, can be chosen from {alpaca, sharegpt})",
   "ranking": "whether the dataset is a preference dataset or not. (default: False)",
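For illustration, a minimal `dataset_info.json` entry using the new field might look like the following; the dataset name, bucket, and file path here are hypothetical:

```json
{
  "my_cloud_dataset": {
    "cloud_file_name": "s3://my-bucket/data/train.jsonl",
    "formatting": "alpaca"
  }
}
```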
@@ -12,9 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import json
 from enum import Enum, unique
 from typing import TYPE_CHECKING, Optional, TypedDict, Union

+import fsspec
 from datasets import DatasetDict, concatenate_datasets, interleave_datasets

 from ..extras import logging
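Note that `fsspec` resolves protocol strings to optional backend packages at call time, so this change implies `s3fs` (for `s3://`) or `gcsfs` (for `gs://`) must be installed when cloud paths are used. A minimal sketch, assuming `s3fs` is available and using a made-up bucket name:

```python
import fsspec

# "s3" is handled by the optional s3fs backend; anon=True requests
# unauthenticated access, which only works for public buckets.
fs = fsspec.filesystem("s3", anon=True)
print(fs.ls("s3://my-public-bucket"))  # hypothetical bucket
```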
@@ -138,3 +140,50 @@ def get_dataset_module(dataset: Union["Dataset", "DatasetDict"]) -> "DatasetModule":
         dataset_module["train_dataset"] = dataset

     return dataset_module
+
+
+def setup_fs(path, anon=False):
+    """Set up a filesystem object based on the path protocol."""
+    storage_options = {"anon": anon} if anon else {}
+
+    if path.startswith("s3://"):
+        fs = fsspec.filesystem("s3", **storage_options)
+    elif path.startswith(("gs://", "gcs://")):
+        fs = fsspec.filesystem("gcs", **storage_options)
+    else:
+        raise ValueError(f"Unsupported protocol in path: {path}. Use 's3://' or 'gs://'")
+
+    return fs
+
+
+def read_cloud_json(cloud_path):
+    """Read a JSON/JSONL file from cloud storage (S3 or GCS).
+
+    Args:
+        cloud_path : str
+            Cloud path in the format:
+            - 's3://bucket-name/file.json' for AWS S3
+            - 'gs://bucket-name/file.jsonl' or 'gcs://bucket-name/file.jsonl' for Google Cloud Storage
+
+    Files ending in ".jsonl" are read as JSON Lines (one JSON object per line);
+    all other files are read as regular JSON.
+    """
+    try:
+        # Try with anonymous access first
+        fs = setup_fs(cloud_path, anon=True)
+        return _read_json_with_fs(fs, cloud_path, lines=cloud_path.endswith(".jsonl"))
+    except Exception:
+        # Try again with credentials
+        fs = setup_fs(cloud_path)
+        return _read_json_with_fs(fs, cloud_path, lines=cloud_path.endswith(".jsonl"))
+
+
+def _read_json_with_fs(fs, path, lines=True):
+    """Helper function to read JSON/JSONL files using fsspec."""
+    with fs.open(path, "r") as f:
+        if lines:
+            # Read JSONL (JSON Lines) format - one JSON object per line
+            data = [json.loads(line) for line in f if line.strip()]
+        else:
+            # Read regular JSON format
+            data = json.load(f)
+
+    return data
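A usage sketch for the new helper (the import path follows the `from .data_utils import ...` hunk below; the bucket and file names are made up):

```python
from llamafactory.data.data_utils import read_cloud_json

# Tries anonymous access first, then falls back to the ambient
# AWS/GCP credentials if the bucket turns out to be private.
records = read_cloud_json("s3://my-bucket/datasets/train.jsonl")
print(len(records), records[0])
```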
@@ -16,13 +16,13 @@ import os
 from typing import TYPE_CHECKING, Literal, Optional, Union

 import numpy as np
-from datasets import load_dataset, load_from_disk
+from datasets import Dataset, load_dataset, load_from_disk

 from ..extras import logging
 from ..extras.constants import FILEEXT2TYPE
 from ..extras.misc import check_version, has_tokenized_data
 from .converter import align_dataset
-from .data_utils import get_dataset_module, merge_dataset, split_dataset
+from .data_utils import get_dataset_module, merge_dataset, read_cloud_json, split_dataset
 from .parser import get_dataset_list
 from .processor import (
     FeedbackDatasetProcessor,
@@ -67,6 +67,9 @@ def _load_single_dataset(
         data_name = dataset_attr.subset
         data_dir = dataset_attr.folder

+    elif dataset_attr.load_from == "cloud_file":
+        data_path = dataset_attr.dataset_name
+
     elif dataset_attr.load_from == "file":
         data_files = []
         local_path = os.path.join(data_args.dataset_dir, dataset_attr.dataset_name)
@@ -122,6 +125,8 @@
             token=model_args.om_hub_token,
             streaming=data_args.streaming,
         )
+    elif dataset_attr.load_from == "cloud_file":
+        dataset = Dataset.from_list(read_cloud_json(data_path), split=dataset_attr.split)
     else:
         dataset = load_dataset(
             path=data_path,
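For context, `Dataset.from_list` builds an in-memory Hugging Face dataset from a list of dicts, which is exactly what `read_cloud_json` returns. A minimal illustration with made-up records:

```python
from datasets import Dataset

records = [{"instruction": "Say hi.", "output": "Hi!"}]  # stand-in for read_cloud_json(...)
ds = Dataset.from_list(records, split="train")
print(ds.num_rows, ds.column_names)  # 1 ['instruction', 'output']
```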
|
@ -141,6 +141,8 @@ def get_dataset_list(dataset_names: Optional[list[str]], dataset_dir: str) -> li
|
|||||||
dataset_attr = DatasetAttr("hf_hub", dataset_name=dataset_info[name]["hf_hub_url"])
|
dataset_attr = DatasetAttr("hf_hub", dataset_name=dataset_info[name]["hf_hub_url"])
|
||||||
elif "script_url" in dataset_info[name]:
|
elif "script_url" in dataset_info[name]:
|
||||||
dataset_attr = DatasetAttr("script", dataset_name=dataset_info[name]["script_url"])
|
dataset_attr = DatasetAttr("script", dataset_name=dataset_info[name]["script_url"])
|
||||||
|
elif "cloud_file_name" in dataset_info[name]:
|
||||||
|
dataset_attr = DatasetAttr("cloud_file", dataset_name=dataset_info[name]["cloud_file_name"])
|
||||||
else:
|
else:
|
||||||
dataset_attr = DatasetAttr("file", dataset_name=dataset_info[name]["file_name"])
|
dataset_attr = DatasetAttr("file", dataset_name=dataset_info[name]["file_name"])
|
||||||
|
|
||||||
|