"""This module defines methods to lauch Worker Queue Cyclopts jobs.
:author: Matthew Gidden <matthew.gidden _at_ gmail.com>
"""
from __future__ import print_function
import paramiko as pm
import os
import io
import tarfile
import shutil
import stat
from cyclopts import tools
from cyclopts.condor.utils import exec_remote_cmd, batlab_base_dir_template
run_lines = u"""#!/bin/bash
pwd=$PWD
outdb=$1
instids=$2
# work queue puts workers down two directories
indb_rel=../../$3
indb_abs=`readlink -f $indb_rel`
indb_dir=`dirname $indb_abs`
indb=`basename $indb_abs`
echo "pwd: $PWD"
echo "indb_rel : $indb_rel"
echo "indb dir: $indb_dir"
echo "indb dir listing"
ls -l $indb_dir
echo "indb_abs: $indb_abs"
if [ ! -f $indb_abs ]; then
echo "expected input file $indb_abs does not exist"
exit 1
fi
echo "pwd pre-tar:"
ls -l
tar -xf CDE.tar.gz
tar -xf cde-cyclopts.tar.gz
export PATH=$pwd/CDE/:$PATH
echo "pwd post-tar:"
ls -l
cde_root='cde-package/cde-root'
ln $indb_abs $cde_root/$indb
cd $cde_root
sed -i 's/..\/cde-exec/cde-exec/g' ../cyclopts.cde
echo "confirming $indb exists:"
if [ ! -f $indb ]; then
echo "expected input file $indb does not exist"
exit 1
fi
echo "execute dir pre-execute"
ls -l
echo "Executing ../cyclopts.cde exec --db $indb --outdb $outdb --instids $instids"
../cyclopts.cde exec --db $indb --outdb $outdb --instids $instids \
--solvers {solvers} --family_module {module} --family_class {cname}
echo "execute dir post-execute"
ls -l
mv $outdb $pwd
cd $pwd
echo "pwd dir post-execute"
ls -l
"""
def gen_tar(remotedir, db, instids, module, cname, solvers,
user="gidden", verbose=False):
prepdir = '.tmp_{0}'.format(remotedir)
if not os.path.exists(prepdir):
os.makedirs(prepdir)
else:
raise IOError("File preparation directory {0} already exists".format(
prepdir))
remotehome = batlab_base_dir_template.format(user=user)
nfiles = 0
runlines = run_lines.format(solvers=" ".join(solvers), module=module,
cname=cname)
runfile = os.path.join(prepdir, 'run.sh')
with io.open(runfile, mode='w') as f:
f.write(runlines)
# chmod 775
os.chmod(runfile, stat.S_IRWXG | stat.S_IRWXU | stat.S_IXOTH | stat.S_IROTH)
nfiles += 1
idfile = os.path.join(prepdir, 'uuids')
with io.open(idfile, mode='w') as f:
for i in instids:
f.write(u'{0}\n'.format(i))
nfiles += 1
base = os.path.dirname(os.path.abspath(__file__))
mastername = 'launch_master.py'
masterfile = os.path.join(base, mastername)
nfiles += 1
nfiles += 1 # add db
if verbose:
print("tarring {0} files".format(nfiles))
tarname = "{0}.tar.gz".format(remotedir)
with tarfile.open(tarname, 'w:gz') as tar:
tar.add(db, arcname="{0}/{1}".format(remotedir, os.path.basename(db)))
tar.add(runfile, arcname="{0}/{1}".format(remotedir,
os.path.basename(runfile)))
tar.add(idfile, arcname="{0}/{1}".format(remotedir,
os.path.basename(idfile)))
tar.add(masterfile, arcname="{0}/{1}".format(remotedir,
os.path.basename(masterfile)))
shutil.rmtree(prepdir)
return tarname
submit_cmd = """
mkdir -p {remotedir} && cd {remotedir} &&
tar -xf {tarfile} && rm {tarfile} && cd {cddir} &&
nohup python -u launch_master.py port={port} user={user} nids={nids} indb={indb} nodes={nodes} --log={log} &> launch_master.out &
"""
def _submit(client, remotedir, tarname, nids, indb, log=False,
port='5422', user='gidden', nodes=None, verbose=False):
"""Performs a condor Work Queue sumbission on a client using a tarball of all
submission-related data.
Parameters
----------
client : paramiko SSHClient
the client
remotedir : str
the run directory on the client
tarname : str
the name of the tarfile
nids : int
the number of ids being run
indb : str
the name of the input database
log : bool
whether or not to keep worker/queue logs
port : str, optional
the port for the workers and master to communicate on
user : str, optional
the user to run the jobs on
nodes : list, optional
a list of execute nodes prefixes (e.g., e121.chtc.wisc.edu -> e121)
verbose : bool, optional
whether to print information regarding the submission process
"""
ffrom = tarname
tarname = os.path.basename(tarname)
fto = '{0}/{1}'.format(remotedir, tarname)
nodes = ['e121', 'e122', 'e123', 'e124', 'e125', 'e126'] if nodes is None else nodes
if verbose:
print("Copying from {0} to {1} on the condor submit node.".format(
ffrom, fto))
try:
ftp = client.open_sftp()
ftp.put(ffrom, fto)
ftp.close()
except IOError:
raise IOError(
'Could not find {0} on the submit node.'.format(remotedir))
cddir = tarname.split(".tar.gz")[0]
cmd = submit_cmd.format(tarfile=tarname, cddir=cddir,
remotedir=remotedir, port=port, user=user,
nids=nids, indb=indb, log=log,
nodes=",".join(nodes))
print("Remotely executing '{0}'".format(cmd))
stdin, stdout, stderr = client.exec_command(cmd)
return stdout.channel.recv_exit_status()
[docs]def submit(user, db, instids, module, cname, solvers, remotedir, log=False,
host="submit-3.chtc.wisc.edu", keyfile=None,
nodes=None,
port='5422', verbose=False):
"""Connects via SSH to a condor submit node, and executes a Cyclopts Work
Queue run.
Parameters
----------
user : str
the user on the condor submit host
db : str
the problem instance database
instids : set
the set of instances to run
module : str
the ProblemFamily module
cname : str
the ProblemFamily cname
solvers : list
the solvers to use
remotedir : str
the base run directory on the condor submit node, relative to
~/cyclopts-runs
log : bool
whether or not to keep worker/queue logs
host : str, optional
the condor submit host
keyfile : str, optional
the public key file
nodes : list, optional
a list of execute nodes prefixes (e.g., e121.chtc.wisc.edu -> e121)
port : str, optional
the port to use for master/worker communication
verbose : bool, optional
whether to print information regarding the submission process
"""
client = pm.SSHClient()
client.set_missing_host_key_policy(pm.AutoAddPolicy())
_, keyfile, pw = tools.ssh_test_connect(client, host, user, keyfile, auth=True)
localtar = gen_tar(remotedir, db, instids, module, cname, solvers, user,
verbose=verbose)
if verbose:
print("connecting to {0}@{1}".format(user, host))
client.connect(host, username=user, key_filename=keyfile, password=pw)
rtn = _submit(client, tools.cyclopts_remote_run_dir, localtar,
len(instids), os.path.basename(db), log=log, nodes=nodes,
port=port, verbose=verbose)
client.close()
if verbose:
print("Submitted job in {0}@{1}:~/cyclopts-runs/{2} with exit "
"code: {rtn}".format(
user, host, remotedir, rtn=rtn))
os.remove(localtar)