Source code for radical.pilot.unit_manager


__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__   = "MIT"


import os
import time
import threading

import radical.utils as ru

from . import utils     as rpu
from . import states    as rps
from . import constants as rpc
from . import types     as rpt

from . import compute_unit_description as rpcud

from .umgr import scheduler as rpus


# ------------------------------------------------------------------------------
#
class UnitManager(rpu.Component):
    """
    A UnitManager manages :class:`radical.pilot.ComputeUnit` instances which
    represent the **executable** workload in RADICAL-Pilot.  A UnitManager
    connects the ComputeUnits with one or more :class:`Pilot` instances (which
    represent the workload **executors** in RADICAL-Pilot) and a **scheduler**
    which determines which :class:`ComputeUnit` gets executed on which
    :class:`Pilot`.

    **Example**::

        s = rp.Session(database_url=DBURL)

        pm = rp.PilotManager(session=s)
        pd = rp.ComputePilotDescription()
        pd.resource = "futuregrid.alamo"
        pd.cores    = 16

        p1 = pm.submit_pilots(pd)  # create first  pilot with 16 cores
        p2 = pm.submit_pilots(pd)  # create second pilot with 16 cores

        # Create a workload of 128 '/bin/sleep' compute units
        compute_units = []
        for unit_count in range(0, 128):
            cu = rp.ComputeUnitDescription()
            cu.executable = "/bin/sleep"
            cu.arguments  = ['60']
            compute_units.append(cu)

        # Combine the two pilots, the workload and a scheduler via
        # a UnitManager.
        um = rp.UnitManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
        um.add_pilots([p1, p2])
        um.submit_units(compute_units)

    The unit manager can issue notifications on unit state changes: whenever
    a state notification arrives, any callback registered for that
    notification is fired.

    NOTE: State notifications can arrive out of order with respect to the
          unit state model!
    """

    # --------------------------------------------------------------------------
    #
    def __init__(self, session, scheduler=None):
        """
        Creates a new UnitManager and attaches it to the session.

        **Arguments:**
            * session [:class:`radical.pilot.Session`]:
              The session instance to use.
            * scheduler (`string`):
              The name of the scheduler plug-in to use.

        **Returns:**
            * A new `UnitManager` object [:class:`radical.pilot.UnitManager`].
        """

        self._bridges     = dict()
        self._components  = dict()
        self._pilots      = dict()
        self._pilots_lock = threading.RLock()
        self._units       = dict()
        self._units_lock  = threading.RLock()
        self._callbacks   = dict()
        self._cb_lock     = threading.RLock()
        self._terminate   = threading.Event()
        self._closed      = False
        self._rec_id      = 0  # used for session recording

        for m in rpt.UMGR_METRICS:
            self._callbacks[m] = dict()

        cfg = ru.read_json("%s/configs/umgr_%s.json"
                          % (os.path.dirname(__file__),
                             os.environ.get('RADICAL_PILOT_UMGR_CFG', 'default')))

        if scheduler:
            # overwrite the scheduler from the config file
            cfg['scheduler'] = scheduler

        if not cfg.get('scheduler'):
            # set default scheduler if needed
            cfg['scheduler'] = rpus.SCHEDULER_DEFAULT

        assert cfg['db_poll_sleeptime'], 'db_poll_sleeptime not configured'

        # initialize the base class (with no intent to fork)
        self._uid    = ru.generate_id('umgr')
        cfg['owner'] = self.uid
        rpu.Component.__init__(self, cfg, session)
        self.start(spawn=False)
        self._log.info('started umgr %s', self._uid)

        # only now we have a logger... :/
        self._rep.info('<<create unit manager')

        # The output queue is used to forward submitted units to the
        # scheduling component.
        self.register_output(rps.UMGR_SCHEDULING_PENDING,
                             rpc.UMGR_SCHEDULING_QUEUE)

        # the umgr will also collect units from the agent again, for output
        # staging and finalization
        self.register_output(rps.UMGR_STAGING_OUTPUT_PENDING,
                             rpc.UMGR_STAGING_OUTPUT_QUEUE)

        # register the state notification pull cb
        # FIXME: this should be a tailing cursor in the update worker
        self.register_timed_cb(self._state_pull_cb,
                               timer=self._cfg['db_poll_sleeptime'])

        # register callback which pulls units back from agent
        # FIXME: this should be a tailing cursor in the update worker
        self.register_timed_cb(self._unit_pull_cb,
                               timer=self._cfg['db_poll_sleeptime'])

        # also listen to the state pubsub for unit state changes
        self.register_subscriber(rpc.STATE_PUBSUB, self._state_sub_cb)

        # let session know we exist
        self._session._register_umgr(self)

        self._prof.prof('setup_done', uid=self._uid)
        self._rep.ok('>>ok\n')

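    # Illustrative usage sketch (not part of the class): constructing
    # a UnitManager.  Assumes `radical.pilot` is imported as `rp` and
    # `session` is an existing `rp.Session`; the scheduler argument is
    # optional and falls back to the configured default.
    #
    #     umgr = rp.UnitManager(session=session,
    #                           scheduler=rp.SCHEDULER_ROUND_ROBIN)
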
    # --------------------------------------------------------------------------
    #
    def initialize_common(self):

        # the manager must not carry bridge and component handles across forks
        ru.atfork(self._atfork_prepare, self._atfork_parent, self._atfork_child)


    # --------------------------------------------------------------------------
    #
    def _atfork_prepare(self): pass
    def _atfork_parent(self) : pass
    def _atfork_child(self)  :
        self._bridges    = dict()
        self._components = dict()


    # --------------------------------------------------------------------------
    #
    def finalize_parent(self):

        # terminate umgr components
        for c in self._components:
            c.stop()
            c.join()

        # terminate umgr bridges
        for b in self._bridges:
            b.stop()
            b.join()


    # --------------------------------------------------------------------------
    #
    def close(self):
        """
        Shut down the UnitManager and all umgr components.
        """

        # we do not cancel units at this point, in case any component or pilot
        # wants to continue to progress unit states, which should indeed be
        # independent from the umgr life cycle.

        if self._closed:
            return

        self._terminate.set()
        self.stop()

        self._rep.info('<<close unit manager')

        # we don't want any callback invocations during shutdown
        # FIXME: really?
        with self._cb_lock:
            self._callbacks = dict()
            for m in rpt.UMGR_METRICS:
                self._callbacks[m] = dict()

        self._log.info("Closed UnitManager %s." % self._uid)

        self._closed = True
        self._rep.ok('>>ok\n')

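    # Illustrative usage sketch (not part of the class): a umgr is closed
    # explicitly once the workload is done; note that `close()` does *not*
    # cancel units.  Assumes `umgr` is an existing UnitManager instance.
    #
    #     umgr.wait_units()   # wait for all managed units to become final
    #     umgr.close()
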
    # --------------------------------------------------------------------------
    #
    def is_valid(self, term=True):

        # don't check during termination
        if self._closed:
            return True

        return super(UnitManager, self).is_valid(term)

    # --------------------------------------------------------------------------
    #
    def as_dict(self):
        """
        Returns a dictionary representation of the UnitManager object.
        """

        ret = {
            'uid': self.uid,
            'cfg': self.cfg
        }

        return ret

    # --------------------------------------------------------------------------
    #
    def __str__(self):
        """
        Returns a string representation of the UnitManager object.
        """

        return str(self.as_dict())


    # --------------------------------------------------------------------------
    #
    def _pilot_state_cb(self, pilot, state):

        if self._terminate.is_set():
            return False

        # we register this callback for pilots added to this umgr.  It will
        # specifically look out for pilots which complete, and will make sure
        # that all units are pulled back into umgr control if that happens
        # prematurely.
        #
        # If we find units which have not completed the agent part of the unit
        # state model, we declare them FAILED.  If they can be restarted, we
        # resubmit an identical unit, which then will get a new unit ID.  This
        # avoids state model confusion (the state model is right now expected
        # to be linear), but is not intuitive for the application (FIXME).
        #
        # FIXME: there is a race with the umgr scheduler which may, just now,
        #        and before being notified about the pilot's demise, send new
        #        units to the pilot.

        # we only look into pilot states when the umgr is still active
        # FIXME: note that there is a race in that the umgr can be closed while
        #        we are in the cb.
        # FIXME: should is_valid be used?  Either way, `self._closed` is not an
        #        `mt.Event`!
        if self._closed:
            self._log.debug('umgr closed, ignore pilot state (%s: %s)',
                            pilot.uid, pilot.state)
            return True

        if state in rps.FINAL:

            self._log.debug('pilot %s is final - pull units', pilot.uid)

            unit_cursor = self.session._dbs._c.find({
                'type'    : 'unit',
                'pilot'   : pilot.uid,
                'umgr'    : self.uid,
                'control' : {'$in' : ['agent_pending', 'agent']}})

            if not unit_cursor.count():
                units = list()
            else:
                units = list(unit_cursor)

            self._log.debug("units pulled: %3d (pilot dead)", len(units))

            if not units:
                return True

            # update the units to avoid pulling them again next time.
            # NOTE: this needs no locking with the unit pulling in the
            #       _unit_pull_cb, as that will only pull umgr_pending
            #       units.
            uids = [unit['uid'] for unit in units]

            self._session._dbs._c.update({'type' : 'unit',
                                          'uid'  : {'$in' : uids}},
                                         {'$set' : {'control' : 'umgr'}},
                                         multi=True)

            to_restart = list()

            for unit in units:

                unit['state'] = rps.FAILED

                if not unit['description'].get('restartable'):
                    self._log.debug('unit %s not restartable', unit['uid'])
                    continue

                self._log.debug('unit %s is restartable', unit['uid'])
                unit['restarted'] = True
                ud = rpcud.ComputeUnitDescription(unit['description'])
                to_restart.append(ud)
                # FIXME: increment some restart counter in the description?
                # FIXME: reference the resulting new uid in the old unit.

            if to_restart and not self._closed:
                self._log.debug('restart %s units', len(to_restart))
                restarted = self.submit_units(to_restart)
                for u in restarted:
                    self._log.debug('restart unit %s', u.uid)

            # final units are not pushed
            self.advance(units, publish=True, push=False)

        return True


    # --------------------------------------------------------------------------
    #
    def _state_pull_cb(self):

        if self._terminate.is_set():
            return False

        # pull all unit states from the DB, and compare to the states we know
        # about.  If any state changed, update the unit instance and issue
        # notification callbacks as needed.  Do not advance the state (again).
        # FIXME: we also pull for dead units.  That is not efficient...
        # FIXME: this needs to be converted into a tailed cursor in the update
        #        worker
        units = self._session._dbs.get_units(umgr_uid=self.uid)

        for unit in units:
            if not self._update_unit(unit, publish=True, advance=False):
                return False

        return True


    # --------------------------------------------------------------------------
    #
    def _unit_pull_cb(self):

        if self._terminate.is_set():
            return False

        # pull units from the agent which are about to get back
        # under umgr control, and push them into the respective queues
        # FIXME: this should also be based on a tailed cursor
        # FIXME: Unfortunately, 'find_and_modify' is not bulkable, so we have
        #        to use 'find'.  To avoid finding the same units over and over
        #        again, we update the 'control' field *before* running the next
        #        find -- so we do it right here.
        unit_cursor = self.session._dbs._c.find({'type'    : 'unit',
                                                 'umgr'    : self.uid,
                                                 'control' : 'umgr_pending'})

        if not unit_cursor.count():
            # no units whatsoever...
            self._log.info("units pulled: 0")
            return True  # this is not an error

        # update the units to avoid pulling them again next time.
        units = list(unit_cursor)
        uids  = [unit['uid'] for unit in units]

        self._session._dbs._c.update({'type' : 'unit',
                                      'uid'  : {'$in' : uids}},
                                     {'$set' : {'control' : 'umgr'}},
                                     multi=True)

        self._log.info("units pulled: %4d", len(units))
        self._prof.prof('get', msg="bulk size: %d" % len(units), uid=self.uid)

        for unit in units:

            # we need to make sure to have the correct state:
            uid = unit['uid']
            self._prof.prof('get', uid=uid)

            old = unit['state']
            new = rps._unit_state_collapse(unit['states'])

            if old != new:
                self._log.debug("unit pulled %s: %s / %s", uid, old, new)

            unit['state']   = new
            unit['control'] = 'umgr'

        # now we really own the CUs, and can start working on them (ie. push
        # them into the pipeline).  We don't record state transition profile
        # events though - the transition has already happened.
        self.advance(units, publish=True, push=True, prof=False)

        return True


    # --------------------------------------------------------------------------
    #
    def _state_sub_cb(self, topic, msg):

        if self._terminate.is_set():
            return False

        cmd = msg.get('cmd')
        arg = msg.get('arg')

        if cmd != 'update':
            self._log.debug('ignore state cb msg with cmd %s', cmd)
            return True

        if isinstance(arg, list): things = arg
        else                    : things = [arg]

        for thing in things:

            if thing.get('type') == 'unit':

                self._log.debug('umgr state cb for unit: %s', thing['uid'])

                # we got the state update from the state callback - don't
                # publish it again
                self._update_unit(thing, publish=False, advance=False)

            else:
                self._log.debug('umgr state cb ignores %s/%s',
                                thing.get('uid'), thing.get('state'))

        return True


    # --------------------------------------------------------------------------
    #
    def _update_unit(self, unit_dict, publish=False, advance=False):

        # FIXME: this is breaking the bulk!

        uid = unit_dict['uid']

        with self._units_lock:

            # we don't care about units we don't know
            if uid not in self._units:
                return True

            # only update on state changes
            current = self._units[uid].state
            target  = unit_dict['state']
            if current == target:
                return True

            target, passed = rps._unit_state_progress(uid, current, target)

            if target in [rps.CANCELED, rps.FAILED]:
                # don't replay intermediate states
                passed = passed[-1:]

            for s in passed:
                unit_dict['state'] = s
                self._units[uid]._update(unit_dict)

                # we don't usually advance state at this point, but just keep
                # up with state changes reported from elsewhere
                if advance:
                    self.advance(unit_dict, s, publish=publish, push=False,
                                 prof=False)

            return True


    # --------------------------------------------------------------------------
    #
    def _call_unit_callbacks(self, unit_obj, state):

        with self._cb_lock:
            for cb_name, cb_val in self._callbacks[rpt.UNIT_STATE].iteritems():

                self._log.debug('%s calls state cb %s for %s',
                                self.uid, cb_name, unit_obj.uid)

                cb      = cb_val['cb']
                cb_data = cb_val['cb_data']

                if cb_data: cb(unit_obj, state, cb_data)
                else      : cb(unit_obj, state)


    # --------------------------------------------------------------------------
    #
    # FIXME: this needs to go to the scheduler
    def _default_wait_queue_size_cb(self, umgr, wait_queue_size):
        # FIXME: this needs to come from the scheduler?

        if self._terminate.is_set():
            return False

        self._log.info("[Callback]: wait_queue_size: %s.", wait_queue_size)


    # --------------------------------------------------------------------------
    #
    @property
    def uid(self):
        """
        Returns the unique id.
        """
        return self._uid


    # --------------------------------------------------------------------------
    #
    @property
    def scheduler(self):
        """
        Returns the scheduler name.
        """
        return self._cfg.get('scheduler')


    # --------------------------------------------------------------------------
    #
    def add_pilots(self, pilots):
        """
        Associates one or more pilots with the unit manager.

        **Arguments:**
            * **pilots** [:class:`radical.pilot.ComputePilot` or list of
              :class:`radical.pilot.ComputePilot`]: The pilot objects that will
              be added to the unit manager.
        """

        self.is_valid()

        if not isinstance(pilots, list):
            pilots = [pilots]

        if len(pilots) == 0:
            raise ValueError('cannot add an empty pilot list')

        self._rep.info('<<add %d pilot(s)' % len(pilots))

        with self._pilots_lock:

            # sanity check, and keep pilots around for inspection
            for pilot in pilots:
                pid = pilot.uid
                if pid in self._pilots:
                    raise ValueError('pilot %s already added' % pid)
                self._pilots[pid] = pilot

                # subscribe for state updates
                pilot.register_callback(self._pilot_state_cb)

        pilot_docs = [pilot.as_dict() for pilot in pilots]

        # publish to the command channel for the scheduler to pick up
        self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'add_pilots',
                                          'arg' : {'pilots': pilot_docs,
                                                   'umgr'  : self.uid}})

        self._rep.ok('>>ok\n')

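    # Illustrative usage sketch (not part of the class): pilots submitted
    # through a PilotManager are handed to the umgr for scheduling.  Assumes
    # `pm` is an existing PilotManager and `pd` a ComputePilotDescription, as
    # in the class docstring example.
    #
    #     pilot = pm.submit_pilots(pd)
    #     umgr.add_pilots(pilot)          # a single pilot ...
    #     umgr.add_pilots([p1, p2])       # ... or a list of pilots
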
    # --------------------------------------------------------------------------
    #
    def list_pilots(self):
        """
        Lists the UIDs of the pilots currently associated with the unit
        manager.

        **Returns:**
            * A list of :class:`radical.pilot.ComputePilot` UIDs [`string`].
        """

        self.is_valid()

        with self._pilots_lock:
            return self._pilots.keys()

    # --------------------------------------------------------------------------
    #
    def get_pilots(self):
        """
        Get the pilot instances currently associated with the unit manager.

        **Returns:**
            * A list of :class:`radical.pilot.ComputePilot` instances.
        """

        self.is_valid()

        with self._pilots_lock:
            return self._pilots.values()

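    # Illustrative usage sketch (not part of the class): inspecting the pilots
    # known to this umgr.  `list_pilots` returns UIDs, `get_pilots` returns the
    # ComputePilot instances themselves.
    #
    #     for pid in umgr.list_pilots():
    #         print pid
    #     pilots = umgr.get_pilots()
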
    # --------------------------------------------------------------------------
    #
    def remove_pilots(self, pilot_ids, drain=False):
        """
        Disassociates one or more pilots from the unit manager.

        After a pilot has been removed from a unit manager, it won't process
        any of the unit manager's units anymore.  Calling `remove_pilots`
        doesn't stop the pilot itself.

        **Arguments:**
            * **drain** [`boolean`]: Drain determines what happens to the units
              which are managed by the removed pilot(s).  If `True`, all units
              currently assigned to the pilot are allowed to finish execution.
              If `False` (the default), then non-final units will be canceled.
        """

        # TODO:  Implement 'drain'.
        # NOTE:  The actual removal of pilots from the scheduler is
        #        asynchronous!

        if drain:
            raise RuntimeError("'drain' is not yet implemented")

        self.is_valid()

        if not isinstance(pilot_ids, list):
            pilot_ids = [pilot_ids]

        if len(pilot_ids) == 0:
            raise ValueError('cannot remove an empty pilot list')

        self._rep.info('<<remove %d pilot(s)' % len(pilot_ids))

        with self._pilots_lock:

            # sanity check, and remove the pilots from inspection
            for pid in pilot_ids:
                if pid not in self._pilots:
                    raise ValueError('pilot %s not added' % pid)
                del(self._pilots[pid])

        # publish to the command channel for the scheduler to pick up
        self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'remove_pilots',
                                          'arg' : {'pids' : pilot_ids,
                                                   'umgr' : self.uid}})

        self._rep.ok('>>ok\n')

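    # Illustrative usage sketch (not part of the class): removing pilots by
    # UID.  Note that `drain=True` is not implemented and raises a
    # RuntimeError.
    #
    #     umgr.remove_pilots(pilot.uid)
    #     umgr.remove_pilots([p1.uid, p2.uid])
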
    # --------------------------------------------------------------------------
    #
    def list_units(self):
        """
        Returns the UIDs of the :class:`radical.pilot.ComputeUnit` instances
        managed by this unit manager.

        **Returns:**
            * A list of :class:`radical.pilot.ComputeUnit` UIDs [`string`].
        """

        self.is_valid()

        with self._units_lock:
            return self._units.keys()

    # --------------------------------------------------------------------------
    #
    def submit_units(self, descriptions):
        """
        Submits one or more :class:`radical.pilot.ComputeUnit` instances to the
        unit manager.

        **Arguments:**
            * **descriptions** [:class:`radical.pilot.ComputeUnitDescription`
              or list of :class:`radical.pilot.ComputeUnitDescription`]: The
              description of the compute unit instance(s) to create.

        **Returns:**
            * A list of :class:`radical.pilot.ComputeUnit` objects.
        """

        from .compute_unit import ComputeUnit

        self.is_valid()

        ret_list = True
        if not isinstance(descriptions, list):
            ret_list     = False
            descriptions = [descriptions]

        if len(descriptions) == 0:
            raise ValueError('cannot submit an empty list of unit descriptions')

        self._rep.info('<<submit %d unit(s)\n\t' % len(descriptions))

        # we return a list of compute units
        units = list()
        for ud in descriptions:

            if not ud.executable:
                raise ValueError('compute unit executable must be defined')

            unit = ComputeUnit(umgr=self, descr=ud)
            units.append(unit)

            # keep units around
            with self._units_lock:
                self._units[unit.uid] = unit

            if self._session._rec:
                ru.write_json(ud.as_dict(), "%s/%s.batch.%03d.json"
                        % (self._session._rec, unit.uid, self._rec_id))

            self._rep.progress()

        if self._session._rec:
            self._rec_id += 1

        # insert units into the database, as a bulk.
        unit_docs = [u.as_dict() for u in units]
        self._session._dbs.insert_units(unit_docs)

        # Only after the insert can we hand the units over to the next
        # components (ie. advance state).
        self.advance(unit_docs, rps.UMGR_SCHEDULING_PENDING,
                     publish=True, push=True)

        self._rep.ok('>>ok\n')

        if ret_list: return units
        else       : return units[0]

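    # Illustrative usage sketch (not part of the class): submitting a small
    # bag of units.  Assumes `rp` is the imported module; the `restartable`
    # flag is what `_pilot_state_cb` above checks when resubmitting units of
    # a failed pilot (assuming the description exposes it as an attribute).
    #
    #     cuds = list()
    #     for i in range(10):
    #         cud             = rp.ComputeUnitDescription()
    #         cud.executable  = '/bin/date'
    #         cud.restartable = True
    #         cuds.append(cud)
    #
    #     units = umgr.submit_units(cuds)
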
    # --------------------------------------------------------------------------
    #
    def get_units(self, uids=None):
        """
        Returns one or more compute units identified by their IDs.

        **Arguments:**
            * **uids** [`string` or `list of strings`]: The IDs of the compute
              unit objects to return.

        **Returns:**
            * A list of :class:`radical.pilot.ComputeUnit` objects.
        """

        self.is_valid()

        if not uids:
            with self._units_lock:
                ret = self._units.values()
            return ret

        ret_list = True
        if (not isinstance(uids, list)) and (uids is not None):
            ret_list = False
            uids     = [uids]

        ret = list()
        with self._units_lock:
            for uid in uids:
                if uid not in self._units:
                    raise ValueError('unit %s not known' % uid)
                ret.append(self._units[uid])

        if ret_list: return ret
        else       : return ret[0]

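    # Illustrative usage sketch (not part of the class): looking up units by
    # UID.  With no argument, all known units are returned.
    #
    #     all_units = umgr.get_units()
    #     one_unit  = umgr.get_units(units[0].uid)
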
    # --------------------------------------------------------------------------
    #
    def wait_units(self, uids=None, state=None, timeout=None):
        """
        Returns when one or more :class:`radical.pilot.ComputeUnit` instances
        reach a specific state.

        If `uids` is `None`, `wait_units` returns when **all** ComputeUnits
        reach the state defined in `state`.  This may include units which have
        previously terminated or have been waited upon.

        **Example**::

            # TODO -- add example

        **Arguments:**
            * **uids** [`string` or `list of strings`]
              If uids is set, only the ComputeUnits with the specified uids are
              considered.  If uids is `None` (default), all ComputeUnits are
              considered.

            * **state** [`string`]
              The state that ComputeUnits have to reach in order for the call
              to return.

              By default `wait_units` waits for the ComputeUnits to reach
              a terminal state, which can be one of the following:

              * :data:`radical.pilot.rps.DONE`
              * :data:`radical.pilot.rps.FAILED`
              * :data:`radical.pilot.rps.CANCELED`

            * **timeout** [`float`]
              Timeout in seconds before the call returns regardless of unit
              state changes.  The default value **None** waits forever.
        """

        self.is_valid()

        if not uids:
            with self._units_lock:
                uids = list()
                for uid, unit in self._units.iteritems():
                    if unit.state not in rps.FINAL:
                        uids.append(uid)

        if   not state                  : states = rps.FINAL
        elif isinstance(state, list)    : states = state
        else                            : states = [state]

        # we simplify the state check by waiting for the *earliest* of the
        # given states - if the unit happens to be in any later state, we are
        # sure the earliest has passed as well.
        check_state_val = rps._unit_state_values[rps.FINAL[-1]]
        for state in states:
            check_state_val = min(check_state_val,
                                  rps._unit_state_values[state])

        ret_list = True
        if not isinstance(uids, list):
            ret_list = False
            uids     = [uids]

        self._rep.info('<<wait for %d unit(s)\n\t' % len(uids))

        start    = time.time()
        to_check = None

        with self._units_lock:
            to_check = [self._units[uid] for uid in uids]

        # We don't want to iterate over all units again and again, as that
        # would duplicate checks on units which were found in matching states.
        # So we create a list from which we drop the units as we find them in
        # a matching state
        self._rep.idle(mode='start')
        while to_check and not self._terminate.is_set():

            # check timeout
            if timeout and (timeout <= (time.time() - start)):
                self._log.debug("wait timed out")
                break

            time.sleep(0.1)

            # FIXME: print percentage...
            self._rep.idle()
          # print 'wait units: %s' % [[u.uid, u.state] for u in to_check]

            check_again = list()
            for unit in to_check:

                # we actually don't check if a unit is in a specific (set of)
                # state(s), but rather check if it ever *has been* in any of
                # those states
                if  unit.state not in rps.FINAL and \
                    rps._unit_state_values[unit.state] <= check_state_val:
                    # this unit does not match the wait criteria
                    check_again.append(unit)

                else:
                    # stop watching this unit
                    if unit.state in [rps.FAILED]:
                        self._rep.idle(color='error', c='-')
                    elif unit.state in [rps.CANCELED]:
                        self._rep.idle(color='warn', c='*')
                    else:
                        self._rep.idle(color='ok', c='+')

            to_check = check_again
            self.is_valid()

        self._rep.idle(mode='stop')

        if to_check: self._rep.warn('>>timeout\n')
        else       : self._rep.ok  ('>>ok\n')

        # grab the current states to return
        state = None
        with self._units_lock:
            states = [self._units[uid].state for uid in uids]

        # done waiting
        if ret_list: return states
        else       : return states[0]

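    # Illustrative usage sketch (not part of the class): waiting either for
    # all managed units to become final, or for a specific subset with a
    # timeout.  Assumes `units` was returned by an earlier `submit_units`.
    #
    #     states = umgr.wait_units()                  # wait for all units
    #     states = umgr.wait_units(uids=[u.uid for u in units],
    #                              timeout=600)       # wait at most 10 min
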
    # --------------------------------------------------------------------------
    #
    def cancel_units(self, uids=None):
        """
        Cancel one or more :class:`radical.pilot.ComputeUnit` instances.

        Note that cancellation of units is *immediate*, i.e. their state is
        immediately set to `CANCELED`, even if some RP component may still
        operate on the units.  Specifically, other state transitions, including
        other final states (`DONE`, `FAILED`) can occur *after* cancellation.
        This is a side effect of an optimization: we consider this an
        acceptable tradeoff in the sense "Oh, that unit was DONE at the point
        of cancellation -- ok, we can use the results, sure!".

        If that behavior is not wanted, set the environment variable::

            export RADICAL_PILOT_STRICT_CANCEL=True

        **Arguments:**
            * **uids** [`string` or `list of strings`]: The IDs of the compute
              unit objects to cancel.
        """

        self.is_valid()

        if not uids:
            with self._units_lock:
                uids = self._units.keys()
        else:
            if not isinstance(uids, list):
                uids = [uids]

        # NOTE: We advance all units to cancelled, and send a cancellation
        #       control command.  If that command is picked up *after* some
        #       state progression, we'll see state transitions after cancel.
        #       For non-final states that is not a problem, as it is equivalent
        #       with a state update message race, which our state collapse
        #       mechanism accounts for.  For an eventual non-canceled final
        #       state, we do get an invalid state transition.  That is also
        #       corrected eventually in the state collapse, but the point
        #       remains, that the state model is temporarily violated.  We
        #       consider this a side effect of the fast-cancel optimization.
        #
        #       The env variable 'RADICAL_PILOT_STRICT_CANCEL == True' will
        #       disable this optimization.
        #
        # FIXME: the effect of the env var is not well tested
        if 'RADICAL_PILOT_STRICT_CANCEL' not in os.environ:

            with self._units_lock:
                units = [self._units[uid] for uid in uids]

            unit_docs = [unit.as_dict() for unit in units]
            self.advance(unit_docs, state=rps.CANCELED, publish=True, push=True)

        # we *always* issue the cancellation command to the local components
        self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'cancel_units',
                                          'arg' : {'uids' : uids,
                                                   'umgr' : self.uid}})

        # we also inform all pilots about the cancellation request
        self._session._dbs.pilot_command(cmd='cancel_units', arg={'uids': uids})

        # In the default case of calling 'advance' above, we just set the
        # state, so we *know* units are canceled.  But we nevertheless wait
        # until that state progression trickled through, so that the
        # application will see the same state on unit inspection.
        self.wait_units(uids=uids)

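    # Illustrative usage sketch (not part of the class): cancelling units.
    # With no argument all managed units are cancelled; note the fast-cancel
    # caveat described in the docstring above.
    #
    #     umgr.cancel_units(uids=[u.uid for u in units])
    #     umgr.cancel_units()
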
    # --------------------------------------------------------------------------
    #
    def register_callback(self, cb, metric=rpt.UNIT_STATE, cb_data=None):
        """
        Registers a new callback function with the UnitManager.  Manager-level
        callbacks get called if the specified metric changes.  The default
        metric `UNIT_STATE` fires the callback if any of the ComputeUnits
        managed by the UnitManager change their state.

        All callback functions need to have the same signature::

            def cb(obj, value, cb_data)

        where ``obj`` is a handle to the object that triggered the callback,
        ``value`` is the metric, and ``cb_data`` is the data provided on
        callback registration.  In the example of `UNIT_STATE` above, the
        object would be the unit in question, and the value would be the new
        state of the unit.

        Available metrics are:

          * `UNIT_STATE`: fires when the state of any of the units which are
            managed by this unit manager instance changes.  It communicates
            the unit object instance and the unit's new state.

          * `WAIT_QUEUE_SIZE`: fires when the number of unscheduled units
            (i.e. of units which have not been assigned to a pilot for
            execution) changes.
        """

        # FIXME: the signature should be (self, metrics, cb, cb_data)

        if metric not in rpt.UMGR_METRICS:
            raise ValueError("Metric '%s' not available on the umgr" % metric)

        with self._cb_lock:
            cb_name = cb.__name__
            self._callbacks[metric][cb_name] = {'cb'      : cb,
                                                'cb_data' : cb_data}

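    # Illustrative usage sketch (not part of the class): registering a state
    # callback.  The callback names below are examples, not part of the API;
    # the signatures follow `_call_unit_callbacks` above.
    #
    #     def unit_state_cb(unit, state):
    #         print '%s is now %s' % (unit.uid, state)
    #
    #     umgr.register_callback(unit_state_cb)
    #
    #     # with user data passed through `cb_data`:
    #     def unit_state_cb_data(unit, state, cb_data):
    #         print '%s: %s (%s)' % (unit.uid, state, cb_data)
    #
    #     umgr.register_callback(unit_state_cb_data, cb_data={'tag': 'demo'})
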
    # --------------------------------------------------------------------------
    #
    def unregister_callback(self, cb=None, metric=None):

        if metric and metric not in rpt.UMGR_METRICS:
            raise ValueError("Metric '%s' not available on the umgr" % metric)

        if   not metric              : metrics = rpt.UMGR_METRICS
        elif isinstance(metric, list): metrics = metric
        else                         : metrics = [metric]

        with self._cb_lock:

            for metric in metrics:

                if cb: to_delete = [cb.__name__]
                else : to_delete = self._callbacks[metric].keys()

                for cb_name in to_delete:

                    if cb_name not in self._callbacks[metric]:
                        raise ValueError("Callback %s not registered" % cb_name)

                    del(self._callbacks[metric][cb_name])

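    # Illustrative usage sketch (not part of the class): without arguments all
    # registered callbacks on all metrics are removed; passing `cb` (together
    # with the metric it was registered under) removes just that one.
    #
    #     umgr.unregister_callback()
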
# ------------------------------------------------------------------------------