# How to Supercharge Your Streaming Data Pipeline in Python

Streaming data processing has come a long way, so why stick with old methods instead of adopting modern practices? Here is a fresh perspective that may help with your own pipeline.

## Inspiration from Batch Processing

Batch processing shines in general use cases such as the following (the list is not exhaustive):

* Transforming data in ETL/ELT data pipelines.
    
* Performing aggregation, grouping, filtering, joins, analytics, and so on; the list is never-ending.
    
* Doing all kinds of operations on table data like append, merge, delete, update, etc.
    

This side of the world is pretty mature now. We have a very good set of tools and frameworks for developing such pipelines. So why not apply the same functionality to a streaming pipeline without losing its real-time character?

## Problem Statement

Track customers' journeys on an e-commerce website. The events include which products a customer viewed, which products were added to the cart, and finally, which products were purchased. Calculate metrics that can be used for analysis.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1749389855794/2a345922-d7ac-4b60-9f01-5ba78ffd70c2.png align="center")

I am using `Kafka` as my streaming platform. In Python, two popular client libraries can be used to interact with Kafka brokers:

1. [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python)
    
2. [kafka-python](https://github.com/dpkp/kafka-python)
    

Here is my alternative take: neither of them should be used directly.

* Neither of them offers full Python type-hint support, so IDE autocompletion and static checks are of limited help.
    
* Both need a fair amount of boilerplate code.
    
* Their scope is limited to just `Producer`, `Consumer` & `Admin`.
    
* To apply the principles of batch processing, we have to write a lot of custom code.
    

You need not worry: there are open-source libraries like [Pathway](https://github.com/pathwaycom/pathway) and [Quix Streams](https://github.com/quixio/quix-streams) that can be used for stream processing. Let’s quickly compare them:

* Pathway has many more GitHub stars than Quix Streams.
    
* Pathway also supports many more connectors and sinks than Quix Streams.
    
* Both allow us to do batch-style processing tasks like filters, aggregations, joins, analytics, and Python UDFs on streaming data.
    
* Pathway's strength in supporting many connectors and sinks is also its weakness, as it makes the library quite large. Quix Streams is designed for one main purpose: Streaming.
    
* The biggest advantage of Quix Streams is that it exposes the low-level Kafka clients with full type-hint support, allowing us to use IDEs efficiently. **This was the main reason I chose Quix Streams.**
    

### The Producer:

Below, I generate fake data for a customer journey and send the events to their respective topics.

```python
import logging

from quixstreams import Application

from src.ops.generator import generate_dummy_e_commerce_data

logger = logging.getLogger("data-pipeline")
app = Application(
    broker_address="localhost:9092",
    loglevel="DEBUG",
    producer_extra_config={
        "linger.ms": "300",  # Wait up to 300ms for more messages before sending
        "compression.type": "gzip",  # Use gzip compression for messages
    },
)
# Create the Kafka topic clients
product_view_topic = app.topic(
    "ecomm_product_view", value_serializer="json", key_serializer="string"
)
cart_topic = app.topic("ecomm_cart", value_serializer="json", key_serializer="string")
buy_topic = app.topic("ecomm_buy", value_serializer="json", key_serializer="string")

# The producer works as a context manager, so we don't need to worry
# about closing the connection gracefully
with app.get_producer() as producer:
    for i in range(200):
        logger.info(f"current iteration: {i}")
        # generate dummy data
        event = generate_dummy_e_commerce_data()

        # sending the event to respective topics
        # Product view event
        product_view_msg = product_view_topic.serialize(
            key=event["product_view"].user_id,
            value=event["product_view"].model_dump(mode="json"),
        )
        producer.produce(
            product_view_topic.name,
            value=product_view_msg.value,
            key=product_view_msg.key,
        )
        logger.debug(f"producing product view event: {event['product_view'].event_id}")

        # Add to cart event
        # NOTE - It is possible that the add_to_cart and purchase events are None
        if event["add_to_cart"]:
            cart_msg = cart_topic.serialize(
                key=event["add_to_cart"].user_id,
                value=event["add_to_cart"].model_dump(mode="json"),
            )
            producer.produce(cart_topic.name, value=cart_msg.value, key=cart_msg.key)
            logger.debug(
                f"producing product add to cart event: {event['add_to_cart'].event_id}"
            )

        if event["purchase"]:
            buy_msg = buy_topic.serialize(
                key=event["purchase"].user_id,
                value=event["purchase"].model_dump(mode="json"),
            )
            producer.produce(buy_topic.name, value=buy_msg.value, key=buy_msg.key)
            logger.debug(f"producing buy event: {event['purchase'].event_id}")
```
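
The generator itself lives in the project repository and is not shown here. As a rough, hypothetical stand-in (the `Event` fields, the probabilities, and the `rng` parameter are all assumptions, and a plain dataclass replaces the project's models), the funnel logic behind the `None` checks above could look like this:

```python
import random
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Event:
    """Hypothetical event shape; the real project defines its own models."""

    event_id: str
    user_id: str
    product_id: str
    price: float
    timestamp: str

    def model_dump(self, mode: str = "json") -> dict:
        # Mimics the model_dump(mode="json") call used by the producer above.
        return asdict(self)


def generate_dummy_e_commerce_data(
    rng: Optional[random.Random] = None,
    cart_probability: float = 0.5,
    purchase_probability: float = 0.5,
) -> dict[str, Optional[Event]]:
    """Return one customer journey; later funnel stages may be None."""
    if rng is None:
        rng = random.Random()
    user_id = f"user_{rng.randint(1, 50)}"
    product_id = f"product_{rng.randint(1, 20)}"
    price = round(rng.uniform(5, 500), 2)

    def make_event() -> Event:
        return Event(
            event_id=str(uuid.UUID(int=rng.getrandbits(128))),
            user_id=user_id,
            product_id=product_id,
            price=price,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )

    view = make_event()
    cart = make_event() if rng.random() < cart_probability else None
    # A purchase can only follow an add-to-cart event.
    purchase = None
    if cart is not None and rng.random() < purchase_probability:
        purchase = make_event()
    return {"product_view": view, "add_to_cart": cart, "purchase": purchase}
```

Every journey has a product view, only some reach the cart, and fewer still end in a purchase, which is why the producer checks for `None` before sending the last two events.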

**Important Points:**

* The default producer settings are good, but to extract more performance, you’ll need to fine-tune them. I usually experiment with the following producer config settings:
    
    * `compression.type`: Compresses messages. I usually use `gzip`, which is widely supported.
        
    * `linger.ms`: How long to wait for more messages to accumulate before sending a batch. Tune it to the frequency of incoming messages.
        
* It’s recommended to use the producer as a context manager (`with app.get_producer() as producer:`), so that connections are closed gracefully at the end.
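
The two settings work together: Kafka producers compress whole batches rather than individual messages, so a longer `linger.ms` that gathers more messages per batch gives `gzip` more repetition to exploit. The sketch below imitates this with the standard library only. The payload shape is made up, and real sizes will differ because Kafka's record-batch format is not reproduced here.

```python
import gzip
import json


def make_messages(count: int) -> list[bytes]:
    """Build JSON-encoded events with a hypothetical payload shape."""
    return [
        json.dumps(
            {"event_id": i, "user_id": f"user_{i % 10}", "event_type": "product_view"}
        ).encode("utf-8")
        for i in range(count)
    ]


def compressed_size_one_by_one(messages: list[bytes]) -> int:
    """Total bytes when every message is compressed on its own (no batching)."""
    return sum(len(gzip.compress(message)) for message in messages)


def compressed_size_batched(messages: list[bytes]) -> int:
    """Total bytes when all messages are compressed together as one batch."""
    return len(gzip.compress(b"\n".join(messages)))


messages = make_messages(500)
raw = sum(len(message) for message in messages)
individual = compressed_size_one_by_one(messages)
batched = compressed_size_batched(messages)
print(f"raw={raw} individual={individual} batched={batched}")
```

The trade-off is latency: every extra millisecond of lingering delays delivery, so the right value depends on how real-time the consumers need to be.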
    

### The Consumer: Part A

Consume messages from three topics, process the data, join the streams to build the customer journey, and publish the results to a fourth topic.

```python
from quixstreams import Application

from src.ops.transform import convert_utc_to_ist

app = Application(
    broker_address="localhost:9092",
    consumer_group="ecomm_sync_group",
    auto_offset_reset="earliest",
    consumer_extra_config={
        "auto.offset.reset": "earliest",  # Start reading from the earliest message
        "enable.auto.commit": "true",  # Automatically commit offsets
    },
    loglevel="DEBUG",
)

# add topics to consume
product_view_topic = app.topic(
    "ecomm_product_view", value_deserializer="json", key_serializer="string"
)
cart_topic = app.topic("ecomm_cart", value_deserializer="json", key_serializer="string")
buy_topic = app.topic("ecomm_buy", value_deserializer="json", key_serializer="string")
# Output topic for customer journey
customer_journey_topic = app.topic(
    "ecomm_customer_journey",
    value_serializer="json",
)

# Streaming dataframe consumers
product_view_sdf = app.dataframe(product_view_topic)
cart_sdf = app.dataframe(cart_topic)
buy_sdf = app.dataframe(buy_topic)

# Stream processing
# Product view topic
product_view_sdf["timestamp"] = product_view_sdf["timestamp"].apply(convert_utc_to_ist)

# Add to cart topic
cart_sdf["price"] = cart_sdf["price"].apply(lambda x: round(x, 2))
cart_sdf["timestamp"] = cart_sdf["timestamp"].apply(convert_utc_to_ist)
# Join product view and cart dataframes on user_id
joined_view_cart = cart_sdf.join_asof(
    right=product_view_sdf, how="left", on_merge="keep-left"
)

# Buy product topic
buy_sdf["price"] = buy_sdf["price"].apply(lambda x: round(x, 2))
buy_sdf["timestamp"] = buy_sdf["timestamp"].apply(convert_utc_to_ist)
joined_buy = buy_sdf.join_asof(right=joined_view_cart, how="left", on_merge="keep-left")
joined_buy.to_topic(customer_journey_topic)

# Starting the app to process streams real-time
app.run()
```
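
`convert_utc_to_ist` is imported from the project and not shown here. A hypothetical stand-in is sketched below, assuming the `timestamp` field arrives as an ISO 8601 string in UTC (the real helper may accept a different format):

```python
from datetime import datetime, timedelta, timezone

# India Standard Time is a fixed UTC+05:30 offset with no daylight saving.
IST = timezone(timedelta(hours=5, minutes=30), name="IST")


def convert_utc_to_ist(timestamp: str) -> str:
    """Convert an ISO 8601 UTC timestamp string to an IST ISO 8601 string."""
    # Older Python versions do not parse a trailing "Z", so normalise it first.
    parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are already in UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(IST).isoformat()


print(convert_utc_to_ist("2025-06-08T12:00:00Z"))
```

Any plain Python function of this kind can be passed to `.apply()` on a column, which is what makes UDFs convenient here.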

**Important Points:**

* As with the `Producer`, the default `Consumer` settings are good, but to extract more performance, you’ll need to fine-tune them. In the code above I set `auto_offset_reset` and `enable.auto.commit`.
    
* Quix Streams provides the `StreamingDataFrame` interface for applying all kinds of transformation and analytical logic, including Python UDFs.
    
* I am performing a stateful join on two `StreamingDataFrame` objects. It uses `RocksDB` to maintain and flush the join state. I highly recommend reading more about it [here](https://quix.io/docs/quix-streams/joins.html#how-it-works_1).
    
* After a couple of join operations, I send the result as an event to yet another topic. Output is driven by records arriving on the left side of each join (ultimately the purchase stream), which are enriched with the latest cart and product-view data for the same key. In this way, I can track the customer's entire journey from product view to add-to-cart to purchase.
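
As a rough mental model of the as-of join used above: for every record on the left stream, look up the latest right-stream record with the same key whose timestamp is not newer than the left record. The sketch below is a simplified in-memory illustration, not Quix Streams' implementation (which keeps this state in RocksDB); `AsOfJoiner` and its methods are hypothetical names.

```python
import bisect
from collections import defaultdict
from typing import Optional


class AsOfJoiner:
    """Hypothetical in-memory as-of join keyed by message key."""

    def __init__(self) -> None:
        # Per key: parallel, timestamp-sorted lists of right-side records.
        self._timestamps: dict[str, list[int]] = defaultdict(list)
        self._values: dict[str, list[dict]] = defaultdict(list)

    def add_right(self, key: str, timestamp_ms: int, value: dict) -> None:
        """Buffer a right-side record so later left records can match it."""
        index = bisect.bisect_right(self._timestamps[key], timestamp_ms)
        self._timestamps[key].insert(index, timestamp_ms)
        self._values[key].insert(index, value)

    def join_left(
        self, key: str, timestamp_ms: int, value: dict, how: str = "left"
    ) -> Optional[dict]:
        """Merge a left record with the latest right record at or before it."""
        index = bisect.bisect_right(self._timestamps.get(key, []), timestamp_ms)
        if index == 0:
            # No right record is old enough: pass through for "left", drop for "inner".
            return dict(value) if how == "left" else None
        match = self._values[key][index - 1]
        # Left values win on overlapping column names, like on_merge="keep-left".
        return {**match, **value}


joiner = AsOfJoiner()
joiner.add_right("user_1", 1_000, {"product_id": "p1", "viewed_at": 1_000})
joiner.add_right("user_1", 3_000, {"product_id": "p2", "viewed_at": 3_000})
print(joiner.join_left("user_1", 2_000, {"product_id": "p1", "price": 9.99}))
```

Here the cart event at 2,000 ms picks up the view from 1,000 ms and ignores the later one, which is the behaviour that lets each stage of the journey attach to the most recent preceding stage.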
    

<div data-node-type="callout">
<div data-node-type="callout-emoji">💡</div>
<div data-node-type="callout-text">I highly recommend reading through Quix Streams’ official <a target="_self" rel="noopener noreferrer nofollow" href="https://quix.io/docs/get-started/welcome.html" style="pointer-events: none">docs</a>. They explain many more use cases with examples.</div>
</div>

### The Consumer: Part B

Consuming messages using a low-level client library API to perform custom sink logic.

```python
import logging
from pathlib import Path

import orjson
import polars as pl
from quixstreams import Application

from src.connector import DataWriter

logger = logging.getLogger("data-pipeline")
merge_option = DataWriter.generate_delta_table_merge_method_options(
    when_not_matched_insert_all=True, when_matched_update_all=True
)
app = Application(
    broker_address="localhost:9092",
    consumer_group="ecomm_customer_report_group",
    auto_offset_reset="earliest",
    consumer_extra_config={
        "enable.auto.commit": True,  # Automatically commit offsets
    },
    loglevel="DEBUG",
)

with app.get_consumer() as consumer:
    consumer.subscribe(topics=["ecomm_customer_journey"])

    # Starting the 'Forever consuming consumer'
    while True:
        message = consumer.poll(0.5)
        if message is None:
            continue
        elif message.error():
            logger.error("Kafka error: %s", message.error())
            continue

        value = message.value()

        # Merge data into Delta lake table
        df = pl.from_dict(orjson.loads(value))
        merge_stats = DataWriter.delta_table_merge_disk(
            df=df,
            path=Path(__file__).parent.parent.parent
            / "data/gold/ecomm_customer_journey",
            delta_merge_options={
                "predicate": "source.event_id = target.event_id",  # condition to determine upsert req
                "source_alias": "source",
                "target_alias": "target",
            },
            delta_merge_method_options=merge_option,
        )
        logger.info(f"merge successfully with stats: {merge_stats}")

        consumer.store_offsets(message=message)
```

**Important Points:**

* This is the second way to get messages from Kafka Topics. It requires more code but gives us flexibility in processing. However, it's not the main recommendation; using `StreamingDataFrame` is preferred.
    
* I needed to do a `Delta Merge` on the Delta Lake table, which the `quixstreams` library doesn't support directly. So, I used the low-level client API to achieve it.
    
* I like the `quixstreams` library because it offers a high-level API, `StreamingDataFrame`, for batch-like processing on streaming data. But if that's not an option, it also provides a low-level Kafka client API, allowing us to do almost anything.
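
The merge configured above (`when_matched_update_all` plus `when_not_matched_insert_all`, matched on `event_id`) amounts to an upsert. Below is a minimal in-memory sketch of those semantics only; it does not show how Delta Lake or the project's `DataWriter` implements them, and `upsert_rows` and its stats keys are hypothetical names.

```python
def upsert_rows(
    target: list[dict], source: list[dict], key: str = "event_id"
) -> dict[str, int]:
    """Merge source rows into target in place and return simple merge stats."""
    position_by_key = {row[key]: i for i, row in enumerate(target)}
    stats = {"rows_updated": 0, "rows_inserted": 0}
    for row in source:
        if row[key] in position_by_key:
            # When matched: replace the whole target row with the source row.
            target[position_by_key[row[key]]] = dict(row)
            stats["rows_updated"] += 1
        else:
            # When not matched: insert the source row as a new target row.
            position_by_key[row[key]] = len(target)
            target.append(dict(row))
            stats["rows_inserted"] += 1
    return stats


table = [{"event_id": "e1", "price": 10.0}]
incoming = [{"event_id": "e1", "price": 12.5}, {"event_id": "e2", "price": 3.0}]
print(upsert_rows(table, incoming))
print(table)
```

One consequence of this design is that replaying a message is harmless: merging the same `event_id` again updates the existing row instead of duplicating it, which suits a consumer that stores its offset only after the merge succeeds.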
    

<div data-node-type="callout">
<div data-node-type="callout-emoji">💡</div>
<div data-node-type="callout-text">The Pathway library has native support for writing to Delta Lake tables, but it only performs <code>append</code>, not <code>delta merge</code>. It also does not expose a direct client API.</div>
</div>

**Note**: I have open-sourced the project; you can find all the source code here: [Akashdesarda/data-pipeline-app-demo](https://github.com/Akashdesarda/data-pipeline-app-demo)

## Conclusion:

* Supercharging your streaming data pipeline in Python involves using modern tools and practices to boost efficiency and performance.
    
* Draw inspiration from batch processing to apply similar principles to streaming data without losing real-time capabilities.
    
* Use advanced libraries like Quix Streams for efficient data processing.
    
* Features like `StreamingDataFrame` allow for high-level operations.
    
* Low-level client APIs are available for custom tasks.
    
* This approach simplifies development and provides flexibility and scalability.
    
* It enables effective tracking and analysis of customer journeys in real-time.
    
* Embracing these modern techniques ensures your data pipeline is robust, efficient, and meets the demands of today's data-driven environments.
