Compare commits

...

10 Commits

5 changed files with 164 additions and 65 deletions

View File

@@ -1,10 +1,12 @@
addEvent("onReceiveClientPassword");
CLIENT_PASSWORD <- -1;
addEventHandler("onPacket", function(data){
local id = data.readUInt8();
if (id == 250)
{
local client_password = data.readString();
CLIENT_PASSWORD = client_password;
callEvent("onReceiveClientPassword", client_password);
}
})

View File

@@ -45,7 +45,8 @@ class PyG2O
function _send(data, uuid = "none")
{
data["uuid"] <- uuid;
if (!("uuid" in data))
data["uuid"] <- uuid;
_connection.send(JSON.dump_ansi(data, 2));
}
@@ -54,6 +55,7 @@ class PyG2O
if (!_silent)
print("[PyG2O] Successfully connected to " + url);
_send({"event": "register_server"});
_send({"event": "init_temp_tokens", "token": _clientTokens});
}
@@ -73,7 +75,7 @@ class PyG2O
{
local request = JSON.parse_ansi(message);
if (!("uuid" in request) ||
!("data" in request))
!("code" in request))
return;
_message_call.bindenv(this)(request);

View File

@@ -1,7 +1,7 @@
function _message_call(data)
{
local compile_string = "try { " + data["data"] + " } catch(id) { print(\"[PyG2O] Error white executing the code: \" + id); return null; }";
local compile_string = "try { " + data["code"] + " } catch(id) { print(\"[PyG2O] Error while executing the code: \" + id); return null; }";
local result = compilestring(compile_string)();
_send({"event": "sq_response", "uuid": data["uuid"], "data": result});
}

View File

