Skip to content

service

dev_tool.services.postgres.service

log = logging.getLogger(__name__) module-attribute

PostgresService

Bases: BaseService

A service class for PostgreSQL database operations.

This class provides methods for creating, dropping, and managing PostgreSQL databases.

Source code in dev_tool/services/postgres/service.py
def __init__(self) -> None:
    super().__init__()

check_if_superuser_exists staticmethod

A method that checks if the specified superuser exists in PostgreSQL.

Returns:

  • bool

    True if the superuser exists, False otherwise.

Source code in dev_tool/services/postgres/service.py
@staticmethod
def check_if_superuser_exists() -> bool:
    """
    A method that checks if the specified superuser exists in PostgreSQL.

    :return: True if the superuser exists, False otherwise.
    """

    user = os.getenv('DATABASE_USER')

    try:
        with PostgresService.get_postgres_connection() as connection:
            cursor = connection.cursor()

            query = 'SELECT 1 FROM pg_roles WHERE rolname = %s'
            cursor.execute(query, (user,))

            result = cursor.fetchone()
    except PostgresConnectionError:
        raise
    except Exception:
        message = f'Failed to check if superuser "{user}" exists'
        log.exception(message)

        raise SuperuserCreationError(message) from None
    else:
        return result is not None

create_database staticmethod

A method that creates a new PostgreSQL database.

Parameters:

  • database_name (str) –

    The name of the database to create.

Source code in dev_tool/services/postgres/service.py
@staticmethod
def create_database(database_name: str) -> None:
    """
    A method that creates a new PostgreSQL database.

    :param database_name: The name of the database to create.
    """

    try:
        with PostgresService.get_postgres_connection(autocommit=True) as connection:
            cursor = connection.cursor()

            identifier = Identifier(database_name)
            query = SQL('CREATE DATABASE {}').format(identifier)
            cursor.execute(query)

            message = f'Database "{database_name}" created.'
            CONTEXT.notification.normal_text(message)

            log.debug(message)
    except PostgresConnectionError:
        raise
    except Exception:
        message = f'Failed to create database "{database_name}"'
        log.exception(message)

        raise DatabaseCreationError(message) from None

create_postgres_superuser staticmethod

A method that creates a PostgreSQL superuser if it does not exist.

Source code in dev_tool/services/postgres/service.py
@staticmethod
def create_postgres_superuser() -> None:
    """A method that creates a PostgreSQL superuser if it does not exist."""

    user = os.getenv('DATABASE_USER')

    try:
        if not PostgresService.check_if_superuser_exists():
            password = os.getenv('DATABASE_PASSWORD')

            message = f'Superuser {user} does not exist. Creating user.'
            CONTEXT.notification.normal_text(message)

            log.debug(message)

            with PostgresService.get_postgres_connection() as connection:
                cursor = connection.cursor()

                identifier = Identifier(user)
                query = SQL('CREATE USER {} WITH SUPERUSER PASSWORD %s').format(identifier)

                cursor.execute(
                    query,
                    (password,)
                )

                connection.commit()

                message = f'Superuser {user} created and privileges granted.'
                CONTEXT.notification.normal_text(message)

                log.debug(message)
        else:
            message = f'Superuser {user} already exists. Skipping...'
            CONTEXT.notification.normal_text(message)

            log.debug(message)
    except (PostgresConnectionError, SuperuserCreationError):
        raise
    except Exception:
        message = f'Unexpected error creating superuser "{user}"'
        log.exception(message)

        raise SuperuserCreationError(message) from None

drop_database staticmethod

A method that drops a PostgreSQL database.

Parameters:

  • database_name (str) –

    The name of the database to drop.

