Source code for radical.pilot.compute_pilot


__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__   = "MIT"


import os
import sys
import copy
import time
import threading

import radical.utils as ru

from . import utils     as rpu
from . import states    as rps
from . import constants as rpc
from . import types     as rpt


# ------------------------------------------------------------------------------
#
[docs]class ComputePilot(object): """ A ComputePilot represent a resource overlay on a local or remote resource. .. note:: A ComputePilot cannot be created directly. The factory method :meth:`radical.pilot.PilotManager.submit_pilots` has to be used instead. **Example**:: pm = radical.pilot.PilotManager(session=s) pd = radical.pilot.ComputePilotDescription() pd.resource = "local.localhost" pd.cores = 2 pd.runtime = 5 # minutes pilot = pm.submit_pilots(pd) """ # -------------------------------------------------------------------------- # In terms of implementation, a Pilot is not much more than a dict whose # content are dynamically updated to reflect the state progression through # the PMGR components. As a Pilot is always created via a PMGR, it is # considered to *belong* to that PMGR, and all activities are actually # implemented by that PMGR. # # Note that this implies that we could create Pilots before submitting them # to a PMGR, w/o any problems. (FIXME?) # -------------------------------------------------------------------------- # -------------------------------------------------------------------------- # def __init__(self, pmgr, descr): # 'static' members self._descr = descr.as_dict() # sanity checks on description for check in ['resource', 'cores', 'runtime']: if not self._descr.get(check): raise ValueError("ComputePilotDescription needs '%s'" % check) # initialize state self._pmgr = pmgr self._session = self._pmgr.session self._prof = self._session._prof self._uid = ru.generate_id('pilot.%(counter)04d', ru.ID_CUSTOM) self._state = rps.NEW self._log = pmgr._log self._pilot_dict = dict() self._callbacks = dict() self._cache = dict() # cache of SAGA dir handles self._cb_lock = threading.RLock() self._exit_on_error = self._descr.get('exit_on_error') for m in rpt.PMGR_METRICS: self._callbacks[m] = dict() # we always invoke the default state cb self._callbacks[rpt.PILOT_STATE][self._default_state_cb.__name__] = { 'cb' : self._default_state_cb, 'cb_data' : None} # `as_dict()` needs `pilot_dict` and other attributes. Those should all # be available at this point (apart from the sandboxes), so we now # query for those sandboxes. self._pilot_jsurl = ru.Url() self._pilot_jshop = ru.Url() self._resource_sandbox = ru.Url() self._pilot_sandbox = ru.Url() self._client_sandbox = ru.Url() self._log.debug(' ===== 1: %s [%s]', self._pilot_sandbox, type(self._pilot_sandbox)) pilot = self.as_dict() self._log.debug(' ===== 2: %s [%s]', pilot['pilot_sandbox'], type(pilot['pilot_sandbox'])) self._pilot_jsurl, self._pilot_jshop \ = self._session._get_jsurl (pilot) self._resource_sandbox = self._session._get_resource_sandbox(pilot) self._pilot_sandbox = self._session._get_pilot_sandbox (pilot) self._client_sandbox = self._session._get_client_sandbox() self._log.debug(' ===== 3: %s [%s]', self._pilot_sandbox, type(self._pilot_sandbox)) # -------------------------------------------------------------------------- # def __repr__(self): return str(self) # -------------------------------------------------------------------------- # def __str__(self): return str([self.uid, self.resource, self.state]) # -------------------------------------------------------------------------- # def _default_state_cb(self, pilot, state): self._log.info("[Callback]: pilot %s state: %s.", self.uid, self.state) if self.state == rps.FAILED and self._exit_on_error: self._log.error("[Callback]: pilot '%s' failed (exit on error)", self.uid) # FIXME: how to tell main? Where are we in the first place? # ru.cancel_main_thread('int') raise RuntimeError('pilot %s failed - fatal!' % self.uid) # sys.exit() # -------------------------------------------------------------------------- # def _update(self, pilot_dict): """ This will update the facade object after state changes etc, and is invoked by whatever component receiving that updated information. Return True if state changed, False otherwise """ if pilot_dict['uid'] != self.uid: self._log.error('incorrect uid: %s / %s', pilot_dict['uid'], self.uid) assert(pilot_dict['uid'] == self.uid), 'update called on wrong instance' # NOTE: this method relies on state updates to arrive in order, and # without gaps. current = self.state target = pilot_dict['state'] if target not in [rps.FAILED, rps.CANCELED]: try: assert(rps._pilot_state_value(target) - rps._pilot_state_value(current)), \ 'invalid state transition' except: self._log.error('%s: invalid state transition %s -> %s', self.uid, current, target) raise # FIXME self._state = target # keep all information around self._pilot_dict = copy.deepcopy(pilot_dict) # invoke pilot specific callbacks # FIXME: this iteration needs to be thread-locked! for cb_name, cb_val in self._callbacks[rpt.PILOT_STATE].iteritems(): cb = cb_val['cb'] cb_data = cb_val['cb_data'] # print ' ~~~ call pcbs: %s -> %s : %s' % (self.uid, self.state, cb_name) self._log.debug('%s calls cb %s', self.uid, cb) if cb_data: cb(self, self.state, cb_data) else : cb(self, self.state) # ask pmgr to invoke any global callbacks self._pmgr._call_pilot_callbacks(self, self.state) # -------------------------------------------------------------------------- #
[docs] def as_dict(self): """ Returns a Python dictionary representation of the object. """ ret = { 'session': self.session.uid, 'pmgr': self.pmgr.uid, 'uid': self.uid, 'type': 'pilot', 'state': self.state, 'log': self.log, 'stdout': self.stdout, 'stderr': self.stderr, 'resource': self.resource, 'resource_sandbox': str(self._resource_sandbox), 'pilot_sandbox': str(self._pilot_sandbox), 'client_sandbox': str(self._client_sandbox), 'js_url': str(self._pilot_jsurl), 'js_hop': str(self._pilot_jshop), 'description': self.description, # this is a deep copy 'resource_details': self.resource_details } return ret
# -------------------------------------------------------------------------- # @property def session(self): """ Returns the pilot's session. **Returns:** * A :class:`Session`. """ return self._session # -------------------------------------------------------------------------- # @property def pmgr(self): """ Returns the pilot's manager. **Returns:** * A :class:`PilotManager`. """ return self._pmgr # ------------------------------------------------------------------------- # @property def resource_details(self): """ Returns agent level resource information """ return self._pilot_dict.get('resource_details') # -------------------------------------------------------------------------- # @property def uid(self): """ Returns the pilot's unique identifier. The uid identifies the pilot within a :class:`PilotManager`. **Returns:** * A unique identifier (string). """ return self._uid # -------------------------------------------------------------------------- # @property def state(self): """ Returns the current state of the pilot. **Returns:** * state (string enum) """ return self._state # -------------------------------------------------------------------------- # @property def log(self): """ Returns a list of human readable [timestamp, string] tuples describing various events during the pilot's lifetime. Those strings are not normative, only informative! **Returns:** * log (list of [timestamp, string] tuples) """ return self._pilot_dict.get('log') # -------------------------------------------------------------------------- # @property def stdout(self): """ Returns a snapshot of the pilot's STDOUT stream. If this property is queried before the pilot has reached 'DONE' or 'FAILED' state it will return None. .. warning: This can be inefficient. Output may be incomplete and/or filtered. **Returns:** * stdout (string) """ return self._pilot_dict.get('stdout') # -------------------------------------------------------------------------- # @property def stderr(self): """ Returns a snapshot of the pilot's STDERR stream. If this property is queried before the pilot has reached 'DONE' or 'FAILED' state it will return None. .. warning: This can be inefficient. Output may be incomplete and/or filtered. **Returns:** * stderr (string) """ return self._pilot_dict.get('stderr') # -------------------------------------------------------------------------- # @property def resource(self): """ Returns the resource tag of this pilot. **Returns:** * A resource tag (string) """ return self._descr.get('resource') # -------------------------------------------------------------------------- # @property def pilot_sandbox(self): """ Returns the full sandbox URL of this pilot, if that is already known, or 'None' otherwise. **Returns:** * A string """ # NOTE: The pilot has a sandbox property, containing the full sandbox # path, which is used by the pmgr to stage data back and forth. # However, the full path as visible from the pmgr side might not # be what the agent is seeing, specifically in the case of # non-shared filesystems (OSG). The agent thus uses # `$PWD` as sandbox, with the assumption that this will # get mapped to whatever is here returned as sandbox URL. # # There is thus implicit knowledge shared between the RP client # and the RP agent that `$PWD` *is* the sandbox! The same # implicitly also holds for the staging area, which is relative # to the pilot sandbox. if self._pilot_sandbox: return str(self._pilot_sandbox) else: return None @property def resource_sandbox(self): return self._resource_sandbox @property def client_sandbox(self): return self._client_sandbox # -------------------------------------------------------------------------- # @property def description(self): """ Returns the description the pilot was started with, as a dictionary. **Returns:** * description (dict) """ return copy.deepcopy(self._descr) # -------------------------------------------------------------------------- #
[docs] def register_callback(self, cb, metric=rpt.PILOT_STATE, cb_data=None): """ Registers a callback function that is triggered every time the pilot's state changes. All callback functions need to have the same signature:: def cb(obj, state) where ``object`` is a handle to the object that triggered the callback and ``state`` is the new state of that object. If 'cb_data' is given, then the 'cb' signature changes to def cb(obj, state, cb_data) and 'cb_data' are passed along. """ if metric not in rpt.PMGR_METRICS : raise ValueError ("Metric '%s' is not available on the pilot manager" % metric) with self._cb_lock: cb_name = cb.__name__ self._callbacks[metric][cb_name] = {'cb' : cb, 'cb_data' : cb_data}
# -------------------------------------------------------------------------- # def unregister_callback(self, cb, metric=rpt.PILOT_STATE): if metric and metric not in rpt.UMGR_METRICS : raise ValueError ("Metric '%s' is not available on the pilot manager" % metric) if not metric: metrics = rpt.PMGR_METRICS elif isinstance(metric, list): metrics = metric else: metrics = [metric] with self._cb_lock: for metric in metrics: if cb: to_delete = [cb.__name__] else: to_delete = self._callbacks[metric].keys() for cb_name in to_delete: if cb_name not in self._callbacks[metric]: raise ValueError("Callback '%s' is not registered" % cb_name) del(self._callbacks[metric][cb_name]) # -------------------------------------------------------------------------- #
[docs] def wait(self, state=None, timeout=None): """ Returns when the pilot reaches a specific state or when an optional timeout is reached. **Arguments:** * **state** [`list of strings`] The state(s) that pilot has to reach in order for the call to return. By default `wait` waits for the pilot to reach a **final** state, which can be one of the following: * :data:`radical.pilot.states.DONE` * :data:`radical.pilot.states.FAILED` * :data:`radical.pilot.states.CANCELED` * **timeout** [`float`] Optional timeout in seconds before the call returns regardless whether the pilot has reached the desired state or not. The default value **None** never times out. """ if not state: states = rps.FINAL elif not isinstance(state, list): states = [state] else: states = state if self.state in rps.FINAL: # we will never see another state progression. Raise an error # (unless we waited for this) if self.state in states: return # FIXME: do we want a raise here, really? This introduces a race, # really, on application level # raise RuntimeError("can't wait on a pilot in final state") return self.state start_wait = time.time() while self.state not in states: time.sleep(0.1) if timeout and (timeout <= (time.time() - start_wait)): break # if self._pmgr._terminate.is_set(): # break return self.state
# -------------------------------------------------------------------------- #
[docs] def cancel(self): """ Cancel the pilot. """ # clean connection cache try: for key in self._cache: self._cache[key].close() self._cache = dict() except: pass # print 'pilot: cancel' self._pmgr.cancel_pilots(self.uid)
# -------------------------------------------------------------------------- #
[docs] def stage_in(self, directives): ''' Stages the content of the staging directive into the pilot's staging area ''' # This staging request is actually served by the pmgr *launching* # component, because that already has a channel open to the target # resource which we can reuse. We might eventually implement or # interface to a dedicated data movement service though. # send the staging request to the pmg launcher self._pmgr._pilot_staging_input(self.as_dict(), directives)
# ------------------------------------------------------------------------------