refactor: Упрощен метод send - удалены дубликаты
+ Удален query параметр topics + Добавлена автоподпись на топик all
This commit is contained in:
@@ -36,67 +36,52 @@ class Server:
|
||||
cls._register_routes(app)
|
||||
|
||||
@classmethod
|
||||
def publish(cls, topic: str, event: str, message: dict) -> asyncio.Future | None:
|
||||
try:
|
||||
if topic not in cls._topics:
|
||||
raise KeyError('Клиентов прослушивающих этот топик не существует')
|
||||
|
||||
# Формируем сообщение
|
||||
request, data = cls._make_request()
|
||||
data['event'] = event
|
||||
data.update(message)
|
||||
data = json.dumps(data)
|
||||
|
||||
# Меняем синтаксис под Squirrel
|
||||
data = data.replace("'", '\\"').replace('True', 'true').replace('False', 'false')
|
||||
|
||||
asyncio.create_task(cls._send_to_topic(topic, data))
|
||||
return request
|
||||
except ValueError:
|
||||
cls._logger.exception('message должен быть типа dict')
|
||||
|
||||
@classmethod
|
||||
async def _send_to_topic(cls, topic, data):
|
||||
for connection in cls._topics[topic]:
|
||||
async def _async_send(cls, connection_list: list[WebSocket], data):
|
||||
for connection in connection_list:
|
||||
await connection.send_text(data)
|
||||
|
||||
@classmethod
|
||||
async def send(cls, connection: WebSocket, event: str, message: dict, uuid: str | None = None):
|
||||
def send(cls, connection: WebSocket | str | int, event: str, message: dict, uuid: str | None = None):
|
||||
try:
|
||||
data = {
|
||||
'event': event,
|
||||
'uuid': uuid,
|
||||
}
|
||||
if isinstance(connection, WebSocket):
|
||||
connection_list = [connection]
|
||||
elif isinstance(connection, int):
|
||||
connection_list = cls._registered_clients[connection]
|
||||
else:
|
||||
connection_list = cls._topics[connection]
|
||||
|
||||
request, data = cls._make_request(event, uuid)
|
||||
data.update(message)
|
||||
data = json.dumps(data)
|
||||
# Меняем синтаксис под Squirrel
|
||||
data = data.replace("'", '\\"').replace('True', 'true').replace('False', 'false')
|
||||
await connection.send_text(data)
|
||||
except KeyError:
|
||||
raise KeyError('Нет зарегистрированного клиента с таким ID')
|
||||
except ValueError:
|
||||
cls._logger.exception('message должен быть типа dict')
|
||||
raise ValueError('message должен быть типа dict')
|
||||
|
||||
@classmethod
|
||||
def sq_execute(cls, code: str) -> asyncio.Future | None:
|
||||
if cls._server_connection is None:
|
||||
return
|
||||
|
||||
request, data = cls._make_request()
|
||||
data['code'] = code
|
||||
data = json.dumps(data)
|
||||
|
||||
# Меняем синтаксис под Squirrel
|
||||
data = data.replace("'", '\\"').replace('True', 'true').replace('False', 'false')
|
||||
|
||||
asyncio.create_task(cls._server_connection.send_text(data))
|
||||
asyncio.create_task(cls._async_send(connection_list, data))
|
||||
return request
|
||||
|
||||
@classmethod
|
||||
def _make_request(cls):
|
||||
request_id = str(uuid4())
|
||||
def sq_execute(cls, code: str) -> asyncio.Future:
|
||||
if cls._server_connection is not None:
|
||||
return cls.send(cls._server_connection, 'sq_execute', {'code': code})
|
||||
else:
|
||||
raise ConnectionError('Сервер не подключен к PyG2O')
|
||||
|
||||
@classmethod
|
||||
def _make_request(cls, event: str, uuid: str | None = None):
|
||||
if uuid is None:
|
||||
request_id = str(uuid4())
|
||||
else:
|
||||
request_id = uuid
|
||||
|
||||
request = asyncio.Future()
|
||||
cls._requests[request_id] = request
|
||||
|
||||
data = {
|
||||
'event': event,
|
||||
'uuid': request_id,
|
||||
}
|
||||
|
||||
@@ -105,8 +90,8 @@ class Server:
|
||||
@classmethod
|
||||
def _register_routes(cls, app):
|
||||
@app.websocket('/pyg2o')
|
||||
async def pyg2o(websocket: WebSocket, token: str, topics: str | None = None):
|
||||
await cls._handle_connection(websocket, token, topics)
|
||||
async def pyg2o(websocket: WebSocket, token: str):
|
||||
await cls._handle_connection(websocket, token)
|
||||
|
||||
_ = pyg2o
|
||||
|
||||
@@ -125,13 +110,14 @@ class Server:
|
||||
cls._topics[topic].discard(connection)
|
||||
|
||||
@classmethod
|
||||
async def _handle_connection(cls, connection: WebSocket, token: str, topics: str | None):
|
||||
|
||||
if not await cls._process_query_params(connection, token, topics):
|
||||
async def _handle_connection(cls, connection: WebSocket, token: str):
|
||||
|
||||
if token not in cls._static_tokens and token not in cls._temp_tokens:
|
||||
await connection.close()
|
||||
return
|
||||
|
||||
await connection.accept()
|
||||
await cls._subscribe(['all'], connection)
|
||||
cls._logger.info('WebSocket клиент подключился')
|
||||
|
||||
try:
|
||||
@@ -149,18 +135,6 @@ class Server:
|
||||
except WebSocketException as e:
|
||||
cls._logger.exception(f'Ошибка WebSocket подключения: {e}')
|
||||
|
||||
@classmethod
|
||||
async def _process_query_params(cls, connection: WebSocket, token: str, topics: str | None) -> bool:
|
||||
|
||||
if token not in cls._static_tokens and token not in cls._temp_tokens:
|
||||
return False
|
||||
|
||||
if topics is not None:
|
||||
topic_list = [s.strip() for s in topics.split(',')]
|
||||
await cls._subscribe(topic_list, connection)
|
||||
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
async def _process_message(cls, connection: WebSocket, message: dict):
|
||||
match message:
|
||||
|
||||
Reference in New Issue
Block a user