
Data Collection and Cleaning

Introduction

Data is the fuel of machine learning, and data quality directly determines model performance. This article covers data sources, ETL pipelines, handling of the most common quality problems (missing values, outliers, duplicates), and widely used tools.


1. Data Sources

Source           | Acquisition Method          | Typical Data
API              | REST/GraphQL calls          | Social media, financial quotes, weather
Web scraping     | HTML parsing, Selenium      | E-commerce prices, news, reviews
Databases        | SQL queries                 | Business data, user behavior
Files            | CSV/JSON/Parquet reading    | Public datasets, logs
IoT devices      | MQTT/CoAP protocols         | Sensor data
Third-party data | Data marketplace purchases  | Credit, geographic, industry data
Crowdsourcing    | Amazon Mechanical Turk      | Labeled data

API Data Collection Example

import requests
import time

def fetch_with_retry(url, params, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff

# Paginated collection
all_data = []
page = 1
while True:
    data = fetch_with_retry(
        "https://api.example.com/data",
        params={"page": page, "per_page": 100}
    )
    if not data["results"]:
        break
    all_data.extend(data["results"])
    page += 1

2. ETL Pipelines

ETL (Extract, Transform, Load) is the core workflow in data engineering:

Data Sources → Extract → Transform → Load → Data Warehouse
                  │          │          │
              API / DB /  Clean /    Write to
              Files       Transform/ target
                          Aggregate

2.1 Modern ELT Pattern

Data Sources → Extract → Load (load to data lake first) → Transform (transform in lake/warehouse)

ELT advantages:

  • Raw data is preserved, allowing re-transformation
  • Leverages the computing power of the data warehouse
  • More flexible: transformation logic can change and re-run without touching source systems (see the sketch below)
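
To make the contrast concrete, here is a minimal ELT sketch in pandas; the lake paths and the "user_id"/"timestamp" columns are illustrative assumptions, not a prescribed layout. The raw payload is landed first, and transformation runs later against the stored copy:

import pandas as pd

# Extract + Load: land the raw records untouched in the data lake
raw = pd.DataFrame(all_data)               # e.g. the API results collected above
raw.to_parquet("lake/raw/events.parquet")  # illustrative lake path

# Transform: runs later, inside the lake/warehouse, and can be re-run anytime
raw = pd.read_parquet("lake/raw/events.parquet")
clean = (
    raw.dropna(subset=["user_id"])         # column names assumed for illustration
       .assign(event_date=lambda d: pd.to_datetime(d["timestamp"]).dt.date)
)
clean.to_parquet("lake/clean/events.parquet")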

2.2 Common Tools

Tool           | Type           | Features
Apache Airflow | Orchestration  | DAG-defined workflows, Python ecosystem
dbt            | Transformation | SQL-first, version control, testing
Prefect        | Orchestration  | Modern Python workflow engine
Luigi          | Orchestration  | Lightweight dependency management
Fivetran       | Extract+Load   | SaaS, 200+ data source connectors

Airflow DAG Example

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Placeholder task callables -- in a real pipeline these hold the actual logic
def extract_from_api(): ...
def clean_and_validate(): ...
def load_to_bigquery(): ...

default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "data_pipeline",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    extract = PythonOperator(
        task_id="extract_data",
        python_callable=extract_from_api,
    )

    clean = PythonOperator(
        task_id="clean_data",
        python_callable=clean_and_validate,
    )

    load = PythonOperator(
        task_id="load_to_warehouse",
        python_callable=load_to_bigquery,
    )

    extract >> clean >> load

3. Missing Value Handling

3.1 Types of Missingness

Type                         | Abbreviation | Meaning                                          | Example
Missing Completely at Random | MCAR         | Missingness unrelated to any variable            | Random survey omission
Missing at Random            | MAR          | Missingness related to observed variables        | Older people less willing to report income
Missing Not at Random        | MNAR         | Missingness related to the missing value itself  | High earners unwilling to report income
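
The mechanism is rarely knowable for certain, but a quick informal probe for MAR is to compare missingness rates across observed groups; a roughly flat profile is at least consistent with MCAR. A minimal sketch, where the income and age columns are illustrative assumptions:

import pandas as pd

df = pd.read_csv("data.csv")

# Income missingness rate per age band: large differences suggest MAR
# (missingness explained by observed age) rather than MCAR
missing_by_age = (
    df["income"].isna()
      .groupby(pd.cut(df["age"], bins=[0, 30, 50, 70, 120]))
      .mean()
)
print(missing_by_age)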

3.2 Handling Methods

import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer

df = pd.read_csv("data.csv")

# 1. Examine missing patterns
print(df.isnull().sum())
print(df.isnull().mean())  # missing proportion

# 2. Deletion (consider dropping column if missing > 50%)
df = df.dropna(subset=["critical_column"])  # drop rows missing critical column
df = df.drop(columns=["mostly_missing_col"])  # drop high-missing columns

# 3. Statistical imputation (plain assignment; inplace fillna on a column is deprecated)
df["age"] = df["age"].fillna(df["age"].median())      # median for numeric
df["city"] = df["city"].fillna(df["city"].mode()[0])  # mode for categorical

# 4. KNN imputation (numeric columns only)
numeric_cols = df.select_dtypes(include="number").columns
imputer = KNNImputer(n_neighbors=5)
df_imputed = pd.DataFrame(
    imputer.fit_transform(df[numeric_cols]),
    columns=numeric_cols,
)

# 5. Add missing indicator
df["age_missing"] = df["age"].isnull().astype(int)

Method                     | Suited For                 | Pros/Cons
Drop rows                  | MCAR, small amount missing | Simple, but loses data
Mean/Median                | Numeric                    | Simple, but underestimates variance
Mode                       | Categorical                | Simple
KNN                        | Multivariate associations  | Good results, but computationally slow
Multiple Imputation (MICE) | MAR                        | Theoretically optimal, preserves uncertainty
Model prediction           | Large-scale data           | Flexible, but may introduce bias
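
For multiple imputation, scikit-learn ships an experimental IterativeImputer that approximates MICE by modeling each feature as a function of the others. A minimal sketch on the numeric columns:

import pandas as pd
# The experimental enabler import is required before IterativeImputer is importable
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer

df = pd.read_csv("data.csv")
numeric_cols = df.select_dtypes(include="number").columns

# Each feature is iteratively regressed on the others; sample_posterior=True
# draws imputations from a predictive distribution, closer in spirit to MICE
mice = IterativeImputer(max_iter=10, sample_posterior=True, random_state=42)
df[numeric_cols] = mice.fit_transform(df[numeric_cols])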

4. Outlier Handling

4.1 Detection Methods

# 1. IQR method
Q1 = df["value"].quantile(0.25)
Q3 = df["value"].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
outliers = df[(df["value"] < lower) | (df["value"] > upper)]

# 2. Z-score (nan_policy="omit" ignores missing values)
from scipy import stats
z_scores = np.abs(stats.zscore(df["value"], nan_policy="omit"))
outliers = df[z_scores > 3]

# 3. Isolation Forest (suited for multivariate data)
from sklearn.ensemble import IsolationForest
features = df.select_dtypes(include="number").columns  # or an explicit feature list
iso_forest = IsolationForest(contamination=0.05, random_state=42)
df["is_outlier"] = iso_forest.fit_predict(df[features])  # -1 = outlier, 1 = inlier

4.2 Treatment Strategies

Strategy  | Method                     | Suited For
Remove    | Delete outlier points      | Confirmed erroneous data
Clip      | Winsorize to percentile    | Extreme but not erroneous
Transform | Log/Box-Cox transformation | Right-skewed distributions
Bin       | Discretize                 | Outliers affect the model
Keep      | No treatment               | Outliers are meaningful
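
The clip, transform, and bin strategies are short pandas/NumPy one-liners. Continuing with the df, imports, and value column from the detection snippets above (the percentile cutoffs are illustrative):

# Clip (winsorize): cap values at the 1st/99th percentiles
low, high = df["value"].quantile([0.01, 0.99])
df["value_clipped"] = df["value"].clip(lower=low, upper=high)

# Transform: log1p compresses the right tail of skewed, non-negative data
df["value_log"] = np.log1p(df["value"])

# Bin: quantile buckets put extreme values in the same bucket as the tail
df["value_bin"] = pd.qcut(df["value"], q=10, labels=False, duplicates="drop")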

5. Duplicate Handling

# Detect duplicates
print(f"Duplicate rows: {df.duplicated().sum()}")
print(f"Duplicate rate: {df.duplicated().mean():.2%}")

# Detect duplicates based on specific columns
duplicates = df[df.duplicated(subset=["user_id", "timestamp"], keep=False)]

# Remove duplicates (keep first occurrence)
df = df.drop_duplicates(subset=["user_id", "timestamp"], keep="first")

# Fuzzy duplicate detection (text data); fuzzywuzzy is maintained today as "thefuzz"
from fuzzywuzzy import fuzz

def find_fuzzy_duplicates(names, threshold=90):
    """Return (i, j, name_i, name_j) pairs whose similarity exceeds threshold."""
    duplicates = []
    for i, name1 in enumerate(names):
        for j, name2 in enumerate(names[i+1:], i+1):  # O(n^2) pairwise scan
            if fuzz.ratio(name1, name2) > threshold:
                duplicates.append((i, j, name1, name2))
    return duplicates
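
Usage might look like the following, where the company_name column is an illustrative assumption. Because the scan is O(n²), large name lists call for a blocking key or the faster rapidfuzz library:

names = df["company_name"].dropna().unique().tolist()
for i, j, a, b in find_fuzzy_duplicates(names, threshold=92):
    print(f"possible duplicate: {a!r} ~ {b!r}")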

6. Data Validation

6.1 Schema Validation

import pandera as pa

schema = pa.DataFrameSchema({
    "age": pa.Column(int, pa.Check.in_range(0, 150)),
    "email": pa.Column(str, pa.Check.str_matches(r".+@.+\..+")),
    "income": pa.Column(float, pa.Check.ge(0), nullable=True),
    "gender": pa.Column(str, pa.Check.isin(["M", "F", "Other"])),
})

# Validate (raises on the first failure; lazy=True collects all failures)
validated_df = schema.validate(df)

6.2 Data Quality Checklist

Check                  | Method
Completeness           | Missing value proportion
Uniqueness             | Primary key duplicate check
Consistency            | Cross-table/cross-field logical consistency
Accuracy               | Value range, format validation
Timeliness             | Data update timestamps
Distribution stability | Compare with historical distributions (data drift)
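
The last check, distribution stability, can be automated with a two-sample test against a historical baseline. A minimal sketch using SciPy's Kolmogorov–Smirnov test, where the file names and income column are illustrative:

import pandas as pd
from scipy import stats

reference = pd.read_csv("last_month.csv")  # historical baseline
current = pd.read_csv("this_month.csv")    # freshly loaded batch

# Two-sample KS test: a small p-value means the samples likely come from
# different distributions, i.e. possible data drift
stat, p_value = stats.ks_2samp(
    reference["income"].dropna(), current["income"].dropna()
)
if p_value < 0.01:
    print(f"possible drift in 'income' (KS={stat:.3f}, p={p_value:.4f})")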

7. Tool Summary

Tool               | Purpose                          | Language
pandas             | Basic data processing            | Python
polars             | High-performance data processing | Python/Rust
Great Expectations | Data validation                  | Python
pandera            | Schema validation                | Python
Apache Airflow     | Workflow orchestration           | Python
dbt                | SQL transformation               | SQL
Apache Spark       | Large-scale processing           | Python/Scala

References

  • "Data Engineering with Python" - Paul Crickard
  • "Fundamentals of Data Engineering" - Reis & Housley
  • pandas Official Documentation
  • Apache Airflow Official Documentation
