阅读 301

fastapi微服务系列-之封装GRPC实现类似fastapi一个框架

因为使用习惯了fastapi,在进行GRPC相关的处理的时候,其实很多东西是可以触类旁通的,所以寻思着封装了一个类似fastapi的使用方法来使用我们的GRPC框架。

参考了fastapi一些非常好的设计,我依葫芦画瓢的方式封装了我当前自用的grpc框架。

实现的功能也基本和我们的fastapi相同。

框架功能点


    1. 实现类似fastapi的on_event注册的启动和关闭的事件处理机制


    1. 实现类似fastapi的添加中间件处理机制


    1. 可以实现使用装饰器或调用的方法的方式进行服务的添加


    1. 可以进行相关扩展插件加载


    1. 动态批量载入服务


    1. 支持注册中心的扩展


    1. 如果需要对传递的参数校验,还可以使用pydantic进行参数校验


    1. 对于统一规范全局异常处理

局限:目前仅支持多线程的形式,异步的还没深入继续研究

框架结构

image.png

image.png

结构说明:

  • grpcframe 主要是封装公共一些可以公用服务


--  config 和我之前fastapi脚手架类似,主要是配置一些插件或其他地方需要配置,这里其实不应该放这,后续的服务应该分离出去这个配置信息,这个可以综合看需求


-- core 主要是整个APP启动的封装,这里其实主要对我们的grpc的启动做相关封装,如服务的加载和上面提到一些中间件处理方式。核心其实就是使用一些列表或字典保持相关的实例信息,统一进行装载到我们的grpc服务对象上面。


-- exception 主要封装统一归还异常抛出处理


-- ext 主要统一一些常用的服务中用到的扩展,如JWT验证之类,还有第三方的库的redis的使用


-- handler主要是实现基础常用公共的一些默认服务,如健康检测的HealthServicer


-- middlewares 主要是实现基础常用公共的一些默认中间件,如上下文管理的中间件,这个是需要,还有链路追踪。


-- utils 一些辅助工具类


-- validation 对应解析请求request的校验异常处理

核心代码

因为比较简单,其实完成可以自己去鼓捣,我这是个人使用,如有问题,希望有大佬可以交流指出!

app代码:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     serve
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/12/6
-------------------------------------------------
   修改描述-2021/12/6:         
