Source code for cyclopts.condor.dag

"""This module defines methods to lauch DAGman Cyclopts jobs.

:author: Matthew Gidden <matthew.gidden _at_ gmail.com>
"""
from __future__ import print_function

import os
import io
import paramiko as pm
import glob
import tarfile
import shutil
import uuid

from cyclopts import tools
from cyclopts.condor.utils import exec_remote_cmd, batlab_base_dir_template

job_template = u"""JOB J_{0} {0}.sub"""

# This submission file template includes a condor execute node requirement
# called ForGidden. This requirement is used to target nonhyperthreaded cores in
# order to get accurate time comparisons for problem instance runs. In the
# future, if this tool is used by others, ForGidden should be changed on the
# Condor side to ForCyclopts, ForTimeMeasurement, or an equivalent, and this
# template should be updated. Contact chtc@cs.wisc.edu to do so.
sub_template = u"""
universe = vanilla
executable = run.sh
arguments = "'{id}_out.h5' '{instids}'"
output = {id}.out
error = {id}.err
log = {id}.log
requirements = (OpSysAndVer =?= "SL6") && Arch == "X86_64" && ( ForGidden == true )
should_transfer_files = YES
when_to_transfer_output = ON_EXIT
transfer_input_files = {homedir}/cde-cyclopts.tar.gz, {homedir}/CDE.tar.gz, {db}
request_cpus = 1
#request_memory = 2500
#request_disk = 10242880
notification = never
"""

run_template = u"""#!/bin/bash
pwd=$PWD
ls -l

tar -xf CDE.tar.gz
tar -xf cde-cyclopts.tar.gz
export PATH=$pwd/CDE/:$PATH
ls -l

mv {db} cde-package/cde-root
cd cde-package/cde-root
sed -i 's/..\/cde-exec/cde-exec/g' ../cyclopts.cde
ls -l
../cyclopts.cde exec --db {db} --solvers {solvers} \
--solvers {solvers} --family_module {module} --family_class {cname} \
--outdb $1 --instids $2
mv $1 $pwd

cd $pwd
ls -l
"""

submit_cmd = """
mkdir -p {remotedir} && cd {remotedir} &&
tar -xf {tarfile} && rm {tarfile} && cd {cddir} && 
condor_submit_dag -maxidle 1000 {submit};
"""

def _gen_files(prepdir, dbname, instids, module, cname, solvers, 
               remotehome, subfile="dag.sub", max_time=None, verbose=False):
    """Generates all files needed to run a DAGMan instance of the given input
    database.
    """    
    if verbose:
        print("generating files for {0} runs".format(len(instids)))

    # dag submit files
    max_time_line = ("\nperiodic_hold = (JobStatus == 2) && "
                     "((CurrentTime - EnteredCurrentStatus) > "
                     "({0}))").format(max_time) if max_time is not None \
                     else ""
    lines = ""
    nfiles = len(instids)
    for i in range(len(instids)):
        subname = os.path.join(prepdir, "{0}.sub".format(i))
        with io.open(subname, 'w') as f:
            sublines = sub_template.format(id=i, instids=instids[i], db=dbname,
                                           homedir=remotehome)
            sublines += max_time_line + '\nqueue'
            f.write(sublines)
        lines += job_template.format(i) + '\n'
    dagfile = os.path.join(prepdir, subfile)
    nfiles += 1
    with io.open(dagfile, 'w') as f:
        f.write(lines)
        
    # run script
    runfile = os.path.join(prepdir, "run.sh")
    nfiles += 1 
    with io.open(runfile, 'w') as f:
        f.write(run_template.format(db=dbname, module=module, cname=cname,
                                    solvers=" ".join(solvers)))
    
    return nfiles

