阅读 198

hadoop系列(16)-----Writable序列化机制案例

  • Mapper

/**  * @author jacques huang  * @date 2021年11月10日  */ @Log4j public class OrderMapper extends Mapper<Object, Text, Text, OrderModel> {     private final OrderModel MODEL = new OrderModel();     private final Text KEY = new Text();     @Override     protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {             String[] order = value.toString().split(",");             MODEL.setOrderId(Long.valueOf(order[0]));             MODEL.setTotalMoney(Double.parseDouble(order[1]));             MODEL.setRealMoney(Double.parseDouble(order[2]));             MODEL.setAddress(order[3]);             MODEL.setCreateTime(StringUtils.isNotBlank(order[4])?LocalDateTime.parse(order[4], DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")):null);             MODEL.setPaysTime(StringUtils.isNotBlank(order[5])?LocalDateTime.parse(order[5], DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")):null);             MODEL.setCacelMoney(Double.parseDouble(order[6]));             KEY.set(MODEL.getAddress());             log.info("模型的值为"+MODEL);             context.write(KEY, MODEL);     } } 复制代码

  • Reduce

/**  * @author jacques huang  * @date 2021年11月10日  */ public class OrderReduce  extends Reducer<Text, OrderModel,Text, OrderOutModel> {     @Override     protected void reduce(Text key, Iterable<OrderModel> values, Context context) throws IOException, InterruptedException {         OrderOutModel model=new OrderOutModel();         for (OrderModel value : values) {             Double totalMoney = Optional.ofNullable(model.getTotalMoney()).orElse(0D);             Integer totalOrder = Optional.ofNullable(model.getTotalOrder()).orElse(0);             model.setTotalMoney(totalMoney+ value.getTotalMoney());             model.setTotalOrder(totalOrder+ 1);         }         context.write(key,model);     } } 复制代码

  • Model