Source code in dev_tool/services/postgres/service.py
@staticmethod
def drop_database(database_name: str) -> None:
    """
    A method that drops a PostgreSQL database.

    :param database_name: The name of the database to drop.
    """

    try:
        with PostgresService.get_postgres_connection(autocommit=True) as connection:
            cursor = connection.cursor()

            identifier = Identifier(database_name)
            query = SQL('DROP DATABASE IF EXISTS {}').format(identifier)
            cursor.execute(query)

            message = f'Database "{database_name}" dropped.'
            CONTEXT.notification.normal_text(message)

            log.debug(message)
    except PostgresConnectionError:
        raise
    except Exception:
        message = f'Failed to drop database "{database_name}"'
        log.exception(message)

        raise DatabaseDeletionError(message) from None

get_postgres_connection staticmethod

A context manager that provides a PostgreSQL database connection.

:yield: A PostgreSQL database connection.

Parameters:

  • autocommit (bool, default: False ) –

    A flag indicating whether to enable autocommit mode.

Source code in dev_tool/services/postgres/service.py
@staticmethod
@contextmanager
def get_postgres_connection(autocommit: bool = False) -> psycopg2_connection:
    """
    A context manager that provides a PostgreSQL database connection.

    :param autocommit: A flag indicating whether to enable autocommit mode.
    :yield: A PostgreSQL database connection.
    """

    connection = None

    @retry_on_failure(maximum_retries=10, delay=2)
    def connect() -> psycopg2_connection:
        return psycopg2.connect(
            dbname='postgres',
            user=os.getenv('DATABASE_USER'),
            password=os.getenv('DATABASE_PASSWORD'),
            host=os.getenv('DATABASE_HOST', 'localhost'),
            port=int(os.getenv('DATABASE_PORT', '5432'))
        )

    try:
        try:
            connection = connect()

            if autocommit:
                connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

            yield connection
        except RetryExceededError:
            message = 'Exceeded maximum retries for Postgres connection'
            log.exception(message)

            raise PostgresConnectionError(message) from None
        except Exception:
            message = 'Failed to establish PostgreSQL connection'
            log.exception(message)

            raise PostgresConnectionError(message) from None
    finally:
        if connection:
            connection.close()

setup_django_databases staticmethod

A method that sets up databases for Django.

This method drops and recreates the main and test databases.

Source code in dev_tool/services/postgres/service.py
@staticmethod
@is_django_environment_variables
def setup_django_databases() -> None:
    """
    A method that sets up databases for Django.

    This method drops and recreates the main and test databases.
    """

    database_name = os.getenv('DATABASE_NAME')

    if database_name is None:
        message = 'Database name is required'
        log.debug(message)

        return

    try:
        PostgresService.drop_database(database_name)
        PostgresService.drop_database('test_' + database_name)
        PostgresService.create_database(database_name)
    except (DatabaseCreationError, DatabaseDeletionError):
        raise
    except Exception:
        message = 'Failed to setup Django databases'
        log.exception(message)

        raise DatabaseSetupError(message) from None

is_dump_file staticmethod

A method that checks if a database dump file exists.

Parameters:

Returns:

  • bool

    True if the dump file exists, False otherwise.

Source code in dev_tool/services/postgres/service.py
@staticmethod
@is_django_environment_variables
def is_dump_file(postgres_database: CloudPostgresDatabase | DockerPostgresDatabase | None = None) -> bool:
    """
    A method that checks if a database dump file exists.

    :param postgres_database: The PostgreSQL database object.
    :return: True if the dump file exists, False otherwise.
    """

    if is_dump_environment_variables():
        if postgres_database is None:
            postgres_database = CloudPostgresDatabase()

        sql_input_file = BASE / (postgres_database.config.name + '.sql')
        sql_output_file = BASE / (postgres_database.config.name + '-clean.sql')

        return sql_input_file.exists() or sql_output_file.exists()

    return False

dump_cloud_to_docker_database staticmethod

A method that dumps a cloud PostgreSQL database to a Docker database.

Parameters:

  • redump (bool) –

    A flag indicating whether to (re)dump the database.

  • reuse (bool) –

    A flag indicating whether to reuse an existing dump file.

