DuckDB + Python 実践ガイド:「分析用SQLite」で大規模データを高速処理する方法

DuckDB 1.4 LTSとPythonを組み合わせた大規模データ分析の実践手法を解説。pandasとの連携、CSV・Parquetファイルの直接クエリ、ETLパイプライン構築、パフォーマンスベンチマークまで実用コード付きで紹介。

はじめに:なぜ今DuckDBなのか

データ分析をやっていると、ツール選びで頭を悩ませる場面って多いですよね。かつてはデータベースサーバーを立ち上げて、接続設定を行い、スキーマを定義して——ようやくクエリを実行できる、みたいなワークフローが当たり前でした。

一方で、pandasの登場によってPythonでのデータ分析は飛躍的に手軽になりました。ただ、正直なところ、データサイズが数GBを超えた瞬間に「メモリが足りない…」という壁にぶつかった経験がある方も多いんじゃないでしょうか。

そこで注目されているのがDuckDBです。「分析用SQLite」とも呼ばれるこのデータベースエンジンは、サーバー不要のインプロセス型でありながら、列指向ストレージとベクトル化実行エンジンにより大規模データを驚くほどのスピードで処理してくれます。2025年9月にリリースされたDuckDB 1.4.x LTS(コードネーム「Andium」)では、データベース暗号化やMERGE INTO構文など、エンタープライズ向けの機能もかなり充実しました。1年間のコミュニティサポートが保証されるLTS版の登場は、本番環境への導入を検討している組織にとって大きな安心材料ですね。

この記事では、DuckDBとPythonを組み合わせた実践的なデータ処理手法を、基本操作から高度な分析パイプラインの構築まで一通り解説していきます。pandasとの連携、CSVやParquetファイルの直接クエリ、リモートファイルへのアクセス、パフォーマンスベンチマークまで、現場ですぐに使える内容を盛り込んでいるので、ぜひ手を動かしながら読んでみてください。

DuckDBとは何か:アーキテクチャの特徴

DuckDBは、OLAP(Online Analytical Processing)に特化したインプロセス型のリレーショナルデータベース管理システムです。SQLiteが「組み込み型OLTP」の代名詞であるのに対して、DuckDBは「組み込み型OLAP」として設計されています。

列指向ストレージとベクトル化実行エンジン

従来の行指向データベース(MySQL、PostgreSQLなど)はトランザクション処理には向いていますが、分析クエリでは大量の行から特定の列だけを集計するケースが多く、行指向だと不必要なデータの読み込みが発生してしまいます。DuckDBは列指向(Columnar)ストレージを採用していて、必要な列のみを効率的に読み込むことで、分析ワークロードにおけるI/Oを最小化します。

さらに、ベクトル化実行エンジンがデータを一行ずつではなくベクトル(チャンク)単位で処理します。これによってCPUキャッシュの利用効率が高まり、SIMD命令による並列処理の恩恵を最大限に受けられるわけです。加えてマルチスレッド処理にも対応しているので、現代のマルチコアCPUの性能をしっかり引き出してくれます。

インプロセス型:サーバー不要の手軽さ

DuckDBの最も特筆すべき特徴の一つが、インプロセス型であることです。PostgreSQLやMySQLのようにサーバープロセスを立ち上げる必要がなく、Pythonのライブラリとしてインポートするだけですぐに使えます。これは地味に嬉しいポイントで、以下のような利点があります。

  • デプロイの簡素化pip installだけで導入完了
  • 運用コストの削減:サーバーの監視・メンテナンスが不要
  • ゼロコピー連携:pandas、Polars、Apache Arrowとのデータ受け渡しにメモリコピーが不要
  • ポータビリティ:ノートPCでもクラウド環境でも同じコードが動作

RAMを超えるデータセットの処理

DuckDBは利用可能なメモリ量を超えるデータセットも処理できるよう設計されています。データが物理メモリに収まらない場合は、自動的にディスクへのスピル(一時退避)が行われてアウトオブコア処理が実現されます。つまり、8GBのメモリしかないノートPCでも、数十GBのデータセットに対してSQLクエリを実行できるんです。これ、初めて知ったときは結構驚きました。

環境構築とインストール

DuckDBのインストールはびっくりするほどシンプルです。pipを使って一行で終わります。

# DuckDBのインストール
pip install duckdb

# 特定バージョンを指定する場合
pip install duckdb==1.4.0

# アップグレード
pip install --upgrade duckdb

インストールしたら、Pythonからバージョンを確認してみましょう。

import duckdb

# バージョン確認
print(duckdb.__version__)
# 出力例: 1.4.0

# DuckDBの情報を取得
result = duckdb.sql("SELECT version() AS version, current_setting('threads') AS threads")
result.show()

DuckDB 1.4.x LTSは2025年9月にリリースされ、1年間のコミュニティサポートが提供されています。安定性を重視するプロジェクトでは、このLTS版を使うのがおすすめです。あと、関連ライブラリも合わせて入れておくと後々便利ですよ。

# 関連ライブラリのインストール
pip install pandas pyarrow polars

基本操作:SQLでデータを操作する

DuckDBの基本操作は、標準SQLの知識があればすぐに始められます。まずはインメモリデータベースでの操作から見ていきましょう。

接続の作成とテーブル操作

import duckdb

# インメモリデータベースに接続
con = duckdb.connect()

