阅读 849

fastapi微服务系列(2)-之GRPC的interceptor拦截器简单使用(中间件)

对于一个框架来说,通常具备有所谓的中间件,有时候也可以说是拦截器,其实和钩子差不多的概念。

那grpc也不例外。但是使用python如何应用到我们的拦截器的呐? 拦截器又可以做哪些事情呢?

1:grpc的拦截器可以做啥?

本身拦截器的概念和我们的中间件类似,所以类似fastapi中我们的中间件能做,拦截器都可以做:

  • 身份验证

  • 日志请求记录

  • 全局上下文的信息处理等

  • 多个拦截器和多个中间件遵循的请求规则都是洋葱模型

  • 拦截器必须有返回值,返回是响应报文体

PS:而且相对GRPC来说不止于我们的服务端有钩子,客户端也有钩子(拦截器),和我们的httpx库提供的类似的钩子函数差不多!

PS:拦截器可以作用再客户端和服务端:客户端拦截器和服务端拦截器

2:grpc的拦截器分类

  • 一元拦截器(UnaryServerInterceptor)-客户端中

  • 流式拦截器(StreamClientInterceptor)- 客户端中

  • python中的服务端是实现ServerInterceptor

image.png

3:在python实现grpc拦截器

查看服务传递的拦截器参数说明:

image.png

3.1 服务端的自带拦截器

主要注意点:

  • 拦截器传入是一个实例化的对象

  • 拦截器列表的传入,可以是元组也可以是列表

  • 多拦截器的形式遵循洋葱模型

服务端拦截器需要实现拦截器的抽象方法:

image.png

完整服务端示例代码:

from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc
import signal


# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 返回是我们的定义的响应体的对象
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # 返回是我们的定义的响应体的对象

        # # 设置异常状态码
        # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
        # context.set_details("你没有这个访问的权限")
        # raise context

        # 接收请求头的信息
        print("接收到的请求头元数据信息", context.invocation_metadata())
        # 设置响应报文头信息
        context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
        # 三种的压缩机制处理
        # NoCompression = _compression.NoCompression
        # Deflate = _compression.Deflate
        # Gzip = _compression.Gzip
        # 局部的数据进行压缩
        context.set_compression(grpc.Compression.Gzip)
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))


class MyUnaryServerInterceptor1(grpc.ServerInterceptor):

    def intercept_service(self,continuation, handler_call_details):
        print("我是拦截器1号:开始----1")
        respn = continuation(handler_call_details)
        print("我是拦截器1号:结束----2",respn)
        return respn

class MyUnaryServerInterceptor2(grpc.ServerInterceptor):

    def intercept_service(self,continuation, handler_call_details):
        print("我是拦截器2号:开始----1")
        respn = continuation(handler_call_details)
        print("我是拦截器2号:结束----2",respn)
        return respn

def serve():

    # 实例化一个rpc服务,使用线程池的方式启动我们的服务
    # 服务一些参数信息的配置
    options = [
        ('grpc.max_send_message_length', 60 * 1024 * 1024),  # 限制发送的最大的数据大小
        ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # 限制接收的最大的数据的大小
    ]
    # 三种的压缩机制处理
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # 配置服务启动全局的数据传输的压缩机制
    compression = grpc.Compression.Gzip
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                         options=options,
                         compression=compression,
                         interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
    # 添加我们服务
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 配置启动的端口
    server.add_insecure_port('[::]:50051')
    #  开始启动的服务
    server.start()

    def stop_serve(signum, frame):
        print("进程结束了!!!!")
        # sys.exit(0)
        raise KeyboardInterrupt

    # 注销相关的信号
    # SIGINT 对应windos下的 ctrl+c的命令
    # SIGTERM 对应的linux下的kill命令
    signal.signal(signal.SIGINT, stop_serve)
    # signal.signal(signal.SIGTERM, stop_serve)

    # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
    server.wait_for_termination()


if __name__ == '__main__':
    serve()复制代码

关键的配置地方是:

image.png此时使用我们的客户端请求服务端,服务端会输出一下的信息:

我是拦截器1号:开始----1
我是拦截器2号:开始----1
我是拦截器2号:结束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=<built-in method FromString of GeneratedProtocolMessageType object at 0x00000175D2988558>, response_serializer=<method 'SerializeToString' of 'google.protobuf.pyext._message.CMessage' objects>, unary_unary=<bound method Greeter.SayHelloAgain of <__main__.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None)
我是拦截器1号:结束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=<built-in method FromString of GeneratedProtocolMessageType object at 0x00000175D2988558>, response_serializer=<method 'SerializeToString' of 'google.protobuf.pyext._message.CMessage' objects>, unary_unary=<bound method Greeter.SayHelloAgain of <__main__.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None)
接收到的请求头元数据信息 (_Metadatum(key='mesasge', value='1010'), _Metadatum(key='error', value='No Error'), _Metadatum(key='user-agent', value='grpc-python/1.41.1 grpc-c/19.0.0 (windows; chttp2)'))复制代码

3.2 客户端的自带拦截器

客户端拦截器的需要实现类和服务端的不一样:

image.png

且当我们的使用客户端拦截器的时候,主要链接到我们的RPC的时候的方式也有所改变:

image.png

完整客户端示例代码:

import grpc
import hello_pb2
import hello_pb2_grpc


class ClientServerInterceptor1(grpc.UnaryUnaryClientInterceptor):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        print("客户端的拦截器1:---开始1")
        resp = continuation(client_call_details, request)
        print("客户端的拦截器1:---结束2", resp)
        return resp

class ClientServerInterceptor2(grpc.UnaryUnaryClientInterceptor):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        print("客户端的拦截器2:---开始1")
        resp = continuation(client_call_details, request)
        print("客户端的拦截器2:---结束2", resp)
        return resp

def run():
    # 连接 rpc 服务器
    options = [
        ('grpc.max_send_message_length', 100 * 1024 * 1024),
        ('grpc.max_receive_message_length', 100 * 1024 * 1024),
        ('grpc.enable_retries', 1),
        ('grpc.service_config',
         '{ "retryPolicy":{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }')
    ]

    # 三种的压缩机制处理
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # 配置服务启动全局的数据传输的压缩机制
    compression = grpc.Compression.Gzip
    # with grpc.insecure_channel(target='localhost:50051',
    #                            options=options,
    #                            compression=compression
    #                            ) as channel:


    with grpc.insecure_channel(target='localhost:50051',
                               options=options,
                               compression=compression
                               ) as channel:
        # 通过通道服务一个服务intercept_channel
        interceptor_channel = grpc.intercept_channel(channel, ClientServerInterceptor1(),ClientServerInterceptor2())
        stub = hello_pb2_grpc.GreeterStub(interceptor_channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        try:

            reest_header = (
                ('mesasge', '1010'),
                ('error', 'No Error')
            )

            response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='欢迎下次光临'),
                                                              # 设置请求的超时处理
                                                              timeout=5,
                                                              # 设置请求的头的信息
                                                              metadata=reest_header,
                                                              )
            print("SayHelloAgain函数调用结果的返回: " + response.message)
            print("SayHelloAgain函数调用结果的返回---响应报文头信息: ", callbask.trailing_metadata())
        except grpc._channel._InactiveRpcError as e:
            print(e.code())
            print(e.details())


if __name__ == '__main__':
    run()复制代码

4:grpc拦截器上下文传递

我们的都知道作为中间件的话,一般某些业务场景下是有些使用承载请求上下文的传递的任务滴,然是自带的拦截器,似乎完全没有对应的

request, context复制代码

相关的引入传递,如果我们的需要传递上下文的时候呢?这就无法实现了!!!!

要实现具有上下文的传递拦截器的话使用第三方库来实现:

 pip install grpc-interceptor复制代码

这个库还字典的相关的测试:

$ pip install grpc-interceptor[testing]复制代码

4.1 改造服务端拦截器

该用 第三方库后的完整服务端改造示例:

from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc
import signal
from typing import Any,Callable

# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 返回是我们的定义的响应体的对象
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # 返回是我们的定义的响应体的对象

        # # 设置异常状态码
        # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
        # context.set_details("你没有这个访问的权限")
        # raise context

        # 接收请求头的信息
        print("接收到的请求头元数据信息", context.invocation_metadata())
        # 设置响应报文头信息
        context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
        # 三种的压缩机制处理
        # NoCompression = _compression.NoCompression
        # Deflate = _compression.Deflate
        # Gzip = _compression.Gzip
        # 局部的数据进行压缩
        context.set_compression(grpc.Compression.Gzip)
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.exceptions import NotFound
class MyUnaryServerInterceptor1(ServerInterceptor):

    def intercept(
            self,
            method: Callable,
            request: Any,
            context: grpc.ServicerContext,
            method_name: str,
    ) -> Any:

        rsep = None
        try:
            print("我是拦截器1号:开始----1")
            rsep= method(request, context)
        except GrpcException as e:
            context.set_code(e.status_code)
            context.set_details(e.details)
            raise
        finally:
            print("我是拦截器1号:结束----2",rsep)
            return rsep

class MyUnaryServerInterceptor2(ServerInterceptor):

    def intercept(
            self,
            method: Callable,
            request: Any,
            context: grpc.ServicerContext,
            method_name: str,
    ) -> Any:

        rsep = None
        try:
            print("我是拦截器2号:开始----1")
            rsep= method(request, context)
        except GrpcException as e:
            context.set_code(e.status_code)
            context.set_details(e.details)
            raise
        finally:
            print("我是拦截器2号:结束----2",rsep)
            return rsep

def serve():

    # 实例化一个rpc服务,使用线程池的方式启动我们的服务
    # 服务一些参数信息的配置
    options = [
        ('grpc.max_send_message_length', 60 * 1024 * 1024),  # 限制发送的最大的数据大小
        ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # 限制接收的最大的数据的大小
    ]
    # 三种的压缩机制处理
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # 配置服务启动全局的数据传输的压缩机制
    compression = grpc.Compression.Gzip
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                         options=options,
                         compression=compression,
                         interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
    # 添加我们服务
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 配置启动的端口
    server.add_insecure_port('[::]:50051')
    #  开始启动的服务
    server.start()

    def stop_serve(signum, frame):
        print("进程结束了!!!!")
        # sys.exit(0)
        raise KeyboardInterrupt

    # 注销相关的信号
    # SIGINT 对应windos下的 ctrl+c的命令
    # SIGTERM 对应的linux下的kill命令
    signal.signal(signal.SIGINT, stop_serve)
    # signal.signal(signal.SIGTERM, stop_serve)

    # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
    server.wait_for_termination()


if __name__ == '__main__':
    serve()复制代码

通过上面的方式,我们就可以对应我们的上下文请求做相关的处理了!这个和我们的web框架的中间件几乎是接近类似了!

4.2 简单分析第三方库简单源码

进入这个第三方库的源码内部的,其实发现它自己也是实现了

grpc.ServerInterceptor复制代码

然后对它进一步进行了抽象一层

  • 第一步其实和我们自带的实现一样,先是获取返回的下一个带处理的handle

next_handler = continuation(handler_call_details)复制代码

然后对返回这个next_handler进行是那种类型的的拦截器:

- unary_unary
- unary_stream
- stream_unary
- stream_stream复制代码
  • 判断完成是哪里蕾西的拦截器之后返回

handler_factory, next_handler_method复制代码

然后调用的是最终返回是handler_factory的对象

  • handler_factory的对象需要的参数有:

    • invoke_intercept_method 拦截器的方法

    • request_deserializer 请求的系列化

    • response_serializer 响应的系列化

  • 而我们的invoke_intercept_method 拦截器的方法获取则需要

    request: Any,
    context: grpc.ServicerContext,复制代码
    • 传入定义的一个

  • 然后返回是我们的最终需要实现的方法!我去晕了~

4.3 补充说明handler_call_details

如果我们的单纯只是需要获取到RPC请求里面的提交请求头元数据的,我们可以使用它读取:

print("handler call details: ", handler_call_details.invocation_metadata)复制代码

它本身是一个:

grpc._server._HandlerCallDetails的类型复制代码

总结

以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!

结尾


作者:小钟同学
链接:https://juejin.cn/post/7031098422589390861


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