-------------------------------------------------
"""
from functools import lru_cache
import sys
import time
import grpc
from concurrent import futures
import signal
import logging
import typing
import platform
import inspect

from grpcframe.core.middlewares import BaseMiddleware
from grpcframe.core.logger import Logger
from grpcframe.utils.signal_wrapper import signal_send
from grpcframe.config.serve_conf import AppSettings
from grpcframe.ext.register.base_register import Register
from grpcframe.ext import BaseExpand
import contextvars
from dataclasses import dataclass
from grpcframe.utils.singleton import Singleton
import contextlib

_LOGGER = logging.getLogger(__name__)


class State(object):
    """
    An object that can be used to store arbitrary state.

    Used for `request.state` and `app.state`.
    """

    def __init__(self, state: typing.Dict = None):
        if state is None:
            state = {}
        super(State, self).__setattr__("_state", state)

    def __setattr__(self, key: typing.Any, value: typing.Any) -> None:
        self._state[key] = value

    def __getattr__(self, key: typing.Any) -> typing.Any:
        try:
            return self._state[key]
        except KeyError:
            message = "'{}' object has no attribute '{}'"
            raise AttributeError(message.format(self.__class__.__name__, key))

    def setattr(self, key: typing.Any, value: typing.Any) -> None:
        self._state[key] = value


@dataclass
class GManager(metaclass=Singleton):
    # 记录当前启动的app
    current_app_server = contextvars.ContextVar('current_app_server', default=None)
    # 记录当前请求的请求
    current_app_request = contextvars.ContextVar('current_app_request', default=None)
    # 记录当前请求的请求
    current_app_context = contextvars.ContextVar('current_app_context', default=None)
    # 记录当前被激活的追踪链路的span
    active_tracer_span = contextvars.ContextVar('current_active_tracer_span', default=None)


g = GManager()


class FastGrpcApp:

    # 实例化配置信息
    def __init__(self,
                 host='127.0.0.1',
                 # 启动的服务的端口号
                 port=None,
                 # 工作线程数
                 max_workers=10,
                 # 是否开启热重启机制
                 re_load=False,
                 # 是否进行debug日志输出
                 debug: bool = False,
                 # 当前服务的版本信息
                 version: str = "0.1.0",

                 # 启动的时候是否检测有服务的列表已经被注册
                 is_check_servicers=True,
                 # 服务队列列表
                 servicers: typing.Sequence[typing.Callable] = None,
                 # 当前服务定义中间件列表
                 middlewares: typing.Sequence[BaseMiddleware] = None,
                 # 当前服务定义中间件列表
                 expands: typing.Sequence[BaseExpand] = None,
                 # 当前服务自定义异常处理器
                 exception_handlers=None,
                 # 当前服务自定义的回调处理事件
                 on_startup: typing.Sequence[typing.Callable] = None,
                 on_shutdown: typing.Sequence[typing.Callable] = None,
                 # SERVER_PRIVATE_KEY
                 is_certificate=False,
                 serve_private_key=None,
                 server_certificate_chain=None,
                 register_center: typing.ClassVar[Register] = None,

                 is_tracer=False,
                 ):
        # app保持一个状态信息
        self.state: State = State()
        self.fastgrpc_server = None
        self._version = version
        # 是否debug
        self.debug = debug
        # 是否开启链路追踪中间件
        self.is_tracer = is_tracer
        # 配置当前工作线程数
        self.max_workers = max_workers
        # 配置进程延迟结束的间隔休眠时间
        self.grpc_grace = 1
        # 应用是否已经停止
        self._stopped = False
        # 服务配置信息读取
        self.config = self.get_app_settings()
        # 实例化日志对象
        self.logger = Logger().logger
        # 配置当前的异常错误的处理
        self.exception_handlers = ({} if exception_handlers is None else dict(exception_handlers))
        # 配置当前的所有服务中间件信息
        self.user_middlewares = [] if middlewares is None else list(middlewares)
        # 当前用户自定义的扩展插件集对象
        self.user_expands = expands if expands is not None else {}
        # 定义事件回调列表
        self.on_startup = [] if on_startup is None else list(on_startup)
        self.on_shutdown = [] if on_shutdown is None else list(on_shutdown)
        # 是否开启热重启机制
        self.re_load = self.get_reload_monitor() if re_load else False
        # 是否启动前检测服务列表信息
        self.is_check_servicers = is_check_servicers
        # 初始化服务列表
        self.user_servicers = servicers if servicers is not None else {}
        # 初始化端口的服务号
        self.port = self._get_open_port() if port is None else port
        # 初始化服务地址
        self._address_full = host + ':' + str(self.port)
        # 单纯是HOST地址
        self._address_host = host
        # 是否启用SSL的认证
        self.is_certificate = is_certificate
        self.server_private_key = serve_private_key
        self.server_certificate_chain = server_certificate_chain
        self.register_center = register_center

    @property
    def class_name(self):
        return self.__class__.__name__



    @property
    def address(self):
        return self._address_full

    @address.setter
    def address(self, value):
        self._address_full = value

    @property
    def version(self):
        return self._version

    @version.setter
    def version(self, value):
        self._version = value

    def get_certificate_chain(self):
        # 判断设置的安全秘钥信息
        assert self.server_private_key is not None or self.server_certificate_chain is not None
        # 开始读取相关信息
        if self.server_private_key and self.server_certificate_chain:
            with open(self.server_private_key, 'rb') as f:
                private_key = f.read()
            with open(self.server_certificate_chain, 'rb') as f2:
                certificate_chain = f2.read()
        return private_key, certificate_chain

    def set_certificate_chain(self, server_private_key, server_certificate_chain):
        # 判断设置的安全秘钥信息
        self.server_private_key = server_private_key
        self.server_certificate_chain = server_certificate_chain
        # 开始读取相关信息

    def _get_open_port(self):
        import socket
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(("", 0))
        s.listen(1)
        port = s.getsockname()[1]
        s.close()
        return port

    # 可以写成装饰器的形式,也可以知己额调用 通过装饰器形式给我们的中间添加处理
    def add_middleware(self, middleware_type: type) -> None:
        # assert middleware_type in (BaseMiddleware)
        # 在第一个位置插入
        curr_middleware_type_name = middleware_type.__name__
        # 断言服务去重核验
        assert curr_middleware_type_name not in [name.__class__.__name__ for name in self.user_middlewares], 'middleware repeat!'

        self.user_middlewares.insert(0, middleware_type())

    # 添加事件信号处理器
    def add_expand_to_state(self, expand_type: typing.Type[BaseExpand], expand_name=None) -> None:
        # 根据类型来识别添加不懂的函数对象
        for b in expand_type.__bases__:
            assert b.__name__.endswith('BaseExpand'), '当前仅支持集成来自BaseExpand的扩展!'
        if expand_name:
            pass
            curr_expand_type_name = expand_name
        else:
            curr_expand_type_name = expand_type.__name__
        # 断言是否存在添加
        assert curr_expand_type_name not in [name for name in self.user_expands.keys()], 'expand add repeat!'
        # 初始化扩展实例对象
        self.user_expands[curr_expand_type_name] = expand_type()
        # 透传APP对象
        self.user_expands[curr_expand_type_name].init_app(self)
        # 设置第三的的扩展存贮到state
        self.state.setattr(curr_expand_type_name, self.user_expands[curr_expand_type_name])

    # 可以写成装饰器的形式,也可以知己额调用 通过装饰器形式给我们的中间添加处理
    def add_servicer(self, servicer: type) -> None:
        # assert middleware_type in (BaseMiddleware)
        # 在第一个位置插入
        # 服务去重,使用一样的服务的名称
        curr_servicer_name = servicer.__name__
        # 断言服务去重核验
        assert curr_servicer_name not in [name for name in self.user_servicers.keys()], 'servicer repeat!'
        # print("开始添加", self.user_servicers)
        # 活动获取对应的类似的:add_GreeterServicer_to_server的方法函数
        add_func = self._get_servicer_add_func(servicer)
        # add_func =add_GreeterServicer_to_server
        # 字典保持对应的服务名称和对应的调用的方法还要对应的服务对象
        # 保存一个元组到我们的字典里面
        self.user_servicers[curr_servicer_name] = (add_func, servicer)

    # 批量进行模块下的服务的导入
    def load_servicers_handlers_in_module(self, module):
        # 批量进行模块下的导入
        for _, _servicer in inspect.getmembers(module, inspect.isclass):
            # 获取当前服务模块的名称
            curr_servicer_name = _servicer.__name__
            # 对于如果是直接导入了具体模块下类的情况:example.handler.ser1,则直接进行处理即可,但这情况需要排除其他模块处理
            if _servicer.__name__.endswith('Servicer'):
                add_func = self._get_servicer_add_func(_servicer)
                self.user_servicers[curr_servicer_name] = (add_func, _servicer)
            # 如果是导入的整个外层的模块下的需要再执行一次变量
            elif curr_servicer_name not in [name for name in self.user_servicers.keys()]:
                add_func = self._get_servicer_add_func(_servicer)
                self.user_servicers[curr_servicer_name] = (add_func, _servicer)
        return self.user_servicers

    def _get_servicer_add_func(self, servicer):
        for b in servicer.__bases__:
            # b.__name__=GreeterServicer
            if b.__name__.endswith('Servicer'):
                # inspect模块也被称为 检查现场对象。这里的重点在于“现场”二字,也就是当前运行的状态。
                # inspect模块提供了一些函数来了解现场对象,包括 模块、类、实例、函数和方法。
                # inspect.getmodule(object):返回object所属的模块名
                m = inspect.getmodule(b)
                # 判断模块存在add_GreeterServicer_to_server 这个的方法
                return getattr(m, f'add_{b.__name__}_to_server')

    def get_reload_monitor(self):
        # 用于服务重启监听---目前这个还不行!!有待继续改进!
        import time
        from watchdog.observers import Observer
        from watchdog.events import FileSystemEventHandler
        from grpcframe.utils.asynctask import task
        import pathlib
        class ActionHandler(FileSystemEventHandler):
            def on_any_event(self, event):
                print('文件有变化:', event.event_type, event.src_path)
                # 发送自定义信号用户接收重启事件处理
                signal_send('reload', source='ThirdInternalErrorException', traceback='服务重启!!暂时没实现!')
                # 直接的结束了整个进程
                import os
                python = sys.executable
                os.execl(python, python, *sys.argv)

        print('当前监控的文件目录为:', pathlib.Path.cwd())

        @task(daemon=True)
        def ReloadHandler():
            event_handler = ActionHandler()
            observer = Observer()
            observer.schedule(event_handler, path='.', recursive=True)
            observer.start()
            try:
                while True:
                    time.sleep(1)
            except KeyboardInterrupt:
                observer.stop()
            observer.join()

        return ReloadHandler()

    @lru_cache()
    def get_app_settings(self):
        return AppSettings()

    def _restart_serve(self, signum, frame):
        self.shutdown()
        self.fastgrpc_server.stop(self.grpc_grace)
        time.sleep(self.grpc_grace or 1)
        # self._stopped = True
        self.start_serve()
        # raise KeyboardInterrupt

    def _stop_serve(self, signum, frame):
        self.shutdown()
        self.fastgrpc_server.stop(self.grpc_grace)
        time.sleep(self.grpc_grace or 1)
        self._stopped = True
        sys.exit(0)
        # raise KeyboardInterrupt

    # ===============事件处理模块机制===============
    def startup(self) -> None:
        for handler in self.on_startup:
            handler()

    def shutdown(self) -> None:
        for handler in self.on_shutdown:
            handler()

    # 通过装饰器添加事件处理
    def on_event(self, event_type: str) -> typing.Callable:
        def decorator(func: typing.Callable) -> typing.Callable:
            self.add_event_handler(event_type, func)
            return func

        return decorator

    # 添加事件信号处理器
    def add_event_handler(self, event_type: str, func: typing.Callable) -> None:
        # 根据类型来识别添加不懂的函数对象
        assert event_type in ("startup", "shutdown")
        if event_type == "startup":
            self.on_startup.append(func)
        else:
            self.on_shutdown.append(func)

    def _register_signal(self):
        # windo只支持这个平台
        if platform.system().lower() == 'windows':
            signal.signal(signal.SIGINT, self._stop_serve)
        elif platform.system().lower() == 'linux':
            signal.signal(signal.SIGINT, self._stop_serve)
            signal.signal(signal.SIGHUP, self._stop_serve)
            signal.signal(signal.SIGTERM, self._stop_serve)
            signal.signal(signal.SIGQUIT, self._stop_serve)

    def _make_server(self):
        compression = grpc.Compression.Gzip
        # 拦截器
        self.fastgrpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=self.max_workers), compression=compression, interceptors=self.user_middlewares)
        # 设置上下文的对象管理器信息
        g.current_app_server.set(self.fastgrpc_server)
        # 判断是否启用了安全信息
        # print('服务启动:', self.address)
        self.logger.info(f"服务启动:>>{self.address}")
        if self.is_certificate:
            private_key, certificate_chain = self.get_certificate_chain()
            server_credentials = grpc.ssl_server_credentials(((private_key, certificate_chain,),))
            self.fastgrpc_server.add_insecure_port(self.address, server_credentials)
        else:
            self.fastgrpc_server.add_insecure_port(self.address)

    def _check_servicers(self):
        if self.is_check_servicers:
            # >>> assert True     # 条件为 true 正常执行
            # >>> assert False    # 条件为 false 触发异常
            # assert self.user_servicers != {}, "请先添加对应服务对象!"
            assert len(self.user_servicers) > 0

    def _print_servicers(self):
      
            self.logger.info(">>>>>>>>>>>>欢迎使用FastGrpcApp>>>>>>>>>>>>")
            for servicer_name in self.user_servicers.keys():
                self.logger.info(f"注册服务类名:>>{servicer_name}")
            self.logger.info(f">>>>>>" * 5)
            for middleware_name in self.user_middlewares:
                self.logger.info(f"注册中间名:>>{middleware_name.__class__.__name__}")
            for expand_name in self.user_expands.keys():
                self.logger.info(f"注册扩展:>>{expand_name}")

    def _register_serve(self):
        # 循环的方式注册我们的服务-注册我们的当前RPC的服务
        for name, (add_func, servicer) in self.user_servicers.items():
            add_func(servicer(), self.fastgrpc_server)

    def _register_default_middlewares(self):
        # 默认注册一些中间件信息,如上下文处理中间件
        from grpcframe.middlewares.cxtg import CxtgMiddleware
        self.add_middleware(CxtgMiddleware)
        if self.is_tracer:
            from grpcframe.middlewares.tracer import Zyxopentracing
            self.add_middleware(Zyxopentracing)

    def _register_default_handler(self):
        # 默认添加相关一些基础的服务处理,如健康检查服务
        from grpcframe.handler.grpcio_health_checking.v1.health import HealthServicer
        self.add_servicer(HealthServicer)
        pass

    def _register_center_handler(self):
        # 默认添加相关一些基础的服务处理,如健康检查服务
        if self.register_center:
            # 调用注册中心的基类中响应的方法进行注册处理
            self.register_center.register(address=self._address_host,port=self.port)

    def _unregister_center_handler(self):
        # 默认添加相关一些基础的服务处理,如健康检查服务
        if self.register_center:
            # 调用注册中心的基类中响应的方法进行注册处理
            self.register_center.deregister()



    def stop_serve_callback(self):
        # 注销注册中心
        self._unregister_center_handler()

    def start_serve(self):
        # 先判断我们的服务列表是否为空为空则不启动!
        # ExitStack 实现类似GO 延迟执行的defer的延迟调用
        with contextlib.ExitStack() as stack:
            stack.callback(self.stop_serve_callback)
            self._check_servicers()
            # 发生启动信号事件处理
            self._register_signal()
            # 注册注册中间件默认名的--需要再_make_server之前调用
            self._register_default_middlewares()
            # 注册注册健康检测--需要再_make_server之前调用
            self._register_default_handler()
            # 生成服务对象
            self._print_servicers()
            self._make_server()
            # 执行服务列表注册
            self._register_serve()
            # 执行注册的服务中心
            self._register_center_handler()
            # 应用启动
            self.fastgrpc_server.start()
            # 调用应用启动回调处理
            self.startup()
            # 循环监听处理
            self.fastgrpc_server.wait_for_termination()复制代码

基础中间件:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     middlewares
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/11/29
-------------------------------------------------
   修改描述-2021/11/29:         
-------------------------------------------------
"""
import typing
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.exceptions import NotFound
from typing import Any, Callable, NamedTuple, Tuple
import abc
import grpc



