[test] add allreduce test on npu (#9619)

Co-authored-by: frozenleaves <frozen@Mac.local>
This commit is contained in:
浮梦
2025-12-16 21:33:30 +08:00
committed by GitHub
parent a0179772ab
commit 18c21bce5a
20 changed files with 419 additions and 70 deletions

View File

@@ -21,4 +21,4 @@ style:
ruff format $(check_dirs) ruff format $(check_dirs)
test: test:
CUDA_VISIBLE_DEVICES= ASCEND_RT_VISIBLE_DEVICES=0 WANDB_DISABLED=true pytest -vv --import-mode=importlib tests/ tests_v1/ WANDB_DISABLED=true pytest -vv --import-mode=importlib tests/ tests_v1/

View File

@@ -20,7 +20,6 @@ from transformers import AutoModelForCausalLM
from trl import AutoModelForCausalLMWithValueHead from trl import AutoModelForCausalLMWithValueHead
from ..data import get_dataset, get_template_and_fix_tokenizer from ..data import get_dataset, get_template_and_fix_tokenizer
from ..extras.misc import get_current_device
from ..hparams import get_infer_args, get_train_args from ..hparams import get_infer_args, get_train_args
from ..model import load_model, load_tokenizer from ..model import load_model, load_tokenizer
@@ -81,17 +80,16 @@ def load_reference_model(
is_trainable: bool = False, is_trainable: bool = False,
add_valuehead: bool = False, add_valuehead: bool = False,
) -> Union["PreTrainedModel", "LoraModel"]: ) -> Union["PreTrainedModel", "LoraModel"]:
current_device = get_current_device()
if add_valuehead: if add_valuehead:
model: AutoModelForCausalLMWithValueHead = AutoModelForCausalLMWithValueHead.from_pretrained( model: AutoModelForCausalLMWithValueHead = AutoModelForCausalLMWithValueHead.from_pretrained(
model_path, torch_dtype=torch.float16, device_map=current_device model_path, torch_dtype=torch.float16, device_map="auto"
) )
if not is_trainable: if not is_trainable:
model.v_head = model.v_head.to(torch.float16) model.v_head = model.v_head.to(torch.float16)
return model return model
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16, device_map=current_device) model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16, device_map="auto")
if use_lora or use_pissa: if use_lora or use_pissa:
model = PeftModel.from_pretrained( model = PeftModel.from_pretrained(
model, lora_path, subfolder="pissa_init" if use_pissa else None, is_trainable=is_trainable model, lora_path, subfolder="pissa_init" if use_pissa else None, is_trainable=is_trainable

View File

@@ -103,6 +103,36 @@ def is_torch_xpu_available():
return get_current_accelerator().type == DeviceType.XPU return get_current_accelerator().type == DeviceType.XPU
def get_current_device() -> "torch.device":
    r"""Return the device this process should use.

    Backends are probed in the order XPU, NPU, MPS, CUDA. The first available
    backend is paired with the ``LOCAL_RANK`` environment variable (``"0"``
    when unset) as the device index. When no backend is available, the CPU
    device is returned.
    """
    if is_torch_xpu_available():
        backend = "xpu"
    elif is_torch_npu_available():
        backend = "npu"
    elif is_torch_mps_available():
        backend = "mps"
    elif is_torch_cuda_available():
        backend = "cuda"
    else:
        return torch.device("cpu")

    local_rank = os.getenv("LOCAL_RANK", "0")
    return torch.device(f"{backend}:{local_rank}")
def get_device_count() -> int:
    r"""Return the device count reported by the active accelerator backend.

    Backends are probed in the order XPU, NPU, MPS, CUDA and the first
    available one answers via its ``device_count()``. Returns 0 when none of
    them is available.
    """
    if is_torch_xpu_available():
        backend_module = torch.xpu
    elif is_torch_npu_available():
        backend_module = torch.npu
    elif is_torch_mps_available():
        backend_module = torch.mps
    elif is_torch_cuda_available():
        backend_module = torch.cuda
    else:
        return 0

    return backend_module.device_count()
def all_gather(tensor: Tensor, group: Optional[ProcessGroup] = None) -> Tensor: def all_gather(tensor: Tensor, group: Optional[ProcessGroup] = None) -> Tensor:
"""Gathers the tensor from all ranks and concats them along the first dim.""" """Gathers the tensor from all ranks and concats them along the first dim."""
world_size = get_world_size() world_size = get_world_size()

View File

@@ -0,0 +1,34 @@
# Copyright 2025 the LlamaFactory team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import socket
def find_available_port() -> int:
    r"""Find an available port on the local machine.

    Binds a TCP socket to port 0 on all interfaces and reports the port
    number the socket ended up with.

    Returns:
        The port number the temporary socket was bound to. The socket is
        closed before returning, so the port is only known to have been free
        at the time of the call.
    """
    # The context manager closes the socket even when bind()/getsockname()
    # raises, so a failure here does not leak a file descriptor.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))
        return sock.getsockname()[1]
def is_env_enabled(env_var: str, default: str = "0") -> bool:
    r"""Tell whether an environment variable holds an "enabled" value.

    Args:
        env_var: Name of the environment variable to read.
        default: Value used when the variable is not set.

    Returns:
        True when the lower-cased value is one of ``"true"``, ``"y"`` or
        ``"1"``; False otherwise.
    """
    raw_value = os.getenv(env_var, default)
    normalized = raw_value.lower()
    return normalized in ("true", "y", "1")
if __name__ == "__main__":
    # When executed as a script, print one port reported by find_available_port().
    print(find_available_port())

View File

@@ -17,15 +17,18 @@
Contains shared fixtures, pytest configuration, and custom markers. Contains shared fixtures, pytest configuration, and custom markers.
""" """
import os
import pytest import pytest
from pytest import Config, Item from pytest import Config, Item
from llamafactory.extras.misc import get_current_device, is_env_enabled from llamafactory.extras.misc import get_current_device, get_device_count, is_env_enabled
from llamafactory.extras.packages import is_transformers_version_greater_than
from llamafactory.train.test_utils import patch_valuehead_model from llamafactory.train.test_utils import patch_valuehead_model
try: try:
CURRENT_DEVICE = get_current_device().type CURRENT_DEVICE = get_current_device().type # cpu | cuda | npu
except Exception: except Exception:
CURRENT_DEVICE = "cpu" CURRENT_DEVICE = "cpu"
@@ -33,46 +36,36 @@ except Exception:
def pytest_configure(config: Config): def pytest_configure(config: Config):
"""Register custom pytest markers.""" """Register custom pytest markers."""
config.addinivalue_line( config.addinivalue_line(
"markers", "slow: marks tests as slow (deselect with '-m \"not slow\"' or set RUN_SLOW=1 to run)" "markers",
"slow: marks tests as slow (deselect with '-m \"not slow\"' or set RUN_SLOW=1 to run)",
)
config.addinivalue_line(
"markers",
"runs_on: test requires specific device type, e.g., @pytest.mark.runs_on(['cuda'])",
)
config.addinivalue_line(
"markers",
"require_distributed(num_devices): allow multi-device execution (default: 2)",
) )
config.addinivalue_line("markers", "runs_on: test requires specific device, e.g., @pytest.mark.runs_on(['cpu'])")
def _handle_runs_on(items: list[Item]): def _handle_runs_on(items: list[Item]):
"""Skip tests on specified devices based on runs_on marker. """Skip tests on specified device TYPES (cpu/cuda/npu)."""
Usage:
# Skip tests on specified devices
@pytest.mark.runs_on(['cpu'])
def test_something():
pass
"""
for item in items: for item in items:
runs_on_marker = item.get_closest_marker("runs_on") marker = item.get_closest_marker("runs_on")
if runs_on_marker: if not marker:
runs_on_devices = runs_on_marker.args[0] continue
# Compatibility handling: Allow a single string instead of a list devices = marker.args[0]
# Example: @pytest.mark.("cpu") if isinstance(devices, str):
if isinstance(runs_on_devices, str): devices = [devices]
runs_on_devices = [runs_on_devices]
if CURRENT_DEVICE not in runs_on_devices: if CURRENT_DEVICE not in devices:
item.add_marker( item.add_marker(pytest.mark.skip(reason=f"test requires one of {devices} (current: {CURRENT_DEVICE})"))
pytest.mark.skip(reason=f"test requires one of {runs_on_devices} (current: {CURRENT_DEVICE})")
)
def _handle_slow_tests(items: list[Item]): def _handle_slow_tests(items: list[Item]):
"""Skip slow tests unless RUN_SLOW environment variable is set. """Skip slow tests unless RUN_SLOW is enabled."""
Usage:
# Skip slow tests (default)
@pytest.mark.slow
# Run slow tests
RUN_SLOW=1 pytest tests/
"""
if not is_env_enabled("RUN_SLOW", "0"): if not is_env_enabled("RUN_SLOW", "0"):
skip_slow = pytest.mark.skip(reason="slow test (set RUN_SLOW=1 to run)") skip_slow = pytest.mark.skip(reason="slow test (set RUN_SLOW=1 to run)")
for item in items: for item in items:
@@ -80,10 +73,82 @@ def _handle_slow_tests(items: list[Item]):
item.add_marker(skip_slow) item.add_marker(skip_slow)
def _get_visible_devices_env():
    """Return device visibility env var name.

    Maps the detected device type to the environment variable that limits
    which devices are visible: ``CUDA_VISIBLE_DEVICES`` for ``cuda`` and
    ``ASCEND_RT_VISIBLE_DEVICES`` for ``npu``. Any other device type yields
    ``None``.
    """
    env_names = {
        "cuda": "CUDA_VISIBLE_DEVICES",
        "npu": "ASCEND_RT_VISIBLE_DEVICES",
    }
    return env_names.get(CURRENT_DEVICE)
def _handle_device_visibility(items: list[Item]):
    """Skip ``require_distributed`` tests that need more devices than are visible.

    The number of available devices is the number of non-empty entries in the
    device-visibility environment variable when it is set, and the value of
    ``get_device_count()`` otherwise. Nothing is done on CPU or when the
    current device type has no visibility variable.
    """
    env_key = _get_visible_devices_env()
    if env_key is None or CURRENT_DEVICE == "cpu":
        return

    # Count visible devices: explicit env list wins over the runtime count.
    raw_visible = os.environ.get(env_key)
    if raw_visible is not None:
        available = len([entry for entry in raw_visible.split(",") if entry != ""])
    else:
        available = get_device_count()

    for item in items:
        marker = item.get_closest_marker("require_distributed")
        if marker:
            # The first marker argument is the device count; default is 2.
            required = marker.args[0] if marker.args else 2
            if available < required:
                reason = f"test requires {required} devices, but only {available} visible"
                item.add_marker(pytest.mark.skip(reason=reason))
def pytest_collection_modifyitems(config: Config, items: list[Item]): def pytest_collection_modifyitems(config: Config, items: list[Item]):
"""Modify test collection based on markers and environment.""" """Modify test collection based on markers and environment."""
# Handle version compatibility (from HEAD)
if not is_transformers_version_greater_than("4.57.0"):
skip_bc = pytest.mark.skip(reason="Skip backward compatibility tests")
for item in items:
if "tests_v1" in str(item.fspath):
item.add_marker(skip_bc)
_handle_slow_tests(items) _handle_slow_tests(items)
_handle_runs_on(items) _handle_runs_on(items)
_handle_device_visibility(items)
@pytest.fixture(autouse=True)
def _manage_distributed_env(request, monkeypatch):
    """Set the device-visibility environment variable for each test.

    Tests marked ``require_distributed(num_devices[, devices])`` get
    ``num_devices`` devices (default: 2); every other test gets exactly one.
    Device ids are chosen in this order:

    1. the explicit device list given as the marker's second argument;
    2. the leading entries of the visibility variable that was already set
       when the test started, if it lists enough devices;
    3. the ids ``0 .. num_devices - 1``.

    Nothing is changed when the current device type has no visibility
    variable. The variable is set through ``monkeypatch``, which handles
    restoration of the previous value.
    """
    env_key = _get_visible_devices_env()
    if not env_key:
        return

    # Devices the caller already exposed; empty when the variable is unset or blank.
    old_value = os.environ.get(env_key)
    preset_devices = [v for v in old_value.split(",") if v != ""] if old_value else []

    marker = request.node.get_closest_marker("require_distributed")
    if marker:
        # Distributed test
        required = marker.args[0] if marker.args else 2
        specific_devices = marker.args[1] if len(marker.args) > 1 else None
        if specific_devices:
            devices = [str(device) for device in specific_devices]
        elif len(preset_devices) >= required:
            # Stay inside the caller's device selection, consistent with the
            # non-distributed branch and with the collection-time device count.
            devices = preset_devices[:required]
        else:
            devices = [str(index) for index in range(required)]

        monkeypatch.setenv(env_key, ",".join(devices))
    else:
        # Non-distributed test: expose a single device.
        monkeypatch.setenv(env_key, preset_devices[0] if preset_devices else "0")
@pytest.fixture @pytest.fixture

View File

@@ -42,7 +42,7 @@ TRAIN_ARGS = {
} }
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.parametrize("num_samples", [16]) @pytest.mark.parametrize("num_samples", [16])
def test_feedback_data(num_samples: int): def test_feedback_data(num_samples: int):
train_dataset = load_dataset_module(**TRAIN_ARGS)["train_dataset"] train_dataset = load_dataset_module(**TRAIN_ARGS)["train_dataset"]

View File

@@ -25,7 +25,7 @@ TINY_LLAMA3 = os.getenv("TINY_LLAMA3", "llamafactory/tiny-random-Llama-3")
UNUSED_TOKEN = "<|UNUSED_TOKEN|>" UNUSED_TOKEN = "<|UNUSED_TOKEN|>"
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.parametrize("special_tokens", [False, True]) @pytest.mark.parametrize("special_tokens", [False, True])
def test_add_tokens(special_tokens: bool): def test_add_tokens(special_tokens: bool):
if special_tokens: if special_tokens:

View File

@@ -39,7 +39,7 @@ INFER_ARGS = {
} }
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.xfail(is_transformers_version_greater_than("4.48"), reason="Attention refactor.") @pytest.mark.xfail(is_transformers_version_greater_than("4.48"), reason="Attention refactor.")
def test_attention(): def test_attention():
attention_available = ["disabled"] attention_available = ["disabled"]

