1091 lines
68 KiB
Python
1091 lines
68 KiB
Python
# -*- coding: utf-8 -*-
|
||
# src/pganec/tui.py
|
||
|
||
import logging
|
||
import asyncio
|
||
from logging.handlers import MemoryHandler
|
||
from symtable import Class
|
||
from typing import Dict, Any, Optional, List, Tuple
|
||
from textual.app import App, ComposeResult
|
||
from textual.containers import Vertical, Horizontal, Container
|
||
from textual.css.query import DOMQuery
|
||
from textual.reactive import reactive
|
||
from textual.screen import Screen
|
||
from textual.widgets import Button, Header, Footer, Static, Log, Label, ListView, ListItem, OptionList
|
||
from textual.widget import Widget
|
||
from textual.message import Message
|
||
from textual import on
|
||
from typing import TYPE_CHECKING
|
||
if TYPE_CHECKING:
|
||
from .tui import PGanecApp
|
||
|
||
# --- Настройки и инициирование логирования модуля ---
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# --- КОНФИГ_ПЕРЕМЕННЕЫ которые использоваться в приложении (возможно стоит перенести в другой файл) ---
|
||
DB_FOR_BACKUP = "db_4b"
|
||
# --- Типы для шагов выбора ---
|
||
STEP_TYPE_SERVER = "select_server"
|
||
STEP_TYPE_DATABASE = "select_database"
|
||
STEP_TYPE_DESTINATION = "select_destination"
|
||
|
||
class WidgetType: # Определяем типы виджетов/шагов
|
||
BD_SERVERS_LIST = "db_servers_list"
|
||
DBS_IN_SERVER = "dbs_in_server"
|
||
TARGETS = "targets_list"
|
||
SERVICE_ACTIONS = "service_actions"
|
||
# Можно добавить другие типы по мере необходимости, например:
|
||
TEXT_INPUT = "text_input"
|
||
CONFIRMATION = "confirmation_step"
|
||
IN_PROGRESS = "progress"
|
||
|
||
|
||
# --- Кастомный обработчик логирования (для Textual) ---
|
||
class TextualLogHandler(logging.Handler):
|
||
"""Обработчик логирования, который направляет записи в виджет Textual Log."""
|
||
def __init__(self, textual_log_widget: Log):
|
||
super().__init__()
|
||
self.textual_log_widget = textual_log_widget
|
||
|
||
def emit(self, record: logging.LogRecord):
|
||
"""Отправляет отформатированную запись лога в виджет."""
|
||
try:
|
||
msg = self.format(record)
|
||
# Метод Log.write() является потоко-безопасным, и его можно вызывать напрямую.
|
||
self.textual_log_widget.write(msg)
|
||
except Exception:
|
||
# Стандартная обработка ошибок для Handler, если что-то пошло не так (например, виджет был удален
|
||
# или произошла ошибка форматирования)
|
||
self.handleError(record)
|
||
|
||
|
||
# --- Верхнее меню приложения (для Textual) ---
|
||
class MainMenu(Static):
|
||
def compose(self) -> ComposeResult:
|
||
yield Horizontal(
|
||
Button(label="Backup (DB\u2192Dump)", id="backup", classes="main-menu-button"),
|
||
Button(label="Restore (Dump\u2192DB)", id="restore", classes="main-menu-button"),
|
||
Button(label="Copy (DB\u2192DB)", id="copy", classes="main-menu-button"),
|
||
Button(label="Service", id="service", classes="main-menu-button"),
|
||
Button(label="Quit", id="quit", variant="error", classes="main-menu-button"),
|
||
id="menu",
|
||
)
|
||
# --- КЛАСС STEPWIDGET: НАЧАЛО---
|
||
class StepWidget(Static):
|
||
"""
|
||
Универсальный виджет многошагового выбора опций. На экране может быть несколько экземпляров этого виджета,
|
||
в зависимости от того какое содержание "закажет" основное приложение PGanecApp (в потоке)
|
||
"""
|
||
# --- Сообщения, которые этот виджет может отправлять ---
|
||
class SelectionMade(Message):
|
||
"""Сообщение о том, что в этом StepWidget был сделан выбор."""
|
||
def __init__(self, sender_widget: 'StepWidget', selection_data: Any, display_value: str):
|
||
self.sender_widget = sender_widget
|
||
self.selection_data = selection_data # Выбранные данные
|
||
self.display_value = display_value # Текст для отображения в "замороженном" состоянии
|
||
super().__init__()
|
||
|
||
class Cancelled(Message):
|
||
"""Сообщение о том, что текущий шаг в StepWidget был отменен."""
|
||
def __init__(self, sender_widget: 'StepWidget'):
|
||
self.sender_widget = sender_widget
|
||
super().__init__()
|
||
|
||
def __init__(self,
|
||
type_widget: str,
|
||
title_text: str,
|
||
step_level: int = 0, # Уровень/шаг в последовательности (он же номер экземпляра виджета в цепочке)
|
||
# is_frozen: bool = False, # Флаг, что виджет заморожен (выбор сделан) и если
|
||
context_data: Optional[Dict[str, Any]] = None, # <--- ДОБАВЬ ЭТОТ ПАРАМЕТР
|
||
*args, **kwargs):
|
||
super().__init__(*args, **kwargs)
|
||
self.type_widget: str = type_widget
|
||
self.title_text: str = title_text # Заголовок для этого шага
|
||
self.step_level: int = step_level # Шаг в конвейере выбора из PGanecApp.
|
||
self.is_frozen: bool = False # Изначально виджет не заморожен (отображается целиком)
|
||
self.context_data: Dict[str, Any] = context_data or {}
|
||
# self.is_frozen: bool = is_frozen
|
||
self.num_count: int = 1 # Счетчик для нумерации опций в этом экземпляре виджета
|
||
# self._current_selection_display_text: Optional[str] = None
|
||
self._current_selection_display_text: Optional[str] = None # Для отображения замороженного выбора
|
||
self._button_id_to_data_map: Dict[str, Any] = {} # Для связи ID кнопок с данными
|
||
# Уникальный ID для кнопки отмены этого экземпляра
|
||
self._cancel_button_id = f"cancel_step_{self.step_level}_{self.id or id(self)}" # ID кнопки "Отмена"
|
||
|
||
@property
|
||
def app_config(self) -> Dict[str, Any]:
|
||
""" Возвращает конфигурацию приложения PGanecApp, к которому принадлежит этот виджет. """
|
||
return getattr(self.app, "app_config", {})
|
||
|
||
def _create_cancel_button(self, num_for_cansel: int = 0) -> Button:
|
||
"""Создаёт кнопку 'Отмена'."""
|
||
return Button(
|
||
label=f"{num_for_cansel:02d}: Отмена", # num_count, очно, это счетчик для нумерации кнопок
|
||
id=self._cancel_button_id,
|
||
classes="step-cancel-button" # Класс для CSS в TUI
|
||
)
|
||
# Удаляем вообще все кнопки 'Отмена' во всех виджетах (если она вдруг осталась... для надежности)
|
||
# try:
|
||
# for btn in self.query(".cancel_widget"):
|
||
# btn.remove()
|
||
# except Exception: # DOMQuery.DoesNotExist: почему-то не видит статический анализатор
|
||
# pass # Кнопки уже нет, просто игнорируем
|
||
# yield Button(label=f"{self.num_count:02d}: Отмена",
|
||
# id=f"cancel_widget_{self.step_level}",
|
||
# classes="cancel_widget")
|
||
# self.mount(Button(label="{self.num_count:02d}: Отмена", id="cancel_widget", classes="list-button"))
|
||
|
||
# def _remove_cancel_button(self):
|
||
# """Удаляет кнопку 'Отмена' (при заморозке)."""
|
||
# try:
|
||
# for btn in self.query(".cancel_widget"):
|
||
# btn.remove()
|
||
# except Exception:
|
||
# pass # Кнопки уже нет
|
||
|
||
|
||
# def freeze(self, selection_id: str):
|
||
def freeze(self, selected_button_label: str):
|
||
"""Замораживает шаг: сохраняет информацию о выборе и обновляет виджет для отображения только этого выбора."""
|
||
# - Устанавливает self.is_frozen = True.
|
||
# - Сохраняет self._current_selection_display_text = selected_button_label.
|
||
# - Вызывает self.refresh(). Это заставит compose() перерисовать виджет в замороженном состоянии.
|
||
self.is_frozen = True
|
||
self._current_selection_display_text = str(selected_button_label)
|
||
# Вместо скрытия элементов, мы просто вызовем self.refresh(), а compose() сам решит, что отображать
|
||
# на основе self.is_frozen.
|
||
self.refresh()
|
||
logger.debug(
|
||
f"StepWidget (step {self.step_level}, id: {self.id}) заморожен. Выбор: {self._current_selection_display_text}")
|
||
|
||
|
||
def unfreeze(self):
|
||
""" "Размораживает" шаг, делая его снова интерактивным.
|
||
PGanecApp должен будет передать новые данные для отображения, если это необходимо,
|
||
перед вызовом unfreeze, или unfreeze должен принимать данные для re-compose.
|
||
Пока что unfreeze просто сбрасывает флаг и обновляет.
|
||
"""
|
||
# - Устанавливает self.is_frozen = False.
|
||
# - Сбрасывает self._current_selection_display_text = None.
|
||
# - Вызывает self.refresh(). compose() перерисует виджет в активном состоянии.
|
||
if not self.is_frozen: return # Уже разморожен
|
||
self.is_frozen = False
|
||
self._current_selection_display_text = None
|
||
self.num_count = 1 # Сбрасываем счетчик для нумерации
|
||
self.remove_children() # Очистим старое содержимое виджета, compose создаст новое
|
||
self.refresh() # Заставим compose() перерисовать активное состояние
|
||
logger.debug(f"StepWidget (step {self.step_level}, id: {self.id}) разморожен.")
|
||
|
||
|
||
def compose(self) -> ComposeResult:
|
||
"""Создаёт содержимое виджета."""
|
||
# - Рисует заголовок.
|
||
# - if self.is_frozen:: Рисует Static с self._current_selection_display_text.
|
||
# - else:: Рисует интерактивные кнопки выбора (на основе self.type_widget и данных, которые StepWidget может
|
||
# получить из self.app.app_config или которые ему передал PGanecApp при инициализации/конфигурации)
|
||
# и кнопку "Отмена".
|
||
# Заголовок шага
|
||
self._button_id_to_data_map.clear() # Очищаем перед каждым рендерингом активного состояния
|
||
|
||
yield Label(renderable=self.title_text, classes="step-title") # Заголовок виджета
|
||
|
||
if self.is_frozen:
|
||
# --- Виджет в ЗАМОРОЖЕННОМ состоянии ---
|
||
if self._current_selection_display_text:
|
||
yield Static(content=f"[\u2192\u2192] {self._current_selection_display_text}",
|
||
classes="frozen-choice-display")
|
||
else: # На случай, если заморожен без текста (не должно быть, но вдруг)
|
||
yield Static(content="[\u2192\u2192] Выбор сделан, но хрен знает, что выбрали",
|
||
classes="frozen-choice-display")
|
||
else:
|
||
if self.type_widget == WidgetType.BD_SERVERS_LIST:
|
||
# --- Виджет для выбора серверов баз данных ---
|
||
servers_data = self.app_config.get("servers", [])
|
||
if not servers_data:
|
||
yield Label("Нет серверов для выбора.", classes="placeholder-text")
|
||
else:
|
||
for server_conf in servers_data:
|
||
server_name = server_conf.get("name", f"NONAME {self.num_count:02d}")
|
||
button_id = server_conf.get("id", f"auto_id_{self.num_count}_{self.id or id(self)}") # ID уникальны
|
||
yield Button(label=f"{self.num_count:02d}: {server_name}", id=button_id, classes="list-button")
|
||
self._button_id_to_data_map[button_id] = server_conf # Сохраняем всю конфигурацию
|
||
self.num_count += 1
|
||
# ...
|
||
|
||
elif self.type_widget == WidgetType.DBS_IN_SERVER:
|
||
yield Label(renderable="Тут будет список кнопок для выбора базы данных в сервере...", classes="warning-text")
|
||
# ...
|
||
elif self.type_widget == WidgetType.TARGETS:
|
||
yield Label(renderable="Тут будет список кнопок для выбора тома назначения...", classes="warning-text")
|
||
# ...
|
||
elif self.type_widget == WidgetType.SERVICE_ACTIONS:
|
||
yield Label(renderable="Тут будут служебные функции...", classes="warning-text")
|
||
# ...
|
||
else:
|
||
logger.error(f"Неизвестный тип виджета: {self.type_widget}. Кыш отсюда!")
|
||
yield Label(renderable="Ошибка: неизвестный тип виджета.", classes="error-text")
|
||
# Добавляем кнопку "Отмена"
|
||
yield self._create_cancel_button(num_for_cansel= self.num_count) # Передаем текущий счетчик для кнопки "Отмена"
|
||
|
||
async def on_button_pressed(self, event: Button.Pressed) -> None:
|
||
""" Обработчик нажатия кнопок в этом виджете. """
|
||
button_id = event.button.id
|
||
if self.is_frozen: return # Виджет заморожен, на нем нет кнопок, не обрабатываем
|
||
|
||
# Если это кнопка отмены - удаляем виджет и сбрасываем флаг активности
|
||
if event.button.id == self._cancel_button_id: # Проверяем по уникальному ID кнопки отмены
|
||
logger.debug(f"Нажата 'Отмена' в StepWidget (step {self.step_level}, id: {self.id}).")
|
||
self.post_message(StepWidget.Cancelled(sender_widget=self))
|
||
# PGanecApp решит, удалять ли виджет. Если мы используем N экземпляров,
|
||
# то PGanecApp должен будет вызвать hide_and_reset() или remove() на этом экземпляре.
|
||
# Если мы создаем новый экземпляр для каждого шага, то await self.remove() здесь уместно.
|
||
# Пока оставим удаление на усмотрение PGanecApp, который получит сообщение.
|
||
self.num_count = 1 # Сброс счетчика нумерации
|
||
await self.remove() # Удаляем сам виджет из DOM
|
||
return
|
||
# self.app.selection_widget_active = False # <-- Сброс флага что виджет активен
|
||
# # Очищаем состояние виджета и удаляем его из DOM и все в нем
|
||
# self.is_frozen = False
|
||
# self.num_count = 1 # Сброс счетчика нумерации
|
||
# self._remove_cancel_button()
|
||
# self.app.selection_widget_instance = None # Сбрасываем ссылку на виджет в приложении
|
||
# self.app.selection_widget_active = False # Сбрасываем флаг активности виджета
|
||
# self.post_message(StepWidget.Cancelled(sender_widget=self)) # Сообщение об отмене
|
||
# await self.remove() # Удаляем сам виджет
|
||
# logger.info("SelectionWidget удален из DOM.")
|
||
# return
|
||
|
||
# Если это не кнопка отмены, значит это кнопка выбора
|
||
selected_option_data = self._button_id_to_data_map.get(button_id)
|
||
if selected_option_data is None:
|
||
logger.warning(f"Нет данных для кнопки с ID '{button_id}' в StepWidget (step {self.step_level}).")
|
||
return # Неизвестная кнопка (не из списка опций)
|
||
|
||
display_text_for_freeze = str(event.button.label)
|
||
|
||
logger.info(
|
||
f"StepWidget (step {self.step_level}, id: {self.id}): выбрано '{selected_option_data}' (метка: '{display_text_for_freeze}')")
|
||
# "Замораживаем" виджет, передавая ID и текст выбранной кнопки
|
||
self.freeze(selected_button_label=display_text_for_freeze)
|
||
# Отправляем сообщение приложению с выбранными данными
|
||
self.post_message(StepWidget.SelectionMade(
|
||
sender_widget=self,
|
||
selection_data=selected_option_data, # Отправляем реальные данные
|
||
display_value=display_text_for_freeze
|
||
))
|
||
# После заморозки виджета, PGanecApp должен будет обработать это сообщение и решить, что делать дальше.
|
||
|
||
async def on_mount(self) -> None:
|
||
""" Вызывается после монтирования виджета в DOM. """
|
||
if not self.is_frozen:
|
||
try:
|
||
# Пытаемся установить фокус на первую кнопку-опцию
|
||
first_button = self.query_one("Button.list-button", Button)
|
||
first_button.focus()
|
||
except DOMQuery.DoesNotExist:
|
||
try:
|
||
cancel_button = self.query_one(f"#{self._cancel_button_id}", Button)
|
||
cancel_button.focus()
|
||
except DOMQuery.DoesNotExist:
|
||
logger.warning(f"В StepWidget {self.id} нет кнопок для фокуса.")
|
||
except Exception as e: # Более общая ошибка для фокуса на кнопке отмены
|
||
logger.error(f"Ошибка при установке фокуса на кнопку Отмена в StepWidget {self.id}: {e}")
|
||
except Exception as e: # Более общая ошибка для фокуса на list-button
|
||
logger.error(f"Ошибка при установке фокуса на StepWidget {self.id}: {e}")
|
||
# --- КЛАСС STEPWIDGET: КОНЕЦ ---
|
||
|
||
|
||
|
||
|
||
# # --- Виджет для "встроенного" выбора ---
|
||
# class SelectionWidget(Static): # Наследуем от Static
|
||
# """
|
||
# Виджет для и PGanec TUI.
|
||
# """
|
||
# @property
|
||
# def app(self) -> "PGanecApp":
|
||
# """
|
||
# Возвращает ссылку на приложение PGanecApp , к которому принадлежит этот виджет. Нужно для доступа к методам
|
||
# и состоянию приложения (и статического анализатора, так как класс PGanecApp описан ниже и "отсюда не виден").
|
||
# :return:
|
||
# """
|
||
# return super().app # type: ignore
|
||
#
|
||
#
|
||
# def __init__(self,
|
||
# app_config: Optional[Dict[str, Any]] = None,
|
||
# action_type: Optional[str] = None,
|
||
# **kwargs):
|
||
# super().__init__(**kwargs)
|
||
# self.app_config = app_config if app_config is not None else {} # каждому экземпляру виджета нужна общая конфигурация
|
||
# # self.step_id = step_id # Номер этого этапа
|
||
# self.overall_action_type = action_type # Сохраняем тип действия ("backup", "restore", "copy", "service")
|
||
# self.current_step = 0 # Текущий шаг выбора (многоэтапный выбор), начинаем с 0
|
||
# self.selections: Dict[int, Any] = {} # Хранит выборы: {0: server_conf, 1: db_name, 2: dest_path}
|
||
# self._button_id_to_data_map: Dict[str, Any] = {} # Карта для хранения значений опций
|
||
# logger.debug(
|
||
# f"SelectionWidget initialized: action: '{self.overall_action_type}' / current_step: {self.current_step}"
|
||
# )
|
||
#
|
||
# # SelectionWidget(Static)
|
||
#
|
||
# def compose(self) -> ComposeResult:
|
||
# """ Создаёт содержимое виджета. """
|
||
# self._button_id_to_data_map.clear() # Очищаем карту значений
|
||
# num_count = 1 # Для нумерации опций текущего шага
|
||
#
|
||
# # --- Отображение уже сделанных выборов ---
|
||
# if self.current_step > 0 and 0 in self.selections:
|
||
# server_conf = self.selections[0]
|
||
# server_name = server_conf.get("name", "Неизвестный сервер")
|
||
# yield Static(f"1. Сервер: {server_name}", classes="previous-selection")
|
||
#
|
||
# if self.current_step > 1 and 1 in self.selections:
|
||
# db_name = self.selections[1]
|
||
# yield Static(f"2. База данных: {db_name}", classes="previous-selection")
|
||
#
|
||
# if self.current_step > 2 and 2 in self.selections: # Если есть третий шаг и он сделан
|
||
# dest_info = self.selections[2] # Может быть строка или словарь
|
||
# yield Static(f"3. Назначение: {dest_info}", classes="previous-selection")
|
||
#
|
||
# if self.overall_action_type == "backup":
|
||
# yield Label("Backup (DB\u2192Dump): Выберите BD-сервер:")
|
||
# # ... тут будет список серверов
|
||
# if "servers" in self.app_config:
|
||
# logger.info(f"TUI: Обнаружено {len(self.app_config['servers'])} серверов в конфигурации.")
|
||
# for server_conf in self.app_config["servers"]:
|
||
# server_name = server_conf.get("name")
|
||
# button_id = server_conf.get("id")
|
||
# if not button_id:
|
||
# button_id = f"server_{num_count:02d}" # Генерируем временный id, если не указан
|
||
# logger.warning(
|
||
# f"Сервер '{server_name}' не имеет 'id' в yaml-конфигурации. Установлен id='{button_id}'.")
|
||
# # continue # пропускаем?
|
||
# if button_id in self._button_id_to_data_map:
|
||
# logger.error(
|
||
# f"TUI: Дублирующийся ID кнопки '{button_id}' для сервера '{server_name}' из yaml-конфигурации. ПРОРУСКАЕМ.")
|
||
# continue
|
||
# # Добавляем кнопку для каждого сервера
|
||
# yield Button(label=f"{num_count:02d}: {server_name}", id=button_id, classes="list-button")
|
||
# self._button_id_to_data_map[button_id] = {"type": DB_FOR_BACKUP, "config": server_conf}
|
||
# num_count += 1
|
||
#
|
||
# elif self.overall_action_type == "restore":
|
||
# yield Label("Выберите том для восстановления:")
|
||
# if "targets" in self.app_config:
|
||
# logger.info(f"TUI: Обнаружено {len(self.app_config['targets'])} томов в конфигурации.")
|
||
# yield Static("Тут будет список...", classes="placeholder-text") # Временная заглушка
|
||
# # ... тут будет список томов
|
||
# elif self.overall_action_type == "copy":
|
||
# yield Label("Выберите BD-сервер (откуда копировать):")
|
||
# # ... тут будет список серверов
|
||
# elif self.overall_action_type == "service":
|
||
# yield Label("Служебные функции:")
|
||
# # ... тут будут служебные функции
|
||
#
|
||
# # Кнопка "Отмена" в конце любого списка
|
||
# yield Button(label=f"{num_count:02d}: Отмена", id="cancel_widget", classes="list-button")
|
||
#
|
||
#
|
||
# def on_mount(self) -> None:
|
||
# """ Вызывается после монтирования виджета в DOM. """
|
||
# logger.info(f"Виджет '{self.id}' (SelectionWidget) смонтирован.")
|
||
# try:
|
||
# # Пытаемся установить фокус на первую кнопку-опцию
|
||
# # Ищем первую кнопку с классом "list-button" внутри этого виджета
|
||
# first_option_button = self.query_one("Button.list-button", Button)
|
||
# first_option_button.focus()
|
||
# logger.debug(f"TUI: SelectionWidget: Фокус на кнопке-опцию: `{first_option_button.id}`")
|
||
# except Exception as e:
|
||
# # except DOMQuery.DoesNotExist:
|
||
# logger.error(f"SelectionWidget: Ошибка при установке фокуса в Виджете: {e}")
|
||
# # logger.warning(f"SelectionWidget: OptionList не найден для установки фокуса (overall_action_type: {self.overall_action_type}).")
|
||
#
|
||
# # Можно сразу установить фокус на первый элемент, если он есть
|
||
# # self.query_one(OptionList).focus() или self.query_one(Button).focus()
|
||
#
|
||
#
|
||
# async def on_button_pressed(self, event: Button.Pressed) -> None:
|
||
# button_id = event.button.id
|
||
# button_data = self._button_id_to_data_map.get(button_id) # <--- Извлекаем данные по ID кнопки
|
||
#
|
||
# logger.info(f"SelectionWidget ({self.overall_action_type}): нажата кнопка-опция id='{button_id}', data={button_data}")
|
||
#
|
||
# if button_data:
|
||
# action_type_from_data = button_data.get("type")
|
||
# if action_type_from_data == DB_FOR_BACKUP:
|
||
# server_config = button_data.get("config") # <--- Получаем полную конфигурацию сервера
|
||
# logger.info(
|
||
# f"SelectionWidget ({self.overall_action_type}): выбран сервер {server_config.get('name') if isinstance(server_config, dict) else server_config}. Далее - выбор БД/дампа.")
|
||
# # Тут будет логика для перехода к следующему шагу, используя server_config
|
||
# self.app.bell()
|
||
# # ... другие обработчики ...
|
||
#
|
||
# if event.button.id == "cancel_widget":
|
||
# logger.debug("Нажата 'Отмена' в SelectionWidget.")
|
||
# await self.remove() # Удаляем сам виджет
|
||
# self.app.selection_widget_active = False # <-- Сброс флага что виджет активен
|
||
# # Можно также послать сообщение приложению, чтобы оно обновило состояние, если нужно
|
||
# # self.app.post_message(BackupSelectionCancelled())
|
||
# self.app.set_main_menu_enabled(True) # Включаем верхнее меню обратно
|
||
|
||
|
||
# --- ОСНОВНОЕ ПРИЛОЖЕНИЕ: НАЧАЛО ---
|
||
class PGanecApp(App):
|
||
CSS = """
|
||
Screen {
|
||
layout: vertical; /* Вертикальная компоновка для всего приложения */
|
||
overflow: auto; /* Добавляем прокрутку на весь экран, если контент не влезает */
|
||
}
|
||
Header { dock: top; height: 1; }
|
||
#main_content_area {
|
||
/* Этот контейнер будет занимать все доступное пространство между Header и Log/Footer */
|
||
height: 1fr; /* 1fr означает "одна доля доступного пространства" */
|
||
overflow-y: auto; /* Если контент не влезет, появится прокрутка */
|
||
padding: 0 1;
|
||
}
|
||
#title { width: 100%; text-align: left; background: purple; padding: 0; } /* Заголовок приложения */
|
||
#menu { width: 100%; height: 3; align: left top; padding: 0; }
|
||
#menu Button.main-menu-button { /* Кнопки верхнего меню */
|
||
align: center middle;
|
||
width: 20%;
|
||
color: orange;
|
||
background: transparent;
|
||
margin: 0; padding: 0;
|
||
border: round orange;
|
||
}
|
||
#menu Button.main-menu-button:hover {
|
||
background: $primary-background-lighten-2; /* Подсветка при наведении */
|
||
}
|
||
Button:focus { border: heavy green; }
|
||
/* Стили для лога приложения */
|
||
Log#app_log_viewer { dock: bottom; height: 10; border-top: double grey; background: 15%; }
|
||
Log#app_log_viewer:focus { height: 50%; border-top: solid green; background: 45%; }
|
||
|
||
/* Стили для StepWidget */
|
||
StepWidget {
|
||
border: round $primary-lighten-1;
|
||
background: $panel;
|
||
width: 100%;
|
||
}
|
||
StepWidget .step-title {
|
||
width: 100%;
|
||
text-style: bold;
|
||
color: $text-muted;
|
||
}
|
||
StepWidget .frozen-choice-display {
|
||
padding: 0;
|
||
color: $text-muted;
|
||
text-style: italic;
|
||
border: dashed $primary-darken-1; /* Выделим замороженный выбор */
|
||
margin-top: 1; /* Отступ от заголовка */
|
||
}
|
||
StepWidget Button.list-button, StepWidget Button.step-cancel-button {
|
||
width: auto;
|
||
height: auto;
|
||
min-height: 1;
|
||
align: left middle;
|
||
border: none;
|
||
background: transparent;
|
||
color: $text;
|
||
/* padding: 0 1;
|
||
margin-bottom: 0;q
|
||
margin-left: 0; padding-left: 1; */
|
||
}
|
||
StepWidget Button.list-button:hover {
|
||
background: $primary;
|
||
}
|
||
StepWidget Button.step-cancel-button {
|
||
color: $error; margin-left: -1;
|
||
}
|
||
/* StepWidget Button.step-cancel-button:hover {
|
||
background: $error;
|
||
} */
|
||
StepWidget .placeholder-text, StepWidget .error-text {
|
||
padding: 0 1;
|
||
color: $text-muted;
|
||
}
|
||
StepWidget .error-text {
|
||
color: $error;
|
||
}
|
||
|
||
|
||
/* Стили для встроенного виджета выбора */
|
||
SelectionWidget {
|
||
padding: 1;
|
||
margin-top: 0;
|
||
background: $panel; /* Используем системный цвет панели */
|
||
/* height: auto; /* Высота будет по содержимому */
|
||
}
|
||
|
||
SelectionWidget .placeholder-text {
|
||
padding: 0;
|
||
color: $text-muted;
|
||
}
|
||
Button.list-button { /* Стили для кнопок внутри виджета */
|
||
width: auto;
|
||
height: 1;
|
||
align: left middle;
|
||
border: hidden;
|
||
background: transparent;
|
||
color: $text;
|
||
margin-left: 0; padding-left: 1;
|
||
}
|
||
Button#cancel_widget {margin-left: -1; color: red; } /* Отдельный стиль для кнопки "Отмена" */
|
||
|
||
Footer { dock: bottom; height: 1; }
|
||
|
||
|
||
"""
|
||
|
||
BINDINGS = [
|
||
# ("b", "backup", "Backup"),
|
||
# ("r", "restore", "Restore"),
|
||
# ("c", "copy", "Copy"),
|
||
# ("s", "service", "Service"),
|
||
("q", "quit", "Quit"),
|
||
("right,down,tab", "focus_next", "Focus Next"), # ("down", "focus_next", "Focus Next"), # Стрелочки для навигации (вниз/вправо)
|
||
("left", "focus_previous", "Focus Previous"), ("up", "focus_previous", "Focus Previous"), # Стрелочки (вверх/влево)
|
||
("enter", "activate", "Activate"),
|
||
("escape", "handle_escape()", "Cancel/Back") # Обработка Escape
|
||
]
|
||
|
||
# Атрибут для хранения ссылки на виджет выбора, если он отображен
|
||
# --- Атрибуты для управления потоком и состоянием ---
|
||
active_flow_type: Optional[str] = None # Тип текущего активного потока (например, "backup")
|
||
flow_data: Dict[str, Any] = {} # Данные, собранные в ходе выполнения потока
|
||
# Словарь для хранения экземпляров StepWidget текущего потока, в нем сохраним "замороженные" шаги на экране.
|
||
# Ключ - step_level (int), значение - экземпляр StepWidget.
|
||
mounted_step_widgets: Dict[int, StepWidget] = {}
|
||
|
||
# selection_widget_instance: Optional[SelectionWidget] = None # Станет ненужным
|
||
# current_step_widget: Optional[StepWidget] = None # Новый
|
||
|
||
def __init__(self,
|
||
log_level_int: int = logging.INFO,
|
||
app_config: Dict[str, Any] = None,
|
||
early_log_handler: Optional[MemoryHandler] = None,
|
||
**kwargs):
|
||
super().__init__(**kwargs)
|
||
self.log_level_int = log_level_int
|
||
self.app_config = app_config if app_config is not None else {} # Сохраняем конфигурацию
|
||
self.early_log_handler = early_log_handler # <--- Сохраняем
|
||
self.selection_widget_instance = None # Инициализируем атрибут для виджета выбора
|
||
logger.debug(f"TUI инициализирован. Log Level: `{logging.getLevelName(self.log_level_int)}`;"
|
||
f" Config Keys: {list(self.app_config.keys())}")
|
||
|
||
|
||
def compose(self) -> ComposeResult:
|
||
""" Создает основной DOM приложения. """
|
||
yield Header() # Верхний колонтитул
|
||
with Vertical(id="main_content_area"): # Верхний колонтитул
|
||
yield Static("PGanec TUI", id="title") # Заголовок приложения
|
||
yield MainMenu() # Главное меню
|
||
# Другие элементы основного интерфейса могут быть здесь
|
||
# Место для нашего виджета выбора будет здесь.
|
||
# Мы не добавляем его сразу, а будем делать это динамически.
|
||
# Можно добавить "якорь" - пустой контейнер, если нужно точное позиционирование
|
||
# yield Container(id="selection_widget_placeholder")
|
||
yield Vertical(id="steps_container") # Контейнер, куда будут добавляться StepWidget'ы
|
||
yield Log(id="app_log_viewer", highlight=True, auto_scroll=True) # Лог внизу
|
||
yield Footer() # Нижний колонтитул (для биндингов)
|
||
|
||
def set_main_menu_enabled(self, enabled: bool):
|
||
""" Включает или отключает кнопки верхнего меню. """
|
||
try:
|
||
menu = self.query_one(MainMenu)
|
||
for btn in menu.query(Button):
|
||
btn.disabled = not enabled
|
||
logger.debug(f"Главное меню {'включено' if enabled else 'выключено'}.")
|
||
except DOMQuery.DoesNotExist:
|
||
logger.error("Такого быть не должно, но MainMenu не найден... невозможно изменение состояния кнопок.")
|
||
|
||
async def _clear_current_flow_display(self, full_reset: bool = False):
|
||
"""
|
||
Очищает отображаемые виджеты шагов из контейнера #steps_container.
|
||
Если full_reset=True, также сбрасывает состояние потока и включает главное меню.
|
||
"""
|
||
logger.debug(f"Очистка отображения потока. Full reset: {full_reset}")
|
||
# steps_container = self.query_one("#steps_container", Vertical)
|
||
# Удаляем все StepWidget из DOM и из нашего словаря отслеживания
|
||
widgets_to_remove = list(self.mounted_step_widgets.values())
|
||
for widget in widgets_to_remove:
|
||
if widget.is_mounted:
|
||
try:
|
||
await widget.remove()
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при удалении StepWidget {widget.id}: {e}")
|
||
self.mounted_step_widgets.clear()
|
||
|
||
if full_reset:
|
||
self.active_flow_type = None
|
||
self.flow_data = {}
|
||
self.set_main_menu_enabled(True)
|
||
logger.info("Поток полностью сброшен, главное меню включено.")
|
||
|
||
async def _show_next_step_widget(self,
|
||
widget_type: str,
|
||
title: str,
|
||
step_level: int,
|
||
context_data: Optional[Dict[str, Any]] = None):
|
||
"""
|
||
Создает и показывает новый StepWidget для следующего шага.
|
||
Предыдущие "замороженные" виджеты остаются на экране.
|
||
"""
|
||
self.set_main_menu_enabled(False) # Отключаем меню, пока активен поток
|
||
|
||
# Создаем новый экземпляр StepWidget
|
||
new_step_widget = StepWidget(
|
||
type_widget=widget_type,
|
||
title_text=title,
|
||
step_level=step_level,
|
||
context_data=context_data,
|
||
id=f"step_{step_level}_{widget_type.lower().replace(' ', '_')}" # Уникальный ID
|
||
)
|
||
|
||
# Сохраняем ссылку на созданный виджет
|
||
self.mounted_step_widgets[step_level] = new_step_widget
|
||
# Монтируем виджет в специальный контейнер
|
||
steps_container = self.query_one("#steps_container", Vertical)
|
||
await steps_container.mount(new_step_widget)
|
||
# Фокус будет установлен в on_mount самого StepWidget
|
||
logger.info(
|
||
f"StepWidget {new_step_widget.id} (type: {widget_type}, step: {step_level}) добавлен и смонтирован.")
|
||
|
||
# --- Действия, запускаемые из BINDINGS или кнопок меню ---
|
||
async def action_start_flow(self, flow_type: str):
|
||
"""Общий метод для запуска нового потока операций по его типу."""
|
||
if self.active_flow_type is not None:
|
||
logger.warning(
|
||
f"Поток '{self.active_flow_type}' уже активен. "
|
||
f"Новый поток '{flow_type}' не может быть запущен без отмены предыдущего."
|
||
)
|
||
self.bell() # Звуковой сигнал об ошибке
|
||
return
|
||
|
||
logger.info(f"Запуск нового потока: {flow_type}")
|
||
await self._clear_current_flow_display(full_reset=True) # Полный сброс перед новым потоком
|
||
|
||
self.active_flow_type = flow_type.lower()
|
||
self.flow_data = {} # Очищаем данные предыдущего потока
|
||
|
||
# Маршрутизация к первому шагу конкретного потока
|
||
if self.active_flow_type == "backup":
|
||
await self._backup_step_1_select_server()
|
||
elif self.active_flow_type == "restore":
|
||
logger.warning(f"Поток '{self.active_flow_type}' еще не реализован.")
|
||
await self._clear_current_flow_display(full_reset=True) # Пока просто сбрасываем
|
||
elif self.active_flow_type == "copy":
|
||
logger.warning(f"Поток '{self.active_flow_type}' еще не реализован.")
|
||
await self._clear_current_flow_display(full_reset=True)
|
||
elif self.active_flow_type == "service":
|
||
logger.warning(f"Поток '{self.active_flow_type}' еще не реализован.")
|
||
await self._clear_current_flow_display(full_reset=True)
|
||
else:
|
||
logger.error(f"Попытка запустить неизвестный тип потока: {flow_type}")
|
||
await self._clear_current_flow_display(full_reset=True) # Сбрасываем, если тип потока не опознан
|
||
|
||
# --- Пример реализации потока: Backup ---
|
||
async def _backup_step_1_select_server(self):
|
||
"""Первый шаг потока 'Backup': выбор сервера."""
|
||
logger.debug("Backup Flow: Запрос шага 1 - выбор сервера.")
|
||
await self._show_next_step_widget(
|
||
widget_type=WidgetType.BD_SERVERS_LIST,
|
||
title="Backup: Шаг 1 - Выберите сервер",
|
||
step_level=0
|
||
)
|
||
|
||
async def _backup_step_2_select_db(self, server_config: Dict[str, Any]):
|
||
"""Второй шаг потока 'Backup': выбор базы данных на указанном сервере."""
|
||
logger.debug(f"Backup Flow: Запрос шага 2 - выбор БД для сервера '{server_config.get('name')}'.")
|
||
await self._show_next_step_widget(
|
||
widget_type=WidgetType.DBS_IN_SERVER,
|
||
title=f"Backup: Шаг 2 - Выберите БД на сервере '{server_config.get('name', '???')}'",
|
||
step_level=1,
|
||
context_data={"server_config": server_config} # Передаем конфиг сервера в StepWidget
|
||
)
|
||
|
||
async def _backup_step_3_select_target(self, server_config: Dict[str, Any], db_name: str):
|
||
"""Третий шаг потока 'Backup': выбор места сохранения."""
|
||
logger.debug(f"Backup Flow: Запрос шага 3 - выбор цели для БД '{db_name}'.")
|
||
await self._show_next_step_widget(
|
||
widget_type=WidgetType.TARGETS, # Предполагаем, что есть такой тип для выбора цели
|
||
title=f"Backup: Шаг 3 - Куда сохранить бэкап для '{db_name}'?",
|
||
step_level=2,
|
||
context_data={"server_config": server_config, "db_name": db_name} # Передаем контекст
|
||
)
|
||
|
||
# --- Обработчики сообщений от StepWidget ---
|
||
@on(StepWidget.SelectionMade)
|
||
async def handle_step_selection(self, message: StepWidget.SelectionMade):
|
||
"""Обрабатывает сообщение о сделанном выборе в одном из StepWidget."""
|
||
sender_widget = message.sender_widget
|
||
current_step_level = sender_widget.step_level
|
||
selected_data = message.selection_data
|
||
display_value = message.display_value # Текст, который StepWidget отобразит в "замороженном" виде
|
||
|
||
logger.info(
|
||
f"PGanecApp: Выбор сделан (Поток: {self.active_flow_type}, Шаг: {current_step_level}): "
|
||
f"Данные='{selected_data}', Отображение='{display_value}'"
|
||
)
|
||
# sender_widget уже вызвал self.freeze() и обновил свое отображение.
|
||
|
||
# Маршрутизация на основе текущего потока и завершенного шага
|
||
if self.active_flow_type == "backup":
|
||
if current_step_level == 0: # Завершился шаг выбора сервера
|
||
self.flow_data["server_config"] = selected_data
|
||
await self._backup_step_2_select_db(server_config=selected_data)
|
||
elif current_step_level == 1: # Завершился шаг выбора БД
|
||
self.flow_data["db_name"] = selected_data
|
||
# Передаем server_config из сохраненных данных потока
|
||
await self._backup_step_3_select_target(
|
||
server_config=self.flow_data["server_config"],
|
||
db_name=selected_data
|
||
)
|
||
elif current_step_level == 2: # Завершился шаг выбора цели
|
||
self.flow_data["target_info"] = selected_data
|
||
logger.info(f"ПОТОК BACKUP ЗАВЕРШЕН! Все данные для бэкапа собраны: {self.flow_data}")
|
||
self.bell()
|
||
# Здесь можно:
|
||
# 1. Показать финальное подтверждение в новом StepWidget (type=CONFIRMATION).
|
||
# 2. Добавить кнопку "Выполнить бэкап" к последнему замороженному StepWidget.
|
||
# 3. Сразу запустить операцию бэкапа.
|
||
# Пока просто выведем сообщение и очистим поток для следующего раза.
|
||
|
||
# Пример добавления кнопки "Выполнить" к последнему виджету
|
||
if sender_widget.is_mounted: # Убедимся, что виджет еще на экране
|
||
execute_button = Button("Запустить бэкап!", id="execute_backup_action", variant="success")
|
||
try:
|
||
# Удаляем кнопку отмены, если она там была (не должна быть в замороженном)
|
||
sender_widget.query_one(".step-cancel-button").remove()
|
||
except DOMQuery.DoesNotExist:
|
||
pass
|
||
await sender_widget.mount(execute_button) # Добавляем кнопку выполнения
|
||
# await self._clear_current_flow_display(full_reset=True) # Пока не очищаем, чтобы увидеть кнопку
|
||
else:
|
||
logger.warning(f"Получен выбор для неожиданного шага {current_step_level} в потоке 'backup'.")
|
||
|
||
# Добавить elif для других active_flow_type (restore, copy, service)
|
||
# elif self.active_flow_type == "restore":
|
||
# ...
|
||
|
||
@on(StepWidget.Cancelled)
|
||
async def handle_step_cancellation(self, message: StepWidget.Cancelled):
|
||
"""Обрабатывает сообщение об отмене текущего шага в StepWidget."""
|
||
cancelled_widget = message.sender_widget
|
||
cancelled_step_level = cancelled_widget.step_level
|
||
|
||
logger.info(
|
||
f"PGanecApp: Шаг {cancelled_step_level} (Поток: {self.active_flow_type}) отменен пользователем."
|
||
)
|
||
# cancelled_widget уже вызвал await self.remove() и удалил себя из DOM.
|
||
|
||
# Удаляем отмененный виджет и все последующие (если они были ошибочно созданы)
|
||
# из нашего словаря отслеживания.
|
||
levels_to_clear_from_tracking = [
|
||
lvl for lvl in self.mounted_step_widgets
|
||
if lvl >= cancelled_step_level
|
||
]
|
||
for lvl in levels_to_clear_from_tracking:
|
||
self.mounted_step_widgets.pop(lvl, None) # Удаляем из словаря
|
||
|
||
# Очищаем соответствующие данные из self.flow_data
|
||
# Это зависит от структуры self.flow_data для каждого потока
|
||
if self.active_flow_type == "backup":
|
||
if cancelled_step_level <= 2 and "target_info" in self.flow_data:
|
||
del self.flow_data["target_info"]
|
||
if cancelled_step_level <= 1 and "db_name" in self.flow_data:
|
||
del self.flow_data["db_name"]
|
||
if cancelled_step_level == 0 and "server_config" in self.flow_data:
|
||
del self.flow_data["server_config"]
|
||
# ... (elif для других active_flow_type)
|
||
|
||
if not self.mounted_step_widgets: # Если не осталось "замороженных" виджетов (отменили первый шаг)
|
||
logger.info("Отменен первый или единственный шаг. Поток завершен. Возврат в главное меню.")
|
||
await self._clear_current_flow_display(full_reset=True) # Полный сброс
|
||
else:
|
||
# "Размораживаем" предыдущий шаг, который теперь стал последним активным
|
||
# Находим максимальный step_level среди оставшихся смонтированных виджетов
|
||
previous_step_level = max(self.mounted_step_widgets.keys())
|
||
previous_widget_to_unfreeze = self.mounted_step_widgets.get(previous_step_level)
|
||
|
||
if previous_widget_to_unfreeze and previous_widget_to_unfreeze.is_mounted:
|
||
logger.info(
|
||
f"Возврат к предыдущему шагу {previous_step_level}. Размораживаем виджет {previous_widget_to_unfreeze.id}.")
|
||
previous_widget_to_unfreeze.unfreeze()
|
||
# После unfreeze, compose виджета перерисует его в активном состоянии.
|
||
# Устанавливаем фокус на "размороженный" виджет.
|
||
# Его on_mount уже отработал, так что нужно вызвать focus() явно.
|
||
self.call_after_refresh(previous_widget_to_unfreeze.focus) # Фокус после перерисовки
|
||
else:
|
||
# Этого не должно произойти, если mounted_step_widgets не пуст, но на всякий случай
|
||
logger.error("Не удалось найти предыдущий шаг для разморозки. Полный сброс потока.")
|
||
await self._clear_current_flow_display(full_reset=True)
|
||
|
||
|
||
# async def show_selection_widget(self, action_type: str) -> None:
|
||
# """Показывает виджет выбора для ."""
|
||
# # Сначала удаляем старый виджет, если он есть, чтобы не было дублей
|
||
# if self.selection_widget_instance:
|
||
# try:
|
||
# await self.selection_widget_instance.remove()
|
||
# except Exception as e: # DOMObjectMissingError может возникнуть, если уже удален
|
||
# logger.debug(f"Ошибка при удалении старого selection_widget_instance (возможно, уже удален): {e}")
|
||
# self.selection_widget_instance = None
|
||
#
|
||
# # Создаем новый экземпляр виджета
|
||
# self.selection_widget_instance = SelectionWidget(
|
||
# app_config=self.app_config,
|
||
# action_type=action_type,
|
||
# id="select_any" # Даем ему id для возможного поиска
|
||
# )
|
||
#
|
||
# self.selection_widget_active = True # Выставляем флаг, что виджет активен
|
||
# self.set_main_menu_enabled(False) # Отключаем верхнее меню
|
||
# # Находим контейнер, куда его добавить.
|
||
# # Будем добавлять после MainMenu внутри #main_content_area
|
||
# main_content_area = self.query_one("#main_content_area", Vertical)
|
||
# main_menu = self.query_one(MainMenu)
|
||
#
|
||
# # Добавляем виджет в DOM после MainMenu
|
||
# # `mount` добавляет виджет и вызывает его on_mount
|
||
# await main_content_area.mount(self.selection_widget_instance, after=main_menu)
|
||
# self.selection_widget_instance.focus() # Пытаемся установить фокус
|
||
# logger.info("SelectionWidget добавлен в DOM.")
|
||
|
||
|
||
async def on_button_pressed(self, event: Button.Pressed) -> None:
|
||
"""
|
||
Обработчик нажатия (Enter или Click) кнопки в верхнем меню.
|
||
:param event: Полученное событие нажатия кнопки на верхнем меню.
|
||
:return:
|
||
"""
|
||
button_id = event.button.id
|
||
button_classes = event.button.classes
|
||
|
||
# if button_id == "backup":
|
||
# logger.debug("TUI: Enter или Click на кнопку 'Backup' -- Инициализируем виджет выбора.")
|
||
# await self.show_selection_widget("backup")
|
||
# elif button_id == "restore":
|
||
# logger.debug("TUI: Enter или Click на кнопку 'Restore' -- Инициализируем виджет выбора.")
|
||
# await self.show_selection_widget("restore")
|
||
# elif button_id == "copy":
|
||
# logger.debug("TUI: Enter или Click на кнопку 'Copy' -- Инициализируем виджет выбора.")
|
||
# await self.show_selection_widget("copy")
|
||
# elif button_id == "service":
|
||
# logger.debug("TUI: Enter или Click на кнопку 'Service' -- Инициализируем виджет выбора.")
|
||
# await self.show_selection_widget("service")
|
||
# elif button_id == "quit":
|
||
# await self.action_quit()
|
||
|
||
# Проверяем, является ли нажатая кнопка частью главного меню
|
||
if "main-menu-button" in button_classes:
|
||
# Если активен какой-либо поток
|
||
if self.mounted_step_widgets:
|
||
logger.warning(
|
||
f"Нажата кнопка главного меню '{button_id}', но активен поток '{self.active_flow_type}'. "
|
||
"Остаемся в потоке."
|
||
)
|
||
self.bell()
|
||
# Или может выйти из потока. Пока не решил...
|
||
# await self._clear_current_flow_display(full_reset=True)
|
||
# Не продолжаем обработку кнопки меню, если только что прервали поток
|
||
# Однако, если пользователь хочет выйти (quit), это нужно разрешить.
|
||
# if button_id == "quit":
|
||
# await self.action_quit()
|
||
return
|
||
# Если поток не активен, обрабатываем кнопки главного меню
|
||
if button_id == "backup":
|
||
logger.debug("TUI: Нажата кнопка 'Backup' -- Инициализируем виджет выбора.")
|
||
await self.action_start_flow("backup")
|
||
elif button_id == "restore":
|
||
logger.debug("TUI: Нажата кнопка 'Restore' -- Инициализируем виджет выбора.")
|
||
await self.action_start_flow("restore")
|
||
elif button_id == "copy":
|
||
logger.debug("TUI: Нажата кнопка 'Copy' -- Инициализируем виджет выбора.")
|
||
await self.action_start_flow("copy")
|
||
elif button_id == "service":
|
||
logger.debug("TUI: Нажата кнопка 'Service' -- Инициализируем виджет выбора.")
|
||
await self.action_start_flow("service")
|
||
elif button_id == "quit":
|
||
logger.debug("TUI: Нажата кнопка 'Quit' -- Завершаем приложение.")
|
||
await self.action_quit()
|
||
else:
|
||
logger.warning(f"Не может быть! Неизвестная кнопка главного меню: {button_id}")
|
||
|
||
# Это не главное меню, а кнопка внутри "потока"
|
||
elif button_id == "execute_backup_action": # Обработка кнопки "Запустить бэкап"
|
||
if self.active_flow_type == "backup" and self.flow_data.get("target_info"):
|
||
logger.info(f"ПОЛЬЗОВАТЕЛЬ НАЖАЛ 'ЗАПУСТИТЬ БЭКАП' с данными: {self.flow_data}")
|
||
# >>> ЗДЕСЬ ВЫЗЫВАЕТСЯ РЕАЛЬНАЯ ЛОГИКА БЭКАПА <<<
|
||
self.bell() # Сигнал о начале
|
||
# Имитация выполнения (тут будут системные команды через Plumbum или другие подобные библиотеки)
|
||
await asyncio.sleep(1)
|
||
self.bell()
|
||
await asyncio.sleep(1)
|
||
self.bell() # Сигнал о завершении
|
||
logger.info("Бэкап (имитация) завершен!")
|
||
await self._clear_current_flow_display(full_reset=True)
|
||
else:
|
||
logger.warning("Попытка запустить бэкап без завершенного потока или неверный тип потока.")
|
||
self.bell()
|
||
else:
|
||
# Это может быть кнопка из другого места, если такие появятся.
|
||
# Пока что, если это не кнопка меню и не специальная кнопка, логируем.
|
||
logger.debug(f"Нажата кнопка с ID '{button_id}', не относящаяся к главному меню или известным действиям.")
|
||
|
||
|
||
def on_mount(self) -> None:
|
||
"""Вызывается после монтирования DOM приложения."""
|
||
# --- Обжужукиваем логирование в #app_log_viewer ---
|
||
log_viewer_widget = self.query_one("#app_log_viewer", Log)
|
||
textual_handler = TextualLogHandler(log_viewer_widget)
|
||
if self.log_level_int >= logging.INFO:
|
||
formatter = logging.Formatter(
|
||
fmt="%(asctime)s [%(levelname)-8s]\t %(message)s\n",
|
||
datefmt="%H:%M:%S" # Формат времени: часы:минуты:секунды
|
||
)
|
||
else:
|
||
formatter = logging.Formatter(
|
||
fmt="%(asctime)s [%(levelname)-8s] %(module)s.%(funcName)s:%(lineno)04d\t- %(message)s\n",
|
||
datefmt="%H:%M:%S.%s" # Формат времени: часы:минуты:секунды.милисекунды
|
||
)
|
||
textual_handler.setFormatter(formatter)
|
||
textual_handler.setLevel(self.log_level_int)
|
||
|
||
# Получаем корневой логгер
|
||
target_logger = logging.getLogger()
|
||
target_logger.addHandler(textual_handler) # Добавляем наш обработчик
|
||
|
||
# --- -- Перенос и сброс ранних логов из MemoryHandler -- ---
|
||
if self.early_log_handler:
|
||
# Устанавливаем наш textual_handler как цель для MemoryHandler
|
||
self.early_log_handler.setTarget(textual_handler)
|
||
# Сбрасываем все накопленные записи
|
||
self.early_log_handler.flush()
|
||
# Закрываем и удаляем MemoryHandler из корневого логера, так как он больше не нужен
|
||
self.early_log_handler.close()
|
||
target_logger.removeHandler(self.early_log_handler)
|
||
logger.debug("TUI: Перенесли и закрыли ранние логи… Удалили ранний MemoryHandler из корневого логера…")
|
||
self.early_log_handler = None # Очищаем ссылку
|
||
# --- -- Конец переноса и сброса -- ---
|
||
# --- Конец обжужукивания логирования ---
|
||
|
||
logger.info("TUI-логгер инициализирован")
|
||
logger.debug("test TUI-logger: DEBUG")
|
||
logger.warning("test TUI-logger: WARNING")
|
||
logger.error("test TUI-logger: ERROR")
|
||
|
||
async def action_activate_focused_widget(self) -> None:
|
||
"""Активирует (нажимает) сфокусированный виджет, если это кнопка."""
|
||
focused_widget = self.focused
|
||
if isinstance(focused_widget, Button):
|
||
logger.debug(f"action_activate: Имитация нажатия Enter на кнопке '{focused_widget.id or focused_widget.label}'")
|
||
await focused_widget.press() # Textual сам обработает это как клик
|
||
elif focused_widget:
|
||
logger.debug(f"action_activate: Enter нажат на '{type(focused_widget).__name__}', не являющемся кнопкой.")
|
||
else:
|
||
logger.debug("action_activate: Нет сфокусированного элемента.")
|
||
|
||
async def action_handle_escape(self) -> None:
|
||
"""Обрабатывает нажатие клавиши Escape."""
|
||
if self.mounted_step_widgets:
|
||
# Если есть активные шаги, Escape должен отменить последний активный шаг
|
||
logger.debug("Нажата Escape, активен поток. Попытка отменить последний шаг.")
|
||
# Находим самый "верхний" (последний добавленный) активный (не замороженный) StepWidget
|
||
active_step_to_cancel: Optional[StepWidget] = None
|
||
if self.mounted_step_widgets: # Проверяем, что словарь не пуст
|
||
# Ищем виджет с максимальным step_level, который не заморожен
|
||
# (хотя по логике, не заморожен должен быть только один - самый последний)
|
||
max_level = -1
|
||
for level, widget_instance in self.mounted_step_widgets.items():
|
||
if not widget_instance.is_frozen and level > max_level: # Ищем самый "свежий" активный
|
||
max_level = level
|
||
active_step_to_cancel = widget_instance
|
||
|
||
if active_step_to_cancel:
|
||
# Имитируем нажатие его кнопки "Отмена"
|
||
cancel_button_of_active_step = active_step_to_cancel.query_one(
|
||
f"#{active_step_to_cancel._cancel_button_id}", Button
|
||
)
|
||
await cancel_button_of_active_step.press()
|
||
else:
|
||
logger.warning("Escape: Не найден активный шаг для отмены, хотя виджеты шагов есть.")
|
||
self.bell()
|
||
else: # Этого не должно быть, если self.mounted_step_widgets не пуст
|
||
await self._clear_current_flow_display(full_reset=True)
|
||
|
||
else:
|
||
# Если нет активных шагов, Escape может, например, ничего не делать или закрывать приложение
|
||
logger.debug("Нажата Escape, нет активного потока. Действий не предусмотрено (или можно добавить выход).")
|
||
self.bell() # Просто сигнал, что нажатие обработано, но без видимого эффекта
|
||
|
||
# async def action_activate(self) -> None:
|
||
# """
|
||
# Обработчик нажатия клавиши Enter.
|
||
# :return:
|
||
# """
|
||
# focused = self.focused
|
||
# if not focused:
|
||
# logger.debug("TUI: action_activate вызван, но нет сфокусированного элемента.")
|
||
# return
|
||
# else:
|
||
# if isinstance(focused, Button):
|
||
# # Проверяем, не является ли кнопка частью нашего виджета выбора
|
||
# if not isinstance(focused, Button): # Если не кнопка - ничего не делаем
|
||
# logger.debug("TUI: Что-то нажато (Enter), но это не кнопка.")
|
||
# return
|
||
#
|
||
# # if focused.parent and isinstance(focused.parent, SelectionWidget):
|
||
# # await focused.parent.on_button_pressed(Button.Pressed(focused))
|
||
# # else:
|
||
# # await self.on_button_pressed(Button.Pressed(focused))
|
||
|
||
# # --- Обработчики быстрых клавиш ---
|
||
# # Для 'q' (Quit) уже есть в BINDINGS срабатывающий метод action_quit, он сработает автоматически (не ясно почему).
|
||
# async def action_backup(self):
|
||
# """
|
||
# Обработчик нажатия быстрой клавиши 'b' для инициализации виджета создания бэкапа.
|
||
# :return:
|
||
# """
|
||
# if getattr(self, "selection_widget_active", False): # Проверяем, активен ли виджет выбора
|
||
# return # Если активен, ничего не делаем (отменяем нажатие клавиши)
|
||
# self.query_one("#backup", Button).focus() # подсветим кнопку
|
||
# logger.debug("TUI: Быстрая клавиша 'b' -- Инициализируем виджет.")
|
||
# await self.show_selection_widget("backup")
|
||
|
||
# async def action_restore(self):
|
||
# """
|
||
# Обработчик нажатия быстрой клавиши 'r' для восстановления.
|
||
# :return:
|
||
# """
|
||
# if getattr(self, "selection_widget_active", False): # Проверяем, активен ли виджет выбора
|
||
# return
|
||
# self.query_one("#restore", Button).focus() # подсветим кнопку
|
||
# logger.debug("TUI: Быстрая клавиша 'r' -- Инициализируем виджет.")
|
||
# await self.show_selection_widget("restore")
|
||
|
||
# async def action_copy(self):
|
||
# """
|
||
# Обработчик нажатия быстрой клавиши 'c' для копирования.
|
||
# :return:
|
||
# """
|
||
# if getattr(self, "selection_widget_active", False): # Проверяем, активен ли виджет выбора
|
||
# return
|
||
# self.query_one("#copy", Button).focus()
|
||
# logger.debug("TUI: Быстрая клавиша 'c' -- Инициализируем виджет.")
|
||
# await self.show_selection_widget("copy")
|
||
|
||
# async def action_service(self):
|
||
# """
|
||
# Обработчик нажатия быстрой клавиши 's' для сервисных действий.
|
||
# :return:
|
||
# """
|
||
# if getattr(self, "selection_widget_active", False): # Проверяем, активен ли виджет выбора
|
||
# return
|
||
# self.query_one("#service", Button).focus()
|
||
# logger.debug("TUI: Быстрая клавиша 's' -- Инициализируем виджет.")
|
||
# await self.show_selection_widget("service")
|
||
|
||
|