
A Code Implementation of a Real‑Time In‑Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, and Pydantic
In this notebook, we demonstrate how to build a fully in-memory “sensor alert” pipeline in Google Colab using FastStream, a high-performance, Python-native stream processing framework, and its RabbitMQ integration. By leveraging faststream.rabbit’s RabbitBroker and TestRabbitBroker, we simulate a message broker without needing external infrastructure. We orchestrate four distinct stages, ingestion & validation, normalization, monitoring & alert generation, and archiving, and describe the data passed between them with Pydantic models (RawSensorData, NormalizedData, AlertData) to ensure data quality and type safety. Under the hood, Python’s asyncio powers asynchronous message flow, while nest_asyncio enables nested event loops in Colab. We also employ the standard logging module for traceable pipeline execution and pandas for final result inspection, making it easy to visualize archived alerts in a DataFrame.
!pip install -q faststream[rabbit] nest_asyncio
We install FastStream with its RabbitMQ integration, providing the core stream-processing framework and broker connectors, as well as the nest_asyncio package, which enables nested asyncio event loops in environments like Colab. All this is achieved while keeping the output minimal with the -q flag.
import nest_asyncio, asyncio, logging
nest_asyncio.apply()
We import the nest_asyncio, asyncio, and logging modules, then call nest_asyncio.apply() to patch Python’s event loop so that you can run nested asynchronous tasks inside environments like Colab or Jupyter notebooks without errors. The logging import readies you to instrument your pipeline with detailed runtime logs.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("sensor_pipeline")
We configure Python’s built‑in logging to emit INFO‑level (and above) messages prefixed with a timestamp and severity, then create a dedicated logger named “sensor_pipeline” for emitting structured logs within your streaming pipeline.
from faststream import FastStream
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from pydantic import BaseModel, Field, validator
import pandas as pd
from typing import List
We bring in FastStream’s core FastStream class alongside its RabbitMQ connectors (RabbitBroker for real brokers and TestRabbitBroker for in‑memory testing), Pydantic’s BaseModel, Field, and validator for declarative data validation, pandas for tabular result inspection, and Python’s List type for annotating our in‑memory archive.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
We instantiate a RabbitBroker pointed at a (local) RabbitMQ server using the AMQP URL, then create a FastStream application bound to that broker, setting up the messaging backbone for your pipeline stages. In this demo the URL is never actually dialed, because the TestRabbitBroker used later swaps in an in-memory transport.
class RawSensorData(BaseModel):
    sensor_id: str = Field(..., examples=["sensor_1"])
    reading_celsius: float = Field(..., ge=-50, le=150, examples=[23.5])

    @validator("sensor_id")
    def must_start_with_sensor(cls, v):
        if not v.startswith("sensor_"):
            raise ValueError("sensor_id must start with 'sensor_'")
        return v

class NormalizedData(BaseModel):
    sensor_id: str
    reading_kelvin: float

class AlertData(BaseModel):
    sensor_id: str
    reading_kelvin: float
    alert: bool
These Pydantic models define the schema for each stage: RawSensorData enforces input validity (e.g., the reading range and a sensor_ prefix), NormalizedData carries the Celsius-to-Kelvin converted reading, and AlertData encapsulates the final alert payload (including a boolean flag), ensuring a type-safe data flow throughout the pipeline.
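As a quick illustration of that validation (a hypothetical check, not part of the pipeline itself), a well-formed payload is accepted while a bad prefix or an out-of-range reading raises a ValidationError:
from pydantic import ValidationError

ok = RawSensorData(sensor_id="sensor_9", reading_celsius=21.0)  # passes both checks
print(ok.json())

try:
    RawSensorData(sensor_id="probe_9", reading_celsius=999.0)  # bad prefix, reading above 150
except ValidationError as err:
    print(err)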
archive: List[AlertData] = []

@broker.subscriber("sensor_input")
@broker.publisher("normalized_input")
async def ingest_and_validate(raw: RawSensorData) -> dict:
    logger.info(f"Ingested raw data: {raw.json()}")
    return raw.dict()

@broker.subscriber("normalized_input")
@broker.publisher("sensor_alert")
async def normalize(data: dict) -> dict:
    norm = NormalizedData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_celsius"] + 273.15,
    )
    logger.info(f"Normalized to Kelvin: {norm.json()}")
    return norm.dict()

ALERT_THRESHOLD_K = 323.15

@broker.subscriber("sensor_alert")
@broker.publisher("archive_topic")
async def monitor(data: dict) -> dict:
    alert_flag = data["reading_kelvin"] > ALERT_THRESHOLD_K
    alert = AlertData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_kelvin"],
        alert=alert_flag,
    )
    logger.info(f"Monitor result: {alert.json()}")
    return alert.dict()

@broker.subscriber("archive_topic")
async def archive_data(payload: dict):
    rec = AlertData(**payload)
    archive.append(rec)
    logger.info(f"Archived: {rec.json()}")
An in-memory archive list collects all finalized alerts, while four asynchronous functions, wired via @broker.subscriber/@broker.publisher, form the pipeline stages. These functions ingest and validate raw sensor inputs, convert Celsius to Kelvin, check against an alert threshold, and finally archive each AlertData record, emitting logs at every step for full traceability.
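To make the threshold concrete: 323.15 K corresponds to 50 °C, so only readings strictly above 50 °C are flagged. A quick standalone check (a sketch that reuses ALERT_THRESHOLD_K but runs outside the broker) shows which of the sample readings should trigger an alert:
for celsius in (45.2, 75.1, 50.0):
    kelvin = celsius + 273.15
    print(f"{celsius} °C -> {kelvin:.2f} K, alert: {kelvin > ALERT_THRESHOLD_K}")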
async def main():
    readings = [
        {"sensor_id": "sensor_1", "reading_celsius": 45.2},
        {"sensor_id": "sensor_2", "reading_celsius": 75.1},
        {"sensor_id": "sensor_3", "reading_celsius": 50.0},
    ]

    async with TestRabbitBroker(broker) as tb:
        for r in readings:
            await tb.publish(r, "sensor_input")
            await asyncio.sleep(0.1)

    df = pd.DataFrame([a.dict() for a in archive])
    print("\nFinal Archived Alerts:")
    display(df)

asyncio.run(main())
Finally, the main coroutine publishes a set of sample sensor readings into the in-memory TestRabbitBroker, pauses briefly to allow each pipeline stage to run, and then collates the resulting AlertData records from the archive into a pandas DataFrame for easy display and verification of the end-to-end alert flow. At the end, asyncio.run(main()) kicks off the entire async demo in Colab.
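As an optional sanity check (a sketch that assumes all three messages were processed before the test broker context closed), you can assert that only sensor_2’s 75.1 °C reading produced an alert:
alerted = [a.sensor_id for a in archive if a.alert]
assert alerted == ["sensor_2"], f"unexpected alerts: {alerted}"
print("Sanity check passed:", alerted)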
In conclusion, this tutorial demonstrates how FastStream, combined with RabbitMQ abstractions and in-memory testing via TestRabbitBroker, can accelerate the development of real-time data pipelines without the overhead of deploying external brokers. With Pydantic handling schema validation, asyncio managing concurrency, and pandas enabling quick data analysis, this pattern provides a robust foundation for sensor monitoring, ETL tasks, or event‑driven workflows. You can seamlessly transition from this in‑memory demo to production by swapping in a live broker URL (RabbitMQ, Kafka, NATS, or Redis) and launching the service with the faststream run CLI (or serving it through your preferred ASGI server), unlocking scalable, maintainable stream processing in any Python environment.
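As a hypothetical illustration of that hand-off (the hostname and module name below are placeholders), the only code change is the broker URL; the service is then started from a terminal with the FastStream CLI instead of the TestRabbitBroker harness:
# sensor_pipeline.py: same models and handlers as above, but pointed at a real server
broker = RabbitBroker("amqp://guest:guest@my-rabbitmq-host:5672/")
app = FastStream(broker)

# Then, from a terminal:
#   faststream run sensor_pipeline:app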
Here is the Colab Notebook.
Sana Hassan, a consulting intern at Marktechpost and dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.