Write and Query Data

Create the Namespace

Before writing data, create the namespace hierarchy: hive, box, and frame.

from apiary import Apiary

# Start an Apiary instance scoped to this project.
ap = Apiary("my_project")
ap.start()

# Declare the orders schema up front; the frame is partitioned by region.
orders_schema = {
    "order_id": "int64",
    "customer": "utf8",
    "amount": "float64",
    "region": "utf8",
}

# Build the namespace hierarchy top-down: hive -> box -> frame.
ap.create_hive("warehouse")
ap.create_box("warehouse", "sales")
ap.create_frame("warehouse", "sales", "orders", orders_schema, partition_by=["region"])

Write Data

Data is written as Arrow IPC bytes. Use PyArrow to create tables and serialize them:

import pyarrow as pa

# Build an in-memory Arrow table with one row per order.
table = pa.table({
    "order_id": [1, 2, 3, 4, 5],
    "customer": ["alice", "bob", "alice", "charlie", "bob"],
    "amount": [100.0, 250.0, 75.0, 300.0, 150.0],
    "region": ["us", "eu", "us", "eu", "us"],
})

# Serialize to the Arrow IPC stream format.
# Fix: the PyArrow API is pa.ipc.new_stream(sink, schema) — there is no
# pa.ipc.new_stream_writer, so the previous example raised AttributeError.
# Using the writer as a context manager also guarantees it is closed.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)

# Write the IPC bytes to Apiary. Two distinct regions ("us", "eu")
# produce two partition cells.
result = ap.write_to_frame("warehouse", "sales", "orders", sink.getvalue().to_pybytes())
print(result) # {"cells_written": 2, "rows_written": 5}

Query with SQL

import pyarrow as pa

# Execute a simple query; the result arrives as Arrow IPC stream bytes.
result_bytes = ap.sql("SELECT * FROM warehouse.sales.orders LIMIT 10")

# Decode the IPC payload and materialize every batch into one table.
table = pa.ipc.open_stream(result_bytes).read_all()
print(table.to_pandas())

Use Context to Shorten Table Names

# Set the session's default hive and box so queries can use bare table names.
for stmt in ("USE HIVE warehouse", "USE BOX sales"):
    ap.sql(stmt)
result = ap.sql("SELECT * FROM orders LIMIT 10")

Aggregations

# Per-customer order count and revenue, largest total first.
query = """
SELECT customer, COUNT(*) AS order_count, SUM(amount) AS total
FROM warehouse.sales.orders
GROUP BY customer
ORDER BY total DESC
"""
result_bytes = ap.sql(query)
print(pa.ipc.open_stream(result_bytes).read_all().to_pandas())

Filter with WHERE

# Partition pruning: the region predicate limits the scan to cells in
# the region=us partition instead of reading the whole frame.
query = "SELECT * FROM warehouse.sales.orders WHERE region = 'us'"
result_bytes = ap.sql(query)

Read Data via SDK

For programmatic access without SQL:

# Full read: fetch every row in the frame as Arrow IPC bytes.
data = ap.read_from_frame("warehouse", "sales", "orders")
table = pa.ipc.open_stream(data).read_all()

# Filtered read: only cells in the region=us partition are fetched.
data = ap.read_from_frame("warehouse", "sales", "orders", partition_filter={"region": "us"})

Overwrite Data

Replace all data in a frame atomically:

# Build the replacement table.
new_table = pa.table({
    "order_id": [10, 11],
    "customer": ["dave", "eve"],
    "amount": [500.0, 600.0],
    "region": ["us", "eu"],
})

# Serialize to the Arrow IPC stream format.
# Fix: the PyArrow API is pa.ipc.new_stream(sink, schema) — there is no
# pa.ipc.new_stream_writer, so the previous example raised AttributeError.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, new_table.schema) as writer:
    writer.write_table(new_table)

# Atomically replace all existing data in the frame with the new table.
result = ap.overwrite_frame("warehouse", "sales", "orders", sink.getvalue().to_pybytes())

Inspect the Namespace

# Enumerate the namespace via the SDK; expected results shown inline.
# List what exists
ap.list_hives() # ["warehouse"]
ap.list_boxes("warehouse") # ["sales"]
ap.list_frames("warehouse", "sales") # ["orders"]

# Get frame metadata
info = ap.get_frame("warehouse", "sales", "orders")
print(info) # schema, partition_by, cell_count, row_count, total_bytes

# Via SQL
# The same information is available through SHOW/DESCRIBE statements.
ap.sql("SHOW HIVES")
ap.sql("SHOW BOXES IN warehouse")
ap.sql("SHOW FRAMES IN warehouse.sales")
ap.sql("DESCRIBE warehouse.sales.orders")

Clean Up

# Stop the Apiary instance and release its resources.
ap.shutdown()