Java MapReduce程序实现分析历年的温度数据

作者: admin 分类: Hadoop 发布时间: 2018-02-26 09:09  阅读: 229 views

阅读完使用Hadoop来分析历年的温度数据(map,reduce处理流程)后,应该了解了MapReduce程序的工作原理。
代码实现的时候,我们需要
1. 一个map函数 (由Mapper类实现)
2. 一个reduce函数
3. 一些用来运行作业的代码

范例1.1 查找最高气温的Mapper类

package test;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable ikey, Text ivalue, Context context)
        throws IOException, InterruptedException {
        String line = ivalue.toString();
        String year = line.substring(15, 19);
        int airTemperature;

        if (line.charAt(45) == '+') {
            airTemperature = Integer.parseInt(line.substring(46, 50));
        } else {
            airTemperature = Integer.parseInt(line.substring(45, 50));
        }

        String quality = line.substring(50, 51);
        System.out.println("year" + year + "---airTemperature---" +
            airTemperature + "---quality" + quality);

        if ((airTemperature != MISSING) && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

 

这个Mapper类是一个泛型类型,它有四个形参类型,分别制定map函数的输入键、输入值、输出键和输出值的类型。
就上面气温例子而言。
输入键是一个长整数偏移量,
输入值是一行文本,
输出键是年份,
输出值是气温(整数)。
Hadoop本省提供了一套可优化网络序列化传输的基本类型,而不直接使用JAVA内嵌的类型。这些类型都在org.apache.hadoop.io包中。这里使用的LongWritable类型(相当于java的Long类型),Text类型(相当于java中的String类型)
map()方法的输入是一个键和一个值。首先将包含有一行输入的Text值转换成Java的String类型,之后使用substring()方法提取需要的列。
map()方法还提供了Context实例用于输出内容的写入。在这种情况下。将年份数据Text对象进行读\写,将气温值封装在text中。只有气温数据不缺并且所对应质量代码显示为正确的气温读数时,这些数据才会被写入输出记录中。

范例1.2查找最高气温的Reducer类

package test;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println(
                "Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 

同样,reduce函数也有四个形式参数类型用户制定输入和输出类型。reduce函数的输入类型必须匹配map函数的输出类型。这个最高气温是通过循环比较每个气温与当前所知最高气温所得到的。

范例1.3通过程序实现在数据集中找出最高气温

package test;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println(
                "Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 

Job对象指定作业执行规范。我们可以用来控制整个作业的运行。我们在Hadoop集群上运行这个作业时,要把代码打包成一个JAR文件(Hadoop在集群上发布这个文件)。无需明确指定JAR文件的名称,在JOB对象的setJarByClass()方法中传递一个类即可。Hadoop利用这个类来查找包括它的JAR文件,进而找到相关的JAR文件。

构造Job对象之后,需要制定输入和输出数据的路径。调用FileInputFormat类的静态方法addInputPath()来定义输入数据的路径,这个路径可以是单个的文件、一个目录(此时,将目录下的所有文件当作输入)或符合特定文件模式的一系列文件。由函数名克制,可以多次调用addInputPath()来实现多路径的输入。

调用FileOutputFormat类中的静态方法setOutputPath()来指定输出路径(只能有一个输出路径)。这个方法制定的是reduce函数输出文件的写入目录。在运行作业前该目录是不应该存在的,否则Hadoop会包错并拒绝运行作业。这种预防措施的目的是防止数据丢失(防止被覆盖)

接着通过setMapperClass()和setReducerClass()指定map类型和reduce类型。setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型,这两个输出类型一般都是相同的。如果不同,则通过setMapOutputKeyClass()和setMapOutputValueClass()来设置map函数的输出类型。

输入的类型通过InputFormat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFormat。

Job中的waitForCompletion()方法提交作业等待执行完成。该方法中的布尔参数是个详细标识,会把进度写到控制台。


 

说明:如何正确的运行该MaxTemperature 主函数程序

1.先准备好要测试的数据,如下,保存到txt文档中

0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
0067011990999991950032418004888888880500001N9+00001+9999999999999999999999
0067011990999991950051507004888888880500001N9+00781+9999999999999999999999

2.将该txt文档存放到  hdfs系统中(我是window 64系统中配好了hadoop环境)

通过以下命令进行存放
hadoop fs -put c:\hadoop\test.txt /user/xxxx/input  上传到hdfs文件系统中

3.在运行MaxTemperature的main()函数时,先配置好两个入参。 

我是在eclipse中项目 – propeties – Run/Debug Settings – Edit – Arguments – Program atguments配置了以下信息。
hdfs://localhost:9000/user/deathearth/input/source.txt
hdfs://localhost:9000/user/deathearth/output

4.确定在HDFS locations的对应目录下没有output文件夹

如果存在会报错。

5.运行main()函数,会在output文件夹中生成新的文件  part-r-00000(17.0 b,r3)

内容如下
1949    111
1950    78

ps: 这就说明查找最高气温的范例就运行成功了。祝大家成功。


   原创文章,转载请标明本文链接: Java MapReduce程序实现分析历年的温度数据

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

电子邮件地址不会被公开。 必填项已用*标注

更多阅读