SQL query translation
Apache Druid supports two query languages: Druid SQL and native queries. This document describes the Druid SQL language.
Druid uses Apache Calcite to parse and plan SQL queries. Druid translates SQL statements into its native JSON-based query language. In general, the slight overhead of translating SQL on the Broker is the only minor performance penalty to using Druid SQL compared to native queries.
This topic includes best practices and tools to help you achieve good performance and minimize the impact of translation.
Best practices
Consider the following non-exhaustive list of best practices when looking into performance implications of translating Druid SQL queries to native queries.
If you wrote a filter on the primary time column
__time
, make sure it is being correctly translated to an"intervals"
filter, as described in the Time filters section below. If not, you may need to change the way you write the filter.Try to avoid subqueries underneath joins: they affect both performance and scalability. This includes implicit subqueries generated by conditions on mismatched types, and implicit subqueries generated by conditions that use expressions to refer to the right-hand side.
Currently, Druid does not support pushing down predicates (condition and filter) past a Join (i.e. into Join's children). Druid only supports pushing predicates into the join if they originated from above the join. Hence, the location of predicates and filters in your Druid SQL is very important. Also, as a result of this, comma joins should be avoided.
Read through the Query execution page to understand how various types of native queries will be executed.
Be careful when interpreting EXPLAIN PLAN output, and use request logging if in doubt. Request logs will show the exact native query that was run. See the next section for more details.
If you encounter a query that could be planned better, feel free to raise an issue on GitHub. A reproducible test case is always appreciated.
Interpreting EXPLAIN PLAN output
The EXPLAIN PLAN functionality can help you understand how a given SQL query will be translated to native. EXPLAIN PLAN statements return:
- a
PLAN
column that contains a JSON array of native queries that Druid will run - a
RESOURCES
column that describes the resources used in the query - an
ATTRIBUTES
column that describes the attributes of the query, including:statementType
: the SQL statement typetargetDataSource
: the target datasource in an INSERT or REPLACE statementpartitionedBy
: the time-based partitioning granularity in an INSERT or REPLACE statementclusteredBy
: the clustering columns in an INSERT or REPLACE statementreplaceTimeChunks
: the time chunks in a REPLACE statement
Example 1: EXPLAIN PLAN for a SELECT
query on the wikipedia
datasource:
Show the query
EXPLAIN PLAN FOR
SELECT
channel,
COUNT(*)
FROM wikipedia
WHERE channel IN (SELECT page FROM wikipedia GROUP BY page ORDER BY COUNT(*) DESC LIMIT 10)
GROUP BY channel
The above EXPLAIN PLAN query returns the following result:
Show the result
[
[
{
"query": {
"queryType": "topN",
"dataSource": {
"type": "join",
"left": {
"type": "table",
"name": "wikipedia"
},
"right": {
"type": "query",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "table",
"name": "wikipedia"
},
"intervals": {
"type": "intervals",
"intervals": [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
]
},
"granularity": {
"type": "all"
},
"dimensions": [
{
"type": "default",
"dimension": "page",
"outputName": "d0",
"outputType": "STRING"
}
],
"aggregations": [
{
"type": "count",
"name": "a0"
}
],
"limitSpec": {
"type": "default",
"columns": [
{
"dimension": "a0",
"direction": "descending",
"dimensionOrder": {
"type": "numeric"
}
}
],
"limit": 10
},
"context": {
"sqlOuterLimit": 101,
"sqlQueryId": "ee616a36-c30c-4eae-af00-245127956e42",
"useApproximateCountDistinct": false,
"useApproximateTopN": false
}
}
},
"rightPrefix": "j0.",
"condition": "(\"channel\" == \"j0.d0\")",
"joinType": "INNER"
},
"dimension": {
"type": "default",
"dimension": "channel",
"outputName": "d0",
"outputType": "STRING"
},
"metric": {
"type": "dimension",
"ordering": {
"type": "lexicographic"
}
},
"threshold": 101,
"intervals": {
"type": "intervals",
"intervals": [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
]
},
"granularity": {
"type": "all"
},
"aggregations": [
{
"type": "count",
"name": "a0"
}
],
"context": {
"sqlOuterLimit": 101,
"sqlQueryId": "ee616a36-c30c-4eae-af00-245127956e42",
"useApproximateCountDistinct": false,
"useApproximateTopN": false
}
},
"signature": [
{
"name": "d0",
"type": "STRING"
},
{
"name": "a0",
"type": "LONG"
}
],
"columnMappings": [
{
"queryColumn": "d0",
"outputColumn": "channel"
},
{
"queryColumn": "a0",
"outputColumn": "EXPR$1"
}
]
}
],
[
{
"name": "wikipedia",
"type": "DATASOURCE"
}
],
{
"statementType": "SELECT"
}
]
Example 2: EXPLAIN PLAN for an INSERT
query that inserts data into the wikipedia
datasource:
Show the query
EXPLAIN PLAN FOR
INSERT INTO wikipedia2
SELECT
TIME_PARSE("timestamp") AS __time,
namespace,
cityName,
countryName,
regionIsoCode,
metroCode,
countryIsoCode,
regionName
FROM TABLE(
EXTERN(
'{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
'{"type":"json"}',
'[{"name":"timestamp","type":"string"},{"name":"namespace","type":"string"},{"name":"cityName","type":"string"},{"name":"countryName","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"metroCode","type":"long"},{"name":"countryIsoCode","type":"string"},{"name":"regionName","type":"string"}]'
)
)
PARTITIONED BY ALL
The above EXPLAIN PLAN returns the following result:
Show the result
[
[
{
"query": {
"queryType": "scan",
"dataSource": {
"type": "external",
"inputSource": {
"type": "http",
"uris": [
"https://druid.apache.org/data/wikipedia.json.gz"
]
},
"inputFormat": {
"type": "json"
},
"signature": [
{
"name": "timestamp",
"type": "STRING"
},
{
"name": "namespace",
"type": "STRING"
},
{
"name": "cityName",
"type": "STRING"
},
{
"name": "countryName",
"type": "STRING"
},
{
"name": "regionIsoCode",
"type": "STRING"
},
{
"name": "metroCode",
"type": "LONG"
},
{
"name": "countryIsoCode",
"type": "STRING"
},
{
"name": "regionName",
"type": "STRING"
}
]
},
"intervals": {
"type": "intervals",
"intervals": [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
]
},
"virtualColumns": [
{
"type": "expression",
"name": "v0",
"expression": "timestamp_parse(\"timestamp\",null,'UTC')",
"outputType": "LONG"
}
],
"resultFormat": "compactedList",
"columns": [
"cityName",
"countryIsoCode",
"countryName",
"metroCode",
"namespace",
"regionIsoCode",
"regionName",
"v0"
],
"context": {
"finalizeAggregations": false,
"forceExpressionVirtualColumns": true,
"groupByEnableMultiValueUnnesting": false,
"maxNumTasks": 5,
"multiStageQuery": true,
"queryId": "42e3de2b-daaf-40f9-a0e7-2c6184529ea3",
"scanSignature": "[{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"countryName\",\"type\":\"STRING\"},{\"name\":\"metroCode\",\"type\":\"LONG\"},{\"name\":\"namespace\",\"type\":\"STRING\"},{\"name\":\"regionIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"},{\"name\":\"v0\",\"type\":\"LONG\"}]",
"sqlInsertSegmentGranularity": "{\"type\":\"all\"}",
"sqlQueryId": "42e3de2b-daaf-40f9-a0e7-2c6184529ea3",
"useNativeQueryExplain": true
},
"granularity": {
"type": "all"
}
},
"signature": [
{
"name": "v0",
"type": "LONG"
},
{
"name": "namespace",
"type": "STRING"
},
{
"name": "cityName",
"type": "STRING"
},
{
"name": "countryName",
"type": "STRING"
},
{
"name": "regionIsoCode",
"type": "STRING"
},
{
"name": "metroCode",
"type": "LONG"
},
{
"name": "countryIsoCode",
"type": "STRING"
},
{
"name": "regionName",
"type": "STRING"
}
],
"columnMappings": [
{
"queryColumn": "v0",
"outputColumn": "__time"
},
{
"queryColumn": "namespace",
"outputColumn": "namespace"
},
{
"queryColumn": "cityName",
"outputColumn": "cityName"
},
{
"queryColumn": "countryName",
"outputColumn": "countryName"
},
{
"queryColumn": "regionIsoCode",
"outputColumn": "regionIsoCode"
},
{
"queryColumn": "metroCode",
"outputColumn": "metroCode"
},
{
"queryColumn": "countryIsoCode",
"outputColumn": "countryIsoCode"
},
{
"queryColumn": "regionName",
"outputColumn": "regionName"
}
]
}
],
[
{
"name": "EXTERNAL",
"type": "EXTERNAL"
},
{
"name": "wikipedia",
"type": "DATASOURCE"
}
],
{
"statementType": "INSERT",
"targetDataSource": "wikipedia",
"partitionedBy": {
"type": "all"
}
}
]
Example 3: EXPLAIN PLAN for a REPLACE
query that replaces all the data in the wikipedia
datasource with a DAY
time partitioning, and cityName
and countryName
as the clustering columns:
Show the query
EXPLAIN PLAN FOR
REPLACE INTO wikipedia
OVERWRITE ALL
SELECT
TIME_PARSE("timestamp") AS __time,
namespace,
cityName,
countryName,
regionIsoCode,
metroCode,
countryIsoCode,
regionName
FROM TABLE(
EXTERN(
'{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
'{"type":"json"}',
'[{"name":"timestamp","type":"string"},{"name":"namespace","type":"string"},{"name":"cityName","type":"string"},{"name":"countryName","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"metroCode","type":"long"},{"name":"countryIsoCode","type":"string"},{"name":"regionName","type":"string"}]'
)
)
PARTITIONED BY DAY
CLUSTERED BY cityName, countryName
The above EXPLAIN PLAN query returns the following result:
Show the result
[
[
{
"query": {
"queryType": "scan",
"dataSource": {
"type": "external",
"inputSource": {
"type": "http",
"uris": [
"https://druid.apache.org/data/wikipedia.json.gz"
]
},
"inputFormat": {
"type": "json"
},
"signature": [
{
"name": "timestamp",
"type": "STRING"
},
{
"name": "namespace",
"type": "STRING"
},
{
"name": "cityName",
"type": "STRING"
},
{
"name": "countryName",
"type": "STRING"
},
{
"name": "regionIsoCode",
"type": "STRING"
},
{
"name": "metroCode",
"type": "LONG"
},
{
"name": "countryIsoCode",
"type": "STRING"
},
{
"name": "regionName",
"type": "STRING"
}
]
},
"intervals": {
"type": "intervals",
"intervals": [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
]
},
"virtualColumns": [
{
"type": "expression",
"name": "v0",
"expression": "timestamp_parse(\"timestamp\",null,'UTC')",
"outputType": "LONG"
}
],
"resultFormat": "compactedList",
"columns": [
"cityName",
"countryIsoCode",
"countryName",
"metroCode",
"namespace",
"regionIsoCode",
"regionName",
"v0"
],
"context": {
"finalizeAggregations": false,
"groupByEnableMultiValueUnnesting": false,
"maxNumTasks": 5,
"queryId": "d88e0823-76d4-40d9-a1a7-695c8577b79f",
"scanSignature": "[{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"countryName\",\"type\":\"STRING\"},{\"name\":\"metroCode\",\"type\":\"LONG\"},{\"name\":\"namespace\",\"type\":\"STRING\"},{\"name\":\"regionIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"},{\"name\":\"v0\",\"type\":\"LONG\"}]",
"sqlInsertSegmentGranularity": "\"DAY\"",
"sqlQueryId": "d88e0823-76d4-40d9-a1a7-695c8577b79f",
"sqlReplaceTimeChunks": "all"
},
"granularity": {
"type": "all"
}
},
"signature": [
{
"name": "v0",
"type": "LONG"
},
{
"name": "namespace",
"type": "STRING"
},
{
"name": "cityName",
"type": "STRING"
},
{
"name": "countryName",
"type": "STRING"
},
{
"name": "regionIsoCode",
"type": "STRING"
},
{
"name": "metroCode",
"type": "LONG"
},
{
"name": "countryIsoCode",
"type": "STRING"
},
{
"name": "regionName",
"type": "STRING"
}
],
"columnMappings": [
{
"queryColumn": "v0",
"outputColumn": "__time"
},
{
"queryColumn": "namespace",
"outputColumn": "namespace"
},
{
"queryColumn": "cityName",
"outputColumn": "cityName"
},
{
"queryColumn": "countryName",
"outputColumn": "countryName"
},
{
"queryColumn": "regionIsoCode",
"outputColumn": "regionIsoCode"
},
{
"queryColumn": "metroCode",
"outputColumn": "metroCode"
},
{
"queryColumn": "countryIsoCode",
"outputColumn": "countryIsoCode"
},
{
"queryColumn": "regionName",
"outputColumn": "regionName"
}
]
}
],
[
{
"name": "EXTERNAL",
"type": "EXTERNAL"
},
{
"name": "wikipedia",
"type": "DATASOURCE"
}
],
{
"statementType": "REPLACE",
"targetDataSource": "wikipedia",
"partitionedBy": "DAY",
"clusteredBy": ["cityName","countryName"],
"replaceTimeChunks": "all"
}
]
In this case the JOIN operator gets translated to a join
datasource. See the Join translation section
for more details about how this works.
We can see this for ourselves using Druid's request logging feature. After enabling logging and running this query, we can see that it actually runs as the following native query.
{
"queryType": "groupBy",
"dataSource": {
"type": "join",
"left": "wikipedia",
"right": {
"type": "query",
"query": {
"queryType": "topN",
"dataSource": "wikipedia",
"dimension": {"type": "default", "dimension": "page", "outputName": "d0"},
"metric": {"type": "numeric", "metric": "a0"},
"threshold": 10,
"intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
"granularity": "all",
"aggregations": [
{ "type": "count", "name": "a0"}
]
}
},
"rightPrefix": "j0.",
"condition": "(\"page\" == \"j0.d0\")",
"joinType": "INNER"
},
"intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
"granularity": "all",
"dimensions": [
{"type": "default", "dimension": "channel", "outputName": "d0"}
],
"aggregations": [
{ "type": "count", "name": "a0"}
]
}
Query types
Druid SQL uses four different native query types.
Scan is used for queries that do not aggregate—no GROUP BY, no DISTINCT.
Timeseries is used for queries that GROUP BY
FLOOR(__time TO unit)
orTIME_FLOOR(__time, period)
, have no other grouping expressions, no HAVING clause, no nesting, and either no ORDER BY, or an ORDER BY that orders by same expression as present in GROUP BY. It also uses Timeseries for "grand total" queries that have aggregation functions but no GROUP BY. This query type takes advantage of the fact that Druid segments are sorted by time.TopN is used by default for queries that group by a single expression, do have ORDER BY and LIMIT clauses, do not have HAVING clauses, and are not nested. However, the TopN query type will deliver approximate ranking and results in some cases; if you want to avoid this, set "useApproximateTopN" to "false". TopN results are always computed in memory. See the TopN documentation for more details.
GroupBy is used for all other aggregations, including any nested aggregation queries. Druid's GroupBy is a traditional aggregation engine: it delivers exact results and rankings and supports a wide variety of features. GroupBy aggregates in memory if it can, but it may spill to disk if it doesn't have enough memory to complete your query. Results are streamed back from data processes through the Broker if you ORDER BY the same expressions in your GROUP BY clause, or if you don't have an ORDER BY at all. If your query has an ORDER BY referencing expressions that don't appear in the GROUP BY clause (like aggregation functions) then the Broker will materialize a list of results in memory, up to a max of your LIMIT, if any. See the GroupBy documentation for details about tuning performance and memory use.
Time filters
For all native query types, filters on the __time
column will be translated into top-level query "intervals" whenever
possible, which allows Druid to use its global time index to quickly prune the set of data that must be scanned.
Consider this (non-exhaustive) list of time filters that will be recognized and translated to "intervals":
__time >= TIMESTAMP '2000-01-01 00:00:00'
(comparison to absolute time)__time >= CURRENT_TIMESTAMP - INTERVAL '8' HOUR
(comparison to relative time)FLOOR(__time TO DAY) = TIMESTAMP '2000-01-01 00:00:00'
(specific day)
Refer to the Interpreting EXPLAIN PLAN output section for details on confirming that time filters are being translated as you expect.
Joins
SQL join operators are translated to native join datasources as follows:
Joins that the native layer can handle directly are translated literally, to a join datasource whose
left
,right
, andcondition
are faithful translations of the original SQL. This includes any SQL join where the right-hand side is a lookup or subquery, and where the condition is an equality where one side is an expression based on the left-hand table, the other side is a simple column reference to the right-hand table, and both sides of the equality are the same data type.If a join cannot be handled directly by a native join datasource as written, Druid SQL will insert subqueries to make it runnable. For example,
foo INNER JOIN bar ON foo.abc = LOWER(bar.def)
cannot be directly translated, because there is an expression on the right-hand side instead of a simple column access. A subquery will be inserted that effectively transforms this clause tofoo INNER JOIN (SELECT LOWER(def) AS def FROM bar) t ON foo.abc = t.def
.Druid SQL does not currently reorder joins to optimize queries.
Refer to the Interpreting EXPLAIN PLAN output section for details on confirming that joins are being translated as you expect.
Refer to the Query execution page for information about how joins are executed.
Subqueries
Subqueries in SQL are generally translated to native query datasources. Refer to the Query execution page for information about how subqueries are executed.
Note: Subqueries in the WHERE clause, like WHERE col1 IN (SELECT foo FROM ...)
are translated to inner joins.
Approximations
Druid SQL will use approximate algorithms in some situations:
The
COUNT(DISTINCT col)
aggregation functions by default uses a variant of HyperLogLog, a fast approximate distinct counting algorithm. Druid SQL will switch to exact distinct counts if you set "useApproximateCountDistinct" to "false", either through query context or through Broker configuration.GROUP BY queries over a single column with ORDER BY and LIMIT may be executed using the TopN engine, which uses an approximate algorithm. Druid SQL will switch to an exact grouping algorithm if you set "useApproximateTopN" to "false", either through query context or through Broker configuration.
Aggregation functions that are labeled as using sketches or approximations, such as APPROX_COUNT_DISTINCT, are always approximate, regardless of configuration.
A known issue with approximate functions based on data sketches
The APPROX_QUANTILE_DS
and DS_QUANTILES_SKETCH
functions can fail with an IllegalStateException
if one of the sketches for
the query hits maxStreamLength
: the maximum number of items to store in each sketch.
See GitHub issue 11544 for more details.
To workaround the issue, increase value of the maximum string length with the approxQuantileDsMaxStreamLength
parameter
in the query context. Since it is set to 1,000,000,000 by default, you don't need to override it in most cases.
See accuracy information in the DataSketches documentation for how many bytes are required per stream length.
This query context parameter is a temporary solution to avoid the known issue. It may be removed in a future release after the bug is fixed.
Unsupported features
Druid does not support all SQL features. In particular, the following features are not supported.
- JOIN between native datasources (table, lookup, subquery) and system tables.
- JOIN conditions that are not an equality between expressions from the left- and right-hand sides.
- JOIN conditions containing a constant value inside the condition.
- JOIN conditions on a column which contains a multi-value dimension.
- ORDER BY for a non-aggregating query, except for
ORDER BY __time
orORDER BY __time DESC
, which are supported. This restriction only applies to non-aggregating queries; you can ORDER BY any column in an aggregating query. - DDL and DML.
- Using Druid-specific functions like
TIME_PARSE
andAPPROX_QUANTILE_DS
on system tables.
Additionally, some Druid native query features are not supported by the SQL language. Some unsupported Druid features include:
- Inline datasources.
- Spatial filters.
- Multi-value dimensions are only partially implemented in Druid SQL. There are known inconsistencies between their behavior in SQL queries and in native queries due to how they are currently treated by the SQL planner.