__copyright__ = "Copyright 2013-2014, http://radical.rutgers.edu"
__license__ = "MIT"
import saga.attributes as attributes
# ------------------------------------------------------------------------------
# Attribute description keys
NAME = 'name'
EXECUTABLE = 'executable'
ARGUMENTS = 'arguments'
ENVIRONMENT = 'environment'
CORES = 'cores' # deprecated
CPU_PROCESSES = 'cpu_processes'
CPU_PROCESS_TYPE = 'cpu_process_type'
CPU_THREADS = 'cpu_threads'
CPU_THREAD_TYPE = 'cpu_thread_type'
GPU_PROCESSES = 'gpu_processes'
GPU_PROCESS_TYPE = 'gpu_process_type'
GPU_THREADS = 'gpu_threads'
GPU_THREAD_TYPE = 'gpu_thread_type'
LFS_PER_PROCESS = 'lfs_per_process'
TAG = 'tag'
INPUT_STAGING = 'input_staging'
OUTPUT_STAGING = 'output_staging'
PRE_EXEC = 'pre_exec'
POST_EXEC = 'post_exec'
KERNEL = 'kernel'
CLEANUP = 'cleanup'
PILOT = 'pilot'
STDOUT = 'stdout'
STDERR = 'stderr'
RESTARTABLE = 'restartable'
METADATA = 'metadata'
# process / thread types (for both, CPU and GPU processes/threads)
POSIX = 'POSIX' # native threads / application threads
MPI = 'MPI'
OpenMP = 'OpenMP'
CUDA = 'CUDA'
# ------------------------------------------------------------------------------
#
[docs]class ComputeUnitDescription(attributes.Attributes):
"""
A ComputeUnitDescription object describes the requirements and properties
of a :class:`radical.pilot.ComputeUnit` and is passed as a parameter to
:meth:`radical.pilot.UnitManager.submit_units` to instantiate and run
a new unit.
.. note:: A ComputeUnitDescription **MUST** define at least an
`executable` or `kernel` -- all other elements are optional.
**Example**::
# TODO
.. data:: executable
The executable to launch (`string`). The executable is expected to be
either available via `$PATH` on the target resource, or to be an absolute
path.
default: `None`
.. data:: cpu_processes
number of application processes to start on CPU cores
default: 0
.. data:: cpu_threads
number of threads each process will start on CPU cores
default: 1
.. data:: cpu_process_type
process type, determines startup method (POSIX, MPI)
default: POSIX
.. data:: cpu_thread_type
thread type, influences startup and environment (POSIX, OpenMP)
default: POSIX
.. data:: gpu_processes
number of application processes to start on GPU cores
default: 0
.. data:: gpu_threads
number of threads each process will start on GPU cores
default: 1
.. data:: gpu_process_type
process type, determines startup method (POSIX, MPI)
default: POSIX
.. data:: gpu_thread_type
thread type, influences startup and environment (POSIX, OpenMP, CUDA)
default: POSIX
.. data:: lfs (local file storage)
amount of data (MB) required on the local file system of the node
default: 0
.. data:: name
A descriptive name for the compute unit (`string`). This attribute can
be used to map individual units back to application level workloads.
default: `None`
.. data:: arguments
The command line arguments for the given `executable` (`list` of
`strings`).
default: `[]`
.. data:: environment
Environment variables to set in the environment before execution
(`dict`).
default: `{}`
.. data:: stdout
The name of the file to store stdout in (`string`).
default: `STDOUT`
.. data:: stderr
The name of the file to store stderr in (`string`).
default: `STDERR`
.. data:: input_staging
The files that need to be staged before execution (`list` of `staging
directives`, see below).
default: `{}`
.. data:: output_staging
The files that need to be staged after execution (`list` of `staging
directives`, see below).
default: `{}`
.. data:: pre_exec
Actions (shell commands) to perform before this task starts (`list` of
`strings`). Note that the set of shell commands given here are expected
to load environments, check for work directories and data, etc. They are
not expected to consume any significant amount of CPU time or other
resources! Deviating from that rule will likely result in reduced
overall throughput.
No assumption should be made as to where these commands are executed
(although RP attempts to perform them in the unit's execution
environment).
No assumption should be made on the specific shell environment the
commands are executed in.
Errors in executing these commands will result in the unit to enter
`FAILED` state, and no execution of the actual workload will be
attempted.
default: `[]`
.. data:: post_exec
Actions (shell commands) to perform after this task finishes (`list` of
`strings`). The same remarks as on `pre_exec` apply, inclusive the point
on error handling, which again will cause the unit to fail, even if the
actual execution was successful..
default: `[]`
.. data:: kernel
Name of a simulation kernel which expands to description attributes once
the unit is scheduled to a pilot (and resource).
.. note:: TODO: explain in detail, reference ENMDTK.
default: `None`
.. data:: restartable
If the unit starts to execute on a pilot, but cannot finish because the
pilot fails or is canceled, can the unit be restarted on a different
pilot / resource?
default: `False`
.. data:: metadata
user defined metadata
default: `None`
.. data:: cleanup
If cleanup (a `bool`) is set to `True`, the pilot will delete the entire
unit sandbox upon termination. This includes all generated output data in
that sandbox. Output staging will be performed before cleanup.
Note that unit sandboxes are also deleted if the pilot's own `cleanup`
flag is set.
default: `False`
.. data:: pilot
If specified as `string` (pilot uid), the unit is submitted to the pilot
with the given ID. If that pilot is not known to the unit manager, an
exception is raised.
Staging Directives
==================
The Staging Directives are specified using a dict in the following form:
staging_directive = {
'source' : None, # see 'Location' below
'target' : None, # see 'Location' below
'action' : None, # See 'Action operators' below
'flags' : None, # See 'Flags' below
'priority': 0 # Control ordering of actions (unused)
}
Locations
---------
`source` and `target` locations can be given as strings or `ru.URL`
instances. Strings containing `://` are converted into URLs immediately.
Otherwise they are considered absolute or relative paths and are then
interpreted in the context of the client's working directory.
RP accepts the following special URL schemas:
* `client://` : relative to the client's working directory
* `resource://`: relative to the RP sandbox on the target resource
* `pilot://` : relative to the pilot sandbox on the target resource
* `unit://` : relative to the unit sandbox on the target resource
In all these cases, the `hostname` element of the URL is expected to be
empty, and the path is *always* considered relative to the locations
specified above (even though URLs usually don't have a notion of relative
paths).
Action operators
----------------
RP accepts the following action operators:
* rp.TRANSFER: remote file transfer from `source` URL to `target` URL.
* rp.COPY : local file copy, ie. not crossing host boundaries
* rp.MOVE : local file move
* rp.LINK : local file symlink
Flags
-----
rp.CREATE_PARENTS: create the directory hierarchy for targets on the fly
rp.RECURSIVE : if `source` is a directory, handle it recursively
"""
# --------------------------------------------------------------------------
#
def __init__(self, from_dict=None):
# initialize attributes
attributes.Attributes.__init__(self)
# set attribute interface properties
self._attributes_extensible (False)
self._attributes_camelcasing (True)
# register properties with the attribute interface
# action description
self._attributes_register(KERNEL, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(NAME, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(EXECUTABLE, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(ARGUMENTS, None, attributes.STRING, attributes.VECTOR, attributes.WRITEABLE)
self._attributes_register(ENVIRONMENT, None, attributes.STRING, attributes.DICT, attributes.WRITEABLE)
self._attributes_register(PRE_EXEC, None, attributes.STRING, attributes.VECTOR, attributes.WRITEABLE)
self._attributes_register(POST_EXEC, None, attributes.STRING, attributes.VECTOR, attributes.WRITEABLE)
self._attributes_register(RESTARTABLE, None, attributes.BOOL, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(METADATA, None, attributes.ANY, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(CLEANUP, None, attributes.BOOL, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(PILOT, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
# I/O
self._attributes_register(STDOUT, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(STDERR, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(INPUT_STAGING, None, attributes.ANY, attributes.VECTOR, attributes.WRITEABLE)
self._attributes_register(OUTPUT_STAGING, None, attributes.ANY, attributes.VECTOR, attributes.WRITEABLE)
# resource requirements
self._attributes_register(CPU_PROCESSES, None, attributes.INT, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(CPU_PROCESS_TYPE, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(CPU_THREADS, None, attributes.INT, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(CPU_THREAD_TYPE, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(GPU_PROCESSES, None, attributes.INT, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(GPU_PROCESS_TYPE, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(GPU_THREADS, None, attributes.INT, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(GPU_THREAD_TYPE, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
self._attributes_register(LFS_PER_PROCESS, None, attributes.INT, attributes.SCALAR, attributes.WRITEABLE)
# tag -- user level tag that can be used in scheduling
self._attributes_register(TAG, None, attributes.STRING, attributes.SCALAR, attributes.WRITEABLE)
# dependencies
# self._attributes_register(RUN_AFTER, None, attributes.STRING, attributes.VECTOR, attributes.WRITEABLE)
# self._attributes_register(START_AFTER, None, attributes.STRING, attributes.VECTOR, attributes.WRITEABLE)
# self._attributes_register(CONCURRENT_WITH, None, attributes.STRING, attributes.VECTOR, attributes.WRITEABLE)
# self._attributes_register(START_TIME, None, attributes.TIME, attributes.SCALAR, attributes.WRITEABLE)
# self._attributes_register(RUN_TIME, None, attributes.TIME, attributes.SCALAR, attributes.WRITEABLE)
# explicitly set attrib defaults so they get listed and included via as_dict()
self.set_attribute (KERNEL, None)
self.set_attribute (NAME, None)
self.set_attribute (EXECUTABLE, None)
self.set_attribute (ARGUMENTS, list())
self.set_attribute (ENVIRONMENT, dict())
self.set_attribute (PRE_EXEC, list())
self.set_attribute (POST_EXEC, list())
self.set_attribute (STDOUT, None)
self.set_attribute (STDERR, None)
self.set_attribute (INPUT_STAGING, list())
self.set_attribute (OUTPUT_STAGING, list())
self.set_attribute (CPU_PROCESSES, 1)
self.set_attribute (CPU_PROCESS_TYPE, '')
self.set_attribute (CPU_THREADS, 1)
self.set_attribute (CPU_THREAD_TYPE, '')
self.set_attribute (GPU_PROCESSES, 0)
self.set_attribute (GPU_PROCESS_TYPE, '')
self.set_attribute (GPU_THREADS, 1)
self.set_attribute (GPU_THREAD_TYPE, '')
self.set_attribute (GPU_THREAD_TYPE, '')
self.set_attribute (LFS_PER_PROCESS, 0)
self.set_attribute (TAG, None)
self.set_attribute (RESTARTABLE, False)
self.set_attribute (METADATA, None)
self.set_attribute (CLEANUP, False)
self.set_attribute (PILOT, '')
self._attributes_register_deprecated(CORES, CPU_PROCESSES)
self._attributes_register_deprecated(MPI, CPU_PROCESS_TYPE)
# apply initialization dict
if from_dict:
self.from_dict(from_dict)
# --------------------------------------------------------------------------
#
def __deepcopy__ (self, memo):
other = ComputeUnitDescription ()
for key in self.list_attributes ():
other.set_attribute(key, self.get_attribute (key))
return other
# --------------------------------------------------------------------------
#
def __str__(self):
"""Returns a string representation of the object.
"""
return str(self.as_dict())
# --------------------------------------------------------------------------
#
[docs] def verify(self):
'''
Verify that the description is syntactically and semantically correct.
This method encapsulates checks beyond the SAGA attribute level checks.
'''
# replace 'None' values for strng types with '', for int types with '0'.
if self.get(KERNEL ) is None: self[KERNEL ] = ''
if self.get(NAME ) is None: self[NAME ] = ''
if self.get(EXECUTABLE ) is None: self[EXECUTABLE ] = ''
if self.get(ARGUMENTS ) is None: self[ARGUMENTS ] = ''
if self.get(ENVIRONMENT ) is None: self[ENVIRONMENT ] = ''
if self.get(PRE_EXEC ) is None: self[PRE_EXEC ] = ''
if self.get(POST_EXEC ) is None: self[POST_EXEC ] = ''
if self.get(PILOT ) is None: self[PILOT ] = ''
if self.get(STDOUT ) is None: self[STDOUT ] = ''
if self.get(STDERR ) is None: self[STDERR ] = ''
if self.get(CPU_PROCESS_TYPE) is None: self[CPU_PROCESS_TYPE] = ''
if self.get(CPU_THREAD_TYPE ) is None: self[CPU_THREAD_TYPE ] = ''
if self.get(GPU_PROCESS_TYPE) is None: self[GPU_PROCESS_TYPE] = ''
if self.get(GPU_THREAD_TYPE ) is None: self[GPU_THREAD_TYPE ] = ''
if self.get(CPU_PROCESSES ) is None: self[CPU_PROCESSES ] = 0
if self.get(CPU_THREADS ) is None: self[CPU_THREADS ] = 0
if self.get(GPU_PROCESSES ) is None: self[GPU_PROCESSES ] = 0
if self.get(GPU_THREADS ) is None: self[GPU_THREADS ] = 0
if not self.get('executable') and \
not self.get('kernel') :
raise ValueError("CU description needs 'executable' or 'kernel'")
# ------------------------------------------------------------------------------