реклама
Бургер менюБургер меню

Джейд Картер – Библиотеки Python Часть 2. Практическое применение (страница 5)

18

print(f"Получено сообщение: {msg.value().decode('utf-8')}")

except KeyboardInterrupt:

print("Завершение работы…")

finally:

# Закрытие консьюмера

consumer.close()

```

В этом примере консьюмер подключается к Kafka, читает сообщения из топика `orders` и выводит их на экран.

Потоковая обработка данных

Kafka часто используется совместно с платформами потоковой обработки, такими как Apache Spark или Apache Flink, для анализа данных в реальном времени. Однако вы также можете обрабатывать данные прямо в Python.

Например, предположим, что мы хотим обработать события из топика `orders` и рассчитать суммарную стоимость всех заказов:

```python

from confluent_kafka import Consumer

import json

# Настройки консьюмера

consumer_config = {

'bootstrap.servers': 'localhost:9092',

'group.id': 'order-sum-group',

'auto.offset.reset': 'earliest'

}

# Создание консьюмера

consumer = Consumer(consumer_config)

consumer.subscribe(['orders'])

# Суммарная стоимость заказов

total_sales = 0

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Обработка сообщения

order = json.loads(msg.value().decode('utf-8'))

total_sales += order['price']

print(f"Обработан заказ: {order['order_id']}, текущая сумма: {total_sales}")

except KeyboardInterrupt:

print(f"Общая сумма всех заказов: {total_sales}")

finally:

consumer.close()

```

Преимущества использования Kafka

1. Высокая производительность. Kafka поддерживает миллионы событий в секунду благодаря своей архитектуре и использованию партиций.

2. Надежность. Данные хранятся в Kafka до тех пор, пока их не обработают все подписчики.

3. Масштабируемость. Kafka легко масштабируется путем добавления новых брокеров.

4. Универсальность. Kafka поддерживает интеграцию с большинством современных инструментов обработки данных.

Apache Kafka предоставляет мощный набор инструментов для потоковой обработки данных. Используя Python, вы можете легко настроить передачу данных, их обработку и анализ в реальном времени. Это особенно полезно для систем, где требуется высокая производительность и минимальная задержка при обработке больших потоков данных.

Задача 1: Фильтрация событий по условию

Описание:

У вас есть топик `clickstream`, содержащий события о кликах на веб-сайте. Каждое событие содержит следующие поля:

– `user_id` – идентификатор пользователя.

– `url` – URL-адрес, на который был клик.

– `timestamp` – время клика.

Ваша задача: создать консьюмера, который будет читать события из Kafka, фильтровать только события с URL-адресами, содержащими слово "product", и сохранять их в новый топик `filtered_clicks`.

Решение:

```python

from confluent_kafka import Producer, Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание продюсера для записи в новый топик

producer = Producer({'bootstrap.servers': broker})

def produce_filtered_event(event):

producer.produce('filtered_clicks', value=json.dumps(event))

producer.flush()