Джейд Картер – Библиотеки Python Часть 2. Практическое применение (страница 9)
if reading['temperature'] > 40 or reading['temperature'] < -10:
print(f"Аномалия! Город: {reading['city']}, Температура: {reading['temperature']}°C")
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Консьюмер читает данные о температуре из топика.
– Если температура выходит за пределы нормального диапазона, программа выводит сообщение об аномалии.
Задача 6: Потоковое объединение данных
Описание:
Есть два топика:
1. `orders` – содержит данные о заказах: `order_id`, `product_id`, `quantity`.
2. `products` – содержит данные о товарах: `product_id`, `product_name`, `price`.
Ваша задача: написать программу, которая объединяет данные из этих двух топиков и выводит итоговую информацию о каждом заказе, включая название продукта и общую стоимость.
Решение:
```python
from confluent_kafka import Consumer
import json
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмеров для обоих топиков
order_consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'order-group',
'auto.offset.reset': 'earliest'
})
product_consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'product-group',
'auto.offset.reset': 'earliest'
})
order_consumer.subscribe(['orders'])
product_consumer.subscribe(['products'])
# Словарь для хранения данных о товарах
product_catalog = {}
try:
while True:
# Чтение данных из топика products
product_msg = product_consumer.poll(0.1)
if product_msg and not product_msg.error():
product = json.loads(product_msg.value().decode('utf-8'))
product_catalog[product['product_id']] = {
'name': product['product_name'],
'price': product['price']
}
# Чтение данных из топика orders
order_msg = order_consumer.poll(0.1)
if order_msg and not order_msg.error():
order = json.loads(order_msg.value().decode('utf-8'))
product_id = order['product_id']
# Объединение данных о заказе и товаре
if product_id in product_catalog:
product = product_catalog[product_id]
total_price = order['quantity'] * product['price']
print(f"Заказ {order['order_id']}: {product['name']} x {order['quantity']} = {total_price} $")
else:
print(f"Информация о товаре {product_id} отсутствует.")
except KeyboardInterrupt:
print("Завершение работы.")
finally:
order_consumer.close()
product_consumer.close()
```