Практический гайд по автоматизации процессов на Python

Вот подробный практический гайд по автоматизации процессов на Python для продвинутых разработчиков. Он фокусируется на промышленном уровне качества: архитектура, надёжность, наблюдаемость, упаковка и деплой. В каждом разделе — конкретные паттерны и готовые фрагменты кода.

1. Принципы промышленной автоматизации

  • Идемпотентность: повторный запуск не должен ломать состояние. Используйте ключи идемпотентности, блокировки и проверку «already done».
  • Наблюдаемость: структурированные логи, метрики, трассировки. Мин. набор — structlogprometheus_clientopentelemetry.
  • Контроль побочных эффектов: явные контракты для I/O (файлы, БД, сети), dry-run режим.
  • Отказоустойчивость: экспоненциальные ретраи с джиттером, дедлайны, таймауты, отмена (cancellation).
  • Конфигурация: pydantic-settings/dynaconf/tomllib, секреты извне (env/Secrets Manager), без хардкода.
  • Распараллеливание: asyncio для I/O, multiprocessing/concurrent.futures для CPU, очереди задач.
  • Развёртывание: единый артефакт (Docker/ZipApp), версионирование, миграции, healthchecks.
  • Тестирование: мокайте внешние системы, используйте snapshot/golden-тесты, property-based тесты (Hypothesis).

2. Базовые строительные блоки

2.1 Структурированные логи и метрики

# pip install structlog prometheus-client
import time
import structlog
from prometheus_client import Counter, Histogram, start_http_server

structlog.configure(processors=[
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
    structlog.processors.JSONRenderer()
])
log = structlog.get_logger()

REQUESTS = Counter("jobs_total", "Total jobs processed", ["status"])
LATENCY = Histogram("job_latency_seconds", "Job latency")

def process_job(job_id: str):
    start = time.perf_counter()
    try:
        # ... work ...
        REQUESTS.labels("ok").inc()
        log.info("job_done", job_id=job_id)
    except Exception as e:
        REQUESTS.labels("err").inc()
        log.exception("job_failed", job_id=job_id, error=str(e))
        raise
    finally:
        LATENCY.observe(time.perf_counter() - start)

if __name__ == "__main__":
    start_http_server(9100)  # метрики на :9100/metrics
    process_job("42")

2.2 Конфигурация через Pydantic Settings

# pip install pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="APP_", env_file=".env", extra="ignore")

    db_url: str
    workers: int = 8
    dry_run: bool = False

settings = Settings()

2.3 Надёжные ретраи и таймауты

# pip install tenacity
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type

@retry(
    retry=retry_if_exception_type((httpx.ReadTimeout, httpx.ConnectError)),
    wait=wait_exponential_jitter(initial=0.2, max=5.0),
    stop=stop_after_attempt(5),
    reraise=True,
)
def fetch_json(url: str) -> dict:
    with httpx.Client(timeout=5.0) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.json()

3. Архитектура CLI-инструмента (Typer + asyncio)

3.1 Каркас проекта

automation/
  pyproject.toml
  automation/
    __init__.py
    cli.py
    config.py
    log.py
    workflows/
      __init__.py
      images.py
      crawl.py
      email_rules.py
    utils/
      fs.py
      net.py
      time.py
  tests/
    test_images.py

3.2 Typer CLI с асинхронными командами

# pip install typer[all] httpx anyio
import anyio
import typer
import httpx
from typing import Annotated

app = typer.Typer(add_completion=False)

async def _download_one(client: httpx.AsyncClient, url: str, out: str):
    r = await client.get(url)
    r.raise_for_status()
    with open(out, "wb") as f:
        f.write(r.content)

@app.command()
def download(
    url: Annotated[str, typer.Argument(help="URL ресурса")],
    out: Annotated[str, typer.Option("--out", "-o", help="Путь вывода")]
):
    async def runner():
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
            await _download_one(client, url, out)
    anyio.run(runner)

if __name__ == "__main__":
    app()

4. Оркестрация задач и планирование

4.1 APScheduler (локальный планировщик)

# pip install apscheduler
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def sync_folder():
    # ... ваша логика синхронизации ...
    print("sync!")

async def main():
    scheduler = AsyncIOScheduler()
    scheduler.add_job(sync_folder, "cron", minute="*/5")  # каждые 5 минут
    scheduler.start()
    try:
        await asyncio.Event().wait()  # держим цикл
    finally:
        scheduler.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

4.2 Очереди задач: Celery (для распределённой нагрузки)