# 需要先继承ServerInterceptor再继承ABCMeta
class BaseMiddleware(ServerInterceptor,metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def before_handler(self, request: Any, context: grpc.ServicerContext, method_name):
        raise NotImplementedError()

    @abc.abstractmethod
    def after_handler(self, context: grpc.ServicerContext, response):
        raise NotImplementedError()

    def intercept(self, method: Callable, request: Any, context: grpc.ServicerContext, method_name: str, ) -> Any:
        try:
            # 调用必要的前置的函数处理
            self.before_handler(request, context, method_name)
            response = method(request, context)
            # 调用必要的后置的函数处理
            self.after_handler(context, response)
            return response
        except GrpcException as e:
            context.set_code(e.status_code)
            context.set_details(e.details)
            raise复制代码

服务功能扩展:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     serveis
   文件功能描述 :   功能描述
   创建人 :       小钟同学
-------------------------------------------------
   修改描述- 用于多继承,实现相关Serveis服务辅助功能的扩展

-------------------------------------------------
"""
from typing import Any, Dict, Tuple
# 引入到这个的会报错# NamedTuple
from google.protobuf import json_format
from pydantic import BaseModel, ValidationError
from typing import TypeVar
from grpcframe.exception import BadRequestException
from grpcframe.validation import raw_errors_info
from typing import Any, List, Optional
from urllib.parse import urlparse
from grpcframe.utils.cache_property import CachedProperty


class BaseServeis:
    pass
    '''
    基类的主要作用:用于多继承,实现相关Serveis服务辅助功能的扩展
    1:设置头信息
    2:解析请求参数信息
    3:对参数信息的校验
    4:对请求头信息的解析转为字典
    '''
    def __init__(self):
        pass
        # print("每次的只实例化一次")
        self._metadata = None
        self._request = None

    @property
    def name(self):
        return self.__class__.__name__

    def rpc_event(self, context):
        """ 返回RPC事件
        """
        return getattr(context, "_rpc_event")

    def call_details(self, context):
        """
        从rpc_event获取我们的---调用详情信息
        从调用详情里面可以获取信息比较多:如方法,还有方法名
        """
        if self.rpc_event(context) is not None:
            return getattr(self.rpc_event(context), "call_details")
        return None

    def method(self, context):
        """  从rpc_event获取我们的---调用详情信息--从调用详情信息里面:
             获取本次请求调用的方法 : /Greeter/SayHelloAgain
        """
        if self.call_details(context) is not None:
            method = getattr(self.call_details(context), "method")
            return method.decode("utf8") if method else method
        return None

    def service(self, context):
        """  从rpc_event获取我们的---调用详情信息--从调用详情信息里面:
             获取本次请求调用的方法 ---从调用方法里面截取到--本次请求的Service
             这里和我们的
          @property
          def name(self):
             return self.__class__.__name__ 是一样的就是获取服务的类名
        """
        if self.method(context) is not None:
            return self.method(context).split("/")[-2]
        return None

    def set_trailing_metadata(self, context):
        '''
        通过设置字典的形式设置我们的响应头信息,只能调用一次的设置,不然会引发服务的异常
        :param context:
        :return:
        context.set_trailing_metadata((('name', '2232232'), ('sex', '23232')))
        '''
        pass

    def parse_metadata(self, context) -> Dict:
        '''
        获取请求头信息
        :param context:
        :return:
        '''
        metadata = {}
        for key, value in context.invocation_metadata():
            metadata[key.strip()] = value.strip()
        # 返回信息
        metadata.update({'peer': context.peer()})
        return metadata

    def parse_request(self, request) -> Dict:
        '''
        获取当前RPC的请求参数信息,进行字典的序列化
        :param request:
        :return:
        '''
        request_dict = json_format.MessageToDict(request, preserving_proto_field_name=True)
        return request_dict

    def validation_request(self, request, model: type):
        request_model = None
        try:
            # 模型实例化
            request_model = model(**self.parse_request(request))
            pass
        except ValidationError as exc:
            raise BadRequestException(message=raw_errors_info(exc))
        finally:
            return request_model

    def parse_request_metadata(self, request, context) -> Tuple:
        '''
        解析请求参数和请求头信息并返回
        :param request:
        :param context:
        :return:
        '''
        return self.parse_request(request=request), self.parse_metadata(context=context)复制代码

使用过程示例

第1步:根据hello.proto生成服务信息

image.png

第2步:重写服务进行业务逻辑处理

image.png

完整示例服务示例代码:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     kasdska
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/12/1
-------------------------------------------------
   修改描述-2021/12/1:         
-------------------------------------------------
"""

import grpc
from example.servicers import hello_pb2_grpc
from example.servicers import hello_pb2
# from typing import Any, Dict
# 引入到这个的会报错# NamedTuple
# from google.protobuf import json_format, symbol_database
from grpcframe.core.serveis import BaseServeis
from pydantic import BaseModel, ValidationError
from grpcframe.ext.redis.client import sync_redis_client
from grpcframe.core.app import g


class RequestParse(BaseModel):
    name: str

class Greeter(hello_pb2_grpc.GreeterServicer, BaseServeis):

    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 返回是我们的定义的响应体的对象
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # 解析请求参信息
        # 校验请求的参数信息
        sadas = self.validation_request(request, RequestParse)

        print("设置的哈是",id(g.current_app_request.get()))
        print("startup=========3》", sync_redis_client.get('nimei'))

        context.set_compression(grpc.Compression.Gzip)
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))复制代码

