# -*- coding: utf-8 -*- # src/pganec/tui.py import logging import asyncio from logging.handlers import MemoryHandler from symtable import Class from typing import Dict, Any, Optional, List, Tuple from textual.app import App, ComposeResult from textual.containers import Vertical, Horizontal, Container from textual.css.query import DOMQuery from textual.reactive import reactive from textual.screen import Screen from textual.widgets import Button, Header, Footer, Static, Log, Label, ListView, ListItem, OptionList from textual.widget import Widget from textual.message import Message from textual import on from typing import TYPE_CHECKING if TYPE_CHECKING: from .tui import PGanecApp # --- Настройки и инициирование логирования модуля --- logger = logging.getLogger(__name__) # --- КОНФИГ_ПЕРЕМЕННЕЫ которые использоваться в приложении (возможно стоит перенести в другой файл) --- DB_FOR_BACKUP = "db_4b" # --- Типы для шагов выбора --- STEP_TYPE_SERVER = "select_server" STEP_TYPE_DATABASE = "select_database" STEP_TYPE_DESTINATION = "select_destination" class WidgetType: # Определяем типы виджетов/шагов BD_SERVERS_LIST = "db_servers_list" DBS_IN_SERVER = "dbs_in_server" TARGETS = "targets_list" SERVICE_ACTIONS = "service_actions" # Можно добавить другие типы по мере необходимости, например: TEXT_INPUT = "text_input" CONFIRMATION = "confirmation_step" IN_PROGRESS = "progress" # --- Кастомный обработчик логирования (для Textual) --- class TextualLogHandler(logging.Handler): """Обработчик логирования, который направляет записи в виджет Textual Log.""" def __init__(self, textual_log_widget: Log): super().__init__() self.textual_log_widget = textual_log_widget def emit(self, record: logging.LogRecord): """Отправляет отформатированную запись лога в виджет.""" try: msg = self.format(record) # Метод Log.write() является потоко-безопасным, и его можно вызывать напрямую. self.textual_log_widget.write(msg) except Exception: # Стандартная обработка ошибок для Handler, если что-то пошло не так (например, виджет был удален # или произошла ошибка форматирования) self.handleError(record) # --- Верхнее меню приложения (для Textual) --- class MainMenu(Static): def compose(self) -> ComposeResult: yield Horizontal( Button(label="Backup (DB\u2192Dump)", id="backup", classes="main-menu-button"), Button(label="Restore (Dump\u2192DB)", id="restore", classes="main-menu-button"), Button(label="Copy (DB\u2192DB)", id="copy", classes="main-menu-button"), Button(label="Service", id="service", classes="main-menu-button"), Button(label="Quit", id="quit", variant="error", classes="main-menu-button"), id="menu", ) # --- КЛАСС STEPWIDGET: НАЧАЛО--- class StepWidget(Static): """ Универсальный виджет многошагового выбора опций. На экране может быть несколько экземпляров этого виджета, в зависимости от того какое содержание "закажет" основное приложение PGanecApp (в потоке) """ # --- Сообщения, которые этот виджет может отправлять --- class SelectionMade(Message): """Сообщение о том, что в этом StepWidget был сделан выбор.""" def __init__(self, sender_widget: 'StepWidget', selection_data: Any, display_value: str): self.sender_widget = sender_widget self.selection_data = selection_data # Выбранные данные self.display_value = display_value # Текст для отображения в "замороженном" состоянии super().__init__() class Cancelled(Message): """Сообщение о том, что текущий шаг в StepWidget был отменен.""" def __init__(self, sender_widget: 'StepWidget'): self.sender_widget = sender_widget super().__init__() def __init__(self, type_widget: str, title_text: str, step_level: int = 0, # Уровень/шаг в последовательности (он же номер экземпляра виджета в цепочке) # is_frozen: bool = False, # Флаг, что виджет заморожен (выбор сделан) и если context_data: Optional[Dict[str, Any]] = None, # <--- ДОБАВЬ ЭТОТ ПАРАМЕТР *args, **kwargs): super().__init__(*args, **kwargs) self.type_widget: str = type_widget self.title_text: str = title_text # Заголовок для этого шага self.step_level: int = step_level # Шаг в конвейере выбора из PGanecApp. self.is_frozen: bool = False # Изначально виджет не заморожен (отображается целиком) self.context_data: Dict[str, Any] = context_data or {} # self.is_frozen: bool = is_frozen self.num_count: int = 1 # Счетчик для нумерации опций в этом экземпляре виджета # self._current_selection_display_text: Optional[str] = None self._current_selection_display_text: Optional[str] = None # Для отображения замороженного выбора self._button_id_to_data_map: Dict[str, Any] = {} # Для связи ID кнопок с данными # Уникальный ID для кнопки отмены этого экземпляра self._cancel_button_id = f"cancel_step_{self.step_level}_{self.id or id(self)}" # ID кнопки "Отмена" @property def app_config(self) -> Dict[str, Any]: """ Возвращает конфигурацию приложения PGanecApp, к которому принадлежит этот виджет. """ return getattr(self.app, "app_config", {}) def _create_cancel_button(self, num_for_cansel: int = 0) -> Button: """Создаёт кнопку 'Отмена'.""" return Button( label=f"{num_for_cansel:02d}: Отмена", # num_count, очно, это счетчик для нумерации кнопок id=self._cancel_button_id, classes="step-cancel-button" # Класс для CSS в TUI ) # Удаляем вообще все кнопки 'Отмена' во всех виджетах (если она вдруг осталась... для надежности) # try: # for btn in self.query(".cancel_widget"): # btn.remove() # except Exception: # DOMQuery.DoesNotExist: почему-то не видит статический анализатор # pass # Кнопки уже нет, просто игнорируем # yield Button(label=f"{self.num_count:02d}: Отмена", # id=f"cancel_widget_{self.step_level}", # classes="cancel_widget") # self.mount(Button(label="{self.num_count:02d}: Отмена", id="cancel_widget", classes="list-button")) # def _remove_cancel_button(self): # """Удаляет кнопку 'Отмена' (при заморозке).""" # try: # for btn in self.query(".cancel_widget"): # btn.remove() # except Exception: # pass # Кнопки уже нет # def freeze(self, selection_id: str): def freeze(self, selected_button_label: str): """Замораживает шаг: сохраняет информацию о выборе и обновляет виджет для отображения только этого выбора.""" # - Устанавливает self.is_frozen = True. # - Сохраняет self._current_selection_display_text = selected_button_label. # - Вызывает self.refresh(). Это заставит compose() перерисовать виджет в замороженном состоянии. self.is_frozen = True self._current_selection_display_text = str(selected_button_label) # Вместо скрытия элементов, мы просто вызовем self.refresh(), а compose() сам решит, что отображать # на основе self.is_frozen. self.refresh() logger.debug( f"StepWidget (step {self.step_level}, id: {self.id}) заморожен. Выбор: {self._current_selection_display_text}") def unfreeze(self): """ "Размораживает" шаг, делая его снова интерактивным. PGanecApp должен будет передать новые данные для отображения, если это необходимо, перед вызовом unfreeze, или unfreeze должен принимать данные для re-compose. Пока что unfreeze просто сбрасывает флаг и обновляет. """ # - Устанавливает self.is_frozen = False. # - Сбрасывает self._current_selection_display_text = None. # - Вызывает self.refresh(). compose() перерисует виджет в активном состоянии. if not self.is_frozen: return # Уже разморожен self.is_frozen = False self._current_selection_display_text = None self.num_count = 1 # Сбрасываем счетчик для нумерации self.remove_children() # Очистим старое содержимое виджета, compose создаст новое self.refresh() # Заставим compose() перерисовать активное состояние logger.debug(f"StepWidget (step {self.step_level}, id: {self.id}) разморожен.") def compose(self) -> ComposeResult: """Создаёт содержимое виджета.""" # - Рисует заголовок. # - if self.is_frozen:: Рисует Static с self._current_selection_display_text. # - else:: Рисует интерактивные кнопки выбора (на основе self.type_widget и данных, которые StepWidget может # получить из self.app.app_config или которые ему передал PGanecApp при инициализации/конфигурации) # и кнопку "Отмена". # Заголовок шага self._button_id_to_data_map.clear() # Очищаем перед каждым рендерингом активного состояния yield Label(renderable=self.title_text, classes="step-title") # Заголовок виджета if self.is_frozen: # --- Виджет в ЗАМОРОЖЕННОМ состоянии --- if self._current_selection_display_text: yield Static(content=f"[\u2192\u2192] {self._current_selection_display_text}", classes="frozen-choice-display") else: # На случай, если заморожен без текста (не должно быть, но вдруг) yield Static(content="[\u2192\u2192] Выбор сделан, но хрен знает, что выбрали", classes="frozen-choice-display") else: if self.type_widget == WidgetType.BD_SERVERS_LIST: # --- Виджет для выбора серверов баз данных --- servers_data = self.app_config.get("servers", []) if not servers_data: yield Label("Нет серверов для выбора.", classes="placeholder-text") else: for server_conf in servers_data: server_name = server_conf.get("name", f"NONAME {self.num_count:02d}") button_id = server_conf.get("id", f"auto_id_{self.num_count}_{self.id or id(self)}") # ID уникальны yield Button(label=f"{self.num_count:02d}: {server_name}", id=button_id, classes="list-button") self._button_id_to_data_map[button_id] = server_conf # Сохраняем всю конфигурацию self.num_count += 1 # ... elif self.type_widget == WidgetType.DBS_IN_SERVER: yield Label(renderable="Тут будет список кнопок для выбора базы данных в сервере...", classes="warning-text") # ... elif self.type_widget == WidgetType.TARGETS: yield Label(renderable="Тут будет список кнопок для выбора тома назначения...", classes="warning-text") # ... elif self.type_widget == WidgetType.SERVICE_ACTIONS: yield Label(renderable="Тут будут служебные функции...", classes="warning-text") # ... else: logger.error(f"Неизвестный тип виджета: {self.type_widget}. Кыш отсюда!") yield Label(renderable="Ошибка: неизвестный тип виджета.", classes="error-text") # Добавляем кнопку "Отмена" yield self._create_cancel_button(num_for_cansel= self.num_count) # Передаем текущий счетчик для кнопки "Отмена" async def on_button_pressed(self, event: Button.Pressed) -> None: """ Обработчик нажатия кнопок в этом виджете. """ button_id = event.button.id if self.is_frozen: return # Виджет заморожен, на нем нет кнопок, не обрабатываем # Если это кнопка отмены - удаляем виджет и сбрасываем флаг активности if event.button.id == self._cancel_button_id: # Проверяем по уникальному ID кнопки отмены logger.debug(f"Нажата 'Отмена' в StepWidget (step {self.step_level}, id: {self.id}).") self.post_message(StepWidget.Cancelled(sender_widget=self)) # PGanecApp решит, удалять ли виджет. Если мы используем N экземпляров, # то PGanecApp должен будет вызвать hide_and_reset() или remove() на этом экземпляре. # Если мы создаем новый экземпляр для каждого шага, то await self.remove() здесь уместно. # Пока оставим удаление на усмотрение PGanecApp, который получит сообщение. self.num_count = 1 # Сброс счетчика нумерации await self.remove() # Удаляем сам виджет из DOM return # self.app.selection_widget_active = False # <-- Сброс флага что виджет активен # # Очищаем состояние виджета и удаляем его из DOM и все в нем # self.is_frozen = False # self.num_count = 1 # Сброс счетчика нумерации # self._remove_cancel_button() # self.app.selection_widget_instance = None # Сбрасываем ссылку на виджет в приложении # self.app.selection_widget_active = False # Сбрасываем флаг активности виджета # self.post_message(StepWidget.Cancelled(sender_widget=self)) # Сообщение об отмене # await self.remove() # Удаляем сам виджет # logger.info("SelectionWidget удален из DOM.") # return # Если это не кнопка отмены, значит это кнопка выбора selected_option_data = self._button_id_to_data_map.get(button_id) if selected_option_data is None: logger.warning(f"Нет данных для кнопки с ID '{button_id}' в StepWidget (step {self.step_level}).") return # Неизвестная кнопка (не из списка опций) display_text_for_freeze = str(event.button.label) logger.info( f"StepWidget (step {self.step_level}, id: {self.id}): выбрано '{selected_option_data}' (метка: '{display_text_for_freeze}')") # "Замораживаем" виджет, передавая ID и текст выбранной кнопки self.freeze(selected_button_label=display_text_for_freeze) # Отправляем сообщение приложению с выбранными данными self.post_message(StepWidget.SelectionMade( sender_widget=self, selection_data=selected_option_data, # Отправляем реальные данные display_value=display_text_for_freeze )) # После заморозки виджета, PGanecApp должен будет обработать это сообщение и решить, что делать дальше. async def on_mount(self) -> None: """ Вызывается после монтирования виджета в DOM. """ if not self.is_frozen: try: # Пытаемся установить фокус на первую кнопку-опцию first_button = self.query_one("Button.list-button", Button) first_button.focus() except DOMQuery.DoesNotExist: try: cancel_button = self.query_one(f"#{self._cancel_button_id}", Button) cancel_button.focus() except DOMQuery.DoesNotExist: logger.warning(f"В StepWidget {self.id} нет кнопок для фокуса.") except Exception as e: # Более общая ошибка для фокуса на кнопке отмены logger.error(f"Ошибка при установке фокуса на кнопку Отмена в StepWidget {self.id}: {e}") except Exception as e: # Более общая ошибка для фокуса на list-button logger.error(f"Ошибка при установке фокуса на StepWidget {self.id}: {e}") # --- КЛАСС STEPWIDGET: КОНЕЦ --- # # --- Виджет для "встроенного" выбора --- # class SelectionWidget(Static): # Наследуем от Static # """ # Виджет для и PGanec TUI. # """ # @property # def app(self) -> "PGanecApp": # """ # Возвращает ссылку на приложение PGanecApp , к которому принадлежит этот виджет. Нужно для доступа к методам # и состоянию приложения (и статического анализатора, так как класс PGanecApp описан ниже и "отсюда не виден"). # :return: # """ # return super().app # type: ignore # # # def __init__(self, # app_config: Optional[Dict[str, Any]] = None, # action_type: Optional[str] = None, # **kwargs): # super().__init__(**kwargs) # self.app_config = app_config if app_config is not None else {} # каждому экземпляру виджета нужна общая конфигурация # # self.step_id = step_id # Номер этого этапа # self.overall_action_type = action_type # Сохраняем тип действия ("backup", "restore", "copy", "service") # self.current_step = 0 # Текущий шаг выбора (многоэтапный выбор), начинаем с 0 # self.selections: Dict[int, Any] = {} # Хранит выборы: {0: server_conf, 1: db_name, 2: dest_path} # self._button_id_to_data_map: Dict[str, Any] = {} # Карта для хранения значений опций # logger.debug( # f"SelectionWidget initialized: action: '{self.overall_action_type}' / current_step: {self.current_step}" # ) # # # SelectionWidget(Static) # # def compose(self) -> ComposeResult: # """ Создаёт содержимое виджета. """ # self._button_id_to_data_map.clear() # Очищаем карту значений # num_count = 1 # Для нумерации опций текущего шага # # # --- Отображение уже сделанных выборов --- # if self.current_step > 0 and 0 in self.selections: # server_conf = self.selections[0] # server_name = server_conf.get("name", "Неизвестный сервер") # yield Static(f"1. Сервер: {server_name}", classes="previous-selection") # # if self.current_step > 1 and 1 in self.selections: # db_name = self.selections[1] # yield Static(f"2. База данных: {db_name}", classes="previous-selection") # # if self.current_step > 2 and 2 in self.selections: # Если есть третий шаг и он сделан # dest_info = self.selections[2] # Может быть строка или словарь # yield Static(f"3. Назначение: {dest_info}", classes="previous-selection") # # if self.overall_action_type == "backup": # yield Label("Backup (DB\u2192Dump): Выберите BD-сервер:") # # ... тут будет список серверов # if "servers" in self.app_config: # logger.info(f"TUI: Обнаружено {len(self.app_config['servers'])} серверов в конфигурации.") # for server_conf in self.app_config["servers"]: # server_name = server_conf.get("name") # button_id = server_conf.get("id") # if not button_id: # button_id = f"server_{num_count:02d}" # Генерируем временный id, если не указан # logger.warning( # f"Сервер '{server_name}' не имеет 'id' в yaml-конфигурации. Установлен id='{button_id}'.") # # continue # пропускаем? # if button_id in self._button_id_to_data_map: # logger.error( # f"TUI: Дублирующийся ID кнопки '{button_id}' для сервера '{server_name}' из yaml-конфигурации. ПРОРУСКАЕМ.") # continue # # Добавляем кнопку для каждого сервера # yield Button(label=f"{num_count:02d}: {server_name}", id=button_id, classes="list-button") # self._button_id_to_data_map[button_id] = {"type": DB_FOR_BACKUP, "config": server_conf} # num_count += 1 # # elif self.overall_action_type == "restore": # yield Label("Выберите том для восстановления:") # if "targets" in self.app_config: # logger.info(f"TUI: Обнаружено {len(self.app_config['targets'])} томов в конфигурации.") # yield Static("Тут будет список...", classes="placeholder-text") # Временная заглушка # # ... тут будет список томов # elif self.overall_action_type == "copy": # yield Label("Выберите BD-сервер (откуда копировать):") # # ... тут будет список серверов # elif self.overall_action_type == "service": # yield Label("Служебные функции:") # # ... тут будут служебные функции # # # Кнопка "Отмена" в конце любого списка # yield Button(label=f"{num_count:02d}: Отмена", id="cancel_widget", classes="list-button") # # # def on_mount(self) -> None: # """ Вызывается после монтирования виджета в DOM. """ # logger.info(f"Виджет '{self.id}' (SelectionWidget) смонтирован.") # try: # # Пытаемся установить фокус на первую кнопку-опцию # # Ищем первую кнопку с классом "list-button" внутри этого виджета # first_option_button = self.query_one("Button.list-button", Button) # first_option_button.focus() # logger.debug(f"TUI: SelectionWidget: Фокус на кнопке-опцию: `{first_option_button.id}`") # except Exception as e: # # except DOMQuery.DoesNotExist: # logger.error(f"SelectionWidget: Ошибка при установке фокуса в Виджете: {e}") # # logger.warning(f"SelectionWidget: OptionList не найден для установки фокуса (overall_action_type: {self.overall_action_type}).") # # # Можно сразу установить фокус на первый элемент, если он есть # # self.query_one(OptionList).focus() или self.query_one(Button).focus() # # # async def on_button_pressed(self, event: Button.Pressed) -> None: # button_id = event.button.id # button_data = self._button_id_to_data_map.get(button_id) # <--- Извлекаем данные по ID кнопки # # logger.info(f"SelectionWidget ({self.overall_action_type}): нажата кнопка-опция id='{button_id}', data={button_data}") # # if button_data: # action_type_from_data = button_data.get("type") # if action_type_from_data == DB_FOR_BACKUP: # server_config = button_data.get("config") # <--- Получаем полную конфигурацию сервера # logger.info( # f"SelectionWidget ({self.overall_action_type}): выбран сервер {server_config.get('name') if isinstance(server_config, dict) else server_config}. Далее - выбор БД/дампа.") # # Тут будет логика для перехода к следующему шагу, используя server_config # self.app.bell() # # ... другие обработчики ... # # if event.button.id == "cancel_widget": # logger.debug("Нажата 'Отмена' в SelectionWidget.") # await self.remove() # Удаляем сам виджет # self.app.selection_widget_active = False # <-- Сброс флага что виджет активен # # Можно также послать сообщение приложению, чтобы оно обновило состояние, если нужно # # self.app.post_message(BackupSelectionCancelled()) # self.app.set_main_menu_enabled(True) # Включаем верхнее меню обратно # --- ОСНОВНОЕ ПРИЛОЖЕНИЕ: НАЧАЛО --- class PGanecApp(App): CSS = """ Screen { layout: vertical; /* Вертикальная компоновка для всего приложения */ overflow: auto; /* Добавляем прокрутку на весь экран, если контент не влезает */ } Header { dock: top; height: 1; } #main_content_area { /* Этот контейнер будет занимать все доступное пространство между Header и Log/Footer */ height: 1fr; /* 1fr означает "одна доля доступного пространства" */ overflow-y: auto; /* Если контент не влезет, появится прокрутка */ padding: 0 1; } #title { width: 100%; text-align: left; background: purple; padding: 0; } /* Заголовок приложения */ #menu { width: 100%; height: 3; align: left top; padding: 0; } #menu Button.main-menu-button { /* Кнопки верхнего меню */ align: center middle; width: 20%; color: orange; background: transparent; margin: 0; padding: 0; border: round orange; } #menu Button.main-menu-button:hover { background: $primary-background-lighten-2; /* Подсветка при наведении */ } Button:focus { border: heavy green; } /* Стили для лога приложения */ Log#app_log_viewer { dock: bottom; height: 10; border-top: double grey; background: 15%; } Log#app_log_viewer:focus { height: 50%; border-top: solid green; background: 45%; } /* Стили для StepWidget */ StepWidget { border: round $primary-lighten-1; background: $panel; width: 100%; } StepWidget .step-title { width: 100%; text-style: bold; color: $text-muted; } StepWidget .frozen-choice-display { padding: 0; color: $text-muted; text-style: italic; border: dashed $primary-darken-1; /* Выделим замороженный выбор */ margin-top: 1; /* Отступ от заголовка */ } StepWidget Button.list-button, StepWidget Button.step-cancel-button { width: auto; height: auto; min-height: 1; align: left middle; border: none; background: transparent; color: $text; /* padding: 0 1; margin-bottom: 0;q margin-left: 0; padding-left: 1; */ } StepWidget Button.list-button:hover { background: $primary; } StepWidget Button.step-cancel-button { color: $error; margin-left: -1; } /* StepWidget Button.step-cancel-button:hover { background: $error; } */ StepWidget .placeholder-text, StepWidget .error-text { padding: 0 1; color: $text-muted; } StepWidget .error-text { color: $error; } /* Стили для встроенного виджета выбора */ SelectionWidget { padding: 1; margin-top: 0; background: $panel; /* Используем системный цвет панели */ /* height: auto; /* Высота будет по содержимому */ } SelectionWidget .placeholder-text { padding: 0; color: $text-muted; } Button.list-button { /* Стили для кнопок внутри виджета */ width: auto; height: 1; align: left middle; border: hidden; background: transparent; color: $text; margin-left: 0; padding-left: 1; } Button#cancel_widget {margin-left: -1; color: red; } /* Отдельный стиль для кнопки "Отмена" */ Footer { dock: bottom; height: 1; } """ BINDINGS = [ # ("b", "backup", "Backup"), # ("r", "restore", "Restore"), # ("c", "copy", "Copy"), # ("s", "service", "Service"), ("q", "quit", "Quit"), ("right,down,tab", "focus_next", "Focus Next"), # ("down", "focus_next", "Focus Next"), # Стрелочки для навигации (вниз/вправо) ("left", "focus_previous", "Focus Previous"), ("up", "focus_previous", "Focus Previous"), # Стрелочки (вверх/влево) ("enter", "activate", "Activate"), ("escape", "handle_escape()", "Cancel/Back") # Обработка Escape ] # Атрибут для хранения ссылки на виджет выбора, если он отображен # --- Атрибуты для управления потоком и состоянием --- active_flow_type: Optional[str] = None # Тип текущего активного потока (например, "backup") flow_data: Dict[str, Any] = {} # Данные, собранные в ходе выполнения потока # Словарь для хранения экземпляров StepWidget текущего потока, в нем сохраним "замороженные" шаги на экране. # Ключ - step_level (int), значение - экземпляр StepWidget. mounted_step_widgets: Dict[int, StepWidget] = {} # selection_widget_instance: Optional[SelectionWidget] = None # Станет ненужным # current_step_widget: Optional[StepWidget] = None # Новый def __init__(self, log_level_int: int = logging.INFO, app_config: Dict[str, Any] = None, early_log_handler: Optional[MemoryHandler] = None, **kwargs): super().__init__(**kwargs) self.log_level_int = log_level_int self.app_config = app_config if app_config is not None else {} # Сохраняем конфигурацию self.early_log_handler = early_log_handler # <--- Сохраняем self.selection_widget_instance = None # Инициализируем атрибут для виджета выбора logger.debug(f"TUI инициализирован. Log Level: `{logging.getLevelName(self.log_level_int)}`;" f" Config Keys: {list(self.app_config.keys())}") def compose(self) -> ComposeResult: """ Создает основной DOM приложения. """ yield Header() # Верхний колонтитул with Vertical(id="main_content_area"): # Верхний колонтитул yield Static("PGanec TUI", id="title") # Заголовок приложения yield MainMenu() # Главное меню # Другие элементы основного интерфейса могут быть здесь # Место для нашего виджета выбора будет здесь. # Мы не добавляем его сразу, а будем делать это динамически. # Можно добавить "якорь" - пустой контейнер, если нужно точное позиционирование # yield Container(id="selection_widget_placeholder") yield Vertical(id="steps_container") # Контейнер, куда будут добавляться StepWidget'ы yield Log(id="app_log_viewer", highlight=True, auto_scroll=True) # Лог внизу yield Footer() # Нижний колонтитул (для биндингов) def set_main_menu_enabled(self, enabled: bool): """ Включает или отключает кнопки верхнего меню. """ try: menu = self.query_one(MainMenu) for btn in menu.query(Button): btn.disabled = not enabled logger.debug(f"Главное меню {'включено' if enabled else 'выключено'}.") except DOMQuery.DoesNotExist: logger.error("Такого быть не должно, но MainMenu не найден... невозможно изменение состояния кнопок.") async def _clear_current_flow_display(self, full_reset: bool = False): """ Очищает отображаемые виджеты шагов из контейнера #steps_container. Если full_reset=True, также сбрасывает состояние потока и включает главное меню. """ logger.debug(f"Очистка отображения потока. Full reset: {full_reset}") # steps_container = self.query_one("#steps_container", Vertical) # Удаляем все StepWidget из DOM и из нашего словаря отслеживания widgets_to_remove = list(self.mounted_step_widgets.values()) for widget in widgets_to_remove: if widget.is_mounted: try: await widget.remove() except Exception as e: logger.error(f"Ошибка при удалении StepWidget {widget.id}: {e}") self.mounted_step_widgets.clear() if full_reset: self.active_flow_type = None self.flow_data = {} self.set_main_menu_enabled(True) logger.info("Поток полностью сброшен, главное меню включено.") async def _show_next_step_widget(self, widget_type: str, title: str, step_level: int, context_data: Optional[Dict[str, Any]] = None): """ Создает и показывает новый StepWidget для следующего шага. Предыдущие "замороженные" виджеты остаются на экране. """ self.set_main_menu_enabled(False) # Отключаем меню, пока активен поток # Создаем новый экземпляр StepWidget new_step_widget = StepWidget( type_widget=widget_type, title_text=title, step_level=step_level, context_data=context_data, id=f"step_{step_level}_{widget_type.lower().replace(' ', '_')}" # Уникальный ID ) # Сохраняем ссылку на созданный виджет self.mounted_step_widgets[step_level] = new_step_widget # Монтируем виджет в специальный контейнер steps_container = self.query_one("#steps_container", Vertical) await steps_container.mount(new_step_widget) # Фокус будет установлен в on_mount самого StepWidget logger.info( f"StepWidget {new_step_widget.id} (type: {widget_type}, step: {step_level}) добавлен и смонтирован.") # --- Действия, запускаемые из BINDINGS или кнопок меню --- async def action_start_flow(self, flow_type: str): """Общий метод для запуска нового потока операций по его типу.""" if self.active_flow_type is not None: logger.warning( f"Поток '{self.active_flow_type}' уже активен. " f"Новый поток '{flow_type}' не может быть запущен без отмены предыдущего." ) self.bell() # Звуковой сигнал об ошибке return logger.info(f"Запуск нового потока: {flow_type}") await self._clear_current_flow_display(full_reset=True) # Полный сброс перед новым потоком self.active_flow_type = flow_type.lower() self.flow_data = {} # Очищаем данные предыдущего потока # Маршрутизация к первому шагу конкретного потока if self.active_flow_type == "backup": await self._backup_step_1_select_server() elif self.active_flow_type == "restore": logger.warning(f"Поток '{self.active_flow_type}' еще не реализован.") await self._clear_current_flow_display(full_reset=True) # Пока просто сбрасываем elif self.active_flow_type == "copy": logger.warning(f"Поток '{self.active_flow_type}' еще не реализован.") await self._clear_current_flow_display(full_reset=True) elif self.active_flow_type == "service": logger.warning(f"Поток '{self.active_flow_type}' еще не реализован.") await self._clear_current_flow_display(full_reset=True) else: logger.error(f"Попытка запустить неизвестный тип потока: {flow_type}") await self._clear_current_flow_display(full_reset=True) # Сбрасываем, если тип потока не опознан # --- Пример реализации потока: Backup --- async def _backup_step_1_select_server(self): """Первый шаг потока 'Backup': выбор сервера.""" logger.debug("Backup Flow: Запрос шага 1 - выбор сервера.") await self._show_next_step_widget( widget_type=WidgetType.BD_SERVERS_LIST, title="Backup: Шаг 1 - Выберите сервер", step_level=0 ) async def _backup_step_2_select_db(self, server_config: Dict[str, Any]): """Второй шаг потока 'Backup': выбор базы данных на указанном сервере.""" logger.debug(f"Backup Flow: Запрос шага 2 - выбор БД для сервера '{server_config.get('name')}'.") await self._show_next_step_widget( widget_type=WidgetType.DBS_IN_SERVER, title=f"Backup: Шаг 2 - Выберите БД на сервере '{server_config.get('name', '???')}'", step_level=1, context_data={"server_config": server_config} # Передаем конфиг сервера в StepWidget ) async def _backup_step_3_select_target(self, server_config: Dict[str, Any], db_name: str): """Третий шаг потока 'Backup': выбор места сохранения.""" logger.debug(f"Backup Flow: Запрос шага 3 - выбор цели для БД '{db_name}'.") await self._show_next_step_widget( widget_type=WidgetType.TARGETS, # Предполагаем, что есть такой тип для выбора цели title=f"Backup: Шаг 3 - Куда сохранить бэкап для '{db_name}'?", step_level=2, context_data={"server_config": server_config, "db_name": db_name} # Передаем контекст ) # --- Обработчики сообщений от StepWidget --- @on(StepWidget.SelectionMade) async def handle_step_selection(self, message: StepWidget.SelectionMade): """Обрабатывает сообщение о сделанном выборе в одном из StepWidget.""" sender_widget = message.sender_widget current_step_level = sender_widget.step_level selected_data = message.selection_data display_value = message.display_value # Текст, который StepWidget отобразит в "замороженном" виде logger.info( f"PGanecApp: Выбор сделан (Поток: {self.active_flow_type}, Шаг: {current_step_level}): " f"Данные='{selected_data}', Отображение='{display_value}'" ) # sender_widget уже вызвал self.freeze() и обновил свое отображение. # Маршрутизация на основе текущего потока и завершенного шага if self.active_flow_type == "backup": if current_step_level == 0: # Завершился шаг выбора сервера self.flow_data["server_config"] = selected_data await self._backup_step_2_select_db(server_config=selected_data) elif current_step_level == 1: # Завершился шаг выбора БД self.flow_data["db_name"] = selected_data # Передаем server_config из сохраненных данных потока await self._backup_step_3_select_target( server_config=self.flow_data["server_config"], db_name=selected_data ) elif current_step_level == 2: # Завершился шаг выбора цели self.flow_data["target_info"] = selected_data logger.info(f"ПОТОК BACKUP ЗАВЕРШЕН! Все данные для бэкапа собраны: {self.flow_data}") self.bell() # Здесь можно: # 1. Показать финальное подтверждение в новом StepWidget (type=CONFIRMATION). # 2. Добавить кнопку "Выполнить бэкап" к последнему замороженному StepWidget. # 3. Сразу запустить операцию бэкапа. # Пока просто выведем сообщение и очистим поток для следующего раза. # Пример добавления кнопки "Выполнить" к последнему виджету if sender_widget.is_mounted: # Убедимся, что виджет еще на экране execute_button = Button("Запустить бэкап!", id="execute_backup_action", variant="success") try: # Удаляем кнопку отмены, если она там была (не должна быть в замороженном) sender_widget.query_one(".step-cancel-button").remove() except DOMQuery.DoesNotExist: pass await sender_widget.mount(execute_button) # Добавляем кнопку выполнения # await self._clear_current_flow_display(full_reset=True) # Пока не очищаем, чтобы увидеть кнопку else: logger.warning(f"Получен выбор для неожиданного шага {current_step_level} в потоке 'backup'.") # Добавить elif для других active_flow_type (restore, copy, service) # elif self.active_flow_type == "restore": # ... @on(StepWidget.Cancelled) async def handle_step_cancellation(self, message: StepWidget.Cancelled): """Обрабатывает сообщение об отмене текущего шага в StepWidget.""" cancelled_widget = message.sender_widget cancelled_step_level = cancelled_widget.step_level logger.info( f"PGanecApp: Шаг {cancelled_step_level} (Поток: {self.active_flow_type}) отменен пользователем." ) # cancelled_widget уже вызвал await self.remove() и удалил себя из DOM. # Удаляем отмененный виджет и все последующие (если они были ошибочно созданы) # из нашего словаря отслеживания. levels_to_clear_from_tracking = [ lvl for lvl in self.mounted_step_widgets if lvl >= cancelled_step_level ] for lvl in levels_to_clear_from_tracking: self.mounted_step_widgets.pop(lvl, None) # Удаляем из словаря # Очищаем соответствующие данные из self.flow_data # Это зависит от структуры self.flow_data для каждого потока if self.active_flow_type == "backup": if cancelled_step_level <= 2 and "target_info" in self.flow_data: del self.flow_data["target_info"] if cancelled_step_level <= 1 and "db_name" in self.flow_data: del self.flow_data["db_name"] if cancelled_step_level == 0 and "server_config" in self.flow_data: del self.flow_data["server_config"] # ... (elif для других active_flow_type) if not self.mounted_step_widgets: # Если не осталось "замороженных" виджетов (отменили первый шаг) logger.info("Отменен первый или единственный шаг. Поток завершен. Возврат в главное меню.") await self._clear_current_flow_display(full_reset=True) # Полный сброс else: # "Размораживаем" предыдущий шаг, который теперь стал последним активным # Находим максимальный step_level среди оставшихся смонтированных виджетов previous_step_level = max(self.mounted_step_widgets.keys()) previous_widget_to_unfreeze = self.mounted_step_widgets.get(previous_step_level) if previous_widget_to_unfreeze and previous_widget_to_unfreeze.is_mounted: logger.info( f"Возврат к предыдущему шагу {previous_step_level}. Размораживаем виджет {previous_widget_to_unfreeze.id}.") previous_widget_to_unfreeze.unfreeze() # После unfreeze, compose виджета перерисует его в активном состоянии. # Устанавливаем фокус на "размороженный" виджет. # Его on_mount уже отработал, так что нужно вызвать focus() явно. self.call_after_refresh(previous_widget_to_unfreeze.focus) # Фокус после перерисовки else: # Этого не должно произойти, если mounted_step_widgets не пуст, но на всякий случай logger.error("Не удалось найти предыдущий шаг для разморозки. Полный сброс потока.") await self._clear_current_flow_display(full_reset=True) # async def show_selection_widget(self, action_type: str) -> None: # """Показывает виджет выбора для .""" # # Сначала удаляем старый виджет, если он есть, чтобы не было дублей # if self.selection_widget_instance: # try: # await self.selection_widget_instance.remove() # except Exception as e: # DOMObjectMissingError может возникнуть, если уже удален # logger.debug(f"Ошибка при удалении старого selection_widget_instance (возможно, уже удален): {e}") # self.selection_widget_instance = None # # # Создаем новый экземпляр виджета # self.selection_widget_instance = SelectionWidget( # app_config=self.app_config, # action_type=action_type, # id="select_any" # Даем ему id для возможного поиска # ) # # self.selection_widget_active = True # Выставляем флаг, что виджет активен # self.set_main_menu_enabled(False) # Отключаем верхнее меню # # Находим контейнер, куда его добавить. # # Будем добавлять после MainMenu внутри #main_content_area # main_content_area = self.query_one("#main_content_area", Vertical) # main_menu = self.query_one(MainMenu) # # # Добавляем виджет в DOM после MainMenu # # `mount` добавляет виджет и вызывает его on_mount # await main_content_area.mount(self.selection_widget_instance, after=main_menu) # self.selection_widget_instance.focus() # Пытаемся установить фокус # logger.info("SelectionWidget добавлен в DOM.") async def on_button_pressed(self, event: Button.Pressed) -> None: """ Обработчик нажатия (Enter или Click) кнопки в верхнем меню. :param event: Полученное событие нажатия кнопки на верхнем меню. :return: """ button_id = event.button.id button_classes = event.button.classes # if button_id == "backup": # logger.debug("TUI: Enter или Click на кнопку 'Backup' -- Инициализируем виджет выбора.") # await self.show_selection_widget("backup") # elif button_id == "restore": # logger.debug("TUI: Enter или Click на кнопку 'Restore' -- Инициализируем виджет выбора.") # await self.show_selection_widget("restore") # elif button_id == "copy": # logger.debug("TUI: Enter или Click на кнопку 'Copy' -- Инициализируем виджет выбора.") # await self.show_selection_widget("copy") # elif button_id == "service": # logger.debug("TUI: Enter или Click на кнопку 'Service' -- Инициализируем виджет выбора.") # await self.show_selection_widget("service") # elif button_id == "quit": # await self.action_quit() # Проверяем, является ли нажатая кнопка частью главного меню if "main-menu-button" in button_classes: # Если активен какой-либо поток if self.mounted_step_widgets: logger.warning( f"Нажата кнопка главного меню '{button_id}', но активен поток '{self.active_flow_type}'. " "Остаемся в потоке." ) self.bell() # Или может выйти из потока. Пока не решил... # await self._clear_current_flow_display(full_reset=True) # Не продолжаем обработку кнопки меню, если только что прервали поток # Однако, если пользователь хочет выйти (quit), это нужно разрешить. # if button_id == "quit": # await self.action_quit() return # Если поток не активен, обрабатываем кнопки главного меню if button_id == "backup": logger.debug("TUI: Нажата кнопка 'Backup' -- Инициализируем виджет выбора.") await self.action_start_flow("backup") elif button_id == "restore": logger.debug("TUI: Нажата кнопка 'Restore' -- Инициализируем виджет выбора.") await self.action_start_flow("restore") elif button_id == "copy": logger.debug("TUI: Нажата кнопка 'Copy' -- Инициализируем виджет выбора.") await self.action_start_flow("copy") elif button_id == "service": logger.debug("TUI: Нажата кнопка 'Service' -- Инициализируем виджет выбора.") await self.action_start_flow("service") elif button_id == "quit": logger.debug("TUI: Нажата кнопка 'Quit' -- Завершаем приложение.") await self.action_quit() else: logger.warning(f"Не может быть! Неизвестная кнопка главного меню: {button_id}") # Это не главное меню, а кнопка внутри "потока" elif button_id == "execute_backup_action": # Обработка кнопки "Запустить бэкап" if self.active_flow_type == "backup" and self.flow_data.get("target_info"): logger.info(f"ПОЛЬЗОВАТЕЛЬ НАЖАЛ 'ЗАПУСТИТЬ БЭКАП' с данными: {self.flow_data}") # >>> ЗДЕСЬ ВЫЗЫВАЕТСЯ РЕАЛЬНАЯ ЛОГИКА БЭКАПА <<< self.bell() # Сигнал о начале # Имитация выполнения (тут будут системные команды через Plumbum или другие подобные библиотеки) await asyncio.sleep(1) self.bell() await asyncio.sleep(1) self.bell() # Сигнал о завершении logger.info("Бэкап (имитация) завершен!") await self._clear_current_flow_display(full_reset=True) else: logger.warning("Попытка запустить бэкап без завершенного потока или неверный тип потока.") self.bell() else: # Это может быть кнопка из другого места, если такие появятся. # Пока что, если это не кнопка меню и не специальная кнопка, логируем. logger.debug(f"Нажата кнопка с ID '{button_id}', не относящаяся к главному меню или известным действиям.") def on_mount(self) -> None: """Вызывается после монтирования DOM приложения.""" # --- Обжужукиваем логирование в #app_log_viewer --- log_viewer_widget = self.query_one("#app_log_viewer", Log) textual_handler = TextualLogHandler(log_viewer_widget) if self.log_level_int >= logging.INFO: formatter = logging.Formatter( fmt="%(asctime)s [%(levelname)-8s]\t %(message)s\n", datefmt="%H:%M:%S" # Формат времени: часы:минуты:секунды ) else: formatter = logging.Formatter( fmt="%(asctime)s [%(levelname)-8s] %(module)s.%(funcName)s:%(lineno)04d\t- %(message)s\n", datefmt="%H:%M:%S.%s" # Формат времени: часы:минуты:секунды.милисекунды ) textual_handler.setFormatter(formatter) textual_handler.setLevel(self.log_level_int) # Получаем корневой логгер target_logger = logging.getLogger() target_logger.addHandler(textual_handler) # Добавляем наш обработчик # --- -- Перенос и сброс ранних логов из MemoryHandler -- --- if self.early_log_handler: # Устанавливаем наш textual_handler как цель для MemoryHandler self.early_log_handler.setTarget(textual_handler) # Сбрасываем все накопленные записи self.early_log_handler.flush() # Закрываем и удаляем MemoryHandler из корневого логера, так как он больше не нужен self.early_log_handler.close() target_logger.removeHandler(self.early_log_handler) logger.debug("TUI: Перенесли и закрыли ранние логи… Удалили ранний MemoryHandler из корневого логера…") self.early_log_handler = None # Очищаем ссылку # --- -- Конец переноса и сброса -- --- # --- Конец обжужукивания логирования --- logger.info("TUI-логгер инициализирован") logger.debug("test TUI-logger: DEBUG") logger.warning("test TUI-logger: WARNING") logger.error("test TUI-logger: ERROR") async def action_activate_focused_widget(self) -> None: """Активирует (нажимает) сфокусированный виджет, если это кнопка.""" focused_widget = self.focused if isinstance(focused_widget, Button): logger.debug(f"action_activate: Имитация нажатия Enter на кнопке '{focused_widget.id or focused_widget.label}'") await focused_widget.press() # Textual сам обработает это как клик elif focused_widget: logger.debug(f"action_activate: Enter нажат на '{type(focused_widget).__name__}', не являющемся кнопкой.") else: logger.debug("action_activate: Нет сфокусированного элемента.") async def action_handle_escape(self) -> None: """Обрабатывает нажатие клавиши Escape.""" if self.mounted_step_widgets: # Если есть активные шаги, Escape должен отменить последний активный шаг logger.debug("Нажата Escape, активен поток. Попытка отменить последний шаг.") # Находим самый "верхний" (последний добавленный) активный (не замороженный) StepWidget active_step_to_cancel: Optional[StepWidget] = None if self.mounted_step_widgets: # Проверяем, что словарь не пуст # Ищем виджет с максимальным step_level, который не заморожен # (хотя по логике, не заморожен должен быть только один - самый последний) max_level = -1 for level, widget_instance in self.mounted_step_widgets.items(): if not widget_instance.is_frozen and level > max_level: # Ищем самый "свежий" активный max_level = level active_step_to_cancel = widget_instance if active_step_to_cancel: # Имитируем нажатие его кнопки "Отмена" cancel_button_of_active_step = active_step_to_cancel.query_one( f"#{active_step_to_cancel._cancel_button_id}", Button ) await cancel_button_of_active_step.press() else: logger.warning("Escape: Не найден активный шаг для отмены, хотя виджеты шагов есть.") self.bell() else: # Этого не должно быть, если self.mounted_step_widgets не пуст await self._clear_current_flow_display(full_reset=True) else: # Если нет активных шагов, Escape может, например, ничего не делать или закрывать приложение logger.debug("Нажата Escape, нет активного потока. Действий не предусмотрено (или можно добавить выход).") self.bell() # Просто сигнал, что нажатие обработано, но без видимого эффекта # async def action_activate(self) -> None: # """ # Обработчик нажатия клавиши Enter. # :return: # """ # focused = self.focused # if not focused: # logger.debug("TUI: action_activate вызван, но нет сфокусированного элемента.") # return # else: # if isinstance(focused, Button): # # Проверяем, не является ли кнопка частью нашего виджета выбора # if not isinstance(focused, Button): # Если не кнопка - ничего не делаем # logger.debug("TUI: Что-то нажато (Enter), но это не кнопка.") # return # # # if focused.parent and isinstance(focused.parent, SelectionWidget): # # await focused.parent.on_button_pressed(Button.Pressed(focused)) # # else: # # await self.on_button_pressed(Button.Pressed(focused)) # # --- Обработчики быстрых клавиш --- # # Для 'q' (Quit) уже есть в BINDINGS срабатывающий метод action_quit, он сработает автоматически (не ясно почему). # async def action_backup(self): # """ # Обработчик нажатия быстрой клавиши 'b' для инициализации виджета создания бэкапа. # :return: # """ # if getattr(self, "selection_widget_active", False): # Проверяем, активен ли виджет выбора # return # Если активен, ничего не делаем (отменяем нажатие клавиши) # self.query_one("#backup", Button).focus() # подсветим кнопку # logger.debug("TUI: Быстрая клавиша 'b' -- Инициализируем виджет.") # await self.show_selection_widget("backup") # async def action_restore(self): # """ # Обработчик нажатия быстрой клавиши 'r' для восстановления. # :return: # """ # if getattr(self, "selection_widget_active", False): # Проверяем, активен ли виджет выбора # return # self.query_one("#restore", Button).focus() # подсветим кнопку # logger.debug("TUI: Быстрая клавиша 'r' -- Инициализируем виджет.") # await self.show_selection_widget("restore") # async def action_copy(self): # """ # Обработчик нажатия быстрой клавиши 'c' для копирования. # :return: # """ # if getattr(self, "selection_widget_active", False): # Проверяем, активен ли виджет выбора # return # self.query_one("#copy", Button).focus() # logger.debug("TUI: Быстрая клавиша 'c' -- Инициализируем виджет.") # await self.show_selection_widget("copy") # async def action_service(self): # """ # Обработчик нажатия быстрой клавиши 's' для сервисных действий. # :return: # """ # if getattr(self, "selection_widget_active", False): # Проверяем, активен ли виджет выбора # return # self.query_one("#service", Button).focus() # logger.debug("TUI: Быстрая клавиша 's' -- Инициализируем виджет.") # await self.show_selection_widget("service")