阅读 193

uvicorn源码分析(uvicorn为什么和gunicorn一起部署)

前记

Uvicorn是一个基于uvloophttptools的ASGI服务器, 性能比较强劲, 通过它可以与使用ASGI规范的Python应用程序进行交互。ASGI与WSGI很像, 只不过ASGI原生支持HTTP2.0和WebSocket, 同时更多的是支持PythonAsyncio生态的WEB应用程序。通过了解Uvicron,能知道一个稳定的Web服务器的工作方式以及能更好的去了解其他基于ASGI的WEB应用程序。

最新修订见原文, 关注公众号<博海拾贝diary>可以及时收到新推文通知

Uvicron通过一个通用的协定接口与ASGI应用程序交互, 应用程序只要实现如下代码, 即可通过Uvicorn发送和接收信息:

async def app(scope, receive, send):     # 一个最简单的ASGI应用程序     assert scope['type'] == 'http'     await send({         'type': 'http.response.start',         'status': 200,         'headers': [             [b'content-type', b'text/plain'],         ]     })     await send({         'type': 'http.response.body',         'body': b'Hello, world!',     }) if __name__ == "__main__":     # uvicorn服务     import uvicorn     uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info") 复制代码

其中应用程序的scope代表有关传入连接信息的字典, receive是一个从服务器接收传入信息的通道, send是一个将消息发送到服务器的通道, 不过这不是本文的重点, 更多scope信息可以访问ASGI interface了解, 接下来将从例子的uvicorn.run开始, 通过源码分析uvicorn工作原理。

1.uvicorn主流程源码分析

分析源码之前, 首先是了解它的源码结构, uvicorn的源码结构如下:

├── lifespan ├── loops ├── protocols ├── middleware ├── supervisors ├── config.py ├── importer.py ├── __init__.py ├── logging.py ├── __main__.py ├── main.py ├── server.py ├── subprocess.py ├── _types.py └── workers.py 复制代码

uvicron做了很好的分类, 每个文件夹/文件都有自己的功能:

  • lifespan 告诉基于ASGI的应用程序uvicorn即将启动和停止的消息, uvicorn在启动的时候会初始化,然后发送初始化协议并等待ASGI应用程序返回, 如果ASGI应用程序返回compleleuvicorn会继续运行, 返回failed则报错退出。

  • loops 自动加载事件循环, 优先加载uvloop, 这将会获得极大的性能提升

  • protocols 里面存放着读取连接数据和解析消息体的协议, 如HTTP和WebSockets, 可以把他认为是一个序列化器。

  • middleware 存放着一些简单通用的ASGI中间件

  • supervisors uvicorn本身是以一个进程启动的, 这个文件夹存放着uvicorn的几种启动方式, 如多进程启动,监控文件变动自动重启的方式等。

  • config.py uvicorn的配置文件, 它不仅读取用户的配置, 还自动加载上面所述的lifespan, loops, protocols等等

  • importer.py uvicron中很多地方使用了动态加载配置和动态加载库, 这里是把这个方法进行统一封装

  • logging.py 提供了根据日志等级渲染不同颜色的日志以及访问日志(但是很少人用)

  • main.py uvicorn的入口文件, 包括代码运行和命令行运行两种方式

  • server.py uvicorn的核心服务, 用于处理进出流量以及处理自身的服务状态

  • subprocess.py 给supervisors/multiprocess.py使用的, 可能是为了以后拓展需要, 才放在一级目录

  • workers.py 其他工作模式的Uvicorn, 比如里面有个UvicornWorker, 就是用于gunicorn启动uvicorn

结构了解完, 接下来开始正式步入源码之旅, 这里直接忽略掉命令行的启动方式, 从uvicorn.run开始, 实际上命令行启动方式也是通过获取参数, 然后传入uvicorn方法中, 这个uvicorn.run方法会接受符合ASGI的app和kwargs参数, 然后生成对应的配置实例config, 再生成server, 接着依靠配置判断是否启动模式, 具体代码如下:

def run(app, **kwargs):     # 加载配置     config = Config(app, **kwargs)     # 加载server     server = Server(config=config)     if (config.reload or config.workers > 1) and not isinstance(app, str):         # 只有命令行模式才可以使用reload         logger = logging.getLogger("uvicorn.error")         logger.warning(             "You must pass the application as an import string to enable 'reload' or "             "'workers'."         )         sys.exit(1)     if config.should_reload:         # 启动reload逻辑         sock = config.bind_socket()         supervisor = ChangeReload(config, target=server.run, sockets=[sock])         supervisor.run()     elif config.workers > 1:         # 多进程方式启动         sock = config.bind_socket()         supervisor = Multiprocess(config, target=server.run, sockets=[sock])         supervisor.run()     else:         # 最普通的方法启动         server.run() 复制代码

