大数据技术
概述
当数据规模超过单机处理能力时,需要分布式计算框架。本文涵盖 Hadoop 生态、Apache Spark、流处理、数据湖/数据仓库以及云上大数据服务。
1. 大数据架构总览
graph TB
subgraph 数据源
A1[数据库]
A2[日志]
A3[IoT]
A4[API]
end
subgraph 数据采集
B1[Kafka]
B2[Flume]
B3[Sqoop]
end
subgraph 存储
C1[HDFS]
C2[S3 / 对象存储]
C3[数据湖 Delta/Iceberg]
end
subgraph 计算
D1[Spark 批处理]
D2[Flink 流处理]
D3[Spark SQL]
end
subgraph 服务
E1[数据仓库 Hive/BQ]
E2[OLAP Clickhouse]
E3[ML Pipeline]
E4[BI 仪表盘]
end
A1 & A2 & A3 & A4 --> B1 & B2 & B3
B1 & B2 & B3 --> C1 & C2 & C3
C1 & C2 & C3 --> D1 & D2 & D3
D1 & D2 & D3 --> E1 & E2 & E3 & E4
2. Hadoop 生态
2.1 HDFS(Hadoop Distributed File System)
分布式文件系统,为大文件优化:
NameNode(主节点)
- 管理文件系统元数据(文件→块的映射)
- 不存储实际数据
DataNode(数据节点)
- 存储实际数据块
- 默认块大小 128MB
- 默认 3 副本
文件 "logs.csv" (512MB):
Block 1 (128MB) → DataNode 1, 3, 5
Block 2 (128MB) → DataNode 2, 4, 6
Block 3 (128MB) → DataNode 1, 4, 5
Block 4 (128MB) → DataNode 2, 3, 6
2.2 MapReduce
经典的分布式计算模型:
输入数据
│
▼ Split
[Split 1] [Split 2] [Split 3]
│ │ │
▼ Map ▼ Map ▼ Map
(k1,v1) (k2,v2) (k1,v3)
│ │ │
▼ Shuffle & Sort
[k1: v1,v3] [k2: v2]
│ │
▼ Reduce ▼ Reduce
(k1, result1) (k2, result2)
WordCount 示例:
# Map: 文档 → (word, 1)
def mapper(document):
for word in document.split():
emit(word, 1)
# Reduce: (word, [1,1,1,...]) → (word, count)
def reducer(word, counts):
emit(word, sum(counts))
2.3 YARN(Yet Another Resource Negotiator)
Hadoop 的资源管理器:
| 组件 | 职责 |
|---|---|
| ResourceManager | 集群资源分配 |
| NodeManager | 单节点资源管理 |
| ApplicationMaster | 单个应用的资源协调 |
| Container | 资源分配单位(CPU + 内存) |
3. Apache Spark
Spark 是 MapReduce 的继任者,通过内存计算实现 10-100x 加速。
3.1 核心概念
| 概念 | 说明 |
|---|---|
| RDD | 弹性分布式数据集,基础抽象 |
| DataFrame | 带 Schema 的分布式表(推荐使用) |
| Dataset | 类型安全的 DataFrame(Scala/Java) |
| Transformation | 惰性操作(map, filter, join) |
| Action | 触发计算(collect, count, save) |
3.2 DataFrame 操作
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("DataAnalysis") \
.config("spark.sql.shuffle.partitions", 200) \
.getOrCreate()
# 读取数据
df = spark.read.parquet("s3://bucket/data/")
# 转换
result = (df
.filter(F.col("age") >= 18)
.withColumn("income_level",
F.when(F.col("income") > 100000, "high")
.when(F.col("income") > 50000, "medium")
.otherwise("low"))
.groupBy("city", "income_level")
.agg(
F.count("*").alias("count"),
F.avg("income").alias("avg_income"),
F.percentile_approx("income", 0.5).alias("median_income")
)
.orderBy(F.desc("count"))
)
result.show()
result.write.parquet("s3://bucket/output/")
3.3 Spark SQL
df.createOrReplaceTempView("users")
result = spark.sql("""
SELECT
city,
COUNT(*) as user_count,
AVG(income) as avg_income,
PERCENTILE(income, 0.5) as median_income
FROM users
WHERE age >= 18
GROUP BY city
HAVING COUNT(*) > 100
ORDER BY avg_income DESC
""")
3.4 Spark MLlib
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# 特征处理管道
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)
pipeline = Pipeline(stages=[assembler, scaler, rf])
# 训练
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_df)
# 评估
predictions = model.transform(test_df)
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.4f}")
4. 流处理
4.1 Kafka Streams
轻量级流处理库(无需独立集群):
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("user-events");
KTable<String, Long> eventCounts = events
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
eventCounts.toStream()
.to("event-counts", Produced.with(windowedSerde, Serdes.Long()));
4.2 Apache Flink
强大的分布式流处理引擎:
from pyflink.table import EnvironmentSettings, TableEnvironment
env = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env)
# 定义 Kafka 源
t_env.execute_sql("""
CREATE TABLE events (
user_id STRING,
event_type STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# 窗口聚合
result = t_env.sql_query("""
SELECT
user_id,
TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
COUNT(*) as event_count
FROM events
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)
""")
4.3 批处理 vs 流处理
| 维度 | 批处理 | 流处理 |
|---|---|---|
| 延迟 | 分钟~小时 | 秒~毫秒 |
| 数据 | 有界数据集 | 无界数据流 |
| 模型 | MapReduce, Spark | Flink, Kafka Streams |
| 适用 | 报表、ETL、模型训练 | 实时监控、告警、推荐 |
5. 数据湖 vs 数据仓库
| 维度 | 数据湖 | 数据仓库 |
|---|---|---|
| 数据格式 | 原始(JSON, CSV, Parquet) | 结构化(Schema-on-Write) |
| 处理方式 | Schema-on-Read | Schema-on-Write |
| 成本 | 低(对象存储) | 高(专用引擎) |
| 用户 | 数据科学家、工程师 | 分析师、业务用户 |
| 代表 | S3 + Delta Lake | Snowflake, BigQuery |
5.1 Lakehouse 架构
融合数据湖的灵活性和数据仓库的管理能力:
对象存储 (S3/ADLS)
+
表格式 (Delta Lake / Apache Iceberg)
- ACID 事务
- Schema 演进
- 时间旅行
- 高效更新/删除
+
查询引擎 (Spark / Trino / Presto)
+
BI / ML 工具
# Delta Lake 示例
df.write.format("delta").save("s3://bucket/delta_table")
# 时间旅行
spark.read.format("delta") \
.option("timestampAsOf", "2024-01-01") \
.load("s3://bucket/delta_table")
# UPSERT (MERGE)
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://bucket/delta_table")
delta_table.alias("target") \
.merge(new_data.alias("source"), "target.id = source.id") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
6. 云上大数据服务
| 服务 | 云厂商 | 类型 |
|---|---|---|
| EMR | AWS | 托管 Hadoop/Spark |
| Databricks | 多云 | Lakehouse 平台 |
| BigQuery | GCP | Serverless 数据仓库 |
| Snowflake | 多云 | 云数据仓库 |
| Synapse | Azure | 统一分析平台 |
| Glue | AWS | Serverless ETL |
| Dataflow | GCP | 托管 Apache Beam |
7. 性能优化
| 优化 | 方法 | 效果 |
|---|---|---|
| 分区 | 按日期/地区分区存储 | 减少扫描数据量 |
| 列式存储 | Parquet / ORC | 只读需要的列 |
| 压缩 | Snappy / Zstd | 减少 I/O |
| 广播 Join | 小表广播到所有节点 | 避免 Shuffle |
| 分桶 | 预按 key 分桶 | 加速 Join |
| 缓存 | df.cache() |
避免重复计算 |
| 合理分区数 | 调整 spark.sql.shuffle.partitions |
平衡并行度和开销 |
参考资料
- "Learning Spark" - Damji et al.
- "Designing Data-Intensive Applications" - Martin Kleppmann
- Apache Spark 官方文档:https://spark.apache.org
- Delta Lake 官方文档:https://delta.io