import os
import subprocess
from collections import namedtuple
try:
from collections.abc import Iterable
except ImportError: # python < 3.3
from collections import Iterable
from .utils import (checkdir, string_rep, requires_command,
split_command_string, decode_string)
from .basenode import BaseNode
# Record for a single Job argument: the argument string itself, an optional
# per-argument name (when set, separate log/output/error files are generated
# for that argument), and an optional per-argument retry count (DAG-only).
JobArg = namedtuple('JobArg', ['arg', 'name', 'retry'])
class Job(BaseNode):
    """
    Job object consisting of an executable to be run, potentially with a
    series of different command-line arguments.

    Note that the ``submit``, ``error``, ``log``, and ``output`` parameters
    can be explicitly given or configured by setting ``PYCONDOR_SUBMIT_DIR``,
    ``PYCONDOR_ERROR_DIR``, ``PYCONDOR_LOG_DIR``, and ``PYCONDOR_OUTPUT_DIR``
    environment variables. An explicitly given value will be used over an
    environment variable, while an environment variable will be used over a
    default value.

    Parameters
    ----------
    name : str
        Name of the Job instance. This will also be the name of the
        corresponding error, log, output, and submit files associated with
        this Job.
    executable : str
        Path to corresponding executable for Job.
    error : str or None, optional
        Path to directory where condor Job error files will be written
        (default is None, will not be included in Job submit file).
    log : str or None, optional
        Path to directory where condor Job log files will be written (default
        is None, will not be included in Job submit file).
    output : str or None, optional
        Path to directory where condor Job output files will be written
        (default is None, will not be included in Job submit file).
    submit : str, optional
        Path to directory where condor Job submit files will be written
        (defaults to the directory was the Job was submitted from).
    request_memory : str or None, optional
        Memory request to be included in submit file.
    request_disk : str or None, optional
        Disk request to be included in submit file.
    request_cpus : int or None, optional
        Number of CPUs to request in submit file.

        .. versionadded:: 0.1.0

    getenv : bool or None, optional
        Whether or not to use the current environment settings when running
        the job (default is None).
    universe : str or None, optional
        Universe execution environment to be specified in submit file
        (default is None).
    initialdir : str or None, optional
        Initial directory for relative paths (defaults to the directory was
        the job was submitted from).
    notification : str or None, optional
        E-mail notification preference (default is None).
    requirements : str or None, optional
        Additional requirements to be included in ClassAd.
    queue : int or None, optional
        Integer specifying how many times you would like this job to run.
    extra_lines : list or None, optional
        List of additional lines to be added to submit file.
    dag : Dagman, optional
        If specified, Job will be added to dag (default is None).
    arguments : str or iterable, optional
        Arguments with which to initialize the Job list of arguments
        (default is None).
    retry : int or None, optional
        Option to specify the number of retries for all Job arguments. This
        can be superseded for arguments added via the add_arg() method.
        Note: this feature is only available to Jobs that are submitted via a
        Dagman (default is None; no retries).
    verbose : int, optional
        Level of logging verbosity option are 0-warning, 1-info,
        2-debugging (default is 0).

    Attributes
    ----------
    args : list
        List of arguments for this Job instance.
    parents : list
        Only set when included in a Dagman. List of parent Jobs and Dagmans.
        Ensures that Jobs and Dagmans in the parents list will complete
        before this Job is submitted to HTCondor.
    children : list
        Only set when included in a Dagman. List of child Jobs and Dagmans.
        Ensures that Jobs and Dagmans in the children list will be
        submitted only after this Job has completed.

    Examples
    --------
    >>> import pycondor
    >>> job = pycondor.Job('myjob', 'myscript.py')
    >>> job.build_submit()
    """
    def __init__(self, name, executable, error=None, log=None, output=None,
                 submit=None, request_memory=None, request_disk=None,
                 request_cpus=None, getenv=None, universe=None,
                 initialdir=None, notification=None, requirements=None,
                 queue=None, extra_lines=None, dag=None, arguments=None,
                 retry=None, verbose=0):
        super(Job, self).__init__(name, submit, extra_lines, dag, verbose)

        self.executable = string_rep(executable)
        self.error = error
        self.log = log
        self.output = output
        self.request_memory = request_memory
        self.request_disk = request_disk
        self.request_cpus = request_cpus
        self.getenv = getenv
        self.universe = universe
        self.initialdir = initialdir
        self.notification = notification
        self.requirements = requirements
        self.queue = queue
        if retry is not None and not isinstance(retry, int):
            raise TypeError('retry must be an int')
        self.retry = retry

        self.args = []
        if arguments is not None:
            # Check str before Iterable: a str is itself iterable, but a
            # single string means a single argument, not one per character.
            if isinstance(arguments, str):
                self.add_arg(arguments)
            elif isinstance(arguments, Iterable):
                for arg in arguments:
                    self.add_arg(arg)
            else:
                raise TypeError('arguments must be a string or an iterable')

        self.logger.debug('{} initialized'.format(self.name))

    def __repr__(self):
        # Include only attributes with truthy (non-default) values, skipping
        # the ones already shown explicitly.
        nondefaults = ''
        default_attr = ['name', 'executable', 'logger']
        for attr in sorted(vars(self)):
            if getattr(self, attr) and attr not in default_attr:
                nondefaults += ', {}={}'.format(attr, getattr(self, attr))
        output = 'Job(name={}, executable={}{})'.format(
            self.name, os.path.basename(self.executable), nondefaults)
        return output

    def __iter__(self):
        return iter(self.args)

    def __len__(self):
        return len(self.args)

    def add_arg(self, arg, name=None, retry=None):
        """Add argument to Job

        Parameters
        ----------
        arg : str
            Argument to append to Job args list.
        name : str or None, optional
            Option to specify a name related to this argument. If a name is
            specified, then a separate set of log, output, and error files
            will be generated for this particular argument
            (default is ``None``).

            .. versionadded:: 0.1.2

        retry : int or None, optional
            Option to specify the number of times to retry this node. Default
            number of retries is 0. Note: this feature is only available to
            Jobs that are submitted via a Dagman.

            .. versionadded:: 0.1.2

        Returns
        -------
        self : object
            Returns self.
        """
        # Validate user input
        if not isinstance(arg, str):
            raise TypeError('arg must be a string')
        elif name is not None and not isinstance(name, str):
            raise TypeError('name must be a string')
        elif retry is not None and not isinstance(retry, int):
            raise TypeError('retry must be an int')

        # A per-argument retry overrides the Job-level retry
        if retry is not None:
            job_arg = JobArg(arg=arg, name=name, retry=retry)
        else:
            job_arg = JobArg(arg=arg, name=name, retry=self.retry)
        self.args.append(job_arg)
        self.logger.debug('Added argument \'{}\' to Job {}'.format(arg,
                                                                   self.name))
        return self

    def add_args(self, args):
        """Adds multiple arguments to Job

        Parameters
        ----------
        args : iterable
            Iterable of arguments to append to the arguments list

        Returns
        -------
        self : object
            Returns self.
        """
        for arg in args:
            self.add_arg(arg)
        return self

    def _make_submit_script(self, makedirs=True, fancyname=True, indag=False):
        """Write the HTCondor submit file for this Job to disk.

        Sets ``submit_name``, ``submit_file``, and (when configured) the
        ``log_file``/``output_file``/``error_file`` attributes as side
        effects.
        """
        # Retrying failed nodes is only available to Jobs in a Dagman
        self._has_arg_retries = any(job_arg.retry for job_arg in self.args)
        if self._has_arg_retries and (not indag):
            message = 'Retrying failed Jobs is only available when ' + \
                      'submitting from a Dagman.'
            self.logger.error(message)
            raise NotImplementedError(message)

        # Check that paths/files exist
        for directory in [self.submit, self.log, self.output, self.error]:
            if directory is not None:
                checkdir(directory + '/', makedirs)

        lines = []
        # Emit one 'key = value' line per configured submit attribute
        submit_attrs = ['universe', 'executable', 'request_memory',
                        'request_disk', 'request_cpus', 'getenv',
                        'initialdir', 'notification', 'requirements']
        for submit_attr in submit_attrs:
            if getattr(self, submit_attr) is not None:
                submit_attr_str = string_rep(getattr(self, submit_attr))
                lines.append('{} = {}'.format(submit_attr, submit_attr_str))

        name = self._get_fancyname() if fancyname else self.name
        self.submit_name = name
        submit_file = os.path.join(self.submit, '{}.submit'.format(name))
        checkdir(submit_file, makedirs)
        # Add submit_file data member to job for later use
        self.submit_file = submit_file

        # Set up log, output, and error files paths. Explicit attribute wins
        # over PYCONDOR_{LOG,OUTPUT,ERROR}_DIR environment variable.
        self._has_arg_names = any(arg.name for arg in self.args)
        for attr in ['log', 'output', 'error']:
            dir_env_var = os.getenv('PYCONDOR_{}_DIR'.format(attr.upper()))
            if getattr(self, attr) is not None:
                dir_path = getattr(self, attr)
            elif dir_env_var is not None:
                dir_path = dir_env_var
            else:
                continue
            # Add log/output/error files to submit file lines. When any
            # argument has a name, defer the file name to the DAG-supplied
            # $(job_name) macro so each named argument gets its own files.
            if self._has_arg_names:
                file_path = os.path.join(dir_path,
                                         '$(job_name).{}'.format(attr))
            else:
                file_path = os.path.join(dir_path,
                                         '{}.{}'.format(name, attr))
            lines.append('{} = {}'.format(attr, file_path))
            setattr(self, '{}_file'.format(attr), file_path)
            checkdir(file_path, makedirs)

        # Add any extra lines to submit file, if specified
        if self.extra_lines:
            lines.extend(self.extra_lines)

        # Add arguments and queue line
        if self.queue is not None and not isinstance(self.queue, int):
            raise ValueError('queue must be of type int')
        # If building this submit file for a job that's being managed by
        # DAGMan, just add simple arguments and queue lines; the DAG supplies
        # the concrete values via the $(ARGS)/$(job_name) macros.
        if indag:
            if len(self.args) > 0:
                lines.append('arguments = $(ARGS)')
            if self._has_arg_names:
                lines.append('job_name = $(job_name)')
            if self.queue:
                lines.append('queue {}'.format(self.queue))
            else:
                lines.append('queue')
        else:
            if self.args and self.queue:
                if len(self.args) > 1:
                    raise NotImplementedError(
                        'At this time multiple arguments and queue values '
                        'are only supported through Dagman')
                else:
                    arg = self.args[0].arg
                    lines.append('arguments = {}'.format(string_rep(arg,
                                 quotes=True)))
                    lines.append('queue {}'.format(self.queue))
            # Any arguments supplied will be taken care of via the queue line
            elif self.args:
                for arg, arg_name, _ in self.args:
                    lines.append('arguments = {}'.format(arg))
                    if not self._has_arg_names:
                        pass
                    elif arg_name is not None:
                        lines.append('job_name = {}_{}'.format(name,
                                                               arg_name))
                    else:
                        lines.append('job_name = {}'.format(name))
                    lines.append('queue')
            elif self.queue:
                lines.append('queue {}'.format(self.queue))
            else:
                lines.append('queue')

        with open(submit_file, 'w') as f:
            # write() the joined string; writelines() expects an iterable of
            # lines and would degenerate to a per-character write here.
            f.write('\n'.join(lines))

        return

    def build(self, makedirs=True, fancyname=True):
        """Build and saves the submit file for Job

        Parameters
        ----------
        makedirs : bool, optional
            If Job directories (e.g. error, output, log, submit) don't exist,
            create them (default is ``True``).
        fancyname : bool, optional
            Appends the date and unique id number to error, log, output, and
            submit files. For example, instead of ``jobname.submit`` the
            submit file becomes ``jobname_YYYYMMD_id``. This is useful when
            running several Jobs of the same name (default is ``True``).

        Returns
        -------
        self : object
            Returns self.
        """
        self.logger.info(
            'Building submission file for Job {}...'.format(self.name))
        self._make_submit_script(makedirs, fancyname, indag=False)
        self._built = True
        if len(self.args) >= 10:
            self.logger.warning('You are submitting a Job with {} arguments. '
                                'Consider using a Dagman in the future to '
                                'help monitor jobs.'.format(len(self.args)))
        self.logger.info('Condor submission file for {} successfully '
                         'built!'.format(self.name))
        return self

    def _build_from_dag(self, makedirs=True, fancyname=True):
        """Build the submit file for a Job that is managed by a Dagman."""
        self.logger.debug(
            'Building submission file for Job {}...'.format(self.name))
        self._make_submit_script(makedirs, fancyname, indag=True)
        self._built = True
        self.logger.debug('Condor submission file for {} successfully '
                          'built!'.format(self.name))
        return

    @requires_command('condor_submit')
    def submit_job(self, submit_options=None):
        """Submits Job to condor

        Parameters
        ----------
        submit_options : str, optional
            Options to be passed to ``condor_submit`` for this Job
            (see the `condor_submit documentation
            <http://research.cs.wisc.edu/htcondor/manual/current/condor_submit.html>`_
            for possible options).

        Returns
        -------
        self : object
            Returns self.

        Examples
        --------
        >>> import pycondor
        >>> job = pycondor.Job('myjob', 'myscript.py')
        >>> job.build()
        >>> job.submit_job(submit_options='-maxjobs 1000 -interactive')
        """
        # Ensure that submit file has been written
        if not self._built:
            raise ValueError('build() must be called before submit()')
        # Ensure that there are no parent relationships
        if len(self.parents) != 0:
            raise ValueError('Attempting to submit a Job with parents. '
                             'Interjob relationships requires Dagman.')
        # Ensure that there are no child relationships
        if len(self.children) != 0:
            raise ValueError('Attempting to submit a Job with children. '
                             'Interjob relationships requires Dagman.')

        # Construct and execute condor_submit command
        command = 'condor_submit'
        if submit_options is not None:
            command += ' {}'.format(submit_options)
        command += ' {}'.format(self.submit_file)
        proc = subprocess.Popen(
            split_command_string(command),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        out, err = proc.communicate()
        print(decode_string(out))
        # Surface condor_submit's stderr instead of silently discarding it,
        # so failed submissions are visible to the user.
        if err:
            print(decode_string(err))

        return self

    @requires_command('condor_submit')
    def build_submit(self, makedirs=True, fancyname=True, submit_options=None):
        """Calls build and submit sequentially

        Parameters
        ----------
        makedirs : bool, optional
            If Job directories (e.g. error, output, log, submit) don't exist,
            create them (default is ``True``).
        fancyname : bool, optional
            Appends the date and unique id number to error, log, output, and
            submit files. For example, instead of ``jobname.submit`` the
            submit file becomes ``jobname_YYYYMMD_id``. This is useful when
            running several Jobs of the same name (default is ``True``).
        submit_options : str, optional
            Options to be passed to ``condor_submit`` for this Job
            (see the `condor_submit documentation
            <http://research.cs.wisc.edu/htcondor/manual/current/condor_submit.html>`_
            for possible options).

        Returns
        -------
        self : object
            Returns self.
        """
        self.build(makedirs, fancyname)
        self.submit_job(submit_options=submit_options)
        return self