# File: igncore/core/private_channel_service.py
# (77 lines, 3.3 KiB, Python)

from core.aochat import server_packets, client_packets
from core.conn import Conn
from core.decorators import instance
from core.logger import Logger
@instance()
class PrivateChannelService:
    """Track membership of the bot's own private channel and relay its events.

    The service listens for private-channel packets, keeps a mapping of the
    characters currently recorded as joined, fires one event per packet type,
    and exposes helpers to invite/kick characters.  Only packets that arrive
    on the connection whose id is ``"main"`` and that concern the private
    channel whose id equals the bot's own character id are processed.
    """

    PRIVATE_CHANNEL_MESSAGE_EVENT = "private_channel_message"
    JOINED_PRIVATE_CHANNEL_EVENT = "private_channel_joined"
    LEFT_PRIVATE_CHANNEL_EVENT = "private_channel_left"

    def __init__(self):
        self.logger = Logger(__name__)
        # Maps char_id -> the PrivateChannelClientJoined packet recorded when
        # that character joined the bot's private channel.
        self.private_channel_chars = {}

    def inject(self, registry):
        """Look up the collaborating services from ``registry``."""
        self.bot = registry.get_instance("bot")
        self.event_service = registry.get_instance("event_service")
        self.character_service = registry.get_instance("character_service")
        self.access_service = registry.get_instance("access_service")

    def pre_start(self):
        """Register event types, packet handlers and the ``guest`` access level."""
        self.event_service.register_event_type(self.JOINED_PRIVATE_CHANNEL_EVENT)
        self.event_service.register_event_type(self.LEFT_PRIVATE_CHANNEL_EVENT)
        self.event_service.register_event_type(self.PRIVATE_CHANNEL_MESSAGE_EVENT)

        self.bot.register_packet_handler(server_packets.PrivateChannelClientJoined.id,
                                         self.handle_private_channel_client_joined)
        self.bot.register_packet_handler(server_packets.PrivateChannelClientLeft.id,
                                         self.handle_private_channel_client_left)
        # priority must be above that of CommandService in order for relaying of commands to work correctly
        self.bot.register_packet_handler(server_packets.PrivateChannelMessage.id,
                                         self.handle_private_channel_message, priority=30)

        # A character counts as "guest" while it is recorded in the private channel.
        self.access_service.register_access_level("guest", 95, self.in_private_channel)

    def _is_own_channel_packet(self, conn, packet):
        """Return True when ``packet`` came in on the main connection and
        refers to the private channel whose id is the bot's character id."""
        return conn.id == "main" and packet.private_channel_id == self.bot.get_char_id()

    def handle_private_channel_message(self, conn: Conn, packet: server_packets.PrivateChannelMessage):
        """Fire PRIVATE_CHANNEL_MESSAGE_EVENT for messages in the bot's own channel."""
        if not self._is_own_channel_packet(conn, packet):
            return

        self.event_service.fire_event(self.PRIVATE_CHANNEL_MESSAGE_EVENT, packet)

    def handle_private_channel_client_joined(self, conn: Conn, packet: server_packets.PrivateChannelClientJoined):
        """Record the joining character, then fire JOINED_PRIVATE_CHANNEL_EVENT."""
        if not self._is_own_channel_packet(conn, packet):
            return

        # Record membership before firing so event handlers already see the
        # character as present.
        self.private_channel_chars[packet.char_id] = packet
        self.event_service.fire_event(self.JOINED_PRIVATE_CHANNEL_EVENT, packet)

    def handle_private_channel_client_left(self, conn: Conn, packet: server_packets.PrivateChannelClientLeft):
        """Forget the leaving character, then fire LEFT_PRIVATE_CHANNEL_EVENT.

        A leave packet for a character that was never recorded as joined is
        tolerated: nothing is removed and the event is still fired.
        """
        if not self._is_own_channel_packet(conn, packet):
            return

        # pop() with a default instead of ``del`` so an untracked char_id does
        # not raise KeyError and abort the handler before the event is fired.
        self.private_channel_chars.pop(packet.char_id, None)
        self.event_service.fire_event(self.LEFT_PRIVATE_CHANNEL_EVENT, packet)

    def invite(self, char_id):
        """Send a private-channel invite to ``char_id`` unless it is the bot itself."""
        if char_id != self.bot.get_char_id():
            self.bot.send_packet(client_packets.PrivateChannelInvite(char_id))

    def kick(self, char_id):
        """Send a kick for ``char_id`` if it is recorded in the channel and is not the bot."""
        if char_id != self.bot.get_char_id() and char_id in self.private_channel_chars:
            self.bot.send_packet(client_packets.PrivateChannelKick(char_id))

    def kickall(self):
        """Send a kick-all packet for the private channel."""
        self.bot.send_packet(client_packets.PrivateChannelKickAll())

    def in_private_channel(self, char_id):
        """Return True if ``char_id`` is currently recorded as being in the channel."""
        return char_id in self.private_channel_chars

    def get_all_in_private_channel(self):
        """Return the internal char_id -> join-packet mapping.

        This is the live dict, not a copy; mutating it changes the service's
        membership state.
        """
        return self.private_channel_chars