MapReduce: computing upstream traffic, downstream traffic, and total traffic, and running the job on a cluster


MapReduce: computing upstream traffic, downstream traffic, and total traffic

  • Dataset and requirements analysis

    • Data
    • Requirements
    • Analysis

  • Implementation

    • Define a custom data type
    • Map phase
    • Custom partitioner
    • Reduce phase
    • Driver phase

  • Packaging the program into a jar

    • Building the jar in IDEA

  • Running on the cluster

Dataset and requirements analysis
Data
Each record contains the following tab-separated fields: timestamp, phone number, MAC address:carrier, IP address, then (optionally) the visited URL and site category, followed by upstream packets, downstream packets, upstream bytes, downstream bytes, and HTTP status code.
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
1363157993044	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157993055	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99	18	15	1116	954	200
1363157995033	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82	4	0	240	0	200
1363157984041	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99	15	9	918	4938	200
1363157986041	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4	3	3	180	180	200
1363157984040	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn	12	12	3008	3720	200
1363157982040	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82	2	2	120	120	200
1363157985079	13823070001	20-7C-8F-70-68-1F:CMCC	120.196.100.99	6	3	360	180	200
1363157985069	13600217502	00-1F-64-E2-E8-B1:CMCC	120.196.100.55	18	138	1080	186852	200
Requirements

  • For each phone number, compute the total upstream traffic, total downstream traffic, and overall total traffic
  • Partition the output by the first three digits of the phone number

Analysis

  • Use the phone number as the key, so that records for the same number are grouped together
  • Sum the upstream and downstream traffic for each group in the reduce phase (see the worked example below)
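For example, 13560439658 appears twice in the data above, with upstream/downstream byte counts of 1116/954 and 918/4938; its reduce call should therefore emit 2034 upstream bytes, 5892 downstream bytes, and a total of 7926 bytes.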
Implementation
Define a custom data type
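Map output values must implement Hadoop's Writable interface so they can be serialized during the shuffle; implementing WritableComparable additionally gives the type a sort order, which is required whenever it is used as a key.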
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
public class Bean implements WritableComparable<Bean> {
  //As filled in by the mapper below, sum_low receives the upstream bytes and
  //sum_up the downstream bytes; sum_bean is their total.
  private int sum_low;
  private int sum_up;
  private int sum_bean;
  @Override
  public int compareTo(Bean o) {
    //Integer.compare avoids the overflow risk of plain subtraction
    return Integer.compare(this.sum_bean, o.sum_bean);
  }
  @Override
  public void write(DataOutput dataOutput) throws IOException {
    //serialization order must match readFields below
    dataOutput.writeInt(sum_low);
    dataOutput.writeInt(sum_up);
    dataOutput.writeInt(sum_bean);
  }
  @Override
  public void readFields(DataInput dataInput) throws IOException {
    sum_low = dataInput.readInt();
    sum_up = dataInput.readInt();
    sum_bean = dataInput.readInt();
  }
  public void set(int sum_low, int sum_up, int sum_bean) {
    this.sum_low = sum_low;
    this.sum_up = sum_up;
    this.sum_bean = sum_bean;
  }
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    Bean bean = (Bean) o;
    return sum_low == bean.sum_low &&
        sum_up == bean.sum_up &&
        sum_bean == bean.sum_bean;
  }
  @Override
  public int hashCode() {
    return Objects.hash(sum_low, sum_up, sum_bean);
  }
  @Override
  public String toString() {
    return sum_low + "\t" + sum_up + "\t" + sum_bean;
  }
  public int getSum_low() {
    return sum_low;
  }
  public void setSum_low(int sum_low) {
    this.sum_low = sum_low;
  }
  public int getSum_up() {
    return sum_up;
  }
  public void setSum_up(int sum_up) {
    this.sum_up = sum_up;
  }
  public int getSum_bean() {
    return sum_bean;
  }
  public void setSum_bean(int sum_bean) {
    this.sum_bean = sum_bean;
  }
}
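As a quick local sanity check (a sketch that is not part of the original post; the class name BeanRoundTrip is made up), the Writable contract can be exercised with plain Java streams:
import java.io.*;
public class BeanRoundTrip {
  public static void main(String[] args) throws IOException {
    Bean out = new Bean();
    out.set(1116, 954, 2070);
    //serialize exactly as Hadoop would during the shuffle
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    out.write(new DataOutputStream(buf));
    //deserialize into a fresh instance and compare
    Bean in = new Bean();
    in.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(out.equals(in)); //expected: true
  }
}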
Map phase
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Sample input record (tab-separated):
 * 1363157986072	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
 * Computes, per phone number, the total upstream, downstream, and combined traffic.
 */
public class MapTest extends Mapper<LongWritable, Text, Text, Bean> {
  Bean v = new Bean();
  Text k = new Text();
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String [] datas = value.toString().split("\t");
    k.set(datas[1]);//field 1 is the phone number
    //Index from the end of the array because some records omit the URL and
    //site-category fields; the last three columns are always upstream bytes,
    //downstream bytes, and HTTP status. Store the per-record values here;
    //the totals are summed in the reducer.
    v.set(Integer.parseInt(datas[datas.length-3]),Integer.parseInt(datas[datas.length-2]),0);
    context.write(k,v);
  }
}
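A small standalone illustration (hypothetical class SplitDemo, not from the original) of why the mapper indexes from the end of the array rather than using fixed positions: records with and without the URL/category fields split into different lengths, but the last three columns are always upstream bytes, downstream bytes, and status.
public class SplitDemo {
  public static void main(String[] args) {
    String withUrl = "1363157995033\t15920133257\t5C-0E-8B-C7-BA-20:CMCC\t120.197.40.4\tsug.so.360.cn\t信息安全\t20\t20\t3156\t2936\t200";
    String withoutUrl = "1363157995052\t13826544101\t5C-0E-8B-C7-F1-E0:CMCC\t120.197.40.4\t4\t0\t264\t0\t200";
    for (String line : new String[]{withUrl, withoutUrl}) {
      String[] d = line.split("\t");
      //prints: 15920133257 up=3156 down=2936, then 13826544101 up=264 down=0
      System.out.println(d[1] + " up=" + d[d.length - 3] + " down=" + d[d.length - 2]);
    }
  }
}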
Custom partitioner
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ParTest extends Partitioner<Text,Bean> {
  @Override
  public int getPartition(Text text, Bean bean, int i) {
    String prePhone =text.toString().substring(0,3);//substring: start index inclusive, end index exclusive
    int partition = 4;//five partitions, numbered 0 through 4
    if("136".equals(prePhone)){
      partition = 0;
    }else if ("137".equals(prePhone)){
      partition = 1;
    }else if ("138".equals(prePhone)){
      partition = 2;
    }else if ("139".equals(prePhone)){
      partition = 3;
    }else {
      partition = 4;
    }
    return partition;
  }
}
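Note that the values returned by a partitioner must lie in [0, numReduceTasks): if the job is configured with fewer than five reducers, records sent to a higher partition fail with an "Illegal partition" error (except in the single-reducer case, where the custom partitioner is bypassed entirely). This is why the driver below calls job.setNumReduceTasks(5).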
Reduce phase
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class RedTest extends Reducer<Text,Bean,Text,Bean> {
  Bean v = new Bean();
  @Override
  protected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {
    //local accumulators: a fresh pair for each phone number, so no manual reset is needed
    int sum_low = 0;
    int sum_up = 0;
    for (Bean b:values){
      sum_up+=b.getSum_up();
      sum_low+=b.getSum_low();
    }
    v.set(sum_low,sum_up,sum_low+sum_up);
    context.write(key,v);
  }
}
Driver phase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
public class DriTest {
  public static void main(String[] args) throws Exception{
    //clears a leftover local output directory from earlier local test runs;
    //note that this does NOT touch the HDFS output path used below
    File file = new File("D:\\FlowSum\\output");
    if (file.exists()){
      delFile(file);
    }
    driver();
  }
  public static void delFile(File file) {
    File[] files = file.listFiles();
    if (files != null && files.length != 0) {
      for (int i = 0;i<files.length;i++) {
        delFile(files[i]);
      }
    }
    file.delete();
  }
  public  static void driver() throws Exception{
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS","hdfs://192.168.0.155:9000/");//"fs.defaultFS" is the standard property for the default filesystem URI
    Job job = Job.getInstance(conf);
    job.setMapperClass(MapTest.class);
    job.setReducerClass(RedTest.class);
    job.setJarByClass(DriTest.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Bean.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Bean.class);
    job.setNumReduceTasks(5);
    job.setPartitionerClass(ParTest.class);
    FileInputFormat.setInputPaths(job, "/MR/input");
    FileOutputFormat.setOutputPath(job, new Path("/MR/output"));
    boolean b = job.waitForCompletion(true);
    System.exit(b ? 0 : 1);
  }
}
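Because java.io.File only sees the local filesystem, the HDFS output path has to be cleared through the Hadoop FileSystem API instead. A minimal sketch (not in the original code) that could be placed in driver() before job submission:
    //delete the HDFS output path if it already exists, so the job can be rerun
    org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
    Path output = new Path("/MR/output");
    if (fs.exists(output)) {
      fs.delete(output, true); //true = delete recursively
    }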
Packaging the program into a jar
Building the jar in IDEA
(a series of screenshots showing the jar-build steps in IDEA)
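If the project is built with Maven (an assumption; the original uses IDEA's GUI), the jar can also be produced from the command line with mvn clean package.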

Running on the cluster

  • Command to run the job on the cluster: hadoop jar MPTEST.jar FlovwBean.DriTest
  • Here FlovwBean.DriTest is the fully qualified class name of the Driver
    (screenshot of the job running on the cluster)
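After the job completes, the five partition files can be inspected with the standard HDFS shell, e.g. hadoop fs -cat /MR/output/part-r-00000 (the files part-r-00000 through part-r-00004 correspond to partitions 0 through 4).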



