Bases: BaseService
A service class for PostgreSQL database operations.
This class provides methods for creating, dropping, and managing PostgreSQL databases.
Source code in dev_tool/services/postgres/service.py
| def __init__(self) -> None:
super().__init__()
|
check_if_superuser_exists
staticmethod
A method that checks if the specified superuser exists in PostgreSQL.
Returns:
-
bool
–
True if the superuser exists, False otherwise.
Source code in dev_tool/services/postgres/service.py
| @staticmethod
def check_if_superuser_exists() -> bool:
"""
A method that checks if the specified superuser exists in PostgreSQL.
:return: True if the superuser exists, False otherwise.
"""
user = os.getenv('DATABASE_USER')
try:
with PostgresService.get_postgres_connection() as connection:
cursor = connection.cursor()
query = 'SELECT 1 FROM pg_roles WHERE rolname = %s'
cursor.execute(query, (user,))
result = cursor.fetchone()
except PostgresConnectionError:
raise
except Exception:
message = f'Failed to check if superuser "{user}" exists'
log.exception(message)
raise SuperuserCreationError(message) from None
else:
return result is not None
|
create_database
staticmethod
A method that creates a new PostgreSQL database.
Parameters:
-
database_name
(str)
–
The name of the database to create.
Source code in dev_tool/services/postgres/service.py
| @staticmethod
def create_database(database_name: str) -> None:
"""
A method that creates a new PostgreSQL database.
:param database_name: The name of the database to create.
"""
try:
with PostgresService.get_postgres_connection(autocommit=True) as connection:
cursor = connection.cursor()
identifier = Identifier(database_name)
query = SQL('CREATE DATABASE {}').format(identifier)
cursor.execute(query)
message = f'Database "{database_name}" created.'
CONTEXT.notification.normal_text(message)
log.debug(message)
except PostgresConnectionError:
raise
except Exception:
message = f'Failed to create database "{database_name}"'
log.exception(message)
raise DatabaseCreationError(message) from None
|
create_postgres_superuser
staticmethod
A method that creates a PostgreSQL superuser if it does not exist.
Source code in dev_tool/services/postgres/service.py
| @staticmethod
def create_postgres_superuser() -> None:
"""A method that creates a PostgreSQL superuser if it does not exist."""
user = os.getenv('DATABASE_USER')
try:
if not PostgresService.check_if_superuser_exists():
password = os.getenv('DATABASE_PASSWORD')
message = f'Superuser {user} does not exist. Creating user.'
CONTEXT.notification.normal_text(message)
log.debug(message)
with PostgresService.get_postgres_connection() as connection:
cursor = connection.cursor()
identifier = Identifier(user)
query = SQL('CREATE USER {} WITH SUPERUSER PASSWORD %s').format(identifier)
cursor.execute(
query,
(password,)
)
connection.commit()
message = f'Superuser {user} created and privileges granted.'
CONTEXT.notification.normal_text(message)
log.debug(message)
else:
message = f'Superuser {user} already exists. Skipping...'
CONTEXT.notification.normal_text(message)
log.debug(message)
except (PostgresConnectionError, SuperuserCreationError):
raise
except Exception:
message = f'Unexpected error creating superuser "{user}"'
log.exception(message)
raise SuperuserCreationError(message) from None
|
drop_database
staticmethod
A method that drops a PostgreSQL database.
Parameters:
-
database_name
(str)
–
The name of the database to drop.
Source code in dev_tool/services/postgres/service.py
| @staticmethod
def drop_database(database_name: str) -> None:
"""
A method that drops a PostgreSQL database.
:param database_name: The name of the database to drop.
"""
try:
with PostgresService.get_postgres_connection(autocommit=True) as connection:
cursor = connection.cursor()
identifier = Identifier(database_name)
query = SQL('DROP DATABASE IF EXISTS {}').format(identifier)
cursor.execute(query)
message = f'Database "{database_name}" dropped.'
CONTEXT.notification.normal_text(message)
log.debug(message)
except PostgresConnectionError:
raise
except Exception:
message = f'Failed to drop database "{database_name}"'
log.exception(message)
raise DatabaseDeletionError(message) from None
|
get_postgres_connection
staticmethod
A context manager that provides a PostgreSQL database connection.
:yield: A PostgreSQL database connection.
Parameters:
-
autocommit
(bool, default:
False
)
–
A flag indicating whether to enable autocommit mode.
Source code in dev_tool/services/postgres/service.py
| @staticmethod
@contextmanager
def get_postgres_connection(autocommit: bool = False) -> psycopg2_connection:
"""
A context manager that provides a PostgreSQL database connection.
:param autocommit: A flag indicating whether to enable autocommit mode.
:yield: A PostgreSQL database connection.
"""
connection = None
@retry_on_failure(maximum_retries=10, delay=2)
def connect() -> psycopg2_connection:
return psycopg2.connect(
dbname='postgres',
user=os.getenv('DATABASE_USER'),
password=os.getenv('DATABASE_PASSWORD'),
host=os.getenv('DATABASE_HOST', 'localhost'),
port=int(os.getenv('DATABASE_PORT', '5432'))
)
try:
try:
connection = connect()
if autocommit:
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
yield connection
except RetryExceededError:
message = 'Exceeded maximum retries for Postgres connection'
log.exception(message)
raise PostgresConnectionError(message) from None
except Exception:
message = 'Failed to establish PostgreSQL connection'
log.exception(message)
raise PostgresConnectionError(message) from None
finally:
if connection:
connection.close()
|
setup_django_databases
staticmethod
A method that sets up databases for Django.
This method drops and recreates the main and test databases.
Source code in dev_tool/services/postgres/service.py
| @staticmethod
@is_django_environment_variables
def setup_django_databases() -> None:
"""
A method that sets up databases for Django.
This method drops and recreates the main and test databases.
"""
database_name = os.getenv('DATABASE_NAME')
if database_name is None:
message = 'Database name is required'
log.debug(message)
return
try:
PostgresService.drop_database(database_name)
PostgresService.drop_database('test_' + database_name)
PostgresService.create_database(database_name)
except (DatabaseCreationError, DatabaseDeletionError):
raise
except Exception:
message = 'Failed to setup Django databases'
log.exception(message)
raise DatabaseSetupError(message) from None
|
is_dump_file
staticmethod
A method that checks if a database dump file exists.
Parameters:
Returns:
-
bool
–
True if the dump file exists, False otherwise.
Source code in dev_tool/services/postgres/service.py
| @staticmethod
@is_django_environment_variables
def is_dump_file(postgres_database: CloudPostgresDatabase | DockerPostgresDatabase | None = None) -> bool:
"""
A method that checks if a database dump file exists.
:param postgres_database: The PostgreSQL database object.
:return: True if the dump file exists, False otherwise.
"""
if is_dump_environment_variables():
if postgres_database is None:
postgres_database = CloudPostgresDatabase()
sql_input_file = BASE / (postgres_database.config.name + '.sql')
sql_output_file = BASE / (postgres_database.config.name + '-clean.sql')
return sql_input_file.exists() or sql_output_file.exists()
return False
|
dump_cloud_to_docker_database
staticmethod
A method that dumps a cloud PostgreSQL database to a Docker database.
Parameters:
-
redump
(bool)
–
A flag indicating whether to (re)dump the database.
-
reuse
(bool)
–
A flag indicating whether to reuse an existing dump file.
Source code in dev_tool/services/postgres/service.py
| @staticmethod
@is_postgres_environment_variables
def dump_cloud_to_docker_database(redump: bool, reuse: bool) -> None:
"""
A method that dumps a cloud PostgreSQL database to a Docker database.
:param redump: A flag indicating whether to (re)dump the database.
:param reuse: A flag indicating whether to reuse an existing dump file.
"""
if is_dump_environment_variables():
try:
cloud_database = CloudPostgresDatabase()
docker_database = DockerPostgresDatabase()
postgres_manager = PostgresManager(
docker_database=docker_database,
cloud_database=cloud_database
)
if redump:
postgres_manager.from_cloud_database_to_docker_database()
return
if PostgresService.is_dump_file(cloud_database):
if reuse:
postgres_manager.from_sql_file_to_docker_database()
else:
message = 'Skipping reuse of existing dump file...'
CONTEXT.notification.normal_text(message)
log.debug(message)
else:
message = 'No dump file exists. Nothing to reuse and no (re)dump requested.'
CONTEXT.notification.normal_text(message)
log.debug(message)
except Exception:
message = 'Failed to dump cloud database to Docker database'
log.exception(message)
raise DatabaseDumpError(message) from None
else:
message = 'Skipping dumping to Docker database...'
CONTEXT.notification.normal_text(message)
log.debug(message)
|
restore_from_content
A method that restores a database from decrypted content.
Parameters:
-
content
(str)
–
The decrypted SQL content to restore.
Source code in dev_tool/services/postgres/service.py
| def restore_from_content(self, content: str) -> None:
"""
A method that restores a database from decrypted content.
:param content: The decrypted SQL content to restore.
"""
try:
manager = PostgresManager(
DockerPostgresDatabase(),
CloudPostgresDatabase()
)
manager.from_decrypted_content_to_docker_database(content)
message = 'Database restored successfully from content'
CONTEXT.notification.success_text(message)
log.info(message)
except Exception:
message = 'Failed to restore database from content'
log.exception(message)
raise DatabaseRestoreError(message) from None
|
restore_from_sql_file
A method that restores a database from a raw SQL file.
Parameters:
-
file
(Path)
–
The path to the SQL file.
Source code in dev_tool/services/postgres/service.py
| def restore_from_sql_file(self, file: Path) -> None:
"""
A method that restores a database from a raw SQL file.
:param file: The path to the SQL file.
"""
try:
sql_output_file = BASE / f'{file.stem}-clean.sql'
manager = PostgresManager(
DockerPostgresDatabase(),
CloudPostgresDatabase()
)
manager.update_sql_file(file, sql_output_file)
manager.docker_database.disconnect_all()
if not manager.docker_database.user_exists():
manager.docker_database.create_user()
manager.docker_database.grant_all_privileges()
manager.docker_database.restore_database_from_file(sql_output_file)
if sql_output_file.exists():
sql_output_file.unlink()
message = f'Database restored successfully from {file.name}'
CONTEXT.notification.success_text(message)
log.info(message)
except Exception:
message = f'Failed to restore database from SQL file {file.name}'
log.exception(message)
raise DatabaseRestoreError(message) from None
|