第3步:实例化grpc的app对象

from grpcframe.core.app import FastGrpcApp
app = FastGrpcApp(port=45564,max_workers=20,is_certificate=False,debug=True)
if __name__ == '__main__':
    # 单独的进行加载
    app.start_serve()复制代码

app默认已经启动检测检测服务注册,还有上下文管理器的中间件,一般需要这两个,健康服务可以忽略,看情况进行使用。

第4步:注册的服务到app

方式1:

from grpcframe.core.app import FastGrpcApp
from example.server.handler import Greeter

app = FastGrpcApp(port=45564, max_workers=20, is_certificate=False, debug=True)

# 使用方法调用的方式
app.add_servicer(Greeter)

if __name__ == '__main__':
    # 单独的进行加载
    app.start_serve()复制代码

方式2:使用装饰器形式实现服务加载

from grpcframe.core.app import FastGrpcApp
from example.servicers import hello_pb2_grpc
from example.servicers import hello_pb2
from grpcframe.core.serveis import BaseServeis

app = FastGrpcApp(port=45564, max_workers=20, is_certificate=False, debug=True)

@app.add_servicer
class Greeter(hello_pb2_grpc.GreeterServicer, BaseServeis):

    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 返回是我们的定义的响应体的对象
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # 解析请求参信息
        # 校验请求的参数信息
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

