跳转至

数据采集与清洗

概述

数据是机器学习的燃料。数据质量直接决定模型性能。本文涵盖数据采集来源、ETL 管道、数据质量处理(缺失值、异常值、重复值)和常用工具。


1. 数据来源

来源 获取方式 典型数据
API REST/GraphQL 调用 社交媒体、金融行情、天气
Web 爬虫 HTML 解析、Selenium 电商价格、新闻、评论
数据库 SQL 查询 业务数据、用户行为
文件 CSV/JSON/Parquet 读取 公开数据集、日志
IoT 设备 MQTT/CoAP 协议 传感器数据
第三方数据 数据市场购买 征信、地理、行业数据
众包 Amazon Mechanical Turk 标注数据

API 数据采集示例

import requests
import time

def fetch_with_retry(url, params, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # 指数退避

# 分页采集
all_data = []
page = 1
while True:
    data = fetch_with_retry(
        "https://api.example.com/data",
        params={"page": page, "per_page": 100}
    )
    if not data["results"]:
        break
    all_data.extend(data["results"])
    page += 1

2. ETL 管道

ETL(Extract, Transform, Load)是数据工程的核心流程:

数据源 → Extract(提取)→ Transform(转换)→ Load(加载)→ 数据仓库
          │                 │                  │
          │ API/DB/文件     │ 清洗/转换/聚合   │ 写入目标

2.1 现代 ELT 模式

数据源 → Extract → Load(先加载到数据湖)→ Transform(在数据湖/仓库中转换)

ELT 的优势:

  • 原始数据保留,可重新转换
  • 利用数据仓库的计算能力
  • 更灵活

2.2 常用工具

工具 类型 特点
Apache Airflow 编排 DAG 定义工作流,Python 生态
dbt 转换 SQL-first,版本控制,测试
Prefect 编排 现代 Python 工作流引擎
Luigi 编排 轻量级依赖管理
Fivetran 提取+加载 SaaS,200+ 数据源连接器

Airflow DAG 示例

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "data_pipeline",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    extract = PythonOperator(
        task_id="extract_data",
        python_callable=extract_from_api,
    )

    clean = PythonOperator(
        task_id="clean_data",
        python_callable=clean_and_validate,
    )

    load = PythonOperator(
        task_id="load_to_warehouse",
        python_callable=load_to_bigquery,
    )

    extract >> clean >> load

3. 缺失值处理

3.1 缺失类型

类型 缩写 含义 示例
完全随机缺失 MCAR 缺失与任何变量无关 问卷随机遗漏
随机缺失 MAR 缺失与已观测变量有关 年龄大的人更不愿填收入
非随机缺失 MNAR 缺失与缺失值本身有关 收入高的人不愿报收入

3.2 处理方法

import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer

df = pd.read_csv("data.csv")

# 1. 查看缺失情况
print(df.isnull().sum())
print(df.isnull().mean())  # 缺失比例

# 2. 删除法(缺失比例 > 50% 时考虑删除该列)
df = df.dropna(subset=["critical_column"])  # 删除关键列缺失的行
df = df.drop(columns=["mostly_missing_col"])  # 删除高缺失列

# 3. 统计量填充
df["age"].fillna(df["age"].median(), inplace=True)  # 中位数
df["city"].fillna(df["city"].mode()[0], inplace=True)  # 众数

# 4. KNN 填充
imputer = KNNImputer(n_neighbors=5)
df_imputed = pd.DataFrame(
    imputer.fit_transform(df[numeric_cols]),
    columns=numeric_cols
)

# 5. 添加缺失指示变量
df["age_missing"] = df["age"].isnull().astype(int)
方法 适用 优缺点
删除行 MCAR,少量缺失 简单,但丢失数据
均值/中位数 数值型 简单,但低估方差
众数 分类型 简单
KNN 多变量关联 效果好,但计算慢
多重插补 (MICE) MAR 理论最优,保留不确定性
模型预测 大规模数据 灵活,但可能引入偏差

4. 异常值处理

4.1 检测方法

# 1. IQR 方法
Q1 = df["value"].quantile(0.25)
Q3 = df["value"].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
outliers = df[(df["value"] < lower) | (df["value"] > upper)]

# 2. Z-score
from scipy import stats
z_scores = np.abs(stats.zscore(df["value"]))
outliers = df[z_scores > 3]

# 3. Isolation Forest(适合多维数据)
from sklearn.ensemble import IsolationForest
iso_forest = IsolationForest(contamination=0.05, random_state=42)
df["is_outlier"] = iso_forest.fit_predict(df[features])

4.2 处理策略

策略 方法 适用
删除 移除异常点 确认是错误数据
截断 Winsorize 到百分位 极值但非错误
转换 对数/Box-Cox 变换 右偏分布
分箱 离散化处理 异常值影响模型
保留 不处理 异常值本身有意义

5. 重复值处理

# 检测重复
print(f"重复行数: {df.duplicated().sum()}")
print(f"重复比例: {df.duplicated().mean():.2%}")

# 基于特定列检测重复
duplicates = df[df.duplicated(subset=["user_id", "timestamp"], keep=False)]

# 删除重复(保留第一条)
df = df.drop_duplicates(subset=["user_id", "timestamp"], keep="first")

# 模糊重复检测(文本数据)
from fuzzywuzzy import fuzz
def find_fuzzy_duplicates(names, threshold=90):
    duplicates = []
    for i, name1 in enumerate(names):
        for j, name2 in enumerate(names[i+1:], i+1):
            if fuzz.ratio(name1, name2) > threshold:
                duplicates.append((i, j, name1, name2))
    return duplicates

6. 数据验证

6.1 Schema 验证

import pandera as pa

schema = pa.DataFrameSchema({
    "age": pa.Column(int, pa.Check.in_range(0, 150)),
    "email": pa.Column(str, pa.Check.str_matches(r".+@.+\..+")),
    "income": pa.Column(float, pa.Check.ge(0), nullable=True),
    "gender": pa.Column(str, pa.Check.isin(["M", "F", "Other"])),
})

# 验证
validated_df = schema.validate(df)

6.2 数据质量检查清单

检查项 方法
完整性 缺失值比例
唯一性 主键重复检查
一致性 跨表/跨字段逻辑一致
准确性 值域范围、格式校验
时效性 数据更新时间
分布稳定性 与历史分布对比(数据漂移)

7. 工具总结

工具 用途 语言
pandas 数据处理基础 Python
polars 高性能数据处理 Python/Rust
Great Expectations 数据验证 Python
pandera Schema 验证 Python
Apache Airflow 工作流编排 Python
dbt SQL 转换 SQL
Apache Spark 大规模处理 Python/Scala

参考资料

  • "Data Engineering with Python" - Paul Crickard
  • "Fundamentals of Data Engineering" - Reis & Housley
  • pandas 官方文档
  • Apache Airflow 官方文档

评论 #