### Detailed Orchestration with Airflow

Orchestration with Airflow involves setting up Directed Acyclic Graphs (DAGs) that define a sequence of tasks to be executed in a specific order. This ensures that each step in the workflow completes before the next one begins, and it allows you to schedule, monitor, and manage the data pipeline efficiently.

Here is a more detailed walkthrough of the orchestration portion, including setting up Airflow, defining tasks, and managing dependencies.

#### Setting Up Airflow

1. **Install Airflow**:
   - You can install Airflow using pip. It is recommended to use a virtual environment (the official docs also recommend installing with a constraints file to pin compatible dependency versions).

   ```bash
   pip install apache-airflow
   ```

2. **Initialize Airflow Database**:
   - Initialize the Airflow metadata database.

   ```bash
   airflow db init
   ```

3. **Start Airflow Web Server and Scheduler**:
   - Start the web server and scheduler in separate terminal windows.

   ```bash
   airflow webserver
   airflow scheduler
   ```

4. **Create Airflow Directory Structure**:
   - Create the necessary directory structure for your Airflow project.

   ```bash
   mkdir -p ~/airflow/dags
   mkdir -p ~/airflow/plugins
   mkdir -p ~/airflow/logs
   ```

5. **Set Up Airflow Configuration**:
   - Ensure your Airflow configuration file (`airflow.cfg`) points to these directories (`dags_folder`, `plugins_folder`, and `base_log_folder`).

#### Defining the Airflow DAG

Create a DAG that orchestrates the entire workflow from data ingestion to ML inference.

##### Example Airflow DAG: `sensor_data_pipeline.py`

1. **Import Necessary Libraries**:

   ```python
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.operators.bash import BashOperator
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   ```

2. **Set Default Arguments**:

   ```python
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5),
   }
   ```

3. **Define the DAG**:

   ```python
   dag = DAG(
       'sensor_data_pipeline',
       default_args=default_args,
       description='A DAG for processing sensor data',
       schedule_interval=timedelta(minutes=10),
       start_date=days_ago(1),
       catchup=False,
   )
   ```

4. **Define Tasks**:

   - **Ingest MQTT Data**: Run the MQTT subscriber to collect sensor data for a bounded window each run.

     ```python
     def subscribe_to_mqtt():
         import paho.mqtt.client as mqtt
         import json
         import time
         import pandas as pd
         from datetime import datetime
         import sqlite3

         def on_message(client, userdata, message):
             # Append each incoming MQTT message as one row in the raw table.
             payload = json.loads(message.payload.decode())
             df = pd.DataFrame([payload])
             df['timestamp'] = datetime.now()
             conn = sqlite3.connect('/path/to/sensor_data.db')
             df.to_sql('raw_sensor_data', conn, if_exists='append', index=False)
             conn.close()

         client = mqtt.Client()
         client.on_message = on_message
         client.connect("mqtt_broker_host", 1883, 60)
         client.subscribe("sensors/data")

         # Collect messages for a bounded window so the task can finish;
         # loop_forever() would block this task (and the DAG run) indefinitely.
         client.loop_start()
         time.sleep(300)  # e.g. collect for 5 minutes per run
         client.loop_stop()
         client.disconnect()

     ingest_mqtt_data = PythonOperator(
         task_id='ingest_mqtt_data',
         python_callable=subscribe_to_mqtt,
         dag=dag,
     )
     ```

   - **Transform Data with dbt**: Run dbt models to clean and transform the data.

     ```python
     transform_data = BashOperator(
         task_id='transform_data',
         bash_command='dbt run --profiles-dir /path/to/your/dbt/project',
         dag=dag,
     )
     ```

   - **Run ML Inference**: Execute the ML inference script to make predictions.

     ```python
     def run_inference():
         import pandas as pd
         import sqlite3
         import joblib

         def load_transformed_data():
             conn = sqlite3.connect('/path/to/sensor_data.db')
             query = "SELECT * FROM aggregated_sensor_data"
             df = pd.read_sql_query(query, conn)
             conn.close()
             return df

         def make_predictions(data):
             model = joblib.load('/path/to/your_model.pkl')
             predictions = model.predict(data[['avg_temperature', 'avg_humidity']])
             data['predictions'] = predictions
             return data

         def save_predictions(data):
             conn = sqlite3.connect('/path/to/sensor_data.db')
             data.to_sql('sensor_predictions', conn, if_exists='append', index=False)
             conn.close()

         data = load_transformed_data()
         predictions = make_predictions(data)
         save_predictions(predictions)

     ml_inference = PythonOperator(
         task_id='run_inference',
         python_callable=run_inference,
         dag=dag,
     )
     ```

5. **Set Task Dependencies**: Chain the tasks so ingestion runs before transformation and inference (a quick parse-and-wiring check is sketched after this list).

   ```python
   ingest_mqtt_data >> transform_data >> ml_inference
   ```
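With the DAG assembled, it can help to confirm that the file parses and that the tasks are wired as intended before relying on the scheduler. The following is a minimal sanity-check sketch (placed, for example, in a hypothetical `tests/test_dag.py` that is not part of the directory structure below); it assumes the DAG file sits in Airflow's configured DAGs folder:

```python
from airflow.models import DagBag

def test_sensor_data_pipeline_is_wired_correctly():
    # Parse all DAG files from the configured dags folder (skip bundled examples).
    dag_bag = DagBag(include_examples=False)
    dag = dag_bag.dags.get("sensor_data_pipeline")

    # The DAG should load without import errors and chain the three tasks in order.
    assert dag is not None, dag_bag.import_errors
    assert dag.get_task("transform_data").upstream_task_ids == {"ingest_mqtt_data"}
    assert dag.get_task("run_inference").upstream_task_ids == {"transform_data"}
```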
#### Directory Structure

Ensure your project is structured correctly to support the workflow.

```
sensor_data_project/
├── dags/
│   └── sensor_data_pipeline.py
├── dbt_project.yml
├── models/
│   ├── cleaned_sensor_data.sql
│   └── aggregated_sensor_data.sql
├── profiles.yml
├── scripts/
│   ├── mqtt_subscriber.py
│   └── ml_inference.py
└── Dockerfile
```

#### Docker Integration (Optional)

For better scalability and reproducibility, consider containerizing your Airflow setup with Docker.

##### Dockerfile Example

```Dockerfile
FROM apache/airflow:2.1.2

# Copy DAGs and scripts
COPY dags/ /opt/airflow/dags/
COPY scripts/ /opt/airflow/scripts/

# Install additional Python packages
# (sqlite3 ships with the Python standard library, so it is not installed via pip;
# dbt-core also needs an adapter for your target database)
RUN pip install paho-mqtt pandas joblib dbt-core

# Set environment variables
ENV AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=False

# Entry point
ENTRYPOINT ["/usr/bin/dumb-init", "--"]
# Running the webserver and scheduler in one container keeps the example simple;
# production setups typically run them as separate services.
CMD ["bash", "-c", "airflow webserver & airflow scheduler"]
```

### Summary

Using Airflow for orchestration allows you to:
1. **Schedule and Automate**: Regularly schedule data ingestion, transformation, and ML inference tasks.
2. **Manage Dependencies**: Ensure tasks are executed in the correct order.
3. **Monitor and Alert**: Monitor the status of your workflows and get alerted on failures (a minimal callback sketch follows this list).
4. **Scale Out**: Distribute tasks across multiple workers as your workloads grow.
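For the monitoring point above, one lightweight option is an `on_failure_callback` in `default_args`. The sketch below is a minimal illustration; the `notify_failure` helper is a hypothetical stand-in for whatever alerting integration (email, Slack, and so on) you actually use:

```python
from datetime import timedelta

def notify_failure(context):
    # Airflow passes the task context to the callback when a task instance fails.
    task_id = context["task_instance"].task_id
    dag_id = context["dag"].dag_id
    # Hypothetical placeholder: replace with your real alerting call.
    print(f"Task {task_id} in DAG {dag_id} failed")

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': notify_failure,  # invoked once per failed task instance
}
```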
By structuring your project with these components, you can create a robust, end-to-end data pipeline that ingests MQTT sensor data, processes it, runs ML inference, and provides actionable insights.

---

Yes, MQTT shares some similarities with multicast in that both involve a publish/subscribe model where data is delivered to multiple recipients. However, there are distinct differences in how they operate and in their use cases:

### Similarities

1. **Publish/Subscribe Model**:
   - Both MQTT and multicast use a publish/subscribe model where one entity publishes data and multiple entities can subscribe to receive it.

2. **Efficient Data Distribution**:
   - Both are designed for efficient one-to-many data distribution.

### Differences

1. **Transport Protocol**:
   - **MQTT**: Operates over TCP/IP and includes mechanisms for reliable message delivery, including Quality of Service (QoS) levels.
   - **Multicast**: Typically operates over UDP/IP, which does not guarantee delivery, ordering, or duplicate protection.

2. **Broker vs. Network Layer**:
   - **MQTT**: Uses a broker (server) to manage message routing between publishers and subscribers. The broker handles message distribution, connection management, and QoS.
   - **Multicast**: Operates at the network layer, where packets are delivered to multiple recipients based on IP multicast group addresses. There is no central server; the network infrastructure handles data distribution.

3. **Message Reliability**:
   - **MQTT**: Provides different QoS levels to control delivery guarantees (see the sketch after this list):
     - QoS 0: At most once (fire and forget)
     - QoS 1: At least once (acknowledged delivery)
     - QoS 2: Exactly once (guaranteed delivery)
   - **Multicast**: UDP multicast does not inherently provide reliable message delivery, although application-level protocols can be built on top of it to add reliability.

4. **Use Cases**:
   - **MQTT**: Commonly used in IoT, where devices publish sensor data to a broker and applications subscribe to this data. Ideal for scenarios requiring reliable communication and more complex routing.
   - **Multicast**: Often used for streaming media, live broadcasts, and other scenarios where low-latency, one-to-many distribution is needed and reliability can be managed at the application level.
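To make the QoS levels above concrete, here is a minimal paho-mqtt sketch; the broker host and topic are the placeholder values used in the example that follows:

```python
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("localhost", 1883, 60)

# QoS 0: fire and forget; QoS 1: the broker acknowledges receipt (duplicates possible);
# QoS 2: a four-step handshake gives exactly-once delivery between client and broker.
client.publish("sensors/data", '{"temperature": 21.5}', qos=1)

# Subscribers also request a maximum QoS; the effective QoS for a message is the
# lower of the QoS it was published with and the QoS the subscription requested.
client.subscribe("sensors/data", qos=1)
```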
### Example: Using MQTT for Real-Time Data Streams

Let's consider an example where we use MQTT to subscribe to a stream of sensor data from IoT devices and process it using Airflow and dbt.

#### Step 1: Set Up MQTT Broker and Clients

1. **MQTT Broker**:
   - Use an MQTT broker like Mosquitto to handle message routing.

   ```bash
   mosquitto -v
   ```

2. **MQTT Publisher (Sensor)**:
   - Simulate an IoT device publishing sensor data.

   ```python
   import paho.mqtt.client as mqtt
   import time
   import json
   import random

   def publish_sensor_data():
       client = mqtt.Client()
       client.connect("localhost", 1883, 60)
       while True:
           # Publish a random reading every 5 seconds.
           sensor_data = {
               "sensor_id": "sensor_1",
               "timestamp": time.time(),
               "temperature": random.uniform(20.0, 30.0),
               "humidity": random.uniform(30.0, 50.0)
           }
           client.publish("sensors/data", json.dumps(sensor_data))
           time.sleep(5)

   if __name__ == "__main__":
       publish_sensor_data()
   ```

3. **MQTT Subscriber (Airflow Task)**:
   - Subscribe to the MQTT topic and process incoming messages.

   ```python
   import json
   import time
   from datetime import timedelta

   import paho.mqtt.client as mqtt
   import pandas as pd
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago

   def on_message(client, userdata, message):
       payload = json.loads(message.payload.decode())
       process_payload(payload)

   def process_payload(payload):
       # Append each message as a row to a staging CSV file.
       df = pd.DataFrame([payload])
       df.to_csv('/tmp/sensor_data.csv', mode='a', header=False, index=False)

   def subscribe_to_mqtt():
       client = mqtt.Client()
       client.on_message = on_message
       client.connect("localhost", 1883, 60)
       client.subscribe("sensors/data")
       # Collect messages for a bounded window so the task can complete;
       # a bare loop_start() would return immediately and the task would exit
       # before any messages were processed.
       client.loop_start()
       time.sleep(300)  # e.g. collect for 5 minutes per run
       client.loop_stop()
       client.disconnect()

   def ingest_mqtt_data():
       subscribe_to_mqtt()

   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5),
   }

   with DAG(
       'mqtt_ingestion_dag',
       default_args=default_args,
       description='A DAG to ingest MQTT data',
       schedule_interval=timedelta(minutes=10),
       start_date=days_ago(1),
       catchup=False,
   ) as dag:

       ingest_mqtt_task = PythonOperator(
           task_id='ingest_mqtt_data',
           python_callable=ingest_mqtt_data,
       )
   ```

#### Step 2: Transform Data Using dbt

1. **Set Up dbt Models**:
   - Define models to process the ingested sensor data.

   ```sql
   -- models/sensor_data.sql
   -- Assumes raw_sensor_data exists as an upstream dbt model; if it is a raw table,
   -- declare it as a source and reference it with {{ source(...) }} instead.

   WITH raw_data AS (
       SELECT
           *
       FROM
           {{ ref('raw_sensor_data') }}
   )

   SELECT
       sensor_id,
       timestamp,
       temperature,
       humidity
   FROM
       raw_data
   ```

2. **Run dbt Models in Airflow**:
   - Schedule dbt runs to transform the data after ingestion.

   ```python
   from airflow.operators.bash import BashOperator

   # Defined inside the same DAG context as ingest_mqtt_task.
   dbt_run = BashOperator(
       task_id='dbt_run',
       bash_command='dbt run --profiles-dir /path/to/your/dbt/project',
   )

   ingest_mqtt_task >> dbt_run
   ```

### Summary

While MQTT and multicast both enable efficient data distribution to multiple recipients, MQTT adds message reliability, Quality of Service levels, and broker-based routing, which makes it well suited to IoT and other applications that need reliable, real-time data streams. By integrating MQTT with tools like Airflow and dbt, you can build robust data pipelines that handle real-time ingestion, transformation, and analysis, and turn that data into actionable business insights.
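As a final illustration of turning the pipeline's output into insight, here is a minimal sketch that inspects the predictions written by the earlier `run_inference` task; the database path, the `sensor_predictions` table, and the alert threshold are the placeholder values and assumptions used in the examples above, not fixed parts of the design:

```python
import sqlite3

import pandas as pd

# Read the predictions produced by the run_inference task
# (the path and table name are the placeholders used earlier in this document).
conn = sqlite3.connect('/path/to/sensor_data.db')
df = pd.read_sql_query("SELECT * FROM sensor_predictions", conn)
conn.close()

# Example downstream check: count rows whose prediction crosses an illustrative threshold.
threshold = 1.0  # assumption for illustration only
alerts = df[df['predictions'] > threshold]
print(f"{len(alerts)} of {len(df)} rows exceeded the example threshold of {threshold}")
```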