
Data storage or what is a MergeTree

This document covers the following areas:

  1. Introduction to MergeTree
  2. How MergeTree stores data
  3. Seeding data
  4. Looking at part data
  5. Inspecting data on disk
  6. What does the Merge stand for?
  7. Query execution
  8. Aggregation supported by ORDER BY


Introduction to MergeTree

The ClickHouse documentation's "Why is ClickHouse so fast?" page states:

ClickHouse was initially built as a prototype to do just a single task well: to filter and aggregate data as fast as possible.

Rather than force all possible tasks to be solved by singular tools, ClickHouse provides specialized "engines" that each solve specific problems.

MergeTree engine family tables are intended for ingesting large amounts of data, storing that data efficiently, and running analytical queries on it.

How MergeTree stores data

Consider the following (simplified) table for storing sensor events:

CREATE TABLE sensor_values (
    timestamp DateTime,
    site_id UInt32,
    event VARCHAR,
    uuid UUID,
    metric_value Int32
)
ENGINE = MergeTree()
ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
SETTINGS index_granularity = 8192

Data for this table is stored in parts, each part being a separate directory on disk. Data within a part is always sorted by the expressions in the ORDER BY clause and compressed.

Parts can be Wide or Compact depending on their size. We'll mostly be dealing with Wide parts in day-to-day operations.

Wide parts are large and store each column in a separate binary data file; these files are sorted and compressed.

ClickHouse also stores a sparse primary index for each part. A collection of rows of size equal to the index_granularity setting is called a granule. For every granule, the primary index stores a mark containing the values of the ORDER BY expressions for the granule's first row, as well as pointers to where that granule begins in each column's data file.

💡 For better performance when running queries, it is not recommended to set index_granularity too low. The default value for engines in the MergeTree family is 8192. An implication of this is that accessing data by primary key (in this case the ORDER BY clause is equivalent to the primary key) will not read just one row, but up to index_granularity rows. This is acceptable given that ClickHouse is meant to perform well on aggregations rather than point lookups.
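
To make this concrete: the 6,291,270-row part shown in the system.parts output below spans ceil(6,291,270 / 8,192) = 768 granules, and the primary index stores one mark per granule plus a final mark, giving the 769 marks reported there.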

<details><summary>Diving deeper into data-on-disk for a Wide part</summary>

This assumes you're using a Docker-based ClickHouse installation and have clickhouse-client running.

Seeding data

-- generateRandom produces site_id as UInt8 (0-255), keeping its cardinality low;
-- ClickHouse widens the values to UInt32 on insert
INSERT INTO sensor_values
SELECT *
FROM generateRandom('timestamp DateTime, site_id UInt8, event VARCHAR, uuid UUID, metric_value Int32', NULL, 10)
LIMIT 200000000

Looking at part data

The system.parts table contains a lot of metadata about every part.

To find out what type each part is, its size, and where on disk it's located, you can run the following query:

SELECT
    name,
    part_type,
    rows,
    marks,
    formatReadableSize(bytes_on_disk),
    formatReadableSize(data_compressed_bytes),
    formatReadableSize(data_uncompressed_bytes),
    formatReadableSize(marks_bytes),
    path
FROM system.parts
WHERE active AND table = 'sensor_values'
FORMAT Vertical

The result might look something like this:

Row 1:
──────
name:                                        all_12_17_1
part_type:                                   Wide
rows:                                        6291270
marks:                                       769
formatReadableSize(bytes_on_disk):           476.07 MiB
formatReadableSize(data_compressed_bytes):   475.92 MiB
formatReadableSize(data_uncompressed_bytes): 474.00 MiB
formatReadableSize(marks_bytes):             90.12 KiB
path:                                        /var/lib/clickhouse/store/267/267cd730-33ca-4e43-8a84-e4f0786e364b/all_12_17_1/
Inspecting data on disk
⟩ docker exec -it posthog_clickhouse_1 ls -lhS /var/lib/clickhouse/store/267/267cd730-33ca-4e43-8a84-e4f0786e364b/all_12_17_1/
total 477M
-rw-r----- 1 clickhouse clickhouse 308M Nov  2 07:33 event.bin
-rw-r----- 1 clickhouse clickhouse  97M Nov  2 07:33 uuid.bin
-rw-r----- 1 clickhouse clickhouse  25M Nov  2 07:33 metric_value.bin
-rw-r----- 1 clickhouse clickhouse  25M Nov  2 07:33 timestamp.bin
-rw-r----- 1 clickhouse clickhouse  25M Nov  2 07:33 site_id.bin
-rw-r----- 1 clickhouse clickhouse  58K Nov  2 07:33 primary.idx
-rw-r----- 1 clickhouse clickhouse  19K Nov  2 07:33 event.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov  2 07:33 metric_value.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov  2 07:33 site_id.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov  2 07:33 timestamp.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov  2 07:33 uuid.mrk2
-rw-r----- 1 clickhouse clickhouse  494 Nov  2 07:33 checksums.txt
-rw-r----- 1 clickhouse clickhouse  123 Nov  2 07:33 columns.txt
-rw-r----- 1 clickhouse clickhouse   10 Nov  2 07:33 default_compression_codec.txt
-rw-r----- 1 clickhouse clickhouse    7 Nov  2 07:33 count.txt

What are these files?

- *.bin files hold the actual column data, sorted and compressed, one file per column.
- *.mrk2 files are "marks" files: for each mark in the primary index, they record where the corresponding granule starts within the column's .bin file.
- primary.idx is the sparse primary index described above.
- checksums.txt, columns.txt and count.txt hold integrity checksums, the column list with types, and the part's row count, respectively.
- default_compression_codec.txt records the compression codec the part was written with.

You can read more on the exact structure of these files and how they're used in ClickHouse Index Design documentation.

</details>

What does the Merge stand for?

In every system, data must be ingested and kept up-to-date somehow. When data is inserted into MergeTree tables, each insert creates one or more new parts.

As having a lot of small parts would be disadvantageous for many reasons, from query performance to storage overhead, ClickHouse regularly merges small parts together in the background until they reach a maximum size.

A merge combines several parts into a new, larger one, similar to the merge step of merge sort, and atomically replaces the source parts. This is reflected in part names: in all_12_17_1 from the earlier output, all is the partition ID, 12 and 17 are the minimum and maximum data block numbers the part covers, and 1 is the merge level, incremented with each merge.

Merges can be monitored using the system.merges table.
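
For example, you can watch in-flight merges with a quick query like this (a minimal sketch; the columns are standard system.merges columns):

SELECT
    table,
    round(elapsed, 1) AS elapsed_seconds,
    round(progress * 100, 1) AS progress_pct,
    num_parts,
    result_part_name
FROM system.merges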

Query execution

Aggregation supported by ORDER BY

Our sensor_values table is set up in a way that queries similar to the following are really fast to execute.

SELECT
    toStartOfDay(timestamp),
    event,
    sum(metric_value) as total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' AND timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20

Executing this reports:

20 rows in set. Elapsed: 0.042 sec. Processed 90.11 thousand rows, 3.54 MB (2.13 million rows/s., 83.60 MB/s.)

Why can it be fast? Because ClickHouse:

  1. leverages the table ORDER BY clause (ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)) to skip reading a lot of data
  2. is fast and efficient about I/O and aggregation

Let's dig into how the primary index is used for this query with EXPLAIN.

EXPLAIN indexes=1, header=1 SELECT
    toStartOfDay(timestamp),
    event,
    sum(metric_value) as total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' AND timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20
FORMAT LineAsString

<details><summary>Show full EXPLAIN output</summary>

Expression (Projection)
Header: toStartOfDay(timestamp) DateTime
        event String
        total_metric_value Int64
  Limit (preliminary LIMIT (without OFFSET))
  Header: toStartOfDay(timestamp) DateTime
          event String
          sum(metric_value) Int64
    Sorting (Sorting for ORDER BY)
    Header: toStartOfDay(timestamp) DateTime
            event String
            sum(metric_value) Int64
      Expression (Before ORDER BY)
      Header: toStartOfDay(timestamp) DateTime
              event String
              sum(metric_value) Int64
        Aggregating
        Header: toStartOfDay(timestamp) DateTime
                event String
                sum(metric_value) Int64
          Expression (Before GROUP BY)
          Header: event String
                  metric_value Int32
                  toStartOfDay(timestamp) DateTime
            Filter (WHERE)
            Header: timestamp DateTime
                    event String
                    metric_value Int32
              SettingQuotaAndLimits (Set limits and quota after reading from storage)
              Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                      timestamp DateTime
                      site_id UInt32
                      event String
                      metric_value Int32
                ReadFromMergeTree
                Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                        timestamp DateTime
                        site_id UInt32
                        event String
                        metric_value Int32
                Indexes:
                  PrimaryKey
                    Keys:
                      site_id
                      toStartOfDay(timestamp)
                    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
                    Parts: 2/2
                    Granules: 11/24415

</details>

The full output of EXPLAIN is obtuse, but the most important part is also the most deeply nested one:

ReadFromMergeTree
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
        timestamp DateTime
        site_id UInt32
        event String
        metric_value Int32
Indexes:
    PrimaryKey
    Keys:
        site_id
        toStartOfDay(timestamp)
    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
    Parts: 2/2
    Granules: 11/24415

At the start of the query, ClickHouse loaded the primary index of each part into memory. From this output, we know that the query first used the primary key to filter based on site_id and timestamp values stored in the index. This allowed it to know that only 11 out of 24415 granules (0.05%) contained any relevant data.

From there it read those 11 granules' worth of data (11 × 8192 = 90,112 rows, matching the 90.11 thousand rows reported above) from the timestamp, site_id, event and metric_value columns and did the rest of the filtering and aggregation on that data alone.

See this documentation for a guide on how to choose ORDER BY.

"Point queries" not supported by ORDER BY

Consider this query:

SELECT * FROM sensor_values WHERE uuid = '69028f26-768f-afef-1816-521b22d281ca'

Executing this query reports:

1 row in set. Elapsed: 0.703 sec. Processed 200.00 million rows, 3.20 GB (304.43 million rows/s., 4.87 GB/s.)

While the overall execution time of this query is not bad thanks to fast I/O, it needed to read roughly 2200x as much data from disk. As the dataset or column sizes grow, this performance would degrade dramatically.

Why is this query slower? Because our ORDER BY does not support fast filtering by uuid, ClickHouse needs to read the whole table to find a single record _and_ read all of its columns.

ClickHouse provides some ways to make this faster (e.g. Projections) but in general these require extra disk space or have other trade-offs.
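
As an illustration of that trade-off, a hypothetical projection (named uuid_lookup here) that re-sorts the data by uuid would make the point query fast, at the cost of roughly doubling the table's storage:

ALTER TABLE sensor_values
    ADD PROJECTION uuid_lookup (SELECT * ORDER BY uuid);

ALTER TABLE sensor_values MATERIALIZE PROJECTION uuid_lookup;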

Thus, it's important to make sure the ClickHouse schema is aligned with queries that are being executed.

PARTITION BY

Another tool to make queries faster is PARTITION BY. Consider the updated table definition:

CREATE TABLE sensor_values (
    timestamp DateTime,
    site_id UInt32,
    event VARCHAR,
    uuid UUID,
    metric_value Int32
)
ENGINE = MergeTree()
PARTITION BY intDiv(toYear(timestamp), 10)
ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
SETTINGS index_granularity = 8192

Here, ClickHouse would create one partition per 10 years of data, allowing it to skip reading even the primary index in some cases. For example, rows with timestamps anywhere in 2010-2019 satisfy intDiv(toYear(timestamp), 10) = 201 and all land in the same partition.

In the underlying data, each part belongs to a single partition, and only parts within the same partition get merged.

One additional benefit of partitioning by a derivative of timestamp is that if most queries touch recent data, you can also set up rules to automatically move older parts and partitions to cheaper storage, or drop them entirely.
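
For example, dropping all of the 2010-2019 data above is a single, cheap metadata operation on partition 201:

ALTER TABLE sensor_values DROP PARTITION 201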

Query analysis

Let's run EXPLAIN on the same query as before against the new table:

SELECT
    toStartOfDay(timestamp),
    event,
    sum(metric_value) as total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' AND timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20

<details><summary>Show full EXPLAIN output</summary>

Expression (Projection)
Header: toStartOfDay(timestamp) DateTime
        event String
        total_metric_value Int64
  Limit (preliminary LIMIT (without OFFSET))
  Header: toStartOfDay(timestamp) DateTime
          event String
          sum(metric_value) Int64
    Sorting (Sorting for ORDER BY)
    Header: toStartOfDay(timestamp) DateTime
            event String
            sum(metric_value) Int64
      Expression (Before ORDER BY)
      Header: toStartOfDay(timestamp) DateTime
              event String
              sum(metric_value) Int64
        Aggregating
        Header: toStartOfDay(timestamp) DateTime
                event String
                sum(metric_value) Int64
          Expression (Before GROUP BY)
          Header: event String
                  metric_value Int32
                  toStartOfDay(timestamp) DateTime
            Filter (WHERE)
            Header: timestamp DateTime
                    event String
                    metric_value Int32
              SettingQuotaAndLimits (Set limits and quota after reading from storage)
              Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                      timestamp DateTime
                      site_id UInt32
                      event String
                      metric_value Int32
                ReadFromMergeTree
                Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                        timestamp DateTime
                        site_id UInt32
                        event String
                        metric_value Int32
                Indexes:
                  MinMax
                    Keys:
                      timestamp
                    Condition: and(and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))), and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))))
                    Parts: 2/14
                    Granules: 3589/24421
                  Partition
                    Keys:
                      intDiv(toYear(timestamp), 10)
                    Condition: and(and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))), and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))))
                    Parts: 2/2
                    Granules: 3589/3589
                  PrimaryKey
                    Keys:
                      site_id
                      toStartOfDay(timestamp)
                    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
                    Parts: 2/2
                    Granules: 12/3589

</details>

The relevant part of EXPLAIN is again nested deep within:

ReadFromMergeTree
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
        timestamp DateTime
        site_id UInt32
        event String
        metric_value Int32
Indexes:
  MinMax
    Keys:
      timestamp
    Condition: and(and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))), and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))))
    Parts: 2/14
    Granules: 3589/24421
  Partition
    Keys:
      intDiv(toYear(timestamp), 10)
    Condition: and(and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))), and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))))
    Parts: 2/2
    Granules: 3589/3589
  PrimaryKey
    Keys:
      site_id
      toStartOfDay(timestamp)
    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
    Parts: 2/2
    Granules: 12/3589

What this tells us is that ClickHouse:

  1. first leverages an internal MinMax index on timestamp to whittle the parts down to 2/14 and the granules down to 3589/24421
  2. then tries to filter via the partition key, which doesn't narrow things down further (2/2 parts, 3589/3589 granules)
  3. then loads and leverages the primary key index as before to narrow the data down to 12 granules
  4. lastly reads, filters and aggregates the data in those 12 granules

The benefit here is that ClickHouse could skip loading the primary index for most of the parts, since they did not contain relevant data. Whether and how much this speeds up the query, however, depends on the size of the dataset.

Choosing a good PARTITION BY

Use partitions wisely: each INSERT should ideally touch only one or two partitions, and having too many partitions can cause issues around replication or prove useless for filtering.

Loading the primary index/marks file might not be the bottleneck you expect, so be sure to benchmark different schemas against each other.
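
As a rule of thumb for timeseries tables, monthly partitions often satisfy both constraints, since inserts of recent data touch only the current (and occasionally the previous) month:

PARTITION BY toYYYYMM(timestamp)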

See the Altinity documentation on partitioning for more guidance.

Other notes on MergeTree

Data is expensive to update

Updating data in ClickHouse is expensive and analogous to a schema migration.

For example, to update an event's properties, ClickHouse frequently needs to rewrite all affected parts in full, since parts are immutable once written.
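
A sketch of such an update on our sensor_values table; the statement below is a mutation, which runs asynchronously and rewrites every part containing matching rows:

ALTER TABLE sensor_values UPDATE metric_value = 0 WHERE site_id = 233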

This makes things operationally hard. We mitigate this by:

No query planner

ClickHouse doesn't have a query planner in the sense PostgreSQL or other databases do.

On the one hand, you often end up fighting the query planner in other databases. If we know how ClickHouse works internally and can develop that into intuition for how SQL is executed, we're well-equipped to deal with performance issues as they arise.

On the other, it means we need to be careful when writing SQL, as small changes can have huge performance implications.

Examples:

One notable exception to "no query planner" is that ClickHouse often pushes predicates from WHERE into PREWHERE. Filters in PREWHERE are executed first, and ClickHouse moves columns it thinks are "cheaper" or "more selective" into it. However, putting the wrong column (e.g. a fat column containing JSON) in PREWHERE can cause performance to tank.

Read more on PREWHERE in the ClickHouse docs.
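
As a sketch, you can also place a filter into PREWHERE explicitly. Here the cheap, selective site_id filter is evaluated before the fatter event column is read at all:

SELECT count()
FROM sensor_values
PREWHERE site_id = 233
WHERE event LIKE '%error%'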

Data compression

Because data is stored column by column, subsequent values of a given column are often similar or identical, so the data compresses really well. At PostHog we frequently see uncompressed/compressed ratios of 20x-40x for JSON columns and 300x-2000x for sparse, small columns.

Compression ratios have direct impact on query performance: I/O is often the bottleneck, meaning that highly compressed data can be read faster from disk at the cost of more CPU work for decompression.

By default columns are compressed by the LZ4 algorithm. We've found good success using ZSTD(3) for storing JSON columns - see benchmarks for more information.

Another tip is to use ClickHouse's LowCardinality data type modifier for columns that will store values with low cardinality, i.e. where the total number of distinct values is low. An example of this would be a "country name" column.
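
A sketch combining both ideas on a hypothetical table (properties standing in for a large JSON blob):

CREATE TABLE example_events (
    timestamp DateTime,
    country_name LowCardinality(String),
    properties String CODEC(ZSTD(3))
)
ENGINE = MergeTree()
ORDER BY timestamp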

Weak JOIN support

ClickHouse excels at aggregating data from a single table at a time. If you have a query with JOINs or subqueries, however, the right-hand side of the JOIN is loaded into memory first. Thus, you should always put the bigger table on the left-hand side of the JOIN!
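
A sketch with a hypothetical sites dimension table, kept on the right-hand side so only it is held in memory:

SELECT s.name, sum(e.metric_value) AS total
FROM sensor_values AS e
INNER JOIN sites AS s ON e.site_id = s.site_id
GROUP BY s.name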

This means that at scale JOINs can kill performance. Read more on the effect of removing JOINs from our events database here:

Suggested reading

Next in the ClickHouse manual: Data replication
