Apache Druid ingestion patterns for financial data — Kafka streaming specs, rollup, high-cardinality dimensions, compaction, and querying from Java.
Apache Druid is built for sub-second OLAP queries on event-driven data — exactly the kind of workload you get in financial systems where you need to query millions of trade events, market ticks, or price movements with millisecond latency. Getting ingestion right is the foundation: the decisions you make about rollup, granularity, and dimension cardinality at ingest time directly determine query performance.
This post covers the key ingestion patterns for financial and analytical data: streaming ingestion from Kafka, rollup configuration, high-cardinality handling, and compaction.
Druid stores data in segments organised around a time column. Every row must have a __time field. When rollup is enabled, data within segments is pre-aggregated at a specified granularity (the query granularity), and segments themselves cover a time chunk (the segment granularity).
Three column types:
__time column: mandatory, drives partitioning
Dimensions: the attributes you filter and group by
Metrics: the numeric values that are aggregated
For financial data, the timestamp is typically the trade or market event time, dimensions are instrument identifiers and event types, and metrics are prices and volumes.
For a trade event stream:
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "trade_events",
    "timestampSpec": {
      "column": "eventTime",
      "format": "millis"
    },
    "dimensionsSpec": {
      "dimensions": [
        { "type": "string", "name": "instrumentId" },
        { "type": "string", "name": "venue" },
        { "type": "string", "name": "side" },
        { "type": "string", "name": "traderRef" }
      ]
    },
    "metricsSpec": [
      { "type": "count", "name": "tradeCount" },
      { "type": "doubleSum", "name": "totalVolume", "fieldName": "volume" },
      { "type": "doubleSum", "name": "totalValue", "fieldName": "notionalValue" },
      { "type": "doubleMin", "name": "minPrice", "fieldName": "price" },
      { "type": "doubleMax", "name": "maxPrice", "fieldName": "price" },
      { "type": "doubleLast", "name": "lastPrice", "fieldName": "price" }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "MINUTE",
      "rollup": true
    }
  },
  "ioConfig": {
    "type": "kafka",
    "consumerProperties": {
      "bootstrap.servers": "kafka-broker:9092"
    },
    "topic": "trade-events",
    "inputFormat": {
      "type": "json"
    },
    "useEarliestOffset": false
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000,
    "maxRowsInMemory": 500000,
    "maxBytesInMemory": 0
  }
}
With rollup: true and queryGranularity: "MINUTE", all trade events within the same minute for the same (instrumentId, venue, side, traderRef) combination are collapsed into a single row at ingest time. This dramatically reduces storage and query scan cost for aggregation queries.
Rollup is the single biggest performance lever in Druid. When enabled, rows with identical dimension values within the same query granularity bucket are merged into a single row. Counts accumulate, sums accumulate, min/max are maintained.
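To make the merge rule concrete, here is a minimal in-memory sketch of what rollup does conceptually. It is not Druid's implementation: the `RollupSketch`, `Trade`, `Key` and `Row` names are hypothetical, and only the count, sum, min and max aggregators from the spec are modelled.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RollupSketch {

    /** One raw trade event, using the field names from the ingestion spec. */
    record Trade(Instant eventTime, String instrumentId, String venue, String side,
                 String traderRef, double volume, double price) {}

    /** Rollup key: the minute bucket plus every dimension value. */
    record Key(Instant minute, String instrumentId, String venue, String side,
               String traderRef) {}

    /** Aggregated row, mirroring tradeCount, totalVolume, minPrice and maxPrice. */
    record Row(long tradeCount, double totalVolume, double minPrice, double maxPrice) {
        Row merge(Row other) {
            return new Row(tradeCount + other.tradeCount,
                           totalVolume + other.totalVolume,
                           Math.min(minPrice, other.minPrice),
                           Math.max(maxPrice, other.maxPrice));
        }
    }

    /** Collapses events that share a minute bucket and identical dimension values. */
    static Map<Key, Row> rollup(List<Trade> trades) {
        Map<Key, Row> rows = new LinkedHashMap<>();
        for (Trade t : trades) {
            // queryGranularity MINUTE: truncate the event time to the start of its minute.
            Key key = new Key(t.eventTime().truncatedTo(ChronoUnit.MINUTES),
                              t.instrumentId(), t.venue(), t.side(), t.traderRef());
            rows.merge(key, new Row(1, t.volume(), t.price(), t.price()), Row::merge);
        }
        return rows;
    }
}
```

Two trades at 09:30:05 and 09:30:25 with the same dimension values end up in one row with a count of 2, while a third trade at 09:31:05 starts a new row. If any dimension differs between events, for example a unique ID, nothing is merged.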
Use rollup when:
Don’t use rollup when:
For market tick data where you need every individual price point, set rollup: false and use queryGranularity: "NONE".
Financial data often has high-cardinality identifiers — trade IDs, order IDs, customer references. Putting these as dimensions with rollup enabled is wasteful: every row will be unique, so rollup never collapses anything, and you’ve stored a high-cardinality string column that makes segment scans slow.
Handle high-cardinality identifiers:
{
  "type": "HLLSketchBuild",
  "name": "distinctTraderCount",
  "fieldName": "traderId",
  "lgK": 12
}
This gives you an approximate COUNT(DISTINCT traderId), with roughly 1 to 2% error at lgK 12, at a fraction of the storage cost of storing the raw IDs.
For instrument IDs (typically a few thousand values, not millions), including them as a dimension is fine. For order IDs (potentially billions), either exclude them or use a sketch.
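As a rough guide to choosing lgK, the classical HyperLogLog analysis puts the relative standard error at about 1.04 / sqrt(2^lgK). The sketch below only evaluates that textbook formula. The `HllErrorSketch` name is hypothetical, and the sketch library Druid uses may achieve somewhat different constants, so treat the result as an order-of-magnitude estimate.

```java
final class HllErrorSketch {

    /**
     * Textbook HyperLogLog relative standard error for 2^lgK registers.
     * Assumption: the classical 1.04 / sqrt(m) bound, not a figure measured on Druid.
     */
    static double relativeStandardError(int lgK) {
        double registers = Math.pow(2, lgK);
        return 1.04 / Math.sqrt(registers);
    }
}
```

For lgK 12 the formula gives about 0.016, or 1.6%. Each increment of lgK doubles the number of registers and shrinks the error by a factor of about 1.4.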
Druid exposes a REST API for submitting ingestion tasks. Use it from a Spring Boot management service:
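import java.util.Map;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClient;

@Service
public class DruidIngestionService {

    private final RestClient restClient;

    // The /druid/indexer/v1 endpoints are served by the Overlord, so this URL must
    // reach it, either directly or through a service that proxies to it.
    public DruidIngestionService(@Value("${druid.coordinator.url}") String coordinatorUrl) {
        this.restClient = RestClient.builder()
                .baseUrl(coordinatorUrl)
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }

    // Submits a supervisor spec and returns the supervisor ID from the response.
    public String submitKafkaTask(String specJson) {
        Map<?, ?> response = restClient.post()
                .uri("/druid/indexer/v1/supervisor")
                .body(specJson)
                .retrieve()
                .body(Map.class);
        if (response == null) {
            throw new IllegalStateException("Empty response when submitting supervisor spec");
        }
        return (String) response.get("id");
    }

    // SupervisorStatus is an application-defined DTO for the status payload.
    public SupervisorStatus getSupervisorStatus(String supervisorId) {
        return restClient.get()
                .uri("/druid/indexer/v1/supervisor/{id}/status", supervisorId)
                .retrieve()
                .body(SupervisorStatus.class);
    }

    // Suspends the supervisor: its ingestion tasks stop, but the spec is kept.
    public void suspendSupervisor(String supervisorId) {
        restClient.post()
                .uri("/druid/indexer/v1/supervisor/{id}/suspend", supervisorId)
                .retrieve()
                .toBodilessEntity();
    }
}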
For querying from Java, use Druid’s JDBC driver:
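@Bean
public DataSource druidDataSource(@Value("${druid.broker.url}") String brokerUrl) {
    // Druid JDBC URL format: jdbc:avatica:remote:url=<broker>/druid/v2/sql/avatica/
    var dataSource = new DriverManagerDataSource(
            "jdbc:avatica:remote:url=" + brokerUrl + "/druid/v2/sql/avatica/");
    // The Avatica client driver must be on the classpath.
    dataSource.setDriverClassName("org.apache.calcite.avatica.remote.Driver");
    return dataSource;
}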
// Query example. Assumes a JdbcTemplate built on the Druid DataSource.
public List<TradeAggregation> queryHourlyVolume(String instrumentId, Instant from, Instant to) {
    // Half-open range [from, to) so adjacent windows never count the same row twice.
    // The alias is hour_start rather than hour because HOUR is a SQL keyword.
    var sql = """
            SELECT
              TIME_FLOOR(__time, 'PT1H') AS hour_start,
              SUM(totalVolume) AS volume,
              SUM(totalValue) / SUM(totalVolume) AS vwap,
              MIN(minPrice) AS low,
              MAX(maxPrice) AS high
            FROM trade_events
            WHERE instrumentId = ? AND __time >= ? AND __time < ?
            GROUP BY 1
            ORDER BY 1
            """;
    return jdbcTemplate.query(sql,
            (rs, row) -> new TradeAggregation(
                    rs.getTimestamp("hour_start").toInstant(),
                    rs.getDouble("volume"),
                    rs.getDouble("vwap"),
                    rs.getDouble("low"),
                    rs.getDouble("high")
            ),
            instrumentId,
            Timestamp.from(from),
            Timestamp.from(to)
    );
}
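The VWAP expression in the query divides two sums rather than averaging prices, and that is what keeps it correct on rolled-up rows. The sketch below reproduces the arithmetic in plain Java. `VwapSketch` and `RolledUpRow` are hypothetical names, and returning NaN for zero volume is an illustrative choice rather than Druid's behaviour.

```java
import java.util.List;

final class VwapSketch {

    /** A rolled-up row carrying the two sums that the query divides. */
    record RolledUpRow(double totalValue, double totalVolume) {}

    /** Volume-weighted average price: total notional value over total volume. */
    static double vwap(List<RolledUpRow> rows) {
        double value = 0;
        double volume = 0;
        for (RolledUpRow r : rows) {
            value += r.totalValue();
            volume += r.totalVolume();
        }
        // Illustrative choice: report NaN instead of dividing by zero.
        return volume == 0 ? Double.NaN : value / volume;
    }
}
```

With one row of 30 units at a price of 10 (value 300) and another of 10 units at 14 (value 140), the result is 440 / 40 = 11, whereas a plain average of the two prices would give 12.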
Druid ingests data into small segments during the streaming ingestion window. Over time, many small segments accumulate for older time ranges — this hurts query performance because Druid must open and scan each segment separately.
Compaction merges small segments into larger ones for older data:
{
  "type": "compact",
  "dataSource": "trade_events",
  "ioConfig": {
    "type": "compact",
    "inputSpec": {
      "type": "interval",
      "interval": "2026-01-01/2026-06-01"
    }
  },
  "granularitySpec": {
    "segmentGranularity": "DAY"
  },
  "tuningConfig": {
    "type": "index_parallel",
    "partitionsSpec": {
      "type": "dynamic",
      "maxRowsPerSegment": 5000000
    }
  }
}
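Run compaction as a scheduled job: submit the task to the Overlord task endpoint (/druid/indexer/v1/task), or configure automatic compaction through the Coordinator API. For financial data with daily query patterns, compact hourly segments into daily segments overnight, so that queries against the previous day's data read one daily time chunk instead of 24 hourly ones.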
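A nightly job needs to compute the interval it compacts. The sketch below builds the interval string for the previous day in the same start/end form as the spec above. `CompactionIntervalSketch` is a hypothetical helper, and it assumes the caller passes the current date in UTC.

```java
import java.time.LocalDate;

final class CompactionIntervalSketch {

    /**
     * Returns the ISO-8601 interval covering the day before {@code today},
     * for example "2026-01-04/2026-01-05". The end date is exclusive.
     */
    static String previousDayInterval(LocalDate today) {
        LocalDate start = today.minusDays(1);
        return start + "/" + today;
    }
}
```

Calling `previousDayInterval(LocalDate.now(ZoneOffset.UTC))` from the scheduler yields the value to place in the spec's interval field before submitting it.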
Use Druid’s load rules to keep hot data on fast SSD-backed historical nodes and move older data to cheaper storage:
[
  {
    "type": "loadByPeriod",
    "period": "P7D",
    "tieredReplicants": { "_default_tier": 1 }
  },
  {
    "type": "loadByPeriod",
    "period": "P365D",
    "tieredReplicants": { "cold_tier": 1 }
  },
  {
    "type": "dropForever"
  }
]
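Rules are evaluated top to bottom and the first match wins, so this keeps the most recent 7 days on the default hot tier, data between 7 days and a year old on the cold tier, and drops anything older.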
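The first-match behaviour can be sketched in a few lines. This is a simplified model, not Druid's rule engine: `LoadRuleSketch` and `PeriodRule` are hypothetical names, a rule is treated as matching when the segment's end time falls inside the trailing period, and the trailing drop rule is represented by the "dropped" fallback.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

final class LoadRuleSketch {

    /** A period rule: matches segments that end within {@code period} before now. */
    record PeriodRule(Duration period, String tier) {}

    /** Walks the rules in order and returns the tier of the first rule that matches. */
    static String tierFor(Instant segmentEnd, Instant now, List<PeriodRule> rules) {
        for (PeriodRule rule : rules) {
            if (segmentEnd.isAfter(now.minus(rule.period()))) {
                return rule.tier();
            }
        }
        // No period rule matched: the final drop rule applies.
        return "dropped";
    }
}
```

With a 7-day rule for `_default_tier` followed by a 365-day rule for `cold_tier`, a segment that ended 2 days ago resolves to `_default_tier`, one from 30 days ago to `cold_tier`, and one from 400 days ago is dropped. Reversing the rule order would send everything younger than a year to the cold tier, which is why order matters.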
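Timestamp accuracy: Druid uses __time as the primary partition key. The Kafka indexing service accepts late-arriving events by default and only drops them if lateMessageRejectionPeriod is set in ioConfig (windowPeriod belongs to the older Tranquility-based ingestion and is not part of the Kafka spec). If event timestamps are unreliable (late-arriving data, clock skew across producers) and you do configure a rejection period, keep it generous so late events are still accepted. For financial data, a 10-minute window is usually sufficient.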
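Dimension order matters: Druid builds bitmap indexes on string dimensions, and rows within a segment are sorted by __time and then by the dimensions in the order they appear in dimensionsSpec. Placing the dimensions you filter on most frequently first tends to improve data locality and compression, which helps filtered scans.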
Segment granularity vs query granularity: Segment granularity (how time chunks are partitioned into files) should be chosen based on data volume per period. For high-frequency trading data, hourly segments work well. For sparse data, daily segments are a better fit. Query granularity (the rollup bucket) is a separate concern.
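A back-of-envelope estimate helps when picking between hourly and daily segments. The sketch below is plain arithmetic under stated assumptions: `SegmentSizingSketch` is a hypothetical helper, the event rate is treated as constant, and `rollupRatio` is the number of raw events that collapse into one stored row (1.0 when rollup is off).

```java
import java.time.Duration;

final class SegmentSizingSketch {

    /** Estimated stored rows per time chunk for a steady event rate. */
    static long estimatedRowsPerChunk(double eventsPerSecond, Duration chunk, double rollupRatio) {
        double rawEvents = eventsPerSecond * chunk.getSeconds();
        return Math.round(rawEvents / rollupRatio);
    }
}
```

At 500 events per second with a rollup ratio of 2, an hourly chunk holds about 900,000 rows and a daily chunk about 21.6 million. Comparing those figures with a per-segment row budget, such as the 5,000,000 maxRowsPerSegment used in the ingestion spec, indicates how many segments each chunk will be split into.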