Skip to content

service

dev_tool.services.task.service

log = logging.getLogger(__name__) module-attribute

TaskManager

Source code in dev_tool/services/task/service.py
def __init__(self) -> None:
    """
    A method that sets up the manager state the first time it is constructed.

    Later constructions find ``_initialized`` already set and leave the
    existing state untouched.
    """

    # Guard clause: nothing to do once the state has been built.
    if self._initialized:
        return

    self._initialized = True
    self._callbacks = {}
    self._cancellation_events = {}
    self._lock = threading.Lock()
    self._posthooks = {}
    self._prehooks = {}
    self._thread_task_map = {}
    self._watchers = set()

    self.tasks: dict[str, TaskResult] = {}
    self.notification = CONTEXT.get_notification_service()

tasks = {} instance-attribute

notification = CONTEXT.get_notification_service() instance-attribute

__new__

Source code in dev_tool/services/task/service.py
def __new__(cls) -> Self:
    """
    A method that returns the shared instance, creating it on first use.

    :return: The single instance stored on ``cls._instance``.
    """

    if cls._instance is None:
        with cls._lock:
            # Re-check under the lock: another thread may have created the
            # instance while this one was waiting.
            if cls._instance is None:
                instance = super().__new__(cls)

                # Set the flag BEFORE publishing the instance. A caller that
                # takes the lock-free path above must never receive an object
                # without `_initialized`, otherwise `__init__` would fail
                # with AttributeError.
                instance._initialized = False
                cls._instance = instance

    return cls._instance

set_posthook

A method that attaches a posthook to an existing task.

Source code in dev_tool/services/task/service.py
def set_posthook(self, identifier: str, hook: Callable[[], None]) -> None:
    """
    A method that stores a posthook under the given identifier.

    The identifier is not checked against the known tasks, and a hook that
    was previously stored under the same identifier is replaced.

    :param identifier: The key to store the hook under (presumably a task ID; confirm against callers).
    :param hook: The zero-argument callable to store.
    """

    with self._lock:
        self._posthooks.update({identifier: hook})

set_prehook

A method that attaches a prehook to an existing task.

Source code in dev_tool/services/task/service.py
def set_prehook(self, identifier: str, hook: Callable[[], None]) -> None:
    """
    A method that stores a prehook under the given identifier.

    The identifier is not checked against the known tasks, and a hook that
    was previously stored under the same identifier is replaced.

    :param identifier: The key to store the hook under (presumably a task ID; confirm against callers).
    :param hook: The zero-argument callable to store.
    """

    with self._lock:
        self._prehooks.update({identifier: hook})

register_watcher

A method that registers a progress watcher.

Parameters:

  • watcher (Callable[[TaskResult], None]) –

    The watcher function to register.

Source code in dev_tool/services/task/service.py
def register_watcher(self, watcher: Callable[[TaskResult], None]) -> None:
    """
    A method that adds a progress watcher to the set of registered watchers.

    Registering the same watcher more than once has no additional effect.

    :param watcher: The watcher function to register.
    """

    with self._lock:
        self._watchers.update((watcher,))

unregister_watcher

A method that unregisters a progress watcher.

Parameters:

  • watcher (Callable[[TaskResult], None]) –

    The watcher function to unregister.

Source code in dev_tool/services/task/service.py
def unregister_watcher(self, watcher: Callable[[TaskResult], None]) -> None:
    """
    A method that removes a progress watcher from the registered watchers.

    Unregistering a watcher that was never registered is a no-op.

    :param watcher: The watcher function to unregister.
    """

    with self._lock:
        if watcher in self._watchers:
            self._watchers.remove(watcher)

submit

A method that submits a task for background execution.

Parameters:

  • task_type (TaskType) –

    The type of task to submit.

  • title (str) –

    The task title.

  • func (Callable[[], str]) –

    The function to execute.

  • completion_callback (Callable[[TaskResult], None] | None, default: None ) –

    Optional callback for task completion.

  • parameters

    Additional parameters for the task.

Returns:

  • str

    The task ID.

