Skip to content

strategy

dev_tool.db.postgres.strategy

log = logging.getLogger(__name__) module-attribute

PostgresDatabaseStrategy

Bases: ABC

An abstract base class for PostgreSQL database strategies.

This class defines the interface for interacting with PostgreSQL databases.

The constructor for the PostgresDatabaseStrategy class.

Parameters:

Source code in dev_tool/db/postgres/strategy.py
def __init__(self, config: DatabaseConfig) -> None:
    """
    The constructor for the PostgresDatabaseStrategy class.

    :param config: The database configuration.
    """

    self.config = config

config = config instance-attribute

create_database abstractmethod

A method that creates a database.

Source code in dev_tool/db/postgres/strategy.py
@abstractmethod
def create_database(self) -> None:
    """A method that creates a database."""

    raise NotImplementedError

create_user

A method that creates a database user.

Source code in dev_tool/db/postgres/strategy.py
def create_user(self) -> None:
    """A method that creates a database user."""

    connection = None

    try:
        connection = self._connect(autocommit=True)

        with connection.cursor() as cursor:
            identifier = Identifier(self.config.user)
            query = SQL('CREATE USER {} WITH SUPERUSER PASSWORD %s').format(identifier)

            cursor.execute(
                query,
                (self.config.password,)
            )
    except PostgreSQLConnectionError:
        raise
    except psycopg2.IntegrityError:
        message = f'User already exists: {self.config.user}'
        CONTEXT.notification.warning_text(message)

        log.exception(message)
    except (psycopg2.DatabaseError, psycopg2.Error) as exception:
        message = f'Database error creating user: {self.config.user}: {exception}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    except Exception as exception:
        message = f'Failed to create user: {self.config.user}: {exception}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    finally:
        if connection:
            connection.close()

database_exists

A method that checks if a database exists.

Returns:

  • bool

    True if the database exists, False otherwise.

Source code in dev_tool/db/postgres/strategy.py
def database_exists(self) -> bool:
    """
    A method that checks if a database exists.

    :return: True if the database exists, False otherwise.
    """

    connection = None

    try:
        connection = self._connect(autocommit=False)

        with connection.cursor() as cursor:
            cursor.execute(
                'SELECT 1 FROM pg_database WHERE datname = %s',
                (self.config.name,)
            )
            return cursor.fetchone() is not None
    except PostgreSQLConnectionError:
        raise
    except (psycopg2.DatabaseError, psycopg2.Error) as exception:
        message = f'Database error checking if database exists {self.config.name}: {exception}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    except Exception as exception:
        message = f'Failed to check if database exists {self.config.name}: {exception}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    finally:
        if connection:
            connection.close()

drop_database abstractmethod

A method that drops a database.

Source code in dev_tool/db/postgres/strategy.py
@abstractmethod
def drop_database(self) -> None:
    """A method that drops a database."""

    raise NotImplementedError

disconnect_all

A method that disconnects all users from a database.

Source code in dev_tool/db/postgres/strategy.py
def disconnect_all(self) -> None:
    """A method that disconnects all users from a database."""

    connection = None

    try:
        connection = self._connect(autocommit=True)

        with connection.cursor() as cursor:
            query = """
                SELECT pg_terminate_backend(pg_stat_activity.pid)
                FROM pg_stat_activity
                WHERE pg_stat_activity.datname = %s AND pid <> pg_backend_pid()
            """

            cursor.execute(
                query,
                (self.config.name,)
            )
    except PostgreSQLConnectionError:
        raise
    except (psycopg2.DatabaseError, psycopg2.Error) as exception:
        message = f'Database error disconnecting users from database {self.config.name}: {exception}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    except Exception as exception:
        message = f'Failed to disconnect all users from database {self.config.name}: {exception}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    finally:
        if connection:
            connection.close()

dump_database

A method that dumps a database to a file.

Parameters:

  • input_file (Path) –

    The path to the output file.

Source code in dev_tool/db/postgres/strategy.py
def dump_database(self, input_file: Path) -> None:
    """
    A method that dumps a database to a file.

    :param input_file: The path to the output file.
    """

    raise NotImplementedError

grant_all_privileges

A method that grants all privileges to a database user.

