Skip to content

runner

dev_tool.services.runner

__all__ = ['BaseProcessRunner'] module-attribute

BaseProcessRunner

Bases: ABC

An abstract base class for process runners.

This class handles the common process lifecycle, including starting a subprocess, streaming its output, and stopping it.

The constructor for the BaseProcessRunner class.

Parameters:

  • project_name (str) –

    The project name for container naming.

Source code in dev_tool/services/runner/base.py
def __init__(self, project_name: str) -> None:
    """
    The constructor for the BaseProcessRunner class.

    Every runner starts idle: no subprocess, no output thread, no extra
    environment variables, and both events cleared.

    :param project_name: The project name for container naming.
    """

    # Identity of the project this runner belongs to.
    self.project_name = project_name

    # Extra variables; run() layers them over os.environ when non-empty.
    self.environment: dict[str, str] = {}

    # Handles populated by run().
    self.process: subprocess.Popen | None = None
    self.output_thread: threading.Thread | None = None

    # stop_event is set by stop(); ready_event is awaited by wait_until_ready().
    self.stop_event = threading.Event()
    self.ready_event = threading.Event()

environment = {} instance-attribute

output_thread = None instance-attribute

process = None instance-attribute

project_name = project_name instance-attribute

ready_event = threading.Event() instance-attribute

stop_event = threading.Event() instance-attribute

run

A method that starts the subprocess and output thread.

Source code in dev_tool/services/runner/base.py
def run(self) -> None:
    """
    A method that launches the subprocess and begins streaming its output.

    Nothing is started when the runner reports itself unavailable, or when it
    is containerized but its container does not exist. On success the
    ``process`` and ``output_thread`` attributes are populated.
    """

    if not self._is_available():
        return

    if self._is_containerized():
        if not self._container_exists():
            return

    # Non-containerized runners stop the container before spawning locally.
    if not self._is_containerized():
        self._stop_container()

    # A new process group is requested on Windows only; elsewhere no flags.
    on_windows = sys.platform == OperatingSystem.WINDOWS
    flags = subprocess.CREATE_NEW_PROCESS_GROUP if on_windows else 0

    argv = self._build_command()

    # env=None lets the child inherit the parent environment unchanged;
    # otherwise the runner's variables override the inherited ones.
    overrides = self.environment
    child_env = {**os.environ, **overrides} if overrides else None

    # stderr is folded into stdout so a single text pipe carries all output.
    popen_options = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.STDOUT,
        'text': True,
        'creationflags': flags,
        'env': child_env,
    }

    self.process = subprocess.Popen(argv, **popen_options)

    streamer = threading.Thread(target=self._stream_output, daemon=True)
    self.output_thread = streamer
    streamer.start()

run_and_wait

A method that starts the subprocess and blocks until it exits or is interrupted.

Source code in dev_tool/services/runner/base.py
def run_and_wait(self) -> None:
    """
    A method that starts the subprocess and blocks until it exits or is interrupted.

    While waiting, SIGINT is redirected to set ``stop_event`` so the loop can
    exit cleanly. The subprocess is always stopped on the way out, and the
    SIGINT handler that was active on entry is restored afterwards, even when
    ``stop()`` itself raises.
    """

    self.run()

    # run() leaves process unset when it declined to start anything.
    if self.process is None:
        return

    previous_handler = signal.getsignal(signal.SIGINT)

    try:
        signal.signal(signal.SIGINT, lambda _s, _f: self.stop_event.set())
        signal_registered = True
    except ValueError:
        # signal.signal() refuses to install handlers outside the main thread.
        signal_registered = False

    try:
        # Poll in short slices so both process exit and stop requests are noticed.
        while not self.stop_event.is_set():
            if self.process.poll() is not None:
                break

            self.stop_event.wait(timeout=0.1)
    except KeyboardInterrupt:
        # Reached when the custom handler could not be installed.
        self.stop_event.set()
    finally:
        try:
            self.stop()
        finally:
            # getsignal() returns None for handlers not installed from Python;
            # signal.signal() rejects None, so such a handler cannot be put back.
            if signal_registered and previous_handler is not None:
                signal.signal(signal.SIGINT, previous_handler)

stop

A method that stops the subprocess and cleans up resources.

Source code in dev_tool/services/runner/base.py
def stop(self) -> None:
    """
    A method that stops the subprocess and cleans up resources.

    The stop event is set first, then the container (for containerized
    runners) and the process are asked to stop. The subprocess gets five
    seconds to exit before it is killed; a killed process is then waited on
    so it is reaped. Finally the captured stdout pipe is closed.
    """

    self.stop_event.set()

    if self._is_containerized():
        self._stop_container()

    self._stop_process()

    if self.process:
        try:
            self.process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            self.process.kill()
            # kill() only delivers the signal; wait() collects the exit
            # status so the child is reaped and returncode is populated.
            self.process.wait()

        if self.process.stdout:
            self.process.stdout.close()

wait_until_ready

A method that blocks until the process signals readiness or the timeout expires.

Parameters:

  • timeout (float, default: 30.0 ) –

    The maximum time to wait in seconds.

Returns:

  • bool

    True if the process is ready, False if the timeout was reached.

Source code in dev_tool/services/runner/base.py
def wait_until_ready(self, timeout: float = 30.0) -> bool:
    """
    A method that waits on the ready event for at most the given number of seconds.

    :param timeout: The maximum time to wait in seconds.
    :return: True if the ready event was set in time, False if the timeout elapsed first.
    """

    became_ready = self.ready_event.wait(timeout=timeout)

    return became_ready