import time

from core.aochat.BaseModule import BaseModule
from core.db import DB
from core.decorators import instance, event, setting
from core.igncore import IgnCore
from core.job_scheduler import JobScheduler
from core.setting_types import BooleanSettingType
from core.text import Text
from core.util import Util
from modules.core.accounting.services.account_service import AccountService
from modules.raidbot.tower.tower_controller import TowerController
from modules.standard.helpbot.playfield_controller import PlayfieldController

# Number of seconds in a day; close times are handled as seconds-of-day.
_SECONDS_PER_DAY = 86400
# A site counts as hot during the last 6 hours before its close time (see is_hot).
_HOT_WINDOW = 6 * 60 * 60
# How long an attacking org is tracked in attack_hot after an attack event.
_WAR_HOT_DURATION = 60 * 60
# Offset added to "now" for the 'plant' timestamp of a lost site.
_PLANT_DELAY = 20 * 60 - 1
# (delay in seconds, message suffix) for every countdown announcement.
_PLANT_WARNINGS = (
    (0, "in 20 minutes!"),
    (10 * 60 - 1, "in 10 minutes!"),
    (15 * 60 - 1, "in 5 minutes!"),
    (19 * 60 - 1, "in 1 minute!"),
    (19 * 60 + 30 - 1, "in 30 seconds!"),
    (19 * 60 + 45 - 1, "in 15 seconds!"),
    (19 * 60 + 50 - 1, "in 10 seconds!"),
    (19 * 60 + 55 - 1, "in 5 seconds!"),
    (19 * 60 + 56 - 1, "in 4 seconds!"),
    (19 * 60 + 57 - 1, "in 3 seconds!"),
    (19 * 60 + 58 - 1, "in 2 seconds!"),
    (19 * 60 + 59 - 1, "in 1 second!"),
    (20 * 60 - 1, "NOW!"),
)


