import logging
import threading
from functools import cache
from typing import Any
import orjson
import psycopg2.extensions
import psycopg2.extras
from yapw.clients import AsyncConsumer, Blocking
from pelican.util import settings
# Module-level PostgreSQL connection state, read and updated by get_cursor().
# Whether db_connection has been opened.
db_connected = False
# The shared psycopg2 connection; None until get_cursor() is first called.
db_connection = None
# Counter incremented for each named cursor, to keep the cursors' names unique.
db_cursor_idx = 0

logger = logging.getLogger(__name__)
# RabbitMQ
[docs]
def encode(message: Any, content_type: str | None) -> bytes:
    """
    Serialize a message to JSON bytes, to use as the body of a RabbitMQ message.

    :param message: a decoded message
    :param content_type: the message's content type (not used: the message is always serialized with orjson)
    :returns: the encoded message body
    """
    return orjson.dumps(message)
[docs]
def decode(body: bytes, content_type: str | None) -> Any:
    """
    Parse the JSON body of a message received from RabbitMQ.

    :param body: an encoded message
    :param content_type: the message's content type (not used: the body is always parsed with orjson)
    :returns: the decoded message
    """
    return orjson.loads(body)
# Keyword arguments passed to every yapw client created by consume() and publish():
# the broker URL and exchange name from the settings, and the JSON encoder and decoder defined above.
YAPW_KWARGS: dict[str, Any] = {
    "url": settings.RABBIT_URL,
    "exchange": settings.RABBIT_EXCHANGE_NAME,
    "encode": encode,
    "decode": decode,
}
[docs]
def consume(*args: Any, prefetch_count=1, **kwargs: Any) -> None:
    """
    Create a yapw ``AsyncConsumer`` and start it, to consume messages from RabbitMQ.

    The positional and keyword arguments are passed to the client, along with ``YAPW_KWARGS``.

    :param prefetch_count: the ``prefetch_count`` argument passed to the client
    """
    AsyncConsumer(*args, prefetch_count=prefetch_count, **kwargs, **YAPW_KWARGS).start()
[docs]
def publish(*args: Any, **kwargs: Any) -> None:
    """
    Publish a message to RabbitMQ, using a yapw ``Blocking`` client created for this call.

    The positional and keyword arguments are passed to the client's ``publish`` method.
    """
    publisher = Blocking(**YAPW_KWARGS)
    try:
        publisher.publish(*args, **kwargs)
    finally:
        # Close the client whether or not publishing succeeded.
        publisher.close()
# PostgreSQL
# Parse jsonb values read from the database with orjson, for all connections (globally=True).
psycopg2.extras.register_default_jsonb(loads=orjson.loads, globally=True)
[docs]
class Json(psycopg2.extras.Json):
    """
    A psycopg2 JSON adapter that serializes the wrapped object with orjson.
    """

    def dumps(self, obj):
        """
        Serialize the object to JSON using orjson.

        :param obj: the object to serialize
        """
        return orjson.dumps(obj)
[docs]
def get_cursor(name="") -> psycopg2.extensions.cursor:
    """
    Return a ``DictCursor`` on the module's shared database connection, connecting first if needed.

    :param name: if set, create a named (server-side) cursor, whose name is made unique by appending the current
      thread's identifier and a counter
    """
    global db_connected, db_connection, db_cursor_idx

    # Open the connection on first use only.
    if not db_connected:
        db_connection = psycopg2.connect(settings.DATABASE_URL)
        db_connected = True

    if not name:
        return db_connection.cursor(cursor_factory=psycopg2.extras.DictCursor)

    # https://github.com/django/django/blob/stable/4.2.x/django/db/backends/postgresql/base.py#L469
    db_cursor_idx += 1
    thread_id = threading.current_thread().ident
    return db_connection.cursor(
        cursor_factory=psycopg2.extras.DictCursor,
        name=f"{name}-{thread_id}-{db_cursor_idx}",
        # Avoid "named cursor isn't valid anymore". Another option is to use a separate connection.
        # https://www.psycopg.org/docs/usage.html#server-side-cursors
        withhold=True,
    )
[docs]
def commit() -> None:
    """
    Commit the current transaction on the module's shared database connection.

    The connection is ``None`` until ``get_cursor()`` has been called.
    """
    db_connection.commit()
[docs]
def rollback() -> None:
    """
    Roll back the current transaction on the module's shared database connection.

    The connection is ``None`` until ``get_cursor()`` has been called.
    """
    db_connection.rollback()
[docs]
class state:
    """
    The values stored in the ``state`` column of the progress monitor tables.
    """

    IN_PROGRESS = "IN_PROGRESS"
    OK = "OK"
[docs]
class phase:
    """
    The values stored in the ``phase`` column of the ``progress_monitor_dataset`` table.
    """

    CONTRACTING_PROCESS = "CONTRACTING_PROCESS"
    DATASET = "DATASET"
    TIME_VARIANCE = "TIME_VARIANCE"
    CHECKED = "CHECKED"
    DELETED = "DELETED"