View File

@@ -39,7 +39,7 @@ TRAIN_ARGS = {
} }
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.parametrize("disable_gradient_checkpointing", [False, True]) @pytest.mark.parametrize("disable_gradient_checkpointing", [False, True])
def test_vanilla_checkpointing(disable_gradient_checkpointing: bool): def test_vanilla_checkpointing(disable_gradient_checkpointing: bool):
model = load_train_model(disable_gradient_checkpointing=disable_gradient_checkpointing, **TRAIN_ARGS) model = load_train_model(disable_gradient_checkpointing=disable_gradient_checkpointing, **TRAIN_ARGS)
@@ -47,14 +47,14 @@ def test_vanilla_checkpointing(disable_gradient_checkpointing: bool):
assert getattr(module, "gradient_checkpointing") != disable_gradient_checkpointing assert getattr(module, "gradient_checkpointing") != disable_gradient_checkpointing
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_unsloth_gradient_checkpointing(): def test_unsloth_gradient_checkpointing():
model = load_train_model(use_unsloth_gc=True, **TRAIN_ARGS) model = load_train_model(use_unsloth_gc=True, **TRAIN_ARGS)
for module in filter(lambda m: hasattr(m, "gradient_checkpointing"), model.modules()): for module in filter(lambda m: hasattr(m, "gradient_checkpointing"), model.modules()):
assert module._gradient_checkpointing_func.__self__.__name__ == "UnslothGradientCheckpointing" assert module._gradient_checkpointing_func.__self__.__name__ == "UnslothGradientCheckpointing"
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_upcast_layernorm(): def test_upcast_layernorm():
model = load_train_model(upcast_layernorm=True, **TRAIN_ARGS) model = load_train_model(upcast_layernorm=True, **TRAIN_ARGS)
for name, param in model.named_parameters(): for name, param in model.named_parameters():
@@ -62,7 +62,7 @@ def test_upcast_layernorm():
assert param.dtype == torch.float32 assert param.dtype == torch.float32
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_upcast_lmhead_output(): def test_upcast_lmhead_output():
model = load_train_model(upcast_lmhead_output=True, **TRAIN_ARGS) model = load_train_model(upcast_lmhead_output=True, **TRAIN_ARGS)
inputs = torch.randn((1, 16), dtype=torch.float16, device=get_current_device()) inputs = torch.randn((1, 16), dtype=torch.float16, device=get_current_device())

