Posted on: March 7, 2025 at 02:30 AM

Using ClickHouse

1. Introduction to ClickHouse

ClickHouse is a powerful open-source column-oriented database originally developed at Yandex. It is designed specifically for online analytical processing (OLAP) scenarios and is an excellent choice for large-scale data analysis workloads.

1.1 What Makes ClickHouse Special

ClickHouse stands out in a crowded database market for several reasons:

  1. Column-oriented storage: Unlike traditional row-oriented databases, ClickHouse stores data by column rather than by row, which makes it highly efficient for analytical queries that typically touch only a few columns.

  2. Blazing-fast performance: Designed to process billions of rows and tens of columns per second, it is one of the fastest OLAP database systems available.

  3. Real-time data processing: Although primarily an OLAP database, it can handle real-time data ingestion and querying at the same time.

  4. Data compression: Advanced compression methods cut storage costs while preserving query performance (see the query sketch right after this list).
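
As a quick illustration of the storage savings, the system.parts table exposes compressed and uncompressed sizes per table. A minimal sketch (run it against your own tables; output will vary):

-- Compare on-disk (compressed) size with raw (uncompressed) size per table.
SELECT
    table,
    formatReadableSize(sum(data_compressed_bytes))   AS compressed,
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed,
    round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) AS ratio
FROM system.parts
WHERE active
GROUP BY table
ORDER BY sum(data_compressed_bytes) DESC;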

1.2 Comparison with Similar Tools

Before diving deeper, it helps to understand how ClickHouse compares to other popular analytics solutions:

| Tool          | Optimized For              | Key Differentiator        |
|---------------|----------------------------|---------------------------|
| ClickHouse    | High-performance analytics | Exceptional query speed   |
| Apache Druid  | Real-time analytics        | Stream ingestion          |
| Elasticsearch | Full-text search           | Text search capabilities  |
| Snowflake     | Cloud data warehouse       | Managed service           |
| TimescaleDB   | Time-series data           | PostgreSQL compatibility  |

2. Getting Started with ClickHouse

2.1 Installation

The simplest way to get started on macOS is to use Homebrew:

brew install clickhouse

Running ClickHouse with brew services

brew tap pavsap/clickhouse
brew install clickhouse-server
brew services start clickhouse-server

Taken from the pavsap tap.

Running ClickHouse with Docker Compose

compose.yaml

services:
  clickhouse:
    container_name: clickhouse
    network_mode: host
    image: clickhouse/clickhouse-server:latest
    restart: unless-stopped
    environment:
      CLICKHOUSE_USER: ${CLICKHOUSE_USER}
      CLICKHOUSE_PASSWORD: ${CLICKHOUSE_PASSWORD}
      CLICKHOUSE_DB: ${CLICKHOUSE_DB}
      # GCS is accessed through its S3-compatible API, so the HMAC keys
      # are passed to ClickHouse as AWS credentials.
      AWS_ACCESS_KEY_ID: ${GCS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${GCS_SECRET_KEY}
    volumes:
      - ./config/config.xml:/etc/clickhouse-server/config.d/config.xml
      - ./mnt/data:/var/lib/clickhouse

2.2 Connecting to ClickHouse

There are several ways to connect to a ClickHouse server:

1. Command-Line Interface

clickhouse client -h <host_address> --port <port> -u <username> --password <password>

2. Environment Variables

Set the following environment variables to make connecting easier:

  • CLICKHOUSE_HOST
  • CLICKHOUSE_USER
  • CLICKHOUSE_PASSWORD
  • CLICKHOUSE_PORT

3. GUI Tools

DBeaver is a great choice for visual database management:

  1. Download DBeaver
  2. Create a new connection
  3. Select the ClickHouse adapter
  4. Enter the connection details

4. Python Integration

ClickHouse offers strong Python support through several libraries:

import clickhouse_connect

client = clickhouse_connect.get_client(
    host='your_host',
    port='your_port',
    username='your_username',
    password='your_password'
)

https://clickhouse.com/docs/integrations/python

5. Node.js Integration

import { createClient } from '@clickhouse/client'

const client = createClient({
    url: `${protocol}://${your_host}:${your_port}`,
    username: 'your_username',
    password: 'your_password',
    database: 'your_database',
})

https://clickhouse.com/docs/integrations/javascript

2.3 Working with Parquet Files in ClickHouse

One of ClickHouse's most powerful features is its ability to process Parquet files stored in cloud storage (S3/GCS) directly.

Setting Up Cloud Storage Access

Configuring a GCS bucket:

  1. Go to Cloud Storage > Settings > Interoperability
  2. Create HMAC credentials in the service account section
  3. Set the environment variables:
    • GCS_ACCESS_KEY_ID
    • GCS_SECRET_KEY

ClickHouse Server Configuration

Add the following to config.xml:

<storage_configuration>
    <s3>
        <use_environment_credentials>true</use_environment_credentials>
    </s3>
</storage_configuration>
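
Once the credentials are exported and the configuration above is in place, a quick sanity check is to read a single row from a Parquet file in your bucket with the s3() table function. A minimal sketch; the bucket and object path below are placeholders:

-- Replace <your-bucket> and <path> with a real object in your GCS bucket.
SELECT *
FROM s3(
    'https://storage.googleapis.com/<your-bucket>/<path>/example.parquet',
    'Parquet'
)
LIMIT 1;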

3. Practical Examples

Example 1:

CREATE DATABASE IF NOT EXISTS test;

CREATE TABLE IF NOT EXISTS test.orders
(
    `OrderID` Int64,
    `CustomerID` Int64,
    `OrderDate` DateTime,
    `Comments` String,
    `Cancelled` Bool
)
ENGINE = MergeTree
PRIMARY KEY (OrderID, OrderDate)
ORDER BY (OrderID, OrderDate, CustomerID)
SETTINGS index_granularity = 8192;

INSERT INTO test.orders
VALUES (334, 123, '2021-09-15 14:30:00', 'some comment', false);

SELECT OrderID, OrderDate FROM test.orders;

SELECT * FROM test.orders FORMAT Pretty;

SHOW databases;
SHOW tables;

Example 2:

-- lesson 02 engines
CREATE TABLE IF NOT EXISTS inventory1
(
    `id` Int32,
    `status` String,
    `price` String,
    `comment` String
)
ENGINE = MergeTree
PRIMARY KEY (id, price)
ORDER BY (id, price, status);

INSERT INTO inventory1 VALUES (23, 'success', '1000', 'Confirmed');
INSERT INTO inventory1 VALUES (23, 'success', '2000', 'Cancelled');
SELECT * from inventory1 WHERE id=23;

CREATE TABLE IF NOT EXISTS inventory2
(
    `id` Int32,
    `status` String,
    `price` String,
    `comment` String
)
ENGINE = ReplacingMergeTree
PRIMARY KEY (id)
ORDER BY (id, status);

INSERT INTO inventory2 VALUES (23, 'success', '1000', 'Confirmed');
INSERT INTO inventory2 VALUES (23, 'success', '2000', 'Cancelled');
SELECT * from inventory2 WHERE id=23;
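
Unlike the plain MergeTree above, ReplacingMergeTree deduplicates rows that share the same sorting key, but only when parts are merged in the background, so the SELECT above may still return both rows at first. A minimal sketch of how to observe the deduplication on the table just created:

-- FINAL merges duplicates at query time; OPTIMIZE ... FINAL forces an on-disk merge.
SELECT * from inventory2 FINAL WHERE id=23;

OPTIMIZE TABLE inventory2 FINAL;
-- After the merge, only the most recently inserted row for the sorting key remains.
SELECT * from inventory2 WHERE id=23;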

CREATE TABLE IF NOT EXISTS inventory3
(
    `id` Int32,
    `status` String,
    `price` String,
    `comment` String,
    `sign` Int8
)
ENGINE = CollapsingMergeTree(sign)
PRIMARY KEY (id)
ORDER BY (id, status);

INSERT INTO inventory3 VALUES (23, 'success', '1000', 'Confirmed', 1);
INSERT INTO inventory3 VALUES (23, 'success', '2000', 'Cancelled', -1);
SELECT * from inventory3 WHERE id=23;
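
CollapsingMergeTree uses the sign column to cancel row pairs during merges: the sign = 1 row above is cancelled by the later sign = -1 row with the same sorting key. Since collapsing is also asynchronous, a sketch to observe it on the table just created:

-- The 1 / -1 pair shares the sorting key (id, status), so a merge cancels both rows.
SELECT * from inventory3 FINAL WHERE id=23;

OPTIMIZE TABLE inventory3 FINAL;
-- Once the pair has collapsed, this returns no rows.
SELECT * from inventory3 WHERE id=23;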

CREATE TABLE IF NOT EXISTS inventory4
 (
    `id` Int32,
    `status` String,
    `price` Int32,
    `num_items` UInt64
)
ENGINE = MergeTree
ORDER BY (id, status);

CREATE MATERIALIZED VIEW IF NOT EXISTS agg_inventory
(
    `id` Int32,
    `max_price` AggregateFunction(max, Int32),
    `sum_items` AggregateFunction(sum, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (id)
AS SELECT
    id,
    maxState(price) as max_price,
    sumState(num_items) as sum_items
FROM inventory4
GROUP BY id;

INSERT INTO inventory4 VALUES (3, 'success', 100, 2), (3, 'cancelled', 500, 4);

SELECT
  id,
  maxMerge(max_price) AS max_price,
  sumMerge(sum_items) AS sum_items
FROM agg_inventory
WHERE id=3
GROUP BY id;

CREATE TABLE log_location
(
  id Int32,
  long String,
  lat Int32
) ENGINE = TinyLog;

CREATE TABLE mysql_inventory
(
  id Int32,
  price Int32
)
ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password');

Example 3:

-- lesson 03

CREATE TABLE IF NOT EXISTS default.projects1
(
   `project_id` UInt32,
   `name` String,
   `created_date` Date
)
ENGINE = MergeTree
ORDER BY (project_id, created_date);


INSERT INTO projects1
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000000;

SELECT * FROM projects1 LIMIT 10 FORMAT Pretty;

EXPLAIN indexes=1
SELECT * FROM projects1 WHERE created_date=today();

-- order key
CREATE TABLE IF NOT EXISTS default.projects2
(
    `project_id` UInt32,
    `name` String,
    `created_date` Date
)
ENGINE = MergeTree
PRIMARY KEY (created_date, project_id)
ORDER BY (created_date, project_id, name);

INSERT INTO projects2
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000000;
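
Because projects2 puts created_date first in its primary key, the same date filter that triggered a full scan on projects1 can now prune granules via the primary index. An illustrative check (the plan details depend on the randomly generated data):

-- Contrast with the EXPLAIN on projects1 above: the (created_date, project_id)
-- primary key lets ClickHouse skip granules that do not match the date filter.
EXPLAIN indexes=1
SELECT * FROM projects2 WHERE created_date=today();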

SELECT
    name,
    partition
FROM
    system.parts
WHERE
    table = 'projects2'
FORMAT Pretty;

-- partitioned
CREATE TABLE IF NOT EXISTS default.projects_partitioned
(
    `project_id` UInt32,
    `name` String,
    `created_date` Date
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(created_date)
PRIMARY KEY (created_date, project_id)
ORDER BY (created_date, project_id, name);

INSERT INTO projects_partitioned
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 100;

SELECT *
FROM projects_partitioned
WHERE created_date='2020-02-01'
FORMAT Pretty;

EXPLAIN indexes=1
SELECT * FROM projects_partitioned
WHERE created_date='2020-02-01';

Example 4:

-- Query 1: Select all data with schema definition
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/backtest_data/polygon_io_minute_aggs_hive_partitioned_parquet/year%3D2014/month%3D1/date%3D2014-01-02/00000000.parquet',
    'Parquet',
    'symbol String,
    timestamp DateTime64(9),
    open Float64,
    high Float64,
    low Float64,
    close Float64,
    volume Int64,
    vwap Float64,
    transactions Int64'
);

-- Query 2: Simple select all
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/backtest_data/polygon_io_minute_aggs_hive_partitioned_parquet/year%3D2014/month%3D1/date%3D2014-01-02/00000000.parquet',
    'Parquet'
);

-- Query 3: Select with volume filter and ordering
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/backtest_data/polygon_io_minute_aggs_hive_partitioned_parquet/year%3D2014/month%3D1/date%3D2014-01-02/00000000.parquet',
    'Parquet'
)
WHERE volume > 10000
ORDER BY volume DESC;

-- Query 4: Select from balance sheets
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/input/v1.0.1/yfinance_balance_sheets_vti_20241107.parquet',
    'Parquet'
);

-- Query 5: Describe balance sheets table
DESCRIBE TABLE s3(
    'https://storage.googleapis.com/our-lab/input/v1.0.1/yfinance_balance_sheets_vti_20241107.parquet'
);

Example 5:

-- Query 1: insert into parquet file
INSERT INTO FUNCTION s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet',
    'Parquet'
)
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 200;

INSERT INTO FUNCTION s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet',
    'Parquet'
)
SETTINGS s3_create_new_file_on_insert=1
--SETTINGS s3_create_new_file_on_insert=0, s3_truncate_on_insert=0
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 200;

-- describe table
DESCRIBE TABLE s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet'
);

-- select from parquet file
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet',
    'Parquet'
)
LIMIT 20
FORMAT Pretty;

-- Reading from multiple parquet files in a directory
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/input/v1.0.1/sp500/financial_loss_output/v1/bi_losses_output/*.parquet',
    'Parquet')
LIMIT 20;
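
When a glob matches many objects, the _file virtual column of the s3 table function is handy for confirming which files were actually read. A sketch using the same path as above:

-- Count rows per matched Parquet file via the _file virtual column.
SELECT
    _file,
    count() AS rows
FROM s3(
    'https://storage.googleapis.com/our-lab/input/v1.0.1/sp500/financial_loss_output/v1/bi_losses_output/*.parquet',
    'Parquet'
)
GROUP BY _file
ORDER BY _file
LIMIT 20;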

Example 6:

-- Query 1: insert into parquet file
INSERT INTO FUNCTION s3(
'https://storage.googleapis.com/our-lab/chtest/write_test2.parquet',
'Parquet'
)
SETTINGS s3_create_new_file_on_insert=1
PARTITION BY rand() % 10
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 20000;

-- Query 2: insert into parquet file
INSERT INTO FUNCTION s3(
'https://storage.googleapis.com/our-lab/chtest/write_test2_{_partition_id}.parquet',
'Parquet'
)
SETTINGS s3_create_new_file_on_insert=1
SELECT
*,
rand() % 10 as _partition_id
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 20000;

-- Query 3. Writing to multiple parquet files in a directory
INSERT INTO FUNCTION file('data_{_partition_id}.parquet', Parquet)
PARTITION BY toYYYYMM(created_date)
SELECT *
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000;

-- Query 4. Writing to multiple parquet files in a directory with file()
INSERT INTO FUNCTION file('foo_{_partition_id}.parquet', Parquet)
PARTITION BY concat(toString(toYear(created_date)), '_', toString(toMonth(created_date)))
SELECT
*,
toString(toYear(created_date)) as year,
toString(toMonth(created_date)) as month
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 100;

--root@onec02:/var/lib/clickhouse/user_files# ll
-- total 192
-- drwxr-xr-x 2 clickhouse clickhouse 4096 Mar 14 07:05 ./
-- drwxrwxrwx 12 clickhouse clickhouse 4096 Mar 4 22:50 ../
-- -rw-r----- 1 clickhouse clickhouse 185478 Mar 14 07:05 'foo_(1,1).parquet'

-- Query 5. Writing to multiple parquet files in a directory with s3()
INSERT INTO FUNCTION s3(
'https://storage.googleapis.com/our-lab/chtest/write_test3_{_partition_id}.parquet',
Parquet)
PARTITION BY toYYYYMM(created_date)
SELECT *
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000;

-- https://console.cloud.google.com/storage/browser/our-lab/chtest;tab=objects?pageState=(%22StorageObjectListTable%22:(%22f%22:%22%255B%255D%22))&project=oc-lunar&prefix=&forceOnObjectsSortingFiltering=false

Python Example 1:

# https://clickhouse.com/docs/knowledgebase/python-clickhouse-connect-example
import json
import os
import sys

import clickhouse_connect

CLICKHOUSE_CLOUD_HOSTNAME = os.getenv("CLICKHOUSE_HOST")
CLICKHOUSE_CLOUD_USER = os.getenv("CLICKHOUSE_USER")
CLICKHOUSE_CLOUD_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD")
CLICKHOUSE_CLOUD_PORT = os.getenv("CLICKHOUSE_PORT", "8123")

client = clickhouse_connect.get_client(
    host=CLICKHOUSE_CLOUD_HOSTNAME,
    port=int(CLICKHOUSE_CLOUD_PORT),
    username=CLICKHOUSE_CLOUD_USER,
    password=CLICKHOUSE_CLOUD_PASSWORD,
)

print("connected to " + CLICKHOUSE_CLOUD_HOSTNAME + "\n")
client.command(
    "CREATE TABLE IF NOT EXISTS new_table (key UInt32, value String, metric Float64) ENGINE MergeTree ORDER BY key"
)

print("table new_table created or exists already!\n")

row1 = [1000, "String Value 1000", 5.233]
row2 = [2000, "String Value 2000", -107.04]
data = [row1, row2]
client.insert("new_table", data, column_names=["key", "value", "metric"])

print("written 2 rows to table new_table\n")

QUERY = "SELECT max(key), avg(metric) FROM new_table"

result = client.query(QUERY)

sys.stdout.write("query: [" + QUERY + "] returns:\n\n")
print(result.result_rows)

client.close()

Python Example 2:

# https://clickhouse.com/blog/querying-pandas-dataframes

import chdb.dataframe as cdf
import pandas as pd


def main():
    house_prices = pd.read_csv(
        filepath_or_buffer="data/HouseListings-Top45Cities-10292023-kaggle.csv",
        encoding="ISO-8859-1",
    )

    print(house_prices.head(n=2).T)

    cities = pd.read_csv(filepath_or_buffer="data/canadacities.csv")
    print(cities.head(n=1).T)

    top_cities = cdf.query(
        house_prices=house_prices,
        cities=cities,
        sql="""
            FROM __house_prices__ AS hp
            JOIN __cities__ AS c
            ON c.city_ascii = hp.City AND c.province_name = hp.Province
            SELECT City, Province, count(*),
                round(avg(Price)) AS avgPrice,
                round(max(Price)) AS maxPrice,
                ranking, density
            GROUP BY ALL
            LIMIT 10
            """,
    )

    top_cities.query(
        """
        FROM __table__
        SELECT City, maxPrice, ranking, density
        LIMIT 5
        """
    )

    top_cities_df = top_cities.to_pandas()
    print(top_cities_df.head(n=1).T)
    print(top_cities_df)

    bc_top_cities = (
        top_cities_df[top_cities_df["Province"] == "British Columbia"]
        .sort_values(["ranking", "density"])
        .drop(["Province"], axis=1)
    )
    print("aaa=====")
    print(top_cities_df)
    print("bbb=====")
    print(bc_top_cities)


if __name__ == "__main__":
    main()

4. Conclusion

ClickHouse is an excellent choice for organizations working with large-scale data analytics. Its column-oriented architecture, combined with powerful features such as direct Parquet file support and a wide range of integration options, makes it a versatile tool in any data engineer's toolkit.

For more detailed examples and advanced usage, see the official ClickHouse documentation.