Source code for ai2_kit.domain.deepmd

from ai2_kit.core.artifact import Artifact, ArtifactDict
from ai2_kit.core.script import BashTemplate
from ai2_kit.core.script import BashScript, BashStep
from ai2_kit.core.job import gather_jobs
from ai2_kit.core.log import get_logger
from ai2_kit.core.util import dict_nested_get, expand_globs, dump_json, list_split, flatten
from ai2_kit.core.pydantic import BaseModel
from ai2_kit.tool.dpdata import set_fparam, register_data_types


from typing import List, Tuple, Optional
from dataclasses import dataclass
import os
import copy
import random
import sys

from .iface import ICllTrainOutput, BaseCllContext, TRAINING_MODE
from .data import DataFormat, get_data_format
from .dpff import set_dplr_ext_from_cp2k_output, build_sel_type_assertion

from .constant import (
    DP_CHECKPOINT_FILE,
    DP_DISP_FILE,
    DP_PROFILING_FILE,
    DP_INPUT_FILE,
    DP_FROZEN_MODEL,
    DP_ORIGINAL_MODEL
)

logger = get_logger(__name__)


class CllDeepmdInputConfig(BaseModel):
    class DwTraningConfig(BaseModel):
        """
        Options for deep Wannier model training
        """
        input_template: dict
        """
        DeepMD input template.
        Ref: https://docs.deepmodeling.com/projects/deepmd/en/master/model/dplr.html
        """

    train_dw: Optional[DwTraningConfig] = None
    """
    Options for deep Wannier model training; if None, no deep Wannier model will be trained.
    """
    model_num: int = 4
    """
    Total number of models to train.
    """
    init_dataset: List[str] = []
    """
    Dataset used to initialize training.
    """
    input_template: dict = dict()
    """
    DeepMD input template.
    """
    compress_model: bool = False
    """
    Whether to compress the model after training.
    """
    isolate_outliers: bool = False
    """
    If enabled, outlier data will be separated from the training data.
    """
    outlier_f_cutoff: float = 10.
    """
    Force-magnitude threshold above which a data point is treated as an outlier.
    """
    outlier_weight: float = 0.003
    """
    The weight of outlier data in the training data.
    """
    fixture_models: List[str] = []
    """
    Fixture models used to initialize training; glob patterns are supported.
    If this is not empty, the whole training process will be skipped.
    This is useful for debugging, or for exploring more structures without training.
    The models must already exist on the remote executor.
    The name "fixture" is borrowed from the fixture concept in pytest.
    """
    group_by_formula: bool = False
    """
    Group the dataset by formula instead of by ancestor.
    If enabled, the dataset will be grouped by formula; otherwise it will be grouped by ancestor.
    Set this to True when you have multiple structures with the same ancestor.
    """
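
# A minimal sketch of how these options might be filled in from a workflow
# config file. The surrounding `deepmd:` key and the YAML layout are
# assumptions for illustration; only the field names are defined by
# CllDeepmdInputConfig above:
#
#   deepmd:
#     model_num: 4
#     init_dataset: ['data/h2o-init']   # hypothetical dataset path
#     input_template: {}                # a full DeepMD input template goes here
#     isolate_outliers: true
#     outlier_f_cutoff: 10.0
#     outlier_weight: 0.003
#     compress_model: true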

class CllDeepmdContextConfig(BaseModel):
    script_template: BashTemplate
    dp_cmd: str = 'dp'
    concurrency: int = 5
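
# A hedged sketch of the matching context section: dp_cmd and concurrency map
# directly to the fields above, while script_template is a cluster-specific
# BashTemplate whose field layout below is an assumption, not prescribed by
# this module:
#
#   deepmd:
#     dp_cmd: dp
#     concurrency: 5
#     script_template:
#       header: ...   # e.g. scheduler directives (assumed layout)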

@dataclass
class CllDeepmdInput:
    config: CllDeepmdInputConfig
    mode: TRAINING_MODE
    type_map: List[str]
    sel_type: Optional[List[int]]
    old_dataset: List[Artifact]  # training data used by the previous iteration
    new_dataset: List[Artifact]  # training data generated by the current iteration

@dataclass
class CllDeepmdContext(BaseCllContext):
    config: CllDeepmdContextConfig

@dataclass
class GenericDeepmdOutput(ICllTrainOutput):
    models: List[Artifact]
    dataset: List[Artifact]

    def get_mlp_models(self) -> List[Artifact]:
        return self.models

    def get_training_dataset(self) -> List[Artifact]:
        return self.dataset

async def cll_deepmd(input: CllDeepmdInput, ctx: CllDeepmdContext):
    executor = ctx.resource_manager.default_executor

    # if fixture_models is set, return them and skip training
    if len(input.config.fixture_models) > 0:
        logger.info(f'Using fixture models: {input.config.fixture_models}')
        model_paths = executor.run_python_fn(expand_globs)(input.config.fixture_models)
        assert len(model_paths) > 0, f'No fixture models found: {input.config.fixture_models}'
        return GenericDeepmdOutput(
            dataset=input.old_dataset.copy(),
            models=[Artifact.of(url=url, format=DataFormat.DEEPMD_MODEL) for url in model_paths]
        )

    # set up workspace
    work_dir = os.path.join(executor.work_dir, ctx.path_prefix)
    [new_dataset_dir, tasks_dir, outlier_dir] = executor.setup_workspace(
        work_dir, ['new_dataset', 'tasks', 'outlier_dataset'])

    init_dataset = ctx.resource_manager.resolve_artifacts(input.config.init_dataset)
    # the input dataset contains data generated by the previous iteration;
    # it should not contain the initial dataset
    input_dataset: List[Artifact] = input.old_dataset.copy()

    # make a new dataset from raw data
    new_dataset, outlier_dataset = executor.run_python_fn(make_deepmd_dataset)(
        dataset_dir=new_dataset_dir,
        outlier_dir=outlier_dir,
        raw_data_collection=[a.to_dict() for a in input.new_dataset],
        type_map=input.type_map,
        isolate_outliers=input.config.isolate_outliers,
        outlier_f_cutoff=input.config.outlier_f_cutoff,
        group_by_formula=input.config.group_by_formula,
        deepmd_input_template=input.config.input_template,
        sel_type=input.sel_type,
        mode=input.mode,
    )
    input_dataset += [Artifact.of(**a) for a in new_dataset]
    # use attrs to distinguish the outlier dataset
    input_dataset += [Artifact.of(**{**a, 'attrs': {**a['attrs'], 'outlier': True}}) for a in outlier_dataset]

    # classify dataset
    train_systems, outlier_systems, validation_systems = _classify_dataset(input_dataset + init_dataset)

    # set up the deep Wannier training job if needed
    dw_input_template = None
    if input.config.train_dw and input.mode == 'dpff':
        dw_input_template = input.config.train_dw.input_template

    # make task dirs
    dp_task_dirs, dw_task_dir = executor.run_python_fn(make_deepmd_task_dirs)(
        input_template=input.config.input_template,
        model_num=input.config.model_num,
        type_map=input.type_map,
        train_systems=train_systems,
        outlier_systems=outlier_systems,
        validation_systems=validation_systems,
        outlier_weight=input.config.outlier_weight,
        isolate_outliers=input.config.isolate_outliers,
        dw_input_template=dw_input_template,
        base_dir=tasks_dir,
    )

    # run the deep Wannier training job if needed
    if dw_task_dir is not None:
        logger.info(f'Run deep wannier training job, output dir: {dw_task_dir}')
        steps = _build_deepmd_steps(
            dp_cmd=ctx.config.dp_cmd,
            compress_model=input.config.compress_model,
            cwd=dw_task_dir,
        )
        dw_train_script = BashScript(
            template=ctx.config.script_template,
            steps=steps,
        )
        job = executor.submit(dw_train_script.render(), cwd=tasks_dir)
        await gather_jobs([job], max_tries=2)
        logger.info(f'Deep wannier training job finished, output dir: {dw_task_dir}')

    # build dp training steps
    all_steps: List[list] = []
    for task_dir in dp_task_dirs:
        executor.mkdir(task_dir)
        steps = _build_deepmd_steps(
            dp_cmd=ctx.config.dp_cmd,
            compress_model=input.config.compress_model,
            cwd=task_dir,
        )
        all_steps.append(steps)

    # submit jobs, limited by the configured concurrency
    jobs = []
    for i, steps_group in enumerate(list_split(all_steps, ctx.config.concurrency)):
        if not steps_group:
            continue
        script = BashScript(
            template=ctx.config.script_template,
            steps=flatten(steps_group),
        )
        job = executor.submit(script.render(), cwd=tasks_dir)
        jobs.append(job)
    await gather_jobs(jobs, max_tries=2)
    logger.info(f'All models are trained, output dirs: {dp_task_dirs}')

    return GenericDeepmdOutput(
        dataset=input_dataset.copy(),
        models=[Artifact.of(
            url=os.path.join(url, DP_FROZEN_MODEL),
            format=DataFormat.DEEPMD_MODEL,
        ) for url in dp_task_dirs]
    )
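
# A minimal, self-contained sketch of the fan-out logic above. It assumes
# (an assumption, not the actual ai2_kit.core.util.list_split implementation)
# that list_split distributes items into `n` groups; the hypothetical helper
# below only illustrates why at most `concurrency` scripts are submitted and
# why empty groups must be skipped.
def _sketch_split_into_n_groups(items: list, n: int) -> list:
    """Illustrative stand-in: distribute items round-robin into n groups."""
    return [items[i::n] for i in range(n)]

# e.g. with 4 task dirs and concurrency=5:
#   _sketch_split_into_n_groups(['t0', 't1', 't2', 't3'], 5)
#   -> [['t0'], ['t1'], ['t2'], ['t3'], []]
# The trailing empty group is why the submit loop above guards with
# `if not steps_group: continue`.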

def _classify_dataset(dataset: List[Artifact]):
    """
    Classify the dataset into train, outlier and validation systems
    """
    train_systems: List[str] = []
    outlier_systems: List[str] = []
    validation_systems: List[str] = []
    for a in dataset:
        if dict_nested_get(a.attrs, ['deepmd', 'validation_data'], False):
            validation_systems.append(a.url)
        else:
            if dict_nested_get(a.attrs, ['outlier'], False):
                outlier_systems.append(a.url)
            else:
                train_systems.append(a.url)

    # FIXME: users have reported duplicated data in the dataset,
    # so we check for and remove duplicates here.
    # TODO: find the root cause of the duplicated data and remove this workaround.
    def _unique(l: list):
        r = sorted(set(l))
        if len(r) != len(l):
            logger.warning(f'Found duplicated data in dataset: {l}')
        return r

    return _unique(train_systems), _unique(outlier_systems), _unique(validation_systems)


def _build_deepmd_steps(dp_cmd: str, compress_model: bool, cwd: str):
    steps = []
    dp_train_cmd = f'{dp_cmd} train {DP_INPUT_FILE}'
    # restart from the checkpoint if one exists, otherwise start a fresh training run
    dp_train_cmd_restart = f'if [ ! -f model.ckpt.index ]; then {dp_train_cmd}; else {dp_train_cmd} --restart model.ckpt; fi'
    steps.append(
        BashStep(cmd=dp_train_cmd_restart, cwd=cwd, checkpoint='dp-train')  # type: ignore
    )
    if compress_model:
        steps.append(BashStep(cmd=[dp_cmd, 'freeze', '-o', DP_ORIGINAL_MODEL,
                                   '&&', dp_cmd, 'compress', '-i', DP_ORIGINAL_MODEL, '-o', DP_FROZEN_MODEL], cwd=cwd))
    else:
        steps.append(BashStep(cmd=[dp_cmd, 'freeze', '-o', DP_FROZEN_MODEL], cwd=cwd))
    return steps
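
# For reference, with dp_cmd='dp' and compress_model=False the steps above
# render to shell commands roughly like the following (DP_INPUT_FILE and
# DP_FROZEN_MODEL are shown symbolically; their concrete file names come
# from .constant):
#
#   if [ ! -f model.ckpt.index ]; then dp train <DP_INPUT_FILE>; \
#       else dp train <DP_INPUT_FILE> --restart model.ckpt; fi
#   dp freeze -o <DP_FROZEN_MODEL>
#
# With compress_model=True, the freeze step instead writes DP_ORIGINAL_MODEL
# and chains `dp compress -i <DP_ORIGINAL_MODEL> -o <DP_FROZEN_MODEL>`.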

def __export_remote_functions():
    # the functions defined in this closure are shipped to and executed on
    # the remote executor via executor.run_python_fn
    import dpdata
    from itertools import groupby

    def make_deepmd_task_dirs(input_template: dict,
                              model_num: int,
                              type_map: List[str],
                              train_systems: List[str],
                              outlier_systems: List[str],
                              validation_systems: List[str],
                              isolate_outliers: bool,
                              outlier_weight: float,
                              dw_input_template: Optional[dict],
                              base_dir: str,
                              ):
        dp_task_dirs = [os.path.join(base_dir, f'{i:03d}') for i in range(model_num)]
        for task_dir in dp_task_dirs:
            os.makedirs(task_dir, exist_ok=True)
            dp_input = make_deepmd_input(
                input_template=input_template,
                type_map=type_map,
                train_systems=train_systems,
                outlier_systems=outlier_systems,
                validation_systems=validation_systems,
                isolate_outliers=isolate_outliers,
                outlier_weight=outlier_weight,
            )
            # If a deep Wannier model is set in dp_input, create a soft link
            # named dw_model.pb in the task dir and point model_name at it.
            dw_model = dict_nested_get(dp_input, ['model', 'modifier', 'model_name'], None)
            if dw_model is not None:
                link_target = os.path.join(task_dir, 'dw_model.pb')
                os.system(f'ln -sf {dw_model} {link_target}')
                dp_input['model']['modifier']['model_name'] = 'dw_model.pb'

            dp_input_path = os.path.join(task_dir, DP_INPUT_FILE)
            dump_json(dp_input, dp_input_path)

        dw_task_dir = None
        if dw_input_template:
            dw_task_dir = os.path.join(base_dir, 'train_dw')
            os.makedirs(dw_task_dir, exist_ok=True)
            dw_input = make_deepmd_input(
                input_template=dw_input_template,
                type_map=type_map,
                train_systems=train_systems,
                outlier_systems=outlier_systems,
                validation_systems=validation_systems,
                isolate_outliers=isolate_outliers,
                outlier_weight=outlier_weight,
            )
            dw_input_path = os.path.join(dw_task_dir, DP_INPUT_FILE)
            dump_json(dw_input, dw_input_path)

        return dp_task_dirs, dw_task_dir

    def make_deepmd_input(input_template: dict,
                          type_map: List[str],
                          train_systems: List[str],
                          outlier_systems: List[str],
                          validation_systems: List[str],
                          isolate_outliers: bool,
                          outlier_weight: float,
                          ):
        # create the dp train input file
        # ref: https://github.com/deepmodeling/dpgen2/blob/master/examples/ch4/param_CH4_deepmd-kit-2.1.1.json
        # ref: https://github.com/deepmodeling/dpgen2/blob/master/dpgen2/op/prep_dp_train.py
        # ref: https://github.com/deepmodeling/dpgen2/blob/master/dpgen2/op/run_dp_train.py
        def _random_seed():
            return random.randrange(sys.maxsize) % (1 << 32)

        dp_input = copy.deepcopy(input_template)
        training: dict = dp_input['training']

        # set output files
        training['disp_file'] = DP_DISP_FILE
        training['save_ckpt'] = DP_CHECKPOINT_FILE
        training['profiling_file'] = DP_PROFILING_FILE

        # set random seeds
        descriptor = dp_input['model']['descriptor']
        if descriptor['type'] == 'hybrid':
            for d in descriptor['list']:
                d['seed'] = _random_seed()
        else:
            descriptor['seed'] = _random_seed()

        dp_input['model']['fitting_net']['seed'] = _random_seed()
        dp_input['training']['seed'] = _random_seed()

        auto_prob_str = 'prob_sys_size'
        if isolate_outliers and len(outlier_systems) > 0:
            # if isolate_outliers is enabled, assign a different sampling weight
            # to the outlier data, e.g. prob_sys_size;0:57:0.9997;57:60:0.0003
            auto_prob_str += f';0:{len(train_systems)}:{1-outlier_weight}'
            auto_prob_str += f';{len(train_systems)}:{len(train_systems)+len(outlier_systems)}:{outlier_weight}'

        # v2 training data
        training_data = training.get('training_data', {})
        training_data = {
            'set_prefix': 'set',
            'batch_size': 'auto',
            'sys_probs': None,
            **training_data,
            'systems': train_systems + outlier_systems,
            'auto_prob_style': auto_prob_str,
        }
        training['training_data'] = training_data

        # omit the validation section if no data is provided, or else dp will fail with:
        # OSError: [Errno cannot find valid a data system] Please check your setting for data systems
        if len(validation_systems) > 0:
            validation_data = {
                'systems': validation_systems,
                'set_prefix': training_data['set_prefix'],
                'batch_size': training_data['batch_size'],
            }
            training['validation_data'] = validation_data

        # other params
        dp_input['model']['type_map'] = type_map
        return dp_input
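
    # Worked example of the auto_prob_style string built above: with 57
    # training systems, 3 outlier systems, and the default outlier_weight of
    # 0.003, the string becomes
    #   'prob_sys_size;0:57:0.997;57:60:0.003'
    # i.e. systems 0..56 share 99.7% of the sampling probability while the
    # three outlier systems (indices 57..59) share the remaining 0.3%.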

    def make_deepmd_dataset(
            dataset_dir: str,
            outlier_dir: str,
            raw_data_collection: List[ArtifactDict],
            isolate_outliers: bool,
            outlier_f_cutoff: float,
            type_map: List[str],
            deepmd_input_template: dict,
            group_by_formula: bool,
            mode: str,
            sel_type: Optional[List[int]],
    ):
        register_data_types()

        dataset_collection: List[Tuple[ArtifactDict, dpdata.LabeledSystem]] = []
        outlier_collection: List[Tuple[ArtifactDict, dpdata.LabeledSystem]] = []

        for raw_data in raw_data_collection:
            data_format = get_data_format(raw_data)  # type: ignore
            dp_system = None
            if data_format == DataFormat.CP2K_OUTPUT_DIR:
                dp_system = dpdata.LabeledSystem(os.path.join(raw_data['url'], 'output'), fmt='cp2k/output', type_map=type_map)
            elif data_format == DataFormat.VASP_OUTPUT_DIR:
                dp_system = dpdata.LabeledSystem(os.path.join(raw_data['url'], 'OUTCAR'), fmt='vasp/outcar', type_map=type_map)
            else:
                raise ValueError(f"Unsupported data format: {data_format}")

            # len(dp_system) == 0 can happen, for example, when the system is not converged
            if dp_system is None or 0 == len(dp_system):
                continue  # skip invalid data

            # generate extra data for dpff
            if mode == 'dpff':
                modifier = deepmd_input_template['model']['modifier']
                model_name = modifier['model_name']
                assert sel_type is not None, 'sel_type must be set in dpff mode'
                if data_format == DataFormat.CP2K_OUTPUT_DIR:
                    # arguments of the DPLR model are documented here:
                    # https://github.com/deepmodeling/deepmd-kit/blob/master/doc/model/dplr.md
                    set_dplr_ext_from_cp2k_output(
                        dp_sys=dp_system,
                        cp2k_output=os.path.join(raw_data['url'], 'output'),
                        wannier_file=os.path.join(raw_data['url'], 'wannier.xyz'),
                        ext_efield=raw_data['attrs']['efield'],
                        type_map=type_map,
                        sys_charge_map=modifier['sys_charge_map'],
                        model_charge_map=modifier['model_charge_map'],
                        ewald_h=modifier['ewald_h'],
                        ewald_beta=modifier['ewald_beta'],
                        sel_type=sel_type,
                    )
                else:
                    raise ValueError(f"Unsupported data format: {data_format}")

            # set fparam if it exists
            fparam = raw_data['attrs'].get('dp_fparam', None)
            if fparam is not None:
                set_fparam(dp_system, fparam)

            if isolate_outliers and dp_system.data['forces'].max() > outlier_f_cutoff:
                outlier_collection.append((raw_data, dp_system))
            else:
                dataset_collection.append((raw_data, dp_system))

        _write_dp_dataset = _write_dp_dataset_by_formula if group_by_formula else _write_dp_dataset_by_ancestor
        dataset_dirs = _write_dp_dataset(dp_system_list=dataset_collection, out_dir=dataset_dir, type_map=type_map)
        outlier_dirs = _write_dp_dataset(dp_system_list=outlier_collection, out_dir=outlier_dir, type_map=type_map)

        return dataset_dirs, outlier_dirs

    def _write_dp_dataset_by_formula(dp_system_list: List[Tuple[ArtifactDict, dpdata.LabeledSystem]],
                                     out_dir: str, type_map: List[str]):
        """
        Write a dp dataset grouped by formula

        Use dpdata.MultiSystems to merge systems with the same formula.
        Use this when grouping by ancestor does not work for you.
        """
        if len(dp_system_list) == 0:
            return []
        os.makedirs(out_dir, exist_ok=True)
        multi_systems = dpdata.MultiSystems(dp_system_list[0][1])
        for _, system in dp_system_list[1:]:
            multi_systems.append(system)
        multi_systems.to_deepmd_npy(out_dir, type_map=type_map)  # type: ignore
        return [
            {
                'url': os.path.join(out_dir, fname),
                'format': DataFormat.DEEPMD_NPY,
                'attrs': {},  # setting attrs is meaningless here, as systems from different sources are merged
            } for fname in os.listdir(out_dir)]
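
    # Sketch of the difference between the two grouping strategies, with a
    # purely illustrative ancestor name 'h2o-64/explore-000':
    #   group_by_formula=False -> <out_dir>/h2o-64_explore-000/
    #       one directory per ancestor, '/' replaced by '_'
    #       (see _write_dp_dataset_by_ancestor below)
    #   group_by_formula=True  -> one directory per chemical formula, with
    #       directory names chosen by dpdata when writing the MultiSystems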

    def _write_dp_dataset_by_ancestor(dp_system_list: List[Tuple[ArtifactDict, dpdata.LabeledSystem]],
                                      out_dir: str, type_map: List[str]):
        """
        Write a dp dataset grouped by ancestor
        """
        output_dirs: List[ArtifactDict] = []
        # note: itertools.groupby only merges consecutive items, so entries with
        # the same ancestor are assumed to be adjacent in dp_system_list
        for key, dp_system_group in groupby(dp_system_list, key=lambda x: x[0]['attrs']['ancestor']):
            dp_system_group = list(dp_system_group)
            if 0 == len(dp_system_group):
                continue  # skip empty dataset
            group_out_dir = os.path.join(out_dir, key.replace('/', '_'))
            # merge dp_systems with the same ancestor into a single dataset
            dp_system = dp_system_group[0][1]
            for item in dp_system_group[1:]:
                dp_system += item[1]
            dp_system.to_deepmd_npy(group_out_dir, set_size=len(dp_system), type_map=type_map)  # type: ignore
            # inherit the attrs of the input artifact
            output_dirs.append({'url': group_out_dir,
                                'format': DataFormat.DEEPMD_NPY,
                                'attrs': {**dp_system_group[0][0]['attrs']}})  # type: ignore
        return output_dirs

    return (
        make_deepmd_task_dirs,
        make_deepmd_input,
        make_deepmd_dataset,
    )


(
    make_deepmd_task_dirs,
    make_deepmd_input,
    make_deepmd_dataset,
) = __export_remote_functions()
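
# A minimal local usage sketch of make_deepmd_input. The template below is a
# hypothetical, stripped-down example; a real template needs complete
# descriptor, fitting_net and training sections as described in the
# DeepMD-kit documentation:
#
#   template = {
#       'model': {'descriptor': {'type': 'se_e2_a'}, 'fitting_net': {}},
#       'training': {'numb_steps': 10000},
#   }
#   dp_input = make_deepmd_input(
#       input_template=template, type_map=['O', 'H'],
#       train_systems=['/data/new_dataset/h2o-64'], outlier_systems=[],
#       validation_systems=[], isolate_outliers=False, outlier_weight=0.003,
#   )
#   dump_json(dp_input, DP_INPUT_FILE)  # write the resolved training config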