# テーブルの作成
con.execute("""
    CREATE TABLE employees (
        id INTEGER PRIMARY KEY,
        name VARCHAR NOT NULL,
        department VARCHAR,
        salary DECIMAL(10, 2),
        hire_date DATE
    )
""")

# データの挿入
con.execute("""
    INSERT INTO employees VALUES
    (1, '田中太郎', 'エンジニアリング', 850000, '2020-04-01'),
    (2, '鈴木花子', 'マーケティング', 720000, '2019-08-15'),
    (3, '佐藤健一', 'エンジニアリング', 920000, '2018-01-10'),
    (4, '山田美咲', 'データサイエンス', 880000, '2021-03-20'),
    (5, '高橋翔太', 'マーケティング', 680000, '2022-06-01'),
    (6, '伊藤さくら', 'データサイエンス', 950000, '2017-11-30'),
    (7, '渡辺大輔', 'エンジニアリング', 780000, '2023-01-15')
""")

# クエリの実行
result = con.execute("""
    SELECT department,
           COUNT(*) AS member_count,
           ROUND(AVG(salary), 0) AS avg_salary
    FROM employees
    GROUP BY department
    ORDER BY avg_salary DESC
""")

print(result.fetchdf())  # pandasのDataFrameとして取得

永続化データベースの利用

インメモリだけでなく、ファイルベースの永続化データベースも簡単に作れます。

# ファイルベースのデータベースに接続
con = duckdb.connect("analytics.duckdb")

# テーブル作成とデータ挿入
con.execute("""
    CREATE TABLE IF NOT EXISTS sales (
        sale_id INTEGER,
        product_name VARCHAR,
        quantity INTEGER,
        unit_price DECIMAL(8, 2),
        sale_date DATE
    )
""")

# バルクインサート
con.execute("""
    INSERT INTO sales VALUES
    (1001, 'ノートPC', 5, 89800, '2025-01-15'),
    (1002, 'モニター', 12, 34500, '2025-01-16'),
    (1003, 'キーボード', 30, 8900, '2025-01-16'),
    (1004, 'ノートPC', 3, 89800, '2025-01-17'),
    (1005, 'マウス', 50, 3200, '2025-01-17')
""")

# 接続を閉じてもデータは永続化される
con.close()

# 再接続してデータが存在することを確認
con = duckdb.connect("analytics.duckdb")
result = con.execute("SELECT COUNT(*) FROM sales").fetchone()
print(f"レコード数: {result[0]}")  # レコード数: 5
con.close()

便利な関数型API

DuckDBのPython APIには、接続オブジェクトを経由せずに直接SQLを実行できる関数型APIも用意されています。ちょっとした分析なら、こっちの方が手軽で気に入っています。

import duckdb

# duckdb.sql()でデフォルトのインメモリ接続を使用
result = duckdb.sql("""
    SELECT
        range AS id,
        'Product_' || range AS product_name,
        (random() * 10000)::INTEGER AS price
    FROM range(10)
""")

result.show()

pandasとの連携:DataFrameをSQLで分析

個人的にDuckDBで一番感動したのが、pandasのDataFrameに対して直接SQLクエリを実行できる機能です。DataFrameをわざわざデータベースにインポートする必要がなく、Python変数名をそのままテーブル名として参照できます。

DataFrameへの直接クエリ

import duckdb
import pandas as pd

# pandasのDataFrameを作成
df_orders = pd.DataFrame({
    'order_id': range(1, 8),
    'customer': ['田中', '鈴木', '田中', '佐藤', '鈴木', '山田', '田中'],
    'product': ['A', 'B', 'C', 'A', 'A', 'B', 'B'],
    'amount': [15000, 23000, 8500, 15000, 15000, 23000, 23000],
    'order_date': pd.to_datetime([
        '2025-01-10', '2025-01-12', '2025-01-15',
        '2025-01-18', '2025-01-20', '2025-01-22', '2025-01-25'
    ])
})

# DataFrameに対して直接SQLを実行
result = duckdb.sql("""
    SELECT
        customer,
        COUNT(*) AS order_count,
        SUM(amount) AS total_amount,
        ROUND(AVG(amount), 0) AS avg_amount
    FROM df_orders
    GROUP BY customer
    ORDER BY total_amount DESC
""")

print(result.fetchdf())

上記のコードでは、df_ordersというPython変数名がそのままSQL内でテーブル名として使われています。内部的にはゼロコピーでDataFrameが参照されるので、大量のデータでもメモリの無駄遣いがありません。

複数DataFrameのJOIN

複数のDataFrameをSQLでJOINできるのも強力なポイントです。pandasのmerge()で苦労していた複雑な結合も、SQLなら直感的に書けます。

import duckdb
import pandas as pd

# 顧客マスタ
df_customers = pd.DataFrame({
    'customer_id': [1, 2, 3, 4],
    'name': ['田中太郎', '鈴木花子', '佐藤健一', '山田美咲'],
    'region': ['東京', '大阪', '東京', '名古屋']
})

# 注文データ
df_transactions = pd.DataFrame({
    'tx_id': range(1, 11),
    'customer_id': [1, 2, 1, 3, 4, 2, 1, 3, 4, 2],
    'amount': [5000, 12000, 8000, 3500, 9200, 15000, 6800, 4200, 7100, 11000]
})