Source code in dev_tool/services/task/service.py
def submit(self, task_type: TaskType, title: str, func: Callable[[], str], completion_callback: Callable[[TaskResult], None] | None = None, **parameters) -> str:
    """
    A method that registers a new task and starts it on a background thread.

    The task is recorded as pending, announced through ``_notify``, and then
    handed to ``_execute`` on a daemon thread.

    :param task_type: The type of task to submit.
    :param title: The task title.
    :param func: The function to execute.
    :param completion_callback: Optional callback for task completion.
    :param parameters: Additional parameters stored on the task record.
    :return: The task ID (the first eight characters of a random UUID).
    """

    identifier = str(uuid4())[:8]
    stop_signal = threading.Event()

    record = TaskResult(
        task_id=identifier,
        task_type=task_type,
        status=TaskStatus.PENDING,
        title=title,
        parameters=parameters
    )

    # Register the task, its cancellation event and its callback atomically.
    with self._lock:
        self.tasks[identifier] = record
        self._cancellation_events[identifier] = stop_signal

        if completion_callback:
            self._callbacks[identifier] = completion_callback

    # Announce the pending task before the worker starts running.
    self._notify(record)

    worker = threading.Thread(target=self._execute, args=(identifier, func), daemon=True)
    worker.start()

    return identifier

cancel

A method that cancels a running or pending task.

Parameters:

  • task_id (str) –

    The ID of the task to cancel.

Returns:

  • bool

    True if the task was cancelled, False if not found or already completed.

Source code in dev_tool/services/task/service.py
def cancel(self, task_id: str) -> bool:
    """
    A method that cancels a running or pending task.

    The task's cancellation event is set, the task is marked as cancelled
    with a completion timestamp, and the notification service and watchers
    are informed.

    :param task_id: The ID of the task to cancel.
    :return: True if the task was cancelled, False if not found or already completed.
    """

    with self._lock:
        task = self.tasks.get(task_id)

        if task is None:
            return False

        if task.status not in (TaskStatus.PENDING, TaskStatus.RUNNING):
            return False

        stop_signal = self._cancellation_events.get(task_id)

        if stop_signal is not None:
            stop_signal.set()

        task.status = TaskStatus.CANCELLED
        task.completed = time.time()

    # Notifications are sent after the lock is released.
    self.notification.dismiss_by_identifier(task_id)
    self._notify(task)
    self.notification.warning_banner(f'Cancelled: {task.title}', duration=3.0)
    log.debug(f'Cancelled task {task_id}')

    return True

is_cancelled

A method that checks if a task has been cancelled.

Parameters:

  • task_id (str | None, default: None ) –

    The ID of the task to check. If None, checks current thread.

Returns:

  • bool

    True if the task is cancelled, False otherwise.

Source code in dev_tool/services/task/service.py
def is_cancelled(self, task_id: str | None = None) -> bool:
    """
    A method that reports whether a task's cancellation event has been set.

    :param task_id: The ID of the task to check. If None, the task mapped to the current thread is used.
    :return: True if the task is cancelled, False otherwise (including when no task or event is found).
    """

    if task_id is None:
        # Resolve the task from the calling thread's identifier.
        with self._lock:
            task_id = self._thread_task_map.get(threading.get_ident())

        if task_id is None:
            return False

    with self._lock:
        stop_signal = self._cancellation_events.get(task_id)
        return stop_signal is not None and stop_signal.is_set()

get

A method that gets all tasks.

Returns:

  • list[TaskResult]

    A list of all tasks sorted by creation time, newest first.

Source code in dev_tool/services/task/service.py
def get(self) -> list[TaskResult]:
    """
    A method that gets all tasks.

    :return: A new list of all tasks sorted by creation time, newest first.
    """

    with self._lock:
        ordered = list(self.tasks.values())
        ordered.sort(key=lambda item: item.created, reverse=True)

        return ordered

get_active

A method that gets all active tasks.

Returns:

  • list[TaskResult]

    A list of pending and running tasks.

Source code in dev_tool/services/task/service.py
def get_active(self) -> list[TaskResult]:
    """
    A method that gets all active tasks.

    :return: A list of the tasks whose status is pending or running.
    """

    active = []

    with self._lock:
        for task in self.tasks.values():
            if task.status in (TaskStatus.PENDING, TaskStatus.RUNNING):
                active.append(task)

    return active

get_completed

A method that gets all completed tasks.

Returns:

  • list[TaskResult]

    A list of completed tasks.

Source code in dev_tool/services/task/service.py
def get_completed(self) -> list[TaskResult]:
    """
    A method that gets all completed tasks.

    :return: A list of the tasks whose status is completed.
    """

    completed = []

    with self._lock:
        for task in self.tasks.values():
            if task.status == TaskStatus.COMPLETED:
                completed.append(task)

    return completed

