diff --git a/src/llamafactory/data/data_utils.py b/src/llamafactory/data/data_utils.py index 76ded47e..4666aabc 100644 --- a/src/llamafactory/data/data_utils.py +++ b/src/llamafactory/data/data_utils.py @@ -13,16 +13,15 @@ # limitations under the License. from enum import Enum, unique -from typing import TYPE_CHECKING, Dict, List, Sequence, Set, Union +from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, TypedDict, Union -from datasets import concatenate_datasets, interleave_datasets +from datasets import DatasetDict, concatenate_datasets, interleave_datasets from ..extras.logging import get_logger if TYPE_CHECKING: from datasets import Dataset, IterableDataset - from transformers import Seq2SeqTrainingArguments from ..hparams import DataArguments @@ -42,24 +41,29 @@ class Role(str, Enum): OBSERVATION = "observation" +class DatasetModule(TypedDict): + train_dataset: Optional[Union["Dataset", "IterableDataset"]] + eval_dataset: Optional[Union["Dataset", "IterableDataset"]] + + def merge_dataset( - all_datasets: List[Union["Dataset", "IterableDataset"]], - data_args: "DataArguments", - training_args: "Seq2SeqTrainingArguments", + all_datasets: List[Union["Dataset", "IterableDataset"]], data_args: "DataArguments", seed: int ) -> Union["Dataset", "IterableDataset"]: if len(all_datasets) == 1: return all_datasets[0] elif data_args.mix_strategy == "concat": if data_args.streaming: logger.warning("The samples between different datasets will not be mixed in streaming mode.") + return concatenate_datasets(all_datasets) elif data_args.mix_strategy.startswith("interleave"): if not data_args.streaming: logger.warning("We recommend using `mix_strategy=concat` in non-streaming mode.") + return interleave_datasets( datasets=all_datasets, probabilities=data_args.interleave_probs, - seed=training_args.seed, + seed=seed, stopping_strategy="first_exhausted" if data_args.mix_strategy.endswith("under") else "all_exhausted", ) else: @@ -67,22 +71,17 @@ def merge_dataset( def split_dataset( - dataset: Union["Dataset", "IterableDataset"], data_args: "DataArguments", training_args: "Seq2SeqTrainingArguments" -) -> Dict[str, "Dataset"]: - if training_args.do_train: - if data_args.val_size > 1e-6: # Split the dataset - if data_args.streaming: - dataset = dataset.shuffle(buffer_size=data_args.buffer_size, seed=training_args.seed) - val_set = dataset.take(int(data_args.val_size)) - train_set = dataset.skip(int(data_args.val_size)) - return {"train_dataset": train_set, "eval_dataset": val_set} - else: - val_size = int(data_args.val_size) if data_args.val_size > 1 else data_args.val_size - dataset = dataset.train_test_split(test_size=val_size, seed=training_args.seed) - return {"train_dataset": dataset["train"], "eval_dataset": dataset["test"]} - else: - if data_args.streaming: - dataset = dataset.shuffle(buffer_size=data_args.buffer_size, seed=training_args.seed) - return {"train_dataset": dataset} - else: # do_eval or do_predict - return {"eval_dataset": dataset} + dataset: Union["Dataset", "IterableDataset"], data_args: "DataArguments", seed: int +) -> "DatasetDict": + r""" + Splits the dataset and returns a dataset dict containing train set (required) and validation set (optional). + """ + if data_args.streaming: + dataset = dataset.shuffle(buffer_size=data_args.buffer_size, seed=seed) + val_set = dataset.take(int(data_args.val_size)) + train_set = dataset.skip(int(data_args.val_size)) + return DatasetDict({"train": train_set, "validation": val_set}) + else: + val_size = int(data_args.val_size) if data_args.val_size > 1 else data_args.val_size + dataset = dataset.train_test_split(test_size=val_size, seed=seed) + return DatasetDict({"train": dataset["train"], "validation": dataset["test"]})