# JOINクエリ
result = duckdb.sql("""
    SELECT
        c.region,
        c.name,
        COUNT(t.tx_id) AS tx_count,
        SUM(t.amount) AS total_spent
    FROM df_customers c
    JOIN df_transactions t ON c.customer_id = t.customer_id
    GROUP BY c.region, c.name
    ORDER BY total_spent DESC
""")

# 結果をDataFrameに変換
df_result = result.fetchdf()
print(df_result)

結果の変換オプション

クエリ結果はいろんな形式で取得できます。使い方に応じて選べるのが便利ですね。

import duckdb

query = "SELECT * FROM range(1000) t(id)"

# pandasのDataFrameとして取得
df_pandas = duckdb.sql(query).fetchdf()

# Apache ArrowのTableとして取得
arrow_table = duckdb.sql(query).arrow()

# PolarsのDataFrameとして取得
df_polars = duckdb.sql(query).pl()

# Pythonのリストとして取得
rows = duckdb.sql(query).fetchall()

# NumPy配列として取得
np_array = duckdb.sql(query).fetchnumpy()

CSVファイルの直接クエリ

さて、ここからがDuckDBの本領発揮です。ファイルに対する直接クエリは、一度体験するともう戻れなくなります。CSVファイルを事前にテーブルにロードする必要がなく、ファイルパスを指定するだけでSQLが実行できるんです。

CSVファイルの自動検出読み込み

import duckdb

# CSVファイルに対して直接クエリ(ヘッダー・型を自動検出)
result = duckdb.sql("""
    SELECT *
    FROM read_csv('sales_data.csv')
    LIMIT 5
""")
result.show()

# 複数のCSVファイルを一括読み込み(ワイルドカード対応)
result = duckdb.sql("""
    SELECT
        filename,
        COUNT(*) AS row_count,
        SUM(amount) AS total_amount
    FROM read_csv('data/sales_*.csv', filename=true)
    GROUP BY filename
    ORDER BY total_amount DESC
""")
result.show()

CSVの読み込みオプション

自動検出でうまくいかない場合は、オプションを明示的に指定できます。レガシーなデータを扱うときに重宝する機能です。

import duckdb

# 詳細なオプション指定
result = duckdb.sql("""
    SELECT *
    FROM read_csv(
        'legacy_data.csv',
        delim = '\t',           -- タブ区切り
        header = true,          -- ヘッダー行あり
        dateformat = '%Y/%m/%d', -- 日付フォーマット
        null_padding = true,     -- 欠損値の補完
        columns = {
            'id': 'INTEGER',
            'name': 'VARCHAR',
            'value': 'DOUBLE',
            'created_at': 'DATE'
        }
    )
    WHERE value > 1000
""")
result.show()

CSVからテーブルへの高速ロード

import duckdb

con = duckdb.connect("analytics.duckdb")

# CSVから直接テーブルを作成(CTAS)
con.execute("""
    CREATE TABLE sales_history AS
    SELECT * FROM read_csv('data/sales_2024_*.csv')
""")

# テーブルの情報を確認
con.execute("DESCRIBE sales_history").show()

# レコード数を確認
count = con.execute("SELECT COUNT(*) FROM sales_history").fetchone()[0]
print(f"読み込んだレコード数: {count:,}")

con.close()

Parquetファイルの活用

Apache Parquetは列指向のバイナリフォーマットで、分析ワークロードに最適化されています。DuckDBとParquetの組み合わせは、列指向同士の相性が抜群で、正直ちょっと異次元なパフォーマンスを発揮します。

Parquetファイルの読み込み

import duckdb

# Parquetファイルの直接クエリ
result = duckdb.sql("""
    SELECT
        category,
        COUNT(*) AS count,
        SUM(revenue) AS total_revenue,
        AVG(revenue) AS avg_revenue
    FROM read_parquet('data/transactions.parquet')
    GROUP BY category
    ORDER BY total_revenue DESC
""")
result.show()

# 複数のParquetファイルを一括クエリ
result = duckdb.sql("""
    SELECT *
    FROM read_parquet('data/partitioned/**/*.parquet', hive_partitioning=true)
    WHERE year = 2025 AND month = 1
    LIMIT 100
""")
result.show()

Parquetファイルへの書き出し

import duckdb
import pandas as pd
import numpy as np

# サンプルデータの生成
np.random.seed(42)
n = 1_000_000
df = pd.DataFrame({
    'id': range(n),
    'category': np.random.choice(['Electronics', 'Clothing', 'Food', 'Books'], n),
    'price': np.round(np.random.uniform(100, 50000, n), 2),
    'quantity': np.random.randint(1, 100, n),
    'sale_date': pd.date_range('2024-01-01', periods=n, freq='s')
})

# DataFrameからParquetに書き出し
duckdb.sql("""
    COPY (SELECT * FROM df)
    TO 'output/sales_data.parquet'
    (FORMAT PARQUET, COMPRESSION 'zstd', ROW_GROUP_SIZE 100000)
""")

# パーティション分割して書き出し
duckdb.sql("""
    COPY (SELECT * FROM df)
    TO 'output/partitioned'
    (FORMAT PARQUET, PARTITION_BY (category), COMPRESSION 'zstd')
""")

print("Parquetファイルへの書き出しが完了しました。")

Parquetのメタデータ活用

