4. Data Staging

Note

Currently RADICAL-Pilot supports data only at the file abstraction level, so for now data == files.

Many, if not all, programs require input data to operate and produce output data in some form as a result. RADICAL-Pilot has a set of constructs that allow the user to specify the required staging of input and output files for a Compute Unit.

The primary constructs are at the level of the Compute Unit (Description) and are discussed in the next section. For more elaborate use cases there are also constructs at the Compute Pilot level, which are discussed later in this chapter.

Note

RADICAL-Pilot uses system calls for local file operations and SAGA for remote transfers and URL specification.

4.1. Compute Unit I/O

To instruct RADICAL-Pilot to handle files for you, there are two things to take care of. First, you need to specify the respective input and output files for the Compute Unit in so-called staging directives. Second, you need to associate these staging directives with the Compute Unit by means of the input_staging and output_staging members.

4.1.1. What it looks like

The following code snippet shows this in action:

INPUT_FILE_NAME  = "INPUT_FILE.TXT"
OUTPUT_FILE_NAME = "OUTPUT_FILE.TXT"

# This executes: "/usr/bin/sort -o OUTPUT_FILE.TXT INPUT_FILE.TXT"
cud = radical.pilot.ComputeUnitDescription()
cud.executable = "/usr/bin/sort"
cud.arguments = ["-o", OUTPUT_FILE_NAME, INPUT_FILE_NAME]
cud.input_staging  = INPUT_FILE_NAME
cud.output_staging = OUTPUT_FILE_NAME

Here the staging directives INPUT_FILE_NAME and OUTPUT_FILE_NAME are simple strings that each specify a single filename and are associated with the Compute Unit Description cud for input and output, respectively.

As a result, the file INPUT_FILE.TXT is transferred from the local directory to the directory where the task is executed. After the task has run, the file OUTPUT_FILE.TXT, which has been created by the task, is transferred back to the local directory.
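Both input_staging and output_staging also accept a list of staging directives, so a single Compute Unit can stage several files at once. A minimal sketch, using hypothetical filenames:

# Stage two input files in and one output file out for a single Compute Unit.
cud = radical.pilot.ComputeUnitDescription()
cud.executable     = "/usr/bin/sort"
cud.arguments      = ["-o", "merged.txt", "file_a.txt", "file_b.txt"]
cud.input_staging  = ["file_a.txt", "file_b.txt"]
cud.output_staging = "merged.txt"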

The String-Based Input and Output Transfer example demonstrates this in full glory.

4.1.2. Staging Directives

The staging directives can be specified either as a string, as above, or as a dict of the following structure:

staging_directive = {
    'source':   source,   # radical.pilot.Url() or string (MANDATORY).
    'target':   target,   # radical.pilot.Url() or string (OPTIONAL).
    'action':   action,   # One of COPY, LINK, MOVE or TRANSFER (OPTIONAL).
    'flags':    flags,    # Zero or more of CREATE_PARENTS or SKIP_FAILED (OPTIONAL).
    'priority': priority  # A number to instruct ordering (OPTIONAL).
}

The semantics of the dictionary keys are as follows:

  • source (default: None) and target (default: os.path.basename(source)):

    When the staging directive is used for input, source refers to the location to get the input files from, e.g. the local working directory on your laptop or a remote data repository, and target refers to the working directory of the ComputeUnit. When the staging directive is used for output, source refers to the output files generated by the ComputeUnit in its working directory and target refers to the location where you need to store the output data, e.g. back on your laptop or in some remote data repository.

  • action (default: TRANSFER):

    The ultimate goal is to make data available to the application kernel in the ComputeUnit and to make the results available for further use. Depending on the location of the source relative to the target, the action can be COPY (within the local resource), LINK (on the same file system), MOVE (within the local resource), or TRANSFER (to or from a remote resource).

  • flags (default: [CREATE_PARENTS, SKIP_FAILED]):

    By passing certain flags we can influence the behavior of the action. Available flags are:

    • CREATE_PARENTS: Create parent directories while writing the file.
    • SKIP_FAILED: Do not stage out files if the task failed.

    In case of multiple values these can be passed as a list.

  • priority (default: 0):

    This optional field can be used to instruct the backend to prioritize the actions of the staging directives, e.g. to first stage the output that is required for immediate further analysis and only afterwards the output files that are of secondary concern. A fully specified directive combining these keys is sketched after this list.
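Putting the keys together, a fully specified input directive might look like the following. This is a minimal sketch: the URL and filenames are hypothetical, and it assumes that the flag constants are exposed on the radical.pilot module in the same way as the action constants.

# Fetch a (hypothetical) file into the unit's working directory under a
# different name, with an explicit flag and a raised priority.
input_sd = {
    'source':   radical.pilot.Url('file://localhost/tmp/reference.dat'),
    'target':   'reference.dat',
    'action':   radical.pilot.TRANSFER,
    'flags':    [radical.pilot.CREATE_PARENTS],
    'priority': 10
}

cud = radical.pilot.ComputeUnitDescription()
cud.input_staging = input_sd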

The Dictionary-Based Input and Output Transfer example demonstrates this in full glory.

When a staging directive is specified as a string, as we did earlier, this implies a directive where the source and the target are both equal to the content of the string, the action is set to the default action TRANSFER, the flags are set to the default flags CREATE_PARENTS and SKIP_FAILED, and the priority is set to the default value 0:

'INPUT_FILE.TXT' == {
    'source':   'INPUT_FILE.TXT',
    'target':   'INPUT_FILE.TXT',
    'action':   TRANSFER,
    'flags':    [CREATE_PARENTS, SKIP_FAILED],
    'priority': 0
}

4.1.3. Staging Area

As the pilot job creates an abstraction of a computational resource, the user does not necessarily know where the working directory of the Compute Pilot or the Compute Unit is. Even if the location is known, the user might not have direct access to it. For this situation we have the staging area, a special construct that lets the user specify files relative to or in the working directory without knowing its exact location. This is done using the following URL format:

'staging:///INPUT_FILE.TXT'
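For example, a Compute Unit could copy its output into the pilot's staging area instead of transferring it back to the local machine. A minimal sketch, assuming a hypothetical output file and an existing Compute Unit Description cud:

# Copy the unit's output into the pilot staging area.
# Note the triple slash, because of URL peculiarities.
sd_output = {
    'source': 'OUTPUT_FILE.TXT',
    'target': 'staging:///OUTPUT_FILE.TXT',
    'action': radical.pilot.COPY
}
cud.output_staging = sd_output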

The Pipeline example demonstrates this in full glory.

4.2. Compute Pilot I/O

As mentioned earlier, in addition to the constructs at the Compute Unit level, RADICAL-Pilot also has constructs at the Compute Pilot level. The main rationale for this is that often there is (input) data to be shared between multiple Compute Units. Instead of transferring the same files for every Compute Unit, we can transfer the data once to the Pilot and then make it available to every Compute Unit that needs it.

This works in a similar way to the Compute Unit I/O, where we also use a staging directive to specify the I/O transaction. The difference is that in this case the staging directive is not associated with a description, but used in a direct method call, pilot.stage_in(sd_pilot).

# Configure the staging directive to insert the shared file into
# the pilot staging directory.
sd_pilot = {'source': shared_input_file_url,
            'target': os.path.join(MY_STAGING_AREA, SHARED_INPUT_FILE),
            'action': radical.pilot.TRANSFER
}
# Synchronously stage the data to the pilot
pilot.stage_in(sd_pilot)
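Once the file resides in the pilot staging area, each Compute Unit that needs it can link it into its own working directory instead of transferring another copy. A minimal sketch, reusing the names from the snippet above:

# Make the shared file available in the unit's working directory via a link,
# avoiding a per-unit transfer of the same data.
sd_unit = {'source': os.path.join(MY_STAGING_AREA, SHARED_INPUT_FILE),
           'target': SHARED_INPUT_FILE,
           'action': radical.pilot.LINK
}

cud = radical.pilot.ComputeUnitDescription()
cud.input_staging = [sd_unit]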

The Shared Input Files example demonstrates this in full glory.

Note

The call to stage_in() is synchronous and will return once the transfer is complete.

4.3. Examples

Note

All of the following examples are configured to run on localhost, but they can easily be changed to run on a remote resource by modifying the resource specification in the Compute Pilot Description. Also note the comments in Staging Area when adapting the examples to a remote target.

These examples of course require an installation of RADICAL-Pilot. A download link is provided for each example.
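For instance, switching an example from localhost to a remote machine only requires changing the Compute Pilot Description. A minimal sketch, assuming the rp module alias used in the examples below; the resource label, core count and runtime are placeholders for your own allocation:

pdesc = rp.ComputePilotDescription()
pdesc.resource = "xsede.trestles"  # a resource label known to RADICAL-Pilot
pdesc.cores    = 16                # cores to allocate on the remote machine
pdesc.runtime  = 30                # walltime in minutes
# Depending on the target resource, an allocation (project) and queue may
# also have to be set on the pilot description.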

4.3.1. String-Based Input and Output Transfer

This example demonstrates the simplest form of the data staging capabilities: a local input file is staged through RADICAL-Pilot, processed by the Compute Unit, and the resulting output file is staged back to the local environment.

Note

Download the example:

curl -O https://raw.githubusercontent.com/radical-cybertools/radical.pilot/readthedocs/examples/io_staging_simple.py

import os
import sys
import radical.pilot as rp

# READ: The RADICAL-Pilot documentation: 
#   http://radicalpilot.readthedocs.org/en/latest
#
# Try running this example with RADICAL_PILOT_VERBOSE=debug set if 
# you want to see what happens behind the scenes!


#------------------------------------------------------------------------------
#
def pilot_state_cb (pilot, state) :
    """ this callback is invoked on all pilot state changes """

    print "[Callback]: ComputePilot '%s' state: %s." % (pilot.uid, state)

    if  state == rp.FAILED :
        sys.exit (1)


#------------------------------------------------------------------------------
#
def unit_state_cb (unit, state) :
    """ this callback is invoked on all unit state changes """

    print "[Callback]: ComputeUnit '%s' state: %s." % (unit.uid, state)

    if  state == rp.FAILED :
        sys.exit (1)


#------------------------------------------------------------------------------
#
if __name__ == "__main__":

    # Create a new session. A session is the 'root' object for all other
    # RADICAL-Pilot objects. It encapsulates the MongoDB connection(s) as
    # well as security credentials.
    session = rp.Session()

    # Add a Pilot Manager. Pilot managers manage one or more ComputePilots.
    pmgr = rp.PilotManager(session=session)

    # Register our callback with the PilotManager. This callback will get
    # called every time any of the pilots managed by the PilotManager
    # change their state.
    pmgr.register_callback(pilot_state_cb)

    # Define a single-core local pilot that runs for 5 minutes.
    pdesc = rp.ComputePilotDescription()
    pdesc.resource = "local.localhost"
    pdesc.cores    = 1
    pdesc.runtime  = 5 # Minutes
    #pdesc.cleanup  = True

    # Launch the pilot.
    pilot = pmgr.submit_pilots(pdesc)

    # Create a Compute Unit that sorts the local password file and writes the
    # output to result.dat.
    #
    #  The exact command that is executed by the agent is:
    #    "/usr/bin/sort -o result.dat passwd"
    #
    cud = rp.ComputeUnitDescription()
    cud.executable     = "/usr/bin/sort"
    cud.arguments      = ["-o", "result.dat", "passwd"]
    cud.input_staging  = "/etc/passwd"
    cud.output_staging = "result.dat"

    # Combine the ComputePilot, the ComputeUnits and a scheduler via
    # a UnitManager object.
    umgr = rp.UnitManager(session, rp.SCHED_DIRECT_SUBMISSION)

    # Register our callback with the UnitManager. This callback will get
    # called every time any of the units managed by the UnitManager
    # change their state.
    umgr.register_callback(unit_state_cb)

    # Add the previously created ComputePilot to the UnitManager.
    umgr.add_pilots(pilot)

    # Submit the previously created ComputeUnit description to the
    # UnitManager. This will trigger the selected scheduler to start
    # assigning the ComputeUnit to the ComputePilot.
    unit = umgr.submit_units(cud)

    # Wait for the compute unit to reach a terminal state (DONE or FAILED).
    umgr.wait_units()

    print "* Task %s (executed @ %s) state: %s, exit code: %s, started: %s, " \
          "finished: %s, output file: %s" % \
          (unit.uid, unit.execution_locations, unit.state,
           unit.exit_code,  unit.start_time, unit.stop_time,
           unit.description.output_staging[0]['target'])

    # Close automatically cancels the pilot(s).
    session.close()

# -----------------------------------------------------------------------------

4.3.2. Dictionary-Based Input and Output Transfer

This example demonstrates the use of the staging directives structure to have more control over the staging behavior. The flow of the example is similar to that of the previous example, but here we show that by using the dict-based Staging Directive, one can specify different names and paths for the local and remote files, a feature that is often required in real-world applications.

Note

Download the example:

curl -O https://raw.githubusercontent.com/radical-cybertools/radical.pilot/readthedocs/examples/io_staging_dict.py

import os
import sys
import radical.pilot as rp

# READ: The RADICAL-Pilot documentation: 
#   http://radicalpilot.readthedocs.org/en/latest
#
# Try running this example with RADICAL_PILOT_VERBOSE=debug set if 
# you want to see what happens behind the scenes!


#------------------------------------------------------------------------------
#
def pilot_state_cb (pilot, state) :
    """ this callback is invoked on all pilot state changes """

    print "[Callback]: ComputePilot '%s' state: %s." % (pilot.uid, state)

    if  state == rp.FAILED :
        sys.exit (1)


#------------------------------------------------------------------------------
#
def unit_state_cb (unit, state) :
    """ this callback is invoked on all unit state changes """

    print "[Callback]: ComputeUnit '%s' state: %s." % (unit.uid, state)

    if  state == rp.FAILED :
        sys.exit (1)


#------------------------------------------------------------------------------
#
if __name__ == "__main__":

    # Create a new session. A session is the 'root' object for all other
    # RADICAL-Pilot objects. It encapsulates the MongoDB connection(s) as
    # well as security credentials.
    session = rp.Session()

    # Add a Pilot Manager. Pilot managers manage one or more ComputePilots.
    pmgr = rp.PilotManager(session=session)

    # Register our callback with the PilotManager. This callback will get
    # called every time any of the pilots managed by the PilotManager
    # change their state.
    pmgr.register_callback(pilot_state_cb)

    # Define an 8-core local pilot that runs for 5 minutes.
    pdesc = rp.ComputePilotDescription()
    pdesc.resource = "local.localhost"
    pdesc.cores    = 8
    pdesc.runtime  = 5 # Minutes
    #pdesc.cleanup  = True

    # Launch the pilot.
    pilot = pmgr.submit_pilots(pdesc)

    input_sd = {
        'source': '/etc/passwd',
        'target': 'input.dat'
    }

    output_sd = {
        'source': 'result.dat',
        'target': '/tmp/result.dat'
    }

    # Create a Compute Unit that sorts the local password file and writes the
    # output to result.dat.
    #
    #  The exact command that is executed by the agent is:
    #    "sort -o result.dat input.dat"
    #
    cud = rp.ComputeUnitDescription()
    cud.executable     = "sort"
    cud.arguments      = ["-o", "result.dat", "input.dat"]
    cud.input_staging  = input_sd
    cud.output_staging = output_sd

    # Combine the ComputePilot, the ComputeUnits and a scheduler via
    # a UnitManager object.
    umgr = rp.UnitManager(session, rp.SCHED_DIRECT_SUBMISSION)

    # Register our callback with the UnitManager. This callback will get
    # called every time any of the units managed by the UnitManager
    # change their state.
    umgr.register_callback(unit_state_cb)

    # Add the previously created ComputePilot to the UnitManager.
    umgr.add_pilots(pilot)

    # Submit the previously created ComputeUnit description to the
    # UnitManager. This will trigger the selected scheduler to start
    # assigning the ComputeUnit to the ComputePilot.
    unit = umgr.submit_units(cud)

    # Wait for the compute unit to reach a terminal state (DONE or FAILED).
    umgr.wait_units()

    print "* Task %s (executed @ %s) state: %s, exit code: %s, started: %s, " \
          "finished: %s, output file: %s" % \
          (unit.uid, unit.execution_locations, unit.state,
           unit.exit_code,  unit.start_time, unit.stop_time,
           unit.description.output_staging[0]['target'])

    # Close automatically cancels the pilot(s).
    session.close()

# -----------------------------------------------------------------------------

4.3.3. Shared Input Files

This example demonstrates the staging of a shared input file by means of the stage_in() method of the pilot, subsequently making that file available to all Compute Units.

Note

Download the example:

curl -O https://raw.githubusercontent.com/radical-cybertools/radical.pilot/readthedocs/examples/io_staging_shared.py

import os
import radical.pilot

SHARED_INPUT_FILE = 'shared_input_file.txt'
MY_STAGING_AREA = 'staging_area'

#------------------------------------------------------------------------------
#
if __name__ == "__main__":

    try:

        # Create shared input file
        os.system('/bin/echo -n "Hello world, " > %s' % SHARED_INPUT_FILE)
        radical_cockpit_occupants = ['Alice', 'Bob', 'Carol', 'Eve']

        # Create per unit input files
        for idx, occ in enumerate(radical_cockpit_occupants):
            input_file = 'input_file-%d.txt' % (idx+1)
            os.system('/bin/echo "%s" > %s' % (occ, input_file))

        # Create a new session. A session is the 'root' object for all other
        # RADICAL-Pilot objects. It encapsulates the MongoDB connection(s) as
        # well as security credentials.
        session = radical.pilot.Session()

        # Add a Pilot Manager. Pilot managers manage one or more ComputePilots.
        pmgr = radical.pilot.PilotManager(session=session)

        # Define an 8-core pilot on a remote resource that runs for 5 minutes
        # and uses $HOME/radical.pilot.sandbox as sandbox directory.
        pdesc = radical.pilot.ComputePilotDescription()
        pdesc.resource = "xsede.trestles"
        pdesc.runtime  = 5 # minutes
        pdesc.cores    = 8 # cores

        # Launch the pilot.
        pilot = pmgr.submit_pilots(pdesc)

        # Define the url of the local file in the local directory
        shared_input_file_url = 'file://%s/%s' % (os.getcwd(), SHARED_INPUT_FILE)

        staged_file = "~/%s/%s" % (MY_STAGING_AREA, SHARED_INPUT_FILE)
        print "Staging shared input file to: %s" % staged_file

        # Configure the staging directive to insert the shared file into
        # the pilot staging directory.
        sd_pilot = {'source': shared_input_file_url,
                    'target': staged_file,
                    'action': radical.pilot.TRANSFER
        }
        # Synchronously stage the data to the pilot
        pilot.stage_in(sd_pilot)

        # Configure the staging directive for the shared input file.
        sd_shared = {'source': staged_file, 
                     'target': SHARED_INPUT_FILE,
                     'action': radical.pilot.LINK
        }

        # Combine the ComputePilot, the ComputeUnits and a scheduler via
        # a UnitManager object.
        umgr = radical.pilot.UnitManager(session, radical.pilot.SCHED_BACKFILLING)

        # Add the previously created ComputePilot to the UnitManager.
        umgr.add_pilots(pilot)

        compute_unit_descs = []

        for unit_idx in range(len(radical_cockpit_occupants)):

            # Configure the per unit input file.
            input_file = 'input_file-%d.txt' % (unit_idx+1)

            # Configure the per unit output file.
            output_file = 'output_file-%d.txt' % (unit_idx+1)

            # Actual task description.
            # Concatenate the shared input and the task specific input.
            cud = radical.pilot.ComputeUnitDescription()
            cud.executable = '/bin/bash'
            cud.arguments = ['-c', 'cat %s %s > %s' %
                             (SHARED_INPUT_FILE, input_file, output_file)]
            cud.cores = 1
            cud.input_staging = [sd_shared, input_file]
            cud.output_staging = output_file

            compute_unit_descs.append(cud)

        # Submit the previously created ComputeUnit descriptions to the
        # UnitManager. This will trigger the selected scheduler to start
        # assigning ComputeUnits to the ComputePilot.
        units = umgr.submit_units(compute_unit_descs)

        # Wait for all compute units to finish.
        umgr.wait_units()

        for unit in umgr.get_units():

            # Get the stdout and stderr streams of the ComputeUnit.
            print " STDOUT: %s" % unit.stdout
            print " STDERR: %s" % unit.stderr

        session.close()

    except radical.pilot.PilotException, ex:
        print "Error: %s" % ex

4.3.4. Pipeline

This example demonstrates a two-step pipeline that makes use of the (possibly remote) pilot staging area: the first step of the pipeline copies its intermediate output into the staging area, and the second step picks it up from there.

Note

Download the example:

curl -O https://raw.githubusercontent.com/radical-cybertools/radical.pilot/readthedocs/examples/io_staging_pipeline.py

import os
import radical.pilot

INPUT_FILE = 'input_file.txt'
INTERMEDIATE_FILE = 'intermediate_file.txt'
OUTPUT_FILE = 'output_file.txt'

#------------------------------------------------------------------------------
#
if __name__ == "__main__":

    try:
        # Create input file
        radical_cockpit_occupants = ['Carol', 'Eve', 'Alice', 'Bob']
        for occ in radical_cockpit_occupants:
            os.system('/bin/echo "%s" >> %s' % (occ, INPUT_FILE))

        # Create a new session. A session is the 'root' object for all other
        # RADICAL-Pilot objects. It encapsulates the MongoDB connection(s) as
        # well as security credentials.
        session = radical.pilot.Session()

        # Add a Pilot Manager. Pilot managers manage one or more ComputePilots.
        pmgr = radical.pilot.PilotManager(session)

        # Define a 2-core pilot on localhost that runs for 15 minutes and
        # uses $HOME/radical.pilot.sandbox as sandbox directory.
        pdesc = radical.pilot.ComputePilotDescription()
        pdesc.resource = "local.localhost"
        pdesc.runtime = 15 # minutes
        pdesc.cores = 2 # cores

        # Launch the pilot.
        pilot = pmgr.submit_pilots(pdesc)

        # Combine the ComputePilot, the ComputeUnits and a scheduler via
        # a UnitManager object.
        umgr = radical.pilot.UnitManager(
            session=session,
            scheduler=radical.pilot.SCHED_DIRECT_SUBMISSION)

        # Add the previously created ComputePilot to the UnitManager.
        umgr.add_pilots(pilot)

        # Configure the staging directive for intermediate data
        sd_inter_out = {
            'source': INTERMEDIATE_FILE,
            # Note the triple slash, because of URL peculiarities
            'target': 'staging:///%s' % INTERMEDIATE_FILE,
            'action': radical.pilot.COPY
        }

        # Task 1: Sort the input file and output to intermediate file
        cud1 = radical.pilot.ComputeUnitDescription()
        cud1.executable = 'sort'
        cud1.arguments = ['-o', INTERMEDIATE_FILE, INPUT_FILE]
        cud1.input_staging = INPUT_FILE
        cud1.output_staging = sd_inter_out

        # Submit the first task for execution.
        umgr.submit_units(cud1)

        # Wait for the compute unit to finish.
        umgr.wait_units()

        # Configure the staging directive for input intermediate data
        sd_inter_in = {
            # Note the triple slash, because of URL peculiarities
            'source': 'staging:///%s' % INTERMEDIATE_FILE,
            'target': INTERMEDIATE_FILE,
            'action': radical.pilot.LINK
        }

        # Task 2: Take the first line of the sorted intermediate file and write it to the output file
        cud2 = radical.pilot.ComputeUnitDescription()
        cud2.executable = '/bin/bash'
        cud2.arguments = ['-c', 'head -n1 %s > %s' %
                          (INTERMEDIATE_FILE, OUTPUT_FILE)]
        cud2.input_staging = sd_inter_in
        cud2.output_staging = OUTPUT_FILE

        # Submit the second CU for execution.
        umgr.submit_units(cud2)

        # Wait for the compute unit to finish.
        umgr.wait_units()

        session.close()

    except radical.pilot.PilotException, ex:
        print "Error: %s" % ex