84a5933490
Generified the Worldboss module; removed the spammy section for now — it will be re-added at a later stage.
179 lines
9.0 KiB
Python
179 lines
9.0 KiB
Python
import time
|
|
|
|
from core.aochat.BaseModule import BaseModule
|
|
from core.command_alias_service import CommandAliasService
|
|
from core.command_param_types import Options, Int, Any, NamedParameters
|
|
from core.db import DB
|
|
from core.decorators import instance, command, event
|
|
from core.dict_object import DictObject
|
|
from core.event_service import EventService
|
|
from core.igncore import IgnCore
|
|
from core.lookup.pork_service import PorkService
|
|
from core.public_channel_service import PublicChannelService
|
|
from core.text import Text
|
|
from core.util import Util
|
|
from modules.standard.helpbot.playfield_controller import PlayfieldController
|
|
from modules.standard.tower.tower_events import TowerEventController
|
|
|
|
|
|
@instance()
class TowerHotController(BaseModule):
    """Chat commands listing "hot" tower sites, plus an (initially disabled) penalty-marking event.

    A site is listed as hot when its computed ``status_time`` (seconds until
    the site's ``close_time``, wrapped over a 24h day) is below seven hours,
    or when its ``penalty_until`` timestamp lies in the future.
    """

    PAGE_SIZE = 30

    # Durations in seconds, used by the status/penalty arithmetic below.
    _HOUR = 3600
    _DAY = 86400

    # Faction arguments accepted by the ``hot`` commands (compared lower-cased).
    _VALID_FACTIONS = ('omni', 'clan', 'neut', 'neutral')

    # noinspection DuplicatedCode
    def inject(self, registry):
        """Fetch the service instances this controller uses from ``registry``."""
        self.bot: IgnCore = registry.get_instance("bot")
        self.db: DB = registry.get_instance("db")
        self.util: Util = registry.get_instance("util")
        self.text: Text = registry.get_instance("text")
        self.event_service: EventService = registry.get_instance("event_service")
        self.pork_service: PorkService = registry.get_instance("pork_service")
        self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
        self.public_channel_service: PublicChannelService = registry.get_instance("public_channel_service")
        self.command_alias_service: CommandAliasService = registry.get_instance("command_alias_service")

    @event(event_type=TowerEventController.TOWER_ATTACK_EVENT, description="Mark Sites in penalty as in penalty",
           is_enabled=False)
    def tower_victory_event(self, _, event_data):
        """Set ``penalty_until`` on every site owned by the attacking org.

        Does nothing when the attacker has no ``org_id``. For each matching
        site the new ``penalty_until`` is the first timestamp that is at least
        one hour from now and shares the site's ``planted`` offset within the
        hour (i.e. between one and two hours from now).
        """
        if not event_data.attacker.org_id:
            return

        data = self.db.query(
            """SELECT pf_id, site_number, t.org_id, t.faction, penalty_until, planted, close_time
            FROM towers t
            LEFT JOIN all_orgs a ON a.org_id = t.org_id WHERE a.org_name = ? AND t.faction = ? AND close_time IS NOT NULL""",
            [event_data.attacker.org_name, event_data.attacker.faction.lower()])

        # One shared timestamp so every row of the batch is computed against the same "now".
        now = time.time()
        blob = [(((row.planted - now) % self._HOUR) + self._HOUR + now, row.pf_id, row.site_number, row.org_id)
                for row in data]
        if not blob:
            return

        # Batch update through a raw pooled connection, serialized by the DB lock.
        with self.db.lock:
            with self.db.pool.get_connection() as conn:
                with conn.cursor() as cur:
                    cur.executemany(
                        "UPDATE towers SET penalty_until=? WHERE pf_id=? AND site_number=? AND org_id=?", blob)

    @staticmethod
    def _split_faction_and_page(faction, named_params):
        """Untangle a ``--page=N`` token that was captured as the faction argument.

        Returns a ``(faction, named_params)`` pair. When ``faction`` starts
        with ``--page=``, the faction becomes ``None`` and ``named_params`` is
        replaced by a ``DictObject`` carrying the page text; otherwise both
        values are returned unchanged.
        """
        if faction and faction.startswith("--page="):
            return None, DictObject({'page': faction[7:]})
        return faction, named_params

    @staticmethod
    def _parse_page(named_params):
        """Return the requested 1-based page number, or ``None`` if it is not a positive integer.

        A missing/empty page defaults to page 1.
        """
        try:
            page = int(named_params.page or "1")
        except (TypeError, ValueError):
            return None
        return page if page >= 1 else None

    @command(command="hot",
             params=[Options(['tl1', 'tl2', 'tl3', 'tl4', 'tl5', 'tl6', 'tl7']), Any('faction', is_optional=True),
                     NamedParameters(["page"])],
             access_level="member",
             description="Shows hot playfields")
    def hot_tl(self, _, tl, faction: str, named_params):
        """Handle ``hot tlN [faction] [--page=N]``: list hot sites for one title level.

        Returns the paginated listing, or an error message for an unknown
        faction or an invalid page value.
        """
        faction, named_params = self._split_faction_and_page(faction, named_params)
        if faction and faction.lower() not in self._VALID_FACTIONS:
            return f"Unknown faction: {faction}"

        page = self._parse_page(named_params)
        if page is None:
            return f"Invalid page: {named_params.page}"

        tl = tl[2:]  # strip the "tl" prefix, leaving the digit
        offset = (page - 1) * self.PAGE_SIZE
        towers = self.get_hot_sites_tl(int(tl), faction)
        return self.text.format_pagination(towers, offset, page, self.formatter, f"Hot Sites TL{tl} ({len(towers)})",
                                           f"There are no hot sites for TL <highlight>{tl}</highlight>.",
                                           f'hot tl{tl} {faction or ""}', self.PAGE_SIZE)

    def formatter(self, row, index, data):
        """Render one site row of the paginated listing.

        ``index`` is treated as 1-based: the previous row is ``data[index - 2]``.
        A ``TL<n>`` heading is emitted whenever this row's CT type (see
        ``get_ct_type``) is higher than the previous row's (or on the first row).
        """
        previous = data[index - 2] if index > 1 else {}

        if row.status_time <= self._HOUR:
            status = f"<red>5%</red> (closes in {self.util.time_to_readable(row.status_time)})"
        elif row.status_time <= (self._HOUR * 6):
            status = f"<orange>25%</orange> (closes in {self.util.time_to_readable(row.status_time)})"
        else:
            status = f"<green>75%</green> (opens in {self.util.time_to_readable(row.status_time - (self._HOUR * 6))})"

        # penalty_until may be unset (NULL) for sites that were never penalized; treat that as "no penalty".
        penalty_until = row.penalty_until or 0
        if penalty_until > time.time():
            status += f" <red>In Penalty for: {self.util.time_to_readable(penalty_until - time.time())}</red>"

        blob = ""
        if self.get_ct_type(previous.get("ql", 0)) < (tl := self.get_ct_type(row.ql)):
            blob += f"<notice>TL{tl}</notice><br>"

        space = f"{row.short_name} x{row.site_number}"
        # Pad the link label to 7 characters with (black-colored) underscores; no padding if already longer.
        place = "_" * (7 - len(space))
        return blob + "<tab>" + self.text.make_tellcmd(space,
                                                       f'lc {row.short_name} {row.site_number}') + \
            f"<black>{place}</black> QL {row.min_ql}/<highlight>{row.ql}</highlight>/{row.max_ql} - " \
            f"{self.text.get_formatted_faction(row.faction, row.org_name)}, {status}\n"

    @command(command="hot",
             params=[Int('level', is_optional=True), Any('faction', is_optional=True), NamedParameters(["page"])],
             access_level="member",
             description="Shows hot playfields by level")
    def hot_level(self, _, level, faction, named_params):
        """Handle ``hot [level] [faction] [--page=N]``: list hot sites, optionally for one level.

        Returns the paginated listing, or an error message for an unknown
        faction, a level outside 0..220, or an invalid page value.
        """
        faction, named_params = self._split_faction_and_page(faction, named_params)
        if faction and faction.lower() not in self._VALID_FACTIONS:
            return f"Unknown faction: {faction}"

        # The previous check used `level < 0 | level > 220`; `|` binds tighter than the
        # comparisons, so it could never be true.
        if level and not 0 <= level <= 220:
            return f"Level out of range: {level}"

        page = self._parse_page(named_params)
        if page is None:
            return f"Invalid page: {named_params.page}"

        offset = (page - 1) * self.PAGE_SIZE
        towers = self.get_hot_sites(level, faction)

        # Rebuild the command used for the paging links. A separator is inserted only when
        # both arguments are present (it was missing, yielding e.g. "hot 100clan ");
        # the remaining spacing is kept as it was.
        level_arg = f"{level}" if level else ""
        faction_arg = f"{faction} " if faction else ""
        separator = " " if level_arg and faction_arg else ""
        return self.text.format_pagination(towers, offset, page, self.formatter, f"Hot Towersites ({len(towers)})",
                                           "There are no hot sites.",
                                           f'hot {level_arg}{separator}{faction_arg}', self.PAGE_SIZE)

    def _query_hot_sites(self, where, where_params):
        """Run the shared hot-site query and keep only hot or penalized rows.

        ``where`` is an SQL fragment (starting with ``AND``) appended to the
        fixed ``close_time IS NOT NULL`` condition; ``where_params`` are its
        bind values, in order.
        """
        # Seconds since midnight (of the epoch clock); close_time is compared against this.
        now = time.time() % self._DAY
        data = self.db.query("SELECT a.*, f.short_name, c.org_name, b.min_ql, b.max_ql, "
                             "(CASE WHEN (a.close_time-?) < 0 THEN a.close_time-? +86400 ELSE a.close_time-? END) "
                             " AS status_time FROM tower_sites b "
                             "LEFT JOIN towers a ON a.pf_id = b.playfield_id AND a.site_number = b.site_number "
                             "LEFT JOIN level l on a.ql = l.level "
                             "LEFT JOIN playfields f on a.pf_id = f.id "
                             f"LEFT JOIN all_orgs c ON a.org_id = c.org_id WHERE close_time IS NOT NULL {where} ORDER BY a.ql",
                             [now, now, now, *where_params])
        current = time.time()
        # Hot = less than 7h until close_time, or still in penalty (NULL penalty counts as none).
        return [x for x in data
                if x.status_time - (self._HOUR * 6) < self._HOUR or (x.penalty_until or 0) > current]

    def get_hot_sites(self, level=None, faction=None):
        """Return hot sites, optionally limited to a player ``level`` and/or a ``faction``.

        The level filter keeps sites whose QL maps (via the ``level`` table) to
        a ``pvp_min``..``pvp_max`` range containing ``level``. The faction is
        matched as a capitalized substring, so ``neut`` matches ``Neutral``.
        """
        where = ""
        params = []
        if level:
            where += " AND l.pvp_min <=? and pvp_max >= ? "
            params += [level, level]
        if faction:
            where += " AND c.faction LIKE ? "
            # Wildcards keep this consistent with get_hot_sites_tl and let the "neut" alias match.
            params.append("%" + faction.capitalize() + "%")
        return self._query_hot_sites(where, params)

    def get_hot_sites_tl(self, tl=7, faction=None):
        """Return hot sites whose QL lies in the range ``util.get_level_range_tl(tl)`` yields.

        ``faction`` optionally restricts the result; it is matched as a
        capitalized substring.
        """
        min_ql, max_ql = self.util.get_level_range_tl(tl)
        where = " AND ql between ? and ? "
        params = [min_ql, max_ql]
        if faction:
            where += " AND c.faction LIKE ?"
            params.append("%" + faction.capitalize() + "%")
        return self._query_hot_sites(where, params)

    def get_ct_type(self, ql):
        """Map a QL to the bracket number 0-7 that ``formatter`` prints as ``TL<n>``.

        ``0`` maps to 0; otherwise the result is the first bracket whose
        exclusive upper bound exceeds ``ql`` (34, 82, 129, 177, 201, 226), and
        7 for anything at or above 226.
        """
        if ql == 0:
            return 0
        for ct_type, upper_bound in enumerate((34, 82, 129, 177, 201, 226), start=1):
            if ql < upper_bound:
                return ct_type
        return 7
|