Skip to content

database

dev_tool.services.portal.database

__all__ = ['DatabaseService', 'JobStatus'] module-attribute

JobStatus

Bases: StrEnum

COMPLETED = 'completed' class-attribute instance-attribute

FAILED = 'failed' class-attribute instance-attribute

DatabaseService

Bases: BaseService

A service class for managing portal database operations.

This class provides methods for creating database dumps, downloading files, decrypting content, and restoring databases from the portal.

The constructor for the DatabaseService class.

Parameters:

Source code in dev_tool/services/portal/database/service.py
def __init__(self, config: PortalServiceConfig, client: APIClientService) -> None:
    """
    The constructor for the DatabaseService class.

    :param config: The portal service configuration.
    :param client: The API client service for making requests.
    """

    super().__init__()

    self.config = config
    self.client = client

    self.manager = PostgresManager(
        DockerPostgresDatabase(),
        CloudPostgresDatabase()
    )

    self.task_manager = TaskManager()

config = config instance-attribute

client = client instance-attribute

manager = PostgresManager(DockerPostgresDatabase(), CloudPostgresDatabase()) instance-attribute

task_manager = TaskManager() instance-attribute

admin_create_dump

A method that creates an unrestricted admin database dump on the portal.

Parameters:

  • data (dict[str, Any]) –

    The dump configuration data.

Returns:

  • dict[str, Any]

    The dump result containing filename and key identifier.

Raises:

  • PortalDumpError

    If the dump creation fails.

Source code in dev_tool/services/portal/database/service.py
def admin_create_dump(self, data: dict[str, Any]) -> dict[str, Any]:
    """
    A method that creates an unrestricted admin database dump on the portal.

    :param data: The dump configuration data.
    :return: The dump result containing filename and key identifier.
    :raises PortalDumpError: If the dump creation fails.
    """

    try:
        endpoint = self.config.admin_dump()
        response = self.client.post(endpoint, json=data, timeout=60.0)
        response.raise_for_status()
        result = response.json()

        if 'job_id' in result:
            job_id = result['job_id']
            result = self._wait_for_dump_completion(job_id)
            result['job_id'] = job_id

            return result
    except PortalDumpError:
        raise
    except Exception:
        message = 'Failed to create admin database dump'
        log.exception(message)
        raise PortalDumpError(message) from None
    else:
        return result

backup_database

A method that backs up a database to a local .sql file as a background task.

Parameters:

  • project (str) –

    The project name from pyproject.toml.

  • database (str) –

    The name of the database to backup.

Returns:

  • str

    The task ID for tracking the backup progress.

Source code in dev_tool/services/portal/database/service.py
def backup_database(self, project: str, database: str) -> str:
    """
    A method that backs up a database to a local .sql file as a background task.

    :param project: The project name from pyproject.toml.
    :param database: The name of the database to backup.
    :return: The task ID for tracking the backup progress.
    """

    def process() -> str:
        try:
            data = {
                'project': project,
                'database': database,
                'tables': None,
                'anonymize': False
            }

            result = self.admin_create_dump(data)

            job_id = result['job_id']
            filename = result['filename']
            identifier = result['key_identifier']

            encrypted_file = self.download_file(job_id, filename)
            key = self.get_decryption_key(identifier)
            content = self.decrypt_content(encrypted_file, key)

            sql_filename = filename.replace('.encrypted', '')
            sql_path = BASE / sql_filename

            with open(sql_path, 'w', encoding='utf-8') as handle:
                handle.write(content)

            encrypted_file.unlink()

            message = f'Database backup completed: {sql_filename}'
            self.notification.normal_text(message)
            log.info(message)
        except (PortalDumpError, PortalDownloadError, PortalDecryptionError):
            raise
        except Exception:
            message = f'Failed to backup database: {database}'
            log.exception(message)

            raise PortalDumpError(message) from None
        else:
            return f'Database backup saved to {sql_filename}'

    title = f'Backing up database: {database}'

    return self.task_manager.submit(
        TaskType.DATABASE_RESTORE,
        title,
        process,
        database=database
    )

create_dump

A method that creates a database dump on the portal.

Parameters:

  • data (dict[str, Any]) –

    The dump configuration data.

Returns:

  • dict[str, Any]

    The dump result containing filename and key identifier.

Raises:

  • PortalDumpError

    If the dump creation fails.

Source code in dev_tool/services/portal/database/service.py
def create_dump(self, data: dict[str, Any]) -> dict[str, Any]:
    """
    A method that creates a database dump on the portal.

    :param data: The dump configuration data.
    :return: The dump result containing filename and key identifier.
    :raises PortalDumpError: If the dump creation fails.
    """

    try:
        endpoint = self.config.dump()
        response = self.client.post(endpoint, json=data, timeout=60.0)
        response.raise_for_status()
        result = response.json()

        if 'job_id' in result:
            job_id = result['job_id']
            result = self._wait_for_dump_completion(job_id)
            result['job_id'] = job_id

            return result
    except PortalDumpError:
        raise
    except Exception:
        message = 'Failed to create database dump'
        log.exception(message)
        raise PortalDumpError(message) from None
    else:
        return result

decrypt_content

A method that decrypts an encrypted database file.

