データサイエンスや機械学習のプロジェクトをやっていると、作業時間の7〜8割がデータの前処理・クリーニングに消えていく——これ、よく言われる話ですよね。欠損値の補完、外れ値の検出、型変換、重複の除去。正直、毎回手作業でやるのはしんどいです。
そこでこの記事では、pandas 3.0(2026年1月リリース)の pipe() メソッドとメソッドチェーンを使って、再利用可能なデータクリーニングパイプラインを構築する方法を紹介します。scikit-learnの ColumnTransformer との連携も扱うので、機械学習の前処理までひと通りカバーできるはずです。
なぜデータクリーニングのパイプライン化が重要なのか
「毎回同じようなクリーニング処理を書いている気がする……」と感じたことはありませんか?パイプライン化すれば、その悩みがかなり解消されます。
- 再利用性:一度構築したクリーニング処理を、別のデータセットにもそのまま使い回せる
- 再現性:処理ステップがコードとして残るので、結果の再現が簡単
- 可読性:メソッドチェーンで書くと、処理の流れがトップダウンに読める
- 保守性:各ステップが独立した関数なので、修正やテストがしやすい
- データリーク防止:scikit-learn のパイプラインと組み合わせれば、学習データとテストデータの分離を確実にできる
環境セットアップ
この記事のコードを動かすには、以下のパッケージが必要です。
pip install pandas>=3.0.1 numpy scikit-learn matplotlib pyarrow
pandas 3.0 では PyArrow がデフォルトの文字列バックエンドになっているので、PyArrow も一緒にインストールしておきましょう。
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
print(f"pandas version: {pd.__version__}")
# pandas version: 3.0.1
pandas 3.0 のデータクリーニングに関わる新機能
pandas 3.0(2026年1月リリース)では、データクリーニングに直結するいくつかの重要な変更がありました。ここでは特に影響の大きい2つを取り上げます。
新しいデフォルト文字列型(str 型)
従来、文字列列のデフォルト型は object でした。pandas 3.0 では、PyArrow バックエンドの専用 str 型がデフォルトになっています。メモリ使用量が減り、処理速度も上がるので、これは嬉しい変更です。
# pandas 3.0 では文字列が自動的にstr型として推論される
df = pd.DataFrame({"name": ["田中", "佐藤", None, "鈴木"]})
print(df["name"].dtype)
# string(PyArrowバックエンド)
# 欠損値はNaNとして扱われる(pd.NAではなく)
print(df["name"].isna())
# 0 False
# 1 False
# 2 True
# 3 False
Copy-on-Write(CoW)のデフォルト有効化
もう一つ大きいのが、Copy-on-Write がデフォルトで有効になったことです。DataFrame のスライスを変更しても元のデータには影響しなくなるので、パイプライン内で安心してデータを加工できます。
個人的には、これが pandas 3.0 で一番ありがたい変更だと思っています。以前は「うっかり元データを書き換えてしまった」事故がありましたが、もうその心配は不要です。
# Copy-on-Writeにより、元データが安全に保持される
df_original = pd.DataFrame({"value": [1, 2, 3, 4, 5]})
df_subset = df_original[df_original["value"] > 2]
# df_subsetを変更してもdf_originalには影響しない
Step 1:データの基本確認関数を作る
では、実際にパイプラインを組んでいきましょう。最初のステップは、データの全体像を把握する関数です。
欠損値の数、データ型の分布、重複行の有無——これらを一発で確認できると便利ですよね。
def inspect_data(df: pd.DataFrame, label: str = "データ確認") -> pd.DataFrame:
"""データの概要を表示し、そのままDataFrameを返す"""
print(f"=== {label} ===")
print(f"行数: {len(df):,} 列数: {df.shape[1]}")
print(f"重複行: {df.duplicated().sum():,}")
print(f"\n--- 欠損値 ---")
missing = df.isna().sum()
missing_pct = (missing / len(df) * 100).round(1)
missing_info = pd.DataFrame({
"欠損数": missing,
"欠損率(%)": missing_pct
})
print(missing_info[missing_info["欠損数"] > 0])
print(f"\n--- データ型 ---")
print(df.dtypes.value_counts())
return df
ポイントは、この関数が DataFrame を受け取ってそのまま返すこと。これが pipe() で使うための大事な約束事です。
Step 2:欠損値を処理する関数
欠損値の処理は、データクリーニングの中で最も基本的で、かつ最も頭を悩ませるステップかもしれません。数値列と文字列列で戦略を分ける関数を作ります。
欠損値の戦略を列単位で設定する
def handle_missing_values(
df: pd.DataFrame,
numeric_strategy: str = "median",
categorical_fill: str = "不明",
drop_threshold: float = 0.5
) -> pd.DataFrame:
"""
欠損値を処理する
- 欠損率がdrop_thresholdを超える列は削除
- 数値列: numeric_strategyで補完(mean, median, zero)
- カテゴリ列: categorical_fillで補完
"""
df = df.copy()
# 欠損率が高すぎる列を削除
missing_rate = df.isna().mean()
cols_to_drop = missing_rate[missing_rate > drop_threshold].index.tolist()
if cols_to_drop:
print(f"欠損率 {drop_threshold*100}% 超のため削除: {cols_to_drop}")
df = df.drop(columns=cols_to_drop)
# 数値列の補完
numeric_cols = df.select_dtypes(include="number").columns
for col in numeric_cols:
if df[col].isna().any():
if numeric_strategy == "median":
fill_value = df[col].median()
elif numeric_strategy == "mean":
fill_value = df[col].mean()
else:
fill_value = 0
df[col] = df[col].fillna(fill_value)
# カテゴリ・文字列列の補完
cat_cols = df.select_dtypes(include=["object", "string", "category"]).columns
for col in cat_cols:
if df[col].isna().any():
df[col] = df[col].fillna(categorical_fill)
return df
drop_threshold=0.5 は「欠損率50%を超えたら列ごと削除」という意味です。このあたりの閾値はプロジェクトによって調整してください。
Step 3:外れ値を検出・処理する関数
外れ値の処理も避けて通れないステップです。IQR(四分位範囲)法とZスコア法が定番ですが、ここではIQR法をメインに紹介します。
def handle_outliers(
df: pd.DataFrame,
columns: list[str] | None = None,
method: str = "iqr",
factor: float = 1.5,
action: str = "clip"
) -> pd.DataFrame:
"""
外れ値を処理する
- method: "iqr"(IQR法)または "zscore"(Zスコア法)
- action: "clip"(上下限に丸める)または "drop"(行を削除)
"""
df = df.copy()
if columns is None:
columns = df.select_dtypes(include="number").columns.tolist()
for col in columns:
if method == "iqr":
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
lower = q1 - factor * iqr
upper = q3 + factor * iqr
elif method == "zscore":
mean = df[col].mean()
std = df[col].std()
lower = mean - factor * std
upper = mean + factor * std
outlier_count = ((df[col] < lower) | (df[col] > upper)).sum()
if outlier_count > 0:
print(f"{col}: 外れ値 {outlier_count} 件を検出")
if action == "clip":
df[col] = df[col].clip(lower=lower, upper=upper)
elif action == "drop":
df = df[(df[col] >= lower) & (df[col] <= upper)]
return df
経験上、action="clip"(外れ値を上下限に丸める)の方が、行を丸ごと消すより安全なことが多いです。ただし、明らかなデータ入力ミス(年齢200歳とか)なら "drop" もアリですね。
Step 4:重複データの除去と型変換
重複除去と型変換は比較的シンプルですが、パイプラインの一部として関数にしておくと、後で組み合わせやすくなります。
def remove_duplicates(
df: pd.DataFrame,
subset: list[str] | None = None,
keep: str = "first"
) -> pd.DataFrame:
"""重複行を削除する"""
n_before = len(df)
df = df.drop_duplicates(subset=subset, keep=keep)
n_removed = n_before - len(df)
if n_removed > 0:
print(f"重複行 {n_removed:,} 件を削除")
return df
def convert_dtypes(
df: pd.DataFrame,
datetime_cols: list[str] | None = None,
category_cols: list[str] | None = None
) -> pd.DataFrame:
"""データ型を適切に変換する"""
df = df.copy()
if datetime_cols:
for col in datetime_cols:
df[col] = pd.to_datetime(df[col], errors="coerce")
if category_cols:
for col in category_cols:
df[col] = df[col].astype("category")
return df
Step 5:pipe() でパイプラインを組み立てる
さて、ここからが本題です。ここまでで作った関数たちを pipe() で繋げて、一気にクリーニングパイプラインを組み立てましょう。
まずはサンプルデータを用意します。
# サンプルデータの作成
np.random.seed(42)
n = 1000
raw_data = pd.DataFrame({
"customer_id": range(1, n + 1),
"age": np.random.normal(35, 10, n).astype(int),
"income": np.random.lognormal(13, 0.8, n),
"category": np.random.choice(["A", "B", "C", None], n),
"signup_date": pd.date_range("2023-01-01", periods=n, freq="D").astype(str),
"score": np.random.uniform(0, 100, n)
})
# 欠損値をランダムに挿入
mask = np.random.random(raw_data.shape) < 0.05
raw_data = raw_data.mask(mask.astype(bool) & (raw_data.columns != "customer_id"))
# 外れ値を挿入
raw_data.loc[0, "age"] = 200
raw_data.loc[1, "income"] = -5000
# 重複行を挿入
raw_data = pd.concat([raw_data, raw_data.iloc[:5]], ignore_index=True)
print(f"元データ: {raw_data.shape}")
# 元データ: (1005, 6)
そして、pipe() でパイプラインを実行します。
# pipe() でパイプラインを実行
cleaned_data = (
raw_data
.pipe(inspect_data, label="クリーニング前")
.pipe(remove_duplicates)
.pipe(handle_missing_values, numeric_strategy="median")
.pipe(handle_outliers, method="iqr", action="clip")
.pipe(convert_dtypes,
datetime_cols=["signup_date"],
category_cols=["category"])
.pipe(inspect_data, label="クリーニング後")
)
print(f"\nクリーニング後: {cleaned_data.shape}")
print(cleaned_data.dtypes)
どうでしょう。処理の流れが上から下にスッと読めますよね。これが pipe() の最大の魅力です。
比較のために、従来のネスト型の書き方も見てみましょう。
# ❌ ネスト型(読みにくい)
result = convert_dtypes(
handle_outliers(
handle_missing_values(
remove_duplicates(raw_data)
)
),
datetime_cols=["signup_date"]
)
# ✅ pipe() チェーン型(読みやすい)
result = (
raw_data
.pipe(remove_duplicates)
.pipe(handle_missing_values)
.pipe(handle_outliers)
.pipe(convert_dtypes, datetime_cols=["signup_date"])
)
ネスト型だと、処理が内側から外側に向かって読まないといけないので、ステップが増えるほど辛くなります。
assign() と query() を組み合わせた高度なチェーン
pipe() だけでなく、assign() や query() もメソッドチェーンに組み込めます。特に assign() で新しい列を作りながらチェーンを続けるパターンは、実務でもかなり使います。
# assign() で新しい列を追加しながらチェーン
analysis_ready = (
cleaned_data
.assign(
signup_year=lambda df: df["signup_date"].dt.year,
signup_month=lambda df: df["signup_date"].dt.month,
income_log=lambda df: np.log1p(df["income"]),
age_group=lambda df: pd.cut(
df["age"],
bins=[0, 25, 35, 45, 60, 100],
labels=["~25", "26-35", "36-45", "46-60", "61~"]
)
)
.query("age >= 18 and income > 0")
)
print(analysis_ready[["age", "age_group", "income", "income_log"]].head())
デバッグ用デコレータで中間状態を確認する
長いメソッドチェーンを書いていると、「あれ、どのステップで行数が減ったんだっけ?」となることがあります。そんなときに便利なのが、デコレータを使った中間ログの出力です。
import functools
def log_step(func):
"""パイプラインの各ステップの状態をログ出力するデコレータ"""
@functools.wraps(func)
def wrapper(df, *args, **kwargs):
result = func(df, *args, **kwargs)
print(f"[{func.__name__}] "
f"行数: {len(df)} → {len(result)}, "
f"列数: {df.shape[1]} → {result.shape[1]}, "
f"欠損合計: {result.isna().sum().sum()}")
return result
return wrapper
# デコレータを関数に適用
@log_step
def remove_duplicates_logged(df, **kwargs):
return remove_duplicates(df, **kwargs)
@log_step
def handle_missing_logged(df, **kwargs):
return handle_missing_values(df, **kwargs)
# デバッグモードでパイプラインを実行
debug_result = (
raw_data
.pipe(remove_duplicates_logged)
.pipe(handle_missing_logged, numeric_strategy="median")
.pipe(handle_outliers, method="iqr", action="clip")
)
このデコレータを挟むだけで、各ステップの行数変化と欠損値の残量が分かります。本番コードに入れるものではありませんが、開発中のデバッグにはかなり重宝します。
scikit-learn の ColumnTransformer と連携する
pandas でデータを綺麗にしたら、次は機械学習モデルに渡すための前処理です。ここで scikit-learn の ColumnTransformer が活躍します。
ColumnTransformer の基本構造
数値列とカテゴリ列に対して、それぞれ別の前処理パイプラインを適用するのが基本パターンです。
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
# 数値列用のパイプライン
numeric_pipeline = Pipeline([
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler())
])
# カテゴリ列用のパイプライン
categorical_pipeline = Pipeline([
("imputer", SimpleImputer(strategy="most_frequent")),
("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
])
# ColumnTransformerで列ごとに異なる処理を適用
preprocessor = ColumnTransformer(
transformers=[
("num", numeric_pipeline,
make_column_selector(dtype_include="number")),
("cat", categorical_pipeline,
make_column_selector(dtype_include=["object", "string", "category"]))
],
remainder="drop"
)
pandas パイプラインと sklearn パイプラインの連携
ここからが一番おいしいところです。pandas の pipe() でクリーニングしたデータを、そのまま scikit-learn の Pipeline に流し込みます。
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# Step 1: pandasパイプラインでクリーニング
cleaned = (
raw_data
.pipe(remove_duplicates)
.pipe(handle_missing_values, numeric_strategy="median")
.pipe(handle_outliers, method="iqr", action="clip")
.pipe(convert_dtypes, category_cols=["category"])
)
# Step 2: 特徴量とターゲットを分離
X = cleaned.drop(columns=["customer_id", "signup_date", "score"])
y = (cleaned["score"] > 50).astype(int) # 二値分類用
# Step 3: 学習・テスト分割(前処理の前に分割するのが重要)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Step 4: sklearn パイプラインで前処理 + モデル
model_pipeline = Pipeline([
("preprocessor", preprocessor),
("classifier", RandomForestClassifier(n_estimators=100, random_state=42))
])
model_pipeline.fit(X_train, y_train)
score = model_pipeline.score(X_test, y_test)
print(f"テスト精度: {score:.3f}")
ここで重要なのが、Step 3 の順序です。必ず学習データとテストデータに分割してから前処理(fit)を行ってください。逆にすると、テストデータの情報がモデルに漏れてしまいます(いわゆるデータリーク)。scikit-learn の Pipeline に入れておけば、fit() 時にこの順序が自動で守られるのも大きなメリットです。
FunctionTransformer でカスタム処理を組み込む
「既存のTransformerにない処理をパイプラインに入れたい」——そんなときは FunctionTransformer の出番です。任意のPython関数をscikit-learnのTransformerに変換できます。
from sklearn.preprocessing import FunctionTransformer
# 対数変換を行うFunctionTransformer
log_transformer = FunctionTransformer(
func=np.log1p,
inverse_func=np.expm1
)
# カスタムクリーニング関数をTransformerにする
def clip_negative(X):
"""負の値をゼロにクリップする"""
return np.clip(X, 0, None)
clip_transformer = FunctionTransformer(func=clip_negative)
# パイプラインに組み込む
custom_numeric_pipeline = Pipeline([
("clip", clip_transformer),
("log", log_transformer),
("scaler", StandardScaler())
])
実践テンプレート:再利用可能なクリーニングクラス
最後に、ここまでの内容をまとめて、プロジェクト間で使い回せるクリーニングクラスを作ってみます。自分のプロジェクトでもこのパターンをベースにカスタマイズしていますが、なかなか便利です。
class DataCleaningPipeline:
"""再利用可能なデータクリーニングパイプライン"""
def __init__(self, config: dict | None = None):
self.config = config or {}
self.steps = []
self.log = []
def add_step(self, name: str, func, **kwargs):
"""処理ステップを追加する"""
self.steps.append((name, func, kwargs))
return self
def run(self, df: pd.DataFrame) -> pd.DataFrame:
"""パイプラインを実行する"""
result = df.copy()
for name, func, kwargs in self.steps:
n_before = len(result)
result = func(result, **kwargs)
n_after = len(result)
step_log = {
"step": name,
"rows_before": n_before,
"rows_after": n_after,
"missing_after": result.isna().sum().sum()
}
self.log.append(step_log)
print(f"✓ {name}: {n_before} → {n_after} 行")
return result
def get_log(self) -> pd.DataFrame:
"""実行ログをDataFrameで取得する"""
return pd.DataFrame(self.log)
# 使用例
pipeline = DataCleaningPipeline()
pipeline.add_step("重複除去", remove_duplicates)
pipeline.add_step("欠損値処理", handle_missing_values,
numeric_strategy="median")
pipeline.add_step("外れ値処理", handle_outliers,
method="iqr", action="clip")
pipeline.add_step("型変換", convert_dtypes,
datetime_cols=["signup_date"],
category_cols=["category"])
cleaned = pipeline.run(raw_data)
print(pipeline.get_log())
add_step() でステップを追加して run() で実行するだけなので、使い方はシンプルです。get_log() で各ステップの行数変化も確認できます。
パフォーマンスのヒント
大規模データセットを扱うときに意識しておきたいポイントをいくつか挙げておきます。
- PyArrow バックエンドの活用:pandas 3.0 では文字列に PyArrow がデフォルトで使われますが、
pd.read_csv()でdtype_backend="pyarrow"を指定すると全列を PyArrow 型で読み込めます。メモリ効率がかなり変わります - チャンク処理:メモリに収まらないデータは
pd.read_csv(chunksize=10000)でチャンク単位に処理しましょう。各チャンクにパイプラインを適用すればOKです - 不要な列の早期削除:パイプラインの最初で不要な列を落としておくと、以降のすべてのステップが速くなります。地味ですが効果は大きいです
- inplace は使わない:pandas 3.0 の Copy-on-Write により、
inplace=Trueのメリットはほぼなくなりました。メソッドチェーンとの相性も悪いので、使わないのが正解です
よくある質問(FAQ)
Q1:欠損値は削除と補完のどちらが良いですか?
ケースバイケースですが、目安はあります。欠損率が低く(5%未満)データ量が十分なら、行の削除(dropna())が最もシンプルです。欠損率が高かったり、1行ごとのデータが貴重な場合は、中央値・最頻値での補完や線形補間(interpolate())を検討してください。機械学習に使うなら、scikit-learn の SimpleImputer をパイプラインに組み込むのがベストです。
Q2:IQR法とZスコア法、どちらで外れ値を検出すべきですか?
IQR法は四分位数ベースなので、データの分布に依存しにくく堅牢です。正規分布に従わないデータにも使えます。Zスコア法は平均と標準偏差を使うため、正規分布に近いデータ向きです。迷ったらIQR法を選んでおけば、まず外れません(外れ値だけに)。
Q3:pandas の pipe() と scikit-learn の Pipeline の違いは何ですか?
pandas の pipe() は DataFrame の変換をチェーンするためのメソッドで、探索的分析やクリーニングに向いています。scikit-learn の Pipeline は fit() / transform() のインターフェースを持ち、学習とテストの分離を保証するので、機械学習の前処理に最適です。実務では、まず pipe() でデータを整えてから Pipeline でモデル用の前処理を行う、という流れが一般的です。
Q4:pandas 3.0 の新しい文字列型でクリーニング方法は変わりますか?
基本的な方法は変わりません。新しい str 型でも欠損値は NaN で表現されるので、isna() / fillna() はそのまま使えます。ただし、PyArrow バックエンドでは Unicode 以外の文字列を格納できないため、バイナリデータを含む列は dtype=object を明示する必要がある点だけ注意してください。
Q5:大規模データで pandas が遅い場合はどうすればよいですか?
まずは dtype_backend="pyarrow" を試してみてください。それでも足りない場合は、DuckDB(SQLベースの分析エンジン)や Polars(Rustベースの DataFrame ライブラリ)の導入を検討するのがおすすめです。どちらも pandas との互換性があり、大規模データで大幅な高速化が期待できます。