Posted on: March 7, 2025 at 02:30 AM

ClickHouse

1. Introduction to ClickHouse

ClickHouse is a powerful open-source column-oriented database management system (DBMS) that was originally developed by Yandex. It’s specifically designed for Online Analytical Processing (OLAP) scenarios, making it an excellent choice for handling large-scale data analytics workloads.

1.1 What Makes ClickHouse Special?

ClickHouse stands out in the crowded database market for several reasons:

  1. Column-Oriented Storage: Unlike traditional row-oriented databases, ClickHouse stores data by columns rather than rows, making it extremely efficient for analytical queries that typically only need to process specific columns.

  2. Lightning-Fast Performance: It’s designed to process billions of rows and dozens of columns per second, making it one of the fastest OLAP database systems available.

  3. Real-Time Data Processing: While primarily an OLAP database, ClickHouse can handle real-time data ingestion and querying simultaneously.

  4. Data Compression: Advanced compression methods help reduce storage costs while maintaining query performance (a short sketch of the column-storage and compression ideas follows this list).
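
A minimal sketch of points 1 and 4 in practice (the table and column names here are made up purely for illustration): compression codecs are declared per column, and an analytical query only reads the columns it actually references.

CREATE TABLE events
(
    `event_date` Date,
    `user_id` UInt64,
    `url` String CODEC(ZSTD(3)),            -- heavier compression for long strings
    `duration_ms` UInt32 CODEC(Delta, LZ4)  -- delta-encode, then compress
)
ENGINE = MergeTree
ORDER BY (event_date, user_id);

-- Only event_date and duration_ms are read from disk; url is never touched.
SELECT event_date, avg(duration_ms)
FROM events
GROUP BY event_date;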

1.2 Comparison with Similar Tools

Before diving deeper, it’s worth understanding how ClickHouse compares to other popular analytics solutions:

Tool          | Best For                    | Key Differentiator
ClickHouse    | High-performance analytics  | Exceptional query speed
Apache Druid  | Real-time analytics         | Stream ingestion
Elasticsearch | Full-text search            | Text search capabilities
Snowflake     | Cloud data warehouse        | Managed service
TimescaleDB   | Time-series data            | PostgreSQL compatibility

2. Getting Started with ClickHouse

2.1 Installation

The simplest way to get started on macOS is using Homebrew:

brew install clickhouse

Run ClickHouse as a brew service (via the pavsap tap):

brew tap pavsap/clickhouse
brew install clickhouse-server
brew services start clickhouse-server

Run ClickHouse with Docker Compose. The ${...} placeholders below are read from the shell environment or an .env file next to the compose file; start the stack with docker compose up -d.

compose.yaml

services:
  clickhouse:
    container_name: clickhouse
    network_mode: host
    image: clickhouse/clickhouse-server:latest
    restart: unless-stopped
    environment:
      CLICKHOUSE_USER: ${CLICKHOUSE_USER}
      CLICKHOUSE_PASSWORD: ${CLICKHOUSE_PASSWORD}
      CLICKHOUSE_DB: ${CLICKHOUSE_DB}
      AWS_ACCESS_KEY_ID: ${GCS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${GCS_SECRET_KEY}
    volumes:
      - ./config/config.xml:/etc/clickhouse-server/config.d/config.xml
      - ./mnt/data:/var/lib/clickhouse

2.2 Connecting to ClickHouse

There are several ways to connect to a ClickHouse server:

1. Command Line Interface

clickhouse client -h <host_address> --port <port> -u <username> --password <password>

For a local installation with default settings, a plain clickhouse client connects to localhost on the native port 9000 as the default user.

2. Environment Variables

Set these environment variables so the connection details don't have to be retyped; the Python examples later in this post read them with os.getenv:

  • CLICKHOUSE_HOST
  • CLICKHOUSE_USER
  • CLICKHOUSE_PASSWORD
  • CLICKHOUSE_PORT

3. GUI Tools

DBeaver is an excellent choice for visual database management:

  1. Download DBeaver
  2. Create a new connection
  3. Select ClickHouse adapter
  4. Enter your connection details

4. Python Integration

ClickHouse offers robust Python support through various libraries:

import clickhouse_connect

client = clickhouse_connect.get_client(
    host='your_host',
    port='your_port',
    username='your_username',
    password='your_password'
)

https://clickhouse.com/docs/integrations/python

5. Node.js Integration

import { createClient } from '@clickhouse/client'

const client = createClient({
    url: `${protocol}://${your_host}:${your_port}`,
    username: 'your_username',
    password: 'your_password',
    database: 'your_database',
})

https://clickhouse.com/docs/integrations/javascript

2.3 Working with Parquet Files in ClickHouse

One of ClickHouse’s powerful features is its ability to work directly with Parquet files stored in cloud storage (S3/GCS).

Setting Up Cloud Storage Access

GCS Bucket Configuration:

  1. Navigate to Cloud Storage > Settings > Interoperability
  2. Create HMAC credentials under Service Account section
  3. Set environment variables (the compose file above maps these onto AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, which is what ClickHouse's S3-compatible access reads):
    • GCS_ACCESS_KEY_ID
    • GCS_SECRET_KEY

ClickHouse Server Configuration

Add the following to your config.xml:

<storage_configuration>
    <s3>
        <use_environment_credentials>true</use_environment_credentials>
    </s3>
</storage_configuration>
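
With use_environment_credentials enabled, the s3() table function used in the examples below picks up the HMAC key pair from the environment automatically. If you prefer to pass credentials explicitly, the function also accepts them inline; a sketch with a hypothetical bucket path:

SELECT count()
FROM s3(
    'https://storage.googleapis.com/your-bucket/path/to/file.parquet',
    '<GCS_ACCESS_KEY_ID>',
    '<GCS_SECRET_KEY>',
    'Parquet'
);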

3. Practical Examples

Example 1: Creating a database and a basic MergeTree table

CREATE DATABASE IF NOT EXISTS test;

CREATE TABLE IF NOT EXISTS test.orders
(`OrderID` Int64,
`CustomerID` Int64,
`OrderDate` DateTime,
`Comments` String,
`Cancelled` Bool)
ENGINE = MergeTree
-- When PRIMARY KEY is given separately, it must be a prefix of ORDER BY
PRIMARY KEY (OrderID, OrderDate)
ORDER BY (OrderID, OrderDate, CustomerID)
SETTINGS index_granularity = 8192;

INSERT INTO test.orders
VALUES (334, 123, '2021-09-15 14:30:00', 'some comment',
false);

SELECT OrderID, OrderDate FROM test.orders;

SELECT * FROM test.orders FORMAT Pretty;

SHOW databases;
SHOW tables;

Example 2: Table engines (MergeTree, ReplacingMergeTree, CollapsingMergeTree, AggregatingMergeTree, TinyLog, MySQL)

-- lesson 02 engines
CREATE TABLE IF NOT EXISTS inventory1
(
    `id` Int32,
    `status` String,
    `price` String,
    `comment` String
)
ENGINE = MergeTree
PRIMARY KEY (id, price)
ORDER BY (id, price, status);

INSERT INTO inventory1 VALUES (23, 'success', '1000', 'Confirmed');
INSERT INTO inventory1 VALUES (23, 'success', '2000', 'Cancelled');
SELECT * from inventory1 WHERE id=23;

CREATE TABLE IF NOT EXISTS inventory2
(
    `id` Int32,
    `status` String,
    `price` String,
    `comment` String
)
ENGINE = ReplacingMergeTree
PRIMARY KEY (id)
ORDER BY (id, status);

INSERT INTO inventory2 VALUES (23, 'success', '1000', 'Confirmed');
INSERT INTO inventory2 VALUES (23, 'success', '2000', 'Cancelled');
SELECT * from inventory2 WHERE id=23;
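
Note that ReplacingMergeTree deduplicates rows with the same ORDER BY key only during background merges, so the SELECT above may still return both rows right after the inserts. Adding FINAL forces the deduplication to happen at query time:

SELECT * from inventory2 FINAL WHERE id=23;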

CREATE TABLE IF NOT EXISTS inventory3
(
    `id` Int32,
    `status` String,
    `price` String,
    `comment` String,
    `sign` Int8
)
ENGINE = CollapsingMergeTree(sign)
PRIMARY KEY (id)
ORDER BY (id, status);

INSERT INTO inventory3 VALUES (23, 'success', '1000', 'Confirmed', 1);
INSERT INTO inventory3 VALUES (23, 'success', '2000', 'Cancelled', -1);
SELECT * from inventory3 WHERE id=23;
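
The same caveat applies to CollapsingMergeTree: the +1/-1 pair is only collapsed when parts are merged, so both rows can still appear in the plain SELECT. FINAL applies the collapsing logic at read time:

SELECT * from inventory3 FINAL WHERE id=23;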

CREATE TABLE IF NOT EXISTS inventory4
 (
    `id` Int32,
    `status` String,
    `price` Int32,
    `num_items` UInt64
)
ENGINE = MergeTree
ORDER BY (id, status);

CREATE MATERIALIZED VIEW IF NOT EXISTS agg_inventory
(
    `id` Int32,
    `max_price` AggregateFunction(max, Int32),
    `sum_items` AggregateFunction(sum, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (id)
AS SELECT
    id,
    maxState(price) as max_price,
    sumState(num_items) as sum_items
FROM inventory4
GROUP BY id;

INSERT INTO inventory4 (id, price, num_items) VALUES (3, 100, 2), (3, 500, 4);

-- Partial aggregation states stored by the view are finalized with the -Merge combinators
SELECT
  id,
  maxMerge(max_price) AS max_price,
  sumMerge(sum_items) AS sum_items
FROM agg_inventory
WHERE id=3
GROUP BY id;

CREATE TABLE log_location
(
  id Int32,
  long String,
  lat Int32
) ENGINE = TinyLog;

CREATE TABLE mysql_inventory
(
  id Int32,
  price Int32
)
ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password');
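
The MySQL engine does not copy data into ClickHouse; reads (and inserts) against mysql_inventory are forwarded to the remote MySQL table, with the connection parameters above being placeholders. Querying it looks like querying any other table:

SELECT * FROM mysql_inventory LIMIT 10;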

Example 3: Ordering keys, primary indexes, and partitioning

-- lesson 03

CREATE TABLE IF NOT EXISTS default.projects1
(
   `project_id` UInt32,
   `name` String,
   `created_date` Date
)
ENGINE = MergeTree
ORDER BY (project_id, created_date);


INSERT INTO projects1
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000000;

SELECT * FROM projects1 LIMIT 10 FORMAT Pretty;

EXPLAIN indexes=1
SELECT * FROM projects1 WHERE created_date=today();

-- order key
CREATE TABLE IF NOT EXISTS default.projects2
(
    `project_id` UInt32,
    `name` String,
    `created_date` Date
)
ENGINE = MergeTree
PRIMARY KEY (created_date, project_id)
ORDER BY (created_date, project_id, name);

INSERT INTO projects2
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000000;
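
For comparison with projects1, the same kind of date filter against projects2 (where created_date leads the primary key) should show the primary index narrowing the scanned granules in the EXPLAIN output:

EXPLAIN indexes=1
SELECT * FROM projects2 WHERE created_date=today();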

SELECT
    name,
    partition
FROM
    system.parts
WHERE
    table = 'projects2'
FORMAT Pretty;

-- partitioned
CREATE TABLE IF NOT EXISTS default.projects_partitioned
(
    `project_id` UInt32,
    `name` String,
    `created_date` Date
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(created_date)
PRIMARY KEY (created_date, project_id)
ORDER BY (created_date, project_id, name);

INSERT INTO projects_partitioned
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 100;

SELECT *
FROM projects_partitioned
WHERE created_date='2020-02-01'
FORMAT Pretty;

EXPLAIN indexes=1
SELECT * FROM projects_partitioned
WHERE created_date='2020-02-01';
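
The monthly partitions created by PARTITION BY toYYYYMM(created_date) can be listed from system.parts, just like the parts of projects2 above:

SELECT partition, count() AS parts
FROM system.parts
WHERE table = 'projects_partitioned' AND active
GROUP BY partition
ORDER BY partition
FORMAT Pretty;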

Example 4: Querying Parquet files in GCS with the s3() table function

-- Query 1: Select all data with schema definition
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/backtest_data/polygon_io_minute_aggs_hive_partitioned_parquet/year%3D2014/month%3D1/date%3D2014-01-02/00000000.parquet',
    'Parquet',
    'symbol String,
    timestamp DateTime64(9),
    open Float64,
    high Float64,
    low Float64,
    close Float64,
    volume Int64,
    vwap Float64,
    transactions Int64'
);

-- Query 2: Simple select all
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/backtest_data/polygon_io_minute_aggs_hive_partitioned_parquet/year%3D2014/month%3D1/date%3D2014-01-02/00000000.parquet',
    'Parquet'
);

-- Query 3: Select with volume filter and ordering
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/backtest_data/polygon_io_minute_aggs_hive_partitioned_parquet/year%3D2014/month%3D1/date%3D2014-01-02/00000000.parquet',
    'Parquet'
)
WHERE volume > 10000
ORDER BY volume DESC;

-- Query 4: Select from balance sheets
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/input/v1.0.1/yfinance_balance_sheets_vti_20241107.parquet',
    'Parquet'
);

-- Query 5: Describe balance sheets table
DESCRIBE TABLE s3(
    'https://storage.googleapis.com/our-lab/input/v1.0.1/yfinance_balance_sheets_vti_20241107.parquet'
);

Example 5: Writing Parquet files to GCS and reading them back

-- Query 1: insert into parquet file
INSERT INTO FUNCTION s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet',
    'Parquet'
)
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 200;

INSERT INTO FUNCTION s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet',
    'Parquet'
)
SETTINGS s3_create_new_file_on_insert=1
--SETTINGS s3_create_new_file_on_insert=0, s3_truncate_on_insert=0
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 200;

-- describe table
DESCRIBE TABLE s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet'
);

-- select from parquet file
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet',
    'Parquet'
)
LIMIT 20
FORMAT Pretty;

-- Reading from multiple parquet files in a directory
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/input/v1.0.1/sp500/financial_loss_output/v1/bi_losses_output/*.parquet',
    'Parquet')
LIMIT 20;

Example 6: Partitioned writes to multiple Parquet files

-- Query 1: insert into parquet file
INSERT INTO FUNCTION s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test2.parquet',
    'Parquet'
)
SETTINGS s3_create_new_file_on_insert=1
PARTITION BY rand() % 10
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 20000;

-- Query 2: insert into parquet file
INSERT INTO FUNCTION s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test2_{_partition_id}.parquet',
    'Parquet'
)
SETTINGS s3_create_new_file_on_insert=1
SELECT
    *,
    rand() % 10 as _partition_id
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 20000;

-- Query 3. Writing to multiple parquet files in a directory
INSERT INTO FUNCTION file('data_{_partition_id}.parquet', Parquet)
PARTITION BY toYYYYMM(created_date)
SELECT *
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000;

-- Query 4. Writing to multiple parquet files in a directory with file()
INSERT INTO FUNCTION file('foo_{_partition_id}.parquet', Parquet)
PARTITION BY concat(toString(toYear(created_date)), '_', toString(toMonth(created_date)))
SELECT
    *,
    toString(toYear(created_date)) as year,
    toString(toMonth(created_date)) as month
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 100;

--root@onec02:/var/lib/clickhouse/user_files# ll
-- total 192
-- drwxr-xr-x  2 clickhouse clickhouse   4096 Mar 14 07:05  ./
-- drwxrwxrwx 12 clickhouse clickhouse   4096 Mar  4 22:50  ../
-- -rw-r-----  1 clickhouse clickhouse 185478 Mar 14 07:05 'foo_(1,1).parquet'

-- Query 5. Writing to multiple parquet files in a directory with s3()
INSERT INTO FUNCTION s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test3_{_partition_id}.parquet',
    Parquet)
PARTITION BY toYYYYMM(created_date)
SELECT *
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000;

-- https://console.cloud.google.com/storage/browser/our-lab/chtest;tab=objects?pageState=(%22StorageObjectListTable%22:(%22f%22:%22%255B%255D%22))&project=oc-lunar&prefix=&forceOnObjectsSortingFiltering=false

Python Example 1: Creating a table, inserting, and querying with clickhouse_connect

# https://clickhouse.com/docs/knowledgebase/python-clickhouse-connect-example
import json
import os
import sys

import clickhouse_connect

CLICKHOUSE_CLOUD_HOSTNAME = os.getenv("CLICKHOUSE_HOST")
CLICKHOUSE_CLOUD_USER = os.getenv("CLICKHOUSE_USER")
CLICKHOUSE_CLOUD_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD")
CLICKHOUSE_CLOUD_PORT = os.getenv("CLICKHOUSE_PORT", "8123")

client = clickhouse_connect.get_client(
    host=CLICKHOUSE_CLOUD_HOSTNAME,
    port=int(CLICKHOUSE_CLOUD_PORT),
    username=CLICKHOUSE_CLOUD_USER,
    password=CLICKHOUSE_CLOUD_PASSWORD,
)

print("connected to " + CLICKHOUSE_CLOUD_HOSTNAME + "\n")
client.command(
    "CREATE TABLE IF NOT EXISTS new_table (key UInt32, value String, metric Float64) ENGINE MergeTree ORDER BY key"
)

print("table new_table created or exists already!\n")

row1 = [1000, "String Value 1000", 5.233]
row2 = [2000, "String Value 2000", -107.04]
data = [row1, row2]
client.insert("new_table", data, column_names=["key", "value", "metric"])

print("written 2 rows to table new_table\n")

QUERY = "SELECT max(key), avg(metric) FROM new_table"

result = client.query(QUERY)

sys.stdout.write("query: [" + QUERY + "] returns:\n\n")
print(result.result_rows)

client.close()

Python Example 2: Querying Pandas DataFrames with chdb

# https://clickhouse.com/blog/querying-pandas-dataframes

import chdb.dataframe as cdf
import pandas as pd


def main():
    house_prices = pd.read_csv(
        filepath_or_buffer="data/HouseListings-Top45Cities-10292023-kaggle.csv",
        encoding="ISO-8859-1",
    )

    print(house_prices.head(n=2).T)

    cities = pd.read_csv(filepath_or_buffer="data/canadacities.csv")
    print(cities.head(n=1).T)

    top_cities = cdf.query(
        house_prices=house_prices,
        cities=cities,
        sql="""
            FROM __house_prices__ AS hp
            JOIN __cities__ AS c
            ON c.city_ascii = hp.City AND c.province_name = hp.Province
            SELECT City, Province, count(*),
                round(avg(Price)) AS avgPrice,
                round(max(Price)) AS maxPrice,
                ranking, density
            GROUP BY ALL
            LIMIT 10
            """,
    )

    top_cities.query(
        """
        FROM __table__
        SELECT City, maxPrice, ranking, density
        LIMIT 5
        """
    )

    top_cities_df = top_cities.to_pandas()
    print(top_cities_df.head(n=1).T)
    print(top_cities_df)

    bc_top_cities = (
        top_cities_df[top_cities_df["Province"] == "British Columbia"]
        .sort_values(["ranking", "density"])
        .drop(["Province"], axis=1)
    )
    print("aaa=====")
    print(top_cities_df)
    print("bbb=====")
    print(bc_top_cities)


if __name__ == "__main__":
    main()

4. Conclusion

ClickHouse is an excellent choice for organizations dealing with large-scale data analytics. Its column-oriented architecture, combined with powerful features like direct Parquet file support and various integration options, makes it a versatile tool in any data engineer’s arsenal.

For more detailed examples and advanced usage, check out the official ClickHouse documentation.