Source code for radical.pilot.compute_unit


__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__   = "MIT"


import os
import copy
import time
import threading

import radical.utils as ru

from . import utils     as rpu
from . import states    as rps
from . import constants as rpc
from . import types     as rpt

from . import compute_unit_description as cud

from .staging_directives import expand_description
from .staging_directives import TRANSFER, COPY, LINK, MOVE, STAGING_AREA


# ------------------------------------------------------------------------------
#
[docs]class ComputeUnit(object): """ A ComputeUnit represent a 'task' that is executed on a ComputePilot. ComputeUnits allow to control and query the state of this task. .. note:: A unit cannot be created directly. The factory method :meth:`radical.pilot.UnitManager.submit_units` has to be used instead. **Example**:: umgr = radical.pilot.UnitManager(session=s) ud = radical.pilot.ComputeUnitDescription() ud.executable = "/bin/date" unit = umgr.submit_units(ud) """ # -------------------------------------------------------------------------- # In terms of implementation, a CU is not much more than a dict whose # content are dynamically updated to reflect the state progression through # the UMGR components. As a CU is always created via a UMGR, it is # considered to *belong* to that UMGR, and all activities are actually # implemented by that UMGR. # # Note that this implies that we could create CUs before submitting them # to a UMGR, w/o any problems. (FIXME?) # -------------------------------------------------------------------------- # -------------------------------------------------------------------------- # def __init__(self, umgr, descr): # NOTE GPU: we allow `mpi` for backward compatibility - but need to # convert the bool into a decent value for `cpu_process_type` if descr[cud.CPU_PROCESS_TYPE] in [True, 'True']: descr[cud.CPU_PROCESS_TYPE] = cud.MPI # ensure that the description is viable descr.verify() # 'static' members self._descr = descr.as_dict() self._umgr = umgr # initialize state self._session = self._umgr.session self._uid = ru.generate_id('unit.%(counter)06d', ru.ID_CUSTOM) self._state = rps.NEW self._log = umgr._log self._exit_code = None self._stdout = None self._stderr = None self._pilot = descr.get('pilot') self._resource_sandbox = None self._pilot_sandbox = None self._unit_sandbox = None self._client_sandbox = None self._callbacks = dict() self._cb_lock = threading.RLock() for m in rpt.UMGR_METRICS: self._callbacks[m] = dict() # we always invke the default state cb self._callbacks[rpt.UNIT_STATE][self._default_state_cb.__name__] = { 'cb' : self._default_state_cb, 'cb_data' : None} # If staging directives exist, expand them to the full dict version. Do # not, however, expand any URLs as of yet, as we likely don't have # sufficient information about pilot sandboxes etc. expand_description(self._descr) self._umgr.advance(self.as_dict(), rps.NEW, publish=False, push=False) # -------------------------------------------------------------------------- # def __repr__(self): return str(self.as_dict()) # -------------------------------------------------------------------------- # def __str__(self): return [self.uid, self.pilot, self.state] # -------------------------------------------------------------------------- # def _default_state_cb(self, unit, state): self._log.info("[Callback]: unit %s state: %s.", self.uid, self.state) # -------------------------------------------------------------------------- # def _update(self, unit_dict): """ This will update the facade object after state changes etc, and is invoked by whatever component receiving that updated information. Return True if state changed, False otherwise """ assert(unit_dict['uid'] == self.uid), 'update called on wrong instance' # NOTE: this method relies on state updates to arrive in order, and # without gaps. current = self.state target = unit_dict['state'] if target not in [rps.FAILED, rps.CANCELED]: try: assert(rps._unit_state_value(target) - rps._unit_state_value(current) == 1), \ 'invalid state transition' except: self._log.error('%s: invalid state transition %s -> %s', self.uid, current, target) raise self._state = target # we update all fields # FIXME: well, not all really :/ # FIXME: setattr is ugly... we should maintain all state in a dict. for key in ['state', 'stdout', 'stderr', 'exit_code', 'pilot', 'resource_sandbox', 'pilot_sandbox', 'unit_sandbox', 'client_sandbox']: val = unit_dict.get(key, None) if val != None: setattr(self, "_%s" % key, val) # invoke unit specific callbacks for cb_name, cb_val in self._callbacks[rpt.UNIT_STATE].iteritems(): cb = cb_val['cb'] cb_data = cb_val['cb_data'] self._log.debug('%s calls state cb %s', self.uid, cb) if cb_data: cb(self, self.state, cb_data) else : cb(self, self.state) # ask umgr to invoke any global callbacks self._umgr._call_unit_callbacks(self, self.state) # -------------------------------------------------------------------------- #
[docs] def as_dict(self): """ Returns a Python dictionary representation of the object. """ ret = { 'type': 'unit', 'umgr': self.umgr.uid, 'uid': self.uid, 'name': self.name, 'state': self.state, 'exit_code': self.exit_code, 'stdout': self.stdout, 'stderr': self.stderr, 'pilot': self.pilot, 'resource_sandbox': self.resource_sandbox, 'pilot_sandbox': self.pilot_sandbox, 'unit_sandbox': self.unit_sandbox, 'client_sandbox': self.client_sandbox, 'description': self.description # this is a deep copy } return ret
# -------------------------------------------------------------------------- # @property def session(self): """ Returns the unit's session. **Returns:** * A :class:`Session`. """ return self._session # -------------------------------------------------------------------------- # @property def umgr(self): """ Returns the unit's manager. **Returns:** * A :class:`UnitManager`. """ return self._umgr # -------------------------------------------------------------------------- # @property def uid(self): """ Returns the unit's unique identifier. The uid identifies the unit within a :class:`UnitManager`. **Returns:** * A unique identifier (string). """ return self._uid # -------------------------------------------------------------------------- # @property def name(self): """ Returns the unit's application specified name. **Returns:** * A name (string). """ return self._descr.get('name') # -------------------------------------------------------------------------- # @property def state(self): """ Returns the current state of the unit. **Returns:** * state (string enum) """ return self._state # -------------------------------------------------------------------------- # @property def exit_code(self): """ Returns the exit code of the unit, if that is already known, or 'None' otherwise. **Returns:** * exit code (int) """ return self._exit_code # -------------------------------------------------------------------------- # @property def stdout(self): """ Returns a snapshot of the executable's STDOUT stream. If this property is queried before the unit has reached 'DONE' or 'FAILED' state it will return None. .. warning: This can be inefficient. Output may be incomplete and/or filtered. **Returns:** * stdout (string) """ return self._stdout # -------------------------------------------------------------------------- # @property def stderr(self): """ Returns a snapshot of the executable's STDERR stream. If this property is queried before the unit has reached 'DONE' or 'FAILED' state it will return None. .. warning: This can be inefficient. Output may be incomplete and/or filtered. **Returns:** * stderr (string) """ return self._stderr # -------------------------------------------------------------------------- # @property def pilot(self): """ Returns the pilot ID of this unit, if that is already known, or 'None' otherwise. **Returns:** * A pilot ID (string) """ return self._pilot # -------------------------------------------------------------------------- # @property def working_directory(self): # **NOTE:** deprecated, use *`sandbox`* return self.sandbox @property def sandbox(self): return self.unit_sandbox @property def unit_sandbox(self): """ Returns the full sandbox URL of this unit, if that is already known, or 'None' otherwise. **Returns:** * A URL (radical.utils.Url). """ # NOTE: The unit has a sandbox property, containing the full sandbox # path, which is used by the umgr to stage data back and forth. # However, the full path as visible from the umgr side might not # be what the agent is seeing, specifically in the case of # non-shared filesystems (OSG). The agent thus uses # `$PWD/cu['uid']` as sandbox, with the assumption that this will # get mapped to whatever is here returned as sandbox URL. # # There is thus implicit knowledge shared between the RP client # and the RP agent on how the sandbox path is formed! return self._unit_sandbox @property def resource_sandbox(self): return self._resource_sandbox @property def pilot_sandbox(self): return self._pilot_sandbox @property def client_sandbox(self): return self._client_sandbox # -------------------------------------------------------------------------- # @property def description(self): """ Returns the description the unit was started with, as a dictionary. **Returns:** * description (dict) """ return copy.deepcopy(self._descr) # -------------------------------------------------------------------------- # @property def metadata(self): """ Returns the metadata field of the unit's description """ return copy.deepcopy(self._descr.get('metadata')) # -------------------------------------------------------------------------- #
[docs] def register_callback(self, cb, cb_data=None): """ Registers a callback function that is triggered every time the unit's state changes. All callback functions need to have the same signature:: def cb(obj, state) where ``object`` is a handle to the object that triggered the callback and ``state`` is the new state of that object. If 'cb_data' is given, then the 'cb' signature changes to def cb(obj, state, cb_data) and 'cb_data' are passed along. """ self._umgr.register_callback(self.uid, rpt.UNIT_STATE, cb, cb_data)
# -------------------------------------------------------------------------- #
[docs] def wait(self, state=None, timeout=None): """ Returns when the unit reaches a specific state or when an optional timeout is reached. **Arguments:** * **state** [`list of strings`] The state(s) that unit has to reach in order for the call to return. By default `wait` waits for the unit to reach a **final** state, which can be one of the following: * :data:`radical.pilot.states.DONE` * :data:`radical.pilot.states.FAILED` * :data:`radical.pilot.states.CANCELED` * **timeout** [`float`] Optional timeout in seconds before the call returns regardless whether the unit has reached the desired state or not. The default value **None** never times out. """ if not state: states = rps.FINAL if not isinstance(state, list): states = [state] else: states = state if self.state in rps.FINAL: # we will never see another state progression. Raise an error # (unless we waited for this) if self.state in states: return # FIXME: do we want a raise here, really? This introduces a race, # really, on application level # raise RuntimeError("can't wait on a unit in final state") return self.state start_wait = time.time() while self.state not in states: time.sleep(0.1) if timeout and (timeout <= (time.time() - start_wait)): break # if self._umgr._terminate.is_set(): # break return self.state
# -------------------------------------------------------------------------- #
[docs] def cancel(self): """ Cancel the unit. """ self._umgr.cancel_units(self.uid)
# ------------------------------------------------------------------------------