Source code for cyclopts.tools

"""Provides some useful tools for working with Cyclopts, including reporting
output.

:author: Matthew Gidden <matthew.gidden _at_ gmail.com>
"""
from __future__ import print_function

import os
import io
import uuid
import shutil
import operator
import tables as t
import numpy as np
from functools import reduce
from pprint import pformat
from collections import defaultdict, Iterable, Sequence, Mapping
import paramiko as pm
from os import kill
from signal import alarm, signal, SIGALRM, SIGKILL
from subprocess import PIPE, Popen
import getpass
import importlib
import itertools as itools
import gc
import resource

import cyclopts
from cyclopts.params import PARAM_CTOR_ARGS, Param, BoolParam, SupConstrParam, \
    CoeffParam

FILTERS = t.Filters(complevel=4)

cyclopts_remote_run_dir = 'cyclopts-runs'

class Incrementer(object):
    """A simple helper class to increment a value"""

    def __init__(self, start=0):
        """Parameters
        ----------
        start : int, optional
            an initial value
        """
        self._val = start - 1
    def next(self):
        """Returns an incremented value"""
        self._val += 1
        return self._val

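# Example (illustrative sketch, not called anywhere in the module): a default
# Incrementer yields 0, 1, 2, ... on successive next() calls.
def _example_incrementer():
    inc = Incrementer()
    assert inc.next() == 0
    assert inc.next() == 1
    offset = Incrementer(start=5)
    assert offset.next() == 5
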
class NotSpecified(object):
    """A helper class singleton for run control meaning that a 'real' value
    has not been given."""
    def __repr__(self):
        return "NotSpecified"

NotSpecified = NotSpecified()

#
# Run Control
#
# Code basis taken from xdress' run control in xdress/utils.py.
#
class RunControl(object):
    """A composable configuration class for cyclopts. Unlike
    argparse.Namespace, this keeps the object dictionary (__dict__) separate
    from the run control attributes dictionary (_dict). Modified from xdress'
    run control in xdress/utils.py"""

    def __init__(self, **kwargs):
        """Parameters
        ----------
        kwargs : optional
            Items to place into run control.
        """
        self._dict = {}
        for k, v in kwargs.items():
            setattr(self, k, v)
        self._updaters = {}

    def __getattr__(self, key):
        if key in self._dict:
            return self._dict[key]
        elif key in self.__dict__:
            return self.__dict__[key]
        elif key in self.__class__.__dict__:
            return self.__class__.__dict__[key]
        else:
            msg = "RunControl object has no attribute {0!r}.".format(key)
            raise AttributeError(msg)

    def __setattr__(self, key, value):
        if key.startswith('_'):
            self.__dict__[key] = value
        else:
            if value is NotSpecified and key in self:
                return
            self._dict[key] = value

    def __delattr__(self, key):
        if key in self._dict:
            del self._dict[key]
        elif key in self.__dict__:
            del self.__dict__[key]
        elif key in self.__class__.__dict__:
            del self.__class__.__dict__[key]
        else:
            msg = "RunControl object has no attribute {0!r}.".format(key)
            raise AttributeError(msg)

    def __iter__(self):
        return iter(self._dict)

    def __repr__(self):
        keys = sorted(self._dict.keys())
        s = ", ".join(["{0!s}={1!r}".format(k, self._dict[k]) for k in keys])
        return "{0}({1})".format(self.__class__.__name__, s)

    def _pformat(self):
        keys = sorted(self._dict.keys())
        f = lambda k: "{0!s}={1}".format(k, pformat(self._dict[k], indent=2))
        s = ",\n ".join(map(f, keys))
        return "{0}({1})".format(self.__class__.__name__, s)

    def __contains__(self, key):
        return key in self._dict or key in self.__dict__ or \
            key in self.__class__.__dict__

    def __eq__(self, other):
        if hasattr(other, '_dict'):
            return self._dict == other._dict
        elif isinstance(other, Mapping):
            return self._dict == other
        else:
            return NotImplemented

    def __ne__(self, other):
        if hasattr(other, '_dict'):
            return self._dict != other._dict
        elif isinstance(other, Mapping):
            return self._dict != other
        else:
            return NotImplemented

    def _update(self, other):
        """Updates the rc with values from another mapping. If a key is in
        self, other, and self._updaters, then the updater is called to
        perform the update. Updaters should return a copy to be safe and not
        update in-place.
        """
        if hasattr(other, '_dict'):
            other = other._dict
        elif not hasattr(other, 'items'):
            other = dict(other)
        for k, v in other.items():
            if v is NotSpecified:
                pass
            elif k in self._updaters and k in self:
                v = self._updaters[k](getattr(self, k), v)
            setattr(self, k, v)

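# Example (illustrative sketch): RunControl attributes live in _dict, _update
# merges another mapping, and assigning NotSpecified never clobbers an
# existing value.
def _example_runcontrol():
    rc = RunControl(solver='cbc', verbose=True)
    rc._update({'verbose': False})
    assert rc.verbose is False
    assert sorted(rc) == ['solver', 'verbose']  # iteration covers rc keys
    rc.solver = NotSpecified
    assert rc.solver == 'cbc'  # NotSpecified leaves the existing value alone
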
def parse_rc(files):
    """Parse a list of rc files.

    Parameters
    ----------
    files : list or str
        the files to parse

    Returns
    -------
    rc : RunControl
    """
    files = [files] if isinstance(files, basestring) else files
    rc = RunControl()
    for rcfile in files:
        if not os.path.isfile(rcfile):
            continue
        rcdict = {}
        exec_file(rcfile, rcdict, rcdict)
        rc._update(rcdict)
    return rc

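# Example (illustrative sketch; 'cyclopts.rc' is a hypothetical file name): an
# rc file is plain Python assignments, executed by exec_file and folded into a
# RunControl.
def _example_parse_rc():
    with io.open('cyclopts.rc', 'w') as f:
        f.write(u"nsamples = 10\nsolvers = ['cbc', 'greedy']\n")
    rc = parse_rc('cyclopts.rc')
    assert rc.nsamples == 10
    assert rc.solvers == ['cbc', 'greedy']
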
def exec_file(filename, glb=None, loc=None):
    """A function equivalent to the Python 2.x execfile statement. Taken from
    xdress/utils.py"""
    with io.open(filename, 'r') as f:
        src = f.read()
    exec(compile(src, filename, "exec"), glb, loc)

def _merge_leaf(node, dest_file):
    src = node
    dest = dest_file.get_node(node._v_pathname)
    if isinstance(node, t.Table):
        dtypes = src.dtype.names
        # this is a hack because appending rows throws an error
        # see http://stackoverflow.com/questions/17847587/pytables-appending-recarray
        # dest.append([row for row in src.iterrows()])
        for src_row in src.iterrows():
            dest_row = dest.row
            for j in range(len(dtypes)):
                dest_row[dtypes[j]] = src_row[j]
            dest_row.append()
        dest.flush()

def _merge_node(node, dest_file):
    if not dest_file.__contains__(node._v_pathname):
        node._v_file.copy_node(
            node._v_pathname,
            newparent=dest_file.get_node(node._v_parent._v_pathname),
            recursive=True)
        dest_file.flush()
        return

    if isinstance(node, t.Leaf):
        _merge_leaf(node, dest_file)
    else:
        for child in node._v_children:
            _merge_node(node._v_file.get_node(node._v_pathname + '/' + child),
                        dest_file)

def combine(files, new_file=None, clean=False):
    """Combines two or more databases with identical layout, writing their
    output into a new file or appending to the first in the list.

    Parameters
    ----------
    files : iterator
        An iterator listing all databases to combine
    new_file : str, optional
        The new database to write to. If None, all databases are appended to
        the end of the first database in the list.
    clean : bool, optional
        Whether to remove original files after combining them
    """
    if new_file is not None and os.path.exists(new_file):
        raise ValueError('Cannot write combined hdf5 files to an existing location.')
    first = files.next()
    if new_file is not None:
        shutil.copyfile(first, new_file)
        fname = new_file
        if clean:
            os.remove(first)
    else:
        fname = first

    aggdb = t.open_file(fname, 'a')
    for f in files:
        db = t.open_file(f, 'r')
        _merge_node(db.root, aggdb)
        aggdb.flush()
        db.close()
        if clean:
            os.remove(f)
    aggdb.close()

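# Example (illustrative sketch; the file names are hypothetical): merge a set
# of per-run HDF5 result files into a single combined database. Note that
# `files` must be an iterator (e.g. iter(list)), not a plain list, because
# combine() calls files.next().
def _example_combine():
    runs = iter(['run0.h5', 'run1.h5', 'run2.h5'])
    combine(runs, new_file='combined.h5', clean=False)
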
def get_process_children(pid):
    """Returns
    -------
    children : list of ints
        all of a process's children
    """
    p = Popen('ps --no-headers -o pid --ppid %d' % pid,
              shell=True, stdout=PIPE, stderr=PIPE)
    stdout, stderr = p.communicate()
    return [int(child) for child in stdout.split()]

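# Example (illustrative sketch; Linux-only, since it shells out to ps): list
# the child pids of the current process.
def _example_get_process_children():
    kids = get_process_children(os.getpid())
    print('children of {0}: {1}'.format(os.getpid(), kids))
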
def ssh_test_connect(client, host, user, keyfile=None, auth=True):
    """Tests an ssh connection and returns success or failure thereof.

    Parameters
    ----------
    client : paramiko SSH client
    host : str
    user : str
    keyfile : str, optional
    auth : bool, optional
        whether to prompt for a password authorization on failure
    """
    keyfile = keyfile if keyfile is not None else \
        os.path.join(os.environ['HOME'], '.ssh', 'id_rsa.pub')
    try:
        client.connect(host, username=user, key_filename=keyfile)
        client.close()
        can_connect = True
    except pm.AuthenticationException:
        can_connect = False
    except pm.BadHostKeyException:
        can_connect = False

    password = None
    if not can_connect and auth:
        password = False
        while not password:
            password = getpass.getpass("{0}@{1} password: ".format(user, host))
        # pub = pm.ssh_pub_key(keyfile)
        # cmds = ["mkdir -p ~/.ssh",
        #         'echo "{0}" >> ~/.ssh/authorized_keys'.format(pub),
        #         'chmod og-rw ~/.ssh/authorized_keys',
        #         'chmod a-x ~/.ssh/authorized_keys',
        #         'chmod 700 ~/.ssh',
        #         ]
        client.connect(host, username=user, password=password)
        # for cmd in cmds:
        #     stdin, stdout, stderr = client.exec_command(cmd)
        # client.close()
        # # verify that this key works
        # client.connect(host, username=user, key_filename=keyfile)
        # client.close()
        can_connect = True
        print("finished connecting")
    return can_connect, keyfile, password

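# Example (illustrative sketch; the host and user names are hypothetical):
# check whether key-based login works before launching remote runs, falling
# back to a password prompt if it does not.
def _example_ssh_test_connect():
    client = pm.SSHClient()
    client.set_missing_host_key_policy(pm.AutoAddPolicy())
    ok, keyfile, password = ssh_test_connect(client, 'submit.example.edu',
                                             'someuser', auth=True)
    print('can connect: {0}'.format(ok))
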
def uuidhex(bytes=bytes):
    return uuid.UUID(bytes=bytes).hex

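# Example (illustrative sketch): round-trip an id through its 16-byte form, as
# ids are stored in the HDF5 tables.
def _example_uuidhex():
    iid = uuid.uuid4()
    assert uuidhex(bytes=iid.bytes) == iid.hex
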
def memusg(pid):
    """in kb"""
    # could also use
    # resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    fname = os.path.join(os.path.sep, 'proc', str(pid), 'status')
    with io.open(fname) as f:
        lines = f.readlines()
    return float(next(l for l in lines if l.startswith('VmSize')).split()[1])

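# Example (illustrative sketch; relies on /proc, so Linux-only): report the
# current process' virtual memory size in kB.
def _example_memusg():
    print('VmSize: {0} kB'.format(memusg(os.getpid())))
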
def obj_info(kind=None, rcs=None, args=None):
    """Get information about an importable object

    Parameters
    ----------
    kind : str
        the kind of object
    rcs : list of RunControl objects or single object, optional
        rcs are checked in order
    args : argparse args, optional
        CLI args

    Returns
    -------
    info : tuple of package, module, and class names
    """
    mod, cname, pack = None, None, None
    rcs = [rcs] if not isinstance(rcs, list) else rcs
    sources = [args] + rcs  # try CLI first, then rcs in order
    for source in sources:
        if source is None:
            continue
        attr = '{0}_package'.format(kind)
        if pack is None and mod is None and cname is None:
            if hasattr(source, attr):
                pack = getattr(source, attr)
        if mod is None:
            attr = '{0}_module'.format(kind)
            if hasattr(source, attr):
                mod = getattr(source, attr)
        if cname is None:
            attr = '{0}_class'.format(kind)
            if hasattr(source, attr):
                cname = getattr(source, attr)
    return pack, mod, cname

def get_obj(kind=None, rcs=None, args=None):
    """Get an object of a certain kind, e.g. species or family. Both the rcs
    and args arguments will be searched for attributes named <kind>_package,
    <kind>_module, and <kind>_class. The package/module is then imported and
    an instance of the class is returned. The CLI is searched before the rcs.

    Parameters
    ----------
    kind : str
        the kind of object
    rcs : list of RunControl objects or single object, optional
        rcs are checked in order
    args : argparse args, optional
        CLI args

    Returns
    -------
    inst : an object instance
    """
    pack, mod, cname = obj_info(kind=kind, rcs=rcs, args=args)
    try:
        mod = importlib.import_module(mod, package=pack)
    except AttributeError:
        raise RuntimeError('Could not find {0} module {1}. Make sure to add '
                           'a {0}_module entry to a run control file or the '
                           'CLI.'.format(kind, mod))
    if cname is None or not hasattr(mod, cname):
        raise RuntimeError('Could not find {0} class {1}. Make sure to add '
                           'a {0}_class entry to a run control file or the '
                           'CLI.'.format(kind, cname))
    inst = getattr(mod, cname)()
    return inst

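# Example (illustrative sketch; the module and class names are placeholders): a
# run control that names a family module and class lets get_obj import and
# instantiate it without any CLI arguments.
def _example_get_obj():
    rc = RunControl(family_module='mypkg.families',  # placeholder module name
                    family_class='MyFamily')         # placeholder class name
    inst = get_obj(kind='family', rcs=rc)
    return inst
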
def collect_instids(h5file, path, rc=None, instids=None, colname='instid'):
    """Collects all instids as specified. If rc and instids are None, all ids
    found in the h5file's path are collected. Otherwise, instids provided by
    the instid listing and the parameter space defined by the run control
    `inst_queries` parameter are collected.

    Parameters
    ----------
    h5file : PyTables File object
        the file to collect ids from
    path : str
        the path to a property table node
    rc : RunControl object, optional
        a run control object specifying a subset of instids to collect
    instids : collection of uuids
        explicit instids to collect
    colname : str
        the instance id column name

    Returns
    -------
    instids : set of uuids
    """
    instids = set(instids) if instids is not None else set()
    rc = rc if rc is not None else RunControl()
    instids |= set(uuid.UUID(x) for x in rc.inst_ids) \
        if 'inst_ids' in rc else set()

    # inst queries are a mapping from instance table names to queryable
    # conditions, the result of which is a collection of instids that meet
    # those conditions
    h5node = h5file.get_node(path)
    conds = rc.inst_queries if 'inst_queries' in rc else []
    if isinstance(conds, basestring):
        conds = [conds]
    if len(conds) > 0:
        ops = conds[1::2]
        conds = ['({0})'.format(c) if \
                     not c.lstrip().startswith('(') and \
                     not c.rstrip().endswith(')') else c for c in conds[::2]]
        cond = ' '.join(
            [' '.join(i) for i in \
                 itools.izip_longest(conds, ops, fillvalue='')]).strip()
        vals = [x[colname] for x in h5node.where(cond)]
        vals = [x + '\0' if len(x) == 15 else x for x in vals]
        instids |= set(uuid.UUID(bytes=x) for x in vals)

    # if no ids, then run everything
    if len(instids) == 0:
        for row in h5node.iterrows():
            iid = row[colname]
            if len(iid) == 15:
                iid += '\0'
            instids.add(uuid.UUID(bytes=iid))

    return instids

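# Example (illustrative sketch; the file name, node path, and query column are
# hypothetical): collect only the instance ids in a properties table whose
# n_arcs column exceeds a bound.
def _example_collect_instids():
    rc = RunControl(inst_queries=['n_arcs > 100'])
    h5file = t.open_file('instances.h5', 'r')  # hypothetical database
    iids = collect_instids(h5file, '/Instances/InstProperties', rc=rc)
    h5file.close()
    return iids
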
"""simple utility for determining if something is a sequence (and not a string)""" seq_not_str = lambda obj: isinstance(obj, Sequence) \ and not isinstance(obj, basestring)
def n_permutations(x, iter_keys=[], recurse=True):
    """Parameters
    ----------
    x : dict, list, or other
    iter_keys : a list of keys whose atomic values should be iterables, optional
    recurse : bool, whether to recurse at the lowest level

    Returns
    -------
    n : int
        the total number of permutations of values in x; if x has container
        values, those are recursively interrogated as well
    """
    n = 1
    if seq_not_str(x):
        if seq_not_str(x[0]):
            if recurse:
                for y in x:
                    n *= n_permutations(y, recurse=recurse)
            else:
                n *= len(x)
        else:
            factor = len(x) if recurse else 1
            n *= factor
    elif isinstance(x, Mapping):
        for k, v in x.items():
            flag = False if k in iter_keys else True  # in blacklist
            n *= n_permutations(v, recurse=flag)
    return n

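# Example (illustrative sketch; the key names are hypothetical): count the
# parameter-space size of a mapping; keys named in iter_keys are treated as
# single atomic iterables rather than as sets of choices.
def _example_n_permutations():
    space = {'n_commods': [1, 2, 3], 'constr_coeff': [0.1, 0.5]}
    assert n_permutations(space) == 6
    assert n_permutations(space, iter_keys=['constr_coeff']) == 3
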
def expand_args(x):
    """Parameters
    ----------
    x : list of lists of arguments

    Returns
    -------
    args : generator
        a generator that returns a collection of single arguments
    """
    for y in itools.product(*x):
        yield y

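# Example (illustrative sketch): lazily enumerate the cross product of
# argument collections.
def _example_expand_args():
    combos = list(expand_args([[1, 2], ['a', 'b']]))
    assert combos == [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]
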
def conv_insts(fam, fam_tables, sp, sp_tables, ninst=1, update_freq=100,
               verbose=False):
    n = 0
    for point in sp.points():
        param_uuid = uuid.uuid4()
        sp.record_point(point, param_uuid, sp_tables)
        for i in range(ninst):
            inst_uuid = uuid.uuid4()
            inst = sp.gen_inst(point)
            fam.record_inst(inst, inst_uuid, param_uuid, sp.name, fam_tables)
            if n % update_freq == 0:
                if verbose:
                    print('Total writes: {0}'.format(
                            sum([tbl.n_writes for tbl in
                                 fam_tables.values() + sp_tables.values()])))
                    print('Memusg before collect: {0}'.format(
                            resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
                gc.collect()
                if verbose:
                    print('Memusg after collect: {0}'.format(
                            resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
                    print('{0} instances have been converted'.format(n))
            n += 1
    if verbose:
        print('{0} instances have been converted'.format(n))

def cyc_members(obj):
    """return a list of persistable members per the Cyclopts style guide."""
    members = obj.__class__.__dict__.keys()
    cycfilter = lambda x: x.startswith('_') or x.endswith('_') or x[0].isupper()
    return [x for x in members if not cycfilter(x)]

# def run_insts_mp():
#     q = mp.Queue()
#     pool = mp.Pool(4, multi_proc_gen, (q,))
#     lock = mp.Lock()
#     for point in sp.points():
#         param_uuid = uuid.uuid4()
#         if lock is not None:
#             lock.acquire()
#         sp.record_point(point, param_uuid, sp_manager.tables)
#         if lock is not None:
#             lock.release()
#         for i in range(ninst):
#             inst_uuid = uuid.uuid4()
#             # q.put((inst_uuid, param_uuid, point, sp, fam,
#             #        fam_manager.tables, lock))
#             q.put((inst_uuid, param_uuid, lock))
#     while not q.empty():
#         if verbose and q.qsize() % update_freq == 0:
#             print('{0} instances have been converted'.format(n))
#         time.sleep(1)
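
# Example (illustrative sketch; the class is hypothetical): cyc_members keeps
# only lower-case class attributes that do not start or end with an
# underscore.
def _example_cyc_members():
    class Point(object):  # hypothetical persistable object
        n_arcs = 0
        _cache = None
        Kind = 'point'
    assert cyc_members(Point()) == ['n_arcs']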