1. Книги
  2. Программирование
  3. Джейд Картер

Библиотеки Python Часть 2. Практическое применение

Джейд Картер (2025)
Обложка книги

От анализа больших данных и машинного обучения до автоматизации рутинных процессов и создания интерактивных визуализаций — эта часть станет вашим практическим путеводителем. Вы узнаете, как распределенно обрабатывать данные с помощью Dask и PySpark, строить динамические дашборды с Plotly и Dash, оптимизировать производительность моделей с Cython, и разрабатывать высоконагруженные приложения с использованием Asyncio и CUDA. Кроме того, особое внимание уделено автоматизации задач, включая парсинг данных, обработку документов и создание рабочих процессов с Airflow. Визуализация геоданных, работа с изображениями и звуком, а также современные подходы к тестированию и развертыванию приложений помогут вам интегрировать Python в самые разнообразные проекты. Эта часть предназначена для разработчиков, стремящихся расширить свои навыки и внедрять Python в практические сферы, требующие высокую производительность, автоматизацию и гибкость.

Оглавление

Купить книгу

Приведённый ознакомительный фрагмент книги «Библиотеки Python Часть 2. Практическое применение» предоставлен нашим книжным партнёром — компанией ЛитРес.

Купить и скачать полную версию книги в форматах FB2, ePub, MOBI, TXT, HTML, RTF и других

Глава 1. Работа с большими данными

1.1 Распределенная обработка данных с Dask и PySpark

Работа с большими объемами данных требует инструментов, которые позволяют эффективно распределять вычисления между несколькими процессорами или даже серверами. Python предлагает две мощные библиотеки для таких задач — Dask и PySpark. Каждая из них разработана для обработки больших данных, но они имеют свои уникальные особенности и подходы. Разберем их по отдельности, чтобы понять, как их использовать, и приведем примеры.

Dask: инструмент для масштабирования локальных задач

Dask — это библиотека, которая позволяет расширить вычисления на вашем компьютере, эффективно распределяя их между ядрами процессора или несколькими машинами в кластере. Она идеально подходит для тех случаев, когда объем данных превышает доступную оперативную память, но вы хотите сохранить гибкость работы с Python.

Основные особенности Dask:

1. Dask совместим с большинством популярных библиотек Python, таких как Pandas, NumPy и Scikit-learn.

2. Он поддерживает ленивые вычисления: операции выполняются только при необходимости.

3. Dask позволяет работать как с массивами данных (аналог NumPy), так и с таблицами (аналог Pandas).

Пример использования Dask для обработки данных:

Предположим, у нас есть большой CSV-файл с данными о продажах. Его объем превышает объем оперативной памяти, поэтому обычные инструменты, такие как Pandas, не могут загрузить файл целиком.

```python

import dask.dataframe as dd

# Загрузка большого CSV-файла с помощью Dask

df = dd.read_csv('sales_data_large.csv')

# Выполнение простых операций (например, фильтрация по значению)

filtered_df = df[df['sales'] > 1000]

# Группировка и вычисление суммарных продаж

sales_summary = filtered_df.groupby('region')['sales'].sum()

# Выполнение вычислений (операции"ленивые", пока мы не вызовем.compute())

result = sales_summary.compute()

# Вывод результатов

print(result)

```

Объяснение кода:

1. `dd.read_csv()`: Вместо загрузки всего файла в память, Dask загружает его частями (по"чанкам").

2. Ленивые вычисления: Все операции, такие как фильтрация и группировка, откладываются до вызова `compute()`.

3. Параллельное выполнение: Dask автоматически распределяет работу между всеми доступными ядрами процессора.

Когда использовать Dask:

— Когда ваши данные не помещаются в память.

— Когда вы уже используете библиотеки Python, такие как Pandas или NumPy, и хотите масштабировать их.

— Когда вам нужно быстро настроить распределенные вычисления на одной или нескольких машинах.

PySpark: инструмент для кластерного вычисления

PySpark — это Python-интерфейс для Apache Spark, платформы, разработанной специально для обработки больших данных. Spark работает на кластерах, что позволяет масштабировать вычисления до сотен машин.

PySpark особенно популярен в случаях, когда данные хранятся в распределенных системах, таких как HDFS или Amazon S3.

Основные особенности PySpark:

1. PySpark работает с данными в формате **RDD** (Resilient Distributed Dataset) или DataFrame.

2. Он поддерживает широкий спектр операций, включая трансформации данных, машинное обучение и потоковую обработку.

3. PySpark интегрируется с Hadoop и другими системами для хранения больших данных.

Пример использования PySpark для обработки данных:

Допустим, у нас есть большие данные о транзакциях, хранящиеся в формате CSV, и мы хотим вычислить среднее значение транзакций по каждому клиенту.

```python

from pyspark.sql import SparkSession

# Создаем сессию Spark

spark = SparkSession.builder.appName("TransactionAnalysis").getOrCreate()

# Читаем данные из CSV-файла

df = spark.read.csv('transactions_large.csv', header=True, inferSchema=True)

# Выполняем трансформации данных

# 1. Фильтрация транзакций с нулевой суммой

filtered_df = df.filter(df['amount'] > 0)

# 2. Группировка по клиенту и вычисление среднего значения

average_transactions = filtered_df.groupBy('customer_id').avg('amount')

# Показ результатов

average_transactions.show()

# Останавливаем Spark-сессию

spark.stop()

```

Объяснение кода:

1. Создание SparkSession: Это точка входа для работы с PySpark.

2. `spark.read.csv()`: Загружаем данные в формате DataFrame, который поддерживает SQL-подобные операции.

3. Трансформации: Операции, такие как фильтрация и группировка, выполняются параллельно на всех узлах кластера.

4. Результат: PySpark возвращает распределенные данные, которые можно сохранить или преобразовать.

Когда использовать PySpark:

