跳转至

大数据技术

概述

当数据规模超过单机处理能力时,需要分布式计算框架。本文涵盖 Hadoop 生态、Apache Spark、流处理、数据湖/数据仓库以及云上大数据服务。


1. 大数据架构总览

graph TB
    subgraph 数据源
        A1[数据库]
        A2[日志]
        A3[IoT]
        A4[API]
    end

    subgraph 数据采集
        B1[Kafka]
        B2[Flume]
        B3[Sqoop]
    end

    subgraph 存储
        C1[HDFS]
        C2[S3 / 对象存储]
        C3[数据湖 Delta/Iceberg]
    end

    subgraph 计算
        D1[Spark 批处理]
        D2[Flink 流处理]
        D3[Spark SQL]
    end

    subgraph 服务
        E1[数据仓库 Hive/BQ]
        E2[OLAP Clickhouse]
        E3[ML Pipeline]
        E4[BI 仪表盘]
    end

    A1 & A2 & A3 & A4 --> B1 & B2 & B3
    B1 & B2 & B3 --> C1 & C2 & C3
    C1 & C2 & C3 --> D1 & D2 & D3
    D1 & D2 & D3 --> E1 & E2 & E3 & E4

2. Hadoop 生态

2.1 HDFS(Hadoop Distributed File System)

分布式文件系统,为大文件优化:

NameNode(主节点)
  - 管理文件系统元数据(文件→块的映射)
  - 不存储实际数据

DataNode(数据节点)
  - 存储实际数据块
  - 默认块大小 128MB
  - 默认 3 副本

文件 "logs.csv" (512MB):
  Block 1 (128MB) → DataNode 1, 3, 5
  Block 2 (128MB) → DataNode 2, 4, 6
  Block 3 (128MB) → DataNode 1, 4, 5
  Block 4 (128MB) → DataNode 2, 3, 6

2.2 MapReduce

经典的分布式计算模型:

输入数据
  │
  ▼ Split
[Split 1] [Split 2] [Split 3]
  │          │          │
  ▼ Map      ▼ Map      ▼ Map
(k1,v1)   (k2,v2)   (k1,v3)
  │          │          │
  ▼ Shuffle & Sort
[k1: v1,v3]  [k2: v2]
  │              │
  ▼ Reduce      ▼ Reduce
(k1, result1) (k2, result2)

WordCount 示例

# Map: 文档 → (word, 1)
def mapper(document):
    for word in document.split():
        emit(word, 1)

# Reduce: (word, [1,1,1,...]) → (word, count)
def reducer(word, counts):
    emit(word, sum(counts))

2.3 YARN(Yet Another Resource Negotiator)

Hadoop 的资源管理器:

组件 职责
ResourceManager 集群资源分配
NodeManager 单节点资源管理
ApplicationMaster 单个应用的资源协调
Container 资源分配单位(CPU + 内存)

3. Apache Spark

Spark 是 MapReduce 的继任者,通过内存计算实现 10-100x 加速。

3.1 核心概念

概念 说明
RDD 弹性分布式数据集,基础抽象
DataFrame 带 Schema 的分布式表(推荐使用)
Dataset 类型安全的 DataFrame(Scala/Java)
Transformation 惰性操作(map, filter, join)
Action 触发计算(collect, count, save)

3.2 DataFrame 操作

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("DataAnalysis") \
    .config("spark.sql.shuffle.partitions", 200) \
    .getOrCreate()

# 读取数据
df = spark.read.parquet("s3://bucket/data/")

# 转换
result = (df
    .filter(F.col("age") >= 18)
    .withColumn("income_level", 
        F.when(F.col("income") > 100000, "high")
         .when(F.col("income") > 50000, "medium")
         .otherwise("low"))
    .groupBy("city", "income_level")
    .agg(
        F.count("*").alias("count"),
        F.avg("income").alias("avg_income"),
        F.percentile_approx("income", 0.5).alias("median_income")
    )
    .orderBy(F.desc("count"))
)

