import time
from datetime import datetime

import pytz

from core.chat_blob import ChatBlob
from core.command_alias_service import CommandAliasService
from core.command_param_types import Any, Int, Const
from core.db import DB
from core.decorators import instance, command, event, setting
from core.event_service import EventService
from core.igncore import IgnCore
from core.lookup.pork_service import PorkService
from core.public_channel_service import PublicChannelService
from core.setting_types import BooleanSettingType
from core.text import Text
from core.util import Util
from modules.standard.helpbot.playfield_controller import PlayfieldController
from modules.standard.tower.tower_attack_controller import TowerAttackController
from modules.standard.tower.tower_events import TowerEventController

# Maps a site's `timing` value to the fixed UTC hour used to derive its close time:
# EU-friendly (1) => 04 UTC, US-friendly (2) => 20 UTC.
# Legacy sites (0) have no fixed hour and keep their stored close_time.
FIXED_TIMES = {1: 4, 2: 20}

# Exclusive upper QL bounds of control tower types 1-6; a QL at or above the last bound is type 7.
CT_TYPE_QL_LIMITS = (34, 82, 129, 177, 201, 226)


@instance()
class LandController:
    """Chat commands and helpers for browsing land control tower sites."""

    def inject(self, registry):
        """Fetch the collaborating service instances from the registry."""
        self.bot: IgnCore = registry.get_instance("bot")
        self.db: DB = registry.get_instance("db")
        self.text: Text = registry.get_instance("text")
        self.util: Util = registry.get_instance("util")
        self.event_service: EventService = registry.get_instance("event_service")
        self.pork_service: PorkService = registry.get_instance("pork_service")
        self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
        self.public_channel_service: PublicChannelService = registry.get_instance("public_channel_service")
        self.command_alias_service: CommandAliasService = registry.get_instance("command_alias_service")
        self.tac: TowerAttackController = registry.get_instance("tower_attack_controller")

    def pre_start(self):
        """Register the `lc` aliases, load the site data and make sure the `towers` table exists."""
        self.command_alias_service.add_alias('towers', "lc")
        self.command_alias_service.add_alias('tower', "lc")
        self.command_alias_service.add_alias('lca', "lc")
        self.db.load_sql_file(self.module_dir + "/" + "tower_sites.sql", pre_optimized=True)
        self.db.shared.exec("CREATE TABLE IF NOT EXISTS `towers` ("
                            "`tower_id` INT(11) NOT NULL,"
                            "`pf_id` INT(11) NOT NULL,"
                            "`site_number` INT(11) NOT NULL,"
                            "`x_coord` INT(11) NOT NULL,"
                            "`y_coord` INT(11) NOT NULL,"
                            "`high_id` INT(11) NOT NULL,"
                            "`ql` INT(11) NOT NULL,"
                            "`org_id` INT(11),"
                            "`faction` VARCHAR(11),"
                            "`planted` INT(11),"
                            "`close_time` INT(11),"
                            "`penalty_until` INT(11),"
                            "PRIMARY KEY (`tower_id`) USING HASH,"
                            "INDEX `site` (`pf_id`, `site_number`, `org_id`, `close_time`, `penalty_until`) USING HASH,"
                            "INDEX `tower` (`planted`, `ql`, `x_coord`, `y_coord`) USING HASH) "
                            "COLLATE='utf8mb4_general_ci' ENGINE=MEMORY;")
        self.db.create_view("towers")

    @setting(name="lc_cmd_full", value="false", description="Toggle the verbosity of !lc commands")
    def lc_full(self) -> BooleanSettingType:
        """Setting that switches the `lc` output between the short and the detailed form."""
        return BooleanSettingType()

    @event(event_type=TowerEventController.TOWER_VICTORY_EVENT,
           description="Purge sites which got wiped from DB",
           is_enabled=False)
    def tower_victory_event(self, _, event_data):
        """Delete the towers of the site that the victory event refers to.

        The site is resolved through the last recorded attack that matches the
        winner, the loser and the playfield of the event; nothing is deleted
        when no such attack is found.
        """
        row = self.tac.get_last_attack(event_data.winner.faction,
                                       event_data.winner.org_name,
                                       event_data.loser.faction,
                                       event_data.loser.org_name,
                                       event_data.location.playfield.id,
                                       time.time())
        if row:
            self.db.exec("DELETE FROM towers where pf_id=? and site_number=?", [row.pf_id, row.site])

    @command(command="lc", params=[], access_level="member",
             description="See a list of playfields containing land control tower sites")
    def lc_list_cmd(self, request):
        """List every playfield that has at least one tower site, linked to its `lc` sub-command."""
        data = self.db.query(
            "SELECT id, long_name, short_name FROM playfields "
            "WHERE id IN (SELECT DISTINCT playfield_id FROM tower_sites) ORDER BY short_name")

        blob = "".join(
            f"[{row.id:d}] {self.text.make_tellcmd(row.long_name, f'lc {row.short_name}')} {row.short_name}\n"
            for row in data)
        return ChatBlob(f"Land Control Playfields ({len(data):d})", blob)

    @command(command="lc", params=[Const("org"), Any("search")], access_level="member",
             description="See a list of land control tower sites owned by a particular org")
    def sites_org_cmd(self, request, _, search):
        """Show the sites owned by an org.

        `search` may be a numeric org id, an org name, or a character name
        (in which case the character's org is used). When several orgs match
        the name, a list of links to pick from is returned instead.
        """
        # isdecimal() only accepts characters that int() can parse, unlike isdigit().
        if search.isdecimal():
            org_id = int(search)
            result = search
        else:
            orgs = self.find_orgs(search)
            num_orgs = len(orgs)
            if num_orgs == 0:
                # No site-owning org matched by name: fall back to a character lookup.
                char_info = self.pork_service.get_character_info(search)
                if not char_info:
                    return f"Character or org {search} does not own any sites."
                if not char_info.org_id:
                    return f"{search.capitalize()} does not appear to belong to an org."
                org_id = char_info.org_id
                result = char_info.org_name
            elif num_orgs == 1:
                result = orgs[0].org_name
                org_id = orgs[0].org_id
            else:
                blob = "".join(
                    self.text.make_tellcmd(f"{org.org_name} ({org.org_id})", f"lc org {org.org_id}") + "\n"
                    for org in orgs)
                return ChatBlob(f"Orgs matching your search criteria ({num_orgs})", blob)

        data = self.get_towers_by_org(org_id)
        if not data:
            return f"The org {search} does not own any sites or does not exist."

        # One timestamp for the whole listing so every site is rendered against the same "now".
        now = time.time()
        site_count = len(data)
        blob = "".join(f"{self.format_site_info(site, now, site_count)}\n" for site in data)
        total_ql = sum(site.ql for site in data)
        blob += f"Stats: QL{total_ql}, contracts up to QL{total_ql * 2}"
        return ChatBlob(f"Sites owned by {result}", blob)

    @command(command="lc", params=[Any("playfield"), Int("site_number", is_optional=True)], access_level="member",
             description="See a list of land control tower sites in a particular playfield")
    def lc_playfield_cmd(self, request, playfield_name, site_number):
        """Show all sites of a playfield, or the details of one site when `site_number` is given."""
        playfield = self.playfield_controller.get_playfield_by_name_or_id(playfield_name)
        if not playfield:
            return f"Could not find playfield {playfield_name}."

        data = self.get_towers(playfield.id, site_number)
        if not data:
            if site_number:
                return f"The Site {site_number} does not exist in playfield {playfield_name}."
            return f"Playfield {playfield_name} does not have any tower sites."

        t = int(time.time())
        if site_number:
            # Single-site result: pass the whole row list so the individual towers can be listed.
            blob = self.format_site_info(data, t)
            title = f"Tower Info: {playfield.long_name} x{site_number}"
        else:
            site_count = len(data)
            blob = "".join(f"{self.format_site_info(row, t, site_count)}\n" for row in data)
            title = f"Tower Info: {playfield.long_name} ({site_count})"
        return ChatBlob(title, blob)

    @command(command="free", params=[], access_level="member", description="Shows potentially free towerfields")
    def free(self, _):
        """List the enabled sites that currently have no owning org on record."""
        data = self.get_free()
        if not data:
            return "Currently there are no free sites. Go kill some."

        now = time.time()
        site_count = len(data)
        blob = "".join(f"{self.format_site_info(row, now, site_count)}\n" for row in data)
        return ChatBlob(f"FREE Towersites ({site_count})", blob)

    def format_site_info(self, row, t, count=0):
        """Render one site as text.

        :param row: a single site row when `count` is non-zero; otherwise the
                    list of rows for one site, whose first row is used as the
                    site/control tower row
        :param t: current unix timestamp
        :param count: number of sites in the listing, 0 for a single-site view
        :return: the formatted text, or "Site does not exist." for an empty input
        """
        towers = row
        if count == 0 and towers:
            row = towers[0]
        if not row:
            return "Site does not exist."

        blob = f"{row.short_name} x{row.site_number} ({row.site_name})\n"
        blob += f"Level Range: {row.min_ql} - {row.max_ql} "
        if row.timing == 0:
            blob += "[Legacy]\n"
        elif row.timing == 1:
            blob += "[04 UTC]\n"
        elif row.timing == 2:
            blob += "[20 UTC]\n"
        blob += "Coordinates: "
        blob += self.text.make_chatcmd(f"{row.x_coord:d}x{row.y_coord:d}",
                                       f"/waypoint {row.x_coord:d} {row.y_coord:d} {row.pf_id:d}") + "\n"

        if not row.get("org_name", None):
            # No owning org joined to this site.
            if not row.enabled:
                blob += "Disabled\n"
            else:
                blob += "This site is potentially unplanted\n"
            return blob

        status = self._gas_status(row, t)
        blob += f"CT: QL{row.ql} ({self.text.get_formatted_faction(row.faction, row.org_name)}) " \
                f"T{self.get_ct_type(row.ql)} - Planted {self.util.time_to_readable(t - row.planted)} ago\n"
        blob += f"Gas: {status}\n"

        if self.lc_full().get_value():
            if count == 0:
                # Single-site view: count and list the individual towers.
                conductors, turrets = 0, 0
                tower_lines = []
                for tower in towers:
                    if "Turret" in tower.name:
                        turrets += 1
                    elif "Conductor" in tower.name:
                        conductors += 1
                    tower_lines.append(f" - QL{self.text.zfill(tower.ql, 220)} {tower.name}\n")
                blob += f"Dist: {conductors} Conductors and {turrets} Turrets\n"
                blob += "\n Towers:\n"
                blob += "".join(tower_lines)
            else:
                # Listing view: the query already delivers aggregated counts.
                blob += f"Dist: {row.guards} Conductors and {row.turrets} Turrets\n"
        return blob

    def _gas_status(self, row, t):
        """Build the gas status text (5% / 25% / 75%, plus an optional penalty note) for an owned site.

        For sites with a fixed timing, `row.close_time` is overwritten with the
        close time derived from FIXED_TIMES before the status is computed.
        """
        current_day_time = t % 86400
        if row.timing > 0:
            # Fixed hour of the day plus the planting offset within the hour.
            row.close_time = FIXED_TIMES[row.timing] * 3600 + row.planted % 3600

        until_close = row.close_time - current_day_time
        if until_close < 0:
            # Today's close time already passed; measure against tomorrow's.
            until_close += 86400

        if until_close <= 3600:
            status = f"5% (closes in {self.util.time_to_readable(until_close)})"
        elif until_close <= (3600 * 6):
            status = f"25% (closes in {self.util.time_to_readable(until_close)})"
        else:
            status = f"75% (opens in {self.util.time_to_readable(until_close - (3600 * 6))})"

        # penalty_until is a nullable column; comparing None with a number would raise.
        if row.penalty_until and row.penalty_until > t:
            status += f" Penalty (for {self.util.time_to_readable(row.penalty_until - t)})"
        return status

    def get_ct_type(self, ql):
        """Return the control tower type (1-7) for the given QL."""
        for ct_type, upper_bound in enumerate(CT_TYPE_QL_LIMITS, start=1):
            if ql < upper_bound:
                return ct_type
        return len(CT_TYPE_QL_LIMITS) + 1

    def get_free(self):
        """Return one row per enabled site that has no tower row with an org attached."""
        return self.db.query("""SELECT d.playfield_id AS pf_id,d.site_number, d.site_name, d.min_ql, d.max_ql,
                                d.x_coord, d.y_coord, d.timing, d.enabled,
                                a.tower_id, a.ql, a.close_time, a.penalty_until, a.planted, b.*, c.*, e.*
                                FROM tower_sites d
                                LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number
                                LEFT JOIN aodb b ON a.high_id = b.highid
                                LEFT JOIN playfields c on d.playfield_id = c.id
                                LEFT JOIN all_orgs e on a.org_id = e.org_id
                                WHERE a.org_id IS NULL AND d.enabled = 1
                                GROUP BY d.playfield_id, d.site_number
                                """)

    def get_towers(self, pf, site=None):
        """Query the tower sites of a playfield.

        With `site`, the rows of that single site are returned ungrouped (one
        per joined tower row), rows with a close_time first, then by QL
        descending. Without it, one row per site of the playfield is returned,
        joined to the tower row that has a close_time and to aggregated
        `turrets` / `guards` counts.
        """
        if site:
            return self.db.query("""SELECT d.playfield_id AS pf_id,d.site_number, d.site_name, d.min_ql, d.max_ql,
                                    d.x_coord, d.y_coord, d.timing, d.enabled,
                                    a.tower_id, a.ql, a.close_time, a.penalty_until, a.planted, b.*, c.*, e.*
                                    FROM tower_sites d
                                    LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number
                                    LEFT JOIN aodb b ON a.high_id = b.highid
                                    LEFT JOIN playfields c on d.playfield_id = c.id
                                    LEFT JOIN all_orgs e on a.org_id = e.org_id
                                    WHERE playfield_id=? AND d.site_number=?
                                    ORDER BY close_time IS NULL, ql desc""", [pf, site])

        return self.db.query("""SELECT d.playfield_id AS pf_id, d.site_number, d.site_name, d.min_ql, d.max_ql,
                                d.x_coord, d.y_coord, d.timing, d.enabled,
                                a.tower_id, a.ql, a.close_time, a.penalty_until, a.planted, b.*, c.*, e.*,
                                t2.turrets, t2.guards
                                FROM tower_sites d
                                LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number
                                    AND a.close_time IS NOT NULL
                                LEFT JOIN aodb b ON a.high_id = b.highid
                                LEFT JOIN playfields c on d.playfield_id = c.id
                                LEFT JOIN all_orgs e on a.org_id = e.org_id
                                LEFT JOIN (SELECT
                                    COUNT(CASE WHEN name LIKE '%Turret%' THEN 1
                                               WHEN name LIKE '%SAM Battery%' THEN 1 END) turrets,
                                    COUNT(CASE WHEN name LIKE '%Guard%' THEN 1 END) guards,
                                    site_number AS site
                                    FROM towers a
                                    LEFT JOIN aodb b ON a.high_id = b.highid
                                    WHERE a.pf_id=?
                                    GROUP BY site_number) t2 ON d.site_number = t2.site
                                WHERE playfield_id=?
                                GROUP BY playfield_id, site_number
                                ORDER BY site_number, ql DESC
                                """, [pf, pf])

    def get_towers_by_org(self, org_id):
        """Return one row per site owned by `org_id`, with turret/guard counts, ordered by QL."""
        return self.db.query("""SELECT * FROM (SELECT
                                    COUNT(CASE WHEN name LIKE '%Turret%' THEN 1
                                               WHEN name LIKE '%SAM Battery%' THEN 1 END) turrets,
                                    COUNT(CASE WHEN name LIKE '%Guard%' THEN 1 END) guards,
                                    a.pf_id, a.org_id, e.org_name, c.*, d.*
                                    FROM towers a
                                    LEFT JOIN aodb b ON a.high_id = b.highid
                                    LEFT JOIN playfields c on a.pf_id = c.id
                                    LEFT JOIN tower_sites d on a.pf_id = d.playfield_id
                                        and a.site_number = d.site_number
                                    LEFT JOIN all_orgs e on a.org_id = e.org_id
                                    WHERE a.org_id=?
                                    GROUP BY a.pf_id, a.site_number
                                    ORDER BY ql, close_time IS NOT NULL) t1
                                LEFT JOIN (SELECT * from towers a WHERE a.org_id=? AND close_time IS NOT NULL) t2
                                    ON t1.pf_id=t2.pf_id AND t1.site_number = t2.site_number
                                ORDER BY ql""", [org_id, org_id])

    def find_orgs(self, search):
        """Return the distinct orgs whose name matches `search` and that own at least one tower."""
        return self.db.query("SELECT DISTINCT a.org_name, a.org_id FROM all_orgs a "
                             "LEFT JOIN towers b ON a.org_id = b.org_id WHERE org_name ? AND b.org_id IS NOT NULL",
                             [search], extended_like=True)