Source code for radical.pilot.unit_manager

    #pylint: disable=C0301, C0103, W0212

.. module:: radical.pilot.unit_manager
   :platform: Unix
   :synopsis: Implementation of the UnitManager class.

.. moduleauthor:: Ole Weidner <>

__copyright__ = "Copyright 2013-2014,"
__license__ = "MIT"

import os
import time
import weakref

from radical.pilot.compute_unit import ComputeUnit
from radical.pilot.utils.logger import logger

from radical.pilot.controller   import UnitManagerController
from radical.pilot.scheduler    import get_scheduler

from radical.pilot.types        import *
from radical.pilot.states       import *
from radical.pilot.exceptions   import *

# -----------------------------------------------------------------------------
[docs]class UnitManager(object): """A UnitManager manages :class:`radical.pilot.ComputeUnit` instances which represent the **executable** workload in RADICAL-Pilot. A UnitManager connects the ComputeUnits with one or more :class:`Pilot` instances (which represent the workload **executors** in RADICAL-Pilot) and a **scheduler** which determines which :class:`ComputeUnit` gets executed on which :class:`Pilot`. Each UnitManager has a unique identifier :data:`radical.pilot.UnitManager.uid` that can be used to re-connect to previoulsy created UnitManager in a given :class:`radical.pilot.Session`. **Example**:: s = radical.pilot.Session(database_url=DBURL) pm = radical.pilot.PilotManager(session=s) pd = radical.pilot.ComputePilotDescription() pd.resource = "futuregrid.alamo" pd.cores = 16 p1 = pm.submit_pilots(pd) # create first pilot with 16 cores p2 = pm.submit_pilots(pd) # create second pilot with 16 cores # Create a workload of 128 '/bin/sleep' compute units compute_units = [] for unit_count in range(0, 128): cu = radical.pilot.ComputeUnitDescription() cu.executable = "/bin/sleep" cu.arguments = ['60'] compute_units.append(cu) # Combine the two pilots, the workload and a scheduler via # a UnitManager. um = radical.pilot.UnitManager(session=session, scheduler=radical.pilot.SCHED_ROUND_ROBIN) um.add_pilot(p1) um.submit_units(compute_units) """ # ------------------------------------------------------------------------- #
[docs] def __init__(self, session, scheduler=None, input_transfer_workers=2, output_transfer_workers=2, _reconnect=False): """Creates a new UnitManager and attaches it to the session. **Args:** * session (`string`): The session instance to use. * scheduler (`string`): The name of the scheduler plug-in to use. * input_transfer_workers (`int`): The number of input file transfer worker processes to launch in the background. * output_transfer_workers (`int`): The number of output file transfer worker processes to launch in the background. .. note:: `input_transfer_workers` and `output_transfer_workers` can be used to tune RADICAL-Pilot's file transfer performance. However, you should only change the default values if you know what you are doing. **Raises:** * :class:`radical.pilot.PilotException` """ self._session = session self._worker = None self._pilots = list() # keep track of some changing metrics self.wait_queue_size = 0 if _reconnect is False: # Start a worker process fo this UnitManager instance. The worker # process encapsulates database access et al. self._worker = UnitManagerController( unit_manager_uid=None, scheduler=scheduler, input_transfer_workers=input_transfer_workers, output_transfer_workers=output_transfer_workers, session=self._session, db_connection=session._dbs, db_connection_info=session._connection_info) self._worker.start() self._uid = self._worker.unit_manager_uid self._scheduler = get_scheduler(name=scheduler, manager=self, session=self._session) # Each unit manager has a worker thread associated with it. # The task of the worker thread is to check and update the state # of units, fire callbacks and so on. self._session._unit_manager_objects.append(self) self._session._process_registry.register(self._uid, self._worker) else: # re-connect. do nothing pass #-------------------------------------------------------------------------- #
[docs] def close(self): """Shuts down the UnitManager and its background workers in a coordinated fashion. """ if not self._uid: logger.warning("UnitManager object already closed.") return if self._worker is not None: self._worker.stop() # Remove worker from registry self._session._process_registry.remove(self._uid)"Closed UnitManager %s." % str(self._uid)) self._uid = None #-------------------------------------------------------------------------- #
@classmethod def _reconnect(cls, session, unit_manager_id): """PRIVATE: Reconnect to an existing UnitManager. """ uid_exists = UnitManagerController.uid_exists( db_connection=session._dbs, unit_manager_uid=unit_manager_id) if not uid_exists: raise BadParameter( "UnitManager with id '%s' not in database." % unit_manager_id) # The UnitManager object obj = cls(session=session, scheduler=None, _reconnect=True) # Retrieve or start a worker process fo this PilotManager instance. worker = session._process_registry.retrieve(unit_manager_id) if worker is not None: obj._worker = worker else: obj._worker = UnitManagerController( unit_manager_uid=unit_manager_id, session=session, db_connection=session._dbs, db_connection_info=session._connection_info) session._process_registry.register(unit_manager_id, obj._worker) # start the worker if it's not already running if obj._worker.is_alive() is False: obj._worker.start() # Now that the worker is running (again), we can get more information # about the UnitManager um_data = obj._worker.get_unit_manager_data() obj._scheduler = get_scheduler(name=um_data['scheduler'], manager=obj, session=obj._session) # FIXME: we need to tell the scheduler about all the pilots... obj._uid = unit_manager_id"Reconnected to existing UnitManager %s." % str(obj)) return obj # ------------------------------------------------------------------------- #
[docs] def as_dict(self): """Returns a Python dictionary representation of the UnitManager object. """ obj_dict = { 'uid': self.uid, 'scheduler': self.scheduler, 'scheduler_details': self.scheduler_details } return obj_dict # ------------------------------------------------------------------------- #
def __str__(self): """Returns a string representation of the UnitManager object. """ return str(self.as_dict()) #-------------------------------------------------------------------------- # @property def uid(self): """Returns the unique id. """ return self._uid #-------------------------------------------------------------------------- # @property def scheduler(self): """Returns the scheduler name. """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") return #-------------------------------------------------------------------------- # @property def scheduler_details(self): """Returns the scheduler logs. """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") return "NO SCHEDULER DETAILS (Not Implemented)" # ------------------------------------------------------------------------- #
[docs] def add_pilots(self, pilots): """Associates one or more pilots with the unit manager. **Arguments:** * **pilots** [:class:`radical.pilot.ComputePilot` or list of :class:`radical.pilot.ComputePilot`]: The pilot objects that will be added to the unit manager. **Raises:** * :class:`radical.pilot.PilotException` """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") if not isinstance(pilots, list): pilots = [pilots] pilot_ids = self.list_pilots() for pilot in pilots : if pilot.uid in pilot_ids : logger.warning ('adding the same pilot twice (%s)' % pilot.uid) self._worker.add_pilots(pilots) # let the scheduler know... for pilot in pilots : self._scheduler.pilot_added (pilot) # also keep the instances around for pilot in pilots : self._pilots.append (pilot) # ------------------------------------------------------------------------- #
[docs] def list_pilots(self): """Lists the UIDs of the pilots currently associated with the unit manager. **Returns:** * A list of :class:`radical.pilot.ComputePilot` UIDs [`string`]. **Raises:** * :class:`radical.pilot.PilotException` """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") return self._worker.get_pilot_uids() # ------------------------------------------------------------------------- #
[docs] def get_pilots(self): """get the pilots instances currently associated with the unit manager. **Returns:** * A list of :class:`radical.pilot.ComputePilot` instances. **Raises:** * :class:`radical.pilot.PilotException` """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") return self._pilots # ------------------------------------------------------------------------- #
[docs] def remove_pilots(self, pilot_ids, drain=True): """Disassociates one or more pilots from the unit manager. TODO: Implement 'drain'. After a pilot has been removed from a unit manager, it won't process any of the unit manager's units anymore. Calling `remove_pilots` doesn't stop the pilot itself. **Arguments:** * **drain** [`boolean`]: Drain determines what happens to the units which are managed by the removed pilot(s). If `True`, all units currently assigned to the pilot are allowed to finish execution. If `False` (the default), then `ACTIVE` units will be canceled. **Raises:** * :class:`radical.pilot.PilotException` """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") if not isinstance(pilot_ids, list): pilot_ids = [pilot_ids] self._worker.remove_pilots(pilot_ids) # FIXME: # if a pilot gets removed, we need to re-assign all its units to other # pilots. We thus move them all into the wait queue, and call the # global rescheduler. Some of the CUs might already be in final state # -- those are ignored (they'll be in the done_queue anyways). We leave # it to the scheduling policy what happens to non-NEW CUs in the # wait_queue, i.e. if they get rescheduled, or if they'll raise an # error. # let the scheduler know... for pilot_id in pilot_ids : self._scheduler.pilot_removed (pilot_id) # update instance list for pilot_id in pilot_ids : for pilot in self._pilots[:] : if pilot_id == pilot.uid : self._pilots.remove (pilot) # ------------------------------------------------------------------------- #
[docs] def list_units(self): """Returns the UIDs of the :class:`radical.pilot.ComputeUnit` managed by this unit manager. **Returns:** * A list of :class:`radical.pilot.ComputeUnit` UIDs [`string`]. """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") return self._worker.get_compute_unit_uids() # ------------------------------------------------------------------------- #
[docs] def submit_units(self, unit_descriptions): """Submits on or more :class:`radical.pilot.ComputeUnit` instances to the unit manager. **Arguments:** * **unit_descriptions** [:class:`radical.pilot.ComputeUnitDescription` or list of :class:`radical.pilot.ComputeUnitDescription`]: The description of the compute unit instance(s) to create. **Returns:** * A list of :class:`radical.pilot.ComputeUnit` objects. **Raises:** * :class:`radical.pilot.PilotException` """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") return_list_type = True if not isinstance(unit_descriptions, list): return_list_type = False unit_descriptions = [unit_descriptions] # we return a list of compute units ret = list() # the scheduler will return a dictionary of the form: # { # ud_1 : pilot_id_a, # ud_2 : pilot_id_b # ... # } # # The scheduler may not be able to schedule some units - those will # have 'None' as pilot ID. units = list() for ud in unit_descriptions : units.append (ComputeUnit._create (unit_description=ud, unit_manager_obj=self, local_state=NEW)) self._worker.publish_compute_units (units=units) schedule = None try: schedule = self._scheduler.schedule (units=units) except Exception as e: logger.exception ("Internal error - unit scheduler failed") raise self.handle_schedule (schedule) if return_list_type : return units else : return units[0] # ------------------------------------------------------------------------- #
def handle_schedule (self, schedule) : # we want to use bulk submission to the pilots, so we collect all units # assigned to the same set of pilots. At the same time, we select # unscheduled units for later insertion into the wait queue. if not schedule : logger.debug ('skipping empty unit schedule') return # print 'handle schedule:' # import pprint # pprint.pprint (schedule) # pilot_cu_map = dict() unscheduled = list() pilot_ids = self.list_pilots () for unit in schedule['units'].keys() : pid = schedule['units'][unit] if None == pid : unscheduled.append (unit) continue else : if pid not in pilot_ids : raise RuntimeError ("schedule points to unknown pilot %s" % pid) if pid not in pilot_cu_map : pilot_cu_map[pid] = list() pilot_cu_map[pid].append (unit) # submit to all pilots which got something submitted to for pid in pilot_cu_map.keys(): units_to_schedule = list() # if a kernel name is in the cu descriptions set, do kernel expansion for unit in pilot_cu_map[pid] : if not pid in schedule['pilots'] : # lost pilot, do not schedule unit logger.warn ("unschedule unit %s, lost pilot %s" % (unit.uid, pid)) continue unit.sandbox = schedule['pilots'][pid]['sandbox'] + "/unit-" + str(unit.uid) ud = unit.description if 'kernel' in ud and ud['kernel'] : try : from radical.ensemblemd.mdkernels import MDTaskDescription except Exception as ex : logger.error ("Kernels are not supported in" \ "compute unit descriptions -- install " \ "radical.ensemblemd.mdkernels!") # FIXME: unit needs a '_set_state() method or something! self._session._dbs.set_compute_unit_state (unit._uid, FAILED, ["kernel expansion failed"]) continue pilot_resource = schedule['pilots'][pid]['resource'] mdtd = MDTaskDescription () mdtd.kernel = ud.kernel mdtd_bound = mdtd.bind (resource=pilot_resource) ud.environment = mdtd_bound.environment ud.pre_exec = mdtd_bound.pre_exec ud.executable = mdtd_bound.executable ud.mpi = mdtd_bound.mpi units_to_schedule.append (unit) if len(units_to_schedule) : self._worker.schedule_compute_units (pilot_uid=pid, units=units_to_schedule) # report any change in wait_queue_size old_wait_queue_size = self.wait_queue_size self.wait_queue_size = len(unscheduled) if old_wait_queue_size != self.wait_queue_size : self._worker.fire_manager_callback (WAIT_QUEUE_SIZE, self, self.wait_queue_size) if len(unscheduled) : self._worker.unschedule_compute_units (units=unscheduled) ('%s units remain unscheduled' % len(unscheduled)) # ------------------------------------------------------------------------- #
[docs] def get_units(self, unit_ids=None): """Returns one or more compute units identified by their IDs. **Arguments:** * **unit_ids** [`string` or `list of strings`]: The IDs of the compute unit objects to return. **Returns:** * A list of :class:`radical.pilot.ComputeUnit` objects. **Raises:** * :class:`radical.pilot.PilotException` """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") return_list_type = True if (not isinstance(unit_ids, list)) and (unit_ids is not None): return_list_type = False unit_ids = [unit_ids] units = ComputeUnit._get(unit_ids=unit_ids, unit_manager_obj=self) if return_list_type : return units else : return units[0] # ------------------------------------------------------------------------- #
[docs] def wait_units(self, unit_ids=None, state=[DONE, FAILED, CANCELED], timeout=None): """Returns when one or more :class:`radical.pilot.ComputeUnits` reach a specific state. If `unit_uids` is `None`, `wait_units` returns when **all** ComputeUnits reach the state defined in `state`. **Example**:: # TODO -- add example **Arguments:** * **unit_uids** [`string` or `list of strings`] If unit_uids is set, only the ComputeUnits with the specified uids are considered. If unit_uids is `None` (default), all ComputeUnits are considered. * **state** [`string`] The state that ComputeUnits have to reach in order for the call to return. By default `wait_units` waits for the ComputeUnits to reach a terminal state, which can be one of the following: * :data:`radical.pilot.DONE` * :data:`radical.pilot.FAILED` * :data:`radical.pilot.CANCELED` * **timeout** [`float`] Timeout in seconds before the call returns regardless of Pilot state changes. The default value **None** waits forever. **Raises:** * :class:`radical.pilot.PilotException` """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") if not isinstance(state, list): state = [state] return_list_type = True if (not isinstance(unit_ids, list)) and (unit_ids is not None): return_list_type = False unit_ids = [unit_ids] units = self.get_units (unit_ids) start = time.time() all_ok = False states = list() while not all_ok : all_ok = True states = list() for unit in units : if unit.state not in state : all_ok = False states.append (unit.state) # check timeout if (None != timeout) and (timeout <= (time.time() - start)): if not all_ok : logger.debug ("wait timed out: %s" % states) break # sleep a little if this cycle was idle if not all_ok : time.sleep (0.1) # done waiting if return_list_type : return states else : return states[0] # ------------------------------------------------------------------------- #
[docs] def cancel_units(self, unit_ids=None): """Cancel one or more :class:`radical.pilot.ComputeUnits`. **Arguments:** * **unit_ids** [`string` or `list of strings`]: The IDs of the compute unit objects to cancel. **Raises:** * :class:`radical.pilot.PilotException` """ if not self._uid: raise IncorrectState(msg="Invalid object instance.") if (not isinstance(unit_ids, list)) and (unit_ids is not None): unit_ids = [unit_ids] cus = self.get_units(unit_ids) for cu in cus: cu.cancel() # ------------------------------------------------------------------------- #
[docs] def register_callback(self, callback_function, metric=UNIT_STATE, callback_data=None): """ Registers a new callback function with the UnitManager. Manager-level callbacks get called if the specified metric changes. The default metric `UNIT_STATE` fires the callback if any of the ComputeUnits managed by the PilotManager change their state. All callback functions need to have the same signature:: def callback_func(obj, value, data) where ``object`` is a handle to the object that triggered the callback, ``value`` is the metric, and ``data`` is the data provided on callback registration.. In the example of `UNIT_STATE` above, the object would be the unit in question, and the value would be the new state of the unit. Available metrics are: * `UNIT_STATE`: fires when the state of any of the units which are managed by this unit manager instance is changing. It communicates the unit object instance and the units new state. * `WAIT_QUEUE_SIZE`: fires when the number of unscheduled units (i.e. of units which have not been assigned to a pilot for execution) changes. """ if metric not in UNIT_MANAGER_METRICS : raise ValueError ("Metric '%s' is not available on the unit manager" % metric) self._worker.register_manager_callback(callback_function, metric, callback_data)