— Когда вы работаете с кластерами и хотите обрабатывать данные на нескольких машинах.

— Когда данные хранятся в распределенных системах, таких как HDFS или Amazon S3.

— Когда нужно интегрировать обработку данных с экосистемой Hadoop.

Сравнение Dask и PySpark

И Dask, и PySpark являются эффективными инструментами для распределенной обработки данных. Выбор между ними зависит от ваших требований. Если вы работаете с данными, которые не помещаются в оперативную память, но ваши вычисления выполняются на одном компьютере, Dask будет лучшим выбором. Если же вы имеете дело с огромными объемами данных, распределенными по нескольким машинам, то PySpark станет незаменимым инструментом.

Обе библиотеки позволяют решать задачи, которые ранее казались невозможными из-за ограничений памяти или производительности, и они помогут вам эффективно работать с данными любого масштаба.

Задачи для практики

Задачи для Dask

Задача 1: Обработка большого CSV-файла

Описание: У вас есть CSV-файл размером 10 ГБ с данными о продажах. Вам нужно вычислить общую сумму продаж по регионам, но файл слишком большой для работы в Pandas.

Решение:

```python

import dask.dataframe as dd

# Загрузка большого CSV-файла

df = dd.read_csv('sales_data_large.csv')

# Проверка структуры данных

print(df.head()) # Показываем первые строки

# Группировка по регионам и подсчет общей суммы продаж

sales_by_region = df.groupby('region')['sales'].sum()

# Выполнение вычислений

result = sales_by_region.compute()

print(result)

```

Объяснение:

— `dd.read_csv` позволяет загружать файлы большего объема, чем объем оперативной памяти.

— `compute` выполняет ленивые вычисления.

Задача 2: Преобразование данных в формате JSON

Описание: Дан файл в формате JSON, содержащий информацию о транзакциях. Необходимо отфильтровать транзакции с суммой менее 1000 и сохранить отфильтрованные данные в новый CSV-файл.

Решение:

```python

import dask.dataframe as dd

# Загрузка JSON-файла

df = dd.read_json('transactions_large.json')

# Фильтрация данных

filtered_df = df[df['amount'] >= 1000]

# Сохранение результатов в новый CSV-файл

filtered_df.to_csv('filtered_transactions_*.csv', index=False)

print("Данные сохранены в файлы CSV.")

```

Объяснение:

— Dask автоматически разбивает данные на части, сохраняя их в несколько CSV-файлов.

— Фильтрация выполняется параллельно.

Задачи для PySpark

Задача 3: Анализ логов

Описание: Имеется файл логов сервера (формат CSV). Ваша задача — подсчитать количество ошибок (строки с `status ="ERROR"`) и вывести их общее количество.

Решение:

```python

from pyspark.sql import SparkSession

# Создаем сессию Spark

spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()

# Загрузка данных из CSV-файла

df = spark.read.csv('server_logs.csv', header=True, inferSchema=True)

# Фильтрация строк с ошибками

errors = df.filter(df['status'] == 'ERROR')

# Подсчет количества ошибок

error_count = errors.count()

print(f"Количество ошибок: {error_count}")

# Завершаем сессию Spark

spark.stop()

```

Объяснение:

— `filter` позволяет выбрать строки с определенным значением.

— `count` подсчитывает количество строк после фильтрации.

Задача 4: Средняя сумма покупок

Описание: Дан CSV-файл с данными о покупках. Ваша задача — вычислить среднюю сумму покупок для каждого клиента.

Решение:

```python

from pyspark.sql import SparkSession

# Создаем сессию Spark

spark = SparkSession.builder.appName("PurchaseAnalysis").getOrCreate()

# Загрузка данных

df = spark.read.csv('purchases.csv', header=True, inferSchema=True)

# Группировка по клиенту и расчет средней суммы покупок

avg_purchases = df.groupBy('customer_id').avg('purchase_amount')

# Показ результатов

avg_purchases.show()

# Завершаем сессию Spark

spark.stop()

```

Объяснение:

— `groupBy` позволяет сгруппировать данные по столбцу.

— `avg` вычисляет среднее значение для каждой группы.

Задача 5: Сортировка больших данных

Описание: У вас есть файл с информацией о транзакциях. Необходимо отсортировать данные по дате транзакции и сохранить результат в новый файл.

Решение:

```python

from pyspark.sql import SparkSession

# Создаем сессию Spark

spark = SparkSession.builder.appName("SortTransactions").getOrCreate()

# Загрузка данных

df = spark.read.csv('transactions_large.csv', header=True, inferSchema=True)

# Сортировка данных по дате

sorted_df = df.orderBy('transaction_date')

# Сохранение отсортированных данных в новый файл

sorted_df.write.csv('sorted_transactions', header=True, mode='overwrite')

print("Данные отсортированы и сохранены.")

# Завершаем сессию Spark

spark.stop()

```

Объяснение:

— `orderBy` сортирует данные по указанному столбцу.

— `write.csv` сохраняет результат в новом файле.

Эти задачи демонстрируют, как использовать Dask и PySpark для работы с большими объемами данных.

— Dask подходит для локальных задач и интеграции с Python-библиотеками.

— PySpark эффективен для кластерной обработки данных и интеграции с экосистемой Hadoop.

Обе библиотеки упрощают решение задач, которые сложно выполнить традиционными методами из-за ограничений памяти или мощности процессора.

1.2 Потоковая обработка данных с Apache Kafka

Apache Kafka — это мощная платформа для обработки потоков данных в реальном времени. Она широко используется для обработки и анализа событий, поступающих из различных источников, таких как веб-серверы, базы данных, датчики IoT, системы мониторинга и многое другое. Kafka обеспечивает высокую производительность, надежность и масштабируемость, что делает её одним из лучших инструментов для потоковой обработки данных.

В основе Apache Kafka лежат несколько ключевых компонентов:

1. Брокеры — серверы, которые принимают, хранят и доставляют данные.

2. Топики — логические каналы, через которые данные передаются.

3. Продюсеры — приложения или устройства, которые отправляют данные в Kafka.

4. Консьюмеры — приложения, которые получают данные из Kafka.

Kafka организует поток данных в виде последовательностей сообщений. Сообщения записываются в топики и разделяются на партиции, что позволяет обрабатывать данные параллельно.

Пример потока данных

Представим, что у нас есть система интернет-магазина, где Kafka используется для обработки событий, таких как заказы, клики на странице, добавление товаров в корзину и платежи. Каждое из этих событий записывается в топик Kafka. Например, топик `orders` может содержать события, описывающие новые заказы.

Установка и настройка Apache Kafka

Перед началом работы убедитесь, что Kafka установлена. Для локальной работы используйте официальные сборки Kafka с сайта [Apache Kafka](https://kafka.apache.org/).

1. Установите Kafka и запустите ZooKeeper, необходимый для управления брокерами.

2. Запустите Kafka-брокер.

3. Создайте топик с помощью команды:

```bash

bin/kafka-topics.sh — create — topic orders — bootstrap-server localhost:9092 — partitions 3 — replication-factor 1

```

Отправка данных в Kafka

Теперь создадим простого продюсера на Python, который будет отправлять данные в топик `orders`. Для работы с Kafka на Python используется библиотека `confluent-kafka`. Установите её с помощью команды:

```bash

pip install confluent-kafka

```

Пример кода, который отправляет сообщения в топик:

```python

from confluent_kafka import Producer

import json

import time

# Настройки продюсера

producer_config = {

'bootstrap.servers': 'localhost:9092' # Адрес Kafka-брокера

}

# Создание продюсера

producer = Producer(producer_config)

# Функция для обратного вызова при успешной отправке сообщения

def delivery_report(err, msg):

if err is not None:

print(f'Ошибка доставки сообщения: {err}')

else:

print(f'Сообщение отправлено: {msg.topic()} [{msg.partition()}]')

# Отправка данных в Kafka

orders = [

{'order_id': 1, 'product': 'Laptop', 'price': 1000},

{'order_id': 2, 'product': 'Phone', 'price': 500},

{'order_id': 3, 'product': 'Headphones', 'price': 150}

]

for order in orders:

producer.produce(

'orders',

key=str(order['order_id']),

value=json.dumps(order),

callback=delivery_report

)

producer.flush() # Отправка сообщений в брокер

time.sleep(1)

```

В этом примере продюсер отправляет JSON-объекты в топик `orders`. Каждое сообщение содержит данные о заказе.

Чтение данных из Kafka

Теперь создадим консьюмера, который будет читать сообщения из топика `orders`.

```python

from confluent_kafka import Consumer, KafkaException

# Настройки консьюмера

consumer_config = {

'bootstrap.servers': 'localhost:9092',

'group.id': 'order-group', # Группа консьюмеров

'auto.offset.reset': 'earliest' # Начало чтения с первой записи

}

# Создание консьюмера

consumer = Consumer(consumer_config)

# Подписка на топик

consumer.subscribe(['orders'])

# Чтение сообщений из Kafka

try:

while True:

msg = consumer.poll(1.0) # Ожидание сообщения (1 секунда)

if msg is None:

continue

if msg.error():

if msg.error().code() == KafkaException._PARTITION_EOF:

# Конец партиции

continue

else:

print(f"Ошибка: {msg.error()}")

break

# Обработка сообщения

print(f"Получено сообщение: {msg.value().decode('utf-8')}")

except KeyboardInterrupt:

print("Завершение работы…")

finally:

# Закрытие консьюмера

consumer.close()

```

В этом примере консьюмер подключается к Kafka, читает сообщения из топика `orders` и выводит их на экран.

Потоковая обработка данных

Kafka часто используется совместно с платформами потоковой обработки, такими как Apache Spark или Apache Flink, для анализа данных в реальном времени. Однако вы также можете обрабатывать данные прямо в Python.

Например, предположим, что мы хотим обработать события из топика `orders` и рассчитать суммарную стоимость всех заказов:

```python

from confluent_kafka import Consumer

import json

# Настройки консьюмера

consumer_config = {

'bootstrap.servers': 'localhost:9092',

'group.id': 'order-sum-group',

'auto.offset.reset': 'earliest'

}

# Создание консьюмера

consumer = Consumer(consumer_config)

consumer.subscribe(['orders'])

# Суммарная стоимость заказов

total_sales = 0

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Обработка сообщения

order = json.loads(msg.value().decode('utf-8'))

total_sales += order['price']

print(f"Обработан заказ: {order['order_id']}, текущая сумма: {total_sales}")

except KeyboardInterrupt:

print(f"Общая сумма всех заказов: {total_sales}")

finally:

consumer.close()

```

Преимущества использования Kafka

1. Высокая производительность. Kafka поддерживает миллионы событий в секунду благодаря своей архитектуре и использованию партиций.

2. Надежность. Данные хранятся в Kafka до тех пор, пока их не обработают все подписчики.

3. Масштабируемость. Kafka легко масштабируется путем добавления новых брокеров.

4. Универсальность. Kafka поддерживает интеграцию с большинством современных инструментов обработки данных.

Apache Kafka предоставляет мощный набор инструментов для потоковой обработки данных. Используя Python, вы можете легко настроить передачу данных, их обработку и анализ в реальном времени. Это особенно полезно для систем, где требуется высокая производительность и минимальная задержка при обработке больших потоков данных.

Задачи для практики

Задача 1: Фильтрация событий по условию

Описание:

У вас есть топик `clickstream`, содержащий события о кликах на веб-сайте. Каждое событие содержит следующие поля:

— `user_id` — идентификатор пользователя.

— `url` — URL-адрес, на который был клик.

— `timestamp` — время клика.

Ваша задача: создать консьюмера, который будет читать события из Kafka, фильтровать только события с URL-адресами, содержащими слово"product", и сохранять их в новый топик `filtered_clicks`.

Решение:

```python

from confluent_kafka import Producer, Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание продюсера для записи в новый топик

producer = Producer({'bootstrap.servers': broker})

def produce_filtered_event(event):

producer.produce('filtered_clicks', value=json.dumps(event))

producer.flush()

# Создание консьюмера для чтения из исходного топика

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'clickstream-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['clickstream'])

# Чтение и фильтрация событий

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение из Kafka в Python-объект

event = json.loads(msg.value().decode('utf-8'))

# Фильтруем события с URL, содержащими"product"

if 'product' in event['url']:

print(f"Фильтруем событие: {event}")

produce_filtered_event(event)

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

— Консьюмер читает события из топика `clickstream`.

— Каждое сообщение проверяется на наличие слова"product"в поле `url`.

— Отфильтрованные события отправляются в новый топик `filtered_clicks` через продюсера.

Задача 2: Подсчет количества событий в реальном времени

Описание:

Топик `log_events` содержит логи системы. Каждое сообщение содержит:

— `log_level` (например,"INFO","ERROR","DEBUG").

— `message` (текст лога).

Ваша задача: написать программу, которая считает количество событий уровня"ERROR"в реальном времени и каждые 10 секунд выводит их общее количество.

Решение:

```python

from confluent_kafka import Consumer

import time

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'log-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['log_events'])

error_count = 0

start_time = time.time()

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

log_event = json.loads(msg.value().decode('utf-8'))

# Увеличиваем счетчик, если уровень лога"ERROR"

if log_event['log_level'] == 'ERROR':

error_count += 1

# Каждые 10 секунд выводим текущий счетчик

if time.time() — start_time >= 10:

print(f"Количество ошибок за последние 10 секунд: {error_count}")

error_count = 0

start_time = time.time()

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

— Консьюмер читает события из топика `log_events`.

— Если уровень лога"ERROR", увеличивается счетчик `error_count`.

— Каждые 10 секунд программа выводит количество событий"ERROR"и сбрасывает счетчик.

Задача 3: Агрегация данных по группам

Описание:

Топик `transactions` содержит данные о финансовых транзакциях:

— `user_id` — идентификатор пользователя.

— `amount` — сумма транзакции.

Ваша задача: написать программу, которая подсчитывает общую сумму транзакций для каждого пользователя и выводит результаты в реальном времени.

Решение:

```python

from confluent_kafka import Consumer

import json

from collections import defaultdict

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'transaction-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['transactions'])

# Словарь для хранения сумм по пользователям

user_totals = defaultdict(float)

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

transaction = json.loads(msg.value().decode('utf-8'))

# Обновляем сумму для пользователя

user_id = transaction['user_id']

user_totals[user_id] += transaction['amount']

# Вывод текущих сумм

print(f"Текущая сумма транзакций по пользователям: {dict(user_totals)}")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

— Консьюмер читает данные из топика `transactions`.

— Для каждого пользователя обновляется сумма его транзакций в словаре `user_totals`.

— Программа выводит текущие суммы по всем пользователям.

Задача 4: Сохранение обработанных данных в файл

Описание:

Топик `sensor_data` содержит данные с датчиков IoT:

— `sensor_id` — идентификатор датчика.

— `temperature` — измеренная температура.

— `timestamp` — время измерения.

Ваша задача: написать программу, которая сохраняет все данные о температуре выше 30°C в файл `high_temp.json`.

Решение:

```python

from confluent_kafka import Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'sensor-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['sensor_data'])

# Открываем файл для записи

with open('high_temp.json', 'w') as outfile:

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

sensor_data = json.loads(msg.value().decode('utf-8'))

# Сохраняем данные, если температура выше 30°C

if sensor_data['temperature'] > 30:

json.dump(sensor_data, outfile)

outfile.write('\n') # Новый ряд для каждого объекта

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

— Консьюмер читает данные из топика `sensor_data`.

— Данные с температурой выше 30°C записываются в файл `high_temp.json`.

Задача 5: Обнаружение аномалий в данных

Описание:

В топик `temperature_readings` поступают данные о температуре из различных городов:

— `city` — название города.

— `temperature` — измеренная температура.

— `timestamp` — время измерения.

Ваша задача: написать программу, которая будет находить и выводить аномалии — случаи, когда температура превышает 40°C или опускается ниже — 10°C.

Решение:

```python

from confluent_kafka import Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'temperature-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['temperature_readings'])

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

reading = json.loads(msg.value().decode('utf-8'))

# Проверяем на аномалии

if reading['temperature'] > 40 or reading['temperature'] < — 10:

print(f"Аномалия! Город: {reading['city']}, Температура: {reading['temperature']}°C")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

— Консьюмер читает данные о температуре из топика.

— Если температура выходит за пределы нормального диапазона, программа выводит сообщение об аномалии.

Задача 6: Потоковое объединение данных

Описание:

Есть два топика:

1. `orders` — содержит данные о заказах: `order_id`, `product_id`, `quantity`.

2. `products` — содержит данные о товарах: `product_id`, `product_name`, `price`.

Ваша задача: написать программу, которая объединяет данные из этих двух топиков и выводит итоговую информацию о каждом заказе, включая название продукта и общую стоимость.

Решение:

```python

from confluent_kafka import Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмеров для обоих топиков

order_consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'order-group',

'auto.offset.reset': 'earliest'

})

