FastAPI(56)- 使用 Websocket 打造一个迷你聊天室
背景
在实际项目中,可能会通过前端框架使用 WebSocket 和后端进行通信
这里就来详细讲解下 FastAPI 是如何操作 WebSocket 的
模拟 WebSocket 客户端
#!usr/bin/env python# -*- coding:utf-8 _*-"""# author: 小菠萝测试笔记# blog: https://www.cnblogs.com/poloyy/# time: 2021/10/5 5:26 下午# file: 46_websocket.py"""import uvicornfrom fastapi import FastAPI, WebSocketfrom fastapi.responses import HTMLResponseapp = FastAPI()html = """<!DOCTYPE html><html> <head> <title>小菠萝聊天室</title> </head> <body> <h1>小菠萝聊天室</h1> <form action="" onsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <button>Send</button> </form> <ul id='messages'> </ul> <script> // 加载页面,自动创建一个 WebSocket 连接 var ws = new WebSocket("ws://localhost:8080/ws"); // 收到消息 ws.onmessage = function(event) { // 获取输入框的值 var messages = document.getElementById('messages') // 创建一个 li 元素 var message = document.createElement('li') // 接收 event 的 data var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; // 发送消息方法 function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body></html>"""# 返回一段 HTML 代码给前端@app.get("/")async def get(): return HTMLResponse(html)@app.websocket("/ws")async def websocket_endpoint(websocket: WebSocket): # 1、ws 连接 await websocket.accept() while True: # 2、接收客户端发送的内容 data = await websocket.receive_text() # 3、服务端发送内容 await websocket.send_text(f"小菠萝收到的消息是: {data}")if __name__ == '__main__': uvicorn.run(app="46_websocket:app", reload=True, host="127.0.0.1", port=8080)
启动 uvicorn 服务器,访问 127.0.0.1:8080/
客户端、服务端建立 WebSocket 连接成功
发送聊天信息
每发一条消息,均会显示在列表中
可以在其他地方使用 WebSocket
Depends
Security
Cookie
Header
Path
Query
在依赖项中使用 WebSocket
from typing import Optionalimport uvicornfrom fastapi import FastAPI, WebSocket, Cookie, Query, status, Dependsfrom fastapi.responses import HTMLResponseapp = FastAPI()html = """<!DOCTYPE html><html> <head> <title>Chat</title> </head> <body> <h1>小菠萝聊天室</h1> <form action="" onsubmit="sendMessage(event)"> <label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label> <label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label> <button onclick="connect(event)">Connect</button> <hr> <label>Message: <input type="text" id="messageText" autocomplete="off"/></label> <button>Send</button> </form> <ul id='messages'> </ul> <script> var ws = null; function connect(event) { var itemId = document.getElementById("itemId") var token = document.getElementById("token") ws = new WebSocket("ws://localhost:8080/items/" + itemId.value + "/ws?token=" + token.value); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; event.preventDefault() } function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body></html>"""@app.get("/")async def get(): return HTMLResponse(html)async def get_cookie_or_token( websocket: WebSocket, session: Optional[str] = Cookie(None), token: Optional[str] = Query(None)): # 模拟:如果 session 和 token 都为空,则关闭 websocket if session or token: return session or token await websocket.close(code=status.WS_1008_POLICY_VIOLATION)@app.websocket("/items/{item_id}/ws")async def websocket_depends( websocket: WebSocket, item_id: str, q: Optional[str] = None, # 依赖项 cookie_or_token: str = Depends(get_cookie_or_token)): # 1、创建 websocket 连接 await websocket.accept() while True: # 2、接收客户端发送的内容 data = await websocket.receive_text() # 3、服务端发送内容 await websocket.send_text(f"cookie or token value is:{cookie_or_token}") if q: # 4、如果有传查询参数 q,则再发一条 await websocket.send_text(f"query param value is:{q}") # 5、最后再发一条信息 await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")if __name__ == '__main__': uvicorn.run(app="46_websocket:app", reload=True, host="127.0.0.1", port=8080)
发送聊天信息
不带查询参数 q
带查询参数 q
当 WebSocket 连接关闭时
将引发 WebSocketDisconnect 异常,这不是期望看到的结果
处理断开连接和多个客户端
from typing import Listimport uvicornfrom fastapi import FastAPI, WebSocket, WebSocketDisconnect, statusfrom fastapi.responses import HTMLResponseapp = FastAPI()html = """<!DOCTYPE html><html> <head> <title>Chat</title> </head> <body> <h1>WebSocket Chat</h1> <h2>Your ID: <span id="ws-id"></span></h2> <form action="" onsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <button>Send</button> </form> <ul id='messages'> </ul> <script> var client_id = Date.now() document.querySelector("#ws-id").textContent = client_id; var ws = new WebSocket(`ws://localhost:8080/ws/${client_id}`); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body></html>"""# 返回一段 HTML 代码给前端@app.get("/")async def get(): return HTMLResponse(html)# 处理和广播消息到多个 WebSocket 连接class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def send_personal_message(self, message: str, websocket: WebSocket): await websocket.send_text(message) async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message)manager = ConnectionManager()@app.websocket("/ws/{client_id}")async def websocket_endpoint(client_id: str, websocket: WebSocket): # 1、客户端、服务端建立 ws 连接 await manager.connect(websocket) # 2、广播某个客户端进入聊天室 await manager.broadcast(f"{client_id} 进入了聊天室") try: while True: # 3、服务端接收客户端发送的内容 data = await websocket.receive_text() # 4、广播某个客户端发送的消息 await manager.broadcast(f"{client_id} 发送消息:{data}") # 5、服务端回复客户端 await manager.send_personal_message(f"服务端回复{client_id}:你发送的信息是:{data}", websocket) except WebSocketDisconnect: # 6、若有客户端断开连接,广播某个客户端离开了 manager.disconnect(websocket) await manager.broadcast(f"{client_id} 离开了聊天室")if __name__ == '__main__': uvicorn.run(app="48_websocket_handler:app", reload=True, host="127.0.0.1", port=8080)
模拟一个小型聊天室的场景
新的客户端进来,所有人都会收到新客户端进入聊天室的消息
某个客户端发送消息,所有人都能看到
某个客户端退出了(关闭浏览器),所有人都会收到该客户端退出聊天室的消息
浏览器打开三个 tab 均访问 127.0.0.1:8080
关掉其中一个客户端(tab)
__EOF__ 本文作者: 小菠萝测试笔记 本文链接: https://www.cnblogs.com/poloyy/p/15369967.html