hadoop系列(16)-----Writable序列化机制案例
Mapper
/** * @author jacques huang * @date 2021年11月10日 */ @Log4j public class OrderMapper extends Mapper<Object, Text, Text, OrderModel> { private final OrderModel MODEL = new OrderModel(); private final Text KEY = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] order = value.toString().split(","); MODEL.setOrderId(Long.valueOf(order[0])); MODEL.setTotalMoney(Double.parseDouble(order[1])); MODEL.setRealMoney(Double.parseDouble(order[2])); MODEL.setAddress(order[3]); MODEL.setCreateTime(StringUtils.isNotBlank(order[4])?LocalDateTime.parse(order[4], DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")):null); MODEL.setPaysTime(StringUtils.isNotBlank(order[5])?LocalDateTime.parse(order[5], DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")):null); MODEL.setCacelMoney(Double.parseDouble(order[6])); KEY.set(MODEL.getAddress()); log.info("模型的值为"+MODEL); context.write(KEY, MODEL); } } 复制代码
Reduce
/** * @author jacques huang * @date 2021年11月10日 */ public class OrderReduce extends Reducer<Text, OrderModel,Text, OrderOutModel> { @Override protected void reduce(Text key, Iterable<OrderModel> values, Context context) throws IOException, InterruptedException { OrderOutModel model=new OrderOutModel(); for (OrderModel value : values) { Double totalMoney = Optional.ofNullable(model.getTotalMoney()).orElse(0D); Integer totalOrder = Optional.ofNullable(model.getTotalOrder()).orElse(0); model.setTotalMoney(totalMoney+ value.getTotalMoney()); model.setTotalOrder(totalOrder+ 1); } context.write(key,model); } } 复制代码
Model
/** * @author jacques huang * @date 2021年11月10日 */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public class OrderModel implements Writable { //通过实现Writable 并且重写write、readFields方法实现序列化和返序列化 private Long orderId; private Double totalMoney; private Double realMoney; private String address; private LocalDateTime createTime; private LocalDateTime paysTime; private Double cacelMoney; @Override public void write(DataOutput out) throws IOException { /** * 写处理 */ out.writeLong(orderId); out.writeDouble(totalMoney); out.writeDouble(realMoney); out.writeUTF(address); if (createTime != null) { out.writeUTF(createTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); } else { out.writeUTF(""); } if (paysTime != null) { out.writeUTF(paysTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); } else { out.writeUTF(""); } out.writeDouble(cacelMoney); } @Override public void readFields(DataInput in) throws IOException { /** * 读处理 */ this.orderId = in.readLong(); this.totalMoney = in.readDouble(); this.realMoney = in.readDouble(); this.address = in.readUTF(); String createTimeStr = in.readUTF(); this.createTime = StringUtils.isNotBlank(createTimeStr)?LocalDateTime.parse(createTimeStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")):null; String paysTimeStr = in.readUTF(); this.paysTime = StringUtils.isNotBlank(paysTimeStr)?LocalDateTime.parse(paysTimeStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")):null;; this.cacelMoney = in.readDouble(); } } 复制代码
/** * @author jacques huang * @date 2021年11月10日 */ @Data @AllArgsConstructor @NoArgsConstructor @Builder public class OrderOutModel implements Writable { //通过实现Writable 并且重写write、readFields方法实现序列化和返序列化 /** * 总金额 */ private Double totalMoney; /** * 总单数 */ private Integer totalOrder; @Override public void write(DataOutput out) throws IOException { /** * 写处理 */ out.writeDouble(totalMoney); out.writeInt(totalOrder); } @Override public void readFields(DataInput in) throws IOException { /** * 读处理 */ this.totalOrder=in.readInt(); this.totalMoney=in.readDouble(); } @Override public String toString() { /** * 继承了Writable输出会以这个toString为输出格式 */ return "\t 成交总金额:" + totalMoney +"\t 成交总单数:" + totalOrder; } } 复制代码
MapReduceOrderApplication(启动提交任务类)
public class MapReduceOrderApplication { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { System.setProperty("hadoop.home.dir", "E:\\winutils-master\\hadoop-2.6.0\\"); Configuration conf = new Configuration(true); /** * 让框架知道是windows异构平台运行 在windows 平台需要 */ conf.set("mapreduce.app-submission.cross-platform", "true"); Job job = Job.getInstance(conf); /** * 指定程序的jar包路径 */ job.setJar("F:\\sourceCode\\hadoop\\mapreduce-order-demo\\target\\mapreduce-order-demo-1.0.jar"); /** * 指定程序的启动类 */ job.setJarByClass(MapReduceOrderApplication.class); /** * 指定任务名称 */ job.setJobName("example-order-demo0003"); /** * 输入数据 */ Path infile = new Path("/data/order"); TextInputFormat.addInputPath(job, infile); /** * 输出数据 */ Path outfile = new Path("/result/order/demo0001"); if (outfile.getFileSystem(conf).exists(outfile)) { outfile.getFileSystem(conf).delete(outfile, true); } TextOutputFormat.setOutputPath(job, outfile); /** * mapper配置 */ job.setMapperClass(OrderMapper.class); /** *类型配置 */ job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(OrderModel.class); /** *Reduce配置 */ job.setReducerClass(OrderReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(OrderOutModel.class); /** *任务提交 */ job.waitForCompletion(true); } } 复制代码
输出结果
上海 成交总金额:544907.6300000018 成交总单数:3353 云南省 成交总金额:75769.32000000007 成交总单数:778 内蒙古自治区 成交总金额:36827.0 成交总单数:215 北京 成交总金额:231055.48999999982 成交总单数:2054 吉林省 成交总金额:42040.92000000002 成交总单数:401 四川省 成交总金额:188948.11999999985 成交总单数:2019 天津 成交总金额:124564.23999999982 成交总单数:1153 宁夏回族自治区 成交总金额:4804.92 成交总单数:42 安徽省 成交总金额:61378.67000000005 成交总单数:609 山东省 成交总金额:175046.12999999986 成交总单数:1804 山西省 成交总金额:46568.800000000025 成交总单数:465 广东省 成交总金额:227855.27999999962 成交总单数:2463 广西壮族自治区 成交总金额:35140.10000000001 成交总单数:436 新疆维吾尔自治区 成交总金额:10112.9 成交总单数:58 江苏省 成交总金额:227930.92999999895 成交总单数:2126 江西省 成交总金额:36791.65 成交总单数:411 河北省 成交总金额:106561.5599999999 成交总单数:1083 河南省 成交总金额:90619.72000000003 成交总单数:966 浙江省 成交总金额:203126.9599999989 成交总单数:2061 海南省 成交总金额:16828.18 成交总单数:178 湖北省 成交总金额:8581.7 成交总单数:75 湖南省 成交总金额:102929.22000000006 成交总单数:1099 甘肃省 成交总金额:14294.759999999998 成交总单数:167 福建省 成交总金额:37075.53000000001 成交总单数:489 西藏自治区 成交总金额:489.72 成交总单数:3 贵州省 成交总金额:32274.16 成交总单数:345 辽宁省 成交总金额:107355.92999999993 成交总单数:1187 重庆 成交总金额:108975.65000000007 成交总单数:1036 陕西省 成交总金额:59450.93000000003 成交总单数:536 青海省 成交总金额:2396.2 成交总单数:19 黑龙江省 成交总金额:35058.29000000001 成交总单数:379
作者:jacqueshuang
链接:https://juejin.cn/post/7031157748708573191