微信公众号制作网站有哪些,中国科技发展成果,代做课件的网站,广州市天河区工程建设监督网站1、Map-Reduce的逻辑过程 假设我们需要处理一批有关天气的数据#xff0c;其格式如下#xff1a; 按照ASCII码存储#xff0c;每行一条记录每一行字符从0开始计数#xff0c;第15个到第18个字符为年第25个到第29个字符为温度#xff0c;其中第25位是符号/-006701199099999… 1、Map-Reduce的逻辑过程 假设我们需要处理一批有关天气的数据其格式如下 按照ASCII码存储每行一条记录每一行字符从0开始计数第15个到第18个字符为年第25个到第29个字符为温度其中第25位是符号/- 00670119909999919500515070000 00430119909999919500515120022 0043011990999991950051518-0011 00430126509999919490324120111 00430126509999919490324180078 00670119909999919370515070001 0043011990999991937051512-0002 00430119909999919450515180001 00430126509999919450324120002 00430126509999919450324180078 现在需要统计出每年的最高温度。 Map-Reduce主要包括两个步骤Map和Reduce 每一步都有key-value对作为输入和输出 map阶段的key-value对的格式是由输入的格式所决定的如果是默认的TextInputFormat则每行作为一个记录进程处理其中key为此行的开头相对于文件的起始位置value就是此行的字符文本map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应对于上面的例子在map过程输入的key-value对如下 (0, 00670119909999919500515070000) (33, 00430119909999919500515120022) (66, 0043011990999991950051518-0011) (99, 00430126509999919490324120111) (132, 00430126509999919490324180078) (165, 00670119909999919370515070001) (198, 0043011990999991937051512-0002) (231, 00430119909999919450515180001) (264, 00430126509999919450324120002) (297, 00430126509999919450324180078) 在map过程中通过对每一行字符串的解析得到年-温度的key-value对作为输出 (1950, 0) (1950, 22) (1950, -11) (1949, 111) (1949, 78) (1937, 1) (1937, -2) (1945, 1) (1945, 2) (1945, 78) 在reduce过程将map过程中的输出按照相同的key将value放到同一个列表中作为reduce的输入 (1950, [0, 22, –11]) (1949, [111, 78]) (1937, [1, -2]) (1945, [1, 2, 78]) 在reduce过程中在列表中选择出最大的温度将年-最大温度的key-value作为输出 (1950, 22) (1949, 111) (1937, 1) (1945, 78) 其逻辑过程可用如下图表示 2、编写Map-Reduce程序 编写Map-Reduce程序一般需要实现两个函数mapper中的map函数和reducer中的reduce函数。 一般遵循以下格式 map: (K1, V1) - list(K2, V2) public interface MapperK1, V1, K2, V2 extends JobConfigurable, Closeable { void map(K1 key, V1 value, OutputCollectorK2, V2 output, Reporter reporter) throws IOException; } reduce: (K2, list(V)) - list(K3, V3) public interface ReducerK2, V2, K3, V3 extends JobConfigurable, Closeable { void reduce(K2 key, IteratorV2 values, OutputCollectorK3, V3 output, Reporter reporter) throws IOException; } 对于上面的例子则实现的mapper如下 public class MaxTemperatureMapper extends MapReduceBase implements MapperLongWritable, Text, Text, IntWritable { Override public void map(LongWritable key, Text value, OutputCollectorText, IntWritable output, Reporter reporter) throws IOException { String line value.toString(); String year line.substring(15, 19); int airTemperature; if (line.charAt(25) ) { airTemperature Integer.parseInt(line.substring(26, 30)); } else { airTemperature Integer.parseInt(line.substring(25, 30)); } output.collect(new Text(year), new IntWritable(airTemperature)); } } 实现的reducer如下 public class MaxTemperatureReducer extends MapReduceBase implements ReducerText, IntWritable, Text, IntWritable { public void reduce(Text key, IteratorIntWritable values, OutputCollectorText, IntWritable output, Reporter reporter) throws IOException { int maxValue Integer.MIN_VALUE; while (values.hasNext()) { maxValue Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); } } 欲运行上面实现的Mapper和Reduce则需要生成一个Map-Reduce得任务(Job)其基本包括以下三部分 输入的数据也即需要处理的数据Map-Reduce程序也即上面实现的Mapper和Reducer此任务的配置项JobConf欲配置JobConf需要大致了解Hadoop运行job的基本原理 Hadoop将Job分成task进行处理共两种taskmap task和reduce taskHadoop有两类的节点控制job的运行JobTracker和TaskTracker JobTracker协调整个job的运行将task分配到不同的TaskTracker上TaskTracker负责运行task并将结果返回给JobTrackerHadoop将输入数据分成固定大小的块我们称之input splitHadoop为每一个input split创建一个task在此task中依次处理此split中的一个个记录(record)Hadoop会尽量让输入数据块所在的DataNode和task所执行的DataNode(每个DataNode上都有一个TaskTracker)为同一个可以提高运行效率所以input split的大小也一般是HDFS的block的大小。Reduce task的输入一般为Map Task的输出Reduce Task的输出为整个job的输出保存在HDFS上。在reduce中相同key的所有的记录一定会到同一个TaskTracker上面运行然而不同的key可以在不同的TaskTracker上面运行我们称之为partition partition的规则为(K2, V2) – Integer 也即根据K2生成一个partition的id具有相同id的K2则进入同一个partition被同一个TaskTracker上被同一个Reducer进行处理。 public interface PartitionerK2, V2 extends JobConfigurable { int getPartition(K2 key, V2 value, int numPartitions); } 下图大概描述了Map-Reduce的Job运行的基本原理 下面我们讨论JobConf其有很多的项可以进行配置 setInputFormat设置map的输入格式默认为TextInputFormatkey为LongWritable, value为TextsetNumMapTasks设置map任务的个数此设置通常不起作用map任务的个数取决于输入的数据所能分成的input split的个数setMapperClass设置Mapper默认为IdentityMappersetMapRunnerClass设置MapRunner, map task是由MapRunner运行的默认为MapRunnable其功能为读取input split的一个个record依次调用Mapper的map函数setMapOutputKeyClass和setMapOutputValueClass设置Mapper的输出的key-value对的格式setOutputKeyClass和setOutputValueClass设置Reducer的输出的key-value对的格式setPartitionerClass和setNumReduceTasks设置Partitioner默认为HashPartitioner其根据key的hash值来决定进入哪个partition每个partition被一个reduce task处理所以partition的个数等于reduce task的个数setReducerClass设置Reducer默认为IdentityReducersetOutputFormat设置任务的输出格式默认为TextOutputFormatFileInputFormat.addInputPath设置输入文件的路径可以使一个文件一个路径一个通配符。可以被调用多次添加多个路径FileOutputFormat.setOutputPath设置输出文件的路径在job运行前此路径不应该存在当然不用所有的都设置由上面的例子可以编写Map-Reduce程序如下 public class MaxTemperature { public static void main(String[] args) throws IOException { if (args.length ! 2) { System.err.println(Usage: MaxTemperature input path output path); System.exit(-1); } JobConf conf new JobConf(MaxTemperature.class); conf.setJobName(Max temperature); FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setMapperClass(MaxTemperatureMapper.class); conf.setReducerClass(MaxTemperatureReducer.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); } } 3、Map-Reduce数据流(data flow) Map-Reduce的处理过程主要涉及以下四个部分 客户端Client用于提交Map-reduce任务jobJobTracker协调整个job的运行其为一个Java进程其main class为JobTrackerTaskTracker运行此job的task处理input split其为一个Java进程其main class为TaskTrackerHDFShadoop分布式文件系统用于在各个进程间共享Job相关的文件 3.1、任务提交 JobClient.runJob()创建一个新的JobClient实例调用其submitJob()函数。 向JobTracker请求一个新的job ID检测此job的output配置计算此job的input splits将Job运行所需的资源拷贝到JobTracker的文件系统中的文件夹中包括job jar文件job.xml配置文件input splits通知JobTracker此Job已经可以运行了提交任务后runJob每隔一秒钟轮询一次job的进度将进度返回到命令行直到任务运行完毕。 3.2、任务初始化 当JobTracker收到submitJob调用的时候将此任务放到一个队列中job调度器将从队列中获取任务并初始化任务。 初始化首先创建一个对象来封装job运行的tasks, status以及progress。 在创建task之前job调度器首先从共享文件系统中获得JobClient计算出的input splits。 其为每个input split创建一个map task。 每个task被分配一个ID。 3.3、任务分配 TaskTracker周期性的向JobTracker发送heartbeat。 在heartbeat中TaskTracker告知JobTracker其已经准备运行一个新的taskJobTracker将分配给其一个task。 在JobTracker为TaskTracker选择一个task之前JobTracker必须首先按照优先级选择一个Job在最高优先级的Job中选择一个task。 TaskTracker有固定数量的位置来运行map task或者reduce task。 默认的调度器对待map task优先于reduce task 当选择reduce task的时候JobTracker并不在多个task之间进行选择而是直接取下一个因为reduce task没有数据本地化的概念。 3.4、任务执行 TaskTracker被分配了一个task下面便要运行此task。 首先TaskTracker将此job的jar从共享文件系统中拷贝到TaskTracker的文件系统中。 TaskTracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。 其次其为每个task创建一个本地的工作目录将jar解压缩到文件目录中。 其三其创建一个TaskRunner来运行task。 TaskRunner创建一个新的JVM来运行task。 被创建的child JVM和TaskTracker通信来报告运行进度。 3.4.1、Map的过程 MapRunnable从input split中读取一个个的record然后依次调用Mapper的map函数将结果输出。 map的输出并不是直接写入硬盘而是将其写入缓存memory buffer。 当buffer中数据的到达一定的大小一个背景线程将数据开始写入硬盘。 在写入硬盘之前内存中的数据通过partitioner分成多个partition。 在同一个partition中背景线程会将数据按照key在内存中排序。 每次从内存向硬盘flush数据都生成一个新的spill文件。 当此task结束之前所有的spill文件被合并为一个整的被partition的而且排好序的文件。 reducer可以通过http协议请求map的输出文件tracker.http.threads可以设置http服务线程数。 3.4.2、Reduce的过程 当map task结束后其通知TaskTrackerTaskTracker通知JobTracker。 对于一个jobJobTracker知道TaskTracer和map输出的对应关系。 reducer中一个线程周期性的向JobTracker请求map输出的位置直到其取得了所有的map输出。 reduce task需要其对应的partition的所有的map输出。 reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出因为不同的map task完成时间不同。 reduce task中有多个copy线程可以并行拷贝map输出。 当很多map输出拷贝到reduce task后一个背景线程将其合并为一个大的排好序的文件。 当所有的map输出都拷贝到reduce task后进入sort过程将所有的map输出合并为大的排好序的文件。 最后进入reduce过程调用reducer的reduce函数处理排好序的输出的每个key最后的结果写入HDFS。 3.5、任务结束 当JobTracker获得最后一个task的运行成功的报告后将job得状态改为成功。 当JobClient从JobTracker轮询的时候发现此job已经成功结束则向用户打印消息从runJob函数中返回。 转载于:https://www.cnblogs.com/JohnLiang/archive/2011/11/09/2243448.html