Workers

pelican.util.workers.is_step_required(*steps)

Return whether to run the step(s).

Parameters:

steps (str) – one or more steps

Return type:

bool
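
As a rough illustration of how such a check might behave, the sketch below assumes that the enabled steps come from configuration and that the check passes when at least one of the named steps is enabled. Both points are assumptions. ENABLED_STEPS, is_step_required_sketch and the step names are hypothetical stand-ins, not the library's implementation.

```python
# Hypothetical stand-in for configuration: the real source of enabled steps is not shown here.
ENABLED_STEPS = {"step_a", "step_b"}


def is_step_required_sketch(*steps: str) -> bool:
    """Return True if at least one of the given steps is enabled (assumed semantics)."""
    return any(step in ENABLED_STEPS for step in steps)


# A worker covering several steps could skip its work when none of them is enabled.
if is_step_required_sketch("step_b", "step_c"):
    print("running")
else:
    print("skipping")
```

Accepting variadic arguments lets one worker ask about several steps in a single call.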

pelican.util.workers.process_items(client_state, channel, method, routing_key, cursors, dataset_id, ids, insert_items)

Ack the message, initialize the dataset’s and items’ progress, insert items into the database in batches, and publish messages to process the items in batches.

Parameters:
  • routing_key (str) – the routing key for the outgoing message

  • cursors (dict[str, Any]) – the database cursors (“default” is required)

  • dataset_id (int) – the dataset’s ID

  • ids (list[int]) – the IDs of rows to import

  • insert_items (Callable[[dict[str, Any], int, list[int]], None]) – a function that inserts the items; it is called with cursors, dataset_id and ids, in that order

  • client_state (State)

  • channel (Channel)

  • method (Deliver)

Return type:

None
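
The shape of an insert_items callback can be sketched as follows. This is a minimal illustration that only follows the documented signature (cursors, dataset_id, ids). The RecordingCursor class, the example_item table, the batched helper and the batch size of 2 are all hypothetical, and the way process_items itself splits ids into batches is not shown in this reference.

```python
from typing import Any, Iterator


class RecordingCursor:
    """Hypothetical stand-in for a database cursor: records statements instead of running them."""

    def __init__(self) -> None:
        self.statements: list[tuple[str, tuple]] = []

    def execute(self, sql: str, params: tuple = ()) -> None:
        self.statements.append((sql, params))


def insert_items(cursors: dict[str, Any], dataset_id: int, ids: list[int]) -> None:
    """Insert one row per ID using the required "default" cursor."""
    cursor = cursors["default"]
    for item_id in ids:
        # The table and column names are illustrative only.
        cursor.execute(
            "INSERT INTO example_item (dataset_id, source_id) VALUES (%s, %s)",
            (dataset_id, item_id),
        )


def batched(ids: list[int], size: int) -> Iterator[list[int]]:
    """Yield consecutive slices of at most `size` IDs (assumed batching strategy)."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


# Simulate the callback being invoked once per batch.
cursors = {"default": RecordingCursor()}
for batch in batched([10, 11, 12, 13, 14], 2):
    insert_items(cursors, 7, batch)
```

Because the callback receives the whole cursors mapping, it can use additional cursors besides "default" when the rows to import live in another database.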

pelican.util.workers.finish_callback(client_state, channel, method, dataset_id, phase=None, routing_key=None)

Update the dataset’s state, publish a message if a routing key is provided, and ack the message.

Parameters:
  • dataset_id (int) – the dataset’s ID

  • phase (str | None) – the dataset’s phase

  • routing_key (str | None) – the routing key for the outgoing message

  • client_state (State)

  • channel (Channel)

  • method (Deliver)

Return type:

None
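
The order of operations described above (update the state, optionally publish, then ack) can be sketched with plain Python stand-ins. This is not the library's implementation: finish_sketch, the events list and the message payload are hypothetical, and a real worker would call the database and the message broker instead of appending to a list.

```python
from typing import Any, Optional


def finish_sketch(
    events: list[tuple[Any, ...]],
    dataset_id: int,
    phase: Optional[str] = None,
    routing_key: Optional[str] = None,
) -> None:
    """Record the three documented steps in order, skipping the publish when no key is given."""
    # 1. Update the dataset's state.
    events.append(("update_state", dataset_id, phase))
    # 2. Publish a follow-up message only if a routing key is provided.
    if routing_key:
        # The payload shape is an assumption made for illustration.
        events.append(("publish", routing_key, {"dataset_id": dataset_id}))
    # 3. Ack the incoming message last.
    events.append(("ack",))


with_key: list[tuple[Any, ...]] = []
finish_sketch(with_key, 7, phase="EXAMPLE_PHASE", routing_key="next_worker")

without_key: list[tuple[Any, ...]] = []
finish_sketch(without_key, 7)
```

Acking last means that, if the state update or the publish raises an error, the incoming message stays unacknowledged and can be redelivered.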