@instance()
class TowerService(BaseModule):
    """Tracks tower sites, their hot/cold state and announces plant timers.

    The service keeps a ``towers`` table, reacts to tower attack / victory
    events, and offers query helpers plus a formatter for single site entries.
    """

    # For this Module to work properly you might need to
    # contact the API host of your choice to whitelist your IP addresses.

    # Entries of the form {'org_name': str, 'hot': epoch seconds}; appended by
    # tower_attack and pruned by is_hot. Class-level, so shared by all instances.
    attack_hot = []
    # Entries of the form {'pf': ..., 'site': ..., 'plant': epoch seconds};
    # appended by the victory handlers. Never pruned in this file.
    plant = []

    def inject(self, registry):
        """Fetch the collaborating instances from ``registry``."""
        self.bot: IgnCore = registry.get_instance("bot")
        self.db: DB = registry.get_instance("db")
        self.util: Util = registry.get_instance("util")
        self.text: Text = registry.get_instance("text")
        self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
        self.job_scheduler: JobScheduler = registry.get_instance("job_scheduler")
        self.account_service: AccountService = registry.get_instance("account_service")

    def pre_start(self):
        """Create the ``towers`` table if needed and register its view."""
        self.db.shared.exec("CREATE TABLE IF NOT EXISTS towers("
                            "pf_id int not null, "
                            "site_number int not null,"
                            "ql int,"
                            "x_coord int not null,"
                            "y_coord int not null,"
                            "org_id int,"
                            "org_name varchar(255), "
                            "faction varchar(32), "
                            "close_time int,"
                            "planted int,"
                            "enabled tinyint, "
                            "PRIMARY KEY (pf_id, site_number), "
                            "INDEX ql(ql), INDEX close(close_time), "
                            "INDEX planted(planted), INDEX enabled(enabled)) ENGINE MEMORY")
        self.db.create_view("towers")

    @event(event_type=TowerController.TOWER_ATTACK_EVENT, description="Track planthot", is_hidden=True)
    def tower_attack(self, _, event_data):
        """Remember the attacking org as war-hot for the next hour.

        Attackers without an ``org_id`` are ignored.
        """
        if event_data.attacker.get("org_id", None):
            self.attack_hot.append({'org_name': event_data.attacker.org_name,
                                    'hot': time.time() + _WAR_HOT_DURATION})

    @setting(name="tower_notify_type", value=False, description="Only notify when our orgs are involved")
    def tower_notify_type(self) -> BooleanSettingType:
        return BooleanSettingType()

    @event(event_type=TowerController.TOWER_VICTORY_EVENT, description="Send NW warnings")
    def victory(self, _, event_data):
        """Announce a lost site and schedule the plant countdown.

        Dispatches on ``event_data.type``: ``"attack"`` and ``"terminated"``
        are handled, any other type is ignored.
        """
        if event_data.type == "attack":
            self._handle_attack_victory(event_data)
        elif event_data.type == "terminated":
            self._handle_terminated(event_data)

    def _handle_attack_victory(self, event_data):
        """Handle a site that was lost through an attack."""
        winner = event_data.winner
        loser = event_data.loser
        playfield = event_data.location.playfield

        if self.tower_notify_type().get_value():
            # Only continue when one of the configured orgs is involved.
            own_orgs = self.account_service.get_org_names()
            if winner.get("org_name", None) not in own_orgs and loser.org_name not in own_orgs:
                return

        loser_tag = f'<{loser.faction.lower()}>{loser.org_name}'
        winner_tag = f'<{winner.faction.lower()}>{winner.org_name}'

        row = self.get_last_attack(winner.faction, winner.org_name,
                                   loser.faction, loser.org_name,
                                   playfield.id, int(time.time()))
        if row is None:
            # No matching finished battle was found, so the site number is
            # unknown; announce the loss for the whole playfield instead of
            # failing on the missing row.
            self._announce_unknown_site(loser, playfield,
                                        f'{loser_tag} Lost their Site in {playfield.long_name} - '
                                        f'{winner_tag} won!!')
            return

        self.send_nw_warn(0, f'{loser_tag} Lost their Site at {playfield.short_name} x{row.site} - '
                             f'{winner_tag} won!!')
        self.plant.append({'pf': row.pf_id, 'site': row.site, 'plant': time.time() + _PLANT_DELAY})
        self.prepare_nw_warn(row.pf_id, row.site)

    def _handle_terminated(self, event_data):
        """Handle a site that was lost because it was terminated."""
        # DEBUG: terminated sites.. behave strange.
        # for that reason, we'll just output these events to the console, but not the log.
        print(event_data)

        loser = event_data.loser
        # The playfield is read from event_data.location, consistent with the
        # other accesses in this handler.
        playfield = event_data.location.playfield
        loser_tag = f'<{loser.faction.lower()}>{loser.org_name}'

        rows = self.db.query("SELECT * FROM towers t where t.org_name=? and t.pf_id=?",
                             [loser.org_name, playfield.id])
        if len(rows) == 1:
            # Exactly one site of that org in the playfield: the site is known.
            site_number = rows[0].site_number
            self.plant.append({'pf': playfield.id, 'site': site_number,
                               'plant': time.time() + _PLANT_DELAY})
            self.send_nw_warn(0, f'{loser_tag} Lost their Site at '
                                 f'{playfield.short_name} x{site_number}')
            self.prepare_nw_warn(playfield.id, site_number)
            return

        self._announce_unknown_site(loser, playfield,
                                    f'{loser_tag} Lost their Site in {playfield.long_name}')

    def _announce_unknown_site(self, loser, playfield, message):
        """Record and announce a lost site whose site number is not known."""
        unknown = f"(UKN) PO: {loser.org_name}|{loser.faction}"
        self.plant.append({'pf': playfield.id, 'site': unknown, 'plant': time.time() + _PLANT_DELAY})
        self.send_nw_warn(0, message)
        self.prepare_nw_warn(playfield.id, "(UKN)", unknown)

    def day_time(self, day_t):
        """Wrap ``day_t`` (seconds) into the range ``[0, 86400)``."""
        # Modulo also covers exactly 86400 and offsets of more than one day.
        return day_t % _SECONDS_PER_DAY

    def get(self, where_order="", param=None):
        """Query enabled tower sites joined with site, playfield and level data.

        :param where_order: SQL fragment appended after the WHERE clause; it is
            inserted verbatim, so it must only ever come from trusted code.
        :param param: positional parameters for the ``?`` placeholders.
        :return: the result of ``self.db.query``.
        """
        if param is None:
            param = []
        return self.db.query("SELECT a.org_id, a.planted, a.close_time, a.ql, a.org_name, a.faction, "
                             "b.min_ql, b.max_ql, b.site_name, b.site_number, b.x_coord, b.y_coord, "
                             "c.id, c.long_name, c.short_name, "
                             "e.pvp_min, e.pvp_max "
                             "FROM towers a "
                             "INNER JOIN tower_site b ON a.site_number=b.site_number "
                             "LEFT JOIN playfields c ON a.pf_id = c.id "
                             "LEFT JOIN level e ON e.level = a.ql "
                             "WHERE a.pf_id = b.playfield_id and enabled = 1 "
                             f"{where_order}", param)

    def get_towers_by_tl(self, tl, faction=None):
        """Return sites whose QL lies in the level range of title level ``tl``."""
        min_ql, max_ql = self.util.get_level_range_tl(tl)
        if faction:
            return self.get("and ql between ? and ? and faction LIKE ? order by c.short_name",
                            [min_ql, max_ql, faction])
        return self.get("and ql between ? and ? order by c.short_name", [min_ql, max_ql])

    def get_towers_all(self):
        """Return all enabled sites ordered by playfield name."""
        return self.get("order by c.long_name", [])

    def get_towers_by_pf(self, pf):
        """Return the sites of playfield id ``pf`` ordered by site number."""
        return self.get("and a.pf_id=? order by a.site_number", [pf])

    def get_towers_by_pf_site(self, pf, site):
        """Return the site ``site`` of playfield id ``pf``."""
        return self.get("and a.pf_id=? and a.site_number=?", [pf, site])

    def get_towers_by_org(self, org):
        """Return the sites owned by org id ``org``."""
        return self.get("and a.org_id=? order by c.long_name", [org])

    def get_towers_by_org_name(self, org):
        """Return the sites owned by the org named ``org``."""
        return self.get("and a.org_name=? order by c.long_name", [org])

    def get_free(self):
        """Return the sites that have no owning org."""
        return self.get("and a.org_id IS NULL order by a.site_number", [])

    def get_towers_hot_tl(self, tl, faction=None):
        """Return the hot or war-hot sites for title level ``tl``."""
        return [tower for tower in self.get_towers_by_tl(tl, faction)
                if self.is_hot(tower) in (1, 2)]

    def get_towers_hot_level(self, level, faction=None):
        """Return the hot or war-hot sites attackable at ``level``.

        Both filters are optional: a falsy ``level`` or ``faction`` simply
        leaves the corresponding condition out of the query.
        """
        clauses = []
        params = []
        if level:
            clauses.append("and pvp_min <= ? and pvp_max >= ?")
            params += [level, level]
        if faction:
            clauses.append("and faction LIKE ?")
            params.append(faction)
        towers = self.get(" ".join(clauses), params)
        return [tower for tower in towers if self.is_hot(tower) in (1, 2)]

    def format_entry(self, entry, _):
        """Render one site row (as returned by ``get``) as display text.

        :param entry: a row with the columns selected by ``get``.
        :param _: unused.
        :return: the formatted multi-line string.
        """
        now = int(time.time()) % _SECONDS_PER_DAY
        more_link = self.text.make_tellcmd('More', f'lc {entry.short_name} {entry.site_number}')
        header = (f"Site: {entry.short_name} x{entry.site_number} "
                  f"[R:{entry.min_ql} - {entry.max_ql}] "
                  f"[{more_link}]\n")

        hot = self.is_hot(entry)
        if hot == -1:
            # No close time recorded: shown as an unplanted site.
            return header + 'UKN :: No Owner -> Unplanted\n' + "\n"

        # Seconds until the site's close time, wrapped around midnight.
        until_close = self.day_time(int(entry.close_time - now))
        countdown = ""
        if hot == 2:
            state = "WARHOT"
            # Evaluate the regular state once, outside any iteration over
            # attack_hot: is_hot sorts and prunes that very list.
            hot_normal = self.is_hot(entry, False)
            expiries = [e['hot'] for e in self.attack_hot if e['org_name'] == entry.org_name]
            if expiries:
                if hot_normal == 1:
                    countdown = f"COLD in {self.util.format_time(until_close)}"
                elif hot_normal == 0:
                    # 'hot' is an absolute epoch timestamp, so compare it
                    # against the epoch clock, not against seconds-of-day.
                    remaining = max(0, int(max(expiries) - time.time()))
                    countdown = f"COLD in {self.util.format_time(remaining)}"
        elif hot == 1:
            state = 'HOT'
            countdown = f"COLD in {self.util.format_time(until_close)}"
        else:
            state = "COLD"
            # The site turns hot once less than the hot window remains until close.
            until_hot = max(0, until_close - _HOT_WINDOW)
            countdown = f"HOT in {self.util.format_time(until_hot)}"

        org_link = self.text.make_tellcmd('View org', f'lc org {entry.org_name}')
        owner = f"<{entry.faction.lower()}>{entry.org_name} [{org_link}]"
        status_row = f"{state} :: {countdown} :: {owner} :: \n"
        planted_row = f" » Planted: {self.util.format_datetime(entry.planted)}\n\n"

        if entry.pvp_min:
            pvp = f"[{entry.pvp_min} - {entry.pvp_max}]"
        else:
            # No PvP range came back from the level join; fixed fallback range.
            pvp = "[175 - 220]"
        waypoint = self.text.make_chatcmd(f'{entry.x_coord} x {entry.y_coord}',
                                          f'/waypoint {entry.x_coord} {entry.y_coord} {entry.id}')
        detail_row = f" » QL: {entry.ql} PvP: {pvp} [{waypoint}]\n"

        return header + status_row + detail_row + planted_row

    def is_hot(self, entry, with_war=True) -> int:
        """Classify the hot state of a site and prune expired war-hot entries.

        :param entry: a site row providing ``close_time`` and ``org_name``.
        :param with_war: when False, the war-hot state is not reported.
        :return: ``-1`` if the entry has no (truthy) close time, ``2`` if the
            owning org is war-hot, ``1`` if the site closes within 6 hours,
            ``0`` otherwise.
        """
        if not entry.get("close_time", None):
            return -1

        timestamp = time.time()
        # Kept from the original implementation; NOTE(review): presumably
        # other readers of attack_hot expect it ordered by org name.
        self.attack_hot.sort(key=lambda k: k['org_name'])
        # Drop expired entries in place so the shared list object is kept.
        self.attack_hot[:] = [e for e in self.attack_hot if e['hot'] >= timestamp]

        if with_war and any(e['org_name'] == entry.org_name for e in self.attack_hot):
            return 2

        now = int(timestamp % _SECONDS_PER_DAY)
        if self.day_time(entry.close_time - now) > _HOT_WINDOW:
            return 0
        return 1

    def get_last_attack(self, att_faction, att_org_name, def_faction, def_org_name, playfield_id, t):
        """Look up the latest finished battle matching the given parties.

        Only battles updated within the 8 hours before ``t`` are considered.

        :return: the result of ``self.db.query_single`` with ``battle_id``,
            ``attack_id``, ``pf_id`` and ``site`` columns; presumably ``None``
            when nothing matches (callers should check).
        """
        last_updated = t - (8 * 3600)
        is_finished = 1

        sql = """
            SELECT b.id AS battle_id, a.id AS attack_id, b.playfield_id as pf_id, b.site_number as site
            FROM tower_battle b
            JOIN tower_attacker a ON a.tower_battle_id = b.id
            WHERE a.att_faction = ? AND a.att_org_name = ?
            AND b.def_faction = ? AND b.def_org_name = ?
            AND b.playfield_id = ? AND b.is_finished = ? AND b.last_updated >= ?
            ORDER BY last_updated DESC LIMIT 1"""

        return self.db.query_single(sql, [att_faction, att_org_name, def_faction, def_org_name,
                                          playfield_id, is_finished, last_updated])

    def prepare_nw_warn(self, pf_id, site, bonus=""):
        """Schedule the countdown announcements until a site is plantable.

        :param pf_id: playfield id, resolved to its short name for the message.
        :param site: site number (or placeholder) shown after ``x``.
        :param bonus: optional text appended to the site label.
        """
        pf = self.playfield_controller.get_playfield_by_id(pf_id)
        # Fall back to the raw id if the playfield lookup yields nothing.
        short_name = pf.short_name if pf else f"PF {pf_id}"
        site = f"{short_name} » x{site}" + bonus

        for delay, suffix in _PLANT_WARNINGS:
            self.job_scheduler.delayed_job(self.send_nw_warn, delay, f"{site} plantable {suffix}")

    def send_nw_warn(self, _, msg):
        """Send ``msg`` to the private channel, prefixed with ``[NW]``.

        The first positional argument is unused; it is present so the method
        can also be passed to ``job_scheduler.delayed_job`` as a callback.
        """
        self.bot.send_private_channel_message(f"[NW] {msg}")