# pip install celery[redis]
from celery import Celery

app = Celery("auto", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

@app.task(acks_late=True, autoretry_for=(Exception,), retry_backoff=2, retry_jitter=True, max_retries=5)
def resize_image(path: str, out: str, width: int):
    from PIL import Image
    img = Image.open(path)
    img.thumbnail((width, width*10_000), Image.Resampling.LANCZOS)
    img.save(out)

4.3 Durable workflows (Temporal)

Если нужны долгоживущие саги/компенсации — рассмотрите Temporal (Python SDK). Плюсы: гарантия исполнения, таймауты, ретраи, версии воркфлоу, компенсационные шаги.

Паттерн саги:

  • Оркестратор вызывает шаги A, B, C.
  • При неуспехе C вызываются компенсации B⁻¹, A⁻¹.

5. Файловая автоматизация

5.1 Безопасная работа с subprocess

import subprocess
from pathlib import Path

def run_cmd(cmd: list[str], cwd: Path|None=None, timeout: float=30.0) -> str:
    res = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False)
    if res.returncode != 0:
        raise RuntimeError(f"cmd failed: {' '.join(cmd)}\nstdout={res.stdout}\nstderr={res.stderr}")
    return res.stdout

5.2 Наблюдение за директорией (watchdog)

# pip install watchdog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
import time

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:
            print("new file:", event.src_path)

def watch(path: str):
    obs = Observer()
    obs.schedule(Handler(), Path(path), recursive=False)
    obs.start()
    try:
        while True:
            time.sleep(1)
    finally:
        obs.stop()
        obs.join()

5.3 Паттерн идемпотентной обработки файлов

from pathlib import Path
import hashlib, json

STATE = Path(".state/processed.json")
STATE.parent.mkdir(exist_ok=True)
if STATE.exists():
    processed = set(json.loads(STATE.read_text()))
else:
    processed = set()

def file_key(p: Path) -> str:
    h = hashlib.sha256()
    h.update(p.read_bytes())
    return h.hexdigest()

def mark_done(key: str):
    processed.add(key)
    STATE.write_text(json.dumps(list(processed)))

def handle_file(p: Path):
    key = file_key(p)
    if key in processed:
        return  # уже обработан
    # ... обработка ...
    mark_done(key)

6. Веб- и UI-автоматизация

6.1 Playwright (устойчивее Selenium)

# pip install playwright
# python -m playwright install
import asyncio
from playwright.async_api import async_playwright

async def scrape_title(url: str) -> str:
    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url, wait_until="domcontentloaded")
        title = await page.title()
        await browser.close()
        return title

if __name__ == "__main__":
    print(asyncio.run(scrape_title("https://example.com")))

6.2 Автотесты для автоматизации (stability first)

  • Всегда используйте явные ожидания (page.wait_for_selector / locator).
  • Снимайте HAR/скриншоты/видео при падениях.
  • Изолируйте состояние (userDataDir в отдельной папке на ран).

7. Сетевые задачи и интеграции

7.1 Асинхронные пайпы с httpx + asyncio.TaskGroup

# Python 3.11+
import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str) -> bytes:
    r = await client.get(url)
    r.raise_for_status()
    return r.content

async def main(urls: list[str]):
    async with httpx.AsyncClient(timeout=10.0) as client:
        results: dict[str, bytes] = {}
        async with asyncio.TaskGroup() as tg:
            tasks = {url: tg.create_task(fetch(client, url)) for url in urls}
        for url, task in tasks.items():
            results[url] = task.result()
        return results

if __name__ == "__main__":
    data = asyncio.run(main(["https://example.com", "https://httpbin.org/get"]))
    print({k: len(v) for k in data})

7.2 Подпись запросов/секреты

  • Секреты в переменных окружения/Secret Manager.
  • Подпись HMAC:
import hmac, hashlib, base64

def sign(payload: bytes, secret: bytes) -> str:
    mac = hmac.new(secret, payload, hashlib.sha256).digest()
    return base64.b64encode(mac).decode()

8. Параллелизм и производительность

8.1 CPU-bound: ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
from math import sqrt

def heavy(n: int) -> float:
    return sum(sqrt(i) for i in range(n))

def run():
    with ProcessPoolExecutor() as ex:
        return list(ex.map(heavy, [10_000_00, 12_000_00, 15_000_00]))

8.2 I/O-bound: asyncio + ограничение параллелизма

import asyncio, httpx