Parquetフォーマットの大きな利点は、ファイルのメタデータにカラムの統計情報(最小値・最大値など)が格納されていることです。DuckDBはこの情報を賢く活用して、不要な行グループの読み飛ばし(Predicate Pushdown)を行います。これが速さの秘訣の一つですね。

import duckdb

# メタデータを確認
result = duckdb.sql("""
    SELECT *
    FROM parquet_metadata('output/sales_data.parquet')
""")
result.show()

# スキーマ情報の確認
result = duckdb.sql("""
    SELECT *
    FROM parquet_schema('output/sales_data.parquet')
""")
result.show()

パフォーマンス比較:DuckDB vs pandas

DuckDBの性能を実感するために、pandasとのベンチマーク比較をやってみましょう。結論から言うと、大規模データでの集計処理においてDuckDBは圧倒的です。

ベンチマークコード

import duckdb
import pandas as pd
import numpy as np
import time

# テストデータの生成(1000万行)
np.random.seed(42)
n = 10_000_000

df = pd.DataFrame({
    'user_id': np.random.randint(1, 100_000, n),
    'product_id': np.random.randint(1, 10_000, n),
    'category': np.random.choice(
        ['Electronics', 'Clothing', 'Food', 'Books', 'Sports'], n
    ),
    'amount': np.round(np.random.uniform(100, 100000, n), 2),
    'timestamp': pd.date_range('2024-01-01', periods=n, freq='s')
})