get_failed

A method that gets all failed tasks.

Returns:

  • list[TaskResult]

    A list of failed tasks.

Source code in dev_tool/services/task/service.py
def get_failed(self) -> list[TaskResult]:
    """
    A method that gets all failed tasks.

    :return: A list of the tasks whose status is failed.
    """

    failed = []

    with self._lock:
        for task in self.tasks.values():
            if task.status == TaskStatus.FAILED:
                failed.append(task)

    return failed

get_task

A method that gets a specific task by ID.

Parameters:

  • task_id (str) –

    The task ID to retrieve.

Returns:

  • TaskResult | None

    The task result or None if not found.

Source code in dev_tool/services/task/service.py
def get_task(self, task_id: str) -> TaskResult | None:
    """
    A method that looks up a single task by its ID.

    :param task_id: The task ID to retrieve.
    :return: The task result or None if not found.
    """

    with self._lock:
        found = self.tasks.get(task_id)

    return found

get_latest_completed

A method that gets the most recently completed task.

Returns:

  • TaskResult | None

    The latest completed task or None if none exist.

Source code in dev_tool/services/task/service.py
def get_latest_completed(self) -> TaskResult | None:
    """
    A method that gets the most recently completed task.

    Tasks without a completion timestamp are ranked as if completed at 0.

    :return: The latest completed task or None if none exist.
    """

    with self._lock:
        finished = (
            task for task in self.tasks.values()
            if task.status == TaskStatus.COMPLETED
        )

        # `default=None` covers the case where no task has completed.
        return max(finished, key=lambda item: item.completed or 0, default=None)

clear_completed

A method that clears all completed, failed, and cancelled tasks.

Source code in dev_tool/services/task/service.py
def clear_completed(self) -> None:
    """
    A method that clears all completed, failed and cancelled tasks.

    Pending and running tasks are kept. A success banner is shown afterwards,
    whether or not any task was actually cleared.
    """

    with self._lock:
        remaining = {}

        for task_id, task in self.tasks.items():
            if task.status not in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED):
                remaining[task_id] = task

        # NOTE(review): unlike remove(), entries in `_callbacks` and
        # `_cancellation_events` for the cleared tasks are left in place.
        # Confirm whether that is intentional.
        self.tasks = remaining

    self.notification.success_banner('Completed task(s) cleared', duration=5.0)

remove

A method that removes a specific task.

Parameters:

  • task (str) –

    The task ID to remove.

Returns:

  • bool

    True if the task was removed, False if not found.

Source code in dev_tool/services/task/service.py
def remove(self, task: str) -> bool:
    """
    A method that removes a specific task.

    The task's completion callback and cancellation event, if any, are
    dropped along with it.

    :param task: The task ID to remove.
    :return: True if the task was removed, False if not found.
    """

    with self._lock:
        if task not in self.tasks:
            return False

        self.tasks.pop(task)

        for registry in (self._callbacks, self._cancellation_events):
            registry.pop(task, None)

        return True

has_pending_completion_notifications

A method that checks if there are recent completion notifications.

Returns:

  • bool

    True if there are recent completions, False otherwise.

Source code in dev_tool/services/task/service.py
def has_pending_completion_notifications(self, window: float = 300.0) -> bool:
    """
    A method that checks if there are recent completion notifications.

    A task counts as recent when its status is completed, it has a
    completion timestamp, and that timestamp is less than ``window`` seconds
    before now.

    :param window: The maximum age in seconds for a completion to count as recent.
    :return: True if there are recent completions, False otherwise.
    """

    with self._lock:
        # Read the clock once so every task is compared against the same instant.
        now = time.time()

        return any(
            task.status == TaskStatus.COMPLETED
            and task.completed
            and now - task.completed < window
            for task in self.tasks.values()
        )

wait

A method that waits for a task to complete.

Source code in dev_tool/services/task/service.py
def wait(self, identifier: str, interval: float = 0.5) -> TaskResult | None:
    """
    A method that blocks until a task reaches a final state.

    The task is polled through ``get_task`` and the calling thread sleeps
    between polls. There is no timeout.

    :param identifier: The ID of the task to wait for.
    :param interval: The number of seconds to sleep between polls.
    :return: The task once it is completed, failed or cancelled, or None if the task is not found.
    """

    while task := self.get_task(identifier):
        if task.status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED):
            return task

        time.sleep(interval)

    # The task was never registered or was removed while waiting.
    return None