MapReduce: Computing Upstream Traffic, Downstream Traffic, and Total Traffic
- Dataset and requirement analysis
- Implementation steps
- Defining a custom data type
- Map stage
- Custom partitioner
- Reduce stage
- Driver stage
- Packaging the program as a jar
- Running on the cluster
Dataset and requirement analysis
Data (fields are tab-separated):
```
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
1363157993044	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157993055	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99	18	15	1116	954	200
1363157995033	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82	4	0	240	0	200
1363157984041	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99	15	9	918	4938	200
1363157986041	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4	3	3	180	180	200
1363157984040	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn	12	12	3008	3720	200
1363157982040	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82	2	2	120	120	200
1363157985079	13823070001	20-7C-8F-70-68-1F:CMCC	120.196.100.99	6	3	360	180	200
1363157985069	13600217502	00-1F-64-E2-E8-B1:CMCC	120.196.100.55	18	138	1080	186852	200
```
Requirements
- For each phone number, compute the total upstream traffic, total downstream traffic, and total traffic
- Partition the output by the first three digits of the phone number
Analysis
- Use the phone number as the key, so that records are grouped by number during the shuffle
- Sum the traffic for each group in the reduce stage (see the worked example below)
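For example, 13560439658 appears twice in the data above, with upstream/downstream traffic of 1116/954 and 918/4938 bytes (the third- and second-to-last columns). Assuming the classes below, the line written for that number should be:
```
13560439658	2034	5892	7926
```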
Implementation steps
Defining a custom data type

```java
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class Bean implements WritableComparable<Bean> {

    // Running totals for one phone number: two traffic counters and their sum (sum_bean).
    private int sum_low;
    private int sum_up;
    private int sum_bean;

    @Override
    public int compareTo(Bean o) {
        return this.sum_bean - o.sum_bean;
    }

    // Serialization: write the fields in a fixed order...
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(sum_low);
        dataOutput.writeInt(sum_up);
        dataOutput.writeInt(sum_bean);
    }

    // ...and read them back in exactly the same order.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        sum_low = dataInput.readInt();
        sum_up = dataInput.readInt();
        sum_bean = dataInput.readInt();
    }

    public void set(int sum_low, int sum_up, int sum_bean) {
        this.sum_low = sum_low;
        this.sum_up = sum_up;
        this.sum_bean = sum_bean;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Bean bean = (Bean) o;
        return sum_low == bean.sum_low &&
                sum_up == bean.sum_up &&
                sum_bean == bean.sum_bean;
    }

    @Override
    public int hashCode() {
        return Objects.hash(sum_low, sum_up, sum_bean);
    }

    // toString() determines how the value is written to the output files.
    @Override
    public String toString() {
        return sum_low + "\t" + sum_up + "\t" + sum_bean;
    }

    public int getSum_low() {
        return sum_low;
    }

    public void setSum_low(int sum_low) {
        this.sum_low = sum_low;
    }

    public int getSum_up() {
        return sum_up;
    }

    public void setSum_up(int sum_up) {
        this.sum_up = sum_up;
    }

    public int getSum_bean() {
        return sum_bean;
    }

    public void setSum_bean(int sum_bean) {
        this.sum_bean = sum_bean;
    }
}
```
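As a quick sanity check of the Writable contract, a Bean can be written to a byte stream and read back. This snippet is not part of the original post; it is a minimal sketch that assumes Bean is on the classpath in the same package.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class BeanRoundTrip {
    public static void main(String[] args) throws IOException {
        Bean in = new Bean();
        in.set(2481, 24681, 2481 + 24681);   // values taken from the first data record

        // Serialize with write(), the same call Hadoop makes between map and reduce.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // Deserialize with readFields() and print; expected output: 2481	24681	27162
        Bean out = new Bean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(out);
    }
}
```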
Map stage

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Input example (tab-separated):
 * 1363157986072	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
 * Compute, per phone number, the total upstream traffic, total downstream traffic, and their sum.
 */
public class MapTest extends Mapper<LongWritable, Text, Text, Bean> {

    Bean v = new Bean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] datas = value.toString().split("\t");
        // The phone number is the second field; use it as the output key.
        k.set(datas[1]);
        // For now, store only this record's upstream and downstream traffic; the total is
        // left as 0 and computed in the reducer. Indexing from the end is used because
        // some records have no URL/category fields.
        v.set(Integer.parseInt(datas[datas.length - 3]), Integer.parseInt(datas[datas.length - 2]), 0);
        context.write(k, v);
    }
}
```

Custom partitioner

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ParTest extends Partitioner<Text, Bean> {

    @Override
    public int getPartition(Text text, Bean bean, int i) {
        // substring(0, 3): start index inclusive, end index exclusive, i.e. the first three digits.
        String prePhone = text.toString().substring(0, 3);
        // Five partitions, numbered from 0; anything not matched below falls into partition 4.
        int partition = 4;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}
```
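Note that the partition number returned by getPartition must be smaller than the number of reduce tasks, otherwise the job fails at runtime with an illegal-partition error; this is why the Driver below calls job.setNumReduceTasks(5) to match the five partitions.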
Reduce stage

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class RedTest extends Reducer<Text, Bean, Text, Bean> {

    int sum_low = 0;
    int sum_up = 0;
    Bean v = new Bean();

    @Override
    protected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {
        // Sum the traffic of all records belonging to this phone number.
        for (Bean b : values) {
            sum_up += b.getSum_up();
            sum_low += b.getSum_low();
        }
        v.set(sum_low, sum_up, sum_low + sum_up);
        context.write(key, v);
        // Reset the accumulators before the next key is processed.
        sum_up = 0;
        sum_low = 0;
    }
}
```

Driver stage

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;

public class DriTest {

    public static void main(String[] args) throws Exception {
        // Remove the local output directory left over from earlier local runs, if it exists,
        // then submit the job.
        File file = new File("D:\\FlowSum\\output");
        if (file.exists()) {
            delFile(file);
            driver();
        } else {
            driver();
        }
    }

    // Recursively delete a directory and everything inside it.
    public static void delFile(File file) {
        File[] files = file.listFiles();
        if (files != null && files.length != 0) {
            for (int i = 0; i < files.length; i++) {
                delFile(files[i]);
            }
        }
        file.delete();
    }

    public static void driver() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.0.155:9000/");

        Job job = Job.getInstance(conf);
        job.setMapperClass(MapTest.class);
        job.setReducerClass(RedTest.class);
        job.setJarByClass(DriTest.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Bean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Bean.class);

        // Five reduce tasks, one per partition produced by ParTest.
        job.setNumReduceTasks(5);
        job.setPartitionerClass(ParTest.class);

        FileInputFormat.setInputPaths(job, "/MR/input");
        FileOutputFormat.setOutputPath(job, new Path("/MR/output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
```

Packaging the program as a jar
Flowchart of building the jar in IDEA (figure).
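The flowchart is not reproduced here; as a rough guide (a general IDEA workflow, not taken from the original figure), the jar can be built via File → Project Structure → Artifacts → + → JAR → From modules with dependencies, choosing DriTest as the main class, and then Build → Build Artifacts.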
Running on the cluster
- Command to run the job on the cluster: hadoop jar MPTEST.jar FlovwBean.DriTest
- Here FlovwBean.DriTest is the fully qualified class name (package plus class name) of the Driver class
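- Before submitting, the input must be in HDFS at the path the Driver expects, e.g. hdfs dfs -mkdir -p /MR/input followed by hdfs dfs -put phone_data.txt /MR/input (the file name phone_data.txt is an assumption; the /MR/input path comes from the Driver)
- After the job finishes, the five partition files can be inspected with hdfs dfs -cat /MR/output/part-r-00000 through part-r-00004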