config很简单, 它负责装填配置, 然后调用configure_logging配置全局的logger, 此外还有一个load的方法, 将会在Server中调用, 接下来, 先忽略其中涉及到的模板, 到server.run之中, 看看普通模式下, 服务是怎么启动的。

这个run方法很简单, 就是设置事件循环, 然后通过事件循环调用serve来启动服务:

def run(self, sockets=None):     self.config.setup_event_loop()     loop = asyncio.get_event_loop()     loop.run_until_complete(self.serve(sockets=sockets)) 复制代码

serve是启动服务的最核心代码, 首先会执行config.load方法加载一些动态的配置, 如解析http的库, 解析websocket的库, 还有通过用户传过来的app来加载app, 并判断是使用WSGIASGI2或者是ASGI3, 并进行配置(uvicorn在这里是通过ASGI中间件的方式来支持), 最后根据配置启动对应的中间件。 接着会跳转到server.startup方法, 该方法首先会通过lifaspan.startup与用户传过来的app通信, 校验是否是合法的应用程序, 然后初始化变量, 先是初始化一个信号处理函数, 当收到信号时, 会把变量should_exit设置会True。 然后会初始化一个名为create_protocol的变量, 它是继承于asyncio.Protocol, asyncio.Protocol主要用于从socket获取数据和写入数据, 同时也有一些TCP相关的调用, create_protocol的主要作用就是作为socket和应用程序的中间层, 负责把HTTP数据与ASGI数据互转, 如下图: 协议转换 接着根据用户传过来的变量方式来启动服务, 这些都是PythonAsyncio封装好的, 具体为以下几种:

  • 当用户传socket过来的时候: 基于该scoket和create_protocol创建服务, 如果是多进程且是Windows系统, 则要显示的共享socket。

  • 当用户传文件描述符的时候: 基于该文件描述符获取scoket, 并通过该socket和create_protocol创建服务。

  • 当用户传unix domain socket的时候: 基于unix domain socket和create_protocol创建服务。

  • 当用户传host和port参数的时候: 基于host和port和create_protocol创建服务。

创建完服务后, socket的处理就转给了应用程序了, 但是采用了事件循环的思路, 需要uvicorn使用while使程序一直跑, 防止主程序退出:

async def main_loop(self):     counter = 0     should_exit = await self.on_tick(counter)     while not should_exit:         counter += 1         counter = counter % 864000         await asyncio.sleep(0.1)         should_exit = await self.on_tick(counter) 复制代码

每次循环执行的时候都会调用on_tick方法, 该方法主要是进行服务统计以及判断啥时候可以退出服务, 比如请求总数超过配置的限制数, 或者收到信号,把变量should_exit设置为True等等, 如果在循环中判断程序需要进行退出, 就会进入退出逻辑shutdown, 该逻辑比较简单, 注释和代码如下:

async def shutdown(self, sockets=None):     logger.info("Shutting down")     # 关闭socket, 不让有新的连接建立     for server in self.servers:         server.close()     for sock in sockets or []:         sock.close()     for server in self.servers:         await server.wait_closed()     # 关闭已经创建的连接, 并等待他们处理完毕      for connection in list(self.server_state.connections):         connection.shutdown()     await asyncio.sleep(0.1)     # 等待连接关闭或者用户强制关闭     if self.server_state.connections and not self.force_exit:         msg = "Waiting for connections to close. (CTRL+C to force quit)"         logger.info(msg)         while self.server_state.connections and not self.force_exit:             await asyncio.sleep(0.1)     # 等待后台任务完成或者用户强制关闭     if self.server_state.tasks and not self.force_exit:         msg = "Waiting for background tasks to complete. (CTRL+C to force quit)"         logger.info(msg)         while self.server_state.tasks and not self.force_exit:             await asyncio.sleep(0.1)     # 通过lifespan告诉ASGI应用程序即将关闭     if not self.force_exit:         await self.lifespan.shutdown() 复制代码

至此整个主流程分析完毕, 下图是我整理后的一个流程图: 从图中可以很清晰的看清uvicorn与ASGI应用程序的关系, 接下来是上面部分没有详细讲过的小组件源码分析。

2.uvicorn.protocols源码分析

在了解了uvicorn的主流程后只大概的知道uvicorn是通过uvicorn.protocols与应用程序进行通信, 但是不明白他们具体是如何通信的, 接下来就开始了解uvicorn中最核心的uvicorn.protocols

