[data] shard the dataset to allow multiprocessing when streaming is enabled (#7530)

* Shard the dataset when streaming to allow multiprocessing

* Allow user to not set dataset_shards to ensure backward compatibility
This commit is contained in:
Billy Cao 2025-04-01 15:36:23 +08:00 committed by GitHub
parent 6d6e0f44fc
commit 5d1cc863a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 12 additions and 4 deletions

View File

@ -204,7 +204,7 @@ Compared to ChatGLM's [P-Tuning](https://github.com/THUDM/ChatGLM2-6B/tree/main/
[23/08/11] We supported **[DPO training](https://arxiv.org/abs/2305.18290)** for instruction-tuned models. See [examples](examples/README.md) for usage.
[23/07/31] We supported **dataset streaming**. Try `streaming: true` and `max_steps: 10000` arguments to load your dataset in streaming mode.
[23/07/31] We supported **dataset streaming**. Try `streaming: true` and `max_steps: 10000` arguments to load your dataset in streaming mode. Use `dataset_shards` to enable parallel preprocessing with streaming.
[23/07/29] We released two instruction-tuned 13B models at Hugging Face. See these Hugging Face Repos ([LLaMA-2](https://huggingface.co/hiyouga/Llama-2-Chinese-13b-chat) / [Baichuan](https://huggingface.co/hiyouga/Baichuan-13B-sft)) for details.

View File

@ -206,7 +206,7 @@ https://github.com/user-attachments/assets/43b700c6-a178-41db-b1f8-8190a5d3fcfc
[23/08/11] 我们支持了指令模型的 **[DPO 训练](https://arxiv.org/abs/2305.18290)**。详细用法请参照 [examples](examples/README_zh.md)。
[23/07/31] 我们支持了**数据流式加载**。请使用 `streaming: true``max_steps: 10000` 参数来流式加载数据集。
[23/07/31] 我们支持了**数据流式加载**。请使用 `streaming: true``max_steps: 10000` 参数来流式加载数据集。`dataset_shards` 来开启多进程加载。
[23/07/29] 我们在 Hugging Face 发布了两个 13B 指令微调模型。详细内容请查阅我们的 Hugging Face 项目([LLaMA-2](https://huggingface.co/hiyouga/Llama-2-Chinese-13b-chat) / [Baichuan](https://huggingface.co/hiyouga/Baichuan-13B-sft))。

View File

@ -101,10 +101,12 @@ def _load_single_dataset(
split=dataset_attr.split,
cache_dir=cache_dir,
token=model_args.ms_hub_token,
use_streaming=data_args.streaming,
use_streaming=data_args.streaming and not data_args.dataset_shards, # only set to True when user specified streaming but do not want dataset to be sharded
)
if isinstance(dataset, MsDataset):
dataset = dataset.to_hf_dataset()
if data_args.streaming and data_args.dataset_shards:
dataset = dataset.to_iterable_dataset(num_shards=data_args.dataset_shards)
elif dataset_attr.load_from == "om_hub":
check_version("openmind>=0.8.0", mandatory=True)
@ -131,10 +133,12 @@ def _load_single_dataset(
split=dataset_attr.split,
cache_dir=model_args.cache_dir,
token=model_args.hf_hub_token,
streaming=data_args.streaming,
num_proc=data_args.preprocessing_num_workers,
trust_remote_code=model_args.trust_remote_code,
streaming=data_args.streaming and not data_args.dataset_shards,
)
if data_args.streaming and data_args.dataset_shards:
dataset = dataset.to_iterable_dataset(num_shards=data_args.dataset_shards)
if dataset_attr.num_samples is not None and not data_args.streaming:
target_num = dataset_attr.num_samples

View File

@ -83,6 +83,10 @@ class DataArguments:
default=None,
metadata={"help": "The number of processes to use for the pre-processing."},
)
dataset_shards: Optional[int] = field(
default=None,
metadata={"help": "The number of shards to split the dataset into. Only used in streaming mode. This should be set to the same as dataloader_num_workers. Not setting this while streaming data will cause the dataset to be non-sharded and thus only can be processed using one worker."},
)
max_samples: Optional[int] = field(
default=None,
metadata={"help": "For debugging purposes, truncate the number of examples for each dataset."},