6.2. Data Sensors
Building resilient data pipelines by validating data availability instead of relying solely on task completion status.
Introduction
A common anti-pattern in data orchestration is relying on task completion to determine if downstream pipelines can proceed. Task completion indicates a process finished, but doesn’t guarantee the expected data is actually available.
Data sensors verify the actual presence and freshness of data before proceeding. This makes pipelines schedule-agnostic - they can be safely triggered at any time, not just at specific cadence intervals. This simplifies development, testing, backfills, and incident recovery.
The Problem with Task-Based Dependencies
Traditional task-based dependencies create brittle pipelines. Consider a reporting pipeline depending on a core data pipeline:
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_core_pipeline = ExternalTaskSensor(
    task_id="wait_for_core_pipeline",
    external_dag_id="core_data_pipeline",
    external_task_id="final_task",
    execution_date_fn=lambda dt: dt,
)
Problems:
Manual Trigger Failures: Can’t trigger at arbitrary times for testing or backfill
Delayed Dependencies: May start before data is available if upstream is delayed
Date Logic Complexity: Calculating execution_date_fn for different schedules is error-prone
Cross-Schedule Issues: Monthly pipelines depending on daily data require complex date calculations (see the sketch below)
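To make the last two points concrete, here is a minimal sketch of the date arithmetic ExternalTaskSensor pushes onto you when a monthly reporting DAG depends on a daily core_data_pipeline run (the mapping function and offset are illustrative, not part of the pipelines above):

from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

def _matching_daily_run(logical_date):
    # The monthly DAG's logical date is the first of the month; the daily run
    # covering the previous month's last day has a logical date one day earlier.
    # This hand-derived offset silently breaks if either schedule changes.
    return logical_date - timedelta(days=1)

wait_for_core_pipeline = ExternalTaskSensor(
    task_id="wait_for_core_pipeline",
    external_dag_id="core_data_pipeline",
    external_task_id="final_task",
    execution_date_fn=_matching_daily_run,
)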
Data-Based Dependencies: A Better Approach
Instead of checking if a task completed, verify that the data exists:
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor
from datetime import timedelta

wait_for_data = DatabricksSqlSensor(
    task_id="wait_for_subscription_data",
    sql="""
        SELECT 1
        FROM prod_monetization.core.active_subscriptions
        WHERE event_date = '{{ ds }}'
        LIMIT 1
    """,
    poke_interval=timedelta(minutes=10),
    timeout=timedelta(hours=2),
    mode="reschedule",
)
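Connection details are omitted in these examples for brevity. Depending on your provider version, the sensor also accepts parameters such as databricks_conn_id and sql_warehouse_name (or http_path) that point it at a workspace and SQL warehouse; the warehouse name below is a placeholder:

wait_for_data = DatabricksSqlSensor(
    task_id="wait_for_subscription_data",
    databricks_conn_id="databricks_default",   # Airflow connection to the workspace
    sql_warehouse_name="reporting_warehouse",  # placeholder; alternatively pass http_path=...
    sql="SELECT 1 FROM prod_monetization.core.active_subscriptions WHERE event_date = '{{ ds }}' LIMIT 1",
    poke_interval=timedelta(minutes=10),
    timeout=timedelta(hours=2),
    mode="reschedule",
)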
Advantages:
Works regardless of when triggered (scheduled, manual, backfill)
Ensures data actually exists before proceeding
Automatically waits if upstream is delayed
No complex date calculations needed
Can trigger at any time during the data interval
Real-World Example
Monthly reporting pipeline depending on daily core data:
from datetime import timedelta

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor
from pendulum import datetime as pdatetime

CATALOG = "prod_monetization"
CORE_SCHEMA = "core"
BERLIN_TZ = "Europe/Berlin"

# Templated date variable using Airflow's Jinja2 templating
DS = "{{ data_interval_end.in_timezone(dag.timezone) | ds }}"


@dag(
    dag_id="process_customer_lifetime_value_models",
    tags=["analytics", "monetization"],
    start_date=pdatetime(2025, 8, 1, tz=BERLIN_TZ),
    schedule="@monthly",
    catchup=False,
    max_active_runs=1,
    doc_md="""
    ### Customer Lifetime Value Reporting Pipeline

    Processes monthly CLV models for customer segmentation and retention analysis.

    **Dependencies:**
    - Requires completed subscription data in core layer
    - Waits for data availability using DatabricksSqlSensor

    **Schedule:** Monthly (first day of month)
    **Safe Manual Triggers:** Yes - can be triggered at any time during the month
    """,
)
def customer_lifetime_value_dag():
    """
    Monthly pipeline for processing customer lifetime value models.

    This DAG demonstrates the use of data sensors to verify data availability
    before processing, making it safe to trigger manually at any time.
    """
    start_task = EmptyOperator(
        task_id="start",
        doc_md="Start of the pipeline",
    )

    # Data sensor: waits for subscription data to be available
    wait_for_subscription_data = DatabricksSqlSensor(
        task_id="wait_for_subscription_data",
        sql=f"""
            SELECT COUNT(*) as row_count
            FROM {CATALOG}.{CORE_SCHEMA}.active_subscriptions
            WHERE event_date = TO_DATE('{DS}')
            HAVING COUNT(*) > 100
        """,
        poke_interval=timedelta(minutes=20),
        timeout=timedelta(hours=3),
        mode="reschedule",
        doc_md="""
        Waits for subscription data to be available in the core layer.

        **Query Logic:**
        - Checks for data on the DAG's execution date
        - Requires more than 100 rows (data quality check)
        - Uses reschedule mode to avoid blocking workers

        **Why This Works:**
        - Can be triggered manually at any time
        - Works regardless of when upstream pipelines complete
        - Self-heals if upstream is delayed
        """,
    )

    # Placeholder for dbt task group
    # In real implementation, this would be:
    # from utils.dbt_helpers import get_dbt_task_group
    # clv_models = get_dbt_task_group(
    #     group_id="dbt_clv_models",
    #     catalog=CATALOG,
    #     select=["path:models/monetization/report/customer_lifetime_value"],
    # )
    process_clv_models = EmptyOperator(
        task_id="process_clv_models",
        doc_md="""
        Runs dbt models for customer lifetime value analysis.

        Includes:
        - Customer segmentation by value
        - Cohort analysis
        - Churn prediction features
        - Retention metrics
        """,
    )

    end_task = EmptyOperator(
        task_id="end",
        doc_md="End of the pipeline",
    )

    # Define task dependencies
    start_task >> wait_for_subscription_data >> process_clv_models >> end_task


# Instantiate the DAG
customer_lifetime_value_dag()
Key Components:
Templated Date: DS uses Jinja2 templating for the execution date
DatabricksSqlSensor: Checks for actual data presence
Poke Interval: Checks every 20 minutes
Timeout: Fails after 3 hours if data doesn’t arrive
Reschedule Mode: Releases worker slot between checks
Why This Pattern Works Better
Manual Triggers
# Works seamlessly - checks if data exists for that date
airflow dags trigger process_customer_lifetime_value_models \
    --exec-date "2025-09-15"
With task sensors, this fails unless you trigger all upstream DAGs with matching execution dates.
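For local debugging, the same property lets you exercise the whole DAG, sensor included, for an arbitrary logical date with a single test run (assuming a local environment that can reach the warehouse):

# Single debug run; templates like {{ ds }} render for the given logical date
airflow dags test process_customer_lifetime_value_models 2025-09-15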
Cross-Schedule Dependencies
# Monthly pipeline waiting for month-end daily data
wait_for_month_end_data = DatabricksSqlSensor(
    task_id="wait_for_month_end_data",
    sql="""
        SELECT 1
        FROM prod_analytics.core.daily_metrics
        WHERE event_date = LAST_DAY(TO_DATE('{{ ds }}'))
        LIMIT 1
    """,
    poke_interval=timedelta(minutes=30),
    timeout=timedelta(hours=6),
)
Date logic in SQL is more maintainable than Python execution_date calculations.
Configuration Best Practices
Poke Intervals
poke_interval=timedelta(minutes=10) # Predictable data arrival
poke_interval=timedelta(minutes=30) # Variable arrival times
poke_interval=timedelta(hours=1) # Monthly data
Timeouts
timeout=timedelta(hours=3) # Daily data (1hr typical + buffer)
timeout=timedelta(hours=8) # Complex monthly processing
Reschedule Mode
Use mode="reschedule" to release worker slots between checks.
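In poke mode (the default) the sensor holds its worker slot and sleeps between checks, which is acceptable only for short waits; in reschedule mode the task is released and re-queued at each interval. A side-by-side sketch with illustrative timings (imports as in the earlier examples):

# Short wait: poke mode keeps the worker slot but avoids rescheduling overhead
quick_wait = DatabricksSqlSensor(
    task_id="quick_wait",
    sql="SELECT 1 FROM prod_analytics.core.events WHERE event_date = '{{ ds }}' LIMIT 1",
    poke_interval=timedelta(minutes=2),
    timeout=timedelta(minutes=20),
    mode="poke",
)

# Long wait: reschedule mode frees the slot between checks
long_wait = DatabricksSqlSensor(
    task_id="long_wait",
    sql="SELECT 1 FROM prod_analytics.core.events WHERE event_date = '{{ ds }}' LIMIT 1",
    poke_interval=timedelta(minutes=30),
    timeout=timedelta(hours=6),
    mode="reschedule",
)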
Query Optimization
# Good: Quick check with partition filter
sql="""
    SELECT 1
    FROM prod_analytics.core.events
    WHERE event_date = '{{ ds }}'
    LIMIT 1
"""

# With quality checks
sql="""
    SELECT COUNT(*) as cnt
    FROM prod_analytics.core.events
    WHERE event_date = '{{ ds }}'
      AND user_id IS NOT NULL
    HAVING COUNT(*) >= 1000
"""
Advanced Patterns
Multiple Dependencies
from airflow.utils.task_group import TaskGroup

with TaskGroup("wait_for_dependencies") as wait_group:
    wait_for_users = DatabricksSqlSensor(
        task_id="wait_for_users",
        sql="SELECT 1 FROM prod_crm.core.users WHERE updated_date = '{{ ds }}' LIMIT 1",
        poke_interval=timedelta(minutes=15),
        timeout=timedelta(hours=2),
    )
    wait_for_transactions = DatabricksSqlSensor(
        task_id="wait_for_transactions",
        sql="SELECT 1 FROM prod_payments.core.transactions WHERE event_date = '{{ ds }}' LIMIT 1",
        poke_interval=timedelta(minutes=15),
        timeout=timedelta(hours=2),
    )
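The group then acts as a single upstream node, so downstream processing starts only once every sensor in it has succeeded (the surrounding task names are illustrative):

# Both sensors must succeed before processing begins
start_task >> wait_group >> process_models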
Date Ranges
# Monthly pipeline checking entire month
wait_for_month_data = DatabricksSqlSensor(
    task_id="wait_for_month_data",
    sql="""
        SELECT COUNT(DISTINCT event_date) as days
        FROM prod_analytics.core.daily_events
        WHERE event_date >= DATE_TRUNC('MONTH', TO_DATE('{{ ds }}'))
          AND event_date < DATE_TRUNC('MONTH', TO_DATE('{{ ds }}')) + INTERVAL 1 MONTH
        HAVING COUNT(DISTINCT event_date) >= 28
    """,
    poke_interval=timedelta(hours=1),
    timeout=timedelta(hours=12),
)
Comparison: Task vs. Data Sensors
Task Sensor (Anti-Pattern)
wait_for_upstream = ExternalTaskSensor(
    task_id="wait_for_core_pipeline",
    external_dag_id="core_data_pipeline",
    external_task_id="load_data",
    execution_date_fn=lambda dt: dt - timedelta(hours=1),
)
Problems: Can’t trigger manually, fails if upstream delayed, complex date logic.
Data Sensor (Best Practice)
wait_for_data = DatabricksSqlSensor(
    task_id="wait_for_data",
    sql="SELECT 1 FROM prod_monetization.core.subscriptions WHERE event_date = '{{ ds }}' LIMIT 1",
    poke_interval=timedelta(minutes=10),
    timeout=timedelta(hours=2),
    mode="reschedule",
)
Advantages: Works with manual triggers, self-heals, no date calculations, independent of upstream structure.
Migration Strategy
Identify Dependencies: List all ExternalTaskSensor instances
Add Data Sensors in Parallel: Keep existing sensors while testing new ones (see the sketch after this list)
Monitor: Validate data sensors work correctly
Remove Task Sensors: Once confident, remove old approach
Document: Specify data contracts for each DAG
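A sketch of the parallel phase (step 2): both sensors gate the same downstream work until the data sensor has proven itself, after which the ExternalTaskSensor is removed (the downstream task name is illustrative):

from datetime import timedelta
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor
from airflow.sensors.external_task import ExternalTaskSensor

# Legacy task-based dependency, kept temporarily for comparison
legacy_wait = ExternalTaskSensor(
    task_id="legacy_wait_for_core_pipeline",
    external_dag_id="core_data_pipeline",
    external_task_id="final_task",
)

# New data-based dependency added alongside it
wait_for_data = DatabricksSqlSensor(
    task_id="wait_for_subscription_data",
    sql="SELECT 1 FROM prod_monetization.core.subscriptions WHERE event_date = '{{ ds }}' LIMIT 1",
    poke_interval=timedelta(minutes=15),
    timeout=timedelta(hours=2),
    mode="reschedule",
)

# Downstream work waits on both until the legacy sensor is retired
[legacy_wait, wait_for_data] >> process_models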
Common Pitfalls
Over-Checking
# Bad: Too frequent
poke_interval=timedelta(minutes=1)
# Good: Reasonable intervals
poke_interval=timedelta(minutes=15)
Missing Partition Filters
# Bad: Full table scan
sql="SELECT COUNT(*) FROM large_table WHERE status = 'complete'"
# Good: Partition filter
sql="SELECT COUNT(*) FROM large_table WHERE event_date = '{{ ds }}' AND status = 'complete'"
Insufficient Timeouts
# Risky
timeout=timedelta(minutes=30)
# Safer
timeout=timedelta(hours=2)