从上面的分析中我们可以知道, 作为一个Web服务器, uvicorn是通过一个socket来接收发发送请求的。 而对于socket来说, 它只关心怎么创建连接,关闭连接以及如何传输内容, 它不会关心传输的字节流的上层协议是如何实现的。 好在Asyncio提供了几种简单的网络传输模型, 它们都是对于这些传输数据的抽象, 这些抽象会在如loop.create_serverloop.create_unix_server方法中使用, 通过这个抽象我们能很方便的使用TCP和UDP连接。

uvicorn在基于Asyncio创建服务器时, 把protocol抽象通过protocols参数传入loop.create_serverloop.create_unix_server后, 维护他们返回的server对象, 剩下的与ASGI应用程序的数据交互全由protocol对象处理。 uvicorn封装的对象继承于asyncio.Protocols, 它是针对TCP协议的封装, 它总共有6个方法,包括启动时的connection_made, 接收数据时的data_received, 断开时的eof_received以及丢失连接时的connection_lost, 然后还有当TCP连接出现堵塞时的暂停pause_writing和恢复resume_writing, 具体的方法传输的参数和使用方法如下:

class Protocol(BaseProtocol):     def connection_made(self, transport):         """         在建立连接时调用.         参数是表示管道连接的transport,          此时得到的transport需要设置为该类的transport, 方便后续connection_lost控制关闭管道.         """     def data_received(self, data):         """         通过该方法可以获取到客户端传输过来的数据         """     def eof_received(self):         """         当另一端调用write_eof()或等效函数时调用.         如果返回一个假值(包括None),则传输将关闭自身。         如果它返回true值,则关闭传输取决于协议.         """     def connection_lost(self, exc):         """         当连接丢失或关闭时调用, 根据exc判断是否要关闭trnasport.         参数是一个异常对象或None(后者表示接收到常规EOF或中止或关闭连接).         """     def pause_writing(self):         """         当transport缓冲区超过高水位(high-water mark)时调用,          此时应该能控制外部不再写入数据(通常是一个asyncio.Future),         同时应该通过transport.pause_reading来停止获取数据, 之后TCP就会通过拥塞机制使得客户端减缓发送数据的速度。         """     def resume_writing(self):         """         当transport缓冲区排放低于低水位线(low-water mark)时调用.          此时要释放标志, 使得外部可以继续写入数据,          同时通过transport.resume_reading来恢复获取数据,          之后TCP就会通过拥塞机制知道服务端的处理能力上来了, 使客户端加快发送速度。         """ 复制代码

了解完asycnio.Protocol后, 可以正式了解uvicornasyncio.Protocol做了哪些修改来达到跟应用程序进行通信, 由于各个Protocol的封装差不多, 这里以httptools_impl.HttpToolsProtocol为例子进行说明。

首先是类的初始化, HttpToolsProtocol 在初始化时会加载config并配置到日志,对应的websocket协议处理,对应的http解析器以及serve创建的统计容器等, 其中需要注意的是, 在初始化时, 传入的变量是HttpToolsProtocol的实例化本身。 接着是Protocol的几大主要协议接口函数, 这里以源码和注释进行分析:

class HttpToolsProtocol(asyncio.Protocol):     def connection_made(self, transport):         # 添加实例本身到集合, 代表当前还有连接在处理         self.connections.add(self)         self.transport = transport         # 初始化流控制         self.flow = FlowControl(transport)         # 简单的初始化实例trsnaport的相关编列         self.server = get_local_addr(transport)         self.client = get_remote_addr(transport)         self.scheme = "https" if is_ssl(transport) else "http"         if self.logger.level <= TRACE_LOG_LEVEL:             prefix = "%s:%d - " % tuple(self.client) if self.client else ""             self.logger.log(TRACE_LOG_LEVEL, "%sConnection made", prefix)     def connection_lost(self, exc):         # 从集合删除实例本身, 代表当前连接已经处理玩了, 不需要进入统计容器         self.connections.discard(self)         if self.logger.level <= TRACE_LOG_LEVEL:             prefix = "%s:%d - " % tuple(self.client) if self.client else ""             self.logger.log(TRACE_LOG_LEVEL, "%sConnection lost", prefix)         # 设置cycle, 告诉他连接已经断开         if self.cycle and not self.cycle.response_complete:             self.cycle.disconnected = True         if self.cycle is not None:             self.cycle.message_event.set()         if self.flow is not None:             self.flow.resume_writing()     def _unset_keepalive_if_required(self):         """取消keep alive timeout的任务,         一般来说, 在发送数据后服务端会等待客户端发送数据, 如果超过多少秒没有发送数据则可以判断该客户端已经断开了, 服务端可以主动关闭连接         而uvicorn通过timeout_keep_alive_task来实现         """         if self.timeout_keep_alive_task is not None:             self.timeout_keep_alive_task.cancel()             self.timeout_keep_alive_task = None     def data_received(self, data):         self._unset_keepalive_if_required()         try:             # 接受字节数据, 并交由http解析器进行解析             self.parser.feed_data(data)         except httptools.HttpParserError as exc:             # 解析失败, 应该不是http协议的数据, 断开连接             msg = "Invalid HTTP request received."             self.logger.warning(msg, exc_info=exc)             self.transport.close()         except httptools.HttpParserUpgrade:             # 已经超过了解析器能解析的协议版本, 应该交由更新的协议解析器处理             self.handle_upgrade() 复制代码

