Практиическая часть ПМ 0.1_
1. Модуль для шифрования/дешифрования текста алгоритмом Цезаря(работает
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
def caesar_cipher(text, shift, action='encrypt'):
"""
Шифрует или дешифрует текст с помощью алгоритма Цезаря.
Поддерживает как латинский, так и русский алфавиты.
Args:
text (str): Исходный текст
shift (int): Сдвиг символов
action (str): 'encrypt' для шифрования, 'decrypt' для дешифрования
Returns:
str: Зашифрованный или расшифрованный текст
"""
# Определение алфавитов
latin_lower = 'abcdefghijklmnopqrstuvwxyz'
latin_upper = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
russian_lower = 'абвгдеёжзийклмнопрстуфхцчшщъыьэюя'
russian_upper = 'АБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯ'
result = ""
# Если действие - дешифрование, инвертируем сдвиг
if action == 'decrypt':
shift = -shift
for char in text:
if char in latin_lower:
# Шифрование строчных латинских букв
index = latin_lower.find(char)
new_index = (index + shift) % len(latin_lower)
result += latin_lower[new_index]
elif char in latin_upper:
# Шифрование заглавных латинских букв
index = latin_upper.find(char)
new_index = (index + shift) % len(latin_upper)
result += latin_upper[new_index]
elif char in russian_lower:
# Шифрование строчных русских букв
index = russian_lower.find(char)
new_index = (index + shift) % len(russian_lower)
result += russian_lower[new_index]
elif char in russian_upper:
# Шифрование заглавных русских букв
index = russian_upper.find(char)
new_index = (index + shift) % len(russian_upper)
result += russian_upper[new_index]
else:
# Не алфавитные символы оставляем без изменений
result += char
return result
def main():
parser = argparse.ArgumentParser(description='Шифрование/дешифрование текста методом Цезаря')
parser.add_argument('action', choices=['encrypt', 'decrypt'], help='Действие: шифрование или дешифрование')
parser.add_argument('text', help='Текст для обработки')
parser.add_argument('shift', type=int, help='Величина сдвига (целое число)')
args = parser.parse_args()
result = caesar_cipher(args.text, args.shift, args.action)
print(f"Результат: {result}")
if __name__ == "__main__":
main()Использование:
python caesar.py encrypt "привет мир" 3
python caesar.py decrypt "тулзиш пlu" 3
2. Скрипт для преобразования CSV в JSON с группировкой по авторам(tcnm ghj
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
import json
import argparse
from collections import defaultdict
def csv_to_json_by_author(csv_file, json_file):
"""
Преобразует CSV-файл с данными о книгах в JSON с группировкой по авторам
Args:
csv_file (str): Путь к исходному CSV-файлу
json_file (str): Путь к выходному JSON-файлу
"""
# Словарь для группировки книг по авторам
books_by_author = defaultdict(list)
# Чтение CSV файла
with open(csv_file, 'r', encoding='utf-8') as file:
csv_reader = csv.DictReader(file)
for row in csv_reader:
# Добавляем книгу в список книг автора
author = row['author']
book_info = {
'title': row['title'],
'year': int(row['year']) if row['year'].isdigit() else row['year']
}
books_by_author[author].append(book_info)
# Преобразуем defaultdict в обычный словарь для JSON
result = dict(books_by_author)
# Записываем результат в JSON файл
with open(json_file, 'w', encoding='utf-8') as file:
json.dump(result, file, ensure_ascii=False, indent=4)
print(f"Данные успешно преобразованы и сохранены в {json_file}")
def main():
parser = argparse.ArgumentParser(description='Преобразование CSV-файла с книгами в JSON с группировкой по авторам')
parser.add_argument('csv_file', help='Путь к исходному CSV-файлу')
parser.add_argument('json_file', help='Путь к выходному JSON-файлу')
args = parser.parse_args()
csv_to_json_by_author(args.csv_file, args.json_file)
if __name__ == "__main__":
main()
Использование:
python csv_to_json.py books.csv books_by_author.json
3. Функция для поиска дубликатов файлов(раб)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hashlib
from collections import defaultdict
def calculate_file_hash(filepath):
"""
Вычисляет MD5-хеш содержимого файла
Args:
filepath (str): Путь к файлу
Returns:
str: MD5-хеш файла
"""
hash_md5 = hashlib.md5()
with open(filepath, "rb") as f:
# Читаем файл блоками для экономии памяти
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def find_duplicate_files(dir_path):
"""
Рекурсивно ищет в директории файлы с одинаковым содержимым
Args:
dir_path (str): Путь к директории для поиска
Returns:
list: Список списков путей к дубликатам файлов
"""
# Словарь для хранения хешей файлов
files_by_hash = defaultdict(list)
# Рекурсивно обходим директорию
for root, _, files in os.walk(dir_path):
for filename in files:
filepath = os.path.join(root, filename)
try:
# Получаем хеш файла и добавляем путь в соответствующий список
file_hash = calculate_file_hash(filepath)
files_by_hash[file_hash].append(filepath)
except (IOError, OSError) as e:
print(f"Ошибка при обработке файла {filepath}: {e}")
# Отбираем только группы с более чем одним файлом (дубликаты)
duplicate_groups = [file_list for file_list in files_by_hash.values() if len(file_list) > 1]
return duplicate_groups
# Пример использования
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("Использование: python find_duplicates.py <путь_к_директории>")
sys.exit(1)
dir_path = sys.argv[1]
duplicates = find_duplicate_files(dir_path)
if not duplicates:
print("Дубликатов не найдено")
else:
print(f"Найдено {len(duplicates)} групп дубликатов:")
for i, group in enumerate(duplicates, 1):
print(f"\nГруппа {i}:")
for filepath in group:
print(f" {filepath}")
Использование:
python find_duplicates.py /путь/к/директории
4. Скрипт для логирования изменений в директории(раб)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import argparse
from datetime import datetime
def scan_directory(dir_path):
"""
Сканирует директорию и возвращает словарь с информацией о файлах
Args:
dir_path (str): Путь к директории
Returns:
dict: Словарь {путь_к_файлу: (размер, время_модификации)}
"""
files_info = {}
for root, _, files in os.walk(dir_path):
for filename in files:
filepath = os.path.join(root, filename)
try:
# Получаем информацию о файле
stat_info = os.stat(filepath)
files_info[filepath] = (stat_info.st_size, stat_info.st_mtime)
except (IOError, OSError) as e:
print(f"Ошибка при обработке файла {filepath}: {e}")
return files_info
def log_changes(dir_path, log_file, interval=60):
"""
Отслеживает изменения в директории и логирует их
Args:
dir_path (str): Путь к отслеживаемой директории
log_file (str): Путь к файлу логов
interval (int): Интервал проверки в секундах
"""
print(f"Начато отслеживание изменений в директории: {dir_path}")
print(f"Логи записываются в файл: {log_file}")
print(f"Интервал проверки: {interval} секунд")
print("Нажмите Ctrl+C для остановки...")
# Первоначальное сканирование
prev_files_info = scan_directory(dir_path)
try:
while True:
time.sleep(interval)
# Новое сканирование
current_files_info = scan_directory(dir_path)
# Определяем изменения
new_files = []
deleted_files = []
modified_files = []
# Новые и измененные файлы
for filepath, (size, mtime) in current_files_info.items():
if filepath not in prev_files_info:
new_files.append(filepath)
elif prev_files_info[filepath] != (size, mtime):
modified_files.append(filepath)
# Удаленные файлы
for filepath in prev_files_info:
if filepath not in current_files_info:
deleted_files.append(filepath)
# Если есть изменения, записываем их в лог
if new_files or deleted_files or modified_files:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(log_file, 'a', encoding='utf-8') as f:
f.write(f"\n=== Изменения обнаружены в {timestamp} ===\n")
if new_files:
f.write("\nНовые файлы:\n")
for filepath in new_files:
f.write(f"+ {filepath}\n")
if modified_files:
f.write("\nИзмененные файлы:\n")
for filepath in modified_files:
f.write(f"* {filepath}\n")
if deleted_files:
f.write("\nУдаленные файлы:\n")
for filepath in deleted_files:
f.write(f"- {filepath}\n")
print(f"Изменения записаны в лог ({timestamp})")
# Обновляем предыдущее состояние
prev_files_info = current_files_info
except KeyboardInterrupt:
print("\nОтслеживание остановлено")
def main():
parser = argparse.ArgumentParser(description='Отслеживание изменений в директории')
parser.add_argument('dir_path', help='Путь к отслеживаемой директории')
parser.add_argument('--log-file', default='log.txt', help='Путь к файлу логов (по умолчанию: log.txt)')
parser.add_argument('--interval', type=int, default=60, help='Интервал проверки в секундах (по умолчанию: 60)')
args = parser.parse_args()
log_changes(args.dir_path, args.log_file, args.interval)
if __name__ == "__main__":
main()
Использование:
python monitor_directory.py /путь/к/директории --interval 30
5. Модуль для шифрования/дешифрования алгоритмом Цезаря
Эта задача дублирует задачу №1, поэтому код будет тем же самым.
6. Скрипт для поиска файлов по расширению(раб)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import argparse
def find_files_by_extension(directory, extension):
"""
Находит все файлы с заданным расширением в указанной директории и поддиректориях
Args:
directory (str): Путь к директории для поиска
extension (str): Расширение файла (например, '.txt')
Returns:
list: Список кортежей (путь_к_файлу, размер_в_МБ)
"""
if not extension.startswith('.'):
extension = '.' + extension
found_files = []
for root, _, files in os.walk(directory):
for filename in files:
if filename.endswith(extension):
filepath = os.path.join(root, filename)
try:
# Получаем размер файла в МБ
size_mb = os.path.getsize(filepath) / (1024 * 1024)
found_files.append((filepath, size_mb))
except (IOError, OSError) as e:
print(f"Ошибка при обработке файла {filepath}: {e}")
return found_files
def main():
parser = argparse.ArgumentParser(description='Поиск файлов по расширению')
parser.add_argument('directory', help='Директория для поиска')
parser.add_argument('extension', help='Расширение файла (например, txt)')
parser.add_argument('--sort', choices=['name', 'size'], default='name',
help='Сортировка результатов (по умолчанию: name)')
args = parser.parse_args()
# Находим файлы
found_files = find_files_by_extension(args.directory, args.extension)
# Сортируем результаты
if args.sort == 'name':
found_files.sort(key=lambda x: x[0])
else: # sort by size
found_files.sort(key=lambda x: x[1], reverse=True)
# Выводим результаты
if not found_files:
print(f"Файлы с расширением '{args.extension}' не найдены в директории {args.directory}")
else:
print(f"Найдено {len(found_files)} файлов с расширением '{args.extension}':")
for filepath, size_mb in found_files:
print(f"{filepath} - {size_mb:.2f} МБ")
if __name__ == "__main__":
main()
Использование:
python find_by_extension.py /путь/к/директории txt --sort size
7. Функция для объединения JSON-файлов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import json
import argparse
from collections import defaultdict
def merge_json_files(json_dir, output_file):
"""
Объединяет все JSON-файлы из указанной директории в один файл
Args:
json_dir (str): Путь к директории с JSON-файлами
output_file (str): Путь к выходному файлу
"""
# Для хранения объединенных данных
merged_data = {}
# Счетчик обработанных файлов
processed_files = 0
# Обходим все файлы в директории
for filename in os.listdir(json_dir):
if filename.endswith('.json'):
filepath = os.path.join(json_dir, filename)
try:
with open(filepath, 'r', encoding='utf-8') as file:
data = json.load(file)
# Для первого файла просто копируем структуру
if not merged_data:
merged_data = data
else:
# Рекурсивно объединяем структуры данных
merged_data = merge_structures(merged_data, data)
processed_files += 1
print(f"Обработан файл: {filename}")
except json.JSONDecodeError as e:
print(f"Ошибка декодирования JSON в файле {filename}: {e}")
except Exception as e:
print(f"Ошибка при обработке файла {filename}: {e}")
# Записываем объединенные данные в выходной файл
with open(output_file, 'w', encoding='utf-8') as file:
json.dump(merged_data, file, ensure_ascii=False, indent=4)
print(f"Объединение завершено. Обработано файлов: {processed_files}")
print(f"Результат сохранен в: {output_file}")
def merge_structures(a, b):
"""
Рекурсивно объединяет две структуры данных
Args:
a: Первая структура данных
b: Вторая структура данных
Returns:
Объединенная структура данных
"""
# Если оба аргумента - словари, объединяем их рекурсивно
if isinstance(a, dict) and isinstance(b, dict):
result = a.copy()
for key, value in b.items():
if key in result:
result[key] = merge_structures(result[key], value)
else:
result[key] = value
return result
# Если оба аргумента - списки, объединяем их
elif isinstance(a, list) and isinstance(b, list):
return a + b
# Для примитивных типов или несовпадающих типов берем значение из b
else:
return b
def main():
parser = argparse.ArgumentParser(description='Объединение JSON-файлов')
parser.add_argument('json_dir', help='Директория с JSON-файлами')
parser.add_argument('output_file', help='Путь к выходному файлу')
args = parser.parse_args()
merge_json_files(args.json_dir, args.output_file)
if __name__ == "__main__":
main()
Использование:
Примеры команд
Пример 1: Использование текущей директории
Если JSON-файлы находятся в поддиректории json_files текущего каталога:
python merge_json.py json_files merged_output.json
Пример 2: Использование абсолютного пути
python merge_json.py C:\Users\имя_пользователя\Documents\json_files C:\Users\имя_пользователя\Documents\merged_output.json
Пример 3: Пути с пробелами
Если пути содержат пробелы, заключите их в кавычки:
python merge_json.py "C:\My Documents\json files" "merged result.json"
8. Утилита для сравнения двух директорий
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import argparse
def get_file_list(directory):
"""
Получает список файлов в директории (с относительными путями)
Args:
directory (str): Путь к директории
Returns:
set: Множество относительных путей к файлам
"""
file_list = set()
for root, _, files in os.walk(directory):
for filename in files:
# Вычисляем путь относительно исходной директории
full_path = os.path.join(root, filename)
rel_path = os.path.relpath(full_path, directory)
file_list.add(rel_path)
return file_list
def compare_directories(dir1, dir2):
"""
Сравнивает содержимое двух директорий
Args:
dir1 (str): Путь к первой директории
dir2 (str): Путь к второй директории
Returns:
tuple: (файлы_только_в_dir1, файлы_только_в_dir2)
"""
# Получаем списки файлов
files_dir1 = get_file_list(dir1)
files_dir2 = get_file_list(dir2)
# Находим разницу
only_in_dir1 = files_dir1 - files_dir2
only_in_dir2 = files_dir2 - files_dir1
return only_in_dir1, only_in_dir2
def main():
parser = argparse.ArgumentParser(description='Сравнение директорий')
parser.add_argument('dir1', help='Первая директория')
parser.add_argument('dir2', help='Вторая директория')
parser.add_argument('--output', help='Путь к файлу для сохранения результатов')
args = parser.parse_args()
# Проверяем существование директорий
if not os.path.isdir(args.dir1):
print(f"Ошибка: Директория '{args.dir1}' не существует")
return
if not os.path.isdir(args.dir2):
print(f"Ошибка: Директория '{args.dir2}' не существует")
return
only_in_dir1, only_in_dir2 = compare_directories(args.dir1, args.dir2)
# Форматируем результаты
dir1_name = os.path.basename(os.path.abspath(args.dir1))
dir2_name = os.path.basename(os.path.abspath(args.dir2))
output_lines = []
output_lines.append(f"Сравнение директорий '{args.dir1}' и '{args.dir2}'")
output_lines.append("="*80)
output_lines.append(f"\nФайлы, присутствующие только в '{dir1_name}' ({len(only_in_dir1)}):")
if only_in_dir1:
for filename in sorted(only_in_dir1):
output_lines.append(f" {filename}")
else:
output_lines.append(" Нет файлов")
output_lines.append(f"\nФайлы, присутствующие только в '{dir2_name}' ({len(only_in_dir2)}):")
if only_in_dir2:
for filename in sorted(only_in_dir2):
output_lines.append(f" {filename}")
else:
output_lines.append(" Нет файлов")
# Выводим результаты
output_text = "\n".join(output_lines)
if args.output:
with open(args.output, 'w', encoding='utf-8') as file:
file.write(output_text)
print(f"Результаты сохранены в файл: {args.output}")
else:
print(output_text)
if __name__ == "__main__":
main()
Использование:
python compare_dirs.py /путь/к/директории1 /путь/к/директории2 --output результаты.txt
9. Скрипт для создания резервных копий
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import zipfile
import datetime
import argparse
def create_backup(directory, output_dir=None, prefix="backup"):
"""
Создает резервную копию директории в ZIP-архиве с датой в имени файла
Args:
directory (str): Путь к директории для резервного копирования
output_dir (str): Директория для сохранения архива (по умолчанию та же, что и исходная)
prefix (str): Префикс имени архива
Returns:
str: Путь к созданному архиву
"""
# Проверяем существование директории
if not os.path.isdir(directory):
raise ValueError(f"Директория '{directory}' не существует")
# Если output_dir не указан, используем родительскую директорию исходной
if output_dir is None:
output_dir = os.path.dirname(os.path.abspath(directory))
# Создаем output_dir, если он не существует
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Формируем имя архива с текущей датой
current_date = datetime.datetime.now().strftime("%Y%m%d")
archive_name = f"{prefix}_{current_date}.zip"
archive_path = os.path.join(output_dir, archive_name)
# Получаем абсолютный путь к директории и её имя
abs_dir_path = os.path.abspath(directory)
base_dir_name = os.path.basename(abs_dir_path)
parent_dir = os.path.dirname(abs_dir_path)
# Создаем архив
print(f"Создание резервной копии директории '{directory}'...")
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(directory):
# Определяем путь относительно родительской директории
rel_dir = os.path.relpath(root, parent_dir)
for file in files:
file_path = os.path.join(root, file)
# Добавляем файл в архив с сохранением структуры директорий
archive_path_in_zip = os.path.join(rel_dir, file)
zipf.write(file_path, archive_path_in_zip)
print(f"Добавлен файл: {file_path}")
print(f"Резервная копия успешно создана: {archive_path}")
return archive_path
def main():
parser = argparse.ArgumentParser(description='Создание резервной копии директории')
parser.add_argument('directory', help='Директория для резервного копирования')
parser.add_argument('--output-dir', help='Директория для сохранения архива')
parser.add_argument('--prefix', default='backup', help='Префикс имени архива (по умолчанию: backup)')
args = parser.parse_args()
try:
archive_path = create_backup(args.directory, args.output_dir, args.prefix)
print(f"Резервная копия создана: {archive_path}")
except Exception as e:
print(f"Ошибка при создании резервной копии: {e}")
if __name__ == "__main__":
main()
Использование:
python backup.py /путь/к/директории --output-dir /путь/для/сохранения/архива --prefix my_backup
10. Функция для валидации паролей
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import argparse
def validate_passwords(passwords):
"""
Проверяет список паролей на соответствие требованиям
Требования:
- Длина не менее 8 символов
- Наличие хотя бы одной цифры
- Наличие хотя бы одного спецсимвола
Args:
passwords (list): Список паролей для проверки
Returns:
list: Список валидных паролей
"""
valid_passwords = []
for password in passwords:
# Проверяем длину
if len(password) < 8:
continue
# Проверяем наличие цифры
if not re.search(r'\d', password):
continue
# Проверяем наличие спецсимвола
if not re.search(r'[!@#$%^&*()_+\-=\[\]{};:\'",.<>/?\\|]', password):
continue
# Если все проверки пройдены, добавляем пароль в список валидных
valid_passwords.append(password)
return valid_passwords
def main():
parser = argparse.ArgumentParser(description='Валидация паролей')
parser.add_argument('--file', help='Путь к файлу со списком паролей (по одному на строку)')
parser.add_argument('--passwords', nargs='+', help='Список паролей через пробел')
args = parser.parse_args()
if not args.file and not args.passwords:
print("Ошибка: Необходимо указать либо файл с паролями, либо список паролей")
return
passwords = []
# Получаем пароли из файла
if args.file:
try:
with open(args.file, 'r') as file:
passwords = [line.strip() for line in file if line.strip()]
except Exception as e:
print(f"Ошибка при чтении файла: {e}")
return
# Добавляем пароли из аргументов командной строки
if args.passwords:
passwords.extend(args.passwords)
# Валидируем пароли
valid_passwords = validate_passwords(passwords)
# Выводим результаты
if not valid_passwords:
print("Ни один из паролей не соответствует требованиям")
else:
print(f"Валидные пароли ({len(valid_passwords)} из {len(passwords)}):")
for password in valid_passwords:
print(f" {password}")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import argparse
def validate_passwords(passwords):
"""
Проверяет список паролей на соответствие требованиям
Требования:
- Длина не менее 8 символов
- Наличие хотя бы одной цифры
- Наличие хотя бы одного спецсимвола
Args:
passwords (list): Список паролей для проверки
Returns:
list: Список валидных паролей
"""
valid_passwords = []
for password in passwords:
# Проверяем длину
if len(password) < 8:
continue
# Проверяем наличие цифры
if not re.search(r'\d', password):
continue
# Проверяем наличие спецсимвола
if not re.search(r'[!@#$%^&*()_+\-=\[\]{};:\'",.<>/?\\|]', password):
continue
# Если все проверки пройдены, добавляем пароль в список валидных
valid_passwords.append(password)
return valid_passwords
def main():
parser = argparse.ArgumentParser(description='Валидация паролей')
parser.add_argument('--file', help='Путь к файлу со списком паролей (по одному на строку)')
parser.add_argument('--passwords', nargs='+', help='Список паролей через пробел')
args = parser.parse_args()
if not args.file and not args.passwords:
print("Ошибка: Необходимо указать либо файл с паролями, либо список паролей")
return
passwords = []
# Получаем пароли из файла
if args.file:
try:
with open(args.file, 'r') as file:
passwords = [line.strip() for line in file if line.strip()]
except Exception as e:
print(f"Ошибка при чтении файла: {e}")
return
# Добавляем пароли из аргументов командной строки
if args.passwords:
passwords.extend(args.passwords)
# Валидируем пароли
valid_passwords = validate_passwords(passwords)
# Выводим результаты
if not valid_passwords:
print("Ни один из паролей не соответствует требованиям")
else:
print(f"Валидные пароли ({len(valid_passwords)} из {len(passwords)}):")
for password in valid_passwords:
print(f" {password}")
if __name__ == "__main__":
main()
Использование:
Программа validate_passwords.py проверяет пароли на соответствие следующим требованиям:
- Длина не менее 8 символов
- Наличие хотя бы одной цифры
- Наличие хотя бы одного специального символа
Программа validate_passwords.py проверяет пароли на соответствие следующим требованиям:
- Длина не менее 8 символов
- Наличие хотя бы одной цифры
- Наличие хотя бы одного специального символа
Варианты запуска программы
Вариант 1: Проверка списка паролей, указанных в командной строке
python validate_passwords.py --passwords пароль1 пароль2 пароль3
Например:
python validate_passwords.py --passwords pass123 password123! 12345678 Strong@Pass123
python validate_passwords.py --passwords пароль1 пароль2 пароль3
Например:
python validate_passwords.py --passwords pass123 password123! 12345678 Strong@Pass123
Вариант 2: Проверка паролей из файла
python validate_passwords.py --file /путь/к/файлу/с/паролями.txt
Файл должен содержать по одному паролю на строку. Например:
pass123
password123!
12345678
Strong@Pass123
python validate_passwords.py --file /путь/к/файлу/с/паролями.txt
Файл должен содержать по одному паролю на строку. Например:
pass123
password123!
12345678
Strong@Pass123
Вариант 3: Комбинированный подход (и файл, и пароли в командной строке)
python validate_passwords.py --file /путь/к/файлу/с/паролями.txt --passwords дополнительный1 дополнительный2
python validate_passwords.py --file /путь/к/файлу/с/паролями.txt --passwords дополнительный1 дополнительный2
Примеры использования
Пример 1: Проверка паролей, указанных в командной строке
python validate_passwords.py --passwords 123456 password Password123 Pass@word123
Вывод:
Валидные пароли (1 из 4):
Pass@word123
python validate_passwords.py --passwords 123456 password Password123 Pass@word123
Вывод:
Валидные пароли (1 из 4):
Pass@word123
Пример 2: Проверка паролей из файла
Создайте файл passwords.txt с содержимым:
123456
password
Password123
Pass@word123
Secure!123
Затем выполните:
python validate_passwords.py --file passwords.txt
Вывод:
Валидные пароли (2 из 5):
Pass@word123
Secure!123
Создайте файл passwords.txt с содержимым:
123456
password
Password123
Pass@word123
Secure!123
Затем выполните:
python validate_passwords.py --file passwords.txt
Вывод:
Валидные пароли (2 из 5):
Pass@word123
Secure!123
11. Утилита для пакетного переименования файлов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import argparse
from pathlib import Path
def batch_rename(directory, pattern, replacement, file_filter=None, preview=False):
"""
Пакетное переименование файлов в директории по шаблону
Args:
directory (str): Путь к директории с файлами
pattern (str): Шаблон для переименования (например, 'image_{num:03d}{ext}')
replacement (str): Шаблон замены
file_filter (str): Регулярное выражение для фильтрации файлов
preview (bool): Режим предварительного просмотра (без переименования)
Returns:
list: Список кортежей (старое_имя, новое_имя)
"""
# Проверяем директорию
if not os.path.isdir(directory):
raise ValueError(f"Директория '{directory}' не существует")
# Получаем список файлов
files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
# Фильтруем файлы, если указан фильтр
if file_filter:
pattern_obj = re.compile(file_filter)
files = [f for f in files if pattern_obj.search(f)]
# Сортируем файлы для последовательной нумерации
files.sort()
# Список операций переименования
rename_operations = []
# Создаем шаблон для замены
for i, filename in enumerate(files, 1):
# Получаем расширение файла
_, ext = os.path.splitext(filename)
# Формируем новое имя
new_name = pattern.format(num=i, ext=ext)
# Добавляем операцию
old_path = os.path.join(directory, filename)
new_path = os.path.join(directory, new_name)
rename_operations.append((old_path, new_path))
# Выполняем переименование, если не в режиме предпросмотра
if not preview:
for old_path, new_path in rename_operations:
os.rename(old_path, new_path)
return rename_operations
def main():
parser = argparse.ArgumentParser(description='Пакетное переименование файлов')
parser.add_argument('directory', help='Директория с файлами')
parser.add_argument('pattern', help='Шаблон для новых имен файлов (например, "image_{num:03d}{ext}")')
parser.add_argument('--filter', help='Регулярное выражение для фильтрации файлов')
parser.add_argument('--preview', action='store_true', help='Режим предварительного просмотра (без переименования)')
args = parser.parse_args()
try:
rename_operations = batch_rename(args.directory, args.pattern, None, args.filter, args.preview)
# Выводим результаты
print(f"{'Предварительный просмотр' if args.preview else 'Выполнено'} переименование {len(rename_operations)} файлов:")
for old_path, new_path in rename_operations:
print(f" {os.path.basename(old_path)} → {os.path.basename(new_path)}")
if args.preview:
print("\nФайлы не были переименованы. Используйте команду без --preview для выполнения переименования.")
except Exception as e:
print(f"Ошибка: {e}")
if __name__ == "__main__":
main()
Использование:
Программа batch_rename.py позволяет массово переименовывать файлы в указанной директории по заданному шаблону. Вот подробная инструкция по использованию:
Основной синтаксис команды
python batch_rename.py ДИРЕКТОРИЯ ШАБЛОН [--filter ФИЛЬТР] [--preview]
Где:
ДИРЕКТОРИЯ - путь к папке с файлами для переименованияШАБЛОН - формат новых имен файлов (например, "image_{num:03d}{ext}")--filter ФИЛЬТР - опциональный параметр для фильтрации файлов по регулярному выражению--preview - опциональный параметр для предварительного просмотра без фактического переименования
Формат шаблона имени
В шаблоне можно использовать следующие переменные:
{num} - порядковый номер файла{num:03d} - порядковый номер с форматированием (здесь: 3 цифры с ведущими нулями){ext} - расширение исходного файла (с точкой)
Примеры использования
Пример 1: Базовое переименование
Переименовать все файлы в папке "photos" в формат "photo_001.jpg", "photo_002.jpg" и т.д.:
python batch_rename.py photos "photo_{num:03d}{ext}"
Пример 2: Предварительный просмотр
Показать, как будут переименованы файлы, но не выполнять переименование:
python batch_rename.py photos "photo_{num:03d}{ext}" --preview
Пример 3: Фильтрация файлов
Переименовать только JPG-файлы:
python batch_rename.py photos "image_{num:03d}{ext}" --filter "\.jpg$"
Пример 4: Комбинированный пример
Предпросмотр переименования только PNG-файлов:
python batch_rename.py photos "screenshot_{num:04d}{ext}" --filter "\.png$" --preview
12. Скрипт для извлечения ссылок из HTML-файла
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import argparse
from html.parser import HTMLParser
class LinkParser(HTMLParser):
"""
HTML-парсер для извлечения ссылок из HTML-файла
"""
def __init__(self):
super().__init__()
self.links = []
def handle_starttag(self, tag, attrs):
# Извлекаем ссылки из тегов <a> и <link>
if tag in ['a', 'link']:
for attr, value in attrs:
if attr == 'href' and value:
self.links.append(value)
# Извлекаем src из тегов <img>, <script>, <iframe>
if tag in ['img', 'script', 'iframe']:
for attr, value in attrs:
if attr == 'src' and value:
self.links.append(value)
def extract_links_from_html(html_content):
"""
Извлекает все ссылки из HTML-содержимого
Args:
html_content (str): HTML-содержимое
Returns:
list: Список найденных ссылок
"""
parser = LinkParser()
parser.feed(html_content)
# Удаляем дубликаты и сортируем
unique_links = sorted(set(parser.links))
return unique_links
def main():
parser = argparse.ArgumentParser(description='Извлечение ссылок из HTML-файла')
parser.add_argument('html_file', help='Путь к HTML-файлу')
parser.add_argument('--output', help='Путь к выходному файлу для сохранения ссылок')
parser.add_argument('--filter', help='Фильтр ссылок по регулярному выражению')
args = parser.parse_args()
try:
# Чтение HTML-файла
with open(args.html_file, 'r', encoding='utf-8') as file:
html_content = file.read()
# Извлечение ссылок
links = extract_links_from_html(html_content)
# Применение фильтра, если указан
if args.filter:
pattern = re.compile(args.filter)
links = [link for link in links if pattern.search(link)]
# Формирование вывода
output_text = "\n".join(links)
# Сохранение результатов
if args.output:
with open(args.output, 'w', encoding='utf-8') as file:
file.write(output_text)
print(f"Извлечено {len(links)} ссылок, сохранено в {args.output}")
else:
print(f"Извлечено {len(links)} ссылок:")
print(output_text)
except Exception as e:
print(f"Ошибка: {e}")
if __name__ == "__main__":
main()
Использование:
Примеры использования
Пример 1: Базовое извлечение ссылок
Извлечь все ссылки из HTML-файла и вывести их в консоль:
python extract_links.py page.html
Пример 2: Сохранение ссылок в файл
Извлечь все ссылки и сохранить их в текстовый файл:
python extract_links.py page.html --output links.txt
Пример 3: Фильтрация ссылок
Извлечь только ссылки, содержащие слово "image" или "photo":
python extract_links.py page.html --filter "image|photo"
Пример 4: Извлечение только внешних ссылок
Извлечь только ссылки на внешние сайты (начинающиеся с http или https):
python extract_links.py page.html --filter "^https?://"
Пример 5: Комбинированный пример
Извлечь только ссылки на PDF-файлы и сохранить их в файл:
python extract_links.py page.html --filter "\.pdf$" --output pdf_links.txt
Что извлекает программа
Программа находит следующие типы ссылок:
Атрибуты href из тегов <a> и <link>:
<a href="https://example.com">Ссылка</a>
<link href="styles.css" rel="stylesheet">
Атрибуты src из тегов <img>, <script> и <iframe>:
<img src="image.jpg" alt="Изображение">
<script src="script.js"></script>
<iframe src="https://example.com/embed"></iframe>
13. Модуль для логирования действий пользователя
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import json
import logging
from datetime import datetime
class UserActionLogger:
"""
Класс для логирования действий пользователя в консольном приложении
"""
def __init__(self, log_file=None, log_level=logging.INFO, format_string=None):
"""
Инициализирует объект логгера
Args:
log_file (str): Путь к файлу логов
log_level (int): Уровень логирования
format_string (str): Строка формата для логов
"""
# Настройка имени файла лога, если не указано
if log_file is None:
log_dir = os.path.join(os.path.expanduser("~"), ".logs")
os.makedirs(log_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d")
log_file = os.path.join(log_dir, f"user_actions_{timestamp}.log")
# Настройка форматирования, если не указано
if format_string is None:
format_string = '%(asctime)s - %(levelname)s - %(message)s'
# Настройка логгера
self.logger = logging.getLogger("user_actions")
self.logger.setLevel(log_level)
# Создаем обработчик файла
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(log_level)
# Создаем форматтер
formatter = logging.Formatter(format_string)
file_handler.setFormatter(formatter)
# Добавляем обработчик к логгеру
self.logger.addHandler(file_handler)
# Сохраняем путь к файлу логов
self.log_file = log_file
# Информация о сессии
self.session_start = datetime.now()
self.session_id = int(time.time())
# Логируем начало сессии
self.log_action("SESSION_START", {"session_id": self.session_id})
def log_action(self, action_type, details=None):
"""
Логирует действие пользователя
Args:
action_type (str): Тип действия
details (dict): Детали действия
"""
# Если детали не указаны, используем пустой словарь
if details is None:
details = {}
# Формируем данные лога
log_data = {
"timestamp": datetime.now().isoformat(),
"action_type": action_type,
"session_id": self.session_id,
"details": details
}
# Преобразуем в JSON и логируем
log_message = json.dumps(log_data, ensure_ascii=False)
self.logger.info(log_message)
return log_data
def log_command(self, command, args=None, status="SUCCESS"):
"""
Логирует выполнение команды
Args:
command (str): Команда
args (dict): Аргументы команды
status (str): Статус выполнения
"""
return self.log_action("COMMAND", {
"command": command,
"arguments": args or {},
"status": status
})
def log_error(self, error_message, error_type=None, details=None):
"""
Логирует ошибку
Args:
error_message (str): Сообщение об ошибке
error_type (str): Тип ошибки
details (dict): Дополнительные детали
"""
error_details = {
"message": error_message,
"type": error_type or "UNKNOWN"
}
if details:
error_details.update(details)
return self.log_action("ERROR", error_details)
def log_user_input(self, input_text, input_type="TEXT"):
"""
Логирует пользовательский ввод
Args:
input_text (str): Текст ввода
input_type (str): Тип ввода
"""
return self.log_action("USER_INPUT", {
"text": input_text,
"type": input_type
})
def log_system_event(self, event_name, details=None):
"""
Логирует системное событие
Args:
event_name (str): Название события
details (dict): Детали события
"""
return self.log_action("SYSTEM_EVENT", {
"event": event_name,
"details": details or {}
})
def end_session(self):
"""
Завершает сессию логирования
"""
session_duration = (datetime.now() - self.session_start).total_seconds()
self.log_action("SESSION_END", {
"session_id": self.session_id,
"duration_seconds": session_duration
})
# Закрываем обработчики
for handler in self.logger.handlers:
handler.close()
self.logger.removeHandler(handler)
# Пример использования
if __name__ == "__main__":
# Инициализация логгера
logger = UserActionLogger(log_file="user_actions.log")
# Примеры логирования различных действий
try:
# Логируем пользовательский ввод
user_input = input("Введите команду: ")
logger.log_user_input(user_input)
# Логируем выполнение команды
if user_input == "help":
logger.log_command("help", status="SUCCESS")
print("Доступные команды: help, exit")
elif user_input == "exit":
logger.log_command("exit", status="SUCCESS")
print("Выход из программы")
else:
# Логируем ошибку
logger.log_error(f"Неизвестная команда: {user_input}", "INVALID_COMMAND")
print(f"Ошибка: неизвестная команда '{user_input}'")
# Логируем системное событие
logger.log_system_event("EXAMPLE_COMPLETE")
finally:
# Завершаем сессию
logger.end_session()
print(f"Логи сохранены в {logger.log_file}")
Использование:
# Создаем логгер
logger = UserActionLogger(log_file="app_log.log")
try:
# Логируем действия пользователя
logger.log_user_input("open file.txt")
logger.log_command("open", {"filename": "file.txt"})
# При возникновении ошибки
logger.log_error("Файл не найден", "FILE_NOT_FOUND")
finally:
# Завершаем сессию при выходе
logger.end_session()
14. Простой чат-сервер на сокетах(ПРОБЛЕМАА)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import threading
import argparse
import time
import sys
import select
class ChatServer:
"""
Простой чат-сервер на сокетах
"""
def __init__(self, host='0.0.0.0', port=8888):
"""
Инициализирует сервер
Args:
host (str): Адрес хоста для прослушивания
port (int): Порт для прослушивания
"""
self.host = host
self.port = port
self.server_socket = None
self.clients = {} # {соединение: имя_пользователя}
self.lock = threading.Lock()
self.running = False
def start(self):
"""
Запускает сервер
"""
# Создаем TCP сокет
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Настраиваем повторное использование адреса
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
# Привязываем сокет к адресу и порту
self.server_socket.bind((self.host, self.port))
# Начинаем прослушивание
self.server_socket.listen(5)
self.running = True
print(f"Сервер запущен на {self.host}:{self.port}")
# Запускаем поток для обработки консольных команд
console_thread = threading.Thread(target=self.handle_console)
console_thread.daemon = True
console_thread.start()
# Основной цикл сервера
while self.running:
# Принимаем новое соединение
client_socket, client_address = self.server_socket.accept()
# Запускаем поток для обработки клиента
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, client_address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\nПрерывание работы сервера...")
finally:
self.stop()
def stop(self):
"""
Останавливает сервер
"""
self.running = False
# Отправляем сообщение всем клиентам о закрытии сервера
with self.lock:
for client in list(self.clients.keys()):
try:
client.send("Сервер закрывается. Соединение будет разорвано.".encode('utf-8'))
client.close()
except:
pass
self.clients.clear()
# Закрываем серверный сокет
if self.server_socket:
self.server_socket.close()
print("Сервер остановлен")
def handle_client(self, client_socket, client_address):
"""
Обрабатывает подключение клиента
Args:
client_socket (socket): Сокет клиента
client_address (tuple): Адрес клиента
"""
# Запрашиваем имя пользователя
client_socket.send("Введите ваше имя: ".encode('utf-8'))
try:
# Получаем имя пользователя
username = client_socket.recv(1024).decode('utf-8').strip()
# Если имя не получено, генерируем случайное
if not username:
username = f"User_{int(time.time())}"
# Регистрируем клиента
with self.lock:
self.clients[client_socket] = username
# Отправляем приветственное сообщение
client_socket.send(f"Добро пожаловать, {username}! Для выхода введите /exit".encode('utf-8'))
# Оповещаем других пользователей
self.broadcast(f"[SERVER] {username} присоединился к чату", except_socket=client_socket)
print(f"Новое подключение: {username} ({client_address[0]}:{client_address[1]})")
# Обрабатываем сообщения клиента
while self.running:
message = client_socket.recv(1024).decode('utf-8').strip()
if not message:
break
# Обрабатываем команду выхода
if message.lower() == '/exit':
client_socket.send("До свидания!".encode('utf-8'))
break
# Отправляем сообщение всем пользователям
self.broadcast(f"{username}: {message}")
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError):
pass
except Exception as e:
print(f"Ошибка при обработке клиента: {e}")
finally:
# Удаляем клиента при отключении
with self.lock:
if client_socket in self.clients:
username = self.clients[client_socket]
del self.clients[client_socket]
# Оповещаем других пользователей
self.broadcast(f"[SERVER] {username} покинул чат")
# Закрываем соединение
client_socket.close()
print(f"Соединение с {username} закрыто")
def broadcast(self, message, except_socket=None):
"""
Отправляет сообщение всем подключенным клиентам
Args:
message (str): Сообщение для отправки
except_socket (socket): Сокет, который нужно исключить из рассылки
"""
with self.lock:
for client_socket in list(self.clients.keys()):
if client_socket != except_socket:
try:
client_socket.send(message.encode('utf-8'))
except (ConnectionResetError, BrokenPipeError):
# Удаляем клиента, если не удалось отправить сообщение
username = self.clients.pop(client_socket, "Unknown")
print(f"Клиент {username} отключен из-за ошибки соединения")
def handle_console(self):
"""
Обрабатывает консольные команды для управления сервером
"""
print("Доступные команды:")
print(" users - показать список пользователей")
print(" kick <имя> - отключить пользователя")
print(" broadcast <сообщение> - отправить сообщение всем")
print(" stop - остановить сервер")
while self.running:
try:
# Проверяем, доступен ли ввод с клавиатуры
if select.select([sys.stdin], [], [], 0.1)[0]:
command = sys.stdin.readline().strip()
if command == "users":
with self.lock:
if not self.clients:
print("Нет подключенных пользователей")
else:
print("Подключенные пользователи:")
for i, username in enumerate(self.clients.values(), 1):
print(f" {i}. {username}")
elif command.startswith("kick "):
username_to_kick = command[5:].strip()
kicked = False
with self.lock:
for client_socket, username in list(self.clients.items()):
if username == username_to_kick:
try:
client_socket.send("[SERVER] Вы были отключены администратором.".encode('utf-8'))
client_socket.close()
del self.clients[client_socket]
kicked = True
print(f"Пользователь {username} отключен")
except:
pass
if not kicked:
print(f"Пользователь {username_to_kick} не найден")
elif command.startswith("broadcast "):
message = command[10:].strip()
if message:
self.broadcast(f"[SERVER] {message}")
print("Сообщение отправлено")
elif command == "stop":
print("Останавливаем сервер...")
self.stop()
break
# Небольшая задержка, чтобы не загружать CPU
time.sleep(0.1)
except Exception as e:
print(f"Ошибка при обработке консольной команды: {e}")
class ChatClient:
"""
Клиент для подключения к чат-серверу
"""
def __init__(self, host='localhost', port=8888):
"""
Инициализирует клиент
Args:
host (str): Адрес сервера
port (int): Порт сервера
"""
self.host = host
self.port = port
self.client_socket = None
self.running = False
def start(self):
"""
Запускает клиент и подключается к серверу
"""
# Создаем TCP сокет
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# Подключаемся к серверу
print(f"Подключение к серверу {self.host}:{self.port}...")
self.client_socket.connect((self.host, self.port))
self.running = True
# Запускаем поток для приема сообщений
receive_thread = threading.Thread(target=self.receive_messages)
receive_thread.daemon = True
receive_thread.start()
# Поток для отправки сообщений
self.send_messages()
except ConnectionRefusedError:
print("Не удалось подключиться к серверу. Убедитесь, что сервер запущен.")
except Exception as e:
print(f"Ошибка: {e}")
finally:
self.stop()
def stop(self):
"""
Останавливает клиент
"""
self.running = False
# Закрываем соединение
if self.client_socket:
self.client_socket.close()
def receive_messages(self):
"""
Принимает сообщения от сервера
"""
try:
while self.running:
message = self.client_socket.recv(1024).decode('utf-8')
if not message:
print("Соединение с сервером разорвано")
self.running = False
break
print(message)
except (ConnectionResetError, BrokenPipeError):
print("Соединение с сервером разорвано")
except Exception as e:
print(f"Ошибка при получении сообщений: {e}")
finally:
self.running = False
def send_messages(self):
"""
Отправляет сообщения на сервер
"""
try:
while self.running:
message = input()
if message.lower() == '/exit':
self.client_socket.send(message.encode('utf-8'))
break
self.client_socket.send(message.encode('utf-8'))
except (ConnectionResetError, BrokenPipeError):
print("Соединение с сервером разорвано")
except Exception as e:
print(f"Ошибка при отправке сообщений: {e}")
def main():
parser = argparse.ArgumentParser(description='Простой чат-сервер/клиент')
parser.add_argument('--mode', choices=['server', 'client'], required=True,
help='Режим работы: server или client')
parser.add_argument('--host', default='localhost',
help='Адрес хоста (по умолчанию: localhost)')
parser.add_argument('--port', type=int, default=8888,
help='Порт (по умолчанию: 8888)')
args = parser.parse_args()
if args.mode == 'server':
# Запускаем сервер
server = ChatServer(args.host, args.port)
server.start()
else:
# Запускаем клиент
client = ChatClient(args.host, args.port)
client.start()
if __name__ == "__main__":
main()
Использование:
Для запуска сервера:
python chat.py --mode server --port 8888
Для подключения клиента:
python chat.py --mode client --host localhost --port 8888
15. Скрипт для поиска дубликатов изображений
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hashlib
import shutil
import argparse
from PIL import Image
import io
def calculate_image_hash(image_path):
"""
Вычисляет перцептивный хеш изображения
Args:
image_path (str): Путь к изображению
Returns:
str: Хеш изображения
"""
try:
# Открываем изображение
with Image.open(image_path) as img:
# Изменяем размер до 8x8
img = img.resize((8, 8), Image.LANCZOS).convert('L')
# Вычисляем среднее значение пикселей
pixels = list(img.getdata())
avg_pixel = sum(pixels) // len(pixels)
# Вычисляем бинарный хеш
bits = ''.join('1' if pixel >= avg_pixel else '0' for pixel in pixels)
# Преобразуем бинарную строку в шестнадцатеричное представление
hashval = int(bits, 2)
hash_hex = format(hashval, '016x')
return hash_hex
except Exception as e:
print(f"Ошибка при обработке {image_path}: {e}")
return None
def calculate_file_hash(file_path):
"""
Вычисляет MD5-хеш файла
Args:
file_path (str): Путь к файлу
Returns:
str: MD5-хеш файла
"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def is_image_file(file_path):
"""
Проверяет, является ли файл изображением
Args:
file_path (str): Путь к файлу
Returns:
bool: True, если файл является изображением
"""
# Расширения популярных форматов изображений
image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
# Проверяем расширение файла
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext in image_extensions:
try:
# Проверяем, что файл действительно является изображением
with Image.open(file_path) as img:
img.verify()
return True
except:
return False
return False
def find_duplicate_images(directory, hash_method='phash', similarity_threshold=0):
"""
Находит дубликаты изображений в указанной директории
Args:
directory (str): Путь к директории
hash_method (str): Метод хеширования ('phash' или 'md5')
similarity_threshold (int): Порог сходства для перцептивного хеша (0-10)
Returns:
dict: Словарь групп дубликатов {hash: [file_paths]}
"""
# Словарь для хранения хешей изображений
images_by_hash = {}
# Счетчики
total_files = 0
processed_files = 0
# Получаем список всех файлов в директории и поддиректориях
for root, _, files in os.walk(directory):
for filename in files:
filepath = os.path.join(root, filename)
total_files += 1
# Проверяем, является ли файл изображением
if is_image_file(filepath):
# Вычисляем хеш в зависимости от выбранного метода
if hash_method == 'phash':
file_hash = calculate_image_hash(filepath)
else:
file_hash = calculate_file_hash(filepath)
if file_hash:
# Добавляем путь к файлу в соответствующий список
if file_hash not in images_by_hash:
images_by_hash[file_hash] = []
images_by_hash[file_hash].append(filepath)
processed_files += 1
# Отбираем только группы с более чем одним файлом (дубликаты)
duplicate_groups = {h: files for h, files in images_by_hash.items() if len(files) > 1}
print(f"Обработано {processed_files} изображений из {total_files} файлов")
print(f"Найдено {len(duplicate_groups)} групп дубликатов")
return duplicate_groups
def move_duplicates(duplicate_groups, output_dir):
"""
Перемещает дубликаты в указанную директорию
Args:
duplicate_groups (dict): Словарь групп дубликатов {hash: [file_paths]}
output_dir (str): Путь к директории для перемещения дубликатов
"""
# Создаем директорию, если она не существует
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Счетчики
total_duplicates = 0
moved_files = 0
# Перемещаем дубликаты
for hash_val, file_list in duplicate_groups.items():
# Создаем поддиректорию для группы дубликатов
group_dir = os.path.join(output_dir, f"group_{hash_val[:8]}")
os.makedirs(group_dir, exist_ok=True)
# Перемещаем все файлы из группы, кроме первого
for i, filepath in enumerate(file_list):
# Первый файл оставляем на месте, остальные перемещаем
if i > 0:
filename = os.path.basename(filepath)
dest_path = os.path.join(group_dir, filename)
# Если файл с таким именем уже существует, добавляем суффикс
if os.path.exists(dest_path):
base, ext = os.path.splitext(filename)
dest_path = os.path.join(group_dir, f"{base}_{i}{ext}")
try:
shutil.move(filepath, dest_path)
moved_files += 1
print(f"Перемещен: {filepath} -> {dest_path}")
except Exception as e:
print(f"Ошибка при перемещении {filepath}: {e}")
total_duplicates += 1 if i > 0 else 0
print(f"Перемещено {moved_files} дубликатов из {total_duplicates}")
def main():
parser = argparse.ArgumentParser(description='Поиск дубликатов изображений')
parser.add_argument('directory', help='Директория для поиска')
parser.add_argument('--output', default='duplicates',
help='Директория для перемещения дубликатов (по умолчанию: duplicates)')
parser.add_argument('--method', choices=['phash', 'md5'], default='phash',
help='Метод хеширования (по умолчанию: phash)')
parser.add_argument('--threshold', type=int, default=0,
help='Порог сходства для перцептивного хеша (0-10, по умолчанию: 0)')
parser.add_argument('--preview', action='store_true',
help='Режим предварительного просмотра (без перемещения файлов)')
args = parser.parse_args()
# Проверяем существование директории
if not os.path.isdir(args.directory):
print(f"Ошибка: Директория '{args.directory}' не существует")
return
print(f"Поиск дубликатов изображений в {args.directory}...")
duplicate_groups = find_duplicate_images(args.directory, args.method, args.threshold)
if not duplicate_groups:
print("Дубликаты не найдены")
return
# Выводим найденные группы дубликатов
print("\nНайденные группы дубликатов:")
for i, (hash_val, file_list) in enumerate(duplicate_groups.items(), 1):
print(f"\nГруппа {i} (хеш: {hash_val[:8]}):")
for j, filepath in enumerate(file_list):
status = "оригинал" if j == 0 else "дубликат"
print(f" {j+1}. {filepath} ({status})")
# Перемещаем дубликаты, если не в режиме предпросмотра
if not args.preview:
output_dir = os.path.join(os.path.dirname(os.path.abspath(args.directory)), args.output)
print(f"\nПеремещение дубликатов в {output_dir}...")
move_duplicates(duplicate_groups, output_dir)
else:
print("\nРежим предварительного просмотра. Файлы не перемещены.")
print("Для перемещения файлов запустите скрипт без флага --preview")
if __name__ == "__main__":
main()
Примеры использования скрипта
После установки Pillow вы сможете использовать ваш скрипт так:
Простой поиск дубликатов в папке:
python 123.py "путь/к/папке/с/изображениями"
Использование MD5-хеширования вместо перцептивного:
python 123.py "путь/к/папке/с/изображениями" --method md5
Режим предварительного просмотра (без перемещения файлов):
python 123.py "путь/к/папке/с/изображениями" --preview
Указание пользовательской директории для дубликатов:
python 123.py "путь/к/папке/с/изображениями" --output "путь/для/дубликатов"
16. Функция для валидации email-адресов
16. Функция для валидации email-адресов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import argparse
import socket
import dns.resolver
def validate_email(email, check_dns=False, check_format_only=False):
"""
Валидирует email-адрес с использованием регулярных выражений
Args:
email (str): Email-адрес для проверки
check_dns (bool): Проверять ли существование домена
check_format_only (bool): Проверять только формат без подробных ошибок
Returns:
tuple: (валидный, сообщение_об_ошибке)
"""
# Проверяем базовый формат email
if not isinstance(email, str):
return False, "Email должен быть строкой"
# Проверяем общий формат
basic_check = re.match(r'^[^@]+@[^@]+\.[^@]+$', email)
if not basic_check:
return False, "Неверный формат email (должен быть 'user@domain.tld')"
# Если требуется только базовая проверка
if check_format_only:
return True, "Email имеет правильный формат"
# Разделяем email на локальную часть и домен
try:
local_part, domain = email.rsplit('@', 1)
except ValueError:
return False, "Email должен содержать символ @"
# Проверка локальной части
if not local_part:
return False, "Локальная часть email не может быть пустой"
if len(local_part) > 64:
return False, "Локальная часть email слишком длинная (максимум 64 символа)"
# Проверка на недопустимые символы в локальной части
if not re.match(r'^[a-zA-Z0-9.!#$%&\'*+/=?^_`{|}~-]+$', local_part):
return False, "Локальная часть содержит недопустимые символы"
# Проверка на две точки подряд в локальной части
if '..' in local_part:
return False, "Локальная часть не может содержать две точки подряд"
# Проверка домена
if not domain:
return False, "Домен не может быть пустым"
if len(domain) > 255:
return False, "Домен слишком длинный (максимум 255 символов)"
# Проверка формата домена
domain_parts = domain.split('.')
# Проверка на достаточное количество частей домена
if len(domain_parts) < 2:
return False, "Домен должен содержать хотя бы одну точку"
# Проверка формата каждой части домена
for part in domain_parts:
if not part:
return False, "Части домена не могут быть пустыми"
if len(part) > 63:
return False, f"Часть домена '{part}' слишком длинная (максимум 63 символа)"
if not re.match(r'^[a-zA-Z0-9-]+$', part):
return False, f"Часть домена '{part}' содержит недопустимые символы"
if part[0] == '-' or part[-1] == '-':
return False, f"Часть домена '{part}' не может начинаться или заканчиваться дефисом"
# Проверка TLD
tld = domain_parts[-1]
if not re.match(r'^[a-zA-Z]{2,}$', tld):
return False, f"Неверный TLD '{tld}' (должен содержать минимум 2 буквы)"
# Проверка существования домена (опционально)
if check_dns:
try:
# Проверка MX-записи
dns.resolver.resolve(domain, 'MX')
return True, "Email валиден и домен имеет MX-запись"
except dns.resolver.NoAnswer:
try:
# Проверка A-записи как запасной вариант
dns.resolver.resolve(domain, 'A')
return True, "Email валиден, но домен не имеет MX-записи (найдена A-запись)"
except dns.resolver.NoAnswer:
return False, f"Домен '{domain}' не имеет ни MX, ни A записей"
except dns.resolver.NXDOMAIN:
return False, f"Домен '{domain}' не существует"
except Exception as e:
return False, f"Ошибка при проверке A-записи: {str(e)}"
except dns.resolver.NXDOMAIN:
return False, f"Домен '{domain}' не существует"
except Exception as e:
return False, f"Ошибка при проверке MX-записи: {str(e)}"
return True, "Email имеет правильный формат"
def validate_emails(emails, check_dns=False):
"""
Проверяет список email-адресов
Args:
emails (list): Список email-адресов для проверки
check_dns (bool): Проверять ли существование домена
Returns:
dict: Словарь с результатами {email: (валидный, сообщение)}
"""
results = {}
for email in emails:
is_valid, message = validate_email(email, check_dns)
results[email] = (is_valid, message)
return results
def main():
parser = argparse.ArgumentParser(description='Валидация email-адресов')
parser.add_argument('emails', nargs='*', help='Email-адреса для проверки')
parser.add_argument('--file', help='Путь к файлу со списком email-адресов (по одному на строку)')
parser.add_argument('--check-dns', action='store_true', help='Проверять существование домена')
parser.add_argument('--output', help='Путь к файлу для сохранения результатов')
args = parser.parse_args()
emails = args.emails.copy()
# Получаем email-адреса из файла
if args.file:
try:
with open(args.file, 'r', encoding='utf-8') as file:
file_emails = [line.strip() for line in file if line.strip()]
emails.extend(file_emails)
except Exception as e:
print(f"Ошибка при чтении файла: {e}")
return
if not emails:
print("Ошибка: Не указаны email-адреса для проверки")
return
# Убираем дубликаты и сортируем
emails = sorted(set(emails))
# Проверяем адреса
results = validate_emails(emails, args.check_dns)
# Формируем вывод
output_lines = []
valid_count = sum(1 for is_valid, _ in results.values() if is_valid)
output_lines.append(f"Результаты проверки ({valid_count} из {len(emails)} прошли валидацию):")
output_lines.append("="*80)
# Сначала валидные адреса
output_lines.append("\nВалидные адреса:")
for email, (is_valid, message) in sorted(results.items()):
if is_valid:
output_lines.append(f" {email} - {message}")
# Затем невалидные
output_lines.append("\nНевалидные адреса:")
for email, (is_valid, message) in sorted(results.items()):
if not is_valid:
output_lines.append(f" {email} - {message}")
# Выводим результаты
output_text = "\n".join(output_lines)
if args.output:
with open(args.output, 'w', encoding='utf-8') as file:
file.write(output_text)
print(f"Результаты сохранены в файл: {args.output}")
else:
print(output_text)
if __name__ == "__main__":
main()
Использование:
python validate_emails.py user@example.com invalid@email --check-dns --output results.txt
Примеры использования скрипта
После установки dnspython вы сможете использовать скрипт следующим образом:
Проверка одного или нескольких адресов:
validate_emails.py user@example.com user2@example.com
Проверка с включенной проверкой DNS (требует dnspython):
validate_emails.py user@example.com --check-dns
Использование файла со списком адресов:
python validate_emails.py --file emails.txt
Сохранение результатов в файл:
python validate_emails.py user@example.com --output results.txt
Комбинация параметров:
python validate_emails.py user@example.com --check-dns --output results.txt
17. Консольная утилита для курса
Вот мое полное решение:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import requests
import argparse
import datetime
import json
import os
import re
from tabulate import tabulate
class CurrencyRates:
"""
Класс для получения курсов валют
"""
def __init__(self, cache_dir=None):
"""
Инициализирует объект для получения курсов валют
Args:
cache_dir (str): Путь к директории для кэширования данных
"""
# URL для API Центрального Банка России
self.cbr_api_url = 'https://www.cbr-xml-daily.ru/daily_json.js'
# URL для API Европейского Центрального Банка
self.ecb_api_url = 'https://api.exchangerate.host/latest'
# Директория для кэширования
if cache_dir is None:
cache_dir = os.path.join(os.path.expanduser("~"), ".currency_cache")
self.cache_dir = cache_dir
# Создаем директорию для кэша, если она не существует
os.makedirs(self.cache_dir, exist_ok=True)
def get_rates(self, base_currency, target_currency, days=7, source='auto'):
"""
Получает курсы валют за указанное количество дней
Args:
base_currency (str): Базовая валюта
target_currency (str): Целевая валюта
days (int): Количество дней для отображения
source (str): Источник данных ('cbr', 'ecb', 'auto')
Returns:
list: Список кортежей (дата, курс)
"""
rates = []
today = datetime.datetime.now().date()
# Сначала получаем курс на сегодня, чтобы иметь актуальные данные
# для случаев, когда в выходные/праздничные дни курс не меняется
today_str = today.strftime('%Y-%m-%d')
today_rate = None
# Выбираем источник для текущей пары валют
sources = []
if source == 'cbr':
sources = ['cbr']
elif source == 'ecb':
sources = ['ecb']
else: # auto
# Для пары с RUB сначала пробуем ЦБР
if 'RUB' in [base_currency, target_currency]:
sources = ['cbr', 'ecb']
else:
sources = ['ecb', 'cbr']
# Получаем сегодняшний курс
for src in sources:
if src == 'cbr':
today_rate = self._get_rate_from_cbr(base_currency, target_currency, today_str)
if today_rate is not None:
last_source = 'cbr'
break
else: # ecb
today_rate = self._get_rate_from_ecb(base_currency, target_currency, today_str)
if today_rate is not None:
last_source = 'ecb'
break
# Сохраняем последний известный курс
last_known_rate = today_rate
# Запрашиваем курсы для каждого дня
for i in range(days):
# Вычисляем дату
date = today - datetime.timedelta(days=days-1-i)
date_str = date.strftime('%Y-%m-%d')
rate = None
# Запрашиваем курс для текущей даты
for src in sources:
# Проверяем кэш
rate = self._get_from_cache(base_currency, target_currency, date_str, src)
if rate is not None:
# Сохраняем источник, из которого получили данные
last_source = src
break
# Если в кэше нет, получаем из соответствующего API
if src == 'cbr':
# Для ЦБР пропускаем выходные
if date.weekday() >= 5: # 5, 6 - суббота, воскресенье
continue
rate = self._get_rate_from_cbr(base_currency, target_currency, date_str)
if rate is not None:
# Сохраняем в кэш
last_source = 'cbr'
self._save_to_cache(base_currency, target_currency, date_str, rate, src)
break
else: # ecb
rate = self._get_rate_from_ecb(base_currency, target_currency, date_str)
if rate is not None:
# Сохраняем в кэш
last_source = 'ecb'
self._save_to_cache(base_currency, target_currency, date_str, rate, src)
break
# Если не удалось получить курс ни из одного источника, используем последний известный
if rate is None:
# Для выходных, праздников или недоступных данных используем последний известный курс
rate = last_known_rate
if rate is not None:
print(f"Используем последний известный курс для {date_str}: {rate}")
else:
# Обновляем последний известный курс
last_known_rate = rate
# Добавляем курс в список результатов
if rate is not None:
rates.append((date_str, rate))
else:
print(f"Не удалось получить курс {base_currency}/{target_currency} за {date_str}")
return rates
def _get_rate_from_cbr(self, base_currency, target_currency, date_str):
"""
Получает курс валют от ЦБ РФ
Args:
base_currency (str): Базовая валюта
target_currency (str): Целевая валюта
date_str (str): Дата в формате 'YYYY-MM-DD'
Returns:
float: Курс валюты или None в случае ошибки
"""
# Для CBR доступны только курсы к рублю
if target_currency != 'RUB' and base_currency != 'RUB':
return None
try:
# Если дата сегодняшняя, используем daily_json.js
today_str = datetime.datetime.now().date().strftime('%Y-%m-%d')
if date_str == today_str:
response = requests.get(self.cbr_api_url, timeout=10)
if response.status_code != 200:
print(f"Ошибка запроса к ЦБ РФ: {response.status_code}")
return None
data = response.json()
# Получаем курсы
if base_currency == 'RUB':
# RUB -> X
if target_currency in data['Valute']:
nominal = data['Valute'][target_currency]['Nominal']
value = data['Valute'][target_currency]['Value']
return nominal / value
else:
# X -> RUB
if base_currency in data['Valute']:
nominal = data['Valute'][base_currency]['Nominal']
value = data['Valute'][base_currency]['Value']
return value / nominal
else:
# Для исторических данных используем архив
date_obj = datetime.datetime.strptime(date_str, '%Y-%m-%d')
formatted_date = date_obj.strftime('%d/%m/%Y')
url = f'https://www.cbr.ru/scripts/XML_daily.asp?date_req={formatted_date}'
response = requests.get(url, timeout=10)
# Проверяем успешность запроса
if response.status_code != 200:
print(f"Ошибка запроса к ЦБ РФ: {response.status_code}")
return None
# Парсим XML
if base_currency == 'RUB':
# RUB -> X
pattern = f'<CharCode>{target_currency}</CharCode>.*?<Nominal>(\\d+)</Nominal>.*?<Value>([^<]+)</Value>'
match = re.search(pattern, response.text, re.DOTALL)
if match:
nominal = int(match.group(1))
value_str = match.group(2).replace(',', '.')
return nominal / float(value_str)
else:
# X -> RUB
pattern = f'<CharCode>{base_currency}</CharCode>.*?<Nominal>(\\d+)</Nominal>.*?<Value>([^<]+)</Value>'
match = re.search(pattern, response.text, re.DOTALL)
if match:
nominal = int(match.group(1))
value_str = match.group(2).replace(',', '.')
return float(value_str) / nominal
except Exception as e:
print(f"Ошибка при получении данных от ЦБ РФ: {e}")
return None
def _get_rate_from_ecb(self, base_currency, target_currency, date_str):
"""
Получает курс валют от ЕЦБ
Args:
base_currency (str): Базовая валюта
target_currency (str): Целевая валюта
date_str (str): Дата в формате 'YYYY-MM-DD'
Returns:
float: Курс валюты или None в случае ошибки
"""
try:
# Формируем URL для запроса
url = f'{self.ecb_api_url}?base={base_currency}&symbols={target_currency}&date={date_str}'
# Делаем запрос
response = requests.get(url, timeout=10)
# Проверяем статус ответа
if response.status_code != 200:
print(f"Ошибка запроса к ЕЦБ: {response.status_code}")
return None
data = response.json()
# Проверяем успешность запроса
if data.get('success', False) and 'rates' in data and target_currency in data['rates']:
return data['rates'][target_currency]
else:
print(f"Ошибка в данных от ЕЦБ: {data.get('error', 'Неизвестная ошибка')}")
except Exception as e:
print(f"Ошибка при получении данных от ЕЦБ: {e}")
return None
def _get_cache_path(self, base_currency, target_currency, date_str, source):
"""
Возвращает путь к кэш-файлу
Args:
base_currency (str): Базовая валюта
target_currency (str): Целевая валюта
date_str (str): Дата
source (str): Источник данных
Returns:
str: Путь к кэш-файлу
"""
filename = f"{base_currency}_{target_currency}_{date_str}_{source}.json"
return os.path.join(self.cache_dir, filename)
def _get_from_cache(self, base_currency, target_currency, date_str, source):
"""
Получает данные из кэша
Args:
base_currency (str): Базовая валюта
target_currency (str): Целевая валюта
date_str (str): Дата
source (str): Источник данных
Returns:
float: Курс валюты или None, если кэш отсутствует или устарел
"""
# Пропускаем кэширование для сегодняшней даты
if date_str == datetime.datetime.now().date().strftime('%Y-%m-%d'):
return None
cache_path = self._get_cache_path(base_currency, target_currency, date_str, source)
if os.path.exists(cache_path):
try:
with open(cache_path, 'r', encoding='utf-8') as file:
data = json.load(file)
return data.get('rate')
except Exception as e:
print(f"Ошибка чтения кэша: {e}")
return None
return None
def _save_to_cache(self, base_currency, target_currency, date_str, rate, source):
"""
Сохраняет данные в кэш
Args:
base_currency (str): Базовая валюта
target_currency (str): Целевая валюта
date_str (str): Дата
rate (float): Курс валюты
source (str): Источник данных
"""
# Пропускаем кэширование для сегодняшней даты
if date_str == datetime.datetime.now().date().strftime('%Y-%m-%d'):
return
cache_path = self._get_cache_path(base_currency, target_currency, date_str, source)
try:
with open(cache_path, 'w', encoding='utf-8') as file:
json.dump({'rate': rate}, file)
except Exception as e:
print(f"Ошибка при сохранении данных в кэш: {e}")
def main():
parser = argparse.ArgumentParser(description='Показывает изменение курса валют за последние дни')
parser.add_argument('base', help='Базовая валюта (например, USD)')
parser.add_argument('target', help='Целевая валюта (например, RUB)')
parser.add_argument('--days', type=int, default=7,
help='Количество дней для отображения (по умолчанию: 7)')
parser.add_argument('--source', choices=['cbr', 'ecb', 'auto'], default='auto',
help='Источник данных (по умолчанию: auto)')
parser.add_argument('--output', help='Путь к файлу для сохранения результатов')
args = parser.parse_args()
# Нормализуем коды валют
base_currency = args.base.upper()
target_currency = args.target.upper()
# Получаем курсы валют
currency_rates = CurrencyRates()
rates = currency_rates.get_rates(base_currency, target_currency, args.days, args.source)
if not rates:
print(f"Не удалось получить курс {base_currency}/{target_currency}")
return
# Рассчитываем изменения
changes = []
prev_rate = None
for date_str, rate in rates:
if prev_rate is not None and prev_rate > 0:
change_abs = rate - prev_rate
change_pct = (change_abs / prev_rate) * 100
changes.append((change_abs, change_pct))
else:
changes.append((0, 0))
prev_rate = rate
# Формируем таблицу
headers = ["Дата", f"Курс {base_currency}/{target_currency}", "Изменение", "Изменение %"]
table_data = []
for i, ((date_str, rate), (change_abs, change_pct)) in enumerate(zip(rates, changes)):
# Форматируем изменения
if i == 0:
change_text = "—"
change_pct_text = "—"
else:
sign = "+" if change_abs >= 0 else ""
change_text = f"{sign}{change_abs:.4f}"
change_pct_text = f"{sign}{change_pct:.2f}%"
table_data.append([date_str, f"{rate:.4f}", change_text, change_pct_text])
# Выводим таблицу
table = tabulate(table_data, headers, tablefmt="fancy_grid")
# Формируем итоговый текст
if len(rates) >= 2:
first_rate = rates[0][1]
last_rate = rates[-1][1]
total_change_abs = last_rate - first_rate
total_change_pct = (total_change_abs / first_rate) * 100 if first_rate > 0 else 0
output_lines = []
output_lines.append(f"Курс {base_currency}/{target_currency} за последние {args.days} дней:")
output_lines.append(table)
# Итоговое изменение
sign = "+" if total_change_abs >= 0 else ""
output_lines.append(f"\nИтоговое изменение: {sign}{total_change_abs:.4f} ({sign}{total_change_pct:.2f}%)")
# Трендовая логика для пары USD/RUB
if base_currency == 'USD' and target_currency == 'RUB':
# Рост USD/RUB означает ослабление рубля
if total_change_pct > 1:
trend = f"{target_currency} значительно ослабевает к {base_currency}"
elif total_change_pct > 0.2:
trend = f"{target_currency} ослабевает к {base_currency}"
elif total_change_pct < -1:
trend = f"{target_currency} значительно укрепляется к {base_currency}"
elif total_change_pct < -0.2:
trend = f"{target_currency} укрепляется к {base_currency}"
else:
trend = f"Курс {base_currency}/{target_currency} относительно стабилен"
elif base_currency == 'RUB':
# Рост RUB/XXX означает укрепление рубля
if total_change_pct > 1:
trend = f"{base_currency} значительно укрепляется к {target_currency}"
elif total_change_pct > 0.2:
trend = f"{base_currency} укрепляется к {target_currency}"
elif total_change_pct < -1:
trend = f"{base_currency} значительно ослабевает к {target_currency}"
elif total_change_pct < -0.2:
trend = f"{base_currency} ослабевает к {target_currency}"
else:
trend = f"Курс {base_currency}/{target_currency} относительно стабилен"
else:
# Для других валютных пар
if total_change_pct > 1:
trend = f"{target_currency} значительно ослабевает к {base_currency}"
elif total_change_pct > 0.2:
trend = f"{target_currency} ослабевает к {base_currency}"
elif total_change_pct < -1:
trend = f"{target_currency} значительно укрепляется к {base_currency}"
elif total_change_pct < -0.2:
trend = f"{target_currency} укрепляется к {base_currency}"
else:
trend = f"Курс {base_currency}/{target_currency} относительно стабилен"
output_lines.append(f"Тренд: {trend}")
# Формируем вывод
output_text = "\n".join(output_lines)
# Выводим результаты или сохраняем в файл
if args.output:
with open(args.output, 'w', encoding='utf-8') as file:
file.write(output_text)
print(f"Результаты сохранены в файл: {args.output}")
else:
print(output_text)
else:
print(f"Недостаточно данных для анализа курса {base_currency}/{target_currency}")
if __name__ == "__main__": main()
использование:
python currency_rates.py USD RUB --days 7 --source auto
18. Скрипт для переименования фотографий по дате создания
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import argparse
import datetime
import shutil
from PIL import Image
from PIL.ExifTags import TAGS
def get_image_date(image_path):
"""
Извлекает дату создания изображения из EXIF-данных
Args:
image_path (str): Путь к изображению
Returns:
datetime.datetime: Дата и время создания изображения или None
"""
try:
img = Image.open(image_path)
# Проверяем, есть ли EXIF-данные
if not hasattr(img, '_getexif') or img._getexif() is None:
return None
exif = {
TAGS.get(tag, tag): value
for tag, value in img._getexif().items()
}
# Порядок приоритета для полей с датой
date_fields = [
'DateTimeOriginal', # Дата и время создания оригинального изображения
'CreateDate', # Дата создания
'DateTimeDigitized', # Дата и время оцифровки
'DateTime' # Дата и время изменения файла
]
# Ищем первое непустое поле с датой
for field in date_fields:
if field in exif and exif[field]:
date_str = exif[field]
# Парсим строку даты в формате "YYYY:MM:DD HH:MM:SS"
try:
return datetime.datetime.strptime(date_str, '%Y:%m:%d %H:%M:%S')
except ValueError:
continue
return None
except Exception as e:
print(f"Ошибка при извлечении EXIF-данных из {image_path}: {e}")
return None
def get_file_date(file_path):
"""
Получает дату модификации файла
Args:
file_path (str): Путь к файлу
Returns:
datetime.datetime: Дата и время модификации файла
"""
mod_time = os.path.getmtime(file_path)
return datetime.datetime.fromtimestamp(mod_time)
def rename_photos(directory, output_dir=None, format_str='%Y-%m-%d_%H-%M-%S',
recursive=False, use_file_date=False, preview=False):
"""
Переименовывает фотографии в директории по дате создания
Args:
directory (str): Путь к директории с фотографиями
output_dir (str): Путь к директории для сохранения переименованных файлов
format_str (str): Строка формата для имени файла
recursive (bool): Рекурсивный обход директорий
use_file_date (bool): Использовать дату модификации файла, если EXIF-данные недоступны
preview (bool): Режим предварительного просмотра (без переименования)
Returns:
list: Список кортежей (старое_имя, новое_имя)
"""
# Проверяем существование директории
if not os.path.isdir(directory):
raise ValueError(f"Директория '{directory}' не существует")
# Если output_dir не указан, используем исходную директорию
if output_dir is None:
output_dir = directory
else:
# Создаем output_dir, если он не существует
os.makedirs(output_dir, exist_ok=True)
# Список операций переименования
rename_operations = []
# Функция обработки директории
def process_directory(dir_path):
nonlocal rename_operations
# Получаем список файлов в директории
items = os.listdir(dir_path)
for item in items:
item_path = os.path.join(dir_path, item)
# Если это директория и включен рекурсивный обход
if os.path.isdir(item_path) and recursive:
process_directory(item_path)
continue
# Пропускаем не-файлы
if not os.path.isfile(item_path):
continue
# Определяем расширение файла
_, ext = os.path.splitext(item_path)
ext = ext.lower()
# Обрабатываем только изображения
if ext not in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']:
continue
# Получаем дату создания
date = get_image_date(item_path)
# Если EXIF-данные недоступны и разрешено использовать дату файла
if date is None and use_file_date:
date = get_file_date(item_path)
# Если дата все равно не определена, пропускаем файл
if date is None:
print(f"Пропуск {item_path}: не удалось определить дату создания")
continue
# Формируем новое имя файла
new_filename = date.strftime(format_str) + ext
new_path = os.path.join(output_dir, new_filename)
# Если файл с таким именем уже существует, добавляем суффикс
counter = 1
while os.path.exists(new_path) and new_path != item_path:
new_filename = f"{date.strftime(format_str)}_{counter}{ext}"
new_path = os.path.join(output_dir, new_filename)
counter += 1
# Добавляем операцию переименования
rename_operations.append((item_path, new_path))
# Обрабатываем директорию
process_directory(directory)
# Выполняем переименование, если не в режиме предпросмотра
if not preview:
for old_path, new_path in rename_operations:
# Если пути совпадают, пропускаем
if old_path == new_path:
continue
try:
# Если output_dir совпадает с исходной директорией, используем rename
if os.path.dirname(old_path) == os.path.dirname(new_path):
os.rename(old_path, new_path)
else:
# Иначе копируем файл в новую директорию
shutil.copy2(old_path, new_path)
except Exception as e:
print(f"Ошибка при переименовании {old_path} -> {new_path}: {e}")
return rename_operations
def main():
parser = argparse.ArgumentParser(description='Переименование фотографий по дате создания из EXIF')
parser.add_argument('directory', help='Директория с фотографиями')
parser.add_argument('--output', help='Директория для сохранения переименованных файлов')
parser.add_argument('--format', default='%Y-%m-%d_%H-%M-%S',
help='Формат имени файла (по умолчанию: %%Y-%%m-%%d_%%H-%%M-%%S)')
parser.add_argument('--recursive', action='store_true',
help='Рекурсивный обход директорий')
parser.add_argument('--use-file-date', action='store_true',
help='Использовать дату модификации файла, если EXIF недоступен')
parser.add_argument('--preview', action='store_true',
help='Режим предварительного просмотра (без переименования)')
args = parser.parse_args()
try:
print(f"Обработка фотографий в директории {args.directory}...")
rename_ops = rename_photos(
args.directory,
args.output,
args.format,
args.recursive,
args.use_file_date,
args.preview
)
# Выводим результаты
if not rename_ops:
print("Не найдено фотографий для переименования")
else:
print(f"{'Предварительный просмотр переименования' if args.preview else 'Переименовано'} {len(rename_ops)} файлов:")
for i, (old_path, new_path) in enumerate(rename_ops, 1):
old_name = os.path.basename(old_path)
new_name = os.path.basename(new_path)
# Пропускаем файлы, которые не изменились
if old_path == new_path:
continue
print(f" {i}. {old_name} -> {new_name}")
if args.preview:
print("\nФайлы не были переименованы. Используйте команду без --preview для выполнения переименования.")
except Exception as e:
print(f"Ошибка: {e}")
if __name__ == "__main__":
main()
Использование:
python rename_photos.py /путь/к/директории --recursive --use-file-date --preview
19. Утилита для мониторинга использования CPU и памяти
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import argparse
import psutil
import curses
import datetime
from collections import deque
class ProcessMonitor:
"""
Класс для мониторинга использования CPU и памяти процесса
"""
def __init__(self, pid=None, interval=2, history_size=30):
"""
Инициализирует монитор процесса
Args:
pid (int): ID процесса для мониторинга (по умолчанию текущий процесс)
interval (float): Интервал обновления в секундах
history_size (int): Размер истории для графиков
"""
# Если PID не указан, используем текущий процесс
self.pid = pid if pid is not None else os.getpid()
self.interval = interval
self.history_size = history_size
# Получаем объект процесса
try:
self.process = psutil.Process(self.pid)
self.process_info = {
'name': self.process.name(),
'username': self.process.username(),
'create_time': datetime.datetime.fromtimestamp(
self.process.create_time()
).strftime('%Y-%m-%d %H:%M:%S')
}
except psutil.NoSuchProcess:
raise ValueError(f"Процесс с PID {self.pid} не найден")
# История использования ресурсов
self.cpu_history = deque(maxlen=history_size)
self.memory_history = deque(maxlen=history_size)
self.io_read_history = deque(maxlen=history_size)
self.io_write_history = deque(maxlen=history_size)
# Заполняем историю начальными значениями
# Это поможет графикам отобразиться сразу
self.cpu_history.append(0)
self.memory_history.append(0)
self.io_read_history.append(0)
self.io_write_history.append(0)
# Начальные значения для IO
try:
io_counters = self.process.io_counters()
self.last_read_bytes = io_counters.read_bytes
self.last_write_bytes = io_counters.write_bytes
except (psutil.AccessDenied, AttributeError):
self.last_read_bytes = 0
self.last_write_bytes = 0
def get_system_info(self):
"""
Получает общую информацию о системе
Returns:
dict: Информация о системе
"""
return {
'cpu_count': psutil.cpu_count(),
'cpu_percent': psutil.cpu_percent(),
'memory_total': psutil.virtual_memory().total,
'memory_available': psutil.virtual_memory().available,
'memory_percent': psutil.virtual_memory().percent
}
def update_stats(self):
"""
Обновляет статистику использования ресурсов
Returns:
dict: Текущая статистика процесса
"""
try:
# CPU и память
cpu_percent = self.process.cpu_percent(interval=None) / psutil.cpu_count()
memory_info = self.process.memory_info()
memory_percent = self.process.memory_percent()
# IO статистика
try:
io_counters = self.process.io_counters()
read_bytes = io_counters.read_bytes
write_bytes = io_counters.write_bytes
# Вычисляем скорость IO в байтах/сек
read_speed = (read_bytes - self.last_read_bytes) / self.interval
write_speed = (write_bytes - self.last_write_bytes) / self.interval
self.last_read_bytes = read_bytes
self.last_write_bytes = write_bytes
except (psutil.AccessDenied, AttributeError):
read_bytes = 0
write_bytes = 0
read_speed = 0
write_speed = 0
# Информация о потоках и дескрипторах
num_threads = self.process.num_threads()
try:
num_fds = self.process.num_fds()
except AttributeError:
num_fds = 0
# Обновляем историю
self.cpu_history.append(cpu_percent)
self.memory_history.append(memory_percent)
self.io_read_history.append(read_speed)
self.io_write_history.append(write_speed)
# Формируем результат
return {
'cpu_percent': cpu_percent,
'memory_rss': memory_info.rss,
'memory_vms': memory_info.vms,
'memory_percent': memory_percent,
'read_bytes': read_bytes,
'write_bytes': write_bytes,
'read_speed': read_speed,
'write_speed': write_speed,
'num_threads': num_threads,
'num_fds': num_fds
}
except psutil.NoSuchProcess:
raise ValueError(f"Процесс с PID {self.pid} завершен")
def format_bytes(self, bytes_value):
"""
Форматирует количество байт в человеко-читаемый формат
Args:
bytes_value (int): Количество байт
Returns:
str: Отформатированная строка
"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_value < 1024:
return f"{bytes_value:.2f} {unit}"
bytes_value /= 1024
return f"{bytes_value:.2f} PB"
def draw_graph(self, win, y, x, width, height, data, title, max_value=None):
"""
Отрисовывает график в окне curses
Args:
win: Окно curses
y, x (int): Координаты верхнего левого угла
width, height (int): Ширина и высота графика
data (list): Данные для отображения
title (str): Заголовок графика
max_value (float): Максимальное значение для масштабирования
"""
# Отрисовываем рамку (используем ASCII символы вместо Unicode)
win.addstr(y, x, '+' + '-' * (width - 2) + '+')
for i in range(1, height - 1):
win.addstr(y + i, x, '|')
win.addstr(y + i, x + width - 1, '|')
win.addstr(y + height - 1, x, '+' + '-' * (width - 2) + '+')
# Заголовок
win.addstr(y, x + 2, f' {title} ')
# Определяем максимальное значение
if max_value is None:
max_value = max(data) if data else 1
# Выводим текущее значение
if data:
try:
win.addstr(y + 1, x + 2, f'Текущее: {data[-1]:.2f}%')
except:
pass
# Коэффициент масштабирования
scale = (height - 3) / max_value if max_value > 0 else 1
# Отображаем данные
for i, value in enumerate(data):
if i >= width - 2:
break
# Вычисляем высоту столбца
bar_height = int(value * scale)
bar_height = min(bar_height, height - 3)
# Отрисовываем столбец с использованием ASCII символов
for j in range(bar_height):
win.addstr(y + height - 2 - j, x + i + 1, '#')
def run_curses_ui(self, stdscr):
"""
Запускает интерфейс мониторинга на базе curses
Args:
stdscr: Стандартный экран curses
"""
# Настройка curses
curses.curs_set(0) # Скрываем курсор
curses.start_color()
curses.use_default_colors()
# Определяем цветовые пары
curses.init_pair(1, curses.COLOR_GREEN, -1)
curses.init_pair(2, curses.COLOR_CYAN, -1)
curses.init_pair(3, curses.COLOR_YELLOW, -1)
curses.init_pair(4, curses.COLOR_RED, -1)
# Получаем размеры терминала
max_y, max_x = stdscr.getmaxyx()
# Главный цикл
running = True
while running:
try:
# Очищаем экран
stdscr.clear()
# Получаем статистику
stats = self.update_stats()
system_info = self.get_system_info()
# Заголовок
header = f" Монитор процесса: {self.process_info['name']} (PID: {self.pid}) "
stdscr.addstr(0, (max_x - len(header)) // 2, header, curses.A_BOLD)
# Информация о процессе
process_info_y = 2
stdscr.addstr(process_info_y, 2, f"Имя процесса: {self.process_info['name']}", curses.color_pair(1))
stdscr.addstr(process_info_y + 1, 2, f"PID: {self.pid}", curses.color_pair(1))
stdscr.addstr(process_info_y + 2, 2, f"Пользователь: {self.process_info['username']}", curses.color_pair(1))
stdscr.addstr(process_info_y + 3, 2, f"Время запуска: {self.process_info['create_time']}", curses.color_pair(1))
# Информация о системе
system_info_y = 2
system_info_x = max_x // 2
stdscr.addstr(system_info_y, system_info_x, f"CPU: {system_info['cpu_percent']}% (всего ядер: {system_info['cpu_count']})", curses.color_pair(2))
stdscr.addstr(system_info_y + 1, system_info_x, f"Память: {system_info['memory_percent']}% использовано", curses.color_pair(2))
stdscr.addstr(system_info_y + 2, system_info_x, f"Всего памяти: {self.format_bytes(system_info['memory_total'])}", curses.color_pair(2))
stdscr.addstr(system_info_y + 3, system_info_x, f"Доступно: {self.format_bytes(system_info['memory_available'])}", curses.color_pair(2))
# Текущая статистика процесса
stats_y = 7
stdscr.addstr(stats_y, 2, f"CPU: {stats['cpu_percent']:.2f}%", curses.color_pair(3))
stdscr.addstr(stats_y + 1, 2, f"Память: {stats['memory_percent']:.2f}% ({self.format_bytes(stats['memory_rss'])})", curses.color_pair(3))
stdscr.addstr(stats_y + 2, 2, f"Чтение: {self.format_bytes(stats['read_speed'])}/с", curses.color_pair(3))
stdscr.addstr(stats_y + 3, 2, f"Запись: {self.format_bytes(stats['write_speed'])}/с", curses.color_pair(3))
stdscr.addstr(stats_y + 4, 2, f"Потоки: {stats['num_threads']}", curses.color_pair(3))
stdscr.addstr(stats_y + 5, 2, f"Файловые дескрипторы: {stats['num_fds']}", curses.color_pair(3))
# Отладочная информация о количестве данных
stdscr.addstr(12, 2, f"CPU история: {len(self.cpu_history)} точек", curses.color_pair(4))
stdscr.addstr(12, max_x // 2, f"Память история: {len(self.memory_history)} точек", curses.color_pair(4))
# Графики
graph_width = (max_x - 6) // 2
graph_height = 10
# График CPU
self.draw_graph(stdscr, 13, 2, graph_width, graph_height,
list(self.cpu_history), "CPU %", 100)
# График памяти
self.draw_graph(stdscr, 13, 4 + graph_width, graph_width, graph_height,
list(self.memory_history), "Память %", 100)
# Инструкции
footer = " Q: Выход | R: Сброс графиков "
stdscr.addstr(max_y - 1, (max_x - len(footer)) // 2, footer, curses.A_BOLD)
# Обновляем экран
stdscr.refresh()
# Ожидаем ввод с таймаутом
stdscr.timeout(int(self.interval * 1000))
key = stdscr.getch()
# Обрабатываем ввод
if key == ord('q') or key == ord('Q'):
running = False
elif key == ord('r') or key == ord('R'):
self.cpu_history.clear()
self.memory_history.clear()
self.io_read_history.clear()
self.io_write_history.clear()
except (KeyboardInterrupt, ValueError):
break
except curses.error:
# Игнорируем ошибки curses, которые могут возникнуть при изменении размера терминала
pass
def main():
parser = argparse.ArgumentParser(description='Мониторинг использования CPU и памяти процесса')
parser.add_argument('--pid', type=int, help='ID процесса для мониторинга (по умолчанию: текущий процесс)')
parser.add_argument('--interval', type=float, default=2.0,
help='Интервал обновления в секундах (по умолчанию: 2.0)')
parser.add_argument('--history', type=int, default=30,
help='Размер истории для графиков (по умолчанию: 30)')
args = parser.parse_args()
try:
# Инициализируем монитор
monitor = ProcessMonitor(args.pid, args.interval, args.history)
# Запускаем интерфейс
curses.wrapper(monitor.run_curses_ui)
except ValueError as e:
print(f"Ошибка: {e}")
except KeyboardInterrupt:
print("\nМониторинг остановлен")
if __name__ == "__main__":
main()
Использование:
pip install windows-cursespython process_monitor.py --interval 1.0
20. Генератор случайных безопасных паролей
20. Генератор случайных безопасных паролей
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import random
import string
import secrets
import re
import sys
import math
class PasswordGenerator:
"""
Генератор случайных паролей с настраиваемыми параметрами
"""
def __init__(self):
# Наборы символов для генерации паролей
self.lowercase = string.ascii_lowercase
self.uppercase = string.ascii_uppercase
self.digits = string.digits
self.special_chars = "!@#$%^&*()-_=+[]{}|;:,.<>?/~"
self.similar_chars = "iIl1Lo0OB8S5Z2"
self.ambiguous_chars = "{}[]()/\\'\"`~,;:.<>"
def generate_password(self, length=12, use_uppercase=True, use_digits=True,
use_special=True, no_similar=False, no_ambiguous=False,
min_uppercase=1, min_digits=1, min_special=1):
"""
Генерирует случайный пароль с заданными параметрами
Args:
length (int): Длина пароля
use_uppercase (bool): Использовать заглавные буквы
use_digits (bool): Использовать цифры
use_special (bool): Использовать специальные символы
no_similar (bool): Исключить похожие символы (iIl1Lo0OB8S5Z2)
no_ambiguous (bool): Исключить неоднозначные символы ({}[]()/\'"`~,;:.<>)
min_uppercase (int): Минимальное количество заглавных букв
min_digits (int): Минимальное количество цифр
min_special (int): Минимальное количество специальных символов
Returns:
str: Сгенерированный пароль
"""
# Проверяем, что длина пароля достаточна для удовлетворения всех требований
min_required = 0
if use_uppercase:
min_required += min_uppercase
if use_digits:
min_required += min_digits
if use_special:
min_required += min_special
if length < min_required:
raise ValueError(f"Длина пароля ({length}) меньше минимальной требуемой длины ({min_required})")
# Формируем набор доступных символов
available_chars = self.lowercase
if use_uppercase:
available_chars += self.uppercase
if use_digits:
available_chars += self.digits
if use_special:
available_chars += self.special_chars
# Исключаем похожие символы, если требуется
if no_similar:
available_chars = ''.join(c for c in available_chars if c not in self.similar_chars)
# Исключаем неоднозначные символы, если требуется
if no_ambiguous:
available_chars = ''.join(c for c in available_chars if c not in self.ambiguous_chars)
# Проверяем, что остались символы для генерации пароля
if not available_chars:
raise ValueError("После применения всех ограничений не осталось символов для генерации пароля")
# Формируем наборы символов для удовлетворения требований по минимальному количеству символов uppercase_chars = ''.join(c for c in self.uppercase if c in available_chars)
digit_chars = ''.join(c for c in self.digits if c in available_chars)
special_chars = ''.join(c for c in self.special_chars if c in available_chars)
# Проверяем, что доступны символы для удовлетворения всех требований
if use_uppercase and min_uppercase > 0 and not uppercase_chars:
raise ValueError("Не осталось заглавных букв для удовлетворения требований")
if use_digits and min_digits > 0 and not digit_chars:
raise ValueError("Не осталось цифр для удовлетворения требований")
if use_special and min_special > 0 and not special_chars:
raise ValueError("Не осталось специальных символов для удовлетворения требований")
# Генерируем пароль
password = []
# Добавляем минимальное количество символов каждого типа
if use_uppercase and min_uppercase > 0:
password.extend(secrets.choice(uppercase_chars) for _ in range(min_uppercase))
if use_digits and min_digits > 0:
password.extend(secrets.choice(digit_chars) for _ in range(min_digits))
if use_special and min_special > 0:
password.extend(secrets.choice(special_chars) for _ in range(min_special))
# Добавляем оставшиеся символы
remaining_length = length - len(password)
password.extend(secrets.choice(available_chars) for _ in range(remaining_length))
# Перемешиваем пароль для случайного порядка символов
random.shuffle(password)
# Преобразуем список символов в строку
return ''.join(password)
def calculate_password_strength(self, password):
"""
Оценивает стойкость пароля
Args:
password (str): Пароль для оценки
Returns:
dict: Информация о стойкости пароля
"""
# Проверяем длину пароля
length = len(password)
# Проверяем наличие разных типов символов
has_lowercase = bool(re.search(r'[a-z]', password))
has_uppercase = bool(re.search(r'[A-Z]', password))
has_digits = bool(re.search(r'\d', password))
has_special = bool(re.search(r'[^a-zA-Z0-9]', password))
# Подсчитываем количество разных типов символов
char_types = sum([has_lowercase, has_uppercase, has_digits, has_special])
# Оцениваем энтропию (в битах)
charset_size = 0
if has_lowercase:
charset_size += 26
if has_uppercase:
charset_size += 26
if has_digits:
charset_size += 10
if has_special:
charset_size += 33 # Примерное количество спецсимволов
# Вычисляем энтропию по формуле: log2(charset_size^length)
entropy = length * math.log2(max(charset_size, 1))
# Оцениваем время взлома
# Предполагаем, что современный компьютер может проверять
# около 1 миллиарда паролей в секунду при атаке перебором
crack_time_seconds = (charset_size ** length) / (10**9 * 2)
# Определяем уровень стойкости
if entropy < 40:
strength = "Очень слабый"
elif entropy < 60:
strength = "Слабый"
elif entropy < 80:
strength = "Средний"
elif entropy < 100:
strength = "Сильный"
else:
strength = "Очень сильный"
# Форматируем время взлома
if crack_time_seconds < 60:
crack_time = f"{crack_time_seconds:.2f} секунд"
elif crack_time_seconds < 3600:
crack_time = f"{crack_time_seconds / 60:.2f} минут"
elif crack_time_seconds < 86400:
crack_time = f"{crack_time_seconds / 3600:.2f} часов"
elif crack_time_seconds < 31536000:
crack_time = f"{crack_time_seconds / 86400:.2f} дней"
elif crack_time_seconds < 31536000 * 100:
crack_time = f"{crack_time_seconds / 31536000:.2f} лет"
else:
crack_time = "более 100 лет"
return {
'length': length,
'has_lowercase': has_lowercase,
'has_uppercase': has_uppercase,
'has_digits': has_digits,
'has_special': has_special,
'char_types': char_types,
'entropy': entropy,
'crack_time': crack_time,
'strength': strength
}
def generate_multiple_passwords(self, count, **kwargs):
"""
Генерирует несколько паролей с одинаковыми параметрами
Args:
count (int): Количество паролей
**kwargs: Параметры для generate_password
Returns:
list: Список сгенерированных паролей
"""
passwords = []
for _ in range(count):
passwords.append(self.generate_password(**kwargs))
return passwords
def format_password_strength(strength_info):
"""
Форматирует информацию о стойкости пароля для вывода
Args:
strength_info (dict): Информация о стойкости пароля
Returns:
str: Отформатированная строка
"""
lines = []
lines.append(f"Уровень стойкости: {strength_info['strength']}")
lines.append(f"Длина: {strength_info['length']} символов")
# Информация о типах символов
char_types = []
if strength_info['has_lowercase']:
char_types.append("строчные буквы")
if strength_info['has_uppercase']:
char_types.append("заглавные буквы")
if strength_info['has_digits']:
char_types.append("цифры")
if strength_info['has_special']:
char_types.append("специальные символы")
lines.append(f"Используемые типы символов ({strength_info['char_types']}): {', '.join(char_types)}")
lines.append(f"Энтропия: {strength_info['entropy']:.2f} бит")
lines.append(f"Примерное время взлома перебором: {strength_info['crack_time']}")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description='Генератор случайных безопасных паролей')
parser.add_argument('--length', type=int, default=12,
help='Длина пароля (по умолчанию: 12)')
parser.add_argument('--count', type=int, default=1,
help='Количество паролей для генерации (по умолчанию: 1)')
parser.add_argument('--no-uppercase', action='store_true',
help='Не использовать заглавные буквы')
parser.add_argument('--no-digits', action='store_true',
help='Не использовать цифры')
parser.add_argument('--no-special', action='store_true',
help='Не использовать специальные символы')
parser.add_argument('--no-similar', action='store_true',
help='Исключить похожие символы (iIl1Lo0OB8S5Z2)')
parser.add_argument('--no-ambiguous', action='store_true',
help='Исключить неоднозначные символы ({}[]()/\\\'"`~,;:.<>)')
parser.add_argument('--min-uppercase', type=int, default=1,
help='Минимальное количество заглавных букв (по умолчанию: 1)')
parser.add_argument('--min-digits', type=int, default=1,
help='Минимальное количество цифр (по умолчанию: 1)')
parser.add_argument('--min-special', type=int, default=1,
help='Минимальное количество специальных символов (по умолчанию: 1)')
parser.add_argument('--analyze', action='store_true',
help='Анализировать стойкость сгенерированных паролей')
parser.add_argument('--check', help='Проверить стойкость указанного пароля')
args = parser.parse_args()
generator = PasswordGenerator()
# Проверяем указанный пароль
if args.check:
strength_info = generator.calculate_password_strength(args.check)
print(f"Анализ пароля: {args.check}")
print("-" * 50)
print(format_password_strength(strength_info))
return
try:
# Генерируем пароли
passwords = generator.generate_multiple_passwords(
args.count,
length=args.length,
use_uppercase=not args.no_uppercase,
use_digits=not args.no_digits,
use_special=not args.no_special,
no_similar=args.no_similar,
no_ambiguous=args.no_ambiguous,
min_uppercase=args.min_uppercase if not args.no_uppercase else 0,
min_digits=args.min_digits if not args.no_digits else 0,
min_special=args.min_special if not args.no_special else 0
)
# Выводим пароли
for i, password in enumerate(passwords, 1):
print(f"Пароль {i}: {password}")
# Анализируем стойкость, если требуется
if args.analyze:
strength_info = generator.calculate_password_strength(password)
print("-" * 50)
print(format_password_strength(strength_info))
print("-" * 50)
except ValueError as e:
print(f"Ошибка: {e}")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
python password_generator.py --length 16 --min-special 2 --no-similar --analyze
21. Простая система контроля версий (VCS)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import shutil
import hashlib
import datetime
import argparse
from pathlib import Path
class SimpleVCS:
"""
Простая система контроля версий, которая умеет создавать снимки состояния директории
и восстанавливать их по требованию
"""
def __init__(self, directory):
"""
Инициализирует систему контроля версий для указанной директории
Args:
directory (str): Путь к директории
"""
self.directory = os.path.abspath(directory)
# Проверяем существование директории
if not os.path.isdir(self.directory):
raise ValueError(f"Директория '{directory}' не существует")
# Директория для хранения снимков
self.vcs_dir = os.path.join(self.directory, ".simple_vcs")
# Файл для хранения метаданных
self.meta_file = os.path.join(self.vcs_dir, "metadata.json")
# Инициализируем репозиторий, если необходимо
self._init_repo()
def _init_repo(self):
"""
Инициализирует репозиторий, если он не существует
"""
# Создаем директорию для снимков, если она не существует
if not os.path.exists(self.vcs_dir):
os.makedirs(self.vcs_dir)
# Инициализируем метаданные
metadata = {
"snapshots": [],
"last_snapshot_id": 0,
"created_at": datetime.datetime.now().isoformat()
}
# Сохраняем метаданные
with open(self.meta_file, 'w', encoding='utf-8') as file:
json.dump(metadata, file, indent=4)
print(f"Инициализирован репозиторий в {self.directory}")
def _load_metadata(self):
"""
Загружает метаданные репозитория
Returns:
dict: Метаданные репозитория
"""
with open(self.meta_file, 'r', encoding='utf-8') as file:
return json.load(file)
def _save_metadata(self, metadata):
"""
Сохраняет метаданные репозитория
Args:
metadata (dict): Метаданные репозитория
"""
with open(self.meta_file, 'w', encoding='utf-8') as file:
json.dump(metadata, file, indent=4)
def _calculate_file_hash(self, file_path):
"""
Вычисляет хеш содержимого файла
Args:
file_path (str): Путь к файлу
Returns:
str: Хеш файла
"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def _get_snapshot_dir(self, snapshot_id):
"""
Возвращает путь к директории снимка
Args:
snapshot_id (int): ID снимка
Returns:
str: Путь к директории снимка
"""
return os.path.join(self.vcs_dir, f"snapshot_{snapshot_id}")
def create_snapshot(self, message=None):
"""
Создает снимок текущего состояния директории
Args:
message (str): Сообщение для снимка
Returns:
int: ID созданного снимка
"""
# Загружаем метаданные
metadata = self._load_metadata()
# Генерируем ID для нового снимка
snapshot_id = metadata["last_snapshot_id"] + 1
# Директория для хранения снимка
snapshot_dir = self._get_snapshot_dir(snapshot_id)
os.makedirs(snapshot_dir)
# Собираем информацию о файлах
files_info = []
for root, dirs, files in os.walk(self.directory):
# Пропускаем директорию с репозиторием
if root == self.vcs_dir or root.startswith(self.vcs_dir + os.path.sep):
continue
# Пропускаем скрытые директории
if os.path.basename(root).startswith('.'):
continue
for file in files:
# Пропускаем скрытые файлы
if file.startswith('.'):
continue
file_path = os.path.join(root, file)
rel_path = os.path.relpath(file_path, self.directory)
# Пропускаем файлы в игнорируемых директориях
if any(part.startswith('.') for part in rel_path.split(os.path.sep)):
continue
try:
# Вычисляем хеш файла
file_hash = self._calculate_file_hash(file_path)
# Добавляем информацию о файле
file_info = {
"path": rel_path,
"hash": file_hash,
"size": os.path.getsize(file_path),
"modified": os.path.getmtime(file_path)
}
files_info.append(file_info)
# Копируем файл в директорию снимка
snapshot_file_path = os.path.join(snapshot_dir, file_hash)
if not os.path.exists(snapshot_file_path):
shutil.copy2(file_path, snapshot_file_path)
except (IOError, OSError) as e:
print(f"Ошибка при обработке файла {file_path}: {e}")
# Создаем информацию о снимке
snapshot = {
"id": snapshot_id,
"created_at": datetime.datetime.now().isoformat(),
"message": message,
"files": files_info
}
# Сохраняем метаданные снимка
snapshot_meta_file = os.path.join(snapshot_dir, "snapshot.json")
with open(snapshot_meta_file, 'w', encoding='utf-8') as file:
json.dump(snapshot, file, indent=4)
# Обновляем общие метаданные
metadata["snapshots"].append({
"id": snapshot_id,
"created_at": snapshot["created_at"],
"message": message,
"files_count": len(files_info)
})
metadata["last_snapshot_id"] = snapshot_id
self._save_metadata(metadata)
print(f"Создан снимок #{snapshot_id}: {message or 'Без описания'}")
print(f"Сохранено {len(files_info)} файлов")
return snapshot_id
def list_snapshots(self):
"""
Выводит список всех снимков
"""
metadata = self._load_metadata()
if not metadata["snapshots"]:
print("Снимки отсутствуют")
return
print(f"Репозиторий: {self.directory}")
print(f"Всего снимков: {len(metadata['snapshots'])}")
print("-" * 80)
for snapshot in metadata["snapshots"]:
created_at = datetime.datetime.fromisoformat(snapshot["created_at"])
formatted_date = created_at.strftime("%Y-%m-%d %H:%M:%S")
print(f"Снимок #{snapshot['id']} от {formatted_date}")
print(f"Файлов: {snapshot['files_count']}")
if snapshot["message"]:
print(f"Описание: {snapshot['message']}")
print("-" * 80)
def restore_snapshot(self, snapshot_id, target_dir=None, force=False):
"""
Восстанавливает состояние директории из снимка
Args:
snapshot_id (int): ID снимка для восстановления
target_dir (str): Целевая директория для восстановления (если не указана, используется исходная)
force (bool): Принудительное восстановление (перезапись существующих файлов)
"""
# Загружаем метаданные
metadata = self._load_metadata()
# Проверяем существование снимка
if not any(snap["id"] == snapshot_id for snap in metadata["snapshots"]):
print(f"Снимок #{snapshot_id} не найден")
return False
# Если целевая директория не указана, используем исходную
if target_dir is None:
target_dir = self.directory
else:
target_dir = os.path.abspath(target_dir)
# Создаем целевую директорию, если она не существует
if not os.path.exists(target_dir):
os.makedirs(target_dir)
# Путь к директории снимка
snapshot_dir = self._get_snapshot_dir(snapshot_id)
# Загружаем информацию о снимке
snapshot_meta_file = os.path.join(snapshot_dir, "snapshot.json")
try:
with open(snapshot_meta_file, 'r', encoding='utf-8') as file:
snapshot = json.load(file)
except FileNotFoundError:
print(f"Метаданные снимка #{snapshot_id} не найдены")
return False
# Проверяем наличие изменений в целевой директории
if not force and target_dir == self.directory:
has_changes = False
for file_info in snapshot["files"]:
target_path = os.path.join(target_dir, file_info["path"])
if os.path.exists(target_path):
if self._calculate_file_hash(target_path) != file_info["hash"]:
has_changes = True
break
else:
has_changes = True
break
if has_changes:
print("В директории есть несохраненные изменения.")
print("Используйте --force для принудительного восстановления.")
return False
# Восстанавливаем файлы
restored_count = 0
skipped_count = 0
for file_info in snapshot["files"]:
source_path = os.path.join(snapshot_dir, file_info["hash"])
target_path = os.path.join(target_dir, file_info["path"])
# Создаем директории, если необходимо
os.makedirs(os.path.dirname(target_path), exist_ok=True)
try:
# Если файл существует и не изменялся, пропускаем его
if os.path.exists(target_path) and not force:
if self._calculate_file_hash(target_path) == file_info["hash"]:
skipped_count += 1
continue
# Копируем файл
shutil.copy2(source_path, target_path)
restored_count += 1
except (IOError, OSError) as e:
print(f"Ошибка при восстановлении файла {file_info['path']}: {e}")
created_at = datetime.datetime.fromisoformat(snapshot["created_at"])
formatted_date = created_at.strftime("%Y-%m-%d %H:%M:%S")
print(f"Восстановлен снимок #{snapshot_id} от {formatted_date}")
if snapshot["message"]:
print(f"Описание: {snapshot['message']}")
print(f"Восстановлено файлов: {restored_count}")
print(f"Пропущено файлов: {skipped_count}")
return True
def show_diff(self, snapshot_id=None):
"""
Показывает различия между снимком и текущим состоянием директории
Args:
snapshot_id (int): ID снимка для сравнения (если не указан, используется последний)
"""
# Загружаем метаданные
metadata = self._load_metadata()
if not metadata["snapshots"]:
print("Снимки отсутствуют")
return
# Если ID снимка не указан, используем последний
if snapshot_id is None:
snapshot_id = metadata["last_snapshot_id"]
else:
# Проверяем существование снимка
if not any(snap["id"] == snapshot_id for snap in metadata["snapshots"]):
print(f"Снимок #{snapshot_id} не найден")
return
# Путь к директории снимка
snapshot_dir = self._get_snapshot_dir(snapshot_id)
# Загружаем информацию о снимке
snapshot_meta_file = os.path.join(snapshot_dir, "snapshot.json")
try:
with open(snapshot_meta_file, 'r', encoding='utf-8') as file:
snapshot = json.load(file)
except FileNotFoundError:
print(f"Метаданные снимка #{snapshot_id} не найдены")
return
# Создаем словарь файлов снимка для быстрого поиска
snapshot_files = {file_info["path"]: file_info for file_info in snapshot["files"]}
# Текущие файлы
current_files = {}
for root, dirs, files in os.walk(self.directory):
# Пропускаем директорию с репозиторием
if root == self.vcs_dir or root.startswith(self.vcs_dir + os.path.sep):
continue
# Пропускаем скрытые директории
if os.path.basename(root).startswith('.'):
continue
for file in files:
# Пропускаем скрытые файлы
if file.startswith('.'):
continue
file_path = os.path.join(root, file)
rel_path = os.path.relpath(file_path, self.directory)
# Пропускаем файлы в игнорируемых директориях
if any(part.startswith('.') for part in rel_path.split(os.path.sep)):
continue
try:
# Вычисляем хеш файла
file_hash = self._calculate_file_hash(file_path)
# Добавляем информацию о файле
current_files[rel_path] = {
"hash": file_hash,
"size": os.path.getsize(file_path)
}
except (IOError, OSError) as e:
print(f"Ошибка при обработке файла {file_path}: {e}")
# Находим различия
modified_files = []
new_files = []
deleted_files = []
# Измененные и удаленные файлы
for path, file_info in snapshot_files.items():
if path in current_files:
if current_files[path]["hash"] != file_info["hash"]:
modified_files.append(path)
else:
deleted_files.append(path)
# Новые файлы
for path in current_files:
if path not in snapshot_files:
new_files.append(path)
# Выводим результаты
created_at = datetime.datetime.fromisoformat(snapshot["created_at"])
formatted_date = created_at.strftime("%Y-%m-%d %H:%M:%S")
print(f"Сравнение с снимком #{snapshot_id} от {formatted_date}:")
if not modified_files and not new_files and not deleted_files:
print("Изменений нет")
return
if modified_files:
print("\nИзмененные файлы:")
for path in sorted(modified_files):
snapshot_size = snapshot_files[path]["size"]
current_size = current_files[path]["size"]
size_diff = current_size - snapshot_size
sign = "+" if size_diff >= 0 else ""
print(f" M {path} ({sign}{size_diff} байт)")
if new_files:
print("\nНовые файлы:")
for path in sorted(new_files):
size = current_files[path]["size"]
print(f" A {path} ({size} байт)")
if deleted_files:
print("\nУдаленные файлы:")
for path in sorted(deleted_files):
size = snapshot_files[path]["size"]
print(f" D {path} ({size} байт)")
print(f"\nВсего изменений: {len(modified_files) + len(new_files) + len(deleted_files)}")
def main():
parser = argparse.ArgumentParser(description='Простая система контроля версий')
parser.add_argument('--directory', '-d', default=os.getcwd(),
help='Путь к директории (по умолчанию: текущая директория)')
subparsers = parser.add_subparsers(dest='command', help='Команда')
# Команда init
init_parser = subparsers.add_parser('init', help='Инициализировать репозиторий')
# Команда snapshot
snapshot_parser = subparsers.add_parser('snapshot', help='Создать снимок состояния директории')
snapshot_parser.add_argument('--message', '-m', help='Сообщение для снимка')
# Команда list
list_parser = subparsers.add_parser('list', help='Вывести список снимков')
# Команда restore
restore_parser = subparsers.add_parser('restore', help='Восстановить состояние директории из снимка')
restore_parser.add_argument('snapshot_id', type=int, help='ID снимка для восстановления')
restore_parser.add_argument('--target', '-t', help='Целевая директория для восстановления')
restore_parser.add_argument('--force', '-f', action='store_true',
help='Принудительное восстановление (перезапись существующих файлов)')
# Команда diff
diff_parser = subparsers.add_parser('diff', help='Показать различия между снимком и текущим состоянием')
diff_parser.add_argument('snapshot_id', type=int, nargs='?',
help='ID снимка для сравнения (если не указан, используется последний)')
args = parser.parse_args()
try:
vcs = SimpleVCS(args.directory)
if args.command == 'init':
# Репозиторий уже инициализирован в конструкторе
print(f"Репозиторий в {args.directory} уже инициализирован")
elif args.command == 'snapshot':
vcs.create_snapshot(args.message)
elif args.command == 'list':
vcs.list_snapshots()
elif args.command == 'restore':
vcs.restore_snapshot(args.snapshot_id, args.target, args.force)
elif args.command == 'diff':
vcs.show_diff(args.snapshot_id)
else:
parser.print_help()
except Exception as e:
print(f"Ошибка: {e}")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Инициализация репозитория
python simple_vcs.py -d /путь/к/директории init
# Создание снимка
python simple_vcs.py -d /путь/к/директории snapshot -m "Описание снимка"
# Список снимков
python simple_vcs.py -d /путь/к/директории list
# Восстановление снимка
python simple_vcs.py -d /путь/к/директории restore 1 --force
# Просмотр различий
python simple_vcs.py -d /путь/к/директории diff 1
22. Утилита для пакетной обработки CSV-файлов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import csv
import argparse
import sys
from collections import defaultdict
class CSVProcessor:
"""
Класс для пакетной обработки CSV-файлов
"""
def __init__(self, delimiter=',', quotechar='"', encoding='utf-8'):
"""
Инициализирует обработчик CSV-файлов
Args:
delimiter (str): Разделитель полей
quotechar (str): Символ кавычек
encoding (str): Кодировка файлов
"""
self.delimiter = delimiter
self.quotechar = quotechar
self.encoding = encoding
def read_csv(self, file_path):
"""
Читает данные из CSV-файла
Args:
file_path (str): Путь к CSV-файлу
Returns:
tuple: (заголовки, строки данных)
"""
with open(file_path, 'r', encoding=self.encoding, newline='') as file:
reader = csv.reader(file, delimiter=self.delimiter, quotechar=self.quotechar)
headers = next(reader)
data = list(reader)
return headers, data
def write_csv(self, file_path, headers, data):
"""
Записывает данные в CSV-файл
Args:
file_path (str): Путь к выходному CSV-файлу
headers (list): Заголовки столбцов
data (list): Строки данных
"""
with open(file_path, 'w', encoding=self.encoding, newline='') as file:
writer = csv.writer(file, delimiter=self.delimiter, quotechar=self.quotechar, quoting=csv.QUOTE_MINIMAL)
writer.writerow(headers)
writer.writerows(data)
def merge_files(self, file_paths, output_file, by_columns=None):
"""
Объединяет несколько CSV-файлов в один
Args:
file_paths (list): Список путей к CSV-файлам
output_file (str): Путь к выходному CSV-файлу
by_columns (list): Список столбцов для объединения (если None, то простое слияние)
Returns:
tuple: (количество объединенных строк, количество файлов)
"""
# Проверяем наличие файлов
for file_path in file_paths:
if not os.path.isfile(file_path):
raise FileNotFoundError(f"Файл '{file_path}' не найден")
# Если файлов нет, возвращаем ошибку
if not file_paths:
raise ValueError("Не указаны входные файлы")
merged_headers = []
all_headers = []
merged_data = []
# Простое слияние (добавление строк)
if by_columns is None:
for i, file_path in enumerate(file_paths):
headers, data = self.read_csv(file_path)
# Для первого файла запоминаем заголовки
if i == 0:
merged_headers = headers
all_headers.append(headers)
merged_data.extend(data)
else:
# Проверяем совпадение заголовков
if headers != merged_headers:
print(f"Предупреждение: заголовки в файле '{file_path}' отличаются")
all_headers.append(headers)
# Если количество столбцов не совпадает, пропускаем файл
if len(headers) != len(merged_headers):
print(f"Пропуск файла '{file_path}': несовместимое количество столбцов")
continue
merged_data.extend(data)
# Записываем результат
self.write_csv(output_file, merged_headers, merged_data)
return len(merged_data), len(file_paths)
# Объединение по ключевым столбцам
else:
# Проверяем корректность столбцов для объединения
if not isinstance(by_columns, list) or not by_columns:
raise ValueError("Некорректно указаны столбцы для объединения")
# Словарь для хранения объединенных данных
merged_rows = defaultdict(dict)
for file_path in file_paths:
headers, data = self.read_csv(file_path)
all_headers.append(headers)
# Проверяем наличие ключевых столбцов
missing_columns = [col for col in by_columns if col not in headers]
if missing_columns:
print(f"Пропуск файла '{file_path}': отсутствуют столбцы {missing_columns}")
continue
# Индексы ключевых столбцов
key_indices = [headers.index(col) for col in by_columns]
# Обрабатываем строки
for row in data:
# Формируем ключ из значений ключевых столбцов
key = tuple(row[idx] for idx in key_indices)
# Добавляем значения столбцов в словарь
for i, value in enumerate(row):
column_name = headers[i]
# Добавляем только если столбец отсутствует или это ключевой столбец
if column_name not in merged_rows[key] or column_name in by_columns:
merged_rows[key][column_name] = value
# Формируем заголовки для выходного файла
merged_headers = set()
for headers_list in all_headers:
merged_headers.update(headers_list)
# Сортируем заголовки: сначала ключевые, потом остальные
merged_headers = by_columns + sorted(col for col in merged_headers if col not in by_columns)
# Формируем строки данных
merged_data = []
for key, row_data in merged_rows.items():
row = []
for column in merged_headers:
row.append(row_data.get(column, ""))
merged_data.append(row)
# Записываем результат
self.write_csv(output_file, merged_headers, merged_data)
return len(merged_data), len(file_paths)
def filter_columns(self, input_file, output_file, columns=None, exclude_columns=None):
"""
Фильтрует столбцы CSV-файла
Args:
input_file (str): Путь к входному CSV-файлу
output_file (str): Путь к выходному CSV-файлу
columns (list): Список столбцов для сохранения
exclude_columns (list): Список столбцов для исключения
Returns:
tuple: (количество обработанных строк, количество столбцов)
"""
# Проверяем наличие файла
if not os.path.isfile(input_file):
raise FileNotFoundError(f"Файл '{input_file}' не найден")
# Проверяем, что указан хотя бы один параметр
if columns is None and exclude_columns is None:
raise ValueError("Необходимо указать либо столбцы для сохранения, либо столбцы для исключения")
# Читаем данные
headers, data = self.read_csv(input_file)
# Определяем индексы столбцов для сохранения
indices_to_keep = []
if columns is not None:
# Проверяем наличие указанных столбцов
missing_columns = [col for col in columns if col not in headers]
if missing_columns:
raise ValueError(f"Столбцы {missing_columns} отсутствуют в файле")
# Сохраняем указанные столбцы
indices_to_keep = [headers.index(col) for col in columns]
else:
# Исключаем указанные столбцы
indices_to_keep = [i for i, col in enumerate(headers) if col not in exclude_columns]
# Фильтруем заголовки и данные
filtered_headers = [headers[i] for i in indices_to_keep]
filtered_data = [[row[i] for i in indices_to_keep] for row in data]
# Записываем результат
self.write_csv(output_file, filtered_headers, filtered_data)
return len(filtered_data), len(filtered_headers)
def filter_rows(self, input_file, output_file, column=None, value=None, operator='=', case_sensitive=False): """
Фильтрует строки CSV-файла по значению в указанном столбце
Args:
input_file (str): Путь к входному CSV-файлу
output_file (str): Путь к выходному CSV-файлу
column (str): Название столбца для фильтрации
value (str): Значение для сравнения
operator (str): Оператор сравнения ('=', '!=', '>', '<', '>=', '<=', 'contains')
case_sensitive (bool): Учитывать регистр при сравнении строк
Returns:
int: Количество отфильтрованных строк
"""
# Проверяем наличие файла
if not os.path.isfile(input_file):
raise FileNotFoundError(f"Файл '{input_file}' не найден")
# Проверяем корректность параметров
if column is None:
raise ValueError("Не указан столбец для фильтрации")
# Читаем данные
headers, data = self.read_csv(input_file)
# Проверяем наличие столбца
if column not in headers:
raise ValueError(f"Столбец '{column}' отсутствует в файле")
# Определяем индекс столбца
column_index = headers.index(column)
# Функция для сравнения значений
def compare(row_value, filter_value):
# Для числовых операторов пытаемся преобразовать значения в числа
if operator in ['>', '<', '>=', '<=']:
try:
row_num = float(row_value)
filter_num = float(filter_value)
if operator == '>':
return row_num > filter_num
elif operator == '<':
return row_num < filter_num
elif operator == '>=':
return row_num >= filter_num
elif operator == '<=':
return row_num <= filter_num
except (ValueError, TypeError):
# Если не удалось преобразовать в числа, используем лексикографическое сравнение
pass
# Для строковых операторов
if not case_sensitive:
row_value = row_value.lower()
filter_value = filter_value.lower()
if operator == '=':
return row_value == filter_value
elif operator == '!=':
return row_value != filter_value
elif operator == 'contains':
return filter_value in row_value
# По умолчанию считаем условие не выполненным
return False
# Фильтруем строки
filtered_data = [row for row in data if compare(row[column_index], value)]
# Записываем результат
self.write_csv(output_file, headers, filtered_data)
return len(filtered_data)
def sort_data(self, input_file, output_file, columns, reverse=False):
"""
Сортирует CSV-файл по указанным столбцам
Args:
input_file (str): Путь к входному CSV-файлу
output_file (str): Путь к выходному CSV-файлу
columns (list): Список столбцов для сортировки
reverse (bool): Сортировка в обратном порядке
Returns:
int: Количество обработанных строк
"""
# Проверяем наличие файла
if not os.path.isfile(input_file):
raise FileNotFoundError(f"Файл '{input_file}' не найден")
# Проверяем корректность столбцов для сортировки
if not isinstance(columns, list) or not columns:
raise ValueError("Некорректно указаны столбцы для сортировки")
# Читаем данные
headers, data = self.read_csv(input_file)
# Проверяем наличие указанных столбцов
missing_columns = [col for col in columns if col not in headers]
if missing_columns:
raise ValueError(f"Столбцы {missing_columns} отсутствуют в файле")
# Определяем индексы столбцов для сортировки
indices = [headers.index(col) for col in columns]
# Сортируем данные
def sort_key(row):
values = []
for idx in indices:
val = row[idx]
# Пытаемся преобразовать значение в число
try:
val = float(val)
except (ValueError, TypeError):
pass
values.append(val)
return values
sorted_data = sorted(data, key=sort_key, reverse=reverse)
# Записываем результат
self.write_csv(output_file, headers, sorted_data)
return len(sorted_data)
def group_data(self, input_file, output_file, group_by, aggregate_funcs=None):
"""
Группирует данные CSV-файла по указанным столбцам
Args:
input_file (str): Путь к входному CSV-файлу
output_file (str): Путь к выходному CSV-файлу
group_by (list): Список столбцов для группировки
aggregate_funcs (dict): Словарь функций агрегации вида {столбец: функция}
Returns:
int: Количество групп
"""
# Проверяем наличие файла
if not os.path.isfile(input_file):
raise FileNotFoundError(f"Файл '{input_file}' не найден")
# Проверяем корректность столбцов для группировки
if not isinstance(group_by, list) or not group_by:
raise ValueError("Некорректно указаны столбцы для группировки")
# Читаем данные
headers, data = self.read_csv(input_file)
# Проверяем наличие указанных столбцов
missing_columns = [col for col in group_by if col not in headers]
if missing_columns:
raise ValueError(f"Столбцы {missing_columns} отсутствуют в файле")
# Если функции агрегации не указаны, используем count для всех столбцов
if aggregate_funcs is None:
aggregate_funcs = {}
# Проверяем функции агрегации
for col in aggregate_funcs:
if col not in headers:
raise ValueError(f"Столбец '{col}' для агрегации отсутствует в файле")
# Индексы столбцов для группировки
group_indices = [headers.index(col) for col in group_by]
# Словарь для хранения сгруппированных данных
grouped_data = defaultdict(list)
# Группируем данные
for row in data:
# Формируем ключ из значений столбцов для группировки
key = tuple(row[idx] for idx in group_indices)
grouped_data[key].append(row)
# Формируем результат
result_headers = group_by + [f"{col}_{func}" for col, func in aggregate_funcs.items()]
result_data = []
for key, rows in grouped_data.items():
new_row = list(key)
# Применяем функции агрегации
for col, func in aggregate_funcs.items():
col_idx = headers.index(col)
values = [row[col_idx] for row in rows]
# Пытаемся преобразовать значения в числа
try:
values = [float(val) for val in values if val]
except (ValueError, TypeError):
# Если не удалось преобразовать, оставляем как есть
pass
# Применяем функцию агрегации
if func == 'count':
result = len(rows)
elif func == 'sum':
result = sum(values) if values else 0
elif func == 'avg':
result = sum(values) / len(values) if values else 0
elif func == 'min':
result = min(values) if values else ''
elif func == 'max':
result = max(values) if values else ''
else:
raise ValueError(f"Неизвестная функция агрегации '{func}'")
new_row.append(str(result))
result_data.append(new_row)
# Сортируем результат по ключу группировки
result_data.sort()
# Записываем результат
self.write_csv(output_file, result_headers, result_data)
return len(result_data)
def parse_key_value_arg(arg):
"""
Парсит аргумент вида "ключ=значение"
Args:
arg (str): Аргумент для парсинга
Returns:
tuple: (ключ, значение)
"""
if '=' not in arg:
raise ValueError(f"Неверный формат аргумента '{arg}', ожидается 'ключ=значение'")
key, value = arg.split('=', 1)
return key.strip(), value.strip()
def parse_aggregate_funcs(args):
"""
Парсит аргументы агрегации вида "столбец:функция"
Args:
args (list): Список аргументов
Returns:
dict: Словарь {столбец: функция}
"""
result = {}
for arg in args:
if ':' not in arg:
raise ValueError(f"Неверный формат аргумента агрегации '{arg}', ожидается 'столбец:функция'")
column, func = arg.split(':', 1)
result[column.strip()] = func.strip()
return result
def main():
parser = argparse.ArgumentParser(description='Утилита для пакетной обработки CSV-файлов')
subparsers = parser.add_subparsers(dest='command', help='Команда')
# Общие параметры
parser.add_argument('--delimiter', default=',', help='Разделитель полей (по умолчанию: ,)')
parser.add_argument('--quotechar', default='"', help='Символ кавычек (по умолчанию: ")')
parser.add_argument('--encoding', default='utf-8', help='Кодировка файлов (по умолчанию: utf-8)')
# Команда merge - объединение файлов
merge_parser = subparsers.add_parser('merge', help='Объединить несколько CSV-файлов')
merge_parser.add_argument('input_files', nargs='+', help='Входные CSV-файлы')
merge_parser.add_argument('--output', '-o', required=True, help='Выходной CSV-файл')
merge_parser.add_argument('--by-columns', '-b', nargs='+', help='Столбцы для объединения')
# Команда filter-columns - фильтрация столбцов
filter_cols_parser = subparsers.add_parser('filter-columns', help='Фильтрация столбцов CSV-файла')
filter_cols_parser.add_argument('input_file', help='Входной CSV-файл')
filter_cols_parser.add_argument('--output', '-o', required=True, help='Выходной CSV-файл')
filter_cols_parser.add_argument('--columns', '-c', nargs='+', help='Столбцы для сохранения')
filter_cols_parser.add_argument('--exclude', '-e', nargs='+', help='Столбцы для исключения')
# Команда filter-rows - фильтрация строк
filter_rows_parser = subparsers.add_parser('filter-rows', help='Фильтрация строк CSV-файла')
filter_rows_parser.add_argument('input_file', help='Входной CSV-файл')
filter_rows_parser.add_argument('--output', '-o', required=True, help='Выходной CSV-файл')
filter_rows_parser.add_argument('--column', '-c', required=True, help='Столбец для фильтрации')
filter_rows_parser.add_argument('--value', '-v', required=True, help='Значение для сравнения')
filter_rows_parser.add_argument('--operator', choices=['=', '!=', '>', '<', '>=', '<=', 'contains'],
default='=', help='Оператор сравнения (по умолчанию: =)')
filter_rows_parser.add_argument('--case-sensitive', action='store_true',
help='Учитывать регистр при сравнении строк')
# Команда sort - сортировка данных
sort_parser = subparsers.add_parser('sort', help='Сортировка CSV-файла')
sort_parser.add_argument('input_file', help='Входной CSV-файл')
sort_parser.add_argument('--output', '-o', required=True, help='Выходной CSV-файл')
sort_parser.add_argument('--columns', '-c', nargs='+', required=True, help='Столбцы для сортировки')
sort_parser.add_argument('--reverse', '-r', action='store_true', help='Сортировка в обратном порядке')
# Команда group - группировка данных
group_parser = subparsers.add_parser('group', help='Группировка данных CSV-файла')
group_parser.add_argument('input_file', help='Входной CSV-файл')
group_parser.add_argument('--output', '-o', required=True, help='Выходной CSV-файл')
group_parser.add_argument('--group-by', '-g', nargs='+', required=True, help='Столбцы для группировки')
group_parser.add_argument('--aggregate', '-a', nargs='+',
help='Функции агрегации вида "столбец:функция"')
args = parser.parse_args()
# Создаем обработчик CSV-файлов
processor = CSVProcessor(args.delimiter, args.quotechar, args.encoding)
try:
if args.command == 'merge':
rows_count, files_count = processor.merge_files(
args.input_files, args.output, args.by_columns
)
print(f"Объединено {rows_count} строк из {files_count} файлов")
elif args.command == 'filter-columns':
if args.columns is None and args.exclude is None:
print("Ошибка: Необходимо указать либо столбцы для сохранения (--columns), либо столбцы для исключения (--exclude)")
return 1
rows_count, cols_count = processor.filter_columns(
args.input_file, args.output, args.columns, args.exclude
)
print(f"Отфильтровано {rows_count} строк, оставлено {cols_count} столбцов")
elif args.command == 'filter-rows':
rows_count = processor.filter_rows(
args.input_file, args.output, args.column, args.value,
args.operator, args.case_sensitive
)
print(f"Отфильтровано {rows_count} строк")
elif args.command == 'sort':
rows_count = processor.sort_data(
args.input_file, args.output, args.columns, args.reverse
)
print(f"Отсортировано {rows_count} строк")
elif args.command == 'group':
aggregate_funcs = {}
if args.aggregate:
aggregate_funcs = parse_aggregate_funcs(args.aggregate)
groups_count = processor.group_data(
args.input_file, args.output, args.group_by, aggregate_funcs
)
print(f"Данные сгруппированы, получено {groups_count} групп")
else:
parser.print_help()
return 1
except Exception as e:
print(f"Ошибка: {e}")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Объединение файлов
python csv_processor.py merge file1.csv file2.csv --output merged.csv
# Объединение файлов по столбцам
python csv_processor.py merge file1.csv file2.csv --output merged.csv --by-columns id name
# Фильтрация столбцов
python csv_processor.py filter-columns data.csv --output filtered.csv --columns id name age
# Фильтрация строк
python csv_processor.py filter-rows data.csv --output filtered.csv --column age --value 18 --operator ">"
# Сортировка данных
python csv_processor.py sort data.csv --output sorted.csv --columns name age
# Группировка данных
python csv_processor.py group data.csv --output grouped.csv --group-by city --aggregate "age:avg" "salary:sum"
23. Скрипт для сортировки файлов в "Загрузках" браузера
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import shutil
import time
import argparse
import logging
from datetime import datetime
from pathlib import Path
import mimetypes
class FileSorter:
"""
Класс для сортировки файлов по типам
"""
def __init__(self, source_dir, dest_dir=None, move_files=True):
"""
Инициализирует сортировщик файлов
Args:
source_dir (str): Директория для сортировки файлов
dest_dir (str): Директория для перемещения отсортированных файлов
(если None, используется source_dir)
move_files (bool): Перемещать файлы вместо копирования
"""
self.source_dir = os.path.abspath(source_dir)
self.dest_dir = os.path.abspath(dest_dir) if dest_dir else self.source_dir
self.move_files = move_files
# Настраиваем логирование
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
self.logger = logging.getLogger('FileSorter')
# Инициализируем mimetypes
mimetypes.init()
# Определяем категории файлов
self.categories = {
'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp', '.svg', '.ico'],
'documents': ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.rtf', '.odt', '.ods', '.odp'],
'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a', '.wma'],
'video': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v', '.mpg', '.mpeg'],
'archives': ['.zip', '.rar', '.7z', '.tar', '.gz', '.bz2', '.xz', '.iso'],
'code': ['.py', '.js', '.html', '.css', '.php', '.java', '.c', '.cpp', '.h', '.sh', '.bat', '.ps1', '.json', '.xml', '.yaml', '.yml'],
'ebooks': ['.epub', '.mobi', '.azw', '.azw3', '.fb2'],
'installers': ['.exe', '.msi', '.deb', '.rpm', '.dmg', '.pkg', '.apk']
}
# Обратный словарь для быстрого определения категории
self.extension_to_category = {}
for category, extensions in self.categories.items():
for ext in extensions:
self.extension_to_category[ext] = category
def _get_file_category(self, file_path):
"""
Определяет категорию файла по его расширению
Args:
file_path (str): Путь к файлу
Returns:
str: Категория файла ('other', если категория не определена)
"""
# Определяем расширение файла
ext = os.path.splitext(file_path)[1].lower()
# Возвращаем категорию или 'other'
return self.extension_to_category.get(ext, 'other')
def _create_category_dirs(self):
"""
Создает директории для категорий файлов
Returns:
dict: Словарь {категория: путь_к_директории}
"""
category_dirs = {}
# Создаем директорию для категории 'other'
categories = list(self.categories.keys()) + ['other']
for category in categories:
category_path = os.path.join(self.dest_dir, category)
# Создаем директорию, если она не существует
if not os.path.exists(category_path):
os.makedirs(category_path)
self.logger.info(f"Создана директория для категории '{category}': {category_path}")
category_dirs[category] = category_path
return category_dirs
def sort_files(self, recursive=False, min_file_age=0):
"""
Сортирует файлы по категориям
Args:
recursive (bool): Рекурсивно обрабатывать поддиректории
min_file_age (int): Минимальный возраст файла в минутах
Returns:
dict: Статистика сортировки
"""
# Проверяем существование исходной директории
if not os.path.isdir(self.source_dir):
raise ValueError(f"Директория '{self.source_dir}' не существует")
# Создаем директории для категорий
category_dirs = self._create_category_dirs()
# Статистика сортировки
stats = {
'processed': 0,
'moved': 0,
'skipped': 0,
'errors': 0,
'by_category': {}
}
# Получаем текущее время
current_time = time.time()
# Функция для обработки одного файла
def process_file(file_path):
try:
# Пропускаем скрытые файлы
if os.path.basename(file_path).startswith('.'):
stats['skipped'] += 1
return
# Проверяем возраст файла
if min_file_age > 0:
file_age_minutes = (current_time - os.path.getmtime(file_path)) / 60
if file_age_minutes < min_file_age:
self.logger.debug(f"Пропуск файла {file_path}: слишком новый (возраст: {file_age_minutes:.1f} мин.)")
stats['skipped'] += 1
return
# Определяем категорию файла
category = self._get_file_category(file_path)
# Получаем директорию назначения
target_dir = category_dirs[category]
# Формируем путь к целевому файлу
filename = os.path.basename(file_path)
target_path = os.path.join(target_dir, filename)
# Проверяем, существует ли файл с таким именем
if os.path.exists(target_path):
# Если файлы идентичны, пропускаем
if os.path.getsize(file_path) == os.path.getsize(target_path):
self.logger.debug(f"Пропуск файла {file_path}: уже существует")
stats['skipped'] += 1
return
# Добавляем временную метку к имени файла
base, ext = os.path.splitext(filename)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
new_filename = f"{base}_{timestamp}{ext}"
target_path = os.path.join(target_dir, new_filename)
# Перемещаем или копируем файл
if self.move_files:
shutil.move(file_path, target_path)
self.logger.info(f"Перемещен файл {file_path} -> {target_path}")
else:
shutil.copy2(file_path, target_path)
self.logger.info(f"Скопирован файл {file_path} -> {target_path}")
# Обновляем статистику
stats['moved'] += 1
stats['by_category'][category] = stats['by_category'].get(category, 0) + 1
except Exception as e:
self.logger.error(f"Ошибка при обработке файла {file_path}: {e}")
stats['errors'] += 1
# Обход файлов
if recursive:
for root, dirs, files in os.walk(self.source_dir):
# Пропускаем директории категорий
if root == self.dest_dir or any(root.startswith(os.path.join(self.dest_dir, cat)) for cat in category_dirs.keys()):
continue
for filename in files:
file_path = os.path.join(root, filename)
stats['processed'] += 1
process_file(file_path)
else:
# Обрабатываем только файлы в исходной директории
for filename in os.listdir(self.source_dir):
file_path = os.path.join(self.source_dir, filename)
if os.path.isfile(file_path):
stats['processed'] += 1
process_file(file_path)
# Выводим статистику
self.logger.info(f"Обработка завершена. Обработано файлов: {stats['processed']}")
self.logger.info(f"Перемещено: {stats['moved']}, пропущено: {stats['skipped']}, ошибок: {stats['errors']}")
# Выводим статистику по категориям
for category, count in sorted(stats['by_category'].items(), key=lambda x: x[1], reverse=True):
self.logger.info(f"Категория '{category}': {count} файлов")
return stats
def main():
parser = argparse.ArgumentParser(description='Сортировка файлов по типам')
parser.add_argument('source_dir', nargs='?', default=os.path.expanduser('~/Downloads'),
help='Директория для сортировки файлов (по умолчанию: ~/Downloads)')
parser.add_argument('--dest-dir', '-d',
help='Директория для перемещения отсортированных файлов (по умолчанию: source_dir)')
parser.add_argument('--copy', '-c', action='store_true',
help='Копировать файлы вместо перемещения')
parser.add_argument('--recursive', '-r', action='store_true',
help='Рекурсивно обрабатывать поддиректории')
parser.add_argument('--min-age', '-a', type=int, default=0,
help='Минимальный возраст файла в минутах (по умолчанию: 0)')
parser.add_argument('--verbose', '-v', action='store_true',
help='Подробный вывод')
args = parser.parse_args()
# Настраиваем уровень логирования
if args.verbose:
logging.getLogger('FileSorter').setLevel(logging.DEBUG)
try:
# Создаем сортировщик файлов
sorter = FileSorter(args.source_dir, args.dest_dir, not args.copy)
# Сортируем файлы
stats = sorter.sort_files(args.recursive, args.min_age)
return 0 if stats['errors'] == 0 else 1
except Exception as e:
logging.error(f"Ошибка: {e}")
return 1
if __name__ == "__main__":
import sys
sys.exit(main())
Использование:
# Сортировка файлов в директории "Загрузки"
python sort_downloads.py
# Сортировка файлов в указанной директории с копированием вместо перемещения
python sort_downloads.py /путь/к/директории --copy
# Рекурсивная сортировка файлов, которые старше 10 минут
python sort_downloads.py --recursive --min-age 10
24. Система для асинхронного скачивания файлов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import argparse
import asyncio
import aiohttpimport os
import sys
import time
import argparse
import asyncio
import aiohttp
import logging
import signal
from urllib.parse import urlparse, unquote
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime, timedelta
class DownloadLimiter:
"""
Класс для ограничения скорости скачивания
"""
def __init__(self, speed_limit=None):
"""
Инициализирует ограничитель скорости
Args:
speed_limit (int): Лимит скорости в байтах в секунду
"""
self.speed_limit = speed_limit # Байт в секунду
self.downloaded = 0
self.start_time = time.time()
def reset(self):
"""
Сбрасывает счетчики
"""
self.downloaded = 0
self.start_time = time.time()
async def wait_if_needed(self, chunk_size):
"""
Ожидает, если скорость скачивания превышает лимит
Args:
chunk_size (int): Размер последнего скачанного чанка
"""
if not self.speed_limit:
return
self.downloaded += chunk_size
elapsed = time.time() - self.start_time
if elapsed > 0:
current_speed = self.downloaded / elapsed
if current_speed > self.speed_limit:
# Вычисляем время ожидания
expected_time = self.downloaded / self.speed_limit
wait_time = expected_time - elapsed
if wait_time > 0:
await asyncio.sleep(wait_time)
class ProgressBar:
"""
Класс для отображения прогресс-бара в консоли
"""
def __init__(self, total_size, filename, width=50):
"""
Инициализирует прогресс-бар
Args:
total_size (int): Общий размер файла в байтах
filename (str): Имя файла
width (int): Ширина прогресс-бара
"""
self.total_size = total_size
self.filename = filename
self.width = width
self.downloaded = 0
self.start_time = time.time()
self.last_update = 0
self.update_interval = 0.2 # Интервал обновления в секундах
def update(self, chunk_size):
"""
Обновляет прогресс-бар
Args:
chunk_size (int): Размер скачанного чанка
"""
self.downloaded += chunk_size
current_time = time.time()
# Обновляем прогресс-бар не чаще, чем раз в update_interval секунд
if current_time - self.last_update < self.update_interval and self.downloaded < self.total_size:
return
self.last_update = current_time
# Вычисляем процент выполнения
percent = min(100, int(self.downloaded * 100 / self.total_size))
# Вычисляем скорость скачивания
elapsed = current_time - self.start_time
speed = self.downloaded / elapsed if elapsed > 0 else 0
# Вычисляем оставшееся время
if speed > 0 and self.downloaded < self.total_size:
eta_seconds = (self.total_size - self.downloaded) / speed
eta = str(timedelta(seconds=int(eta_seconds)))
else:
eta = "00:00:00"
# Формируем прогресс-бар
filled_width = int(self.width * percent / 100)
bar = '█' * filled_width + '░' * (self.width - filled_width)
# Форматируем вывод
formatted_speed = self._format_size(speed) + "/s"
formatted_size = f"{self._format_size(self.downloaded)}/{self._format_size(self.total_size)}"
# Выводим прогресс-бар
sys.stdout.write(f"\r{self.filename} [{bar}] {percent}% {formatted_size} {formatted_speed} ETA: {eta}")
sys.stdout.flush()
# Если скачивание завершено, добавляем перевод строки
if self.downloaded >= self.total_size:
sys.stdout.write("\n")
sys.stdout.flush()
def _format_size(self, size_bytes):
"""
Форматирует размер в байтах в человеко-читаемый формат
Args:
size_bytes (int): Размер в байтах
Returns:
str: Отформатированный размер
"""
if size_bytes < 1024:
return f"{size_bytes}B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes/1024:.1f}KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes/(1024*1024):.1f}MB"
else:
return f"{size_bytes/(1024*1024*1024):.1f}GB"
class AsyncDownloader:
"""
Класс для асинхронного скачивания файлов
"""
def __init__(self, speed_limit=None, chunk_size=8192, timeout=30, max_retries=3,
output_dir=None, overwrite=False, user_agent=None):
"""
Инициализирует загрузчик файлов
Args:
speed_limit (int): Ограничение скорости скачивания в КБ/с
chunk_size (int): Размер чанка для скачивания
timeout (int): Таймаут соединения в секундах
max_retries (int): Максимальное количество повторных попыток
output_dir (str): Директория для сохранения файлов
overwrite (bool): Перезаписывать существующие файлы
user_agent (str): User-Agent для HTTP-запросов
"""
self.speed_limit = speed_limit * 1024 if speed_limit else None # Переводим в байты
self.chunk_size = chunk_size
self.timeout = timeout
self.max_retries = max_retries
self.output_dir = output_dir or os.getcwd()
self.overwrite = overwrite
self.user_agent = user_agent or "AsyncDownloader/1.0"
# Создаем директорию для сохранения файлов, если она не существует
os.makedirs(self.output_dir, exist_ok=True)
# Настраиваем логирование
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
self.logger = logging.getLogger('AsyncDownloader')
# Список активных задач
self.active_tasks = set()
# Флаг для обработки сигналов
self.shutdown_event = asyncio.Event()
def _get_filename_from_url(self, url):
"""
Извлекает имя файла из URL
Args:
url (str): URL файла
Returns:
str: Имя файла
"""
parsed_url = urlparse(url)
path = unquote(parsed_url.path)
filename = os.path.basename(path)
# Если имя файла не определено, используем хост и временную метку
if not filename:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{parsed_url.netloc}_{timestamp}"
return filename
async def download_file(self, url, output_path=None):
"""
Скачивает файл по URL
Args:
url (str): URL файла
output_path (str): Путь для сохранения файла
Returns:
tuple: (успешность, путь_к_файлу, сообщение)
"""
# Если путь не указан, определяем его из URL
if not output_path:
filename = self._get_filename_from_url(url)
output_path = os.path.join(self.output_dir, filename)
# Проверяем, существует ли файл
if os.path.exists(output_path) and not self.overwrite:
return False, output_path, "Файл уже существует"
# Создаем лимитер скорости
limiter = DownloadLimiter(self.speed_limit)
# Счетчик попыток
retries = 0
while retries <= self.max_retries:
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
async with session.get(url, headers={"User-Agent": self.user_agent}) as response:
# Проверяем статус ответа
if response.status != 200:
retries += 1
self.logger.warning(f"Ошибка при скачивании {url}: HTTP {response.status}")
await asyncio.sleep(1 * retries) # Увеличиваем задержку с каждой попыткой
continue
# Получаем размер файла
total_size = int(response.headers.get('content-length', 0))
# Создаем прогресс-бар
progress = ProgressBar(total_size, os.path.basename(output_path))
# Открываем файл для записи
with open(output_path, 'wb') as f:
# Сбрасываем лимитер
limiter.reset()
# Скачиваем файл по частям
async for chunk in response.content.iter_chunked(self.chunk_size):
# Проверяем сигнал остановки
if self.shutdown_event.is_set():
return False, output_path, "Скачивание прервано пользователем"
# Записываем чанк
f.write(chunk)
# Обновляем прогресс
progress.update(len(chunk))
# Ожидаем, если скорость превышена
await limiter.wait_if_needed(len(chunk))
self.logger.info(f"Скачан файл: {url} -> {output_path}")
return True, output_path, "Успешно скачано"
except asyncio.TimeoutError:
retries += 1
self.logger.warning(f"Таймаут при скачивании {url} (попытка {retries}/{self.max_retries})")
await asyncio.sleep(1 * retries)
except (aiohttp.ClientError, IOError) as e:
retries += 1
self.logger.warning(f"Ошибка при скачивании {url}: {e} (попытка {retries}/{self.max_retries})")
await asyncio.sleep(1 * retries)
except Exception as e:
return False, output_path, f"Неожиданная ошибка: {e}"
return False, output_path, f"Превышено количество попыток ({self.max_retries})"
async def download_files(self, urls, max_concurrent=3):
"""
Скачивает несколько файлов асинхронно
Args:
urls (list): Список URL файлов
max_concurrent (int): Максимальное количество одновременных скачиваний
Returns:
list: Список результатов скачивания
"""
# Семафор для ограничения количества одновременных скачиваний
semaphore = asyncio.Semaphore(max_concurrent)
# Функция-обертка для использования семафора
async def download_with_semaphore(url):
async with semaphore:
return await self.download_file(url)
# Создаем задачи для скачивания
tasks = []
for url in urls:
task = asyncio.create_task(download_with_semaphore(url))
self.active_tasks.add(task)
task.add_done_callback(self.active_tasks.discard)
tasks.append(task)
# Ожидаем завершения всех задач
results = await asyncio.gather(*tasks, return_exceptions=True)
# Обрабатываем результаты
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append((False, None, f"Ошибка: {result}"))
else:
processed_results.append(result)
return processed_results
def signal_handler(self):
"""
Обработчик сигналов для корректного завершения
"""
self.logger.info("Получен сигнал остановки, завершаем работу...")
self.shutdown_event.set()
async def main_async():
parser = argparse.ArgumentParser(description='Асинхронное скачивание файлов по URL')
parser.add_argument('urls', nargs='+', help='URLs файлов для скачивания')
parser.add_argument('--output-dir', '-o', help='Директория для сохранения файлов')
parser.add_argument('--speed-limit', '-s', type=int, help='Ограничение скорости скачивания (KB/s)')
parser.add_argument('--concurrent', '-c', type=int, default=3,
help='Максимальное количество одновременных скачиваний (по умолчанию: 3)')
parser.add_argument('--timeout', '-t', type=int, default=30,
help='Таймаут соединения в секундах (по умолчанию: 30)')
parser.add_argument('--retries', '-r', type=int, default=3,
help='Максимальное количество повторных попыток (по умолчанию: 3)')
parser.add_argument('--overwrite', '-f', action='store_true',
help='Перезаписывать существующие файлы')
parser.add_argument('--user-agent', '-u',
help='User-Agent для HTTP-запросов')
parser.add_argument('--verbose', '-v', action='store_true',
help='Подробный вывод')
# Чтение URL из файла
parser.add_argument('--file', '-i', help='Файл со списком URL (по одному на строку)')
args = parser.parse_args()
# Настраиваем уровень логирования
if args.verbose:
logging.getLogger('AsyncDownloader').setLevel(logging.DEBUG)
# Получаем список URL
urls = args.urls.copy()
# Если указан файл, добавляем URL из него
if args.file:
try:
with open(args.file, 'r') as file:
file_urls = [line.strip() for line in file if line.strip()]
urls.extend(file_urls)
except IOError as e:
logging.error(f"Ошибка при чтении файла с URL: {e}")
return 1
# Создаем загрузчик
downloader = AsyncDownloader(
speed_limit=args.speed_limit,
timeout=args.timeout,
max_retries=args.retries,
output_dir=args.output_dir,
overwrite=args.overwrite,
user_agent=args.user_agent
)
# Обработчик сигналов для корректного завершения
loop = asyncio.get_running_loop()
for signame in ('SIGINT', 'SIGTERM'):
try:
loop.add_signal_handler(
getattr(signal, signame),
downloader.signal_handler
)
except NotImplementedError:
# На Windows некоторые сигналы могут быть недоступны
pass
print(f"Начинаем скачивание {len(urls)} файлов (одновременно: {args.concurrent})...")
# Скачиваем файлы
results = await downloader.download_files(urls, args.concurrent)
# Выводим статистику
success_count = sum(1 for result in results if result[0])
failed_count = len(results) - success_count
print("\nСтатистика скачивания:")
print(f"Всего: {len(results)}, успешно: {success_count}, с ошибками: {failed_count}")
if failed_count > 0:
print("\nОшибки при скачивании:")
for i, (success, path, message) in enumerate(results):
if not success:
print(f" {urls[i]}: {message}")
return 0 if failed_count == 0 else 1
def main():
# Запускаем асинхронную функцию
return asyncio.run(main_async())
if __name__ == "__main__":
sys.exit(main())
Использование:
# Скачивание одного файла
python async_downloader.py https://example.com/file.zip
# Скачивание нескольких файлов с ограничением скорости
python async_downloader.py https://example.com/file1.zip https://example.com/file2.zip --speed-limit 1024
# Скачивание файлов из списка с ограничением количества одновременных закачек
python async_downloader.py --file urls.txt --concurrent 5 --output-dir downloads
25. Декоратор @memory_profiler
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import functools
import tracemalloc
import logging
from datetime import datetime
def setup_logger(log_file=None, log_level=logging.INFO):
"""
Настраивает и возвращает логгер
Args:
log_file (str): Путь к файлу логов
log_level (int): Уровень логирования
Returns:
logging.Logger: Настроенный логгер
"""
# Создаем логгер
logger = logging.getLogger('memory_profiler')
logger.setLevel(log_level)
# Форматтер для логов
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Обработчик для вывода в консоль
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# Обработчик для вывода в файл, если указан
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
def format_memory(bytes_value):
"""
Форматирует количество байт в человеко-читаемый формат
Args:
bytes_value (int): Количество байт
Returns:
str: Отформатированное значение
"""
if bytes_value < 1024:
return f"{bytes_value} B"
elif bytes_value < 1024 * 1024:
return f"{bytes_value / 1024:.2f} KB"
elif bytes_value < 1024 * 1024 * 1024:
return f"{bytes_value / (1024 * 1024):.2f} MB"
else:
return f"{bytes_value / (1024 * 1024 * 1024):.2f} GB"
def get_process_memory():
"""
Возвращает текущее потребление памяти процессом
Returns:
int: Текущее потребление памяти в байтах
"""
import psutil
process = psutil.Process(os.getpid())
return process.memory_info().rss
def memory_profiler(log_file=None, log_level=logging.INFO, detailed=False):
"""
Декоратор для профилирования потребления памяти функцией
Args:
log_file (str): Путь к файлу логов
log_level (int): Уровень логирования
detailed (bool): Вывод детальной информации о выделении памяти
Returns:
function: Декорированная функция
"""
logger = setup_logger(log_file, log_level)
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Сохраняем имя функции
func_name = func.__name__
# Запускаем трассировку памяти
tracemalloc.start()
# Замеряем начальное потребление памяти
start_memory = get_process_memory()
start_snapshot = tracemalloc.take_snapshot()
logger.info(f"[START] {func_name} - Начальное потребление памяти: {format_memory(start_memory)}")
# Засекаем время выполнения
start_time = time.time()
# Флаг для отслеживания исключений
exception_occurred = False
result = None
try:
# Вызываем функцию
result = func(*args, **kwargs)
return result
except Exception as e:
exception_occurred = True
logger.error(f"[ERROR] {func_name} - Исключение: {e}")
raise
finally:
# Замеряем время выполнения
execution_time = time.time() - start_time
# Замеряем конечное потребление памяти
end_memory = get_process_memory()
end_snapshot = tracemalloc.take_snapshot()
# Вычисляем разницу
memory_diff = end_memory - start_memory
# Находим пиковое потребление
peak_stats = tracemalloc.get_traced_memory()
peak_memory = peak_stats[1] # Второй элемент - пиковое потребление
# Форматируем результаты
status = "ERROR" if exception_occurred else "END"
memory_diff_str = format_memory(memory_diff)
peak_memory_str = format_memory(peak_memory)
sign = "+" if memory_diff >= 0 else ""
logger.info(f"[{status}] {func_name} - Конечное потребление памяти: {format_memory(end_memory)}")
logger.info(f"[{status}] {func_name} - Разница памяти: {sign}{memory_diff_str}")
logger.info(f"[{status}] {func_name} - Пиковое потребление памяти: {peak_memory_str}")
logger.info(f"[{status}] {func_name} - Время выполнения: {execution_time:.4f} сек.")
# Выводим детальную информацию, если требуется
if detailed:
# Анализируем разницу между снимками
top_stats = end_snapshot.compare_to(start_snapshot, 'lineno')
logger.info(f"[DETAILS] {func_name} - Топ 10 мест выделения памяти:")
for stat in top_stats[:10]:
logger.info(f" {stat}")
# Останавливаем трассировку памяти
tracemalloc.stop()
return wrapper
return decorator
# Пример использования
if __name__ == "__main__":
@memory_profiler(detailed=True)
def create_large_list(size):
"""
Создает большой список
Args:
size (int): Размер списка
Returns:
list: Созданный список
"""
return [i for i in range(size)]
@memory_profiler(detailed=True)
def create_large_dict(size):
"""
Создает большой словарь
Args:
size (int): Размер словаря
Returns:
dict: Созданный словарь
"""
return {i: str(i) * 100 for i in range(size)}
# Тестируем декоратор
large_list = create_large_list(1000000)
large_dict = create_large_dict(100000)
Использование:
# Импортируем декоратор
from memory_profiler import memory_profiler
# Применяем декоратор к функции
@memory_profiler(log_file="memory_profile.log", detailed=True)
def process_data(data):
# Ваш код
result = []
for i in range(1000000):
result.append(i * i)
return result
# Вызываем функцию
result = process_data([1, 2, 3, 4, 5])
26. Утилита для сортировки файлов в папке "Загрузки"
Э
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import shutil
import datetime
from pathlib import Path
import logging
# Настройка логирования
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()])
logger = logging.getLogger(__name__)
class FileSorter:
def __init__(self, downloads_path=None):
"""Инициализация сортировщика файлов."""
# Если путь не указан, используем домашнюю директорию и папку 'Загрузки'
if downloads_path is None:
self.downloads_path = os.path.join(Path.home(), 'Downloads1')
else:
self.downloads_path = downloads_path
# Определение категорий файлов по расширениям
self.categories = {
'Документы': ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.odt', '.xls', '.xlsx', '.ppt', '.pptx'],
'Изображения': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.svg', '.webp'],
'Видео': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v'],
'Аудио': ['.mp3', '.wav', '.ogg', '.flac', '.aac', '.m4a'],
'Архивы': ['.zip', '.rar', '.7z', '.tar', '.gz', '.bz2'],
'Программы': ['.exe', '.msi', '.deb', '.rpm', '.dmg', '.app', '.apk', '.appx'],
'Код': ['.py', '.js', '.html', '.css', '.java', '.cpp', '.c', '.h', '.go', '.php', '.rb']
}
def get_category(self, file_path):
"""Определение категории файла по расширению."""
_, extension = os.path.splitext(file_path)
extension = extension.lower()
for category, extensions in self.categories.items():
if extension in extensions:
return category
return 'Other'
def create_category_folders(self):
"""Создание подпапок для каждой категории."""
# Добавляем 'Other' к списку категорий
all_categories = list(self.categories.keys()) + ['Other']
for category in all_categories:
category_path = os.path.join(self.downloads_path, category)
if not os.path.exists(category_path):
os.makedirs(category_path)
logger.info(f"Создана папка: {category_path}")
def get_date_prefix(self, file_path):
"""Получение префикса даты изменения в формате YYYYMMDD."""
mod_timestamp = os.path.getmtime(file_path)
mod_date = datetime.datetime.fromtimestamp(mod_timestamp)
return mod_date.strftime('%Y%m%d')
def sort_files(self):
"""Сортировка файлов по категориям и датам."""
# Создание папок категорий, если их нет
self.create_category_folders()
# Получение списка файлов в папке загрузок
files = [f for f in os.listdir(self.downloads_path)
if os.path.isfile(os.path.join(self.downloads_path, f))]
for filename in files:
# Пропускаем скрытые файлы
if filename.startswith('.'):
continue
source_path = os.path.join(self.downloads_path, filename)
# Определяем категорию файла
category = self.get_category(source_path)
# Получаем префикс даты
date_prefix = self.get_date_prefix(source_path)
# Создаем новое имя файла с префиксом даты
new_filename = f"{date_prefix}_{filename}"
# Путь назначения для файла
dest_folder = os.path.join(self.downloads_path, category)
dest_path = os.path.join(dest_folder, new_filename)
# Если файл с таким именем уже существует, добавляем счетчик
counter = 1
while os.path.exists(dest_path):
new_filename = f"{date_prefix}_{counter}_{filename}"
dest_path = os.path.join(dest_folder, new_filename)
counter += 1
try:
# Перемещение файла
shutil.move(source_path, dest_path)
logger.info(f"Перемещен: {filename} -> {category}/{new_filename}")
except Exception as e:
logger.error(f"Ошибка при перемещении {filename}: {str(e)}")
def main():
"""Основная функция запуска сортировщика."""
try:
print("Запуск сортировки файлов в папке 'Загрузки'...")
sorter = FileSorter()
sorter.sort_files()
print("Сортировка файлов успешно завершена!")
except Exception as e:
print(f"Произошла ошибка: {str(e)}")
logger.exception("Необработанная ошибка при сортировке файлов")
if __name__ == "__main__":
main()
И в методе process_file класса FileSorter нужно добавить следующий код перед финальным перемещением файла:
# Добавляем префикс даты к имени файла
target_path = add_date_prefix(target_path)
27. Скрипт для XOR-шифрования файлов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import argparse
import hashlib
from getpass import getpass
def generate_key_from_password(password, key_length):
"""
Генерирует ключ из пароля
Args:
password (str): Пароль
key_length (int): Требуемая длина ключа
Returns:
bytes: Ключ для шифрования
"""
# Используем SHA-256 для создания начального ключа
sha256 = hashlib.sha256()
sha256.update(password.encode('utf-8'))
key = sha256.digest()
# Если нужен ключ большего размера, расширяем его
while len(key) < key_length:
sha256 = hashlib.sha256()
sha256.update(key)
key += sha256.digest()
# Обрезаем до нужной длины
return key[:key_length]
def xor_encrypt_decrypt(input_file, output_file, password):
"""
XOR-шифрует/дешифрует содержимое файла с паролем
Args:
input_file (str): Путь к входному файлу
output_file (str): Путь к выходному файлу
password (str): Пароль для шифрования/дешифрования
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Проверяем существование входного файла
if not os.path.isfile(input_file):
print(f"Ошибка: Файл '{input_file}' не существует")
return False
# Получаем размер входного файла
file_size = os.path.getsize(input_file)
# Если файл пустой, просто создаем пустой выходной файл
if file_size == 0:
with open(output_file, 'wb') as f:
pass
return True
# Генерируем ключ из пароля key = generate_key_from_password(password, min(file_size, 1024*1024)) # Ограничиваем размер ключа 1 МБ
key_length = len(key)
# Открываем файлы
with open(input_file, 'rb') as f_in, open(output_file, 'wb') as f_out:
# Обрабатываем файл блоками
position = 0
chunk_size = 1024 * 1024 # 1 МБ
while True:
chunk = f_in.read(chunk_size)
if not chunk:
break
# Шифруем/дешифруем чанк
encrypted_chunk = bytearray(len(chunk))
for i in range(len(chunk)):
# XOR с циклическим использованием ключа
key_index = (position + i) % key_length
encrypted_chunk[i] = chunk[i] ^ key[key_index]
# Записываем результат
f_out.write(encrypted_chunk)
position += len(chunk)
# Показываем прогресс
progress = min(100, int(position * 100 / file_size))
sys.stdout.write(f"\rПрогресс: {progress}%")
sys.stdout.flush()
print("\nОперация завершена успешно")
return True
except Exception as e:
print(f"Ошибка при обработке файла: {e}")
# Удаляем частично созданный выходной файл в случае ошибки
if os.path.exists(output_file):
try:
os.remove(output_file)
except:
pass
return False
def main():
parser = argparse.ArgumentParser(description='XOR-шифрование/дешифрование файлов с паролем')
parser.add_argument('command', choices=['encrypt', 'decrypt'],
help='Команда: encrypt для шифрования, decrypt для дешифрования')
parser.add_argument('input_file', help='Путь к входному файлу')
parser.add_argument('password', nargs='?', default=None,
help='Пароль (если не указан, будет запрошен интерактивно)')
parser.add_argument('--output', '-o', help='Путь к выходному файлу (по умолчанию определяется автоматически)')
parser.add_argument('--force', '-f', action='store_true',
help='Перезаписывать существующие файлы без запроса')
args = parser.parse_args()
# Определяем пароль
password = args.password
if password is None:
password = getpass("Введите пароль: ")
# Просим подтвердить пароль для шифрования
if args.command == 'encrypt':
confirm = getpass("Подтвердите пароль: ")
if password != confirm:
print("Ошибка: пароли не совпадают")
return 1
# Определяем выходной файл
output_file = args.output
if output_file is None:
if args.command == 'encrypt':
output_file = args.input_file + '.xor'
else:
# Для дешифрования удаляем расширение .xor, если оно есть
if args.input_file.endswith('.xor'):
output_file = args.input_file[:-4]
else:
output_file = args.input_file + '.decrypted'
# Проверяем существование выходного файла
if os.path.exists(output_file) and not args.force:
answer = input(f"Файл '{output_file}' уже существует. Перезаписать? (y/n): ")
if answer.lower() != 'y':
print("Операция отменена")
return 1
# Шифруем/дешифруем файл
success = xor_encrypt_decrypt(args.input_file, output_file, password)
if success:
print(f"Файл {args.command}ed: {args.input_file} -> {output_file}")
return 0
else:
return 1
if __name__ == "__main__":
sys.exit(main())
Использование:
# Шифрование файла
python xor_encrypt.py encrypt secret.txt mysecretpassword
# Дешифрование файла
python xor_encrypt.py decrypt secret.txt.xor mysecretpassword
# Интерактивный ввод пароля
python xor_encrypt.py encrypt secret.txt
28. Консольное приложение для учета студентов
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import csv
import argparse
from tabulate import tabulate
class StudentManager:
"""
Класс для учета студентов
"""
def __init__(self, csv_file):
"""
Инициализирует менеджер студентов
Args:
csv_file (str): Путь к CSV-файлу с данными студентов
"""
self.csv_file = csv_file
self.students = []
self.headers = ["ID", "Фамилия", "Имя", "Отчество", "Группа", "Оценки"]
# Загружаем данные из файла
self.load_data()
def load_data(self):
"""
Загружает данные из CSV-файла
"""
self.students = []
# Если файл не существует, создаем его
if not os.path.exists(self.csv_file):
self.save_data()
return
try:
with open(self.csv_file, 'r', encoding='utf-8', newline='') as file:
reader = csv.reader(file)
# Пропускаем заголовки
next(reader, None)
for row in reader:
# Проверяем количество столбцов
if len(row) >= 6:
student = {
"id": row[0],
"last_name": row[1],
"first_name": row[2],
"middle_name": row[3],
"group": row[4],
"grades": self._parse_grades(row[5])
}
self.students.append(student)
except Exception as e:
print(f"Ошибка при загрузке данных: {e}")
def save_data(self):
"""
Сохраняет данные в CSV-файл
"""
try:
with open(self.csv_file, 'w', encoding='utf-8', newline='') as file:
writer = csv.writer(file)
# Записываем заголовки
writer.writerow(self.headers)
# Записываем данные студентов
for student in self.students:
grades_str = self._format_grades(student["grades"])
writer.writerow([
student["id"],
student["last_name"],
student["first_name"],
student["middle_name"],
student["group"],
grades_str
])
print(f"Данные успешно сохранены в {self.csv_file}")
except Exception as e:
print(f"Ошибка при сохранении данных: {e}")
def _parse_grades(self, grades_str):
"""
Парсит строку с оценками в список чисел
Args:
grades_str (str): Строка с оценками
Returns:
list: Список оценок
"""
if not grades_str:
return []
try:
# Разделяем строку с оценками и преобразуем в числа
return [int(grade.strip()) for grade in grades_str.split(',') if grade.strip()]
except ValueError:
print(f"Предупреждение: некорректные оценки '{grades_str}'")
return []
def _format_grades(self, grades):
"""
Форматирует список оценок в строку
Args:
grades (list): Список оценок
Returns:
str: Строка с оценками, разделенными запятыми
"""
return ', '.join(map(str, grades))
def generate_id(self):
"""
Генерирует новый уникальный ID для студента
Returns:
str: Новый ID
"""
if not self.students:
return "1"
# Находим максимальный ID и увеличиваем на 1
try:
max_id = max(int(student["id"]) for student in self.students)
return str(max_id + 1)
except ValueError:
# Если не удалось преобразовать ID в число, генерируем на основе количества студентов
return str(len(self.students) + 1)
def add_student(self, last_name, first_name, middle_name, group, grades=None):
"""
Добавляет нового студента
Args:
last_name (str): Фамилия
first_name (str): Имя
middle_name (str): Отчество
group (str): Группа
grades (list): Список оценок
Returns:
dict: Данные добавленного студента
"""
if grades is None:
grades = []
# Создаем нового студента
student = {
"id": self.generate_id(),
"last_name": last_name,
"first_name": first_name,
"middle_name": middle_name,
"group": group,
"grades": grades
}
# Добавляем студента
self.students.append(student)
# Сохраняем данные
self.save_data()
return student
def update_student(self, student_id, **kwargs):
"""
Обновляет данные студента
Args:
student_id (str): ID студента
**kwargs: Обновляемые поля (last_name, first_name, middle_name, group, grades)
Returns:
bool: True в случае успеха, False если студент не найден
"""
# Находим студента по ID
for student in self.students:
if student["id"] == student_id:
# Обновляем указанные поля
for field, value in kwargs.items():
if field in student:
student[field] = value
# Сохраняем данные
self.save_data()
return True
print(f"Студент с ID '{student_id}' не найден")
return False
def delete_student(self, student_id):
"""
Удаляет студента
Args:
student_id (str): ID студента
Returns:
bool: True в случае успеха, False если студент не найден
"""
# Находим индекс студента
for i, student in enumerate(self.students):
if student["id"] == student_id:
# Удаляем студента
del self.students[i]
# Сохраняем данные
self.save_data()
return True
print(f"Студент с ID '{student_id}' не найден")
return False
def get_student_by_id(self, student_id):
"""
Возвращает студента по ID
Args:
student_id (str): ID студента
Returns:
dict: Данные студента или None, если студент не найден
"""
for student in self.students:
if student["id"] == student_id:
return student
return None
def search_by_last_name(self, last_name):
"""
Ищет студентов по фамилии
Args:
last_name (str): Фамилия для поиска
Returns:
list: Список найденных студентов
"""
# Приводим к нижнему регистру для регистронезависимого поиска
search_name = last_name.lower()
# Находим всех студентов, у которых фамилия содержит искомую строку
return [student for student in self.students
if search_name in student["last_name"].lower()]
def calculate_average_grade(self, student_id=None):
"""
Вычисляет средний балл студента или всех студентов
Args:
student_id (str): ID студента (если None, вычисляется для всех студентов)
Returns:
float or dict: Средний балл студента или словарь {id: средний_балл}
"""
if student_id:
# Вычисляем средний балл для конкретного студента
student = self.get_student_by_id(student_id)
if student and student["grades"]:
return sum(student["grades"]) / len(student["grades"])
return 0.0
else:
# Вычисляем средний балл для всех студентов
result = {}
for student in self.students:
if student["grades"]:
result[student["id"]] = sum(student["grades"]) / len(student["grades"])
else:
result[student["id"]] = 0.0
return result
def get_all_students(self):
"""
Возвращает список всех студентов
Returns:
list: Список всех студентов
"""
return self.students
def print_students(self, students=None):
"""
Выводит таблицу студентов
Args:
students (list): Список студентов для вывода (если None, выводятся все студенты)
"""
if students is None:
students = self.students
if not students:
print("Студенты не найдены")
return
# Подготавливаем данные для таблицы
table_data = []
for student in students:
# Вычисляем средний балл
avg_grade = 0.0
if student["grades"]:
avg_grade = sum(student["grades"]) / len(student["grades"])
table_data.append([
student["id"],
student["last_name"],
student["first_name"],
student["middle_name"],
student["group"],
self._format_grades(student["grades"]),
f"{avg_grade:.2f}"
])
# Выводим таблицу
headers = self.headers + ["Средний балл"]
print(tabulate(table_data, headers=headers, tablefmt="fancy_grid"))
def main():
parser = argparse.ArgumentParser(description='Консольное приложение для учета студентов')
parser.add_argument('--file', '-f', default='students.csv',
help='Путь к CSV-файлу с данными (по умолчанию: students.csv)')
subparsers = parser.add_subparsers(dest='command', help='Команда')
# Команда list - вывод списка всех студентов
list_parser = subparsers.add_parser('list', help='Вывести список всех студентов')
# Команда add - добавление нового студента
add_parser = subparsers.add_parser('add', help='Добавить нового студента')
add_parser.add_argument('--last', required=True, help='Фамилия')
add_parser.add_argument('--first', required=True, help='Имя')
add_parser.add_argument('--middle', required=True, help='Отчество')
add_parser.add_argument('--group', required=True, help='Группа')
add_parser.add_argument('--grades', help='Оценки через запятую')
# Команда update - обновление данных студента
update_parser = subparsers.add_parser('update', help='Обновить данные студента')
update_parser.add_argument('--id', required=True, help='ID студента')
update_parser.add_argument('--last', help='Фамилия')
update_parser.add_argument('--first', help='Имя')
update_parser.add_argument('--middle', help='Отчество')
update_parser.add_argument('--group', help='Группа')
update_parser.add_argument('--grades', help='Оценки через запятую')
# Команда delete - удаление студента
delete_parser = subparsers.add_parser('delete', help='Удалить студента')
delete_parser.add_argument('--id', required=True, help='ID студента')
# Команда get - получение данных студента по ID
get_parser = subparsers.add_parser('get', help='Получить данные студента по ID')
get_parser.add_argument('--id', required=True, help='ID студента')
# Команда search - поиск студентов по фамилии
search_parser = subparsers.add_parser('search', help='Поиск студентов по фамилии')
search_parser.add_argument('--last', required=True, help='Фамилия для поиска')
# Команда average - вычисление среднего балла
average_parser = subparsers.add_parser('average', help='Вычислить средний балл')
average_parser.add_argument('--id', help='ID студента (если не указан, вычисляется для всех)')
args = parser.parse_args()
# Создаем менеджер студентов
manager = StudentManager(args.file)
if args.command == 'list':
# Выводим список всех студентов
manager.print_students()
elif args.command == 'add':
# Парсим оценки
grades = []
if args.grades:
grades = manager._parse_grades(args.grades)
# Добавляем студента
student = manager.add_student(
args.last, args.first, args.middle, args.group, grades
)
print(f"Добавлен студент с ID: {student['id']}")
elif args.command == 'update':
# Подготавливаем данные для обновления
update_data = {}
if args.last:
update_data['last_name'] = args.last
if args.first:
update_data['first_name'] = args.first
if args.middle:
update_data['middle_name'] = args.middle
if args.group:
update_data['group'] = args.group
if args.grades:
update_data['grades'] = manager._parse_grades(args.grades)
# Обновляем данные студента
if manager.update_student(args.id, **update_data):
print(f"Данные студента с ID '{args.id}' успешно обновлены")
elif args.command == 'delete':
# Удаляем студента
if manager.delete_student(args.id):
print(f"Студент с ID '{args.id}' успешно удален")
elif args.command == 'get':
# Получаем данные студента
student = manager.get_student_by_id(args.id)
if student:
manager.print_students([student])
else:
print(f"Студент с ID '{args.id}' не найден")
elif args.command == 'search':
# Ищем студентов по фамилии
students = manager.search_by_last_name(args.last)
manager.print_students(students)
elif args.command == 'average':
if args.id:
# Вычисляем средний балл для конкретного студента
student = manager.get_student_by_id(args.id)
if student:
avg_grade = manager.calculate_average_grade(args.id)
print(f"Средний балл студента {student['last_name']} {student['first_name']}: {avg_grade:.2f}")
else:
print(f"Студент с ID '{args.id}' не найден")
else:
# Вычисляем средний балл для всех студентов
avg_grades = manager.calculate_average_grade()
print("Средний балл студентов:")
for student in manager.students:
avg = avg_grades[student["id"]]
print(f"{student['last_name']} {student['first_name']} ({student['group']}): {avg:.2f}")
else:
parser.print_help()
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Вывод списка всех студентов
python student_manager.py list
# Добавление нового студента
python student_manager.py add --last Иванов --first Иван --middle Иванович --group ИТ-101 --grades "5, 4, 5, 3"
# Поиск студентов по фамилии
python student_manager.py search --last Иванов
# Вычисление среднего балла
python student_manager.py average
29. Аналог grep для поиска строк в текстовых файла(РАБОТАЕТ)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import re
import glob
import argparse
from concurrent.futures import ThreadPoolExecutor
class PyGrep:
"""
Класс для поиска строк в текстовых файлах
"""
def __init__(self, pattern, ignore_case=False, whole_word=False, regexp=False,
line_number=False, count=False, recursive=False, color=True):
"""
Инициализирует искатель строк
Args:
pattern (str): Строка или регулярное выражение для поиска
ignore_case (bool): Игнорировать регистр
whole_word (bool): Искать только целые слова
regexp (bool): Использовать регулярные выражения
line_number (bool): Выводить номера строк
count (bool): Выводить только количество совпадений
recursive (bool): Рекурсивный поиск в поддиректориях
color (bool): Выделять совпадения цветом
"""
self.pattern = pattern
self.ignore_case = ignore_case
self.whole_word = whole_word
self.regexp = regexp
self.line_number = line_number
self.count = count
self.recursive = recursive
self.color = color and sys.stdout.isatty() # Используем цвет только если вывод в терминал
# Компилируем регулярное выражение
self._compile_pattern()
# Определяем кодировки для проверки
self.encodings = ['utf-8', 'latin-1', 'cp1251']
def _compile_pattern(self):
"""
Компилирует регулярное выражение для поиска
"""
pattern = self.pattern
if not self.regexp:
# Экранируем специальные символы в шаблоне
pattern = re.escape(pattern)
if self.whole_word:
# Добавляем границы слова
pattern = r'\b' + pattern + r'\b'
# Компилируем регулярное выражение
flags = re.IGNORECASE if self.ignore_case else 0
self.regex = re.compile(pattern, flags)
def _read_file(self, file_path):
"""
Читает файл, определяя кодировку
Args:
file_path (str): Путь к файлу
Returns:
tuple: (содержимое файла, кодировка) или (None, None) в случае ошибки
"""
for encoding in self.encodings:
try:
with open(file_path, 'r', encoding=encoding) as file:
return file.read(), encoding
except UnicodeDecodeError:
continue
except Exception as e:
print(f"Ошибка при чтении файла '{file_path}': {e}", file=sys.stderr)
return None, None
print(f"Не удалось определить кодировку файла '{file_path}'", file=sys.stderr)
return None, None
def _colorize_match(self, line, matches):
"""
Выделяет совпадения в строке цветом
Args:
line (str): Исходная строка
matches (list): Список объектов совпадений
Returns:
str: Строка с выделенными совпадениями
"""
if not self.color or not matches:
return line
# ANSI-цвета
HIGHLIGHT = '\033[1;31m' # Красный цвет, жирный
RESET = '\033[0m' # Сброс форматирования
# Сортируем совпадения в обратном порядке, чтобы не нарушать индексы
matches.sort(key=lambda m: m.start(), reverse=True)
result = line
for match in matches:
start, end = match.span()
result = result[:start] + HIGHLIGHT + result[start:end] + RESET + result[end:]
return result
def search_file(self, file_path):
"""
Ищет совпадения в файле
Args:
file_path (str): Путь к файлу
Returns:
list: Список найденных строк
"""
# Читаем файл
content, encoding = self._read_file(file_path)
if content is None:
return []
# Разбиваем на строки
lines = content.splitlines()
# Ищем совпадения
matches_count = 0
results = []
for i, line in enumerate(lines, 1):
# Находим все совпадения в строке
matches = list(self.regex.finditer(line))
if matches:
matches_count += len(matches)
if not self.count:
# Форматируем строку результата
result_line = ""
# Добавляем имя файла
result_line += f"{file_path}:"
# Добавляем номер строки
if self.line_number:
result_line += f"{i}:"
# Добавляем содержимое строки с выделением совпадений
result_line += self._colorize_match(line, matches)
results.append(result_line)
# Если нужно только количество совпадений
if self.count and matches_count > 0:
results.append(f"{file_path}: {matches_count}")
return results
def search_files(self, file_patterns, max_workers=None):
"""
Ищет совпадения в нескольких файлах
Args:
file_patterns (list): Список шаблонов файлов
max_workers (int): Максимальное количество потоков
Returns:
list: Список найденных строк
"""
# Получаем список файлов
files = []
for pattern in file_patterns:
# Для каждого шаблона находим соответствующие файлы
if self.recursive and os.path.isdir(pattern):
# Рекурсивный поиск во всех файлах в директории
for root, _, filenames in os.walk(pattern):
for filename in filenames:
files.append(os.path.join(root, filename))
else:
# Используем glob для обработки шаблонов имен файлов
matched_files = glob.glob(pattern, recursive=self.recursive)
files.extend([f for f in matched_files if os.path.isfile(f)])
if not files:
print("Файлы не найдены", file=sys.stderr)
return []
all_results = []
# Параллельный поиск
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Запускаем поиск в каждом файле
future_to_file = {executor.submit(self.search_file, file): file for file in files}
# Собираем результаты
for future in future_to_file:
results = future.result()
all_results.extend(results)
return all_results
def main():
parser = argparse.ArgumentParser(description='Поиск строк в текстовых файлах')
parser.add_argument('pattern', help='Строка или регулярное выражение для поиска')
parser.add_argument('files', nargs='+', help='Файлы или шаблоны файлов для поиска')
parser.add_argument('-i', '--ignore-case', action='store_true',
help='Игнорировать регистр')
parser.add_argument('-w', '--word', action='store_true',
help='Искать только целые слова')
parser.add_argument('-E', '--regexp', action='store_true',
help='Использовать регулярные выражения')
parser.add_argument('-n', '--line-number', action='store_true',
help='Выводить номера строк')
parser.add_argument('-c', '--count', action='store_true',
help='Выводить только количество совпадений')
parser.add_argument('-r', '--recursive', action='store_true',
help='Рекурсивный поиск в поддиректориях')
parser.add_argument('--no-color', action='store_true',
help='Не выделять совпадения цветом')
parser.add_argument('-j', '--jobs', type=int, default=None,
help='Максимальное количество параллельных процессов')
args = parser.parse_args()
# Создаем искатель строк
grep = PyGrep(
args.pattern,
ignore_case=args.ignore_case,
whole_word=args.word,
regexp=args.regexp,
line_number=args.line_number,
count=args.count,
recursive=args.recursive,
color=not args.no_color
)
# Выполняем поиск
results = grep.search_files(args.files, args.jobs)
# Выводим результаты
for result in results:
print(result)
# Возвращаем код выхода: 0, если найдены совпадения, иначе 1
return 0 if results else 1
if __name__ == "__main__":
sys.exit(main())
Примеры правильного использования
# Поиск слова "example" в файле text.txt
python pygrep.py example text.txt
# Поиск слова "error" во всех текстовых файлах текущей директории
python pygrep.py error *.txt
# Поиск с дополнительными опциями
python pygrep.py -i -n error log.txt
30. Программа для управления списком дел
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import datetime
import argparse
from colorama import init, Fore, Style
# Инициализация colorama для кросс-платформенной поддержки цветов
init()
class TaskManager:
"""
Класс для управления списком задач
"""
def __init__(self, tasks_file="tasks.txt"):
"""
Инициализирует менеджер задач
Args:
tasks_file (str): Путь к файлу с задачами
"""
self.tasks_file = tasks_file
self.tasks = []
# Загружаем задачи из файла
self.load_tasks()
def load_tasks(self):
"""
Загружает задачи из файла
"""
self.tasks = []
# Если файл не существует, создаем его
if not os.path.exists(self.tasks_file):
with open(self.tasks_file, 'w', encoding='utf-8') as f:
pass
return
# Читаем задачи из файла
try:
with open(self.tasks_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
# Пропускаем пустые строки
if not line:
continue
# Проверяем наличие флага выполнения
if line.startswith('[x] '):
completed = True
task = line[4:]
elif line.startswith('[ ] '):
completed = False
task = line[4:]
else:
# Если формат не соответствует, считаем задачу невыполненной
completed = False
task = line
# Проверяем наличие метки даты
date_tag = None
if ' #' in task:
task, tag = task.rsplit(' #', 1)
try:
# Пытаемся парсить дату из тега
date_tag = datetime.datetime.strptime(tag, '%Y-%m-%d').date()
except ValueError:
# Если не удалось парсить, считаем это обычным текстом
task = f"{task} #{tag}"
self.tasks.append({
'text': task,
'completed': completed,
'date': date_tag
})
except Exception as e:
print(f"Ошибка при загрузке задач: {e}")
def save_tasks(self):
"""
Сохраняет задачи в файл
"""
try:
with open(self.tasks_file, 'w', encoding='utf-8') as f:
for task in self.tasks:
# Формируем строку задачи
status = '[x]' if task['completed'] else '[ ]'
task_text = task['text']
# Добавляем метку даты, если она есть
if task['date']:
task_text = f"{task_text} #{task['date'].strftime('%Y-%m-%d')}"
# Записываем задачу в файл
f.write(f"{status} {task_text}\n")
except Exception as e:
print(f"Ошибка при сохранении задач: {e}")
def add_task(self, text, date=None):
"""
Добавляет новую задачу
Args:
text (str): Текст задачи
date (datetime.date): Дата для задачи
"""
self.tasks.append({
'text': text,
'completed': False,
'date': date
})
self.save_tasks()
print(f"Добавлена задача: {text}")
def complete_task(self, task_index):
"""
Отмечает задачу как выполненную
Args:
task_index (int): Индекс задачи
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Проверяем корректность индекса
if task_index < 0 or task_index >= len(self.tasks):
print(f"Ошибка: задача с индексом {task_index} не существует")
return False
# Отмечаем задачу как выполненную
task = self.tasks[task_index]
task['completed'] = True
self.save_tasks()
print(f"Задача отмечена как выполненная: {task['text']}")
return True
def uncomplete_task(self, task_index):
"""
Отмечает задачу как невыполненную
Args:
task_index (int): Индекс задачи
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Проверяем корректность индекса
if task_index < 0 or task_index >= len(self.tasks):
print(f"Ошибка: задача с индексом {task_index} не существует")
return False
# Отмечаем задачу как невыполненную
task = self.tasks[task_index]
task['completed'] = False
self.save_tasks()
print(f"Задача отмечена как невыполненная: {task['text']}")
return True
def remove_task(self, task_index):
"""
Удаляет задачу
Args:
task_index (int): Индекс задачи
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Проверяем корректность индекса
if task_index < 0 or task_index >= len(self.tasks):
print(f"Ошибка: задача с индексом {task_index} не существует")
return False
# Удаляем задачу
task = self.tasks.pop(task_index)
self.save_tasks()
print(f"Задача удалена: {task['text']}")
return True
def edit_task(self, task_index, text=None, date=None, completed=None):
"""
Редактирует задачу
Args:
task_index (int): Индекс задачи
text (str): Новый текст задачи
date (datetime.date): Новая дата для задачи
completed (bool): Новый статус выполнения
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Проверяем корректность индекса
if task_index < 0 or task_index >= len(self.tasks):
print(f"Ошибка: задача с индексом {task_index} не существует")
return False
# Обновляем задачу
task = self.tasks[task_index]
if text is not None:
task['text'] = text
if date is not None:
task['date'] = date
if completed is not None:
task['completed'] = completed
self.save_tasks()
print(f"Задача обновлена: {task['text']}")
return True
def clear_completed(self):
"""
Удаляет все выполненные задачи
Returns:
int: Количество удаленных задач
"""
# Находим выполненные задачи
completed_tasks = [task for task in self.tasks if task['completed']]
# Обновляем список задач
self.tasks = [task for task in self.tasks if not task['completed']]
self.save_tasks()
print(f"Удалено {len(completed_tasks)} выполненных задач")
return len(completed_tasks)
def list_tasks(self, show_completed=True, show_uncompleted=True, filter_date=None):
"""
Выводит список задач
Args:
show_completed (bool): Показывать выполненные задачи
show_uncompleted (bool): Показывать невыполненные задачи
filter_date (datetime.date): Фильтр по дате
"""
# Фильтруем задачи
filtered_tasks = []
for task in self.tasks:
# Проверяем статус выполнения
if task['completed'] and not show_completed:
continue
if not task['completed'] and not show_uncompleted:
continue
# Проверяем дату
if filter_date is not None and task['date'] != filter_date:
continue
filtered_tasks.append(task)
# Если задач нет, выводим сообщение
if not filtered_tasks:
print("Нет задач для отображения")
return
# Выводим задачи
print("Список задач:")
for i, task in enumerate(filtered_tasks):
# Определяем цвет для задачи
if task['completed']:
color = Fore.GREEN
elif task['date'] and task['date'] < datetime.date.today():
color = Fore.RED
else:
color = Fore.WHITE
# Формируем строку задачи
status = "[x]" if task['completed'] else "[ ]"
task_text = task['text']
# Добавляем метку даты, если она есть
date_str = f" #{task['date'].strftime('%Y-%m-%d')}" if task['date'] else ""
# Выводим задачу
print(f"{i}. {color}{status} {task_text}{date_str}{Style.RESET_ALL}")
def interactive(self):
"""
Запускает интерактивный режим
"""
print("=== Менеджер задач ===")
print("Команды:")
print(" list - показать список задач")
print(" add <текст> - добавить задачу")
print(" complete <индекс> - отметить задачу как выполненную")
print(" uncomplete <индекс> - отметить задачу как невыполненную")
print(" remove <индекс> - удалить задачу")
print(" clear - удалить все выполненные задачи")
print(" exit - выйти из программы")
while True:
# Выводим список задач
self.list_tasks()
# Запрашиваем команду
command = input("\nВведите команду: ").strip()
if not command:
continue
# Разбиваем команду на части
parts = command.split(maxsplit=1)
cmd = parts[0].lower()
# Обрабатываем команду
if cmd == 'list':
# Показать список задач (уже показан)
pass
elif cmd == 'add':
# Добавить задачу
if len(parts) < 2:
print("Ошибка: не указан текст задачи")
else:
self.add_task(parts[1])
elif cmd == 'complete':
# Отметить задачу как выполненную
if len(parts) < 2:
print("Ошибка: не указан индекс задачи")
else:
try:
task_index = int(parts[1])
self.complete_task(task_index)
except ValueError:
print("Ошибка: индекс должен быть числом")
elif cmd == 'uncomplete':
# Отметить задачу как невыполненную
if len(parts) < 2:
print("Ошибка: не указан индекс задачи")
else:
try:
task_index = int(parts[1])
self.uncomplete_task(task_index)
except ValueError:
print("Ошибка: индекс должен быть числом")
elif cmd == 'remove':
# Удалить задачу
if len(parts) < 2:
print("Ошибка: не указан индекс задачи")
else:
try:
task_index = int(parts[1])
self.remove_task(task_index)
except ValueError:
print("Ошибка: индекс должен быть числом")
elif cmd == 'clear':
# Удалить все выполненные задачи
confirm = input("Вы уверены, что хотите удалить все выполненные задачи? (y/n): ").strip().lower()
if confirm == 'y':
self.clear_completed()
elif cmd == 'exit':
# Выйти из программы
print("До свидания!")
break
else:
print(f"Неизвестная команда: {cmd}")
print()
def parse_date(date_str):
"""
Парсит строку даты
Args:
date_str (str): Строка с датой
Returns:
datetime.date: Объект даты или None, если не удалось распарсить
"""
if not date_str:
return None
# Пытаемся парсить дату в разных форматах
formats = ['%Y-%m-%d', '%d.%m.%Y', '%d/%m/%Y']
for fmt in formats:
try:
return datetime.datetime.strptime(date_str, fmt).date()
except ValueError:
continue
print(f"Ошибка: не удалось распарсить дату '{date_str}'")
return None
def main():
parser = argparse.ArgumentParser(description='Менеджер задач')
parser.add_argument('--file', '-f', default='tasks.txt',
help='Путь к файлу с задачами (по умолчанию: tasks.txt)')
subparsers = parser.add_subparsers(dest='command', help='Команда')
# Команда list - вывод списка задач
list_parser = subparsers.add_parser('list', help='Вывести список задач')
list_parser.add_argument('--all', '-a', action='store_true',
help='Показать все задачи')
list_parser.add_argument('--completed', '-c', action='store_true',
help='Показать только выполненные задачи')
list_parser.add_argument('--uncompleted', '-u', action='store_true',
help='Показать только невыполненные задачи')
list_parser.add_argument('--date', '-d',
help='Фильтр по дате (формат: YYYY-MM-DD)')
# Команда add - добавление задачи
add_parser = subparsers.add_parser('add', help='Добавить задачу')
add_parser.add_argument('text', help='Текст задачи')
add_parser.add_argument('--date', '-d',
help='Дата для задачи (формат: YYYY-MM-DD)')
# Команда complete - отметка задачи как выполненной
complete_parser = subparsers.add_parser('complete', help='Отметить задачу как выполненную')
complete_parser.add_argument('index', type=int, help='Индекс задачи')
# Команда uncomplete - отметка задачи как невыполненной
uncomplete_parser = subparsers.add_parser('uncomplete', help='Отметить задачу как невыполненную')
uncomplete_parser.add_argument('index', type=int, help='Индекс задачи')
# Команда remove - удаление задачи
remove_parser = subparsers.add_parser('remove', help='Удалить задачу')
remove_parser.add_argument('index', type=int, help='Индекс задачи')
# Команда edit - редактирование задачи
edit_parser = subparsers.add_parser('edit', help='Редактировать задачу')
edit_parser.add_argument('index', type=int, help='Индекс задачи')
edit_parser.add_argument('--text', '-t', help='Новый текст задачи')
edit_parser.add_argument('--date', '-d', help='Новая дата для задачи (формат: YYYY-MM-DD)')
edit_parser.add_argument('--completed', '-c', action='store_true',
help='Отметить задачу как выполненную')
edit_parser.add_argument('--uncompleted', '-u', action='store_true',
help='Отметить задачу как невыполненную')
# Команда clear - удаление выполненных задач
clear_parser = subparsers.add_parser('clear', help='Удалить все выполненные задачи')
# Команда interactive - интерактивный режим
interactive_parser = subparsers.add_parser('interactive', help='Запустить интерактивный режим')
args = parser.parse_args()
# Создаем менеджер задач
manager = TaskManager(args.file)
if args.command == 'list':
# Определяем параметры фильтрации
show_completed = True
show_uncompleted = True
if args.completed:
show_uncompleted = False
elif args.uncompleted:
show_completed = False
# Парсим дату
filter_date = parse_date(args.date)
# Выводим список задач
manager.list_tasks(show_completed, show_uncompleted, filter_date)
elif args.command == 'add':
# Парсим дату
date = parse_date(args.date)
# Добавляем задачу
manager.add_task(args.text, date)
elif args.command == 'complete':
# Отмечаем задачу как выполненную
manager.complete_task(args.index)
elif args.command == 'uncomplete':
# Отмечаем задачу как невыполненную
manager.uncomplete_task(args.index)
elif args.command == 'remove':
# Удаляем задачу
manager.remove_task(args.index)
elif args.command == 'edit':
# Определяем статус выполнения
completed = None
if args.completed:
completed = True
elif args.uncompleted:
completed = False
# Парсим дату
date = parse_date(args.date)
# Редактируем задачу
manager.edit_task(args.index, args.text, date, completed)
elif args.command == 'clear':
# Удаляем выполненные задачи
manager.clear_completed()
elif args.command == 'interactive':
# Запускаем интерактивный режим
manager.interactive()
else:
# Если команда не указана, выводим список задач
manager.list_tasks()
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Запуск интерактивного режима
python task_manager.py interactive
# Добавление задачи
python task_manager.py add "Купить молоко" --date 2023-05-18
# Вывод списка задач
python task_manager.py list
# Отметка задачи как выполненной
python task_manager.py complete 0
# Удаление выполненных задач
python task_manager.py clear
31. Модуль для применения фильтров к изображениям
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import argparse
from PIL import Image, ImageOps, ImageFilter, ImageEnhance
class ImageProcessor:
"""
Класс для обработки изображений и применения фильтров
"""
def __init__(self):
"""
Инициализирует обработчик изображений
"""
self.image = None
self.original_image = None
def load_image(self, image_path):
"""
Загружает изображение из файла
Args:
image_path (str): Путь к изображению
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Проверяем существование файла
if not os.path.isfile(image_path):
print(f"Ошибка: Файл '{image_path}' не существует")
return False
# Загружаем изображение
self.image = Image.open(image_path)
self.original_image = self.image.copy()
# Выводим информацию о загруженном изображении
print(f"Загружено изображение: {image_path}")
print(f"Размер: {self.image.width}x{self.image.height}")
print(f"Формат: {self.image.format}")
print(f"Режим: {self.image.mode}")
return True
except Exception as e:
print(f"Ошибка при загрузке изображения: {e}")
return False
def save_image(self, output_path, format=None, quality=95):
"""
Сохраняет обработанное изображение
Args:
output_path (str): Путь для сохранения изображения
format (str): Формат изображения (если None, определяется из расширения)
quality (int): Качество изображения (для JPEG)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
# Определяем формат из расширения, если не указан
if format is None:
format = os.path.splitext(output_path)[1][1:].upper()
# Для JPEG поддерживаются оба расширения
if format.lower() == 'jpg':
format = 'JPEG'
# Проверяем существование директории
os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
# Сохраняем изображение
self.image.save(output_path, format=format, quality=quality)
print(f"Изображение сохранено: {output_path}")
return True
except Exception as e:
print(f"Ошибка при сохранении изображения: {e}")
return False
def reset_image(self):
"""
Сбрасывает изображение к оригиналу
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.original_image is None:
print("Ошибка: Оригинальное изображение не загружено")
return False
self.image = self.original_image.copy()
print("Изображение сброшено к оригиналу")
return True
def resize_image(self, width=None, height=None, keep_aspect_ratio=True):
"""
Изменяет размер изображения
Args:
width (int): Новая ширина
height (int): Новая высота
keep_aspect_ratio (bool): Сохранять пропорции
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
# Определяем новые размеры
original_width, original_height = self.image.size
if width is None and height is None:
print("Ошибка: Необходимо указать хотя бы один размер")
return False
if width is None:
# Определяем ширину пропорционально высоте
width = int(original_width * height / original_height)
if height is None:
# Определяем высоту пропорционально ширине
height = int(original_height * width / original_width)
# Если нужно сохранить пропорции, используем thumbnail
if keep_aspect_ratio:
self.image.thumbnail((width, height), Image.Resampling.LANCZOS)
else:
self.image = self.image.resize((width, height), Image.Resampling.LANCZOS)
print(f"Размер изображения изменен на {self.image.width}x{self.image.height}")
return True
except Exception as e:
print(f"Ошибка при изменении размера изображения: {e}")
return False
def rotate_image(self, angle, expand=True):
"""
Поворачивает изображение
Args:
angle (float): Угол поворота в градусах
expand (bool): Расширять холст при повороте
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
self.image = self.image.rotate(angle, expand=expand, resample=Image.Resampling.BICUBIC)
print(f"Изображение повернуто на {angle} градусов")
return True
except Exception as e:
print(f"Ошибка при повороте изображения: {e}")
return False
def crop_image(self, left, top, right, bottom):
"""
Обрезает изображение
Args:
left (int): Левая координата
top (int): Верхняя координата
right (int): Правая координата
bottom (int): Нижняя координата
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
width, height = self.image.size
# Проверяем корректность координат
if left < 0 or top < 0 or right > width or bottom > height or left >= right or top >= bottom:
print("Ошибка: Некорректные координаты для обрезки")
return False
self.image = self.image.crop((left, top, right, bottom))
print(f"Изображение обрезано до {self.image.width}x{self.image.height}")
return True
except Exception as e:
print(f"Ошибка при обрезке изображения: {e}")
return False
def apply_filter(self, filter_name):
"""
Применяет фильтр к изображению
Args:
filter_name (str): Название фильтра
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
# Конвертируем изображение в RGB, если требуется
if self.image.mode not in ['RGB', 'RGBA']:
self.image = self.image.convert('RGB')
# Применяем выбранный фильтр
if filter_name == 'grayscale':
# Черно-белый фильтр
self.image = ImageOps.grayscale(self.image)
print("Применен фильтр: Черно-белый")
elif filter_name == 'sepia':
# Сепия
self.image = self.image.convert('RGB')
# Функция для преобразования пикселя в сепию
def sepia_filter(pixel):
r, g, b = pixel
tr = int(0.393 * r + 0.769 * g + 0.189 * b)
tg = int(0.349 * r + 0.686 * g + 0.168 * b)
tb = int(0.272 * r + 0.534 * g + 0.131 * b)
return min(255, tr), min(255, tg), min(255, tb)
# Применяем функцию к каждому пикселю
self.image = self.image.convert('RGB')
pixels = self.image.load()
width, height = self.image.size
for x in range(width):
for y in range(height):
pixels[x, y] = sepia_filter(pixels[x, y])
print("Применен фильтр: Сепия")
elif filter_name == 'negative':
# Негатив
self.image = ImageOps.invert(self.image)
print("Применен фильтр: Негатив")
elif filter_name == 'blur':
# Размытие
self.image = self.image.filter(ImageFilter.BLUR)
print("Применен фильтр: Размытие")
elif filter_name == 'sharpen':
# Увеличение резкости
self.image = self.image.filter(ImageFilter.SHARPEN)
print("Применен фильтр: Увеличение резкости")
elif filter_name == 'edge_enhance':
# Выделение краев
self.image = self.image.filter(ImageFilter.EDGE_ENHANCE)
print("Применен фильтр: Выделение краев")
elif filter_name == 'emboss':
# Тиснение
self.image = self.image.filter(ImageFilter.EMBOSS)
print("Применен фильтр: Тиснение")
elif filter_name == 'contour':
# Контур
self.image = self.image.filter(ImageFilter.CONTOUR)
print("Применен фильтр: Контур")
elif filter_name == 'detail':
# Детализация
self.image = self.image.filter(ImageFilter.DETAIL)
print("Применен фильтр: Детализация")
else:
print(f"Ошибка: Неизвестный фильтр '{filter_name}'")
return False
return True
except Exception as e:
print(f"Ошибка при применении фильтра: {e}")
return False
def adjust_brightness(self, factor):
"""
Регулирует яркость изображения
Args:
factor (float): Коэффициент яркости (0.0 - черное, 1.0 - без изменений, >1.0 - ярче)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
enhancer = ImageEnhance.Brightness(self.image)
self.image = enhancer.enhance(factor)
print(f"Яркость изменена с коэффициентом {factor}")
return True
except Exception as e:
print(f"Ошибка при изменении яркости: {e}")
return False
def adjust_contrast(self, factor):
"""
Регулирует контраст изображения
Args:
factor (float): Коэффициент контраста (0.0 - серое, 1.0 - без изменений, >1.0 - больший контраст)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
enhancer = ImageEnhance.Contrast(self.image)
self.image = enhancer.enhance(factor)
print(f"Контраст изменен с коэффициентом {factor}")
return True
except Exception as e:
print(f"Ошибка при изменении контраста: {e}")
return False
def adjust_color(self, factor):
"""
Регулирует насыщенность цветов изображения
Args:
factor (float): Коэффициент насыщенности (0.0 - черно-белое, 1.0 - без изменений, >1.0 - более насыщенное)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
enhancer = ImageEnhance.Color(self.image)
self.image = enhancer.enhance(factor)
print(f"Насыщенность цветов изменена с коэффициентом {factor}")
return True
except Exception as e:
print(f"Ошибка при изменении насыщенности цветов: {e}")
return False
def adjust_sharpness(self, factor):
"""
Регулирует резкость изображения
Args:
factor (float): Коэффициент резкости (0.0 - размытое, 1.0 - без изменений, >1.0 - более резкое)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
enhancer = ImageEnhance.Sharpness(self.image)
self.image = enhancer.enhance(factor)
print(f"Резкость изменена с коэффициентом {factor}")
return True
except Exception as e:
print(f"Ошибка при изменении резкости: {e}")
return False
def add_border(self, width, color):
"""
Добавляет рамку к изображению
Args:
width (int): Ширина рамки в пикселях
color (tuple or str): Цвет рамки (RGB-кортеж или название цвета)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
# Создаем новое изображение с рамкой
img_width, img_height = self.image.size
new_width = img_width + 2 * width
new_height = img_height + 2 * width
# Создаем новое изображение с цветом рамки
bordered_image = Image.new(self.image.mode, (new_width, new_height), color)
# Вставляем исходное изображение в центр
bordered_image.paste(self.image, (width, width))
self.image = bordered_image
print(f"Добавлена рамка шириной {width} пикселей")
return True
except Exception as e:
print(f"Ошибка при добавлении рамки: {e}")
return False
def flip_image(self, direction):
"""
Отражает изображение
Args:
direction (str): Направление отражения ('horizontal' или 'vertical')
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.image is None:
print("Ошибка: Изображение не загружено")
return False
try:
if direction == 'horizontal':
self.image = self.image.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
print("Изображение отражено по горизонтали")
elif direction == 'vertical':
self.image = self.image.transpose(Image.Transpose.FLIP_TOP_BOTTOM)
print("Изображение отражено по вертикали")
else:
print(f"Ошибка: Неизвестное направление отражения '{direction}'")
return False
return True
except Exception as e:
print(f"Ошибка при отражении изображения: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Обработка изображений и применение фильтров')
parser.add_argument('--input', '-i', required=True,
help='Путь к входному изображению')
parser.add_argument('--output', '-o', required=True,
help='Путь для сохранения обработанного изображения')
parser.add_argument('--filter', '-f', choices=[
'grayscale', 'sepia', 'negative', 'blur', 'sharpen',
'edge_enhance', 'emboss', 'contour', 'detail'],
help='Фильтр для применения')
parser.add_argument('--resize', nargs=2, type=int, metavar=('WIDTH', 'HEIGHT'),
help='Изменить размер изображения (ширина высота)')
parser.add_argument('--rotate', type=float,
help='Повернуть изображение на указанный угол в градусах')
parser.add_argument('--crop', nargs=4, type=int, metavar=('LEFT', 'TOP', 'RIGHT', 'BOTTOM'),
help='Обрезать изображение (левая верхняя правая нижняя)')
parser.add_argument('--brightness', type=float,
help='Коэффициент яркости (0.0 - черное, 1.0 - без изменений, >1.0 - ярче)')
parser.add_argument('--contrast', type=float,
help='Коэффициент контраста (0.0 - серое, 1.0 - без изменений, >1.0 - больший контраст)')
parser.add_argument('--color', type=float,
help='Коэффициент насыщенности цветов (0.0 - черно-белое, 1.0 - без изменений)')
parser.add_argument('--sharpness', type=float,
help='Коэффициент резкости (0.0 - размытое, 1.0 - без изменений, >1.0 - более резкое)')
parser.add_argument('--border', nargs=2, metavar=('WIDTH', 'COLOR'),
help='Добавить рамку (ширина цвет)')
parser.add_argument('--flip', choices=['horizontal', 'vertical'],
help='Отразить изображение (horizontal/vertical)')
parser.add_argument('--quality', type=int, default=95,
help='Качество выходного изображения (для JPEG, по умолчанию: 95)')
args = parser.parse_args()
# Создаем обработчик изображений
processor = ImageProcessor()
# Загружаем изображение
if not processor.load_image(args.input):
return 1
# Применяем операции в порядке их указания
operations_count = 0
# Изменение размера
if args.resize:
width, height = args.resize
if processor.resize_image(width, height):
operations_count += 1
# Поворот
if args.rotate is not None:
if processor.rotate_image(args.rotate):
operations_count += 1
# Обрезка
if args.crop:
left, top, right, bottom = args.crop
if processor.crop_image(left, top, right, bottom):
operations_count += 1
# Фильтр
if args.filter:
if processor.apply_filter(args.filter):
operations_count += 1
# Яркость
if args.brightness is not None:
if processor.adjust_brightness(args.brightness):
operations_count += 1
# Контраст
if args.contrast is not None:
if processor.adjust_contrast(args.contrast):
operations_count += 1
# Насыщенность цветов
if args.color is not None:
if processor.adjust_color(args.color):
operations_count += 1
# Резкость
if args.sharpness is not None:
if processor.adjust_sharpness(args.sharpness):
operations_count += 1
# Рамка
if args.border:
width = int(args.border[0])
color = args.border[1]
if processor.add_border(width, color):
operations_count += 1
# Отражение
if args.flip:
if processor.flip_image(args.flip):
operations_count += 1
# Если не было выполнено ни одной операции, выводим предупреждение
if operations_count == 0:
print("Предупреждение: Не было применено ни одной операции к изображению")
# Сохраняем результат
if processor.save_image(args.output, quality=args.quality):
print(f"Обработка завершена. Выполнено операций: {operations_count}")
return 0
else:
return 1
if __name__ == "__main__":
sys.exit(main())
Использование:
# Применение черно-белого фильтра
python image_processor.py -i input.jpg -o output.jpg -f grayscale
# Изменение размера и применение фильтра сепии
python image_processor.py -i input.jpg -o output.jpg --resize 800 600 -f sepia
# Комплексная обработка
python image_processor.py -i input.jpg -o output.jpg --rotate 90 --contrast 1.5 --border 10 white
32. Модуль для работы с базой данных SQLite
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import sqlite3
import argparse
import csv
import json
from tabulate import tabulate
class DatabaseManager:
"""
Класс для работы с SQLite базой данных
"""
def __init__(self, db_path):
"""
Инициализирует менеджер базы данных
Args:
db_path (str): Путь к файлу базы данных
"""
self.db_path = db_path
self.conn = None
self.cursor = None
def connect(self):
"""
Устанавливает соединение с базой данных
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для БД, если она не существует
db_dir = os.path.dirname(os.path.abspath(self.db_path))
os.makedirs(db_dir, exist_ok=True)
# Устанавливаем соединение
self.conn = sqlite3.connect(self.db_path)
# Настраиваем соединение
self.conn.execute("PRAGMA foreign_keys = ON")
# Создаем курсор
self.cursor = self.conn.cursor()
return True
except sqlite3.Error as e:
print(f"Ошибка при подключении к базе данных: {e}")
return False
except Exception as e:
print(f"Неожиданная ошибка: {e}")
return False
def disconnect(self):
"""
Закрывает соединение с базой данных
"""
if self.conn:
self.conn.close()
self.conn = None
self.cursor = None
def execute_query(self, query, params=None, commit=False):
"""
Выполняет SQL-запрос
Args:
query (str): SQL-запрос
params (tuple or dict): Параметры запроса
commit (bool): Выполнить фиксацию транзакции
Returns:
cursor: Курсор с результатами запроса или None в случае ошибки
"""
if not self.conn:
if not self.connect():
return None
try:
if params:
self.cursor.execute(query, params)
else:
self.cursor.execute(query)
if commit:
self.conn.commit()
return self.cursor
except sqlite3.Error as e:
print(f"Ошибка при выполнении запроса: {e}")
print(f"Запрос: {query}")
if params:
print(f"Параметры: {params}")
return None
def fetch_all(self, query, params=None):
"""
Выполняет запрос и возвращает все результаты
Args:
query (str): SQL-запрос
params (tuple or dict): Параметры запроса
Returns:
list: Список результатов или None в случае ошибки
"""
cursor = self.execute_query(query, params)
return cursor.fetchall() if cursor else None
def fetch_one(self, query, params=None):
"""
Выполняет запрос и возвращает первый результат
Args:
query (str): SQL-запрос
params (tuple or dict): Параметры запроса
Returns:
tuple: Результат запроса или None в случае ошибки или отсутствия результатов
"""
cursor = self.execute_query(query, params)
return cursor.fetchone() if cursor else None
def create_table(self, table_name, columns):
"""
Создает таблицу
Args:
table_name (str): Имя таблицы
columns (list): Список столбцов с их определениями
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Формируем SQL-запрос для создания таблицы
query = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns)})"
# Выполняем запрос
cursor = self.execute_query(query, commit=True)
return cursor is not None
def drop_table(self, table_name):
"""
Удаляет таблицу
Args:
table_name (str): Имя таблицы
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Формируем SQL-запрос для удаления таблицы
query = f"DROP TABLE IF EXISTS {table_name}"
# Выполняем запрос
cursor = self.execute_query(query, commit=True)
return cursor is not None
def insert_data(self, table_name, data):
"""
Вставляет данные в таблицу
Args:
table_name (str): Имя таблицы
data (dict): Словарь с данными {столбец: значение}
Returns:
int: ID вставленной записи или None в случае ошибки
"""
# Формируем SQL-запрос для вставки данных
columns = ', '.join(data.keys())
placeholders = ', '.join(['?' for _ in data])
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
# Выполняем запрос
cursor = self.execute_query(query, tuple(data.values()), commit=True)
return cursor.lastrowid if cursor else None
def update_data(self, table_name, data, condition, condition_params):
"""
Обновляет данные в таблице
Args:
table_name (str): Имя таблицы
data (dict): Словарь с данными {столбец: значение}
condition (str): Условие для обновления
condition_params (tuple): Параметры условия
Returns:
int: Количество обновленных записей или None в случае ошибки
"""
# Формируем SQL-запрос для обновления данных
set_clause = ', '.join([f"{column} = ?" for column in data.keys()])
query = f"UPDATE {table_name} SET {set_clause} WHERE {condition}"
# Выполняем запрос
cursor = self.execute_query(query, tuple(data.values()) + condition_params, commit=True)
return cursor.rowcount if cursor else None
def delete_data(self, table_name, condition=None, condition_params=None): """
Удаляет данные из таблицы
Args:
table_name (str): Имя таблицы
condition (str): Условие для удаления (если None, удаляются все записи)
condition_params (tuple): Параметры условия
Returns:
int: Количество удаленных записей или None в случае ошибки
"""
# Формируем SQL-запрос для удаления данных
if condition:
query = f"DELETE FROM {table_name} WHERE {condition}"
else:
query = f"DELETE FROM {table_name}"
# Выполняем запрос
cursor = self.execute_query(query, condition_params, commit=True)
return cursor.rowcount if cursor else None
def get_table_info(self, table_name):
"""
Получает информацию о столбцах таблицы
Args:
table_name (str): Имя таблицы
Returns:
list: Список информации о столбцах или None в случае ошибки
"""
# Проверяем существование таблицы
check_query = f"SELECT name FROM sqlite_master WHERE type='table' AND name=?"
result = self.fetch_one(check_query, (table_name,))
if not result:
print(f"Таблица '{table_name}' не существует")
return None
# Получаем информацию о столбцах
query = f"PRAGMA table_info({table_name})"
return self.fetch_all(query)
def export_to_csv(self, table_name, csv_file, delimiter=','):
"""
Экспортирует таблицу в CSV-файл
Args:
table_name (str): Имя таблицы
csv_file (str): Путь к CSV-файлу
delimiter (str): Разделитель
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Получаем информацию о столбцах
columns_info = self.get_table_info(table_name)
if not columns_info:
return False
# Извлекаем названия столбцов
column_names = [column[1] for column in columns_info]
# Получаем данные
query = f"SELECT * FROM {table_name}"
rows = self.fetch_all(query)
if rows is None:
return False
try:
# Создаем директорию для CSV, если она не существует
csv_dir = os.path.dirname(os.path.abspath(csv_file))
os.makedirs(csv_dir, exist_ok=True)
# Записываем данные в CSV
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f, delimiter=delimiter)
# Записываем заголовки
writer.writerow(column_names)
# Записываем данные
writer.writerows(rows)
print(f"Таблица '{table_name}' экспортирована в '{csv_file}'")
return True
except Exception as e:
print(f"Ошибка при экспорте в CSV: {e}")
return False
def import_from_csv(self, table_name, csv_file, delimiter=',', create_table=False):
"""
Импортирует данные из CSV-файла в таблицу
Args:
table_name (str): Имя таблицы
csv_file (str): Путь к CSV-файлу
delimiter (str): Разделитель
create_table (bool): Создать таблицу, если она не существует
Returns:
int: Количество импортированных записей или None в случае ошибки
"""
try:
# Читаем CSV-файл
with open(csv_file, 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f, delimiter=delimiter)
# Читаем заголовки
headers = next(reader)
# Если таблица не существует и нужно ее создать
if create_table:
# Получаем информацию о таблице
table_exists = self.fetch_one(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,)
)
if not table_exists:
# Создаем определения столбцов (все как TEXT)
columns = [f"{header} TEXT" for header in headers]
# Создаем таблицу
if not self.create_table(table_name, columns):
return None
# Начинаем транзакцию
self.conn.begin()
# Вставляем данные
count = 0
for row in reader:
# Проверяем соответствие количества столбцов
if len(row) != len(headers):
print(f"Предупреждение: строка {count+2} имеет неверное количество столбцов. Пропускаем.")
continue
# Формируем данные для вставки
data = dict(zip(headers, row))
# Вставляем данные
if self.insert_data(table_name, data) is not None:
count += 1
# Фиксируем транзакцию
self.conn.commit()
print(f"Импортировано {count} записей из '{csv_file}' в таблицу '{table_name}'")
return count
except FileNotFoundError:
print(f"Ошибка: Файл '{csv_file}' не найден")
return None
except Exception as e:
print(f"Ошибка при импорте из CSV: {e}")
self.conn.rollback() # Откатываем транзакцию в случае ошибки
return None
def display_table(self, table_name, limit=None, order_by=None):
"""
Выводит содержимое таблицы
Args:
table_name (str): Имя таблицы
limit (int): Ограничение количества строк
order_by (str): Столбец для сортировки
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Получаем информацию о столбцах
columns_info = self.get_table_info(table_name)
if not columns_info:
return False
# Извлекаем названия столбцов
column_names = [column[1] for column in columns_info]
# Формируем запрос
query = f"SELECT * FROM {table_name}"
if order_by:
query += f" ORDER BY {order_by}"
if limit:
query += f" LIMIT {limit}"
# Получаем данные
rows = self.fetch_all(query)
if rows is None:
return False
# Выводим таблицу
if rows:
print(tabulate(rows, headers=column_names, tablefmt="fancy_grid"))
print(f"Всего записей: {len(rows)}")
else:
print(f"Таблица '{table_name}' пуста")
return True
def execute_script(self, script_file):
"""
Выполняет SQL-скрипт из файла
Args:
script_file (str): Путь к файлу скрипта
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Проверяем существование файла
if not os.path.isfile(script_file):
print(f"Ошибка: Файл '{script_file}' не существует")
return False
# Читаем скрипт
with open(script_file, 'r', encoding='utf-8') as f:
script = f.read()
# Выполняем скрипт
if not self.conn:
if not self.connect():
return False
self.conn.executescript(script)
self.conn.commit()
print(f"Скрипт '{script_file}' выполнен успешно")
return True
except sqlite3.Error as e:
print(f"Ошибка при выполнении скрипта: {e}")
return False
except Exception as e:
print(f"Неожиданная ошибка: {e}")
return False
def get_tables(self):
"""
Получает список таблиц в базе данных
Returns:
list: Список имен таблиц или None в случае ошибки
"""
query = "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
# Выполняем запрос
tables = self.fetch_all(query)
return [table[0] for table in tables] if tables else []
def backup_database(self, backup_path):
"""
Создает резервную копию базы данных
Args:
backup_path (str): Путь для сохранения резервной копии
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для резервной копии, если она не существует
backup_dir = os.path.dirname(os.path.abspath(backup_path))
os.makedirs(backup_dir, exist_ok=True)
# Проверяем соединение
if not self.conn:
if not self.connect():
return False
# Создаем соединение с резервной копией
backup_conn = sqlite3.connect(backup_path)
# Выполняем резервное копирование
self.conn.backup(backup_conn)
# Закрываем соединение с резервной копией
backup_conn.close()
print(f"Резервная копия базы данных создана: '{backup_path}'")
return True
except sqlite3.Error as e:
print(f"Ошибка при создании резервной копии: {e}")
return False
except Exception as e:
print(f"Неожиданная ошибка: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Менеджер SQLite базы данных')
parser.add_argument('--db', required=True, help='Путь к файлу базы данных')
subparsers = parser.add_subparsers(dest='command', help='Команда')
# Команда tables - вывод списка таблиц
tables_parser = subparsers.add_parser('tables', help='Вывести список таблиц')
# Команда create - создание таблицы
create_parser = subparsers.add_parser('create', help='Создать таблицу')
create_parser.add_argument('--table', required=True, help='Имя таблицы')
create_parser.add_argument('--columns', required=True, nargs='+',
help='Определения столбцов (например, "id INTEGER PRIMARY KEY")')
# Команда drop - удаление таблицы
drop_parser = subparsers.add_parser('drop', help='Удалить таблицу')
drop_parser.add_argument('--table', required=True, help='Имя таблицы')
# Команда info - информация о таблице
info_parser = subparsers.add_parser('info', help='Получить информацию о таблице')
info_parser.add_argument('--table', required=True, help='Имя таблицы')
# Команда insert - вставка данных
insert_parser = subparsers.add_parser('insert', help='Вставить данные')
insert_parser.add_argument('--table', required=True, help='Имя таблицы')
insert_parser.add_argument('--data', required=True,
help='Данные в формате JSON (например, \'{"name": "John", "age": 30}\')')
# Команда query - выполнение запроса
query_parser = subparsers.add_parser('query', help='Выполнить запрос')
query_parser.add_argument('--query', required=True, help='SQL-запрос')
# Команда select - выборка данных
select_parser = subparsers.add_parser('select', help='Выбрать данные')
select_parser.add_argument('--table', required=True, help='Имя таблицы')
select_parser.add_argument('--where', help='Условие WHERE')
select_parser.add_argument('--limit', type=int, help='Ограничение количества строк')
select_parser.add_argument('--order', help='Столбец для сортировки')
# Команда delete - удаление данных
delete_parser = subparsers.add_parser('delete', help='Удалить данные')
delete_parser.add_argument('--table', required=True, help='Имя таблицы')
delete_parser.add_argument('--where', help='Условие WHERE')
# Команда update - обновление данных
update_parser = subparsers.add_parser('update', help='Обновить данные')
update_parser.add_argument('--table', required=True, help='Имя таблицы')
update_parser.add_argument('--data', required=True,
help='Данные в формате JSON (например, \'{"name": "John", "age": 30}\')')
update_parser.add_argument('--where', required=True, help='Условие WHERE')
# Команда export - экспорт в CSV
export_parser = subparsers.add_parser('export', help='Экспортировать данные в CSV')
export_parser.add_argument('--table', required=True, help='Имя таблицы')
export_parser.add_argument('--file', required=True, help='Путь к CSV-файлу')
export_parser.add_argument('--delimiter', default=',', help='Разделитель (по умолчанию: ,)')
# Команда import - импорт из CSV
import_parser = subparsers.add_parser('import', help='Импортировать данные из CSV')
import_parser.add_argument('--table', required=True, help='Имя таблицы')
import_parser.add_argument('--file', required=True, help='Путь к CSV-файлу')
import_parser.add_argument('--delimiter', default=',', help='Разделитель (по умолчанию: ,)')
import_parser.add_argument('--create', action='store_true',
help='Создать таблицу, если она не существует')
# Команда script - выполнение SQL-скрипта
script_parser = subparsers.add_parser('script', help='Выполнить SQL-скрипт')
script_parser.add_argument('--file', required=True, help='Путь к файлу скрипта')
# Команда backup - создание резервной копии
backup_parser = subparsers.add_parser('backup', help='Создать резервную копию базы данных')
backup_parser.add_argument('--file', required=True, help='Путь для сохранения резервной копии')
args = parser.parse_args()
# Создаем менеджер базы данных
db_manager = DatabaseManager(args.db)
# Выполняем команду
try:
if args.command == 'tables':
# Выводим список таблиц
db_manager.connect()
tables = db_manager.get_tables()
if tables:
print("Список таблиц:")
for i, table in enumerate(tables, 1):
print(f"{i}. {table}")
else:
print("База данных не содержит таблиц")
elif args.command == 'create':
# Создаем таблицу
db_manager.connect()
if db_manager.create_table(args.table, args.columns):
print(f"Таблица '{args.table}' создана")
else:
print(f"Ошибка при создании таблицы '{args.table}'")
elif args.command == 'drop':
# Удаляем таблицу
db_manager.connect()
if db_manager.drop_table(args.table):
print(f"Таблица '{args.table}' удалена")
else:
print(f"Ошибка при удалении таблицы '{args.table}'")
elif args.command == 'info':
# Получаем информацию о таблице
db_manager.connect()
columns = db_manager.get_table_info(args.table)
if columns:
# Выводим информацию о столбцах
headers = ["ID", "Имя", "Тип", "NotNull", "Default", "PK"]
print(f"Информация о таблице '{args.table}':")
print(tabulate(columns, headers=headers, tablefmt="fancy_grid"))
else:
print(f"Ошибка при получении информации о таблице '{args.table}'")
elif args.command == 'insert':
# Вставляем данные
db_manager.connect()
# Парсим JSON-данные
try:
data = json.loads(args.data)
except json.JSONDecodeError as e:
print(f"Ошибка при парсинге JSON: {e}")
return 1
# Вставляем данные
row_id = db_manager.insert_data(args.table, data)
if row_id is not None:
print(f"Данные вставлены в таблицу '{args.table}' (ID: {row_id})")
else:
print(f"Ошибка при вставке данных в таблицу '{args.table}'")
elif args.command == 'query':
# Выполняем запрос
db_manager.connect()
# Проверяем тип запроса
if args.query.strip().lower().startswith(('select', 'pragma')):
# Запрос на выборку
results = db_manager.fetch_all(args.query)
if results is not None:
if results:
# Выводим результаты
print(tabulate(results, tablefmt="fancy_grid"))
print(f"Всего записей: {len(results)}")
else:
print("Запрос не вернул результатов")
else:
print("Ошибка при выполнении запроса")
else:
# Запрос на изменение данных
cursor = db_manager.execute_query(args.query, commit=True)
if cursor:
print(f"Запрос выполнен успешно. Затронуто строк: {cursor.rowcount}")
else:
print("Ошибка при выполнении запроса")
elif args.command == 'select':
# Выбираем данные
db_manager.connect()
# Формируем запрос
query = f"SELECT * FROM {args.table}"
params = None
if args.where:
query += f" WHERE {args.where}"
if args.order:
query += f" ORDER BY {args.order}"
if args.limit:
query += f" LIMIT {args.limit}"
# Выполняем запрос
results = db_manager.fetch_all(query, params)
if results is not None:
# Получаем информацию о столбцах
columns_info = db_manager.get_table_info(args.table)
if columns_info:
# Извлекаем названия столбцов
column_names = [column[1] for column in columns_info]
if results:
# Выводим результаты
print(tabulate(results, headers=column_names, tablefmt="fancy_grid"))
print(f"Всего записей: {len(results)}")
else:
print("Запрос не вернул результатов")
else:
print(f"Ошибка при получении информации о таблице '{args.table}'")
else:
print("Ошибка при выполнении запроса")
elif args.command == 'delete':
# Удаляем данные
db_manager.connect()
count = db_manager.delete_data(args.table, args.where, None)
if count is not None:
print(f"Удалено {count} записей из таблицы '{args.table}'")
else:
print(f"Ошибка при удалении данных из таблицы '{args.table}'")
elif args.command == 'update':
# Обновляем данные
db_manager.connect()
# Парсим JSON-данные
try:
data = json.loads(args.data)
except json.JSONDecodeError as e:
print(f"Ошибка при парсинге JSON: {e}")
return 1
# Обновляем данные
count = db_manager.update_data(args.table, data, args.where, ())
if count is not None:
print(f"Обновлено {count} записей в таблице '{args.table}'")
else:
print(f"Ошибка при обновлении данных в таблице '{args.table}'")
elif args.command == 'export':
# Экспортируем данные в CSV
db_manager.connect()
if db_manager.export_to_csv(args.table, args.file, args.delimiter):
print(f"Данные из таблицы '{args.table}' экспортированы в '{args.file}'")
else:
print(f"Ошибка при экспорте данных из таблицы '{args.table}'")
elif args.command == 'import':
# Импортируем данные из CSV
db_manager.connect()
count = db_manager.import_from_csv(args.table, args.file, args.delimiter, args.create)
if count is not None:
print(f"Импортировано {count} записей в таблицу '{args.table}'")
else:
print(f"Ошибка при импорте данных в таблицу '{args.table}'")
elif args.command == 'script':
# Выполняем SQL-скрипт
db_manager.connect()
if db_manager.execute_script(args.file):
print(f"Скрипт '{args.file}' выполнен успешно")
else:
print(f"Ошибка при выполнении скрипта '{args.file}'")
elif args.command == 'backup':
# Создаем резервную копию
db_manager.connect()
if db_manager.backup_database(args.file):
print(f"Резервная копия базы данных создана: '{args.file}'")
else:
print(f"Ошибка при создании резервной копии базы данных")
else:
# Выводим справку, если команда не указана
parser.print_help()
finally:
# Закрываем соединение с базой данных
db_manager.disconnect()
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Создание таблицы
python db_manager.py --db mydatabase.db create --table users --columns "id INTEGER PRIMARY KEY" "name TEXT" "age INTEGER"
# Вставка данных
python db_manager.py --db mydatabase.db insert --table users --data '{\"name": \"John Doe", \"age": 30}' в повершел
# Выборка данных
python db_manager.py --db mydatabase.db select --table users --where "age > 18" --order "name"
# Экспорт в CSV
python db_manager.py --db mydatabase.db export --table users --file users.csv
33. Модуль для анализа текстового файла
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import re
import argparse
import json
from collections import Counter
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import matplotlib.pyplot as plt
from wordcloud import WordCloud
class TextAnalyzer:
"""
Класс для анализа текстовых файлов
"""
def __init__(self, language='english'):
"""
Инициализирует анализатор текста
Args:
language (str): Язык текста (определяет список стоп-слов)
"""
self.language = language
self.text = None
self.words = []
self.words_no_stopwords = []
self.word_counts = Counter()
self.word_counts_no_stopwords = Counter()
self.stop_words = set()
# Загружаем ресурсы NLTK
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
print("Загрузка ресурсов NLTK (punkt)...")
nltk.download('punkt', quiet=True)
try:
nltk.data.find(f'corpora/stopwords')
except LookupError:
print("Загрузка ресурсов NLTK (stopwords)...")
nltk.download('stopwords', quiet=True)
# Загружаем стоп-слова для выбранного языка
try:
self.stop_words = set(stopwords.words(self.language))
except Exception as e:
print(f"Ошибка при загрузке стоп-слов для языка '{self.language}': {e}")
print("Используется пустой набор стоп-слов")
self.stop_words = set()
def load_custom_stopwords(self, file_path):
"""
Загружает пользовательский список стоп-слов из файла
Args:
file_path (str): Путь к файлу со стоп-словами (по одному на строку)
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
custom_stopwords = {line.strip().lower() for line in f if line.strip()}
# Добавляем пользовательские стоп-слова к стандартным
self.stop_words.update(custom_stopwords)
print(f"Загружено {len(custom_stopwords)} пользовательских стоп-слов")
return True
except Exception as e:
print(f"Ошибка при загрузке пользовательских стоп-слов: {e}")
return False
def load_text(self, file_path):
"""
Загружает текст из файла
Args:
file_path (str): Путь к текстовому файлу
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Определяем кодировку файла
encodings = ['utf-8', 'latin-1', 'cp1251']
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
self.text = f.read()
break
except UnicodeDecodeError:
continue
if self.text is None:
print(f"Ошибка: не удалось определить кодировку файла '{file_path}'")
return False
print(f"Загружен текст из файла: {file_path} (размер: {len(self.text)} символов)")
return True
except Exception as e:
print(f"Ошибка при загрузке текста: {e}")
return False
def preprocess_text(self):
"""
Предобрабатывает загруженный текст
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.text is None:
print("Ошибка: текст не загружен")
return False
try:
# Приводим текст к нижнему регистру
text_lower = self.text.lower()
# Токенизируем текст
self.words = word_tokenize(text_lower, language=self.language)
# Фильтруем слова: оставляем только буквенные токены
self.words = [word for word in self.words if word.isalpha()]
# Подсчитываем частоту слов
self.word_counts = Counter(self.words)
# Фильтруем стоп-слова
self.words_no_stopwords = [word for word in self.words if word not in self.stop_words]
# Подсчитываем частоту слов без стоп-слов
self.word_counts_no_stopwords = Counter(self.words_no_stopwords)
print(f"Предобработка текста завершена. Всего слов: {len(self.words)}, уникальных слов: {len(self.word_counts)}")
print(f"После удаления стоп-слов: {len(self.words_no_stopwords)} слов, {len(self.word_counts_no_stopwords)} уникальных слов")
return True
except Exception as e:
print(f"Ошибка при предобработке текста: {e}")
return False
def get_word_count(self):
"""
Возвращает общее количество слов
Returns:
int: Количество слов
"""
return len(self.words)
def get_unique_word_count(self):
"""
Возвращает количество уникальных слов
Returns:
int: Количество уникальных слов
"""
return len(self.word_counts)
def get_most_common_words(self, n=10, with_stopwords=False):
"""
Возвращает наиболее часто встречающиеся слова
Args:
n (int): Количество слов для вывода
with_stopwords (bool): Включать стоп-слова
Returns:
list: Список кортежей (слово, количество)
"""
if with_stopwords:
return self.word_counts.most_common(n)
else:
return self.word_counts_no_stopwords.most_common(n)
def get_least_common_words(self, n=10, with_stopwords=False):
"""
Возвращает наименее часто встречающиеся слова
Args:
n (int): Количество слов для вывода
with_stopwords (bool): Включать стоп-слова
Returns:
list: Список кортежей (слово, количество)
"""
if with_stopwords:
return self.word_counts.most_common()[:-n-1:-1]
else:
return self.word_counts_no_stopwords.most_common()[:-n-1:-1]
def get_word_frequency(self, word):
"""
Возвращает частоту указанного слова
Args:
word (str): Слово для поиска
Returns:
int: Частота слова
"""
return self.word_counts.get(word.lower(), 0)
def get_vocabulary_richness(self):
"""
Вычисляет богатство словарного запаса (отношение уникальных слов к общему количеству слов)
Returns:
float: Коэффициент богатства словарного запаса
"""
if not self.words:
return 0
return len(self.word_counts) / len(self.words)
def get_stopwords_percentage(self):
"""
Вычисляет процент стоп-слов в тексте
Returns:
float: Процент стоп-слов
"""
if not self.words:
return 0
return (len(self.words) - len(self.words_no_stopwords)) / len(self.words) * 100
def get_sentence_count(self):
"""
Возвращает количество предложений в тексте
Returns:
int: Количество предложений
"""
if self.text is None:
return 0
# Токенизируем текст на предложения
sentences = nltk.sent_tokenize(self.text, language=self.language)
return len(sentences)
def get_average_sentence_length(self):
"""
Вычисляет среднюю длину предложения (в словах)
Returns:
float: Средняя длина предложения
"""
if self.text is None:
return 0
# Токенизируем текст на предложения
sentences = nltk.sent_tokenize(self.text, language=self.language)
if not sentences:
return 0
# Подсчитываем количество слов в каждом предложении
words_per_sentence = [len(word_tokenize(sentence)) for sentence in sentences]
return sum(words_per_sentence) / len(sentences)
def get_average_word_length(self):
"""
Вычисляет среднюю длину слова (в символах)
Returns:
float: Средняя длина слова
"""
if not self.words:
return 0
# Подсчитываем длину каждого слова
word_lengths = [len(word) for word in self.words]
return sum(word_lengths) / len(self.words)
def save_results(self, output_file, n=20):
"""
Сохраняет результаты анализа в файл
Args:
output_file (str): Путь к выходному файлу
n (int): Количество слов для включения в отчет
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
# Формируем результаты анализа
results = {
"total_words": self.get_word_count(),
"unique_words": self.get_unique_word_count(),
"vocabulary_richness": self.get_vocabulary_richness(),
"stopwords_percentage": self.get_stopwords_percentage(),
"sentence_count": self.get_sentence_count(),
"average_sentence_length": self.get_average_sentence_length(),
"average_word_length": self.get_average_word_length(),
"most_common_words": self.get_most_common_words(n),
"most_common_words_with_stopwords": self.get_most_common_words(n, True),
"least_common_words": self.get_least_common_words(n)
}
# Определяем формат файла по расширению
_, ext = os.path.splitext(output_file)
if ext.lower() == '.json':
# Сохраняем в формате JSON
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=4)
else:
# Сохраняем в текстовом формате
with open(output_file, 'w', encoding='utf-8') as f:
f.write("Результаты анализа текста\n")
f.write("=======================\n\n")
f.write(f"Общее количество слов: {results['total_words']}\n")
f.write(f"Количество уникальных слов: {results['unique_words']}\n")
f.write(f"Богатство словарного запаса: {results['vocabulary_richness']:.4f}\n")
f.write(f"Процент стоп-слов: {results['stopwords_percentage']:.2f}%\n")
f.write(f"Количество предложений: {results['sentence_count']}\n")
f.write(f"Средняя длина предложения: {results['average_sentence_length']:.2f} слов\n")
f.write(f"Средняя длина слова: {results['average_word_length']:.2f} символов\n\n")
f.write("Наиболее часто встречающиеся слова (без стоп-слов):\n")
for word, count in results['most_common_words']:
f.write(f" {word}: {count}\n")
f.write("\nНаиболее часто встречающиеся слова (включая стоп-слова):\n")
for word, count in results['most_common_words_with_stopwords']:
f.write(f" {word}: {count}\n")
f.write("\nНаименее часто встречающиеся слова:\n")
for word, count in results['least_common_words']:
f.write(f" {word}: {count}\n")
print(f"Результаты анализа сохранены в файл: {output_file}")
return True
except Exception as e:
print(f"Ошибка при сохранении результатов: {e}")
return False
def generate_wordcloud(self, output_file, width=800, height=600, with_stopwords=False):
"""
Генерирует облако слов и сохраняет его в файл
Args:
output_file (str): Путь к выходному файлу
width (int): Ширина изображения
height (int): Высота изображения
with_stopwords (bool): Включать стоп-слова
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
# Выбираем словарь частот
word_freq = self.word_counts if with_stopwords else self.word_counts_no_stopwords
if not word_freq:
print("Ошибка: нет данных для генерации облака слов")
return False
# Генерируем облако слов
wordcloud = WordCloud(
width=width,
height=height,
background_color='white',
max_words=200,
contour_width=1,
contour_color='steelblue'
).generate_from_frequencies(word_freq)
# Сохраняем облако слов
plt.figure(figsize=(width/100, height/100), dpi=100)
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.tight_layout(pad=0)
plt.savefig(output_file, bbox_inches='tight')
plt.close()
print(f"Облако слов сохранено в файл: {output_file}")
return True
except Exception as e:
print(f"Ошибка при генерации облака слов: {e}")
return False
def plot_top_words(self, output_file, n=20, with_stopwords=False):
"""
Строит график частоты наиболее распространенных слов
Args:
output_file (str): Путь к выходному файлу
n (int): Количество слов для отображения
with_stopwords (bool): Включать стоп-слова
Returns:
bool: True в случае успеха, False в случае ошибки
"""
try:
# Создаем директорию для файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
# Получаем наиболее часто встречающиеся слова
top_words = self.get_most_common_words(n, with_stopwords)
if not top_words:
print("Ошибка: нет данных для построения графика")
return False
# Разделяем слова и частоты
words, frequencies = zip(*top_words)
# Создаем график
plt.figure(figsize=(12, 8))
# Строим горизонтальные столбцы
bars = plt.barh(range(len(words)), frequencies, align='center', color='steelblue')
# Настраиваем оси
plt.yticks(range(len(words)), words)
plt.xlabel('Частота')
plt.ylabel('Слово')
plt.title(f'Топ-{n} наиболее часто встречающихся слов{"" if with_stopwords else " (без стоп-слов)"}')
# Добавляем метки значений
for bar in bars:
width = bar.get_width()
plt.text(width + 0.5, bar.get_y() + bar.get_height()/2, f'{width}', ha='left', va='center')
# Инвертируем ось Y для отображения наиболее часто встречающихся слов сверху
plt.gca().invert_yaxis()
# Сохраняем график
plt.tight_layout()
plt.savefig(output_file)
plt.close()
print(f"График наиболее распространенных слов сохранен в файл: {output_file}")
return True
except Exception as e:
print(f"Ошибка при построении графика: {e}")
return False
def remove_stopwords_from_text(self, output_file):
"""
Удаляет стоп-слова из текста и сохраняет результат в файл
Args:
output_file (str): Путь к выходному файлу
Returns:
bool: True в случае успеха, False в случае ошибки
"""
if self.text is None:
print("Ошибка: текст не загружен")
return False
try:
# Создаем директорию для файла, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
# Токенизируем текст на предложения
sentences = nltk.sent_tokenize(self.text, language=self.language)
# Обрабатываем каждое предложение
filtered_sentences = []
for sentence in sentences:
# Токенизируем предложение на слова
words = word_tokenize(sentence, language=self.language)
# Фильтруем стоп-слова
filtered_words = [word for word in words if word.lower() not in self.stop_words]
# Собираем предложение обратно
filtered_sentence = ' '.join(filtered_words)
filtered_sentences.append(filtered_sentence)
# Собираем текст
filtered_text = '\n'.join(filtered_sentences)
# Сохраняем результат
with open(output_file, 'w', encoding='utf-8') as f:
f.write(filtered_text)
print(f"Текст без стоп-слов сохранен в файл: {output_file}")
return True
except Exception as e:
print(f"Ошибка при удалении стоп-слов: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Анализ текстового файла')
parser.add_argument('--input', '-i', required=True,
help='Путь к входному текстовому файлу')
parser.add_argument('--output', '-o',
help='Путь к выходному файлу с результатами анализа')
parser.add_argument('--language', '-l', default='english',
help='Язык текста (по умолчанию: english)')
parser.add_argument('--stopwords', '-s',
help='Путь к файлу с дополнительными стоп-словами')
parser.add_argument('--wordcloud', '-w',
help='Путь для сохранения облака слов')
parser.add_argument('--plot', '-p',
help='Путь для сохранения графика наиболее часто встречающихся слов')
parser.add_argument('--remove-stopwords', '-r',
help='Путь для сохранения текста без стоп-слов')
parser.add_argument('--top', '-t', type=int, default=20,
help='Количество наиболее часто встречающихся слов для отображения')
parser.add_argument('--with-stopwords', action='store_true',
help='Включать стоп-слова в результаты')
args = parser.parse_args()
# Создаем анализатор текста
analyzer = TextAnalyzer(args.language)
# Загружаем дополнительные стоп-слова, если указаны
if args.stopwords:
if not analyzer.load_custom_stopwords(args.stopwords):
return 1
# Загружаем текст
if not analyzer.load_text(args.input):
return 1
# Предобрабатываем текст
if not analyzer.preprocess_text():
return 1
# Выводим основные статистики
print("\nОсновные статистики:")
print(f"Общее количество слов: {analyzer.get_word_count()}")
print(f"Количество уникальных слов: {analyzer.get_unique_word_count()}")
print(f"Богатство словарного запаса: {analyzer.get_vocabulary_richness():.4f}")
print(f"Процент стоп-слов: {analyzer.get_stopwords_percentage():.2f}%")
print(f"Количество предложений: {analyzer.get_sentence_count()}")
print(f"Средняя длина предложения: {analyzer.get_average_sentence_length():.2f} слов")
print(f"Средняя длина слова: {analyzer.get_average_word_length():.2f} символов")
# Выводим наиболее часто встречающиеся слова
print(f"\nТоп-{args.top} наиболее часто встречающихся слов:")
for word, count in analyzer.get_most_common_words(args.top, args.with_stopwords):
print(f" {word}: {count}")
# Сохраняем результаты анализа, если указан выходной файл
if args.output:
if not analyzer.save_results(args.output, args.top):
return 1
# Генерируем облако слов, если указан путь
if args.wordcloud:
if not analyzer.generate_wordcloud(args.wordcloud, with_stopwords=args.with_stopwords):
return 1
# Строим график наиболее часто встречающихся слов, если указан путь
if args.plot:
if not analyzer.plot_top_words(args.plot, args.top, args.with_stopwords):
return 1
# Удаляем стоп-слова из текста, если указан путь
if args.remove_stopwords:
if not analyzer.remove_stopwords_from_text(args.remove_stopwords):
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Базовый анализ текста
python text_analyzer.py -i mytext.txt -l russian
# Полный анализ с генерацией всех отчетов
python text_analyzer.py -i mytext.txt -l russian -o analysis.txt -w wordcloud.png -p top_words.png -r filtered_text.txt
# Анализ с пользовательскими стоп-словами
python text_analyzer.py -i mytext.txt -l english -s my_stopwords.txt -o analysis.json
Решение проблем с запуском скрипта TextAnalyzer
При запуске скрипта возникло две проблемы:
1. Отсутствие ресурсов NLTK для русского языка
Ошибка сообщает, что не найден ресурс punkt_tab для русского языка. Это происходит потому, что NLTK требует дополнительно загрузить языковые модели для корректной работы с русским текстом.
Решение:
Откройте Python в интерактивном режиме:
python
Загрузите необходимые ресурсы:
import nltk
nltk.download('punkt')
nltk.download('stopwords')
Если нужны специфические ресурсы для русского языка, используйте:
nltk.download('punkt', language='russian')
Если хотите загрузить все ресурсы сразу:
nltk.download('all')
(Внимание: этот вариант скачает большой объем данных)
2. Пустой файл с текстом
В сообщении также указано, что размер загруженного текстового файла - 0 символов. Это означает, что файл текст.txt либо пуст, либо не существует.
Решение:
- Убедитесь, что файл
текст.txt существует в текущей директории - Проверьте, что файл содержит текст (откройте его в любом текстовом редакторе)
- Если файл пуст, добавьте в него текст для анализа
- Если файл не находится в текущей директории, укажите полный путь к нему:
python 123.py --input C:\путь\к\вашему\текст.txt --output результаты.txt --wordcloud облако.png --plot график.png --language russian --top 30
Дополнительные советы:
Проверка установленных ресурсов NLTK:
import nltk
print(nltk.data.path) # Покажет пути, где NLTK ищет данные
Установка NLTK через pip (если не установлена):
pip install nltk
Для работы с визуализациями также убедитесь, что установлены библиотеки:
pip install matplotlib
pip install wordcloud
После установки необходимых ресурсов NLTK и проверки текстового файла попробуйте снова запустить скрипт.
34. Модуль для взаимодействия с API
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import time
import argparse
import requests
import datetime
from tabulate import tabulate
import matplotlib.pyplot as plt
class APIClient:
"""
Базовый класс для взаимодействия с API
"""
def __init__(self, base_url, api_key=None, timeout=10):
"""
Инициализирует клиент API
Args:
base_url (str): Базовый URL API
api_key (str): Ключ API (если требуется)
timeout (int): Таймаут запросов в секундах
"""
self.base_url = base_url
self.api_key = api_key
self.timeout = timeout
self.session = requests.Session()
def _make_request(self, method, endpoint, params=None, data=None, headers=None, **kwargs):
"""
Выполняет HTTP-запрос к API
Args:
method (str): HTTP-метод ('GET', 'POST', 'PUT', 'DELETE')
endpoint (str): Конечная точка API
params (dict): Параметры строки запроса
data (dict): Данные для отправки в теле запроса
headers (dict): HTTP-заголовки
**kwargs: Дополнительные параметры для requests
Returns:
dict or str: Ответ от API
"""
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
# Объединяем заголовки
all_headers = {}
if headers:
all_headers.update(headers)
# Добавляем заголовок API-ключа, если он предоставлен
if self.api_key:
all_headers['Authorization'] = f"Bearer {self.api_key}"
try:
response = self.session.request(
method=method,
url=url,
params=params,
json=data,
headers=all_headers,
timeout=self.timeout,
**kwargs
)
# Проверяем статус ответа
response.raise_for_status()
# Пытаемся распарсить JSON
try:
return response.json()
except ValueError:
return response.text
except requests.exceptions.HTTPError as e:
print(f"HTTP-ошибка: {e}")
try:
error_data = response.json()
print(f"Детали ошибки: {json.dumps(error_data, indent=2)}")
except ValueError:
print(f"Ответ сервера: {response.text}")
return None
except requests.exceptions.ConnectionError:
print(f"Ошибка соединения: не удалось подключиться к {url}")
return None
except requests.exceptions.Timeout:
print(f"Ошибка таймаута: запрос к {url} занял слишком много времени")
return None
except requests.exceptions.RequestException as e:
print(f"Ошибка запроса: {e}")
return None
def get(self, endpoint, params=None, **kwargs):
"""
Выполняет GET-запрос к API
Args:
endpoint (str): Конечная точка API
params (dict): Параметры строки запроса
**kwargs: Дополнительные параметры для requests
Returns:
dict or str: Ответ от API
"""
return self._make_request('GET', endpoint, params=params, **kwargs)
def post(self, endpoint, data=None, params=None, **kwargs):
"""
Выполняет POST-запрос к API
Args:
endpoint (str): Конечная точка API
data (dict): Данные для отправки в теле запроса
params (dict): Параметры строки запроса
**kwargs: Дополнительные параметры для requests
Returns:
dict or str: Ответ от API
"""
return self._make_request('POST', endpoint, data=data, params=params, **kwargs)
def put(self, endpoint, data=None, params=None, **kwargs): """
Выполняет PUT-запрос к API
Args:
endpoint (str): Конечная точка API
data (dict): Данные для отправки в теле запроса
params (dict): Параметры строки запроса
**kwargs: Дополнительные параметры для requests
Returns:
dict or str: Ответ от API
"""
return self._make_request('PUT', endpoint, data=data, params=params, **kwargs)
def delete(self, endpoint, params=None, **kwargs):
"""
Выполняет DELETE-запрос к API
Args:
endpoint (str): Конечная точка API
params (dict): Параметры строки запроса
**kwargs: Дополнительные параметры для requests
Returns:
dict or str: Ответ от API
"""
return self._make_request('DELETE', endpoint, params=params, **kwargs)
class WeatherAPIClient(APIClient):
"""
Клиент для работы с OpenWeatherMap API
"""
def __init__(self, api_key, timeout=10):
"""
Инициализирует клиент OpenWeatherMap API
Args:
api_key (str): Ключ API OpenWeatherMap
timeout (int): Таймаут запросов в секундах
"""
super().__init__("https://api.openweathermap.org/data/2.5", timeout=timeout)
self.api_key = api_key
def get_current_weather(self, city, units='metric'):
"""
Получает текущую погоду для указанного города
Args:
city (str): Название города
units (str): Единицы измерения ('metric', 'imperial', 'standard')
Returns:
dict: Данные о текущей погоде
"""
params = {
'q': city,
'units': units,
'appid': self.api_key
}
return self.get('weather', params=params)
def get_weather_forecast(self, city, units='metric'):
"""
Получает прогноз погоды на 5 дней для указанного города
Args:
city (str): Название города
units (str): Единицы измерения ('metric', 'imperial', 'standard')
Returns:
dict: Данные о прогнозе погоды
"""
params = {
'q': city,
'units': units,
'appid': self.api_key
}
return self.get('forecast', params=params)
def get_weather_by_coordinates(self, lat, lon, units='metric'):
"""
Получает текущую погоду по координатам
Args:
lat (float): Широта
lon (float): Долгота
units (str): Единицы измерения ('metric', 'imperial', 'standard')
Returns:
dict: Данные о текущей погоде
"""
params = {
'lat': lat,
'lon': lon,
'units': units,
'appid': self.api_key
}
return self.get('weather', params=params)
def display_current_weather(self, city, units='metric'):
"""
Отображает текущую погоду для указанного города
Args:
city (str): Название города
units (str): Единицы измерения ('metric', 'imperial', 'standard')
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Получаем данные о погоде
weather_data = self.get_current_weather(city, units)
if not weather_data:
return False
try:
# Разбираем данные
city_name = weather_data.get('name', 'Неизвестно')
country = weather_data.get('sys', {}).get('country', '')
temperature = weather_data.get('main', {}).get('temp', 'Неизвестно')
feels_like = weather_data.get('main', {}).get('feels_like', 'Неизвестно')
humidity = weather_data.get('main', {}).get('humidity', 'Неизвестно')
pressure = weather_data.get('main', {}).get('pressure', 'Неизвестно')
weather_desc = weather_data.get('weather', [{}])[0].get('description', 'Неизвестно')
wind_speed = weather_data.get('wind', {}).get('speed', 'Неизвестно')
wind_dir = weather_data.get('wind', {}).get('deg', 'Неизвестно')
# Преобразуем направление ветра в текстовое описание
wind_direction = self._get_wind_direction(wind_dir)
# Определяем единицы измерения
temp_unit = '°C' if units == 'metric' else '°F' if units == 'imperial' else 'K'
speed_unit = 'м/с' if units == 'metric' else 'миль/ч' if units == 'imperial' else 'м/с'
# Формируем таблицу с данными
data = [
['Город', f"{city_name}, {country}"],
['Погода', weather_desc.capitalize()],
['Температура', f"{temperature} {temp_unit}"],
['Ощущается как', f"{feels_like} {temp_unit}"],
['Влажность', f"{humidity}%"],
['Давление', f"{pressure} гПа"],
['Ветер', f"{wind_speed} {speed_unit}, {wind_direction}"]
]
# Выводим таблицу
print(tabulate(data, tablefmt="fancy_grid"))
return True
except Exception as e:
print(f"Ошибка при обработке данных о погоде: {e}")
return False
def display_weather_forecast(self, city, units='metric'):
"""
Отображает прогноз погоды на 5 дней для указанного города
Args:
city (str): Название города
units (str): Единицы измерения ('metric', 'imperial', 'standard')
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Получаем данные о прогнозе погоды
forecast_data = self.get_weather_forecast(city, units)
if not forecast_data:
return False
try:
# Разбираем данные
city_name = forecast_data.get('city', {}).get('name', 'Неизвестно')
country = forecast_data.get('city', {}).get('country', '')
# Группируем прогноз по дням
daily_forecast = {}
for item in forecast_data.get('list', []):
timestamp = item.get('dt')
date = datetime.datetime.fromtimestamp(timestamp).date()
if date not in daily_forecast:
daily_forecast[date] = []
daily_forecast[date].append(item)
# Определяем единицы измерения
temp_unit = '°C' if units == 'metric' else '°F' if units == 'imperial' else 'K'
speed_unit = 'м/с' if units == 'metric' else 'миль/ч' if units == 'imperial' else 'м/с'
# Выводим прогноз по дням
print(f"Прогноз погоды для {city_name}, {country}:")
print("=" * 80)
for date, items in sorted(daily_forecast.items()):
print(f"\nДата: {date.strftime('%d.%m.%Y')}")
# Вычисляем средние значения для дня
temps = [item.get('main', {}).get('temp') for item in items]
min_temp = min(temps)
max_temp = max(temps)
avg_temp = sum(temps) / len(temps)
descriptions = [item.get('weather', [{}])[0].get('description', '') for item in items]
most_common_desc = max(set(descriptions), key=descriptions.count)
print(f"Погода: {most_common_desc.capitalize()}")
print(f"Температура: {avg_temp:.1f} {temp_unit} (мин: {min_temp:.1f}, макс: {max_temp:.1f})")
# Выводим почасовой прогноз
print("\nПочасовой прогноз:")
forecast_data = []
for item in items:
time = datetime.datetime.fromtimestamp(item.get('dt')).strftime('%H:%M')
temp = item.get('main', {}).get('temp')
desc = item.get('weather', [{}])[0].get('description', 'Неизвестно')
wind_speed = item.get('wind', {}).get('speed')
wind_dir = self._get_wind_direction(item.get('wind', {}).get('deg'))
forecast_data.append([time, f"{temp:.1f} {temp_unit}", desc.capitalize(), f"{wind_speed} {speed_unit}, {wind_dir}"])
headers = ["Время", "Температура", "Погода", "Ветер"]
print(tabulate(forecast_data, headers=headers, tablefmt="fancy_grid"))
return True
except Exception as e:
print(f"Ошибка при обработке данных о прогнозе погоды: {e}")
return False
def plot_temperature_forecast(self, city, output_file=None, units='metric'):
"""
Строит график прогноза температуры для указанного города
Args:
city (str): Название города
output_file (str): Путь для сохранения графика (если None, график отображается на экране)
units (str): Единицы измерения ('metric', 'imperial', 'standard')
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Получаем данные о прогнозе погоды
forecast_data = self.get_weather_forecast(city, units)
if not forecast_data:
return False
try:
# Разбираем данные
city_name = forecast_data.get('city', {}).get('name', 'Неизвестно')
country = forecast_data.get('city', {}).get('country', '')
# Подготавливаем данные для графика
timestamps = []
temperatures = []
feels_like_temperatures = []
for item in forecast_data.get('list', []):
timestamp = datetime.datetime.fromtimestamp(item.get('dt'))
temp = item.get('main', {}).get('temp')
feels_like = item.get('main', {}).get('feels_like')
timestamps.append(timestamp)
temperatures.append(temp)
feels_like_temperatures.append(feels_like)
# Определяем единицы измерения
temp_unit = '°C' if units == 'metric' else '°F' if units == 'imperial' else 'K'
# Создаем график
plt.figure(figsize=(12, 6))
plt.plot(timestamps, temperatures, label=f'Температура', marker='o')
plt.plot(timestamps, feels_like_temperatures, label=f'Ощущается как', marker='x', linestyle='--')
# Настройка графика
plt.title(f'Прогноз температуры для {city_name}, {country}')
plt.xlabel('Дата и время')
plt.ylabel(f'Температура ({temp_unit})')
plt.grid(True)
plt.legend()
# Форматируем ось X
plt.gcf().autofmt_xdate()
# Сохраняем или отображаем график
if output_file:
# Создаем директорию для сохранения, если она не существует
output_dir = os.path.dirname(os.path.abspath(output_file))
os.makedirs(output_dir, exist_ok=True)
plt.savefig(output_file)
plt.close()
print(f"График сохранен в файл: {output_file}")
else:
plt.show()
return True
except Exception as e:
print(f"Ошибка при построении графика: {e}")
return False
def _get_wind_direction(self, degrees):
"""
Преобразует градусы направления ветра в текстовое описание
Args:
degrees (float): Направление ветра в градусах
Returns:
str: Текстовое описание направления ветра
"""
if degrees is None:
return "Неизвестно"
directions = [
"Северный", "Северо-восточный", "Восточный", "Юго-восточный",
"Южный", "Юго-западный", "Западный", "Северо-западный"
]
index = round(degrees / 45) % 8
return directions[index]
class GitHubAPIClient(APIClient):
"""
Клиент для работы с GitHub API
"""
def __init__(self, api_key=None, timeout=10):
"""
Инициализирует клиент GitHub API
Args:
api_key (str): Токен доступа GitHub (если требуется)
timeout (int): Таймаут запросов в секундах
"""
super().__init__("https://api.github.com", api_key=api_key, timeout=timeout)
def get_user(self, username):
"""
Получает информацию о пользователе GitHub
Args:
username (str): Имя пользователя
Returns:
dict: Информация о пользователе
"""
return self.get(f"users/{username}")
def get_repositories(self, username, sort='updated', per_page=30):
"""
Получает список репозиториев пользователя
Args:
username (str): Имя пользователя
sort (str): Сортировка ('created', 'updated', 'pushed', 'full_name')
per_page (int): Количество репозиториев на страницу
Returns:
list: Список репозиториев
"""
params = {
'sort': sort,
'per_page': per_page
}
return self.get(f"users/{username}/repos", params=params)
def search_repositories(self, query, sort='stars', order='desc', per_page=30):
"""
Выполняет поиск репозиториев
Args:
query (str): Поисковый запрос
sort (str): Сортировка ('stars', 'forks', 'updated')
order (str): Порядок ('asc', 'desc')
per_page (int): Количество репозиториев на страницу
Returns:
dict: Результаты поиска
"""
params = {
'q': query,
'sort': sort,
'order': order,
'per_page': per_page
}
return self.get('search/repositories', params=params)
def get_repository(self, owner, repo):
"""
Получает информацию о репозитории
Args:
owner (str): Владелец репозитория
repo (str): Название репозитория
Returns:
dict: Информация о репозитории
"""
return self.get(f"repos/{owner}/{repo}")
def get_repository_contributors(self, owner, repo):
"""
Получает список контрибьюторов репозитория
Args:
owner (str): Владелец репозитория
repo (str): Название репозитория
Returns:
list: Список контрибьюторов
"""
return self.get(f"repos/{owner}/{repo}/contributors")
def display_user_info(self, username):
"""
Отображает информацию о пользователе GitHub
Args:
username (str): Имя пользователя
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Получаем данные о пользователе
user_data = self.get_user(username)
if not user_data:
return False
try:
# Разбираем данные
name = user_data.get('name', 'Неизвестно')
login = user_data.get('login', username)
bio = user_data.get('bio', 'Нет информации')
location = user_data.get('location', 'Не указано')
company = user_data.get('company', 'Не указано')
public_repos = user_data.get('public_repos', 0)
followers = user_data.get('followers', 0)
following = user_data.get('following', 0)
created_at = user_data.get('created_at', 'Неизвестно')
# Преобразуем дату
if created_at != 'Неизвестно':
created_at = datetime.datetime.strptime(created_at, '%Y-%m-%dT%H:%M:%SZ').strftime('%d.%m.%Y')
# Формируем таблицу с данными
data = [
['Имя', f"{name} (@{login})"],
['О себе', bio],
['Местоположение', location],
['Компания', company],
['Репозитории', public_repos],
['Подписчики', followers],
['Подписки', following],
['Дата регистрации', created_at]
]
# Выводим таблицу
print(f"Информация о пользователе GitHub {username}:")
print(tabulate(data, tablefmt="fancy_grid"))
return True
except Exception as e:
print(f"Ошибка при обработке данных о пользователе: {e}")
return False
def display_repositories(self, username, sort='updated', per_page=10):
"""
Отображает список репозиториев пользователя
Args:
username (str): Имя пользователя
sort (str): Сортировка ('created', 'updated', 'pushed', 'full_name')
per_page (int): Количество репозиториев на страницу
Returns:
bool: True в случае успеха, False в случае ошибки
"""
# Получаем данные о репозиториях
repos_data = self.get_repositories(username, sort, per_page)
if not repos_data:
return False
try:
# Подготавливаем данные для таблицы
table_data = []
for repo in repos_data:
name = repo.get('name', 'Неизвестно')
description = repo.get('description', 'Нет описания')
language = repo.get('language', 'Не указан')
stars = repo.get('stargazers_count', 0)
forks = repo.get('forks_count', 0)
updated_at = repo.get('updated_at', 'Неизвестно')
# Обрезаем длинные описания
if description and len(description) > 50:
description = description[:47] + '...'
# Преобразуем дату
if updated_at != 'Неизвестно':
updated_at = datetime.datetime.strptime(updated_at, '%Y-%m-%dT%H:%M:%SZ').strftime('%d.%m.%Y')
table_data.append([name, description, language, stars, forks, updated_at])
# Выводим таблицу
headers = ['Название', 'Описание', 'Язык', 'Звезды', 'Форки', 'Обновлено']
print(f"Репозитории пользователя {username}:")
print(tabulate(table_data, headers=headers, tablefmt="fancy_grid"))
print(f"Показано {len(table_data)} из {len(repos_data)} репозиториев")
return True
except Exception as e:
print(f"Ошибка при обработке данных о репозиториях: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Модуль для взаимодействия с API')
subparsers = parser.add_subparsers(dest='api', help='API для использования')
# Парсер для OpenWeatherMap API
weather_parser = subparsers.add_parser('weather', help='OpenWeatherMap API')
weather_parser.add_argument('--api-key', required=True, help='API ключ OpenWeatherMap')
weather_parser.add_argument('--city', required=True, help='Город для получения погоды')
weather_parser.add_argument('--units', choices=['metric', 'imperial'], default='metric',
help='Единицы измерения (по умолчанию: metric)')
weather_parser.add_argument('--forecast', action='store_true', help='Показать прогноз погоды на 5 дней')
weather_parser.add_argument('--plot', help='Путь для сохранения графика прогноза температуры')
# Парсер для GitHub API
github_parser = subparsers.add_parser('github', help='GitHub API')
github_parser.add_argument('--api-key', help='Токен доступа GitHub')
github_subparsers = github_parser.add_subparsers(dest='github_command', help='Команда GitHub API')
# Команда user - получение информации о пользователе
github_user_parser = github_subparsers.add_parser('user', help='Получить информацию о пользователе')
github_user_parser.add_argument('--username', required=True, help='Имя пользователя GitHub')
# Команда repos - получение списка репозиториев
github_repos_parser = github_subparsers.add_parser('repos', help='Получить список репозиториев пользователя')
github_repos_parser.add_argument('--username', required=True, help='Имя пользователя GitHub')
github_repos_parser.add_argument('--sort', choices=['created', 'updated', 'pushed', 'full_name'],
default='updated', help='Сортировка репозиториев')
github_repos_parser.add_argument('--limit', type=int, default=10,
help='Максимальное количество репозиториев для отображения')
# Команда search - поиск репозиториев
github_search_parser = github_subparsers.add_parser('search', help='Поиск репозиториев')
github_search_parser.add_argument('--query', required=True, help='Поисковый запрос')
github_search_parser.add_argument('--sort', choices=['stars', 'forks', 'updated'],
default='stars', help='Сортировка результатов')
github_search_parser.add_argument('--limit', type=int, default=10,
help='Максимальное количество результатов для отображения')
args = parser.parse_args()
# Обработка команд для OpenWeatherMap API
if args.api == 'weather':
# Создаем клиент OpenWeatherMap API
weather_client = WeatherAPIClient(args.api_key)
if args.forecast:
# Показываем прогноз погоды
weather_client.display_weather_forecast(args.city, args.units)
else:
# Показываем текущую погоду
weather_client.display_current_weather(args.city, args.units)
# Строим график, если указан путь
if args.plot:
weather_client.plot_temperature_forecast(args.city, args.plot, args.units)
# Обработка команд для GitHub API
elif args.api == 'github':
# Создаем клиент GitHub API
github_client = GitHubAPIClient(args.api_key)
if args.github_command == 'user':
# Показываем информацию о пользователе
github_client.display_user_info(args.username)
elif args.github_command == 'repos':
# Показываем список репозиториев пользователя
github_client.display_repositories(args.username, args.sort, args.limit)
elif args.github_command == 'search':
# Выполняем поиск репозиториев
results = github_client.search_repositories(args.query, args.sort, 'desc', args.limit)
if results:
total_count = results.get('total_count', 0)
items = results.get('items', [])
# Подготавливаем данные для таблицы
table_data = []
for repo in items:
name = repo.get('full_name', 'Неизвестно')
description = repo.get('description', 'Нет описания')
language = repo.get('language', 'Не указан')
stars = repo.get('stargazers_count', 0)
forks = repo.get('forks_count', 0)
# Обрезаем длинные описания
if description and len(description) > 50:
description = description[:47] + '...'
table_data.append([name, description, language, stars, forks])
# Выводим таблицу
headers = ['Репозиторий', 'Описание', 'Язык', 'Звезды', 'Форки']
print(f"Результаты поиска по запросу '{args.query}':")
print(tabulate(table_data, headers=headers, tablefmt="fancy_grid"))
print(f"Показано {len(table_data)} из {total_count} результатов")
else:
print(f"Не удалось выполнить поиск по запросу '{args.query}'")
else:
github_parser.print_help()
else:
parser.print_help()
return 0
if __name__ == "__main__":
sys.exit(main())
Использование:
# Получение текущей погоды
python api_client.py weather --api-key YOUR_API_KEY --city Moscow
# Получение прогноза погоды с графиком
python api_client.py weather --api-key YOUR_API_KEY --city London --forecast --plot weather_forecast.png
# Получение информации о пользователе GitHub
python api_client.py github user --username torvalds
# Получение списка репозиториев пользователя
python api_client.py github repos --username microsoft --limit 15
# Поиск репозиториев
python api_client.py github search --query "language:python machine learning" --sort stars --limit 10
5kuv0cjs@b.occurredo.com
ОтветитьУдалить