async def bounded_fetch(urls: list[str], limit: int = 50):
    sem = asyncio.Semaphore(limit)
    async with httpx.AsyncClient() as client:
        async def one(u: str):
            async with sem:
                r = await client.get(u)
                r.raise_for_status()
                return r.text
        return await asyncio.gather(*[one(u) for u in urls], return_exceptions=False)

8.3 Кэширование и дедупликация

  • Локальный кэш (diskcachefunctools.lru_cache).
  • Контент-адресное хранилище: ключ — SHA256 содержимого.

9. Обработка e-mail/документов

9.1 Парсинг писем (imaplib + mail-parser)

# pip install mail-parser
import imaplib, email
from mailparser import mailparser

def fetch_unseen(host, user, password):
    m = imaplib.IMAP4_SSL(host)
    m.login(user, password)
    m.select("INBOX")
    typ, data = m.search(None, '(UNSEEN)')
    for num in data[0].split():
        typ, raw = m.fetch(num, '(RFC822)')
        msg = email.message_from_bytes(raw[0][1])
        parsed = mailparser.parse_from_bytes(raw[0][1])
        yield parsed
    m.close(); m.logout()

9.2 Правила-автоматы

  • Определите декларативные правила (YAML/JSON).
  • Маппинг правило → действие: «если from содержит X и subject содержит Y — сохранить вложения и создать тикет».

10. Тестирование и качество

10.1 Pytest с временной средой

# pip install pytest
import pytest
from pathlib import Path

@pytest.fixture
def tmp(tmp_path: Path):
    (tmp_path / "in").mkdir()
    (tmp_path / "out").mkdir()
    return tmp_path

def test_pipeline(tmp: Path):
    # подготовка
    # вызов
    # проверки
    assert True

10.2 Snapshot testing (pytest-approvaltests/pytest-regressions)

Стабилизируйте выводы CLI и файловые артефакты; для нестабильных полей используйте нормализацию.

10.3 Property-based (Hypothesis)

# pip install hypothesis
from hypothesis import given, strategies as st

@given(st.text())
def test_idempotent(s: str):
    out1 = normalize(s)
    out2 = normalize(out1)
    assert out1 == out2

11. Деплой и эксплуатация

11.1 Dockerfile (тонкий, кэшируемый)

