Initial Release of IGNCore version 2.5

This commit is contained in:
2021-08-09 13:18:56 +02:00
commit a83d98c47e
910 changed files with 224171 additions and 0 deletions
+39
View File
@@ -0,0 +1,39 @@
from core.chat_blob import ChatBlob
from core.command_param_types import Const, Any, Options
from core.decorators import instance, command
from core.translation_service import TranslationService
@instance()
class AliasController:
def inject(self, registry):
self.command_alias_service = registry.get_instance("command_alias_service")
self.ts: TranslationService = registry.get_instance("translation_service")
self.getresp = self.ts.get_response
@command(command="alias", params=[Const("list")], access_level="member",
description="List command aliases")
def alias_list_cmd(self, _, _1):
blob = ""
data = self.command_alias_service.get_enabled_aliases()
count = len(data)
for row in data:
blob += row.alias + " - " + row.command + "\n"
return ChatBlob(self.getresp("module/config", "alias_blob_title", {"amount": count}), blob)
@command(command="alias", params=[Const("add"), Any("alias"), Any("command")], access_level="admin",
description="Add a command alias", sub_command="modify")
def alias_add_cmd(self, _, _1, alias, command_str):
if self.command_alias_service.add_alias(alias, command_str, force_enable=True):
return self.getresp("module/config", "alias_add_success", {"alias": alias, "cmd": command_str})
else:
return self.getresp("module/config", "alias_add_fail", {"alias": alias})
@command(command="alias", params=[Options(["rem", "remove"]), Any("alias")], access_level="admin",
description="Remove a command alias", sub_command="modify")
def alias_remove_cmd(self, _, _1, alias):
if self.command_alias_service.remove_alias(alias):
return self.getresp("module/config", "alias_rem_success", {"alias": alias})
else:
return self.getresp("module/config", "alias_rem_fail", {"alias": alias})
+181
View File
@@ -0,0 +1,181 @@
{
"alias_blob_title": {
"en_US": "Aliases ({amount})",
"de_DE": "Alias Liste ({amount})"
},
"alias_add_success": {
"en_US": "Alias <highlight>{alias}</highlight> for command <highlight>{cmd}</highlight> added successfully.",
"de_DE": "Der Alias <highlight>{alias}</highlight> für den Befehl <highlight>{cmd}</highlight> wurde erfolgreich hinzugefügt."
},
"alias_add_fail": {
"en_US": "Cannot add alias <highlight>{alias}</highlight> since there is already an active alias with that name.",
"de_DE": "Der Alias <highlight>{alias}</highlight> konnte nicht hinzugefügt werden, da bereits einer mit selbigem Alias existiert."
},
"alias_rem_success": {
"en_US": "Alias <highlight>{alias}</highlight> has been removed successfully.",
"de_DE": "Der Alias <highlight>{alias}</highlight> wurde erfolgreich entfernt."
},
"alias_rem_fail": {
"en_US": "Could not find alias <highlight>{alias}</highlight>",
"de_DE": "Der Alias <highlight>{alias}</highlight> konnte nicht gefunden werden."
},
"cmdlist_commands": {
"en_US": "Commands ({amount})",
"de_DE": "Befehlsliste ({amount})"
},
"cmd_unknown_channel": {
"en_US": "Unknown command channel <highlight>{channel}</highlight>.",
"de_DE": "Der Befehlschannel <highlight>{channel}</highlight> existiert nicht."
},
"cmd_unknown_for_channel": {
"en_US": "Could not find command <highlight>{cmd}</highlight> for channel <highlight>{channel}</highlight>.",
"de_DE": "Der Befehl <highlight>{cmd}</highlight> für den Channel <highlight>{channel}</highlight> wurde nicht gefunden."
},
"enabled_low": {
"en_US": "enabled",
"de_DE": "aktiviert"
},
"disabled_low": {
"en_US": "disabled",
"de_DE": "deaktiviert"
},
"run": {
"en_US": "run",
"de_DE": "ausgeführt"
},
"enabled_high": {
"en_US": "<green>Enabled</green>",
"de_DE": "<green>Aktiviert</green>"
},
"disabled_high": {
"en_US": "<red>Disabled</red>",
"de_DE": "<red>Deaktiviert</red>"
},
"enable": {
"en_US": "Enable",
"de_DE": "Aktivieren"
},
"disable": {
"en_US": "Disable",
"de_DE": "Deaktivieren"
},
"partial": {
"en_US": "Partial",
"de_DE": "Teilweise aktiviert"
},
"config": {
"en_US": "Config ({count})",
"de_DE": "Einstellungen ({count})"
},
"cmd_toggle_success": {
"en_US": "Command <highlight>{cmd}</highlight> has been <highlight>{changedto}</highlight> successfully.",
"de_DE": "Der Befehl <highlight>{cmd}</highlight> ist nun <highlight>{changedto}</highlight>"
},
"cmd_toggle_channel_success": {
"en_US": "Command <highlight>{cmd}</highlight> for channel <highlight>{channel}</highlight> has been <highlight>{changedto}</highlight> successfully.",
"de_DE": "Der Befehl <highlight>{cmd}</highlight> ist im channel <highlight>{channel}</highlight> nun <highlight>{changedto}</highlight>."
},
"unknown_accesslevel": {
"en_US": "Unknown access level <highlight>{al}</highlight>.",
"de_DE": "Unbekanntes Rechtelevel: <highlight>{al}</highlight>"
},
"set_accesslevel_success": {
"en_US": "Access level <highlight>{al}</highlight> for command <highlight>{cmd}</highlight> has been set successfully.",
"de_DE": "Für den Befehl <highlight>{cmd}</highlight> wurden die Zugriffsreche erfolgreich auf <highlight>{al}</highlight> geändert."
},
"set_accesslevel_fail": {
"en_US": "Access level <highlight>{al}</highlight> for command <highlight>{cmd}</highlight> on channel <highlight>{channel}</highlight> has been set successfully.",
"de_DE": "Der Befehl <highlight>{cmd}</highlight> benötigt im channel <highlight>{channel}</highlight> nun den Rang <highlight>{al}</highlight>."
},
"access_level": {
"en_US": "Access Level",
"de_DE": "Zugriffslevel"
},
"no_cmd": {
"en_US": "Could not find command <highlight>{cmd}</highlight>.",
"de_DE": "Der Befehl <highlight>{cmd}</highlight> ist mir unbekannt."
},
"settings": {
"en_US": "<header2>Settings</header2>\n",
"de_DE": "<header2>Einstellungen</header2>\n"
},
"commands": {
"en_US": "\n<header2>Commands</header2>\n",
"de_DE": "\n<header2>Befehle</header2>\n"
},
"events": {
"en_US": "Events",
"de_DE": "Events"
},
"hidden_events": {
"en_US": "Hidden Events",
"de_DE": "Verstecke Events"
},
"mod_title": {
"en_US": "Module ({mod})",
"de_DE": "Modul ({mod})"
},
"mod_not_found": {
"en_US": "Could not find module <highlight>{mod}</highlight>",
"de_DE": "Das Modul <highlight>{mod}</highlight> wurde nicht gefunden."
},
"no_new_value": {
"en_US": "Error! New value required to update setting.",
"de_DE": "Error! Es muss eine neue Option eingegeben werden, um die Einstellung zu ändern."
},
"set_clr": {
"en_US": "Setting <highlight>{setting}</highlight> has been cleared.",
"de_DE": "Die Einstellung <highlight>{setting}</highlight> wurde geleert."
},
"set_new": {
"en_US": "Setting <highlight>{setting}</highlight> has been set to {value}.",
"de_DE": "Die Einstellung <highlight>{setting}</highlight> hat nun den Wert: {value}"
},
"setting_not_found": {
"en_US": "Could not find setting <highlight>{setting}</highlight>.",
"de_DE": "Die Einstellung <highlight>{setting}</highlight> wurde nicht gefunden."
},
"current_value": {
"en_US": "Current Value: <highlight>{value}</highlight>\n",
"de_DE": "Aktueller Wert: <highlight>{value}</highlight>\n"
},
"description": {
"en_US": "Description: <highlight>{desc}</highlight>\n\n",
"de_DE": "Beschreibung: <highlight>{desc}</highlight>\n\n"
},
"setting": {
"en_US": "Setting ({setting})",
"de_DE": "Einstellung ({setting})"
},
"settinglist_title": {
"en_US": "Settings ({count})",
"de_DE": "Einstellungen ({count})"
},
"unknown_event": {
"en_US": "Unknown event type <highlight>{type}</highlight>.",
"de_DE": "Unbekannter Eventtyp: <highlight>{type}</highlight>"
},
"event_enable_fail": {
"en_US": "Handler <highlight>{handler}</highlight> type <highlight>{type}</highlight> already has the desired type."
},
"event_enable_success": {
"en_US": "Event type <highlight>{type}</highlight> for handler <highlight>{handler}</highlight> has been <highlight>{changedto}</highlight> successfully.",
"de_DE": "Für den Eventtyp <highlight>{type}</highlight> wurde der handler <highlight>{handler}</highlight> erfolgreich <highlight>{changedto}</highlight>."
},
"event_manual": {
"en_US": "Only <highlight>timer</highlight> events can be run manually.",
"de_DE": "Es können nur <highlight>timer</highlight> Events manuell angestoßen werden."
},
"blob_events": {
"en_US": "Events ({amount})",
"de_DE": "Events ({amount})"
},
"hidden": {
"en_US": "hidden",
"de_DE": "versteckt"
},
"include_hidden_events": {
"en_US": "Include hidden events",
"de_DE": "Versteckte Events anzeigen"
}
}
@@ -0,0 +1,82 @@
from core.chat_blob import ChatBlob
from core.command_param_types import NamedParameters, Const
from core.db import DB
from core.decorators import instance, command
from core.text import Text
from core.translation_service import TranslationService
@instance()
class CommandListController:
def inject(self, registry):
self.db: DB = registry.get_instance("db")
self.text: Text = registry.get_instance("text")
self.command_service = registry.get_instance("command_service")
self.ts: TranslationService = registry.get_instance("translation_service")
self.getresp = self.ts.get_response
@command(command="config", params=[Const("cmdlist"), NamedParameters(["access_level"])], access_level="admin",
description="List all commands")
def config_cmdlist_cmd(self, _, _1, named_params):
sql = "SELECT access_level, channel, enabled, command, module, sub_command FROM command_config"
params = []
if named_params.access_level:
sql += " WHERE access_level = ?"
params.append(named_params.access_level)
sql += " ORDER BY module, command, sub_command, channel"
data = self.db.query(sql, params)
blob = ""
current_module = ""
current_command_key = ""
count = 0
temp_rows = []
for row in data:
if current_module != row.module:
if temp_rows:
blob += self.display_row_data(temp_rows)
temp_rows = []
blob += "\n<pagebreak><header2>%s</header2>\n" % row.module
current_module = row.module
current_command_key = ""
command_key = self.command_service.get_command_key(row.command, row.sub_command)
if current_command_key != command_key:
if temp_rows:
blob += self.display_row_data(temp_rows)
temp_rows = []
count += 1
blob += "%s - " % (self.text.make_tellcmd(command_key, "config cmd " + command_key))
current_command_key = command_key
temp_rows.append(row)
if temp_rows:
blob += self.display_row_data(temp_rows)
return ChatBlob(self.getresp("module/config", "cmdlist_commands", {"amount": count}), blob)
def display_row_data(self, rows):
return "[%s %s]\n" % (self.get_enabled_str(rows), self.get_access_levels_str(rows))
def get_access_levels_str(self, rows):
access_levels = list(map(lambda x: x.access_level, rows))
if all(x == access_levels[0] for x in access_levels):
return access_levels[0]
else:
return ",".join(access_levels)
def get_enabled_str(self, rows):
enabled = list(map(lambda x: x.enabled, rows))
blob = ""
if all(x == enabled[0] for x in enabled):
blob += self.format_enabled(enabled[0])
else:
for x in enabled:
blob += self.format_enabled(x)
return blob
def format_enabled(self, enabled):
return "<green>E</green>" if enabled else "<red>D</red>"
@@ -0,0 +1,143 @@
from core.chat_blob import ChatBlob
from core.command_param_types import Const, Any, Options
from core.db import DB
from core.decorators import instance, command
from core.text import Text
from core.translation_service import TranslationService
@instance()
class ConfigCommandController:
def inject(self, registry):
self.db: DB = registry.get_instance("db")
self.text: Text = registry.get_instance("text")
self.access_service = registry.get_instance("access_service")
self.command_service = registry.get_instance("command_service")
self.ts: TranslationService = registry.get_instance("translation_service")
self.getresp = self.ts.get_response
@command(command="config",
params=[Const("cmd"), Any("cmd_name"), Options(["enable", "disable"]), Any("channel")],
access_level="admin",
description="Enable or disable a command")
def config_cmd_status_cmd(self, _, _1, cmd_name, action, cmd_channel):
cmd_name = cmd_name.lower()
action = action.lower()
cmd_channel = cmd_channel.lower()
command_str, sub_command_str = self.command_service.get_command_key_parts(cmd_name)
enabled = 1 if action == "enable" else 0
if cmd_channel != "all" and not self.command_service.is_command_channel(cmd_channel):
return self.getresp("module/config", "cmd_unknown_channel", {"channel": cmd_channel})
sql = "UPDATE command_config SET enabled = ? WHERE command = ? AND sub_command = ?"
params = [enabled, command_str, sub_command_str]
if cmd_channel != "all":
sql += " AND channel = ?"
params.append(cmd_channel)
count = self.db.exec(sql, params)
if count == 0:
return self.getresp("module/config", "cmd_unknown_for_channel", {"channel": cmd_channel, "cmd": cmd_name})
else:
action = self.getresp("module/config", "enabled_low" if action == "enable" else "disabled_low")
if cmd_channel == "all":
return self.getresp("module/config", "cmd_toggle_success", {"cmd": cmd_name, "changedto": action})
else:
return self.getresp("module/config", "cmd_toggle_channel_success",
{"channel": cmd_channel, "cmd": cmd_name, "changedto": action})
@command(command="config",
params=[Const("cmd"), Any("cmd_name"), Const("access_level"), Any("channel"), Any("access_level")],
access_level="admin",
description="Change access_level for a command")
def config_cmd_access_level_cmd(self, _, _1, cmd_name, _2, cmd_channel, access_level):
cmd_name = cmd_name.lower()
cmd_channel = cmd_channel.lower()
access_level = access_level.lower()
command_str, sub_command_str = self.command_service.get_command_key_parts(cmd_name)
if cmd_channel != "all" and not self.command_service.is_command_channel(cmd_channel):
return self.getresp("module/config", "cmd_unknown_channel", {"channel": cmd_channel})
if self.access_service.get_access_level_by_label(access_level) is None:
return self.getresp("module/config", "unknown_accesslevel", {"al": access_level})
sql = "UPDATE command_config SET access_level = ? WHERE command = ? AND sub_command = ?"
params = [access_level, command_str, sub_command_str]
if cmd_channel != "all":
sql += " AND channel = ?"
params.append(cmd_channel)
count = self.db.exec(sql, params)
if count == 0:
return self.getresp("module/config", "cmd_unknown_for_channel", {"channel": cmd_channel, "cmd": cmd_name})
else:
if cmd_channel == "all":
return self.getresp("module/config", "set_accesslevel_success", {"cmd": cmd_name, "al": access_level})
else:
return self.getresp("module/config", "set_accesslevel_fail",
{"channel": cmd_channel, "cmd": cmd_name, "al": access_level})
@command(command="config", params=[Const("cmd"), Any("cmd_name")], access_level="admin",
description="Show command configuration")
def config_cmd_show_cmd(self, _, _1, cmd_name):
cmd_name = cmd_name.lower()
command_str, sub_command_str = self.command_service.get_command_key_parts(cmd_name)
blob = ""
for command_channel, channel_label in self.command_service.channels.items():
cmd_configs = self.command_service.get_command_configs(command=command_str,
sub_command=sub_command_str,
channel=command_channel,
enabled=None)
if len(cmd_configs) > 0:
cmd_config = cmd_configs[0]
status = self.getresp("module/config", "enabled_high" if cmd_config.enabled == 1 else "disabled_high")
blob += "<header2>%s</header2> %s (%s: %s)\n" % (channel_label, status,
self.getresp("module/config", "access_level"),
cmd_config.access_level.capitalize())
# show status config
blob += "Status:"
enable_link = self.text.make_tellcmd(self.getresp("module/config", "enable"),
"config cmd %s enable %s"
% (cmd_name, command_channel))
disable_link = self.text.make_tellcmd(self.getresp("module/config", "disable"),
"config cmd %s disable %s"
% (cmd_name, command_channel))
blob += " " + enable_link + " " + disable_link
# show access level config
blob += "\n" + self.getresp("module/config", "access_level")
for access_level in self.access_service.access_levels:
# skip "None" access level
if access_level["level"] == 0:
continue
label = access_level["label"]
link = self.text.make_tellcmd(label.capitalize(),
f"config cmd {cmd_name} access_level {command_channel} {label}")
blob += " " + link
blob += "\n\n"
if blob:
sub_commands = self.get_sub_commands(command_str, sub_command_str)
if sub_commands:
blob += "<header2>Subcommands</header2>\n"
for row in sub_commands:
command_name = self.command_service.get_command_key(row.command, row.sub_command)
blob += self.text.make_tellcmd(command_name, f"config cmd {command_name}") + "\n\n"
# include help text
blob += "\n\n".join(map(lambda handler: handler["help"], self.command_service.get_handlers(cmd_name)))
return ChatBlob("Command (%s)" % cmd_name, blob)
else:
return self.getresp("module/config", "no_cmd", {"cmd": cmd_name})
def get_sub_commands(self, command_str, sub_command_str):
return self.db.query("SELECT DISTINCT command, sub_command FROM command_config "
"WHERE command = ? AND sub_command != ?",
[command_str, sub_command_str])
+193
View File
@@ -0,0 +1,193 @@
import hjson
from core.chat_blob import ChatBlob
from core.command_param_types import Const, Any, Options, NamedFlagParameters
from core.db import DB
from core.decorators import instance, command
from core.text import Text
from core.translation_service import TranslationService
# noinspection SqlCaseVsIf
@instance()
class ConfigController:
def inject(self, registry):
self.db: DB = registry.get_instance("db")
self.text: Text = registry.get_instance("text")
self.command_service = registry.get_instance("command_service")
self.event_service = registry.get_instance("event_service")
self.setting_service = registry.get_instance("setting_service")
self.config_events_controller = registry.get_instance("config_events_controller")
self.ts: TranslationService = registry.get_instance("translation_service")
self.getresp = self.ts.get_response
def start(self):
self.ts.register_translation("module/config", self.load_config_msg)
def load_config_msg(self):
with open("modules/core/config/config.msg", mode="r", encoding="UTF-8") as f:
return hjson.load(f)
@command(command="config", params=[], access_level="admin",
description="Show configuration options for the bot")
def config_list_cmd(self, _):
sql = """SELECT
module,
SUM(CASE WHEN enabled = 1 THEN 1 ELSE 0 END) count_enabled,
SUM(CASE WHEN enabled = 0 THEN 1 ELSE 0 END) count_disabled
FROM
(SELECT module, enabled FROM command_config
UNION
SELECT module, enabled FROM event_config WHERE is_hidden = 0
UNION
SELECT module, 2 FROM setting) t
GROUP BY
module
ORDER BY
module"""
data = self.db.query(sql)
count = len(data)
blob = ""
current_group = ""
for row in data:
parts = row.module.split(".")
group = parts[0]
module = parts[1]
if group != current_group:
current_group = group
blob += "\n<header2>" + current_group + "</header2>\n"
blob += self.text.make_tellcmd(module, "config mod " + row.module) + " "
if row.count_enabled > 0 and row.count_disabled > 0:
blob += self.getresp("module/config", "partial")
else:
blob += f"[{'<green>Enabled</green>' if row.count_disabled == 0 else '<red>Disabled</red>'}]"
blob += "\n"
return ChatBlob(self.getresp("module/config", "config", {"count": count}), blob)
@command(command="config",
params=[Options(["mod", "module"]), Any("module_name"), NamedFlagParameters(["include_hidden_events"])],
access_level="admin",
description="Show configuration options for a specific module")
def config_module_list_cmd(self, _, _1, module, named_params):
module = module.lower()
blob = ""
data = self.db.query("SELECT name FROM setting WHERE module = ? ORDER BY name", [module])
if data:
blob += self.getresp("module/config", "settings")
for row in data:
setting = self.setting_service.get(row.name)
blob += "%s: %s (%s)\n" % (setting.get_description(), setting.get_display_value(),
self.text.make_tellcmd("change", "config setting " + row.name))
data = self.db.query(
"SELECT DISTINCT command, sub_command FROM command_config WHERE module = ? ORDER BY command", [module])
if data:
blob += self.getresp("module/config", "commands")
for row in data:
command_key = self.command_service.get_command_key(row.command, row.sub_command)
blob += self.text.make_tellcmd(command_key, "config cmd " + command_key) + "\n"
blob += self.format_events(self.get_events(module, False), self.getresp("module/config", "events"))
if named_params.include_hidden_events:
blob += self.format_events(self.get_events(module, True), self.getresp("module/config", "hidden_events"))
if blob:
if not named_params.include_hidden_events:
blob += "\n" + self.text.make_tellcmd(self.getresp("module/config", "include_hidden_events"),
f"config mod {module} --include_hidden_events")
return ChatBlob(self.getresp("module/config", "mod_title", {"mod": module}), blob)
else:
return self.getresp("module/config", "mod_not_found", {"mod": module})
@command(command="config", params=[Const("settinglist")], access_level="admin",
description="List all settings")
def config_settinglist_cmd(self, _, _1):
blob = ""
data = self.db.query("SELECT * FROM setting ORDER BY module, name")
count = len(data)
if data:
blob += self.getresp("module/config", "settings")
current_module = ""
for row in data:
if row.module != current_module:
current_module = row.module
blob += "\n<pagebreak><header2>%s</header2>\n" % row.module
setting = self.setting_service.get(row.name)
blob += "%s: %s (%s)\n" % (setting.get_description(),
setting.get_display_value(),
self.text.make_tellcmd("change", "config setting " + row.name))
return ChatBlob(self.getresp("module/config", "settinglist_title", {"count": count}), blob)
@command(command="config", params=[Const("setting"), Any("setting_name"), Options(["set", "clear"]),
Any("new_value", is_optional=True)], access_level="admin",
description="Change a setting value")
def config_setting_update_cmd(self, _, _1, setting_name, op, new_value):
setting_name = setting_name.lower()
if op == "clear":
new_value = ""
elif not new_value:
return self.getresp("module/config", "no_new_value")
setting = self.setting_service.get(setting_name)
if setting:
setting.set_value(new_value)
if op == "clear":
return self.getresp("module/config", "set_clr", {"setting": setting_name})
else:
return self.getresp("module/config", "set_new", {"setting": setting_name,
"value": setting.get_display_value()})
else:
return self.getresp("module/config", "setting_not_found", {"setting": setting_name})
@command(command="config", params=[Const("setting"), Any("setting_name")], access_level="admin",
description="Show configuration options for a setting")
def config_setting_show_cmd(self, _, _1, setting_name):
setting_name = setting_name.lower()
blob = ""
setting = self.setting_service.get(setting_name)
if setting:
blob += self.getresp("module/config", "current_value", {"value": str(setting.get_display_value())})
blob += self.getresp("module/config", "description", {"desc": setting.get_description()})
if setting.get_extended_description():
blob += setting.get_extended_description() + "\n\n"
blob += setting.get_display()
return ChatBlob(self.getresp("module/config", "setting", {"setting": setting_name}), blob)
else:
return self.getresp("module/config", "setting_not_found", {"setting": setting_name})
def get_events(self, module, is_hidden):
return self.db.query("SELECT event_type, event_sub_type, handler, description, enabled, is_hidden "
f"FROM event_config WHERE module = ? AND is_hidden = ? "
"ORDER BY is_hidden, event_type, handler",
[module, 1 if is_hidden else 0])
def format_events(self, data, title):
blob = ""
if data:
blob += f"\n<header2>{title}</header2>\n"
for row in data:
event_type_key = self.event_service.get_event_type_key(row.event_type, row.event_sub_type)
enabled = self.getresp("module/config", "enabled_high" if row.enabled == 1 else "disabled_high")
blob += f"{self.config_events_controller.format_event_type(row)} - {row.description} [{enabled}]"
blob += " " + self.text.make_tellcmd("On", "config event %s %s enable" % (event_type_key, row.handler))
blob += " " + self.text.make_tellcmd("Off",
"config event %s %s disable" % (event_type_key, row.handler))
if row.event_type == "timer":
blob += " " + self.text.make_tellcmd("Run Now",
"config event %s %s run" % (event_type_key, row.handler))
blob += "\n"
return blob
@@ -0,0 +1,110 @@
import time
from core.chat_blob import ChatBlob
from core.command_param_types import Const, Any, Options, NamedParameters
from core.db import DB
from core.decorators import instance, command
from core.text import Text
from core.translation_service import TranslationService
@instance()
class ConfigEventsController:
def inject(self, registry):
self.db: DB = registry.get_instance("db")
self.text: Text = registry.get_instance("text")
self.command_service = registry.get_instance("command_service")
self.event_service = registry.get_instance("event_service")
self.setting_service = registry.get_instance("setting_service")
self.ts: TranslationService = registry.get_instance("translation_service")
self.getresp = self.ts.get_response
@command(command="config",
params=[Const("event"), Any("event_type"), Any("event_handler"), Options(["enable", "disable"])],
access_level="admin",
description="Enable or disable an event")
def config_event_status_cmd(self, _, _1, event_type, event_handler, action):
event_type = event_type.lower()
event_handler = event_handler.lower()
action = action.lower()
event_base_type, event_sub_type = self.event_service.get_event_type_parts(event_type)
enabled = 1 if action == "enable" else 0
if not self.event_service.is_event_type(event_base_type):
return self.getresp("module/config", "unknown event", {"type", event_type})
count = self.event_service.update_event_status(event_base_type, event_sub_type, event_handler, enabled)
if count == 0:
return self.getresp("module/config", "event_enable_fail", {"type": event_type, "handler": event_handler})
else:
action = self.getresp("module/config", "enabled_high" if action == "enable" else "disabled_high")
return self.getresp("module/config", "event_enable_success", {"type": event_type,
"handler": event_handler,
"changedto": action})
@command(command="config",
params=[Const("event"), Any("event_type"), Any("event_handler"), Const("run")],
access_level="admin",
description="Execute a timed event immediately")
def config_event_run_cmd(self, _, _1, event_type, event_handler, _2):
event_type = event_type.lower()
event_handler = event_handler.lower()
event_base_type, event_sub_type = self.event_service.get_event_type_parts(event_type)
if not self.event_service.is_event_type(event_base_type):
return self.getresp("module/config", "unknown event", {"type", event_type})
row = self.db.query_single("SELECT e.event_type, e.event_sub_type, e.handler, t.next_run FROM timer_event t "
"JOIN event_config e ON t.event_type = e.event_type AND t.handler = e.handler "
"WHERE e.event_type = ? AND e.event_sub_type = ? AND e.handler LIKE ?",
[event_base_type, event_sub_type, event_handler])
if not row:
return self.getresp("module/config", "event_enable_fail", {"type": event_type, "handler": event_handler})
elif row.event_type != "timer":
return self.getresp("module/config", "event_manual")
else:
self.event_service.execute_timed_event(row, int(time.time()))
action = self.getresp("module/config", "run")
return self.getresp("module/config", "event_enable_success", {"type": event_type,
"handler": event_handler,
"changedto": action})
@command(command="config", params=[Const("eventlist"), NamedParameters(["event_type"])], access_level="admin",
description="List all events")
def config_eventlist_cmd(self, _, _1, named_params):
params = []
sql = "SELECT module, event_type, event_sub_type, handler, description, enabled, is_hidden FROM event_config"
if named_params.event_type:
sql += " WHERE event_type = ?"
params.append(named_params.event_type)
sql += " ORDER BY module, is_hidden, event_type, event_sub_type, handler"
data = self.db.query(sql, params)
blob = "Asterisk (*) denotes a hidden event. Only change these events if you understand the implications.\n"
current_module = ""
for row in data:
if current_module != row.module:
blob += "\n<pagebreak><header2>%s</header2>\n" % row.module
current_module = row.module
event_type_key = self.format_event_type(row)
on_link = self.text.make_tellcmd("On", "config event %s %s enable" % (event_type_key, row.handler))
off_link = self.text.make_tellcmd("Off", "config event %s %s disable" % (event_type_key, row.handler))
if row.is_hidden == 1:
blob += "*"
blob += f"{event_type_key} [{self.format_enabled(row.enabled)}] {on_link} {off_link} - {row.description}\n"
return ChatBlob(self.getresp("module/config", "blob_events", {"amount": len(data)}), blob)
def format_enabled(self, enabled):
return "<green>E</green>" if enabled else "<red>D</red>"
def format_event_type(self, row):
if row.event_sub_type:
return row.event_type + ":" + row.event_sub_type
else:
return row.event_type