product_consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'product-group',

'auto.offset.reset': 'earliest'

})

order_consumer.subscribe(['orders'])

product_consumer.subscribe(['products'])

# Словарь для хранения данных о товарах

product_catalog = {}

try:

while True:

# Чтение данных из топика products

product_msg = product_consumer.poll(0.1)

if product_msg and not product_msg.error():

product = json.loads(product_msg.value().decode('utf-8'))

product_catalog[product['product_id']] = {

'name': product['product_name'],

'price': product['price']

}

# Чтение данных из топика orders

order_msg = order_consumer.poll(0.1)

if order_msg and not order_msg.error():

order = json.loads(order_msg.value().decode('utf-8'))

product_id = order['product_id']

# Объединение данных о заказе и товаре

if product_id in product_catalog:

product = product_catalog[product_id]

total_price = order['quantity'] * product['price']

print(f"Заказ {order['order_id']}: {product['name']} x {order['quantity']} = {total_price} $")

else:

print(f"Информация о товаре {product_id} отсутствует.")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

order_consumer.close()

product_consumer.close()

```

Объяснение:

— Данные из топика `products` кэшируются в словаре `product_catalog`.

— При чтении заказа из топика `orders` программа объединяет данные и вычисляет итоговую стоимость.

Задача 7: Потоковая обработка с вычислением скользящего среднего

Описание:

В топик `stock_prices` поступают данные о ценах акций:

— `symbol` — тикер акции.

— `price` — текущая цена.

— `timestamp` — время.

Ваша задача: вычислять скользящее среднее цены акции за последние 5 сообщений для каждого тикера.

Решение:

```python

