4.5.MapReduce
含义
MapReduce编程模型是一种用于处理大规模数据集的分布式计算模型。它通过将大规模的数据集分解成多个小块,并将这些小块分发到多台计算机上进行处理,然后将处理结果汇总起来,从而达到快速处理大数据的目的。
MapReduce编程模型主要包含两个核心步骤:Map(映射)和Reduce(归约)。
snail-job的MapReduce官方定义为三个阶段:Map、Reduce、MergeReduce阶段。这三个阶段实际可以拆分成四个功能阶段:
- 第一个功能阶段:Root-Map —— 分片(包)阶段(root_map);
- 第二个功能阶段:Map阶段——分发(映射)处理阶段;
- 第三个功能阶段:Reduce阶段——合并Map阶段数据;
- 第四个功能阶段:MergeReduce阶段——当出现有多个Reduce节点时,在此处对上一层Reduce合并的结果作最终汇总。
以下是MapReduce编程模型的主要特点和步骤:
1、 特点:
- 分布式计算:MapReduce能够将计算任务分发到多个节点上并行执行,从而充分利用计算资源,提高处理速度。
- 抽象化:MapReduce将复杂的并行计算过程高度抽象为两个函数:Map和Reduce,降低了编程的复杂性。
- 容错性:MapReduce框架具有强大的容错能力,当某个节点因为故障停止工作时,框架可以自动重新在其他节点上运行该任务。
- 灵活性:MapReduce支持多种数据格式、编程语言和计算模型,使其在大数据处理领域具有广泛的应用。
2、 步骤:
- Map(映射)步骤:将输入数据分解成更小的数据块,并并行地在多个节点上处理这些数据块。映射操作会对数据块进行某种形式的转换或过滤,并输出一系列的键值对。
- Reduce(归约)步骤:在映射阶段之后,归约阶段将所有映射阶段的输出根据键合并在一起,然后对这些合并的数据集进行进一步的处理。通常这个步骤是对所有具有相同键的值进行某种形式的汇总或归总。
使用注意事项:
- 分片数量不建议过多(大于200时会提示分片过多,最多不能超过500个分片)
- Map阶段,自定义任务名称不可为“ROOT_MAP”
- 任务处理完结果,不建议传递大量数据或者敏感信息,影响带宽和效率
场景
以下是一个MapReduce编程模型的实例,用于统计文本文件中每个单词的出现次数:
- 假设有一个文本文件,被分成了4份,分别放到4台服务器中存储。
- Map阶段:每台服务器上运行一个Map函数,将文本文件分成单词,并输出每个单词及其出现次数(以1表示)的键值对。例如,对于文本“the weather is good”,Map函数会输出“the:1, weather:1, is:1, good:1”。
- Shuffle阶段:框架将具有相同键的键值对进行合并,并将合并后的键值对发送到执行Reduce函数的节点。
- Reduce阶段:Reduce函数接收合并后的键值对,
测试
Map阶段
功能概述:当任务到达执行时间时,服务端会去调用客户端指定的Map阶段的定时任务(testAnnoMapReduceJobExecutor),此时根据JobExecutorName进行调用,此时第一层的TaskName约定为Root_Map(不能更改),建议使用者进行分片处理。
@Component
@JobExecutor(name = "testAnnoMapReduceJobExecutor")
public class TestAnnoMapReduceJobExecutor {
// 注意Map的第一个层任务名称为ROOT_MAP
// 当前@MapExecutor注解中taskName默认"ROOT_MAP" 即为MapReduce统一入口
@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
// 此处可以进行自定义的分片逻辑
........................................
return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4", "5", "6"), "SECOND_MAP");
// 服务端会根据当前方法中的List<T> taskList的size作为需要动态分片的数量
// 进行负载均衡调用,其中集合中的对象为对应分片所接收到的对象
// 例如:文档中的手机号总量为307条,每100条一个分组,分组结果(List<T> taskList)为[{0,99}, {100, 199}, {200,299}, {300, 307}],此时就会分为四组
// 集合中的各个对象都会作为MapArgs的mapResult参数传递到后续的各个Map任务中
// 此处的nextTaskName即为后续动态分片后分发的Map任务名称
}
............................
}
功能概述:后续多层Map分片任务
// 由ROOT_MAP分片分发到SECOND_MAP任务
@MapExecutor(taskName = "SECOND_MAP")
public ExecuteResult secondMapExecute(MapArgs mapArgs) {
// mapResult 即为上述分包中分发下来的具体参数
mapArgs.getMapResult();
// 执行分包逻辑
.................
return mapHandler.doMap(List<T> taskList, "THIRD_MAP");
}
// 由SECOND_MAP分片分发到THIRD_MAP任务
@MapExecutor(taskName = "THIRD_MAP")
public ExecuteResult thirdMapExecute(MapArgs mapArgs) {
// mapResult 即为上述分包中分发下来的具体参数
mapArgs.getMapResult();
// 执行分包逻辑
.................
return mapHandler.doMap(List<T> taskList, "xxx_MAP");
}
// 由THIRD_MAP分片分发到xxx_MAP任务
@MapExecutor(taskName = "xxx_MAP")
public ExecuteResult thirdMapExecute(MapArgs mapArgs) {
// mapResult 即为上述分包中分发下来的具体参数
mapArgs.getMapResult();
// 执行分包逻辑
.................
return mapHandler.doMap(List<T> taskList, "xxx_MAP");
}
功能概述:分片结束后,最后一次执行Map任务,此时可以处理业务数据,并且将数据根据Reduce的分片数量将map处理完的业务数据合并下发的Reduce阶段
// 由THIRD_MAP分片分发到xxx_MAP任务
@MapExecutor(taskName = "xxx_MAP")
public ExecuteResult thirdMapExecute(MapArgs mapArgs) {
// mapResult 即为上述分包中分发下来的具体参数
mapArgs.getMapResult();
// 执行分包逻辑
.................
return mapHandler.doMap(List<T> taskList, "FINAL_MAP");
}
// 由xxx_MAP分片分发到FINAL_MAP(最终)任务
@MapExecutor(taskName = "FINAL_MAP")
public ExecuteResult thirdMapExecute(MapArgs mapArgs) {
// mapResult 即为上述分包中分发下来的具体参数
mapArgs.getMapResult();
// 执行业务逻辑生成相关businessData
.................
// 若此为MAP类型任务,到此就完成调用
// 若此为MapReduce类型任务,到此还会继续往下执行Reduce(归约)阶段
// 考虑到单节点的Reduce在应对分片过多的MAP任务是压力可能过大,因此这里我们支持对Reduce进并行处理
// 向下调用Reduce任务时,会将MAP节点的任务处理结果根据Reduce节点的数量进行负载均匀汇总,分发到各个Reduce节点
return ExecuteResult.success(businessData);
}
Reduce阶段
功能概述
- Reduce阶段是对MAP的阶段的最后一次执行MAP的结果进行合并计算
- 考虑到单节点的Reduce在应对分片过多的MAP任务是压力可能过大,因此这里我们支持对Reduce进并行处理
@ReduceExecutor
public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) {
// 服务端在Map阶段汇总所有节点的数据,并根据Reduce节点的数量均匀分发,确保负载均衡。
// mapResult 即为汇总后的均匀分发的节点数据
mapReduceArgs.getMapResult();
// 执行相关合并逻辑 生成相关的reduceBusinessData
................................
// 若配置了多个Reduce分片 即Reduce分片数>1时,会多一步MergeReduce阶段
// 服务端在Reduce阶段会汇总所有Reduce节点的数据,分发到最终的MergeReduce阶段
// 若只有一个Reduce分片,mapReduce到此即调用结束。
return ExecuteResult.success(reduceBusinessData);
}
MergeReduce阶段
功能概述:当调用多个Reduce节点时,对reduce节点产生的数据作最终的汇总处理
- MergeReduce主要职责对上一层的Reduce结果进行最后的汇总
- MergeReduce一个可选的阶段,若超过了2个Reduce任务,系统会自动产生一个MergeReduce任务
@MergeReduceExecutor
public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
// 服务端在Reduce阶段汇总所有节点的数据,发送到该MergeReduce阶段。
// mapResult 即为所有Reduce分片节点合并后的数据
mapReduceArgs.getMapResult();
// 处理最终的汇总数据 mergeReduceBussinessData
..................................
// mapReduce到此即调用结束。
return ExecuteResult.success(mergeReduceBussinessData);
}
案例图解
结论
Snail Job的MapReduce功能是一个分布式计算框架,它允许用户通过Map和Reduce两个阶段处理大规模数据集。这个框架的特点包括分布式计算、编程模型抽象化、容错性和灵活性。工作流程分为Map阶段(动态分片和处理)、Reduce阶段(合并Map结果)和MergeReduce阶段(多Reduce节点数据汇总)。使用时需要注意分片数量、数据处理量和自定义任务名称的规范。Snail Job提供了类模式和注解模式两种实现方式,适用于需要分布式数据处理的场景。