Source code for matador.plugins.voronoi_interface.voronoi_similarity

# encoding: utf-8
""" This file implements lattice-site level measures of similarity, using
Voronoi decompositions.

Sketch of methodology:

* For a list of structures, take each in turn, compute the Voronoi substructure
and create normalised padded arrays so that they can be compared.

* For each structure, find the *unique* sites, to some definition of unique; initially
just a simple np.allclose() on the site arrays.

* Now do an all-to-all comparison of the unique sites in each structure, yielding
an overall list of unique substructures. This step should have a dial that can be turned
such that all sites fold onto each other, or all sites become distinct, i.e. sensitivity
vs specificity.

* [OPTIONAL] The remaining unique sites across the whole list can now be clustered, if desired,
using e.g. hierarchical clustering.

* Every site in every structure can now be assigned to one of the unique sites above, or
one of the unique clusters.

* Finally, the structures themselves can be clustered by the sites that are present, if desired.

An illustrative code sketch of the first few of these steps follows the imports below.

"""
from collections import defaultdict
from copy import deepcopy
from itertools import product

import numpy as np

from matador.utils.cell_utils import frac2cart
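
# A sketch of the first three steps of the methodology above, assuming
# `cursor` is a list of matador structure docs; this helper is purely
# illustrative and is not used elsewhere in the module.
def _example_pipeline(cursor):
    """ Find the unique sites of each structure, pool the unique
    substructures across structures, and pad them into comparable,
    normalised arrays. """
    for doc in cursor:
        get_unique_sites(doc)
    unique_environments = collect_unique_sites(cursor)
    site_array, max_num_elems = create_site_array(unique_environments, normalise=True)
    return site_array, max_num_elems
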


def are_sites_the_same(site_A, site_B, rtol=1e-2, atol=1e-2):
    """ Simple check for uniqueness of sites based on their Voronoi substructure.

    Input:

        | site_A : dict, with elements as keys, containing numpy arrays of
          normalised solid angles.
        | site_B : dict, as site_A.

    Args:

        | rtol : relative tolerance in solid angle for np.allclose,
        | atol : absolute tolerance in solid angle for np.allclose.

    Returns:

        | True if sites have approximately the same values for each species,
          else False.

    """
    # if a key is missing from A or B, check that it's not some extraneous distant atom
    for key in site_B:
        if key not in site_A:
            site_A_missing_key = np.zeros_like(site_B[key])
            if not (np.allclose(site_A_missing_key, site_B[key], atol=atol, rtol=rtol) or
                    np.allclose(site_B[key], site_A_missing_key, atol=atol, rtol=rtol)):
                return False
    # now check the remaining keys
    for key in site_A:
        if key not in site_B:
            site_B_missing_key = np.zeros_like(site_A[key])
            if not (np.allclose(site_B_missing_key, site_A[key], atol=atol, rtol=rtol) or
                    np.allclose(site_A[key], site_B_missing_key, atol=atol, rtol=rtol)):
                return False
        elif len(site_A[key]) != len(site_B[key]):
            # pad the shorter array with zeros before comparing
            if len(site_A[key]) > len(site_B[key]):
                padded_B = np.append(site_B[key], np.zeros((len(site_A[key]) - len(site_B[key]))))
                if not (np.allclose(site_A[key], padded_B, atol=atol, rtol=rtol) or
                        np.allclose(padded_B, site_A[key], atol=atol, rtol=rtol)):
                    return False
            else:
                padded_A = np.append(site_A[key], np.zeros((len(site_B[key]) - len(site_A[key]))))
                if not (np.allclose(padded_A, site_B[key], atol=atol, rtol=rtol) or
                        np.allclose(site_B[key], padded_A, atol=atol, rtol=rtol)):
                    return False
        else:
            if not (np.allclose(site_A[key], site_B[key], atol=atol, rtol=rtol) or
                    np.allclose(site_B[key], site_A[key], rtol=rtol, atol=atol)):
                return False
    return True

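# A minimal sanity check of the comparison (the element symbols and
# solid-angle fractions below are invented for illustration): an extra
# species whose contribution is approximately zero should not break a
# match, while genuinely different angle patterns should.
def _example_site_comparison():
    site_a = {'K': np.array([0.4, 0.35, 0.25])}
    site_b = {'K': np.array([0.4, 0.35, 0.25]), 'P': np.array([0.0])}
    assert are_sites_the_same(site_a, site_b)
    assert not are_sites_the_same(site_a, {'K': np.array([0.6, 0.4])})
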
def collect_unique_sites(cursor):
    """ Collect the unique substructures from each structure into a single dict.

    Input:

        | cursor: list(dict), list of structures with pre-computed unique sites.

    Returns:

        | unique_environments: dict(list), dict with element symbol keys
          containing a list of each unique substructure.

    """
    unique_environments = defaultdict(list)
    for doc in cursor:
        for elem in doc['unique_substrucs']:
            for substruc in doc['unique_substrucs'][elem]:
                unique_environments[elem].append(substruc)
    return unique_environments

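# For instance, if two docs each carry one unique K-site substructure, the
# pooled dict maps 'K' to a two-entry list (the substructures below are
# invented for illustration):
def _example_collect_unique_sites():
    cursor = [{'unique_substrucs': {'K': [[('P', 30.0)]]}},
              {'unique_substrucs': {'K': [[('P', 28.0), ('P', 14.0)]]}}]
    pooled = collect_unique_sites(cursor)
    assert len(pooled['K']) == 2
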
def get_max_coordination_of_elem(single_elem_environments):
    """ For a given set of elemental environments, find the greatest number
    of each element contributing to a single site.

    Input:

        | single_elem_environments: list(dict), list of Voronoi substructures.

    Returns:

        | max_num_elem: dict(int), greatest number of each element in a substruc.

    """
    max_num_elem = {}
    for site in single_elem_environments:
        for elem, _ in site:
            elem_len = len([val for val in site if val[0] == elem])
            if elem not in max_num_elem:
                max_num_elem[elem] = 0
            if elem_len > max_num_elem[elem]:
                max_num_elem[elem] = elem_len
    return max_num_elem

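# As a concrete (invented) case: across the two substructures below, at most
# two K atoms and two P atoms ever contribute to a single site.
def _example_max_coordination():
    envs = [[('K', 20.0), ('K', 15.0), ('P', 30.0)],
            [('P', 25.0), ('P', 12.0)]]
    assert get_max_coordination_of_elem(envs) == {'K': 2, 'P': 2}
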
def create_site_array(unique_environments, max_num_elems=None, elems=None, normalise=False):
    """ Create padded numpy arrays based on the unique environments provided.

    Input:

        | unique_environments: dict(list), dict with element keys full of
          local substructures.

    Args:

        | max_num_elems : dict(dict(int)), dict with element keys, containing
          sub-dicts with element keys that contain the largest number of each
          type of element contributing to a site. If None, this will be
          calculated from the input environments. The site array is padded
          to this size.
        | elems : list(str), custom list of elements to loop over.
        | normalise : bool, whether to normalise each site array by the
          total solid angle of the site.

    Returns:

        | site_array : dict(np.ndarray), dict with element keys full of
          normalised site arrays.
        | max_num_elems : dict(dict(int)), dict with element keys containing
          the highest number of atoms of an element contributing to a
          particular site, e.g. {'P': {'K': 10}}.

    """
    site_array = defaultdict(list)
    if elems is None:
        elems = [elem for elem in unique_environments]
    if max_num_elems is None:
        max_num_elems = dict()
        for elem in elems:
            max_num_elems[elem] = get_max_coordination_of_elem(unique_environments[elem])
    for elem in elems:
        for site in unique_environments[elem]:
            site_array[elem].append(dict())
            total_angle = 0
            for _elem in elems:
                if _elem not in max_num_elems[elem]:
                    max_num_elems[elem][_elem] = 0
                site_array[elem][-1][_elem] = np.zeros((max_num_elems[elem][_elem]))
                count = 0
                for species, angle in site:
                    if species == _elem:
                        site_array[elem][-1][_elem][count] = angle
                        count += 1
                        total_angle += angle
            if normalise:
                for _elem in elems:
                    site_array[elem][-1][_elem] /= total_angle
    return site_array, max_num_elems

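# A sketch of the padding and normalisation (angles invented): the single
# K site below sees one K and two P atoms, so its per-element arrays are
# padded to those maxima and, with normalise=True, divided by the total
# solid angle of the site.
def _example_create_site_array():
    envs = {'K': [[('K', 10.0), ('P', 30.0), ('P', 20.0)]], 'P': []}
    site_array, max_num_elems = create_site_array(envs, normalise=True)
    assert max_num_elems['K'] == {'K': 1, 'P': 2}
    assert np.allclose(site_array['K'][0]['K'], [10.0 / 60.0])
    assert np.allclose(site_array['K'][0]['P'], [0.5, 20.0 / 60.0])
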
def set_substruc_dict(doc):
    """ Compute the Voronoi substructure and collect it into a dict by element.
    Sets the 'voronoi_substruc' and 'substruc_dict' entries in the doc.

    Input:

        | doc: dict, structure to compute the substructure of.

    """
    if 'voronoi_substruc' not in doc:
        from matador.plugins.voronoi_interface.voronoi_interface import get_voronoi_substructure
        doc['voronoi_substruc'] = get_voronoi_substructure(doc)
    voronoi_substruc_dict = dict()
    elems = set(doc['atom_types'])
    for elem in elems:
        voronoi_substruc_dict[elem] = [substruc[1] for substruc in doc['voronoi_substruc']
                                       if substruc[0] == elem]
    doc['substruc_dict'] = voronoi_substruc_dict

def set_site_array(doc, normalise=False):
    """ Set the 'site_array' entry in the chosen document: a dict of numpy
    arrays of normalised solid angles with element keys, built from the
    previously calculated substructure.

    Input:

        | doc: dict, matador doc containing substruc_dict.

    Args:

        | normalise: bool, whether to normalise each site array by the total angle.

    """
    if 'substruc_dict' not in doc:
        set_substruc_dict(doc)
    doc['site_array'], doc['max_num_elems'] = create_site_array(doc['substruc_dict'],
                                                                normalise=normalise)

def get_unique_sites(doc, atol=1e-2, rtol=1e-2):
    """ Find the unique sites within a structure by comparing all pairs of
    site arrays with are_sites_the_same, then merging overlapping groups of
    similar sites. Sets the 'unique_site_inds', 'unique_substrucs',
    'unique_site_array', 'unique_site_stddev', 'similar_sites' and
    'site_degeneracies' entries in the doc.

    Input:

        | doc: dict, matador doc with a computed (or computable) substruc_dict.

    Args:

        | atol : float, absolute tolerance passed to are_sites_the_same,
        | rtol : float, relative tolerance passed to are_sites_the_same.

    """
    if 'substruc_dict' not in doc:
        set_substruc_dict(doc)
    elems = set(doc['atom_types'])
    substruc_dict = doc['substruc_dict']
    if 'site_array' not in doc:
        set_site_array(doc)
    site_array = doc['site_array']
    unique_sites = dict()
    degeneracies = dict()
    similar_sites = dict()
    for elem in elems:
        unique_sites[elem] = set()
        similar_sites[elem] = []
        # group pairwise-similar sites together
        for i in range(len(site_array[elem])):
            for j in range(i + 1, len(site_array[elem])):
                same = are_sites_the_same(site_array[elem][i], site_array[elem][j],
                                          atol=atol, rtol=rtol)
                if same:
                    added = False
                    for ind, site in enumerate(similar_sites[elem]):
                        if i in site:
                            similar_sites[elem][ind].add(j)
                            added = True
                            break
                        if j in site:
                            similar_sites[elem][ind].add(i)
                            added = True
                            break
                    if not added:
                        similar_sites[elem].append(set([i, j]))
            if not any(i in _set for _set in similar_sites[elem]):
                similar_sites[elem].append(set([i]))
        # merge any groups that share a site, until every site appears in
        # exactly one group
        temp_similar_sites = deepcopy(similar_sites[elem])
        valid = False
        _iter = 0
        while not valid:
            scrubbed = False
            _iter += 1
            if _iter > 100:
                raise RuntimeError('Failed to merge similar sites after 100 iterations.')
            for index in range(len(site_array[elem])):
                index_sites = []
                for ind, site in enumerate(temp_similar_sites):
                    if index in site:
                        index_sites.append(ind)
                # if index is in multiple groups, combine all groups into the first
                if len(index_sites) > 1:
                    # construct the mean of each group of sites, and if two
                    # means are too far apart, get worried
                    site_mean = []
                    for j in range(len(index_sites)):
                        site_mean.append(defaultdict(list))
                        for _elem in site_array[elem][0]:
                            site_mean[-1][_elem].append(
                                np.mean(np.asarray([site_array[elem][i][_elem]
                                                    for i in temp_similar_sites[index_sites[j]]]), axis=0))
                            assert np.shape(site_mean[-1][_elem][-1]) == np.shape(site_array[elem][0][_elem])
                    for j in range(len(index_sites) - 1, 0, -1):
                        # let the group means deviate by five times the site tolerance
                        if not are_sites_the_same(site_mean[j], site_mean[0], atol=5*atol, rtol=5*rtol):
                            raise RuntimeError('Numerical instability: site groups with distinct means were clustered.')
                        temp_similar_sites[index_sites[0]].update(temp_similar_sites[index_sites[j]])
                        del temp_similar_sites[index_sites[j]]
                    scrubbed = True
                    break
                elif len(index_sites) == 0:
                    raise RuntimeError('Site {} was not assigned to any group.'.format(index))
            if not scrubbed:
                valid = True
        similar_sites[elem] = temp_similar_sites
        unique_sites[elem] = [next(iter(site)) for site in similar_sites[elem]]
        degeneracies[elem] = [len(_set) for _set in similar_sites[elem]]
    doc['unique_site_inds'] = unique_sites
    doc['unique_substrucs'] = dict()
    doc['unique_site_array'] = dict()
    doc['unique_site_stddev'] = dict()
    doc['similar_sites'] = similar_sites
    for elem in elems:
        doc['unique_substrucs'][elem] = [substruc_dict[elem][ind] for ind in unique_sites[elem]]
        doc['unique_site_array'][elem] = []
        doc['unique_site_stddev'][elem] = []
        for site in similar_sites[elem]:
            doc['unique_site_array'][elem].append(dict())
            doc['unique_site_stddev'][elem].append(dict())
            for _elem in site_array[elem][0]:
                doc['unique_site_array'][elem][-1][_elem] = np.mean(
                    np.asarray([site_array[elem][i][_elem] for i in site]), axis=0)
                doc['unique_site_stddev'][elem][-1][_elem] = np.std(
                    np.asarray([site_array[elem][i][_elem] for i in site]), axis=0)
    doc['site_degeneracies'] = degeneracies

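# A toy sanity check (substructures invented): two identical K sites should
# collapse onto a single unique site with degeneracy 2, while the lone P
# site remains its own class.
def _example_unique_sites():
    doc = {'atom_types': ['K', 'K', 'P'],
           'substruc_dict': {'K': [[('P', 30.0)], [('P', 30.0)]],
                             'P': [[('K', 30.0), ('K', 30.0)]]}}
    get_unique_sites(doc)
    assert doc['site_degeneracies'] == {'K': [2], 'P': [1]}
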
def compare_docs(docA, docB, elems):
    """ Compare the unique sites of two structures, returning True if more
    than half of docA's sites (weighted by their degeneracies) match a
    site in docB.

    Input:

        | docA : dict, matador doc that has been through get_unique_sites,
        | docB : dict, as docA,
        | elems : list(str), elements whose sites should be compared.

    Returns:

        | True if the structures share a majority of sites, else False.

    """
    matching = 0
    total = 0
    print('COMPARING', ' '.join(docA['text_id']), 'vs', ' '.join(docB['text_id']))
    for elem in elems:
        if elem not in docA['unique_site_array'] or elem not in docB['unique_site_array']:
            print('Missing elements.')
            return False
        # compare the averaged site arrays of each unique site
        for i, site in enumerate(docA['unique_site_array'][elem]):
            for other_site in docB['unique_site_array'][elem]:
                if are_sites_the_same(site, other_site):
                    matching += docA['site_degeneracies'][elem][i]
                    break
        total += sum(docA['site_degeneracies'][elem])
        print(matching, '/', total)
    return matching / total > 0.5

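# A toy check of compare_docs, with two docs that share identical (invented)
# substructures and so should trivially match.
def _example_compare_docs():
    doc_a = {'text_id': ['toy', 'one'], 'atom_types': ['K', 'P'],
             'substruc_dict': {'K': [[('P', 30.0)]], 'P': [[('K', 30.0)]]}}
    doc_b = {'text_id': ['toy', 'two'], 'atom_types': ['K', 'P'],
             'substruc_dict': {'K': [[('P', 30.0)]], 'P': [[('K', 30.0)]]}}
    get_unique_sites(doc_a)
    get_unique_sites(doc_b)
    assert compare_docs(doc_a, doc_b, ['K', 'P'])
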
def cluster(X, hull, max_of_elems, elems=None, method='KMeans', elem='P',
            quantile=None, n_clusters=None, cmap='tab10'):
    """ Cluster the rows of X (one flattened, padded site array per row)
    with scikit-learn, then assign every site of element elem in each
    structure in hull.hull_cursor to a cluster, setting the 'class',
    'class_sites', 'class_err' and 'class_vector' entries of each doc.

    Input:

        | X : np.ndarray, matrix with one flattened site array per row,
        | hull : matador hull object with a hull_cursor of structures,
        | max_of_elems : dict(list), number of padded slots per contributing
          element, in the order of elems, for each site element.

    Args:

        | elems : list(str), elements contributing to each site,
        | method : str, either 'KMeans' or 'MeanShift',
        | elem : str, element whose sites are to be classified,
        | quantile : float, bandwidth quantile for MeanShift (default 0.2),
        | n_clusters : int, number of clusters for KMeans,
        | cmap : str, colourmap name (currently unused).

    Returns:

        | est : the fitted scikit-learn estimator.

    """
    if method == 'KMeans':
        from sklearn.cluster import KMeans
        est = KMeans(n_clusters=n_clusters, n_jobs=-2, precompute_distances=True)
        est.fit(X)
        print('Number of clusters: {}'.format(n_clusters))
    elif method == 'MeanShift':
        from sklearn.cluster import MeanShift, estimate_bandwidth
        bandwidth = estimate_bandwidth(X, quantile=quantile if quantile is not None else 0.2)
        est = MeanShift(bandwidth=bandwidth, cluster_all=False)
        est.fit(X)
        n_clusters = len(set(est.labels_))
        print('Number of estimated clusters:', n_clusters)
    else:
        raise RuntimeError('Unknown clustering method: {}'.format(method))

    for doc in hull.hull_cursor:
        site_classes = []
        if elem not in doc['unique_substrucs']:
            doc['class'] = -1
            doc['class_err'] = 0
            doc['class_sites'] = [-1]
            continue
        for substruc in doc['unique_substrucs'][elem]:
            # pack this substructure into a flat, zero-padded row matching X
            site_array = np.zeros((1, int(np.sum(max_of_elems[elem]))))
            elem_counters = np.zeros((len(elems)), dtype=int)
            for elem_ind in range(1, len(elems)):
                # each element's block starts after the cumulative padded
                # width of all preceding elements
                elem_counters[elem_ind] = elem_counters[elem_ind - 1] + max_of_elems[elem][elem_ind - 1]
            for elem_ind, _elem in enumerate(elems):
                for atom in substruc:
                    if atom[0] == _elem:
                        site_array[0][elem_counters[elem_ind]] = atom[1]
                        elem_counters[elem_ind] += 1
            site_classes.append(est.predict(site_array))
        site_classes = [val[0] for val in site_classes]
        doc['class'] = np.median(site_classes)
        doc['class_sites'] = [val for val in site_classes]
        doc['class_err'] = (len(set(doc['class_sites'])) - 1) / n_clusters
        class_vector = np.zeros((n_clusters))
        for site in site_classes:
            class_vector[int(site)] += 1
        doc['class_vector'] = class_vector

    return est

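# Typical use (a sketch, assuming `hull` is a matador hull whose hull_cursor
# docs have been through get_unique_sites, `X` stacks the flattened site
# arrays to cluster, and `max_of_elems` matches the padding used to build X):
#
#     est = cluster(X, hull, max_of_elems, elems=['K', 'P'],
#                   method='MeanShift', elem='P', quantile=0.2)
#     plot_class_hull(hull, n_clusters=len(set(est.labels_)))
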
def plot_class_hull(hull, n_clusters, plot_class=None):
    """ Plot the mean concentration of each site class (top panel) above
    the 2D hull, colouring each structure by its class (bottom panel).

    Input:

        | hull : matador hull object with a classified hull_cursor,
        | n_clusters : int, the number of site classes.

    Args:

        | plot_class : list(int), if set, only colour structures whose
          class is in this list.

    """
    from random import sample
    import matplotlib.pyplot as plt
    fig = plt.figure(figsize=(12, 20))
    ax = fig.add_subplot(211)
    ax2 = fig.add_subplot(212)
    ax2 = hull.plot_2d_hull(show=False, ax=ax2)
    colours = plt.cm.Dark2(np.linspace(0, 1, n_clusters * 10))
    class_concs = defaultdict(list)
    for doc in hull.hull_cursor:
        class_concs[int(doc['class'])].append(doc['concentration'])
    for _class in class_concs:
        ax.errorbar(np.mean(class_concs[_class]), _class, xerr=np.std(class_concs[_class]),
                    fmt='o', c=colours[int(10 * _class)], lw=2, zorder=1000)
    # hard-coded concentration guide lines
    bounds = [0.32, 0.57, 0.74]
    for bound in bounds:
        ax.axvline(bound, ls='--', lw=1)
    ax.set_ylim(-1.5, n_clusters)
    ax.set_xlim(-0.1, 1.1)
    for doc in hull.hull_cursor:
        ax2.scatter(doc['concentration'], doc['formation_enthalpy_per_atom'],
                    c='#eeeeee', s=60, zorder=9999, lw=0)
    # shuffle the plotting order so overlapping points are not biased by cursor order
    for doc in sample(hull.hull_cursor, len(hull.hull_cursor)):
        if plot_class is not None:
            if doc['class'] not in plot_class:
                continue
            ax2.scatter(doc['concentration'], doc['formation_enthalpy_per_atom'],
                        c=colours[int(10 * doc['class'])], s=50,
                        alpha=max(1 - 2 * doc['class_err'], 0.1), zorder=99999, lw=0)
        elif doc['hull_distance'] <= 1e-12:
            ax2.scatter(doc['concentration'], doc['formation_enthalpy_per_atom'],
                        c=colours[int(10 * doc['class'])], s=75, zorder=99999999, lw=1)
        else:
            ax2.scatter(doc['concentration'], doc['formation_enthalpy_per_atom'],
                        c=colours[int(10 * doc['class'])], s=50,
                        alpha=max(1 - 2 * doc['class_err'], 0.1), zorder=99999, lw=0)

def find_site_index(doc, elem, elem_ind):
    """ Return the overall atom index of the elem_ind-th atom of element
    elem in the doc, or False if no such atom exists.

    """
    elem_count = 0
    for idx, atom in enumerate(doc['atom_types']):
        if atom == elem:
            if elem_count == elem_ind:
                print('Found site at overall atom index {}'.format(idx))
                return idx
            elem_count += 1
    return False

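# For example (toy atom list): the second P atom sits at overall index 3.
def _example_find_site_index():
    doc = {'atom_types': ['K', 'P', 'K', 'P']}
    assert find_site_index(doc, 'P', 1) == 3
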
def viz_site(doc, targ_substruc, elem, rmax=6):
    """ Collect the atoms within rmax of the first site of element elem
    whose substructure matches targ_substruc, across periodic boundaries,
    into a new doc for visualisation. The target site itself is relabelled
    as 'U' so that it stands out.

    """
    if 'positions_abs' not in doc:
        doc['positions_abs'] = frac2cart(doc['lattice_cart'], doc['positions_frac'])
    elem_count = False
    for ind, substruc in enumerate(doc['substruc_dict'][elem]):
        if substruc == targ_substruc:
            elem_count = find_site_index(doc, elem, ind)
            if elem_count is not False:
                break
    if elem_count is False:
        raise RuntimeError('No site with the target substructure was found.')
    targ_site = doc['positions_frac'][elem_count]
    targ_pos = doc['positions_abs'][elem_count]
    doc['lattice_cart'] = np.asarray(doc['lattice_cart'])
    neighbour_doc = defaultdict(list)
    neighbour_doc['positions_abs'].append(targ_pos)
    neighbour_doc['positions_frac'].append(targ_site)
    neighbour_doc['atom_types'].append('U')
    neighbour_doc['lattice_cart'] = doc['lattice_cart']
    for j, pos in enumerate(doc['positions_abs']):
        # loop over all periodic images of this atom in the 3x3x3 supercell
        for prod in product(range(-1, 2), repeat=3):
            trans = np.zeros((3))
            for ind, multi in enumerate(prod):
                trans += doc['lattice_cart'][ind] * multi
            dist2 = 0
            for i in range(3):
                dist2 += (targ_pos[i] - (pos + trans)[i])**2
            if dist2 < rmax**2:
                neighbour_doc['positions_abs'].append(pos + trans)
                neighbour_doc['positions_frac'].append((np.asarray(doc['positions_frac'][j]) + prod).tolist())
                neighbour_doc['atom_types'].append(doc['atom_types'][j])
    return elem_count, neighbour_doc