from confluent_kafka import Consumer

import json

from collections import defaultdict, deque

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'stocks-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['stock_prices'])

# Дек для хранения последних цен по тикерам

price_window = defaultdict(lambda: deque(maxlen=5))

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

stock_data = json.loads(msg.value().decode('utf-8'))

# Добавляем цену в окно

symbol = stock_data['symbol']

price_window[symbol].append(stock_data['price'])

# Вычисляем скользящее среднее

moving_average = sum(price_window[symbol]) / len(price_window[symbol])

print(f"Скользящее среднее для {symbol}: {moving_average:.2f}")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

— Используется `deque` для хранения последних 5 цен.

— Скользящее среднее вычисляется как сумма значений, делённая на их количество.

Задача 8: Генерация уведомлений

Описание:

В топик `user_actions` поступают данные о действиях пользователей:

— `user_id` — идентификатор пользователя.

— `action` — выполненное действие (например,"login","purchase").

Напишите программу, которая отслеживает пользователей, выполнивших вход (`login`), но не совершивших покупку (`purchase`) в течение 10 минут, и отправляет уведомление в топик `notifications`.

Решение:

```python

from confluent_kafka import Consumer, Producer

import json

from datetime import datetime, timedelta

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'user-actions-group',

'auto.offset.reset': 'earliest'

})

producer = Producer({'bootstrap.servers': broker})

consumer.subscribe(['user_actions'])

# Словарь для отслеживания пользователей

user_login_time = {}

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

action = json.loads(msg.value().decode('utf-8'))

user_id = action['user_id']

action_type = action['action']

timestamp = datetime.fromisoformat(action['timestamp'])

if action_type == 'login':

user_login_time[user_id] = timestamp

elif action_type == 'purchase' and user_id in user_login_time:

del user_login_time[user_id]

# Проверяем, прошло ли 10 минут

current_time = datetime.now()

for user, login_time in list(user_login_time.items()):

if current_time — login_time > timedelta(minutes=10):

notification = {'user_id': user, 'message': 'Сделайте покупку!'}

producer.produce('notifications', value=json.dumps(notification))

print(f"Уведомление отправлено для пользователя {user}")

del user_login_time[user]

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

— Время входа пользователей сохраняется в словаре.

— Если с момента входа прошло более 10 минут и покупка не совершена, генерируется уведомление.

Эти задачи показывают, как использовать Apache Kafka для решения реальных задач, таких как фильтрация событий, подсчет статистики, агрегация данных и сохранение обработанной информации. Эти примеры помогут вам освоить основные подходы к работе с потоками данных в реальном времени.

1.3 Работа с базами данных: SQLAlchemy и интеграция с Pandas

SQLAlchemy — это мощная библиотека для работы с базами данных в Python. Она предоставляет инструменты для удобного взаимодействия с реляционными базами данных через ORM (Object Relational Mapping) или с использованием чистого SQL.

Pandas же идеально подходит для анализа данных, но иногда данные, которые мы хотим обработать, хранятся в базах данных. Для этого SQLAlchemy и Pandas можно эффективно интегрировать, чтобы выгружать данные из базы, обрабатывать их в Pandas и сохранять обратно.

Установка и подключение

Для начала работы установите библиотеку SQLAlchemy:

```bash

pip install sqlalchemy

```

Если вы используете SQLite, дополнительных действий не требуется. Для других баз данных, таких как PostgreSQL или MySQL, также потребуется установить драйверы, например:

```bash

pip install psycopg2 # Для PostgreSQL

pip install pymysql # Для MySQL

```

Создайте подключение к базе данных с помощью SQLAlchemy. Например, для SQLite это будет выглядеть так:

```python

from sqlalchemy import create_engine

# Создаем подключение к базе данных SQLite

engine = create_engine('sqlite:///example.db', echo=True)

```

Здесь `echo=True` означает, что в консоль будут выводиться SQL-запросы, выполняемые через SQLAlchemy, что полезно для отладки.

Создание таблиц и работа с ORM

SQLAlchemy поддерживает два основных подхода: работа через ORM и использование SQL-запросов напрямую. Рассмотрим оба.

Создадим таблицу для хранения информации о пользователях:

```python

from sqlalchemy import Table, Column, Integer, String, MetaData

# Создаем метаданные

metadata = MetaData()

# Определяем таблицу

users = Table(

'users', metadata,

Column('id', Integer, primary_key=True),

Column('name', String),

Column('age', Integer),

Column('email', String)

)

# Создаем таблицу в базе данных