Source code in dev_tool/services/postgres/service.py
@staticmethod
@is_postgres_environment_variables
def dump_cloud_to_docker_database(redump: bool, reuse: bool) -> None:
    """
    A method that dumps a cloud PostgreSQL database to a Docker database.

    :param redump: A flag indicating whether to (re)dump the database.
    :param reuse: A flag indicating whether to reuse an existing dump file.
    """

    if is_dump_environment_variables():
        try:
            cloud_database = CloudPostgresDatabase()
            docker_database = DockerPostgresDatabase()

            postgres_manager = PostgresManager(
                docker_database=docker_database,
                cloud_database=cloud_database
            )

            if redump:
                postgres_manager.from_cloud_database_to_docker_database()
                return

            if PostgresService.is_dump_file(cloud_database):
                if reuse:
                    postgres_manager.from_sql_file_to_docker_database()
                else:
                    message = 'Skipping reuse of existing dump file...'
                    CONTEXT.notification.normal_text(message)

                    log.debug(message)
            else:
                message = 'No dump file exists. Nothing to reuse and no (re)dump requested.'
                CONTEXT.notification.normal_text(message)

                log.debug(message)
        except Exception:
            message = 'Failed to dump cloud database to Docker database'
            log.exception(message)

            raise DatabaseDumpError(message) from None
    else:
        message = 'Skipping dumping to Docker database...'
        CONTEXT.notification.normal_text(message)

        log.debug(message)

restore_from_content

A method that restores a database from decrypted content.

Parameters:

  • content (str) –

    The decrypted SQL content to restore.

Source code in dev_tool/services/postgres/service.py
def restore_from_content(self, content: str) -> None:
    """
    A method that restores a database from decrypted content.

    :param content: The decrypted SQL content to restore.
    """

    try:
        manager = PostgresManager(
            DockerPostgresDatabase(),
            CloudPostgresDatabase()
        )

        manager.from_decrypted_content_to_docker_database(content)

        message = 'Database restored successfully from content'
        CONTEXT.notification.success_text(message)

        log.info(message)
    except Exception:
        message = 'Failed to restore database from content'
        log.exception(message)

        raise DatabaseRestoreError(message) from None

restore_from_sql_file

A method that restores a database from a raw SQL file.

Parameters:

  • file (Path) –

    The path to the SQL file.

Source code in dev_tool/services/postgres/service.py
def restore_from_sql_file(self, file: Path) -> None:
    """
    A method that restores a database from a raw SQL file.

    :param file: The path to the SQL file.
    """

    try:
        sql_output_file = BASE / f'{file.stem}-clean.sql'

        manager = PostgresManager(
            DockerPostgresDatabase(),
            CloudPostgresDatabase()
        )

        manager.update_sql_file(file, sql_output_file)
        manager.docker_database.disconnect_all()

        if not manager.docker_database.user_exists():
            manager.docker_database.create_user()

        manager.docker_database.grant_all_privileges()
        manager.docker_database.restore_database_from_file(sql_output_file)

        if sql_output_file.exists():
            sql_output_file.unlink()

        message = f'Database restored successfully from {file.name}'
        CONTEXT.notification.success_text(message)

        log.info(message)
    except Exception:
        message = f'Failed to restore database from SQL file {file.name}'
        log.exception(message)

        raise DatabaseRestoreError(message) from None

is_dump_environment_variables

A function that checks if all required environment variables for database dumping are set.

Parameters:

  • suppress (bool, default: True ) –

    A flag indicating whether to suppress warning messages.

Returns:

  • bool

    True if all required variables are set, False otherwise.

Source code in dev_tool/services/postgres/service.py
def is_dump_environment_variables(suppress: bool = True) -> bool:
    """
    A function that checks if all required environment variables for database dumping are set.

    :param suppress: A flag indicating whether to suppress warning messages.
    :return: True if all required variables are set, False otherwise.
    """

    required = (
        'CLOUD_DATABASE_HOST',
        'CLOUD_DATABASE_PORT',
        'CLOUD_DATABASE_NAME',
        'CLOUD_DATABASE_USER',
        'CLOUD_DATABASE_PASSWORD'
    )

    return is_in_env_variables(required, suppress=suppress)