Workers

Extract

extract.kingfisher_process

python -m workers.extract.kingfisher_process

Extract collections from Kingfisher Process.

  1. Receive a message

  2. Select data IDs for the matching collection in Kingfisher Process

  3. Create the dataset

  4. Update the dataset’s metadata

  5. Acknowledge the message (point-of-no-return)

  6. Select compiled releases from Kingfisher Process, in batches

  7. Insert compiled releases into the data_item table

  8. Initialize each item’s state as in-progress

  9. Publish a message with batches of item IDs
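
A minimal sketch of this flow, with the database and message-broker calls replaced by hypothetical helpers (select_data_ids, create_dataset, update_metadata, insert_items, publish, the message keys and the batch size of 1,000 are all illustrative assumptions, not the worker’s actual API):

  from itertools import islice


  def batched(iterable, size):
      """Yield lists of at most `size` items from `iterable`."""
      iterator = iter(iterable)
      while batch := list(islice(iterator, size)):
          yield batch


  def process_message(message, *, select_data_ids, create_dataset, update_metadata,
                      ack, select_compiled_releases, insert_items, publish,
                      batch_size=1000):
      # 1. The received message identifies a Kingfisher Process collection.
      collection_id = message["collection_id"]

      # 2. Look up the data IDs for the matching collection.
      data_ids = select_data_ids(collection_id)

      # 3-4. Create the dataset and update its metadata.
      dataset_id = create_dataset(message)
      update_metadata(dataset_id)

      # 5. Acknowledge the message: from here on, the work must run to completion.
      ack(message)

      # 6-9. Copy compiled releases in batches, initialize each item's state as
      # in-progress, and publish the item IDs in batches for the check workers.
      for batch in batched(data_ids, batch_size):
          releases = select_compiled_releases(batch)
          item_ids = insert_items(dataset_id, releases, state="IN_PROGRESS")
          publish({"dataset_id": dataset_id, "item_ids": item_ids})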

Note

Some field-level checks use external codelists. These are cached in memory by each worker for one day.
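
As a rough illustration of such a one-day cache (the URL handling and line-based parsing are assumptions, not the worker’s actual loader):

  import time
  import urllib.request

  _CACHE = {}  # codelist URL -> (fetched_at, codes)
  _TTL = 24 * 60 * 60  # one day, in seconds


  def get_codelist(url):
      """Return the codelist at `url`, re-fetching it at most once per day."""
      fetched_at, codes = _CACHE.get(url, (0.0, None))
      if codes is None or time.monotonic() - fetched_at > _TTL:
          with urllib.request.urlopen(url) as response:  # placeholder loader
              codes = response.read().decode("utf-8").splitlines()
          _CACHE[url] = (time.monotonic(), codes)
      return codes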

extract.dataset_filter

python -m workers.extract.dataset_filter

Create filtered datasets.

The worker will ignore a message if the dataset is not in the CHECKED phase.
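
A sketch of that guard, assuming hypothetical helpers (get_dataset, create_filtered_dataset, ack) and message keys; only the CHECKED-phase condition reflects the behaviour described above:

  def create_filtered(message, *, get_dataset, create_filtered_dataset, ack):
      """Ignore the message unless the source dataset is in the CHECKED phase."""
      dataset = get_dataset(message["dataset_id"])
      if dataset["phase"] != "CHECKED":
          ack(message)  # ignore: the dataset has not finished all checks
          return
      create_filtered_dataset(dataset, message.get("filter", {}))
      ack(message)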

Note

This worker is only needed if the Pelican frontend is deployed.

Check

check.data_item

python -m workers.check.data_item

Perform the field-level and compiled release-level checks.

  1. Receive a message

  2. Read matching items from the data_item table

  3. Perform field-level checks on each item

  4. Perform compiled release-level checks on each item

  5. Mark each item’s state as complete

  6. Store all results

  7. Publish a message

There can be many workers processing the same dataset at the same time.
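
A sketch of the per-item loop, with the checks and database helpers passed in as hypothetical callables (load_items, field_checks, release_checks, mark_complete, save_results and publish are illustrative names):

  def check_items(message, *, load_items, field_checks, release_checks,
                  mark_complete, save_results, publish):
      """Run field-level and compiled release-level checks on one batch of items."""
      items = load_items(message["item_ids"])  # read matching rows from data_item
      results = []
      for item in items:
          # Field-level checks inspect individual fields of the compiled release.
          for check in field_checks:
              results.append(check(item))
          # Compiled release-level checks inspect the compiled release as a whole.
          for check in release_checks:
              results.append(check(item))
          mark_complete(item["id"])  # the item reaches its final state
      save_results(results)  # store all results in one write
      publish({"dataset_id": message["dataset_id"]})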

check.dataset

python -m workers.check.dataset

Perform the dataset-level checks.

  1. Receive a message

  2. Determine whether field-level and compiled release-level checks have been performed on all items

  3. Read the items from the data_item table, in batches

  4. Submit each item to each dataset-level check

  5. Store the results from each check

  6. Publish a message

To determine whether field-level and compiled release-level checks have been performed on all items, it waits for the dataset to be in the CONTRACTING_PROCESS phase and OK state, with all its items in the OK state (see State machine).
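
A sketch of that readiness check, assuming hypothetical accessors over the progress_monitor_dataset and progress_monitor_item tables (the phase and state strings follow the description above):

  def item_checks_done(dataset_id, *, get_dataset_progress, count_items_not_ok):
      """Return True once the dataset and all of its items are ready for dataset-level checks."""
      progress = get_dataset_progress(dataset_id)
      if progress["phase"] != "CONTRACTING_PROCESS" or progress["state"] != "OK":
          return False
      # Every item must have reached the OK state (see State machine).
      return count_items_not_ok(dataset_id) == 0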

Note

In principle, a dataset-level or time-based check could depend on the results of field-level and compiled release-level checks, and a time-based check could depend on the results of a dataset-level check. That is why the Aggregator pattern is implemented in this worker. Otherwise, it could have been implemented in a separate worker, which would publish a message for the dataset worker and the time-based worker to consume, allowing all checks to run in parallel.

check.time_based

python -m workers.check.time_based

Perform the time-based checks.

  1. Receive a message

  2. Read the items from the data_item table for this dataset and its ancestor, in batches

  3. Submit each item pair to each time-based check

  4. Store the results from each check

  5. Publish a message
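
One plausible way to form item pairs between a dataset and its ancestor is to match compiled releases by OCID; the pairing key and the load_items helper below are assumptions, not the worker’s actual logic:

  def iter_item_pairs(dataset_id, ancestor_id, *, load_items):
      """Yield (ancestor_item, item) pairs for compiled releases present in both datasets."""
      ancestor_by_ocid = {item["ocid"]: item for item in load_items(ancestor_id)}
      for item in load_items(dataset_id):
          ancestor_item = ancestor_by_ocid.get(item["ocid"])
          if ancestor_item is not None:
              yield ancestor_item, item


  def run_time_based_checks(pairs, checks):
      """Submit each item pair to each time-based check and collect the results."""
      return [check(old, new) for old, new in pairs for check in checks]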

Others

report

python -m workers.report

Create reports, pick examples, and update dataset metadata.

  1. Receive a message

  2. Calculate compiled release-level report

  3. Prepare (random) examples from compiled release-level checks

  4. Calculate field-level report

  5. Prepare (random) examples from field-level checks

  6. Update the dataset’s metadata
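
Picking random examples while scanning a large set of check results can be done with reservoir sampling; this sketch assumes a flat list of results and an example limit of 20, neither of which is taken from the worker’s code:

  import random


  def pick_examples(results, limit=20, seed=None):
      """Keep up to `limit` results, each with equal probability (reservoir sampling)."""
      rng = random.Random(seed)
      examples = []
      for index, result in enumerate(results):
          if len(examples) < limit:
              examples.append(result)
          else:
              slot = rng.randint(0, index)  # inclusive upper bound
              if slot < limit:
                  examples[slot] = result
      return examples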

wipe

Note

This worker is only needed when deploying the Data Registry.

python -m workers.wipe

Delete datasets.

  1. Receive a message

  2. Delete the dataset’s rows in:

    • resource_level_check

    • field_level_check

    • progress_monitor_item

    • progress_monitor_dataset

    • data_item

    • dataset

This worker assumes that the deployment does not enable filtered datasets (dataset_filter table) or time-based checks (time_variance_level_check table).
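
A sketch of the deletion order in a single transaction, assuming a PostgreSQL connection string and that the check and progress tables are reachable through the dataset’s items (the column names are assumptions, not the worker’s actual SQL):

  import psycopg2

  ITEM_TABLES = ["resource_level_check", "field_level_check", "progress_monitor_item"]


  def wipe_dataset(dsn, dataset_id):
      """Delete a dataset's rows, children first, in a single transaction."""
      connection = psycopg2.connect(dsn)
      try:
          with connection, connection.cursor() as cursor:
              for table in ITEM_TABLES:
                  cursor.execute(
                      f"DELETE FROM {table} WHERE item_id IN "
                      "(SELECT id FROM data_item WHERE dataset_id = %s)",
                      (dataset_id,),
                  )
              cursor.execute(
                  "DELETE FROM progress_monitor_dataset WHERE dataset_id = %s",
                  (dataset_id,),
              )
              cursor.execute(
                  "DELETE FROM data_item WHERE dataset_id = %s", (dataset_id,)
              )
              cursor.execute("DELETE FROM dataset WHERE id = %s", (dataset_id,))
      finally:
          connection.close()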