Scan queries
Apache Druid supports two query languages: Druid SQL and native queries. This document describes a query type in the native language. For information about when Druid SQL will use this query type, refer to the SQL documentation.
The Scan query returns raw Apache Druid rows in streaming mode.
In addition to straightforward usage where a Scan query is issued to the Broker, the Scan query can also be issued directly to Historical processes or streaming ingestion tasks. This can be useful if you want to retrieve large amounts of data in parallel.
An example Scan query object is shown below:
{
  "queryType": "scan",
  "dataSource": "wikipedia",
  "resultFormat": "list",
  "columns": [],
  "intervals": [
    "2013-01-01/2013-01-02"
  ],
  "batchSize": 20480,
  "limit": 3
}
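As a minimal sketch of how such a query object might be built and submitted from a client, the Python below assembles the same fields and posts them to the Broker's native query endpoint (`/druid/v2`). The Broker address `http://localhost:8082` and the helper names `build_scan_query` and `run_query` are assumptions made for illustration; adjust them to your deployment.

```python
import json
import urllib.request


def build_scan_query(data_source, intervals, columns=None, limit=None,
                     result_format="list", batch_size=20480):
    """Build a native Scan query as a plain dict (hypothetical helper)."""
    query = {
        "queryType": "scan",
        "dataSource": data_source,
        "resultFormat": result_format,
        "columns": list(columns or []),  # empty list: all dimensions and metrics
        "intervals": list(intervals),
        "batchSize": batch_size,
    }
    if limit is not None:
        query["limit"] = limit  # omitted means all rows are returned
    return query


def run_query(query, broker_url="http://localhost:8082"):
    """POST the query to the Broker; broker_url is an assumed address."""
    request = urllib.request.Request(
        broker_url + "/druid/v2",
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


# Build the example query from this section; no network call is made here.
query = build_scan_query("wikipedia", ["2013-01-01/2013-01-02"], limit=3)
print(json.dumps(query, indent=2))
```

Calling `run_query(query)` reads the whole response before parsing it, which is convenient for small limits but gives up the streaming behavior of the Scan query; a client retrieving large results would likely parse the response incrementally instead.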
The following are the main parameters for Scan queries:
property | description | required? |
---|---|---|
queryType | This String should always be "scan"; this is the first thing Druid looks at to figure out how to interpret the query | yes |
dataSource | A String or Object defining the data source to query, very similar to a table in a relational database. See DataSource for more information. | yes |
intervals | A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over. | yes |
resultFormat | How the results are represented: list, compactedList or valueVector. Currently only list and compactedList are supported. Default is list | no |
filter | See Filters | no |
columns | A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned. | no |
batchSize | The maximum number of rows buffered before being returned to the client. Default is 20480 | no |
limit | How many rows to return. If not specified, all rows will be returned. | no |
offset | Skip this many rows when returning results. Skipped rows will still need to be generated internally and then discarded, meaning that raising offsets to high values can cause queries to use additional resources. Together, "limit" and "offset" can be used to implement pagination. However, note that if the underlying datasource is modified in between page fetches in ways that affect overall query results, then the different pages will not necessarily align with each other. | no |
order | The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the __time column is included in the columns field and the requirements outlined in the time ordering section are met. | no |
context | An additional JSON Object which can be used to specify certain flags (see the query context properties section below). | no |
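The pagination pattern described for "limit" and "offset" can be sketched as follows. The helper `page_query` and the base query contents are hypothetical; the sketch only shows how the two fields are derived from a page number and page size.

```python
def page_query(base_query, page, page_size):
    """Return a copy of base_query restricted to one page (pages are 0-indexed)."""
    if page < 0 or page_size <= 0:
        raise ValueError("page must be >= 0 and page_size must be > 0")
    paged = dict(base_query)  # shallow copy so the base query is not modified
    paged["limit"] = page_size
    paged["offset"] = page * page_size
    return paged


# Hypothetical base query; any valid Scan query object would do.
base = {
    "queryType": "scan",
    "dataSource": "wikipedia",
    "intervals": ["2013-01-01/2013-01-02"],
    "columns": ["__time", "page"],
}

for page in range(3):
    q = page_query(base, page, page_size=100)
    print(q["offset"], q["limit"])
```

Because skipped rows are still generated and discarded, later pages cost more than earlier ones, and pages fetched at different times may not align if the datasource changes between fetches.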
Example results
The format of the result when resultFormat equals list:
[ {
  "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
  "columns" : [
    "timestamp",
    "robot",
    "namespace",
    "anonymous",
    "unpatrolled",
    "page",
    "language",
    "newpage",
    "user",
    "count",
    "added",
    "delta",
    "variation",
    "deleted"
  ],
  "events" : [ {
    "timestamp" : "2013-01-01T00:00:00.000Z",
    "robot" : "1",
    "namespace" : "article",
    "anonymous" : "0",
    "unpatrolled" : "0",
    "page" : "11._korpus_(NOVJ)",
    "language" : "sl",
    "newpage" : "0",
    "user" : "EmausBot",
    "count" : 1.0,
    "added" : 39.0,
    "delta" : 39.0,
    "variation" : 39.0,
    "deleted" : 0.0
  }, {
    "timestamp" : "2013-01-01T00:00:00.000Z",
    "robot" : "0",
    "namespace" : "article",
    "anonymous" : "0",
    "unpatrolled" : "0",
    "page" : "112_U.S._580",
    "language" : "en",
    "newpage" : "1",
    "user" : "MZMcBride",
    "count" : 1.0,
    "added" : 70.0,
    "delta" : 70.0,
    "variation" : 70.0,
    "deleted" : 0.0
  }, {
    "timestamp" : "2013-01-01T00:00:00.000Z",
    "robot" : "0",
    "namespace" : "article",
    "anonymous" : "0",
    "unpatrolled" : "0",
    "page" : "113_U.S._243",
    "language" : "en",
    "newpage" : "1",
    "user" : "MZMcBride",
    "count" : 1.0,
    "added" : 77.0,
    "delta" : 77.0,
    "variation" : 77.0,
    "deleted" : 0.0
  } ]
} ]
The format of the result when resultFormat equals compactedList:
[ {
  "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
  "columns" : [
    "timestamp", "robot", "namespace", "anonymous", "unpatrolled", "page", "language", "newpage", "user", "count", "added", "delta", "variation", "deleted"
  ],
  "events" : [
    ["2013-01-01T00:00:00.000Z", "1", "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0, 39.0, 39.0, 39.0, 0.0],
    ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "112_U.S._580", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
    ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0]
  ]
} ]
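The two formats carry the same data: in compactedList, each event is an array whose values line up positionally with the batch's "columns" array. A client that prefers one object per row could expand the compact form along these lines. This is a sketch only; `expand_compacted` is a hypothetical helper and the sample batch is a shortened stand-in for the result shown above.

```python
def expand_compacted(result):
    """Turn compactedList batches into one dict per row."""
    rows = []
    for batch in result:
        columns = batch["columns"]
        for event in batch["events"]:
            # Pair each value with the column name at the same position.
            rows.append(dict(zip(columns, event)))
    return rows


# Shortened sample with a placeholder segmentId and three of the columns.
compacted = [{
    "segmentId": "example_segment",
    "columns": ["timestamp", "page", "added"],
    "events": [
        ["2013-01-01T00:00:00.000Z", "11._korpus_(NOVJ)", 39.0],
        ["2013-01-01T00:00:00.000Z", "112_U.S._580", 70.0],
    ],
}]

rows = expand_compacted(compacted)
print(rows[0]["page"])
```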
Time ordering
The Scan query currently supports ordering based on timestamp. Note that using time ordering will yield results that do not indicate which segment rows are from (segmentId will show up as null). Furthermore, time ordering is only supported where the result set limit is less than druid.query.scan.maxRowsQueuedForOrdering rows or all segments scanned have fewer than druid.query.scan.maxSegmentPartitionsOrderedInMemory partitions. Also, time ordering is not supported for queries issued directly to Historicals unless a list of segments is specified. The reasoning behind these limitations is that the implementation of time ordering uses two strategies that can consume too much heap memory if left unbounded. These strategies (listed below) are chosen on a per-Historical basis depending on query result set limit and the number of segments being scanned.
Priority Queue: Each segment on a Historical is opened sequentially. Every row is added to a bounded priority queue which is ordered by timestamp. For every row above the result set limit, the row with the earliest (if descending) or latest (if ascending) timestamp will be dequeued. After every row has been processed, the sorted contents of the priority queue are streamed back to the Broker(s) in batches. Attempting to load too many rows into memory runs the risk of Historical nodes running out of memory. The druid.query.scan.maxRowsQueuedForOrdering property protects from this by limiting the number of rows in the query result set when time ordering is used.
N-Way Merge: For each segment, each partition is opened in parallel. Since each partition's rows are already time-ordered, an n-way merge can be performed on the results from each partition. Unlike the Priority Queue strategy, this approach doesn't hold the entire result set in memory, because it streams back batches as they are returned from the merge function. However, attempting to query too many partitions could also result in high memory usage due to the need to open decompression and decoding buffers for each. The druid.query.scan.maxSegmentPartitionsOrderedInMemory limit protects from this by capping the number of partitions opened at any time when time ordering is used.
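The two strategies can be illustrated with a small, self-contained Python sketch. This is not Druid's implementation; it only mirrors the descriptions above, assuming rows are `(timestamp, payload)` tuples with numeric timestamps and that each segment or partition is an in-memory list. The function names are hypothetical.

```python
import heapq


def priority_queue_order(segments, limit, descending=False):
    """Keep at most `limit` rows in a bounded heap, then return them sorted."""
    heap = []      # entries are (key, sequence, row); the root is evicted first
    sequence = 0   # unique tie-breaker so rows themselves are never compared
    for segment in segments:  # segments are processed one after another
        for row in segment:
            # Descending keeps the latest rows, so the earliest sits at the root.
            # Ascending keeps the earliest rows, so the key is negated.
            key = row[0] if descending else -row[0]
            heapq.heappush(heap, (key, sequence, row))
            sequence += 1
            if len(heap) > limit:
                heapq.heappop(heap)  # drop earliest (descending) or latest (ascending)
    ordered = sorted(heap, key=lambda item: (item[0], item[1]), reverse=True)
    return [row for _, _, row in ordered]


def n_way_merge(partitions, descending=False):
    """Lazily merge partitions whose rows are already in ascending time order."""
    if descending:
        # heapq.merge with reverse=True expects each input sorted largest first.
        partitions = [reversed(p) for p in partitions]
    return heapq.merge(*partitions, key=lambda row: row[0], reverse=descending)


seg_a = [(1, "a"), (4, "d")]
seg_b = [(2, "b"), (3, "c")]

print(priority_queue_order([seg_a, seg_b], limit=3))
print(list(n_way_merge([seg_a, seg_b], descending=True)))
```

The bounded heap never holds more than `limit + 1` rows, which is why a cap on the result set limit bounds its memory use. The merge returns an iterator and holds one pending row per partition, so its cost grows with the number of partitions opened rather than with the size of the result.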
Both druid.query.scan.maxRowsQueuedForOrdering and druid.query.scan.maxSegmentPartitionsOrderedInMemory are configurable and can be tuned based on hardware specs and number of dimensions being queried. These config properties can also be overridden using the maxRowsQueuedForOrdering and maxSegmentPartitionsOrderedInMemory properties in the query context (see the Query Context Properties section).
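To make the interaction between the two limits and their overrides concrete, here is a hedged Python sketch. It resolves each limit from the query context first, then from configuration, then from the documented defaults (100000 and 50), and applies the rule exactly as worded in this section. Whether Druid uses strict or inclusive comparisons internally is not stated here, so the strict `<` is an assumption, as are the function names.

```python
# Defaults as listed in the configuration properties table.
DEFAULTS = {
    "maxRowsQueuedForOrdering": 100000,
    "maxSegmentPartitionsOrderedInMemory": 50,
}


def effective_limits(context=None, config=None):
    """Resolve each limit: query context, then config property, then default."""
    context = context or {}
    config = config or {}
    resolved = {}
    for name, default in DEFAULTS.items():
        configured = config.get("druid.query.scan." + name, default)
        resolved[name] = context.get(name, configured)
    return resolved


def time_ordering_allowed(limit, partitions, limits):
    """Apply the rule as worded above: meeting either bound is enough."""
    rows_ok = limit is not None and limit < limits["maxRowsQueuedForOrdering"]
    partitions_ok = partitions < limits["maxSegmentPartitionsOrderedInMemory"]
    return rows_ok or partitions_ok


limits = effective_limits(context={"maxRowsQueuedForOrdering": 100001})
print(limits)
print(time_ordering_allowed(limit=100000, partitions=80, limits=limits))
```

With the context override in place, a limit of 100000 falls under the row bound even though 80 partitions exceeds the default partition bound; without any limit, only the partition bound can permit time ordering.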
Configuration Properties
Configuration properties:
property | description | values | default |
---|---|---|---|
druid.query.scan.maxRowsQueuedForOrdering | The maximum number of rows returned when time ordering is used | An integer in [1, 2147483647] | 100000 |
druid.query.scan.maxSegmentPartitionsOrderedInMemory | The maximum number of segments scanned per historical when time ordering is used | An integer in [1, 2147483647] | 50 |
Query context properties
property | description | values | default |
---|---|---|---|
maxRowsQueuedForOrdering | The maximum number of rows returned when time ordering is used. Overrides the identically named config. | An integer in [1, 2147483647] | druid.query.scan.maxRowsQueuedForOrdering |
maxSegmentPartitionsOrderedInMemory | The maximum number of segments scanned per historical when time ordering is used. Overrides the identically named config. | An integer in [1, 2147483647] | druid.query.scan.maxSegmentPartitionsOrderedInMemory |
Sample query context JSON object:
{
  "maxRowsQueuedForOrdering": 100001,
  "maxSegmentPartitionsOrderedInMemory": 100
}
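A context object like this is attached to the query under the "context" field. The sketch below builds a time-ordered Scan query carrying such overrides. The helper `ordered_scan_query` is hypothetical, and its handling of columns is an assumption of this example: it requires an explicit column list and adds `__time` when it is missing, since ordering requires `__time` in the columns field.

```python
def ordered_scan_query(data_source, intervals, columns, order, context=None):
    """Build a Scan query with an explicit order and optional context overrides."""
    if order not in ("ascending", "descending", "none"):
        raise ValueError("order must be 'ascending', 'descending', or 'none'")
    columns = list(columns)
    if order != "none":
        if not columns:
            # Assumption of this sketch: require explicit columns when ordering.
            raise ValueError("list the columns explicitly when ordering by time")
        if "__time" not in columns:
            columns.insert(0, "__time")
    query = {
        "queryType": "scan",
        "dataSource": data_source,
        "intervals": list(intervals),
        "columns": columns,
        "order": order,
    }
    if context:
        query["context"] = dict(context)  # copy so the caller's dict is untouched
    return query


overrides = {
    "maxRowsQueuedForOrdering": 100001,
    "maxSegmentPartitionsOrderedInMemory": 100,
}
q = ordered_scan_query("wikipedia", ["2013-01-01/2013-01-02"],
                       ["page", "user"], "descending", overrides)
print(q["columns"], q["context"])
```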
Legacy mode
In older versions of Druid, the scan query supported a legacy mode designed for protocol compatibility with the former scan-query contrib extension from versions of Druid older than 0.11. This mode has been removed.