分析完了几个跟连接相关的主要方法后就会发现分析路线已经断了, 而该类中还有很多on_xxx的方法, 它们也没有被其他方法调用。 这是因为在初始化HTTP协议解析器的时候,uvicorn.protocol把自己的实例传入了HTTP解析器中, 解析器会边接收数据边按照url, header, body来顺序解析, 并在执行每种数据解析后, 会通过回调告诉传入的实例, uvicorn正是通过on_xxx方法来监听这些回调并处理解析完的HTTP数据:

class HttpToolsProtocol(asyncio.Protocol):     def on_url(self, url):         """这是收到一个请求后的第一次解析, 可以认为是该请求体的初始化, 此时会根据url和连接数据进行初始化, 并存放在实例的scope中"""         method = self.parser.get_method()         parsed_url = httptools.parse_url(url)         raw_path = parsed_url.path         path = raw_path.decode("ascii")         if "%" in path:             path = urllib.parse.unquote(path)         self.url = url         self.expect_100_continue = False         self.headers = []         self.scope = {             "type": "http",             "asgi": {"version": self.config.asgi_version, "spec_version": "2.1"},             "http_version": "1.1",             "server": self.server,             "client": self.client,             "scheme": self.scheme,             "method": method.decode("ascii"),             "root_path": self.root_path,             "path": path,             "raw_path": raw_path,             "query_string": parsed_url.query if parsed_url.query else b"",             "headers": self.headers,         }     def on_header(self, name: bytes, value: bytes):         """解析器在解析header时, 是按照header一行一行进行解析的, 所以每即系一行header都会调用一次on_header, 并把他们存在实例的headers中"""         name = name.lower()         if name == b"expect" and value.lower() == b"100-continue":             self.expect_100_continue = True         self.headers.append((name, value))     def on_headers_complete(self):         """对于大部分前置web框架来说, 一般解析到header后就结束不再解析了, 会开始发送到正真处理的应用程序, uvicorn也是这样的"""         http_version = self.parser.get_http_version()         if http_version != "1.1":             self.scope["http_version"] = http_version         if self.parser.should_upgrade():             # 如果发现当前http版本更加高级(比如websocket), 则不再处理, 在另外一个逻辑会转到websocket处理             return         # Handle 503 responses when 'limit_concurrency' is exceeded.         if self.limit_concurrency is not None and (             len(self.connections) >= self.limit_concurrency             or len(self.tasks) >= self.limit_concurrency         ):             # 当前并发数过高, 不再转发给后面的应用程序, 直接返回错误, 这里是一个具有ASGI标准函数签名的函数, 里面实现的功能是发送错误信息到socket             app = service_unavailable             message = "Exceeded concurrency limit."             self.logger.warning(message)         else:             app = self.app         # cycle相当于一个request的处理流程         # 普通的HTTP请求只对应一个cycle就可以了, 这里是兼容Pipelined HTTP请求         existing_cycle = self.cycle         self.cycle = RequestResponseCycle(             scope=self.scope,             transport=self.transport,             flow=self.flow,             logger=self.logger,             access_logger=self.access_logger,             access_log=self.access_log,             default_headers=self.default_headers,             message_event=asyncio.Event(),             expect_100_continue=self.expect_100_continue,             keep_alive=http_version != "1.0",             on_response=self.on_response_complete,         )         if existing_cycle is None or existing_cycle.response_complete:             # 如果上个请求已经处理完了, 则开始处理这个请求(通过run_asgi来运行)             task = self.loop.create_task(self.cycle.run_asgi(app))             task.add_done_callback(self.tasks.discard)             self.tasks.add(task)         else:             # 如果上个请求没有处理完, 就先暂停读取数据, 并把该cycle放到pipeline暂存             self.flow.pause_reading()             self.pipeline.insert(0, (self.cycle, app))     def on_body(self, body: bytes):         """读取到原生的body字节, 如果ASGI处理者还在运行, 且不是websocket, 则转给ASGI处理者         注: 一个请求可能会触发多次on_body"""         if self.parser.should_upgrade() or self.cycle.response_complete:             return         self.cycle.body += body         if len(self.cycle.body) > HIGH_WATER_LIMIT:             # 由于ASGI应用程序会根据调用者需要才来获取body(比如starlette的 await request.body()), 如果应用程序没有需要则会暂缓获取body数据             self.flow.pause_reading()         # 告诉ASGI应用程序, body已经获取结束(通常在cycle的more_body为False的时候, 才会检查message_event)         self.cycle.message_event.set()     def on_message_complete(self):         if self.parser.should_upgrade() or self.cycle.response_complete:             return         # 表示body已经读取结束了          self.cycle.more_body = False         self.cycle.message_event.set()     def on_response_complete(self):         """返回响应时的回调"""         self.server_state.total_requests += 1         if self.transport.is_closing():             return         # 设置一个keep_alive的机制, 服务端返回响应后会设置一个倒计时future, 该future只有在上面data_received收到请求的时候才会取消         # 如果该future没有取消, 则会调用timeout_keep_alive_handler函数来关闭transport通道         self._unset_keepalive_if_required()         self.timeout_keep_alive_task = self.loop.call_later(             self.timeout_keep_alive, self.timeout_keep_alive_handler         )         # 恢复读取数据         self.flow.resume_reading()         if self.pipeline:             # 如果是pipeline请求, 则开始处理刚才暂存的cycle             cycle, app = self.pipeline.pop()             task = self.loop.create_task(cycle.run_asgi(app))             task.add_done_callback(self.tasks.discard)             self.tasks.add(task) 复制代码

