Source code for radical.pilot.pilot_manager

#pylint: disable=C0301, C0103, W0212

.. module:: radical.pilot.pilot_manager
   :platform: Unix
   :synopsis: Provides the interface for the PilotManager class.

.. moduleauthor:: Ole Weidner <>

__copyright__ = "Copyright 2013-2014,"
__license__ = "MIT"

import os
import time
import glob
import copy

from radical.pilot.states import *
from radical.pilot.exceptions import *

from radical.pilot.object import Object
from radical.pilot.controller import PilotManagerController
from radical.pilot.compute_pilot import ComputePilot
from radical.pilot.utils.logger import logger
from radical.pilot.exceptions import PilotException, BadParameter
from radical.pilot.resource_config import ResourceConfig

# -----------------------------------------------------------------------------
[docs]class PilotManager(Object): """A PilotManager holds :class:`radical.pilot.ComputePilot` instances that are submitted via the :func:`radical.pilot.PilotManager.submit_pilots` method. It is possible to attach one or more :ref:`chapter_machconf` to a PilotManager to outsource machine specific configuration parameters to an external configuration file. Each PilotManager has a unique identifier :data:`radical.pilot.PilotManager.uid` that can be used to re-connect to previoulsy created PilotManager in a given :class:`radical.pilot.Session`. **Example**:: s = radical.pilot.Session(database_url=dbURL) pm1 = radical.pilot.PilotManager(session=s, resource_configurations=RESCONF) # Re-connect via the 'get()' method. pm2 = radical.pilot.PilotManager.get(session=s, pilot_manager_id=pm1.uid) # pm1 and pm2 are pointing to the same PilotManager assert pm1.uid == pm2.uid """ # ------------------------------------------------------------------------- #
[docs] def __init__(self, session, pilot_launcher_workers=1, _reconnect=False): """Creates a new PilotManager and attaches is to the session. .. note:: The `resource_configurations` (see :ref:`chapter_machconf`) parameter is currently mandatory for creating a new PilotManager instance. **Arguments:** * **session** [:class:`radical.pilot.Session`]: The session instance to use. * **resource_configurations** [`string` or `list of strings`]: A list of URLs pointing to :ref:`chapter_machconf`. Currently `file://`, `http://` and `https://` URLs are supported. If one or more resource_configurations are provided, Pilots submitted via this PilotManager can access the configuration entries in the files via the :class:`ComputePilotDescription`. For example:: pm = radical.pilot.PilotManager(session=s) pd = radical.pilot.ComputePilotDescription() pd.resource = "futuregrid.india" # defined in futuregrid.json pd.cores = 16 pd.runtime = 5 # minutes pilot = pm.submit_pilots(pd) * pilot_launcher_workers (`int`): The number of pilot launcher worker processes to start in the background. .. note:: `pilot_launcher_workers` can be used to tune RADICAL-Pilot's performance. However, you should only change the default values if you know what you are doing. **Returns:** * A new `PilotManager` object [:class:`radical.pilot.PilotManager`]. **Raises:** * :class:`radical.pilot.PilotException` """ self._session = session self._worker = None self._uid = None if _reconnect == True: return ############################### # Create a new pilot manager. # ############################### # Start a worker process fo this PilotManager instance. The worker # process encapsulates database access, persitency et al. self._worker = PilotManagerController( pilot_manager_uid=None, pilot_manager_data={}, pilot_launcher_workers=pilot_launcher_workers, session=self._session, db_connection=session._dbs, db_connection_info=session._connection_info) self._worker.start() self._uid = self._worker.pilot_manager_uid # Each pilot manager has a worker thread associated with it. The task # of the worker thread is to check and update the state of pilots, fire # callbacks and so on. self._session._pilot_manager_objects.append(self) self._session._process_registry.register(self._uid, self._worker) #-------------------------------------------------------------------------- #
[docs] def close(self, terminate=True): """Shuts down the PilotManager and its background workers in a coordinated fashion. **Arguments:** * **terminate** [`bool`]: If set to True, all active pilots will get canceled (default: False). """ logger.error("pmgr %s closing" % (str(self._uid))) # Spit out a warning in case the object was already closed. if not self._uid: logger.error("PilotManager object already closed.") return # before we terminate pilots, we have to kill the pilot launcher threads # -- otherwise we'll run into continous race conditions due to the # ongoing state checks... if self._worker is not None: # Stop the worker process logger.error("pmgr %s cancel worker %s" % (str(self._uid), self._worker.cancel_launcher() logger.error("pmgr %s canceled worker %s" % (str(self._uid), # If terminate is set, we cancel all pilots. if terminate : # cancel all pilots, make sure they are gone, and close the pilot # managers. for pilot in self.get_pilots () : logger.error("pmgr %s cancels pilot %s" % (str(self._uid), pilot._uid)) self.cancel_pilots () # FIXME: # # wait_pilots() will wait until all pilots picked up the sent cancel # signal and died. However, that can take a loooong time. For # example, if a pilot is in 'PENDING_ACTIVE' state, this will have to # wait until the pilot is bootstrapped, started, connected to the DB, # and shut down again. Or, for a pilot which just got a shitload of # units, it will have to wait until the pilot started all those units # and then checks its command queue again. Or, if the pilot job # already died, wait will block until the state checker kicks in and # declares the pilot as dead, which takes a couple of minutes. # # Solution would be to add a CANCELING state and to wait for that one, # too, which basically means to wait until the cancel signal has been # sent. There is not much more to do at this point anyway. This is at # the moment faked in the manager controler, which sets that state # after sending the cancel command. This should be converted into # a proper state -- that would, btw, remove the need for a cancel # command in the first place, as the pilot can just pull its own state # instead, and cancel on CANCELING... # # self.wait_pilots () wait_for_cancel = True all_pilots = self.get_pilots () while wait_for_cancel : wait_for_cancel = False for pilot in all_pilots : logger.error("pmgr %s wait for pilot %s (%s)" % (str(self._uid), pilot._uid, pilot.state)) if pilot.state not in [DONE, FAILED, CANCELED, CANCELING] : time.sleep (1) wait_for_cancel = True break for pilot in self.get_pilots () : logger.error("pmgr %s canceled pilot %s" % (str(self._uid), pilot._uid)) logger.error("pmgr %s stops worker %s" % (str(self._uid), self._worker.stop() self._worker.join() logger.error("pmgr %s stopped worker %s" % (str(self._uid), # Remove worker from registry self._session._process_registry.remove(self._uid) logger.error("pmgr %s closed" % (str(self._uid))) self._uid = None #-------------------------------------------------------------------------- #
@classmethod def _reconnect(cls, session, pilot_manager_id): """PRIVATE: reconnect to an existing pilot manager. """ uid_exists = PilotManagerController.uid_exists( db_connection=session._dbs, pilot_manager_uid=pilot_manager_id ) if not uid_exists: raise BadParameter( "PilotManager with id '%s' not in database." % pilot_manager_id) obj = cls(session=session, _reconnect=True) obj._uid = pilot_manager_id # Retrieve or start a worker process fo this PilotManager instance. worker = session._process_registry.retrieve(pilot_manager_id) if worker is not None: obj._worker = worker else: obj._worker = PilotManagerController( pilot_manager_uid=pilot_manager_id, pilot_manager_data={}, session=session, db_connection=session._dbs, db_connection_info=session._connection_info) session._process_registry.register(pilot_manager_id, obj._worker) # start the worker if it's not already running if obj._worker.is_alive() is False: obj._worker.start() return obj # ------------------------------------------------------------------------- #
[docs] def as_dict(self): """Returns a Python dictionary representation of the object. """ self._assert_obj_is_valid() object_dict = { 'uid': self.uid } return object_dict # ------------------------------------------------------------------------- #
def __str__(self): """Returns a string representation of the object. """ return str(self.as_dict()) # ------------------------------------------------------------------------- #
[docs] def submit_pilots(self, pilot_descriptions): """Submits a new :class:`radical.pilot.ComputePilot` to a resource. **Returns:** * One or more :class:`radical.pilot.ComputePilot` instances [`list of :class:`radical.pilot.ComputePilot`]. **Raises:** * :class:`radical.pilot.PilotException` """ # Check if the object instance is still valid. self._assert_obj_is_valid() # Implicit list conversion. return_list_type = True if not isinstance(pilot_descriptions, list): return_list_type = False pilot_descriptions = [pilot_descriptions] # Itereate over the pilot descriptions, try to create a pilot for # each one and append it to 'pilot_obj_list'. pilot_obj_list = list() for pilot_description in pilot_descriptions: if pilot_description.resource is None: error_msg = "ComputePilotDescription does not define mandatory attribute 'resource'." raise BadParameter(error_msg) elif pilot_description.runtime is None: error_msg = "ComputePilotDescription does not define mandatory attribute 'runtime'." raise BadParameter(error_msg) elif pilot_description.cores is None: error_msg = "ComputePilotDescription does not define mandatory attribute 'cores'." raise BadParameter(error_msg) resource_key = pilot_description.resource resource_cfg = self._session.get_resource_config(resource_key) # Check resource-specific mandatory attributes if "mandatory_args" in resource_cfg: for ma in resource_cfg["mandatory_args"]: if getattr(pilot_description, ma) is None: error_msg = "ComputePilotDescription does not define attribute '{0}' which is required for '{1}'.".format(ma, resource_key) raise BadParameter(error_msg) # we expand and exchange keys in the resource config, depending on # the selected schema so better use a deep copy... import copy resource_cfg = copy.deepcopy (resource_cfg) schema = pilot_description['access_schema'] if not schema : if 'schemas' in resource_cfg : schema = resource_cfg['schemas'][0] # import pprint # print "no schema, using %s" % schema # pprint.pprint (pilot_description) if not schema in resource_cfg : # import pprint # pprint.pprint (resource_cfg) logger.warning ("schema %s unknown for resource %s -- continue with defaults" \ % (schema, resource_key)) else : for key in resource_cfg[schema] : # merge schema specific resource keys into the # resource config resource_cfg[key] = resource_cfg[schema][key] # If 'default_sandbox' is defined, set it. if pilot_description.sandbox is not None: if "valid_roots" in resource_cfg and resource_cfg["valid_roots"] is not None: is_valid = False for vr in resource_cfg["valid_roots"]: if pilot_description.sandbox.startswith(vr): is_valid = True if is_valid is False: raise BadParameter("Working directory for resource '%s' defined as '%s' but needs to be rooted in %s " % (resource_key, pilot_description.sandbox, resource_cfg["valid_roots"])) # After the sanity checks have passed, we can register a pilot # startup request with the worker process and create a facade # object. pilot = ComputePilot._create( pilot_description=pilot_description, pilot_manager_obj=self) pilot_uid = self._worker.register_start_pilot_request( pilot=pilot, resource_config=resource_cfg) pilot._uid = pilot_uid pilot_obj_list.append(pilot) # Implicit return value conversion if return_list_type : return pilot_obj_list else: return pilot_obj_list[0] # ------------------------------------------------------------------------- #
[docs] def list_pilots(self): """Lists the unique identifiers of all :class:`radical.pilot.ComputePilot` instances associated with this PilotManager **Returns:** * A list of :class:`radical.pilot.ComputePilot` uids [`string`]. **Raises:** * :class:`radical.pilot.PilotException` """ # Check if the object instance is still valid. self._assert_obj_is_valid() # Get the pilot list from the worker return self._worker.list_pilots() # ------------------------------------------------------------------------- #
[docs] def get_pilots(self, pilot_ids=None): """Returns one or more :class:`radical.pilot.ComputePilot` instances. **Arguments:** * **pilot_uids** [`list of strings`]: If pilot_uids is set, only the Pilots with the specified uids are returned. If pilot_uids is `None`, all Pilots are returned. **Returns:** * A list of :class:`radical.pilot.ComputePilot` objects [`list of :class:`radical.pilot.ComputePilot`]. **Raises:** * :class:`radical.pilot.PilotException` """ self._assert_obj_is_valid() return_list_type = True if (not isinstance(pilot_ids, list)) and (pilot_ids is not None): return_list_type = False pilot_ids = [pilot_ids] pilots = ComputePilot._get(pilot_ids=pilot_ids, pilot_manager_obj=self) if return_list_type : return pilots else : return pilots[0] # ------------------------------------------------------------------------- #
[docs] def wait_pilots(self, pilot_ids=None, state=[DONE, FAILED, CANCELED], timeout=None): """Returns when one or more :class:`radical.pilot.ComputePilots` reach a specific state or when an optional timeout is reached. If `pilot_uids` is `None`, `wait_pilots` returns when **all** Pilots reach the state defined in `state`. **Arguments:** * **pilot_uids** [`string` or `list of strings`] If pilot_uids is set, only the Pilots with the specified uids are considered. If pilot_uids is `None` (default), all Pilots are considered. * **state** [`list of strings`] The state(s) that Pilots have to reach in order for the call to return. By default `wait_pilots` waits for the Pilots to reach a **terminal** state, which can be one of the following: * :data:`radical.pilot.DONE` * :data:`radical.pilot.FAILED` * :data:`radical.pilot.CANCELED` * **timeout** [`float`] Optional timeout in seconds before the call returns regardless whether the Pilots have reached the desired state or not. The default value **-1.0** never times out. **Raises:** * :class:`radical.pilot.PilotException` """ self._assert_obj_is_valid() if not isinstance(state, list): state = [state] return_list_type = True if (not isinstance(pilot_ids, list)) and (pilot_ids is not None): return_list_type = False pilot_ids = [pilot_ids] start = time.time() all_ok = False states = list() while not all_ok : pilots = self._worker.get_compute_pilot_data(pilot_ids=pilot_ids) all_ok = True states = list() for pilot in pilots : if pilot['state'] not in state : all_ok = False states.append (pilot['state']) # check timeout if (None != timeout) and (timeout <= (time.time() - start)): if not all_ok : logger.debug ("wait timed out: %s" % states) break # sleep a little if this cycle was idle if not all_ok : time.sleep (0.1) # done waiting if return_list_type : return states else : return states[0] # ------------------------------------------------------------------------- #
[docs] def cancel_pilots(self, pilot_ids=None): """Cancels one or more ComputePilots. **Arguments:** * **pilot_uids** [`string` or `list of strings`] If pilot_uids is set, only the Pilots with the specified uids are canceled. If pilot_uids is `None`, all Pilots are canceled. **Raises:** * :class:`radical.pilot.PilotException` """ # Check if the object instance is still valid. self._assert_obj_is_valid() # Implicit list conversion. if (not isinstance(pilot_ids, list)) and (pilot_ids is not None): pilot_ids = [pilot_ids] # Register the cancelation request with the worker. self._worker.register_cancel_pilots_request(pilot_ids=pilot_ids) # ------------------------------------------------------------------------- #
[docs] def register_callback(self, callback_function, callback_data=None): """Registers a new callback function with the PilotManager. Manager-level callbacks get called if any of the ComputePilots managed by the PilotManager change their state. All callback functions need to have the same signature:: def callback_func(obj, state, data) where ``object`` is a handle to the object that triggered the callback, ``state`` is the new state of that object, and ``data`` are the data passed on callback registration. """ self._assert_obj_is_valid() self._worker.register_manager_callback(callback_function, callback_data)