if __name__ == '__main__':
    # 单独的进行加载
    app.start_serve()复制代码

方式3:批量加载模块下所有服务

image.png

from grpcframe.core.app import FastGrpcApp
from example.servicers import hello_pb2_grpc
from example.servicers import hello_pb2
from grpcframe.core.serveis import BaseServeis

app = FastGrpcApp(port=45564, max_workers=20, is_certificate=False, debug=True)

from grpcframe.utils.import_string import import_string
app.load_servicers_handlers_in_module(import_string('example.server.handler'))

if __name__ == '__main__':
    # 单独的进行加载
    app.start_serve()复制代码

注册服务的方式目前有多种,可以根据结合自己的习惯或需求来进行注册

第5步:注册相关事件(可选)

from grpcframe.core.app import FastGrpcApp
from example.servicers import hello_pb2_grpc
from example.servicers import hello_pb2
from grpcframe.core.serveis import BaseServeis

app = FastGrpcApp(port=45564, max_workers=20, is_certificate=False, debug=True)

@app.add_servicer
class Greeter(hello_pb2_grpc.GreeterServicer, BaseServeis):

   # 实现 proto 文件中定义的 rpc 调用
   def SayHello(self, request, context):
       # 返回是我们的定义的响应体的对象
       return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

   def SayHelloAgain(self, request, context):
       # 解析请求参信息
       # 校验请求的参数信息
       return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

