阅读 148

Hadoop Rpc简单实现

Hadoop Rpc简单实现

Hadoop Rpc简单实现

一、Hadoop RPC总体架构

二、Hadoop Rpc特点

三、实现步骤

四、运行及实验结果

一、Hadoop RPC总体架构



序列化层

序列化作用主要还是将结构化数据对象转换成字节流用于网络传输或写入持久存储。

在RPC中,主要是将用户请求的参数或者服务器应答转化成字节流跨机器传输。

函数调用层

作用:定位所需调用的函数并执行函数。

依赖:Java的反射与Java的动态代理模式来实现

网络传输层

作用:用于描述Client与Server之间的消息格式。

依赖:Hadoop RPC 依赖基于TCP/IP协议中的Socket机制

服务器端处理框架

作用:抽象为网络IO模型,用于述client与server间信息交互方式。

网络IO模型:阻塞式IO、NIO、事件驱动IO等

Hadoop RPC:基于Reactor设计模式的事件驱动IO模型。

二、Hadoop Rpc特点

透明性

这是所有RPC框架最根本的特点,即当用户在一台计算机的程序调用另外一台计算机上的子程序时,用户自身不应感觉到其间涉及跨机器间的通信,而是感觉像是在执行一个本地调用。

高性能

Hadoop各个系统(如HDFS、YARN、MapReduce等)均采用了Master/Slave结构,其中,Master实际上是一个RPC Server,它负责处理集群中所有Slave发送的服务请求,为了保证Master的并发处理能力,RPCServer应是一个高性能服务器,能够高效地处理来自多个Client的并发RPC请求。

可控性

相对于RMI(remote method invocation)而言,Hadoop RPC具有轻量级、可控性等优点,即用户程序调用RPC接口可控的地方比较RMI多些。

三、实现步骤

创建Maven项目,添加maven依赖(maven有问题,可以直接导入hadoop相关依赖包)


<dependencies>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-common</artifactId>

            <version>2.7.3</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-hdfs</artifactId>

            <version>2.7.3</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-mapreduce-client-common</artifactId>

            <version>2.7.3</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-mapreduce-client-core</artifactId>

            <version>2.7.3</version>

        </dependency>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

定义接口协议


/**

 * 1、定义RPC接口协议,添加自定义方法

 * 必须继承Hadoop提供的接口VersionedProtocol

 */

public interface RPCProtocol {

    //RPC client 和 server 之间必须使用相同的版本的协议才能进行正常通信

    public static final long versionID = 1L;

    //自定义方法

    public String echo(String value) throws Exception;

    public int add(int v1, int v2) throws Exception;

}

1

2

3

4

5

6

7

8

9

10

11

定义接口协议实现类


import org.apache.hadoop.ipc.ProtocolSignature;

import java.io.IOException;


/**

 * 2、定义一个Java类,实现自定义的RCP接口

 */

public class RPCProtocolImplement implements RPCProtocol{

    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {

        System.out.println("===getProtocolVersion被调用===protocol=" + protocol + "\t clientVersion=" + clientVersion);

        return RPCProtocol.versionID;

    }

    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {

        System.out.println("===getProtocolSignature被调用===protocol=" + protocol + ",clientVersion=" + clientVersion + ",clientMethodsHash=" + clientMethodsHash);

        return new ProtocolSignature(RPCProtocol.versionID,null);

    }


    public String echo(String value) throws Exception {

        System.out.println("好的,我已收到你的信息");

        return value;

    }


    public int add(int v1, int v2) throws Exception {

        System.out.println("正在帮您计算,稍等片刻……");

        int sum = v1 + v2;

        System.out.println("计算完毕,结果为:" + sum);

        return sum;

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

定义服务端(Server)


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.ipc.RPC;


import java.io.IOException;


/**

 * 3、构造RPC服务启动类

 */

public class RPCServer {

    public static void main(String[] args) throws IOException {

        RPC.Server server = new RPC.Builder(new Configuration())

                .setProtocol(RPCProtocol.class)

                .setInstance(new RPCProtocolImplement())

                .setBindAddress("localhost")

                .setPort(8001).setNumHandlers(1).build();

        server.start();

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

定义客户端Client


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.ipc.RPC;


import java.net.InetSocketAddress;


/**

 * 4、定义RPC客户端

 */

public class RPCClient {

    public static void main(String[] args) throws Exception {

        RPCProtocol client = RPC.getProxy(RPCProtocol.class,

                RPCProtocol.versionID,

                new InetSocketAddress("localhost",8001),

                new Configuration());

        String echo = client.echo("rpc服务,请帮我计算一下:");

        System.out.println(echo);

        int sum = client.add(400, 300);

        System.out.println("收到rpc服务的计算结果:" + sum);

        // 停止客户端

        RPC.stopProxy(client);

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

四、运行及实验结果

启动RPCServer服务端服务(先启动)


启动RPCClient客户端调用服务

运行结果如下

客户端Client控制台会看到如下日志:



服务端Server控制台会看到如下日志:


————————————————

版权声明:本文为CSDN博主「若兰幽竹」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/sujiangming/article/details/115985509


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