SQL/XML Dumps/Command management walkthrough
A speedy tour of the low level command classes
So you want to know all about the cruft in commandmanagement.py! Yeah, I didn’t think so. But let’s get started anyway.
Here’s some sample code that runs a pipeline of commands (think: zcat | grep | awk) and later runs a series of commands (think: grep this | wc -l; grep that | wc -l).
The relevant classes are CommandPipeline ([1]) and CommandSeries ([2]), so with that in mind, let’s look at the code.
Sample code
Some of the code to run the command series was taken from ProcessMonitor ([3]), because we pretty much never run a command series directly. Instead, command series are run by making a list of them (even a list with only one command series in it) and letting CommandsInParallel ([4]) handle it.
#!/usr/bin/python3
'''
sample methods illustrating the use and innards of some
classes from the command_management module
'''
import fcntl
import select
import sys
import os
from dumps.commandmanagement import CommandPipeline, CommandSeries
def usage():
    '''
    write a one-line usage message for this script to stderr and
    exit with status 1
    '''
    message = "Usage: sample_commands.py <path-to-python-dumps-repo>\n"
    sys.stderr.write(message)
    sys.exit(1)
Running one pipeline of commands
def run_pipeline(pipeline, quiet=False):
    '''
    run a pipeline of commands to completion; if every command
    succeeded, print the pipeline's output (when there is any),
    otherwise print the failed commands with their return codes
    '''
    proc = CommandPipeline(pipeline, quiet=quiet)
    # run_pipeline_get_output gathers all of the output in memory, so
    # only use it for commands whose output is small, never for
    # anything that might produce megabytes
    proc.run_pipeline_get_output()
    if not proc.exited_successfully():
        print("Some commands failed:", proc.get_failed_cmds_with_retcode())
        return
    raw = proc.output()
    if not raw:
        return
    # the captured output is bytes; decode it to text for display
    print("Result:", raw.decode("utf-8").rstrip())
