import json
import threading

from core.aochat.BaseModule import BaseModule
from core.decorators import instance, timerevent
from core.event_service import EventService
from core.logger import Logger
from core.setting_service import SettingService
from core.setting_types import TextSettingType
from modules.standard.datanet.ws_worker import WebsocketRelayWorker


@instance()
class WebsocketRelayController(BaseModule):
    """Run a websocket relay worker on a background thread and bridge it to the bot.

    Objects found in ``self.queue`` are re-fired as ``WS_RELAY`` events on the
    event service, and outgoing messages are serialized to JSON and passed to
    the worker.
    """

    # Event type under which queued relay objects are fired internally.
    WS_RELAY = "ws_relay_internal"

    def __init__(self):
        self.logger = Logger(__name__)
        # List shared with the worker (handed over in connect()); drained by
        # handle_queue_event(). Presumably the worker appends to it -- the
        # worker implementation is not visible from this file.
        self.queue = []
        # Worker object and the thread executing worker.run; both are set in
        # connect() and reset to None in disconnect().
        self.worker = None
        self.dthread = None
        # Optional object with an encrypt(bytes) -> bytes method; stays None
        # unless something outside this class assigns it.
        self.encrypter = None
        # Collections of online channels, deregistered on disconnect().
        self.channels = {}

    def inject(self, registry):
        """Resolve the collaborating services from the instance registry."""
        lookup = registry.get_instance
        self.bot = lookup("bot")
        self.db = lookup("db")
        self.util = lookup("util")
        self.setting_service: SettingService = lookup("setting_service")
        self.event_service: EventService = lookup("event_service")
        self.character_service = lookup("character_service")
        self.pork_service = lookup("pork_service")
        self.online_controller = lookup("online_controller")
        self.public_channel_service = lookup("public_channel_service")
        self.message_hub_service = lookup("message_hub_service")

    def pre_start(self):
        """Register the internal relay event type and the relay address setting."""
        self.event_service.register_event_type(self.WS_RELAY)
        self.setting_service.register_new(
            self.module_name,
            'relay_address',
            'ws://localhost:25500',
            TextSettingType([], allow_empty=True),
            "relay for timers, tower info, ...",
        )

    @timerevent(budatime="1s", description="Relay messages from Data relay to the internal message hub", is_hidden=True)
    def handle_queue_event(self, _, _1):
        """Drain the queue, firing every queued object as a ``WS_RELAY`` event.

        After an object whose ``type`` is ``"connected"`` has been fired, a
        ``join`` message carrying the bot's name is sent to the relay.
        """
        while self.queue:
            relayed = self.queue.pop(0)
            self.event_service.fire_event(self.WS_RELAY, relayed)
            if relayed.type != "connected":
                continue
            self.send_relay_message('join', f"{self.bot.name}")

    @timerevent(budatime="1m", description="Ensure the bot is connected to Data relay", is_hidden=True, run_at_startup=True)
    def handle_connect_event(self, _, _1):
        """(Re)connect unless a worker exists and its thread is still alive."""
        if self.worker and self.dthread.is_alive():
            return
        self.connect()

    def send_relay_message(self, msg_type, message):
        """Wrap ``message`` in a JSON envelope and hand it to the worker.

        The message is JSON-encoded first, run through the encrypter when one
        is set, and then embedded as ``payload`` next to ``type`` in a second
        JSON document. Nothing is sent while there is no worker.
        """
        if not self.worker:
            return

        payload = json.dumps(message)
        if self.encrypter:
            # noinspection PyArgumentEqualDefault
            encrypted = self.encrypter.encrypt(payload.encode('utf-8'))
            payload = encrypted.decode('utf-8')

        envelope = {"type": msg_type, "payload": payload}
        self.worker.send_message(json.dumps(envelope))

    def connect(self):
        """Drop any current connection and start a fresh worker on a daemon thread."""
        self.disconnect()

        address = self.setting_service.get_value("relay_address")
        self.worker = WebsocketRelayWorker(self.queue, address, False)

        runner = threading.Thread(target=self.worker.run, daemon=True)
        self.dthread = runner
        runner.start()

    def disconnect(self):
        """Deregister all tracked online channels and shut the worker down.

        When a worker exists it is closed and its thread is joined (without a
        timeout) before both references are cleared.
        """
        for channel_group in self.channels.values():
            for online_channel in channel_group:
                self.online_controller.deregister_online_channel(online_channel)

        if not self.worker:
            return

        self.worker.close()
        self.worker = None
        self.dthread.join()
        self.dthread = None