@app.on_event('startup')
def startup():
   print("startup=========1》")


@app.on_event('shutdown')
def shutdown():
   print("shutdown=========》")

# 添加事件处理
def ceshiguanbi():
   print("关闭了!")
app.add_event_handler(event_type='shutdown', func=ceshiguanbi)


if __name__ == '__main__':
   # 单独的进行加载
   app.start_serve()复制代码

第6步:注册自定义其他中间件

方式1:通过装饰形式添加

from grpcframe.core.middlewares import BaseMiddleware,Any
import grpc

@app.add_middleware
class ServiceLogMiddleware2222(BaseMiddleware):

    def before_handler(self, request: Any, context: grpc.ServicerContext, method_name):
        pass
        print("我是自定义的中间件信息---》1")

    def after_handler(self, context: grpc.ServicerContext, response):
        pass
        print("我是自定义的中间件信息---》2")



if __name__ == '__main__':
    # 单独的进行加载
    app.start_serve()复制代码

方式2:通过调用方法的形式添加:

from grpcframe.core.middlewares import BaseMiddleware,Any
import grpc
class ServiceLogMiddleware2222(BaseMiddleware):

    def before_handler(self, request: Any, context: grpc.ServicerContext, method_name):
        pass
        print("我是自定义的中间件信息---》1")

    def after_handler(self, context: grpc.ServicerContext, response):
        pass
        print("我是自定义的中间件信息---》2")

