# Source code for matador.swaps.swaps

# coding: utf-8
# Distributed under the terms of the MIT License.

""" This file implements atomic swaps through the `AtomicSwapper` class. """


import re
from copy import deepcopy
from matador.utils.print_utils import print_success, print_warning
from matador.utils.chem_utils import get_periodic_table, get_stoich


class AtomicSwapper:
    """ This class handles the creation of input files from database
    queries that have swapped atoms.

    Attributes:
        cursor (list): after construction, the swapped documents
            (optionally filtered for uniqueness).
        swap_pairs (list): parsed swaps as `[[from_atoms], [to_atoms]]`
            pairs, filled in by `parse_swaps`.
        swap_dict_list (list): one `{from_atom: to_atom}` dict per
            combination of targets, filled in by `construct_swap_options`.
        swap_counter (int): total number of swapped documents created.

    """

    def __init__(self, cursor, swap=None, uniq=False, top=None,
                 maintain_num_species=True, debug=False, **kwargs):
        """ Initialise class with query cursor and arguments, then
        immediately perform the swaps on every document in the cursor.

        Parameters:
            cursor (list): cursor of documents to swap.

        Keyword arguments:
            swap (str): specification of swaps to perform, e.g. "LiP:KSn"
                will swap all Li->P and all K->Sn in the cursor.
            uniq (bool/float): filter documents by similarity with the
                default sim_tol (True) or the value provided here.
            top (int): only swap from the first `top` structures in the
                cursor.
            maintain_num_species (bool): only perform swaps that maintain
                the number of species in the structure.
            debug (bool): enable debug output.
            kwargs (dict): dictionary of extra arguments that should
                be ignored.

        Raises:
            RuntimeError: if the cursor is non-empty and the swap
                specification is missing or cannot be parsed.

        """
        # define some swap macros; the 'X' entry is dropped so that it is
        # never expanded as a macro by `_atoms_to_list`
        self.periodic_table = get_periodic_table()
        del self.periodic_table['X']

        self.maintain_num_species = maintain_num_species
        self.swap_args = swap
        self.template_structure = None

        # set every piece of swap state up front so that the early return
        # below never leaves a half-initialised instance behind
        self.swap_dict_list = None
        self.swap_pairs = []
        self.swap_counter = 0

        self.cursor = list(cursor)
        if top is not None:
            self.cursor = self.cursor[:top]

        # nothing to swap: skip parsing entirely, so that an empty query
        # does not fail on a missing swap specification
        if not self.cursor:
            return

        self.parse_swaps(self.swap_args)

        swap_cursor = []
        for doc in self.cursor:
            docs, counter = self.atomic_swaps(doc)
            self.swap_counter += counter
            swap_cursor.extend(docs)
        self.cursor = swap_cursor

        if self.swap_counter > 0:
            print_success('Performed {} swaps.'.format(self.swap_counter))
        else:
            print_warning('No swaps performed.')

        if uniq:
            from matador.utils.cursor_utils import filter_unique_structures
            print('Filtering for unique structures...')
            # NOTE(review): `uniq` is forwarded verbatim as `sim_tol`, so a
            # literal True is passed through -- confirm that
            # filter_unique_structures maps True onto its default tolerance.
            filtered_cursor = filter_unique_structures(self.cursor, debug=debug, sim_tol=uniq)
            print('Filtered {} down to {}'.format(len(self.cursor), len(filtered_cursor)))
            self.cursor = filtered_cursor

    def parse_swaps(self, swap_args=None):
        """ Parse command line options into valid atomic species swaps,
        storing them in `self.swap_pairs` and then building
        `self.swap_dict_list` from them.

        e.g. --swap LiP:NaAs ==> [[['Li'], ['P']], [['Na'], ['As']]].

        Handles multiple many-to-many swaps, macros for groups of the
        periodic table, and wildcards.

        Keyword arguments:
            swap_args (str): overrides command-line swap args.

        Raises:
            RuntimeError: if no swap arguments are available, if more than
                one argument is passed, or if any swap does not parse
                into exactly two entries.

        """
        self.swap_pairs = []

        if swap_args is None:
            swap_args = self.swap_args
        if swap_args is None:
            raise RuntimeError('No swap arguments passed.')

        if isinstance(swap_args, str):
            swap_args = [swap_args.strip()]

        # an empty sequence would otherwise surface as an IndexError below
        if not swap_args:
            raise RuntimeError('No swap arguments passed.')
        if len(swap_args) > 1:
            raise RuntimeError('Detected whitespace in your input clear it and try again.')

        for swap in swap_args[0].split(':'):
            if len(swap) <= 1:
                raise RuntimeError('Not enough arguments for swap!')

            if '][' in swap:
                # both options are groups, e.g. "[Li,Na][K,Rb]"
                pieces = swap.split('][')
            elif swap[0] == '[':
                # only the first option is a group, e.g. "[Li,Na]K"
                pieces = swap.split(']')
            elif swap[-1] == ']':
                # only the last option is a group, e.g. "K[Li,Na]"
                pieces = swap.split('[')
            else:
                # no groups: split into element symbols, e.g. "LiP"
                pieces = re.split(r'([A-Z][a-z]*)', swap)

            # leftover brackets are kept on the pieces so that
            # `_atoms_to_list` can tell groups from single atoms
            pair = [self._atoms_to_list(piece) for piece in pieces if piece != '']

            if len(pair) != 2:
                raise RuntimeError('Unable to parse swap! {} should contain only two entries'.format(pair))

            self.swap_pairs.append(pair)

        self.construct_swap_options()

    def _atoms_to_list(self, atom_string):
        """ For a given set of atoms in a string, parse any macros
        and return a list of options.

        e.g. '[V' -> [<all group V atoms>], and 'V' -> ['V'].

        Parameters:
            atom_string (str): formula string with macros.

        Returns:
            list: the atoms named by the string, stripped of whitespace
                when they come from a bracketed group.

        """
        if '[' not in atom_string and ']' not in atom_string:
            return [atom_string]

        group = atom_string.replace('[', '').replace(']', '')
        if group in self.periodic_table:
            # a known macro expands to its periodic table entry
            atom_list = self.periodic_table[group]
        else:
            # otherwise treat the group as an explicit comma-separated list
            atom_list = group.split(',')

        return [x.strip() for x in atom_list]

    def construct_swap_options(self):
        """ Iterate over possible combinations of multiple many-to-many
        swaps and create a dict for each swap.

        Fills `self.swap_dict_list` with one `{from_atom: to_atom}` dict
        per combination of targets across all pairs in `self.swap_pairs`.

        """
        from itertools import product

        self.swap_dict_list = []
        for branch in product(*(pair[1] for pair in self.swap_pairs)):
            swap_dict = dict()
            for (sources, _), target in zip(self.swap_pairs, branch):
                for swap_from in sources:
                    # swapping an atom for itself is not a swap
                    if swap_from != target:
                        swap_dict[swap_from] = target
            self.swap_dict_list.append(swap_dict)

    def atomic_swaps(self, source_doc):
        """ Swap atomic species according to parsed options.

        Parameters:
            source_doc (dict): matador doc to swap from.

        Returns:
            (list, int): the swapped copies of the document and how many
                of them were created. The source document is not modified.

        """
        source_types = source_doc['atom_types']
        source_species = set(source_types)
        unswapped_num_species = len(source_species)

        swapped_docs = []
        for swap in self.swap_dict_list:
            # skip swaps that do not touch any atom of this structure
            if not any(key in source_species for key in swap):
                continue

            new_types = [swap.get(atom, atom) for atom in source_types]
            new_elems = set(new_types)

            # decide whether to keep the swap before paying for a deepcopy
            if self.maintain_num_species and len(new_elems) != unswapped_num_species:
                continue

            swapped_doc = deepcopy(source_doc)
            swapped_doc['atom_types'] = new_types
            swapped_doc['_swapped_stoichiometry'] = get_stoich(source_types)
            swapped_doc['stoichiometry'] = get_stoich(swapped_doc['atom_types'])
            swapped_doc['elems'] = new_elems
            swapped_doc['num_species'] = len(new_elems)
            swapped_docs.append(swapped_doc)

        return swapped_docs, len(swapped_docs)