Depths v0.1.1 exposes clear knobs for ingestion and batching. In this guide you will create a depths.core.config.LogProducerConfig and a depths.core.config.LogAggregatorConfig, attach them to depths.core.config.DepthsLoggerOptions, run a short ingest, verify that options persist to disk, and recreate the logger without passing options to confirm reload.

What you will build

  • A customized depths.core.logger.DepthsLogger with tuned producer and aggregator
  • A small dataset ingested locally
  • A check that options.json is written to the instance and reloaded on restart

Prerequisites

  • Python 3.12+
  • pip install depths

Imports and versions

The instance layout uses INSTANCE_DIR/INSTANCE_ID as the instance root. Options are stored under <instance_root>/configs/options.json.
import os, json, time, datetime as dt
from pathlib import Path

from depths.core.config import (
    LogProducerConfig,
    LogAggregatorConfig,
    DepthsLoggerOptions,
)
from depths.core.logger import DepthsLogger

Producer knobs — depths.core.config.LogProducerConfig

The producer handles validation, normalization, and queueing before rows reach the aggregator. The key fields are queue size and drop policy; validation is enabled by default. The main decision is how much memory to allocate for the in-memory queue, which is really a tradeoff between throughput and persistence: a larger queue gives higher throughput but increases the odds of data loss if the process crashes, while a queue that is too small applies backpressure frequently and drops incoming signals before they ever reach the aggregator. Think of the producer as the conduit between the tap (incoming signals) and the swimming pool (the aggregator).
producer = LogProducerConfig(
    max_queue_size=2048,
    drop_policy="block",
    validate_required=True,
    validate_types=True,
    normalize_service_name=True,
    default_service_name="unknown",
)
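
As a rough sizing aid before settling on max_queue_size, you can estimate the worst-case memory the queue might hold. The per-row figure below is a hypothetical assumption for illustration, not a number measured from Depths.
APPROX_ROW_BYTES = 1024   # hypothetical average size of a queued log row
MAX_QUEUE_SIZE = 2048     # mirrors the value passed above
print(f"worst-case queue memory ≈ {MAX_QUEUE_SIZE * APPROX_ROW_BYTES // 1024} KiB")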

Aggregator knobs — depths.core.config.LogAggregatorConfig

The aggregator batches rows into typed Polars frames and appends them to a Delta table. The core controls are batch age (how long a batch stays in memory) and the row thresholds; strict frames keep types stable. Tuning the aggregator is again a tradeoff between throughput and persistence: a higher batch age means fewer disk writes and potentially higher throughput, but more data sits in memory and can be lost if the program terminates, while a very low batch age inflates the file count on disk and slows the amortized write path, trading throughput for a stronger persistence guarantee.
aggregator = LogAggregatorConfig(
    max_age_s=0.5,
    min_batch_rows=100,
    max_batch_rows=1000,
    strict_df=True,
)
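
To see which threshold will fire first in practice, compare your expected ingest rate against max_age_s and max_batch_rows. The rate below is a hypothetical figure for illustration; Depths does not compute this for you.
ROWS_PER_SECOND = 600   # hypothetical steady-state ingest rate
MAX_AGE_S = 0.5         # mirrors the values passed above
MAX_BATCH_ROWS = 1000

rows_per_window = ROWS_PER_SECOND * MAX_AGE_S
if rows_per_window >= MAX_BATCH_ROWS:
    print(f"row threshold fires first (~{rows_per_window:.0f} rows accumulate within {MAX_AGE_S}s)")
else:
    print(f"age threshold fires first (~{rows_per_window:.0f} rows flushed every {MAX_AGE_S}s)")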

Assemble depths.core.config.DepthsLoggerOptions

Options orchestrate startup, signal hooks, and shipper toggles. Signal handlers tell the DepthsLogger how to handle program termination as gracefully as possible, flushing to disk wherever it can. The auto_start toggle removes the need for an explicit DepthsLogger.start() before ingesting telemetry; behind the scenes, start() gears up the producer to handle incoming signals, and auto_start simply does that for you. If you want exact control over when logging begins, set auto_start=False and call DepthsLogger.start() yourself at the desired point (a sketch of this pattern follows the options block below).
options = DepthsLoggerOptions(
    auto_start=True,
    install_signal_handlers=False,
    atexit_hook=True,
    producer_config=producer,
    aggregator_config=aggregator,
)
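
For the explicit path, a minimal sketch looks like the following. The logger construction is kept commented out so it does not create a second instance during this walkthrough, and the instance name and directory are placeholders.
manual_options = DepthsLoggerOptions(
    auto_start=False,
    install_signal_handlers=False,
    atexit_hook=True,
    producer_config=producer,
    aggregator_config=aggregator,
)

# manual_logger = DepthsLogger(
#     instance_id="manual_start_demo",      # hypothetical instance name
#     instance_dir="./depths_manual_start",  # hypothetical directory
#     options=manual_options,
# )
# ... application setup runs first ...
# manual_logger.start()  # ingestion begins only after this call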

Initialize depths.core.logger.DepthsLogger with options

The logger prepares the instance, merges and persists options, and starts the aggregators.
INSTANCE_ID = "custom_opts_demo"
INSTANCE_DIR = os.path.abspath("./depths_custom_opts")
INSTANCE_ROOT = os.path.join(INSTANCE_DIR, INSTANCE_ID)

logger = DepthsLogger(
    instance_id=INSTANCE_ID,
    instance_dir=INSTANCE_DIR,
    options=options,
)

Ingest a sample batch

A tiny dataset with varying severities makes filtering obvious.
PROJECT_ID = "opts_project"
SERVICE_NAME = "opts_service"
N = 600

now_ns = lambda: int(time.time() * 1_000_000_000)

accepted = 0
for i in range(N):
    sev_num = 13 if (i % 5 == 0) else 9
    sev_txt = "WARN" if sev_num >= 13 else "INFO"
    ok, _ = logger.ingest_log(
        {
            "project_id": PROJECT_ID,
            "service_name": SERVICE_NAME,
            "time_unix_nano": now_ns(),
            "severity_number": sev_num,
            "severity_text": sev_txt,
            "body": f"opts row {i}",
        }
    )
    if ok:
        accepted += 1
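
A quick sanity check after the loop: with drop_policy="block" and a queue larger than the sample size, all 600 rows should be accepted here.
print(f"accepted {accepted}/{N} rows")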

Stop and flush

An explicit stop() tells the DepthsLogger instance to flush the remaining in-memory data to disk. With flush="auto", the batch-age expiry is allowed to complete gracefully. You can also pass flush="none" to shut down without waiting for the batch age to expire; the key difference is that flush="none" does not drain the not-yet-aggregated signals from the producer's queue, it simply finishes the already-queued disk-write tasks and shuts down.
logger.stop(flush="auto")

Verify persisted rows

Use the named read helper to pull a small projection.
today = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%d")

rows = logger.read_logs(
    date_from=today,
    date_to=today,
    project_id=PROJECT_ID,
    service_name=SERVICE_NAME,
    select=["event_ts", "severity_text", "body", "service_name"],
    max_rows=5,
)

print(len(rows))
for r in rows:
    print(r)

Verify options persistence (options.json)

Options are serialized in the instance configs. Load the file and check selected fields. The exact path is <instance_root>/configs/options.json.
opts_path = Path(INSTANCE_ROOT, "configs", "options.json")
on_disk = json.loads(opts_path.read_text())

print(on_disk["producer_config"]["max_queue_size"])
print(on_disk["producer_config"]["drop_policy"])
print(on_disk["aggregator_config"]["max_age_s"])
print(on_disk["aggregator_config"]["max_batch_rows"])
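
For a stricter round-trip check, compare the persisted values against the config objects built earlier. This assumes the JSON field names mirror the config fields and that the config objects expose them as attributes, as the prints above suggest.
assert on_disk["producer_config"]["max_queue_size"] == producer.max_queue_size
assert on_disk["producer_config"]["drop_policy"] == producer.drop_policy
assert on_disk["aggregator_config"]["max_age_s"] == aggregator.max_age_s
assert on_disk["aggregator_config"]["max_batch_rows"] == aggregator.max_batch_rows
print("persisted options match the in-memory configs")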

Recreate logger without options and verify reload

Construct a new logger pointing at the same instance. The saved options are merged in automatically.
logger2 = DepthsLogger(
    instance_id=INSTANCE_ID,
    instance_dir=INSTANCE_DIR,
)

rows2 = logger2.read_logs(
    date_from=today,
    date_to=today,
    project_id=PROJECT_ID,
    service_name=SERVICE_NAME,
    select=["event_ts", "severity_text", "body", "service_name"],
    max_rows=3,
)

print(len(rows2))
for r in rows2:
    print(r)

Quick peek query (WARN+ with substring)

Filter helpers are pushed down and collected only at the end.
q = logger2.read_logs(
    date_from=today,
    date_to=today,
    project_id=PROJECT_ID,
    service_name=SERVICE_NAME,
    severity_ge=13,
    body_like="opts",
    select=["event_ts", "severity_text", "body", "service_name"],
    max_rows=5,
    return_as="lazy",
)

print(q.collect())
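
Because the aggregator works with Polars frames, the lazy handle can plausibly be treated as a Polars LazyFrame; assuming that holds, you can keep composing before collecting. The group-by below is an illustrative extension over the (row-limited) query above, not part of the read_logs API.
import polars as pl

counts = (
    q.group_by("severity_text")       # extend the lazy plan before collecting
     .agg(pl.len().alias("rows"))     # count rows per severity
     .collect()
)
print(counts)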

What just happened

  • The producer enforced queueing and validation using LogProducerConfig
  • The aggregator flushed on age or row thresholds using LogAggregatorConfig
  • DepthsLoggerOptions persisted to <instance_root>/configs/options.json and were reloaded on the next run
  • read_logs applied equality, substring, severity, and time-range filters with projection and row limits

Wrap-up and next steps

  • Keep these knobs small at first, then widen max_batch_rows or max_age_s to trade latency for throughput
  • Move on to Querying possibilities with Depths to explore grouped and lazy reads
  • When you are ready for object storage, try S3 backups from scratch