metadata.create_all(engine)

```

Теперь таблица `users` создана в базе данных.

Для добавления данных используем объект подключения:

```python

from sqlalchemy import insert

# Подключаемся к базе данных

conn = engine.connect()

# Добавляем данные

insert_query = insert(users).values([

{'name': 'Alice', 'age': 25, 'email': 'alice@example.com'},

{'name': 'Bob', 'age': 30, 'email': 'bob@example.com'},

{'name': 'Charlie', 'age': 35, 'email': 'charlie@example.com'}

])

conn.execute(insert_query)

print("Данные добавлены в таблицу.")

```

Чтение данных и интеграция с Pandas

Чтобы выгрузить данные из базы данных в Pandas, SQLAlchemy предоставляет удобный метод. Используем Pandas для выполнения SQL-запроса:

```python

import pandas as pd

# Чтение данных из таблицы users

query ="SELECT * FROM users"

df = pd.read_sql(query, engine)

print(df)

```

Вывод будет выглядеть так:

```

id name age email

0 1 Alice 25 alice@example.com

1 2 Bob 30 bob@example.com

2 3 Charlie 35 charlie@example.com

```

Теперь данные из базы данных доступны в формате DataFrame, и вы можете применять к ним все мощные инструменты анализа, которые предоставляет Pandas.

Обработка данных с использованием Pandas

Допустим, мы хотим найти всех пользователей старше 30 лет и добавить новый столбец с доменом их электронной почты.

```python

# Фильтрация пользователей старше 30 лет

filtered_df = df[df['age'] > 30]

# Добавление нового столбца с доменом электронной почты

filtered_df['email_domain'] = filtered_df['email'].apply(lambda x: x.split('@')[1])

print(filtered_df)

```

Результат будет выглядеть так:

```

id name age email email_domain

2 3 Charlie 35 charlie@example.com example.com

```

Сохранение данных обратно в базу

После обработки данных в Pandas мы можем сохранить их обратно в базу данных. Для этого Pandas предоставляет метод `to_sql`:

```python

# Сохранение отфильтрованных данных в новую таблицу filtered_users

filtered_df.to_sql('filtered_users', engine, if_exists='replace', index=False)

print("Данные сохранены в таблицу filtered_users.")

```

Теперь в базе данных появилась новая таблица `filtered_users`, содержащая обработанные данные.

Работа с ORM

Для более сложных сценариев SQLAlchemy поддерживает ORM, позволяющий работать с таблицами как с Python-классами.

Определим класс для таблицы `users`:

```python

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class User(Base):

__tablename__ = 'users'

id = Column(Integer, primary_key=True)

name = Column(String)

age = Column(Integer)

email = Column(String)

# Создаем сессию для работы с ORM

Session = sessionmaker(bind=engine)

session = Session()

# Пример чтения данных через ORM

users = session.query(User).filter(User.age > 30).all()

for user in users:

print(f"Имя: {user.name}, Возраст: {user.age}, Email: {user.email}")

```

Этот подход особенно удобен, если вы предпочитаете объектно-ориентированный стиль работы с базой данных.

Пример: Анализ данных с SQLAlchemy и Pandas

Представьте, что у вас есть база данных с информацией о продажах, и вы хотите найти города, в которых средняя сумма покупок превышает 5000.

1. Создадим таблицу:

```python

sales = Table(

'sales', metadata,

Column('id', Integer, primary_key=True),

Column('city', String),

Column('amount', Integer)

)

metadata.create_all(engine)

# Добавим данные

conn.execute(insert(sales).values([

{'city': 'New York', 'amount': 7000},

{'city': 'Los Angeles', 'amount': 3000},

{'city': 'New York', 'amount': 8000},

{'city': 'Los Angeles', 'amount': 2000},

{'city': 'Chicago', 'amount': 6000}

]))

```

2. Выгрузим данные и найдем среднюю сумму по городам:

```python

# Чтение данных из таблицы sales

query ="SELECT * FROM sales"

sales_df = pd.read_sql(query, engine)

# Вычисление средней суммы по городам

avg_sales = sales_df.groupby('city')['amount'].mean().reset_index()

# Фильтрация городов с средней суммой > 5000

filtered_sales = avg_sales[avg_sales['amount'] > 5000]

print(filtered_sales)

```

Результат:

```

city amount

0 Chicago 6000.0

1 New York 7500.0

```

3. Сохраним результат в таблицу:

```python

filtered_sales.to_sql('high_avg_sales', engine, if_exists='replace', index=False)

```

Теперь обработанные данные сохранены в базе, и вы можете использовать их в дальнейшем.

SQLAlchemy предоставляет мощные возможности для работы с базами данных, а интеграция с Pandas делает обработку данных ещё более удобной и гибкой. Вы можете быстро выгружать данные из базы, анализировать их с помощью Pandas и сохранять обратно, что упрощает создание аналитических решений и автоматизацию работы с данными.

Задачи для практики

Задача 1: Создание базы данных пользователей и извлечение данных

Описание:

Создайте базу данных `users.db` с таблицей `users`, содержащей следующие столбцы:

— `id` — уникальный идентификатор пользователя.

— `name` — имя пользователя.

— `age` — возраст пользователя.

— `email` — электронная почта.

Добавьте в таблицу данные о пяти пользователях и извлеките всех пользователей старше 30 лет.

Решение:

```python

from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData

import pandas as pd

# Создаем подключение к базе данных SQLite

engine = create_engine('sqlite:///users.db', echo=False)

metadata = MetaData()

# Определяем таблицу users

users = Table(

'users', metadata,

Column('id', Integer, primary_key=True),

Column('name', String),

Column('age', Integer),

Column('email', String)

)

# Создаем таблицу

metadata.create_all(engine)

# Добавляем данные

with engine.connect() as conn:

conn.execute(users.insert(), [

{'name': 'Alice', 'age': 25, 'email': 'alice@example.com'},

{'name': 'Bob', 'age': 35, 'email': 'bob@example.com'},

{'name': 'Charlie', 'age': 32, 'email': 'charlie@example.com'},

{'name': 'Diana', 'age': 28, 'email': 'diana@example.com'},

{'name': 'Eve', 'age': 40, 'email': 'eve@example.com'}

])

# Извлечение пользователей старше 30 лет

query ="SELECT * FROM users WHERE age > 30"

df = pd.read_sql(query, engine)

print(df)

```

Результат:

```

id name age email

1 2 Bob 35 bob@example.com

2 3 Charlie 32 charlie@example.com

4 5 Eve 40 eve@example.com

```

Задача 2: Подсчет пользователей по возрастным группам

Описание:

Используя базу данных `users.db`, разделите пользователей на две группы: младше 30 лет и 30 лет и старше. Посчитайте количество пользователей в каждой группе.

Решение:

```python

# Чтение данных из таблицы

df = pd.read_sql("SELECT * FROM users", engine)

# Добавление возрастной группы

df['age_group'] = df['age'].apply(lambda x: 'Under 30' if x < 30 else '30 and above')

# Подсчет пользователей по группам

group_counts = df.groupby('age_group')['id'].count().reset_index()

print(group_counts)

```

Результат:

```

age_group id

0 30 and above 3

1 Under 30 2

```

Задача 3: Сохранение агрегированных данных в новую таблицу

Описание:

Сохраните результаты подсчета пользователей по возрастным группам в новую таблицу `age_groups` в базе данных `users.db`.

Решение:

```python

# Сохранение в новую таблицу

group_counts.to_sql('age_groups', engine, if_exists='replace', index=False)

# Проверка сохраненных данных

saved_data = pd.read_sql("SELECT * FROM age_groups", engine)

print(saved_data)

```

Результат:

```

age_group id

0 30 and above 3

1 Under 30 2

```

Задача 4: Поиск наиболее популярных доменов электронной почты

Описание:

Добавьте данные о пользователях с разными адресами электронной почты. Найдите, какие домены (`example.com`, `gmail.com` и т.д.) встречаются чаще всего.

Решение:

```python

# Добавление новых данных

with engine.connect() as conn:

conn.execute(users.insert(), [

{'name': 'Frank', 'age': 29, 'email': 'frank@gmail.com'},

{'name': 'Grace', 'age': 37, 'email': 'grace@gmail.com'},

{'name': 'Helen', 'age': 33, 'email': 'helen@example.com'}

])

# Чтение данных

df = pd.read_sql("SELECT * FROM users", engine)

# Выделение доменов

df['email_domain'] = df['email'].apply(lambda x: x.split('@')[1])

# Подсчет частоты доменов

domain_counts = df['email_domain'].value_counts().reset_index()

domain_counts.columns = ['email_domain', 'count']

print(domain_counts)

```

Результат:

```

email_domain count

0 example.com 5

1 gmail.com 2

```

Задача 5: Создание таблицы продаж и анализ доходов

Описание:

Создайте таблицу `sales`, содержащую данные о продажах:

— `id` — идентификатор продажи.

— `product` — название продукта.

— `price` — цена продукта.

— `quantity` — количество проданных единиц.

Рассчитайте общий доход для каждого продукта и сохраните результаты в новую таблицу `product_revenues`.

Решение:

```python

# Определение таблицы sales

sales = Table(

'sales', metadata,

Column('id', Integer, primary_key=True),

Column('product', String),

Column('price', Integer),

Column('quantity', Integer)

)

metadata.create_all(engine)

# Добавление данных

with engine.connect() as conn:

conn.execute(sales.insert(), [

{'product': 'Laptop', 'price': 1000, 'quantity': 3},

{'product': 'Phone', 'price': 500, 'quantity': 5},

{'product': 'Tablet', 'price': 300, 'quantity': 7}

])

# Чтение данных

sales_df = pd.read_sql("SELECT * FROM sales", engine)

# Расчет общего дохода

sales_df['revenue'] = sales_df['price'] * sales_df['quantity']

revenues = sales_df.groupby('product')['revenue'].sum().reset_index()

# Сохранение в новую таблицу

revenues.to_sql('product_revenues', engine, if_exists='replace', index=False)

# Проверка сохраненных данных

saved_revenues = pd.read_sql("SELECT * FROM product_revenues", engine)

print(saved_revenues)

```

Результат:

```

product revenue

0 Laptop 3000

1 Phone 2500

2 Tablet 2100

```

Задача 6: Фильтрация данных по динамическому запросу

Описание:

Создайте функцию, которая принимает минимальную цену и возвращает список продуктов, стоимость которых выше указанного значения.

Решение:

```python

def filter_products_by_price(min_price):

query = f"SELECT * FROM sales WHERE price > {min_price}"

result_df = pd.read_sql(query, engine)

return result_df

# Фильтрация продуктов с ценой выше 400

filtered_products = filter_products_by_price(400)

print(filtered_products)

```

Результат:

```

id product price quantity

0 1 Laptop 1000 3

1 2 Phone 500 5

```

Задача 7: Определение наиболее активных пользователей

Описание:

В таблице `activity_log` содержатся данные о действиях пользователей:

— `id` — идентификатор записи.

— `user_id` — идентификатор пользователя.

— `action` — выполненное действие.

— `timestamp` — время выполнения действия.

Определите, кто из пользователей совершил наибольшее количество действий.

Решение:

```python

from sqlalchemy import Table, Column, Integer, String, DateTime

from datetime import datetime

# Определение таблицы activity_log

activity_log = Table(

'activity_log', metadata,

Column('id', Integer, primary_key=True),

Column('user_id', Integer),

Column('action', String),

Column('timestamp', DateTime)

)

metadata.create_all(engine)

