Skip to content

database

dev_tool.services.portal.database

__all__ = ['DatabaseService', 'JobStatus'] module-attribute

JobStatus

Bases: StrEnum

COMPLETED = 'completed' class-attribute instance-attribute

FAILED = 'failed' class-attribute instance-attribute

DatabaseService

Bases: BaseService

A service class for managing portal database operations.

This class provides methods for creating database dumps, downloading files, decrypting content, and restoring databases from the portal.

The constructor for the DatabaseService class.

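Parameters:

  • config (PortalServiceConfig) –

    The portal service configuration.

  • client (APIClientService) –

    The API client service for making requests.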

Source code in dev_tool/services/portal/database/service.py
def __init__(self, config: PortalServiceConfig, client: APIClientService) -> None:
    """
    Initialize the service with its portal configuration and API client.

    Besides storing the two collaborators, this builds the Postgres manager
    (from a Docker database and a cloud database object) and the task manager
    used to run long operations in the background.

    :param config: The portal service configuration.
    :param client: The API client service for making requests.
    """

    super().__init__()

    self.config, self.client = config, client

    # Built in the same order as the manager receives them: Docker first, cloud second.
    docker_database = DockerPostgresDatabase()
    cloud_database = CloudPostgresDatabase()
    self.manager = PostgresManager(docker_database, cloud_database)

    self.task_manager = TaskManager()

config = config instance-attribute

client = client instance-attribute

manager = PostgresManager(DockerPostgresDatabase(), CloudPostgresDatabase()) instance-attribute

task_manager = TaskManager() instance-attribute

admin_create_dump

A method that creates an unrestricted admin database dump on the portal.

Parameters:

  • data (dict[str, Any]) –

    The dump configuration data.

Returns:

  • dict[str, Any]

    The dump result containing filename and key identifier.

Raises:

  • PortalDumpError

    If the dump creation fails.

Source code in dev_tool/services/portal/database/service.py
def admin_create_dump(self, data: dict[str, Any]) -> dict[str, Any]:
    """
    Request an unrestricted admin database dump from the portal.

    The request is posted to the admin dump endpoint. When the portal answers
    with a ``job_id``, the call blocks in ``_wait_for_dump_completion`` and the
    job ID is copied into the completed result; otherwise the portal's
    response body is returned unchanged.

    :param data: The dump configuration data.
    :return: The dump result containing filename and key identifier.
    :raises PortalDumpError: If the dump creation fails.
    """

    try:
        url = self.config.admin_dump()
        reply = self.client.post(url, json=data, timeout=60.0)
        reply.raise_for_status()
        payload = reply.json()

        # No job ID: the portal's response body is already the final result.
        if 'job_id' not in payload:
            return payload

        job_id = payload['job_id']
        finished = self._wait_for_dump_completion(job_id)
        # Callers need the job ID later to download the produced file.
        finished['job_id'] = job_id

        return finished
    except PortalDumpError:
        # Already the documented error type; do not re-wrap.
        raise
    except Exception as error:
        message = 'Failed to create admin database dump'
        log.exception(message)

        raise PortalDumpError(message) from error

backup_database

A method that backs up a database to a local .sql file as a background task.

Parameters:

  • project (str) –

    The project name from pyproject.toml.

  • database (str) –

    The name of the database to back up.

Returns:

  • str

    The task ID for tracking the backup progress.

Source code in dev_tool/services/portal/database/service.py
def backup_database(self, project: str, database: str) -> str:
    """
    A method that backs up a database to a local .sql file as a background task.

    The background task requests a non-anonymized admin dump, downloads the
    encrypted file, fetches its key and decrypts it into ``BASE``. The
    downloaded encrypted file is an intermediate artifact and is removed
    whether or not the key fetch and decryption succeed.

    :param project: The project name from pyproject.toml.
    :param database: The name of the database to back up.
    :return: The task ID for tracking the backup progress.
    """

    def process() -> str:
        try:
            data = {
                'project': project,
                'database': database,
                'tables': None,
                'anonymize': False
            }

            result = self.admin_create_dump(data)

            job_id = result['job_id']
            filename = result['filename']
            identifier = result['key_identifier']

            encrypted_file = self.download_file(job_id, filename)

            sql_filename = filename.replace('.gz.encrypted', '')
            sql_path = BASE / sql_filename

            try:
                key_data = self.get_decryption_key(identifier)
                key = bytes.fromhex(key_data['key'])

                self._decrypt_to_file(encrypted_file, sql_path, key)
            finally:
                # Previously the encrypted dump was only deleted on success, so a
                # failed key fetch or decryption left a non-anonymized dump on disk.
                encrypted_file.unlink(missing_ok=True)

            message = f'Database backup completed: {sql_filename}'
            self.notification.normal_text(message)
            log.info(message)
        except (PortalDumpError, PortalDownloadError, PortalDecryptionError):
            # Already domain errors; let the task manager see them unchanged.
            raise
        except Exception as exception:
            message = f'Failed to backup database: {database}'
            log.exception(message)

            raise PortalDumpError(message) from exception
        else:
            return f'Database backup saved to {sql_filename}'

    title = f'Backing up database: {database}'

    # NOTE(review): the backup is submitted under TaskType.DATABASE_RESTORE;
    # confirm whether a dedicated backup task type exists and should be used.
    return self.task_manager.submit(
        TaskType.DATABASE_RESTORE,
        title,
        process,
        database=database
    )

create_dump

A method that creates a database dump on the portal.

Parameters:

  • data (dict[str, Any]) –

    The dump configuration data.

Returns:

  • dict[str, Any]

    The dump result containing filename and key identifier.

Raises:

  • PortalDumpError

    If the dump creation fails.

Source code in dev_tool/services/portal/database/service.py
def create_dump(self, data: dict[str, Any]) -> dict[str, Any]:
    """
    Request a database dump from the portal.

    The request is posted to the dump endpoint. When the portal answers with a
    ``job_id``, the call blocks in ``_wait_for_dump_completion`` and the job ID
    is copied into the completed result; otherwise the portal's response body
    is returned unchanged.

    :param data: The dump configuration data.
    :return: The dump result containing filename and key identifier.
    :raises PortalDumpError: If the dump creation fails.
    """

    try:
        target = self.config.dump()
        answer = self.client.post(target, json=data, timeout=60.0)
        answer.raise_for_status()
        body = answer.json()

        if 'job_id' in body:
            pending_job = body['job_id']
            body = self._wait_for_dump_completion(pending_job)
            # Keep the job ID with the result so the file can be downloaded later.
            body['job_id'] = pending_job

        return body
    except PortalDumpError:
        # Already the documented error type; do not re-wrap.
        raise
    except Exception as cause:
        message = 'Failed to create database dump'
        log.exception(message)

        raise PortalDumpError(message) from cause

decrypt_content

A method that decrypts an encrypted file and returns the content.

Parameters:

  • file (Path) –

    The path to the encrypted file.

  • key_data (dict[str, str]) –

    The key data dictionary containing the hex-encoded key.

Returns:

  • str

    The decrypted content as a string.

Raises:

  • PortalDecryptionError

    If decryption fails.

Source code in dev_tool/services/portal/database/service.py
def decrypt_content(self, file: Path, key_data: dict[str, str]) -> str:
    """
    A method that decrypts an encrypted file and returns the content.

    The file is read as a sequence of records. Each record starts with a
    16-byte header: a 4-byte big-endian ciphertext length followed by a
    12-byte nonce. The ciphertext is decrypted with AES-GCM and the plaintext
    is fed to a streaming gzip decompressor.

    :param file: The path to the encrypted file.
    :param key_data: The key data dictionary containing the hex-encoded key.
    :return: The decrypted content as a string.
    :raises PortalDecryptionError: If decryption fails.
    """

    header_size = 16

    try:
        key = bytes.fromhex(key_data['key'])
        aesgcm = AESGCM(key)
        # wbits=31 selects the gzip container (16 + the maximum 15-bit window).
        decompressor = zlib.decompressobj(wbits=31)
        pieces: list[bytes] = []

        with open(file, 'rb') as source:
            while header := source.read(header_size):
                if len(header) < header_size:
                    message = 'Truncated chunk header in encrypted file'
                    raise ValueError(message)

                chunk_size = int.from_bytes(header[:4], 'big')
                nonce = header[4:16]

                encrypted = source.read(chunk_size)

                if len(encrypted) != chunk_size:
                    message = 'Truncated chunk body in encrypted file'
                    raise ValueError(message)

                decrypted = aesgcm.decrypt(nonce, encrypted, None)
                pieces.append(decompressor.decompress(decrypted))

            pieces.append(decompressor.flush())

        # Decode once over the whole payload: chunk boundaries are arbitrary
        # byte offsets, so decoding each chunk separately fails whenever a
        # multi-byte UTF-8 character straddles two chunks.
        content = b''.join(pieces).decode('utf-8')
    except Exception as exception:
        message = 'Failed to decrypt content'
        log.exception(message)

        raise PortalDecryptionError(message) from exception
    else:
        return content

download_file

A method that downloads a file from the portal.

Parameters:

  • job_id (str) –

    The job ID for the dump.

  • filename (str) –

    The name of the file to download.

Returns:

  • Path

    The path to the downloaded file.

Raises:

  • PortalDownloadError

    If the download fails.

Source code in dev_tool/services/portal/database/service.py
def download_file(self, job_id: str, filename: str) -> Path:
    """
    A method that downloads a file from the portal.

    The response is streamed into a ``.part`` sibling of the destination and
    renamed onto the final name only after the transfer completes. A failed
    download therefore never leaves a truncated file under the final name,
    and never clobbers an earlier complete download of the same file.

    :param job_id: The job ID for the dump.
    :param filename: The name of the file to download.
    :return: The path to the downloaded file.
    :raises PortalDownloadError: If the download fails.
    """

    endpoint = self.config.download(job_id)
    path = BASE / filename
    partial = path.with_name(f'{path.name}.part')

    try:
        with self.client.stream('GET', endpoint) as response:
            response.raise_for_status()

            with open(partial, 'wb') as handle:
                for chunk in response.iter_bytes(chunk_size=8192):
                    if chunk:
                        handle.write(chunk)

        # Publish the finished file under its real name in one step.
        partial.replace(path)
    except Exception as exception:
        # Best-effort cleanup; a cleanup failure must not replace the
        # documented PortalDownloadError.
        try:
            partial.unlink(missing_ok=True)
        except OSError:
            log.warning(f'Unable to remove partial download: {partial.name}')

        message = 'Failed to download file'
        log.exception(message)

        raise PortalDownloadError(message) from exception
    else:
        return path

get_decryption_key

A method that retrieves the decryption key from the portal.

Parameters:

  • identifier (str) –

    The key identifier.

Returns:

  • dict[str, str]

    The decryption key data.

Raises:

  • PortalDecryptionError

    If fetching the key fails.

Source code in dev_tool/services/portal/database/service.py
def get_decryption_key(self, identifier: str) -> dict[str, str]:
    """
    A method that retrieves the decryption key from the portal.

    :param identifier: The key identifier.
    :return: The decryption key data.
    :raises PortalDecryptionError: If fetching the key fails, including when
        the response body cannot be parsed as JSON.
    """

    try:
        endpoint = self.config.decrypt(identifier)
        response = self.client.get(endpoint)

        response.raise_for_status()

        # Parse inside the try block so a malformed body is reported as
        # PortalDecryptionError instead of escaping as a raw parsing error.
        key_data = response.json()
    except Exception as exception:
        message = 'Unable to fetch the decryption key'
        log.exception(message)

        raise PortalDecryptionError(message) from exception
    else:
        return key_data

restore_database

A method that restores a database from the portal as a background task.

Parameters:

  • project (str) –

    The project name from pyproject.toml.

  • database (str) –

    The name of the database to restore.

Returns:

  • str

    The task ID for tracking the restoration progress.

Source code in dev_tool/services/portal/database/service.py
def restore_database(self, project: str, database: str) -> str:
    """
    Schedule a background restore of a portal database.

    Before the task is submitted, an existing encrypted dump (if one is found)
    is offered to the user for re-use. If the user declines, every file in
    ``BASE`` matching ``<database>_*.sql.gz.encrypted`` is deleted and the task
    requests a fresh anonymized dump from the portal instead.

    :param project: The project name from pyproject.toml.
    :param database: The name of the database to restore.
    :return: The task ID for tracking the restoration progress.
    """

    existing = self._is_existing_dump(database)
    reuse = (
        get_user_confirmation(
            f'Found existing encrypted dump: {existing.name}. Do you want to re-use it?'
        )
        if existing
        else False
    )

    if existing and not reuse:
        for stale in BASE.glob(f'{database}_*.sql.gz.encrypted'):
            stale.unlink()
            log.info(f'Deleted old dump: {stale.name}')

    def load(encrypted: Path, key_identifier: str, dump_name: str) -> None:
        # Fetch the key, decrypt into a throwaway directory, and hand the
        # decrypted file to the Postgres manager before the directory is removed.
        key_data = self.get_decryption_key(key_identifier)
        key = bytes.fromhex(key_data['key'])

        with tempfile.TemporaryDirectory() as workdir:
            plain = Path(workdir) / dump_name.replace('.gz.encrypted', '')
            self._decrypt_to_file(encrypted, plain, key)
            self.manager.from_decrypted_file_to_docker_database(plain)

    def process() -> str:
        try:
            if reuse and existing:
                log.info(f'Re-using existing dump: {existing.name}')

                # The existing file's name doubles as the key identifier here.
                load(existing, existing.name, existing.name)

                summary = f'Database restoration completed successfully for {database} using existing dump'
            else:
                request = {
                    'project': project,
                    'database': database,
                    'tables': None,
                    'anonymize': True
                }

                dump = self.create_dump(request)
                job_id, filename, identifier = dump['job_id'], dump['filename'], dump['key_identifier']

                # Download first, then fetch the key (inside load).
                load(self.download_file(job_id, filename), identifier, filename)

                summary = f'Database restoration completed successfully for {database}'
        except (PortalDumpError, PortalDownloadError, PortalDecryptionError):
            # Already domain errors; pass them through unchanged.
            raise
        except Exception as error:
            message = f'Failed to restore database: {database}: {error}'
            log.exception(message)

            raise PortalDumpError(message) from error
        else:
            return summary

    return self.task_manager.submit(
        TaskType.DATABASE_RESTORE,
        f'Restoring database: {database}',
        process,
        database=database
    )