Data Collection and Cleaning
Introduction
Data is the fuel of machine learning, and data quality directly determines model performance. This article covers data sources, ETL pipelines, handling of common data quality issues (missing values, outliers, duplicates), and commonly used tools.
1. Data Sources
| Source | Acquisition Method | Typical Data |
|---|---|---|
| API | REST/GraphQL calls | Social media, financial quotes, weather |
| Web scraping | HTML parsing, Selenium | E-commerce prices, news, reviews |
| Databases | SQL queries | Business data, user behavior |
| Files | CSV/JSON/Parquet reading | Public datasets, logs |
| IoT devices | MQTT/CoAP protocols | Sensor data |
| Third-party data | Data marketplace purchases | Credit, geographic, industry data |
| Crowdsourcing | Amazon Mechanical Turk | Labeled data |
API Data Collection Example
```python
import requests
import time

def fetch_with_retry(url, params, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff

# Paginated collection
all_data = []
page = 1
while True:
    data = fetch_with_retry(
        "https://api.example.com/data",
        params={"page": page, "per_page": 100},
    )
    if not data["results"]:
        break
    all_data.extend(data["results"])
    page += 1
```
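The table above also lists web scraping as a source. A minimal sketch using requests and BeautifulSoup follows; the URL and CSS selectors are hypothetical, so adapt them to the target page (and check the site's robots.txt and terms of service before scraping):

```python
import requests
from bs4 import BeautifulSoup

# Fetch a product listing page (hypothetical URL)
response = requests.get("https://example.com/products", timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")

# Extract name and price from each product card (hypothetical selectors)
products = []
for item in soup.select(".product"):
    products.append({
        "name": item.select_one(".name").get_text(strip=True),
        "price": item.select_one(".price").get_text(strip=True),
    })
```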
2. ETL Pipelines
ETL (Extract, Transform, Load) is the core workflow in data engineering:
Data Sources → Extract (API/DB/files) → Transform (clean/transform/aggregate) → Load (write to target) → Data Warehouse
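Before introducing an orchestrator, the same workflow can be sketched as three plain Python functions. This is only an illustration: the API endpoint, column names, and target table are hypothetical, and SQLite stands in for the warehouse.

```python
import sqlite3

import pandas as pd
import requests

def extract() -> pd.DataFrame:
    # Extract: pull raw records from a hypothetical API endpoint
    resp = requests.get("https://api.example.com/orders", timeout=10)
    resp.raise_for_status()
    return pd.DataFrame(resp.json()["results"])

def transform(raw: pd.DataFrame) -> pd.DataFrame:
    # Transform: clean types and aggregate to daily totals
    raw["order_date"] = pd.to_datetime(raw["order_date"])
    raw["amount"] = pd.to_numeric(raw["amount"], errors="coerce")
    return raw.groupby(raw["order_date"].dt.date)["amount"].sum().reset_index()

def load(df: pd.DataFrame) -> None:
    # Load: write the result to a local warehouse table (SQLite as a stand-in)
    with sqlite3.connect("warehouse.db") as conn:
        df.to_sql("daily_orders", conn, if_exists="replace", index=False)

load(transform(extract()))
```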
2.1 Modern ELT Pattern
Data Sources → Extract → Load (raw data lands in the data lake first) → Transform (transformation runs inside the lake/warehouse)
ELT advantages:
- Raw data is preserved, allowing re-transformation
- Leverages the computing power of the data warehouse
- More flexible: transformations can be changed and re-run without re-extracting from the sources
Commonly used ETL/ELT tools:

| Tool | Type | Features |
|---|---|---|
| Apache Airflow | Orchestration | DAG-defined workflows, Python ecosystem |
| dbt | Transformation | SQL-first, version control, testing |
| Prefect | Orchestration | Modern Python workflow engine |
| Luigi | Orchestration | Lightweight dependency management |
| Fivetran | Extract + Load | SaaS, 200+ data source connectors |
Airflow DAG Example
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "data_pipeline",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # extract_from_api, clean_and_validate, and load_to_bigquery are
    # user-defined functions (a sketch of them follows below)
    extract = PythonOperator(
        task_id="extract_data",
        python_callable=extract_from_api,
    )
    clean = PythonOperator(
        task_id="clean_data",
        python_callable=clean_and_validate,
    )
    load = PythonOperator(
        task_id="load_to_warehouse",
        python_callable=load_to_bigquery,
    )

    extract >> clean >> load
```
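The three python_callable functions are not defined in this article. A minimal sketch of what they might look like is below; the API endpoint, staging paths, and BigQuery table name are hypothetical, and loading via pandas requires the pandas-gbq package.

```python
import pandas as pd
import requests

def extract_from_api(**context):
    # Pull one day of records from a hypothetical API and stage them locally
    resp = requests.get(
        "https://api.example.com/data",
        params={"date": context["ds"]},  # Airflow passes the logical date as "ds"
        timeout=10,
    )
    resp.raise_for_status()
    pd.DataFrame(resp.json()["results"]).to_parquet("/tmp/raw.parquet")

def clean_and_validate(**context):
    # Drop exact duplicates and rows missing the critical key
    df = pd.read_parquet("/tmp/raw.parquet")
    df = df.drop_duplicates().dropna(subset=["id"])
    df.to_parquet("/tmp/clean.parquet")

def load_to_bigquery(**context):
    # Append the cleaned batch to a hypothetical BigQuery table (needs pandas-gbq)
    df = pd.read_parquet("/tmp/clean.parquet")
    df.to_gbq("analytics.daily_data", if_exists="append")
```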
3. Missing Value Handling
3.1 Types of Missingness
| Type | Abbreviation | Meaning | Example |
|---|---|---|---|
| Missing Completely at Random | MCAR | Missingness unrelated to any variable | Random survey omission |
| Missing at Random | MAR | Missingness related to observed variables | Older people less willing to report income |
| Missing Not at Random | MNAR | Missingness related to the missing value itself | High earners unwilling to report income |
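The mechanism usually cannot be proven from the data alone, but comparing missing rates across observed variables gives a useful hint. A small sketch, assuming hypothetical income and age_group columns:

```python
import pandas as pd

df = pd.read_csv("data.csv")  # hypothetical file with income and age_group columns

# If the missing rate of income varies strongly with age_group,
# the data are more consistent with MAR than with MCAR.
missing_by_group = df.groupby("age_group")["income"].apply(lambda s: s.isna().mean())
print(missing_by_group.sort_values(ascending=False))
```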
3.2 Handling Methods
```python
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer

df = pd.read_csv("data.csv")

# 1. Examine missing patterns
print(df.isnull().sum())
print(df.isnull().mean())  # missing proportion

# 2. Deletion (consider dropping a column if more than ~50% is missing)
df = df.dropna(subset=["critical_column"])    # drop rows missing the critical column
df = df.drop(columns=["mostly_missing_col"])  # drop high-missing columns

# 3. Add a missing indicator (before imputing, otherwise the signal is lost)
df["age_missing"] = df["age"].isnull().astype(int)

# 4. Statistical imputation
df["age"] = df["age"].fillna(df["age"].median())      # median
df["city"] = df["city"].fillna(df["city"].mode()[0])  # mode

# 5. KNN imputation
numeric_cols = df.select_dtypes(include="number").columns
imputer = KNNImputer(n_neighbors=5)
df_imputed = pd.DataFrame(
    imputer.fit_transform(df[numeric_cols]),
    columns=numeric_cols,
)
```
| Method | Suited For | Pros/Cons |
|---|---|---|
| Drop rows | MCAR, small amount missing | Simple, but loses data |
| Mean/Median | Numeric | Simple, but underestimates variance |
| Mode | Categorical | Simple |
| KNN | Multivariate associations | Good results, but computationally slow |
| Multiple Imputation (MICE) | MAR | Theoretically optimal, preserves uncertainty |
| Model prediction | Large-scale data | Flexible, but may introduce bias |
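The table mentions multiple imputation (MICE), which the code above does not show. A minimal sketch using scikit-learn's IterativeImputer, a MICE-style iterative imputer (still marked experimental, and single-imputation by default rather than producing multiple datasets):

```python
from sklearn.experimental import enable_iterative_imputer  # noqa: F401  (required to enable the import below)
from sklearn.impute import IterativeImputer

numeric_cols = df.select_dtypes(include="number").columns

# Each feature with missing values is modeled as a function of the others,
# iterating until the imputed values stabilize.
mice = IterativeImputer(max_iter=10, random_state=42)
df[numeric_cols] = mice.fit_transform(df[numeric_cols])
```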
4. Outlier Handling
4.1 Detection Methods
```python
import numpy as np
from scipy import stats
from sklearn.ensemble import IsolationForest

# 1. IQR method
Q1 = df["value"].quantile(0.25)
Q3 = df["value"].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
outliers = df[(df["value"] < lower) | (df["value"] > upper)]

# 2. Z-score (flag points more than 3 standard deviations from the mean)
z_scores = np.abs(stats.zscore(df["value"]))
outliers = df[z_scores > 3]

# 3. Isolation Forest (suited for multivariate data)
features = df.select_dtypes(include="number").columns  # numeric feature columns
iso_forest = IsolationForest(contamination=0.05, random_state=42)
df["is_outlier"] = iso_forest.fit_predict(df[features])  # -1 = outlier, 1 = normal
```
4.2 Treatment Strategies
| Strategy | Method | Suited For |
|---|---|---|
| Remove | Delete outlier points | Confirmed erroneous data |
| Clip | Winsorize to percentile | Extreme but not erroneous values |
| Transform | Log/Box-Cox transformation | Right-skewed distributions |
| Bin | Discretize | Models strongly affected by outliers |
| Keep | No treatment | Outliers are meaningful |
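A short sketch of the clip and transform strategies from the table, continuing with the value column used above:

```python
import numpy as np

# Clip (winsorize): cap values at the 1st and 99th percentiles
low, high = df["value"].quantile([0.01, 0.99])
df["value_clipped"] = df["value"].clip(lower=low, upper=high)

# Transform: log1p compresses a right-skewed, non-negative distribution;
# scipy.stats.boxcox is an alternative for strictly positive data
df["value_log"] = np.log1p(df["value"].clip(lower=0))
```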
5. Duplicate Handling
```python
# Detect duplicates
print(f"Duplicate rows: {df.duplicated().sum()}")
print(f"Duplicate rate: {df.duplicated().mean():.2%}")

# Detect duplicates based on specific columns
duplicates = df[df.duplicated(subset=["user_id", "timestamp"], keep=False)]

# Remove duplicates (keep the first occurrence)
df = df.drop_duplicates(subset=["user_id", "timestamp"], keep="first")

# Fuzzy duplicate detection (text data)
from fuzzywuzzy import fuzz

def find_fuzzy_duplicates(names, threshold=90):
    duplicates = []
    for i, name1 in enumerate(names):
        for j, name2 in enumerate(names[i + 1:], i + 1):
            if fuzz.ratio(name1, name2) > threshold:
                duplicates.append((i, j, name1, name2))
    return duplicates
```
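For example, applied to a small list of names (illustrative values only), pairs scoring above the threshold are reported as possible duplicates:

```python
names = ["Acme Inc.", "Acme Inc", "Globex Corp", "Initech LLC"]
for i, j, a, b in find_fuzzy_duplicates(names, threshold=85):
    print(f"Possible duplicate: {a!r} (row {i}) ~ {b!r} (row {j})")
```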
6. Data Validation
6.1 Schema Validation
```python
import pandera as pa

schema = pa.DataFrameSchema({
    "age": pa.Column(int, pa.Check.in_range(0, 150)),
    "email": pa.Column(str, pa.Check.str_matches(r".+@.+\..+")),
    "income": pa.Column(float, pa.Check.ge(0), nullable=True),
    "gender": pa.Column(str, pa.Check.isin(["M", "F", "Other"])),
})

# Validate
validated_df = schema.validate(df)
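```

By default, schema.validate raises on the first failing check. pandera also supports lazy validation, which collects all failures before raising:

```python
try:
    schema.validate(df, lazy=True)
except pa.errors.SchemaErrors as err:
    # failure_cases is a DataFrame listing every failed check and offending value
    print(err.failure_cases.head())
```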
6.2 Data Quality Checklist
| Check | Method |
|---|---|
| Completeness | Missing value proportion |
| Uniqueness | Primary key duplicate check |
| Consistency | Cross-table/cross-field logical consistency |
| Accuracy | Value range, format validation |
| Timeliness | Data update timestamps |
| Distribution stability | Compare with historical distributions (data drift) |
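A simple sketch of the distribution-stability check, comparing the current batch against a historical reference with a two-sample Kolmogorov-Smirnov test; the column and file names are hypothetical:

```python
import pandas as pd
from scipy import stats

reference = pd.read_csv("last_month.csv")["value"]  # historical baseline (hypothetical)
current = pd.read_csv("data.csv")["value"]

# A small p-value suggests the two samples come from different distributions (possible drift)
statistic, p_value = stats.ks_2samp(reference.dropna(), current.dropna())
print(f"KS statistic={statistic:.3f}, p-value={p_value:.4f}")
```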
7. Common Tools

| Tool | Purpose | Language |
|---|---|---|
| pandas | Basic data processing | Python |
| polars | High-performance data processing | Python/Rust |
| Great Expectations | Data validation | Python |
| pandera | Schema validation | Python |
| Apache Airflow | Workflow orchestration | Python |
| dbt | SQL transformation | SQL |
| Apache Spark | Large-scale processing | Python/Scala |
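As a point of comparison with the pandas code above, a sketch of the same basic cleaning steps in polars (the column names are the hypothetical ones used earlier):

```python
import polars as pl

df_pl = (
    pl.read_csv("data.csv")
    .unique()                                # drop exact duplicate rows
    .drop_nulls(subset=["critical_column"])  # drop rows missing the critical column
    .with_columns(pl.col("age").fill_null(pl.col("age").median()))  # median imputation
)
```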
References
- "Data Engineering with Python" - Paul Crickard
- "Fundamentals of Data Engineering" - Reis & Housley
- pandas Official Documentation
- Apache Airflow Official Documentation