Workers¶
Extract¶
extract.kingfisher_process¶
python -m workers.extract.kingfisher_process
Extract collections from Kingfisher Process.
1. Receive a message
2. Select data IDs for the matching collection in Kingfisher Process
3. Create the dataset
4. Update the dataset’s metadata
5. Acknowledge the message (point-of-no-return)
6. Select compiled releases from Kingfisher Process, in batches
7. Insert compiled releases into the data_item table
8. Initialize each item’s state as in-progress
9. Publish a message with batches of item IDs
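Put together, the flow above can be pictured as a single callback. A minimal sketch, assuming psycopg2 connections to Kingfisher Process and to Pelican’s own database, hypothetical ack() and publish() callables for the broker, and illustrative queries and columns (only the data_item and dataset tables are named on this page):

    import json

    from psycopg2.extras import execute_values

    BATCH_SIZE = 1000  # assumed; the source does not state the batch size

    def extract(message, kingfisher_conn, pelican_conn, ack, publish):
        # Select data IDs for the matching collection in Kingfisher Process.
        with kingfisher_conn.cursor() as cursor:
            cursor.execute(
                "SELECT data_id FROM compiled_release WHERE collection_id = %s",  # illustrative
                (message["collection_id"],),
            )
            data_ids = [row[0] for row in cursor.fetchall()]

        # Create the dataset and update its metadata.
        with pelican_conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO dataset (name, meta) VALUES (%s, %s) RETURNING id",  # illustrative
                (message["name"], json.dumps(message.get("meta", {}))),
            )
            dataset_id = cursor.fetchone()[0]
        pelican_conn.commit()

        # Acknowledge the message (point-of-no-return).
        ack()

        # Select compiled releases in batches; insert each batch into the
        # data_item table with an in-progress state, then publish the item IDs.
        for start in range(0, len(data_ids), BATCH_SIZE):
            batch = data_ids[start:start + BATCH_SIZE]
            with kingfisher_conn.cursor() as cursor:
                cursor.execute("SELECT data FROM data WHERE id = ANY(%s)", (batch,))  # illustrative
                releases = [row[0] for row in cursor.fetchall()]
            with pelican_conn.cursor() as cursor:
                rows = execute_values(
                    cursor,
                    "INSERT INTO data_item (dataset_id, data, state) VALUES %s RETURNING id",
                    [(dataset_id, json.dumps(release), "IN_PROGRESS") for release in releases],
                    fetch=True,
                )
            pelican_conn.commit()
            publish({"dataset_id": dataset_id, "item_ids": [row[0] for row in rows]})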
Note
Some field-level checks use external codelists. These are cached in-memory by each worker for 1 day.
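A minimal sketch of such a cache, assuming the codelists are fetched over HTTP as CSVs with a Code column (the format and the requests dependency are assumptions; the source only states the cache duration):

    import csv
    import io
    import time

    import requests

    TTL = 86400  # 1 day, per the note above
    _cache = {}  # url -> (fetched_at, codes), held in memory per worker

    def get_codelist(url):
        """Return the codes at url, refetching at most once per day."""
        entry = _cache.get(url)
        if entry is None or time.monotonic() - entry[0] > TTL:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            codes = [row["Code"] for row in csv.DictReader(io.StringIO(response.text))]
            _cache[url] = (time.monotonic(), codes)
        return _cache[url][1]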
extract.dataset_filter¶
python -m workers.extract.dataset_filter
Create filtered datasets.
The worker will ignore a message if the dataset is not in the CHECKED phase.
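A minimal sketch of that guard, with hypothetical get_phase() and create_filtered_dataset() helpers standing in for the real lookups:

    def callback(message, ack):
        # Ignore (acknowledge without acting on) messages for datasets that
        # have not yet reached the CHECKED phase.
        if get_phase(message["dataset_id"]) != "CHECKED":  # hypothetical helper
            ack()
            return
        create_filtered_dataset(message)  # hypothetical helper
        ack()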
Note
This worker is only needed if the Pelican frontend is deployed.
Check¶
check.data_item¶
python -m workers.check.data_item
Perform the field-level and compiled release-level checks.
1. Receive a message
2. Read matching items from the data_item table
3. Perform field-level checks on each item
4. Perform resource-level checks on each item
5. Mark each item’s state as complete
6. Store all results
7. Publish a message
There can be many workers processing the same dataset at the same time.
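Because each message carries its own batch of item IDs, a callback touches only those rows, so concurrent workers do not conflict. A minimal sketch, assuming psycopg2 and hypothetical field_level_checks() and resource_level_checks() functions; the column names are assumptions:

    import json

    from psycopg2.extras import execute_values

    def check_items(message, conn, publish):
        item_ids = message["item_ids"]

        # Read matching items from the data_item table.
        with conn.cursor() as cursor:
            cursor.execute("SELECT id, data FROM data_item WHERE id = ANY(%s)", (item_ids,))
            items = cursor.fetchall()

        # Perform field-level and resource-level checks on each item.
        field = [(i, json.dumps(field_level_checks(data))) for i, data in items]  # hypothetical
        resource = [(i, json.dumps(resource_level_checks(data))) for i, data in items]  # hypothetical

        # Store all results and mark each item's state as complete.
        with conn.cursor() as cursor:
            execute_values(cursor, "INSERT INTO field_level_check (data_item_id, result) VALUES %s", field)
            execute_values(cursor, "INSERT INTO resource_level_check (data_item_id, result) VALUES %s", resource)
            cursor.execute("UPDATE data_item SET state = 'OK' WHERE id = ANY(%s)", (item_ids,))
        conn.commit()

        publish({"dataset_id": message["dataset_id"], "item_ids": item_ids})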
check.dataset¶
python -m workers.check.dataset
Perform the dataset-level checks.
1. Receive a message
2. Determine whether field-level and compiled release-level checks have been performed on all items
3. Read the items from the data_item table, in batches
4. Submit each item to each dataset-level check
5. Store the results from each check
6. Publish a message
To determine whether field-level and compiled release-level checks have been performed on all items, it waits for the dataset to be in the CONTRACTING_PROCESS phase and OK state, with all its items in the OK state (see State machine).
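A minimal sketch of that readiness test, assuming the progress_monitor_dataset and progress_monitor_item tables (listed under the wipe worker) carry the phase and state columns used here:

    def dataset_ready(conn, dataset_id):
        """Return True once all item-level checks are complete (see State machine)."""
        with conn.cursor() as cursor:
            # The dataset must be in the CONTRACTING_PROCESS phase and OK state.
            cursor.execute(
                "SELECT phase = 'CONTRACTING_PROCESS' AND state = 'OK' "
                "FROM progress_monitor_dataset WHERE dataset_id = %s",
                (dataset_id,),
            )
            row = cursor.fetchone()
            if not row or not row[0]:
                return False
            # All of the dataset's items must be in the OK state.
            cursor.execute(
                "SELECT NOT EXISTS (SELECT 1 FROM progress_monitor_item "
                "WHERE dataset_id = %s AND state <> 'OK')",
                (dataset_id,),
            )
            return cursor.fetchone()[0]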
Note
In principle, a dataset or time-based check could depend on the results of field-level and compiled release-level checks, and a time-based check could depend on the results of a dataset check. That is why the Aggregator pattern is implemented in this worker. Otherwise, it could have been implemented as a separate worker that publishes a message for the dataset worker and the time-based worker to consume, allowing all checks to run in parallel.
check.time_based¶
python -m workers.check.time_based
Perform the time-based checks.
1. Receive a message
2. Read the items from the data_item table for this dataset and its ancestor, in batches
3. Submit each item pair to each time-based check
4. Store the results from each check
5. Publish a message
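A minimal sketch of reading the item pairs, assuming psycopg2 and that an item is matched to its counterpart in the ancestor dataset by a shared OCID column (the join key is an assumption, not stated on this page):

    def iter_item_pairs(conn, dataset_id, ancestor_id, batch_size=1000):
        """Yield (item, ancestor_item) data pairs for a dataset and its ancestor."""
        # A named (server-side) cursor streams rows in batches of batch_size.
        with conn.cursor(name="item_pairs") as cursor:
            cursor.itersize = batch_size
            cursor.execute(
                "SELECT item.data, ancestor_item.data "
                "FROM data_item AS item "
                "JOIN data_item AS ancestor_item ON item.ocid = ancestor_item.ocid "  # assumed key
                "WHERE item.dataset_id = %s AND ancestor_item.dataset_id = %s",
                (dataset_id, ancestor_id),
            )
            yield from cursor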
Others¶
report¶
python -m workers.report
Create reports, pick examples, and update dataset metadata.
1. Receive a message
2. Calculate compiled release-level report
3. Prepare (random) examples from compiled release-level checks
4. Calculate field-level report
5. Prepare (random) examples from field-level checks
6. Update the dataset’s metadata
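The “(random) examples” can be picked while streaming check results, without holding them all in memory. A minimal sketch using reservoir sampling (the sampling method is an assumption; the source only says the examples are random):

    import random

    def sample_examples(rows, k=20):
        """Return a uniform random sample of up to k rows from an iterable."""
        reservoir = []
        for i, row in enumerate(rows):
            if i < k:
                reservoir.append(row)
            else:
                j = random.randint(0, i)
                if j < k:
                    reservoir[j] = row
        return reservoir

Feeding it a server-side cursor over the failing results of one check yields that check’s examples in a single pass.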
wipe¶
Note
This worker is only needed when deploying the Data Registry.
python -m workers.wipe
Delete datasets.
1. Receive a message
2. Delete the dataset’s rows in:

   - resource_level_check
   - field_level_check
   - progress_monitor_item
   - progress_monitor_dataset
   - data_item
   - dataset
This worker assumes that the deployment does not enable filtered datasets (dataset_filter table) or time-based checks (time_variance_level_check table).
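A minimal sketch of the deletion, assuming psycopg2; rows are removed child-first, in the order listed above, so no foreign key is violated (the column names are assumptions):

    def wipe(conn, dataset_id):
        with conn.cursor() as cursor:
            # Check results reference items, so delete them via the item IDs.
            for table in ("resource_level_check", "field_level_check"):
                cursor.execute(
                    f"DELETE FROM {table} WHERE data_item_id IN "
                    "(SELECT id FROM data_item WHERE dataset_id = %s)",
                    (dataset_id,),
                )
            # The remaining tables reference the dataset directly.
            for table in ("progress_monitor_item", "progress_monitor_dataset", "data_item"):
                cursor.execute(f"DELETE FROM {table} WHERE dataset_id = %s", (dataset_id,))
            cursor.execute("DELETE FROM dataset WHERE id = %s", (dataset_id,))
        conn.commit()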