app.add_middleware(ServiceLogMiddleware2222)

if __name__ == '__main__':
    # 单独的进行加载
    app.start_serve()复制代码

第7步: 注册到consul注册中心(可选)

register = ConsulRegister(host='127.0.0.1',port=8500, server_name='FastGrpcApp',server_id="23423424234")
app = FastGrpcApp(port=45564,max_workers=20,is_certificate=False,register_center=register,debug=True)复制代码

第8步: 对请求参数反序列验证(可选)

import grpc
from example.servicers import hello_pb2_grpc
from example.servicers import hello_pb2
# from typing import Any, Dict
# 引入到这个的会报错# NamedTuple
# from google.protobuf import json_format, symbol_database
from grpcframe.core.serveis import BaseServeis
from pydantic import BaseModel, ValidationError
from grpcframe.ext.redis.client import sync_redis_client
from grpcframe.core.app import g


class RequestParse(BaseModel):
    name: str

class Greeter(hello_pb2_grpc.GreeterServicer, BaseServeis):

    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 返回是我们的定义的响应体的对象
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # 解析请求参信息
        # 校验请求的参数信息
        sadas = self.validation_request(request, RequestParse)

        print("设置的哈是",id(g.current_app_request.get()))
        print("startup=========3》", sync_redis_client.get('nimei'))

        context.set_compression(grpc.Compression.Gzip)
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))复制代码

错误的统一处理:

def validation_request(self, request, model: type):
    request_model = None
    try:
        # 模型实例化
        request_model = model(**self.parse_request(request))
        pass
    except ValidationError as exc:
        raise BadRequestException(message=raw_errors_info(exc))
    finally:
        return request_model复制代码

上面主要是基于  sadas = self.validation_request(request, RequestParse)来进行对请求参数的校验,对错误的处理,则统一进行   raise BadRequestException(message=raw_errors_info(exc))抛出!

完整启动示例

启动服务:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     app
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/12/6
-------------------------------------------------
   修改描述-2021/12/6:         
