A Code Implementation of a Real-Time In-Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, and Pydantic


In this notebook, we demonstrate how to build a fully in-memory "sensor alert" pipeline in Google Colab using FastStream, a high-performance, Python-native stream processing framework, and its integration with RabbitMQ. By leveraging faststream.rabbit's RabbitBroker and TestRabbitBroker, we simulate a message broker without needing any external infrastructure. We orchestrate four distinct stages: ingestion & validation, normalization, monitoring & alert generation, and archiving, each backed by Pydantic models (RawSensorData, NormalizedData, AlertData) to ensure data quality and type safety. Under the hood, Python's asyncio powers the asynchronous message flow, while nest_asyncio enables nested event loops in Colab. We also employ the standard logging module for traceable pipeline execution and pandas for final result inspection, making it easy to visualize archived alerts in a DataFrame.

!pip install -q faststream[rabbit] nest_asyncio

We install FastStream with its RabbitMQ integration, providing the core stream-processing framework and broker connectors, as well as the nest_asyncio package, which enables nested asyncio event loops in environments like Colab. The -q flag keeps the installation output minimal.

import nest_asyncio, asyncio, logging
nest_asyncio.apply()

We import the nest_asyncio, asyncio, and logging modules, then call nest_asyncio.apply() to patch Python's event loop so that nested asynchronous tasks can run inside environments like Colab or Jupyter notebooks without errors. The logging import readies you to instrument your pipeline with detailed runtime logs.

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("sensor_pipeline")

We configure Python's built-in logging to emit INFO-level (and above) messages prefixed with a timestamp and severity, then create a dedicated logger named "sensor_pipeline" for emitting structured logs within the streaming pipeline.

from faststream import FastStream
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from pydantic import BaseModel, Field, validator
import pandas as pd
from typing import List

We bring in FastStream's core FastStream class alongside its RabbitMQ connectors (RabbitBroker for real brokers and TestRabbitBroker for in-memory testing), Pydantic's BaseModel, Field, and validator for declarative data validation, pandas for tabular result inspection, and Python's List type for annotating our in-memory archive.

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app    = FastStream(broker)

We instantiate a RabbitBroker pointed at a (local) RabbitMQ server via its AMQP URL, then create a FastStream application bound to that broker, establishing the messaging backbone for the pipeline stages.

class RawSensorData(BaseModel):
    sensor_id: str         = Field(..., examples=["sensor_1"])
    reading_celsius: float = Field(..., ge=-50, le=150, examples=[23.5])

    @validator("sensor_id")
    def must_start_with_sensor(cls, v):
        if not v.startswith("sensor_"):
            raise ValueError("sensor_id must start with 'sensor_'")
        return v


class NormalizedData(BaseModel):
    sensor_id: str
    reading_kelvin: float


class AlertData(BaseModel):
    sensor_id: str
    reading_kelvin: float
    alert: bool

These Pydantic models define the schema for each stage: RawSensorData enforces input validity (e.g., a bounded reading range and a sensor_ prefix), NormalizedData carries the Celsius-to-Kelvin conversion, and AlertData encapsulates the final alert payload (including a boolean flag), guaranteeing a type-safe data flow throughout the pipeline.
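
Before wiring these models into the broker, it can help to exercise the validation rules directly. The snippet below is a quick, optional sanity check (the "thermo_9" ID is a hypothetical bad input used only for illustration, not part of the pipeline):

from pydantic import ValidationError

# A valid payload passes the range check and the sensor_ prefix validator
ok = RawSensorData(sensor_id="sensor_9", reading_celsius=21.0)
print(ok.dict())  # {'sensor_id': 'sensor_9', 'reading_celsius': 21.0}

# An invalid prefix is rejected by the custom validator
try:
    RawSensorData(sensor_id="thermo_9", reading_celsius=21.0)
except ValidationError as err:
    print(err)  # reports: sensor_id must start with 'sensor_'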

archive: List[AlertData] = []


@dealer.subscriber("sensor_input")
@dealer.writer("normalized_input")
async def ingest_and_validate(uncooked: RawSensorData) -> dict:
    logger.information(f"Ingested uncooked knowledge: {uncooked.json()}")
    return uncooked.dict()


@dealer.subscriber("normalized_input")
@dealer.writer("sensor_alert")
async def normalize(knowledge: dict) -> dict:
    norm = NormalizedData(
        sensor_id=knowledge["sensor_id"],
        reading_kelvin=knowledge["reading_celsius"] + 273.15
    )
    logger.information(f"Normalized to Kelvin: {norm.json()}")
    return norm.dict()


ALERT_THRESHOLD_K = 323.15  # 50.0 °C

@broker.subscriber("sensor_alert")
@broker.publisher("archive_topic")
async def monitor(data: dict) -> dict:
    alert_flag = data["reading_kelvin"] > ALERT_THRESHOLD_K
    alert = AlertData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_kelvin"],
        alert=alert_flag
    )
    logger.info(f"Monitor result: {alert.json()}")
    return alert.dict()


@dealer.subscriber("archive_topic")
async def archive_data(payload: dict):
    rec = AlertData(**payload)
    archive.append(rec)
    logger.information(f"Archived: {rec.json()}")

An in-memory archive list collects all finalized alerts, while four asynchronous functions, wired together via @broker.subscriber/@broker.publisher, form the pipeline stages. These functions ingest and validate raw sensor inputs, convert Celsius to Kelvin, check readings against an alert threshold, and finally archive each AlertData record, emitting logs at every step for full traceability.

async def main():
    readings = [
        {"sensor_id": "sensor_1", "reading_celsius": 45.2},
        {"sensor_id": "sensor_2", "reading_celsius": 75.1},
        {"sensor_id": "sensor_3", "reading_celsius": 50.0},
    ]
    async with TestRabbitBroker(broker) as tb:
        for r in readings:
            await tb.publish(r, "sensor_input")
        await asyncio.sleep(0.1)

    df = pd.DataFrame([a.dict() for a in archive])
    print("\nFinal Archived Alerts:")
    display(df)


asyncio.run(main())

Finally, the main coroutine publishes a set of sample sensor readings into the in-memory TestRabbitBroker, pauses briefly to allow each pipeline stage to run, and then collates the resulting AlertData records from the archive into a pandas DataFrame for easy display and verification of the end-to-end alert flow. At the end, asyncio.run(main()) kicks off the entire async demo in Colab.
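
If you prefer programmatic assertions over eyeballing the DataFrame, FastStream's test broker also patches each subscriber with an in-memory mock while the context manager is open. The sketch below assumes that testing feature is available in your installed FastStream version; note that it re-runs the pipeline, so the archive list gains one more record:

async def test_pipeline():
    reading = {"sensor_id": "sensor_1", "reading_celsius": 45.2}
    async with TestRabbitBroker(broker) as tb:
        await tb.publish(reading, "sensor_input")
        await asyncio.sleep(0.1)
        # Inside the test context, each handler exposes a mock we can inspect
        ingest_and_validate.mock.assert_called_once()
        archive_data.mock.assert_called_once()

asyncio.run(test_pipeline())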

In conclusion, this tutorial demonstrates how FastStream, combined with RabbitMQ abstractions and in-memory testing via TestRabbitBroker, can accelerate the development of real-time data pipelines without the overhead of deploying external brokers. With Pydantic handling schema validation, asyncio managing concurrency, and pandas enabling quick data analysis, this pattern provides a solid foundation for sensor monitoring, ETL tasks, or event-driven workflows. You can transition from this in-memory demo to production by swapping in a live broker URL (RabbitMQ, Kafka, NATS, or Redis) and starting the service with the faststream CLI (or your preferred process manager), unlocking scalable, maintainable stream processing in any Python environment.
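
As a minimal sketch of that transition (the pipeline.py module name and broker hostname below are illustrative assumptions, not part of the notebook), you point RabbitBroker at the live server and launch the service with the FastStream CLI instead of calling asyncio.run():

# pipeline.py -- the same models and handlers as above, without the TestRabbitBroker demo
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@your-rabbitmq-host:5672/")  # hypothetical live broker URL
app = FastStream(broker)

# ... @broker.subscriber / @broker.publisher handlers unchanged ...

# Then start the worker from a terminal:
#   faststream run pipeline:app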


Here is the Colab Notebook.



Sana Hassan, a consulting intern at Marktechpost and dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.
