PostProcessors

Ganga can run a number of post-processing steps after a job completes. Each step is an object that you add to the postprocessors field of the Job object, and the steps are carried out in order. The available postprocessors are detailed below:

Try it out

When using the prime factorisation example from the Tutorial plugin, it was unsatisfactory that the individual prime factors were spread over different files. A simple TextMerger can collate the numbers into a single file.

j = Job(application = PrimeFactorizer(number=268709474635016474894472456), \
        inputdata = PrimeTableDataset(table_id_lower=1, table_id_upper=30), \
        splitter = PrimeFactorizerSplitter(numsubjobs=10), \
        postprocessors = TextMerger(files=['factors.dat']))

When the job has finished, there will be a single file that we can look at:

j.peek('factors.dat')

See below for how a CustomMerger could be used to provide a more unified output.

Mergers

A merger is an object that merges files from each subjob and places the result in the master job's output folder. The merge method depends on the type of merger object (or file type). For example, if each subjob produces a ROOT file ‘thesis_data.root’ and you would like these to be merged, you can attach a RootMerger object to your job:

j.postprocessors.append(RootMerger(files = ['thesis_data.root'],ignorefailed = True,overwrite = True))

When the job is finished, this merger object will merge the ROOT files and place the result in j.outputdir. The ignorefailed flag toggles whether the merge can proceed if a subjob has failed. The overwrite flag toggles whether to overwrite the output if it already exists. If a merger fails to merge, it will fail the job and subsequent postprocessors will not run. Also be aware that the merger will only run if the files are available locally; to avoid running out of local space, Ganga won’t automatically download them for you (unless you use Tasks). You can always run the mergers separately though:

j.postprocessors[0].merge()

There are several mergers available:

TextMerger

Used for merging .txt, .log, etc. In addition to the normal attributes, you can also choose to compress the output with

TextMerger(compress = True)

RootMerger

Used for ROOT files. In addition to the normal attributes, you can also pass additional arguments to hadd with

RootMerger(args = '-f6')

CustomMerger

A custom merger where you can define your own merge function. For this merger to work you must supply the path to a Python module which carries out the merge with

CustomMerger().module = '~/mymerger.py'

In mymerger.py you must define a function called mergefiles(file_list, output_file), e.g.:

def mergefiles(file_list, output_file):
    # Concatenate every input file into output_file, in list order
    with open(output_file, 'w') as f_out:
        for name in file_list:
            with open(name) as f_in:
                f_out.write(f_in.read())

This function would mimic the TextMerger while giving the user more control. Note that the overwrite and ignorefailed flags still work here, as for any other merger object.
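As a variation on the mergefiles function above, the following sketch shows the kind of extra control a custom merge function allows. It labels each section of the merged file with the path it came from. The header format is an illustrative choice, not something Ganga requires; only the function name and its two arguments come from the description above.

```python
def mergefiles(file_list, output_file):
    """Concatenate text files, labelling each section with its source path."""
    with open(output_file, 'w') as f_out:
        for path in file_list:
            # Header line identifying which input the following lines came from
            # (illustrative format, chosen for this sketch)
            f_out.write('# from: %s\n' % path)
            with open(path) as f_in:
                content = f_in.read()
            f_out.write(content)
            # End every section with a newline so the next header starts on its own line
            if content and not content.endswith('\n'):
                f_out.write('\n')
```

Because the function only depends on plain file paths, it can be tried on a couple of local text files before being attached to a job.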

SmartMerger

The final merger is the SmartMerger, which chooses a merger object for each file based on the output file extension, so a single SmartMerger can handle several file types. For example, the following SmartMerger would use a RootMerger for ‘thesis_data.root’ and a TextMerger for ‘stdout’.

SmartMerger(files = ['thesis_data.root','stdout'],overwrite = True)

Note that:

j.postprocessors.append(SmartMerger(files = ['thesis_data.root','stdout'],overwrite = True))

is equivalent to doing:

j.postprocessors.append(TextMerger(files = ['stdout'],overwrite = True))
j.postprocessors.append(RootMerger(files = ['thesis_data.root'],overwrite = True))

However, in the second instance you gain more control, as you have access to the RootMerger- and TextMerger-specific attributes, at the expense of more code. Choose whichever works best for you.
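The extension-based choice made by the SmartMerger can be pictured with a small stand-alone sketch. This is not Ganga's implementation: the lookup table, the function names and the fallback to a TextMerger for unknown or missing extensions are assumptions made for illustration, chosen to be consistent with the example above, where ‘stdout’ is handled by a TextMerger.

```python
import os

# Hypothetical extension-to-merger table, for illustration only
MERGER_BY_EXTENSION = {
    '.root': 'RootMerger',
    '.txt': 'TextMerger',
    '.log': 'TextMerger',
}

def choose_merger(filename, default='TextMerger'):
    """Pick a merger name from the file extension; fall back to `default` (assumption)."""
    extension = os.path.splitext(filename)[1].lower()
    return MERGER_BY_EXTENSION.get(extension, default)

def group_by_merger(files):
    """Group file names under the merger that would handle them."""
    groups = {}
    for name in files:
        groups.setdefault(choose_merger(name), []).append(name)
    return groups

print(group_by_merger(['thesis_data.root', 'stdout']))
# prints {'RootMerger': ['thesis_data.root'], 'TextMerger': ['stdout']}
```

Each group corresponds to one explicit merger object in the longer form shown above.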

Checkers

A checker is an object which will fail otherwise completed jobs based on certain conditions. However, if a checker is misconfigured, the default is to do nothing (pass the job); this differs from the behaviour of a merger. Currently there are three checkers:

FileChecker

Checks the listed output files and fails the job if a particular string is found (or not found). For example, you could do:

fc = FileChecker(files = ['stdout'], searchStrings = ['Segmentation'])

You can also enforce that your file must exist, by setting filesMustExist to True:

fc.filesMustExist = True

With filesMustExist set, the checker will fail any job that does not produce a stdout file. This FileChecker will search (grep) your stdout file for the string ‘Segmentation’; if it finds it, the job will fail. If you want to fail the job when a string doesn’t exist, you can do:

fc.searchStrings = ['SUCCESS']
fc.failIfFound = False

In this case the FileChecker will fail the job if the string ‘SUCCESS’ is not found.
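The pass/fail rules described above can be summarised in a short stand-alone sketch. It is not the FileChecker source code: the function name and its parameters are hypothetical stand-ins for searchStrings, failIfFound and filesMustExist, and two behaviours are assumptions, namely that a missing file passes unless existence is enforced, and that with several search strings a single match counts as "found".

```python
import os

def file_check_passes(path, search_strings, fail_if_found=True, file_must_exist=False):
    """Return True if the job would pass this check, False if it would be failed."""
    if not os.path.exists(path):
        # Assumption: a missing file only fails the job when existence is enforced
        return not file_must_exist
    with open(path, errors='replace') as f:
        text = f.read()
    # Assumption: one matching string is enough to count as "found"
    found = any(s in text for s in search_strings)
    # fail_if_found=True  -> pass only when nothing matched
    # fail_if_found=False -> pass only when something matched
    return (not found) if fail_if_found else found
```

With the defaults, a stdout file containing ‘Segmentation’ fails the check; with fail_if_found=False and the string ‘SUCCESS’, the check fails when ‘SUCCESS’ is absent.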

RootFileChecker

This checks that all your ROOT files are closed properly and have non-zero size, and that the merging procedure worked properly. Adding a RootFileChecker to your job adds some protection against hadd failures and ensures that your ROOT files are mergeable.

The checker verifies that each ROOT file has non-zero file size and is not a zombie. If you also have a merger, it will check the output from hadd, ensure that the sum of the subjob entries is the same as the master job entries, and check that each ROOT file has the same file structure. RootFileChecker inherits from FileChecker, so you can also require that the ROOT files exist.

CustomChecker

This is probably the most useful checker: it allows the user to use private Python code to decide whether a job should fail. The CustomChecker will execute your script and fail the job based on the output. For example, you can make a checker in your home directory called mychecker.py. In this file you must define a function called check(j), which takes your job as input and returns True (pass) or False (fail):

import os

def check(j):
    outputfile = os.path.join(j.outputdir,'thesis_data.root')
    return os.path.exists(outputfile)

Then in Ganga do:

cc = CustomChecker(module = '~/mychecker.py')

This checker will then fail jobs which don’t produce a ‘thesis_data.root’ ROOT file.
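A check function can apply any condition that plain Python can express. As a minimal sketch, the variant below also requires the file to be non-empty. It relies only on j.outputdir, as in the example above; the extra size requirement is an illustrative choice.

```python
import os

def check(j):
    """Pass only if thesis_data.root exists in the job's output directory and is non-empty."""
    outputfile = os.path.join(j.outputdir, 'thesis_data.root')
    # isfile() guards getsize(), which would raise an error for a missing file
    return os.path.isfile(outputfile) and os.path.getsize(outputfile) > 0
```

Since the function only reads j.outputdir, it can be tried outside Ganga with any stand-in object that has an outputdir attribute, for example types.SimpleNamespace(outputdir='/some/dir').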

Notifier

The notifier is an object which will email you about your jobs upon completion. By default it emails you when master jobs have finished and when subjobs have failed. Emails are not sent upon failure if the auto-resubmit feature is used. Important note: emails are only sent while Ganga is running, so the Notifier is only useful if you keep Ganga running in the background (e.g. in a screen session or with GangaService). To make a notifier, just do something like:

n = Notifier(address = 'myaddress.cern.ch')

If you want emails about every subjob, do:

n.verbose = True

Management of post processors with your job

You can add multiple postprocessors to a Job and Ganga will order them to some degree: mergers come first, then checkers, then finally the notifier. The order within each class is preserved, though (e.g. the ordering of the checkers is defined by the user). To add some postprocessors to your job, you can do something like

tm = TextMerger(files=['stdout'], compress=True)
rm = RootMerger(files=['thesis_data.root'], args='-f6')
fc = FileChecker(files=['stdout'], searchStrings=['Segmentation'])
cc = CustomChecker(module='~/mychecker.py')
n = Notifier(address='myaddress.cern.ch')

j.postprocessors = [tm, rm, fc, cc, n]

or:

j.postprocessors.append(fc)
j.postprocessors.append(tm)
j.postprocessors.append(rm)
j.postprocessors.append(cc)
j.postprocessors.append(n)
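The ordering rule described above (mergers, then checkers, then the notifier, with the user's order kept within each class) behaves like a stable sort on the kind of postprocessor. The sketch below illustrates that rule with plain (name, kind) pairs. It is not how Ganga implements the ordering, and the names KIND_RANK and order_postprocessors are hypothetical.

```python
# Rank of each kind of postprocessor: mergers first, then checkers, then notifiers
KIND_RANK = {'merger': 0, 'checker': 1, 'notifier': 2}

def order_postprocessors(items):
    """Group (name, kind) pairs by kind.

    sorted() is stable, so items of the same kind keep the order
    in which they were added.
    """
    return sorted(items, key=lambda item: KIND_RANK[item[1]])

# Same append order as in the second example above: fc, tm, rm, cc, n
appended = [('fc', 'checker'), ('tm', 'merger'), ('rm', 'merger'),
            ('cc', 'checker'), ('n', 'notifier')]
print([name for name, _ in order_postprocessors(appended)])
# prints ['tm', 'rm', 'fc', 'cc', 'n']
```

Under this rule, both ways of adding the postprocessors shown above end up in the same order.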

You can also remove postprocessors:

In [21]:j.postprocessors
Out[21]: [SmartMerger (
 files = [] ,
 ignorefailed = False ,
 overwrite = False
 ), FileChecker (
 files = [] ,
 checkSubjobs = False ,
 searchStrings = [] ,
 failIfFound = True
 ), Notifier (
 verbose = False ,
 address = ''
 )]

In [22]:j.postprocessors.remove(FileChecker())

In [23]:j.postprocessors
Out[23]: [SmartMerger (
 files = [] ,
 ignorefailed = False ,
 overwrite = False
 ), Notifier (
 verbose = False ,
 address = ''
 )]