Skip to main content

Notice: This Wiki is now read only and edits are no longer possible. Please see: https://gitlab.eclipse.org/eclipsefdn/helpdesk/-/wikis/Wiki-shutdown-plan for the plan.

Jump to: navigation, search

SMILA/Documentation/Worker/PipeletProcessorWorker

PipeletProcessorWorker (bundle org.eclipse.smila.processing.worker)

The PipeletProcessorWorker is a worker designed to execute a single pipelet directly, without pipeline overhead.

JavaDoc

This page gives only a rough overview of the service. Please refer to the JavaDoc for detailed information about the Java components.

Configuration

The PipeletProcessorWorker is configured via incoming task parameters. These parameters could have been set e.g. in a job definition.

Parameter Description Default value
pipeletName Name of the pipelet to execute ---
keepAttachmentsInMemory By default, attachments on processed records are kept in memory. If you don't have much memory or very large attachments it may be useful to set this parameter to false and have a BinaryStorage service activated. Then attachments are stored in BinaryStorage while processing and less memory is used. However, processing will probably be slower in this case. After processing the attachments will be removed from BinaryStorage again. If no BinaryStorage service is active, all attachments will be kept in memory and this parameter will not have any effect. true
writeAttachmentsToOutput By default, attachments on incoming records are also added to the output records (if any are written). If this parameter is set to false, only record metadata is written to the output bulk. This can save a lot of IO if attachments are not needed anymore in the workflow after this worker. true

Sample job definition that sets the parameters:

{
  "name":"myJob",
  "parameters":{
    "pipeletName": "MySamplePipelet",
    ...
   },
  "workflow":"myWorkflow"
}

PipeletProcessorWorker definition in workers.json

GET /smila/jobmanager/workers/pipeletProcessor/

HTTP/1.x 200 OK

{
  "name" : "pipeletProcessor",
  "readOnly" : true,
  "parameters" : [ 
    {
     "name" : "pipeletName"
    },
    {
     "name" : "<pipeletParameters>",
     "optional" : true,
     "type" : "any"
    }
   ],
  "input" : [ {
    "name" : "input",
    "type" : "recordBulks"
  } ],
  "output" : [ {
    "name" : "output",
    "type" : "recordBulks",
    "modes" : [ "optional" ]
  } ]
}

The output bucket of the worker is optional, hence in an asynchronous workflow the worker does not need to have a successor. If the output bucket is not defined, the result records of the pipeline processing are not persisted to a bulk, but thrown away. This makes sense if the pipelet stores the records somewhere itself, e.g. adds them to an index.

Access task parameters in pipelets

As the worker creates a new pipelet instance for each processed task, it uses all task parameters as the configuration object passed to the configure() method of the pipelet. It does not add them as _parameters to the processed records as the PipelineProcessorWorker does. Thus the parameters can contain both initialization parameters and runtime parameters, because most pipelets use the configuration as a fallback when getting runtime parameters.

If the internal parameter _failOnError was not set before, the worker will set the parameter to "false". This means that the called pipelet should continue processing records and not stop when processing defect records. The pipelet itself must implement this behavior. How to achieve this is explained in How to write a Pipelet.

Error handling

The following errors may occur when a task for the PipeletProcessorWorker is processed:

  • Pipelet parameter missing or invalid parameter
    • If the given pipelet parameter is not set (or invalid) the task will fail with a non-recoverable error.
  • ProcessingException while processing a bunch of parallel records.
    • Recoverable ProcessingException: The current task will fail with a recoverable error, so the whole task (with all records) will be repeated.
    • Non-recoverable ProcessingException: An error will be logged and the worker will continue with the next bunch of records. The records of the current bunch will be lost. (This is implemented in a way as to not fail the whole task with all its input records in case of a single record defect.)

Back to the top