4. Data Staging¶
Note
Currently, RADICAL-Pilot supports data only at the file level of abstraction, so for now "data" means files.
Many, if not all, programs require input data to operate and produce output data as a result. RADICAL-Pilot has a set of constructs that allow the user to specify which input and output files need to be staged for a Compute Unit.
The primary constructs operate at the level of the Compute Unit (Description) and are discussed in the next section. For more elaborate use cases there are also constructs at the Compute Pilot level, which are discussed later in this chapter.
Note
RADICAL-Pilot uses system calls for local file operations and SAGA for remote transfers and URL specification.
4.1. Compute Unit I/O¶
To instruct RADICAL-Pilot to handle files for you, there are two things to take care of.
First, you need to specify the respective input and output files for the Compute Unit in so-called staging directives.
Second, you need to associate these staging directives with the Compute Unit by means of the input_staging and output_staging members.
4.1.1. What it looks like¶
The following code snippet shows this in action:
import radical.pilot

INPUT_FILE_NAME = "INPUT_FILE.TXT"
OUTPUT_FILE_NAME = "OUTPUT_FILE.TXT"

# This executes: "/usr/bin/sort -o OUTPUT_FILE.TXT INPUT_FILE.TXT"
cud = radical.pilot.ComputeUnitDescription()
cud.executable = "/usr/bin/sort"
cud.arguments = ["-o", OUTPUT_FILE_NAME, INPUT_FILE_NAME]
cud.input_staging = INPUT_FILE_NAME
cud.output_staging = OUTPUT_FILE_NAME
Here the staging directives INPUT_FILE_NAME and OUTPUT_FILE_NAME are simple strings that each specify a single filename; they are associated with the Compute Unit Description cud for input and output respectively.
As a result, the file INPUT_FILE.TXT is transferred from the local directory to the directory where the task is executed. After the task has run, the file OUTPUT_FILE.TXT that the task created is transferred back to the local directory.
The String-Based Input and Output Transfer example demonstrates this in full glory.
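The stage-in, execute, stage-out sequence described above can be pictured with a purely local simulation. This is a minimal sketch and not how RADICAL-Pilot implements staging: the helper names run_with_staging, sort_lines and demo are hypothetical, a temporary directory stands in for the directory where the task is executed, and a small Python function stands in for /usr/bin/sort.

```python
import os
import shutil
import tempfile


def run_with_staging(input_name, output_name, task, local_dir):
    """Mimic stage-in, execution, and stage-out for a single task."""
    # Hypothetical stand-in for the directory where the task is executed.
    sandbox = tempfile.mkdtemp(prefix="unit_sandbox_")
    try:
        # Stage in: local directory -> task directory.
        shutil.copy(os.path.join(local_dir, input_name),
                    os.path.join(sandbox, input_name))
        # Run the task inside its own working directory.
        task(os.path.join(sandbox, input_name),
             os.path.join(sandbox, output_name))
        # Stage out: task directory -> local directory.
        shutil.copy(os.path.join(sandbox, output_name),
                    os.path.join(local_dir, output_name))
    finally:
        shutil.rmtree(sandbox)
    return os.path.join(local_dir, output_name)


def sort_lines(src, dst):
    """Stand-in for "sort -o dst src": sort the lines of src into dst."""
    with open(src) as f:
        lines = sorted(f.readlines())
    with open(dst, "w") as f:
        f.writelines(lines)


def demo():
    """Sort a small file through the simulated staging steps."""
    local_dir = tempfile.mkdtemp(prefix="local_dir_")
    try:
        with open(os.path.join(local_dir, "INPUT_FILE.TXT"), "w") as f:
            f.write("Carol\nEve\nAlice\nBob\n")
        result = run_with_staging("INPUT_FILE.TXT", "OUTPUT_FILE.TXT",
                                  sort_lines, local_dir)
        with open(result) as f:
            return f.read()
    finally:
        shutil.rmtree(local_dir)
```

Calling demo() returns the sorted file content, which shows that the task only ever sees files inside its own directory while the user only ever sees files in the local directory.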
4.1.2. Staging Directives¶
The format of the staging directives can either be a string as above or a dict of the following structure:
staging_directive = {
    'source':   source,   # radical.pilot.Url() or string (MANDATORY).
    'target':   target,   # radical.pilot.Url() or string (OPTIONAL).
    'action':   action,   # One of COPY, LINK, MOVE or TRANSFER (OPTIONAL).
    'flags':    flags,    # Zero or more of CREATE_PARENTS or SKIP_FAILED (OPTIONAL).
    'priority': priority  # A number to instruct ordering (OPTIONAL).
}
The semantics of the keys from the dict are as follows:
source (default: None) and target (default: os.path.basename(source)): When the staging directive is used for input, source refers to the location to get the input files from, e.g. the local working directory on your laptop or a remote data repository, and target refers to the working directory of the ComputeUnit. When the staging directive is used for output, source refers to the output files generated by the ComputeUnit in its working directory, and target refers to the location where the output data should be stored, e.g. back on your laptop or in some remote data repository.
action (default: TRANSFER): The ultimate goal is to make data available to the application kernel in the ComputeUnit and to make the results available for further use. Depending on the location of the source relative to the target, the action can be COPY (local resource), LINK (same file system), MOVE (local resource), or TRANSFER (to a remote resource).
flags (default: [CREATE_PARENTS, SKIP_FAILED]): Flags influence the behavior of the action. Available flags are:
CREATE_PARENTS: Create parent directories while writing the file.
SKIP_FAILED: Don't stage out files if the task failed.
Multiple values can be passed as a list.
priority (default: 0): This optional field can be used to instruct the backend to prioritize the actions on the staging directives, e.g. to first stage the output that is required for immediate further analysis and afterwards some output files that are of secondary concern.
The Dictionary-Based Input and Output Transfer example demonstrates this in full glory.
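To make the local actions and the CREATE_PARENTS flag more concrete, the sketch below applies COPY, LINK and MOVE with standard library calls. It rests on several assumptions: the constants are placeholder strings rather than the radical.pilot constants, apply_local_action and demo are hypothetical names, LINK is taken to mean a symbolic link, and TRANSFER is left out because it involves a remote resource.

```python
import os
import shutil
import tempfile

# Placeholder names for this sketch; the real constants live in radical.pilot.
COPY, LINK, MOVE = "Copy", "Link", "Move"


def apply_local_action(action, source, target, create_parents=True):
    """Apply a COPY, LINK, or MOVE from source to target on one file system."""
    if create_parents:
        # Rough equivalent of the CREATE_PARENTS flag.
        parent = os.path.dirname(target)
        if parent and not os.path.isdir(parent):
            os.makedirs(parent)
    if action == COPY:
        shutil.copy(source, target)
    elif action == LINK:
        os.symlink(source, target)   # assumption: LINK means a symbolic link
    elif action == MOVE:
        shutil.move(source, target)
    else:
        raise ValueError("unsupported local action: %r" % (action,))


def demo():
    """Exercise each action once inside a throwaway directory."""
    root = tempfile.mkdtemp()
    try:
        src = os.path.join(root, "data.txt")
        with open(src, "w") as f:
            f.write("payload\n")
        copied = os.path.join(root, "a", "copy.txt")
        linked = os.path.join(root, "b", "link.txt")
        moved = os.path.join(root, "c", "moved.txt")
        apply_local_action(COPY, src, copied)
        apply_local_action(LINK, src, linked)
        is_link = os.path.islink(linked)
        apply_local_action(MOVE, src, moved)
        return (os.path.isfile(copied), is_link,
                os.path.exists(src), os.path.isfile(moved))
    finally:
        shutil.rmtree(root)
```

The practical difference is in what remains afterwards: COPY leaves two independent files, LINK leaves one file with a second name, and MOVE leaves the file only at the target.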
When a staging directive is specified as a string, as we did earlier, it implies a staging directive where the source and the target are equal to the content of the string, the action is set to the default action TRANSFER, the flags are set to the default flags CREATE_PARENTS and SKIP_FAILED, and the priority is set to the default value 0:
'INPUT_FILE.TXT' == {
    'source':   'INPUT_FILE.TXT',
    'target':   'INPUT_FILE.TXT',
    'action':   TRANSFER,
    'flags':    [CREATE_PARENTS, SKIP_FAILED],
    'priority': 0
}
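The expansion from the string form to the dict form can be sketched as a small helper. This is an illustration of the documented defaults only, not RADICAL-Pilot's own code: expand_staging_directive is a hypothetical name, the three constants are placeholder strings, and the sketch handles plain string sources only. The target default follows the os.path.basename(source) rule listed above, which for a bare filename gives the same result as the expansion shown.

```python
import os

# Placeholder values for this sketch; use the radical.pilot constants in real code.
TRANSFER = "Transfer"
CREATE_PARENTS = "CreateParents"
SKIP_FAILED = "SkipFailed"


def expand_staging_directive(sd):
    """Return a full dict for a string or partial-dict staging directive."""
    if isinstance(sd, str):
        sd = {"source": sd}
    if not sd.get("source"):
        raise ValueError("a staging directive needs a 'source'")
    expanded = {
        "source": sd["source"],
        "target": os.path.basename(sd["source"]),
        "action": TRANSFER,
        "flags": [CREATE_PARENTS, SKIP_FAILED],
        "priority": 0,
    }
    # Explicitly given keys override the documented defaults.
    expanded.update(sd)
    return expanded
```

With this helper, expand_staging_directive("INPUT_FILE.TXT") yields the dict shown above, while a partial dict such as {'source': '/etc/passwd', 'target': 'input.dat'} keeps its explicit target and picks up the remaining defaults.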
4.1.3. Staging Area¶
Because the pilot job creates an abstraction for a computational resource, the user does not necessarily know where the working directory of the Compute Pilot or the Compute Unit is. Even if they do, they might not have direct access to it. For this situation there is the staging area, a special construct that lets the user specify files relative to or in the working directory without knowing its exact location. This is done using the following URL format:
'staging:///INPUT_FILE.TXT'
The Pipeline example demonstrates this in full glory.
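The three slashes in the URL matter because of how URLs are parsed: with only two slashes, the filename would be read as a host name instead of a path. The sketch below shows this with Python 3's urllib.parse and maps a staging URL onto a directory. The function resolve_staging_url and the staging_root argument are hypothetical; where RADICAL-Pilot actually places the staging area is not specified here.

```python
import posixpath
from urllib.parse import urlsplit


def resolve_staging_url(url, staging_root):
    """Map a 'staging:///name' URL onto a path below staging_root."""
    parts = urlsplit(url)
    if parts.scheme != "staging":
        raise ValueError("not a staging URL: %r" % (url,))
    if parts.netloc:
        # 'staging://name' parses 'name' as a host, not as a path.
        raise ValueError("use three slashes: staging:///%s" % parts.netloc)
    return posixpath.join(staging_root, parts.path.lstrip("/"))
```

For example, resolve_staging_url('staging:///INPUT_FILE.TXT', '/tmp/pilot/staging_area') returns '/tmp/pilot/staging_area/INPUT_FILE.TXT', where the root directory is made up for the illustration.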
4.2. Compute Pilot I/O¶
As mentioned earlier, in addition to the constructs at the Compute Unit level, RADICAL-Pilot also has constructs at the Compute Pilot level. The main rationale is that input data often has to be shared between multiple Compute Units. Instead of transferring the same files for every Compute Unit, we can transfer the data to the Pilot once and then make it available to every Compute Unit that needs it.
This works similarly to Compute Unit I/O: we also use a Staging Directive to specify the I/O transaction. The difference is that in this case the Staging Directive is not associated with a Description but is passed in a direct method call, pilot.stage_in(sd_pilot).
# Configure the staging directive to insert the shared file into
# the pilot staging directory.
sd_pilot = {'source': shared_input_file_url,
            'target': os.path.join(MY_STAGING_AREA, SHARED_INPUT_FILE),
            'action': radical.pilot.TRANSFER
}
# Synchronously stage the data to the pilot
pilot.stage_in(sd_pilot)
The Shared Input Files example demonstrates this in full glory.
Note
The call to stage_in() is synchronous and will return once the
transfer is complete.
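The benefit of staging shared data to the pilot once can be illustrated with a local simulation. This is a sketch under assumptions, not RADICAL-Pilot code: share_input and demo are hypothetical names, temporary directories stand in for the pilot staging area and the unit working directories, and symbolic links stand in for whatever mechanism makes the staged file visible to each unit.

```python
import os
import shutil
import tempfile


def share_input(shared_file, staging_area, unit_sandboxes):
    """Copy shared_file once, then link it into every unit sandbox."""
    staged = os.path.join(staging_area, os.path.basename(shared_file))
    shutil.copy(shared_file, staged)          # one transfer to the pilot
    links = []
    for sandbox in unit_sandboxes:
        link = os.path.join(sandbox, os.path.basename(shared_file))
        os.symlink(staged, link)              # no further copies needed
        links.append(link)
    return staged, links


def demo(n_units=3):
    """Share one file between n_units simulated unit sandboxes."""
    root = tempfile.mkdtemp()
    try:
        shared = os.path.join(root, "shared_input.txt")
        with open(shared, "w") as f:
            f.write("common data\n")
        staging = os.path.join(root, "staging_area")
        os.mkdir(staging)
        sandboxes = []
        for i in range(n_units):
            sandbox = os.path.join(root, "unit_%d" % i)
            os.mkdir(sandbox)
            sandboxes.append(sandbox)
        staged, links = share_input(shared, staging, sandboxes)
        contents = []
        for link in links:
            with open(link) as f:
                contents.append(f.read())
        return len(os.listdir(staging)), contents
    finally:
        shutil.rmtree(root)
```

However many units read the file, the staging area holds a single copy, so the cost of the transfer is paid only once.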
4.3. Examples¶
Note
All of the following examples are configured to run on localhost, but they can be easily changed to run on a remote resource by modifying the resource specification in the Compute Pilot Description. Also note the comments in Staging Area when changing the examples to a remote target.
These examples require an installation of RADICAL-Pilot. Each example comes with a download link.
4.3.1. String-Based Input and Output Transfer¶
This example demonstrates the simplest form of the data staging capabilities: a local input file is staged through RADICAL-Pilot, processed by the Compute Unit, and the resulting output file is staged back to the local environment.
Note
Download the example:
curl -O https://raw.githubusercontent.com/radical-cybertools/radical.pilot/readthedocs/examples/io_staging_simple.py
import os
import sys
import radical.pilot as rp

# READ: The RADICAL-Pilot documentation:
#   http://radicalpilot.readthedocs.org/en/latest
#
# Try running this example with RADICAL_PILOT_VERBOSE=debug set if
# you want to see what happens behind the scenes!


#------------------------------------------------------------------------------
#
def pilot_state_cb (pilot, state) :
    """ this callback is invoked on all pilot state changes """

    print("[Callback]: ComputePilot '%s' state: %s." % (pilot.uid, state))

    if state == rp.FAILED :
        sys.exit (1)


#------------------------------------------------------------------------------
#
def unit_state_cb (unit, state) :
    """ this callback is invoked on all unit state changes """

    print("[Callback]: ComputeUnit '%s' state: %s." % (unit.uid, state))

    if state == rp.FAILED :
        sys.exit (1)


#------------------------------------------------------------------------------
#
if __name__ == "__main__":

    # Create a new session. A session is the 'root' object for all other
    # RADICAL-Pilot objects. It encapsulates the MongoDB connection(s) as
    # well as security credentials.
    session = rp.Session()

    # Add a Pilot Manager. Pilot managers manage one or more ComputePilots.
    pmgr = rp.PilotManager(session=session)

    # Register our callback with the PilotManager. This callback will get
    # called every time any of the pilots managed by the PilotManager
    # change their state.
    pmgr.register_callback(pilot_state_cb)

    # Define a single-core local pilot that runs for 5 minutes. Uncomment
    # the cleanup line to have it clean up after itself.
    pdesc = rp.ComputePilotDescription()
    pdesc.resource = "local.localhost"
    pdesc.cores    = 1
    pdesc.runtime  = 5 # Minutes
    #pdesc.cleanup = True

    # Launch the pilot.
    pilot = pmgr.submit_pilots(pdesc)

    # Create a Compute Unit that sorts the local password file and writes the
    # output to result.dat.
    #
    # The exact command that is executed by the agent is:
    #   "/usr/bin/sort -o result.dat passwd"
    #
    cud = rp.ComputeUnitDescription()
    cud.executable     = "/usr/bin/sort"
    cud.arguments      = ["-o", "result.dat", "passwd"]
    cud.input_staging  = "/etc/passwd"
    cud.output_staging = "result.dat"

    # Combine the ComputePilot, the ComputeUnits and a scheduler via
    # a UnitManager object.
    umgr = rp.UnitManager(session, rp.SCHED_DIRECT_SUBMISSION)

    # Register our callback with the UnitManager. This callback will get
    # called every time any of the units managed by the UnitManager
    # change their state.
    umgr.register_callback(unit_state_cb)

    # Add the previously created ComputePilot to the UnitManager.
    umgr.add_pilots(pilot)

    # Submit the previously created ComputeUnit description to the
    # UnitManager. This will trigger the selected scheduler to start
    # assigning the ComputeUnit to the ComputePilot.
    unit = umgr.submit_units(cud)

    # Wait for the compute unit to reach a terminal state (DONE or FAILED).
    umgr.wait_units()

    print("* Task %s (executed @ %s) state: %s, exit code: %s, started: %s, "
          "finished: %s, output file: %s" %
          (unit.uid, unit.execution_locations, unit.state,
           unit.exit_code, unit.start_time, unit.stop_time,
           unit.description.output_staging[0]['target']))

    # Close automatically cancels the pilot(s).
    session.close()

# -----------------------------------------------------------------------------
4.3.2. Dictionary-Based Input and Output Transfer¶
This example demonstrates the use of the staging directives structure to have more control over the staging behavior. The flow of the example is similar to that of the previous example, but here we show that by using the dict-based Staging Directive, one can specify different names and paths for the local and remote files, a feature that is often required in real-world applications.
Note
Download the example:
curl -O https://raw.githubusercontent.com/radical-cybertools/radical.pilot/readthedocs/examples/io_staging_dict.py
import os
import sys
import radical.pilot as rp

# READ: The RADICAL-Pilot documentation:
#   http://radicalpilot.readthedocs.org/en/latest
#
# Try running this example with RADICAL_PILOT_VERBOSE=debug set if
# you want to see what happens behind the scenes!


#------------------------------------------------------------------------------
#
def pilot_state_cb (pilot, state) :
    """ this callback is invoked on all pilot state changes """

    print("[Callback]: ComputePilot '%s' state: %s." % (pilot.uid, state))

    if state == rp.FAILED :
        sys.exit (1)


#------------------------------------------------------------------------------
#
def unit_state_cb (unit, state) :
    """ this callback is invoked on all unit state changes """

    print("[Callback]: ComputeUnit '%s' state: %s." % (unit.uid, state))

    if state == rp.FAILED :
        sys.exit (1)


#------------------------------------------------------------------------------
#
if __name__ == "__main__":

    # Create a new session. A session is the 'root' object for all other
    # RADICAL-Pilot objects. It encapsulates the MongoDB connection(s) as
    # well as security credentials.
    session = rp.Session()

    # Add a Pilot Manager. Pilot managers manage one or more ComputePilots.
    pmgr = rp.PilotManager(session=session)

    # Register our callback with the PilotManager. This callback will get
    # called every time any of the pilots managed by the PilotManager
    # change their state.
    pmgr.register_callback(pilot_state_cb)

    # Define an 8-core local pilot that runs for 5 minutes. Uncomment
    # the cleanup line to have it clean up after itself.
    pdesc = rp.ComputePilotDescription()
    pdesc.resource = "local.localhost"
    pdesc.cores    = 8
    pdesc.runtime  = 5 # Minutes
    #pdesc.cleanup = True

    # Launch the pilot.
    pilot = pmgr.submit_pilots(pdesc)

    # Stage /etc/passwd into the unit's working directory as input.dat.
    input_sd = {
        'source': '/etc/passwd',
        'target': 'input.dat'
    }

    # Stage result.dat from the unit's working directory to /tmp/result.dat.
    output_sd = {
        'source': 'result.dat',
        'target': '/tmp/result.dat'
    }

    # Create a Compute Unit that sorts the local password file and writes the
    # output to result.dat.
    #
    # The exact command that is executed by the agent is:
    #   "sort -o result.dat input.dat"
    #
    cud = rp.ComputeUnitDescription()
    cud.executable     = "sort"
    cud.arguments      = ["-o", "result.dat", "input.dat"]
    cud.input_staging  = input_sd
    cud.output_staging = output_sd

    # Combine the ComputePilot, the ComputeUnits and a scheduler via
    # a UnitManager object.
    umgr = rp.UnitManager(session, rp.SCHED_DIRECT_SUBMISSION)

    # Register our callback with the UnitManager. This callback will get
    # called every time any of the units managed by the UnitManager
    # change their state.
    umgr.register_callback(unit_state_cb)

    # Add the previously created ComputePilot to the UnitManager.
    umgr.add_pilots(pilot)

    # Submit the previously created ComputeUnit description to the
    # UnitManager. This will trigger the selected scheduler to start
    # assigning the ComputeUnit to the ComputePilot.
    unit = umgr.submit_units(cud)

    # Wait for the compute unit to reach a terminal state (DONE or FAILED).
    umgr.wait_units()

    print("* Task %s (executed @ %s) state: %s, exit code: %s, started: %s, "
          "finished: %s, output file: %s" %
          (unit.uid, unit.execution_locations, unit.state,
           unit.exit_code, unit.start_time, unit.stop_time,
           unit.description.output_staging[0]['target']))

    # Close automatically cancels the pilot(s).
    session.close()

# -----------------------------------------------------------------------------
4.3.4. Pipeline¶
This example demonstrates a two-step pipeline that makes use of a remote pilot staging area: the first step of the pipeline copies its intermediate output into the staging area, and the second step picks it up from there.
Note
Download the example:
curl -O https://raw.githubusercontent.com/radical-cybertools/radical.pilot/readthedocs/examples/io_staging_pipeline.py
import os
import radical.pilot

INPUT_FILE = 'input_file.txt'
INTERMEDIATE_FILE = 'intermediate_file.txt'
OUTPUT_FILE = 'output_file.txt'


#------------------------------------------------------------------------------
#
if __name__ == "__main__":

    try:
        # Create input file
        radical_cockpit_occupants = ['Carol', 'Eve', 'Alice', 'Bob']
        for occ in radical_cockpit_occupants:
            os.system('/bin/echo "%s" >> %s' % (occ, INPUT_FILE))

        # Create a new session. A session is the 'root' object for all other
        # RADICAL-Pilot objects. It encapsulates the MongoDB connection(s) as
        # well as security credentials.
        session = radical.pilot.Session()

        # Add a Pilot Manager. Pilot managers manage one or more ComputePilots.
        pmgr = radical.pilot.PilotManager(session)

        # Define a C-core pilot on localhost that runs for M minutes and
        # uses $HOME/radical.pilot.sandbox as sandbox directory.
        pdesc = radical.pilot.ComputePilotDescription()
        pdesc.resource = "local.localhost"
        pdesc.runtime = 15 # M minutes
        pdesc.cores = 2 # C cores

        # Launch the pilot.
        pilot = pmgr.submit_pilots(pdesc)

        # Combine the ComputePilot, the ComputeUnits and a scheduler via
        # a UnitManager object.
        umgr = radical.pilot.UnitManager(
            session=session,
            scheduler=radical.pilot.SCHED_DIRECT_SUBMISSION)

        # Add the previously created ComputePilot to the UnitManager.
        umgr.add_pilots(pilot)

        # Configure the staging directive for intermediate data
        sd_inter_out = {
            'source': INTERMEDIATE_FILE,
            # Note the triple slash, because of URL peculiarities
            'target': 'staging:///%s' % INTERMEDIATE_FILE,
            'action': radical.pilot.COPY
        }

        # Task 1: Sort the input file and output to intermediate file
        cud1 = radical.pilot.ComputeUnitDescription()
        cud1.executable = 'sort'
        cud1.arguments = ['-o', INTERMEDIATE_FILE, INPUT_FILE]
        cud1.input_staging = INPUT_FILE
        cud1.output_staging = sd_inter_out

        # Submit the first task for execution.
        umgr.submit_units(cud1)

        # Wait for the compute unit to finish.
        umgr.wait_units()

        # Configure the staging directive for input intermediate data
        sd_inter_in = {
            # Note the triple slash, because of URL peculiarities
            'source': 'staging:///%s' % INTERMEDIATE_FILE,
            'target': INTERMEDIATE_FILE,
            'action': radical.pilot.LINK
        }

        # Task 2: Take the first line of the sorted intermediate file and
        # write it to the output file
        cud2 = radical.pilot.ComputeUnitDescription()
        cud2.executable = '/bin/bash'
        cud2.arguments = ['-c', 'head -n1 %s > %s' %
                          (INTERMEDIATE_FILE, OUTPUT_FILE)]
        cud2.input_staging = sd_inter_in
        cud2.output_staging = OUTPUT_FILE

        # Submit the second CU for execution.
        umgr.submit_units(cud2)

        # Wait for the compute unit to finish.
        umgr.wait_units()

        session.close()

    except radical.pilot.PilotException as ex:
        print("Error: %s" % ex)