@@ -37,7 +37,7 @@ async def call_event(event_name: str, **kwargs):
if result is None:
return
await Server.send(
Server.send(
connection = kwargs['connection'],
uuid = kwargs['uuid'],
event = 'backend_response',

View File

@@ -1,11 +1,13 @@
from __future__ import annotations
import json
from types import SimpleNamespace
import loguru
import asyncio
from types import SimpleNamespace
from weakref import WeakValueDictionary, WeakSet, finalize
from collections import UserDict
from uuid import uuid4
from loguru import logger
from fastapi import WebSocket, FastAPI, WebSocketDisconnect, WebSocketException
from .event import call_event
@@ -19,9 +21,10 @@ class TopicWeakDict(UserDict):
super().__setitem__(key, value)
class Server:
_logger: loguru.Logger = loguru.logger
_static_tokens: list[str] = []
_temp_tokens: list[str] = []
_server_connection: WebSocket | None = None
_registered_clients: dict[int, list] = {}
_requests: WeakValueDictionary[str, asyncio.Future] = WeakValueDictionary()
_topics = TopicWeakDict()
_topic_lock = asyncio.Lock()
@@ -34,52 +37,63 @@ class Server:
cls._register_routes(app)
@classmethod
def publish(cls, topic: str, event: str, message: dict) -> asyncio.Future | None:
try:
if topic not in cls._topics:
raise KeyError('Клиентов прослушивающих этот топик не существует')
# Формируем сообщение
request, data = cls._make_request()
data['event'] = event
data.update(message)
data = json.dumps(data)
# Меняем синтаксис под Squirrel
data = data.replace("'", '\\"').replace('True', 'true').replace('False', 'false')
asyncio.create_task(cls._send_to_topic(topic, data))
return request
except ValueError:
cls._logger.exception('message должен быть типа dict')
@classmethod
async def _send_to_topic(cls, topic, data):
for connection in cls._topics[topic]:
async def _async_send(cls, connection_list: list[WebSocket], data):
for connection in connection_list:
await connection.send_text(data)
@classmethod
async def send(cls, connection: WebSocket, event: str, message: dict, uuid: str | None = None):
def send(cls, connection: WebSocket | str | int, event: str, message: dict, uuid: str | None = None):
try:
data = {
'event': event,
'uuid': uuid,
}
if isinstance(connection, WebSocket):
connection_list = [connection]
elif isinstance(connection, int):
connection_list = cls._registered_clients[connection]
else:
connection_list = cls._topics[connection]
request, data = cls._make_request(event, uuid)
data.update(message)
data = json.dumps(data)
# Меняем синтаксис под Squirrel
data = data.replace("'", '\\"').replace('True', 'true').replace('False', 'false')
await connection.send_text(data)
except KeyError:
raise KeyError('Нет зарегистрированного клиента с таким ID')
except ValueError:
cls._logger.exception('message должен быть типа dict')
raise ValueError('message должен быть типа dict')
message_uuid = data['uuid']
data = json.dumps(data)
# Меняем синтаксис под Squirrel
data = data.replace("'", '\\"').replace('True', 'true').replace('False', 'false')
logs_connection_list = [f"{item.client.host}:{item.client.port}" for item in connection_list]
logger.info(
'Отправлено новое сообщение по каналу WebSocket',
log_type = 'PyG2O',
receivers = logs_connection_list,
message_uuid = message_uuid,
message_data = data,
)
asyncio.create_task(cls._async_send(connection_list, data))
return request
@classmethod
def _make_request(cls):
request_id = str(uuid4())
def sq_execute(cls, code: str) -> asyncio.Future:
if cls._server_connection is not None:
return cls.send(cls._server_connection, 'sq_execute', {'code': code})
else:
raise ConnectionError('Сервер не подключен к PyG2O')
@classmethod
def _make_request(cls, event: str, uuid: str | None = None):
if uuid is None:
request_id = str(uuid4())
else:
request_id = uuid
request = asyncio.Future()
cls._requests[request_id] = request
data = {
'event': event,
'uuid': request_id,
}
@@ -88,8 +102,8 @@ class Server:
@classmethod
def _register_routes(cls, app):
@app.websocket('/pyg2o')
async def pyg2o(websocket: WebSocket, token: str, topics: str | None = None):
await cls._handle_connection(websocket, token, topics)
async def pyg2o(websocket: WebSocket, token: str):
await cls._handle_connection(websocket, token)
_ = pyg2o
@@ -108,14 +122,19 @@ class Server:
cls._topics[topic].discard(connection)
@classmethod
async def _handle_connection(cls, connection: WebSocket, token: str, topics: str | None):
async def _handle_connection(cls, connection: WebSocket, token: str):
if not await cls._process_query_params(connection, token, topics):
if token not in cls._static_tokens and token not in cls._temp_tokens:
await connection.close()
return
await connection.accept()
cls._logger.info('WebSocket клиент подключился')
await cls._subscribe(['all'], connection)
logger.info(
'WebSocket клиент подключился',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
)
try:
while True:
@@ -124,23 +143,37 @@ class Server:
message_data = json.loads(data)
asyncio.create_task(cls._process_message(connection, message_data))
except json.JSONDecodeError as e:
cls._logger.exception(f'Ошибка декодирования JSON: {e}')
logger.info(
'Ошибка декодирования JSON сообщения',
log_type = 'PyG2O',
description = e,
message_data = data,
connection = f"{connection.client.host}:{connection.client.port}",
)
except WebSocketDisconnect:
cls._logger.info('WebSocket клиент отключился')
if connection == cls._server_connection:
cls._server_connection = None
logger.info(
'WebSocket G2O сервер отключился',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
)
else:
playerid = next((key for key, values in cls._registered_clients.items() if connection in values), None)
if playerid is not None: cls._registered_clients[playerid].remove(connection)
logger.info(
'WebSocket клиент отключился',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
playerid = playerid,
)
except WebSocketException as e:
cls._logger.exception(f'Ошибка WebSocket подключения: {e}')
@classmethod
async def _process_query_params(cls, connection: WebSocket, token: str, topics: str | None) -> bool:
if token not in cls._static_tokens and token not in cls._temp_tokens:
return False
if topics is not None:
topic_list = [s.strip() for s in topics.split(',')]
await cls._subscribe(topic_list, connection)
return True
logger.exception(
'Ошибка при обработке WebSocket сообщения',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
description = e,
)
@classmethod
async def _process_message(cls, connection: WebSocket, message: dict):
@@ -161,19 +194,81 @@ class Server:
case {'event': 'init_temp_tokens', 'tokens': tokens}:
cls._temp_tokens = cls._temp_tokens + list(tokens.items())
case {'event': 'register_client', 'playerid': playerid}:
try:
cls._registered_clients[playerid].append(connection)
except KeyError:
cls._registered_clients[playerid] = [connection]
logs_connection_list = [f"{item.client.host}:{item.client.port}" for item in cls._registered_clients[playerid]]
logger.info(
'Зарегистрирован новый WebSocket клиент',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
playerid = playerid,
total_playerid_connections = logs_connection_list,
)
case {'event': 'register_server'}:
if cls._server_connection is None:
cls._server_connection = connection
logger.info(
'Зарегистрирован новый WebSocket G2O сервер',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
)
case {'event': 'sq_response', 'uuid': uuid, 'data': data}:
try:
cls._requests[uuid].set_result(data)
logger.info(
'Получен ответ от G2O сервера (sq_response)',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
message_uuid = uuid,
message_data = data,
)
except KeyError:
...
logger.warning(
'Получен неожиданный ответ от G2O сервера',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
message_uuid = uuid,
message_data = data,
)
case {'event': event, 'uuid': uuid, **kwargs}:
try:
cls._requests[uuid].set_result(SimpleNamespace(**kwargs))
logger.info(
'Получен ответ от клиента',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
message_event = event,
message_uuid = uuid,
message_data = kwargs,
)
except KeyError:
kwargs['uuid'] = uuid
kwargs['connection'] = connection
playerid = next((key for key, values in cls._registered_clients.items() if connection in values), None)
if playerid is not None: kwargs['playerid'] = playerid
asyncio.create_task(call_event(event, **kwargs))
if (event != 'onTick' and event != 'onTime'):
logger.info(
'Получено сообщение от сервера' if connection == cls._server_connection else 'Получено сообщение от клиента',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
message_event = event,
message_uuid = uuid,
message_data = kwargs,
)
case _:
logger.error(
'Получено неподдерживаемое сообщение от сервера' if connection == cls._server_connection else 'Получено неподдерживаемое сообщение от клиента',
log_type = 'PyG2O',
connection = f"{connection.client.host}:{connection.client.port}",
message_data = message,
)
raise ValueError(f'Неподдерживаемый тип PyG2O сообщения: {message}')