View File

@@ -24,7 +24,7 @@ from llamafactory.model.model_utils.misc import find_expanded_modules
HF_TOKEN = os.getenv("HF_TOKEN") HF_TOKEN = os.getenv("HF_TOKEN")
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.skipif(not HF_TOKEN, reason="Gated model.") @pytest.mark.skipif(not HF_TOKEN, reason="Gated model.")
def test_expanded_modules(): def test_expanded_modules():
config = AutoConfig.from_pretrained("meta-llama/Meta-Llama-3-8B-Instruct") config = AutoConfig.from_pretrained("meta-llama/Meta-Llama-3-8B-Instruct")

View File

@@ -18,7 +18,7 @@ import torch
from llamafactory.model.model_utils.packing import get_seqlens_in_batch, get_unpad_data from llamafactory.model.model_utils.packing import get_seqlens_in_batch, get_unpad_data
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.parametrize( @pytest.mark.parametrize(
"attention_mask,golden_seq_lens", "attention_mask,golden_seq_lens",
[ [

View File

@@ -23,7 +23,7 @@ from llamafactory.hparams import FinetuningArguments, ModelArguments
from llamafactory.model.adapter import init_adapter from llamafactory.model.adapter import init_adapter
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.parametrize("freeze_vision_tower", (False, True)) @pytest.mark.parametrize("freeze_vision_tower", (False, True))
@pytest.mark.parametrize("freeze_multi_modal_projector", (False, True)) @pytest.mark.parametrize("freeze_multi_modal_projector", (False, True))
@pytest.mark.parametrize("freeze_language_model", (False, True)) @pytest.mark.parametrize("freeze_language_model", (False, True))
@@ -49,7 +49,7 @@ def test_visual_full(freeze_vision_tower: bool, freeze_multi_modal_projector: bo
assert param.requires_grad != freeze_language_model assert param.requires_grad != freeze_language_model
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.parametrize("freeze_vision_tower,freeze_language_model", ((False, False), (False, True), (True, False))) @pytest.mark.parametrize("freeze_vision_tower,freeze_language_model", ((False, False), (False, True), (True, False)))
def test_visual_lora(freeze_vision_tower: bool, freeze_language_model: bool): def test_visual_lora(freeze_vision_tower: bool, freeze_language_model: bool):
model_args = ModelArguments(model_name_or_path="Qwen/Qwen2-VL-2B-Instruct") model_args = ModelArguments(model_name_or_path="Qwen/Qwen2-VL-2B-Instruct")
@@ -82,7 +82,7 @@ def test_visual_lora(freeze_vision_tower: bool, freeze_language_model: bool):
assert (merger_param_name in trainable_params) is False assert (merger_param_name in trainable_params) is False
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_visual_model_save_load(): def test_visual_model_save_load():
# check VLM's state dict: https://github.com/huggingface/transformers/pull/38385 # check VLM's state dict: https://github.com/huggingface/transformers/pull/38385
model_args = ModelArguments(model_name_or_path="Qwen/Qwen2-VL-2B-Instruct") model_args = ModelArguments(model_name_or_path="Qwen/Qwen2-VL-2B-Instruct")

View File

@@ -30,7 +30,7 @@ INFER_ARGS = {
} }
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_base(): def test_base():
model = load_infer_model(**INFER_ARGS) model = load_infer_model(**INFER_ARGS)
ref_model = load_reference_model(TINY_LLAMA3) ref_model = load_reference_model(TINY_LLAMA3)

View File

@@ -44,7 +44,7 @@ INFER_ARGS = {
} }
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_freeze_train_all_modules(): def test_freeze_train_all_modules():
model = load_train_model(freeze_trainable_layers=1, **TRAIN_ARGS) model = load_train_model(freeze_trainable_layers=1, **TRAIN_ARGS)
for name, param in model.named_parameters(): for name, param in model.named_parameters():
@@ -56,7 +56,7 @@ def test_freeze_train_all_modules():
assert param.dtype == torch.float16 assert param.dtype == torch.float16
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_freeze_train_extra_modules(): def test_freeze_train_extra_modules():
model = load_train_model(freeze_trainable_layers=1, freeze_extra_modules="embed_tokens,lm_head", **TRAIN_ARGS) model = load_train_model(freeze_trainable_layers=1, freeze_extra_modules="embed_tokens,lm_head", **TRAIN_ARGS)
for name, param in model.named_parameters(): for name, param in model.named_parameters():
@@ -68,7 +68,7 @@ def test_freeze_train_extra_modules():
assert param.dtype == torch.float16 assert param.dtype == torch.float16
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_freeze_inference(): def test_freeze_inference():
model = load_infer_model(**INFER_ARGS) model = load_infer_model(**INFER_ARGS)
for param in model.parameters(): for param in model.parameters():

View File

@@ -44,7 +44,7 @@ INFER_ARGS = {
} }
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_full_train(): def test_full_train():
model = load_train_model(**TRAIN_ARGS) model = load_train_model(**TRAIN_ARGS)
for param in model.parameters(): for param in model.parameters():
@@ -52,7 +52,7 @@ def test_full_train():
assert param.dtype == torch.float32 assert param.dtype == torch.float32
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_full_inference(): def test_full_inference():
model = load_infer_model(**INFER_ARGS) model = load_infer_model(**INFER_ARGS)
for param in model.parameters(): for param in model.parameters():

View File

@@ -55,35 +55,35 @@ INFER_ARGS = {
} }
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_lora_train_qv_modules(): def test_lora_train_qv_modules():
model = load_train_model(lora_target="q_proj,v_proj", **TRAIN_ARGS) model = load_train_model(lora_target="q_proj,v_proj", **TRAIN_ARGS)
linear_modules, _ = check_lora_model(model) linear_modules, _ = check_lora_model(model)
assert linear_modules == {"q_proj", "v_proj"} assert linear_modules == {"q_proj", "v_proj"}
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_lora_train_all_modules(): def test_lora_train_all_modules():
model = load_train_model(lora_target="all", **TRAIN_ARGS) model = load_train_model(lora_target="all", **TRAIN_ARGS)
linear_modules, _ = check_lora_model(model) linear_modules, _ = check_lora_model(model)
assert linear_modules == {"q_proj", "k_proj", "v_proj", "o_proj", "up_proj", "gate_proj", "down_proj"} assert linear_modules == {"q_proj", "k_proj", "v_proj", "o_proj", "up_proj", "gate_proj", "down_proj"}
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_lora_train_extra_modules(): def test_lora_train_extra_modules():
model = load_train_model(additional_target="embed_tokens,lm_head", **TRAIN_ARGS) model = load_train_model(additional_target="embed_tokens,lm_head", **TRAIN_ARGS)
_, extra_modules = check_lora_model(model) _, extra_modules = check_lora_model(model)
assert extra_modules == {"embed_tokens", "lm_head"} assert extra_modules == {"embed_tokens", "lm_head"}
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_lora_train_old_adapters(): def test_lora_train_old_adapters():
model = load_train_model(adapter_name_or_path=TINY_LLAMA_ADAPTER, create_new_adapter=False, **TRAIN_ARGS) model = load_train_model(adapter_name_or_path=TINY_LLAMA_ADAPTER, create_new_adapter=False, **TRAIN_ARGS)
ref_model = load_reference_model(TINY_LLAMA3, TINY_LLAMA_ADAPTER, use_lora=True, is_trainable=True) ref_model = load_reference_model(TINY_LLAMA3, TINY_LLAMA_ADAPTER, use_lora=True, is_trainable=True)
compare_model(model, ref_model) compare_model(model, ref_model)
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_lora_train_new_adapters(): def test_lora_train_new_adapters():
model = load_train_model(adapter_name_or_path=TINY_LLAMA_ADAPTER, create_new_adapter=True, **TRAIN_ARGS) model = load_train_model(adapter_name_or_path=TINY_LLAMA_ADAPTER, create_new_adapter=True, **TRAIN_ARGS)
ref_model = load_reference_model(TINY_LLAMA3, TINY_LLAMA_ADAPTER, use_lora=True, is_trainable=True) ref_model = load_reference_model(TINY_LLAMA3, TINY_LLAMA_ADAPTER, use_lora=True, is_trainable=True)
@@ -92,7 +92,7 @@ def test_lora_train_new_adapters():
) )
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.usefixtures("fix_valuehead_cpu_loading") @pytest.mark.usefixtures("fix_valuehead_cpu_loading")
def test_lora_train_valuehead(): def test_lora_train_valuehead():
model = load_train_model(add_valuehead=True, **TRAIN_ARGS) model = load_train_model(add_valuehead=True, **TRAIN_ARGS)
@@ -103,7 +103,7 @@ def test_lora_train_valuehead():
assert torch.allclose(state_dict["v_head.summary.bias"], ref_state_dict["v_head.summary.bias"]) assert torch.allclose(state_dict["v_head.summary.bias"], ref_state_dict["v_head.summary.bias"])
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
def test_lora_inference(): def test_lora_inference():
model = load_infer_model(**INFER_ARGS) model = load_infer_model(**INFER_ARGS)
ref_model = load_reference_model(TINY_LLAMA3, TINY_LLAMA_ADAPTER, use_lora=True).merge_and_unload() ref_model = load_reference_model(TINY_LLAMA3, TINY_LLAMA_ADAPTER, use_lora=True).merge_and_unload()