# syntax=docker/dockerfile:1.7
FROM python:3.12-slim AS base
ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 PIP_NO_CACHE_DIR=1
WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    curl ca-certificates gcc && rm -rf /var/lib/apt/lists/*

COPY pyproject.toml poetry.lock ./
RUN pip install --upgrade pip && pip install poetry && poetry config virtualenvs.create false && poetry install --only main

COPY automation ./automation
ENTRYPOINT ["python", "-m", "automation.cli"]

11.2 systemd unit (для on-prem)

[Unit]
Description=Automation Service
After=network.target

[Service]
User=svc-automation
Group=svc-automation
WorkingDirectory=/opt/automation
ExecStart=/usr/bin/python -m automation.cli
Restart=always
RestartSec=3
Environment=APP_DB_URL=postgres://...

[Install]
WantedBy=multi-user.target

11.3 Healthchecks и алерты

  • HTTP /healthz (FastAPI/Starlette) или «heartbeat»-файл с обновлением таймстампа.
  • Алерты по метрикам: error rate, latency p95/p99, backpressure (длина очереди), лаг расписания.

12. Паттерны надёжности

  • Outbox/Inbox: записывайте событие в БД в той же транзакции, а потом доставляйте его отдельно, помечая offset.
  • Идемпотентные ключи: operation_id → таблица «выполнено».
  • Distributed lock: Redis Redlock/DB advisory locks (PostgreSQL pg_advisory_lock).
  • Частичная деградация: если интеграция падает — пропускайте шаг и логируйте, не валите весь пайплайн.
  • Чекпоинты: сохраняйте прогресс, чтобы перезапуск продолжал с последней точки.

13. Безопасность

  • Минимизируйте права: отдельный сервисный аккаунт, только необходимые пермишены.
  • Валидация входа и схемы: Pydantic моделей, строгая десериализация.
  • Политика секретов: только через окружение/Secret Manager, периодическая ротация.
  • Лимиты на внешние вызовы: rate limiting/token bucket, circuit breaker (pybreaker).

14. Сквозной пример: мини-оркестратор обработки изображений

Функции:

  • Мониторинг входной папки.
  • Параллельное сжатие и выгрузка в S3-совместимый storage.
  • Ретраи, метрики, структурные логи, dry-run.
# pip install pillow httpx structlog prometheus-client boto3 tenacity watchdog anyio
import anyio, asyncio, time, io
from pathlib import Path
from PIL import Image
import boto3, structlog, httpx
from tenacity import retry, wait_exponential_jitter, stop_after_attempt
from prometheus_client import start_http_server, Counter, Histogram
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

log = structlog.get_logger()
JOBS = Counter("img_jobs_total", "jobs", ["status"])
LAT = Histogram("img_latency", "latency")

s3 = boto3.client("s3")  # creds из окружения
BUCKET = "images-bucket"
IN = Path("in"); OUT = Path("out"); OUT.mkdir(exist_ok=True)

def compress(img_bytes: bytes, max_w=1600, quality=85) -> bytes:
    im = Image.open(io.BytesIO(img_bytes))
    im.thumbnail((max_w, max_w*10_000), Image.Resampling.LANCZOS)
    buf = io.BytesIO()
    im.save(buf, format="JPEG", optimize=True, quality=quality)
    return buf.getvalue()

@retry(wait=wait_exponential_jitter(0.2, 3), stop=stop_after_attempt(5), reraise=True)
def upload_s3(key: str, data: bytes):
    s3.put_object(Bucket=BUCKET, Key=key, Body=data, ContentType="image/jpeg")

async def handle_file(path: Path):
    t0 = time.perf_counter()
    try:
        img = path.read_bytes()
        out = await anyio.to_thread.run_sync(compress, img)
        key = f"{path.stem}.jpg"
        await anyio.to_thread.run_sync(upload_s3, key, out)
        (OUT / key).write_bytes(out)
        JOBS.labels("ok").inc()
        log.info("done", src=str(path), key=key, size=len(out))
    except Exception as e:
        JOBS.labels("err").inc()
        log.exception("fail", src=str(path), error=str(e))
    finally:
        LAT.observe(time.perf_counter() - t0)

class Handler(FileSystemEventHandler):
    def __init__(self, nursery): self.nursery = nursery
    def on_created(self, event):
        if not event.is_directory:
            self.nursery.start_soon(handle_file, Path(event.src_path))

async def main():
    start_http_server(9200)
    IN.mkdir(exist_ok=True)
    async with anyio.create_task_group() as tg:
        obs = Observer()
        obs.schedule(Handler(tg), IN, recursive=False)
        obs.start()
        try:
            await anyio.Event().wait()
        finally:
            obs.stop(); obs.join()

if __name__ == "__main__":
    anyio.run(main)

15. CI/CD и качество кода

  • pre-commit: black, ruff, mypy, bandit, detect-secrets.
  • GitHub Actions: матрица версий Python, кэширование зависимостей, публикация Docker-образа.
  • SAST/DAST сканирование (CodeQL/Trivy), SBOM (syft), подпись образов (cosign).

Пример pre-commit-config.yaml:

repos:
- repo: https://github.com/psf/black
  rev: 24.8.0
  hooks: [{id: black}]
- repo: https://github.com/astral-sh/ruff-pre-commit
  rev: v0.6.9
  hooks: [{id: ruff}]
- repo: https://github.com/pre-commit/mirrors-mypy
  rev: v1.11.2
  hooks: [{id: mypy}]
- repo: https://github.com/PyCQA/bandit
  rev: 1.7.9
  hooks: [{id: bandit}]

16. Набор «боевых» рецептов

  • Массовая миграция данных: батчи, контрольная сумма, лог смещений, идемпотентность на уровне ключей.
  • Ротация логов/архив: logging.handlers.RotatingFileHandler или отдавайте всё в stdout и ротацию делегируйте контейнерной среде.
  • Генерация отчётов: шаблоны Jinja2 + WeasyPrint/ReportLab; складывайте артефакты в S3 с префиксами дат.
  • Инвентаризация: периодический обход API/файловых деревьев, запись «срезов» в паркет/duckdb, дельты через хэши.

17. Чек-лист перед продом

  • Идемпотентность подтверждена тестами.
  • Наблюдаемость: логи/метрики/трейсы подключены, есть дашборды и алерты.
  • Ретраи + таймауты + дедлайны — везде, где I/O.
  • Конфигурация из окружения, секреты вне кода.
  • Контейнер/пакет воспроизводимы; SBOM/вендоринг критичных зависимостей.
  • Плейбуки инцидентов и runbook для ручных сценариев.
+1
0
+1
5
+1
0
+1
0
+1
0

Ответить

Ваш адрес email не будет опубликован. Обязательные поля помечены *