Практический гайд по автоматизации процессов на Python

Вот подробный практический гайд по автоматизации процессов на Python для продвинутых разработчиков. Он фокусируется на промышленном уровне качества: архитектура, надёжность, наблюдаемость, упаковка и деплой. В каждом разделе — конкретные паттерны и готовые фрагменты кода.
1. Принципы промышленной автоматизации
- Идемпотентность: повторный запуск не должен ломать состояние. Используйте ключи идемпотентности, блокировки и проверку «already done».
- Наблюдаемость: структурированные логи, метрики, трассировки. Мин. набор —
structlog,prometheus_client,opentelemetry. - Контроль побочных эффектов: явные контракты для I/O (файлы, БД, сети), dry-run режим.
- Отказоустойчивость: экспоненциальные ретраи с джиттером, дедлайны, таймауты, отмена (cancellation).
- Конфигурация:
pydantic-settings/dynaconf/tomllib, секреты извне (env/Secrets Manager), без хардкода. - Распараллеливание:
asyncioдля I/O,multiprocessing/concurrent.futuresдля CPU, очереди задач. - Развёртывание: единый артефакт (Docker/ZipApp), версионирование, миграции, healthchecks.
- Тестирование: мокайте внешние системы, используйте snapshot/golden-тесты, property-based тесты (Hypothesis).
2. Базовые строительные блоки
2.1 Структурированные логи и метрики
# pip install structlog prometheus-client
import time
import structlog
from prometheus_client import Counter, Histogram, start_http_server
structlog.configure(processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
])
log = structlog.get_logger()
REQUESTS = Counter("jobs_total", "Total jobs processed", ["status"])
LATENCY = Histogram("job_latency_seconds", "Job latency")
def process_job(job_id: str):
start = time.perf_counter()
try:
# ... work ...
REQUESTS.labels("ok").inc()
log.info("job_done", job_id=job_id)
except Exception as e:
REQUESTS.labels("err").inc()
log.exception("job_failed", job_id=job_id, error=str(e))
raise
finally:
LATENCY.observe(time.perf_counter() - start)
if __name__ == "__main__":
start_http_server(9100) # метрики на :9100/metrics
process_job("42")
2.2 Конфигурация через Pydantic Settings
# pip install pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="APP_", env_file=".env", extra="ignore")
db_url: str
workers: int = 8
dry_run: bool = False
settings = Settings()
2.3 Надёжные ретраи и таймауты
# pip install tenacity
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type
@retry(
retry=retry_if_exception_type((httpx.ReadTimeout, httpx.ConnectError)),
wait=wait_exponential_jitter(initial=0.2, max=5.0),
stop=stop_after_attempt(5),
reraise=True,
)
def fetch_json(url: str) -> dict:
with httpx.Client(timeout=5.0) as client:
r = client.get(url)
r.raise_for_status()
return r.json()
3. Архитектура CLI-инструмента (Typer + asyncio)
3.1 Каркас проекта
automation/
pyproject.toml
automation/
__init__.py
cli.py
config.py
log.py
workflows/
__init__.py
images.py
crawl.py
email_rules.py
utils/
fs.py
net.py
time.py
tests/
test_images.py
3.2 Typer CLI с асинхронными командами
# pip install typer[all] httpx anyio
import anyio
import typer
import httpx
from typing import Annotated
app = typer.Typer(add_completion=False)
async def _download_one(client: httpx.AsyncClient, url: str, out: str):
r = await client.get(url)
r.raise_for_status()
with open(out, "wb") as f:
f.write(r.content)
@app.command()
def download(
url: Annotated[str, typer.Argument(help="URL ресурса")],
out: Annotated[str, typer.Option("--out", "-o", help="Путь вывода")]
):
async def runner():
async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
await _download_one(client, url, out)
anyio.run(runner)
if __name__ == "__main__":
app()
4. Оркестрация задач и планирование
4.1 APScheduler (локальный планировщик)
# pip install apscheduler
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def sync_folder():
# ... ваша логика синхронизации ...
print("sync!")
async def main():
scheduler = AsyncIOScheduler()
scheduler.add_job(sync_folder, "cron", minute="*/5") # каждые 5 минут
scheduler.start()
try:
await asyncio.Event().wait() # держим цикл
finally:
scheduler.shutdown()
if __name__ == "__main__":
asyncio.run(main())
4.2 Очереди задач: Celery (для распределённой нагрузки)
# pip install celery[redis]
from celery import Celery
app = Celery("auto", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")
@app.task(acks_late=True, autoretry_for=(Exception,), retry_backoff=2, retry_jitter=True, max_retries=5)
def resize_image(path: str, out: str, width: int):
from PIL import Image
img = Image.open(path)
img.thumbnail((width, width*10_000), Image.Resampling.LANCZOS)
img.save(out)
4.3 Durable workflows (Temporal)
Если нужны долгоживущие саги/компенсации — рассмотрите Temporal (Python SDK). Плюсы: гарантия исполнения, таймауты, ретраи, версии воркфлоу, компенсационные шаги.
Паттерн саги:
- Оркестратор вызывает шаги A, B, C.
- При неуспехе C вызываются компенсации B⁻¹, A⁻¹.
5. Файловая автоматизация
5.1 Безопасная работа с subprocess
import subprocess
from pathlib import Path
def run_cmd(cmd: list[str], cwd: Path|None=None, timeout: float=30.0) -> str:
res = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False)
if res.returncode != 0:
raise RuntimeError(f"cmd failed: {' '.join(cmd)}\nstdout={res.stdout}\nstderr={res.stderr}")
return res.stdout
5.2 Наблюдение за директорией (watchdog)
# pip install watchdog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
import time
class Handler(FileSystemEventHandler):
def on_created(self, event):
if not event.is_directory:
print("new file:", event.src_path)
def watch(path: str):
obs = Observer()
obs.schedule(Handler(), Path(path), recursive=False)
obs.start()
try:
while True:
time.sleep(1)
finally:
obs.stop()
obs.join()
5.3 Паттерн идемпотентной обработки файлов
from pathlib import Path
import hashlib, json
STATE = Path(".state/processed.json")
STATE.parent.mkdir(exist_ok=True)
if STATE.exists():
processed = set(json.loads(STATE.read_text()))
else:
processed = set()
def file_key(p: Path) -> str:
h = hashlib.sha256()
h.update(p.read_bytes())
return h.hexdigest()
def mark_done(key: str):
processed.add(key)
STATE.write_text(json.dumps(list(processed)))
def handle_file(p: Path):
key = file_key(p)
if key in processed:
return # уже обработан
# ... обработка ...
mark_done(key)
6. Веб- и UI-автоматизация
6.1 Playwright (устойчивее Selenium)
# pip install playwright
# python -m playwright install
import asyncio
from playwright.async_api import async_playwright
async def scrape_title(url: str) -> str:
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
page = await browser.new_page()
await page.goto(url, wait_until="domcontentloaded")
title = await page.title()
await browser.close()
return title
if __name__ == "__main__":
print(asyncio.run(scrape_title("https://example.com")))
6.2 Автотесты для автоматизации (stability first)
- Всегда используйте явные ожидания (
page.wait_for_selector/locator). - Снимайте HAR/скриншоты/видео при падениях.
- Изолируйте состояние (userDataDir в отдельной папке на ран).
7. Сетевые задачи и интеграции
7.1 Асинхронные пайпы с httpx + asyncio.TaskGroup
# Python 3.11+
import asyncio
import httpx
async def fetch(client: httpx.AsyncClient, url: str) -> bytes:
r = await client.get(url)
r.raise_for_status()
return r.content
async def main(urls: list[str]):
async with httpx.AsyncClient(timeout=10.0) as client:
results: dict[str, bytes] = {}
async with asyncio.TaskGroup() as tg:
tasks = {url: tg.create_task(fetch(client, url)) for url in urls}
for url, task in tasks.items():
results[url] = task.result()
return results
if __name__ == "__main__":
data = asyncio.run(main(["https://example.com", "https://httpbin.org/get"]))
print({k: len(v) for k in data})
7.2 Подпись запросов/секреты
- Секреты в переменных окружения/Secret Manager.
- Подпись HMAC:
import hmac, hashlib, base64
def sign(payload: bytes, secret: bytes) -> str:
mac = hmac.new(secret, payload, hashlib.sha256).digest()
return base64.b64encode(mac).decode()
8. Параллелизм и производительность
8.1 CPU-bound: ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from math import sqrt
def heavy(n: int) -> float:
return sum(sqrt(i) for i in range(n))
def run():
with ProcessPoolExecutor() as ex:
return list(ex.map(heavy, [10_000_00, 12_000_00, 15_000_00]))
8.2 I/O-bound: asyncio + ограничение параллелизма
import asyncio, httpx
async def bounded_fetch(urls: list[str], limit: int = 50):
sem = asyncio.Semaphore(limit)
async with httpx.AsyncClient() as client:
async def one(u: str):
async with sem:
r = await client.get(u)
r.raise_for_status()
return r.text
return await asyncio.gather(*[one(u) for u in urls], return_exceptions=False)
8.3 Кэширование и дедупликация
- Локальный кэш (
diskcache,functools.lru_cache). - Контент-адресное хранилище: ключ — SHA256 содержимого.
9. Обработка e-mail/документов
9.1 Парсинг писем (imaplib + mail-parser)
# pip install mail-parser
import imaplib, email
from mailparser import mailparser
def fetch_unseen(host, user, password):
m = imaplib.IMAP4_SSL(host)
m.login(user, password)
m.select("INBOX")
typ, data = m.search(None, '(UNSEEN)')
for num in data[0].split():
typ, raw = m.fetch(num, '(RFC822)')
msg = email.message_from_bytes(raw[0][1])
parsed = mailparser.parse_from_bytes(raw[0][1])
yield parsed
m.close(); m.logout()
9.2 Правила-автоматы
- Определите декларативные правила (YAML/JSON).
- Маппинг правило → действие: «если from содержит X и subject содержит Y — сохранить вложения и создать тикет».
10. Тестирование и качество
10.1 Pytest с временной средой
# pip install pytest
import pytest
from pathlib import Path
@pytest.fixture
def tmp(tmp_path: Path):
(tmp_path / "in").mkdir()
(tmp_path / "out").mkdir()
return tmp_path
def test_pipeline(tmp: Path):
# подготовка
# вызов
# проверки
assert True
10.2 Snapshot testing (pytest-approvaltests/pytest-regressions)
Стабилизируйте выводы CLI и файловые артефакты; для нестабильных полей используйте нормализацию.
10.3 Property-based (Hypothesis)
# pip install hypothesis
from hypothesis import given, strategies as st
@given(st.text())
def test_idempotent(s: str):
out1 = normalize(s)
out2 = normalize(out1)
assert out1 == out2
11. Деплой и эксплуатация
11.1 Dockerfile (тонкий, кэшируемый)
# syntax=docker/dockerfile:1.7
FROM python:3.12-slim AS base
ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 PIP_NO_CACHE_DIR=1
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
curl ca-certificates gcc && rm -rf /var/lib/apt/lists/*
COPY pyproject.toml poetry.lock ./
RUN pip install --upgrade pip && pip install poetry && poetry config virtualenvs.create false && poetry install --only main
COPY automation ./automation
ENTRYPOINT ["python", "-m", "automation.cli"]
11.2 systemd unit (для on-prem)
[Unit]
Description=Automation Service
After=network.target
[Service]
User=svc-automation
Group=svc-automation
WorkingDirectory=/opt/automation
ExecStart=/usr/bin/python -m automation.cli
Restart=always
RestartSec=3
Environment=APP_DB_URL=postgres://...
[Install]
WantedBy=multi-user.target
11.3 Healthchecks и алерты
- HTTP
/healthz(FastAPI/Starlette) или «heartbeat»-файл с обновлением таймстампа. - Алерты по метрикам: error rate, latency p95/p99, backpressure (длина очереди), лаг расписания.
12. Паттерны надёжности
- Outbox/Inbox: записывайте событие в БД в той же транзакции, а потом доставляйте его отдельно, помечая offset.
- Идемпотентные ключи:
operation_id→ таблица «выполнено». - Distributed lock: Redis Redlock/DB advisory locks (PostgreSQL pg_advisory_lock).
- Частичная деградация: если интеграция падает — пропускайте шаг и логируйте, не валите весь пайплайн.
- Чекпоинты: сохраняйте прогресс, чтобы перезапуск продолжал с последней точки.
13. Безопасность
- Минимизируйте права: отдельный сервисный аккаунт, только необходимые пермишены.
- Валидация входа и схемы: Pydantic моделей, строгая десериализация.
- Политика секретов: только через окружение/Secret Manager, периодическая ротация.
- Лимиты на внешние вызовы: rate limiting/token bucket, circuit breaker (pybreaker).
14. Сквозной пример: мини-оркестратор обработки изображений
Функции:
- Мониторинг входной папки.
- Параллельное сжатие и выгрузка в S3-совместимый storage.
- Ретраи, метрики, структурные логи, dry-run.
# pip install pillow httpx structlog prometheus-client boto3 tenacity watchdog anyio
import anyio, asyncio, time, io
from pathlib import Path
from PIL import Image
import boto3, structlog, httpx
from tenacity import retry, wait_exponential_jitter, stop_after_attempt
from prometheus_client import start_http_server, Counter, Histogram
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
log = structlog.get_logger()
JOBS = Counter("img_jobs_total", "jobs", ["status"])
LAT = Histogram("img_latency", "latency")
s3 = boto3.client("s3") # creds из окружения
BUCKET = "images-bucket"
IN = Path("in"); OUT = Path("out"); OUT.mkdir(exist_ok=True)
def compress(img_bytes: bytes, max_w=1600, quality=85) -> bytes:
im = Image.open(io.BytesIO(img_bytes))
im.thumbnail((max_w, max_w*10_000), Image.Resampling.LANCZOS)
buf = io.BytesIO()
im.save(buf, format="JPEG", optimize=True, quality=quality)
return buf.getvalue()
@retry(wait=wait_exponential_jitter(0.2, 3), stop=stop_after_attempt(5), reraise=True)
def upload_s3(key: str, data: bytes):
s3.put_object(Bucket=BUCKET, Key=key, Body=data, ContentType="image/jpeg")
async def handle_file(path: Path):
t0 = time.perf_counter()
try:
img = path.read_bytes()
out = await anyio.to_thread.run_sync(compress, img)
key = f"{path.stem}.jpg"
await anyio.to_thread.run_sync(upload_s3, key, out)
(OUT / key).write_bytes(out)
JOBS.labels("ok").inc()
log.info("done", src=str(path), key=key, size=len(out))
except Exception as e:
JOBS.labels("err").inc()
log.exception("fail", src=str(path), error=str(e))
finally:
LAT.observe(time.perf_counter() - t0)
class Handler(FileSystemEventHandler):
def __init__(self, nursery): self.nursery = nursery
def on_created(self, event):
if not event.is_directory:
self.nursery.start_soon(handle_file, Path(event.src_path))
async def main():
start_http_server(9200)
IN.mkdir(exist_ok=True)
async with anyio.create_task_group() as tg:
obs = Observer()
obs.schedule(Handler(tg), IN, recursive=False)
obs.start()
try:
await anyio.Event().wait()
finally:
obs.stop(); obs.join()
if __name__ == "__main__":
anyio.run(main)
15. CI/CD и качество кода
- pre-commit: black, ruff, mypy, bandit, detect-secrets.
- GitHub Actions: матрица версий Python, кэширование зависимостей, публикация Docker-образа.
- SAST/DAST сканирование (CodeQL/Trivy), SBOM (syft), подпись образов (cosign).
Пример pre-commit-config.yaml:
repos:
- repo: https://github.com/psf/black
rev: 24.8.0
hooks: [{id: black}]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.9
hooks: [{id: ruff}]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.2
hooks: [{id: mypy}]
- repo: https://github.com/PyCQA/bandit
rev: 1.7.9
hooks: [{id: bandit}]
16. Набор «боевых» рецептов
- Массовая миграция данных: батчи, контрольная сумма, лог смещений, идемпотентность на уровне ключей.
- Ротация логов/архив:
logging.handlers.RotatingFileHandlerили отдавайте всё в stdout и ротацию делегируйте контейнерной среде. - Генерация отчётов: шаблоны Jinja2 + WeasyPrint/ReportLab; складывайте артефакты в S3 с префиксами дат.
- Инвентаризация: периодический обход API/файловых деревьев, запись «срезов» в паркет/duckdb, дельты через хэши.
17. Чек-лист перед продом
- Идемпотентность подтверждена тестами.
- Наблюдаемость: логи/метрики/трейсы подключены, есть дашборды и алерты.
- Ретраи + таймауты + дедлайны — везде, где I/O.
- Конфигурация из окружения, секреты вне кода.
- Контейнер/пакет воспроизводимы; SBOM/вендоринг критичных зависимостей.
- Плейбуки инцидентов и runbook для ручных сценариев.
+1
+1
5
+1
+1
+1