# Добавление данных

with engine.connect() as conn:

conn.execute(activity_log.insert(), [

{'user_id': 1, 'action': 'login', 'timestamp': datetime(2025, 1, 1, 10, 0)},

{'user_id': 1, 'action': 'purchase', 'timestamp': datetime(2025, 1, 1, 10, 5)},

{'user_id': 2, 'action': 'login', 'timestamp': datetime(2025, 1, 1, 11, 0)},

{'user_id': 1, 'action': 'logout', 'timestamp': datetime(2025, 1, 1, 10, 10)},

{'user_id': 2, 'action': 'purchase', 'timestamp': datetime(2025, 1, 1, 11, 5)},

{'user_id': 2, 'action': 'logout', 'timestamp': datetime(2025, 1, 1, 11, 10)}

])

# Чтение данных

activity_df = pd.read_sql("SELECT * FROM activity_log", engine)

# Подсчет количества действий по пользователям

user_activity = activity_df.groupby('user_id')['id'].count().reset_index()

user_activity.columns = ['user_id', 'action_count']

# Поиск самого активного пользователя

most_active_user = user_activity.loc[user_activity['action_count'].idxmax()]

print(most_active_user)

```

Результат:

```

user_id 1

action_count 3

```

Задача 8: Подсчет действий по типу

Описание: Для каждого типа действия из таблицы `activity_log` подсчитайте, сколько раз оно выполнялось.

Решение:

```python

# Подсчет количества каждого типа действия

action_counts = activity_df['action'].value_counts().reset_index()

action_counts.columns = ['action', 'count']

print(action_counts)

```

Результат:

```

action count

0 login 2

1 purchase 2

2 logout 2

```

Задача 9: Анализ временных меток

Описание: Определите, в какие часы дня пользователи наиболее активны.

Решение:

```python

# Извлечение часа из временных меток

activity_df['hour'] = activity_df['timestamp'].dt.hour

# Подсчет действий по часам

hourly_activity = activity_df.groupby('hour')['id'].count().reset_index()

hourly_activity.columns = ['hour', 'action_count']

print(hourly_activity)

```

Результат:

```

hour action_count

0 10 3

1 11 3

```

Задача 10: Создание таблицы доходов от пользователей

Описание: Используя таблицу `sales`, определите, сколько дохода принёс каждый пользователь, и сохраните результаты в таблицу `user_revenues`.

Решение:

```python

# Добавление данных о продажах с указанием user_id

with engine.connect() as conn:

conn.execute(sales.insert(), [

{'product': 'Laptop', 'price': 1000, 'quantity': 1, 'user_id': 1},

{'product': 'Phone', 'price': 500, 'quantity': 2, 'user_id': 1},

{'product': 'Tablet', 'price': 300, 'quantity': 3, 'user_id': 2}

])

# Чтение данных из sales

sales_df = pd.read_sql("SELECT * FROM sales", engine)

# Расчёт дохода для каждого пользователя

sales_df['revenue'] = sales_df['price'] * sales_df['quantity']

user_revenues = sales_df.groupby('user_id')['revenue'].sum().reset_index()

# Сохранение в новую таблицу

user_revenues.to_sql('user_revenues', engine, if_exists='replace', index=False)

# Проверка результатов

saved_user_revenues = pd.read_sql("SELECT * FROM user_revenues", engine)

print(saved_user_revenues)

```

Результат:

```

user_id revenue

0 1 2000

1 2 900

```

Задача 11: Поиск последнего действия пользователей

Описание:Для каждого пользователя из таблицы `activity_log` найдите его последнее действие.

Решение:

```python

# Поиск последнего действия

last_actions = activity_df.sort_values('timestamp').groupby('user_id').last().reset_index()

last_actions = last_actions[['user_id', 'action', 'timestamp']]

print(last_actions)

```

Результат:

```

user_id action timestamp

0 1 logout 2025-01-01 10:10:00

1 2 logout 2025-01-01 11:10:00

```

Задача 12: Фильтрация пользователей с высоким доходом

Описание: Используя таблицу `user_revenues`, выберите всех пользователей, чей доход превышает 1500.

Решение:

```python

# Чтение данных из user_revenues

user_revenues = pd.read_sql("SELECT * FROM user_revenues", engine)

# Фильтрация пользователей с доходом > 1500

high_revenue_users = user_revenues[user_revenues['revenue'] > 1500]

print(high_revenue_users)

```

Результат:

```

user_id revenue

0 1 2000

```

Задача 13: Распределение доходов по продуктам

Описание: Определите, какой процент от общего дохода приносит каждый продукт.

Решение:

```python

# Подсчет общего дохода

total_revenue = sales_df['revenue'].sum()

# Расчет процента дохода по продуктам

sales_df['revenue_percent'] = (sales_df['revenue'] / total_revenue) * 100

product_revenue_percent = sales_df.groupby('product')['revenue_percent'].sum().reset_index()

print(product_revenue_percent)

```

Результат:

```

product revenue_percent

0 Laptop 50.793651

1 Phone 25.396825

2 Tablet 23.809524

```

Эти задачи демонстрируют, как SQLAlchemy и Pandas могут использоваться вместе для создания, управления и анализа данных в базах данных. Они покрывают такие аспекты, как фильтрация данных, выполнение группировок и агрегатов, интеграция данных и сохранение результатов. Эти примеры помогут вам освоить основные техники работы с базами данных в Python.

Оглавление

Купить книгу

Приведённый ознакомительный фрагмент книги «Библиотеки Python Часть 2. Практическое применение» предоставлен нашим книжным партнёром — компанией ЛитРес.

Купить и скачать полную версию книги в форматах FB2, ePub, MOBI, TXT, HTML, RTF и других

Вам также может быть интересно

а б в г д е ё ж з и й к л м н о п р с т у ф х ц ч ш щ э ю я