#pylint: disable=C0301, C0103, W0212
"""
.. module:: radical.pilot.session
   :platform: Unix
   :synopsis: Implementation of the Session class.

.. moduleauthor:: Ole Weidner <ole.weidner@rutgers.edu>
"""
__copyright__ = "Copyright 2013-2014, http://radical.rutgers.edu"
__license__ = "MIT"
import os
import glob
import saga
import radical.utils as ru
from radical.pilot.object import Object
from radical.pilot.unit_manager import UnitManager
from radical.pilot.pilot_manager import PilotManager
from radical.pilot.utils.logger import logger
from radical.pilot.resource_config import ResourceConfig
from radical.pilot.exceptions import PilotException
from radical.pilot.db import Session as dbSession
from radical.pilot.db import DBException
from bson.objectid import ObjectId
# ------------------------------------------------------------------------------
#
class _ProcessRegistry(object):
"""A _ProcessRegistry contains a dictionary of all worker processes
that are currently active.
"""
def __init__(self):
self._dict = dict()
def register(self, key, process):
"""Add a new process to the registry.
"""
if key not in self._dict:
self._dict[key] = process
def retrieve(self, key):
"""Retrieve a process from the registry.
"""
if key not in self._dict:
return None
else:
return self._dict[key]
def keys(self):
"""List all keys of all process in the registry.
"""
return self._dict.keys()
def remove(self, key):
"""Remove a process from the registry.
"""
if key in self._dict:
del self._dict[key]
# ------------------------------------------------------------------------------
#
class Session (saga.Session, Object):
    """A Session encapsulates a RADICAL-Pilot instance and is the *root* object
    for all other RADICAL-Pilot objects.

    A Session holds :class:`radical.pilot.PilotManager` and
    :class:`radical.pilot.UnitManager` instances which in turn hold
    :class:`radical.pilot.Pilot` and :class:`radical.pilot.ComputeUnit`
    instances.

    Each Session has a unique identifier :data:`radical.pilot.Session.uid`
    that can be used to re-connect to a RADICAL-Pilot instance in the
    database.

    **Example**::

        s1 = radical.pilot.Session(database_url=DBURL)
        s2 = radical.pilot.Session(database_url=DBURL, session_uid=s1.uid)

        # s1 and s2 are pointing to the same session
        assert s1.uid == s2.uid
    """

    #---------------------------------------------------------------------------
    #
    def __init__ (self, database_url=None, database_name="radicalpilot", session_uid=None):
        """Creates a new or reconnects to an existing session.

        If called without a session_uid, a new Session instance is created and
        stored in the database. If session_uid is set, an existing session is
        retrieved from the database.

        **Arguments:**
            * **database_url** (`string`): The MongoDB URL. If none is given,
              RP uses the environment variable RADICAL_PILOT_DBURL. If that
              is not set either, an error is raised.

            * **database_name** (`string`): An alternative database name
              (default: 'radicalpilot'). A non-empty path element in the
              database URL (e.g. ``mongodb://host:port/mydb``) takes
              precedence over this argument.

            * **session_uid** (`string`): If session_uid is set, we try to
              re-connect to an existing session instead of creating a new one.

        **Returns:**
            * A new Session instance.

        **Raises:**
            * :class:`radical.pilot.PilotException` if no database URL is
              available, or if the session cannot be created or re-connected.
        """
        # init the base class inits
        saga.Session.__init__ (self)
        Object.__init__ (self)

        # before doing anything else, set up the debug helper for the lifetime
        # of the session.
        self._debug_helper = ru.DebugHelper ()

        # Lists holding all manager objects created during the session.
        self._pilot_manager_objects = list()
        self._unit_manager_objects  = list()

        # Create a new process registry. All objects belonging to this
        # session will register their worker processes (if they have any)
        # in this registry. This makes it easier to shut down things in
        # a more coordinated fashion.
        self._process_registry = _ProcessRegistry()

        # The resource configuration dictionary associated with the session.
        self._resource_configs = {}

        # explicit argument first, environment second
        self._database_url  = database_url or os.getenv ("RADICAL_PILOT_DBURL", None)
        self._database_name = database_name

        if not self._database_url :
            raise PilotException ("no database URL (set RADICAL_PILOT_DBURL)")

        logger.info("using database url %s" % self._database_url)

        # if the database url contains a path element, we interpret that as
        # database name (without the leading slash)
        tmp_url = ru.Url (self._database_url)
        if  tmp_url.path            and \
            tmp_url.path[0] == '/'  and \
            len(tmp_url.path) > 1   :
            self._database_name = tmp_url.path[1:]
            logger.info("using database path %s" % self._database_name)
        else :
            logger.info("using database name %s" % self._database_name)

        # Load the resource configurations shipped with RP, then the user's.
        module_path = os.path.dirname(os.path.abspath(__file__))
        self._load_resource_configs (module_path)

        default_aliases = "%s/configs/aliases.json" % module_path
        self._resource_aliases = ru.read_json_str (default_aliases)['aliases']

        # NOTE: both branches below pass self._database_name (not the raw
        # 'database_name' argument), so that a database name given as URL
        # path is actually used -- as announced by the log messages above.

        ##########################
        ## CREATE A NEW SESSION ##
        ##########################
        if session_uid is None:
            try:
                self._uid            = str(ObjectId())
                self._last_reconnect = None

                self._dbs, self._created, self._connection_info = \
                        dbSession.new(sid     = self._uid,
                                      db_url  = self._database_url,
                                      db_name = self._database_name)

                logger.info("New Session created %s." % str(self))

            except Exception as ex:
                raise PilotException("Couldn't create new session (database URL '%s' incorrect?): %s" \
                                    % (self._database_url, ex))

        ######################################
        ## RECONNECT TO AN EXISTING SESSION ##
        ######################################
        else:
            try:
                self._uid = session_uid

                self._dbs, session_info, self._connection_info = \
                        dbSession.reconnect(sid     = self._uid,
                                            db_url  = self._database_url,
                                            db_name = self._database_name)

                self._created        = session_info["created"]
                self._last_reconnect = session_info["last_reconnect"]

                logger.info("Reconnected to existing Session %s." % str(self))

            except Exception as ex:
                raise PilotException("Couldn't re-connect to session: %s" % ex)

    #---------------------------------------------------------------------------
    #
    def _load_resource_configs (self, module_path):
        """Populates self._resource_configs from the default and user configs.

        All `*.json` files in `<module_path>/configs/` are loaded first. Then
        all `*.json` files in `~/.radical/pilot/configs/` are loaded: a user
        config for an already known resource is merged into the existing one
        (`ru.dict_merge` with policy='overwrite'), a config for a new resource
        is added as is.
        """
        # Loading all "default" resource configurations
        default_cfgs = "%s/configs/*.json" % module_path
        for config_file in glob.glob(default_cfgs):
            rcs = ResourceConfig.from_file(config_file)
            if not rcs :
                continue
            for rc in rcs:
                logger.info("Loaded resource configurations for %s" % rc)
                self._resource_configs[rc] = rcs[rc].as_dict()

        # expanduser('~') returns $HOME when set, and falls back to the
        # password database otherwise -- instead of globbing a relative
        # "None/..." path when HOME is unset.
        user_cfgs = "%s/.radical/pilot/configs/*.json" % os.path.expanduser('~')
        for config_file in glob.glob(user_cfgs):
            rcs = ResourceConfig.from_file(config_file)
            if not rcs :
                continue
            for rc in rcs:
                logger.info("Loaded resource configurations for %s" % rc)
                if rc in self._resource_configs :
                    # config exists -- merge user config into it
                    ru.dict_merge (self._resource_configs[rc],
                                   rcs[rc].as_dict(),
                                   policy='overwrite')
                else :
                    # new config -- add as is
                    self._resource_configs[rc] = rcs[rc].as_dict()

    #---------------------------------------------------------------------------
    #
    def close(self, cleanup=True, terminate=True, delete=None):
        """Closes the session.

        All PilotManagers and UnitManagers attached to this session are
        closed. If cleanup is set to True (default) the session data is also
        removed from the database; afterwards, attempts to access objects
        attached to the session will result in an error.

        **Arguments:**
            * **cleanup** (`bool`): Remove session from MongoDB (implies
              `terminate`).
            * **terminate** (`bool`): Shut down all pilots associated with
              the session.
            * **delete** (`bool`): Deprecated. If given while `cleanup` and
              `terminate` are both left at their default (True), its value is
              used for both of them.

        Calling close() on a session whose uid has already been cleared (for
        example, by an earlier close with cleanup) logs an error and returns.
        """
        # Keep the uid around: _destroy_db_entry() resets self._uid to None,
        # but we still want to name the session in the final log message.
        uid = self._uid

        if not uid:
            logger.error("Session object already closed.")
            return

        logger.info("session %s closing" % uid)

        # we keep 'delete' for backward compatibility. If it was set, and the
        # other flags (cleanup, terminate) are as defaulted (True), then delete
        # will supersede them. Delete is considered deprecated though, and
        # we'll thus issue a warning.
        if delete is not None:
            if cleanup == True and terminate == True :
                cleanup   = delete
                terminate = delete
            logger.warning("'delete' flag on session is deprecated. " \
                           "Please use 'cleanup' and 'terminate' instead!")

        if cleanup :
            # cleanup implies terminate
            terminate = True

        for pmgr in self._pilot_manager_objects:
            logger.debug("session %s closes pmgr %s" % (uid, pmgr._uid))
            pmgr.close (terminate=terminate)
            logger.debug("session %s closed pmgr %s" % (uid, pmgr._uid))

        for umgr in self._unit_manager_objects:
            logger.debug("session %s closes umgr %s" % (uid, umgr._uid))
            umgr.close()
            logger.debug("session %s closed umgr %s" % (uid, umgr._uid))

        if cleanup :
            self._destroy_db_entry()

        logger.info("session %s closed" % uid)

    #---------------------------------------------------------------------------
    #
    def as_dict(self):
        """Returns a Python dictionary representation of the object.
        """
        object_dict = {
            "uid"           : self._uid,
            "created"       : self._created,
            "last_reconnect": self._last_reconnect,
            "database_name" : self._connection_info.dbname,
            "database_auth" : self._connection_info.dbauth,
            "database_url"  : self._connection_info.dburl
        }
        return object_dict

    #---------------------------------------------------------------------------
    #
    def __str__(self):
        """Returns a string representation of the object.
        """
        return str(self.as_dict())

    #---------------------------------------------------------------------------
    #
    @property
    def created(self):
        """Returns the UTC date and time the session was created.
        """
        self._assert_obj_is_valid()
        return self._created

    #---------------------------------------------------------------------------
    #
    @property
    def last_reconnect(self):
        """Returns the most recent UTC date and time the session was
        reconnected to (None if it was never reconnected).
        """
        self._assert_obj_is_valid()
        return self._last_reconnect

    #---------------------------------------------------------------------------
    #
    def _destroy_db_entry(self):
        """Terminates the session and removes it from the database.

        All subsequent attempts to access objects attached to the session and
        attempts to re-connect to the session via its uid will result in
        an error.

        **Raises:**
            * :class:`radical.pilot.IncorrectState` if the session is closed
              or doesn't exist.
        """
        self._assert_obj_is_valid()

        self._dbs.delete()
        logger.info("Deleted session %s from database." % self._uid)
        self._uid = None

    #---------------------------------------------------------------------------
    #
    def list_pilot_managers(self):
        """Lists the unique identifiers of all :class:`radical.pilot.PilotManager`
        instances associated with this session.

        **Example**::

            s = radical.pilot.Session(database_url=DBURL)
            for pm_uid in s.list_pilot_managers():
                pm = radical.pilot.PilotManager(session=s, pilot_manager_uid=pm_uid)

        **Returns:**
            * A list of :class:`radical.pilot.PilotManager` uids (`list` of
              `strings`).

        **Raises:**
            * :class:`radical.pilot.IncorrectState` if the session is closed
              or doesn't exist.
        """
        self._assert_obj_is_valid()
        return self._dbs.list_pilot_manager_uids()

    # --------------------------------------------------------------------------
    #
    def get_pilot_managers(self, pilot_manager_ids=None) :
        """Re-connects to and returns one or more existing PilotManager(s).

        Every re-connected PilotManager is also registered with this session,
        so that :meth:`close` will close it.

        **Arguments:**
            * **pilot_manager_ids** [`string` or `list` of `strings`]:
              The unique identifier(s) of the PilotManager(s) to re-connect
              to. If omitted (None), all PilotManagers of this session are
              re-connected.

        **Returns:**
            * A single [:class:`radical.pilot.PilotManager`] if a single uid
              (not a list) was passed, otherwise a list of them.

        **Raises:**
            * :class:`radical.pilot.PilotException` if a PilotManager with
              one of the given uids doesn't exist in the database.
        """
        self._assert_obj_is_valid()

        return_scalar = False
        if pilot_manager_ids is None:
            pilot_manager_ids = self.list_pilot_managers()
        elif not isinstance(pilot_manager_ids, list):
            pilot_manager_ids = [pilot_manager_ids]
            return_scalar     = True

        pilot_manager_objects = []
        for pilot_manager_id in pilot_manager_ids:
            pilot_manager = PilotManager._reconnect(session=self, pilot_manager_id=pilot_manager_id)
            pilot_manager_objects.append(pilot_manager)
            self._pilot_manager_objects.append(pilot_manager)

        if return_scalar:
            return pilot_manager_objects[0]
        return pilot_manager_objects

    #---------------------------------------------------------------------------
    #
    def list_unit_managers(self):
        """Lists the unique identifiers of all :class:`radical.pilot.UnitManager`
        instances associated with this session.

        **Example**::

            s = radical.pilot.Session(database_url=DBURL)
            for um_uid in s.list_unit_managers():
                um = s.get_unit_managers(um_uid)

        **Returns:**
            * A list of :class:`radical.pilot.UnitManager` uids (`list` of
              `strings`).

        **Raises:**
            * :class:`radical.pilot.IncorrectState` if the session is closed
              or doesn't exist.
        """
        self._assert_obj_is_valid()
        return self._dbs.list_unit_manager_uids()

    # --------------------------------------------------------------------------
    #
    def get_unit_managers(self, unit_manager_ids=None) :
        """Re-connects to and returns one or more existing UnitManager(s).

        Every re-connected UnitManager is also registered with this session,
        so that :meth:`close` will close it.

        **Arguments:**
            * **unit_manager_ids** [`string` or `list` of `strings`]:
              The unique identifier(s) of the UnitManager(s) to re-connect
              to. If omitted (None), all UnitManagers of this session are
              re-connected.

        **Returns:**
            * A single [:class:`radical.pilot.UnitManager`] if a single uid
              (not a list) was passed, otherwise a list of them.

        **Raises:**
            * :class:`radical.pilot.PilotException` if a UnitManager with
              one of the given uids doesn't exist in the database.
        """
        self._assert_obj_is_valid()

        return_scalar = False
        if unit_manager_ids is None:
            unit_manager_ids = self.list_unit_managers()
        elif not isinstance(unit_manager_ids, list):
            unit_manager_ids = [unit_manager_ids]
            return_scalar    = True

        unit_manager_objects = []
        for unit_manager_id in unit_manager_ids:
            unit_manager = UnitManager._reconnect(session=self, unit_manager_id=unit_manager_id)
            unit_manager_objects.append(unit_manager)
            self._unit_manager_objects.append(unit_manager)

        if return_scalar:
            return unit_manager_objects[0]
        return unit_manager_objects

    # -------------------------------------------------------------------------
    #
    def add_resource_config(self, resource_config):
        """Adds a new :class:`radical.pilot.ResourceConfig` to the session's
        dictionary of known resources, or accepts a string which points to
        a configuration file.

        A config for an already known resource name replaces the old one.

        For example::

            rc = radical.pilot.ResourceConfig
            rc.name                 = "mycluster"
            rc.job_manager_endpoint = "ssh+pbs://mycluster"
            rc.filesystem_endpoint  = "sftp://mycluster"
            rc.default_queue        = "private"
            rc.bootstrapper         = "default_bootstrapper.sh"

            s.add_resource_config(rc)

            pm = radical.pilot.PilotManager(session=s)

            pd = radical.pilot.ComputePilotDescription()
            pd.resource = "mycluster"
            pd.cores    = 16
            pd.runtime  = 5 # minutes

            pilot = pm.submit_pilots(pd)
        """
        if isinstance (resource_config, basestring) :
            # a file name: it may define configs for several resources
            rcs = ResourceConfig.from_file(resource_config)
            if rcs :
                for rc in rcs:
                    logger.info("Loaded resource configurations for %s" % rc)
                    self._resource_configs[rc] = rcs[rc].as_dict()
        else :
            self._resource_configs [resource_config.name] = resource_config.as_dict()

    # -------------------------------------------------------------------------
    #
    def get_resource_config (self, resource_key):
        """Returns a dictionary of the requested resource config.

        Deprecated resource keys listed in the aliases file are translated
        to their new name (with a warning) before the lookup.

        **Raises:**
            * :class:`radical.pilot.PilotException` if the (resolved) resource
              key is not known.
        """
        if resource_key in self._resource_aliases :
            logger.warning ("using alias '%s' for deprecated resource key '%s'" \
                         % (self._resource_aliases[resource_key], resource_key))
            resource_key = self._resource_aliases[resource_key]

        if resource_key not in self._resource_configs:
            error_msg = "Resource key '%s' is not known." % resource_key
            raise PilotException(error_msg)

        return self._resource_configs[resource_key]