-------------------------------------------------
"""

from grpcframe.core.app import FastGrpcApp
from example.server.handler import Greeter
from grpcframe.ext.redis.client import sync_redis_client
from grpcframe.ext.register.consul import ConsulRegister
from grpcframe.core.middlewares import BaseMiddleware,Any
import grpc
# 注册中心
register = ConsulRegister(host='127.0.0.1',port=8500, server_name='FastGrpcApp',server_id="23423424234")
app = FastGrpcApp(port=45564,max_workers=20,is_certificate=False,register_center=register,debug=True)

@app.on_event('startup')
def startup():
    print("startup=========1》")
    sync_redis_client.init_app(app)

@app.on_event('shutdown')
def shutdown():
    print("shutdown=========》")


@app.add_middleware
class ServiceLogMiddleware2222(BaseMiddleware):

    def before_handler(self, request: Any, context: grpc.ServicerContext, method_name):
        pass
        print("我是自定义的中间件信息---》1")

    def after_handler(self, context: grpc.ServicerContext, response):
        pass
        print("我是自定义的中间件信息---》2")


@app.on_event('startup')
def startup():
    print("startup=========3》")

# 添加事件处理
def ceshiguanbi():
    print("关闭了!")
app.add_event_handler(event_type='shutdown', func=ceshiguanbi)

# 添加服务处理
app.add_servicer(Greeter)

if __name__ == '__main__':
    # 单独的进行加载
    app.start_serve()复制代码

启动日志信息输出:

2021-12-08 17:48:46.000  INFO 33144 --- [     MainThread] app                  : >>>>>>>>>>>>欢迎使用FastGrpcApp>>>>>>>>>>>>
2021-12-08 17:48:46.000  INFO 33144 --- [     MainThread] app                  : 注册服务类名:>>Greeter
2021-12-08 17:48:46.000  INFO 33144 --- [     MainThread] app                  : 注册服务类名:>>HealthServicer
2021-12-08 17:48:46.000  INFO 33144 --- [     MainThread] app                  : >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
2021-12-08 17:48:46.000  INFO 33144 --- [     MainThread] app                  : 注册中间名:>>CxtgMiddleware
2021-12-08 17:48:46.000  INFO 33144 --- [     MainThread] app                  : 注册中间名:>>ServiceLogMiddleware2222
2021-12-08 17:48:46.000  INFO 33144 --- [     MainThread] app                  : 服务启动:>>127.0.0.1:45564
GRPC开始注册服务FastGrpcApp
GRPC注册服务FastGrpcApp成功
startup=========1》
startup=========3》
启动redis注册2复制代码

查看服务注册中心:

image.png

使用客户端进行请求验证:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-


import grpc
from example.servicers import hello_pb2
from example.servicers import hello_pb2_grpc

 # 连接consul服务,作为dns服务器



from dns import resolver
from dns.exception import DNSException

# 创建一个consul dns查询的 resolver
consul_resolver = resolver.Resolver()
consul_resolver.port = 8600
consul_resolver.nameservers = ['127.0.0.1']


def get_ip_port(server_name):
    '''查询出可用的一个ip,和端口'''
    try:
        dnsanswer = consul_resolver.resolve(f'{server_name}.service.consul', "A")
        dnsanswer_srv = consul_resolver.resolve(f"{server_name}.service.consul", "SRV")
    except DNSException:
        return None, None
    return dnsanswer[0].address, dnsanswer_srv[0].port

from jaeger_client import Config
from grpc_opentracing import open_tracing_client_interceptor
from grpc_opentracing.grpcext import intercept_channel




def run():

    compression = grpc.Compression.Gzip
    # 添加客户端的请求拦截器
    # 单纯的子节点上传了,父节点没有上传
    with grpc.insecure_channel(target=f'127.0.0.1:45564',compression=compression) as channel:
        # 通过通道服务一个服务intercept_channel
        # interceptor_channel = grpc.intercept_channel(channel,*interceptors)
        stub = hello_pb2_grpc.GreeterStub(channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        try:

            # reest_header = (
            #     ('mesasge', '1010'),
            #     ('error', 'No Error')
            # )
            response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='欢迎下次光临'),
                                                              # 设置请求的超时处理
                                                              timeout=5,
                                                              # 设置请求的头的信息
                                                              # metadata=reest_header,
                                                              )
            print("SayHelloAgain函数调用结果的返回: " + response.message)
            print("SayHelloAgain函数调用结果的返回---响应报文头信息: ", callbask.trailing_metadata())
        except grpc._channel._InactiveRpcError as e:
            print(e.code())
            print(e.details())
        finally:
            # 关闭追踪
            pass
            # time.sleep(2)
            # time.sleep(3)


if __name__ == '__main__':
    import time
    for _ in range(3):
        # time.sleep(1)
        run()复制代码

查看请求输出日志信息:

客户端:

SayHelloAgain函数调用结果的返回: hello 欢迎下次光临
SayHelloAgain函数调用结果的返回---响应报文头信息:  ()
SayHelloAgain函数调用结果的返回: hello 欢迎下次光临
SayHelloAgain函数调用结果的返回---响应报文头信息:  ()
SayHelloAgain函数调用结果的返回: hello 欢迎下次光临
SayHelloAgain函数调用结果的返回---响应报文头信息:  ()复制代码

服务端:

我是自定义的中间件信息---》1
设置的哈是 1135112800416
startup=========3》 hsahashdh
我是自定义的中间件信息---》2
我是自定义的中间件信息---》1
设置的哈是 1135112801136
startup=========3》 hsahashdh
我是自定义的中间件信息---》2
我是自定义的中间件信息---》1
设置的哈是 1135112801536
startup=========3》 hsahashdh
我是自定义的中间件信息---》2复制代码

上面的第三方扩展插件也一样可以使用!

总结

以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!

结尾


作者:小钟同学
链接:https://juejin.cn/post/7039268008790851591


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