[docs]
def initialize_dataset_state(dataset_id: int) -> None:
    """
    Insert the dataset's progress row, in the CONTRACTING_PROCESS phase and IN_PROGRESS state, with a size of 0.

    :param dataset_id: the dataset's ID
    """
    statement = """\
        INSERT INTO progress_monitor_dataset (dataset_id, phase, state, size)
        VALUES (%(dataset_id)s, %(phase)s, %(state)s, 0)
    """
    parameters = {"dataset_id": dataset_id, "phase": phase.CONTRACTING_PROCESS, "state": state.IN_PROGRESS}
    with get_cursor() as cursor:
        cursor.execute(statement, parameters)
[docs]
def update_dataset_state(dataset_id: int, phase: str, state: str, size: int | None = None) -> None:
    """
    Update a dataset's progress to the given phase and state, and set its modification time to now.

    :param dataset_id: the dataset's ID
    :param phase: the phase to be set
    :param state: the state to set
    :param size: number of data items to process; if ``None`` (the default), the stored size is left unchanged
    """
    variables = {"phase": phase, "state": state, "dataset_id": dataset_id}

    # Compare to None, not truthiness, so that an explicit size of 0 is written rather than silently ignored.
    if size is None:
        sql = """\
            UPDATE progress_monitor_dataset
            SET phase = %(phase)s, state = %(state)s, modified = now()
            WHERE dataset_id = %(dataset_id)s
        """
    else:
        variables["size"] = size
        sql = """\
            UPDATE progress_monitor_dataset
            SET phase = %(phase)s, state = %(state)s, modified = now(), size = %(size)s
            WHERE dataset_id = %(dataset_id)s
        """

    with get_cursor() as cursor:
        cursor.execute(sql, variables)
[docs]
def initialize_items_state(dataset_id: int, item_ids: list[int]) -> None:
    """
    Insert a progress row, in the IN_PROGRESS state, for each of the dataset's given data items.

    :param dataset_id: the dataset's ID
    :param item_ids: the data items' IDs
    """
    statement = """\
        INSERT INTO progress_monitor_item (dataset_id, item_id, state)
        VALUES %s
    """
    with get_cursor() as cursor:
        rows = [(dataset_id, item_id, state.IN_PROGRESS) for item_id in item_ids]
        psycopg2.extras.execute_values(cursor, statement, rows)
[docs]
def update_items_state(dataset_id: int, item_ids: list[int], state: str) -> None:
    """
    Set the given data items' progress to the given state, and set their modification time to now.

    :param dataset_id: the dataset's ID
    :param item_ids: the data items' IDs
    :param state: the state to set
    """
    statement = """\
        UPDATE progress_monitor_item
        SET state = data.state, modified = now()
        FROM (VALUES %s) AS data (dataset_id, item_id, state)
        WHERE progress_monitor_item.dataset_id = data.dataset_id AND progress_monitor_item.item_id = data.item_id
    """
    with get_cursor() as cursor:
        rows = [(dataset_id, item_id, state) for item_id in item_ids]
        psycopg2.extras.execute_values(cursor, statement, rows)
[docs]
def get_processed_items_count(dataset_id: int) -> int:
    """
    Count the dataset's data items whose progress is in the OK state.

    :param dataset_id: the dataset's ID
    :returns: the number of items processed
    """
    query = "SELECT COUNT(*) cnt FROM progress_monitor_item WHERE dataset_id = %(dataset_id)s AND state = %(state)s"
    parameters = {"dataset_id": dataset_id, "state": state.OK}
    with get_cursor() as cursor:
        cursor.execute(query, parameters)
        row = cursor.fetchone()
        return row["cnt"]
# The check.dataset worker calls this function when phase=CONTRACTING_PROCESS and state=OK, at which point size is set.
[docs]
@cache
def get_total_items_count(dataset_id: int) -> int:
    """
    Look up the size recorded in the dataset's progress row.

    The result is cached per dataset ID, so the database is queried once per dataset.

    :param dataset_id: the dataset's ID
    :returns: the number of items to process
    """
    query = "SELECT size FROM progress_monitor_dataset WHERE dataset_id = %(dataset_id)s"
    with get_cursor() as cursor:
        cursor.execute(query, {"dataset_id": dataset_id})
        row = cursor.fetchone()
        return row["size"]
[docs]
def get_dataset_progress(dataset_id: int) -> tuple[Any, ...] | None:
    """
    Fetch the dataset's row from the ``progress_monitor_dataset`` table.

    :param dataset_id: the dataset's ID
    :returns: the dataset's progress row, or ``None`` if no row matches
    """
    query = "SELECT * FROM progress_monitor_dataset WHERE dataset_id = %(dataset_id)s"
    with get_cursor() as cursor:
        cursor.execute(query, {"dataset_id": dataset_id})
        row = cursor.fetchone()
        return row