Parameters:

  • file (Path) –

    The path to the encrypted file.

  • data (dict[str, str]) –

    The decryption key data.

Returns:

  • str

    The decrypted content as a string.

Raises:

  • PortalDecryptionError

    If decryption fails.

Source code in dev_tool/services/portal/database/service.py
def decrypt_content(self, file: Path, data: dict[str, str]) -> str:
    """
    A method that decrypts an encrypted database file.

    :param file: The path to the encrypted file.
    :param data: The decryption key data.
    :return: The decrypted content as a string.
    :raises PortalDecryptionError: If decryption fails.
    """

    try:
        secret = data['key'].encode()
        fernet = Fernet(secret)

        chunks = []

        with open(file, 'rb') as handle:
            for line in handle:
                line = line.strip()

                if line:
                    chunk = fernet.decrypt(line)
                    chunks.append(chunk)
    except Exception:
        message = 'Failed to decrypt database file'
        log.exception(message)

        raise PortalDecryptionError(message) from None
    else:
        return b''.join(chunks).decode('utf-8')

download_file

A method that downloads a file from the portal.

Parameters:

  • job_id (str) –

    The job ID for the dump.

  • filename (str) –

    The name of the file to download.

Returns:

  • Path

    The path to the downloaded file.

Raises:

  • PortalDownloadError

    If the download fails.

Source code in dev_tool/services/portal/database/service.py
def download_file(self, job_id: str, filename: str) -> Path:
    """
    A method that downloads a file from the portal.

    :param job_id: The job ID for the dump.
    :param filename: The name of the file to download.
    :return: The path to the downloaded file.
    :raises PortalDownloadError: If the download fails.
    """

    endpoint = self.config.download(job_id)
    path = BASE / filename

    try:
        with self.client.stream('GET', endpoint) as response:
            response.raise_for_status()

            with open(path, 'wb') as handle:
                for chunk in response.iter_bytes(chunk_size=8192):
                    if chunk:
                        handle.write(chunk)
    except Exception:
        message = 'Failed to download file'
        log.exception(message)

        raise PortalDownloadError(message) from None
    else:
        return path

get_decryption_key

A method that retrieves the decryption key from the portal.

Parameters:

  • identifier (str) –

    The key identifier.

Returns:

Raises:

  • PortalDecryptionError

    If fetching the key fails.

Source code in dev_tool/services/portal/database/service.py
def get_decryption_key(self, identifier: str) -> dict[str, str]:
    """
    A method that retrieves the decryption key from the portal.

    :param identifier: The key identifier.
    :return: The decryption key data.
    :raises PortalDecryptionError: If fetching the key fails.
    """

    try:
        endpoint = self.config.decrypt(identifier)
        response = self.client.get(endpoint)

        response.raise_for_status()
    except Exception:
        message = 'Unable to fetch the decryption key'
        log.exception(message)

        raise PortalDecryptionError(message) from None
    else:
        return response.json()

restore_database

A method that restores a database from the portal as a background task.

Parameters:

  • project (str) –

    The project name from pyproject.toml.

  • database (str) –

    The name of the database to restore.

Returns:

  • str

    The task ID for tracking the restoration progress.

Source code in dev_tool/services/portal/database/service.py
def restore_database(self, project: str, database: str) -> str:
    """
    A method that restores a database from the portal as a background task.

    :param project: The project name from pyproject.toml.
    :param database: The name of the database to restore.
    :return: The task ID for tracking the restoration progress.
    """

    existing = self._is_existing_dump(database)
    reuse = False

    if existing:
        reuse = get_user_confirmation(
            f'Found existing encrypted dump: {existing.name}. Do you want to re-use it?'
        )

        if not reuse:
            pattern = f'{database}_*.sql.encrypted'

            for old_dump in BASE.glob(pattern):
                old_dump.unlink()
                log.info(f'Deleted old dump: {old_dump.name}')

    def process() -> str:
        try:
            if reuse and existing:
                log.info(f'Re-using existing dump: {existing.name}')

                timestamp_parts = existing.stem.replace('.sql', '').split('_')[-2:]
                timestamp = f'{timestamp_parts[0]}_{timestamp_parts[1]}'
                identifier = f'{database}_{timestamp}'

                key = self.get_decryption_key(identifier)
                content = self.decrypt_content(existing, key)

                self.manager.from_decrypted_content_to_docker_database(content)

                return f'Database restoration completed successfully for {database} using existing dump'

            data = {
                'project': project,
                'database': database,
                'tables': None,
                'anonymize': True
            }

            result = self.create_dump(data)

            job_id = result['job_id']
            filename = result['filename']
            identifier = result['key_identifier']

            file = self.download_file(job_id, filename)
            key = self.get_decryption_key(identifier)
            content = self.decrypt_content(file, key)

            self.manager.from_decrypted_content_to_docker_database(content)
        except (PortalDumpError, PortalDownloadError, PortalDecryptionError):
            raise
        except Exception:
            message = f'Failed to restore database: {database}'
            log.exception(message)

            raise PortalDumpError(message) from None
        else:
            return f'Database restoration completed successfully for {database}'

    title = f'Restoring database: {database}'

    return self.task_manager.submit(
        TaskType.DATABASE_RESTORE,
        title,
        process,
        database=database
    )