feat: Добавлена обработка сообщений, ивентов и подписок

This commit is contained in:
AURUMVORXX
2025-09-02 20:27:21 +05:00
parent 80d51cece4
commit 275bd6d8b1
3 changed files with 129 additions and 81 deletions

View File

@@ -2,57 +2,58 @@ from __future__ import annotations
import json
import logging
import asyncio
from weakref import WeakValueDictionary
from fastapi import WebSocket, FastAPI, Depends, HTTPException, WebSocketDisconnect, WebSocketException
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from weakref import WeakValueDictionary, WeakSet, finalize
from collections import UserDict
from uuid import uuid4
from fastapi import WebSocket, FastAPI, WebSocketDisconnect, WebSocketException
from .event import call_event
class TopicWeakDict(UserDict):
# Аналог WeakValueDict, но с сильными ссылками для возможности поддержки WeakSet
def __init__(self, initial_data = None):
super().__init__(initial_data or {})
def __setitem__(self, key, value):
finalize(value, lambda k=key, s=self: s.pop(k))
super().__setitem__(key, value)
class Server:
_current_server: Server | None = None
_logger: logging.Logger = logging.getLogger(__name__)
_static_tokens: list[str] = []
_temp_tokens: list[str] = []
_requests: WeakValueDictionary[str, asyncio.Future] = WeakValueDictionary()
_topics = TopicWeakDict()
_topic_lock = asyncio.Lock()
@classmethod
def get_current_server(cls) -> Server:
if cls._current_server is None:
raise ConnectionError('PyG2O сервер не подключен')
def init(cls, *, app: FastAPI, static_tokens: list[str] = []):
cls._logger.addHandler(logging.NullHandler())
cls._static_tokens = static_tokens
return cls._current_server
def __init__(self, *, app: FastAPI, server_username: str, server_password: str, client_password: str):
Server._current_server = self
self._security = HTTPBasic()
self._server_token: str = ''
self._server_username = server_username
self._server_password = server_password
self._logger = logging.getLogger(__name__)
self._logger.addHandler(logging.NullHandler())
self._server_connection: WebSocket | None = None
self._requests: WeakValueDictionary[str, asyncio.Future] = WeakValueDictionary()
self._register_routes(app)
cls._requests: WeakValueDictionary[str, asyncio.Future] = WeakValueDictionary()
cls._register_routes(app)
@classmethod
async def server_call(cls, message: str):
return await cls.get_current_server()._call(cls.get_current_server()._server_connection, message)
async def publish(cls, topic: str, message: str) -> asyncio.Future:
if topic not in cls._topics:
raise KeyError('Клиентов прослушивающих этот топик не существует')
async def _call(self, socket: WebSocket | None, message: str):
if socket is None:
raise ConnectionError('PyG2O сервер не подключен')
request, data = self._make_request()
request, data = cls._make_request()
data['data'] = message
data = json.dumps(data)
# Меняем синтаксис под Squirrel
data = data.replace("'", '\\"').replace('True', 'true').replace('False', 'false')
await socket.send_text(message)
for connection in cls._topics[topic]:
await connection.send_text(data)
return request
def _make_request(self):
@classmethod
def _make_request(cls):
request_id = str(uuid4())
request = asyncio.Future()
self._requests[request_id] = request
cls._requests[request_id] = request
data = {
'uuid': request_id,
@@ -61,57 +62,90 @@ class Server:
return request, data
def _register_routes(self, app):
@app.websocket('/pyg2o/server')
async def pyg2o_main(websocket: WebSocket):
await self._handle_server_connection(websocket)
@classmethod
def _register_routes(cls, app):
@app.websocket('/pyg2o')
async def pyg2o(websocket: WebSocket):
await cls._handle_connection(websocket)
@app.websocket('/pyg2o/client/{playerid}')
async def pyg2o_client(websocket: WebSocket, playerid: int):
await self._handle_client_connection(websocket, playerid)
_ = pyg2o
# Я потратил примерно 2ч чтобы понять, почему pyright игнорирует type: ignore
# Я сдаюсь, мне пришлось это добавить
_ = pyg2o_main
_ = pyg2o_client
@classmethod
async def _subscribe(cls, topic_list: list[str], connection: WebSocket):
async with cls._topic_lock:
for topic in topic_list:
if topic not in cls._topics:
cls._topics[topic] = WeakSet()
cls._topics[topic].add(connection)
async def _handle_server_connection(self, websocket: WebSocket):
headers = websocket.headers
password = headers.get('Authorization')
if password != self._server_password:
# Закрытие до принятия подключения выбрасывает 403 (Forbidden) код, так что не нужны доп сообщения
await websocket.close()
@classmethod
async def _unsubscribe(cls, topic_list: list[str], connection: WebSocket):
async with cls._topic_lock:
for topic in topic_list:
cls._topics[topic].discard(connection)
@classmethod
async def _handle_connection(cls, connection: WebSocket):
if not await cls._process_headers(connection):
await connection.close()
return
if self._server_connection is not None:
await self._server_connection.close()
await websocket.accept()
self._server_connection = websocket
self._logger.info('PyG2O сервер подключился')
await connection.accept()
cls._logger.info('WebSocket клиент подключился')
try:
while True:
try:
data = await websocket.receive_text()
data = await connection.receive_text()
message_data = json.loads(data)
self._logger.info(f'Сообщение сервера: {message_data}')
asyncio.create_task(cls._process_message(connection, message_data))
except json.JSONDecodeError as e:
self._logger.exception(f'Ошибка декодирования JSON: {e}')
cls._logger.exception(f'Ошибка декодирования JSON: {e}')
except WebSocketDisconnect:
self._logger.info('PyG2O сервер отключился')
cls._logger.info('WebSocket клиент отключился')
except WebSocketException as e:
self._logger.exception(f'Ошибка подключения PyG2O сервера: {e}')
cls._logger.exception(f'Ошибка WebSocket подключения: {e}')
async def _process_server_message(self, message: dict):
@classmethod
async def _process_headers(cls, connection: WebSocket) -> bool:
headers = connection.headers
token = headers.get('Authorization')
topics = headers.get('Subscribe')
if token not in cls._static_tokens and token not in cls._temp_tokens:
return False
if topics is not None:
topic_list = [s.strip() for s in topics.split(',')]
await cls._subscribe(topic_list, connection)
return True
@classmethod
async def _process_message(cls, connection: WebSocket, message: dict):
match message:
case {'uuid': id, 'data': data}:
...
case {'data': data}:
...
case _:
raise ValueError(f'Неподдерживаемый тип PyG2O Server сообщения: {message}')
async def _handle_client_connection(self, websocket: WebSocket, playerid: int):
...
case {'uuid': id, 'data': data}:
if id in cls._requests:
cls._requests[id].set_result(data)
else:
asyncio.create_task(call_event('onWebsocketMessage', connection, id, data))
case {'event': event, **args}:
asyncio.create_task(call_event(event, **args))
case {'subscribe': topics}:
await cls._subscribe(topics, connection)
case {'unsubscribe': topics}:
await cls._unsubscribe(topics, connection)
case {'create_temp_token': token}:
cls._temp_tokens.append(token)
case {'remove_temp_token': token}:
cls._temp_tokens.remove(token)
case _:
raise ValueError(f'Неподдерживаемый тип PyG2O сообщения: {message}')