2023_tacacs_watcher/main.py

119 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import platform
import time
import sys
import signal
import toml
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
def print_hi(name):
# Use a breakpoint in the code line below to debug your script.
print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint.\
# Обработчик сигнала SIGTERM (сигнал остановки от systemd)
def handler_stop(signum, frame) -> None:
print(f'Terminate by signal: {signum}')
sys.exit(0)
# Чтение файла конфигурации
def read_toml(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as file:
data = toml.load(file)
return data
except FileNotFoundError:
print(f"Config `{file_path}` not found.")
sys.exit(1)
except toml.decoder.TomlDecodeError as e:
print(f"Error decoding config-file from `{file_path}`: {e}.")
sys.exit(1)
class LogFileHandler(FileSystemEventHandler):
def __init__(self, log_filename):
self.log_filename = log_filename
def on_modified(self, event):
if event.is_directory:
return
print(event)
# elif event.event_type == 'modified' and event.src_path == self.log_filename:
# # Обработка изменений в лог-файле
# with open(event.src_path, 'r') as file:
# new_lines = file.readlines()
# # Обработка новых строк
# for line in new_lines:
# print(f"Изменение в {self.log_filename}:: {line}", end='')
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
# Получение информации о системе
# system_info = platform.system()
# print(f"System: {system_info}")
# Установка обработчика сигнала
signal.signal(signal.SIGTERM, handler_stop)
# Считывание имени config-файла их аргументов командной строки
try:
config_file = sys.argv[1] # первый аргумент после имени скрипта
except IndexError:
config_file = "tacacs_watcher.conf"
# Чтение файла конфигурации, приветствие и выставление интервала
config = read_toml(config_file)
if 'hello' in config:
print(config['hello'])
interval = config['interval'] if 'interval' in config else 1
# получаем все группы в конфиге
group_keys = [key for key, value in config.items() if isinstance(value, dict)]
count_valid_groups = 0
observers = []
for group in group_keys:
if 'log_in' not in config[group]:
print(f"Config '{config_file}' not have key 'log_in' in group '{group}' (see config '{config_file}').")
continue
if not os.path.isfile(config[group]['log_in']):
print(f"File '{config[group]['log_in']}' not exist (see config '{config_file}').")
continue
if 'outdir' not in config[group]:
print(f"Config '{config_file}' not have key 'outdir' in group '{group}' (see config '{config_file}').")
continue
if not os.path.isdir(config[group]['outdir']):
print(f"Directory '{config[group]['outdir']}' not exist (see config '{config_file}').")
continue
observers.append(Observer())
observers[count_valid_groups].schedule(LogFileHandler(config[group]['log_in']),
path=os.path.dirname(config[group]['log_in']),
recursive=False)
observers[count_valid_groups].start()
count_valid_groups += 1
# observer = Observer()
# event_handler = LogFileHandler(config[group]['log_in'])
# observer.schedule(event_handler, path=os.path.dirname(config[group]['log_in']), recursive=False)
if count_valid_groups == 0:
# если нет ни одной валидной группы, то выходим
print(f"Config '{config_file}' not have valid groups (see config '{config_file}').")
sys.exit(1)
# все обработчики в этом цикле
try:
while True:
# вечный цикл, чтобы программа не завершилась никогда (только принудительно или через SIGTERM)
print_hi('PyCharm')
time.sleep(interval)
except KeyboardInterrupt:
for observer in observers:
observer.stop()
print('Terminate by `KeyboardInterrupt`.')
sys.exit(0)
except ValueError:
print('Error value in config-file.')