From 54725473216937fb68b75649f8928c690d39ea2d Mon Sep 17 00:00:00 2001 From: Kyle Schwab Date: Sat, 16 Aug 2025 13:19:24 -0600 Subject: [PATCH 1/4] Initial commit. --- pydantic_settings/main.py | 2 +- pydantic_settings/sources/providers/cli.py | 439 +++++++++++++-------- pydantic_settings/sources/utils.py | 10 +- tests/test_source_cli.py | 14 +- 4 files changed, 293 insertions(+), 172 deletions(-) diff --git a/pydantic_settings/main.py b/pydantic_settings/main.py index 4df690ce..e4b61ca1 100644 --- a/pydantic_settings/main.py +++ b/pydantic_settings/main.py @@ -645,4 +645,4 @@ def serialize(model: PydanticModel) -> list[str]: """ base_settings_cls = CliApp._get_base_settings_cls(type(model)) - return CliSettingsSource._serialized_args(model, base_settings_cls.model_config) + return CliSettingsSource[Any](base_settings_cls)._serialized_args(model) diff --git a/pydantic_settings/sources/providers/cli.py b/pydantic_settings/sources/providers/cli.py index 93e0ba42..4848569c 100644 --- a/pydantic_settings/sources/providers/cli.py +++ b/pydantic_settings/sources/providers/cli.py @@ -18,6 +18,7 @@ from collections import defaultdict from collections.abc import Mapping, Sequence from enum import Enum +from functools import cached_property from textwrap import dedent from types import SimpleNamespace from typing import ( @@ -35,7 +36,7 @@ ) import typing_extensions -from pydantic import AliasChoices, AliasPath, BaseModel, Field, create_model +from pydantic import AliasChoices, AliasPath, BaseModel, Field, PrivateAttr from pydantic._internal._repr import Representation from pydantic._internal._utils import is_model_class from pydantic.dataclasses import is_pydantic_dataclass @@ -48,6 +49,7 @@ from ...exceptions import SettingsError from ...utils import _lenient_issubclass, _WithArgsTypes from ..types import ( + ForceDecode, NoDecode, PydanticModel, _CliExplicitFlag, @@ -86,6 +88,140 @@ class CliMutuallyExclusiveGroup(BaseModel): pass +class _CliArg(BaseModel): + model: Any + field_name: str + arg_prefix: str + case_sensitive: bool + hide_none_type: bool + kebab_case: bool + enable_decoding: Optional[bool] + env_prefix_len: int + args: list[str] = [] + kwargs: dict[str, Any] = {} + + _alias_names: tuple[str, ...] = PrivateAttr(()) + _alias_paths: dict[str, Optional[int]] = PrivateAttr({}) + _is_alias_path_only: bool = PrivateAttr(False) + _field_info: FieldInfo = PrivateAttr() + + def __init__( + self, + field_info: FieldInfo, + parser_map: defaultdict[str | FieldInfo, dict[Optional[int] | str, _CliArg]], + **values: Any, + ) -> None: + super().__init__(**values) + self._field_info = field_info + self._alias_names, self._is_alias_path_only = _get_alias_names( + self.field_name, self.field_info, alias_path_args=self._alias_paths, case_sensitive=self.case_sensitive + ) + + alias_path_dests = {f'{self.arg_prefix}{name}': index for name, index in self._alias_paths.items()} + if self.subcommand_dest: + for sub_model in self.sub_models: + subcommand_alias = self.subcommand_alias(sub_model) + parser_map[self.subcommand_dest][subcommand_alias] = self.model_copy(update={'args': [], 'kwargs': {}}) + parser_map[self.field_info][subcommand_alias] = parser_map[self.subcommand_dest][subcommand_alias] + elif self.dest not in alias_path_dests: + parser_map[self.dest][None] = self + parser_map[self.field_info][None] = parser_map[self.dest][None] + for alias_path_dest, index in alias_path_dests.items(): + parser_map[alias_path_dest][index] = self.model_copy(update={'args': [], 'kwargs': {}}) + parser_map[self.field_info][index] = parser_map[alias_path_dest][index] + + @classmethod + def get_kebab_case(cls, name: str, kebab_case: Optional[bool]) -> str: + return name.replace('_', '-') if kebab_case else name + + def subcommand_alias(self, sub_model: type[BaseModel]) -> str: + return self.get_kebab_case( + sub_model.__name__ if len(self.sub_models) > 1 else self.preferred_alias, self.kebab_case + ) + + @cached_property + def field_info(self) -> FieldInfo: + return self._field_info + + @cached_property + def subcommand_dest(self) -> Optional[str]: + return f'{self.arg_prefix}:subcommand' if _CliSubCommand in self.field_info.metadata else None + + @cached_property + def dest(self) -> str: + if ( + not self.subcommand_dest + and self.arg_prefix + and self.field_info.validation_alias is not None + and not self.is_parser_submodel + ): + # Strip prefix if validation alias is set and value is not complex. + # Related https://github.com/pydantic/pydantic-settings/pull/25 + return f'{self.arg_prefix}{self.preferred_alias}'[self.env_prefix_len :] + return f'{self.arg_prefix}{self.preferred_alias}' + + @cached_property + def preferred_arg_name(self) -> str: + return self.args[0].replace('_', '-') if self.kebab_case else self.args[0] + + @cached_property + def sub_models(self) -> list[type[BaseModel]]: + field_types: tuple[Any, ...] = ( + (self.field_info.annotation,) + if not get_args(self.field_info.annotation) + else get_args(self.field_info.annotation) + ) + if self.hide_none_type: + field_types = tuple([type_ for type_ in field_types if type_ is not type(None)]) + + sub_models: list[type[BaseModel]] = [] + for type_ in field_types: + if _annotation_contains_types(type_, (_CliSubCommand,), is_include_origin=False): + raise SettingsError( + f'CliSubCommand is not outermost annotation for {self.model.__name__}.{self.field_name}' + ) + elif _annotation_contains_types(type_, (_CliPositionalArg,), is_include_origin=False): + raise SettingsError( + f'CliPositionalArg is not outermost annotation for {self.model.__name__}.{self.field_name}' + ) + if is_model_class(_strip_annotated(type_)) or is_pydantic_dataclass(_strip_annotated(type_)): + sub_models.append(_strip_annotated(type_)) + return sub_models + + @cached_property + def alias_names(self) -> tuple[str, ...]: + return self._alias_names + + @cached_property + def alias_paths(self) -> dict[str, Optional[int]]: + return self._alias_paths + + @cached_property + def preferred_alias(self) -> str: + return self._alias_names[0] + + @cached_property + def is_alias_path_only(self) -> bool: + return self._is_alias_path_only + + @cached_property + def is_append_action(self) -> bool: + return not self.subcommand_dest and _annotation_contains_types( + self.field_info.annotation, (list, set, dict, Sequence, Mapping), is_strip_annotated=True + ) + + @cached_property + def is_parser_submodel(self) -> bool: + return not self.subcommand_dest and bool(self.sub_models) and not self.is_append_action + + @cached_property + def is_no_decode(self) -> bool: + return self.field_info is not None and ( + NoDecode in self.field_info.metadata + or (self.enable_decoding is False and ForceDecode not in self.field_info.metadata) + ) + + T = TypeVar('T') CliSubCommand = Annotated[Union[T, None], _CliSubCommand] CliPositionalArg = Annotated[T, _CliPositionalArg] @@ -355,18 +491,19 @@ def _load_env_vars( parsed_args = vars(parsed_args) selected_subcommands: list[str] = [] - for field_name, val in parsed_args.items(): + for field_name, val in list(parsed_args.items()): if isinstance(val, list): + if self._is_nested_alias_path_only_workaround(parsed_args, field_name, val): + continue parsed_args[field_name] = self._merge_parsed_list(val, field_name) elif field_name.endswith(':subcommand') and val is not None: - subcommand_name = field_name.split(':')[0] + val - subcommand_dest = self._cli_subcommands[field_name][subcommand_name] - selected_subcommands.append(subcommand_dest) + selected_subcommands.append(self._parser_map[field_name][val].dest) - for subcommands in self._cli_subcommands.values(): - for subcommand_dest in subcommands.values(): - if subcommand_dest not in selected_subcommands: - parsed_args[subcommand_dest] = self.cli_parse_none_str + for arg_dest, arg_map in self._parser_map.items(): + if isinstance(arg_dest, str) and arg_dest.endswith(':subcommand'): + for subcommand_dest in [arg.dest for arg in arg_map.values()]: + if subcommand_dest not in selected_subcommands: + parsed_args[subcommand_dest] = self.cli_parse_none_str parsed_args = { key: val @@ -389,6 +526,25 @@ def _load_env_vars( return self + def _is_nested_alias_path_only_workaround( + self, parsed_args: dict[str, list[str] | str], field_name: str, val: list[str] + ) -> bool: + known_arg = self._parser_map[field_name].values() + if not known_arg: + return False + arg = next(iter(known_arg)) + if arg.is_alias_path_only and arg.arg_prefix.endswith('.'): + del parsed_args[field_name] + nested_dest = arg.arg_prefix[:-1] + nested_val = f'"{arg.preferred_alias}": {self._merge_parsed_list(val, field_name)}' + parsed_args[nested_dest] = ( + f'{{{nested_val}}}' + if nested_dest not in parsed_args + else f'{parsed_args[nested_dest][:-1]}, {nested_val}}}' + ) + return True + return False + def _get_merge_parsed_list_types( self, parsed_list: list[str], field_name: str ) -> tuple[Optional[type], Optional[type]]: @@ -408,6 +564,22 @@ def _get_merge_parsed_list_types( return merge_type, inferred_type + def _merged_list_to_str(self, merged_list: list[str], field_name: str) -> str: + decode_list: list[str] = [] + is_use_decode: Optional[bool] = None + cli_arg_map = self._parser_map.get(field_name) + for index, item in enumerate(merged_list): + cli_arg = cli_arg_map.get(index) if cli_arg_map else None + is_decode = cli_arg is None or not cli_arg.is_no_decode + if is_use_decode is None: + is_use_decode = is_decode + elif is_use_decode != is_decode: + raise SettingsError('Mixing Decode and NoDecode across different AliasPath fields is not allowed') + if isinstance(item, str) and is_use_decode: + item = item.replace('\\', '\\\\') + decode_list.append(item) + return f'[{",".join(decode_list)}]' + def _merge_parsed_list(self, parsed_list: list[str], field_name: str) -> str: try: merged_list: list[str] = [] @@ -444,7 +616,7 @@ def _merge_parsed_list(self, parsed_list: list[str], field_name: str) -> str: if merge_type is str: return merged_list[0] elif merge_type is list: - return f'[{",".join(merged_list)}]' + return self._merged_list_to_str(merged_list, field_name) else: merged_dict: dict[str, str] = {} for item in merged_list: @@ -505,23 +677,6 @@ def _consume_string_or_number(self, item: str, merged_list: list[str], merge_typ merged_list.append(json.dumps({key: val})) return item[consumed:] - def _get_sub_models(self, model: type[BaseModel], field_name: str, field_info: FieldInfo) -> list[type[BaseModel]]: - field_types: tuple[Any, ...] = ( - (field_info.annotation,) if not get_args(field_info.annotation) else get_args(field_info.annotation) - ) - if self.cli_hide_none_type: - field_types = tuple([type_ for type_ in field_types if type_ is not type(None)]) - - sub_models: list[type[BaseModel]] = [] - for type_ in field_types: - if _annotation_contains_types(type_, (_CliSubCommand,), is_include_origin=False): - raise SettingsError(f'CliSubCommand is not outermost annotation for {model.__name__}.{field_name}') - elif _annotation_contains_types(type_, (_CliPositionalArg,), is_include_origin=False): - raise SettingsError(f'CliPositionalArg is not outermost annotation for {model.__name__}.{field_name}') - if is_model_class(_strip_annotated(type_)) or is_pydantic_dataclass(_strip_annotated(type_)): - sub_models.append(_strip_annotated(type_)) - return sub_models - def _verify_cli_flag_annotations(self, model: type[BaseModel], field_name: str, field_info: FieldInfo) -> None: if _CliImplicitFlag in field_info.metadata: cli_flag_name = 'CliImplicitFlag' @@ -669,7 +824,7 @@ def _parse_known_args(*args: Any, **kwargs: Any) -> Namespace: self._add_subparsers = self._connect_parser_method(add_subparsers_method, 'add_subparsers_method') self._formatter_class = formatter_class self._cli_dict_args: dict[str, type[Any] | None] = {} - self._cli_subcommands: defaultdict[str, dict[str, str]] = defaultdict(dict) + self._parser_map: defaultdict[str | FieldInfo, dict[Optional[int] | str, _CliArg]] = defaultdict(dict) self._add_parser_args( parser=self.root_parser, model=self.settings_cls, @@ -694,7 +849,7 @@ def _add_parser_args( is_model_suppressed: bool = False, ) -> ArgumentParser: subparsers: Any = None - alias_path_args: dict[str, str] = {} + alias_path_args: dict[str, Optional[int]] = {} # Ignore model default if the default is a model and not a subclass of the current model. model_default = ( None @@ -705,30 +860,40 @@ def _add_parser_args( else model_default ) for field_name, field_info in self._sort_arg_fields(model): - sub_models: list[type[BaseModel]] = self._get_sub_models(model, field_name, field_info) - alias_names, is_alias_path_only = _get_alias_names( - field_name, field_info, alias_path_args=alias_path_args, case_sensitive=self.case_sensitive + arg = _CliArg( + field_info=field_info, + parser_map=self._parser_map, + model=model, + field_name=field_name, + arg_prefix=arg_prefix, + case_sensitive=self.case_sensitive, + hide_none_type=self.cli_hide_none_type, + kebab_case=self.cli_kebab_case, + enable_decoding=self.config.get('enable_decoding'), + env_prefix_len=self.env_prefix_len, ) - preferred_alias = alias_names[0] - if _CliSubCommand in field_info.metadata: - for model in sub_models: - subcommand_alias = self._check_kebab_name( - model.__name__ if len(sub_models) > 1 else preferred_alias + alias_path_args.update(arg.alias_paths) + + if arg.subcommand_dest: + for sub_model in arg.sub_models: + subcommand_alias = arg.subcommand_alias(sub_model) + subcommand_arg = self._parser_map[arg.subcommand_dest][subcommand_alias] + subcommand_arg.args = [subcommand_alias] + subcommand_arg.kwargs['allow_abbrev'] = False + subcommand_arg.kwargs['formatter_class'] = self._formatter_class + subcommand_arg.kwargs['description'] = ( + None if sub_model.__doc__ is None else dedent(sub_model.__doc__) ) - subcommand_name = f'{arg_prefix}{subcommand_alias}' - subcommand_dest = f'{arg_prefix}{preferred_alias}' - self._cli_subcommands[f'{arg_prefix}:subcommand'][subcommand_name] = subcommand_dest - - subcommand_help = None if len(sub_models) > 1 else field_info.description + subcommand_arg.kwargs['help'] = None if len(arg.sub_models) > 1 else field_info.description if self.cli_use_class_docs_for_groups: - subcommand_help = None if model.__doc__ is None else dedent(model.__doc__) + subcommand_arg.kwargs['help'] = None if sub_model.__doc__ is None else dedent(sub_model.__doc__) subparsers = ( self._add_subparsers( parser, title='subcommands', dest=f'{arg_prefix}:subcommand', - description=field_info.description if len(sub_models) > 1 else None, + description=field_info.description if len(arg.sub_models) > 1 else None, ) if subparsers is None else subparsers @@ -742,97 +907,70 @@ def _add_parser_args( ) self._add_parser_args( - parser=self._add_parser( - subparsers, - subcommand_alias, - help=subcommand_help, - formatter_class=self._formatter_class, - description=None if model.__doc__ is None else dedent(model.__doc__), - allow_abbrev=False, - ), - model=model, + parser=self._add_parser(subparsers, *subcommand_arg.args, **subcommand_arg.kwargs), + model=sub_model, added_args=[], - arg_prefix=f'{arg_prefix}{preferred_alias}.', - subcommand_prefix=f'{subcommand_prefix}{preferred_alias}.', + arg_prefix=f'{arg.dest}.', + subcommand_prefix=f'{subcommand_prefix}{arg.preferred_alias}.', group=None, alias_prefixes=[], model_default=PydanticUndefined, ) else: flag_prefix: str = self._cli_flag_prefix - is_append_action = _annotation_contains_types( - field_info.annotation, (list, set, dict, Sequence, Mapping), is_strip_annotated=True - ) - is_parser_submodel = sub_models and not is_append_action - kwargs: dict[str, Any] = {} - kwargs['default'] = CLI_SUPPRESS - kwargs['help'] = self._help_format(field_name, field_info, model_default, is_model_suppressed) - kwargs['metavar'] = self._metavar_format(field_info.annotation) - kwargs['required'] = ( + arg.kwargs['dest'] = arg.dest + arg.kwargs['default'] = CLI_SUPPRESS + arg.kwargs['help'] = self._help_format(field_name, field_info, model_default, is_model_suppressed) + arg.kwargs['metavar'] = self._metavar_format(field_info.annotation) + arg.kwargs['required'] = ( self.cli_enforce_required and field_info.is_required() and model_default is PydanticUndefined ) - kwargs['dest'] = ( - # Strip prefix if validation alias is set and value is not complex. - # Related https://github.com/pydantic/pydantic-settings/pull/25 - f'{arg_prefix}{preferred_alias}'[self.env_prefix_len :] - if arg_prefix and field_info.validation_alias is not None and not is_parser_submodel - else f'{arg_prefix}{preferred_alias}' - ) - arg_names = self._get_arg_names(arg_prefix, subcommand_prefix, alias_prefixes, alias_names, added_args) - if not arg_names or (kwargs['dest'] in added_args): + arg_names = self._get_arg_names( + arg_prefix, subcommand_prefix, alias_prefixes, arg.alias_names, added_args + ) + if not arg_names or (arg.kwargs['dest'] in added_args): continue - self._convert_append_action(kwargs, field_info, is_append_action) + self._convert_append_action(arg.kwargs, field_info, arg.is_append_action) if _CliPositionalArg in field_info.metadata: arg_names, flag_prefix = self._convert_positional_arg( - kwargs, field_info, preferred_alias, model_default + arg.kwargs, field_info, arg.preferred_alias, model_default ) - self._convert_bool_flag(kwargs, field_info, model_default) + self._convert_bool_flag(arg.kwargs, field_info, model_default) - if is_parser_submodel: + if arg.is_parser_submodel: self._add_parser_submodels( parser, model, - sub_models, + arg.sub_models, added_args, arg_prefix, subcommand_prefix, flag_prefix, arg_names, - kwargs, + arg.kwargs, field_name, field_info, - alias_names, + arg.alias_names, model_default=model_default, is_model_suppressed=is_model_suppressed, ) elif _CliUnknownArgs in field_info.metadata: - self._cli_unknown_args[kwargs['dest']] = [] - elif not is_alias_path_only: - if group is not None: - if isinstance(group, dict): - group = self._add_group(parser, **group) - added_args += list(arg_names) - self._add_argument( - group, *(f'{flag_prefix[: len(name)]}{name}' for name in arg_names), **kwargs - ) - else: - added_args += list(arg_names) - self._add_argument( - parser, *(f'{flag_prefix[: len(name)]}{name}' for name in arg_names), **kwargs - ) + self._cli_unknown_args[arg.kwargs['dest']] = [] + elif not arg.is_alias_path_only: + if isinstance(group, dict): + group = self._add_group(parser, **group) + context = parser if group is None else group + arg.args = [f'{flag_prefix[: len(name)]}{name}' for name in arg_names] + self._add_argument(context, *arg.args, **arg.kwargs) + added_args += list(arg_names) self._add_parser_alias_paths(parser, alias_path_args, added_args, arg_prefix, subcommand_prefix, group) return parser - def _check_kebab_name(self, name: str) -> str: - if self.cli_kebab_case: - return name.replace('_', '-') - return name - def _convert_append_action(self, kwargs: dict[str, Any], field_info: FieldInfo, is_append_action: bool) -> None: if is_append_action: kwargs['action'] = 'append' @@ -853,7 +991,7 @@ def _convert_positional_arg( flag_prefix = '' arg_names = [kwargs['dest']] kwargs['default'] = PydanticUndefined - kwargs['metavar'] = self._check_kebab_name(preferred_alias.upper()) + kwargs['metavar'] = _CliArg.get_kebab_case(preferred_alias.upper(), self.cli_kebab_case) # Note: CLI positional args are always strictly required at the CLI. Therefore, use field_info.is_required in # conjunction with model_default instead of the derived kwargs['required']. @@ -879,10 +1017,11 @@ def _get_arg_names( arg_names: list[str] = [] for prefix in [arg_prefix] + alias_prefixes: for name in alias_names: - arg_name = self._check_kebab_name( + arg_name = _CliArg.get_kebab_case( f'{prefix}{name}' if subcommand_prefix == self.env_prefix - else f'{prefix.replace(subcommand_prefix, "", 1)}{name}' + else f'{prefix.replace(subcommand_prefix, "", 1)}{name}', + self.cli_kebab_case, ) if arg_name not in added_args: arg_names.append(arg_name) @@ -976,7 +1115,7 @@ def _add_parser_submodels( def _add_parser_alias_paths( self, parser: Any, - alias_path_args: dict[str, str], + alias_path_args: dict[str, Optional[int]], added_args: list[str], arg_prefix: str, subcommand_prefix: str, @@ -986,10 +1125,7 @@ def _add_parser_alias_paths( context = parser if group is not None: context = self._add_group(parser, **group) if isinstance(group, dict) else group - is_nested_alias_path = arg_prefix.endswith('.') - arg_prefix = arg_prefix[:-1] if is_nested_alias_path else arg_prefix - for name, metavar in alias_path_args.items(): - name = '' if is_nested_alias_path else name + for name, index in alias_path_args.items(): arg_name = ( f'{arg_prefix}{name}' if subcommand_prefix == self.env_prefix @@ -998,15 +1134,16 @@ def _add_parser_alias_paths( kwargs: dict[str, Any] = {} kwargs['default'] = CLI_SUPPRESS kwargs['help'] = 'pydantic alias path' - kwargs['dest'] = f'{arg_prefix}{name}' - if metavar == 'dict' or is_nested_alias_path: + kwargs['action'] = 'append' + kwargs['metavar'] = 'list' + if index is None: kwargs['metavar'] = 'dict' - else: - kwargs['action'] = 'append' - kwargs['metavar'] = 'list' - if arg_name not in added_args: - added_args.append(arg_name) - self._add_argument(context, f'{self._cli_flag_prefix}{arg_name}', **kwargs) + self._cli_dict_args[arg_name] = dict + args = [f'{self._cli_flag_prefix}{arg_name}'] + for key, arg in self._parser_map[arg_name].items(): + arg.args, arg.kwargs = args, kwargs + self._add_argument(context, *args, **kwargs) + added_args.append(arg_name) def _get_modified_args(self, obj: Any) -> tuple[str, ...]: if not self.cli_hide_none_type: @@ -1092,10 +1229,9 @@ def _is_field_suppressed(self, field_info: FieldInfo) -> bool: _help = field_info.description if field_info.description else '' return _help == CLI_SUPPRESS or CLI_SUPPRESS in field_info.metadata - @classmethod def _update_alias_path_only_default( - cls, arg_name: str, value: Any, field_info: FieldInfo, alias_path_only_defaults: dict[str, Any] - ) -> tuple[str, list[Any] | dict[str, Any]]: + self, arg_name: str, value: Any, field_info: FieldInfo, alias_path_only_defaults: dict[str, Any] + ) -> list[Any] | dict[str, Any]: alias_path: AliasPath = [ alias if isinstance(alias, AliasPath) else cast(AliasPath, alias.choices[0]) for alias in (field_info.alias, field_info.validation_alias) @@ -1103,10 +1239,6 @@ def _update_alias_path_only_default( ][0] alias_nested_paths: list[str] = alias_path.path[1:-1] # type: ignore - if '.' in arg_name: - alias_nested_paths = arg_name.split('.') + alias_nested_paths - arg_name = alias_nested_paths.pop(0) - if not alias_nested_paths: alias_path_only_defaults.setdefault(arg_name, []) alias_default = alias_path_only_defaults[arg_name] @@ -1123,62 +1255,42 @@ def _update_alias_path_only_default( alias_path_index = cast(int, alias_path.path[-1]) alias_default.extend([''] * max(alias_path_index + 1 - len(alias_default), 0)) alias_default[alias_path_index] = value - return arg_name, alias_path_only_defaults[arg_name] - - @classmethod - def _serialized_args(cls, model: PydanticModel, model_config: Any, prefix: str = '') -> list[str]: - model_field_definitions: dict[str, Any] = {} - for field_name, field_info in _get_model_fields(type(model)).items(): - model_default = getattr(model, field_name) - if field_info.default == model_default: - continue - if _CliSubCommand in field_info.metadata and model_default is None: - continue - model_field_definitions[field_name] = (field_info.annotation, field_info) - cli_serialize_cls = create_model('CliSerialize', __config__=model_config, **model_field_definitions) + return alias_path_only_defaults[arg_name] - added_args: set[str] = set() - alias_path_args: dict[str, str] = {} + def _serialized_args(self, model: PydanticModel, _is_submodel: bool = False) -> list[str]: alias_path_only_defaults: dict[str, Any] = {} optional_args: list[str | list[Any] | dict[str, Any]] = [] positional_args: list[str | list[Any] | dict[str, Any]] = [] subcommand_args: list[str] = [] - cli_settings = CliSettingsSource[Any](cli_serialize_cls) - for field_name, field_info in _get_model_fields(cli_serialize_cls).items(): + for field_name, field_info in _get_model_fields(type(model) if _is_submodel else self.settings_cls).items(): model_default = getattr(model, field_name) - alias_names, is_alias_path_only = _get_alias_names( - field_name, field_info, alias_path_args=alias_path_args, case_sensitive=cli_settings.case_sensitive - ) - preferred_alias = alias_names[0] - if _CliSubCommand in field_info.metadata: - subcommand_args.append(cls._check_kebab_name(cli_settings, preferred_alias)) - subcommand_args += cls._serialized_args(model_default, model_config) + if field_info.default == model_default: + continue + if _CliSubCommand in field_info.metadata and model_default is None: + continue + arg = next(iter(self._parser_map[field_info].values())) + if arg.subcommand_dest: + subcommand_args.append(arg.subcommand_alias(type(model_default))) + subcommand_args += self._serialized_args(model_default, _is_submodel=True) continue if is_model_class(type(model_default)) or is_pydantic_dataclass(type(model_default)): - positional_args += cls._serialized_args( - model_default, model_config, prefix=f'{prefix}{preferred_alias}.' - ) + positional_args += self._serialized_args(model_default, _is_submodel=True) continue - arg_name = f'{prefix}{cls._check_kebab_name(cli_settings, preferred_alias)}' + matched = re.match(r'(-*)(.+)', arg.preferred_arg_name) + flag_chars, arg_name = matched.groups() if matched else ('', '') value: str | list[Any] | dict[str, Any] = ( json.dumps(model_default) if isinstance(model_default, (dict, list, set)) else str(model_default) ) - if is_alias_path_only: + if arg.is_alias_path_only: # For alias path only, we wont know the complete value until we've finished parsing the entire class. In # this case, insert value as a non-string reference pointing to the relevant alias_path_only_defaults # entry and convert into completed string value later. - arg_name, value = cls._update_alias_path_only_default( - arg_name, value, field_info, alias_path_only_defaults - ) - - if arg_name in added_args: - continue - added_args.add(arg_name) + value = self._update_alias_path_only_default(arg_name, value, field_info, alias_path_only_defaults) if _CliPositionalArg in field_info.metadata: - if is_alias_path_only: + if arg.is_alias_path_only: positional_args.append(value) continue for value in model_default if isinstance(model_default, list) else [model_default]: @@ -1186,17 +1298,14 @@ def _serialized_args(cls, model: PydanticModel, model_config: Any, prefix: str = positional_args.append(value) continue - flag_chars = f'{cli_settings.cli_flag_prefix_char * min(len(arg_name), 2)}' - kwargs = {'metavar': cls._metavar_format(cli_settings, field_info.annotation)} - cls._convert_bool_flag(cli_settings, kwargs, field_info, model_default) - # Note: cls._convert_bool_flag will add action to kwargs if value is implicit bool flag - if 'action' in kwargs and model_default is False: + # Note: prepend 'no-' for boolean optional action flag if model_default value is False + if arg.kwargs.get('action') == BooleanOptionalAction and model_default is False: flag_chars += 'no-' optional_args.append(f'{flag_chars}{arg_name}') # If implicit bool flag, do not add a value - if 'action' not in kwargs: + if arg.kwargs.get('action') != BooleanOptionalAction: optional_args.append(value) serialized_args: list[str] = [] diff --git a/pydantic_settings/sources/utils.py b/pydantic_settings/sources/utils.py index a8501568..56bfb3ea 100644 --- a/pydantic_settings/sources/utils.py +++ b/pydantic_settings/sources/utils.py @@ -138,7 +138,10 @@ def _get_model_fields(model_cls: type[Any]) -> dict[str, Any]: def _get_alias_names( - field_name: str, field_info: Any, alias_path_args: dict[str, str] = {}, case_sensitive: bool = True + field_name: str, + field_info: Any, + alias_path_args: Optional[dict[str, Optional[int]]] = None, + case_sensitive: bool = True, ) -> tuple[tuple[str, ...], bool]: """Get alias names for a field, handling alias paths and case sensitivity.""" from pydantic import AliasChoices, AliasPath @@ -168,7 +171,10 @@ def _get_alias_names( for alias_path in new_alias_paths: name = cast(str, alias_path.path[0]) name = name.lower() if not case_sensitive else name - alias_path_args[name] = 'dict' if len(alias_path.path) > 2 else 'list' + if alias_path_args is not None: + alias_path_args[name] = ( + alias_path.path[1] if len(alias_path.path) > 1 and isinstance(alias_path.path[1], int) else None + ) if not alias_names and is_alias_path_only: alias_names.append(name) if not case_sensitive: diff --git a/tests/test_source_cli.py b/tests/test_source_cli.py index 5f8cc37a..d4975db2 100644 --- a/tests/test_source_cli.py +++ b/tests/test_source_cli.py @@ -334,8 +334,12 @@ class Cfg(BaseSettings, cli_avoid_json=avoid_json): 'b', '--nest.str', 'str', - '--nest', - '{"path0": ["a0","b0","c0"], "path1": ["a1","b1","c1"], "path2": {"deep": ["a2","b2","c2"]}}', + '--nest.path0', + '["a0","b0","c0"]', + '--nest.path1', + '["a1","b1","c1"]', + '--nest.path2', + '{"deep": ["a2","b2","c2"]}', ], ) assert cfg.model_dump() == { @@ -352,10 +356,12 @@ class Cfg(BaseSettings, cli_avoid_json=avoid_json): assert serialized_cli_args == [ '--nest.a', 'a', - '--nest', - '{"path1": ["", "b1"], "path2": {"deep": ["", "b2"]}}', + '--nest.path1', + '["", "b1"]', '--nest.b', 'b', + '--nest.path2', + '{"deep": ["", "b2"]}', '--nest.str', 'str', ] From 201c3c36073606eed1bf5ee552982cb2c8f515d0 Mon Sep 17 00:00:00 2001 From: Kyle Schwab Date: Sat, 16 Aug 2025 13:49:18 -0600 Subject: [PATCH 2/4] Workaround note. --- pydantic_settings/sources/providers/cli.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pydantic_settings/sources/providers/cli.py b/pydantic_settings/sources/providers/cli.py index 4848569c..6764b2c9 100644 --- a/pydantic_settings/sources/providers/cli.py +++ b/pydantic_settings/sources/providers/cli.py @@ -494,6 +494,8 @@ def _load_env_vars( for field_name, val in list(parsed_args.items()): if isinstance(val, list): if self._is_nested_alias_path_only_workaround(parsed_args, field_name, val): + # Workaround for nested alias path environment variables not being handled. + # See https://github.com/pydantic/pydantic-settings/issues/670 continue parsed_args[field_name] = self._merge_parsed_list(val, field_name) elif field_name.endswith(':subcommand') and val is not None: @@ -529,6 +531,10 @@ def _load_env_vars( def _is_nested_alias_path_only_workaround( self, parsed_args: dict[str, list[str] | str], field_name: str, val: list[str] ) -> bool: + """ + Workaround for nested alias path environment variables not being handled. + See https://github.com/pydantic/pydantic-settings/issues/670 + """ known_arg = self._parser_map[field_name].values() if not known_arg: return False From ce4586b5a04306a807391a9053e5106b79d80acf Mon Sep 17 00:00:00 2001 From: Kyle Schwab Date: Mon, 25 Aug 2025 10:40:04 -0600 Subject: [PATCH 3/4] Initial test case. --- pydantic_settings/sources/providers/cli.py | 23 ++++-- tests/test_source_cli.py | 86 +++++++++++++++++++++- 2 files changed, 99 insertions(+), 10 deletions(-) diff --git a/pydantic_settings/sources/providers/cli.py b/pydantic_settings/sources/providers/cli.py index 6764b2c9..0926fcf3 100644 --- a/pydantic_settings/sources/providers/cli.py +++ b/pydantic_settings/sources/providers/cli.py @@ -497,6 +497,12 @@ def _load_env_vars( # Workaround for nested alias path environment variables not being handled. # See https://github.com/pydantic/pydantic-settings/issues/670 continue + + cli_arg = self._parser_map.get(field_name, {}).get(None) + if cli_arg and cli_arg.is_no_decode: + parsed_args[field_name] = ','.join(val) + continue + parsed_args[field_name] = self._merge_parsed_list(val, field_name) elif field_name.endswith(':subcommand') and val is not None: selected_subcommands.append(self._parser_map[field_name][val].dest) @@ -535,7 +541,7 @@ def _is_nested_alias_path_only_workaround( Workaround for nested alias path environment variables not being handled. See https://github.com/pydantic/pydantic-settings/issues/670 """ - known_arg = self._parser_map[field_name].values() + known_arg = self._parser_map.get(field_name, {}).values() if not known_arg: return False arg = next(iter(known_arg)) @@ -573,18 +579,21 @@ def _get_merge_parsed_list_types( def _merged_list_to_str(self, merged_list: list[str], field_name: str) -> str: decode_list: list[str] = [] is_use_decode: Optional[bool] = None - cli_arg_map = self._parser_map.get(field_name) + cli_arg_map = self._parser_map.get(field_name, {}) for index, item in enumerate(merged_list): - cli_arg = cli_arg_map.get(index) if cli_arg_map else None + cli_arg = cli_arg_map.get(index) is_decode = cli_arg is None or not cli_arg.is_no_decode if is_use_decode is None: is_use_decode = is_decode elif is_use_decode != is_decode: raise SettingsError('Mixing Decode and NoDecode across different AliasPath fields is not allowed') - if isinstance(item, str) and is_use_decode: + if is_use_decode: item = item.replace('\\', '\\\\') + elif item.startswith('"') and item.endswith('"'): + item = item[1:-1] decode_list.append(item) - return f'[{",".join(decode_list)}]' + merged_list_str = ','.join(decode_list) + return f'[{merged_list_str}]' if is_use_decode else merged_list_str def _merge_parsed_list(self, parsed_list: list[str], field_name: str) -> str: try: @@ -1304,8 +1313,8 @@ def _serialized_args(self, model: PydanticModel, _is_submodel: bool = False) -> positional_args.append(value) continue - # Note: prepend 'no-' for boolean optional action flag if model_default value is False - if arg.kwargs.get('action') == BooleanOptionalAction and model_default is False: + # Note: prepend 'no-' for boolean optional action flag if model_default value is False and flag is not a short option + if arg.kwargs.get('action') == BooleanOptionalAction and model_default is False and flag_chars == '--': flag_chars += 'no-' optional_args.append(f'{flag_chars}{arg_name}') diff --git a/tests/test_source_cli.py b/tests/test_source_cli.py index d4975db2..4ff6e707 100644 --- a/tests/test_source_cli.py +++ b/tests/test_source_cli.py @@ -5,6 +5,7 @@ import time import typing from enum import IntEnum +from pathlib import Path, PureWindowsPath from typing import Annotated, Any, Dict, Generic, List, Literal, Optional, Tuple, TypeVar, Union # noqa: UP035 import pytest @@ -20,13 +21,23 @@ Field, Tag, ValidationError, + field_validator, + model_validator, ) from pydantic import ( dataclasses as pydantic_dataclasses, ) from pydantic._internal._repr import Representation -from pydantic_settings import BaseSettings, CliApp, PydanticBaseSettingsSource, SettingsConfigDict, SettingsError +from pydantic_settings import ( + BaseSettings, + CliApp, + ForceDecode, + NoDecode, + PydanticBaseSettingsSource, + SettingsConfigDict, + SettingsError, +) from pydantic_settings.sources import ( CLI_SUPPRESS, CliExplicitFlag, @@ -1547,16 +1558,33 @@ class ImplicitSettings(BaseSettings, cli_implicit_flags=True, cli_enforce_requir explicit_settings = CliApp.run(ExplicitSettings, cli_args=['--explicit_req=True', '--implicit_req']) assert explicit_settings.model_dump() == expected + serialized_args = CliApp.serialize(explicit_settings) + assert serialized_args == ['--explicit_req', 'True', '--implicit_req'] + assert CliApp.run(ExplicitSettings, cli_args=serialized_args).model_dump() == expected implicit_settings = CliApp.run(ImplicitSettings, cli_args=['--explicit_req=True', '--implicit_req']) assert implicit_settings.model_dump() == expected + serialized_args = CliApp.serialize(implicit_settings) + assert serialized_args == ['--explicit_req', 'True', '--implicit_req'] + assert CliApp.run(ImplicitSettings, cli_args=serialized_args).model_dump() == expected + expected = { + 'explicit_req': False, + 'explicit_opt': False, + 'implicit_req': False, + 'implicit_opt': False, + } + + explicit_settings = CliApp.run(ExplicitSettings, cli_args=['--explicit_req=False', '--no-implicit_req']) + assert explicit_settings.model_dump() == expected serialized_args = CliApp.serialize(explicit_settings) - assert serialized_args == ['--explicit_req', 'True', '--implicit_req'] + assert serialized_args == ['--explicit_req', 'False', '--no-implicit_req'] assert CliApp.run(ExplicitSettings, cli_args=serialized_args).model_dump() == expected + implicit_settings = CliApp.run(ImplicitSettings, cli_args=['--explicit_req=False', '--no-implicit_req']) + assert implicit_settings.model_dump() == expected serialized_args = CliApp.serialize(implicit_settings) - assert serialized_args == ['--explicit_req', 'True', '--implicit_req'] + assert serialized_args == ['--explicit_req', 'False', '--no-implicit_req'] assert CliApp.run(ImplicitSettings, cli_args=serialized_args).model_dump() == expected @@ -2749,3 +2777,55 @@ class Cfg(BaseSettings): ] assert CliApp.run(Cfg, cli_args=serialized_cli_args).model_dump() == cfg.model_dump() + + +def test_cli_decoding(): + PATH_A_STR = str(PureWindowsPath(Path.cwd())) + PATH_B_STR = str(PureWindowsPath(Path.cwd() / 'subdir')) + + class PathsDecode(BaseSettings): + path_a: Path = Field(validation_alias=AliasPath('paths', 0)) + path_b: Path = Field(validation_alias=AliasPath('paths', 1)) + + assert CliApp.run(PathsDecode, cli_args=['--paths', PATH_A_STR, '--paths', PATH_B_STR]).model_dump() == { + 'path_a': Path(PATH_A_STR), + 'path_b': Path(PATH_B_STR), + } + + class PathsListNoDecode(BaseSettings): + paths: Annotated[list[Path], NoDecode] + + @field_validator('paths', mode='before') + @classmethod + def decode_path_a(cls, paths: str) -> list[Path]: + return [Path(p) for p in paths.split(',')] + + assert CliApp.run(PathsListNoDecode, cli_args=['--paths', f'{PATH_A_STR},{PATH_B_STR}']).model_dump() == { + 'paths': [Path(PATH_A_STR), Path(PATH_B_STR)] + } + + class PathsAliasNoDecode(BaseSettings): + path_a: Annotated[Path, NoDecode] = Field(validation_alias=AliasPath('paths', 0)) + path_b: Annotated[Path, NoDecode] = Field(validation_alias=AliasPath('paths', 1)) + + @model_validator(mode='before') + @classmethod + def intercept_kwargs(cls, data: Any) -> Any: + data['paths'] = [Path(p) for p in data['paths'].split(',')] + return data + + assert CliApp.run(PathsAliasNoDecode, cli_args=['--paths', f'{PATH_A_STR},{PATH_B_STR}']).model_dump() == { + 'path_a': Path(PATH_A_STR), + 'path_b': Path(PATH_B_STR), + } + + with pytest.raises( + SettingsError, + match='Parsing error encountered for paths: Mixing Decode and NoDecode across different AliasPath fields is not allowed', + ): + + class PathsMixedDecode(BaseSettings): + path_a: Annotated[Path, ForceDecode] = Field(validation_alias=AliasPath('paths', 0)) + path_b: Annotated[Path, NoDecode] = Field(validation_alias=AliasPath('paths', 1)) + + CliApp.run(PathsMixedDecode, cli_args=['--paths', PATH_A_STR, '--paths', PATH_B_STR]) From cf9b2d22752fbd178c65e0e14749123e3e2b8dfc Mon Sep 17 00:00:00 2001 From: Kyle Schwab Date: Sun, 31 Aug 2025 12:53:48 -0600 Subject: [PATCH 4/4] Complete coverage for CliSettingsSource. --- pydantic_settings/sources/providers/cli.py | 3 - tests/test_source_cli.py | 64 +++++++++++++++++----- 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/pydantic_settings/sources/providers/cli.py b/pydantic_settings/sources/providers/cli.py index 0926fcf3..bd22e311 100644 --- a/pydantic_settings/sources/providers/cli.py +++ b/pydantic_settings/sources/providers/cli.py @@ -1305,9 +1305,6 @@ def _serialized_args(self, model: PydanticModel, _is_submodel: bool = False) -> value = self._update_alias_path_only_default(arg_name, value, field_info, alias_path_only_defaults) if _CliPositionalArg in field_info.metadata: - if arg.is_alias_path_only: - positional_args.append(value) - continue for value in model_default if isinstance(model_default, list) else [model_default]: value = json.dumps(value) if isinstance(value, (dict, list, set)) else str(value) positional_args.append(value) diff --git a/tests/test_source_cli.py b/tests/test_source_cli.py index 4ff6e707..d2503226 100644 --- a/tests/test_source_cli.py +++ b/tests/test_source_cli.py @@ -281,6 +281,7 @@ class Cfg(BaseSettings, cli_avoid_json=avoid_json): alias_choice_w_only_path: str = Field(validation_alias=AliasChoices(AliasPath('path1', 1))) alias_choice_no_path: str = Field(validation_alias=AliasChoices('b', 'c')) alias_path: str = Field(validation_alias=AliasPath('path2', 'deep', 1)) + alias_extra_deep: str = Field(validation_alias=AliasPath('path3', 'deep', 'extra', 'deep', 1)) alias_str: str = Field(validation_alias='str') cfg = CliApp.run( @@ -298,6 +299,8 @@ class Cfg(BaseSettings, cli_avoid_json=avoid_json): 'a1,b1,c1', '--path2', '{"deep": ["a2","b2","c2"]}', + '--path3', + '{"deep": {"extra": {"deep": ["a3","b3","c3"]}}}', ], ) assert cfg.model_dump() == { @@ -305,6 +308,7 @@ class Cfg(BaseSettings, cli_avoid_json=avoid_json): 'alias_choice_w_only_path': 'b1', 'alias_choice_no_path': 'b', 'alias_path': 'b2', + 'alias_extra_deep': 'b3', 'alias_str': 'str', } @@ -318,6 +322,8 @@ class Cfg(BaseSettings, cli_avoid_json=avoid_json): 'b', '--path2', '{"deep": ["", "b2"]}', + '--path3', + '{"deep": {"extra": {"deep": ["", "b3"]}}}', '--str', 'str', ] @@ -331,6 +337,7 @@ class Nested(BaseModel): alias_choice_w_only_path: str = Field(validation_alias=AliasChoices(AliasPath('path1', 1))) alias_choice_no_path: str = Field(validation_alias=AliasChoices('b', 'c')) alias_path: str = Field(validation_alias=AliasPath('path2', 'deep', 1)) + alias_extra_deep: str = Field(validation_alias=AliasPath('path3', 'deep', 'extra', 'deep', 1)) alias_str: str = Field(validation_alias='str') class Cfg(BaseSettings, cli_avoid_json=avoid_json): @@ -351,6 +358,8 @@ class Cfg(BaseSettings, cli_avoid_json=avoid_json): '["a1","b1","c1"]', '--nest.path2', '{"deep": ["a2","b2","c2"]}', + '--nest.path3', + '{"deep": {"extra": {"deep": ["a3","b3","c3"]}}}', ], ) assert cfg.model_dump() == { @@ -359,6 +368,7 @@ class Cfg(BaseSettings, cli_avoid_json=avoid_json): 'alias_choice_w_only_path': 'b1', 'alias_choice_no_path': 'b', 'alias_path': 'b2', + 'alias_extra_deep': 'b3', 'alias_str': 'str', } } @@ -373,6 +383,8 @@ class Cfg(BaseSettings, cli_avoid_json=avoid_json): 'b', '--nest.path2', '{"deep": ["", "b2"]}', + '--nest.path3', + '{"deep": {"extra": {"deep": ["", "b3"]}}}', '--nest.str', 'str', ] @@ -2468,10 +2480,12 @@ class DeepSubModel(BaseModel): class SubModel(BaseModel): sub_subcmd: CliSubCommand[DeepSubModel] + sub_other_subcmd: CliSubCommand[DeepSubModel] sub_arg: str class Root(BaseModel): root_subcmd: CliSubCommand[SubModel] + other_subcmd: CliSubCommand[SubModel] root_arg: str root = CliApp.run( @@ -2487,9 +2501,11 @@ class Root(BaseModel): ) assert root.model_dump() == { 'root_arg': 'hi', + 'other_subcmd': None, 'root_subcmd': { 'sub_arg': 'hello', 'sub_subcmd': {'deep_pos_arg': 'hey', 'deep_arg': 'bye'}, + 'sub_other_subcmd': None, }, } @@ -2512,15 +2528,16 @@ class Root(BaseModel): CliApp.run(Root) assert ( capsys.readouterr().out - == f"""usage: example.py [-h] --root-arg str {{root-subcmd}} ... + == f"""usage: example.py [-h] --root-arg str {{root-subcmd,other-subcmd}} ... {ARGPARSE_OPTIONS_TEXT}: - -h, --help show this help message and exit - --root-arg str (required) + -h, --help show this help message and exit + --root-arg str (required) subcommands: - {{root-subcmd}} + {{root-subcmd,other-subcmd}} root-subcmd + other-subcmd """ ) @@ -2529,15 +2546,17 @@ class Root(BaseModel): CliApp.run(Root) assert ( capsys.readouterr().out - == f"""usage: example.py root-subcmd [-h] --sub-arg str {{sub-subcmd}} ... + == f"""usage: example.py root-subcmd [-h] --sub-arg str + {{sub-subcmd,sub-other-subcmd}} ... {ARGPARSE_OPTIONS_TEXT}: - -h, --help show this help message and exit - --sub-arg str (required) + -h, --help show this help message and exit + --sub-arg str (required) subcommands: - {{sub-subcmd}} + {{sub-subcmd,sub-other-subcmd}} sub-subcmd + sub-other-subcmd """ ) @@ -2786,37 +2805,56 @@ def test_cli_decoding(): class PathsDecode(BaseSettings): path_a: Path = Field(validation_alias=AliasPath('paths', 0)) path_b: Path = Field(validation_alias=AliasPath('paths', 1)) + num_a: int = Field(validation_alias=AliasPath('nums', 0)) + num_b: int = Field(validation_alias=AliasPath('nums', 1)) - assert CliApp.run(PathsDecode, cli_args=['--paths', PATH_A_STR, '--paths', PATH_B_STR]).model_dump() == { + assert CliApp.run( + PathsDecode, cli_args=['--paths', PATH_A_STR, '--paths', PATH_B_STR, '--nums', '1', '--nums', '2'] + ).model_dump() == { 'path_a': Path(PATH_A_STR), 'path_b': Path(PATH_B_STR), + 'num_a': 1, + 'num_b': 2, } class PathsListNoDecode(BaseSettings): paths: Annotated[list[Path], NoDecode] + nums: Annotated[list[int], NoDecode] @field_validator('paths', mode='before') @classmethod def decode_path_a(cls, paths: str) -> list[Path]: return [Path(p) for p in paths.split(',')] - assert CliApp.run(PathsListNoDecode, cli_args=['--paths', f'{PATH_A_STR},{PATH_B_STR}']).model_dump() == { - 'paths': [Path(PATH_A_STR), Path(PATH_B_STR)] - } + @field_validator('nums', mode='before') + @classmethod + def decode_nums(cls, nums: str) -> list[int]: + return [int(n) for n in nums.split(',')] + + assert CliApp.run( + PathsListNoDecode, cli_args=['--paths', f'{PATH_A_STR},{PATH_B_STR}', '--nums', '1,2'] + ).model_dump() == {'paths': [Path(PATH_A_STR), Path(PATH_B_STR)], 'nums': [1, 2]} class PathsAliasNoDecode(BaseSettings): path_a: Annotated[Path, NoDecode] = Field(validation_alias=AliasPath('paths', 0)) path_b: Annotated[Path, NoDecode] = Field(validation_alias=AliasPath('paths', 1)) + num_a: Annotated[int, NoDecode] = Field(validation_alias=AliasPath('nums', 0)) + num_b: Annotated[int, NoDecode] = Field(validation_alias=AliasPath('nums', 1)) @model_validator(mode='before') @classmethod def intercept_kwargs(cls, data: Any) -> Any: data['paths'] = [Path(p) for p in data['paths'].split(',')] + data['nums'] = [int(n) for n in data['nums'].split(',')] return data - assert CliApp.run(PathsAliasNoDecode, cli_args=['--paths', f'{PATH_A_STR},{PATH_B_STR}']).model_dump() == { + assert CliApp.run( + PathsAliasNoDecode, cli_args=['--paths', f'{PATH_A_STR},{PATH_B_STR}', '--nums', '1,2'] + ).model_dump() == { 'path_a': Path(PATH_A_STR), 'path_b': Path(PATH_B_STR), + 'num_a': 1, + 'num_b': 2, } with pytest.raises(