View File

@@ -49,7 +49,7 @@ INFER_ARGS = {
} }
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.xfail(reason="PiSSA initialization is not stable in different platform.") @pytest.mark.xfail(reason="PiSSA initialization is not stable in different platform.")
def test_pissa_train(): def test_pissa_train():
model = load_train_model(**TRAIN_ARGS) model = load_train_model(**TRAIN_ARGS)
@@ -57,7 +57,7 @@ def test_pissa_train():
compare_model(model, ref_model) compare_model(model, ref_model)
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.xfail(reason="Known connection error.") @pytest.mark.xfail(reason="Known connection error.")
def test_pissa_inference(): def test_pissa_inference():
model = load_infer_model(**INFER_ARGS) model = load_infer_model(**INFER_ARGS)

View File

@@ -59,7 +59,7 @@ class DataCollatorWithVerbose(DataCollatorWithPadding):
return {k: v[:, :1] for k, v in batch.items()} # truncate input length return {k: v[:, :1] for k, v in batch.items()} # truncate input length
@pytest.mark.runs_on(["cpu", "npu"]) @pytest.mark.runs_on(["cpu", "npu", "cuda"])
@pytest.mark.parametrize("disable_shuffling", [False, True]) @pytest.mark.parametrize("disable_shuffling", [False, True])
def test_shuffle(disable_shuffling: bool): def test_shuffle(disable_shuffling: bool):
model_args, data_args, training_args, finetuning_args, _ = get_train_args( model_args, data_args, training_args, finetuning_args, _ = get_train_args(

View File

@@ -0,0 +1,93 @@
# Copyright 2025 the LlamaFactory team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from llamafactory.v1.accelerator.helper import ReduceOp, all_reduce, is_torch_cuda_available, is_torch_npu_available
from llamafactory.v1.utils.utils import find_available_port
def _dist_worker(rank, world_size):
    """Run the ``all_reduce`` checks inside one spawned worker process.

    Each rank contributes the scalar ``rank + 1``, so across ``world_size``
    ranks the values are ``1 .. world_size`` and the expected SUM, MEAN and
    MAX follow directly from ``world_size``.

    Args:
        rank: Index of this worker; also used as the device index.
        world_size: Total number of workers in the process group.
    """
    if is_torch_cuda_available():
        backend = "nccl"
        device = torch.device(f"cuda:{rank}")
        torch.cuda.set_device(rank)
    elif is_torch_npu_available():
        backend = "hccl"
        device = torch.device(f"npu:{rank}")
        torch.npu.set_device(rank)
    else:
        backend = "gloo"
        device = torch.device("cpu")

    dist.init_process_group(
        backend=backend,
        rank=rank,
        world_size=world_size,
    )

    # Always tear the process group down, even when an assertion fails.
    try:
        y = torch.tensor(rank + 1.0, device=device)

        # Values are 1..world_size, so the reductions have closed forms.
        expected_sum = world_size * (world_size + 1) / 2
        expected_mean = (world_size + 1) / 2
        expected_max = float(world_size)

        # --------------------
        # Test all_reduce SUM
        # --------------------
        y_sum = all_reduce(y.clone(), op=ReduceOp.SUM)
        assert y_sum.item() == expected_sum

        # --------------------
        # Test all_reduce MEAN
        # --------------------
        y_mean = all_reduce(y.clone(), op=ReduceOp.MEAN)
        assert y_mean.item() == pytest.approx(expected_mean)

        # --------------------
        # Test all_reduce MAX
        # --------------------
        y_max = all_reduce(y.clone(), op=ReduceOp.MAX)
        assert y_max.item() == expected_max
    finally:
        dist.destroy_process_group()
@pytest.mark.runs_on(["npu", "cuda"])
@pytest.mark.require_distributed(2)
def test_distributed_ops(monkeypatch):
    """Spawn two workers and run the ``all_reduce`` checks in each of them.

    ``MASTER_ADDR`` and ``MASTER_PORT`` are set via ``monkeypatch`` before
    the workers are spawned.
    """
    num_workers = 2
    rendezvous_env = {
        "MASTER_ADDR": "127.0.0.1",
        "MASTER_PORT": str(find_available_port()),
    }
    for env_name, env_value in rendezvous_env.items():
        monkeypatch.setenv(env_name, env_value)

    mp.spawn(_dist_worker, args=(num_workers,), nprocs=num_workers, join=True)
@pytest.mark.runs_on(["npu", "cuda"])
@pytest.mark.require_distributed(4)
def test_required_multi():
    """Check that the ``require_distributed`` mark works; the body does nothing."""
@pytest.mark.runs_on(["npu", "cuda"])
@pytest.mark.require_distributed(999)
def test_required_invalid():
    """Fail loudly if a test requiring 999 devices is ever executed.

    This test is expected not to run; reaching the body means the
    ``require_distributed`` mark did not keep it from running.
    """
    message = "this case should not be run, please check whether the require_distributed mark implementation is correct"
    raise RuntimeError(message)

View File

@@ -12,18 +12,147 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""LLaMA-Factory test configuration.
Contains shared fixtures, pytest configuration, and custom markers.
"""
import os
import pytest import pytest
from pytest import Config, Item from pytest import Config, Item
from llamafactory.train.test_utils import patch_valuehead_model
from llamafactory.v1.accelerator.helper import get_current_device, get_device_count
from llamafactory.v1.utils.packages import is_transformers_version_greater_than from llamafactory.v1.utils.packages import is_transformers_version_greater_than
from llamafactory.v1.utils.utils import is_env_enabled
try:
CURRENT_DEVICE = get_current_device().type # cpu | cuda | npu
except Exception:
CURRENT_DEVICE = "cpu"
def pytest_configure(config: Config):
    """Register the custom markers ``slow``, ``runs_on`` and ``require_distributed``."""
    marker_specs = (
        "slow: marks tests as slow (deselect with '-m \"not slow\"' or set RUN_SLOW=1 to run)",
        "runs_on: test requires specific device type, e.g., @pytest.mark.runs_on(['cuda'])",
        "require_distributed(num_devices): allow multi-device execution (default: 2)",
    )
    for spec in marker_specs:
        config.addinivalue_line("markers", spec)
def _handle_runs_on(items: list[Item]):
    """Skip tests whose ``runs_on`` marker does not list the current device type.

    The marker's first argument is either a single device type string or a
    collection of them. Tests without the marker are left untouched.
    """
    for item in items:
        marker = item.get_closest_marker("runs_on")
        if marker:
            requested = marker.args[0]
            # Accept a bare string as shorthand for a one-element list.
            devices = [requested] if isinstance(requested, str) else requested
            if CURRENT_DEVICE not in devices:
                reason = f"test requires one of {devices} (current: {CURRENT_DEVICE})"
                item.add_marker(pytest.mark.skip(reason=reason))
def _handle_slow_tests(items: list[Item]):
    """Mark every test carrying the ``slow`` keyword as skipped unless ``RUN_SLOW`` is enabled."""
    if is_env_enabled("RUN_SLOW", "0"):
        return

    skip_slow = pytest.mark.skip(reason="slow test (set RUN_SLOW=1 to run)")
    for slow_item in (item for item in items if "slow" in item.keywords):
        slow_item.add_marker(skip_slow)
def _get_visible_devices_env():
    """Return device visibility env var name.

    ``cuda`` maps to ``CUDA_VISIBLE_DEVICES`` and ``npu`` maps to
    ``ASCEND_RT_VISIBLE_DEVICES``; ``None`` is returned for any other
    current device type.
    """
    visibility_vars = {
        "cuda": "CUDA_VISIBLE_DEVICES",
        "npu": "ASCEND_RT_VISIBLE_DEVICES",
    }
    return visibility_vars.get(CURRENT_DEVICE)
def _handle_device_visibility(items: list[Item]):
    """Skip ``require_distributed`` tests when too few devices are visible.

    Available devices are counted from the non-empty entries of the
    device-visibility environment variable when it is set; otherwise
    ``get_device_count()`` is used. Returns immediately on CPU or when there
    is no visibility variable for the current device type.
    """
    env_key = _get_visible_devices_env()
    if env_key is None or CURRENT_DEVICE == "cpu":
        return

    # Parse visible devices
    env_value = os.environ.get(env_key)
    if env_value is not None:
        available = len([entry for entry in env_value.split(",") if entry != ""])
    else:
        available = get_device_count()

    for item in items:
        marker = item.get_closest_marker("require_distributed")
        if marker:
            # Device count comes from the first marker argument (default: 2).
            required = marker.args[0] if marker.args else 2
            if available < required:
                skip_reason = f"test requires {required} devices, but only {available} visible"
                item.add_marker(pytest.mark.skip(reason=skip_reason))
def pytest_collection_modifyitems(config: Config, items: list[Item]): def pytest_collection_modifyitems(config: Config, items: list[Item]):
if is_transformers_version_greater_than("4.57.0"): """Modify test collection based on markers and environment."""
# Handle version compatibility (from HEAD)
if not is_transformers_version_greater_than("4.57.0"):
skip_bc = pytest.mark.skip(reason="Skip backward compatibility tests")
for item in items:
if "tests_v1" in str(item.fspath):
item.add_marker(skip_bc)
_handle_slow_tests(items)
_handle_runs_on(items)
_handle_device_visibility(items)
@pytest.fixture(autouse=True)
def _manage_distributed_env(request, monkeypatch):
"""Set environment variables for distributed tests if specific devices are requested."""
env_key = _get_visible_devices_env()
if not env_key:
return return
skip_bc = pytest.mark.skip(reason="Skip backward compatibility tests") # Save old environment for logic checks, monkeypatch handles restoration
old_value = os.environ.get(env_key)
for item in items: marker = request.node.get_closest_marker("require_distributed")
if "tests_v1" in str(item.fspath): if marker:
item.add_marker(skip_bc) # Distributed test
required = marker.args[0] if marker.args else 2
specific_devices = marker.args[1] if len(marker.args) > 1 else None
if specific_devices:
devices_str = ",".join(map(str, specific_devices))
else:
devices_str = ",".join(str(i) for i in range(required))
monkeypatch.setenv(env_key, devices_str)
else:
# Non-distributed test
if old_value:
visible_devices = [v for v in old_value.split(",") if v != ""]
monkeypatch.setenv(env_key, visible_devices[0] if visible_devices else "0")
else:
monkeypatch.setenv(env_key, "0")
@pytest.fixture
def fix_valuehead_cpu_loading():
    """Fix valuehead model loading.

    Opt-in fixture: calls ``patch_valuehead_model()`` during setup for tests
    that request it. The fixture itself provides no value.
    """
    patch_valuehead_model()