print(f"データ件数: {len(df):,}行")
print(f"メモリ使用量: {df.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

# --- pandas での集計 ---
start = time.time()
result_pandas = (
    df.groupby('category')
    .agg(
        count=('amount', 'count'),
        total=('amount', 'sum'),
        avg=('amount', 'mean'),
        max_amount=('amount', 'max')
    )
    .sort_values('total', ascending=False)
)
pandas_time = time.time() - start
print(f"\npandas処理時間: {pandas_time:.3f}秒")

# --- DuckDB での集計 ---
start = time.time()
result_duckdb = duckdb.sql("""
    SELECT
        category,
        COUNT(*) AS count,
        SUM(amount) AS total,
        ROUND(AVG(amount), 2) AS avg,
        MAX(amount) AS max_amount
    FROM df
    GROUP BY category
    ORDER BY total DESC
""").fetchdf()
duckdb_time = time.time() - start
print(f"DuckDB処理時間: {duckdb_time:.3f}秒")

print(f"\n速度比: DuckDBはpandasの約{pandas_time/duckdb_time:.1f}倍高速")

典型的なベンチマーク結果

上記のベンチマークを実行すると、だいたい以下のような結果になります(もちろん環境によって差はあります)。

  • 単純な集計(GROUP BY + SUM/AVG):DuckDBはpandasの約3〜5倍高速
  • 複雑なJOIN + 集計:DuckDBはpandasの約5〜10倍高速
  • Parquetファイルの直接クエリ:DuckDBはpandasの約10〜50倍高速(Predicate Pushdownの効果)
  • PyArrow経由でのデータ受け渡し:特定のケースで最大2900倍の高速化が報告されている

この差はデータサイズが大きくなるほど顕著になります。pandasは全データをメモリ上に行指向で保持するのでキャッシュ効率が悪くなりがちですが、DuckDBの列指向・ベクトル化処理はデータ量が増えてもスケーラブルに動作します。

複合ベンチマーク:JOIN + ウィンドウ関数

import duckdb
import pandas as pd
import numpy as np
import time

np.random.seed(42)
n = 5_000_000

df_sales = pd.DataFrame({
    'sale_id': range(n),
    'store_id': np.random.randint(1, 500, n),
    'product_id': np.random.randint(1, 5000, n),
    'revenue': np.round(np.random.uniform(500, 50000, n), 2),
    'sale_date': np.random.choice(pd.date_range('2024-01-01', '2024-12-31'), n)
})

df_stores = pd.DataFrame({
    'store_id': range(1, 500),
    'region': np.random.choice(['北海道', '東北', '関東', '中部', '関西', '九州'], 499),
    'store_type': np.random.choice(['直営', 'FC'], 499)
})

# DuckDBでの複合クエリ
start = time.time()
result = duckdb.sql("""
    WITH store_monthly AS (
        SELECT
            st.region,
            st.store_type,
            DATE_TRUNC('month', s.sale_date) AS month,
            SUM(s.revenue) AS monthly_revenue,
            COUNT(*) AS tx_count
        FROM df_sales s
        JOIN df_stores st ON s.store_id = st.store_id
        GROUP BY st.region, st.store_type, DATE_TRUNC('month', s.sale_date)
    )
    SELECT
        region,
        store_type,
        month,
        monthly_revenue,
        tx_count,
        SUM(monthly_revenue) OVER (
            PARTITION BY region, store_type
            ORDER BY month
        ) AS cumulative_revenue,
        RANK() OVER (
            PARTITION BY month
            ORDER BY monthly_revenue DESC
        ) AS revenue_rank
    FROM store_monthly
    ORDER BY month, revenue_rank
""").fetchdf()
duckdb_time = time.time() - start
print(f"DuckDB処理時間: {duckdb_time:.3f}秒")
print(f"結果行数: {len(result):,}")

ウィンドウ関数とCTE:高度な分析SQL

DuckDBは標準SQLに準拠しつつ、分析に便利な拡張構文も多数サポートしています。ここでは実務でよく使うウィンドウ関数、CTE(共通テーブル式)、そしてDuckDB独自の便利な構文を紹介します。

ウィンドウ関数の活用

import duckdb

duckdb.sql("""
    CREATE OR REPLACE TABLE daily_sales AS
    SELECT
        date '2025-01-01' + INTERVAL (i) DAY AS sale_date,
        (random() * 500000 + 100000)::INTEGER AS revenue
    FROM generate_series(0, 89) t(i)
""")

# 移動平均、累計、前日比
result = duckdb.sql("""
    SELECT
        sale_date,
        revenue,
        -- 7日間移動平均
        ROUND(AVG(revenue) OVER (
            ORDER BY sale_date
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ), 0) AS moving_avg_7d,
        -- 累計売上
        SUM(revenue) OVER (
            ORDER BY sale_date
        ) AS cumulative_revenue,
        -- 前日比
        ROUND(
            100.0 * revenue / LAG(revenue) OVER (ORDER BY sale_date) - 100,
            1
        ) AS day_over_day_pct,
        -- ランク(売上順位)
        RANK() OVER (ORDER BY revenue DESC) AS revenue_rank
    FROM daily_sales
    ORDER BY sale_date
""")
result.show(max_rows=20)

CTEを使った複雑な分析

CTEを使うと、複雑な分析クエリもステップごとに分解できて可読性が格段に上がります。

import duckdb

result = duckdb.sql("""
    -- 再帰CTEを使ったカレンダー生成
    WITH RECURSIVE calendar AS (
        SELECT DATE '2025-01-01' AS dt
        UNION ALL
        SELECT dt + INTERVAL 1 DAY
        FROM calendar
        WHERE dt < DATE '2025-03-31'
    ),
    -- ランダムな売上データの生成
    sales_data AS (
        SELECT
            dt AS sale_date,
            CASE WHEN DAYOFWEEK(dt) IN (0, 6)
                 THEN (random() * 200000)::INTEGER
                 ELSE (random() * 500000 + 100000)::INTEGER
            END AS revenue
        FROM calendar
    ),
    -- 週次集計
    weekly_summary AS (
        SELECT
            DATE_TRUNC('week', sale_date) AS week_start,
            SUM(revenue) AS weekly_revenue,
            AVG(revenue) AS daily_avg,
            COUNT(*) AS days_count
        FROM sales_data
        GROUP BY DATE_TRUNC('week', sale_date)
    )
    SELECT
        week_start,
        weekly_revenue,
        ROUND(daily_avg, 0) AS daily_avg,
        ROUND(100.0 * weekly_revenue / LAG(weekly_revenue)
              OVER (ORDER BY week_start) - 100, 1) AS wow_growth_pct
    FROM weekly_summary
    ORDER BY week_start
""")
result.show()

DuckDB独自の便利構文

DuckDBには、標準SQLにはない独自の構文がいくつかあって、これがなかなか便利なんです。特にGROUP BY ALLEXCLUDE句は、一度使うと手放せなくなります。

import duckdb

# EXCLUDE句:特定の列を除外して選択
duckdb.sql("""
    SELECT * EXCLUDE (internal_id, debug_flag)
    FROM read_csv('data.csv')
""")

# REPLACE句:列の値を変換して選択
duckdb.sql("""
    SELECT * REPLACE (ROUND(price, 0) AS price)
    FROM read_csv('products.csv')
""")

# COLUMNS式:パターンマッチで列を選択
duckdb.sql("""
    SELECT COLUMNS('.*_amount')
    FROM read_csv('financial_data.csv')
""")

# GROUP BY ALL:SELECT句の非集計列を自動でGROUP BY
duckdb.sql("""
    SELECT category, region, SUM(amount), AVG(price)
    FROM sales
    GROUP BY ALL
""")

# ORDER BY ALL:全列でソート
duckdb.sql("""
    SELECT category, COUNT(*) AS cnt
    FROM sales
    GROUP BY category
    ORDER BY ALL DESC
""")

# TRY式:エラーをNULLに変換(DuckDB 1.3+)
duckdb.sql("""
    SELECT TRY('abc'::INTEGER) AS safe_cast
    -- 結果: NULL(エラーにならない)
""")

リモートファイルへのクエリ:S3・HTTPSサポート

DuckDBのhttpfs拡張機能を使えば、Amazon S3やHTTPS上のファイルに対して直接クエリを実行できます。ファイル全体をダウンロードせず、必要な部分だけを読み込むパーシャルリーディングで効率的にリモートデータにアクセスできるのが特徴です。

httpfs拡張のインストールと利用

import duckdb

con = duckdb.connect()

# httpfs拡張のインストールとロード
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")

# HTTPS経由でのParquetファイルクエリ
result = con.sql("""
    SELECT COUNT(*), AVG(value)
    FROM read_parquet(
        'https://example.com/data/sample.parquet'
    )
""")
result.show()

S3上のファイルへのアクセス

import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")

# AWS認証情報の設定
con.execute("""
    SET s3_region = 'ap-northeast-1';
    SET s3_access_key_id = 'YOUR_ACCESS_KEY';
    SET s3_secret_access_key = 'YOUR_SECRET_KEY';
""")

# S3上のParquetファイルを直接クエリ
result = con.sql("""
    SELECT
        DATE_TRUNC('month', event_date) AS month,
        event_type,
        COUNT(*) AS event_count
    FROM read_parquet('s3://my-data-lake/events/year=2025/**/*.parquet',
                      hive_partitioning=true)
    WHERE event_date >= '2025-01-01'
    GROUP BY ALL
    ORDER BY month, event_count DESC
""")
result.show()

con.close()

シークレット管理によるセキュアな認証

DuckDB 1.3以降では、認証情報をシークレットとして管理する機能が導入されました。クエリテキストに認証情報を直接書かなくてよくなるので、セキュリティ面でだいぶ安心できます。

import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")

# シークレットの作成(認証情報をセキュアに管理)
con.execute("""
    CREATE OR REPLACE SECRET my_s3_secret (
        TYPE s3,
        PROVIDER config,
        KEY_ID 'YOUR_ACCESS_KEY',
        SECRET 'YOUR_SECRET_KEY',
        REGION 'ap-northeast-1'
    )
""")

# シークレットを使って自動的に認証
result = con.sql("""
    SELECT *
    FROM read_parquet('s3://my-bucket/data/*.parquet')
    LIMIT 10
""")
result.show()

con.close()

DuckDB 1.4 LTSの新機能

2025年9月にリリースされたDuckDB 1.4.x LTS(コードネーム「Andium」)は、DuckDB初のLTS(Long Term Support)リリースです。1年間のコミュニティサポートが保証されていて、安定性を最重視する本番環境にぴったりです。主要な新機能を見ていきましょう。

データベース暗号化(AES-256 GCM)

DuckDB 1.4の目玉機能の一つが、データベースファイル全体の暗号化です。AES-256 GCMアルゴリズムを使って、メインデータベースファイル、WAL、一時ファイルまで含めてデータを強力に保護します。機密データを扱うプロジェクトでは待望の機能ですね。

import duckdb

# 暗号化されたデータベースの作成
con = duckdb.connect()
con.execute("INSTALL encryption")
con.execute("LOAD encryption")

# パスフレーズを指定してデータベースをアタッチ
con.execute("""
    ATTACH 'secure_analytics.duckdb' AS secure_db (
        TYPE DUCKDB,
        ENCRYPTION_KEY 'my_very_secure_passphrase_2025'
    )
""")

# 暗号化されたデータベースにテーブルを作成
con.execute("""
    CREATE TABLE secure_db.sensitive_data (
        id INTEGER,
        customer_name VARCHAR,
        credit_score INTEGER,
        annual_income DECIMAL(12, 2)
    )
""")

con.execute("""
    INSERT INTO secure_db.sensitive_data VALUES
    (1, '田中太郎', 750, 8500000),
    (2, '鈴木花子', 680, 6200000)
""")

# 暗号化データベースからの読み込み
result = con.execute("SELECT * FROM secure_db.sensitive_data").fetchdf()
print(result)

con.close()

MERGE INTO構文

MERGE INTOは、いわゆる「UPSERT」操作をSQL標準の構文で実現する機能です。ETLパイプラインでの差分更新をやるときに、これがあるとないとでは大違いです。

import duckdb

con = duckdb.connect()

# ターゲットテーブル(既存データ)
con.execute("""
    CREATE TABLE products (
        product_id INTEGER PRIMARY KEY,
        name VARCHAR,
        price DECIMAL(10, 2),
        stock INTEGER,
        updated_at TIMESTAMP
    )
""")

con.execute("""
    INSERT INTO products VALUES
    (1, 'ノートPC', 89800, 50, TIMESTAMP '2025-01-01 00:00:00'),
    (2, 'モニター', 34500, 120, TIMESTAMP '2025-01-01 00:00:00'),
    (3, 'キーボード', 8900, 200, TIMESTAMP '2025-01-01 00:00:00')
""")

# ソーステーブル(更新データ)
con.execute("""
    CREATE TEMPORARY TABLE product_updates AS
    SELECT * FROM (VALUES
        (2, 'モニター 4K', 42000, 80, CURRENT_TIMESTAMP),
        (4, 'Webカメラ', 12800, 60, CURRENT_TIMESTAMP)
    ) AS t(product_id, name, price, stock, updated_at)
""")

# MERGE INTOで差分更新
con.execute("""
    MERGE INTO products AS target
    USING product_updates AS source
    ON target.product_id = source.product_id
    WHEN MATCHED THEN
        UPDATE SET
            name = source.name,
            price = source.price,
            stock = source.stock,
            updated_at = source.updated_at
    WHEN NOT MATCHED THEN
        INSERT VALUES (
            source.product_id, source.name, source.price,
            source.stock, source.updated_at
        )
""")

# 結果を確認
con.execute("SELECT * FROM products ORDER BY product_id").show()
con.close()

Apache Icebergへの書き込みサポート

DuckDB 1.4では、Apache Icebergテーブルへの書き込みサポートが追加されました。INSERT、UPDATE、DELETEのすべてがサポートされていて、DuckDBをデータレイクハウスのETLツールとして使える道が開けています。

その他の改善点

  • ソート処理の刷新:k-wayマージ方式への書き直しによるソート性能の向上
  • CTEのデフォルトマテリアライズ:CTEが自動的にマテリアライズされ、繰り返し参照時のパフォーマンスが改善
  • CLIの進捗バー:長時間クエリ実行時にETAを表示するプログレスバーが追加(地味にありがたい)
  • FILL ウィンドウ関数:欠損値の補間に使える新しいウィンドウ関数
  • DuckDB 1.3から引き継がれた機能:Python風のラムダ構文、UUID v7サポート、リモートファイルキャッシュ

実践プロジェクト:売上データのETL分析パイプライン

ここまでに学んだDuckDBの機能を組み合わせて、実践的なETL分析パイプラインを構築してみましょう。売上データを複数ソースから取り込み、変換・集計して分析レポートを生成するというシナリオです。

プロジェクト概要

以下のようなシナリオを想定しています。

  • 複数のCSVファイルに分散した売上データを取り込む(Extract)
  • データのクレンジングと変換を行う(Transform)
  • 集計結果をParquetファイルとして出力する(Load)
  • 分析レポートを生成する

ステップ1:テストデータの生成

import duckdb
import pandas as pd
import numpy as np
import os

# 出力ディレクトリの作成
os.makedirs('etl_project/raw', exist_ok=True)
os.makedirs('etl_project/processed', exist_ok=True)
os.makedirs('etl_project/reports', exist_ok=True)

np.random.seed(42)

# 商品マスタの生成
products = pd.DataFrame({
    'product_id': range(1, 51),
    'product_name': [f'商品_{i:03d}' for i in range(1, 51)],
    'category': np.random.choice(
        ['家電', '衣料品', '食品', '書籍', 'スポーツ用品'], 50
    ),
    'unit_price': np.random.choice(
        [980, 1980, 3980, 5980, 9800, 19800, 39800, 59800], 50
    )
})
products.to_csv('etl_project/raw/products.csv', index=False)

# 店舗マスタの生成
stores = pd.DataFrame({
    'store_id': range(1, 21),
    'store_name': [f'店舗_{chr(65+i)}' for i in range(20)],
    'region': np.random.choice(
        ['北海道', '東北', '関東', '中部', '関西', '中国', '四国', '九州'], 20
    ),
    'store_type': np.random.choice(['直営', 'FC', 'オンライン'], 20)
})
stores.to_csv('etl_project/raw/stores.csv', index=False)

# 月別売上データの生成(12ヶ月分)
for month in range(1, 13):
    n_records = np.random.randint(8000, 15000)
    days_in_month = pd.Period(f'2024-{month:02d}').days_in_month

    sales = pd.DataFrame({
        'sale_id': range(n_records),
        'sale_date': np.random.choice(
            pd.date_range(f'2024-{month:02d}-01', periods=days_in_month),
            n_records
        ),
        'store_id': np.random.randint(1, 21, n_records),
        'product_id': np.random.randint(1, 51, n_records),
        'quantity': np.random.randint(1, 20, n_records),
        'discount_rate': np.random.choice(
            [0, 0, 0, 0.05, 0.1, 0.15, 0.2], n_records
        )
    })
    sales.to_csv(
        f'etl_project/raw/sales_2024_{month:02d}.csv',
        index=False
    )

print("テストデータの生成が完了しました。")

ステップ2:ETLパイプラインの実装

import duckdb
import time

class SalesETLPipeline:
    """売上データのETL分析パイプライン"""

    def __init__(self, db_path='etl_project/analytics.duckdb'):
        self.con = duckdb.connect(db_path)
        self._setup()

    def _setup(self):
        """初期設定"""
        self.con.execute("SET threads TO 4")
        self.con.execute("SET memory_limit = '2GB'")

    def extract(self):
        """データの取り込み(Extract)"""
        print("=" * 50)
        print("EXTRACT フェーズ開始")
        print("=" * 50)

        start = time.time()

        # 商品マスタの取り込み
        self.con.execute("""
            CREATE OR REPLACE TABLE products AS
            SELECT * FROM read_csv('etl_project/raw/products.csv')
        """)

        # 店舗マスタの取り込み
        self.con.execute("""
            CREATE OR REPLACE TABLE stores AS
            SELECT * FROM read_csv('etl_project/raw/stores.csv')
        """)

        # 売上データの一括取り込み(ワイルドカード使用)
        self.con.execute("""
            CREATE OR REPLACE TABLE raw_sales AS
            SELECT * FROM read_csv('etl_project/raw/sales_2024_*.csv')
        """)

        sales_count = self.con.execute(
            "SELECT COUNT(*) FROM raw_sales"
        ).fetchone()[0]

        elapsed = time.time() - start
        print(f"  売上データ: {sales_count:,}件 ({elapsed:.2f}秒)")
        return self

    def transform(self):
        """データの変換(Transform)"""
        print("\nTRANSFORM フェーズ開始")

        start = time.time()

        # 売上明細テーブルの作成(JOIN + 計算列の追加)
        self.con.execute("""
            CREATE OR REPLACE TABLE sales_detail AS
            SELECT
                s.sale_id,
                s.sale_date,
                st.store_name,
                st.region,
                st.store_type,
                p.product_name,
                p.category,
                s.quantity,
                p.unit_price,
                s.discount_rate,
                (s.quantity * p.unit_price) AS gross_amount,
                (s.quantity * p.unit_price * (1 - s.discount_rate))
                    AS net_amount,
                EXTRACT(MONTH FROM s.sale_date) AS sale_month
            FROM raw_sales s
            JOIN products p ON s.product_id = p.product_id
            JOIN stores st ON s.store_id = st.store_id
        """)

        elapsed = time.time() - start
        print(f"  変換完了 ({elapsed:.2f}秒)")
        return self

    def load(self):
        """データの出力(Load)"""
        print("\nLOAD フェーズ開始")

        start = time.time()

        # Parquetで出力(カテゴリ別パーティション)
        self.con.execute("""
            COPY (SELECT * FROM sales_detail)
            TO 'etl_project/processed/sales_detail'
            (FORMAT PARQUET, PARTITION_BY (category),
             COMPRESSION 'zstd')
        """)

        elapsed = time.time() - start
        print(f"  Parquet出力完了 ({elapsed:.2f}秒)")
        return self

    def analyze(self):
        """分析レポートの生成"""
        print("\n【地域別月次売上レポート】")
        self.con.execute("""
            SELECT
                region AS 地域,
                sale_month AS 月,
                ROUND(SUM(net_amount) / 10000, 0) AS 売上_万円
            FROM sales_detail
            GROUP BY region, sale_month
            ORDER BY region, sale_month
        """).show(max_rows=20)

        print("\n【カテゴリ別パフォーマンス】")
        self.con.execute("""
            SELECT
                category AS カテゴリ,
                COUNT(*) AS 取引数,
                ROUND(SUM(net_amount) / 10000, 0) AS 売上_万円,
                ROUND(AVG(net_amount), 0) AS 平均単価
            FROM sales_detail
            GROUP BY category
            ORDER BY 売上_万円 DESC
        """).show()
        return self

    def close(self):
        self.con.close()
        print("\nパイプライン完了。")


# パイプラインの実行
pipeline = SalesETLPipeline()
pipeline.extract().transform().load().analyze()
pipeline.close()

まとめとベストプラクティス

ここまで、DuckDBとPythonを組み合わせたデータ分析の手法を基礎から応用まで見てきました。最後にDuckDBを効果的に使うためのベストプラクティスをまとめておきます。

DuckDBを選ぶべきユースケース

  • ローカルでの大規模データ分析:数GB〜数十GBのデータを手元のマシンで分析する場合
  • pandasの限界を感じたとき:メモリ不足やパフォーマンス低下に直面している場合
  • ETLパイプラインの構築:CSV/Parquetファイルの変換・集計処理
  • データレイクのアドホック分析:S3上のParquetファイルに対する対話的なクエリ
  • SQLベースの分析:SQLに習熟したチームでの分析ワークフロー

パフォーマンス最適化のポイント

  1. Parquetフォーマットを積極的に活用する:CSVに比べて圧倒的に高速。列指向同士の相性で、Predicate Pushdownも効く
  2. パーティション分割を活用する:頻繁にフィルタリングする列(日付、カテゴリなど)でパーティションを分割すると、不要なファイルの読み込みをスキップできる
  3. 適切なデータ型を使用するVARCHARよりもENUM、大きすぎる数値型よりも適切なサイズの型を選ぶ
  4. メモリ設定を調整するSET memory_limitで利用可能なメモリを明示的に設定する
  5. スレッド数を調整するSET threadsで並列度を環境に応じて設定する

コーディングのベストプラクティス

import duckdb

# 1. コンテキストマネージャーでの接続管理
with duckdb.connect('my_analytics.duckdb') as con:
    result = con.execute("SELECT 42 AS answer").fetchone()
    print(result)

# 2. パラメータ化クエリでSQLインジェクション防止
con = duckdb.connect()
con.execute("""
    CREATE TABLE users (id INTEGER, name VARCHAR, age INTEGER)
""")
con.execute(
    "INSERT INTO users VALUES (?, ?, ?)",
    [1, '田中太郎', 30]
)

# 複数行の挿入
con.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(2, '鈴木花子', 25), (3, '佐藤健一', 35)]
)

