5.1. Getting Started

This is where you should start if you are new to RADICAL-Pilot. It is highly recommended that you carefully read and understand all of this before you go off and start developing your own applications.

In this chapter we explain the main components of RADICAL-Pilot, how they work, and how they interact. For your convenience, you can find a fully working example at the end of this page.

After you have worked through this chapter, you will understand how to launch a local ComputePilot and use a UnitManager to schedule and run ComputeUnits (tasks) on it. Throughout this chapter you will also find links to more advanced topics like launching ComputePilots on remote HPC clusters and scheduling.

Note

This chapter assumes that you have successfully installed RADICAL-Pilot (see chapter Installation).

5.1.1. Loading the Module

In order to use RADICAL-Pilot in your Python application, you need to import the radical.pilot module.

import radical.pilot

You can check / print the version of your RADICAL-Pilot installation via the version property.

print radical.pilot.version

5.1.2. Creating a Session

A radical.pilot.Session is the root object for all other objects in RADICAL-Pilot. You can think of it as a tree or a directory structure with a Session as root. Each Session can have zero or more radical.pilot.Context, radical.pilot.PilotManager and radical.pilot.UnitManager instances attached to it.

(~~~~~~~~~)
(         ) <---- [Session]
( MongoDB )       |
(         )       |---- Context
(_________)       |---- ....
                  |
                  |---- [PilotManager]
                  |     |
                  |     |---- ComputePilot
                  |     |---- ComputePilot
                  |
                  |---- [UnitManager]
                  |     |
                  |     |---- ComputeUnit
                  |     |---- ComputeUnit
                  |     |....
                  |
                  |---- [UnitManager]
                  |     |
                  |     |....
                  |
                  |....

A Session also encapsulates the connection(s) to a back-end MongoDB server, which is the brain and central nervous system of RADICAL-Pilot. More information about how RADICAL-Pilot uses MongoDB can be found in the Introduction section.

To create a new Session, the only thing you need to provide is the URL of a MongoDB server:

session = radical.pilot.Session(database_url="mongodb://my-mongodb-server.edu:27017")

Each Session has a unique identifier (uid) and methods to traverse its members. The Session uid can be used to disconnect and reconnect to a Session as required. This is covered in Disconnecting and Reconnecting.

print "UID           : %s" % session.uid
print "Contexts      : %s" % session.list_contexts()
print "UnitManagers  : %s" % session.list_unit_managers()
print "PilotManagers : %s" % session.list_pilot_managers()

Warning

Always call radical.pilot.Session.close() before your application terminates. This will ensure that RADICAL-Pilot shuts down properly.
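
A minimal sketch of this pattern, using only the calls introduced in this chapter, is to wrap the application logic in a try/finally block so that close() runs even if an error occurs:

session = radical.pilot.Session(database_url="mongodb://my-mongodb-server.edu:27017")
try:
    # ... create pilot and unit managers, submit pilots and units ...
    pass
finally:
    session.close()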

5.1.3. Creating a ComputePilot

A radical.pilot.ComputePilot is responsible for ComputeUnit (task) execution. ComputePilots can be launched either locally or remotely, on a single machine or on one or more HPC clusters. In this example we just use local ComputePilots, but more on remote ComputePilots and how to launch them on HPC clusters can be found in Launching Remote / HPC ComputePilots.

As shown in the hierarchy above, ComputePilots are grouped in radical.pilot.PilotManager containers, so before you can launch a ComputePilot, you need to add a PilotManager to your Session. Just like a Session, a PilotManager has a unique id (uid) as well as a traversal method (list_pilots).

pmgr = radical.pilot.PilotManager(session=session)
print "PM UID        : %s" % pmgr.uid
print "Pilots        : %s" % pmgr.list_pilots()

In order to create a new ComputePilot, you first need to describe its requirements and properties. This is done with the help of a radical.pilot.ComputePilotDescription object. The mandatory properties that you need to define are:

  • resource - The name of the target resource (e.g., a remote HPC cluster), or local.localhost to launch a local ComputePilot.
  • runtime - The runtime (in minutes) of the ComputePilot agent.
  • cores - The number of cores the ComputePilot agent will try to allocate.

You can define a 2-core local pilot that runs for 5 minutes like this:

pdesc = radical.pilot.ComputePilotDescription()
pdesc.resource  = "local.localhost"
pdesc.runtime   = 5 # minutes
pdesc.cores     = 2

A ComputePilot is launched by passing the ComputePilotDescription to the submit_pilots() method of the PilotManager. This automatically adds the ComputePilot to the PilotManager. Like any other object in RADICAL-Pilot, a ComputePilot also has a unique identifier (uid):

pilot = pmgr.submit_pilots(pdesc)
print "Pilot UID     : %s" % pilot.uid

Warning

Note that submit_pilots() is a non-blocking call and that the submitted ComputePilot agent will not terminate when your Python script finishes. ComputePilot agents terminate only after they have reached their runtime limit or if you call radical.pilot.PilotManager.cancel_pilots() or radical.pilot.ComputePilot.cancel().

Note

You can change to the ComputePilot sandbox directory (e.g., /tmp/radical.pilot.sandbox) to see the raw logs and output files of the ComputePilot agent(s) [pilot-<uid>] as well as the working directories and output of the individual ComputeUnits ([task-<uid>]).

[/<sandbox-dir>/]
|
|----[pilot-<uid>/]
|    |
|    |---- STDERR
|    |---- STDOUT
|    |---- AGENT.LOG
|    |---- [task-<uid>/]
|    |---- [task-<uid>/]
|    |....
|
|....

Knowing where to find these files might come in handy for debugging purposes but it is not required for regular RADICAL-Pilot usage.

5.1.4. Creating ComputeUnits (Tasks)

After you have launched a ComputePilot, you can now generate a few radical.pilot.ComputeUnit objects for the ComputePilot to execute. You can think of a ComputeUnit as something very similar to an operating system process that consists of an executable, a list of arguments, and an environment along with some runtime requirements.

Analogous to ComputePilots, a ComputeUnit is described via a radical.pilot.ComputeUnitDescription object. The mandatory properties that you need to define are:

  • executable - The executable to launch.
  • arguments - The arguments to pass to the executable.
  • cores - The number of cores required by the executable.

For example, you can create a workload of 8 ‘/bin/sleep’ ComputeUnits like this:

compute_units = []

for unit_count in range(0, 8):
    cu = radical.pilot.ComputeUnitDescription()
    cu.environment = {"SLEEP_TIME" : "10"}
    cu.executable  = "/bin/sleep"
    cu.arguments   = ["$SLEEP_TIME"]
    cu.cores       = 1

    compute_units.append(cu)

Note

The example above uses a single executable that requires only one core. It is however possible to run multiple commands in one ComputeUnit. This is described in Executing Multiple Commands in a Single ComputeUnit. If you want to run multi-core executables, like for example MPI programs, check out Executing Multicore / Multithreaded ComputeUnits.

5.1.5. Input- / Output-File Transfer

Often, a computational task doesn’t just consist of an executable with some arguments but also needs some input data. For this reason, a radical.pilot.ComputeUnitDescription allows the definition of input_staging and output_staging:

  • input_staging defines a list of local files that need to be transferred to the execution resource before a ComputeUnit can start running.
  • output_staging defines a list of remote files that need to be transferred back to the local machine after a ComputeUnit has finished execution.

See Data Staging for more information on data staging.

Furthermore, a ComputeUnit provides two properties, radical.pilot.ComputeUnit.stdout and radical.pilot.ComputeUnit.stderr, that can be used to access a ComputeUnit’s STDOUT and STDERR files after it has finished execution.

Example:

cu = radical.pilot.ComputeUnitDescription()
cu.executable    = "/bin/cat"
cu.arguments     = ["file1.dat", "file2.dat"]
cu.cores         = 1
cu.input_staging = ["./file1.dat", "./file2.dat"]

5.1.6. Adding Callbacks

Events in RADICAL-Pilot are mostly asynchronous as they happen at one or more distributed components, namely the ComputePilot agents. At any time during the execution of a workload, ComputePilots and ComputeUnits can begin or finish execution or fail with an error.

RADICAL-Pilot provides callbacks as a method to react to these events asynchronously when they occur. ComputePilots, PilotManagers, ComputeUnits and UnitManagers all provide a register_callback() method.

A simple callback that prints the state of all pilots would look something like this:

def pilot_state_cb(pilot, state):
    print "[Callback]: ComputePilot '%s' state changed to '%s'."% (pilot.uid, state)

pmgr = radical.pilot.PilotManager(session=session)
pmgr.register_callback(pilot_state_cb)
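
A callback on a UnitManager works the same way. The following sketch mirrors the unit state callback used in the complete example at the end of this chapter and assumes a UnitManager umgr has already been created (see Scheduling ComputeUnits below):

def unit_state_cb(unit, state):
    print "[Callback]: ComputeUnit '%s' state changed to '%s'." % (unit.uid, state)

umgr.register_callback(unit_state_cb, radical.pilot.UNIT_STATE)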

Note

Using callbacks can greatly improve the performance of an application, since it eliminates the need for global / blocking wait() calls and state polling. More about callbacks can be found in Programming with Callbacks.

5.1.7. Scheduling ComputeUnits

In the previous steps we have created and launched a ComputePilot (via a PilotManager) and created a list of ComputeUnitDescriptions. In order to put it all together and execute the ComputeUnits on the ComputePilot, we need to create a radical.pilot.UnitManager instance.

As shown in the diagram below, a UnitManager combines three things: the ComputeUnits, added via radical.pilot.UnitManager.submit_units(); one or more ComputePilots, added via radical.pilot.UnitManager.add_pilots(); and a Unit Scheduler. Once instantiated, a UnitManager assigns the submitted ComputeUnits to one of its ComputePilots based on the selected scheduling algorithm.

+----+  +----+  +----+  +----+       +----+
| CU |  | CU |  | CU |  | CU |  ...  | CU |
+----+  +----+  +----+  +----+       +----+
   |       |       |       |            |
   |_______|_______|_______|____________|
                     |
                     v submit_units()
             +---------------+
             |  UnitManager  |
             |---------------|
             |               |
             |  <SCHEDULER>  |
             +---------------+
                     ^ add_pilots()
                     |
           __________|___________
           |       |            |
        +~~~~+  +~~~~+       +~~~~+
        | CP |  | CP |  ...  | CP |
        +~~~~+  +~~~~+       +~~~~+

Since we have only one ComputePilot, we don’t need any specific scheduling algorithm for our example. We choose SCHED_DIRECT_SUBMISSION which simply passes the ComputeUnits on to the ComputePilot.

umgr = radical.pilot.UnitManager(session=session, scheduler=radical.pilot.SCHED_DIRECT_SUBMISSION)

umgr.add_pilots(pilot)
umgr.submit_units(compute_units)

umgr.wait_units()

The radical.pilot.UnitManager.wait_units() call blocks until all ComputeUnits have been executed by the UnitManager. Simple control flows / dependencies can be realized with wait_units(); however, for more complex control flows it can become inefficient due to its blocking nature. To address this, RADICAL-Pilot also provides mechanisms for asynchronous notifications and callbacks. This is discussed in more detail in Application Control Flow with Callbacks.
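
For example, a simple two-stage dependency can be expressed with two consecutive submit / wait cycles. This is only a sketch; stage1_cuds and stage2_cuds stand for hypothetical lists of ComputeUnitDescription objects:

# stage 1: submit and block until all stage-1 units are done
stage1_units = umgr.submit_units(stage1_cuds)
umgr.wait_units()

# stage 2: submitted only after stage 1 has completed
stage2_units = umgr.submit_units(stage2_cuds)
umgr.wait_units()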

Note

The SCHED_DIRECT_SUBMISSION scheduler only works with a single ComputePilot. If you add more than one ComputePilot to a UnitManager, you will end up with an error. If you want to use RADICAL-Pilot to run multiple ComputePilots concurrently, possibly on different machines, check out Launching Remote / HPC ComputePilots.

5.1.8. Results and Inspection

After wait_units() returns, you can inspect the final state and state history of each ComputeUnit via the UnitManager:

for unit in umgr.get_units():
    print "unit id  : %s" % unit.uid
    print "  state  : %s" % unit.state
    print "  history:"
    for entry in unit.state_history:
        print "           %s : %s" % (entry.timestamp, entry.state)

5.1.9. Cleanup and Shutdown

When your application has finished executing all ComputeUnits, it should make an attempt to cancel the ComputePilot. If a ComputePilot is not canceled, it will continue running until it reaches its runtime limit, even if your application has terminated.

An individual ComputePilot is canceled by calling radical.pilot.ComputePilot.cancel(). Alternatively, all ComputePilots of a PilotManager can be canceled by calling radical.pilot.PilotManager.cancel_pilots().

pmgr.cancel_pilots()

Before your application terminates, you should always call radical.pilot.Session.close() to ensure that your RADICAL-Pilot session terminates properly. If you haven’t explicitly canceled the pilots before, close() will take care of that implicitly (controlled via the terminate parameter). close() will also delete all traces of the session from the database (controlled via the cleanup parameter).

session.close(cleanup=True, terminate=True)

5.1.10. What’s Next?

Now that you understand the basic mechanics of RADICAL-Pilot, it’s time to dive into some of the more advanced topics. We suggest that you check out the following chapters next:

  • Error Handling Strategies. Error handling is crucial for any RADICAL-Pilot application! This chapter captures everything from exception handling to state callbacks.
  • Launching Remote / HPC ComputePilots. In this chapter we explain how to launch ComputePilots on remote HPC clusters, something you most definitely want to do.
  • Disconnecting and Reconnecting. This chapter is very useful, for example, if you work with long-running tasks that don’t need continuous supervision.

5.1.11. The Complete Example

Below is a complete and working example that puts together everything we discussed in this section.

import os
import sys
import time
import radical.pilot as rp

# READ: The RADICAL-Pilot documentation: 
#   http://radicalpilot.readthedocs.org/en/latest
#
# Try running this example with RADICAL_PILOT_VERBOSE=debug set if
# you want to see what happens behind the scenes!


#------------------------------------------------------------------------------
#
def pilot_state_cb (pilot, state) :
    """ this callback is invoked on all pilot state changes """

    print "[Callback]: ComputePilot '%s' state: %s." % (pilot.uid, state)

    if  state == rp.FAILED :
        sys.exit (1)


#------------------------------------------------------------------------------
#
def unit_state_cb (unit, state) :
    """ this callback is invoked on all unit state changes """

    print "[Callback]: ComputeUnit  '%s: %s' (on %s) state: %s." \
        % (unit.name, unit.uid, unit.pilot_id, state)

    if  state == rp.FAILED :
        print "stderr: %s" % unit.stderr
        sys.exit (1)


#------------------------------------------------------------------------------
#
def wait_queue_size_cb(umgr, wait_queue_size):
    """ 
    this callback is called when the size of the unit managers wait_queue
    changes.
    """
    print "[Callback]: UnitManager  '%s' wait_queue_size changed to %s." \
        % (umgr.uid, wait_queue_size)

    pilots = umgr.get_pilots ()
    for pilot in pilots :
        print "pilot %s: %s" % (pilot.uid, pilot.state)

    if  wait_queue_size == 0 :
        for pilot in pilots :
            if  pilot.state in [rp.PENDING_LAUNCH,
                                rp.LAUNCHING     ,
                                rp.PENDING_ACTIVE] :
                print "cancel pilot %s" % pilot.uid
                umgr.remove_pilot (pilot.uid)
                pilot.cancel ()


#------------------------------------------------------------------------------
#
if __name__ == "__main__":

    # prepare some input files for the compute units
    os.system ('hostname > file1.dat')
    os.system ('date     > file2.dat')

    # Create a new session. A session is the 'root' object for all other
    # RADICAL-Pilot objects. It encapsulates the MongoDB connection(s) as
    # well as security credentials.
    session = rp.Session()
    print "session id: %s" % session.uid

    # Add a Pilot Manager. Pilot managers manage one or more ComputePilots.
    pmgr = rp.PilotManager(session=session)

    # Register our callback with the PilotManager. This callback will get
    # called every time any of the pilots managed by the PilotManager
    # change their state.
    pmgr.register_callback(pilot_state_cb)

    # Define a single-core local pilot that runs for 5 minutes and cleans up
    # after itself.
    pdesc = rp.ComputePilotDescription()
    pdesc.resource = "local.localhost"
    pdesc.runtime  = 5 # minutes
    pdesc.cores    = 1
    pdesc.cleanup  = True

    # Launch the pilot.
    pilot = pmgr.submit_pilots(pdesc)

    # Combine the ComputePilot, the ComputeUnits and a scheduler via
    # a UnitManager object.
    umgr = rp.UnitManager(
        session=session,
        scheduler=rp.SCHED_BACKFILLING)

    # Register our callback with the UnitManager. This callback will get
    # called every time any of the units managed by the UnitManager
    # change their state.
    umgr.register_callback(unit_state_cb, rp.UNIT_STATE)

    # Register also a callback which tells us when all units have been
    # assigned to pilots
    umgr.register_callback(wait_queue_size_cb, rp.WAIT_QUEUE_SIZE)


    # Add the previously created ComputePilot to the UnitManager.
    umgr.add_pilots(pilot)

    # Create a workload of ComputeUnits (tasks). Each compute unit
    # uses /bin/cat to concatenate two input files, file1.dat and
    # file2.dat. The output is written to STDOUT. cu.environment is
    # used to demonstrate how to set environment variables within a
    # ComputeUnit - it's not strictly necessary for this example. As
    # a shell script, the ComputeUnits would look something like this:
    #
    #    export INPUT1=file1.dat
    #    export INPUT2=file2.dat
    #    /bin/cat $INPUT1 $INPUT2
    #
    cuds = []
    for unit_count in range(0, 20):
        cud = rp.ComputeUnitDescription()
        cud.name          = "unit_%03d" % unit_count
        cud.executable    = "/bin/sh"
        cud.environment   = {'INPUT1': 'file1.dat', 'INPUT2': 'file2.dat'}
        cud.arguments     = ["-l", "-c", "cat $INPUT1 $INPUT2"]
        cud.cores         = 1
        cud.input_staging = ['file1.dat', 'file2.dat']

        cuds.append(cud)

    # Submit the previously created ComputeUnit descriptions to the
    # UnitManager. This will trigger the selected scheduler to start
    # assigning ComputeUnits to the ComputePilots.
    units = umgr.submit_units(cuds)

    # Wait for all compute units to reach a terminal state (DONE or FAILED).
    umgr.wait_units()

    print 'units all done'
    print '----------------------------------------------------------------------'

    for unit in units :
        unit.wait ()

    for unit in units:
        print "* Task %s (executed @ %s) state %s, exit code: %s, started: %s, finished: %s, stdout: %s" \
            % (unit.uid, unit.execution_locations, unit.state, unit.exit_code, unit.start_time, unit.stop_time, unit.stdout)

    # Close automatically cancels the pilot(s).
    session.close (terminate=True)

    # delete the test data files
    os.system ('rm file1.dat')
    os.system ('rm file2.dat')