Apache Hadoop - 大数据存储与处理平台

项目简介

Apache Hadoop是一个开源的分布式存储和计算框架,专门设计用于处理大规模数据集。Hadoop最初由Doug Cutting基于Google的MapReduce和Google File System(GFS)论文开发,后来捐赠给Apache软件基金会。

Hadoop的核心思想是将大数据分散存储在多台廉价的商用硬件上,并通过并行计算的方式处理这些数据。这种设计使得Hadoop能够以相对较低的成本处理PB级别的数据,成为了大数据领域的基础平台。

主要特性

  • 高容错性:自动检测和处理硬件故障
  • 高扩展性:可以轻松扩展到数千个节点
  • 高可靠性:数据多副本存储,确保数据安全
  • 低成本:使用商用硬件,降低存储成本
  • 处理多样化数据:支持结构化、半结构化和非结构化数据
  • 批处理优化:专门针对大规模批处理作业优化

项目原理

核心架构

Hadoop生态系统主要由以下核心组件组成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Hadoop生态系统
├── 核心组件
│ ├── HDFS (分布式文件系统)
│ ├── MapReduce (分布式计算框架)
│ └── YARN (资源管理器)
├── 数据仓库
│ ├── Hive (数据仓库软件)
│ └── HBase (NoSQL数据库)
├── 数据流处理
│ ├── Flume (日志收集)
│ ├── Sqoop (数据导入导出)
│ └── Kafka (消息队列)
└── 管理工具
├── Ambari (集群管理)
└── Zookeeper (协调服务)

HDFS架构

HDFS(Hadoop Distributed File System)采用主从架构:

NameNode(名称节点)

  • 管理文件系统的命名空间
  • 维护文件到数据块的映射
  • 处理客户端的文件系统操作
  • 管理DataNode的生命周期

DataNode(数据节点)

  • 存储实际的数据块
  • 处理读写请求
  • 执行数据块的创建、删除和复制
  • 定期向NameNode报告状态

Secondary NameNode

  • 协助NameNode进行检查点操作
  • 定期合并fsimage和edits日志
  • 在NameNode故障时提供恢复支持

MapReduce工作流程

MapReduce是一种编程模型,分为两个阶段:

  1. Map阶段

    • 将输入数据分割成独立的块
    • 每个Map任务处理一个数据块
    • 产生中间键值对结果
  2. Reduce阶段

    • 合并Map阶段的输出
    • 对相同键的值进行聚合
    • 产生最终结果
1
输入数据 → Split → Map → Shuffle → Sort → Reduce → 输出结果

使用场景

1. 大数据存储和归档

Hadoop可以存储企业产生的海量原始数据,提供低成本的长期存储解决方案。

2. 日志分析

处理Web服务器日志、应用程序日志等,提取有价值的业务洞察。

3. 数据仓库和ETL

构建企业数据仓库,进行复杂的ETL(Extract, Transform, Load)操作。

4. 机器学习和数据挖掘

为机器学习算法提供大规模数据处理能力,支持模型训练和预测。

5. 科学计算

在生物信息学、天文学、气候模拟等领域处理大规模科学数据。

具体案例

案例1:网站访问日志分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// MapReduce程序 - 统计网站访问量
public class PageViewCounter {

// Mapper类
public static class PageViewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text page = new Text();

@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 解析日志行:IP - - [timestamp] "GET /page.html HTTP/1.1" 200 1024
String line = value.toString();
String[] fields = line.split(" ");

if (fields.length >= 7) {
String request = fields[6];
if (request.startsWith("\"GET")) {
String url = request.split(" ")[1];
page.set(url);
context.write(page, one);
}
}
}
}

// Reducer类
public static class PageViewReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();

@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
result.set(sum);
context.write(key, result);
}
}

// 驱动程序
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "page view counter");

job.setJarByClass(PageViewCounter.class);
job.setMapperClass(PageViewMapper.class);
job.setCombinerClass(PageViewReducer.class);
job.setReducerClass(PageViewReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

案例2:用户行为分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 分析用户购买行为
public class UserBehaviorAnalysis {

public static class BehaviorMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 解析用户行为数据:userId,itemId,categoryId,behavior,timestamp
String[] fields = value.toString().split(",");

if (fields.length == 5) {
String userId = fields[0];
String itemId = fields[1];
String behavior = fields[3];

if ("buy".equals(behavior)) {
context.write(new Text(userId), new Text("buy:" + itemId));
} else if ("view".equals(behavior)) {
context.write(new Text(userId), new Text("view:" + itemId));
}
}
}
}

