# Source code for radical.pilot.session

#pylint: disable=C0301, C0103, W0212

"""
.. module:: radical.pilot.session
   :platform: Unix
   :synopsis: Implementation of the Session class.

.. moduleauthor:: Ole Weidner <ole.weidner@rutgers.edu>
"""

__copyright__ = "Copyright 2013-2014, http://radical.rutgers.edu"
__license__   = "MIT"

import os 
import glob
import saga
import radical.utils as ru

from radical.pilot.object          import Object
from radical.pilot.unit_manager    import UnitManager
from radical.pilot.pilot_manager   import PilotManager
from radical.pilot.utils.logger    import logger
from radical.pilot.resource_config import ResourceConfig
from radical.pilot.exceptions      import PilotException

from radical.pilot.db              import Session as dbSession
from radical.pilot.db              import DBException

from bson.objectid                 import ObjectId



# ------------------------------------------------------------------------------
#
class _ProcessRegistry(object):
    """A _ProcessRegistry contains a dictionary of all worker processes 
    that are currently active.
    """
    def __init__(self):
        self._dict = dict()

    def register(self, key, process):
        """Add a new process to the registry.
        """
        if key not in self._dict:
            self._dict[key] = process

    def retrieve(self, key):
        """Retrieve a process from the registry.
        """
        if key not in self._dict:
            return None
        else:
            return self._dict[key]

    def keys(self):
        """List all keys of all process in the registry.
        """
        return self._dict.keys()

    def remove(self, key):
        """Remove a process from the registry.
        """
        if key in self._dict:
            del self._dict[key]

