grrmpy.neb.repetitive_neb のソースコード

from ase.optimize import FIRE,LBFGS
from ase.io import write
from ase.neb import NEB,interpolate
from ase.units import kJ, mol
import numpy as np
from scipy.signal import argrelmax,argrelmin
# USER
from grrmpy.calculator import pfp_calculator
from grrmpy.neb.functions import to_html_nebgraph
from grrmpy.functions import copy_images, set_calc2images, set_const2images, get_energy_list, n_sampling
from grrmpy.optimize.attach import automate_maxstep,same_energy_and_fmax,steep_peak
from grrmpy.optimize.optimizer import OptWithCond

NEB_MAXSTEP_CLIMB_FALSE = {10:0.01,5:0.05,2:0.07,1:0.1,0.1:0.2,0.07:0.3,0.06:0.4,0:0.5,}
NEB_MAXSTEP_CLIMB_TRUE = {1:0.05,0:0.01,}
OPT_MAXSTEP = {10:0.1,5:0.2,2:0.3,0:0.35,}


[ドキュメント]class RepetitiveNEB(): """ | 一つのNEBイメージに対して,異なる条件で複数回NEB計算を行なうためのクラス. | デフォルトではclimb,maxsteps,fmax,stepsなど,NEBやOptimizerの引数を変えられる. Parameters: images: list of Atoms | Atomsのリスト constraints: ASE Constraint | constrainオブジェクト,複数制約をかける場合リストで与える. parallel: bool | 並列計算を行なう場合True. mic: bool | 最小イメージ規則にする場合True. デフォルトでは周期境界条件の場合自動でTrueにする. calc_func: functions object | claculatorを返す関数 name: | 指定した名前でlogファイルが作成される. Properties: | RepetitiveNEBのプロパティ(の一部). | このクラスを継承するし, 新たなクラスを作成することでカスタマイズされたNEBを作成することができる. | 例えば,updateにイメージを切断,挿入する関数を与えることでSNEBは作成された. images: list of Atoms | 現在実行中のNEBのimages neb: NEB | 現在実行中のneb opt: Optimizer | 現在実行中のOptimizer converged: bool | 直近のNEBの収束状況 """ _default_param = { "base_optimizer":FIRE, "climb_list":[False,False,True], "maxstep_list":[0.03,0.1,None], "fmax_list":[0.05,0.05,0.05], "steps_list":[200,2000,2000], "neb_kwargs":{}, "opt_kwargs":{}, "automate_maxstep_true":NEB_MAXSTEP_CLIMB_TRUE, "automate_maxstep_false":NEB_MAXSTEP_CLIMB_FALSE, "same_energy_and_fmax_arg":{"times":10,"row":2,"force_digit":4,"energy_digit":6}, "steep_peak_arg":{"max_ea":300, "nsteps":100}, } def __init__(self,images, constraints=[], parallel=True, mic=None, calc_func=pfp_calculator,name=None,): """異なる条件で繰り返しNEB計算を行なう Parameters: images: list of Atoms Atomsのリスト constraints: ASE Constraint constrainオブジェクト,複数制約をかける場合リストで与える. parallel: bool 並列計算を行なう場合True. mic: bool 最小イメージ規則にする場合True. デフォルトでは周期境界条件の場合自動でTrueにする. calc_func: functions object claculatorを返す関数 name: 指定した名前でlogファイルが作成される. """ self.images = copy_images(images) self.calc_func = calc_func self.constraints = constraints set_calc2images(self.images,self.calc_func) set_const2images(self.images, self.constraints) self.name=self.__class__.__name__ if name is None else name self.parallel = parallel self.constraints = constraints if mic is None: mic = True if any(self.images[0].get_pbc()) else False self.mic = mic self.attach_list = [] self.update_list = [] self.attach_stop_list = [] @property def default_param(self): """run()実行時のデフォルトの引数""" return self._default_param def check_param(self): neb = ["climb_list","maxstep_list","fmax_list","steps_list"] neb_len = [len(getattr(self, i)) for i in neb] if not all([i==neb_len[0] for i in neb_len]): raise KeyError(f"{', '.join(neb)}は同じ要素数である必要があります") def set_param(self,**param): default_param = self.default_param unexpected_argument = set(param)-set(default_param) if len(unexpected_argument) != 0: raise Exception(f"予期しない引数({', '.join(unexpected_argument)})が存在します.\n"+ f"'self.default_param??'で引数を確認できます\n" f"Parameters:\n{', '.join(self.default_param.keys())}") default_param.update(param) for key,val in default_param.items(): setattr(self, key, val) def run_neb(self, climb_list, maxstep_list, fmax_list, steps_list): for climb,maxstep,fmax,steps in zip(climb_list,maxstep_list,fmax_list,steps_list): self.neb = NEB(self.images,climb=climb,parallel=self.parallel,**self.neb_kwargs) self.opt = OptWithCond(self.base_optimizer, self.neb, logfile=f"{self.name}.log", maxstep=maxstep if maxstep else 0.2) self.opt.attach_stop(same_energy_and_fmax(self.opt,**self.same_energy_and_fmax_arg)) self.opt.attach_stop(lambda:steep_peak(self.opt,**self.steep_peak_arg)) if maxstep is None: self.opt.attach( lambda:automate_maxstep( self.opt, self.automate_maxstep_true if climb else self.automate_maxstep_false)) for args,kwargs in self.attach_list: self.opt.attach(*args,**kwargs) for args,kwargs in self.attach_stop_list: self.opt.attach_stop(*args,**kwargs) self.converged = self.opt.run(fmax=fmax,steps=steps) self.do_update() return self.converged
[ドキュメント] def attach(self,*args,**kwargs): """ASEのOptimizerと同じ""" self.attach_list.append((args,kwargs))
[ドキュメント] def attach_stop(self,*args,**kwargs): """ | True or Falseを与える関数を設定する | ASEのoptimizerのようにイタレーション毎に与えた関数が実行される. | 関数の戻り値がTrueになった場合, 計算が停止する. """ self.attach_stop_list.append((args,kwargs))
[ドキュメント] def update(self,function): """1回のNEB計算が終了する毎に実行される関数を指定できる""" self.update_list.append(function)
def do_update(self): for function in self.update_list: function()
[ドキュメント] def run(self,**params): """ | 計算を実行する | 設定可能な引数がdefault_paramで確認できる. Parameters: base_optimizer: Optimizer | NEB計算に使用するOptimizer. | ここで指定したOptimizerを継承して作成されるOptWithCondオプティマイザーが実際には使用される. climb_list: list of bool 毎回のNEB計算のclib maxstep_list: list of float | 毎回のNEB計算のmaxstep_list | 要素にNoneを指定した場合,forceに合わせて自動でmaxstepを変化させる. | どのように変化させるかはautomate_maxstep_true,automate_maxstep_falseで指定できる. fmax_list: list of float | 毎回のNEB計算のfmax steps_list: list of int | 毎回のNEB計算のsteps neb_kwargs: dict | NEBのインスタンス時の引数を指定したい場合に用いる | 例えばばね定数kを変えたい場合{"k"=0.2}のようにする | ここで指定した値は全てのNEB計算に適用される opt_kwargs: dict | Optimizerのインスタンス引数を指定したい場合に用いる. automate_maxstep_true: dict | climb=True時にautomaxstepを使用した際のmaxstep値 automate_maxstep_false: dict | climb=False時にautomaxstepを使用した際のmaxstep値 same_energy_and_fmax_arg: dict | 停止基準 steep_peak_arg: | 停止基準 """ self.set_param(**params) self.check_param() return self.run_neb(self.climb_list, self.maxstep_list, self.fmax_list, self.steps_list)
class CutNEB(RepetitiveNEB): def __init__(self,images,constraints=[],parallel=True,calc_func=pfp_calculator,**kargs): from grrmpy.io.html import write_html #循環インポートになるため super().__init__(images,constraints,parallel,calc_func,**kargs) self.barrierless = False self.attach(lambda:write(f"{self.name}.traj",self.images)) self.attach(lambda:write_html(f"{self.name}.html",self.images)) def adjust_images(self,images,final_nimages,reverse=False): """ imagesに新らたにimageを追加したり,削除したりしてfinal_nimages+1個のイメージになるように調節する """ if final_nimages+1 == len(images): return copy_images(images) if final_nimages+1 < len(images): # イメージを削除する new_images = copy_images(n_sampling(images,final_nimages+1,True)) else: add_n = final_nimages-len(images)+1 # 新たに追加するimages数 box = len(images)-1 # imageとimegeの間の数(この間に新たなimageを挿入する) basic_n = add_n // box extra_n = add_n % box insert_list = [basic_n for _ in range(box)] for i in range(extra_n): i = i if reverse else -(i+1) insert_list[i] += 1 new_images = [] for i,n in enumerate(insert_list): p_images = [images[i].copy() for _ in range(n+1)]+[images[i+1].copy()] interpolate(p_images,self.mic) new_images+=p_images[:-1] new_images+=[images[-1]] return new_images def cut_images(self,threshold,nimages): energy_list = get_energy_list(self.images,self.calc_func) energy_list = np.array([energy*mol/kJ for energy in energy_list]) # 単位換算&np.array化 if len(argrelmax(energy_list)[0])==0 and len(argrelmin(energy_list)[0])==0: self.barrierless = True return # バリアレスの場合 # 削除するindex番号を調べる ts_i = energy_list[1:-1].argmax()+1 # 両端以外で最大値を探す ini_e = energy_list[:ts_i].min() ts_e = energy_list[ts_i] fin_e = energy_list[ts_i:].min() ini_threshold_val = (ts_e-ini_e)*threshold*0.01 + ini_e fin_threshold_val = (ts_e-fin_e)*threshold*0.01 + fin_e ini_del_idxes = [i for i,b in enumerate(energy_list<ini_threshold_val) if b and i<ts_i] # 削るindex番号 fin_del_idxes = [i for i,b in enumerate(energy_list<fin_threshold_val) if b and i>ts_i] # 削るindex番号 # いき過ぎたイメージの削除を阻止する if ts_i-self.neighborhood <= 0: ini_del_idxes = [] # 削除するindex番号 elif ts_i-self.neighborhood in ini_del_idxes: ini_del_idxes = [i for i in range(0,ts_i-self.neighborhood)] if ts_i+self.neighborhood >= len(energy_list)-1: fin_del_idxes = [] elif ts_i+self.neighborhood in fin_del_idxes: fin_del_idxes = [i for i in range(ts_i+self.neighborhood+1,len(energy_list))] self.new_ini_idx = ini_del_idxes[-1]+1 if ini_del_idxes != [] else 0 # write_progress_htmlに伝えるため self.new_fin_idx = fin_del_idxes[0]-1 if ini_del_idxes != [] else len(self.images)-1 # 両端を削る ini_deleted_images = [image for i,image in enumerate(self.images[:ts_i+1]) if not i in ini_del_idxes] fin_deleted_images = [image for i,image in enumerate(self.images[ts_i:],start=ts_i) if not i in fin_del_idxes] # nimagesに合わせ新たにimagesを追加または削除 ini_images = self.adjust_images(ini_deleted_images,int((nimages-1)/2),reverse=True) fin_images = self.adjust_images(fin_deleted_images,int((nimages-1)/2)) self.images = ini_images + fin_images[1:] set_calc2images(self.images,self.calc_func) set_const2images(self.images,self.constraints) @property def default_param(self): param = super().default_param this_param = { "nimages_list":[13 for _ in range(len(param["climb_list"])-1)], #親クラスのデフォルト値が変更されてもいいように "threshold_list":[10 for _ in range(len(param["climb_list"])-1)], "neighborhood":1, } param.update(this_param) return param def write_progress_html(self,text=True): with open(f"{self.name}_progress.html","a") as f: html_text = to_html_nebgraph(self.neb,self.calc_func,False,include_plotlyjs="cdn") if text: f.write(f"<p>{self.new_ini_idx},{self.new_fin_idx}</p>") f.write(html_text) def check_param(self): super().check_param() if len(self.climb_list) != len(self.nimages_list)+1 or len(self.climb_list) != len(self.threshold_list)+1: raise Exception(f"len(climb_list)==len(nimages_list)+1==len(threshold_list)+1を満たす必要があります") if any([i%2==0 for i in self.nimages_list]): raise Exception(f"nimages_listは奇数のリストです") def set_param(self,**params): super().set_param(**params) image_change_iter = iter( [lambda:self.cut_images(threshold,nimages) for threshold,nimages in zip(self.threshold_list,self.nimages_list)]+ [lambda:None]) self.update(lambda:next(image_change_iter)()) write_html_iter = iter( [lambda:self.write_progress_html(True) for _ in self.threshold_list] + [lambda:self.write_progress_html(False)] ) self.update(lambda:next(write_html_iter)()) class SNEB(RepetitiveNEB): def __init__(self,**kargs): super().__init__(**kargs) def updata_images(self,images): return images def check_param(self): super().check_param() if len(self.climb_list) != len(self.nimages_list)+1: raise KeyError(f"len(climb_list)==len(nimages_list)+1を満たす必要があります") def set_param(self,**params): super().set_param(**params) attach_list = [{"function":lambda:write(f"{self.name}.traj",self.images)}, {"function":lambda:write_html(f"{self.name}.html",self.images)}] self.attach_list += attach_list