Skip to main content

Notice: This Wiki is now read only and edits are no longer possible. Please see: https://gitlab.eclipse.org/eclipsefdn/helpdesk/-/wikis/Wiki-shutdown-plan for the plan.

Jump to: navigation, search

SMILA/Documentation/TaskGenerators

Task Generators

The jobmanager workflow processing is based on the processing of tasks by the workers. Tasks for a worker are generated based on changes in its input bucket(s).

In a simple workflow step, e.g. with pipelineProcessor worker, a worker has exactly one input bucket. When a bulk is put in the input bucket after a former workflow step finishes, we want exactly one (follow up) task to be generated referencing this bulk. For some workers the default behaviour is not sufficient. So we use an optional taskGenerator attribute in the worker definition referencing a task generator that is used to create the follow up tasks. If no taskGenerator is specified in a worker definition, the default taskgenerator is used (see below).

Design

TaskGenerator configuration parameters

If a task generator needs configuration parameters, these parameters have to be defined as (configuration) parameters by the appropriate worker.

TaskGenerator as OSGi service

The general idea is to define the different kinds of TaskGenerators as OSGi services. A worker can optionally reference a TaskGenerator service by name. If no TaskGenerator is specified, the DefaultTaskGenerator service is used.

If a special TaskGenerator needs access to other OSGi services this can be easily handled by OSGi service configuration.

At the moment, there's no need to let users define their own task generators. We will provide the needed task generators by software (-> similar to Workers and Data Object Types). But with the concept of task generators as OSGi service, there could be a future extension to allow users to create their own TaskGenerator services if needed.

Interface

As a TaskGenerator, what information do we need to create the follow-up tasks for a specific (follow-up) action/worker:

  • changed input bulks of the action, containing the (added) bulks that triggered the task generation
  • all input buckets of the action (may be needed to combine bucket objects of other input buckets with the changed input bucket objects)
  • all output buckets of the action, to create data object IDs to put them in the new generated tasks
  • parameters (for task generator configuration and for adding to the new generated tasks)
  • worker name
interface TaskGenerator  {
 
  String getName();
 
  List<Task> createTasks(final Map<String, List<BulkInfo>> changedInput, final Map<String, Bucket> inputBuckets,
    final Map<String, Bucket> outputBuckets, final Map<String, String> parameters, final String workerName)
    throws TaskGeneratorException;
 
  List<Task> createRunOnceTasks(final Map<String, Bucket> inputBuckets, final Map<String, Bucket> outputBuckets,
    final Map<String, String> parameters, final String workerName) throws TaskGeneratorException;
 
  void finishTask(final Task task, final TaskCompletionStatus status) throws TaskGeneratorException;
}
</tt>

Implementations

These Task Generators are already contained in bundle org.eclipse.smila.jobmanager.

DefaultTaskGenerator

This service is used either if there's no taskGenerator attribute in the worker definition, or if the following is set:

"taskGenerator": "default"

If used for a worker without input slots, it generates exactly one task with one output bulk for each output slot. For each output bulk a different ${_uuid} value is generated.

If used for a worker with input slots, it generates one task for each input bulk, regardless if the input bulks are in one or different slots. Each task contains only one input bulk and one output bulk for each output slot. The generator tries to extract the ${_uuid} value from the respective input bulk's name and reuse it for all respective output bulks. If no ${_uuid} value can be extracted from an input bulk, the output bulks in this task use each an own ${_uuid} value.

In runOnce mode the generator creates one task per existing object in the one allowed input bucket. Details are the same as for follow-up tasks.

RunOnceTriggerTaskGenerator

This is a simple specialisation of the default taskgenerator that allows to start jobs in runOnce mode when the first worker has only transient (and therefore usually empty) input buckets. It is selected in the worker description by setting:

"taskGenerator": "runOnceTrigger"

For follow-up tasks it behaves exactly like the default task generator.

If called to start a runOnce job it simply creates a single task without input bulks.

This task generator has been introduced for crawler workers that need to be initially started without any input data being available. I.e. the crawler should crawl a data source and the start point (root directory, seed URL) is specified in the data source configuration or as a job parameter. Then a first task needs to be created that does not process input bulks but just starts the crawl at the configured start point.

CombineInputWithAllTaskGenerator

TaskGenerator for two input buckets that generates tasks for each unique(see example-2 below) combination of:

[changed input bulk of first bucket X (all) object(s) from second input bucket]

If the second input bucket is empty, no tasks will be generated.

This service is used if the following taskgenerator attribute value is set in the worker definition:

"taskGenerator": "combine"

__Examples__: Assume we have a DeleteWorker action that has as input a partitionBucket with index partition objects and a deleteBucket with delete objects (objects = bulks)

  • current index partitions: P1, P2, P3
  • current delete objects: D1, D2

A new delete bulk must be applied to all existing index partitions, and also all existing delete bulks must be applied to a new index partition.

Example 1: Input change for the task generator is a new delete object D3 in the deleteBucket

  • we will combine the changed input D3 with all objects from the other input bucket (partitionBucket)
  • so we create three new tasks: T1(input: D3,P1), T2(D3,P2), T3(D3,P3)

Example 2: Input change for the task generator is a new delete object D3 in the deleteBucket and a new partition object P4 in the partitionBucket

  • we will combine the changed input D3 with all objects from the partitionBucket, and the changed input P4 with all objects from the deleteBucket
  • so we create new tasks: T1(D3,P1), T2(D3,P2), T3(D3,P3), T4(D3,P4) and T5(P4,D1), T6(P4,D2), T7(P4,D3)
  • T3 and T7 are equals so we eliminate T7 and deliver six new tasks

The generator can set additional optional parameters on the created tasks:

  • _triggeredBy: <slotname>: The task was created because the object in the named slot is new. This is not set for the single task that contains both changed input bulks in case both input slots had changes.
  • _partner.<slotname>: <objectid>: If both input slots had changed bulks, this parameters contains the object id of the first changed object in the other slot, i.e. the one not named in the _triggeredBy parameter.

The purpose of this is to allow the worker to check dependencies between the new objects.

Back to the top