Compare commits

..

10 Commits

5 changed files with 164 additions and 65 deletions

View File

@@ -1,10 +1,12 @@
addEvent("onReceiveClientPassword"); addEvent("onReceiveClientPassword");
CLIENT_PASSWORD <- -1;
addEventHandler("onPacket", function(data){ addEventHandler("onPacket", function(data){
local id = data.readUInt8(); local id = data.readUInt8();
if (id == 250) if (id == 250)
{ {
local client_password = data.readString(); local client_password = data.readString();
CLIENT_PASSWORD = client_password;
callEvent("onReceiveClientPassword", client_password); callEvent("onReceiveClientPassword", client_password);
} }
}) })

View File

@@ -45,6 +45,7 @@ class PyG2O
function _send(data, uuid = "none") function _send(data, uuid = "none")
{ {
if (!("uuid" in data))
data["uuid"] <- uuid; data["uuid"] <- uuid;
_connection.send(JSON.dump_ansi(data, 2)); _connection.send(JSON.dump_ansi(data, 2));
} }
@@ -54,6 +55,7 @@ class PyG2O
if (!_silent) if (!_silent)
print("[PyG2O] Successfully connected to " + url); print("[PyG2O] Successfully connected to " + url);
_send({"event": "register_server"});
_send({"event": "init_temp_tokens", "token": _clientTokens}); _send({"event": "init_temp_tokens", "token": _clientTokens});
} }
@@ -73,7 +75,7 @@ class PyG2O
{ {
local request = JSON.parse_ansi(message); local request = JSON.parse_ansi(message);
if (!("uuid" in request) || if (!("uuid" in request) ||
!("data" in request)) !("code" in request))
return; return;
_message_call.bindenv(this)(request); _message_call.bindenv(this)(request);

View File

@@ -1,7 +1,7 @@
function _message_call(data) function _message_call(data)
{ {
local compile_string = "try { " + data["data"] + " } catch(id) { print(\"[PyG2O] Error white executing the code: \" + id); return null; }"; local compile_string = "try { " + data["code"] + " } catch(id) { print(\"[PyG2O] Error while executing the code: \" + id); return null; }";
local result = compilestring(compile_string)(); local result = compilestring(compile_string)();
_send({"event": "sq_response", "uuid": data["uuid"], "data": result}); _send({"event": "sq_response", "uuid": data["uuid"], "data": result});
} }

View File

@@ -37,7 +37,7 @@ async def call_event(event_name: str, **kwargs):
if result is None: if result is None:
return return
await Server.send( Server.send(
connection = kwargs['connection'], connection = kwargs['connection'],
uuid = kwargs['uuid'], uuid = kwargs['uuid'],
event = 'backend_response', event = 'backend_response',

View File

@@ -1,11 +1,13 @@
from __future__ import annotations from __future__ import annotations
import json import json
from types import SimpleNamespace
import loguru
import asyncio import asyncio
from types import SimpleNamespace
from weakref import WeakValueDictionary, WeakSet, finalize from weakref import WeakValueDictionary, WeakSet, finalize
from collections import UserDict from collections import UserDict
from uuid import uuid4 from uuid import uuid4
from loguru import logger
from fastapi import WebSocket, FastAPI, WebSocketDisconnect, WebSocketException from fastapi import WebSocket, FastAPI, WebSocketDisconnect, WebSocketException
from .event import call_event from .event import call_event
@@ -19,9 +21,10 @@ class TopicWeakDict(UserDict):
super().__setitem__(key, value) super().__setitem__(key, value)
class Server: class Server:
_logger: loguru.Logger = loguru.logger
_static_tokens: list[str] = [] _static_tokens: list[str] = []
_temp_tokens: list[str] = [] _temp_tokens: list[str] = []
_server_connection: WebSocket | None = None
_registered_clients: dict[int, list] = {}
_requests: WeakValueDictionary[str, asyncio.Future] = WeakValueDictionary() _requests: WeakValueDictionary[str, asyncio.Future] = WeakValueDictionary()
_topics = TopicWeakDict() _topics = TopicWeakDict()
_topic_lock = asyncio.Lock() _topic_lock = asyncio.Lock()
@@ -34,52 +37,63 @@ class Server:
cls._register_routes(app) cls._register_routes(app)
@classmethod @classmethod
def publish(cls, topic: str, event: str, message: dict) -> asyncio.Future | None: async def _async_send(cls, connection_list: list[WebSocket], data):
for connection in connection_list:
await connection.send_text(data)
@classmethod
def send(cls, connection: WebSocket | str | int, event: str, message: dict, uuid: str | None = None):
try: try:
if topic not in cls._topics: if isinstance(connection, WebSocket):
raise KeyError('Клиентов прослушивающих этот топик не существует') connection_list = [connection]
elif isinstance(connection, int):
connection_list = cls._registered_clients[connection]
else:
connection_list = cls._topics[connection]
# Формируем сообщение request, data = cls._make_request(event, uuid)
request, data = cls._make_request()
data['event'] = event
data.update(message) data.update(message)
data = json.dumps(data) except KeyError:
raise KeyError('Нет зарегистрированного клиента с таким ID')
except ValueError:
raise ValueError('message должен быть типа dict')
message_uuid = data['uuid']
data = json.dumps(data)
# Меняем синтаксис под Squirrel # Меняем синтаксис под Squirrel
data = data.replace("'", '\\"').replace('True', 'true').replace('False', 'false') data = data.replace("'", '\\"').replace('True', 'true').replace('False', 'false')
asyncio.create_task(cls._send_to_topic(topic, data)) logs_connection_list = [f"{item.client.host}:{item.client.port}" for item in connection_list]
logger.info(
'Отправлено новое сообщение по каналу WebSocket',
log_type = 'PyG2O',
receivers = logs_connection_list,
message_uuid = message_uuid,
message_data = data,
)
asyncio.create_task(cls._async_send(connection_list, data))
return request return request
except ValueError:
cls._logger.exception('message должен быть типа dict')
@classmethod @classmethod
async def _send_to_topic(cls, topic, data): def sq_execute(cls, code: str) -> asyncio.Future:
for connection in cls._topics[topic]: if cls._server_connection is not None:
await connection.send_text(data) return cls.send(cls._server_connection, 'sq_execute', {'code': code})
else:
raise ConnectionError('Сервер не подключен к PyG2O')
@classmethod @classmethod
async def send(cls, connection: WebSocket, event: str, message: dict, uuid: str | None = None): def _make_request(cls, event: str, uuid: str | None = None):
try: if uuid is None:
data = {
'event': event,
'uuid': uuid,
}
data.update(message)
data = json.dumps(data)
# Меняем синтаксис под Squirrel
data = data.replace("'", '\\"').replace('True', 'true').replace('False', 'false')
await connection.send_text(data)
except ValueError:
cls._logger.exception('message должен быть типа dict')
@classmethod
def _make_request(cls):
request_id = str(uuid4()) request_id = str(uuid4())
else:
request_id = uuid
request = asyncio.Future() request = asyncio.Future()
cls._requests[request_id] = request cls._requests[request_id] = request
data = { data = {
'event': event,
'uuid': request_id, 'uuid': request_id,
} }
@@ -88,8 +102,8 @@ class Server:
@classmethod @classmethod
def _register_routes(cls, app): def _register_routes(cls, app):
@app.websocket('/pyg2o') @app.websocket('/pyg2o')
async def pyg2o(websocket: WebSocket, token: str, topics: str | None = None): async def pyg2o(websocket: WebSocket, token: str):
await cls._handle_connection(websocket, token, topics) await cls._handle_connection(websocket, token)
_ = pyg2o _ = pyg2o
@@ -108,14 +122,19 @@ class Server:
cls._topics[topic].discard(connection) cls._topics[topic].discard(connection)
@classmethod @classmethod
async def _handle_connection(cls, connection: WebSocket, token: str, topics: str | None): async def _handle_connection(cls, connection: WebSocket, token: str):
if not await cls._process_query_params(connection, token, topics): if token not in cls._static_tokens and token not in cls._temp_tokens:
await connection.close() await connection.close()
return return
await connection.accept() await connection.accept()
cls._logger.info('WebSocket клиент подключился') await cls._subscribe(['all'], connection)
logger.info(
'WebSocket клиент подключился',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
)
try: try:
while True: while True:
@@ -124,23 +143,37 @@ class Server:
message_data = json.loads(data) message_data = json.loads(data)
asyncio.create_task(cls._process_message(connection, message_data)) asyncio.create_task(cls._process_message(connection, message_data))
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
cls._logger.exception(f'Ошибка декодирования JSON: {e}') logger.info(
'Ошибка декодирования JSON сообщения',
log_type = 'PyG2O',
description = e,
message_data = data,
connection = f"{connection.client.host}:{connection.client.port}",
)
except WebSocketDisconnect: except WebSocketDisconnect:
cls._logger.info('WebSocket клиент отключился') if connection == cls._server_connection:
cls._server_connection = None
logger.info(
'WebSocket G2O сервер отключился',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
)
else:
playerid = next((key for key, values in cls._registered_clients.items() if connection in values), None)
if playerid is not None: cls._registered_clients[playerid].remove(connection)
logger.info(
'WebSocket клиент отключился',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
playerid = playerid,
)
except WebSocketException as e: except WebSocketException as e:
cls._logger.exception(f'Ошибка WebSocket подключения: {e}') logger.exception(
'Ошибка при обработке WebSocket сообщения',
@classmethod log_type = 'PyG2O',
async def _process_query_params(cls, connection: WebSocket, token: str, topics: str | None) -> bool: connection = f"{connection.client.host}:{connection.client.port}",
description = e,
if token not in cls._static_tokens and token not in cls._temp_tokens: )
return False
if topics is not None:
topic_list = [s.strip() for s in topics.split(',')]
await cls._subscribe(topic_list, connection)
return True
@classmethod @classmethod
async def _process_message(cls, connection: WebSocket, message: dict): async def _process_message(cls, connection: WebSocket, message: dict):
@@ -161,19 +194,81 @@ class Server:
case {'event': 'init_temp_tokens', 'tokens': tokens}: case {'event': 'init_temp_tokens', 'tokens': tokens}:
cls._temp_tokens = cls._temp_tokens + list(tokens.items()) cls._temp_tokens = cls._temp_tokens + list(tokens.items())
case {'event': 'register_client', 'playerid': playerid}:
try:
cls._registered_clients[playerid].append(connection)
except KeyError:
cls._registered_clients[playerid] = [connection]
logs_connection_list = [f"{item.client.host}:{item.client.port}" for item in cls._registered_clients[playerid]]
logger.info(
'Зарегистрирован новый WebSocket клиент',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
playerid = playerid,
total_playerid_connections = logs_connection_list,
)
case {'event': 'register_server'}:
if cls._server_connection is None:
cls._server_connection = connection
logger.info(
'Зарегистрирован новый WebSocket G2O сервер',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
)
case {'event': 'sq_response', 'uuid': uuid, 'data': data}: case {'event': 'sq_response', 'uuid': uuid, 'data': data}:
try: try:
cls._requests[uuid].set_result(data) cls._requests[uuid].set_result(data)
logger.info(
'Получен ответ от G2O сервера (sq_response)',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
message_uuid = uuid,
message_data = data,
)
except KeyError: except KeyError:
... logger.warning(
'Получен неожиданный ответ от G2O сервера',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
message_uuid = uuid,
message_data = data,
)
case {'event': event, 'uuid': uuid, **kwargs}: case {'event': event, 'uuid': uuid, **kwargs}:
try: try:
cls._requests[uuid].set_result(SimpleNamespace(**kwargs)) cls._requests[uuid].set_result(SimpleNamespace(**kwargs))
logger.info(
'Получен ответ от клиента',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
message_event = event,
message_uuid = uuid,
message_data = kwargs,
)
except KeyError: except KeyError:
kwargs['uuid'] = uuid kwargs['uuid'] = uuid
kwargs['connection'] = connection kwargs['connection'] = connection
playerid = next((key for key, values in cls._registered_clients.items() if connection in values), None)
if playerid is not None: kwargs['playerid'] = playerid
asyncio.create_task(call_event(event, **kwargs)) asyncio.create_task(call_event(event, **kwargs))
if (event != 'onTick' and event != 'onTime'):
logger.info(
'Получено сообщение от сервера' if connection == cls._server_connection else 'Получено сообщение от клиента',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
message_event = event,
message_uuid = uuid,
message_data = kwargs,
)
case _: case _:
logger.error(
'Получено неподдерживаемое сообщение от сервера' if connection == cls._server_connection else 'Получено неподдерживаемое сообщение от клиента',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
message_data = message,
)
raise ValueError(f'Неподдерживаемый тип PyG2O сообщения: {message}') raise ValueError(f'Неподдерживаемый тип PyG2O сообщения: {message}')