Sensor Data Pipeline
In this tutorial, you will build a realistic data pipeline: a temperature monitoring system for rooms in a building. You will learn how partitioning works, how to use aggregation queries, and how Apiary prunes unnecessary data.
What you will learn:
- How partitioning affects data storage and query performance
- How to write multiple batches of data over time
- How to use GROUP BY with aggregate functions
- How DESCRIBE and SHOW commands help you inspect data
- How partition pruning skips irrelevant data
Prerequisites:
- Completed Your First Apiary
- Apiary installed and working
Step 1: Set Up the Schema
Create a namespace for sensor data, partitioned by room:
from apiary import Apiary
import pyarrow as pa
ap = Apiary("sensor_tutorial")
ap.start()
# Create the namespace
ap.create_hive("building")
ap.create_box("building", "sensors")
ap.create_frame("building", "sensors", "temperature", {
"timestamp": "timestamp",
"room": "utf8",
"temp_celsius": "float64",
"humidity": "float64",
}, partition_by=["room"])
The partition_by=["room"] means Apiary will store data for each room in a separate directory. Queries that filter by room will only read the relevant partition.
Step 2: Generate Sample Data
Create a helper function that simulates sensor readings:
import random
from datetime import datetime, timedelta
def generate_readings(start_time, rooms, readings_per_room):
"""Generate simulated temperature and humidity readings."""
timestamps = []
room_list = []
temps = []
humidities = []
for room in rooms:
# Each room has a base temperature
base_temp = {"kitchen": 22.0, "bedroom": 19.0, "office": 21.0, "garage": 15.0}[room]
base_humidity = {"kitchen": 65.0, "bedroom": 50.0, "office": 45.0, "garage": 55.0}[room]
for i in range(readings_per_room):
timestamps.append(start_time + timedelta(minutes=i * 5))
room_list.append(room)
temps.append(round(base_temp + random.uniform(-3, 3), 1))
humidities.append(round(base_humidity + random.uniform(-10, 10), 1))
return pa.table({
"timestamp": timestamps,
"room": room_list,
"temp_celsius": temps,
"humidity": humidities,
})
rooms = ["kitchen", "bedroom", "office", "garage"]
Step 3: Write Multiple Batches
Simulate data arriving over time by writing three batches:
def write_batch(ap, table):
"""Serialize and write a PyArrow table to Apiary."""
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream_writer(sink, table.schema)
writer.write_table(table)
writer.close()
return ap.write_to_frame("building", "sensors", "temperature", sink.getvalue().to_pybytes())
# Morning readings
morning = generate_readings(datetime(2026, 2, 10, 8, 0), rooms, 12)
result = write_batch(ap, morning)
print(f"Morning: {result['rows_written']} rows, {result['cells_written']} cells")
# Afternoon readings
afternoon = generate_readings(datetime(2026, 2, 10, 13, 0), rooms, 12)
result = write_batch(ap, afternoon)
print(f"Afternoon: {result['rows_written']} rows, {result['cells_written']} cells")
# Evening readings
evening = generate_readings(datetime(2026, 2, 10, 18, 0), rooms, 12)
result = write_batch(ap, evening)
print(f"Evening: {result['rows_written']} rows, {result['cells_written']} cells")
Each batch writes 4 cells (one per room partition). After three batches, the frame has 12 cells total.
Step 4: Inspect the Data
Use DESCRIBE to see the frame's current state:
result_bytes = ap.sql("DESCRIBE building.sensors.temperature")
reader = pa.ipc.open_stream(result_bytes)
print(reader.read_all().to_pandas())
This shows the schema, which columns are partitions, and statistics about the stored data.
Use SHOW and frame metadata:
# Frame metadata
info = ap.get_frame("building", "sensors", "temperature")
print(f"Cell count: {info['cell_count']}")
print(f"Row count: {info['row_count']}")
print(f"Total bytes: {info['total_bytes']}")
Step 5: Aggregation Queries
Find the average temperature and humidity per room:
result_bytes = ap.sql("""
SELECT
room,
ROUND(AVG(temp_celsius), 1) AS avg_temp,
ROUND(MIN(temp_celsius), 1) AS min_temp,
ROUND(MAX(temp_celsius), 1) AS max_temp,
ROUND(AVG(humidity), 1) AS avg_humidity
FROM building.sensors.temperature
GROUP BY room
ORDER BY avg_temp DESC
""")
reader = pa.ipc.open_stream(result_bytes)
print(reader.read_all().to_pandas())
Count readings per room:
result_bytes = ap.sql("""
SELECT room, COUNT(*) AS reading_count
FROM building.sensors.temperature
GROUP BY room
""")
reader = pa.ipc.open_stream(result_bytes)
print(reader.read_all().to_pandas())
Step 6: Partition Pruning in Action
When you filter by the partition column (room), Apiary skips partitions entirely:
# This only reads cells from the kitchen partition
# The bedroom, office, and garage cells are never touched
result_bytes = ap.sql("""
SELECT timestamp, temp_celsius, humidity
FROM building.sensors.temperature
WHERE room = 'kitchen'
ORDER BY timestamp
""")
reader = pa.ipc.open_stream(result_bytes)
df = reader.read_all().to_pandas()
print(f"Kitchen readings: {len(df)} rows")
print(df.head(10))
You can also use read_from_frame() with partition_filter to read a specific partition directly, bypassing SQL:
# Read only the kitchen partition via the SDK
result_bytes = ap.read_from_frame(
"building", "sensors", "temperature",
partition_filter={"room": "kitchen"}
)
reader = pa.ipc.open_stream(result_bytes)
df = reader.read_all().to_pandas()
print(f"Kitchen readings (via partition filter): {len(df)} rows")
In both cases, only cells in the room=kitchen partition are read from storage. The other partitions are never touched.
Step 7: Combine Filters
Partition pruning combined with cell statistics skips even more data:
# Only reads kitchen cells where temp might exceed 23
result_bytes = ap.sql("""
SELECT timestamp, temp_celsius
FROM building.sensors.temperature
WHERE room = 'kitchen' AND temp_celsius > 23.0
ORDER BY temp_celsius DESC
""")
reader = pa.ipc.open_stream(result_bytes)
print(reader.read_all().to_pandas())
Step 8: Clean Up
ap.shutdown()
print("Tutorial complete!")
What You Learned
- Partitioning stores data by column value, enabling efficient queries on that column
- Multiple writes create additional cells within each partition
- Aggregation queries (AVG, MIN, MAX, COUNT) work with GROUP BY
- DESCRIBE shows frame schema, partition columns, and statistics
- Partition pruning skips entire directories of data that don't match the WHERE clause (via SQL
WHEREorpartition_filterinread_from_frame()) - Cell statistics enable further pruning within a partition based on min/max values
Next Steps
- Multi-Node Swarm -- Distribute queries across multiple nodes
- Configuration Reference -- Tune cell sizing, cache, and thresholds
- Storage Engine -- Understand how cells and partitions work at a deeper level