# Data Lakehouse for "Big" Observability

Usually, a single-tenant or isolated multi-tenant approach is sufficient to meet the observability needs of startups and enterprises, respectively. But beyond running distributed queries across tenants, is there a graceful way to store data from all tenants for very long periods of time? Can that data be stored cost-effectively, cleanly partitioned, and compressed?

## Object Storage

Object storage offers a "bottomless" store that many writers and readers can access concurrently, which makes it a good fit for batches of data such as `otlp_json` files. The `awss3exporter` in OpenTelemetry Collector Contrib provides a convenient way to write those batches to S3.

An example config:

```yaml
receivers:
  otlp:
    protocols:
      http:
        endpoint: '0.0.0.0:4318'

processors:
  batch:

exporters:
  awss3:
    s3uploader:
      region: "us-west-2"
      s3_bucket: "omlet-logs"
      # root "directory" inside the bucket; you can change "logs" to whatever prefix you like
      s3_prefix: "logs"
      # partition by year/month/day/hour/minute
      s3_partition_format: "year=%Y/month=%m/day=%d/hour=%H/minute=%M"
      # gzip compression is optional
      compression: gzip
    sending_queue:
      enabled: true
      num_consumers: 5
      queue_size: 50
    timeout: 10s
  debug:

service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug, awss3]
```

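The configuration above receives OTLP data over HTTP (in this example, only logs), batches it, and writes gzip-compressed files to an S3 bucket under time-partitioned prefixes. While straightforward, this setup unlocks **major capabilities**: any engine that can read S3 can now query the logs, and the time-based layout lets readers touch only the partitions they need.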
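
To make the resulting layout concrete, here is a minimal sketch of how a timestamp maps to a key prefix under the `s3_prefix` and `s3_partition_format` values from the config. The helper names `partition_prefix` and `hour_prefixes` are hypothetical, and the sketch assumes the exporter expands the format with strftime-style rules in UTC; the file name the exporter appends after the prefix is not modeled.

```python
from datetime import datetime, timedelta, timezone

S3_PREFIX = "logs"  # s3_prefix from the collector config
PARTITION_FORMAT = "year=%Y/month=%m/day=%d/hour=%H/minute=%M"  # s3_partition_format


def partition_prefix(ts: datetime) -> str:
    """Key prefix under which objects for `ts` are expected to land (assumed layout)."""
    return f"{S3_PREFIX}/{ts.strftime(PARTITION_FORMAT)}/"


def hour_prefixes(start: datetime, end: datetime) -> list[str]:
    """Hour-level prefixes covering every hour touched by [start, end]."""
    hour_format = PARTITION_FORMAT.rsplit("/", 1)[0]  # drop the minute component
    cursor = start.replace(minute=0, second=0, microsecond=0)
    prefixes = []
    while cursor <= end:
        prefixes.append(f"{S3_PREFIX}/{cursor.strftime(hour_format)}/")
        cursor += timedelta(hours=1)
    return prefixes


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    print(partition_prefix(now))
    print(hour_prefixes(now - timedelta(hours=3), now))
```

Passing each of these prefixes as `Prefix` to a `list_objects_v2` paginator narrows a listing to the window of interest, rather than scanning every key in the bucket.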

## Lakehouse DB

OLAP databases such as DuckDB and ClickHouse (among others) can read directly from this storage system. We use chDB (embedded ClickHouse) to access this object storage.
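
As a rough illustration of "connecting" to the bucket without downloading files first, the sketch below builds a query around ClickHouse's `s3()` table function, using the `JSONAsString` format so that each OTLP JSON payload is read as one string. The helper names, the virtual-hosted URL shape, and the placeholder credentials are assumptions for illustration; the key layout mirrors the collector config above.

```python
def s3_day_glob(bucket: str, region: str, prefix: str, year: int, month: int, day: int) -> str:
    """Glob URL matching every gzipped object written on one day (assumed key layout)."""
    return (
        f"https://{bucket}.s3.{region}.amazonaws.com/{prefix}/"
        f"year={year:04d}/month={month:02d}/day={day:02d}/hour=*/minute=*/*.json.gz"
    )


def count_payloads_sql(url: str, access_key: str, secret_key: str) -> str:
    """SQL that counts the JSON payloads stored under `url`."""
    # JSONAsString reads each top-level JSON object as a single String value;
    # ClickHouse infers gzip compression from the .gz extension.
    return (
        "SELECT count() AS payloads "
        f"FROM s3('{url}', '{access_key}', '{secret_key}', 'JSONAsString')"
    )


if __name__ == "__main__":
    url = s3_day_glob("omlet-logs", "us-west-2", "logs", 2025, 5, 24)
    # Placeholders only; avoid printing or logging real credentials.
    print(count_payloads_sql(url, "<access-key-id>", "<secret-access-key>"))
```

With chDB installed and real credentials substituted, the resulting string could be passed to `chdb.query(sql, "CSV")`. The script below takes the more explicit route of downloading and parsing each file, which makes the row mapping easier to follow.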

```python
import os
import json
import gzip
import logging
import boto3
from datetime import datetime, timedelta
import chdb
from itertools import islice
from concurrent.futures import ThreadPoolExecutor, as_completed

# ───────────────────────────────
# Configure logging
# ───────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("otel_ingest")

# ───────────────────────────────
# AWS creds
# ───────────────────────────────
assert os.environ.get("AWS_ACCESS_KEY_ID"), "Missing AWS_ACCESS_KEY_ID"
assert os.environ.get("AWS_SECRET_ACCESS_KEY"), "Missing AWS_SECRET_ACCESS_KEY"
log.info("AWS credentials loaded")

# ───────────────────────────────
# List S3 files for last day
# ───────────────────────────────
s3 = boto3.client("s3", region_name="us-west-2")
now = datetime.utcnow()
start = now - timedelta(days=1)
paginator = s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket="omlet-logs", Prefix="logs/")

all_keys = [
    obj["Key"]
    for page in pages
    for obj in page.get("Contents", [])
    if start <= obj["LastModified"].replace(tzinfo=None) <= now and obj["Key"].endswith(".json.gz")
]
log.info(f"Total files found: {len(all_keys)}")
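if not all_keys:
    log.info("No files in the time window, nothing to ingest")
    raise SystemExit(0)  # exit() is a REPL helper and is not guaranteed in scripts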

# ───────────────────────────────
# Create target table
# ───────────────────────────────
conn = chdb.connect()
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS otel_logs (
    Timestamp           DateTime64(9),
    TraceId             String,
    SpanId              String,
    TraceFlags          UInt8,
    SeverityText        String,
    SeverityNumber      UInt8,
    ServiceName         String,
    Body                String,
    ResourceAttributes  Map(String, String),
    ScopeAttributes     Map(String, String),
    LogAttributes       Map(String, String)
) ENGINE = Memory
""")
log.info("Table otel_logs ready")

# ───────────────────────────────
# Helper to process a single file → list of dicts
# ───────────────────────────────
def parse_file(key):
    """Download one gzipped otlp_json object and flatten it into row dicts."""
    log.info(f"Starting download of {key}")
    blob = s3.get_object(Bucket="omlet-logs", Key=key)["Body"].read()
    log.info(f"Downloaded {key} ({len(blob)} bytes)")
    payload = json.loads(gzip.decompress(blob))
    entries = payload.get("resourceLogs", [])
    log.info(f"Parsed JSON for {key}, found {len(entries)} resourceLogs entries")

    def any_value(v):
        # OTLP values are typed (stringValue, intValue, boolValue, ...). The table
        # stores strings, so anything that is not already a string is JSON-encoded.
        if not v:
            return ""
        inner = next(iter(v.values()))
        return inner if isinstance(inner, str) else json.dumps(inner)

    def attrs(items):
        return {a["key"]: any_value(a.get("value")) for a in items or []}

    out = []
    for r in entries:
        res_attrs = attrs(r.get("resource", {}).get("attributes"))
        # By OTel convention the service name is the service.name resource attribute
        service = res_attrs.get("service.name", "")
        for sl in r.get("scopeLogs", []):
            scope_attrs = attrs(sl.get("scope", {}).get("attributes"))
            for logrec in sl.get("logRecords", []):
                ts = datetime.utcfromtimestamp(int(logrec.get("timeUnixNano", 0)) / 1e9).isoformat(sep=' ')
                # OTLP log records carry trace flags in "flags"; keep the low 8 bits for UInt8
                flags = logrec.get("flags", logrec.get("traceFlags", 0))
                rec = {
                    "Timestamp": ts,
                    "TraceId": logrec.get("traceId", ""),
                    "SpanId": logrec.get("spanId", ""),
                    "TraceFlags": int(flags) & 0xFF,
                    "SeverityText": logrec.get("severityText", ""),
                    "SeverityNumber": logrec.get("severityNumber", 0),
                    "ServiceName": service,
                    "Body": any_value(logrec.get("body")),
                    "ResourceAttributes": res_attrs,
                    "ScopeAttributes": scope_attrs,
                    "LogAttributes": attrs(logrec.get("attributes")),
                }
                out.append(rec)
    log.info(f"Processed {key}, constructed {len(out)} records")
    return out

# ───────────────────────────────
# Chunking utilities
# ───────────────────────────────
def chunked(iterable, size):
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            break
        yield batch

batch_size = 100
row_batch_size = 100000  # JSONEachRow batch size
total = 0

# ───────────────────────────────
# Main loop: batches of files
# ───────────────────────────────
for batch_keys in chunked(all_keys, batch_size):
    log.info(f"Processing batch of {len(batch_keys)} files")
    # parallel parse → list of dicts
    rows_accum = []
    with ThreadPoolExecutor(max_workers=8) as ex:
        futures = {ex.submit(parse_file, key): key for key in batch_keys}
        for fut in as_completed(futures):
            rows_accum.extend(fut.result())
    
    # bulk insert in JSONEachRow chunks
    for chunk in chunked(rows_accum, row_batch_size):
        payload = "\n".join(json.dumps(r, default=str) for r in chunk)
        sql = "INSERT INTO otel_logs FORMAT JSONEachRow\n" + payload
        cur.execute(sql)
        log.info(f"Inserted JSONEachRow batch of {len(chunk)} rows")
        total += len(chunk)

log.info(f"Total rows inserted: {total}")

# Example select query
import pandas as pd
df = pd.read_sql("SELECT SeverityText, count() AS cnt FROM otel_logs GROUP BY SeverityText", conn)
log.info("Counts by severity:\n" + df.to_string(index=False))

conn.close()
log.info("Done.")
```

In the code above, we load a series of compressed (gzip) `otlp_json` files from a specific time window (the past day), create an in-memory table, insert rows in batches, and query for counts by severity level.

## Going “Wide”

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1748130920062/c929cd99-eafb-489e-a371-9a3661689fa1.png align="center")

The code above shows a simple way to query using one worker. However, the advantage of object storage is that it allows multiple workers to read different partitions of the bucket in parallel. For example:

1. Assign each worker one minute-level partition of log/trace data from the past hour, then aggregate the partial results. This spreads large-scale scans across many readers.

2. Partition by "Service" and "Time" to query high-resolution data at scale during live incidents.
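
The first pattern can be sketched as a simple fan-out and merge. This is a minimal illustration and not a production scheduler: `minute_prefixes`, `fan_out`, and `fake_count_by_severity` are hypothetical names, threads stand in for what could be separate processes or machines, and the stand-in worker returns fixed counts where a real one would list and parse the objects under its prefix (for instance with `parse_file`).

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone

# Assumed key layout, matching s3_prefix and s3_partition_format in the config
PARTITION_FORMAT = "logs/year=%Y/month=%m/day=%d/hour=%H/minute=%M/"


def minute_prefixes(end: datetime, minutes: int = 60) -> list[str]:
    """One key prefix per minute for the `minutes` minutes before `end`."""
    end = end.replace(second=0, microsecond=0)
    return [
        (end - timedelta(minutes=i)).strftime(PARTITION_FORMAT)
        for i in range(minutes, 0, -1)
    ]


def fan_out(prefixes, count_by_severity, max_workers: int = 16) -> Counter:
    """Run one task per partition and merge the partial aggregates."""
    total = Counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for partial in pool.map(count_by_severity, prefixes):
            total.update(partial)
    return total


def fake_count_by_severity(prefix: str) -> Counter:
    # Stand-in worker: a real one would read the objects under `prefix`
    # and count log records by severity.
    return Counter({"INFO": 2, "ERROR": 1})


if __name__ == "__main__":
    prefixes = minute_prefixes(datetime.now(timezone.utc))
    print(fan_out(prefixes, fake_count_by_severity))
```

Because each worker only returns a small partial aggregate, the merge step stays cheap even when the partitions themselves are large.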
    

Our vision at Omlet is to use OTel as a tool to create the best Observability Lakehouse experience.
