1. Introduction to ClickHouse
ClickHouse is a powerful open-source column-oriented database originally developed at Yandex. It is designed specifically for online analytical processing (OLAP) scenarios, making it an excellent choice for large-scale data analysis workloads.
1.1 What Makes ClickHouse Special
ClickHouse stands out in a crowded database market for several reasons:
- Column-oriented storage: unlike traditional row-oriented databases, ClickHouse stores data by column rather than by row, which is highly efficient for analytical queries that typically touch only a subset of columns.
- Blazing-fast performance: it is built to process billions of rows and tens of columns per second, making it one of the fastest OLAP database systems available.
- Real-time data processing: although it is primarily an OLAP database, it can handle real-time data ingestion and querying at the same time.
- Data compression: advanced compression keeps storage costs down while preserving query performance (a query sketch for inspecting this follows the list).
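Both the columnar layout and the compression are easy to observe from SQL. As a minimal sketch (assuming a populated table such as test.orders, which is created later in this guide), system.columns reports per-column compressed and uncompressed sizes:
SELECT
    name,
    formatReadableSize(data_compressed_bytes) AS compressed,
    formatReadableSize(data_uncompressed_bytes) AS uncompressed
FROM system.columns
WHERE database = 'test' AND table = 'orders';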
1.2 Comparison with Similar Tools
Before diving deeper, it helps to understand how ClickHouse compares with other popular analytics solutions:
| Tool | Optimized for | Key differentiator |
|---|---|---|
| ClickHouse | High-performance analytics | Exceptional query speed |
| Apache Druid | Real-time analytics | Stream ingestion |
| Elasticsearch | Full-text search | Text search capabilities |
| Snowflake | Cloud data warehouse | Managed service |
| TimescaleDB | Time-series data | PostgreSQL compatibility |
2. Getting Started with ClickHouse
2.1 Installation
On macOS, the simplest way to get started is with Homebrew:
brew install clickhouse
Running ClickHouse with brew services
brew tap pavsap/clickhouse
brew install clickhouse-server
brew services start clickhouse-server
Taken from the pavsap tap.
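Once the server is running, a quick sanity check (a minimal sketch against a default local install) is to connect with clickhouse client and ask for the version:
SELECT version();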
Running ClickHouse with Docker Compose
compose.yaml:
services:
  clickhouse:
    container_name: clickhouse
    network_mode: host
    image: clickhouse/clickhouse-server:latest
    restart: unless-stopped
    environment:
      CLICKHOUSE_USER: ${CLICKHOUSE_USER}
      CLICKHOUSE_PASSWORD: ${CLICKHOUSE_PASSWORD}
      CLICKHOUSE_DB: ${CLICKHOUSE_DB}
      AWS_ACCESS_KEY_ID: ${GCS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${GCS_SECRET_KEY}
    volumes:
      - ./config/config.xml:/etc/clickhouse-server/config.d/config.xml
      - ./mnt/data:/var/lib/clickhouse
2.2 Connecting to ClickHouse
There are several ways to connect to a ClickHouse server:
1. Command-line interface
clickhouse client -h <host_address> --port <port> -u <username> --password <password>
2. Environment variables
Set the following environment variables to make connecting easier:
CLICKHOUSE_HOST
CLICKHOUSE_USER
CLICKHOUSE_PASSWORD
CLICKHOUSE_PORT
3. GUI tools
DBeaver is a great choice for visual database management:
- Download DBeaver
- Create a new connection
- Select the ClickHouse driver
- Enter the connection details
4. Python integration
ClickHouse offers strong Python support through several libraries:
import clickhouse_connect

client = clickhouse_connect.get_client(
    host='your_host',
    port='your_port',
    username='your_username',
    password='your_password'
)
https://clickhouse.com/docs/integrations/python
5. Node.js integration
import { createClient } from '@clickhouse/client'

const client = createClient({
  url: `${protocol}://${your_host}:${your_port}`,
  username: 'your_username',
  password: 'your_password',
  database: 'your_database',
})
https://clickhouse.com/docs/integrations/javascript
2.3 Working with Parquet Files in ClickHouse
One of ClickHouse's most powerful features is its ability to process Parquet files stored in cloud storage (S3/GCS) directly.
Setting up cloud storage access
Configuring a GCS bucket:
- Go to Cloud Storage > Settings > Interoperability
- Create HMAC credentials under the service account section
- Set the environment variables:
GCS_ACCESS_KEY_ID
GCS_SECRET_KEY
ClickHouse server configuration
Add the following to config.xml:
<storage_configuration>
    <s3>
        <use_environment_credentials>true</use_environment_credentials>
    </s3>
</storage_configuration>
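With the environment credentials in place, the s3() table function can read GCS objects over the S3-compatible endpoint. As a minimal sketch (the bucket path below is a placeholder; substitute one of your own objects):
SELECT count()
FROM s3(
    'https://storage.googleapis.com/<your-bucket>/<path>/file.parquet',
    'Parquet'
);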
3. Practical Examples
Example 1:
CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE IF NOT EXISTS test.orders
(`OrderID` Int64,
`CustomerID` Int64,
`OrderDate` DateTime,
`Comments` String,
`Cancelled` Bool)
ENGINE = MergeTree
PRIMARY KEY (OrderID, OrderDate)
ORDER BY (OrderID, OrderDate, CustomerID)
SETTINGS index_granularity = 8192;
INSERT INTO test.orders
VALUES (334, 123, '2021-09-15 14:30:00', 'some comment',
false);
SELECT OrderID, OrderDate FROM test.orders;
SELECT * FROM test.orders FORMAT Pretty;
SHOW databases;
SHOW tables;
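In a MergeTree table, the PRIMARY KEY defines the sparse index and must be a prefix of the ORDER BY key, which is why the two clauses above overlap. A quick way to check whether a filter actually uses that index (a sketch against the orders table above) is EXPLAIN with index annotations:
EXPLAIN indexes=1
SELECT OrderID, OrderDate
FROM test.orders
WHERE OrderID = 334;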
Example 2:
-- lesson 02 engines
CREATE TABLE IF NOT EXISTS inventory1
(
`id` Int32,
`status` String,
`price` String,
`comment` String
)
ENGINE = MergeTree
PRIMARY KEY (id, price)
ORDER BY (id, price, status);
INSERT INTO inventory1 VALUES (23, 'success', '1000', 'Confirmed');
INSERT INTO inventory1 VALUES (23, 'success', '2000', 'Cancelled');
SELECT * from inventory1 WHERE id=23;
CREATE TABLE IF NOT EXISTS inventory2
(
`id` Int32,
`status` String,
`price` String,
`comment` String
)
ENGINE = ReplacingMergeTree
PRIMARY KEY (id)
ORDER BY (id, status);
INSERT INTO inventory2 VALUES (23, 'success', '1000', 'Confirmed');
INSERT INTO inventory2 VALUES (23, 'success', '2000', 'Cancelled');
SELECT * from inventory2 WHERE id=23;
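-- Note: ReplacingMergeTree deduplicates rows that share the same ORDER BY key,
-- but only when parts are merged in the background, so the SELECT above may
-- still show both rows. A sketch for forcing the deduplicated view (FINAL
-- merges at read time; OPTIMIZE forces a merge on disk):
SELECT * from inventory2 FINAL WHERE id=23;
OPTIMIZE TABLE inventory2 FINAL;
SELECT * from inventory2 WHERE id=23;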
CREATE TABLE IF NOT EXISTS inventory3
(
`id` Int32,
`status` String,
`price` String,
`comment` String,
`sign` Int8
)
ENGINE = CollapsingMergeTree(sign)
PRIMARY KEY (id)
ORDER BY (id, status);
INSERT INTO inventory3 VALUES (23, 'success', '1000', 'Confirmed', 1);
INSERT INTO inventory3 VALUES (23, 'success', '2000', 'Cancelled', -1);
SELECT * from inventory3 WHERE id=23;
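-- Note: CollapsingMergeTree cancels pairs of rows that share the same ORDER BY
-- key and have opposite sign values during background merges, so both rows may
-- remain visible until a merge happens. A common sketch is to aggregate by
-- sign: keys whose signs cancel out disappear from the result.
SELECT id, status, sum(sign) AS sign_total
FROM inventory3
WHERE id=23
GROUP BY id, status
HAVING sum(sign) != 0;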
CREATE TABLE IF NOT EXISTS inventory4
(
`id` Int32,
`status` String,
`price` Int32,
`num_items` UInt64
)
ENGINE = MergeTree
ORDER BY (id, status);
CREATE MATERIALIZED VIEW IF NOT EXISTS agg_inventory
(
`id` Int32,
`max_price` AggregateFunction(max, Int32),
`sum_items` AggregateFunction(sum, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (id)
AS SELECT
id,
maxState(price) as max_price,
sumState(num_items) as sum_items
FROM inventory4
GROUP BY id;
INSERT INTO inventory4 (id, price, num_items) VALUES (3, 100, 2), (3, 500, 4);
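-- agg_inventory stores intermediate aggregate states written by maxState and
-- sumState, so reading it requires the matching -Merge combinators together
-- with GROUP BY, as in the query below.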
SELECT
id,
maxMerge(max_price) AS max_price,
sumMerge(sum_items) AS sum_items
FROM agg_inventory
WHERE id=3
GROUP BY id;
CREATE TABLE log_location
(
id Int32,
long String,
lat Int32
) ENGINE = TinyLog;
CREATE TABLE mysql_inventory
(
id Int32,
price Int32
)
ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password');
Example 3:
-- lesson 03
CREATE TABLE IF NOT EXISTS default.projects1
(
`project_id` UInt32,
`name` String,
`created_date` Date
)
ENGINE = MergeTree
ORDER BY (project_id, created_date);
INSERT INTO projects1
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000000;
SELECT * FROM projects1 LIMIT 10 FORMAT Pretty;
EXPLAIN indexes=1
SELECT * FROM projects1 WHERE created_date=today();
-- order key
CREATE TABLE IF NOT EXISTS default.projects2
(
`project_id` UInt32,
`name` String,
`created_date` Date
)
ENGINE = MergeTree
PRIMARY KEY (created_date, project_id)
ORDER BY (created_date, project_id, name);
INSERT INTO projects2
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000000;
SELECT
name,
partition
FROM
system.parts
WHERE
table = 'projects2'
FORMAT Pretty;
-- partitioned
CREATE TABLE IF NOT EXISTS default.projects_partitioned
(
`project_id` UInt32,
`name` String,
`created_date` Date
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(created_date)
PRIMARY KEY (created_date, project_id)
ORDER BY (created_date, project_id, name);
INSERT INTO projects_partitioned
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 100;
SELECT *
FROM projects_partitioned
WHERE created_date='2020-02-01'
FORMAT Pretty;
EXPLAIN indexes=1
SELECT * FROM projects_partitioned
WHERE created_date='2020-02-01';
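One practical payoff of partitioning is cheap lifecycle management: whole partitions can be listed and dropped without rewriting the table. A minimal sketch against the table above (the 202002 partition only exists if the random data happened to include February 2020 dates):
SELECT partition, rows
FROM system.parts
WHERE table = 'projects_partitioned' AND active
FORMAT Pretty;
ALTER TABLE projects_partitioned DROP PARTITION 202002;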
Example 4:
-- Query 1: Select all data with schema definition
SELECT *
FROM s3(
'https://storage.googleapis.com/our-lab/backtest_data/polygon_io_minute_aggs_hive_partitioned_parquet/year%3D2014/month%3D1/date%3D2014-01-02/00000000.parquet',
'Parquet',
'symbol String,
timestamp DateTime64(9),
open Float64,
high Float64,
low Float64,
close Float64,
volume Int64,
vwap Float64,
transactions Int64'
);
-- Query 2: Simple select all
SELECT *
FROM s3(
'https://storage.googleapis.com/our-lab/backtest_data/polygon_io_minute_aggs_hive_partitioned_parquet/year%3D2014/month%3D1/date%3D2014-01-02/00000000.parquet',
'Parquet'
);
-- Query 3: Select with volume filter and ordering
SELECT *
FROM s3(
'https://storage.googleapis.com/our-lab/backtest_data/polygon_io_minute_aggs_hive_partitioned_parquet/year%3D2014/month%3D1/date%3D2014-01-02/00000000.parquet',
'Parquet'
)
WHERE volume > 10000
ORDER BY volume DESC;
-- Query 4: Select from balance sheets
SELECT *
FROM s3(
'https://storage.googleapis.com/our-lab/input/v1.0.1/yfinance_balance_sheets_vti_20241107.parquet',
'Parquet'
);
-- Query 5: Describe balance sheets table
DESCRIBE TABLE s3(
'https://storage.googleapis.com/our-lab/input/v1.0.1/yfinance_balance_sheets_vti_20241107.parquet'
);
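Reading Parquet straight from object storage pairs well with loading it into a local MergeTree table for repeated querying. A hedged sketch (the minute_aggs table name and its ORDER BY are illustrative choices; the column list mirrors the schema from Query 1):
CREATE TABLE IF NOT EXISTS minute_aggs
(
    `symbol` String,
    `timestamp` DateTime64(9),
    `open` Float64,
    `high` Float64,
    `low` Float64,
    `close` Float64,
    `volume` Int64,
    `vwap` Float64,
    `transactions` Int64
)
ENGINE = MergeTree
ORDER BY (symbol, timestamp);
INSERT INTO minute_aggs
SELECT *
FROM s3(
    'https://storage.googleapis.com/our-lab/backtest_data/polygon_io_minute_aggs_hive_partitioned_parquet/year%3D2014/month%3D1/date%3D2014-01-02/00000000.parquet',
    'Parquet'
);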
Example 5:
-- Query 1: insert into parquet file
INSERT INTO FUNCTION s3(
'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet',
'Parquet'
)
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 200;
INSERT INTO FUNCTION s3(
'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet',
'Parquet'
)
SETTINGS s3_create_new_file_on_insert=1
--SETTINGS s3_create_new_file_on_insert=0, s3_truncate_on_insert=0
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 200;
-- describe table
DESCRIBE TABLE s3(
'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet'
);
-- select from parquet file
SELECT *
FROM s3(
'https://storage.googleapis.com/our-lab/chtest/write_test1.parquet',
'Parquet'
)
LIMIT 20
FORMAT Pretty;
-- Reading from multiple parquet files in a directory
SELECT *
FROM s3(
'https://storage.googleapis.com/our-lab/input/v1.0.1/sp500/financial_loss_output/v1/bi_losses_output/*.parquet',
'Parquet')
LIMIT 20;
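When reading through a glob like this, the s3() and file() table functions also expose the virtual columns _path and _file, which helps trace which file each row came from. A small sketch over the same glob:
SELECT _file, count() AS rows
FROM s3(
    'https://storage.googleapis.com/our-lab/input/v1.0.1/sp500/financial_loss_output/v1/bi_losses_output/*.parquet',
    'Parquet')
GROUP BY _file
ORDER BY _file;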
Example 6:
-- Query 1: insert into parquet file
INSERT INTO FUNCTION s3(
'https://storage.googleapis.com/our-lab/chtest/write_test2.parquet',
'Parquet'
)
SETTINGS s3_create_new_file_on_insert=1
PARTITION BY rand() % 10
SELECT * FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 20000;
-- Query 2: insert into parquet file
INSERT INTO FUNCTION s3(
'https://storage.googleapis.com/our-lab/chtest/write_test2_{_partition_id}.parquet',
'Parquet'
)
SETTINGS s3_create_new_file_on_insert=1
SELECT
*,
rand() % 10 as _partition_id
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 20000;
-- Query 3. Writing to multiple parquet files in a directory
INSERT INTO FUNCTION file('data_{_partition_id}.parquet', Parquet)
PARTITION BY toYYYYMM(created_date)
SELECT *
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000;
-- Query 4. Writing to multiple parquet files in a directory with file()
INSERT INTO FUNCTION file('foo_{_partition_id}.parquet', Parquet)
PARTITION BY concat(toString(toYear(created_date)), '_', toString(toMonth(created_date)))
SELECT
*,
toString(toYear(created_date)) as year,
toString(toMonth(created_date)) as month
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 100;
--root@onec02:/var/lib/clickhouse/user_files# ll
-- total 192
-- drwxr-xr-x 2 clickhouse clickhouse 4096 Mar 14 07:05 ./
-- drwxrwxrwx 12 clickhouse clickhouse 4096 Mar 4 22:50 ../
-- -rw-r----- 1 clickhouse clickhouse 185478 Mar 14 07:05 'foo_(1,1).parquet'
-- Query 5. Writing to multiple parquet files in a directory with s3()
INSERT INTO FUNCTION s3(
'https://storage.googleapis.com/our-lab/chtest/write_test3_{_partition_id}.parquet',
Parquet)
PARTITION BY toYYYYMM(created_date)
SELECT *
FROM generateRandom('project_id Int32, name String, created_date Date', 10, 10, 1)
LIMIT 10000;
-- https://console.cloud.google.com/storage/browser/our-lab/chtest;tab=objects?pageState=(%22StorageObjectListTable%22:(%22f%22:%22%255B%255D%22))&project=oc-lunar&prefix=&forceOnObjectsSortingFiltering=false
Python Example 1:
# https://clickhouse.com/docs/knowledgebase/python-clickhouse-connect-example
import json
import os
import sys
import clickhouse_connect
CLICKHOUSE_CLOUD_HOSTNAME = os.getenv("CLICKHOUSE_HOST")
CLICKHOUSE_CLOUD_USER = os.getenv("CLICKHOUSE_USER")
CLICKHOUSE_CLOUD_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD")
CLICKHOUSE_CLOUD_PORT = os.getenv("CLICKHOUSE_PORT", "8123")
client = clickhouse_connect.get_client(
    host=CLICKHOUSE_CLOUD_HOSTNAME,
    port=int(CLICKHOUSE_CLOUD_PORT),
    username=CLICKHOUSE_CLOUD_USER,
    password=CLICKHOUSE_CLOUD_PASSWORD,
)
print("connected to " + CLICKHOUSE_CLOUD_HOSTNAME + "\n")
client.command(
    "CREATE TABLE IF NOT EXISTS new_table (key UInt32, value String, metric Float64) ENGINE MergeTree ORDER BY key"
)
print("table new_table created or exists already!\n")
row1 = [1000, "String Value 1000", 5.233]
row2 = [2000, "String Value 2000", -107.04]
data = [row1, row2]
client.insert("new_table", data, column_names=["key", "value", "metric"])
print("written 2 rows to table new_table\n")
QUERY = "SELECT max(key), avg(metric) FROM new_table"
result = client.query(QUERY)
sys.stdout.write("query: [" + QUERY + "] returns:\n\n")
print(result.result_rows)
client.close()
Python Example 2:
# https://clickhouse.com/blog/querying-pandas-dataframes
import chdb.dataframe as cdf
import pandas as pd
def main():
    house_prices = pd.read_csv(
        filepath_or_buffer="data/HouseListings-Top45Cities-10292023-kaggle.csv",
        encoding="ISO-8859-1",
    )
    print(house_prices.head(n=2).T)
    cities = pd.read_csv(filepath_or_buffer="data/canadacities.csv")
    print(cities.head(n=1).T)
    top_cities = cdf.query(
        house_prices=house_prices,
        cities=cities,
        sql="""
        FROM __house_prices__ AS hp
        JOIN __cities__ AS c
        ON c.city_ascii = hp.City AND c.province_name = hp.Province
        SELECT City, Province, count(*),
               round(avg(Price)) AS avgPrice,
               round(max(Price)) AS maxPrice,
               ranking, density
        GROUP BY ALL
        LIMIT 10
        """,
    )
    top_cities.query(
        """
        FROM __table__
        SELECT City, maxPrice, ranking, density
        LIMIT 5
        """
    )
    top_cities_df = top_cities.to_pandas()
    print(top_cities_df.head(n=1).T)
    print(top_cities_df)
    bc_top_cities = (
        top_cities_df[top_cities_df["Province"] == "British Columbia"]
        .sort_values(["ranking", "density"])
        .drop(["Province"], axis=1)
    )
    print("aaa=====")
    print(top_cities_df)
    print("bbb=====")
    print(bc_top_cities)


if __name__ == "__main__":
    main()
4. Conclusion
ClickHouse is an excellent choice for organizations working with large-scale data analytics. Its column-oriented architecture, combined with powerful features such as direct Parquet file support and a wide range of integration options, makes it a versatile tool in any data engineer's toolkit.
For more detailed examples and advanced usage, refer to the official ClickHouse documentation.