import time

from core.command_alias_service import CommandAliasService
from core.decorators import instance, command, event
from core.dict_object import DictObject
from core.job_scheduler import JobScheduler
from core.logger import Logger
from core.setting_service import SettingService
from core.setting_types import BooleanSettingType
from core.text import Text
from core.igncore import Tyrbot
from core.util import Util
from modules.standard.datanet.ws_controller import WebsocketRelayController


@instance()
class WorldBossController:
    """Tracks world boss spawn/mortal timers and announces them.

    Timers are provided through a local websocket relay, which gets fed by an
    external API. If you intend to take advantage of this module, you **will**
    need to contact the API host of your choice to whitelist the IP addresses
    with which you intend to access the API.

    Example timer data (``time`` is compared against ``time.time()``, i.e. it
    is a unix timestamp in seconds):
        [{"name": "Tarasque", "time": <unix timestamp>},
         {"name": "Vizaresh", "time": <unix timestamp>}]
    """

    # Boss name -> (respawn seconds, immortal seconds), as passed to
    # calc_spawn_mortal. Bosses not listed here are ignored by get_spawn.
    SPAWN_TIMINGS = {
        "Loren Warr": (9 * 60 * 60, 15 * 60),
        "Tarasque": (9 * 60 * 60, 30 * 60),
        "The Hollow Reaper": (9 * 60 * 60, 15 * 60),
        "Vizaresh": (17 * 60 * 60, 6 * 60),
    }

    # Most recent timer payload received from the relay.
    timer_data = []
    # Alert thresholds in seconds before the event, in descending order;
    # get_next_alert relies on that ordering.
    alerts = [480 * 60, 360 * 60, 240 * 60, 120 * 60, 60 * 60, 60 * 15, 60 * 5, 60 * 3, 60 * 2, 60,
              30, 15, 10, 5, 4, 3, 2, 1, 0]
    # Bookkeeping of running alert chains: [{'name': <boss name>, 'id': <job id>}]
    jobs = []

    def inject(self, registry):
        """Fetch the services this controller depends on from the registry."""
        self.logger = Logger(__name__)
        self.bot: Tyrbot = registry.get_instance("bot")
        self.text: Text = registry.get_instance("text")
        self.util: Util = registry.get_instance("util")
        self.command_alias_service: CommandAliasService = registry.get_instance("command_alias_service")
        self.job_scheduler: JobScheduler = registry.get_instance("job_scheduler")
        self.setting_service: SettingService = registry.get_instance("setting_service")

    def pre_start(self):
        """Register the ``timer_spam`` setting that toggles the alerts."""
        self.setting_service.register(self.module_name, 'timer_spam', True, BooleanSettingType(),
                                      "should timers be spammed")

    @event(WebsocketRelayController.WS_RELAY, "save most current timers")
    def get_timer(self, _, data):
        """Cache incoming timer payloads and start alert chains for them.

        Only relay messages whose ``type`` is ``"timer"`` are handled. When
        ``timer_spam`` is enabled, an alert job is queued (2 seconds out) for
        every known boss that does not already have an entry in ``self.jobs``.
        """
        if data.type != "timer":
            return
        self.timer_data = data.payload

        if self.setting_service.get_value("timer_spam") != "1":
            return

        for row in self.timer_data:
            spawn_info = self.get_spawn(row)
            if not spawn_info:
                continue
            # A boss that already has a running alert chain must not get a second one.
            if any(job["name"] == spawn_info.name for job in self.jobs):
                continue
            self.job_scheduler.delayed_job(self.timer_alert, 2, spawn_info)

    def get_spawn(self, timer):
        """Work out the next event (spawn or mortal) for a raw timer entry.

        :param timer: mapping with at least ``name`` and ``time`` keys
        :return: DictObject with ``name``, ``type`` (``'spawn'`` or
                 ``'mortal'``), ``time`` (seconds remaining), ``at``
                 (timestamp of the event) and ``data`` (the raw window), or
                 None for unknown bosses / when no window could be computed
        """
        timer = DictObject(timer)
        timing = self.SPAWN_TIMINGS.get(timer.name)
        if timing is None:
            return None

        respawn, immortal = timing
        data = self.calc_spawn_mortal(timer.time, respawn, immortal)
        if not data:
            return None

        # Read the clock once so all comparisons and offsets agree.
        now = time.time()
        if data.spawn < now < data.mortal:
            # Already spawned but not yet attackable: count down to mortal.
            return DictObject({'name': timer.name, 'type': 'mortal', 'time': data.mortal - now,
                               'at': data.mortal, 'data': data})
        return DictObject({'name': timer.name, 'type': 'spawn', 'time': data.spawn - now,
                           'at': data.spawn, 'data': data})

    def calc_spawn_mortal(self, last, respawn, immortal):
        """Pick the spawn/mortal window that applies right now.

        :param last: reference timestamp from the timer feed
        :param respawn: seconds added to ``last`` to get the pop time
        :param immortal: seconds between the pop and the boss being attackable
        :return: DictObject with ``spawn`` and ``mortal`` timestamps, or None
                 when none of the windows below matches (e.g. stale data)
        """
        pop = last + respawn
        attackable = pop + immortal
        now = time.time()
        # Mortal
        if immortal > attackable - now > 0:
            return DictObject({'spawn': pop, 'mortal': attackable})
        elif immortal > attackable + respawn - now > 0:
            return DictObject({'spawn': pop + respawn, 'mortal': attackable + respawn})
        # Spawn
        elif respawn > (pop - now) > 0:
            return DictObject({'spawn': pop, 'mortal': pop})
        elif respawn > pop + respawn - now > 0:
            return DictObject({'spawn': attackable + respawn, 'mortal': attackable + respawn + immortal})
        elif last - now > 0:
            return DictObject({'spawn': last - immortal, 'mortal': last})
        return None

    def send_warn(self, msg):
        """Send ``msg`` to the private channel, prefixed with ``[WB]``."""
        self.bot.send_private_channel_message(f"[WB] {msg}")

    def get_next_alert(self, duration):
        """Return how many seconds to wait until the next alert threshold.

        :param duration: seconds remaining until the event
        :return: ``duration - alert - 1`` for the largest threshold in
                 ``self.alerts`` that is smaller than ``duration - 1``;
                 ``duration`` itself if there is no such threshold
        """
        for alert in self.alerts:
            if duration - 1 > alert:
                return duration - alert - 1
        return duration

    @command(command="gaunt", params=[], description="Displays the next Vizaresh pop time", access_level="member")
    def show_gaunt(self, _):
        return self._show_named("Vizaresh")

    @command(command="loren", params=[], description="Displays the next Loren Warr pop time", access_level="member")
    def show_loren(self, _):
        return self._show_named("Loren Warr")

    @command(command="tara", params=[], description="Displays the next Tara pop time", access_level="member")
    def show_tara(self, _):
        return self._show_named("Tarasque")

    def _show_named(self, name):
        """Render the cached timer for boss ``name``, or report that it is missing."""
        for timer in self.timer_data:
            if timer['name'] == name:
                return self.show_user(timer)
        return "Timer not found"

    def show_user(self, timer):
        """Format a raw timer entry as a one-line status message for a user."""
        timer = self.get_spawn(timer)
        if not timer:
            return "No timers cached; please try again later."
        if timer.type == "mortal":
            return f"{timer.name} :: mortal in {self.util.format_time(timer.time)}"
        elif timer.type == "spawn":
            return f"{timer.name} :: spawn in {self.util.format_time(timer.time)}"

    def _forget_job(self, name):
        """Drop the alert-chain bookkeeping entry for boss ``name``."""
        self.jobs = [job for job in self.jobs if job['name'] != name]

    def timer_alert(self, t, timer):
        """Scheduler callback: announce a timer and queue the next alert.

        :param t: time value supplied by the job scheduler; used as the base
                  for computing and scheduling the next alert
        :param timer: DictObject as returned by get_spawn
        """
        name = timer.name

        if self.setting_service.get_value("timer_spam") == "0":
            # The chain ends here; forget it so get_timer can start a fresh
            # one once the setting is switched back on.
            self._forget_job(name)
            return

        # Refresh against the latest payload; keep the passed-in timer if the
        # boss is no longer part of the cached data.
        for row in self.timer_data:
            if row["name"] == name:
                refreshed = self.get_spawn(row)
                if not refreshed:
                    # No valid window can be computed any more. End the chain
                    # instead of failing on attribute access below.
                    self._forget_job(name)
                    return
                timer = refreshed

        alert_duration = self.get_next_alert(timer.at - t)

        if timer.at - time.time() < 1:
            # The event is happening now: announce it and end the chain.
            if timer.type == "mortal":
                self.send_warn(f"{timer.name} :: is now mortal")
                self._forget_job(timer.name)
            elif timer.type == "spawn":
                self.send_warn(f"{timer.name} :: has just spawned")
                self._forget_job(timer.name)
            return

        if timer.type == "mortal":
            self.send_warn(f"{timer.name} :: mortal in {self.util.format_time(timer.time)}")
        elif timer.type == "spawn":
            if alert_duration > 60 * 2:
                self.send_warn(f"{timer.name} :: spawn in {self.util.format_time(timer.time)}")

        job_id = self.job_scheduler.scheduled_job(self.timer_alert, t + alert_duration, timer)
        for job in self.jobs:
            if job['name'] == timer.name:
                job['id'] = job_id
                return
        self.jobs.append({'name': timer.name, 'id': job_id})