Waiting for command output or completion
def setup_polling_for_process(proc):
    '''
    create a poller watching the process's stderr and, when present,
    its stdout for readable/priority data; each watched pipe is also
    switched to non-blocking mode
    '''
    # this code is simplified from that in ProcessMonitor in the command_management module
    def watch(stream):
        # register for input events, then add O_NONBLOCK to the fd's flags
        poller.register(stream, select.POLLIN | select.POLLPRI)
        fdesc = stream.fileno()
        current_flags = fcntl.fcntl(fdesc, fcntl.F_GETFL)
        fcntl.fcntl(fdesc, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

    poller = select.poll()
    # stderr is always watched; stdout only if the process has one
    watch(proc.stderr)
    if proc.stdout:
        watch(proc.stdout)
    return poller
def _read_available(filed, chunk_size=1024):
    '''
    read whatever is currently buffered on the file descriptor filed
    and return it as bytes (possibly empty)

    Reading stops at EOF, at a short read (the buffer has been drained),
    or when a read would block. filed is expected to be in non-blocking
    mode, as setup_polling_for_process arranges.
    '''
    chunks = []
    while True:
        try:
            chunk = os.read(filed, chunk_size)
        except BlockingIOError:
            # nothing more buffered right now; the writer is still open
            break
        chunks.append(chunk)
        if len(chunk) < chunk_size:
            # an empty read means EOF (the write end of the pipe was
            # closed); any other short read means we emptied the buffer
            break
    return b"".join(chunks)


def handle_events(poller, waiting, series, proc, quiet):
    '''
    given a list of poll events, read from the appropriate
    file descriptors and return the accumulated stdout and
    error output, if any

    arguments:
    poller  -- the select.poll object the process's stdout/stderr
               are registered with
    waiting -- the (fd, event) pairs returned by poller.poll()
    series  -- the command series being run; its in-progress pipeline
               interprets each poll event
    proc    -- the process whose stdout/stderr are being read
    quiet   -- if False, print the process's return code when it exits

    returns a tuple (output, error_out, command_completed): the stdout
    and stderr text read during this call, and True if a hangup/error
    event was seen (in which case the process has been waited on)
    '''
    # this code is simplified from that in ProcessMonitor in the command_management module
    out_bytes = []
    err_bytes = []
    command_completed = False
    for (filed, event) in waiting:
        pipeline = series.in_progress_pipeline()
        pipeline.set_poll_state(event)
        # check_poll_ready_for_read checks if the event, which we have
        # stashed in an attribute, has one of the flags
        # select.POLLIN or select.POLLPRI set
        if pipeline.check_poll_ready_for_read():
            # drain everything buffered rather than a single 1024-byte
            # chunk: a hangup on the other stream in this same batch sets
            # command_completed, after which the caller stops polling and
            # anything left unread here would be lost
            data = _read_available(filed)
            if filed == proc.stderr.fileno():
                err_bytes.append(data)
            elif proc.stdout and filed == proc.stdout.fileno():
                out_bytes.append(data)
        # check_for_poll_errors checks if the stashed event has one of
        # the flags select.POLLHUP, select.POLLNVAL or select.POLLERR set
        elif pipeline.check_for_poll_errors():
            poller.unregister(filed)
            # Note: if the fd closed prematurely and the proc then runs for hours to
            # completion, we will get no updates here.
            proc.wait()
            if not quiet:
                print("returned from {pid} with {retcode}".format(
                    pid=proc.pid, retcode=proc.returncode))
            command_completed = True
    # decode once per stream instead of once per read, so a multi-byte
    # UTF-8 character split across two reads is decoded intact;
    # errors="replace" keeps a character split across calls (or output
    # that is not UTF-8) from raising UnicodeDecodeError
    output = b"".join(out_bytes).decode("utf-8", errors="replace")
    error_out = b"".join(err_bytes).decode("utf-8", errors="replace")
    return output, error_out, command_completed
Running a series of command pipelines, getting the output
def get_series_output(series, quiet=False):
    '''
    run the pipelines in a command series, capture and display the
    output if any, along with any errors

    arguments:
    series -- the command series object, already started
    quiet  -- passed through to handle_events; if False, process exit
              codes are printed as each pipeline's last command finishes
    '''
    # is there some process running that might produce output from
    # one of the pipelines? remember we only run one pipeline at a
    # time, and the series object knows which pipeline is running at
    # any given time, and which process is at the end of the pipeline
    # to produce output
    while series.process_producing_output():
        proc = series.process_producing_output()
        # we need to be able to check when there is output to stdout
        # or stderr from the process, and capture it. we accumulate
        # all errors into one string and all output into another,
        # making the assumption that there are not megabytes of either.
        poller = setup_polling_for_process(proc)
        command_completed = False
        # this code is simplified from that in ProcessMonitor in the
        # command_management module, and it has been split up into the
        # smaller methods handle_events and setup_polling_for_process.
        output = ""
        error_out = ""
        while not command_completed:
            # time is in milliseconds
            waiting = poller.poll(500)
            if waiting:
                new_out, new_err, command_completed = handle_events(
                    poller, waiting, series, proc, quiet)
                output += new_out
                error_out += new_err
        # strip trailing newlines before printing, as run_pipeline does,
        # so print() does not leave a blank line after each result
        output = output.rstrip()
        error_out = error_out.rstrip()
        if output:
            print("Result:", output)
        if error_out:
            print("Errors:", error_out)
        # run next command in series, if any
        # this checks to be sure that the current pipeline's last command
        # is indeed complete, before starting the next pipeline;
        # it calls check_for_poll_errors() (it expects to see one)
        # and check_poll_ready_for_read() (it expects this to be false)
        series.continue_commands()
def run_series(series, quiet=False, shell=False):
    '''
    run a command series (a list of pipelines, each run after the
    previous one completes), display whatever output each pipeline
    produces, and finally report any pipelines that failed
    '''
    command_series = CommandSeries(series, quiet, shell)
    command_series.start_commands()
    get_series_output(command_series, quiet)
    if command_series.exited_successfully():
        return
    print("Some commands failed:", command_series.pipelines_with_errors())
Putting it all together
We use the repo path and files in the repo to grep for text we know is in there.
def do_main():
    '''
    entry point: check the command line, then demonstrate running a
    single pipeline and a series of pipelines, first with default
    arguments and then with quiet=True
    '''
    if len(sys.argv) != 2:
        usage()
    repo_path = sys.argv[1]
    # every command greps this file from the repo, so we know there
    # will be matching text
    target = os.path.join(repo_path, "dumps/commandmanagement.py")

    def grep_piped_to_wc(wc_option):
        # A command pipeline: two or more commands as lists, the output
        # of each to be piped to the next. The full path of each command
        # (grep, wc) is given. A fresh list is built on every call.
        return [["/bin/grep", "command", target], ["/usr/bin/wc", wc_option]]

    def announce(title):
        print(title)
        print("----------------------")

    print("")
    pipeline = grep_piped_to_wc("-l")
    announce("Running pipeline with default args:")
    run_pipeline(pipeline)
    print("")
    announce("Running pipeline with quiet:")
    run_pipeline(pipeline, quiet=True)

    # A command series: two or more pipelines which are run one after
    # the other, waiting for one to complete before the next is started.
    series = [grep_piped_to_wc("-l"), grep_piped_to_wc("-c")]
    print("\n=====================\n")
    announce("Running series with default args:")
    run_series(series)
    print("")
    announce("Running series with quiet:")
    run_series(series, quiet=True)
if __name__ == "__main__":
    # run the demonstration only when executed as a script, not on import
    do_main()
Sample output
Place the module ([5]) in the xmldumps-backup subdirectory of the python dumps repo, and run it with no args to see how it should be run. TL;DR: pass the full path of the dumps repo as the only arg.
[ariel@bigtrouble xmldumps-backup]$ python sample_commands.py /home/ariel/dumps/xmldumps-backup

Running pipeline with default args:
----------------------
command /bin/grep command /home/ariel/dumps/xmldumps-backup/dumps/commandmanagement.py (260851) started...
command /usr/bin/wc -l (260852) started...
Result: 131

Running pipeline with quiet:
----------------------
Result: 131

=====================

Running series with default args:
----------------------
command /bin/grep command /home/ariel/dumps/xmldumps-backup/dumps/commandmanagement.py (260855) started...
command /usr/bin/wc -l (260856) started...
returned from 260856 with 0
Result: 131
returned from 260856 with 0
returned from 260856 with 0
command /bin/grep command /home/ariel/dumps/xmldumps-backup/dumps/commandmanagement.py (260857) started...
command /usr/bin/wc -c (260858) started...
returned from 260858 with 0
Result: 6989
returned from 260858 with 0
returned from 260858 with 0

Running series with quiet:
----------------------
Result: 131
Result: 6989