/**  * @author jacques huang  * @date 2021年11月10日  */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public class OrderModel implements Writable { //通过实现Writable 并且重写write、readFields方法实现序列化和返序列化     private Long orderId;     private Double totalMoney;     private Double realMoney;     private String address;     private LocalDateTime createTime;     private LocalDateTime paysTime;     private Double cacelMoney;     @Override     public void write(DataOutput out) throws IOException {      /**          * 写处理          */         out.writeLong(orderId);         out.writeDouble(totalMoney);         out.writeDouble(realMoney);         out.writeUTF(address);         if (createTime != null) {             out.writeUTF(createTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));         } else {             out.writeUTF("");         }         if (paysTime != null) {             out.writeUTF(paysTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));         } else {             out.writeUTF("");         }         out.writeDouble(cacelMoney);     }          @Override     public void readFields(DataInput in) throws IOException {         /**          * 读处理          */         this.orderId = in.readLong();         this.totalMoney = in.readDouble();         this.realMoney = in.readDouble();         this.address = in.readUTF();          String  createTimeStr = in.readUTF();         this.createTime = StringUtils.isNotBlank(createTimeStr)?LocalDateTime.parse(createTimeStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")):null;         String  paysTimeStr = in.readUTF();         this.paysTime = StringUtils.isNotBlank(paysTimeStr)?LocalDateTime.parse(paysTimeStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")):null;;         this.cacelMoney = in.readDouble();     } } 复制代码

/**  * @author jacques huang  * @date 2021年11月10日  */ @Data @AllArgsConstructor @NoArgsConstructor @Builder public class OrderOutModel implements Writable { //通过实现Writable 并且重写write、readFields方法实现序列化和返序列化     /**      * 总金额      */     private  Double totalMoney;     /**      * 总单数      */     private  Integer totalOrder;     @Override     public void write(DataOutput out) throws IOException {         /**          * 写处理          */         out.writeDouble(totalMoney);         out.writeInt(totalOrder);     }     @Override     public void readFields(DataInput in) throws IOException {         /**          * 读处理          */         this.totalOrder=in.readInt();         this.totalMoney=in.readDouble();     }     @Override     public String toString() {         /**          * 继承了Writable输出会以这个toString为输出格式          */         return "\t 成交总金额:" + totalMoney +"\t 成交总单数:" + totalOrder;     } } 复制代码

  • MapReduceOrderApplication(启动并提交任务的类)

public class MapReduceOrderApplication {       public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {           System.setProperty("hadoop.home.dir", "E:\\winutils-master\\hadoop-2.6.0\\");           Configuration conf = new Configuration(true);           /**            *    让框架知道是windows异构平台运行  在windows 平台需要            */           conf.set("mapreduce.app-submission.cross-platform", "true");           Job job = Job.getInstance(conf);           /**            * 指定程序的jar包路径            */           job.setJar("F:\\sourceCode\\hadoop\\mapreduce-order-demo\\target\\mapreduce-order-demo-1.0.jar");           /**            * 指定程序的启动类            */           job.setJarByClass(MapReduceOrderApplication.class);           /**            * 指定任务名称            */           job.setJobName("example-order-demo0003");           /**            * 输入数据            */           Path infile = new Path("/data/order");           TextInputFormat.addInputPath(job, infile);           /**            * 输出数据            */           Path outfile = new Path("/result/order/demo0001");           if (outfile.getFileSystem(conf).exists(outfile)) {               outfile.getFileSystem(conf).delete(outfile, true);           }           TextOutputFormat.setOutputPath(job, outfile);           /**            * mapper配置            */           job.setMapperClass(OrderMapper.class);           /**            *类型配置            */           job.setMapOutputKeyClass(Text.class);           job.setMapOutputValueClass(OrderModel.class);           /**            *Reduce配置            */           job.setReducerClass(OrderReduce.class);           job.setOutputKeyClass(Text.class);           job.setOutputValueClass(OrderOutModel.class);           /**            *任务提交            */           job.waitForCompletion(true);      } } 复制代码

  • 输出结果

上海  成交总金额:544907.6300000018  成交总单数:3353
云南省  成交总金额:75769.32000000007  成交总单数:778
内蒙古自治区  成交总金额:36827.0  成交总单数:215
北京  成交总金额:231055.48999999982  成交总单数:2054
吉林省  成交总金额:42040.92000000002  成交总单数:401
四川省  成交总金额:188948.11999999985  成交总单数:2019
天津  成交总金额:124564.23999999982  成交总单数:1153
宁夏回族自治区  成交总金额:4804.92  成交总单数:42
安徽省  成交总金额:61378.67000000005  成交总单数:609
山东省  成交总金额:175046.12999999986  成交总单数:1804
山西省  成交总金额:46568.800000000025  成交总单数:465
广东省  成交总金额:227855.27999999962  成交总单数:2463
广西壮族自治区  成交总金额:35140.10000000001  成交总单数:436
新疆维吾尔自治区  成交总金额:10112.9  成交总单数:58
江苏省  成交总金额:227930.92999999895  成交总单数:2126
江西省  成交总金额:36791.65  成交总单数:411
河北省  成交总金额:106561.5599999999  成交总单数:1083
河南省  成交总金额:90619.72000000003  成交总单数:966
浙江省  成交总金额:203126.9599999989  成交总单数:2061
海南省  成交总金额:16828.18  成交总单数:178
湖北省  成交总金额:8581.7  成交总单数:75
湖南省  成交总金额:102929.22000000006  成交总单数:1099
甘肃省  成交总金额:14294.759999999998  成交总单数:167
福建省  成交总金额:37075.53000000001  成交总单数:489
西藏自治区  成交总金额:489.72  成交总单数:3
贵州省  成交总金额:32274.16  成交总单数:345
辽宁省  成交总金额:107355.92999999993  成交总单数:1187
重庆  成交总金额:108975.65000000007  成交总单数:1036
陕西省  成交总金额:59450.93000000003  成交总单数:536
青海省  成交总金额:2396.2  成交总单数:19
黑龙江省  成交总金额:35058.29000000001  成交总单数:379


作者:jacqueshuang
链接:https://juejin.cn/post/7031157748708573191


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