Source code in dev_tool/db/postgres/strategy.py
def grant_all_privileges(self) -> None:
    """A method that grants all privileges to a database user."""

    connection = None

    try:
        connection = self._connect(autocommit=True)

        with connection.cursor() as cursor:
            identifier = Identifier(self.config.user)

            query = (
                SQL('ALTER ROLE {} WITH CREATEDB CREATEROLE REPLICATION BYPASSRLS')
                .format(identifier)
            )

            cursor.execute(query)
    except PostgreSQLConnectionError:
        raise
    except (psycopg2.DatabaseError, psycopg2.Error) as exception:
        message = f'Database error granting privileges to user {self.config.user}: {exception}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    except Exception as exception:
        message = f'Failed to grant privileges to user {self.config.user}: {exception}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    finally:
        if connection:
            connection.close()

restore_database_from_content

A method that restores a database from content via stdin.

Parameters:

  • content (str) –

    The content to restore.

Source code in dev_tool/db/postgres/strategy.py
def restore_database_from_content(self, content: str) -> None:
    """
    A method that restores a database from content via stdin.

    :param content: The content to restore.
    """

    def check_stdin(process: subprocess.Popen) -> None:
        if process.stdin is None:
            error = 'Failed to open stdin for database restore'
            CONTEXT.notification.error_banner(error)
            log.exception(error)
            raise PostgreSQLConnectionError(error)

    def check_early_termination(process: subprocess.Popen, stderr: str) -> None:
        if process.poll() is not None:
            error = f'psql process died early: {stderr}'
            CONTEXT.notification.error_banner(error)
            log.exception(error)
            raise PostgreSQLConnectionError(error)

    def check_return_code(returncode: int, stderr: str) -> None:
        if returncode != 0:
            error = f'Failed to restore database from content: {stderr}'
            CONTEXT.notification.error_banner(error)
            log.exception(error)
            raise PostgreSQLConnectionError(error)

    def handle_broken_pipe(stderr: str) -> None:
        error = f'psql connection broken. Error output: {stderr}'
        CONTEXT.notification.error_banner(error)
        log.exception(error)
        raise PostgreSQLConnectionError(error)

    def read_stream(stream: TextIOWrapper, buffer: list[str]) -> None:
        for line in stream:
            buffer.append(line) # noqa: PERF402

    stdout_lines = []
    stderr_lines = []
    stdout_thread = None
    stderr_thread = None

    try:
        self.drop_database()
        self.create_database()

        content = self._sanitize(content)

        version = CONTEXT.configuration.get_docker_config().get('postgres-version')

        command = [
            'docker', 'run',
            '--rm',
            '-i',
            '-e', f'PGPASSWORD={self.config.password}',
            f'postgres:{version}',
            'psql',
            '-h', 'host.docker.internal',
            '-U', self.config.user,
            '-d', self.config.name,
            '-v', 'ON_ERROR_STOP=1'
        ]

        process = subprocess.Popen(
            command,
            encoding='utf-8',
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )

        check_stdin(process)

        assert process.stdin is not None

        stdout_thread = threading.Thread(
            target=read_stream,
            args=(process.stdout, stdout_lines)
        )

        stderr_thread = threading.Thread(
            target=read_stream,
            args=(process.stderr, stderr_lines)
        )

        stdout_thread.start()
        stderr_thread.start()

        size = 65536

        for i in range(0, len(content), size):
            stderr_thread.join(timeout=0)
            stderr = ''.join(stderr_lines)
            check_early_termination(process, stderr)

            chunk = content[i:i+size]
            process.stdin.write(chunk)
            process.stdin.flush()

        process.stdin.close()

        stdout_thread.join()
        stderr_thread.join()

        process.wait()

        stderr = ''.join(stderr_lines)
        check_return_code(process.returncode, stderr)

        message = 'Database restored successfully.'
        CONTEXT.notification.normal_text(message)
        log.debug(message)
    except subprocess.CalledProcessError as exception:
        error = f'Failed to restore database from content: {exception}'
        CONTEXT.notification.error_banner(error)

        log.exception(error)
        raise PostgreSQLConnectionError(error) from None
    except BrokenPipeError:
        if stderr_thread:
            stderr_thread.join(timeout=1)

        stderr = ''.join(stderr_lines)
        handle_broken_pipe(stderr)
    except Exception as exception:
        error = f'Unexpected error restoring database from content: {exception}'
        CONTEXT.notification.error_banner(error)

        log.exception(error)
        raise PostgreSQLConnectionError(error) from None

restore_database_from_file

A method that restores a database from an SQL file.

Parameters:

  • sql (Path) –

    The path to the SQL file.