result.show()
result.write.parquet("s3://bucket/output/")

3.3 Spark SQL

df.createOrReplaceTempView("users")

result = spark.sql("""
    SELECT 
        city,
        COUNT(*) as user_count,
        AVG(income) as avg_income,
        PERCENTILE(income, 0.5) as median_income
    FROM users
    WHERE age >= 18
    GROUP BY city
    HAVING COUNT(*) > 100
    ORDER BY avg_income DESC
""")

3.4 Spark MLlib

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 特征处理管道
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

pipeline = Pipeline(stages=[assembler, scaler, rf])

# 训练
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_df)

# 评估
predictions = model.transform(test_df)
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.4f}")

4. 流处理

4.1 Kafka Streams

轻量级流处理库(无需独立集群):

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> events = builder.stream("user-events");

KTable<String, Long> eventCounts = events
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .count();

eventCounts.toStream()
    .to("event-counts", Produced.with(windowedSerde, Serdes.Long()));

强大的分布式流处理引擎:

from pyflink.table import EnvironmentSettings, TableEnvironment

env = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env)

# 定义 Kafka 源
t_env.execute_sql("""
    CREATE TABLE events (
        user_id STRING,
        event_type STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

# 窗口聚合
result = t_env.sql_query("""
    SELECT 
        user_id,
        TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
        COUNT(*) as event_count
    FROM events
    GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)
""")

4.3 批处理 vs 流处理

维度 批处理 流处理
延迟 分钟~小时 秒~毫秒
数据 有界数据集 无界数据流
模型 MapReduce, Spark Flink, Kafka Streams
适用 报表、ETL、模型训练 实时监控、告警、推荐

5. 数据湖 vs 数据仓库

维度 数据湖 数据仓库
数据格式 原始(JSON, CSV, Parquet) 结构化(Schema-on-Write)
处理方式 Schema-on-Read Schema-on-Write
成本 低(对象存储) 高(专用引擎)
用户 数据科学家、工程师 分析师、业务用户
代表 S3 + Delta Lake Snowflake, BigQuery

5.1 Lakehouse 架构

融合数据湖的灵活性和数据仓库的管理能力:

对象存储 (S3/ADLS)
    +
表格式 (Delta Lake / Apache Iceberg)
    - ACID 事务
    - Schema 演进
    - 时间旅行
    - 高效更新/删除
    +
查询引擎 (Spark / Trino / Presto)
    +
BI / ML 工具
# Delta Lake 示例
df.write.format("delta").save("s3://bucket/delta_table")

# 时间旅行
spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01") \
    .load("s3://bucket/delta_table")

# UPSERT (MERGE)
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://bucket/delta_table")
delta_table.alias("target") \
    .merge(new_data.alias("source"), "target.id = source.id") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

6. 云上大数据服务

服务 云厂商 类型
EMR AWS 托管 Hadoop/Spark
Databricks 多云 Lakehouse 平台
BigQuery GCP Serverless 数据仓库
Snowflake 多云 云数据仓库
Synapse Azure 统一分析平台
Glue AWS Serverless ETL
Dataflow GCP 托管 Apache Beam

7. 性能优化

优化 方法 效果
分区 按日期/地区分区存储 减少扫描数据量
列式存储 Parquet / ORC 只读需要的列
压缩 Snappy / Zstd 减少 I/O
广播 Join 小表广播到所有节点 避免 Shuffle
分桶 预按 key 分桶 加速 Join
缓存 df.cache() 避免重复计算
合理分区数 调整 spark.sql.shuffle.partitions 平衡并行度和开销

参考资料

  • "Learning Spark" - Damji et al.
  • "Designing Data-Intensive Applications" - Martin Kleppmann
  • Apache Spark 官方文档:https://spark.apache.org
  • Delta Lake 官方文档:https://delta.io

评论 #