# ------------------------------------------------------------------------------
#
class Session(saga.Session, Object):
    """A Session encapsulates a RADICAL-Pilot instance and is the *root*
    object for all other RADICAL-Pilot objects.

    A Session holds :class:`radical.pilot.PilotManager` and
    :class:`radical.pilot.UnitManager` instances which in turn hold
    :class:`radical.pilot.Pilot` and :class:`radical.pilot.ComputeUnit`
    instances.

    Each Session has a unique identifier :data:`radical.pilot.Session.uid`
    that can be used to re-connect to a RADICAL-Pilot instance in the
    database.

    **Example**::

        s1 = radical.pilot.Session(database_url=DBURL)
        s2 = radical.pilot.Session(database_url=DBURL, session_uid=s1.uid)

        # s1 and s2 are pointing to the same session
        assert s1.uid == s2.uid
    """

    #---------------------------------------------------------------------------
    #
    def __init__(self, database_url=None, database_name="radicalpilot",
                 session_uid=None):
        """Creates a new session or reconnects to an existing one.

        If called without a session_uid, a new Session instance is created
        and stored in the database. If session_uid is set, an existing
        session is retrieved from the database.

        **Arguments:**
            * **database_url** (`string`): The MongoDB URL. If none is given,
              RP uses the environment variable RADICAL_PILOT_DBURL. If that
              is not set either, an error is raised. If the URL contains a
              path element (e.g. ``mongodb://host:27017/mydb``), that path
              (without the leading slash) is used as the database name and
              takes precedence over `database_name`.

            * **database_name** (`string`): An alternative database name
              (default: 'radicalpilot').

            * **session_uid** (`string`): If session_uid is set, we try to
              re-connect to an existing session instead of creating a new
              one.

        **Returns:**
            * A new Session instance.

        **Raises:**
            * :class:`radical.pilot.PilotException` if no database URL is
              available, or if the session cannot be created in / reconnected
              from the database.
        """

        # init the base classes
        saga.Session.__init__(self)
        Object.__init__(self)

        # before doing anything else, set up the debug helper for the
        # lifetime of the session.
        self._debug_helper = ru.DebugHelper()

        # Lists holding all manager objects created during the session.
        self._pilot_manager_objects = list()
        self._unit_manager_objects  = list()

        # Create a new process registry. All objects belonging to this
        # session will register their worker processes (if they have any)
        # in this registry. This makes it easier to shut down things in
        # a more coordinated fashion.
        self._process_registry = _ProcessRegistry()

        # The resource configuration dictionary associated with the session.
        self._resource_configs = {}

        self._database_url  = database_url
        self._database_name = database_name

        if not self._database_url:
            self._database_url = os.getenv("RADICAL_PILOT_DBURL", None)

        if not self._database_url:
            raise PilotException("no database URL (set RADICAL_PILOT_DBURL)")

        logger.info("using database url %s" % self._database_url)

        # if the database url contains a path element, we interpret that as
        # database name (without the leading slash)
        tmp_url = ru.Url(self._database_url)
        if  tmp_url.path and \
            tmp_url.path[0] == '/' and \
            len(tmp_url.path) > 1:
            self._database_name = tmp_url.path[1:]
            logger.info("using database path %s" % self._database_name)
        else:
            logger.info("using database name %s" % self._database_name)

        # Load all "default" resource configurations shipped with the module.
        # Later files simply replace earlier definitions of the same key.
        module_path = os.path.dirname(os.path.abspath(__file__))
        default_cfgs = "%s/configs/*.json" % module_path
        for config_file in glob.glob(default_cfgs):
            self._register_resource_configs(
                ResourceConfig.from_file(config_file), merge=False)

        # User configurations are merged into (and override values of) the
        # defaults of the same name; new resource keys are added as-is.
        user_cfgs = "%s/.radical/pilot/configs/*.json" % os.environ.get('HOME')
        for config_file in glob.glob(user_cfgs):
            self._register_resource_configs(
                ResourceConfig.from_file(config_file), merge=True)

        default_aliases = "%s/configs/aliases.json" % module_path
        self._resource_aliases = ru.read_json_str(default_aliases)['aliases']

        ##########################
        ## CREATE A NEW SESSION ##
        ##########################
        if session_uid is None:
            try:
                self._uid            = str(ObjectId())
                self._last_reconnect = None

                # use the (possibly URL-derived) database name computed above
                self._dbs, self._created, self._connection_info = \
                    dbSession.new(sid=self._uid,
                                  db_url=self._database_url,
                                  db_name=self._database_name)

                logger.info("New Session created %s." % str(self))

            except Exception as ex:
                raise PilotException("Couldn't create new session (database URL '%s' incorrect?): %s" \
                    % (self._database_url, ex))

        ######################################
        ## RECONNECT TO AN EXISTING SESSION ##
        ######################################
        else:
            try:
                self._uid = session_uid

                # otherwise, we reconnect to an existing session
                self._dbs, session_info, self._connection_info = \
                    dbSession.reconnect(sid=self._uid,
                                        db_url=self._database_url,
                                        db_name=self._database_name)

                self._created        = session_info["created"]
                self._last_reconnect = session_info["last_reconnect"]

                logger.info("Reconnected to existing Session %s." % str(self))

            except Exception as ex:
                raise PilotException("Couldn't re-connect to session: %s" % ex)

    #---------------------------------------------------------------------------
    #
    def _register_resource_configs(self, rcs, merge=False):
        """Adds the resource configurations in *rcs* to this session.

        *rcs* is what ``ResourceConfig.from_file`` returns -- presumably a
        dict of resource key -> ResourceConfig; a falsy value is ignored.
        If *merge* is True and a key is already known, the new values are
        merged into the existing dict (``policy='overwrite'``); otherwise the
        entry is replaced.
        """
        if not rcs:
            return

        for rc in rcs:
            logger.info("Loaded resource configurations for %s" % rc)

            if merge and rc in self._resource_configs:
                # config exists -- merge new config into it
                ru.dict_merge(self._resource_configs[rc], rcs[rc].as_dict(),
                              policy='overwrite')
            else:
                # new (or replaced) config -- add as is
                self._resource_configs[rc] = rcs[rc].as_dict()

    #---------------------------------------------------------------------------
    #
    def close(self, cleanup=True, terminate=True, delete=None):
        """Closes the session.

        All subsequent attempts to access objects attached to the session
        will result in an error. If cleanup is set to True (default), the
        session data is removed from the database.

        **Arguments:**
            * **cleanup** (`bool`): Remove the session from MongoDB (implies
              terminate).

            * **terminate** (`bool`): Shut down all pilots associated with
              the session.

            * **delete** (`bool`): Deprecated. If set while `cleanup` and
              `terminate` are both left at their default (True), its value
              is used for both of them.

        Calling close() on a session whose database entry has already been
        removed only logs an error and returns.
        """

        # keep the uid around: _destroy_db_entry() resets self._uid to None,
        # but we still want it for the log messages below.
        uid = self._uid

        if not uid:
            logger.error("Session object already closed.")
            return

        logger.info("session %s closing" % str(uid))

        # we keep 'delete' for backward compatibility. If it was set, and the
        # other flags (cleanup, terminate) are as defaulted (True), then
        # delete will supersede them. Delete is considered deprecated though,
        # and we'll thus issue a warning.
        if delete is not None:
            if cleanup and terminate:
                cleanup   = delete
                terminate = delete
            logger.warning("'delete' flag on session is deprecated. " \
                           "Please use 'cleanup' and 'terminate' instead!")

        if cleanup:
            # cleanup implies terminate
            terminate = True

        for pmgr in self._pilot_manager_objects:
            logger.debug("session %s closes pmgr %s" % (str(uid), pmgr._uid))
            pmgr.close(terminate=terminate)
            logger.debug("session %s closed pmgr %s" % (str(uid), pmgr._uid))

        for umgr in self._unit_manager_objects:
            logger.debug("session %s closes umgr %s" % (str(uid), umgr._uid))
            umgr.close()
            logger.debug("session %s closed umgr %s" % (str(uid), umgr._uid))

        if cleanup:
            self._destroy_db_entry()

        logger.info("session %s closed" % str(uid))

    #---------------------------------------------------------------------------
    #
    def as_dict(self):
        """Returns a Python dictionary representation of the object.
        """
        object_dict = {
            "uid"            : self._uid,
            "created"        : self._created,
            "last_reconnect" : self._last_reconnect,
            "database_name"  : self._connection_info.dbname,
            "database_auth"  : self._connection_info.dbauth,
            "database_url"   : self._connection_info.dburl
        }
        return object_dict

    #---------------------------------------------------------------------------
    #
    def __str__(self):
        """Returns a string representation of the object (str of as_dict()).
        """
        return str(self.as_dict())

    #---------------------------------------------------------------------------
    #
    @property
    def created(self):
        """Returns the UTC date and time the session was created.
        """
        self._assert_obj_is_valid()
        return self._created

    #---------------------------------------------------------------------------
    #
    @property
    def last_reconnect(self):
        """Returns the most recent UTC date and time the session was
        reconnected to.
        """
        self._assert_obj_is_valid()
        return self._last_reconnect

    #---------------------------------------------------------------------------
    #
    def _destroy_db_entry(self):
        """Removes the session from the database and invalidates this object.

        Deletes the session's database entry and resets the uid to None, so
        subsequent attempts to re-connect to the session via its uid will
        fail.

        **Raises:**
            * Whatever ``_assert_obj_is_valid()`` raises if the session is
              already closed (documented as
              :class:`radical.pilot.IncorrectState`).
        """
        self._assert_obj_is_valid()

        self._dbs.delete()
        logger.info("Deleted session %s from database." % self._uid)
        self._uid = None

    #---------------------------------------------------------------------------
    #
    def list_pilot_managers(self):
        """Lists the unique identifiers of all
        :class:`radical.pilot.PilotManager` instances associated with this
        session.

        **Example**::

            s = radical.pilot.Session(database_url=DBURL)
            for pm_uid in s.list_pilot_managers():
                pm = s.get_pilot_managers(pm_uid)

        **Returns:**
            * A list of :class:`radical.pilot.PilotManager` uids
              (`list` of `strings`).

        **Raises:**
            * :class:`radical.pilot.IncorrectState` if the session is closed
              or doesn't exist.
        """
        self._assert_obj_is_valid()
        return self._dbs.list_pilot_manager_uids()

    #---------------------------------------------------------------------------
    #
    def get_pilot_managers(self, pilot_manager_ids=None):
        """Re-connects to and returns one or more existing PilotManager(s).

        Every returned manager is also tracked by the session, so that
        close() closes it.

        **Arguments:**
            * **pilot_manager_ids** [`string` or `list` of `strings`]: The
              unique identifier(s) of the PilotManager(s) to re-connect to.
              If None (default), all PilotManagers of this session are
              returned.

        **Returns:**
            * A single :class:`radical.pilot.PilotManager` if a single id
              (not a list) was given, otherwise a list of them.

        **Raises:**
            * :class:`radical.pilot.PilotException` (presumably raised by
              ``PilotManager._reconnect``) if a PilotManager with one of the
              given ids doesn't exist in the database.
        """
        self._assert_obj_is_valid()

        return_scalar = False

        if pilot_manager_ids is None:
            pilot_manager_ids = self.list_pilot_managers()

        elif not isinstance(pilot_manager_ids, list):
            pilot_manager_ids = [pilot_manager_ids]
            return_scalar = True

        pilot_manager_objects = []

        for pilot_manager_id in pilot_manager_ids:
            pilot_manager = PilotManager._reconnect(session=self,
                                                    pilot_manager_id=pilot_manager_id)
            pilot_manager_objects.append(pilot_manager)
            self._pilot_manager_objects.append(pilot_manager)

        if return_scalar:
            pilot_manager_objects = pilot_manager_objects[0]

        return pilot_manager_objects

    #---------------------------------------------------------------------------
    #
    def list_unit_managers(self):
        """Lists the unique identifiers of all
        :class:`radical.pilot.UnitManager` instances associated with this
        session.

        **Example**::

            s = radical.pilot.Session(database_url=DBURL)
            for um_uid in s.list_unit_managers():
                um = s.get_unit_managers(um_uid)

        **Returns:**
            * A list of :class:`radical.pilot.UnitManager` uids
              (`list` of `strings`).

        **Raises:**
            * :class:`radical.pilot.IncorrectState` if the session is closed
              or doesn't exist.
        """
        self._assert_obj_is_valid()
        return self._dbs.list_unit_manager_uids()

    #---------------------------------------------------------------------------
    #
    def get_unit_managers(self, unit_manager_ids=None):
        """Re-connects to and returns one or more existing UnitManager(s).

        Every returned manager is also tracked by the session, so that
        close() closes it.

        **Arguments:**
            * **unit_manager_ids** [`string` or `list` of `strings`]: The
              unique identifier(s) of the UnitManager(s) to re-connect to.
              If None (default), all UnitManagers of this session are
              returned.

        **Returns:**
            * A single :class:`radical.pilot.UnitManager` if a single id
              (not a list) was given, otherwise a list of them.

        **Raises:**
            * :class:`radical.pilot.PilotException` (presumably raised by
              ``UnitManager._reconnect``) if a UnitManager with one of the
              given ids doesn't exist in the database.
        """
        self._assert_obj_is_valid()

        return_scalar = False

        if unit_manager_ids is None:
            unit_manager_ids = self.list_unit_managers()

        elif not isinstance(unit_manager_ids, list):
            unit_manager_ids = [unit_manager_ids]
            return_scalar = True

        unit_manager_objects = []

        for unit_manager_id in unit_manager_ids:
            unit_manager = UnitManager._reconnect(session=self,
                                                  unit_manager_id=unit_manager_id)
            unit_manager_objects.append(unit_manager)
            self._unit_manager_objects.append(unit_manager)

        if return_scalar:
            unit_manager_objects = unit_manager_objects[0]

        return unit_manager_objects

    #---------------------------------------------------------------------------
    #
    def add_resource_config(self, resource_config):
        """Adds a new :class:`radical.pilot.ResourceConfig` to the session's
        dictionary of known resources.

        `resource_config` is either a ResourceConfig object (stored under
        its `name`, replacing any existing entry) or a string naming a
        configuration file, whose resources are loaded and replace existing
        entries of the same name.

        For example::

            rc = radical.pilot.ResourceConfig(...)   # see ResourceConfig docs
            rc.name                 = "mycluster"
            rc.job_manager_endpoint = "ssh+pbs://mycluster"
            rc.filesystem_endpoint  = "sftp://mycluster"
            rc.default_queue        = "private"
            rc.bootstrapper         = "default_bootstrapper.sh"

            s = radical.pilot.Session(database_url=DBURL)
            s.add_resource_config(rc)

            pm = radical.pilot.PilotManager(session=s)

            pd = radical.pilot.ComputePilotDescription()
            pd.resource = "mycluster"
            pd.cores    = 16
            pd.runtime  = 5 # minutes

            pilot = pm.submit_pilots(pd)
        """
        if isinstance(resource_config, basestring):
            self._register_resource_configs(
                ResourceConfig.from_file(resource_config), merge=False)
        else:
            self._resource_configs[resource_config.name] = resource_config.as_dict()

    #---------------------------------------------------------------------------
    #
    def get_resource_config(self, resource_key):
        """Returns a dictionary of the requested resource config.

        Deprecated resource keys listed in the aliases file are translated
        to their current name (with a warning) before the lookup.

        **Raises:**
            * :class:`radical.pilot.PilotException` if the resource key is
              not known.
        """
        if resource_key in self._resource_aliases:
            logger.warning("using alias '%s' for deprecated resource key '%s'" \
                         % (self._resource_aliases[resource_key], resource_key))
            resource_key = self._resource_aliases[resource_key]

        if resource_key not in self._resource_configs:
            error_msg = "Resource key '%s' is not known." % resource_key
            raise PilotException(error_msg)

        return self._resource_configs[resource_key]