Source code in dev_tool/db/postgres/strategy.py
def restore_database_from_file(self, sql: Path) -> None:
    """
    A method that restores a database from an SQL file.

    :param sql: The path to the SQL file.
    """

    try:
        self.drop_database()
        self.create_database()

        version = CONTEXT.configuration.get_docker_config().get('postgres-version')

        mount = '/tmp/mounted' # noqa: S108

        command = [
            'docker', 'run',
            '--rm',
            '-v', f'{sql.parent}:{mount}',
            '-e', f'PGPASSWORD={self.config.password}',
            f'postgres:{version}',
            'psql',
            '-h', 'host.docker.internal',
            '-U', self.config.user,
            '-d', self.config.name,
            '-v', 'ON_ERROR_STOP=1',
            '-f', f'{mount}/{sql.name}'
        ]

        subprocess.run(command, check=True)

        message = 'Database restored successfully.'
        CONTEXT.notification.normal_text(message)

        log.exception(message)
    except subprocess.CalledProcessError as exception:
        message = f'Failed to restore database from SQL file {sql}: {exception}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    except Exception as exception:
        message = f'Unexpected error restoring database from SQL file {sql}: {exception}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None

execute_pgcron_setup

Execute pgcron setup SQL against the postgres database.

Parameters:

  • pgcron_sql (str) –

    The pgcron SQL to execute.

Source code in dev_tool/db/postgres/strategy.py
def execute_pgcron_setup(self, pgcron_sql: str) -> None:
    """
    Execute pgcron setup SQL against the postgres database.

    :param pgcron_sql: The pgcron SQL to execute.
    """

    def check_stdin(process: subprocess.Popen) -> None:
        if process.stdin is None:
            message = 'Failed to open stdin for pgcron setup'
            raise PostgreSQLConnectionError(message)

    def read_stream(stream: TextIOWrapper, buffer: list[str]) -> None:
        for line in stream:
            buffer.append(line) # noqa: PERF402

    try:
        version = CONTEXT.configuration.get_docker_config().get('postgres-version')

        command = [
            'docker', 'run',
            '--rm',
            '-i',
            '-e', f'PGPASSWORD={self.config.password}',
            f'postgres:{version}',
            'psql',
            '-h', 'host.docker.internal',
            '-U', self.config.user,
            '-d', 'postgres',
            '-v', 'ON_ERROR_STOP=0'
        ]

        process = subprocess.Popen(
            command,
            encoding='utf-8',
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )

        check_stdin(process)

        assert process.stdin is not None

        stdout_lines = []
        stderr_lines = []

        stdout_thread = threading.Thread(
            target=read_stream,
            args=(process.stdout, stdout_lines)
        )

        stderr_thread = threading.Thread(
            target=read_stream,
            args=(process.stderr, stderr_lines)
        )

        stdout_thread.start()
        stderr_thread.start()

        process.stdin.write(pgcron_sql)
        process.stdin.flush()
        process.stdin.close()

        stdout_thread.join()
        stderr_thread.join()

        process.wait()

        stderr = ''.join(stderr_lines)

        if process.returncode != 0 and 'does not exist' not in stderr:
            message = f'pgcron setup had errors: {stderr}'
            CONTEXT.notification.warning_text(message)
            log.warning(message)

    except Exception as e:
        message = f'Error executing pgcron setup: {e}'
        CONTEXT.notification.warning_text(message)

        log.warning(message)

user_exists

A method that checks if a database user exists.

Returns:

  • bool

    True if the user exists, False otherwise.

Source code in dev_tool/db/postgres/strategy.py
def user_exists(self) -> bool:
    """
    A method that checks if a database user exists.

    :return: True if the user exists, False otherwise.
    """

    connection = None

    try:
        connection = self._connect(autocommit=False)

        with connection.cursor() as cursor:
            query = 'SELECT 1 FROM pg_roles WHERE rolname = %s'

            cursor.execute(
                query,
                (self.config.user,)
            )

            return cursor.fetchone() is not None
    except PostgreSQLConnectionError:
        raise
    except (psycopg2.DatabaseError, psycopg2.Error):
        message = f'Database error checking if user exists: {self.config.user}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    except Exception:
        message = f'Failed to check if user exists: {self.config.user}'
        CONTEXT.notification.error_banner(message)

        log.exception(message)
        raise PostgreSQLConnectionError(message) from None
    finally:
        if connection:
            connection.close()