# 3. エラーハンドリング
try:
    result = con.execute("SELECT * FROM nonexistent_table")
except duckdb.CatalogException as e:
    print(f"テーブルが見つかりません: {e}")
except duckdb.Error as e:
    print(f"DuckDBエラー: {e}")

con.close()

DuckDBのエコシステム

DuckDBのエコシステムは急速に拡大しています。主要な拡張機能は以下の通りです。

  • httpfs:S3/HTTPS上のファイルへの直接クエリ
  • spatial:地理空間データの処理
  • iceberg:Apache Icebergテーブルの読み書き
  • postgres_scanner:PostgreSQLからの直接データ読み込み
  • sqlite_scanner:SQLiteデータベースからの直接読み込み
  • encryption:データベースファイルの暗号化

DuckDB 1.4 LTSの登場により、プロダクション環境での採用がさらに加速していくでしょう。従来pandasやSparkが担っていた分析ワークロードの一部が、よりシンプルで高速なDuckDBに置き換わっていくのは間違いなさそうです。

DuckDBは、「データ分析の民主化」を推進する強力なツールだと感じています。サーバーの構築もクラウドサービスの契約も不要で、pip install duckdbの一行から始められる手軽さは本当に魅力的です。ぜひこの記事のコード例を実際に動かしながら、DuckDBの威力を体感してみてください。

著者について Editorial Team

Our team of expert writers and editors.