прак_зад2
35. Модуль для поиска и архивирования файлов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import re
import shutil
import zipfile
import tarfile
import argparse
from datetime import datetime
class FileProcessor:
"""
Класс для поиска, копирования и архивирования файлов
"""
def __init__(self, source_dir, output_dir=None, extensions=None, recursive=True,
exclude_dirs=None, name_pattern=None, min_size=None, max_size=None,
newer_than=None, older_than=None):
"""
Инициализирует обработчик файлов
Args:
source_dir (str): Директория для поиска файлов
output_dir (str): Директория для сохранения результатов
extensions (list): Список расширений для поиска
recursive (bool): Рекурсивный поиск в поддиректориях
exclude_dirs (list): Список директорий для исключения
name_pattern (str): Регулярное выражение для фильтрации по имени
min_size (int): Минимальный размер файла в байтах
max_size (int): Максимальный размер файла в байтах
newer_than (str): Искать файлы новее указанной даты (формат: YYYY-MM-DD)
older_than (str): Искать файлы старее указанной даты (формат: YYYY-MM-DD)
"""
self.source_dir = os.path.abspath(source_dir)
self.output_dir = os.path.abspath(output_dir) if output_dir else None
self.extensions = [ext.lower() for ext in extensions] if extensions else None
self.recursive = recursive
self.exclude_dirs = [os.path.abspath(d) for d in exclude_dirs] if exclude_dirs else []
self.name_pattern = re.compile(name_pattern) if name_pattern else None
self.min_size = min_size
self.max_size = max_size self.newer_than = self._parse_date(newer_than) if newer_than else None
self.older_than = self._parse_date(older_than) if older_than else None
self.found_files = []
# Проверяем существование исходной директории
if not os.path.isdir(self.source_dir):
raise ValueError(f"Директория '{self.source_dir}' не существует")
# Создаем выходную директорию, если она указана и не существует
if self.output_dir and not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
def _parse_date(self, date_str):
"""
Преобразует строку даты в объект datetime
Args:
date_str (str): Строка даты в формате YYYY-MM-DD
Returns:
datetime: Объект datetime или None в случае ошибки
"""
try:
return datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
print(f"Ошибка: некорректный формат даты '{date_str}', ожидается YYYY-MM-DD")
return None
def _match_file(self, file_path):
"""
Проверяет, соответствует ли файл заданным критериям
Args:
file_path (str): Путь к файлу
Returns:
bool: True, если файл соответствует критериям, иначе False
"""
# Получаем информацию о файле
filename = os.path.basename(file_path)
_, extension = os.path.splitext(filename)
extension = extension.lower()
# Проверяем расширение
if self.extensions and extension[1:] not in self.extensions:
return False
# Проверяем шаблон имени
if self.name_pattern and not self.name_pattern.search(filename):
return False
# Проверяем размер файла
file_size = os.path.getsize(file_path)
if self.min_size is not None and file_size < self.min_size:
return False
if self.max_size is not None and file_size > self.max_size:
return False
# Проверяем дату модификации
mod_time = datetime.fromtimestamp(os.path.getmtime(file_path))
if self.newer_than and mod_time < self.newer_than:
return False
if self.older_than and mod_time > self.older_than:
return False
return True
def find_files(self):
"""
Ищет файлы, соответствующие заданным критериям
Returns:
list: Список путей к найденным файлам
"""
self.found_files = []
# Функция для обхода директории
def walk_directory(directory):
try:
# Проверяем, не исключена ли директория
abs_dir = os.path.abspath(directory)
if abs_dir in self.exclude_dirs:
return
# Получаем список файлов и поддиректорий
with os.scandir(directory) as entries:
for entry in entries:
# Пропускаем скрытые файлы и директории
if entry.name.startswith('.'):
continue
# Обрабатываем файлы
if entry.is_file():
if self._match_file(entry.path):
self.found_files.append(entry.path)
# Обрабатываем поддиректории, если включен рекурсивный поиск
elif entry.is_dir() and self.recursive:
# Проверяем, не исключена ли поддиректория
abs_subdir = os.path.abspath(entry.path)
if abs_subdir not in self.exclude_dirs:
walk_directory(entry.path)
except PermissionError as e:
print(f"Ошибка доступа: {e}")
except Exception as e:
print(f"Ошибка при обходе директории '{directory}': {e}")
# Начинаем обход с исходной директории
walk_directory(self.source_dir)
print(f"Найдено {len(self.found_files)} файлов")
return self.found_files
def copy_files(self, preserve_structure=False):
"""
Копирует найденные файлы в выходную директорию
Args:
preserve_structure (bool): Сохранять структуру директорий
Returns:
int: Количество скопированных файлов
"""
if not self.output_dir:
raise ValueError("Не указана выходная директория")
if not self.found_files:
self.find_files()
if not self.found_files:
print("Нет файлов для копирования")
return 0
copied_count = 0
for src_path in self.found_files:
try:
# Определяем путь для копирования
if preserve_structure:
# Сохраняем структуру директорий относительно исходной директории
rel_path = os.path.relpath(src_path, self.source_dir)
dst_path = os.path.join(self.output_dir, rel_path)
# Создаем директории, если они не существуют
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
else:
# Копируем файлы в выходную директорию без сохранения структуры
dst_path = os.path.join(self.output_dir, os.path.basename(src_path))
# Обрабатываем дубликаты имен
if os.path.exists(dst_path):
name, ext = os.path.splitext(os.path.basename(src_path))
dst_path = os.path.join(self.output_dir, f"{name}_{copied_count}{ext}")
# Копируем файл
shutil.copy2(src_path, dst_path)
copied_count += 1
print(f"Скопирован файл: {src_path} -> {dst_path}")
except Exception as e:
print(f"Ошибка при копировании файла '{src_path}': {e}")
print(f"Скопировано {copied_count} файлов")
return copied_count
def create_zip_archive(self, archive_path, compress_level=zipfile.ZIP_DEFLATED, preserve_structure=False):
"""
Создает ZIP-архив с найденными файлами
Args:
archive_path (str): Путь к архиву
compress_level (int): Уровень сжатия
preserve_structure (bool): Сохранять структуру директорий
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if not self.found_files:
self.find_files()
if not self.found_files:
print("Нет файлов для архивирования")
return False
try:
# Создаем директорию для архива, если она не существует
archive_dir = os.path.dirname(os.path.abspath(archive_path))
os.makedirs(archive_dir, exist_ok=True)
# Создаем архив
with zipfile.ZipFile(archive_path, 'w', compress_level) as zipf:
for src_path in self.found_files:
try:
# Определяем путь для архивирования
if preserve_structure:
# Сохраняем структуру директорий относительно исходной директории
arcname = os.path.relpath(src_path, self.source_dir)
else:
# Архивируем файлы без сохранения структуры
arcname = os.path.basename(src_path)
# Добавляем файл в архив
zipf.write(src_path, arcname)
print(f"Добавлен файл: {src_path} -> {arcname}")
except Exception as e:
print(f"Ошибка при добавлении файла '{src_path}' в архив: {e}")
print(f"Создан архив: {archive_path} ({len(self.found_files)} файлов)")
return True
except Exception as e:
print(f"Ошибка при создании архива: {e}")
return False
def create_tar_archive(self, archive_path, compress_type='gz', preserve_structure=False):
"""
Создает TAR-архив с найденными файлами
Args:
archive_path (str): Путь к архиву
compress_type (str): Тип сжатия ('gz', 'bz2', 'xz' или None)
preserve_structure (bool): Сохранять структуру директорий
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if not self.found_files:
self.find_files()
if not self.found_files:
print("Нет файлов для архивирования")
return False
# Определяем режим открытия архива
mode = 'w'
if compress_type == 'gz':
mode = 'w:gz'
elif compress_type == 'bz2':
mode = 'w:bz2'
elif compress_type == 'xz':
mode = 'w:xz'
try:
# Создаем директорию для архива, если она не существует
archive_dir = os.path.dirname(os.path.abspath(archive_path))
os.makedirs(archive_dir, exist_ok=True)
# Создаем архив
with tarfile.open(archive_path, mode) as tar:
for src_path in self.found_files:
try:
# Определяем путь для архивирования
if preserve_structure:
# Сохраняем структуру директорий относительно исходной директории
arcname = os.path.relpath(src_path, self.source_dir)
else:
# Архивируем файлы без сохранения структуры
arcname = os.path.basename(src_path)
# Добавляем файл в архив
tar.add(src_path, arcname=arcname)
print(f"Добавлен файл: {src_path} -> {arcname}")
except Exception as e:
print(f"Ошибка при добавлении файла '{src_path}' в архив: {e}")
print(f"Создан архив: {archive_path} ({len(self.found_files)} файлов)")
return True
except Exception as e:
print(f"Ошибка при создании архива: {e}")
return False
def print_file_info(self):
"""
Выводит информацию о найденных файлах
"""
if not self.found_files:
self.find_files()
if not self.found_files:
print("Файлы не найдены")
return
print(f"Найдено {len(self.found_files)} файлов:")
total_size = 0
for i, file_path in enumerate(self.found_files, 1):
try:
file_size = os.path.getsize(file_path)
mod_time = datetime.fromtimestamp(os.path.getmtime(file_path))
# Форматируем размер файла
if file_size < 1024:
size_str = f"{file_size} B"
elif file_size < 1024 * 1024:
size_str = f"{file_size / 1024:.2f} KB"
elif file_size < 1024 * 1024 * 1024:
size_str = f"{file_size / (1024 * 1024):.2f} MB"
else:
size_str = f"{file_size / (1024 * 1024 * 1024):.2f} GB"
print(f"{i}. {file_path}")
print(f" Размер: {size_str}, Изменен: {mod_time.strftime('%Y-%m-%d %H:%M:%S')}")
total_size += file_size
except Exception as e:
print(f"{i}. {file_path}")
print(f" Ошибка при получении информации: {e}")
# Форматируем общий размер
if total_size < 1024:
total_size_str = f"{total_size} B"
elif total_size < 1024 * 1024:
total_size_str = f"{total_size / 1024:.2f} KB"
elif total_size < 1024 * 1024 * 1024:
total_size_str = f"{total_size / (1024 * 1024):.2f} MB"
else:
total_size_str = f"{total_size / (1024 * 1024 * 1024):.2f} GB"
print(f"\nОбщий размер: {total_size_str}")
def main():
parser = argparse.ArgumentParser(description='Поиск и архивирование файлов')
parser.add_argument('--source', '-s', required=True,
help='Директория для поиска файлов')
parser.add_argument('--output', '-o',
help='Директория для сохранения скопированных файлов')
parser.add_argument('--extensions', '-e', nargs='+',
help='Расширения файлов для поиска (без точки)')
parser.add_argument('--no-recursive', action='store_true',
help='Отключить рекурсивный поиск в поддиректориях')
parser.add_argument('--exclude', nargs='+',
help='Директории для исключения из поиска')
parser.add_argument('--name', help='Регулярное выражение для фильтрации по имени')
parser.add_argument('--min-size', type=int, help='Минимальный размер файла в байтах')
parser.add_argument('--max-size', type=int, help='Максимальный размер файла в байтах')
parser.add_argument('--newer-than', help='Искать файлы новее указанной даты (YYYY-MM-DD)')
parser.add_argument('--older-than', help='Искать файлы старее указанной даты (YYYY-MM-DD)')
parser.add_argument('--preserve-structure', action='store_true',
help='Сохранять структуру директорий при копировании или архивировании')
# Параметры для создания архива
parser.add_argument('--zip', help='Путь к создаваемому ZIP-архиву')
parser.add_argument('--tar', help='Путь к создаваемому TAR-архиву')
parser.add_argument('--compress', choices=['gz', 'bz2', 'xz', 'none'], default='gz',
help='Тип сжатия для TAR-архива (по умолчанию: gz)')
args = parser.parse_args()
try:
# Создаем обработчик файлов
processor = FileProcessor(
source_dir=args.source,
output_dir=args.output,
extensions=args.extensions,
recursive=not args.no_recursive,
exclude_dirs=args.exclude,
name_pattern=args.name,
min_size=args.min_size,
max_size=args.max_size,
newer_than=args.newer_than,
older_than=args.older_than
)
# Ищем файлы
processor.find_files()
# Выводим информацию о найденных файлах
processor.print_file_info()
# Копируем файлы, если указана выходная директория
if args.output:
processor.copy_files(preserve_structure=args.preserve_structure)
# Создаем ZIP-архив, если указан путь
if args.zip:
processor.create_zip_archive(args.zip, preserve_structure=args.preserve_structure)
# Создаем TAR-архив, если указан путь
if args.tar:
compress_type = args.compress if args.compress != 'none' else None
processor.create_tar_archive(args.tar, compress_type, preserve_structure=args.preserve_structure)
return 0
except Exception as e:
print(f"Ошибка: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())
Использование:
# Поиск файлов с расширением .jpg и .png
python file_processor.py --source ~/Pictures --extensions jpg png
# Поиск файлов, скопировать и создать архив
python file_processor.py --source ~/Documents --extensions pdf doc docx --output ~/Backup/docs --zip ~/Backup/docs.zip
# Поиск файлов по критериям размера и даты
python file_processor.py --source ~/Downloads --min-size 1000000 --newer-than 2023-01-01
36. Модуль для аутентификации пользователя
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import hashlib
import secrets
import sqlite3
import argparse
import re
import time
import jwt
from datetime import datetime, timedelta
from flask import Flask, request, jsonify, redirect, url_for, render_template, session, flash, g
class PasswordHasher:
"""
Класс для хеширования и проверки паролей
"""
def __init__(self, salt_length=16, hash_algorithm='sha256', iterations=100000):
"""
Инициализирует хеширователь паролей
Args:
salt_length (int): Длина соли в байтах
hash_algorithm (str): Алгоритм хеширования ('sha256', 'sha512', 'sha3_256', 'sha3_512')
iterations (int): Количество итераций для PBKDF2
"""
self.salt_length = salt_length
self.hash_algorithm = hash_algorithm
self.iterations = iterations
def hash_password(self, password):
"""
Создает хеш пароля с солью
Args:
password (str): Пароль для хеширования
Returns:
str: Строка формата 'алгоритм:итерации:соль:хеш'
"""
# Генерируем случайную соль
salt = secrets.token_hex(self.salt_length)
# Создаем хеш пароля
password_hash = self._pbkdf2_hash(password, salt)
# Возвращаем строку формата 'алгоритм:итерации:соль:хеш'
return f"{self.hash_algorithm}:{self.iterations}:{salt}:{password_hash}"
def verify_password(self, password, hashed_password):
"""
Проверяет соответствие пароля хешу
Args:
password (str): Пароль для проверки
hashed_password (str): Хеш пароля в формате 'алгоритм:итерации:соль:хеш'
Returns:
bool: True, если пароль соответствует хешу, иначе False
"""
try:
# Разбираем строку хеша
parts = hashed_password.split(':')
if len(parts) != 4:
return False
algorithm, iterations, salt, stored_hash = parts
# Проверяем алгоритм и количество итераций
if algorithm != self.hash_algorithm:
# Адаптируемся к алгоритму из хеша
saved_algorithm = self.hash_algorithm
self.hash_algorithm = algorithm
result = self.verify_password(password, hashed_password)
self.hash_algorithm = saved_algorithm
return result
# Преобразуем количество итераций в число
iterations = int(iterations)
# Создаем хеш пароля с указанными параметрами
password_hash = self._pbkdf2_hash(password, salt, algorithm, iterations)
# Сравниваем хеши
return password_hash == stored_hash
except Exception:
return False
def _pbkdf2_hash(self, password, salt, algorithm=None, iterations=None):
"""
Создает PBKDF2-хеш пароля
Args:
password (str): Пароль для хеширования
salt (str): Соль в виде строки hex
algorithm (str): Алгоритм хеширования
iterations (int): Количество итераций
Returns:
str: Хеш пароля в виде строки hex
"""
if algorithm is None:
algorithm = self.hash_algorithm
if iterations is None:
iterations = self.iterations
# Получаем функцию хеширования
hash_func = self._get_hash_function(algorithm)
# Кодируем пароль и соль в байты
password_bytes = password.encode('utf-8')
salt_bytes = bytes.fromhex(salt)
# Создаем хеш
dk = self._pbkdf2_hmac(hash_func, password_bytes, salt_bytes, iterations)
# Возвращаем хеш в виде строки hex
return dk.hex()
def _get_hash_function(self, algorithm):
"""
Возвращает функцию хеширования для указанного алгоритма
Args:
algorithm (str): Алгоритм хеширования
Returns:
callable: Функция хеширования
"""
if algorithm == 'sha256':
return hashlib.sha256
elif algorithm == 'sha512':
return hashlib.sha512
elif algorithm == 'sha3_256':
return hashlib.sha3_256
elif algorithm == 'sha3_512':
return hashlib.sha3_512
else:
raise ValueError(f"Неподдерживаемый алгоритм хеширования: {algorithm}")
def _pbkdf2_hmac(self, hash_func, password, salt, iterations, dklen=64):
"""
Реализация PBKDF2-HMAC
Args:
hash_func (callable): Функция хеширования
password (bytes): Пароль в виде байтов
salt (bytes): Соль в виде байтов
iterations (int): Количество итераций
dklen (int): Длина выходного ключа
Returns:
bytes: Выходной ключ
"""
# Используем встроенную функцию PBKDF2-HMAC
return hashlib.pbkdf2_hmac(
hash_func().name,
password,
salt,
iterations,
dklen
)
class DatabaseManager:
"""
Класс для управления базой данных пользователей
"""
def __init__(self, db_path):
"""
Инициализирует менеджер базы данных
Args:
db_path (str): Путь к файлу базы данных
"""
self.db_path = db_path
self.conn = None
self.cursor = None
# Инициализируем базу данных
self._init_db()
def _get_connection(self):
"""
Получает соединение с базой данных
Returns:
sqlite3.Connection: Соединение с базой данных
"""
if self.conn is None:
# Создаем директорию для базы данных, если она не существует
db_dir = os.path.dirname(os.path.abspath(self.db_path))
os.makedirs(db_dir, exist_ok=True)
# Устанавливаем соединение
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
return self.conn
def _get_cursor(self):
"""
Получает курсор базы данных
Returns:
sqlite3.Cursor: Курсор базы данных
"""
if self.cursor is None:
self.cursor = self._get_connection().cursor()
return self.cursor
def _init_db(self):
"""
Инициализирует базу данных
"""
# Получаем соединение и курсор
conn = self._get_connection()
cursor = self._get_cursor()
# Создаем таблицу пользователей
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
first_name TEXT,
last_name TEXT,
role TEXT DEFAULT 'user',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP,
active BOOLEAN DEFAULT TRUE
)
''')
# Создаем таблицу сессий
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
token TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL,
ip_address TEXT,
user_agent TEXT,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
# Создаем таблицу попыток входа
cursor.execute('''
CREATE TABLE IF NOT EXISTS login_attempts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
ip_address TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
success BOOLEAN NOT NULL
)
''')
# Создаем индексы
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_username ON users (username)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_email ON users (email)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_token ON sessions (token)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions (user_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_login_attempts_username ON login_attempts (username)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_login_attempts_ip_address ON login_attempts (ip_address)')
# Сохраняем изменения
conn.commit()
def create_user(self, username, email, password, first_name=None, last_name=None, role='user'):
"""
Создает нового пользователя
Args:
username (str): Имя пользователя
email (str): Email
password (str): Пароль
first_name (str): Имя
last_name (str): Фамилия
role (str): Роль пользователя
Returns:
int: ID созданного пользователя или None в случае ошибки
"""
# Проверяем уникальность имени пользователя и email
if self.get_user_by_username(username) is not None:
print(f"Пользователь с именем '{username}' уже существует")
return None
if self.get_user_by_email(email) is not None:
print(f"Пользователь с email '{email}' уже существует")
return None
try:
# Хешируем пароль
hasher = PasswordHasher()
password_hash = hasher.hash_password(password)
# Вставляем пользователя в базу данных
cursor = self._get_cursor()
cursor.execute('''
INSERT INTO users (username, email, password_hash, first_name, last_name, role)
VALUES (?, ?, ?, ?, ?, ?)
''', (username, email, password_hash, first_name, last_name, role))
# Сохраняем изменения
self._get_connection().commit()
# Возвращаем ID пользователя
return cursor.lastrowid
except Exception as e:
print(f"Ошибка при создании пользователя: {e}")
return None
def update_user(self, user_id, **kwargs):
"""
Обновляет данные пользователя
Args:
user_id (int): ID пользователя
**kwargs: Обновляемые поля (email, password, first_name, last_name, role, active)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Проверяем существование пользователя
user = self.get_user_by_id(user_id)
if user is None:
print(f"Пользователь с ID {user_id} не найден")
return False try:
# Подготавливаем запрос
set_clauses = []
params = []
# Обновляем email
if 'email' in kwargs:
# Проверяем уникальность email
existing_user = self.get_user_by_email(kwargs['email'])
if existing_user is not None and existing_user['id'] != user_id:
print(f"Пользователь с email '{kwargs['email']}' уже существует")
return False
set_clauses.append('email = ?')
params.append(kwargs['email'])
# Обновляем пароль
if 'password' in kwargs:
hasher = PasswordHasher()
password_hash = hasher.hash_password(kwargs['password'])
set_clauses.append('password_hash = ?')
params.append(password_hash)
# Обновляем имя
if 'first_name' in kwargs:
set_clauses.append('first_name = ?')
params.append(kwargs['first_name'])
# Обновляем фамилию
if 'last_name' in kwargs:
set_clauses.append('last_name = ?')
params.append(kwargs['last_name'])
# Обновляем роль
if 'role' in kwargs:
set_clauses.append('role = ?')
params.append(kwargs['role'])
# Обновляем статус активности
if 'active' in kwargs:
set_clauses.append('active = ?')
params.append(kwargs['active'])
# Если нет обновляемых полей, возвращаем успех
if not set_clauses:
return True
# Строим SQL-запрос
sql = f"UPDATE users SET {', '.join(set_clauses)} WHERE id = ?"
params.append(user_id)
# Выполняем запрос
cursor = self._get_cursor()
cursor.execute(sql, params)
# Сохраняем изменения
self._get_connection().commit()
return True
except Exception as e:
print(f"Ошибка при обновлении пользователя: {e}")
return False
def delete_user(self, user_id):
"""
Удаляет пользователя
Args:
user_id (int): ID пользователя
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Проверяем существование пользователя
user = self.get_user_by_id(user_id)
if user is None:
print(f"Пользователь с ID {user_id} не найден")
return False
try:
# Удаляем пользователя
cursor = self._get_cursor()
cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
# Сохраняем изменения
self._get_connection().commit()
return True
except Exception as e:
print(f"Ошибка при удалении пользователя: {e}")
return False
def get_user_by_id(self, user_id):
"""
Получает пользователя по ID
Args:
user_id (int): ID пользователя
Returns:
dict: Данные пользователя или None, если пользователь не найден
"""
try:
cursor = self._get_cursor()
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if user:
return dict(user)
else:
return None
except Exception as e:
print(f"Ошибка при получении пользователя: {e}")
return None
def get_user_by_username(self, username):
"""
Получает пользователя по имени пользователя
Args:
username (str): Имя пользователя
Returns:
dict: Данные пользователя или None, если пользователь не найден
"""
try:
cursor = self._get_cursor()
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
user = cursor.fetchone()
if user:
return dict(user)
else:
return None
except Exception as e:
print(f"Ошибка при получении пользователя: {e}")
return None
def get_user_by_email(self, email):
"""
Получает пользователя по email
Args:
email (str): Email
Returns:
dict: Данные пользователя или None, если пользователь не найден
"""
try:
cursor = self._get_cursor()
cursor.execute("SELECT * FROM users WHERE email = ?", (email,))
user = cursor.fetchone()
if user:
return dict(user)
else:
return None
except Exception as e:
print(f"Ошибка при получении пользователя: {e}")
return None
def verify_user(self, username, password):
"""
Проверяет учетные данные пользователя
Args:
username (str): Имя пользователя или email
password (str): Пароль
Returns:
dict: Данные пользователя в случае успешной аутентификации или None
"""
# Определяем, является ли username email-адресом
is_email = '@' in username
# Получаем пользователя
user = None
try:
cursor = self._get_cursor()
if is_email:
cursor.execute("SELECT * FROM users WHERE email = ?", (username,))
else:
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
user = cursor.fetchone()
except Exception as e:
print(f"Ошибка при получении пользователя: {e}")
return None
# Если пользователь не найден, возвращаем None
if user is None:
return None
# Проверяем пароль
hasher = PasswordHasher()
if hasher.verify_password(password, user['password_hash']):
# Обновляем время последнего входа
self.update_last_login(user['id'])
return dict(user)
else:
return None
def update_last_login(self, user_id):
"""
Обновляет время последнего входа пользователя
Args:
user_id (int): ID пользователя
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Обновляем время последнего входа
cursor = self._get_cursor()
cursor.execute(
"UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?",
(user_id,)
)
# Сохраняем изменения
self._get_connection().commit()
return True
except Exception as e:
print(f"Ошибка при обновлении времени последнего входа: {e}")
return False
def create_session(self, user_id, token, expires_at, ip_address=None, user_agent=None):
"""
Создает новую сессию пользователя
Args:
user_id (int): ID пользователя
token (str): Токен сессии
expires_at (datetime): Время истечения сессии
ip_address (str): IP-адрес пользователя
user_agent (str): User-Agent пользователя
Returns:
int: ID созданной сессии или None в случае ошибки
"""
try:
# Проверяем существование пользователя
user = self.get_user_by_id(user_id)
if user is None:
print(f"Пользователь с ID {user_id} не найден")
return None
# Создаем сессию
cursor = self._get_cursor()
cursor.execute('''
INSERT INTO sessions (user_id, token, expires_at, ip_address, user_agent)
VALUES (?, ?, ?, ?, ?)
''', (user_id, token, expires_at, ip_address, user_agent))
# Сохраняем изменения
self._get_connection().commit()
# Возвращаем ID сессии
return cursor.lastrowid
except Exception as e:
print(f"Ошибка при создании сессии: {e}")
return None
def get_session(self, token):
"""
Получает сессию по токену
Args:
token (str): Токен сессии
Returns:
dict: Данные сессии или None, если сессия не найдена
"""
try:
cursor = self._get_cursor()
cursor.execute('''
SELECT s.*, u.username, u.email, u.role, u.active
FROM sessions s
JOIN users u ON s.user_id = u.id
WHERE s.token = ?
''', (token,))
session = cursor.fetchone()
if session:
return dict(session)
else:
return None
except Exception as e:
print(f"Ошибка при получении сессии: {e}")
return None
def delete_session(self, token):
"""
Удаляет сессию
Args:
token (str): Токен сессии
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
cursor = self._get_cursor()
cursor.execute("DELETE FROM sessions WHERE token = ?", (token,))
# Сохраняем изменения
self._get_connection().commit()
return True
except Exception as e:
print(f"Ошибка при удалении сессии: {e}")
return False
def delete_expired_sessions(self):
"""
Удаляет истекшие сессии
Returns:
int: Количество удаленных сессий
"""
try:
cursor = self._get_cursor()
cursor.execute("DELETE FROM sessions WHERE expires_at < CURRENT_TIMESTAMP")
# Сохраняем изменения
self._get_connection().commit()
# Возвращаем количество удаленных сессий
return cursor.rowcount
except Exception as e:
print(f"Ошибка при удалении истекших сессий: {e}")
return 0
def log_login_attempt(self, username, ip_address=None, success=False):
"""
Записывает попытку входа
Args:
username (str): Имя пользователя
ip_address (str): IP-адрес
success (bool): Успешная попытка или нет
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
cursor = self._get_cursor()
cursor.execute('''
INSERT INTO login_attempts (username, ip_address, success)
VALUES (?, ?, ?)
''', (username, ip_address, success))
# Сохраняем изменения
self._get_connection().commit()
return True
except Exception as e:
print(f"Ошибка при записи попытки входа: {e}")
return False
def get_failed_login_attempts(self, username=None, ip_address=None, time_window=None):
"""
Получает количество неудачных попыток входа
Args:
username (str): Имя пользователя
ip_address (str): IP-адрес
time_window (int): Временное окно в секундах
Returns:
int: Количество неудачных попыток входа
"""
try:
cursor = self._get_cursor()
# Формируем условия запроса
conditions = ["success = 0"]
params = []
if username:
conditions.append("username = ?")
params.append(username)
if ip_address:
conditions.append("ip_address = ?")
params.append(ip_address)
if time_window:
# Определяем временное окно
window_timestamp = datetime.now() - timedelta(seconds=time_window)
conditions.append("timestamp > ?")
params.append(window_timestamp)
# Формируем SQL-запрос
sql = f"SELECT COUNT(*) FROM login_attempts WHERE {' AND '.join(conditions)}"
# Выполняем запрос
cursor.execute(sql, params)
count = cursor.fetchone()[0]
return count
except Exception as e:
print(f"Ошибка при получении количества неудачных попыток входа: {e}")
return 0
def reset_failed_login_attempts(self, username=None, ip_address=None):
"""
Сбрасывает счетчик неудачных попыток входа
Args:
username (str): Имя пользователя
ip_address (str): IP-адрес
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
cursor = self._get_cursor()
# Формируем условия запроса
conditions = ["success = 0"]
params = []
if username:
conditions.append("username = ?")
params.append(username)
if ip_address:
conditions.append("ip_address = ?")
params.append(ip_address)
# Если не указаны ни username, ни ip_address, возвращаем False
if len(conditions) == 1:
return False
# Формируем SQL-запрос
sql = f"DELETE FROM login_attempts WHERE {' AND '.join(conditions)}"
# Выполняем запрос
cursor.execute(sql, params)
# Сохраняем изменения
self._get_connection().commit()
return True
except Exception as e:
print(f"Ошибка при сбросе счетчика неудачных попыток входа: {e}")
return False
def close(self):
"""
Закрывает соединение с базой данных
"""
if self.conn:
self.conn.close()
self.conn = None
self.cursor = None
class AuthManager:
"""
Класс для управления аутентификацией и авторизацией
"""
def __init__(self, db_manager, secret_key, token_expiry=3600, max_failed_attempts=5, lockout_time=300):
"""
Инициализирует менеджер аутентификации
Args:
db_manager (DatabaseManager): Менеджер базы данных
secret_key (str): Секретный ключ для подписи токенов
token_expiry (int): Время жизни токена в секундах
max_failed_attempts (int): Максимальное количество неудачных попыток входа
lockout_time (int): Время блокировки в секундах
"""
self.db_manager = db_manager
self.secret_key = secret_key
self.token_expiry = token_expiry
self.max_failed_attempts = max_failed_attempts
self.lockout_time = lockout_time
def register_user(self, username, email, password, first_name=None, last_name=None):
"""
Регистрирует нового пользователя
Args:
username (str): Имя пользователя
email (str): Email
password (str): Пароль
first_name (str): Имя
last_name (str): Фамилия
Returns:
dict: Данные пользователя или None в случае ошибки
"""
# Проверяем корректность данных
if not self._validate_username(username):
print("Некорректное имя пользователя")
return None
if not self._validate_email(email):
print("Некорректный email")
return None
if not self._validate_password(password):
print("Некорректный пароль")
return None
# Создаем пользователя
user_id = self.db_manager.create_user(
username=username,
email=email,
password=password,
first_name=first_name,
last_name=last_name
)
if user_id:
return self.db_manager.get_user_by_id(user_id)
else:
return None
def authenticate(self, username, password, ip_address=None, user_agent=None):
"""
Аутентифицирует пользователя
Args:
username (str): Имя пользователя или email
password (str): Пароль
ip_address (str): IP-адрес
user_agent (str): User-Agent
Returns:
dict: Данные пользователя и токен или None в случае ошибки
"""
# Проверяем блокировку по количеству неудачных попыток
if self._is_locked_out(username, ip_address):
print("Аккаунт временно заблокирован из-за слишком большого количества неудачных попыток входа")
return None
# Проверяем учетные данные
user = self.db_manager.verify_user(username, password)
# Логируем попытку входа
self.db_manager.log_login_attempt(username, ip_address, user is not None)
if user is None:
return None
# Проверяем, активен ли пользователь
if not user.get('active', True):
print(f"Пользователь '{username}' неактивен")
return None
# Сбрасываем счетчик неудачных попыток входа
self.db_manager.reset_failed_login_attempts(username, ip_address)
# Создаем токен
token = self._generate_token(user)
# Сохраняем сессию
expires_at = datetime.now() + timedelta(seconds=self.token_expiry)
self.db_manager.create_session(user['id'], token, expires_at, ip_address, user_agent)
return {
'user': user,
'token': token,
'expires_at': expires_at
}
def verify_token(self, token):
"""
Проверяет токен
Args:
token (str): Токен
Returns:
dict: Данные сессии или None, если токен недействителен
"""
# Получаем сессию
session = self.db_manager.get_session(token)
if session is None:
return None
# Проверяем срок действия
expires_at = datetime.strptime(session['expires_at'], '%Y-%m-%d %H:%M:%S')
if expires_at < datetime.now():
# Удаляем истекшую сессию
self.db_manager.delete_session(token)
return None
# Проверяем, активен ли пользователь
if not session.get('active', True):
# Удаляем сессию
self.db_manager.delete_session(token)
return None
return session
def logout(self, token):
"""
Выполняет выход пользователя
Args:
token (str): Токен сессии
Returns:
bool: True в случае успеха, False в случае ошибки
"""
return self.db_manager.delete_session(token)
def _validate_username(self, username):
"""
Проверяет корректность имени пользователя
Args:
username (str): Имя пользователя
Returns:
bool: True, если имя пользователя корректно, иначе False
"""
# Проверяем длину
if len(username) < 3 or len(username) > 30:
return False
# Проверяем допустимые символы
if not re.match(r'^[a-zA-Z0-9_.-]+$', username):
return False
return True
def _validate_email(self, email):
"""
Проверяет корректность email
Args:
email (str): Email
Returns:
bool: True, если email корректен, иначе False
"""
# Проверяем корректность email с помощью регулярного выражения
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
return False
return True
def _validate_password(self, password):
"""
Проверяет корректность пароля
Args:
password (str): Пароль
Returns:
bool: True, если пароль корректен, иначе False
"""
# Проверяем длину
if len(password) < 8:
return False
# Проверяем наличие цифр
if not re.search(r'\d', password):
return False
# Проверяем наличие букв в верхнем регистре
if not re.search(r'[A-Z]', password):
return False
# Проверяем наличие букв в нижнем регистре
if not re.search(r'[a-z]', password):
return False
# Проверяем наличие специальных символов
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return False
return True
def _generate_token(self, user):
"""
Генерирует токен для пользователя
Args:
user (dict): Данные пользователя
Returns:
str: Токен
"""
# Создаем JWT-токен
payload = {
'sub': user['id'],
'username': user['username'],
'email': user['email'],
'role': user['role'],
'iat': datetime.now(),
'exp': datetime.now() + timedelta(seconds=self.token_expiry)
}
return jwt.encode(payload, self.secret_key, algorithm='HS256')
def _is_locked_out(self, username, ip_address):
"""
Проверяет, заблокирован ли пользователь из-за слишком большого количества неудачных попыток входа
Args:
username (str): Имя пользователя
ip_address (str): IP-адрес
Returns:
bool: True, если пользователь заблокирован, иначе False
"""
# Получаем количество неудачных попыток входа
failed_attempts = self.db_manager.get_failed_login_attempts(
username=username,
ip_address=ip_address,
time_window=self.lockout_time
)
return failed_attempts >= self.max_failed_attempts
# Инициализация Flask-приложения
app = Flask(__name__)
app.config.update(
SECRET_KEY=secrets.token_hex(16),
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Strict',
PERMANENT_SESSION_LIFETIME=3600
)
# Инициализация менеджеров базы данных и аутентификации
db_manager = None
auth_manager = None
# Конфигурация приложения
DB_PATH = 'auth.db'
SECRET_KEY = secrets.token_hex(16)
TOKEN_EXPIRY = 3600
MAX_FAILED_ATTEMPTS = 5
LOCKOUT_TIME = 300
# Страницы для приложения
@app.route('/')
def index():
# Проверяем, аутентифицирован ли пользователь
if 'user_id' in session:
user = db_manager.get_user_by_id(session['user_id'])
return render_template('index.html', user=user)
else:
return render_template('index.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username', '')
password = request.form.get('password', '')
# Получаем IP-адрес и User-Agent
ip_address = request.remote_addr
user_agent = request.user_agent.string
# Аутентифицируем пользователя
result = auth_manager.authenticate(username, password, ip_address, user_agent)
if result:
# Устанавливаем сессию
session['user_id'] = result['user']['id']
session['username'] = result['user']['username']
session['role'] = result['user']['role']
session.permanent = True
# Редирект на главную страницу
return redirect(url_for('index'))
else:
flash('Неверное имя пользователя или пароль', 'error')
return render_template('login.html')
@app.route('/logout')
def logout():
# Удаляем сессию
session.clear()
# Редирект на главную страницу
return redirect(url_for('index'))
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username', '')
email = request.form.get('email', '')
password = request.form.get('password', '')
confirm_password = request.form.get('confirm_password', '')
first_name = request.form.get('first_name', '')
last_name = request.form.get('last_name', '')
# Проверяем совпадение паролей
if password != confirm_password:
flash('Пароли не совпадают', 'error')
return render_template('register.html')
# Регистрируем пользователя
user = auth_manager.register_user(
username=username,
email=email,
password=password,
first_name=first_name,
last_name=last_name
)
if user:
flash('Регистрация успешно завершена. Теперь вы можете войти.', 'success')
return redirect(url_for('login'))
else:
flash('Ошибка при регистрации. Пожалуйста, проверьте введенные данные.', 'error')
return render_template('register.html')
@app.route('/profile')
def profile():
# Проверяем, аутентифицирован ли пользователь
if 'user_id' not in session:
return redirect(url_for('login'))
# Получаем данные пользователя
user = db_manager.get_user_by_id(session['user_id'])
return render_template('profile.html', user=user)
# API для аутентификации
@app.route('/api/auth/login', methods=['POST'])
def api_login():
data = request.get_json()
if not data or 'username' not in data or 'password' not in data:
return jsonify({'error': 'Неверные параметры запроса'}), 400
# Получаем IP-адрес и User-Agent
ip_address = request.remote_addr
user_agent = request.user_agent.string
# Аутентифицируем пользователя
result = auth_manager.authenticate(data['username'], data['password'], ip_address, user_agent)
if result:
return jsonify({
'token': result['token'],
'expires_at': result['expires_at'].isoformat(),
'user': {
'id': result['user']['id'],
'username': result['user']['username'],
'email': result['user']['email'],
'role': result['user']['role'],
'first_name': result['user']['first_name'],
'last_name': result['user']['last_name']
}
})
else:
return jsonify({'error': 'Неверное имя пользователя или пароль'}), 401
@app.route('/api/auth/register', methods=['POST'])
def api_register():
data = request.get_json()
if not data or 'username' not in data or 'email' not in data or 'password' not in data:
return jsonify({'error': 'Неверные параметры запроса'}), 400
# Регистрируем пользователя
user = auth_manager.register_user(
username=data['username'],
email=data['email'],
password=data['password'],
first_name=data.get('first_name'),
last_name=data.get('last_name')
)
if user:
return jsonify({
'id': user['id'],
'username': user['username'],
'email': user['email'],
'role': user['role'], 'first_name': user['first_name'],
'last_name': user['last_name']
})
else:
return jsonify({'error': 'Ошибка при регистрации пользователя'}), 400
@app.route('/api/auth/verify', methods=['POST'])
def api_verify():
data = request.get_json()
if not data or 'token' not in data:
return jsonify({'error': 'Неверные параметры запроса'}), 400
# Проверяем токен
session = auth_manager.verify_token(data['token'])
if session:
return jsonify({
'valid': True,
'user': {
'id': session['user_id'],
'username': session['username'],
'email': session['email'],
'role': session['role']
}
})
else:
return jsonify({'valid': False}), 401
@app.route('/api/auth/logout', methods=['POST'])
def api_logout():
data = request.get_json()
if not data or 'token' not in data:
return jsonify({'error': 'Неверные параметры запроса'}), 400
# Выполняем выход
result = auth_manager.logout(data['token'])
if result:
return jsonify({'success': True})
else:
return jsonify({'error': 'Ошибка при выходе из системы'}), 400
# Middleware для защиты API
def api_auth_required(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Получаем токен из заголовка Authorization
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Требуется аутентификация'}), 401
# Извлекаем токен
token = auth_header[7:]
# Проверяем токен
session = auth_manager.verify_token(token)
if not session:
return jsonify({'error': 'Неверный или истекший токен'}), 401
# Сохраняем данные пользователя
g.user = {
'id': session['user_id'],
'username': session['username'],
'email': session['email'],
'role': session['role']
}
# Вызываем защищенную функцию
return func(*args, **kwargs)
return wrapper
# Защищенный API-метод
@app.route('/api/protected', methods=['GET'])
@api_auth_required
def api_protected():
return jsonify({
'message': 'Доступ разрешен',
'user': g.user
})
# Инициализация приложения
@app.before_first_request
def init_app():
global db_manager, auth_manager
# Инициализируем менеджер базы данных
db_manager = DatabaseManager(DB_PATH)
# Инициализируем менеджер аутентификации
auth_manager = AuthManager(
db_manager=db_manager,
secret_key=SECRET_KEY,
token_expiry=TOKEN_EXPIRY,
max_failed_attempts=MAX_FAILED_ATTEMPTS,
lockout_time=LOCKOUT_TIME
)
# Удаляем истекшие сессии
db_manager.delete_expired_sessions()
# Закрытие соединения с базой данных
@app.teardown_appcontext
def close_db(error):
if db_manager:
db_manager.close()
# Шаблоны для приложения
@app.template_filter('format_date')
def format_date(value, format='%d.%m.%Y %H:%M'):
if value:
if isinstance(value, str):
# Пытаемся преобразовать строку в дату
try:
value = datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
except ValueError:
return value
return value.strftime(format)
return ''
# Запуск приложения при прямом вызове скрипта
def main():
parser = argparse.ArgumentParser(description='Модуль аутентификации пользователя')
parser.add_argument('--host', default='127.0.0.1', help='Хост для запуска приложения')
parser.add_argument('--port', type=int, default=5000, help='Порт для запуска приложения')
parser.add_argument('--debug', action='store_true', help='Режим отладки')
subparsers = parser.add_subparsers(dest='command', help='Команда')
# Команда createuser - создание пользователя
create_user_parser = subparsers.add_parser('createuser', help='Создание пользователя')
create_user_parser.add_argument('--username', required=True, help='Имя пользователя')
create_user_parser.add_argument('--email', required=True, help='Email')
create_user_parser.add_argument('--password', required=True, help='Пароль')
create_user_parser.add_argument('--first-name', help='Имя')
create_user_parser.add_argument('--last-name', help='Фамилия')
create_user_parser.add_argument('--role', choices=['user', 'admin'], default='user', help='Роль пользователя')
# Команда listusers - вывод списка пользователей
subparsers.add_parser('listusers', help='Вывод списка пользователей')
# Команда run - запуск приложения
run_parser = subparsers.add_parser('run', help='Запуск приложения')
args = parser.parse_args()
# Инициализируем менеджер базы данных
db_manager = DatabaseManager(DB_PATH)
# Обработка команд
if args.command == 'createuser':
# Создаем хеширователь паролей
hasher = PasswordHasher()
# Хешируем пароль
password_hash = hasher.hash_password(args.password)
try:
# Создаем пользователя
cursor = db_manager._get_cursor()
cursor.execute('''
INSERT INTO users (username, email, password_hash, first_name, last_name, role)
VALUES (?, ?, ?, ?, ?, ?)
''', (args.username, args.email, password_hash, args.first_name, args.last_name, args.role))
# Сохраняем изменения
db_manager._get_connection().commit()
print(f"Пользователь '{args.username}' успешно создан")
except sqlite3.IntegrityError:
print(f"Ошибка: пользователь с именем '{args.username}' или email '{args.email}' уже существует")
except Exception as e:
print(f"Ошибка при создании пользователя: {e}")
elif args.command == 'listusers':
try:
# Получаем список пользователей
cursor = db_manager._get_cursor()
cursor.execute("SELECT id, username, email, role, created_at FROM users")
users = cursor.fetchall()
if users:
print("\nСписок пользователей:")
print("ID | Имя пользователя | Email | Роль | Дата создания")
print("-" * 80)
for user in users:
print(f"{user['id']} | {user['username']} | {user['email']} | {user['role']} | {user['created_at']}")
else:
print("Пользователи не найдены")
except Exception as e:
print(f"Ошибка при получении списка пользователей: {e}")
elif args.command == 'run' or args.command is None:
# Запускаем приложение
app.run(host=args.host, port=args.port, debug=args.debug)
# Закрываем соединение с базой данных
db_manager.close()
if __name__ == "__main__":
main()
Использование:
# Запуск приложения
python auth_module.py run --host 0.0.0.0 --port 5000
# Создание пользователя
python auth_module.py createuser --username admin --email admin@example.com --password P@ssw0rd! --role admin
# Вывод списка пользователей
python auth_module.py listusers
HTML шаблоны
Для полноценной работы веб-приложения необходимо создать HTML шаблоны. Вот примеры основных шаблонов:
Базовый шаблон (base.html)
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}Система аутентификации{% endblock %}</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css">
<style>
body {
padding-top: 60px;
padding-bottom: 40px;
}
</style>
</head>
<body>
<nav class="navbar navbar-expand-md navbar-dark bg-dark fixed-top">
<div class="container">
<a class="navbar-brand" href="{{ url_for('index') }}">Система аутентификации</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarNav">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarNav">
<ul class="navbar-nav ms-auto">
{% if session.user_id %}
<li class="nav-item">
<a class="nav-link" href="{{ url_for('profile') }}">Профиль</a>
</li>
<li class="nav-item">
<a class="nav-link" href="{{ url_for('logout') }}">Выход</a>
</li>
{% else %}
<li class="nav-item">
<a class="nav-link" href="{{ url_for('login') }}">Вход</a>
</li>
<li class="nav-item">
<a class="nav-link" href="{{ url_for('register') }}">Регистрация</a>
</li>
{% endif %}
</ul>
</div>
</div>
</nav>
<div class="container">
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ category if category != 'error' else 'danger' }} alert-dismissible fade show" role="alert">
{{ message }}
<button type="button" class="btn-close" data-bs-dismiss="alert" aria-label="Close"></button>
</div>
{% endfor %}
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
</div>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
</body>
</html>
Главная страница (index.html)
{% extends "base.html" %}
{% block title %}Главная{% endblock %}
{% block content %}
<div class="jumbotron">
<h1 class="display-4">Добро пожаловать!</h1>
{% if session.user_id %}
<p class="lead">Вы вошли как {{ session.username }}</p>
<hr class="my-4">
<p>Перейдите в <a href="{{ url_for('profile') }}">профиль</a> для просмотра подробной информации.</p>
{% else %}
<p class="lead">Пожалуйста, войдите или зарегистрируйтесь для доступа к защищенным ресурсам.</p>
<hr class="my-4">
<p>
<a href="{{ url_for('login') }}" class="btn btn-primary">Вход</a>
<a href="{{ url_for('register') }}" class="btn btn-secondary">Регистрация</a>
</p>
{% endif %}
</div>
{% endblock %}
Страница входа (login.html)
{% extends "base.html" %}
{% block title %}Вход{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6">
<div class="card">
<div class="card-header">
<h4 class="mb-0">Вход в систему</h4>
</div>
<div class="card-body">
<form method="post">
<div class="mb-3">
<label for="username" class="form-label">Имя пользователя или Email</label>
<input type="text" class="form-control" id="username" name="username" required>
</div>
<div class="mb-3">
<label for="password" class="form-label">Пароль</label>
<input type="password" class="form-control" id="password" name="password" required>
</div>
<button type="submit" class="btn btn-primary">Войти</button>
</form>
</div>
<div class="card-footer">
<p class="mb-0">Нет аккаунта? <a href="{{ url_for('register') }}">Зарегистрируйтесь</a></p>
</div>
</div>
</div>
</div>
{% endblock %}
Страница регистрации (register.html)
{% extends "base.html" %}
{% block title %}Регистрация{% endblock %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6">
<div class="card">
<div class="card-header">
<h4 class="mb-0">Регистрация</h4>
</div>
<div class="card-body">
<form method="post">
<div class="mb-3">
<label for="username" class="form-label">Имя пользователя</label>
<input type="text" class="form-control" id="username" name="username" required>
<small class="form-text text-muted">От 3 до 30 символов, только буквы, цифры, символы _ . -</small>
</div>
<div class="mb-3">
<label for="email" class="form-label">Email</label>
<input type="email" class="form-control" id="email" name="email" required>
</div>
<div class="mb-3">
<label for="first_name" class="form-label">Имя</label>
<input type="text" class="form-control" id="first_name" name="first_name">
</div>
<div class="mb-3">
<label for="last_name" class="form-label">Фамилия</label>
<input type="text" class="form-control" id="last_name" name="last_name">
</div>
<div class="mb-3">
<label for="password" class="form-label">Пароль</label>
<input type="password" class="form-control" id="password" name="password" required>
<small class="form-text text-muted">Минимум 8 символов, должен содержать заглавные и строчные буквы, цифры и спецсимволы</small>
</div>
<div class="mb-3">
<label for="confirm_password" class="form-label">Подтверждение пароля</label>
<input type="password" class="form-control" id="confirm_password" name="confirm_password" required>
</div>
<button type="submit" class="btn btn-primary">Зарегистрироваться</button>
</form>
</div>
<div class="card-footer">
<p class="mb-0">Уже есть аккаунт? <a href="{{ url_for('login') }}">Войдите</a></p>
</div>
</div>
</div>
</div>
{% endblock %}
Страница профиля (profile.html)
{% extends "base.html" %}
{% block title %}Профиль{% endblock %}
{% block content %}
<div class="card">
<div class="card-header">
<h4 class="mb-0">Профиль пользователя</h4>
</div>
<div class="card-body">
<div class="row">
<div class="col-md-6">
<h5>Информация о пользователе</h5>
<table class="table">
<tr>
<th>Имя пользователя:</th>
<td>{{ user.username }}</td>
</tr>
<tr>
<th>Email:</th>
<td>{{ user.email }}</td>
</tr>
<tr>
<th>Полное имя:</th>
<td>{{ user.first_name or '' }} {{ user.last_name or '' }}</td>
</tr>
<tr>
<th>Роль:</th>
<td>{{ user.role }}</td>
</tr>
<tr>
<th>Дата регистрации:</th>
<td>{{ user.created_at|format_date }}</td>
</tr>
<tr>
<th>Последний вход:</th>
<td>{{ user.last_login|format_date or 'Нет данных' }}</td>
</tr>
</table>
</div>
</div>
</div>
</div>
{% endblock %}
37. Модуль для обработки изображений
Этот модуль аналогичен модулю из задачи №31, но с расширенной функциональностью. Для экономии места я не буду повторять уже представленное решение, а вместо этого предложу просто добавить дополнительные функции к существующему модулю:
def rotate_image_exact(self, angle, expand=True, center=None, translate=None, fillcolor=None):
"""
Поворачивает изображение на точный угол
Args:
angle (float): Угол поворота в градусах
expand (bool): Расширять холст при повороте
center (tuple): Центр поворота (x, y)
translate (tuple): Смещение (x, y) после поворота
fillcolor (tuple): Цвет фона для заполнения новых областей
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# ...
def apply_watermark(self, watermark_image_path, position='center', opacity=0.5):
"""
Добавляет водяной знак на изображение
Args:
watermark_image_path (str): Путь к изображению водяного знака
position (str): Позиция ('center', 'top-left', 'top-right', 'bottom-left', 'bottom-right')
opacity (float): Прозрачность водяного знака (0.0 - 1.0)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# ...
def create_thumbnail(self, size, keep_aspect_ratio=True):
"""
Создаёт миниатюру изображения
Args:
size (tuple): Размер миниатюры (ширина, высота)
keep_aspect_ratio (bool): Сохранять пропорции
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# ...
def add_frame(self, width, color):
"""
Добавляет рамку к изображению
Args:
width (int): Ширина рамки в пикселях
color (tuple or str): Цвет рамки (RGB-кортеж или название цвета)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# ...
def convert_format(self, format):
"""
Конвертирует изображение в другой формат
Args:
format (str): Целевой формат ('JPEG', 'PNG', 'GIF', 'BMP', 'TIFF')
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# ...
def add_text(self, text, position, font_path=None, font_size=20, color='black', background=None):
"""
Добавляет текст на изображение
Args:
text (str): Текст для добавления
position (tuple): Позиция (x, y)
font_path (str): Путь к файлу шрифта
font_size (int): Размер шрифта
color (tuple or str): Цвет текста
background (tuple or str): Цвет фона
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# ...
def create_collage(self, images, rows, cols, spacing=0, background='white'):
"""
Создаёт коллаж из нескольких изображений
Args:
images (list): Список путей к изображениям
rows (int): Количество строк
cols (int): Количество столбцов
spacing (int): Расстояние между изображениями
background (tuple or str): Цвет фона
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# ...
def create_gif(self, image_paths, output_path, duration=100):
"""
Создаёт анимированный GIF из набора изображений
Args:
image_paths (list): Список путей к изображениям
output_path (str): Путь для сохранения GIF
duration (int): Длительность кадра в миллисекундах
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# ...
def split_image(self, rows, cols, output_dir):
"""
Разделяет изображение на части
Args:
rows (int): Количество строк
cols (int): Количество столбцов
output_dir (str): Директория для сохранения частей
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# ...
38. Модуль для генерации отчетов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import csv
import sqlite3
import datetime
import argparse
import tempfile
import matplotlib.pyplot as plt
import pandas as pd
from fpdf import FPDF
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill
from openpyxl.utils import get_column_letter
from openpyxl.drawing.image import Image as XLImage
from jinja2 import Environment, FileSystemLoader
class ReportGenerator:
"""
Класс для генерации отчетов на основе данных из различных источников
"""
def __init__(self, data_source=None, template_dir=None):
"""
Инициализирует генератор отчетов
Args:
data_source: Источник данных (путь к файлу или соединение с БД)
template_dir (str): Директория с шаблонами для отчетов
"""
self.data_source = data_source
self.template_dir = template_dir or os.path.join(os.path.dirname(__file__), 'templates')
# Загружаем шаблоны, если директория существует
if os.path.isdir(self.template_dir):
self.jinja_env = Environment(loader=FileSystemLoader(self.template_dir))
else:
self.jinja_env = None
# Данные для отчета
self.data = None
self.metadata = {
'title': 'Отчет',
'author': 'Report Generator',
'date': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'description': 'Автоматически сгенерированный отчет'
}
def set_metadata(self, title=None, author=None, description=None):
"""
Устанавливает метаданные отчета
Args:
title (str): Название отчета
author (str): Автор отчета
description (str): Описание отчета
"""
if title:
self.metadata['title'] = title
if author:
self.metadata['author'] = author
if description:
self.metadata['description'] = description
def load_data_from_json(self, json_file=None):
"""
Загружает данные из JSON-файла
Args:
json_file (str): Путь к JSON-файлу (если None, используется data_source)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
file_path = json_file or self.data_source
if not file_path:
print("Ошибка: не указан путь к JSON-файлу")
return False
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.data = json.load(f)
return True
except Exception as e:
print(f"Ошибка при загрузке данных из JSON: {e}")
return False
def load_data_from_csv(self, csv_file=None, delimiter=',', encoding='utf-8'):
"""
Загружает данные из CSV-файла
Args:
csv_file (str): Путь к CSV-файлу (если None, используется data_source)
delimiter (str): Разделитель полей
encoding (str): Кодировка файла
Returns:
bool: True в случае успеха, False в случае ошибки
"""
file_path = csv_file or self.data_source
if not file_path:
print("Ошибка: не указан путь к CSV-файлу")
return False
try:
# Читаем CSV в DataFrame
df = pd.read_csv(file_path, delimiter=delimiter, encoding=encoding)
# Преобразуем DataFrame в список словарей
self.data = df.to_dict('records')
return True
except Exception as e:
print(f"Ошибка при загрузке данных из CSV: {e}")
return False
def load_data_from_database(self, query, params=None, db_connection=None):
"""
Загружает данные из базы данных
Args:
query (str): SQL-запрос
params (tuple): Параметры запроса
db_connection: Соединение с БД (если None, используется data_source)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
conn = db_connection or self.data_source
if not conn:
print("Ошибка: не указано соединение с базой данных")
return False
try:
# Преобразуем данные в список словарей
if isinstance(conn, str):
# Если data_source - строка, считаем её путем к файлу SQLite
with sqlite3.connect(conn) as db:
db.row_factory = sqlite3.Row
cursor = db.cursor()
cursor.execute(query, params or ())
self.data = [dict(row) for row in cursor.fetchall()]
else:
# Иначе считаем, что это уже соединение с БД
cursor = conn.cursor()
cursor.execute(query, params or ())
# Получаем имена столбцов
columns = [description[0] for description in cursor.description]
# Преобразуем строки в словари
self.data = [dict(zip(columns, row)) for row in cursor.fetchall()]
return True
except Exception as e:
print(f"Ошибка при загрузке данных из базы данных: {e}")
return False
def load_data_from_dataframe(self, df):
"""
Загружает данные из pandas DataFrame
Args:
df (pandas.DataFrame): DataFrame с данными
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
self.data = df.to_dict('records')
return True
except try:
self.data = df.to_dict('records')
return True
except Exception as e:
print(f"Ошибка при загрузке данных из DataFrame: {e}")
return False
def set_data(self, data):
"""
Устанавливает данные для отчета напрямую
Args:
data: Данные для отчета (список словарей, словарь или pandas DataFrame)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
if isinstance(data, pd.DataFrame):
self.data = data.to_dict('records')
elif isinstance(data, list):
self.data = data
elif isinstance(data, dict):
self.data = [data]
else:
print(f"Ошибка: неподдерживаемый тип данных {type(data)}")
return False
return True
except Exception as e:
print(f"Ошибка при установке данных: {e}")
return False
def generate_text_report(self, output_file, template=None):
"""
Генерирует текстовый отчет
Args:
output_file (str): Путь к выходному файлу
template (str): Имя шаблона (если None, используется базовый текстовый формат)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.data is None:
print("Ошибка: данные не загружены")
return False
try:
# Создаем директорию для выходного файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
# Используем шаблон, если он указан и доступен
if template and self.jinja_env:
template_obj = self.jinja_env.get_template(f"{template}.txt")
output = template_obj.render(
data=self.data,
metadata=self.metadata,
now=datetime.datetime.now()
)
else:
# Базовый текстовый формат
output = f"{self.metadata['title']}\n"
output += f"Автор: {self.metadata['author']}\n"
output += f"Дата: {self.metadata['date']}\n"
if self.metadata['description']:
output += f"\n{self.metadata['description']}\n"
output += "\n" + "=" * 80 + "\n\n"
# Выводим данные
if isinstance(self.data, list) and self.data:
# Если данные - список словарей, форматируем как таблицу
if isinstance(self.data[0], dict):
# Определяем заголовки из первого элемента
headers = list(self.data[0].keys())
# Определяем ширину столбцов
col_widths = [len(h) for h in headers]
for row in self.data:
for i, header in enumerate(headers):
if header in row:
col_widths[i] = max(col_widths[i], len(str(row[header])))
# Выводим заголовки
header_line = ""
for i, header in enumerate(headers):
header_line += f"{header:{col_widths[i]}} | "
output += header_line.rstrip(" | ") + "\n"
# Выводим разделитель
separator = ""
for width in col_widths:
separator += "-" * width + "-+-"
output += separator.rstrip("-+-") + "\n"
# Выводим данные
for row in self.data:
row_line = ""
for i, header in enumerate(headers):
value = row.get(header, "")
row_line += f"{str(value):{col_widths[i]}} | "
output += row_line.rstrip(" | ") + "\n"
else:
# Если данные - просто список, выводим каждый элемент
for i, item in enumerate(self.data, 1):
output += f"{i}. {item}\n"
elif isinstance(self.data, dict):
# Если данные - словарь, выводим пары ключ-значение
for key, value in self.data.items():
output += f"{key}: {value}\n"
else:
output += str(self.data)
# Записываем результат в файл
with open(output_file, 'w', encoding='utf-8') as f:
f.write(output)
print(f"Текстовый отчет сохранен в {output_file}")
return True
except Exception as e:
print(f"Ошибка при генерации текстового отчета: {e}")
return False
def generate_html_report(self, output_file, template=None, css=None):
"""
Генерирует HTML-отчет
Args:
output_file (str): Путь к выходному файлу
template (str): Имя шаблона (если None, используется базовый HTML-формат)
css (str): Путь к CSS-файлу
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.data is None:
print("Ошибка: данные не загружены")
return False
try:
# Создаем директорию для выходного файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
# Используем шаблон, если он указан и доступен
if template and self.jinja_env:
template_obj = self.jinja_env.get_template(f"{template}.html")
output = template_obj.render(
data=self.data,
metadata=self.metadata,
now=datetime.datetime.now()
)
else:
# Базовый HTML-формат
css_link = f'<link rel="stylesheet" href="{css}">' if css else ''
output = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{self.metadata['title']}</title>
{css_link}
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
tr:nth-child(even) {{ background-color: #f9f9f9; }}
.header {{ margin-bottom: 20px; }}
.footer {{ margin-top: 20px; font-size: 0.8em; color: #666; }}
</style>
</head>
<body>
<div class="header">
<h1>{self.metadata['title']}</h1>
<p><strong>Автор:</strong> {self.metadata['author']}</p>
<p><strong>Дата:</strong> {self.metadata['date']}</p>
"""
if self.metadata['description']:
output += f"<p>{self.metadata['description']}</p>\n"
output += "</div>\n<div class='content'>\n"
# Выводим данные
if isinstance(self.data, list) and self.data:
# Если данные - список словарей, форматируем как таблицу
if isinstance(self.data[0], dict):
# Определяем заголовки из первого элемента
headers = list(self.data[0].keys())
output += "<table>\n<thead>\n<tr>\n"
for header in headers:
output += f"<th>{header}</th>\n"
output += "</tr>\n</thead>\n<tbody>\n"
for row in self.data:
output += "<tr>\n"
for header in headers:
value = row.get(header, "")
output += f"<td>{value}</td>\n"
output += "</tr>\n"
output += "</tbody>\n</table>\n"
else:
# Если данные - просто список, выводим как маркированный список
output += "<ul>\n"
for item in self.data:
output += f"<li>{item}</li>\n"
output += "</ul>\n"
elif isinstance(self.data, dict):
# Если данные - словарь, выводим как определения
output += "<dl>\n"
for key, value in self.data.items():
output += f"<dt>{key}</dt>\n<dd>{value}</dd>\n"
output += "</dl>\n"
else:
output += f"<p>{self.data}</p>\n"
output += """
</div>
<div class="footer">
<p>Отчет сгенерирован автоматически {}</p>
</div>
</body>
</html>
""".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# Записываем результат в файл
with open(output_file, 'w', encoding='utf-8') as f:
f.write(output)
print(f"HTML-отчет сохранен в {output_file}")
return True
except Exception as e:
print(f"Ошибка при генерации HTML-отчета: {e}")
return False
def generate_pdf_report(self, output_file, template=None):
"""
Генерирует PDF-отчет
Args:
output_file (str): Путь к выходному файлу
template (str): Имя шаблона (если None, используется базовый PDF-формат)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.data is None:
print("Ошибка: данные не загружены")
return False
try:
# Создаем директорию для выходного файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
# Инициализируем PDF
pdf = FPDF()
pdf.add_page()
pdf.set_auto_page_break(auto=True, margin=15)
# Шрифты
pdf.add_font('DejaVu', '', 'DejaVuSansCondensed.ttf', uni=True)
pdf.add_font('DejaVu', 'B', 'DejaVuSansCondensed-Bold.ttf', uni=True)
# Используем шаблон, если он указан и доступен
if template and self.jinja_env:
# Создаем временный HTML-файл
with tempfile.NamedTemporaryFile(suffix='.html', delete=False) as tmp_file:
tmp_path = tmp_file.name
# Генерируем HTML по шаблону
template_obj = self.jinja_env.get_template(f"{template}.html")
html_content = template_obj.render(
data=self.data,
metadata=self.metadata,
now=datetime.datetime.now()
)
# Записываем HTML во временный файл
with open(tmp_path, 'w', encoding='utf-8') as f:
f.write(html_content)
# Конвертируем HTML в PDF
try:
from weasyprint import HTML
HTML(tmp_path).write_pdf(output_file)
except ImportError:
# Если weasyprint не установлен, используем базовый формат
os.unlink(tmp_path)
return self._generate_basic_pdf(pdf, output_file)
# Удаляем временный файл
os.unlink(tmp_path)
else:
# Используем базовый формат PDF
return self._generate_basic_pdf(pdf, output_file)
print(f"PDF-отчет сохранен в {output_file}")
return True
except Exception as e:
print(f"Ошибка при генерации PDF-отчета: {e}")
return False
def _generate_basic_pdf(self, pdf, output_file):
"""
Генерирует PDF-отчет в базовом формате
Args:
pdf (FPDF): Объект PDF
output_file (str): Путь к выходному файлу
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Заголовок отчета
pdf.set_font('DejaVu', 'B', 16)
pdf.cell(0, 10, self.metadata['title'], 0, 1, 'C')
pdf.set_font('DejaVu', '', 10)
pdf.cell(0, 10, f"Автор: {self.metadata['author']}", 0, 1)
pdf.cell(0, 10, f"Дата: {self.metadata['date']}", 0, 1)
if self.metadata['description']:
pdf.ln(5)
pdf.multi_cell(0, 10, self.metadata['description'])
pdf.ln(10)
pdf.line(10, pdf.get_y(), 200, pdf.get_y())
pdf.ln(10)
# Выводим данные
if isinstance(self.data, list) and self.data:
# Если данные - список словарей, форматируем как таблицу
if isinstance(self.data[0], dict):
# Определяем заголовки из первого элемента
headers = list(self.data[0].keys())
# Определяем ширину столбцов
col_width = 180 / len(headers)
# Выводим заголовки
pdf.set_font('DejaVu', 'B', 10)
for header in headers:
pdf.cell(col_width, 10, str(header), 1, 0, 'C')
pdf.ln()
# Выводим данные
pdf.set_font('DejaVu', '', 10)
for row in self.data:
for header in headers:
value = row.get(header, "")
pdf.cell(col_width, 10, str(value), 1, 0, 'L')
pdf.ln()
else:
# Если данные - просто список, выводим каждый элемент
for i, item in enumerate(self.data, 1):
pdf.multi_cell(0, 10, f"{i}. {item}")
pdf.ln(5)
elif isinstance(self.data, dict):
# Если данные - словарь, выводим пары ключ-значение
for key, value in self.data.items():
pdf.set_font('DejaVu', 'B', 10)
pdf.cell(40, 10, str(key) + ":", 0, 0)
pdf.set_font('DejaVu', '', 10)
pdf.multi_cell(0, 10, str(value))
pdf.ln(5)
else:
pdf.multi_cell(0, 10, str(self.data))
# Подвал
pdf.ln(10)
pdf.set_font('DejaVu', '', 8)
pdf.set_text_color(128, 128, 128)
pdf.cell(0, 10, f"Отчет сгенерирован автоматически {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", 0, 0, 'C')
# Сохраняем PDF
pdf.output(output_file)
print(f"PDF-отчет сохранен в {output_file}")
return True
except Exception as e:
print(f"Ошибка при генерации базового PDF-отчета: {e}")
return False
def generate_excel_report(self, output_file, template=None):
"""
Генерирует Excel-отчет
Args:
output_file (str): Путь к выходному файлу
template (str): Имя шаблона (если None, используется базовый Excel-формат)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.data is None:
print("Ошибка: данные не загружены")
return False
try:
# Создаем директорию для выходного файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
# Создаем новую книгу Excel
wb = Workbook()
ws = wb.active
ws.title = "Отчет"
# Устанавливаем заголовок отчета
ws["A1"] = self.metadata['title']
ws["A1"].font = Font(bold=True, size=14)
ws.merge_cells("A1:E1")
ws["A2"] = f"Автор: {self.metadata['author']}"
ws["A3"] = f"Дата: {self.metadata['date']}"
if self.metadata['description']:
ws["A4"] = self.metadata['description']
ws.merge_cells("A4:E4")
current_row = 5
else:
current_row = 4
# Добавляем пустую строку
current_row += 1
# Выводим данные
if isinstance(self.data, list) and self.data:
# Если данные - список словарей, форматируем как таблицу
if isinstance(self.data[0], dict):
# Определяем заголовки из первого элемента
headers = list(self.data[0].keys())
# Выводим заголовки
for col_idx, header in enumerate(headers, 1):
cell = ws.cell(row=current_row, column=col_idx, value=header)
cell.font = Font(bold=True)
cell.alignment = Alignment(horizontal='center')
cell.fill = PatternFill(start_color="E0E0E0", end_color="E0E0E0", fill_type="solid")
current_row += 1
# Выводим данные
for row in self.data:
for col_idx, header in enumerate(headers, 1):
value = row.get(header, "")
ws.cell(row=current_row, column=col_idx, value=value)
current_row += 1
# Автонастройка ширины столбцов
for col_idx, header in enumerate(headers, 1):
column_letter = get_column_letter(col_idx)
ws.column_dimensions[column_letter].auto_size = True
else:
# Если данные - просто список, выводим каждый элемент
for i, item in enumerate(self.data, 1):
ws.cell(row=current_row, column=1, value=f"{i}.")
ws.cell(row=current_row, column=2, value=str(item))
current_row += 1
elif isinstance(self.data, dict):
# Если данные - словарь, выводим пары ключ-значение
for key, value in self.data.items():
cell = ws.cell(row=current_row, column=1, value=str(key))
cell.font = Font(bold=True)
ws.cell(row=current_row, column=2, value=str(value))
current_row += 1
else:
ws.cell(row=current_row, column=1, value=str(self.data))
# Добавляем подвал
current_row += 2
footer = ws.cell(row=current_row, column=1, value=f"Отчет сгенерирован автоматически {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
footer.font = Font(size=8, color="808080")
ws.merge_cells(f"A{current_row}:E{current_row}")
# Сохраняем файл
wb.save(output_file)
print(f"Excel-отчет сохранен в {output_file}")
return True
except Exception as e:
print(f"Ошибка при генерации Excel-отчета: {e}")
return False
def generate_chart(self, chart_type, x_column, y_column, title=None, xlabel=None, ylabel=None):
"""
Генерирует график на основе данных
Args:
chart_type (str): Тип графика ('line', 'bar', 'pie', 'scatter')
x_column (str): Столбец для оси X
y_column (str): Столбец для оси Y
title (str): Заголовок графика
xlabel (str): Подпись оси X
ylabel (str): Подпись оси Y
Returns:
matplotlib.figure.Figure: Объект графика или None в случае ошибки
"""
if self.data is None:
print("Ошибка: данные не загружены")
return None
try:
# Преобразуем данные в DataFrame
df = pd.DataFrame(self.data)
# Проверяем наличие столбцов
if x_column not in df.columns:
print(f"Ошибка: столбец '{x_column}' не найден в данных")
return None
if y_column not in df.columns:
print(f"Ошибка: столбец '{y_column}' не найден в данных")
return None
# Создаем график
plt.figure(figsize=(10, 6))
if chart_type == 'line':
plt.plot(df[x_column], df[y_column], marker='o')
elif chart_type == 'bar':
plt.bar(df[x_column], df[y_column])
elif chart_type == 'pie':
plt.pie(df[y_column], labels=df[x_column], autopct='%1.1f%%')
elif chart_type == 'scatter':
plt.scatter(df[x_column], df[y_column])
else:
print(f"Ошибка: неподдерживаемый тип графика '{chart_type}'")
return None
# Настраиваем график
plt.title(title or f"График {y_column} от {x_column}")
if chart_type != 'pie':
plt.xlabel(xlabel or x_column)
plt.ylabel(ylabel or y_column)
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
return plt.gcf()
except Exception as e:
print(f"Ошибка при генерации графика: {e}")
return None
def save_chart(self, chart, output_file, dpi=100):
"""
Сохраняет график в файл
Args:
chart: Объект графика
output_file (str): Путь к выходному файлу
dpi (int): Разрешение изображения
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для выходного файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
# Сохраняем график
chart.savefig(output_file, dpi=dpi, bbox_inches='tight')
print(f"График сохранен в {output_file}")
return True
except Exception as e:
print(f"Ошибка при сохранении графика: {e}")
return False
def add_chart_to_excel(self, excel_file, sheet_name, chart, cell_position="A10"):
"""
Добавляет график на лист Excel
Args:
excel_file (str): Путь к файлу Excel
sheet_name (str): Имя листа
chart: Объект графика
cell_position (str): Позиция ячейки
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Сохраняем график во временный файл
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file:
chart_path = tmp_file.name
chart.savefig(chart_path, dpi=100, bbox_inches='tight')
# Открываем файл Excel
from openpyxl import load_workbook
wb = load_workbook(excel_file)
# Получаем лист
if sheet_name in wb.sheetnames:
ws = wb[sheet_name]
else:
ws = wb.create_sheet(sheet_name)
# Добавляем изображение
img = XLImage(chart_path)
ws.add_image(img, cell_position)
# Сохраняем файл
wb.save(excel_file)
# Удаляем временный файл
os.unlink(chart_path)
print(f"График добавлен в Excel-файл {excel_file}")
return True
except Exception as e:
print(f"Ошибка при добавлении графика в Excel: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Генератор отчетов')
parser.add_argument('--data', required=True, help='Путь к файлу с данными (JSON, CSV) или запрос к базе данных')
parser.add_argument('--type', required=True, choices=['json', 'csv', 'db'], help='Тип источника данных')
parser.add_argument('--output', required=True, help='Путь к выходному файлу')
parser.add_argument('--format', required=True, choices=['text', 'html', 'pdf', 'excel'], help='Формат отчета')
parser.add_argument('--title', help='Заголовок отчета')
parser.add_argument('--author', help='Автор отчета')
parser.add_argument('--description', help='Описание отчета')
parser.add_argument('--template', help='Имя шаблона отчета')
parser.add_argument('--chart', choices=['line', 'bar', 'pie', 'scatter'], help='Тип графика')
parser.add_argument('--x-column', help='Столбец для оси X графика')
parser.add_argument('--y-column', help='Столбец для оси Y графика')
parser.add_argument('--chart-title', help='Заголовок графика')
parser.add_argument('--chart-output', help='Путь для сохранения графика')
args = parser.parse_args()
# Создаем генератор отчетов
generator = ReportGenerator()
# Устанавливаем метаданные
if args.title or args.author or args.description:
generator.set_metadata(args.title, args.author, args.description)
# Загружаем данные
if args.type == 'json':
if not generator.load_data_from_json(args.data):
return 1
elif args.type == 'csv':
if not generator.load_data_from_csv(args.data):
return 1
elif args.type == 'db':
if args.data.endswith('.db') or args.data.endswith('.sqlite'):
# Предполагаем, что это файл SQLite и выполняем простой запрос
if not generator.load_data_from_database("SELECT * FROM main_table", db_connection=args.data):
return 1
else:
print("Ошибка: для работы с базой данных укажите путь к файлу SQLite")
return 1
# Генерируем график, если требуется
if args.chart and args.x_column and args.y_column:
chart = generator.generate_chart(
args.chart,
args.x_column,
args.y_column,
args.chart_title
)
if chart is None:
return 1
# Сохраняем график, если указан путь if args.chart_output:
if not generator.save_chart(chart, args.chart_output):
return 1
# Генерируем отчет
if args.format == 'text':
if not generator.generate_text_report(args.output, args.template):
return 1
elif args.format == 'html':
if not generator.generate_html_report(args.output, args.template):
return 1
elif args.format == 'pdf':
if not generator.generate_pdf_report(args.output, args.template):
return 1
elif args.format == 'excel':
if not generator.generate_excel_report(args.output, args.template):
return 1
# Добавляем график в Excel, если он был создан и не был сохранен отдельно
if args.chart and args.x_column and args.y_column and not args.chart_output:
generator.add_chart_to_excel(args.output, "График", chart)
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Генерация текстового отчета из JSON-файла
python report_generator.py --data data.json --type json --output report.txt --format text --title "Отчет по продажам" --author "Аналитик"
# Генерация HTML-отчета из CSV-файла с графиком
python report_generator.py --data sales.csv --type csv --output report.html --format html --title "Отчет по продажам" --chart line --x-column "Дата" --y-column "Продажи" --chart-output sales_chart.png
# Генерация Excel-отчета из базы данных
python report_generator.py --data database.sqlite --type db --output report.xlsx --format excel --title "Статистика клиентов"
39. Модуль для манипуляции файлами и каталогами
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import shutil
import argparse
import zipfile
import tarfile
import tempfile
import hashlib
import time
import fnmatch
import re
from pathlib import Path
from datetime import datetime
class FileManager:
"""
Класс для манипуляций с файлами и каталогами
"""
def __init__(self):
"""
Инициализирует менеджер файлов
"""
pass
def read_file(self, file_path, encoding='utf-8', binary=False):
"""
Читает содержимое файла
Args:
file_path (str): Путь к файлу
encoding (str): Кодировка файла (для текстовых файлов)
binary (bool): Читать файл в бинарном режиме
Returns:
str or bytes: Содержимое файла или None в случае ошибки
"""
try:
mode = 'rb' if binary else 'r'
kwargs = {} if binary else {'encoding': encoding}
with open(file_path, mode, **kwargs) as f:
return f.read()
except Exception as e:
print(f"Ошибка при чтении файла '{file_path}': {e}")
return None
def write_file(self, file_path, content, encoding='utf-8', binary=False, append=False):
"""
Записывает содержимое в файл
Args:
file_path (str): Путь к файлу
content (str or bytes): Содержимое для записи
encoding (str): Кодировка файла (для текстовых файлов)
binary (bool): Записать файл в бинарном режиме
append (bool): Добавить содержимое в конец файла
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для файла, если она не существует
directory = os.path.dirname(os.path.abspath(file_path))
os.makedirs(directory, exist_ok=True)
mode = 'ab' if binary and append else 'wb' if binary else 'a' if append else 'w'
kwargs = {} if binary else {'encoding': encoding}
with open(file_path, mode, **kwargs) as f:
f.write(content)
return True
except Exception as e:
print(f"Ошибка при записи в файл '{file_path}': {e}")
return False
def create_directory(self, directory_path, mode=0o755, exist_ok=True):
"""
Создает директорию
Args:
directory_path (str): Путь к директории
mode (int): Права доступа
exist_ok (bool): Не выбрасывать исключение, если директория уже существует
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
os.makedirs(directory_path, mode=mode, exist_ok=exist_ok)
return True
except Exception as e:
print(f"Ошибка при создании директории '{directory_path}': {e}")
return False
def remove_directory(self, directory_path, recursive=False):
"""
Удаляет директорию
Args:
directory_path (str): Путь к директории
recursive (bool): Рекурсивное удаление (включая все содержимое)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
if recursive:
shutil.rmtree(directory_path)
else:
os.rmdir(directory_path)
return True
except Exception as e:
print(f"Ошибка при удалении директории '{directory_path}': {e}")
return False
def remove_file(self, file_path):
"""
Удаляет файл
Args:
file_path (str): Путь к файлу
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
os.remove(file_path)
return True
except Exception as e:
print(f"Ошибка при удалении файла '{file_path}': {e}")
return False
def copy_file(self, source_path, destination_path, overwrite=False):
"""
Копирует файл
Args:
source_path (str): Путь к исходному файлу
destination_path (str): Путь к целевому файлу
overwrite (bool): Перезаписать существующий файл
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Проверяем существование целевого файла
if os.path.exists(destination_path) and not overwrite:
print(f"Файл '{destination_path}' уже существует. Используйте overwrite=True для перезаписи.")
return False
# Создаем директорию для целевого файла, если она не существует
directory = os.path.dirname(os.path.abspath(destination_path))
os.makedirs(directory, exist_ok=True)
# Копируем файл
shutil.copy2(source_path, destination_path)
return True
except Exception as e:
print(f"Ошибка при копировании файла '{source_path}' в '{destination_path}': {e}")
return False
def move_file(self, source_path, destination_path, overwrite=False):
"""
Перемещает файл
Args:
source_path (str): Путь к исходному файлу
destination_path (str): Путь к целевому файлу
overwrite (bool): Перезаписать существующий файл
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Проверяем существование целевого файла
if os.path.exists(destination_path) and not overwrite:
print(f"Файл '{destination_path}' уже существует. Используйте overwrite=True для перезаписи.")
return False
# Создаем директорию для целевого файла, если она не существует
directory = os.path.dirname(os.path.abspath(destination_path))
os.makedirs(directory, exist_ok=True)
# Перемещаем файл
shutil.move(source_path, destination_path)
return True
except Exception as e:
print(f"Ошибка при перемещении файла '{source_path}' в '{destination_path}': {e}")
return False
def rename_file(self, file_path, new_name):
"""
Переименовывает файл
Args:
file_path (str): Путь к файлу
new_name (str): Новое имя файла (без пути)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
directory = os.path.dirname(os.path.abspath(file_path))
new_path = os.path.join(directory, new_name)
# Проверяем существование целевого файла
if os.path.exists(new_path):
print(f"Файл '{new_path}' уже существует.")
return False
# Переименовываем файл
os.rename(file_path, new_path)
return True
except Exception as e:
print(f"Ошибка при переименовании файла '{file_path}' в '{new_name}': {e}")
return False
def copy_directory(self, source_dir, destination_dir, overwrite=False):
"""
Копирует директорию
Args:
source_dir (str): Путь к исходной директории
destination_dir (str): Путь к целевой директории
overwrite (bool): Перезаписать существующую директорию
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Проверяем существование целевой директории
if os.path.exists(destination_dir) and not overwrite:
print(f"Директория '{destination_dir}' уже существует. Используйте overwrite=True для перезаписи.")
return False
# Удаляем существующую директорию, если overwrite=True
if os.path.exists(destination_dir) and overwrite:
shutil.rmtree(destination_dir)
# Копируем директорию
shutil.copytree(source_dir, destination_dir)
return True
except Exception as e:
print(f"Ошибка при копировании директории '{source_dir}' в '{destination_dir}': {e}")
return False
def list_directory(self, directory_path, pattern=None, recursive=False, include_hidden=False):
"""
Выводит список файлов и директорий
Args:
directory_path (str): Путь к директории
pattern (str): Шаблон для фильтрации файлов
recursive (bool): Рекурсивный вывод
include_hidden (bool): Включать скрытые файлы и директории
Returns:
list: Список путей к файлам и директориям или None в случае ошибки
"""
try:
result = []
if recursive:
# Рекурсивный обход
for root, dirs, files in os.walk(directory_path):
# Фильтруем скрытые директории
if not include_hidden:
dirs[:] = [d for d in dirs if not d.startswith('.')]
for name in files:
# Фильтруем скрытые файлы
if not include_hidden and name.startswith('.'):
continue
file_path = os.path.join(root, name)
# Применяем шаблон, если он указан
if pattern is None or fnmatch.fnmatch(name, pattern):
result.append(file_path)
else:
# Не рекурсивный обход
with os.scandir(directory_path) as entries:
for entry in entries:
# Фильтруем скрытые файлы и директории
if not include_hidden and entry.name.startswith('.'):
continue
# Применяем шаблон, если он указан
if pattern is None or fnmatch.fnmatch(entry.name, pattern):
result.append(entry.path)
return sorted(result)
except Exception as e:
print(f"Ошибка при получении списка файлов из директории '{directory_path}': {e}")
return None
def get_file_info(self, file_path):
"""
Получает информацию о файле
Args:
file_path (str): Путь к файлу
Returns:
dict: Информация о файле или None в случае ошибки
"""
try:
stat = os.stat(file_path)
# Получаем базовую информацию
info = {
'path': os.path.abspath(file_path),
'name': os.path.basename(file_path),
'directory': os.path.dirname(os.path.abspath(file_path)),
'size': stat.st_size,
'size_human': self._format_size(stat.st_size),
'created': datetime.fromtimestamp(stat.st_ctime),
'modified': datetime.fromtimestamp(stat.st_mtime),
'accessed': datetime.fromtimestamp(stat.st_atime),
'mode': stat.st_mode,
'is_file': os.path.isfile(file_path),
'is_dir': os.path.isdir(file_path),
'is_link': os.path.islink(file_path),
'extension': os.path.splitext(file_path)[1][1:].lower() if os.path.isfile(file_path) else None
}
# Получаем MD5-хеш для файлов (не для директорий и символических ссылок)
if info['is_file'] and not info['is_link']:
info['md5'] = self._calculate_file_hash(file_path)
return info
except Exception as e:
print(f"Ошибка при получении информации о файле '{file_path}': {e}")
return None
def find_files(self, directory, name_pattern=None, extension=None, min_size=None, max_size=None,
newer_than=None, older_than=None, recursive=True, include_hidden=False):
"""
Ищет файлы, соответствующие заданным критериям
Args:
directory (str): Директория для поиска
name_pattern (str): Шаблон имени файла
extension (str): Расширение файла
min_size (int): Минимальный размер файла в байтах
max_size (int): Максимальный размер файла в байтах
newer_than (datetime): Искать файлы новее указанной даты
older_than (datetime): Искать файлы старее указанной даты
recursive (bool): Рекурсивный поиск
include_hidden (bool): Включать скрытые файлы
Returns:
list: Список путей к найденным файлам или None в случае ошибки
"""
try:
result = []
# Компилируем регулярное выражение для шаблона имени
name_regex = re.compile(fnmatch.translate(name_pattern)) if name_pattern else None
for root, dirs, files in os.walk(directory):
# Фильтруем скрытые директории
if not include_hidden:
dirs[:] = [d for d in dirs if not d.startswith('.')]
for name in files:
# Фильтруем скрытые файлы
if not include_hidden and name.startswith('.'):
continue
file_path = os.path.join(root, name)
# Проверяем шаблон имени
if name_regex and not name_regex.match(name):
continue
# Проверяем расширение
if extension and not name.lower().endswith(f".{extension.lower()}"):
continue
# Получаем информацию о файле
stat = os.stat(file_path)
# Проверяем размер
if min_size is not None and stat.st_size < min_size:
continue
if max_size is not None and stat.st_size > max_size:
continue
# Проверяем дату модификации
modified = datetime.fromtimestamp(stat.st_mtime)
if newer_than is not None and modified < newer_than:
continue
if older_than is not None and modified > older_than:
continue
# Добавляем файл в результат
result.append(file_path)
# Если поиск не рекурсивный, прерываем цикл после первой итерации
if not recursive:
break
return sorted(result)
except Exception as e:
print(f"Ошибка при поиске файлов в директории '{directory}': {e}")
return None
def create_zip_archive(self, output_file, files, compression=zipfile.ZIP_DEFLATED):
"""
Создает ZIP-архив
Args:
output_file (str): Путь к выходному файлу
files (list): Список файлов или директорий для архивации
compression (int): Метод сжатия
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для выходного файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
with zipfile.ZipFile(output_file, 'w', compression) as zipf:
# Обрабатываем каждый файл или директорию
for item in files:
if os.path.isfile(item):
# Добавляем файл в архив
zipf.write(item, os.path.basename(item))
elif os.path.isdir(item):
# Добавляем директорию и все ее содержимое
for root, _, filenames in os.walk(item):
for filename in filenames:
file_path = os.path.join(root, filename)
arcname = os.path.relpath(file_path, os.path.dirname(item))
zipf.write(file_path, arcname)
return True
except Exception as e:
print(f"Ошибка при создании ZIP-архива '{output_file}': {e}")
return False
def extract_zip_archive(self, archive_file, output_dir):
"""
Распаковывает ZIP-архив
Args:
archive_file (str): Путь к архиву
output_dir (str): Директория для распаковки
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для распаковки, если она не существует
os.makedirs(output_dir, exist_ok=True)
with zipfile.ZipFile(archive_file, 'r') as zipf:
zipf.extractall(output_dir)
return True
except Exception as e:
print(f"Ошибка при распаковке ZIP-архива '{archive_file}': {e}")
return False
def create_tar_archive(self, output_file, files, compression='gz'):
"""
Создает TAR-архив
Args:
output_file (str): Путь к выходному файлу
files (list): Список файлов или директорий для архивации
compression (str): Метод сжатия ('gz', 'bz2', 'xz' или None)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для выходного файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
# Определяем режим открытия архива
mode = 'w:'
if compression == 'gz':
mode = 'w:gz'
elif compression == 'bz2':
mode = 'w:bz2'
elif compression == 'xz':
mode = 'w:xz'
with tarfile.open(output_file, mode) as tarf:
# Обрабатываем каждый файл или директорию
for item in files:
if os.path.isfile(item):
# Добавляем файл в архив
tarf.add(item, arcname=os.path.basename(item))
elif os.path.isdir(item):
# Добавляем директорию и все ее содержимое
tarf.add(item, arcname=os.path.basename(item))
return True
except Exception as e:
print(f"Ошибка при создании TAR-архива '{output_file}': {e}")
return False
def extract_tar_archive(self, archive_file, output_dir):
"""
Распаковывает TAR-архив
Args:
archive_file (str): Путь к архиву
output_dir (str): Директория для распаковки
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для распаковки, если она не существует
os.makedirs(output_dir, exist_ok=True)
with tarfile.open(archive_file, 'r:*') as tarf:
tarf.extractall(output_dir)
return True
except Exception as e:
print(f"Ошибка при распаковке TAR-архива '{archive_file}': {e}")
return False
def create_temporary_file(self, prefix=None, suffix=None, content=None, directory=None):
"""
Создает временный файл
Args:
prefix (str): Префикс для имени файла
suffix (str): Суффикс для имени файла
content (str or bytes): Содержимое для записи в файл
directory (str): Директория для создания файла
Returns:
str: Путь к созданному файлу или None в случае ошибки
"""
try:
# Создаем временный файл
with tempfile.NamedTemporaryFile(
prefix=prefix, suffix=suffix, dir=directory, delete=False) as tmp:
# Записываем содержимое, если оно указано
if content is not None:
if isinstance(content, str):
tmp.write(content.encode('utf-8'))
else:
tmp.write(content)
return tmp.name
except Exception as e:
print(f"Ошибка при создании временного файла: {e}")
return None
def create_temporary_directory(self, prefix=None, suffix=None, directory=None):
"""
Создает временную директорию
Args:
prefix (str): Префикс для имени директории
suffix (str): Суффикс для имени директории
directory (str): Родительская директория
Returns:
str: Путь к созданной директории или None в случае ошибки
"""
try:
# Создаем временную директорию
return tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=directory)
except Exception as e:
print(f"Ошибка при создании временной директории: {e}")
return None
def replace_in_file(self, file_path, search_text, replace_text, encoding='utf-8'):
"""
Заменяет текст в файле
Args:
file_path (str): Путь к файлу
search_text (str): Текст для поиска
replace_text (str): Текст для замены
encoding (str): Кодировка файла
Returns:
int: Количество замен или -1 в случае ошибки
"""
try:
# Читаем содержимое файла
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
# Выполняем замену
new_content, count = re.subn(search_text, replace_text, content)
# Записываем обновленное содержимое
with open(file_path, 'w', encoding=encoding) as f:
f.write(new_content)
return count
except Exception as e:
print(f"Ошибка при замене текста в файле '{file_path}': {e}")
return -1
def _calculate_file_hash(self, file_path, algorithm='md5'):
"""
Вычисляет хеш файла
Args:
file_path (str): Путь к файлу
algorithm (str): Алгоритм хеширования
Returns:
str: Хеш файла
"""
# Выбираем алгоритм хеширования
if algorithm == 'md5':
hasher = hashlib.md5()
elif algorithm == 'sha1':
hasher = hashlib.sha1()
elif algorithm == 'sha256':
hasher = hashlib.sha256()
else:
raise ValueError(f"Неподдерживаемый алгоритм хеширования: {algorithm}")
# Вычисляем хеш
with open(file_path, 'rb') as f:
# Читаем файл блоками для экономии памяти
for chunk in iter(lambda: f.read(4096), b''):
hasher.update(chunk)
return hasher.hexdigest()
def _format_size(self, size_bytes):
"""
Форматирует размер в байтах в человеко-читаемый формат
Args:
size_bytes (int): Размер в байтах
Returns:
str: Отформатированный размер
"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0 or unit == 'TB':
break
size_bytes /= 1024.0
return f"{size_bytes:.2f} {unit}"
def main():
parser = argparse.ArgumentParser(description='Управление файлами и каталогами')
subparsers = parser.add_subparsers(dest='command', help='Команда')
# Команда read - чтение файла
read_parser = subparsers.add_parser('read', help='Чтение файла')
read_parser.add_argument('file', help='Путь к файлу')
read_parser.add_argument('--encoding', default='utf-8', help='Кодировка файла')
read_parser.add_argument('--binary', action='store_true', help='Читать в бинарном режиме')
# Команда write - запись в файл
write_parser = subparsers.add_parser('write', help='Запись в файл')
write_parser.add_argument('file', help='Путь к файлу')
write_parser.add_argument('--content', help='Содержимое для записи')
write_parser.add_argument('--content-file', help='Файл с содержимым для записи')
write_parser.add_argument('--encoding', default='utf-8', help='Кодировка файла')
write_parser.add_argument('--binary', action='store_true', help='Записать в бинарном режиме')
write_parser.add_argument('--append', action='store_true', help='Добавить в конец файла')
# Команда copy - копирование файла
copy_parser = subparsers.add_parser('copy', help='Копирование файла')
copy_parser.add_argument('source', help='Путь к исходному файлу')
copy_parser.add_argument('destination', help='Путь к целевому файлу')
copy_parser.add_argument('--overwrite', action='store_true', help='Перезаписать существующий файл')
# Команда move - перемещение файла move_parser = subparsers.add_parser('move', help='Перемещение файла')
move_parser.add_argument('source', help='Путь к исходному файлу')
move_parser.add_argument('destination', help='Путь к целевому файлу')
move_parser.add_argument('--overwrite', action='store_true', help='Перезаписать существующий файл')
# Команда rename - переименование файла
rename_parser = subparsers.add_parser('rename', help='Переименование файла')
rename_parser.add_argument('file', help='Путь к файлу')
rename_parser.add_argument('name', help='Новое имя файла')
# Команда delete - удаление файла
delete_parser = subparsers.add_parser('delete', help='Удаление файла')
delete_parser.add_argument('path', help='Путь к файлу или директории')
delete_parser.add_argument('--recursive', action='store_true', help='Рекурсивное удаление директории')
# Команда mkdir - создание директории
mkdir_parser = subparsers.add_parser('mkdir', help='Создание директории')
mkdir_parser.add_argument('directory', help='Путь к директории')
# Команда list - вывод списка файлов и директорий
list_parser = subparsers.add_parser('list', help='Вывод списка файлов и директорий')
list_parser.add_argument('directory', help='Путь к директории')
list_parser.add_argument('--pattern', help='Шаблон для фильтрации файлов')
list_parser.add_argument('--recursive', action='store_true', help='Рекурсивный вывод')
list_parser.add_argument('--include-hidden', action='store_true', help='Включать скрытые файлы и директории')
# Команда info - вывод информации о файле
info_parser = subparsers.add_parser('info', help='Вывод информации о файле')
info_parser.add_argument('file', help='Путь к файлу')
# Команда find - поиск файлов
find_parser = subparsers.add_parser('find', help='Поиск файлов')
find_parser.add_argument('directory', help='Директория для поиска')
find_parser.add_argument('--name', help='Шаблон имени файла')
find_parser.add_argument('--extension', help='Расширение файла')
find_parser.add_argument('--min-size', type=int, help='Минимальный размер файла в байтах')
find_parser.add_argument('--max-size', type=int, help='Максимальный размер файла в байтах')
find_parser.add_argument('--newer-than', help='Искать файлы новее указанной даты (YYYY-MM-DD)')
find_parser.add_argument('--older-than', help='Искать файлы старее указанной даты (YYYY-MM-DD)')
find_parser.add_argument('--no-recursive', action='store_true', help='Не выполнять рекурсивный поиск')
find_parser.add_argument('--include-hidden', action='store_true', help='Включать скрытые файлы')
# Команда zip - создание ZIP-архива
zip_parser = subparsers.add_parser('zip', help='Создание ZIP-архива')
zip_parser.add_argument('output', help='Путь к выходному файлу')
zip_parser.add_argument('files', nargs='+', help='Список файлов или директорий для архивации')
# Команда unzip - распаковка ZIP-архива
unzip_parser = subparsers.add_parser('unzip', help='Распаковка ZIP-архива')
unzip_parser.add_argument('archive', help='Путь к архиву')
unzip_parser.add_argument('--output', '-o', default='.', help='Директория для распаковки')
# Команда tar - создание TAR-архива
tar_parser = subparsers.add_parser('tar', help='Создание TAR-архива')
tar_parser.add_argument('output', help='Путь к выходному файлу')
tar_parser.add_argument('files', nargs='+', help='Список файлов или директорий для архивации')
tar_parser.add_argument('--compression', choices=['gz', 'bz2', 'xz', 'none'], default='gz',
help='Метод сжатия (по умолчанию: gz)')
# Команда untar - распаковка TAR-архива
untar_parser = subparsers.add_parser('untar', help='Распаковка TAR-архива')
untar_parser.add_argument('archive', help='Путь к архиву')
untar_parser.add_argument('--output', '-o', default='.', help='Директория для распаковки')
# Команда replace - замена текста в файле
replace_parser = subparsers.add_parser('replace', help='Замена текста в файле')
replace_parser.add_argument('file', help='Путь к файлу')
replace_parser.add_argument('search', help='Текст для поиска')
replace_parser.add_argument('replace', help='Текст для замены')
replace_parser.add_argument('--encoding', default='utf-8', help='Кодировка файла')
args = parser.parse_args()
# Создаем менеджер файлов
manager = FileManager()
if args.command == 'read':
# Чтение файла
content = manager.read_file(args.file, args.encoding, args.binary)
if content is not None:
if args.binary:
# Выводим шестнадцатеричное представление для бинарных данных
hex_view = ' '.join(f'{b:02x}' for b in content[:100])
print(f"Первые 100 байт (hex): {hex_view}")
print(f"Всего прочитано: {len(content)} байт")
else:
# Выводим текстовое содержимое
print(content)
elif args.command == 'write':
# Запись в файл
content = None
if args.content_file:
# Читаем содержимое из файла
with open(args.content_file, 'rb' if args.binary else 'r',
**({} if args.binary else {'encoding': args.encoding})) as f:
content = f.read()
elif args.content:
content = args.content
else:
print("Ошибка: необходимо указать содержимое для записи")
return 1
if manager.write_file(args.file, content, args.encoding, args.binary, args.append):
print(f"Содержимое успешно записано в файл {args.file}")
else:
return 1
elif args.command == 'copy':
# Копирование файла
if manager.copy_file(args.source, args.destination, args.overwrite):
print(f"Файл скопирован: {args.source} -> {args.destination}")
else:
return 1
elif args.command == 'move':
# Перемещение файла
if manager.move_file(args.source, args.destination, args.overwrite):
print(f"Файл перемещен: {args.source} -> {args.destination}")
else:
return 1
elif args.command == 'rename':
# Переименование файла
if manager.rename_file(args.file, args.name):
print(f"Файл переименован: {args.file} -> {args.name}")
else:
return 1
elif args.command == 'delete':
# Удаление файла или директории
if os.path.isfile(args.path):
if manager.remove_file(args.path):
print(f"Файл удален: {args.path}")
else:
return 1
elif os.path.isdir(args.path):
if manager.remove_directory(args.path, args.recursive):
print(f"Директория удалена: {args.path}")
else:
return 1
else:
print(f"Ошибка: '{args.path}' не существует")
return 1
elif args.command == 'mkdir':
# Создание директории
if manager.create_directory(args.directory):
print(f"Директория создана: {args.directory}")
else:
return 1
elif args.command == 'list':
# Вывод списка файлов и директорий
items = manager.list_directory(args.directory, args.pattern, args.recursive, args.include_hidden)
if items is not None:
for item in items:
print(item)
print(f"Всего элементов: {len(items)}")
else:
return 1
elif args.command == 'info':
# Вывод информации о файле
info = manager.get_file_info(args.file)
if info is not None:
for key, value in info.items():
print(f"{key}: {value}")
else:
return 1
elif args.command == 'find':
# Парсим даты, если они указаны
newer_than = None
if args.newer_than:
newer_than = datetime.strptime(args.newer_than, '%Y-%m-%d')
older_than = None
if args.older_than:
older_than = datetime.strptime(args.older_than, '%Y-%m-%d')
# Поиск файлов
files = manager.find_files(
args.directory,
args.name,
args.extension,
args.min_size,
args.max_size,
newer_than,
older_than,
not args.no_recursive,
args.include_hidden
)
if files is not None:
for file in files:
print(file)
print(f"Найдено файлов: {len(files)}")
else:
return 1
elif args.command == 'zip':
# Создание ZIP-архива
if manager.create_zip_archive(args.output, args.files):
print(f"ZIP-архив создан: {args.output}")
else:
return 1
elif args.command == 'unzip':
# Распаковка ZIP-архива
if manager.extract_zip_archive(args.archive, args.output):
print(f"ZIP-архив распакован в {args.output}")
else:
return 1
elif args.command == 'tar':
# Создание TAR-архива
compression = None if args.compression == 'none' else args.compression
if manager.create_tar_archive(args.output, args.files, compression):
print(f"TAR-архив создан: {args.output}")
else:
return 1
elif args.command == 'untar':
# Распаковка TAR-архива
if manager.extract_tar_archive(args.archive, args.output):
print(f"TAR-архив распакован в {args.output}")
else:
return 1
elif args.command == 'replace':
# Замена текста в файле
count = manager.replace_in_file(args.file, args.search, args.replace, args.encoding)
if count >= 0:
print(f"Заменено вхождений: {count}")
else:
return 1
else:
parser.print_help()
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Чтение файла
python file_manager.py read /путь/к/файлу
# Запись в файл
python file_manager.py write /путь/к/файлу --content "Текст для записи"
# Копирование файла
python file_manager.py copy /исходный/файл /целевой/файл --overwrite
# Поиск файлов
python file_manager.py find /путь/к/директории --extension txt --min-size 1000
# Создание архива
python file_manager.py zip archive.zip /путь/к/файлу1 /путь/к/файлу2
# Распаковка архива
python file_manager.py unzip archive.zip --output /путь/для/распаковки
40. Модуль для сортировки элементов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import time
import random
import argparse
import json
import csv
from operator import itemgetter, attrgetter
from functools import cmp_to_key
class SortingAlgorithms:
"""
Класс с реализациями различных алгоритмов сортировки
"""
@staticmethod
def bubble_sort(arr, key=None, reverse=False):
"""
Сортировка пузырьком
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
list: Отсортированный список
"""
# Делаем копию списка
result = arr.copy()
n = len(result)
# Функция для сравнения элементов
def compare(a, b):
if key is not None:
a = key(a)
b = key(b)
if reverse:
return a < b
else:
return a > b
# Сортировка
for i in range(n):
# Флаг, указывающий на то, были ли перестановки в текущем проходе
swapped = False
for j in range(0, n - i - 1):
if compare(result[j], result[j + 1]):
result[j], result[j + 1] = result[j + 1], result[j]
swapped = True
# Если не было перестановок, список уже отсортирован
if not swapped:
break
return result
@staticmethod
def selection_sort(arr, key=None, reverse=False):
"""
Сортировка выбором
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
list: Отсортированный список
"""
# Делаем копию списка
result = arr.copy()
n = len(result)
# Функция для сравнения элементов
def compare(a, b):
if key is not None:
a = key(a)
b = key(b)
if reverse:
return a > b
else:
return a < b
# Сортировка
for i in range(n):
# Находим минимальный (или максимальный) элемент
idx = i
for j in range(i + 1, n):
if compare(result[j], result[idx]):
idx = j
# Меняем местами найденный элемент с текущим
result[i], result[idx] = result[idx], result[i]
return result
@staticmethod
def insertion_sort(arr, key=None, reverse=False):
"""
Сортировка вставками
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
list: Отсортированный список
"""
# Делаем копию списка
result = arr.copy()
n = len(result)
# Функция для сравнения элементов
def compare(a, b):
if key is not None:
a = key(a)
b = key(b)
if reverse:
return a > b
else:
return a < b
# Сортировка
for i in range(1, n):
key_item = result[i]
j = i - 1
# Перемещаем элементы, большие key_item, на одну позицию вперед
while j >= 0 and not compare(result[j], key_item):
result[j + 1] = result[j]
j -= 1
result[j + 1] = key_item
return result
@staticmethod
def merge_sort(arr, key=None, reverse=False):
"""
Сортировка слиянием
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
list: Отсортированный список
"""
# Функция для сравнения элементов
def compare(a, b):
if key is not None:
a = key(a)
b = key(b)
if reverse:
return a > b
else:
return a < b
# Функция слияния двух отсортированных списков
def merge(left, right):
result = []
i = j = 0
while i < len(left) and j < len(right):
if compare(left[i], right[j]):
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
# Добавляем оставшиеся элементы
result.extend(left[i:])
result.extend(right[j:])
return result
# Рекурсивная функция сортировки
def merge_sort_recursive(arr):
# Базовый случай: список из 0 или 1 элемента уже отсортирован
if len(arr) <= 1:
return arr
# Разделяем список на две части
mid = len(arr) // 2
left = merge_sort_recursive(arr[:mid])
right = merge_sort_recursive(arr[mid:])
# Объединяем отсортированные половины
return merge(left, right)
# Выполняем сортировку
return merge_sort_recursive(arr.copy())
@staticmethod
def quick_sort(arr, key=None, reverse=False):
"""
Быстрая сортировка
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
list: Отсортированный список
"""
# Функция для сравнения элементов
def compare(a, b):
if key is not None:
a = key(a)
b = key(b)
if reverse:
return a > b
else:
return a < b
# Рекурсивная функция сортировки
def quick_sort_recursive(arr):
# Базовый случай: список из 0 или 1 элемента уже отсортирован
if len(arr) <= 1:
return arr
# Выбираем опорный элемент
pivot = arr[len(arr) // 2]
# Разделяем список на три части: меньше опорного, равные опорному и больше опорного
left = [x for x in arr if compare(x, pivot)]
middle = [x for x in arr if not compare(x, pivot) and not compare(pivot, x)]
right = [x for x in arr if compare(pivot, x)]
# Рекурсивно сортируем и объединяем части
return quick_sort_recursive(left) + middle + quick_sort_recursive(right)
# Выполняем сортировку
return quick_sort_recursive(arr.copy())
@staticmethod
def heap_sort(arr, key=None, reverse=False):
"""
Сортировка кучей (пирамидальная сортировка)
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
list: Отсортированный список
"""
# Делаем копию списка
result = arr.copy()
n = len(result)
# Функция для сравнения элементов
def compare(a, b):
if key is not None:
a = key(a)
b = key(b)
if reverse:
return a < b
else:
return a > b
# Функция для просеивания элемента вниз в куче
def heapify(arr, n, i):
largest = i
left = 2 * i + 1
right = 2 * i + 2
if left < n and compare(arr[largest], arr[left]):
largest = left
if right < n and compare(arr[largest], arr[right]):
largest = right
if largest != i:
arr[i], arr[largest] = arr[largest], arr[i]
heapify(arr, n, largest)
# Строим кучу (перестраиваем список)
for i in range(n // 2 - 1, -1, -1):
heapify(result, n, i)
# Извлекаем элементы из кучи по одному
for i in range(n - 1, 0, -1):
result[0], result[i] = result[i], result[0]
heapify(result, i, 0)
return result
@staticmethod
def counting_sort(arr, key=None, reverse=False):
"""
Сортировка подсчетом
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
list: Отсортированный список или None, если сортировка не применима
"""
# Проверяем, что элементы целые числа
if key is None:
for item in arr:
if not isinstance(item, int) or item < 0:
return None
else:
for item in arr:
value = key(item)
if not isinstance(value, int) or value < 0:
return None
# Делаем копию списка
result = arr.copy()
# Находим максимальное значение
if key is None:
max_value = max(result)
else:
max_value = max(result, key=key)
max_value = key(max_value)
# Создаем массив для подсчета
count = [0] * (max_value + 1)
# Подсчитываем количество элементов
for item in result:
value = item if key is None else key(item)
count[value] += 1
# Изменяем count, чтобы он содержал позиции
for i in range(1, len(count)):
count[i] += count[i - 1]
# Строим выходной массив
output = [0] * len(result)
# Обрабатываем элементы в обратном порядке для устойчивой сортировки
for i in range(len(result) - 1, -1, -1):
value = result[i] if key is None else key(result[i])
output[count[value] - 1] = result[i]
count[value] -= 1
# Копируем выходной массив в result
result = output
# При необходимости инвертируем порядок
if reverse:
result.reverse()
return result
@staticmethod
def radix_sort(arr, key=None, reverse=False):
"""
Поразрядная сортировка
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
list: Отсортированный список или None, если сортировка не применима
"""
# Проверяем, что элементы целые положительные числа
if key is None:
for item in arr:
if not isinstance(item, int) or item < 0:
return None
else:
for item in arr:
value = key(item)
if not isinstance(value, int) or value < 0:
return None
# Делаем копию списка
result = arr.copy()
# Функция для получения i-го разряда числа в десятичной системе
def get_digit(number, i):
return (number // (10 ** i)) % 10
# Сортировка по разрядам
max_value = max(result) if key is None else key(max(result, key=key))
exp = 1
while max_value // exp > 0:
# Используем сортировку подсчетом для текущего разряда
count = [0] * 10
output = [0] * len(result)
# Подсчитываем количество элементов для каждого значения текущего разряда
for item in result:
value = item if key is None else key(item)
digit = get_digit(value, 0 if exp == 1 else int(exp / 10))
count[digit] += 1
# Изменяем count, чтобы он содержал позиции
for i in range(1, 10):
count[i] += count[i - 1]
# Строим выходной массив
for i in range(len(result) - 1, -1, -1):
value = result[i] if key is None else key(result[i])
digit = get_digit(value, 0 if exp == 1 else int(exp / 10))
output[count[digit] - 1] = result[i]
count[digit] -= 1
# Копируем выходной массив в result
result = output
# Переходим к следующему разряду
exp *= 10
# При необходимости инвертируем порядок
if reverse:
result.reverse()
return result
@staticmethod
def bucket_sort(arr, key=None, reverse=False, bucket_count=10):
"""
Блочная сортировка
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
bucket_count (int): Количество блоков
Returns:
list: Отсортированный список или None, если сортировка не применима
"""
# Проверяем, что элементы числа из интервала [0, 1)
if key is None:
for item in arr:
if not isinstance(item, (int, float)) or item < 0 or item >= 1:
return None
else:
for item in arr:
value = key(item)
if not isinstance(value, (int, float)) or value < 0 or value >= 1:
return None
# Делаем копию списка
result = arr.copy()
# Создаем пустые блоки
buckets = [[] for _ in range(bucket_count)]
# Распределяем элементы по блокам
for item in result:
value = item if key is None else key(item)
buckets[int(value * bucket_count)].append(item)
# Сортируем каждый блок встроенной сортировкой
for i in range(bucket_count):
buckets[i].sort(key=key, reverse=reverse)
# Объединяем блоки
result = []
for bucket in buckets:
result.extend(bucket)
return result
@staticmethod
def shell_sort(arr, key=None, reverse=False):
"""
Сортировка Шелла
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
list: Отсортированный список
"""
# Делаем копию списка
result = arr.copy()
n = len(result)
# Функция для сравнения элементов def compare(a, b):
if key is not None:
a = key(a)
b = key(b)
if reverse:
return a > b
else:
return a < b
# Сортировка
gap = n // 2
while gap > 0:
for i in range(gap, n):
temp = result[i]
j = i
while j >= gap and not compare(result[j - gap], temp):
result[j] = result[j - gap]
j -= gap
result[j] = temp
gap //= 2
return result
@staticmethod
def tim_sort(arr, key=None, reverse=False):
"""
Timsort - гибридный алгоритм сортировки, используемый в Python
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
list: Отсортированный список
"""
# Делаем копию списка и используем встроенную сортировку Python (timsort)
result = arr.copy()
result.sort(key=key, reverse=reverse)
return result
@staticmethod
def bogosort(arr, key=None, reverse=False, max_iterations=1000):
"""
Случайная сортировка (Bogosort)
Args:
arr (list): Список для сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
max_iterations (int): Максимальное количество итераций
Returns:
list: Отсортированный список или None, если превышено максимальное количество итераций
"""
# Делаем копию списка
result = arr.copy()
n = len(result)
# Функция для проверки, отсортирован ли список
def is_sorted(arr):
for i in range(1, len(arr)):
if key is not None:
a, b = key(arr[i-1]), key(arr[i])
else:
a, b = arr[i-1], arr[i]
if reverse:
if a < b:
return False
else:
if a > b:
return False
return True
# Сортировка
iterations = 0
while not is_sorted(result) and iterations < max_iterations:
# Перемешиваем список
random.shuffle(result)
iterations += 1
if iterations == max_iterations:
return None
return result
class Sorter:
"""
Класс для сортировки элементов
"""
def __init__(self):
"""
Инициализирует сортировщик
"""
self.algorithms = {
'bubble': SortingAlgorithms.bubble_sort,
'selection': SortingAlgorithms.selection_sort,
'insertion': SortingAlgorithms.insertion_sort,
'merge': SortingAlgorithms.merge_sort,
'quick': SortingAlgorithms.quick_sort,
'heap': SortingAlgorithms.heap_sort,
'counting': SortingAlgorithms.counting_sort,
'radix': SortingAlgorithms.radix_sort,
'bucket': SortingAlgorithms.bucket_sort,
'shell': SortingAlgorithms.shell_sort,
'tim': SortingAlgorithms.tim_sort,
'python': SortingAlgorithms.tim_sort, # Alias for tim_sort (Python's built-in sort)
'bogo': SortingAlgorithms.bogosort
}
def sort(self, items, algorithm='python', key=None, reverse=False, measure_time=False):
"""
Сортирует элементы выбранным алгоритмом
Args:
items (list): Список элементов для сортировки
algorithm (str): Название алгоритма сортировки
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
measure_time (bool): Измерять время выполнения
Returns:
tuple: Кортеж (отсортированный_список, время_выполнения) если measure_time=True,
иначе только отсортированный список
"""
# Выбираем алгоритм сортировки
sort_function = self.algorithms.get(algorithm.lower())
if sort_function is None:
raise ValueError(f"Неизвестный алгоритм сортировки: {algorithm}")
# Засекаем время
start_time = time.time()
# Выполняем сортировку
sorted_items = sort_function(items, key, reverse)
# Вычисляем время выполнения
execution_time = time.time() - start_time
if measure_time:
return sorted_items, execution_time
else:
return sorted_items
def sort_by_attribute(self, items, attribute, algorithm='python', reverse=False, measure_time=False):
"""
Сортирует объекты по значению атрибута
Args:
items (list): Список объектов для сортировки
attribute (str): Имя атрибута для сортировки
algorithm (str): Название алгоритма сортировки
reverse (bool): Сортировка в обратном порядке
measure_time (bool): Измерять время выполнения
Returns:
tuple: Кортеж (отсортированный_список, время_выполнения) если measure_time=True,
иначе только отсортированный список
"""
return self.sort(items, algorithm, attrgetter(attribute), reverse, measure_time)
def sort_by_key(self, items, key_name, algorithm='python', reverse=False, measure_time=False):
"""
Сортирует словари по значению ключа
Args:
items (list): Список словарей для сортировки
key_name: Ключ для сортировки
algorithm (str): Название алгоритма сортировки
reverse (bool): Сортировка в обратном порядке
measure_time (bool): Измерять время выполнения
Returns:
tuple: Кортеж (отсортированный_список, время_выполнения) если measure_time=True,
иначе только отсортированный список
"""
return self.sort(items, algorithm, itemgetter(key_name), reverse, measure_time)
def sort_by_multiple_keys(self, items, keys, algorithm='python', reverse=False, measure_time=False):
"""
Сортирует словари по нескольким ключам
Args:
items (list): Список словарей для сортировки
keys (list): Список ключей для сортировки
algorithm (str): Название алгоритма сортировки
reverse (bool): Сортировка в обратном порядке
measure_time (bool): Измерять время выполнения
Returns:
tuple: Кортеж (отсортированный_список, время_выполнения) если measure_time=True,
иначе только отсортированный список
"""
return self.sort(items, algorithm, itemgetter(*keys), reverse, measure_time)
def sort_by_custom_compare(self, items, compare_func, algorithm='python', measure_time=False):
"""
Сортирует элементы с использованием пользовательской функции сравнения
Args:
items (list): Список элементов для сортировки
compare_func (callable): Функция сравнения, принимающая два аргумента и возвращающая
отрицательное число, если первый элемент меньше второго,
положительное число, если первый элемент больше второго,
или ноль, если элементы равны
algorithm (str): Название алгоритма сортировки
measure_time (bool): Измерять время выполнения
Returns:
tuple: Кортеж (отсортированный_список, время_выполнения) если measure_time=True,
иначе только отсортированный список
"""
# Для этого метода нельзя использовать встроенные функции и нужно реализовать свою
if algorithm == 'python' or algorithm == 'tim':
# Для встроенной сортировки используем cmp_to_key
return self.sort(items, algorithm, key=cmp_to_key(compare_func), reverse=False, measure_time=measure_time)
else:
# Реализуем свой компаратор для других алгоритмов
def key_func(item):
return item
def reverse_func(is_less):
return not is_less
# Выбираем алгоритм сортировки
sort_function = self.algorithms.get(algorithm.lower())
if sort_function is None:
raise ValueError(f"Неизвестный алгоритм сортировки: {algorithm}")
# Засекаем время
start_time = time.time()
# Создаем копию списка
result = items.copy()
# Реализуем свою сортировку для алгоритмов, которые требуют key и reverse
if algorithm in ['bubble', 'selection', 'insertion', 'shell']:
n = len(result)
for i in range(n):
for j in range(0, n - i - 1):
if compare_func(result[j], result[j + 1]) > 0:
result[j], result[j + 1] = result[j + 1], result[j]
else:
# Для остальных алгоритмов используем встроенную сортировку Python
result.sort(key=cmp_to_key(compare_func))
# Вычисляем время выполнения
execution_time = time.time() - start_time
if measure_time:
return result, execution_time
else:
return result
def benchmark_algorithms(self, items, algorithms=None, key=None, reverse=False):
"""
Сравнивает скорость работы различных алгоритмов сортировки
Args:
items (list): Список элементов для сортировки
algorithms (list): Список названий алгоритмов для сравнения
key (callable): Функция для получения ключа сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
dict: Словарь {алгоритм: время_выполнения}
"""
if algorithms is None:
# По умолчанию сравниваем все алгоритмы, кроме bogosort
algorithms = [algo for algo in self.algorithms.keys() if algo != 'bogo']
results = {}
for algorithm in algorithms:
try:
_, execution_time = self.sort(items, algorithm, key, reverse, measure_time=True)
results[algorithm] = execution_time
except Exception as e:
results[algorithm] = f"Ошибка: {e}"
return results
def main():
parser = argparse.ArgumentParser(description='Сортировка элементов')
parser.add_argument('--input', '-i', required=True, help='Путь к входному файлу или строка с данными')
parser.add_argument('--format', '-f', choices=['json', 'csv', 'text'], default='json',
help='Формат входных данных (по умолчанию: json)')
parser.add_argument('--output', '-o', help='Путь к выходному файлу')
parser.add_argument('--algorithm', '-a', default='python',
help='Алгоритм сортировки (по умолчанию: python)')
parser.add_argument('--key', '-k', help='Ключ для сортировки (для словарей)')
parser.add_argument('--keys', nargs='+', help='Ключи для сортировки по нескольким полям (для словарей)')
parser.add_argument('--reverse', '-r', action='store_true', help='Сортировка в обратном порядке')
parser.add_argument('--benchmark', '-b', action='store_true', help='Сравнить скорость алгоритмов')
parser.add_argument('--algorithms', nargs='+',
help='Список алгоритмов для сравнения (для режима benchmark)')
args = parser.parse_args()
# Создаем сортировщик
sorter = Sorter()
# Загружаем данные
items = None
# Проверяем, является ли input файлом
if os.path.exists(args.input):
try:
if args.format == 'json':
with open(args.input, 'r', encoding='utf-8') as f:
items = json.load(f)
elif args.format == 'csv':
items = []
with open(args.input, 'r', encoding='utf-8', newline='') as f:
reader = csv.DictReader(f)
for row in reader:
items.append(dict(row))
elif args.format == 'text':
with open(args.input, 'r', encoding='utf-8') as f:
items = [line.strip() for line in f if line.strip()]
except Exception as e:
print(f"Ошибка при загрузке данных из файла: {e}")
return 1
else:
# Считаем, что input - это строка с данными
try:
if args.format == 'json':
items = json.loads(args.input)
elif args.format == 'csv':
items = []
lines = args.input.strip().split('\n')
if not lines:
print("Ошибка: пустые входные данные")
return 1
# Разбираем заголовки и данные
headers = lines[0].split(',')
for line in lines[1:]:
values = line.split(',')
items.append(dict(zip(headers, values)))
elif args.format == 'text':
items = args.input.strip().split(',')
except Exception as e:
print(f"Ошибка при разборе входных данных: {e}")
return 1
# Проверяем, что данные загружены
if items is None or not items:
print("Ошибка: пустые входные данные")
return 1
# Режим сравнения алгоритмов
if args.benchmark:
results = sorter.benchmark_algorithms(
items,
args.algorithms,
itemgetter(args.key) if args.key else None,
args.reverse
)
# Выводим результаты
print("Результаты сравнения алгоритмов:")
for algorithm, time_or_error in sorted(results.items(), key=lambda x: (isinstance(x[1], str), x[1])):
if isinstance(time_or_error, str):
print(f"{algorithm}: {time_or_error}")
else:
print(f"{algorithm}: {time_or_error:.6f} сек.")
return 0
# Сортировка
try:
# Выбираем метод сортировки в зависимости от ключей
if args.keys:
sorted_items = sorter.sort_by_multiple_keys(
items, args.keys, args.algorithm, args.reverse
)
elif args.key:
sorted_items = sorter.sort_by_key(
items, args.key, args.algorithm, args.reverse
)
else:
sorted_items = sorter.sort(items, args.algorithm, reverse=args.reverse)
# Сохраняем результаты
if args.output:
try:
if args.format == 'json':
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(sorted_items, f, ensure_ascii=False, indent=4)
elif args.format == 'csv':
if not sorted_items or not isinstance(sorted_items[0], dict):
print("Ошибка: данные не являются словарями")
return 1
with open(args.output, 'w', encoding='utf-8', newline='') as f:
# Получаем все ключи из первого элемента
fieldnames = list(sorted_items[0].keys())
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(sorted_items)
elif args.format == 'text':
with open(args.output, 'w', encoding='utf-8') as f:
for item in sorted_items:
f.write(str(item) + '\n')
print(f"Результаты сортировки сохранены в {args.output}")
except Exception as e:
print(f"Ошибка при сохранении результатов: {e}")
return 1
else:
# Выводим результаты
if args.format == 'json':
print(json.dumps(sorted_items, ensure_ascii=False, indent=4))
elif args.format == 'csv':
if not sorted_items or not isinstance(sorted_items[0], dict):
print("Ошибка: данные не являются словарями")
return 1
# Получаем все ключи из первого элемента
fieldnames = list(sorted_items[0].keys())
print(','.join(fieldnames))
for item in sorted_items:
print(','.join(str(item.get(field, '')) for field in fieldnames))
elif args.format == 'text':
for item in sorted_items:
print(item)
except Exception as e:
print(f"Ошибка при сортировке данных: {e}")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Сортировка списка чисел
python sorter.py --input "[5, 3, 8, 1, 2, 7]" --format json
# Сортировка списка словарей по ключу
python sorter.py --input data.json --key "name" --reverse
# Сортировка CSV-файла по нескольким полям
python sorter.py --input data.csv --format csv --keys age name --output sorted.csv
# Сравнение скорости алгоритмов сортировки
python sorter.py --input data.json --benchmark --algorithms bubble quick merge
41. Модуль для анализа и фильтрации системных журналов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import os
import sys
import argparse
import gzip
import datetime
from collections import Counter, defaultdict
class LogAnalyzer:
"""
Класс для анализа и фильтрации системных журналов
"""
def __init__(self, log_path=None):
"""
Инициализирует анализатор логов
Args:
log_path (str): Путь к файлу лога или директории с логами
"""
self.log_path = log_path
self.logs = []
self.parsed_logs = []
# Регулярные выражения для парсинга разных форматов логов
# Стандартный формат syslog
self.syslog_pattern = re.compile(
r'(?P<month>[A-Za-z]{3})\s+(?P<day>\d{1,2})\s+(?P<time>\d{2}:\d{2}:\d{2})'
r'\s+(?P<hostname>[^ ]+)'
r'\s+(?P<process>[^:]+)(?:\[(?P<pid>\d+)\])?:'
r'\s+(?P<message>.*)'
)
# Формат с датой и ISO-таймстампом
self.iso_pattern = re.compile(
r'(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:[+-]\d{4})?)'
r'\s+(?P<hostname>[^ ]+)'
r'\s+(?P<process>[^:]+)(?:\[(?P<pid>\d+)\])?:'
r'\s+(?P<message>.*)'
)
# Формат с уровнем важности
self.level_pattern = re.compile(
r'(?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?)'
r'\s+(?P<level>[A-Z]+)'
r'\s+(?P<process>[^:]+)(?:\[(?P<pid>\d+)\])?:'
r'\s+(?P<message>.*)'
)
def load_logs(self, log_path=None):
"""
Загружает логи из файла или директории
Args:
log_path (str): Путь к файлу лога или директории с логами
Returns:
int: Количество загруженных строк лога
"""
if log_path:
self.log_path = log_path
if not self.log_path:
raise ValueError("Путь к логам не указан")
self.logs = []
# Если путь указывает на директорию, загружаем все файлы логов
if os.path.isdir(self.log_path):
for root, _, files in os.walk(self.log_path):
for file in files:
if file.endswith('.log') or file.endswith('.gz'):
file_path = os.path.join(root, file)
self._load_log_file(file_path)
else:
# Загружаем один файл лога
self._load_log_file(self.log_path)
print(f"Загружено {len(self.logs)} строк лога")
return len(self.logs)
def _load_log_file(self, file_path):
"""
Загружает отдельный файл лога
Args:
file_path (str): Путь к файлу лога
"""
print(f"Загрузка файла лога: {file_path}")
try:
# Открываем gzip файл, если расширение .gz
if file_path.endswith('.gz'):
with gzip.open(file_path, 'rt', encoding='utf-8', errors='replace') as f:
self.logs.extend(f.readlines())
else:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
self.logs.extend(f.readlines())
except Exception as e:
print(f"Ошибка при загрузке файла {file_path}: {e}")
def parse_logs(self):
"""
Парсит загруженные логи с использованием регулярных выражений
Returns:
list: Список распарсенных записей логов
"""
self.parsed_logs = []
for line in self.logs:
line = line.strip()
if not line:
continue
log_entry = None
# Пробуем разные шаблоны парсинга
for pattern_name, pattern in [('syslog', self.syslog_pattern),
('iso', self.iso_pattern),
('level', self.level_pattern)]:
match = pattern.match(line)
if match:
log_entry = match.groupdict()
log_entry['pattern'] = pattern_name
log_entry['raw'] = line
# Преобразуем метку времени в datetime
if pattern_name == 'syslog':
# Предполагаем текущий год
current_year = datetime.datetime.now().year
timestamp_str = f"{current_year} {log_entry['month']} {log_entry['day']} {log_entry['time']}"
try:
log_entry['timestamp'] = datetime.datetime.strptime(
timestamp_str, "%Y %b %d %H:%M:%S"
)
except ValueError:
log_entry['timestamp'] = None
elif pattern_name == 'iso':
try:
log_entry['timestamp'] = datetime.datetime.fromisoformat(log_entry['timestamp'])
except ValueError:
log_entry['timestamp'] = None
elif pattern_name == 'level':
try:
log_entry['timestamp'] = datetime.datetime.strptime(
log_entry['timestamp'], "%Y-%m-%d %H:%M:%S"
)
except ValueError:
log_entry['timestamp'] = None
break
# Если ни один шаблон не подошел, сохраняем строку как есть
if log_entry is None:
log_entry = {
'pattern': 'unknown',
'raw': line,
'message': line,
'timestamp': None
}
self.parsed_logs.append(log_entry)
print(f"Проанализировано {len(self.parsed_logs)} записей лога")
return self.parsed_logs
def filter_logs(self, keywords=None, regex=None, process=None, timerange=None,
level=None, exclude=None, limit=None):
"""
Фильтрует логи по различным критериям
Args:
keywords (list): Список ключевых слов для поиска
regex (str): Регулярное выражение для поиска
process (str): Фильтр по процессу
timerange (tuple): Кортеж (start_time, end_time) для фильтрации по времени
level (str): Фильтр по уровню важности
exclude (list): Список ключевых слов для исключения
limit (int): Максимальное количество результатов
Returns:
list: Отфильтрованные записи логов
"""
if not self.parsed_logs:
self.parse_logs()
filtered = self.parsed_logs.copy()
# Фильтрация по ключевым словам
if keywords:
keywords_lower = [kw.lower() for kw in keywords]
filtered = [
entry for entry in filtered
if any(kw in entry['raw'].lower() for kw in keywords_lower)
]
# Фильтрация по регулярному выражению
if regex:
try:
pattern = re.compile(regex)
filtered = [
entry for entry in filtered
if pattern.search(entry['raw'])
]
except re.error as e:
print(f"Ошибка в регулярном выражении: {e}")
# Фильтрация по процессу
if process:
filtered = [
entry for entry in filtered
if 'process' in entry and entry['process'] and process.lower() in entry['process'].lower()
]
# Фильтрация по уровню важности
if level:
filtered = [
entry for entry in filtered
if 'level' in entry and entry['level'] and entry['level'].lower() == level.lower()
]
# Фильтрация по диапазону времени
if timerange and all(timerange):
start_time, end_time = timerange
filtered = [
entry for entry in filtered
if entry['timestamp'] and start_time <= entry['timestamp'] <= end_time
]
# Исключение ключевых слов
if exclude:
exclude_lower = [ex.lower() for ex in exclude]
filtered = [
entry for entry in filtered
if not any(ex in entry['raw'].lower() for ex in exclude_lower)
]
# Ограничение количества результатов
if limit and limit > 0:
filtered = filtered[:limit]
print(f"Отфильтровано {len(filtered)} записей")
return filtered
def count_by_field(self, field, filtered_logs=None):
"""
Подсчитывает частоту значений в указанном поле
Args:
field (str): Поле для подсчета
filtered_logs (list): Список отфильтрованных логов (если None, используются все логи)
Returns:
Counter: Счетчик частоты значений
"""
logs = filtered_logs if filtered_logs is not None else self.parsed_logs
counter = Counter()
for entry in logs:
if field in entry and entry[field]:
counter[entry[field]] += 1
return counter
def count_by_time(self, interval='hour', filtered_logs=None):
"""
Подсчитывает количество записей по временным интервалам
Args:
interval (str): Интервал ('minute', 'hour', 'day', 'month')
filtered_logs (list): Список отфильтрованных логов (если None, используются все логи)
Returns:
dict: Словарь {интервал: количество}
"""
logs = filtered_logs if filtered_logs is not None else self.parsed_logs
counter = defaultdict(int)
for entry in logs:
if 'timestamp' in entry and entry['timestamp']:
if interval == 'minute':
key = entry['timestamp'].strftime('%Y-%m-%d %H:%M')
elif interval == 'hour':
key = entry['timestamp'].strftime('%Y-%m-%d %H')
elif interval == 'day':
key = entry['timestamp'].strftime('%Y-%m-%d')
elif interval == 'month':
key = entry['timestamp'].strftime('%Y-%m')
else:
key = str(entry['timestamp'])
counter[key] += 1
return dict(counter)
def find_error_patterns(self, filtered_logs=None, min_count=2):
"""
Ищет повторяющиеся паттерны ошибок в логах
Args:
filtered_logs (list): Список отфильтрованных логов (если None, используются все логи)
min_count (int): Минимальное количество повторений для выделения паттерна
Returns:
list: Список кортежей (паттерн, количество)
"""
logs = filtered_logs if filtered_logs is not None else self.parsed_logs
# Ищем только в сообщениях с ошибками
error_keywords = ['error', 'exception', 'fail', 'critical', 'alert', 'emergency']
error_logs = [
entry for entry in logs
if any(kw in entry['raw'].lower() for kw in error_keywords)
]
# Подсчитываем частоту сообщений
message_counter = Counter()
for entry in error_logs:
if 'message' in entry and entry['message']:
# Удаляем переменные части (IP, даты, числа)
message = re.sub(r'\d+\.\d+\.\d+\.\d+', 'IP_ADDR', entry['message'])
message = re.sub(r'\d{4}-\d{2}-\d{2}', 'DATE', message)
message = re.sub(r'\d+', 'NUM', message)
message_counter[message] += 1
# Отбираем паттерны, которые встречаются минимальное количество раз
patterns = [(message, count) for message, count in message_counter.items() if count >= min_count]
# Сортируем по количеству в убывающем порядке
patterns.sort(key=lambda x: x[1], reverse=True)
return patterns
def generate_report(self, filtered_logs=None, output_file=None):
"""
Генерирует отчет на основе анализа логов
Args:
filtered_logs (list): Список отфильтрованных логов (если None, используются все логи)
output_file (str): Путь к файлу для сохранения отчета
Returns:
str: Текст отчета
"""
logs = filtered_logs if filtered_logs is not None else self.parsed_logs
report = []
report.append("=== Отчет по анализу системных журналов ===")
report.append(f"Дата создания: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"Всего проанализировано записей: {len(logs)}")
report.append("")
# Общая статистика
report.append("== Общая статистика ==")
process_counts = self.count_by_field('process', logs)
report.append(f"Топ-10 процессов:")
for process, count in process_counts.most_common(10):
report.append(f" {process}: {count}")
report.append("")
# Распределение по времени
time_counts = self.count_by_time('hour', logs)
report.append(f"Распределение событий по часам:")
for time_key, count in sorted(time_counts.items()):
report.append(f" {time_key}: {count}")
report.append("")
# Паттерны ошибок
error_patterns = self.find_error_patterns(logs)
report.append(f"Распространенные паттерны ошибок:")
for pattern, count in error_patterns[:10]: # Топ-10 паттернов
report.append(f" [{count}] {pattern}")
report.append("")
# Примеры ошибок
error_keywords = ['error', 'exception', 'fail', 'critical', 'alert', 'emergency']
error_logs = [
entry for entry in logs
if any(kw in entry['raw'].lower() for kw in error_keywords)
]
report.append(f"Примеры последних ошибок:")
for entry in sorted(error_logs, key=lambda x: x.get('timestamp', 0), reverse=True)[:5]:
timestamp = entry.get('timestamp', 'Unknown time')
process = entry.get('process', 'Unknown process')
message = entry.get('message', entry.get('raw', 'No message'))
report.append(f" [{timestamp}] {process}: {message}")
report_text = "\n".join(report)
# Сохраняем отчет в файл
if output_file:
try:
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report_text)
print(f"Отчет сохранен в файл: {output_file}")
except Exception as e:
print(f"Ошибка при сохранении отчета: {e}")
return report_text
def main():
parser = argparse.ArgumentParser(description='Анализ и фильтрация системных журналов')
parser.add_argument('--log', '-l', required=True, help='Путь к файлу лога или директории с логами')
parser.add_argument('--keywords', '-k', nargs='+', help='Ключевые слова для поиска')
parser.add_argument('--regex', '-r', help='Регулярное выражение для поиска')
parser.add_argument('--process', '-p', help='Фильтр по процессу')
parser.add_argument('--level', help='Фильтр по уровню важности')
parser.add_argument('--start-time', help='Начало временного диапазона (YYYY-MM-DD HH:MM:SS)')
parser.add_argument('--end-time', help='Конец временного диапазона (YYYY-MM-DD HH:MM:SS)')
parser.add_argument('--exclude', '-e', nargs='+', help='Ключевые слова для исключения')
parser.add_argument('--limit', '-m', type=int, help='Максимальное количество результатов')
parser.add_argument('--output', '-o', help='Путь к файлу для сохранения отчета')
parser.add_argument('--report', action='store_true', help='Генерировать отчет')
args = parser.parse_args()
analyzer = LogAnalyzer(args.log)
analyzer.load_logs()
analyzer.parse_logs()
# Обработка временного диапазона
start_time = None
end_time = None
if args.start_time:
try:
start_time = datetime.datetime.strptime(args.start_time, "%Y-%m-%d %H:%M:%S")
except ValueError:
print(f"Ошибка в формате начального времени: {args.start_time}")
return 1
if args.end_time:
try:
end_time = datetime.datetime.strptime(args.end_time, "%Y-%m-%d %H:%M:%S")
except ValueError:
print(f"Ошибка в формате конечного времени: {args.end_time}")
return 1
timerange = (start_time, end_time) if start_time or end_time else None
# Фильтрация логов
filtered_logs = analyzer.filter_logs(
keywords=args.keywords,
regex=args.regex,
process=args.process,
timerange=timerange,
level=args.level,
exclude=args.exclude,
limit=args.limit
)
# Вывод отфильтрованных логов
if not args.report:
for entry in filtered_logs:
print(entry['raw'])
# Генерация отчета
if args.report or args.output:
report = analyzer.generate_report(filtered_logs, args.output)
if args.report:
print(report)
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Анализ логов с поиском ошибок
python log_analyzer.py -l /var/log/syslog -k error fail exception
# Фильтрация по процессу и генерация отчета
python log_analyzer.py -l /var/log/syslog -p apache2 --report -o apache_report.txt
# Фильтрация по временному диапазону
python log_analyzer.py -l /var/log/auth.log --start-time "2023-05-01 00:00:00" --end-time "2023-05-02 00:00:00"
42. Модуль для мониторинга использования ресурсов системы(проблемы)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import psutil
import argparse
import platform
import socket
import json
import csv
import logging
import smtplib
import threading
import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import matplotlib.pyplot as plt
from pathlib import Path
class SystemMonitor:
"""
Класс для мониторинга ресурсов системы и уведомления о превышении пороговых значений
"""
def __init__(self, interval=5, log_file=None, thresholds=None):
"""
Инициализирует монитор системы
Args:
interval (int): Интервал мониторинга в секундах
log_file (str): Путь к файлу логов
thresholds (dict): Пороговые значения для ресурсов
"""
self.interval = interval
self.log_file = log_file
# Устанавливаем пороговые значения по умолчанию, если не указаны
self.thresholds = {
'cpu_percent': 80.0,
'memory_percent': 80.0,
'disk_percent': 90.0,
'network_mbps': 100.0,
'swap_percent': 80.0
}
# Обновляем пороговые значения из аргументов
if thresholds:
self.thresholds.update(thresholds)
# Инициализируем логгер
self._setup_logger()
# Данные мониторинга
self.monitoring_data = []
# Флаг для остановки мониторинга
self.stop_monitoring = False
# Настройки уведомлений
self.notification_settings = {
'email': {
'enabled': False,
'server': None,
'port': 587,
'username': None,
'password': None,
'sender': None,
'recipients': []
}
}
# Метаданные системы
self.system_info = self._get_system_info()
def _setup_logger(self):
"""
Настраивает логгер
"""
self.logger = logging.getLogger("SystemMonitor")
self.logger.setLevel(logging.INFO)
# Форматтер для логов
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# Обработчик для вывода в консоль
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# Обработчик для вывода в файл
if self.log_file:
try:
# Создаем директорию для файла логов
log_dir = os.path.dirname(os.path.abspath(self.log_file))
os.makedirs(log_dir, exist_ok=True)
file_handler = logging.FileHandler(self.log_file)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
except Exception as e:
self.logger.error(f"Ошибка при настройке файлового логгера: {e}")
def _get_system_info(self):
"""
Получает информацию о системе
Returns:
dict: Информация о системе
"""
info = {
'hostname': socket.gethostname(),
'platform': platform.platform(),
'processor': platform.processor(),
'cpu_count': psutil.cpu_count(logical=True),
'physical_cpu_count': psutil.cpu_count(logical=False),
'total_memory': psutil.virtual_memory().total,
'total_disk': {disk.mountpoint: disk.total for disk in psutil.disk_partitions() if os.path.exists(disk.mountpoint)},
'boot_time': datetime.datetime.fromtimestamp(psutil.boot_time()).strftime('%Y-%m-%d %H:%M:%S'),
'python_version': platform.python_version()
}
return info
def set_email_notification(self, server, port, username, password, sender, recipients):
"""
Настраивает отправку уведомлений по электронной почте
Args:
server (str): SMTP-сервер
port (int): Порт SMTP-сервера
username (str): Имя пользователя
password (str): Пароль
sender (str): Адрес отправителя
recipients (list): Список адресов получателей
"""
self.notification_settings['email']['enabled'] = True
self.notification_settings['email']['server'] = server
self.notification_settings['email']['port'] = port
self.notification_settings['email']['username'] = username
self.notification_settings['email']['password'] = password
self.notification_settings['email']['sender'] = sender
self.notification_settings['email']['recipients'] = recipients
self.logger.info("Настройка уведомлений по электронной почте выполнена")
def send_email_notification(self, subject, message):
"""
Отправляет уведомление по электронной почте
Args:
subject (str): Тема письма
message (str): Текст письма
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if not self.notification_settings['email']['enabled']:
self.logger.warning("Уведомления по электронной почте не включены")
return False
try:
msg = MIMEMultipart()
msg['From'] = self.notification_settings['email']['sender']
msg['To'] = ', '.join(self.notification_settings['email']['recipients'])
msg['Subject'] = subject
msg.attach(MIMEText(message, 'plain'))
server = smtplib.SMTP(
self.notification_settings['email']['server'],
self.notification_settings['email']['port']
)
server.starttls()
server.login(
self.notification_settings['email']['username'],
self.notification_settings['email']['password']
)
server.send_message(msg)
server.quit()
self.logger.info(f"Уведомление отправлено: {subject}")
return True
except Exception as e:
self.logger.error(f"Ошибка при отправке уведомления: {e}")
return False
def get_current_metrics(self):
"""
Получает текущие метрики использования ресурсов
Returns:
dict: Метрики использования ресурсов
"""
# Получаем метрики CPU
cpu_percent = psutil.cpu_percent(interval=0.1)
cpu_times = psutil.cpu_times_percent(interval=0.1)
# Получаем метрики памяти
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
# Получаем метрики дисков
disk_usage = {}
for disk in psutil.disk_partitions():
try:
if os.path.exists(disk.mountpoint):
usage = psutil.disk_usage(disk.mountpoint)
disk_usage[disk.mountpoint] = {
'total': usage.total,
'used': usage.used,
'free': usage.free,
'percent': usage.percent
}
except Exception:
pass
# Получаем метрики сети
net_io_counters = psutil.net_io_counters()
# Собираем метрики в словарь
metrics = {
'timestamp': datetime.datetime.now().isoformat(),
'cpu': {
'total_percent': cpu_percent,
'user_percent': cpu_times.user,
'system_percent': cpu_times.system,
'idle_percent': cpu_times.idle
},
'memory': {
'total': memory.total,
'available': memory.available,
'used': memory.used,
'percent': memory.percent
},
'swap': {
'total': swap.total,
'used': swap.used,
'free': swap.free,
'percent': swap.percent
},
'disk': disk_usage,
'network': {
'bytes_sent': net_io_counters.bytes_sent,
'bytes_recv': net_io_counters.bytes_recv,
'packets_sent': net_io_counters.packets_sent,
'packets_recv': net_io_counters.packets_recv
}
}
return metrics
def check_thresholds(self, metrics):
"""
Проверяет превышение пороговых значений
Args:
metrics (dict): Метрики использования ресурсов
Returns:
list: Список превышенных пороговых значений
"""
alerts = []
# Проверяем CPU if metrics['cpu']['total_percent'] > self.thresholds['cpu_percent']:
alerts.append({
'resource': 'CPU',
'current': metrics['cpu']['total_percent'],
'threshold': self.thresholds['cpu_percent'],
'message': f"Загрузка CPU превысила порог: {metrics['cpu']['total_percent']}% (порог: {self.thresholds['cpu_percent']}%)"
})
# Проверяем память
if metrics['memory']['percent'] > self.thresholds['memory_percent']:
alerts.append({
'resource': 'Memory',
'current': metrics['memory']['percent'],
'threshold': self.thresholds['memory_percent'],
'message': f"Использование памяти превысило порог: {metrics['memory']['percent']}% (порог: {self.thresholds['memory_percent']}%)"
})
# Проверяем swap
if metrics['swap']['percent'] > self.thresholds['swap_percent']:
alerts.append({
'resource': 'Swap',
'current': metrics['swap']['percent'],
'threshold': self.thresholds['swap_percent'],
'message': f"Использование swap превысило порог: {metrics['swap']['percent']}% (порог: {self.thresholds['swap_percent']}%)"
})
# Проверяем диски
for mountpoint, usage in metrics['disk'].items():
if usage['percent'] > self.thresholds['disk_percent']:
alerts.append({
'resource': f"Disk ({mountpoint})",
'current': usage['percent'],
'threshold': self.thresholds['disk_percent'],
'message': f"Использование диска {mountpoint} превысило порог: {usage['percent']}% (порог: {self.thresholds['disk_percent']}%)"
})
return alerts
def handle_alerts(self, alerts):
"""
Обрабатывает оповещения о превышении пороговых значений
Args:
alerts (list): Список оповещений
"""
if not alerts:
return
# Логируем оповещения
for alert in alerts:
self.logger.warning(alert['message'])
# Отправляем уведомление по почте
if self.notification_settings['email']['enabled']:
subject = f"Системный мониторинг: {len(alerts)} предупреждений на {socket.gethostname()}"
message = "Отчет о превышении пороговых значений ресурсов:\n\n"
for alert in alerts:
message += f"- {alert['message']}\n"
message += f"\nВремя: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
message += f"\nСистема: {self.system_info['platform']}"
self.send_email_notification(subject, message)
def start_monitoring(self, duration=None):
"""
Запускает мониторинг ресурсов системы
Args:
duration (int): Продолжительность мониторинга в секундах (если None, то до явной остановки)
"""
self.logger.info(f"Начало мониторинга (интервал: {self.interval} сек)")
# Сбрасываем флаг остановки
self.stop_monitoring = False
# Очищаем данные мониторинга
self.monitoring_data = []
# Запоминаем время начала мониторинга
start_time = time.time()
last_net_io = psutil.net_io_counters()
last_time = start_time
try:
while not self.stop_monitoring:
current_time = time.time()
elapsed = current_time - start_time
# Проверяем, не истекла ли продолжительность мониторинга
if duration is not None and elapsed >= duration:
self.logger.info(f"Мониторинг завершен по истечении времени ({duration} сек)")
break
# Получаем текущие метрики
metrics = self.get_current_metrics()
# Расчет скорости сети
current_net_io = psutil.net_io_counters()
time_diff = current_time - last_time
bytes_sent_speed = (current_net_io.bytes_sent - last_net_io.bytes_sent) / time_diff
bytes_recv_speed = (current_net_io.bytes_recv - last_net_io.bytes_recv) / time_diff
# Преобразуем в Мбит/сек
mbits_sent_speed = bytes_sent_speed * 8 / 1_000_000
mbits_recv_speed = bytes_recv_speed * 8 / 1_000_000
# Добавляем скорость сети в метрики
metrics['network']['mbits_sent_speed'] = mbits_sent_speed
metrics['network']['mbits_recv_speed'] = mbits_recv_speed
metrics['network']['total_mbits_speed'] = mbits_sent_speed + mbits_recv_speed
# Обновляем переменные для следующей итерации
last_net_io = current_net_io
last_time = current_time
# Проверяем превышение пороговых значений
alerts = self.check_thresholds(metrics)
if alerts:
self.handle_alerts(alerts)
# Добавляем метрики в историю
self.monitoring_data.append(metrics)
# Выводим информацию в консоль
self.logger.info(f"CPU: {metrics['cpu']['total_percent']}%, Память: {metrics['memory']['percent']}%, Сеть: {metrics['network']['total_mbits_speed']:.2f} Мбит/с")
# Ждем до следующего интервала
time.sleep(self.interval)
except KeyboardInterrupt:
self.logger.info("Мониторинг остановлен пользователем")
except Exception as e:
self.logger.error(f"Ошибка при мониторинге: {e}")
finally:
self.stop_monitoring = True
def stop(self):
"""
Останавливает мониторинг
"""
self.stop_monitoring = True
self.logger.info("Запрошена остановка мониторинга")
def start_monitoring_thread(self, duration=None):
"""
Запускает мониторинг в отдельном потоке
Args:
duration (int): Продолжительность мониторинга в секундах
Returns:
threading.Thread: Поток мониторинга
"""
thread = threading.Thread(target=self.start_monitoring, args=(duration,))
thread.daemon = True
thread.start()
return thread
def save_monitoring_data(self, output_file, format='json'):
"""
Сохраняет данные мониторинга в файл
Args:
output_file (str): Путь к выходному файлу
format (str): Формат файла ('json' или 'csv')
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if not self.monitoring_data:
self.logger.warning("Нет данных для сохранения")
return False
try:
# Создаем директорию для выходного файла
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
if format.lower() == 'json':
# Сохраняем в формате JSON
with open(output_file, 'w', encoding='utf-8') as f:
json.dump({
'system_info': self.system_info,
'monitoring_data': self.monitoring_data
}, f, ensure_ascii=False, indent=2)
elif format.lower() == 'csv':
# Сохраняем в формате CSV
with open(output_file, 'w', encoding='utf-8', newline='') as f:
# Подготавливаем заголовки CSV
writer = csv.writer(f)
# Формируем заголовки на основе первой записи
headers = ['timestamp', 'cpu_total_percent', 'memory_percent',
'swap_percent', 'network_mbits_sent', 'network_mbits_recv']
# Добавляем заголовки дисков
for mountpoint in self.monitoring_data[0]['disk'].keys():
headers.append(f"disk_{mountpoint.replace('/', '_')}_percent")
writer.writerow(headers)
# Записываем данные
for data in self.monitoring_data:
row = [
data['timestamp'],
data['cpu']['total_percent'],
data['memory']['percent'],
data['swap']['percent'],
data['network'].get('mbits_sent_speed', 0),
data['network'].get('mbits_recv_speed', 0),
]
# Добавляем данные дисков
for mountpoint in headers[6:]:
mp = mountpoint[5:].replace('_', '/')
if mp in data['disk']:
row.append(data['disk'][mp]['percent'])
else:
row.append(None)
writer.writerow(row)
else:
self.logger.error(f"Неподдерживаемый формат: {format}")
return False
self.logger.info(f"Данные мониторинга сохранены в {output_file}")
return True
except Exception as e:
self.logger.error(f"Ошибка при сохранении данных мониторинга: {e}")
return False
def generate_report(self, output_file=None):
"""
Генерирует отчет о мониторинге
Args:
output_file (str): Путь к выходному файлу
Returns:
str: Текст отчета
"""
if not self.monitoring_data:
return "Нет данных для формирования отчета"
report = []
report.append("=== Отчет о мониторинге системы ===")
report.append(f"Дата создания: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append("")
# Информация о системе
report.append("== Информация о системе ==")
report.append(f"Хост: {self.system_info['hostname']}")
report.append(f"Платформа: {self.system_info['platform']}")
report.append(f"Процессор: {self.system_info['processor']}")
report.append(f"CPU ядер: {self.system_info['physical_cpu_count']} физических, {self.system_info['cpu_count']} логических")
report.append(f"ОЗУ: {self._format_bytes(self.system_info['total_memory'])}")
for mountpoint, total in self.system_info['total_disk'].items():
report.append(f"Диск {mountpoint}: {self._format_bytes(total)}")
report.append(f"Время запуска: {self.system_info['boot_time']}")
report.append("")
# Статистика мониторинга
report.append("== Статистика мониторинга ==")
# Период мониторинга
start_time = datetime.datetime.fromisoformat(self.monitoring_data[0]['timestamp'])
end_time = datetime.datetime.fromisoformat(self.monitoring_data[-1]['timestamp'])
duration = (end_time - start_time).total_seconds()
report.append(f"Период: {start_time.strftime('%Y-%m-%d %H:%M:%S')} - {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"Продолжительность: {self._format_duration(duration)}")
report.append(f"Количество измерений: {len(self.monitoring_data)}")
report.append(f"Интервал: {self.interval} сек")
report.append("")
# Статистика CPU
cpu_values = [data['cpu']['total_percent'] for data in self.monitoring_data]
report.append("CPU:")
report.append(f" Среднее: {sum(cpu_values) / len(cpu_values):.2f}%")
report.append(f" Максимум: {max(cpu_values):.2f}%")
report.append(f" Минимум: {min(cpu_values):.2f}%")
report.append("")
# Статистика памяти
memory_values = [data['memory']['percent'] for data in self.monitoring_data]
report.append("Память:")
report.append(f" Среднее: {sum(memory_values) / len(memory_values):.2f}%")
report.append(f" Максимум: {max(memory_values):.2f}%")
report.append(f" Минимум: {min(memory_values):.2f}%")
report.append("")
# Статистика swap
swap_values = [data['swap']['percent'] for data in self.monitoring_data]
report.append("Swap:")
report.append(f" Среднее: {sum(swap_values) / len(swap_values):.2f}%")
report.append(f" Максимум: {max(swap_values):.2f}%")
report.append(f" Минимум: {min(swap_values):.2f}%")
report.append("")
# Статистика дисков
report.append("Диски:")
for mountpoint in self.monitoring_data[0]['disk'].keys():
disk_values = [data['disk'][mountpoint]['percent'] for data in self.monitoring_data if mountpoint in data['disk']]
if disk_values:
report.append(f" {mountpoint}:")
report.append(f" Среднее: {sum(disk_values) / len(disk_values):.2f}%")
report.append(f" Максимум: {max(disk_values):.2f}%")
report.append(f" Минимум: {min(disk_values):.2f}%")
report.append("")
# Статистика сети
network_recv = [data['network'].get('mbits_recv_speed', 0) for data in self.monitoring_data if 'mbits_recv_speed' in data['network']]
network_sent = [data['network'].get('mbits_sent_speed', 0) for data in self.monitoring_data if 'mbits_sent_speed' in data['network']]
if network_recv and network_sent:
report.append("Сеть:")
report.append(f" Прием (Мбит/с):")
report.append(f" Среднее: {sum(network_recv) / len(network_recv):.2f}")
report.append(f" Максимум: {max(network_recv):.2f}")
report.append(f" Минимум: {min(network_recv):.2f}")
report.append(f" Передача (Мбит/с):")
report.append(f" Среднее: {sum(network_sent) / len(network_sent):.2f}")
report.append(f" Максимум: {max(network_sent):.2f}")
report.append(f" Минимум: {min(network_sent):.2f}")
report.append("")
# Пороговые значения
report.append("== Пороговые значения ==")
for resource, threshold in self.thresholds.items():
report.append(f"{resource}: {threshold}")
report_text = "\n".join(report)
# Сохраняем отчет в файл
if output_file:
try:
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report_text)
self.logger.info(f"Отчет сохранен в файл: {output_file}")
except Exception as e:
self.logger.error(f"Ошибка при сохранении отчета: {e}")
return report_text
def generate_charts(self, output_dir):
"""
Генерирует графики на основе данных мониторинга
Args:
output_dir (str): Директория для сохранения графиков
Returns:
list: Список путей к созданным графикам
"""
if not self.monitoring_data:
self.logger.warning("Нет данных для создания графиков")
return []
try:
# Создаем директорию для графиков
os.makedirs(output_dir, exist_ok=True)
chart_files = []
timestamps = [datetime.datetime.fromisoformat(data['timestamp']) for data in self.monitoring_data]
# График CPU
plt.figure(figsize=(10, 6))
plt.plot(timestamps, [data['cpu']['total_percent'] for data in self.monitoring_data], label='Общая загрузка')
plt.plot(timestamps, [data['cpu']['user_percent'] for data in self.monitoring_data], label='Пользовательские процессы')
plt.plot(timestamps, [data['cpu']['system_percent'] for data in self.monitoring_data], label='Системные процессы')
plt.axhline(y=self.thresholds['cpu_percent'], color='r', linestyle='--', label=f"Порог ({self.thresholds['cpu_percent']}%)")
plt.title('Загрузка CPU')
plt.xlabel('Время')
plt.ylabel('Загрузка (%)')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
cpu_chart = os.path.join(output_dir, 'cpu_usage.png')
plt.savefig(cpu_chart)
plt.close()
chart_files.append(cpu_chart)
# График памяти
plt.figure(figsize=(10, 6))
plt.plot(timestamps, [data['memory']['percent'] for data in self.monitoring_data], label='ОЗУ')
plt.plot(timestamps, [data['swap']['percent'] for data in self.monitoring_data], label='Swap')
plt.axhline(y=self.thresholds['memory_percent'], color='r', linestyle='--', label=f"Порог ОЗУ ({self.thresholds['memory_percent']}%)")
plt.axhline(y=self.thresholds['swap_percent'], color='orange', linestyle='--', label=f"Порог Swap ({self.thresholds['swap_percent']}%)")
plt.title('Использование памяти')
plt.xlabel('Время')
plt.ylabel('Использование (%)')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
memory_chart = os.path.join(output_dir, 'memory_usage.png')
plt.savefig(memory_chart)
plt.close()
chart_files.append(memory_chart)
# График дисков
plt.figure(figsize=(10, 6))
for mountpoint in self.monitoring_data[0]['disk'].keys():
disk_values = []
for data in self.monitoring_data:
if mountpoint in data['disk']:
disk_values.append(data['disk'][mountpoint]['percent'])
else:
disk_values.append(None)
plt.plot(timestamps, disk_values, label=f"Диск {mountpoint}")
plt.axhline(y=self.thresholds['disk_percent'], color='r', linestyle='--', label=f"Порог ({self.thresholds['disk_percent']}%)")
plt.title('Использование дисков')
plt.xlabel('Время')
plt.ylabel('Использование (%)')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
disk_chart = os.path.join(output_dir, 'disk_usage.png')
plt.savefig(disk_chart)
plt.close()
chart_files.append(disk_chart)
# График сети
network_recv = []
network_sent = []
for data in self.monitoring_data:
if 'mbits_recv_speed' in data['network'] and 'mbits_sent_speed' in data['network']:
network_recv.append(data['network']['mbits_recv_speed'])
network_sent.append(data['network']['mbits_sent_speed'])
else:
network_recv.append(0)
network_sent.append(0)
plt.figure(figsize=(10, 6))
plt.plot(timestamps, network_recv, label='Прием')
plt.plot(timestamps, network_sent, label='Передача')
plt.title('Сетевой трафик')
plt.xlabel('Время')
plt.ylabel('Скорость (Мбит/с)')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
network_chart = os.path.join(output_dir, 'network_traffic.png')
plt.savefig(network_chart)
plt.close()
chart_files.append(network_chart)
self.logger.info(f"Графики сохранены в директорию: {output_dir}")
return chart_files
except Exception as e:
self.logger.error(f"Ошибка при создании графиков: {e}")
return []
def _format_bytes(self, bytes_value):
"""
Форматирует количество байт в человеко-читаемый формат
Args:
bytes_value (int): Количество байт
Returns:
str: Отформатированная строка
"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_value < 1024.0:
return f"{bytes_value:.2f} {unit}"
bytes_value /= 1024.0
return f"{bytes_value:.2f} PB"
def _format_duration(self, seconds):
"""
Форматирует продолжительность в человеко-читаемый формат
Args:
seconds (float): Продолжительность в секундах
Returns:
str: Отформатированная строка
"""
hours, remainder = divmod(seconds, 3600)
minutes, seconds = divmod(remainder, 60)
if hours > 0:
return f"{int(hours)}ч {int(minutes)}м {int(seconds)}с"
elif minutes > 0:
return f"{int(minutes)}м {int(seconds)}с"
else:
return f"{int(seconds)}с"
def main():
parser = argparse.ArgumentParser(description='Мониторинг ресурсов системы')
parser.add_argument('--interval', '-i', type=int, default=5,
help='Интервал мониторинга в секундах (по умолчанию: 5)')
parser.add_argument('--duration', '-d', type=int,
help='Продолжительность мониторинга в секундах (по умолчанию: бесконечно)')
parser.add_argument('--log-file', '-l',
help='Путь к файлу логов')
parser.add_argument('--output', '-o',
help='Путь к выходному файлу с данными мониторинга')
parser.add_argument('--format', '-f', choices=['json', 'csv'], default='json',
help='Формат выходного файла (по умолчанию: json)')
parser.add_argument('--report', '-r',
help='Путь к файлу отчета')
parser.add_argument('--charts', '-c',
help='Директория для сохранения графиков')
# Параметры пороговых значений
parser.add_argument('--cpu-threshold', type=float, default=80.0,
help='Пороговое значение для CPU (%) (по умолчанию: 80.0)')
parser.add_argument('--memory-threshold', type=float, default=80.0,
help='Пороговое значение для памяти (%) (по умолчанию: 80.0)')
parser.add_argument('--disk-threshold', type=float, default=90.0,
help='Пороговое значение для дисков (%) (по умолчанию: 90.0)')
parser.add_argument('--swap-threshold', type=float, default=80.0,
help='Пороговое значение для swap (%) (по умолчанию: 80.0)')
parser.add_argument('--network-threshold', type=float, default=100.0,
help='Пороговое значение для сети (Мбит/с) (по умолчанию: 100.0)')
# Параметры уведомлений по почте
parser.add_argument('--email-server',
help='SMTP-сервер для отправки уведомлений')
parser.add_argument('--email-port', type=int, default=587,
help='Порт SMTP-сервера (по умолчанию: 587)')
parser.add_argument('--email-username',
help='Имя пользователя для SMTP-сервера')
parser.add_argument('--email-password',
help='Пароль для SMTP-сервера')
parser.add_argument('--email-sender',
help='Адрес отправителя')
parser.add_argument('--email-recipients', nargs='+',
help='Список адресов получателей')
args = parser.parse_args()
# Создаем монитор системы
thresholds = {
'cpu_percent': args.cpu_threshold,
'memory_percent': args.memory_threshold,
'disk_percent': args.disk_threshold,
'swap_percent': args.swap_threshold,
'network_mbps': args.network_threshold
}
monitor = SystemMonitor(args.interval, args.log_file, thresholds)
# Настраиваем уведомления по почте
if all([args.email_server, args.email_username, args.email_password, args.email_sender, args.email_recipients]):
monitor.set_email_notification(
args.email_server,
args.email_port,
args.email_username,
args.email_password,
args.email_sender,
args.email_recipients
)
# Запускаем мониторинг
print(f"Начало мониторинга с интервалом {args.interval} сек", end='')
if args.duration:
print(f", продолжительность {args.duration} сек", end='')
print()
monitor.start_monitoring(args.duration)
# Сохраняем данные мониторинга
if args.output:
monitor.save_monitoring_data(args.output, args.format)
# Генерируем отчет
if args.report:
report = monitor.generate_report(args.report)
print("\n" + report)
# Генерируем графики
if args.charts:
monitor.generate_charts(args.charts)
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Базовый мониторинг с интервалом 5 секунд в течение 1 минуты
python system_monitor.py -i 5 -d 60
# Мониторинг с сохранением данных и генерацией отчета
python system_monitor.py -i 10 -d 3600 -o monitoring_data.json -r report.txt
# Мониторинг с генерацией графиков
python system_monitor.py -i 5 -d 600 -c charts_dir
# Мониторинг с настройкой пороговых значений и уведомлением по почте
python system_monitor.py -i 30 --cpu-threshold 70 --memory-threshold 75 --email-server smtp.gmail.com --email-username user@gmail.com --email-password password --email-sender user@gmail.com --email-recipients admin@example.com
Задача 43: Модуль для автоматического создания резервных копий(раб)
Вот пример реализации модуля для автоматического резервного копирования файлов и директорий на Python:
import os
import shutil
import datetime
import time
import schedule
import logging
from pathlib import Path
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler("backup.log"), logging.StreamHandler()]
)
class BackupManager:
def __init__(self, source_paths, destination_path, backup_time="01:00",
retention_days=30, cloud_upload=False, cloud_config=None):
"""
Инициализация менеджера резервного копирования
:param source_paths: Список путей к файлам/директориям для резервного копирования
:param destination_path: Путь назначения для резервных копий
:param backup_time: Время для ежедневного резервного копирования (формат "ЧЧ:ММ")
:param retention_days: Количество дней для хранения резервных копий
:param cloud_upload: Флаг для загрузки в облако
:param cloud_config: Конфигурация для облачного хранилища (если используется)
"""
self.source_paths = source_paths
self.destination_path = Path(destination_path)
self.backup_time = backup_time
self.retention_days = retention_days
self.cloud_upload = cloud_upload
self.cloud_config = cloud_config
# Создаем директорию назначения, если она не существует
if not self.destination_path.exists():
self.destination_path.mkdir(parents=True)
logging.info(f"Инициализирован менеджер резервного копирования. "
f"Резервные копии будут создаваться ежедневно в {self.backup_time}")
def create_backup(self):
"""Создание резервной копии"""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_dir = self.destination_path / f"backup_{timestamp}"
try:
# Создаем директорию для текущей резервной копии
backup_dir.mkdir(parents=True)
# Копируем каждый исходный путь
for source_path in self.source_paths:
source = Path(source_path)
dest = backup_dir / source.name
if source.is_file():
shutil.copy2(source, dest)
logging.info(f"Файл {source} скопирован в {dest}")
elif source.is_dir():
shutil.copytree(source, dest)
logging.info(f"Директория {source} скопирована в {dest}")
else:
logging.warning(f"Источник {source} не найден")
# Архивируем резервную копию
archive_name = f"backup_{timestamp}"
shutil.make_archive(
str(self.destination_path / archive_name),
'zip',
root_dir=str(backup_dir)
)
logging.info(f"Создан архив {archive_name}.zip")
# Удаляем временную директорию
shutil.rmtree(backup_dir)
# Загружаем в облако, если необходимо
if self.cloud_upload and self.cloud_config:
self.upload_to_cloud(f"{archive_name}.zip")
# Очищаем старые резервные копии
self.cleanup_old_backups()
logging.info("Резервное копирование успешно завершено")
return True
except Exception as e:
logging.error(f"Ошибка при создании резервной копии: {e}")
return False
def upload_to_cloud(self, file_name):
"""Загрузка резервной копии в облако"""
try:
logging.info(f"Начинается загрузка {file_name} в облачное хранилище...")
# Здесь должен быть код для загрузки в облако в зависимости от настроек self.cloud_config
# Пример для AWS S3:
"""
import boto3
s3 = boto3.client(
's3',
aws_access_key_id=self.cloud_config['access_key'],
aws_secret_access_key=self.cloud_config['secret_key']
)
s3.upload_file(
str(self.destination_path / file_name),
self.cloud_config['bucket'],
file_name
)
"""
# Пример для Google Cloud Storage:
"""
from google.cloud import storage
client = storage.Client.from_service_account_json(self.cloud_config['credentials_file'])
bucket = client.get_bucket(self.cloud_config['bucket'])
blob = bucket.blob(file_name)
blob.upload_from_filename(str(self.destination_path / file_name))
"""
logging.info(f"Файл {file_name} успешно загружен в облачное хранилище")
except Exception as e:
logging.error(f"Ошибка при загрузке в облако: {e}")
def cleanup_old_backups(self):
"""Удаление устаревших резервных копий"""
try:
current_time = datetime.datetime.now()
cutoff_date = current_time - datetime.timedelta(days=self.retention_days)
for item in self.destination_path.glob("backup_*.zip"):
try:
# Извлекаем дату из имени файла
date_str = item.stem.split('_')[1] + '_' + item.stem.split('_')[2]
item_date = datetime.datetime.strptime(date_str, "%Y%m%d_%H%M%S")
# Если файл старше заданного срока хранения - удаляем
if item_date < cutoff_date:
os.remove(item)
logging.info(f"Удалена устаревшая резервная копия: {item}")
except Exception as e:
logging.warning(f"Ошибка при обработке файла {item}: {e}")
except Exception as e:
logging.error(f"Ошибка при очистке старых резервных копий: {e}")
def start_scheduler(self):
"""Запуск планировщика резервного копирования"""
schedule.every().day.at(self.backup_time).do(self.create_backup)
logging.info(f"Планировщик запущен. Резервное копирование будет выполняться ежедневно в {self.backup_time}")
while True:
schedule.run_pending()
time.sleep(60) # Проверяем расписание каждую минуту
# Пример использования:
if __name__ == "__main__":
# Пути к директориям/файлам для резервного копирования
sources = [
"/path/to/important/files",
"/path/to/documents",
"/path/to/project"
]
# Путь для сохранения резервных копий
destination = "/path/to/backup/location"
# Настройки для облачного хранилища (пример для AWS S3)
cloud_settings = {
'provider': 's3',
'access_key': 'YOUR_ACCESS_KEY',
'secret_key': 'YOUR_SECRET_KEY',
'bucket': 'your-backup-bucket'
}
# Создаем и запускаем менеджер резервного копирования
backup_manager = BackupManager(
source_paths=sources,
destination_path=destination,
backup_time="02:00", # Резервное копирование в 2 часа ночи
retention_days=14, # Хранить резервные копии 14 дней
cloud_upload=True,
cloud_config=cloud_settings
)
# Запуск планировщика (или можно вызвать backup_manager.create_backup() для однократного копирования)
backup_manager.start_scheduler()
Как использовать этот модуль
Базовое использование
sources = [
"C:/Users/username/Documents",
"C:/Users/username/Pictures",
"C:/important_data.txt"
]
destination = "D:/Backups"
backup_manager = BackupManager(
source_paths=sources,
destination_path=destination,
backup_time="03:00", # Резервное копирование в 3 часа ночи
retention_days=30 # Хранить копии в течение 30 дней
)
# Для запуска с планировщиком
backup_manager.start_scheduler()
# Или для немедленного однократного резервного копирования
# backup_manager.create_backup()
Загрузка в облако (AWS S3)
Для загрузки резервных копий в AWS S3, необходимо:
Установить библиотеку boto3:
pip install boto3
Настроить параметры облачного хранилища:
cloud_settings = {
'provider': 's3',
'access_key': 'YOUR_AWS_ACCESS_KEY',
'secret_key': 'YOUR_AWS_SECRET_KEY',
'bucket': 'your-backup-bucket'
}
backup_manager = BackupManager(
source_paths=sources,
destination_path=destination,
backup_time="02:00",
retention_days=14,
cloud_upload=True, # Включаем загрузку в облако
cloud_config=cloud_settings
)
Раскомментировать код в методе upload_to_cloud для выбранного облачного провайдера.
Рекомендации по улучшению
Ваш код очень хорош, но можно рассмотреть несколько улучшений:
Дополнительные проверки:
# В методе create_backup добавить проверку наличия исходных путей
for source_path in self.source_paths:
source = Path(source_path)
if not source.exists():
logging.warning(f"Исходный путь {source_path} не существует. Пропускаем.")
continue
Гибкое расписание:
def set_schedule(self, schedule_type='daily', time='01:00', days=None):
"""
Настройка расписания резервного копирования
:param schedule_type: Тип расписания ('daily', 'weekly', 'monthly')
:param time: Время запуска (формат "ЧЧ:ММ")
:param days: Дни для запуска (для 'weekly' - дни недели, для 'monthly' - числа месяца)
"""
if schedule_type == 'daily':
schedule.every().day.at(time).do(self.create_backup)
elif schedule_type == 'weekly' and days:
for day in days:
if day.lower() == 'monday':
schedule.every().monday.at(time).do(self.create_backup)
# ... (другие дни недели)
elif schedule_type == 'monthly' and days:
# Реализация ежемесячного бэкапа
pass
logging.info(f"Расписание настроено: {schedule_type}, время: {time}, дни: {days}")
Обработка ошибок при копировании:
# В методе create_backup улучшить обработку ошибок для каждого файла/директории
for source_path in self.source_paths:
try:
source = Path(source_path)
dest = backup_dir / source.name
if source.is_file():
shutil.copy2(source, dest)
logging.info(f"Файл {source} скопирован в {dest}")
elif source.is_dir():
shutil.copytree(source, dest)
logging.info(f"Директория {source} скопирована в {dest}")
else:
logging.warning(f"Источник {source} не найден")
except PermissionError:
logging.error(f"Ошибка доступа при копировании {source_path}")
except Exception as e:
logging.error(f"Ошибка при копировании {source_path}: {e}")
Мониторинг прогресса для больших файлов:
def copy_with_progress(self, source, destination):
"""Копирование с отображением прогресса для больших файлов"""
file_size = os.path.getsize(source)
copied = 0
with open(source, 'rb') as src, open(destination, 'wb') as dst:
while True:
buffer = src.read(8192)
if not buffer:
break
dst.write(buffer)
copied += len(buffer)
progress = (copied / file_size) * 100
if copied % (10 * 1024 * 1024) == 0: # Логируем каждые 10 МБ
logging.info(f"Копирование {source.name}: {progress:.1f}%")
logging.info(f"Копирование {source.name} завершено")
Проверка целостности резервных копий:
def verify_backup(self, archive_path):
"""Проверка целостности архива"""
import zipfile
try:
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
result = zip_ref.testzip()
if result is None:
logging.info(f"Архив {archive_path} прошел проверку целостности")
return True
else:
logging.error(f"Повреждённый файл в архиве {archive_path}: {result}")
return False
except Exception as e:
logging.error(f"Ошибка при проверке архива {archive_path}: {e}")
return False
Задача 44: Модуль для мониторинга работоспособности системы(вроде раб)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import psutil
import time
import platform
import logging
import smtplib
import socket
import requests
import threading
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import datetime
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler("system_monitor.log"), logging.StreamHandler()]
)
class SystemMonitor:
def __init__(self, config):
"""
Инициализация монитора системы
:param config: Словарь с конфигурацией монитора
- thresholds: Пороговые значения для метрик
- check_interval: Интервал проверки в секундах
- notification: Настройки уведомлений
- services: Список сервисов для проверки
"""
self.config = config
self.thresholds = config.get('thresholds', {})
self.check_interval = config.get('check_interval', 60)
self.notification_config = config.get('notification', {})
self.services = config.get('services', [])
self.alert_history = {} # История оповещений для предотвращения спама
self.alert_cooldown = config.get('alert_cooldown', 1800) # 30 минут по умолчанию
logging.info(f"Монитор системы инициализирован с интервалом проверки {self.check_interval} секунд")
def check_cpu_usage(self):
"""Проверка загрузки CPU"""
cpu_usage = psutil.cpu_percent(interval=1)
threshold = self.thresholds.get('cpu_usage', 90)
if cpu_usage > threshold:
message = f"Внимание! Высокая загрузка CPU: {cpu_usage}% (порог: {threshold}%)"
logging.warning(message)
return False, message
return True, f"Загрузка CPU в норме: {cpu_usage}%"
def check_memory_usage(self):
"""Проверка использования памяти"""
memory = psutil.virtual_memory()
memory_usage = memory.percent
threshold = self.thresholds.get('memory_usage', 85)
if memory_usage > threshold:
message = (f"Внимание! Высокое использование памяти: {memory_usage}% "
f"(порог: {threshold}%, свободно: {memory.available / (1024**3):.2f} ГБ)")
logging.warning(message)
return False, message
return True, f"Использование памяти в норме: {memory_usage}%"
def check_disk_usage(self):
"""Проверка использования дискового пространства"""
problems = []
all_ok = True
for partition in psutil.disk_partitions():
if platform.system() == 'Windows' and 'cdrom' in partition.opts:
continue # Пропускаем CD-ROM на Windows
try:
usage = psutil.disk_usage(partition.mountpoint)
threshold = self.thresholds.get('disk_usage', 90)
if usage.percent > threshold:
message = (f"Внимание! Высокое использование диска {partition.mountpoint}: "
f"{usage.percent}% (порог: {threshold}%, "
f"свободно: {usage.free / (1024**3):.2f} ГБ)")
problems.append(message)
logging.warning(message)
all_ok = False
except Exception as e:
logging.error(f"Ошибка при проверке диска {partition.mountpoint}: {e}")
if all_ok:
return True, "Использование дисков в норме"
else:
return False, "\n".join(problems)
def check_network_connectivity(self):
"""Проверка сетевого соединения"""
hosts = self.config.get('network_hosts', ['8.8.8.8', 'google.com'])
problems = []
all_ok = True
for host in hosts:
try:
# Пробуем сделать ping
if platform.system().lower() == 'windows':
response = os.system(f"ping -n 1 -w 1000 {host} > nul")
else:
response = os.system(f"ping -c 1 -W 1 {host} > /dev/null 2>&1")
if response != 0:
message = f"Внимание! Нет соединения с {host}"
problems.append(message)
logging.warning(message)
all_ok = False
except Exception as e:
logging.error(f"Ошибка при проверке сетевого соединения с {host}: {e}")
if all_ok:
return True, "Сетевое соединение в норме"
else:
return False, "\n".join(problems)
def check_services(self):
"""Проверка работоспособности сервисов"""
if not self.services:
return True, "Нет сервисов для проверки"
problems = []
all_ok = True
for service in self.services:
try:
if service.get('type') == 'http':
url = service.get('url')
timeout = service.get('timeout', 5)
expected_code = service.get('expected_code', 200)
response = requests.get(url, timeout=timeout)
if response.status_code != expected_code:
message = (f"Внимание! Сервис {url} вернул код {response.status_code} "
f"(ожидался: {expected_code})")
problems.append(message)
logging.warning(message)
all_ok = False
elif service.get('type') == 'port':
host = service.get('host')
port = service.get('port')
timeout = service.get('timeout', 5)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
sock.close()
if result != 0:
message = f"Внимание! Порт {port} на хосте {host} недоступен"
problems.append(message)
logging.warning(message)
all_ok = False
elif service.get('type') == 'process':
process_name = service.get('name')
found = False
for proc in psutil.process_iter(['name']):
if process_name.lower() in proc.info['name'].lower():
found = True
break
if not found:
message = f"Внимание! Процесс {process_name} не запущен"
problems.append(message)
logging.warning(message)
all_ok = False
except Exception as e:
message = f"Ошибка при проверке сервиса {service.get('name', 'неизвестный')}: {e}"
problems.append(message)
logging.error(message)
all_ok = False
if all_ok:
return True, "Все сервисы работают нормально"
else:
return False, "\n".join(problems)
def send_email_notification(self, subject, message):
"""Отправка уведомления по электронной почте"""
email_config = self.notification_config.get('email')
if not email_config:
logging.warning("Настройки email не заданы. Уведомление не отправлено.")
return False
try:
msg = MIMEMultipart()
msg['From'] = email_config.get('from')
msg['To'] = email_config.get('to')
msg['Subject'] = subject
msg.attach(MIMEText(message, 'plain'))
server = smtplib.SMTP(email_config.get('smtp_server'), email_config.get('smtp_port'))
server.starttls()
server.login(email_config.get('username'), email_config.get('password'))
server.send_message(msg)
server.quit()
logging.info(f"Email-уведомление отправлено: {subject}")
return True
except Exception as e:
logging.error(f"Ошибка при отправке email-уведомления: {e}")
return False
def send_sms_notification(self, message):
"""Отправка SMS-уведомления"""
sms_config = self.notification_config.get('sms')
if not sms_config:
logging.warning("Настройки SMS не заданы. Уведомление не отправлено.")
return False
try:
# Здесь должен быть код для отправки SMS через выбранный сервис
# Пример для Twilio:
"""
from twilio.rest import Client
client = Client(sms_config.get('account_sid'), sms_config.get('auth_token'))
client.messages.create(
body=message,
from_=sms_config.get('from_number'),
to=sms_config.get('to_number')
)
"""
logging.info("SMS-уведомление отправлено")
return True
except Exception as e:
logging.error(f"Ошибка при отправке SMS-уведомления: {e}")
return False
def send_notification(self, alert_id, subject, message):
"""Отправка уведомления через все настроенные каналы с учетом истории оповещений"""
current_time = time.time()
# Проверяем, не отправляли ли мы уже оповещение об этой проблеме недавно
if alert_id in self.alert_history:
last_notification_time = self.alert_history[alert_id]
if current_time - last_notification_time < self.alert_cooldown:
logging.info(f"Уведомление о {alert_id} пропущено (период охлаждения не истек)")
return
# Сохраняем время отправки уведомления
self.alert_history[alert_id] = current_time
# Формируем полное сообщение с датой и системной информацией
full_message = f"""
УВЕДОМЛЕНИЕ СИСТЕМНОГО МОНИТОРИНГА
--------------------------------------
Время: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Хост: {socket.gethostname()}
IP: {socket.gethostbyname(socket.gethostname())}
--------------------------------------
{message}
"""
# Отправляем уведомления через настроенные каналы
if self.notification_config.get('email'):
self.send_email_notification(subject, full_message)
if self.notification_config.get('sms'):
# Для SMS отправляем сокращенное сообщение
short_message = f"Проблема: {subject}"
self.send_sms_notification(short_message)
def run_check(self):
"""Выполнение всех проверок и отправка уведомлений при необходимости"""
logging.info("Запуск проверки системы...")
problems = []
# CPU
cpu_ok, cpu_message = self.check_cpu_usage()
if not cpu_ok:
problems.append(cpu_message)
self.send_notification('cpu_usage', "Высокая загрузка CPU", cpu_message)
# Память
mem_ok, mem_message = self.check_memory_usage()
if not mem_ok:
problems.append(mem_message)
self.send_notification('memory_usage', "Высокое использование памяти", mem_message)
# Диски
disk_ok, disk_message = self.check_disk_usage()
if not disk_ok:
problems.append(disk_message)
self.send_notification('disk_usage', "Проблема с дисковым пространством", disk_message)
# Сеть
net_ok, net_message = self.check_network_connectivity()
if not net_ok:
problems.append(net_message)
self.send_notification('network', "Проблема с сетевым соединением", net_message)
# Сервисы
service_ok, service_message = self.check_services()
if not service_ok:
problems.append(service_message)
self.send_notification('services', "Проблема с сервисами", service_message)
if problems:
logging.warning("Обнаружены проблемы в системе:")
for problem in problems:
logging.warning(f"- {problem}")
else:
logging.info("Система работает нормально")
def start_monitoring(self):
"""Запуск непрерывного мониторинга"""
logging.info(f"Запуск непрерывного мониторинга с интервалом {self.check_interval} секунд")
while True:
try:
self.run_check()
except Exception as e:
logging.error(f"Ошибка при выполнении проверки: {e}")
time.sleep(self.check_interval)
def start_monitoring_thread(self):
"""Запуск мониторинга в отдельном потоке"""
monitor_thread = threading.Thread(target=self.start_monitoring)
monitor_thread.daemon = True
monitor_thread.start()
return monitor_thread
def main():
"""Основная функция для запуска мониторинга"""
# Конфигурация мониторинга
monitor_config = {
'thresholds': {
'cpu_usage': 80, # Порог использования CPU (%)
'memory_usage': 85, # Порог использования памяти (%)
'disk_usage': 90, # Порог использования диска (%)
},
'check_interval': 300, # Интервал проверки (сек)
'alert_cooldown': 1800, # Период охлаждения для повторных уведомлений (сек)
'network_hosts': ['8.8.8.8', 'google.com'], # Хосты для проверки сети
'services': [
{
'type': 'http',
'name': 'Google Website',
'url': 'https://www.google.com',
'expected_code': 200,
'timeout': 5
},
# Мониторинг стандартных процессов Windows
{
'type': 'process',
'name': 'explorer' # Windows Explorer
},
{
'type': 'process',
'name': 'svchost' # Хост службы Windows
}
],
'notification': {
# Настройки для Email-уведомлений
'email': {
'from': 'monitor@example.com',
'to': 'admin@example.com',
'smtp_server': 'smtp.example.com',
'smtp_port': 587,
'username': 'monitor@example.com',
'password': 'your_password'
},
# Настройки для SMS-уведомлений
'sms': {
'provider': 'twilio',
'account_sid': 'your_account_sid',
'auth_token': 'your_auth_token',
'from_number': '+1234567890',
'to_number': '+1234567890'
}
}
}
# Создаем и запускаем монитор
monitor = SystemMonitor(monitor_config)
try:
# Запускаем мониторинг с возможностью прерывания
print("Мониторинг запущен. Нажмите Ctrl+C для остановки.")
monitor.start_monitoring() # Блокирующий вызов
except KeyboardInterrupt:
print("\nМониторинг остановлен пользователем.")
# Точка входа в программу
if __name__ == "__main__":
main()
Инструкция по использованию
Установка зависимостей
Перед запуском модуля необходимо установить следующие пакеты:
pip install psutil requests
Для отправки SMS через Twilio (опционально):
pip install twilio
Настройка
Пороговые значения для оповещений:
cpu_usage: порог загрузки CPU (по умолчанию 80%)memory_usage: порог использования памяти (по умолчанию 85%)disk_usage: порог использования дисков (по умолчанию 90%)
Интервал проверки:
check_interval: время между проверками в секундах (по умолчанию 300)
Настройка уведомлений:
- Для Email-уведомлений укажите SMTP-сервер, учетные данные и адреса
- Для SMS-уведомлений укажите параметры вашего SMS-провайдера
Проверяемые сервисы:
- Список проверяемых веб-сервисов, портов и процессов
Запуск
python system_monitor.py
Режимы работы
Обычный режим:
monitor = SystemMonitor(monitor_config)
monitor.start_monitoring()
Фоновый режим (для запуска в отдельном потоке):
monitor = SystemMonitor(monitor_config)
thread = monitor.start_monitoring_thread()
# Продолжение выполнения основной программы
Примеры настройки
Мониторинг сервера базы данных
'services': [
{
'type': 'port',
'name': 'MySQL',
'host': 'localhost',
'port': 3306,
'timeout': 3
},
{
'type': 'process',
'name': 'mysqld'
}
]
Мониторинг веб-сервера
'services': [
{
'type': 'http',
'name': 'Web Server',
'url': 'http://localhost',
'expected_code': 200,
'timeout': 5
},
{
'type': 'process',
'name': 'apache2' # или nginx
}
]
Настройка Email-уведомлений через Gmail
'email': {
'from': 'your.email@gmail.com',
'to': 'admin@example.com',
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'username': 'your.email@gmail.com',
'password': 'your_app_password' # Пароль приложения для Gmail
}
Задача 45: Игра "Угадай число" с графическим интерфейсом(раб)
Вот пример реализации игры "Угадай число" с графическим интерфейсом на Python с использованием библиотеки tkinter:
import tkinter as tk
import random
from tkinter import messagebox, font
class GuessNumberGame:
def __init__(self, root):
"""
Инициализация игры "Угадай число"
:param root: Корневой элемент tkinter
"""
self.root = root
self.root.title("Угадай число")
self.root.geometry("400x300")
self.root.resizable(False, False)
self.root.configure(bg="#f0f0f0")
# Загадываем число
self.secret_number = random.randint(1, 100)
self.attempts = 0
# Создаем элементы интерфейса
self.create_widgets()
def create_widgets(self):
"""Создание элементов графического интерфейса"""
# Заголовок
title_font = font.Font(family="Arial", size=16, weight="bold")
title = tk.Label(
self.root,
text="Угадай число от 1 до 100",
font=title_font,
bg="#f0f0f0",
fg="#333333"
)
title.pack(pady=20)
# Рамка для элементов ввода
input_frame = tk.Frame(self.root, bg="#f0f0f0")
input_frame.pack(pady=10)
# Поле для ввода числа
self.guess_var = tk.StringVar()
entry_font = font.Font(family="Arial", size=12)
self.entry = tk.Entry(
input_frame,
textvariable=self.guess_var,
font=entry_font,
width=10,
justify='center',
bd=2,
relief=tk.GROOVE
)
self.entry.pack(side=tk.LEFT, padx=5)
self.entry.focus_set() # Устанавливаем фокус на поле ввода
# Кнопка "Проверить"
button_font = font.Font(family="Arial", size=12)
self.check_button = tk.Button(
input_frame,
text="Проверить",
font=button_font,
command=self.check_guess,
bg="#4CAF50",
fg="white",
activebackground="#45a049",
activeforeground="white",
relief=tk.RAISED,
bd=2,
padx=10
)
self.check_button.pack(side=tk.LEFT, padx=5)
# Привязываем нажатие Enter к проверке числа
self.root.bind('<Return>', lambda event: self.check_guess())
# Область для отображения результатов
result_font = font.Font(family="Arial", size=12)
self.result_label = tk.Label(
self.root,
text="Введите число и нажмите 'Проверить'",
font=result_font,
bg="#f0f0f0",
fg="#555555",
height=2
)
self.result_label.pack(pady=10)
# Область для отображения истории попыток
history_frame = tk.Frame(self.root, bg="#f0f0f0")
history_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=10)
history_label = tk.Label(
history_frame,
text="История попыток:",
font=font.Font(family="Arial", size=10, weight="bold"),
bg="#f0f0f0",
fg="#333333",
anchor='w'
)
history_label.pack(anchor='w')
self.history_text = tk.Text(
history_frame,
font=font.Font(family="Arial", size=10),
height=5,
width=40,
bg="#ffffff",
relief=tk.SUNKEN,
bd=1
)
self.history_text.pack(fill=tk.BOTH, expand=True)
self.history_text.config(state=tk.DISABLED) # Делаем поле только для чтения
# Кнопка "Новая игра" self.new_game_button = tk.Button(
self.root,
text="Новая игра",
font=button_font,
command=self.new_game,
bg="#2196F3",
fg="white",
activebackground="#0b7dda",
activeforeground="white",
relief=tk.RAISED,
bd=2,
padx=10
)
self.new_game_button.pack(pady=10)
def check_guess(self):
"""Проверка введенного числа"""
# Получаем введенное значение
guess_str = self.guess_var.get().strip()
# Проверяем, что введено число
if not guess_str.isdigit():
self.result_label.config(
text="Пожалуйста, введите целое число!",
fg="#FF0000"
)
return
# Преобразуем в целое число
guess = int(guess_str)
# Проверяем диапазон числа
if guess < 1 or guess > 100:
self.result_label.config(
text="Число должно быть от 1 до 100!",
fg="#FF0000"
)
return
# Увеличиваем счетчик попыток
self.attempts += 1
# Добавляем в историю
self.history_text.config(state=tk.NORMAL)
self.history_text.insert(
tk.END,
f"Попытка {self.attempts}: {guess} - "
)
# Проверяем результат
if guess < self.secret_number:
self.result_label.config(
text=f"Загаданное число больше {guess}",
fg="#FF8C00"
)
self.history_text.insert(tk.END, "Больше\n")
elif guess > self.secret_number:
self.result_label.config(
text=f"Загаданное число меньше {guess}",
fg="#FF8C00"
)
self.history_text.insert(tk.END, "Меньше\n")
else:
self.result_label.config(
text=f"Поздравляем! Вы угадали число {self.secret_number}!",
fg="#008000"
)
self.history_text.insert(tk.END, "Угадано!\n")
messagebox.showinfo(
"Победа!",
f"Вы угадали число {self.secret_number} за {self.attempts} попыток!"
)
# Блокируем ввод после победы
self.entry.config(state=tk.DISABLED)
self.check_button.config(state=tk.DISABLED)
self.history_text.config(state=tk.DISABLED)
self.history_text.see(tk.END) # Прокручиваем историю вниз
# Очищаем поле ввода
self.guess_var.set("")
self.entry.focus_set()
def new_game(self):
"""Начать новую игру"""
# Загадываем новое число
self.secret_number = random.randint(1, 100)
self.attempts = 0
# Сбрасываем интерфейс
self.guess_var.set("")
self.result_label.config(
text="Введите число и нажмите 'Проверить'",
fg="#555555"
)
# Очищаем историю
self.history_text.config(state=tk.NORMAL)
self.history_text.delete(1.0, tk.END)
self.history_text.config(state=tk.DISABLED)
# Включаем элементы управления
self.entry.config(state=tk.NORMAL)
self.check_button.config(state=tk.NORMAL)
self.entry.focus_set()
# Запуск приложения
if __name__ == "__main__":
root = tk.Tk()
app = GuessNumberGame(root)
root.mainloop()
Задача 46: GUI-клиент для простого чата(раб)
Вот пример реализации GUI-клиента для простого чата с использованием библиотеки tkinter:
import tkinter as tk
from tkinter import scrolledtext, messagebox, font
import time
import random
from datetime import datetime
class ChatClient:
def __init__(self, root):
"""
Инициализация простого чат-клиента
:param root: Корневой элемент tkinter
"""
self.root = root
self.root.title("Простой чат-клиент")
self.root.geometry("600x500")
self.root.minsize(500, 400)
# Для имитации общения (без серверной части)
self.user_name = "Пользователь"
self.bot_name = "Чат-бот"
self.bot_responses = [
"Интересная мысль!",
"Я согласен с вами.",
"Хммм, нужно подумать об этом.",
"Расскажите подробнее, пожалуйста.",
"А что вы думаете по этому поводу?",
"Правда? Почему вы так считаете?",
"Мне кажется, это отличная идея!",
"Я бы хотел узнать больше об этом.",
"Спасибо за сообщение!",
"Давайте обсудим это подробнее."
]
# Создаем интерфейс
self.create_widgets()
# Приветственное сообщение
self.add_message(f"{self.bot_name}", "Привет! Я простой чат-бот. Напиши мне что-нибудь!")
def create_widgets(self):
"""Создание элементов графического интерфейса"""
# Основной шрифт
main_font = font.Font(family="Arial", size=10)
# Создаем основной фрейм с отступами
main_frame = tk.Frame(self.root, padx=10, pady=10)
main_frame.pack(fill=tk.BOTH, expand=True)
# Разделяем окно на верхнюю (история чата) и нижнюю (ввод) части
self.root.grid_rowconfigure(0, weight=1)
self.root.grid_columnconfigure(0, weight=1)
# Создаем верхнюю часть - история сообщений
chat_frame = tk.Frame(main_frame)
chat_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
# Область вывода сообщений с прокруткой
self.chat_display = scrolledtext.ScrolledText(
chat_frame,
wrap=tk.WORD,
font=main_font,
bg="#ffffff",
relief=tk.SUNKEN,
bd=1
)
self.chat_display.pack(fill=tk.BOTH, expand=True)
self.chat_display.config(state=tk.DISABLED) # Только для чтения
# Создаем нижнюю часть - поле ввода и кнопка отправки
input_frame = tk.Frame(main_frame)
input_frame.pack(fill=tk.X, pady=(0, 5))
# Поле ввода сообщения
self.message_var = tk.StringVar()
self.message_entry = tk.Entry(
input_frame,
textvariable=self.message_var,
font=main_font,
relief=tk.SUNKEN,
bd=1
)
self.message_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
self.message_entry.focus_set() # Устанавливаем фокус на поле ввода
# Привязываем нажатие Enter к отправке сообщения
self.message_entry.bind('<Return>', lambda event: self.send_message())
# Кнопка отправки
self.send_button = tk.Button(
input_frame,
text="Отправить",
font=main_font,
command=self.send_message,
bg="#4CAF50",
fg="white",
activebackground="#45a049",
activeforeground="white",
relief=tk.RAISED,
bd=1,
padx=10
)
self.send_button.pack(side=tk.RIGHT)
# Нижняя панель с дополнительными элементами управления
control_frame = tk.Frame(main_frame)
control_frame.pack(fill=tk.X)
# Кнопка очистки чата
self.clear_button = tk.Button(
control_frame,
text="Очистить чат",
font=font.Font(family="Arial", size=9),
command=self.clear_chat,
bg="#f0f0f0",
relief=tk.GROOVE,
bd=1
)
self.clear_button.pack(side=tk.LEFT)
# Метка статуса
self.status_label = tk.Label(
control_frame,
text="Статус: онлайн",
font=font.Font(family="Arial", size=9),
fg="#008000"
)
self.status_label.pack(side=tk.RIGHT)
# Настройка стилей для отображения сообщений
self.chat_display.tag_config('user', foreground="#003366", font=font.Font(family="Arial", size=10, weight="bold"))
self.chat_display.tag_config('bot', foreground="#660033", font=font.Font(family="Arial", size=10, weight="bold"))
self.chat_display.tag_config('time', foreground="#999999", font=font.Font(family="Arial", size=8))
self.chat_display.tag_config('message', font=font.Font(family="Arial", size=10))
def send_message(self):
"""Отправка сообщения"""
# Получаем текст сообщения
message = self.message_var.get().strip()
# Проверяем, что сообщение не пустое
if not message:
return
# Добавляем сообщение в чат
self.add_message(self.user_name, message)
# Очищаем поле ввода
self.message_var.set("")
# Имитируем ответ бота (без серверной части)
self.root.after(random.randint(500, 2000), self.simulate_bot_response)
def add_message(self, sender, message):
"""Добавление сообщения в чат"""
# Текущее время
current_time = datetime.now().strftime("%H:%M")
# Разблокируем виджет для добавления текста
self.chat_display.config(state=tk.NORMAL)
# Вставляем сообщение с форматированием
if self.chat_display.index('end-1c') != '1.0': # Если не первое сообщение, добавляем перевод строки
self.chat_display.insert(tk.END, '\n\n')
# Вставляем имя отправителя и время
sender_tag = 'user' if sender == self.user_name else 'bot'
self.chat_display.insert(tk.END, f"{sender} ", sender_tag)
self.chat_display.insert(tk.END, f"({current_time})", 'time')
self.chat_display.insert(tk.END, f":\n{message}", 'message')
# Прокручиваем до последнего сообщения
self.chat_display.see(tk.END)
# Снова блокируем виджет
self.chat_display.config(state=tk.DISABLED)
def simulate_bot_response(self):
"""Имитация ответа бота (для демонстрации без серверной части)"""
# Выбираем случайный ответ
response = random.choice(self.bot_responses)
# Добавляем сообщение в чат
self.add_message(self.bot_name, response)
def clear_chat(self):
"""Очистка истории чата"""
# Спрашиваем подтверждение
if messagebox.askyesno("Очистить чат", "Вы уверены, что хотите очистить историю чата?"):
# Очищаем историю
self.chat_display.config(state=tk.NORMAL)
self.chat_display.delete(1.0, tk.END)
self.chat_display.config(state=tk.DISABLED)
# Добавляем информационное сообщение
self.add_message(self.bot_name, "История чата очищена. Начнем сначала!")
# Запуск приложения
if __name__ == "__main__":
root = tk.Tk()
app = ChatClient(root)
root.mainloop()
Дополнительные улучшения GUI-клиента для чата
Для более реалистичного чат-клиента можно добавить еще некоторые функции, например:
# Добавьте эти методы в класс ChatClient
def change_username(self):
"""Изменение имени пользователя"""
new_name = tk.simpledialog.askstring("Изменить имя", "Введите новое имя пользователя:",
initialvalue=self.user_name)
if new_name and new_name.strip():
old_name = self.user_name
self.user_name = new_name.strip()
self.add_message(self.bot_name, f"Имя пользователя изменено с '{old_name}' на '{self.user_name}'")
def save_chat_history(self):
"""Сохранение истории чата в файл"""
filename = tk.filedialog.asksaveasfilename(
defaultextension=".txt",
filetypes=[("Text files", "*.txt"), ("All files", "*.*")],
title="Сохранить историю чата"
)
if filename:
try:
with open(filename, 'w', encoding='utf-8') as file:
file.write(self.chat_display.get(1.0, tk.END))
messagebox.showinfo("Сохранение", "История чата успешно сохранена!")
except Exception as e:
messagebox.showerror("Ошибка", f"Не удалось сохранить историю чата: {e}")
def show_emojis(self):
"""Показать панель эмодзи"""
emojis = ["😊", "😂", "😍", "🤔", "👍", "👎", "❤️", "👋", "🎉", "🔥"]
emoji_window = tk.Toplevel(self.root)
emoji_window.title("Эмодзи")
emoji_window.geometry("250x100")
emoji_window.resizable(False, False)
emoji_frame = tk.Frame(emoji_window)
emoji_frame.pack(padx=10, pady=10)
for emoji in emojis:
btn = tk.Button(
emoji_frame,
text=emoji,
font=font.Font(size=14),
command=lambda e=emoji: self.insert_emoji(e)
)
btn.pack(side=tk.LEFT, padx=2)
def insert_emoji(self, emoji):
"""Вставить эмодзи в поле ввода"""
current_text = self.message_var.get()
self.message_var.set(current_text + emoji)
self.message_entry.focus_set()
def toggle_dark_mode(self):
"""Переключение темной/светлой темы"""
if hasattr(self, 'dark_mode') and self.dark_mode:
# Переключение на светлую тему
self.dark_mode = False
self.root.configure(bg="#f0f0f0")
self.chat_display.configure(bg="#ffffff", fg="#000000")
self.message_entry.configure(bg="#ffffff", fg="#000000")
# ... (настройка цветов для других виджетов)
else:
# Переключение на темную тему
self.dark_mode = True
self.root.configure(bg="#2d2d2d")
self.chat_display.configure(bg="#3d3d3d", fg="#ffffff")
self.message_entry.configure(bg="#2d2d2d", fg="#ffffff")
# ... (настройка цветов для других виджетов)
Комментарии
Отправить комментарий