数据采集与清洗
概述
数据是机器学习的燃料。数据质量直接决定模型性能。本文涵盖数据采集来源、ETL 管道、数据质量处理(缺失值、异常值、重复值)和常用工具。
1. 数据来源
| 来源 |
获取方式 |
典型数据 |
| API |
REST/GraphQL 调用 |
社交媒体、金融行情、天气 |
| Web 爬虫 |
HTML 解析、Selenium |
电商价格、新闻、评论 |
| 数据库 |
SQL 查询 |
业务数据、用户行为 |
| 文件 |
CSV/JSON/Parquet 读取 |
公开数据集、日志 |
| IoT 设备 |
MQTT/CoAP 协议 |
传感器数据 |
| 第三方数据 |
数据市场购买 |
征信、地理、行业数据 |
| 众包 |
Amazon Mechanical Turk |
标注数据 |
API 数据采集示例
import requests
import time
def fetch_with_retry(url, params, max_retries=3):
for attempt in range(max_retries):
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # 指数退避
# 分页采集
all_data = []
page = 1
while True:
data = fetch_with_retry(
"https://api.example.com/data",
params={"page": page, "per_page": 100}
)
if not data["results"]:
break
all_data.extend(data["results"])
page += 1
2. ETL 管道
ETL(Extract, Transform, Load)是数据工程的核心流程:
数据源 → Extract(提取)→ Transform(转换)→ Load(加载)→ 数据仓库
│ │ │
│ API/DB/文件 │ 清洗/转换/聚合 │ 写入目标
2.1 现代 ELT 模式
数据源 → Extract → Load(先加载到数据湖)→ Transform(在数据湖/仓库中转换)
ELT 的优势:
- 原始数据保留,可重新转换
- 利用数据仓库的计算能力
- 更灵活
2.2 常用工具
| 工具 |
类型 |
特点 |
| Apache Airflow |
编排 |
DAG 定义工作流,Python 生态 |
| dbt |
转换 |
SQL-first,版本控制,测试 |
| Prefect |
编排 |
现代 Python 工作流引擎 |
| Luigi |
编排 |
轻量级依赖管理 |
| Fivetran |
提取+加载 |
SaaS,200+ 数据源连接器 |
Airflow DAG 示例
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
with DAG(
"data_pipeline",
default_args=default_args,
schedule_interval="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
extract = PythonOperator(
task_id="extract_data",
python_callable=extract_from_api,
)
clean = PythonOperator(
task_id="clean_data",
python_callable=clean_and_validate,
)
load = PythonOperator(
task_id="load_to_warehouse",
python_callable=load_to_bigquery,
)
extract >> clean >> load
3. 缺失值处理
3.1 缺失类型
| 类型 |
缩写 |
含义 |
示例 |
| 完全随机缺失 |
MCAR |
缺失与任何变量无关 |
问卷随机遗漏 |
| 随机缺失 |
MAR |
缺失与已观测变量有关 |
年龄大的人更不愿填收入 |
| 非随机缺失 |
MNAR |
缺失与缺失值本身有关 |
收入高的人不愿报收入 |
3.2 处理方法
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer
df = pd.read_csv("data.csv")
# 1. 查看缺失情况
print(df.isnull().sum())
print(df.isnull().mean()) # 缺失比例
# 2. 删除法(缺失比例 > 50% 时考虑删除该列)
df = df.dropna(subset=["critical_column"]) # 删除关键列缺失的行
df = df.drop(columns=["mostly_missing_col"]) # 删除高缺失列
# 3. 统计量填充
df["age"].fillna(df["age"].median(), inplace=True) # 中位数
df["city"].fillna(df["city"].mode()[0], inplace=True) # 众数
# 4. KNN 填充
imputer = KNNImputer(n_neighbors=5)
df_imputed = pd.DataFrame(
imputer.fit_transform(df[numeric_cols]),
columns=numeric_cols
)
# 5. 添加缺失指示变量
df["age_missing"] = df["age"].isnull().astype(int)
| 方法 |
适用 |
优缺点 |
| 删除行 |
MCAR,少量缺失 |
简单,但丢失数据 |
| 均值/中位数 |
数值型 |
简单,但低估方差 |
| 众数 |
分类型 |
简单 |
| KNN |
多变量关联 |
效果好,但计算慢 |
| 多重插补 (MICE) |
MAR |
理论最优,保留不确定性 |
| 模型预测 |
大规模数据 |
灵活,但可能引入偏差 |
4. 异常值处理
4.1 检测方法
# 1. IQR 方法
Q1 = df["value"].quantile(0.25)
Q3 = df["value"].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
outliers = df[(df["value"] < lower) | (df["value"] > upper)]
# 2. Z-score
from scipy import stats
z_scores = np.abs(stats.zscore(df["value"]))
outliers = df[z_scores > 3]
# 3. Isolation Forest(适合多维数据)
from sklearn.ensemble import IsolationForest
iso_forest = IsolationForest(contamination=0.05, random_state=42)
df["is_outlier"] = iso_forest.fit_predict(df[features])
4.2 处理策略
| 策略 |
方法 |
适用 |
| 删除 |
移除异常点 |
确认是错误数据 |
| 截断 |
Winsorize 到百分位 |
极值但非错误 |
| 转换 |
对数/Box-Cox 变换 |
右偏分布 |
| 分箱 |
离散化处理 |
异常值影响模型 |
| 保留 |
不处理 |
异常值本身有意义 |
5. 重复值处理
# 检测重复
print(f"重复行数: {df.duplicated().sum()}")
print(f"重复比例: {df.duplicated().mean():.2%}")
# 基于特定列检测重复
duplicates = df[df.duplicated(subset=["user_id", "timestamp"], keep=False)]
# 删除重复(保留第一条)
df = df.drop_duplicates(subset=["user_id", "timestamp"], keep="first")
# 模糊重复检测(文本数据)
from fuzzywuzzy import fuzz
def find_fuzzy_duplicates(names, threshold=90):
duplicates = []
for i, name1 in enumerate(names):
for j, name2 in enumerate(names[i+1:], i+1):
if fuzz.ratio(name1, name2) > threshold:
duplicates.append((i, j, name1, name2))
return duplicates
6. 数据验证
6.1 Schema 验证
import pandera as pa
schema = pa.DataFrameSchema({
"age": pa.Column(int, pa.Check.in_range(0, 150)),
"email": pa.Column(str, pa.Check.str_matches(r".+@.+\..+")),
"income": pa.Column(float, pa.Check.ge(0), nullable=True),
"gender": pa.Column(str, pa.Check.isin(["M", "F", "Other"])),
})
# 验证
validated_df = schema.validate(df)
6.2 数据质量检查清单
| 检查项 |
方法 |
| 完整性 |
缺失值比例 |
| 唯一性 |
主键重复检查 |
| 一致性 |
跨表/跨字段逻辑一致 |
| 准确性 |
值域范围、格式校验 |
| 时效性 |
数据更新时间 |
| 分布稳定性 |
与历史分布对比(数据漂移) |
7. 工具总结
| 工具 |
用途 |
语言 |
| pandas |
数据处理基础 |
Python |
| polars |
高性能数据处理 |
Python/Rust |
| Great Expectations |
数据验证 |
Python |
| pandera |
Schema 验证 |
Python |
| Apache Airflow |
工作流编排 |
Python |
| dbt |
SQL 转换 |
SQL |
| Apache Spark |
大规模处理 |
Python/Scala |
参考资料
- "Data Engineering with Python" - Paul Crickard
- "Fundamentals of Data Engineering" - Reis & Housley
- pandas 官方文档
- Apache Airflow 官方文档