from pathlib import PurePath
import networkx as nx
import itertools
import pandas as pd
from matplotlib.colors import rgb2hex
import matplotlib.pyplot as plt
from pyvis.network import Network
from ase.units import kJ,mol,Hartree,eV
#USER
from grrmpy.io.read_listlog import log2atoms,read_connections,read_energies
from grrmpy.calculator import pfp_calculator
from grrmpy.conv.atoms2smiles import atomslist2smileses
def make_color(x,cm="gnuplot"):
"""カラースケール
Parameters:
x :
0~1の間の数字
cm str:
'jet','viridis','plasma','inferno','magma','cividis','gnuplot','CMRmap','rainbow'
"""
cmap = plt.get_cmap(cm)
colorcode = rgb2hex(cmap(x))
return colorcode
def make_color_scale(val_list,cm="gnuplot"):
max_val = max(val_list)
min_val = min(val_list)
val_scale = [(i-min_val)/(max_val-min_val) for i in val_list]
color_scale = [make_color(x,cm=cm) for x in val_scale]
return color_scale
class NodeData():
def __init__(self,eq_list:list,energies:list,cm="gnuplot",indices=None):
self.energies = energies
self.atoms_list = eq_list
self.all_data = pd.DataFrame({
"node":[i for i in range(len(self))],
"label":[f"{i}" for i in range(len(self))],
"title":[f"EQ{i}\nEnergy:{e:.03f} kJ/mol" for i,e in zip(range(len(self)),self.energies)],
"color":make_color_scale(self.energies,cm=cm),
"energy":self.energies,
"group":self.grouping(indices),
})
@property
def df(self):
return self.all_data[["node","label","title","color"]]
@property
def data(self):
"""xlwingで動かすプログラムと互換性を保つため"""
data = {
"node":[i for i in range(len(self))],
"name":[f"EQ{i}" for i in range(len(self))],
"group":self.all_data["group"],
"E/Hartree":[i*(kJ/mol)/Hartree for i in self.all_data["energy"]],
"E/kJmol-1":self.all_data["energy"]
}
return pd.DataFrame(data)
@property
def nx_data(self):
columns = self.df.columns
return [(node,dict(zip(columns[1:],data))) for i,node,*data in self.df.itertuples()]
def change_color_scale(self,cm):
self.all_data["color"] = make_color_scale(self.energies,cm)
def grouping(self,indices=None):
smiles_list = atomslist2smileses(self.atoms_list,target0=indices)
unique = sorted(set(smiles_list), key=smiles_list.index)
return [unique.index(smiles) for smiles in smiles_list]
def __len__(self):
return len(self.atoms_list)
class EdgeData():
def __init__(self,ts_list,connections,energies:list):
self.energies = energies
self.atoms_list = ts_list
self.connections = connections
self.all_data = pd.DataFrame({
"edge":[i for i in range(len(self))],
"label":[f"{i}" for i in range(len(self))],
"title":[f"TS{i}\n"+
f"Energy:{e:.03f} kJ/mol\n"+
f"connections:{source}-{target}"
for i,e,(source,target)
in zip(range(len(self)),self.energies,self.connections)],
"energy":self.energies,
"source":self.source,
"target":self.target,
})
@property
def source(self):
return [source for source,_ in self.connections]
@property
def target(self):
return [target for _,target in self.connections]
@property
def data(self):
"""xlwingで動かすプログラムと互換性を保つため"""
data = {
"edge":[i for i in range(len(self))],
"name":[f"TS{i}" for i in range(len(self))],
"source":self.source,
"target":self.target,
"E/Hartree":[i*(kJ/mol)/Hartree for i in self.all_data["energy"]],
"E/kJmol-1":self.all_data["energy"]
}
return pd.DataFrame(data)
def nx_data(self,self_roop=True):
"""エッジのタプルのリストを返す(??などは消す)"""
if self_roop:
edge_list = [(ini_idx,fin_idx) for ini_idx,fin_idx in self.connections
if isinstance(ini_idx, int) and isinstance(fin_idx, int)]
else:
edge_list = [(ini_idx,fin_idx) for ini_idx,fin_idx in self.connections
if isinstance(ini_idx, int) and isinstance(fin_idx, int) and ini_idx!=fin_idx]
return edge_list
def __len__(self):
return len(self.atoms_list)
[ドキュメント]class NetGraph():
"""
Parameters:
\*listlog:
| 方法1: EQ_list.log,TS_list.log,,PT_list.logを与える.
| 方法2: eq_list,ts_list,connectionsを与える.
indices: list of int
| 原子のindex番号のリスト.
| 同一構造をグループ化するために用いる.
| 指定した原子同士の結合状態から同一構造かを判定する.
comfile:
comfileパス
poscar:
POSCARパス
calc_func:
calculatorを返す関数
Examples:
>>> G = NetGraph("XXX_EQ_list","XXX_TS_list","XXX_PT_list",comfile="XXX.com",poscar="POSCAR",indices=[i for i in range(0:10)])
or
>>> G = NetGraph("XXX_EQ_list","XXX_TS_list",comfile="XXX.com",poscar="POSCAR",indices=[i for i in range(0:10)])
or
>>> G = NetGraph(eq_list,ts_list,connections,indices=[i for i in range(0:10)])
"""
def __init__(self,
*listlog,
indices = None,
comfile = None,
poscar = None,
calc_func=pfp_calculator,
):
"""
Parameters:
*listlog:
| 方法1: EQ_list.log,TS_list.log,,PT_list.logを与える.
| 方法2: eq_list,ts_list,connectionsを与える.
indices: list of int
| 原子のindex番号のリスト.
| 同一構造をグループ化するために用いる.
| 指定した原子同士の結合状態から同一構造かを判定する.
comfile:
comfileパス
poscar:
POSCARパス
calc_func:
calculatorを返す関数
"""
# Node
if isinstance(listlog[0],str) or isinstance(listlog[0],PurePath):
# EQ_list.log,TS_list.log,PT_list.logで与えた場合
# Node
eq_energies = read_energies(listlog[0])
eq_energies = [e*Hartree*mol/kJ for e in eq_energies]
node_atoms = log2atoms(listlog[0],comfile,poscar)
# EDGE
ts_energies = list(itertools.chain.from_iterable(
[read_energies(log) for log in listlog[1:]]
))
ts_energies = [e*Hartree*mol/kJ for e in ts_energies]
edge_atoms = list(itertools.chain.from_iterable(
[log2atoms(log,comfile,poscar) for log in listlog[1:]]
))
connections = list(itertools.chain.from_iterable(
[read_connections(log) for log in listlog[1:]]
))
else:
# eq_list,ts_list,connectionsで与えた場合
# NODE
node_atoms = listlog[0]
eq_energies = self._set_calc_and_get_energy(node_atoms,calc_func)
eq_energies = [e*mol/kJ for e in eq_energies]
# EDGE
edge_atoms = listlog[1]
ts_energies = self._set_calc_and_get_energy(edge_atoms,calc_func)
ts_energies = [e*mol/kJ for e in ts_energies]
connections = listlog[2]
self.node = NodeData(node_atoms,eq_energies,indices=indices)
self.edge = EdgeData(edge_atoms,connections,ts_energies)
def _set_calc_and_get_energy(self,atoms_list,calc_func):
try:
energies = [atoms.get_potential_energy() for atoms in atoms_list]
except Exception:
for atoms in atoms_list:
atoms.calc = calc_func()
energies = [atoms.get_potential_energy() for atoms in atoms_list]
return energies
@property
def forward_energy(self):
""""単位はkJ/mol,もしマイナスになる場合0とする"""
node_e = self.node.energies
edge_e = self.edge.energies
source_idx = self.edge.source
return [None if isinstance(i,str) else e-node_e[i] if e-node_e[i]>=0 else 0
for e,i in zip(edge_e,source_idx)]
@property
def reverse_energy(self):
""""単位はkJ/mol"""
node_e = self.node.energies
edge_e = self.edge.energies
target_idx = self.edge.target
return [None if isinstance(i,str) else e-node_e[i] if e-node_e[i]>=0 else 0
for e,i in zip(edge_e,target_idx)]
@property
def node_df(self):
"""xlwingで動かすプログラムと互換性を保つため"""
return self.node.data
@property
def edge_df(self):
"""xlwingで動かすプログラムと互換性を保つため"""
df = self.edge.data
df["forward/kJmol-1"] = self.forward_energy
df["reverse/kJmol-1"] = self.reverse_energy
return df
[ドキュメント] def get_graph(self,self_loop=False,cm="gnuplot",node=None,edge=None):
"""NetWorkXのグラフを返す
Parameters:
self_loop: bool
| 自己ループを含める場合True.
cm : str
| "jet","hot"などmatplotlibで指定できるカラースケール.
Returns:
networkx.Graph: networkxのグラフ
"""
if node is None:
node = self.node.df.to_dict(orient='list')["node"]
if edge is None:
edge = self.edge.nx_data(self_loop)
G = nx.Graph()
self.node.change_color_scale(cm)
G.add_nodes_from(node)
G.add_edges_from(edge)
return G
[ドキュメント] def write_html(self,html,height="500px",width="100%",cm="gnuplot",notebook=True, show_buttons=False,self_loop=False):
"""htmlにグラフを作成する
Parameters:
html: str
保存名.html
height: str
縦幅
width: str
横幅
cm: str
| "jet","hot"などmatplotlibで指定できるカラースケール.
| デフォルトは"gnuplot".
notebook: bool
| Trueした場合,Notebook上に表示される.
| (現在機能していない)
show_buttons: bool
Trueにした場合,全ての機能を搭載したボタンが作成される.
self_loop: bool
自己ループを表示する場合はTrue.
"""
g = Network(height, width,notebook=notebook)
g.from_nx(self.get_graph(self_loop,cm))
if show_buttons:
g.show_buttons(True) # 全機能使用
else:
g.show_buttons(filter_=['physics', 'nodes']) # 一部の機能のみ使用
g.show(html)
@property
def _weight_edge(self):
"""最短距離を計算する時に用いる"""
weight_edge = [[] for _ in range(len(self.node))]
df = self.edge_df
s = df.apply( lambda r: set((r['source'], r['target'])), axis=1)
df = df.loc[s.drop_duplicates().index]
for i,edge,name,source,target,_,_,forward_e,reverse_e,in df.itertuples():
if isinstance(source, int) and isinstance(target, int) and not source==target:
weight_edge[source].append([target,forward_e,name])
weight_edge[target].append([source,reverse_e,name])
return weight_edge
@property
def _group(self):
"""最短距離を計算する時に用いる"""
df = self.node_df
group_dict = {str(group_name):df[df["group"]==group_name]["node"].astype('str').tolist() for group_name in df["group"].unique()}
return group_dict
@property
def _node_data_for_graphml(self):
df = self.node_df
df = df[df.columns[df.columns != 'E/Hartree']]
df["color"] = self.node.df["color"]
key = ["id","name","group","E/kJmol-1","color"]
node_data = [(node,dict(zip(key,[node,name,str(group)]+vals))) for _,node,name,group,*vals in df.itertuples()]
return node_data
@property
def _edge_data_for_graphml(self):
df = self.edge_df
df = df[df.columns[df.columns != 'E/Hartree']]
key = ["edge","name","E/kJmol-1","forward/kJmol-1","reverse/kJmol-1"]
edge_data = [(source,target,dict(zip(key,[edge,name]+vals)))
for _,edge,name,source,target,*vals in df.itertuples()
if not isinstance(source,str) and not isinstance(target,str) and source!=target]
return edge_data
[ドキュメント] def write_graphml(self,graphml):
"""
graphmlファイルを作成する
Parameters:
graphml: str or Path
| graphmlパス(.graphml)
"""
node = self._node_data_for_graphml
edge = self._edge_data_for_graphml
nx.write_graphml(self.get_graph(node=node,edge=edge),path)