# Copyright 2024 the LlamaFactory team. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from dataclasses import dataclass from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Tuple, Union from typing_extensions import override from ..extras import logging from ..extras.misc import check_version from .data_utils import Role from .formatter import EmptyFormatter, FunctionFormatter, StringFormatter, ToolFormatter from .mm_plugin import get_mm_plugin if TYPE_CHECKING: from transformers import PreTrainedTokenizer from ..hparams import DataArguments from .formatter import SLOTS, Formatter from .mm_plugin import BasePlugin from .tool_utils import FunctionCall logger = logging.get_logger(__name__) @dataclass class Template: format_user: "Formatter" format_assistant: "Formatter" format_system: "Formatter" format_function: "Formatter" format_observation: "Formatter" format_tools: "Formatter" format_prefix: "Formatter" default_system: str stop_words: List[str] efficient_eos: bool replace_eos: bool replace_jinja_template: bool mm_plugin: "BasePlugin" def encode_oneturn( self, tokenizer: "PreTrainedTokenizer", messages: Sequence[Dict[str, str]], system: Optional[str] = None, tools: Optional[str] = None, ) -> Tuple[List[int], List[int]]: r""" Returns a single pair of token ids representing prompt and response respectively. """ encoded_messages = self._encode(tokenizer, messages, system, tools) prompt_ids = [] for encoded_ids in encoded_messages[:-1]: prompt_ids += encoded_ids answer_ids = encoded_messages[-1] return prompt_ids, answer_ids def encode_multiturn( self, tokenizer: "PreTrainedTokenizer", messages: Sequence[Dict[str, str]], system: Optional[str] = None, tools: Optional[str] = None, ) -> List[Tuple[List[int], List[int]]]: r""" Returns multiple pairs of token ids representing prompts and responses respectively. """ encoded_messages = self._encode(tokenizer, messages, system, tools) return [(encoded_messages[i], encoded_messages[i + 1]) for i in range(0, len(encoded_messages), 2)] def extract_tool(self, content: str) -> Union[str, List["FunctionCall"]]: r""" Extracts tool message. """ return self.format_tools.extract(content) def _encode( self, tokenizer: "PreTrainedTokenizer", messages: Sequence[Dict[str, str]], system: Optional[str], tools: Optional[str], ) -> List[List[int]]: r""" Encodes formatted inputs to pairs of token ids. Turn 0: prefix + system + query resp Turn t: sep + query resp """ system = system or self.default_system encoded_messages = [] for i, message in enumerate(messages): elements = [] if i == 0: elements += self.format_prefix.apply() if system or tools: tool_text = self.format_tools.apply(content=tools)[0] if tools else "" elements += self.format_system.apply(content=(system + tool_text)) if message["role"] == Role.USER.value: elements += self.format_user.apply(content=message["content"], idx=str(i // 2)) elif message["role"] == Role.ASSISTANT.value: elements += self.format_assistant.apply(content=message["content"]) elif message["role"] == Role.OBSERVATION.value: elements += self.format_observation.apply(content=message["content"]) elif message["role"] == Role.FUNCTION.value: elements += self.format_function.apply(content=message["content"]) else: raise NotImplementedError("Unexpected role: {}".format(message["role"])) encoded_messages.append(self._convert_elements_to_ids(tokenizer, elements)) return encoded_messages def _convert_elements_to_ids(self, tokenizer: "PreTrainedTokenizer", elements: "SLOTS") -> List[int]: r""" Converts elements to token ids. """ token_ids = [] for elem in elements: if isinstance(elem, str): if len(elem) != 0: token_ids += tokenizer.encode(elem, add_special_tokens=False) elif isinstance(elem, dict): token_ids += [tokenizer.convert_tokens_to_ids(elem.get("token"))] elif isinstance(elem, set): if "bos_token" in elem and tokenizer.bos_token_id is not None: token_ids += [tokenizer.bos_token_id] elif "eos_token" in elem and tokenizer.eos_token_id is not None: token_ids += [tokenizer.eos_token_id] else: raise ValueError(f"Input must be string, set[str] or dict[str, str], got {type(elem)}") return token_ids @dataclass class Llama2Template(Template): @override def _encode( self, tokenizer: "PreTrainedTokenizer", messages: Sequence[Dict[str, str]], system: str, tools: str, ) -> List[List[int]]: r""" Encodes formatted inputs to pairs of token ids. Turn 0: prefix + system + query resp Turn t: sep + query resp """ system = system or self.default_system encoded_messages = [] for i, message in enumerate(messages): elements = [] system_text = "" if i == 0: elements += self.format_prefix.apply() if system or tools: tool_text = self.format_tools.apply(content=tools)[0] if tools else "" system_text = self.format_system.apply(content=(system + tool_text))[0] if message["role"] == Role.USER.value: elements += self.format_user.apply(content=system_text + message["content"]) elif message["role"] == Role.ASSISTANT.value: elements += self.format_assistant.apply(content=message["content"]) elif message["role"] == Role.OBSERVATION.value: elements += self.format_observation.apply(content=message["content"]) elif message["role"] == Role.FUNCTION.value: elements += self.format_function.apply(content=message["content"]) else: raise NotImplementedError("Unexpected role: {}".format(message["role"])) encoded_messages.append(self._convert_elements_to_ids(tokenizer, elements)) return encoded_messages TEMPLATES: Dict[str, "Template"] = {} def _register_template( name: str, format_user: Optional["Formatter"] = None, format_assistant: Optional["Formatter"] = None, format_system: Optional["Formatter"] = None, format_function: Optional["Formatter"] = None, format_observation: Optional["Formatter"] = None, format_tools: Optional["Formatter"] = None, format_prefix: Optional["Formatter"] = None, default_system: str = "", stop_words: Sequence[str] = [], efficient_eos: bool = False, replace_eos: bool = False, replace_jinja_template: bool = False, mm_plugin: "BasePlugin" = get_mm_plugin(name="base"), ) -> None: r""" Registers a chat template. To add the following chat template: ``` user prompt here model response here user prompt here model response here ``` The corresponding code should be: ``` _register_template( name="custom", format_user=StringFormatter(slots=["{{content}}\n"]), format_assistant=StringFormatter(slots=["{{content}}\n"]), format_prefix=EmptyFormatter(""), ) ``` """ template_class = Llama2Template if any(k in name for k in ("llama2", "mistral", "pixtral")) else Template default_slots = ["{{content}}"] if efficient_eos else ["{{content}}", {"eos_token"}] default_user_formatter = StringFormatter(slots=["{{content}}"]) default_assistant_formatter = StringFormatter(slots=default_slots) default_function_formatter = FunctionFormatter(slots=default_slots, tool_format="default") default_tool_formatter = ToolFormatter(tool_format="default") default_prefix_formatter = EmptyFormatter() TEMPLATES[name] = template_class( format_user=format_user or default_user_formatter, format_assistant=format_assistant or default_assistant_formatter, format_system=format_system or default_user_formatter, format_function=format_function or default_function_formatter, format_observation=format_observation or format_user or default_user_formatter, format_tools=format_tools or default_tool_formatter, format_prefix=format_prefix or default_prefix_formatter, default_system=default_system, stop_words=stop_words, efficient_eos=efficient_eos, replace_eos=replace_eos, replace_jinja_template=replace_jinja_template, mm_plugin=mm_plugin, ) def _add_or_replace_eos_token(tokenizer: "PreTrainedTokenizer", eos_token: str) -> None: is_added = tokenizer.eos_token_id is None num_added_tokens = tokenizer.add_special_tokens({"eos_token": eos_token}) if is_added: logger.info_rank0(f"Add eos token: {tokenizer.eos_token}") else: logger.info_rank0(f"Replace eos token: {tokenizer.eos_token}") if num_added_tokens > 0: logger.warning_rank0("New tokens have been added, make sure `resize_vocab` is True.") def _jinja_escape(content: str) -> str: return content.replace("'", r"\'") def _convert_slots_to_jinja(slots: "SLOTS", tokenizer: "PreTrainedTokenizer", placeholder: str = "content") -> str: slot_items = [] for slot in slots: if isinstance(slot, str): slot_pieces = slot.split("{{content}}") if slot_pieces[0]: slot_items.append("'" + _jinja_escape(slot_pieces[0]) + "'") if len(slot_pieces) > 1: slot_items.append(placeholder) if slot_pieces[1]: slot_items.append("'" + _jinja_escape(slot_pieces[1]) + "'") elif isinstance(slot, set): # do not use {{ eos_token }} since it may be replaced if "bos_token" in slot and tokenizer.bos_token_id is not None: slot_items.append("'" + tokenizer.bos_token + "'") elif "eos_token" in slot and tokenizer.eos_token_id is not None: slot_items.append("'" + tokenizer.eos_token + "'") elif isinstance(slot, dict): raise ValueError("Dict is not supported.") return " + ".join(slot_items) def _get_jinja_template(template: "Template", tokenizer: "PreTrainedTokenizer") -> str: r""" Returns the jinja template. """ jinja_template = "" prefix = _convert_slots_to_jinja(template.format_prefix.apply(), tokenizer) if prefix: jinja_template += "{{ " + prefix + " }}" if template.default_system: jinja_template += "{% set system_message = '" + _jinja_escape(template.default_system) + "' %}" jinja_template += ( "{% if messages[0]['role'] == 'system' %}{% set loop_messages = messages[1:] %}" "{% set system_message = messages[0]['content'] %}{% else %}{% set loop_messages = messages %}{% endif %}" ) system_message = _convert_slots_to_jinja(template.format_system.apply(), tokenizer, placeholder="system_message") if not isinstance(template, Llama2Template): jinja_template += "{% if system_message is defined %}{{ " + system_message + " }}{% endif %}" jinja_template += "{% for message in loop_messages %}" jinja_template += "{% set content = message['content'] %}" if isinstance(template, Llama2Template): jinja_template += "{% if loop.index0 == 0 and system_message is defined %}" jinja_template += "{% set content = " + system_message + " + message['content'] %}" jinja_template += "{% endif %}" jinja_template += "{% if message['role'] == 'user' %}" user_message = _convert_slots_to_jinja(template.format_user.apply(), tokenizer) jinja_template += "{{ " + user_message + " }}" jinja_template += "{% elif message['role'] == 'assistant' %}" assistant_message = _convert_slots_to_jinja(template.format_assistant.apply(), tokenizer) jinja_template += "{{ " + assistant_message + " }}" jinja_template += "{% endif %}" jinja_template += "{% endfor %}" return jinja_template def get_template_and_fix_tokenizer(tokenizer: "PreTrainedTokenizer", data_args: "DataArguments") -> "Template": r""" Gets chat template and fixes the tokenizer. """ if data_args.template is None: template = TEMPLATES["empty"] # placeholder else: template = TEMPLATES.get(data_args.template, None) if template is None: raise ValueError(f"Template {data_args.template} does not exist.") if template.mm_plugin.__class__.__name__ != "BasePlugin": check_version("transformers>=4.45.0") if data_args.train_on_prompt and template.efficient_eos: raise ValueError("Current template does not support `train_on_prompt`.") if data_args.tool_format is not None: logger.info_rank0(f"Using tool format: {data_args.tool_format}.") default_slots = ["{{content}}"] if template.efficient_eos else ["{{content}}", {"eos_token"}] template.format_function = FunctionFormatter(slots=default_slots, tool_format=data_args.tool_format) template.format_tools = ToolFormatter(tool_format=data_args.tool_format) stop_words = template.stop_words if template.replace_eos: if not stop_words: raise ValueError("Stop words are required to replace the EOS token.") _add_or_replace_eos_token(tokenizer, eos_token=stop_words[0]) stop_words = stop_words[1:] if tokenizer.eos_token_id is None: _add_or_replace_eos_token(tokenizer, eos_token="<|endoftext|>") if tokenizer.pad_token_id is None: tokenizer.pad_token = tokenizer.eos_token logger.info_rank0(f"Add pad token: {tokenizer.pad_token}") if stop_words: num_added_tokens = tokenizer.add_special_tokens( dict(additional_special_tokens=stop_words), replace_additional_special_tokens=False ) logger.info_rank0("Add {} to stop words.".format(",".join(stop_words))) if num_added_tokens > 0: logger.warning_rank0("New tokens have been added, make sure `resize_vocab` is True.") if tokenizer.chat_template is None or template.replace_jinja_template: try: tokenizer.chat_template = _get_jinja_template(template, tokenizer) except ValueError as e: logger.info_rank0(f"Cannot add this chat template to tokenizer: {e}.") return template _register_template( name="alpaca", format_user=StringFormatter(slots=["### Instruction:\n{{content}}\n\n### Response:\n"]), format_assistant=StringFormatter(slots=["{{content}}", {"eos_token"}, "\n\n"]), default_system=( "Below is an instruction that describes a task. " "Write a response that appropriately completes the request.\n\n" ), replace_jinja_template=True, ) _register_template( name="aquila", format_user=StringFormatter(slots=["Human: {{content}}###Assistant:"]), format_assistant=StringFormatter(slots=["{{content}}###"]), format_system=StringFormatter(slots=["System: {{content}}###"]), default_system=( "A chat between a curious human and an artificial intelligence assistant. " "The assistant gives helpful, detailed, and polite answers to the human's questions." ), stop_words=[""], ) _register_template( name="atom", format_user=StringFormatter( slots=[{"bos_token"}, "Human: {{content}}\n", {"eos_token"}, {"bos_token"}, "Assistant:"] ), format_assistant=StringFormatter(slots=["{{content}}\n", {"eos_token"}]), ) _register_template( name="baichuan", format_user=StringFormatter(slots=[{"token": ""}, "{{content}}", {"token": ""}]), efficient_eos=True, ) _register_template( name="baichuan2", format_user=StringFormatter(slots=["{{content}}"]), efficient_eos=True, ) _register_template( name="belle", format_user=StringFormatter(slots=["Human: {{content}}\n\nBelle: "]), format_assistant=StringFormatter(slots=["{{content}}", {"eos_token"}, "\n\n"]), format_prefix=EmptyFormatter(slots=[{"bos_token"}]), ) _register_template( name="bluelm", format_user=StringFormatter(slots=[{"token": "[|Human|]:"}, "{{content}}", {"token": "[|AI|]:"}]), ) _register_template( name="breeze", format_user=StringFormatter(slots=["[INST] {{content}} [/INST] "]), format_prefix=EmptyFormatter(slots=[{"bos_token"}]), efficient_eos=True, ) _register_template( name="chatglm2", format_user=StringFormatter(slots=["[Round {{idx}}]\n\n问:{{content}}\n\n答:"]), format_prefix=EmptyFormatter(slots=[{"token": "[gMASK]"}, {"token": "sop"}]), efficient_eos=True, ) _register_template( name="chatglm3", format_user=StringFormatter(slots=[{"token": "<|user|>"}, "\n", "{{content}}", {"token": "<|assistant|>"}]), format_assistant=StringFormatter(slots=["\n", "{{content}}"]), format_system=StringFormatter(slots=[{"token": "<|system|>"}, "\n", "{{content}}"]), format_function=FunctionFormatter(slots=["{{content}}"], tool_format="glm4"), format_observation=StringFormatter( slots=[{"token": "<|observation|>"}, "\n", "{{content}}", {"token": "<|assistant|>"}] ), format_tools=ToolFormatter(tool_format="glm4"), format_prefix=EmptyFormatter(slots=[{"token": "[gMASK]"}, {"token": "sop"}]), stop_words=["<|user|>", "<|observation|>"], efficient_eos=True, ) _register_template( name="chatml", format_user=StringFormatter(slots=["<|im_start|>user\n{{content}}<|im_end|>\n<|im_start|>assistant\n"]), format_assistant=StringFormatter(slots=["{{content}}<|im_end|>\n"]), format_system=StringFormatter(slots=["<|im_start|>system\n{{content}}<|im_end|>\n"]), format_observation=StringFormatter(slots=["<|im_start|>tool\n{{content}}<|im_end|>\n<|im_start|>assistant\n"]), stop_words=["<|im_end|>", "<|im_start|>"], replace_eos=True, replace_jinja_template=True, ) # copied from chatml template _register_template( name="chatml_de", format_user=StringFormatter(slots=["<|im_start|>user\n{{content}}<|im_end|>\n<|im_start|>assistant\n"]), format_assistant=StringFormatter(slots=["{{content}}<|im_end|>\n"]), format_system=StringFormatter(slots=["<|im_start|>system\n{{content}}<|im_end|>\n"]), format_observation=StringFormatter(slots=["<|im_start|>tool\n{{content}}<|im_end|>\n<|im_start|>assistant\n"]), default_system="Du bist ein freundlicher und hilfsbereiter KI-Assistent.", stop_words=["<|im_end|>", "<|im_start|>"], replace_eos=True, replace_jinja_template=True, ) _register_template( name="codegeex2", format_prefix=EmptyFormatter(slots=[{"token": "[gMASK]"}, {"token": "sop"}]), ) _register_template( name="codegeex4", format_user=StringFormatter(slots=["<|user|>\n{{content}}<|assistant|>\n"]), format_system=StringFormatter(slots=["<|system|>\n{{content}}"]), format_function=FunctionFormatter(slots=["{{content}}"], tool_format="glm4"), format_observation=StringFormatter(slots=["<|observation|>\n{{content}}<|assistant|>\n"]), format_tools=ToolFormatter(tool_format="glm4"), format_prefix=EmptyFormatter(slots=["[gMASK]"]), default_system=( "你是一位智能编程助手,你叫CodeGeeX。你会为用户回答关于编程、代码、计算机方面的任何问题," "并提供格式规范、可以执行、准确安全的代码,并在必要时提供详细的解释。" ), stop_words=["<|user|>", "<|observation|>"], efficient_eos=True, ) _register_template( name="cohere", format_user=StringFormatter( slots=[ ( "<|START_OF_TURN_TOKEN|><|USER_TOKEN|>{{content}}<|END_OF_TURN_TOKEN|>" "<|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>" ) ] ), format_system=StringFormatter(slots=["<|START_OF_TURN_TOKEN|><|SYSTEM_TOKEN|>{{content}}<|END_OF_TURN_TOKEN|>"]), format_prefix=EmptyFormatter(slots=[{"bos_token"}]), ) _register_template( name="cpm", format_user=StringFormatter(slots=["<用户>{{content}}"]), format_prefix=EmptyFormatter(slots=[{"bos_token"}]), ) # copied from chatml template _register_template( name="cpm3", format_user=StringFormatter(slots=["<|im_start|>user\n{{content}}<|im_end|>\n<|im_start|>assistant\n"]), format_assistant=StringFormatter(slots=["{{content}}<|im_end|>\n"]), format_system=StringFormatter(slots=["<|im_start|>system\n{{content}}<|im_end|>\n"]), format_prefix=EmptyFormatter(slots=[{"bos_token"}]), stop_words=["<|im_end|>"], ) _register_template( name="cpm_o", format_user=StringFormatter(slots=["<|im_start|>user\n{{content}}<|im_end|>\n<|im_start|>assistant\n"]), format_system=StringFormatter(slots=["<|im_start|>system\n{{content}}<|im_end|>\n"]), format_function=FunctionFormatter(slots=["{{content}}", "<|im_end|>"], tool_format="qwen"), format_observation=StringFormatter( slots=["<|im_start|>user\n\n{{content}}\n<|im_end|>\n<|im_start|>assistant\n"] ), format_tools=ToolFormatter(tool_format="qwen"), format_separator=EmptyFormatter(slots=["\n"]), default_system="You are a helpful assistant.", stop_words=["<|im_end|>"], mm_plugin=get_mm_plugin(name="cpm_o", image_token="", video_token="