def _submit(client, remotedir, tarname, subfile="dag.sub", 
                verbose=False):
    """Performs a condor DAG sumbission on a client using a tarball of all
    submission-related data.

    Parameters
    ----------
    client : paramiko SSHClient
        the client
    remotedir : str
        the run directory on the client
    tarname : str
        the name of the tarfile
    subfile : str, optional
        the name of the submit file
    verbose : bool, optional
        whether to print information regarding the submission process    
    """
    ffrom = tarname
    tarname = os.path.basename(tarname)
    fto = '{0}/{1}'.format(remotedir, tarname)
    if verbose:
        print("Copying from {0} to {1} on the condor submit node.".format(
                ffrom, fto))
    try:
        ftp = client.open_sftp()
        ftp.put(ffrom, fto)
        ftp.close()
    except IOError:
        raise IOError(
            'Could not find {0} on the submit node.'.format(remotedir))
    
    cddir = tarname.split(".tar.gz")[0]
    cmd = submit_cmd.format(tarfile=tarname, cddir=cddir, 
                            submit=subfile, remotedir=remotedir)
    stdin, stdout, stderr = exec_remote_cmd(client, cmd, verbose=verbose)
    
    checkfile = '/'.join([remotedir, cddir, subfile + '.dagman.out'])
    cmd = "head {0}".format(checkfile)
    stdin, stdout, stderr = exec_remote_cmd(client, cmd, verbose=verbose)
    pid = stdout.readlines()[1].split('condor_scheduniv_exec.')[1].split()[0]
    return pid

def gen_tar(rundir, db, instids, module, cname, solvers, 
            user="gidden", verbose=False):
    prepdir = '.tmp_{0}'.format(rundir)
    if not os.path.exists(prepdir):
        os.makedirs(prepdir)
    else:
        raise IOError("File preparation directory {0} already exists".format(
                prepdir))
    
    max_time = 60 * 60 * 5 # 5 hours
    remotehome = batlab_base_dir_template.format(user=user)
    nfiles = _gen_files(prepdir, os.path.basename(db), instids, module, cname, 
                        solvers, remotehome, max_time=max_time, verbose=verbose)
    
    subfiles = glob.iglob(os.path.join(prepdir, '*.sub'))
    shfiles = glob.iglob(os.path.join(prepdir, '*.sh'))

    nfiles += 1 # add db
    if verbose:
        print("tarring {0} files".format(nfiles))
    tarname = "{0}.tar.gz".format(rundir)
    with tarfile.open(tarname, 'w:gz') as tar:
        tar.add(db, arcname="{0}/{1}".format(rundir, os.path.basename(db)))
        for f in subfiles:
            basename = os.path.basename(f)
            tar.add(f, arcname="{0}/{1}".format(rundir, basename))
        for f in shfiles:
            basename = os.path.basename(f)
            tar.add(f, arcname="{0}/{1}".format(rundir, basename))
    shutil.rmtree(prepdir)
    return tarname

[docs]def submit(user, db, instids, module, cname, solvers, remotedir, host="submit-3.chtc.wisc.edu", keyfile=None, verbose=False): """Connects via SSH to a condor submit node, and executes a Cyclopts DAG run. Parameters ---------- user : str the user on the condor submit host db : str the problem instance database instids : set the set of instances to run module : str the ProblemFamily module cname : str the ProblemFamily cname solvers : list the solvers to use remotedir : str the base run directory on the condor submit node, relative to ~/cyclopts-runs host : str, optional the condor submit host keyfile : str, optional the public key file verbose : bool, optional whether to print information regarding the submission process """ client = pm.SSHClient() client.set_missing_host_key_policy(pm.AutoAddPolicy()) _, keyfile, pw = tools.ssh_test_connect(client, host, user, keyfile, auth=True) localtar = gen_tar(remotedir, db, instids, module, cname, solvers, user, verbose=verbose) if verbose: print("connecting to {0}@{1}".format(user, host)) client.connect(host, username=user, key_filename=keyfile, password=pw) pid = _submit(client, tools.cyclopts_remote_run_dir, localtar, verbose=verbose) client.close() if verbose: print("Submitted job in {0}@{1}:~/cyclopts-runs/{2} with pid: {3}".format( user, host, remotedir, pid)) os.remove(localtar)