import time from core.aochat.BaseModule import BaseModule from core.command_alias_service import CommandAliasService from core.command_param_types import Options, Int, Any, NamedParameters from core.db import DB from core.decorators import instance, command, event from core.dict_object import DictObject from core.event_service import EventService from core.igncore import IgnCore from core.lookup.pork_service import PorkService from core.public_channel_service import PublicChannelService from core.text import Text from core.util import Util from modules.standard.helpbot.playfield_controller import PlayfieldController from modules.standard.tower.tower_events import TowerEventController @instance() class TowerHotController(BaseModule): PAGE_SIZE = 30 # noinspection DuplicatedCode def inject(self, registry): self.bot: IgnCore = registry.get_instance("bot") self.db: DB = registry.get_instance("db") self.util: Util = registry.get_instance("util") self.text: Text = registry.get_instance("text") self.event_service: EventService = registry.get_instance("event_service") self.pork_service: PorkService = registry.get_instance("pork_service") self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller") self.public_channel_service: PublicChannelService = registry.get_instance("public_channel_service") self.command_alias_service: CommandAliasService = registry.get_instance("command_alias_service") @event(event_type=TowerEventController.TOWER_ATTACK_EVENT, description="Mark Sites in penalty as in penalty", is_enabled=False) def tower_victory_event(self, _, event_data): if event_data.attacker.org_id: data = self.db.query( """SELECT pf_id, site_number, t.org_id, t.faction, penalty_until, planted, close_time FROM towers t LEFT JOIN all_orgs a ON a.org_id = t.org_id WHERE a.org_name = ? AND t.faction = ? AND close_time IS NOT NULL""", [event_data.attacker.org_name, event_data.attacker.faction.lower()]) blob = [] for row in data: blob.append((((row.planted - time.time()) % 3600) + 3600+time.time(), row.pf_id, row.site_number, row.org_id)) if blob: with self.db.lock: with self.db.pool.get_connection() as conn: with conn.cursor() as cur: cur.executemany("UPDATE towers SET penalty_until=? WHERE pf_id=? AND site_number=? AND org_id=?".replace("?", "%s"), blob) # self.db.exec("UPDATE towers SET penalty_until=? where org_id=?", # [time.time() + 60 * 60, event_data.attacker.org_id]) @command(command="hot", params=[Options(['tl1', 'tl2', 'tl3', 'tl4', 'tl5', 'tl6', 'tl7']), Any('faction', is_optional=True), NamedParameters(["page"])], access_level="member", description="Shows hot playfields") def hot_tl(self, _, tl, faction: str, named_params): if faction: if faction.startswith("--page="): named_params = DictObject({'page': faction[7:]}) faction = None if faction is not None and faction.lower() not in ['omni', 'clan', 'neut', 'neutral']: return f"Unknown faction: {faction}" tl = tl[2:] page = int(named_params.page or "1") offset = (page - 1) * self.PAGE_SIZE towers = self.get_hot_sites_tl(int(tl), faction) return self.text.format_pagination(towers, offset, page, self.formatter, f"Hot Sites TL{tl} ({len(towers)})", f"There are no hot sites for TL {tl}.", f'hot tl{tl} {faction or ""}', self.PAGE_SIZE) def formatter(self, row, index, data): d = {} if index > 1: d = data[index - 2] status = "" if row.status_time <= 3600: status += f"5% (closes in {self.util.time_to_readable(row.status_time)})" elif row.status_time <= (3600 * 6): status += f"25% (closes in {self.util.time_to_readable(row.status_time)})" else: status += f"75% (opens in {self.util.time_to_readable(row.status_time - (3600 * 6))})" if row.penalty_until > time.time(): status += f" In Penalty for: {self.util.time_to_readable(row.penalty_until - time.time())}" blob = "" if self.get_ct_type(d.get("ql", 0)) < (tl := self.get_ct_type(row.ql)): blob += f"TL{tl}
" space = f"{row.short_name} x{row.site_number}" place = "_" * (7 - len(space)) return blob + "" + self.text.make_tellcmd(space, f'lc {row.short_name} {row.site_number}') + \ f"{place} QL {row.min_ql}/{row.ql}/{row.max_ql} - " \ f"{self.text.get_formatted_faction(row.faction, row.org_name)}, {status}\n" @command(command="hot", params=[Int('level', is_optional=True), Any('faction', is_optional=True), NamedParameters(["page"])], access_level="member", description="Shows hot playfields by level") def hot_level(self, _, level, faction, named_params): if faction: if faction.startswith("--page="): named_params = DictObject({'page': faction[7:]}) faction = None if faction is not None and faction.lower() not in ['omni', 'clan', 'neut', 'neutral']: return f"Unknown faction: {faction}" if level: if level < 0 | level > 220: return f"Level out of range: {level}" page = int(named_params.page or "1") offset = (page - 1) * self.PAGE_SIZE towers = self.get_hot_sites(level, faction) level = f"{level}" if level else "" faction = f"{faction} " if faction else "" return self.text.format_pagination(towers, offset, page, self.formatter, f"Hot Towersites ({len(towers)})", f"There are no hot sites.", f'hot {level}{faction}', self.PAGE_SIZE) def get_hot_sites(self, level=None, faction=None): where = "" now = time.time() % 86400 params = [now, now, now] if level: where += " AND l.pvp_min <=? and pvp_max >= ? " params.append(level) params.append(level) if faction: where += " AND c.faction LIKE ? " params.append(faction.capitalize()) data = self.db.query("SELECT a.*, f.short_name, c.org_name, b.min_ql, b.max_ql, " "(CASE WHEN (a.close_time-?) < 0 THEN a.close_time-? +86400 ELSE a.close_time-? END) " " AS status_time FROM tower_sites b " "LEFT JOIN towers a ON a.pf_id = b.playfield_id AND a.site_number = b.site_number " "LEFT JOIN level l on a.ql = l.level " "LEFT JOIN playfields f on a.pf_id = f.id " f"LEFT JOIN all_orgs c ON a.org_id = c.org_id WHERE close_time IS NOT NULL {where} ORDER BY a.ql", params) return [x for x in data if x.status_time - (3600 * 6) < 60 * 60 or x.penalty_until > time.time()] def get_hot_sites_tl(self, tl=7, faction=None): min_ql, max_ql = self.util.get_level_range_tl(tl) where = "" now = time.time() % 86400 params = [now, now, now] where += " AND ql between ? and ? " params.append(min_ql) params.append(max_ql) if faction: where += " AND c.faction LIKE ?" params.append("%" + faction.capitalize() + "%") data = self.db.query("SELECT a.*, f.short_name, c.org_name, b.min_ql, b.max_ql, " "(CASE WHEN (a.close_time-?) < 0 THEN a.close_time-? +86400 ELSE a.close_time-? END) " " AS status_time FROM tower_sites b " "LEFT JOIN towers a ON a.pf_id = b.playfield_id AND a.site_number = b.site_number " "LEFT JOIN level l on a.ql = l.level " "LEFT JOIN playfields f on a.pf_id = f.id " f"LEFT JOIN all_orgs c ON a.org_id = c.org_id WHERE close_time IS NOT NULL {where} ORDER BY a.ql", params) return [x for x in data if x.status_time - (3600 * 6) < 60 * 60 or x.penalty_until > time.time()] def get_ct_type(self, ql): if ql == 0: return 0 elif ql < 34: return 1 elif ql < 82: return 2 elif ql < 129: return 3 elif ql < 177: return 4 elif ql < 201: return 5 elif ql < 226: return 6 else: return 7