public static class BehaviorReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Set<String> viewedItems = new HashSet<>();
Set<String> boughtItems = new HashSet<>();

for (Text value : values) {
String[] parts = value.toString().split(":");
if ("buy".equals(parts[0])) {
boughtItems.add(parts[1]);
} else if ("view".equals(parts[0])) {
viewedItems.add(parts[1]);
}
}

// 计算转化率
int viewed = viewedItems.size();
int bought = boughtItems.size();
double conversionRate = viewed > 0 ? (double) bought / viewed : 0;

String result = String.format("viewed:%d,bought:%d,conversion:%.2f",
viewed, bought, conversionRate);
context.write(key, new Text(result));
}
}
}

案例3:HDFS文件操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// HDFS文件操作示例
public class HDFSOperations {
private static final String HDFS_URL = "hdfs://namenode:9000";

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", HDFS_URL);

FileSystem fs = FileSystem.get(conf);

// 创建目录
Path dir = new Path("/user/data");
if (!fs.exists(dir)) {
fs.mkdirs(dir);
System.out.println("目录创建成功: " + dir);
}

// 上传文件
Path localFile = new Path("local_data.txt");
Path hdfsFile = new Path("/user/data/remote_data.txt");
fs.copyFromLocalFile(localFile, hdfsFile);
System.out.println("文件上传成功");

// 读取文件
FSDataInputStream in = fs.open(hdfsFile);
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
reader.close();

// 列出目录内容
FileStatus[] fileStatuses = fs.listStatus(dir);
for (FileStatus status : fileStatuses) {
System.out.println(status.getPath().getName() + " - " + status.getLen() + " bytes");
}

// 删除文件
fs.delete(hdfsFile, false);
System.out.println("文件删除成功");

fs.close();
}
}

案例4:集群配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
<!-- core-site.xml -->
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://namenode:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop/tmp</value>
</property>
</configuration>

<!-- hdfs-site.xml -->
<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/opt/hadoop/hdfs/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/opt/hadoop/hdfs/datanode</value>
</property>
<property>
<name>dfs.blocksize</name>
<value>134217728</value> <!-- 128MB -->
</property>
</configuration>

<!-- mapred-site.xml -->
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.application.classpath</name>
<value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
</property>
</configuration>

<!-- yarn-site.xml -->
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>resourcemanager</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>8192</value>
</property>
</configuration>

案例5:性能监控脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/bin/bash
# Hadoop集群健康检查脚本

echo "=== Hadoop集群状态检查 ==="

# 检查HDFS状态
echo "1. HDFS状态:"
hdfs dfsadmin -report | grep -E "(Live datanodes|Dead datanodes|DFS Remaining)"

# 检查YARN状态
echo -e "\n2. YARN状态:"
yarn node -list -all | head -10

# 检查MapReduce作业
echo -e "\n3. 运行中的MapReduce作业:"
mapred job -list

# 检查HDFS文件系统
echo -e "\n4. HDFS文件系统检查:"
hdfs fsck / | tail -10

# 检查集群负载
echo -e "\n5. 集群资源使用:"
yarn top | head -20

# 检查日志中的错误
echo -e "\n6. 最近的错误日志:"
find $HADOOP_LOG_DIR -name "*.log" -mtime -1 -exec grep -l "ERROR\|WARN" {} \; | head -5

性能优化建议

1. HDFS优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- 优化数据块大小 -->
<property>
<name>dfs.blocksize</name>
<value>268435456</value> <!-- 256MB,适合大文件 -->
</property>

<!-- 增加副本数量提高可靠性 -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>

<!-- 优化NameNode内存 -->
<property>
<name>dfs.namenode.handler.count</name>
<value>100</value>
</property>

2. MapReduce优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- 增加Map任务内存 -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>

<!-- 增加Reduce任务内存 -->
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>

<!-- 启用压缩减少I/O -->
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>

3. 集群硬件配置建议

NameNode节点

  • CPU: 8-16核心
  • 内存: 64GB+
  • 存储: SSD用于元数据存储

DataNode节点

  • CPU: 4-8核心
  • 内存: 16-32GB
  • 存储: 大容量SATA硬盘(12TB+)
  • 网络: 万兆以太网

Apache Hadoop作为大数据处理的基础平台,其分布式架构和容错机制为企业处理海量数据提供了可靠的解决方案。通过合理的配置和优化,Hadoop可以构建高性能、高可用的大数据处理集群,满足各种规模的数据处理需求。

版权所有,如有侵权请联系我