在了解解析HTTP数据的时候, 经常会遇到一个cycle对象, 这个对象是基于ASGI负责读写数据转换的对象, 这个对象有sendreceive两个方法, 这两个方法的命名是站在ASGI应用程序的角度来命名的。 其中send通过传入的参数message获取到ASGI应用程序返回的数据, 并依据ASGI协议进行解析, 并拼接成HTTP协议的字节流, 当ASGI应用程序发送结束标记时, send会把拼接的字节流通过socket返回给客户端, 同时触发on_response_complete方法。 而receive比较简单, 它只负责接收获取到已经解析完成的HTTP数据(早前面on_xxx时会把数据传给cycle), 然后发送到ASGI应用程序中。 这两个方法都是通过on_headers_complete中执行的run_asgi方法来调用的, 通过该方法, uvicorn会把数据的处理权转给ASGI应用程序, 如果ASGI应用程序处理异常, 则会返回HTTP状态码为500的响应给客户端并关闭transport。

分析完了cycle对象后, 再次回到protocol的data_received的方法中, 这里通过获取httptools.HttpParserUpgrade异常的方式得知当前可能是一个WebSocket请求, 于是进入handle_upgrade逻辑, 这个逻辑会检查是否加载了解析WebSocket解析器以及请求体是否满足WebSocket条件, 如果不满足就会返回一个响应体告诉客户端当前无法支持该HTTP请求的升级协议, 如果满足则会生成WebSocketProtocols来处理socket的数据, 并把它设置为当前的transport向关联, 不过这个WebsocketProtocols基本上是HTTP protocols和cycle两个对象的融合, 具体处理步骤也差不多, 这里就不多做描述了。

至此, 大体上的uvicorn.protocols源码就分析完了, 由于uvicorn是把asyncio.Protocol, 解析器, cycle三者结合在一起, 所以分析起来要经常跳转, 因此我把他们的流程转成如下的图: 流处理图

3.总结

至此, uvicorn的核心流程已经分析完了, 它先是通过server来启动一个服务, 并管理服务状态, 然后再通过protocol负责做双端的序列化, 使ASGI应用程序能够按照ASGI协议读写数据, 其中protocol还融合了HTTP解析器解析HTTP并通过它来解析数据。 当然, 除了上述主流程外, uvicorn还包括了中间件, 多进程启动以及监控文件变化重启服务等组件, 这些组件的代码量不大, 分析源码也说不出啥, 这里就简单略过。


作者:陈思煜
链接:https://juejin.cn/post/7034150387774914597

 伪原创工具 https://www.237it.com/ 


文章分类
代码人生
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