d3461ef462
Allowed viewing the most recent account logs by using `!account log recent`. Allowed removing of chatbots. Moved the so far hardcoded org prefixing into the DB, which allows dynamically changing them. Fix for orgrosters; org leaves are being detected now.
295 lines
15 KiB
Python
295 lines
15 KiB
Python
import time
|
|
from datetime import datetime
|
|
|
|
import pytz
|
|
|
|
from core.chat_blob import ChatBlob
|
|
from core.command_alias_service import CommandAliasService
|
|
from core.command_param_types import Any, Int, Const
|
|
from core.db import DB
|
|
from core.decorators import instance, command, event, setting
|
|
from core.event_service import EventService
|
|
from core.igncore import IgnCore
|
|
from core.lookup.pork_service import PorkService
|
|
from core.public_channel_service import PublicChannelService
|
|
from core.setting_types import BooleanSettingType
|
|
from core.text import Text
|
|
from core.util import Util
|
|
from modules.standard.helpbot.playfield_controller import PlayfieldController
|
|
from modules.standard.tower.tower_attack_controller import TowerAttackController
|
|
from modules.standard.tower.tower_events import TowerEventController
|
|
|
|
# Site timing mode -> fixed hour of the day (UTC) used to derive the close time.
# Mode 0 (legacy) has no fixed hour and is deliberately absent; mode 1 is the
# EU-friendly slot and mode 2 the US-friendly slot.
FIXED_TIMES = {
    1: 4,
    2: 20,
}
|
|
|
|
|
|
@instance()
class LandController:
    """Chat commands and event handling for land control (tower) sites.

    Provides the ``lc`` command family (also reachable through the aliases
    ``towers``, ``tower`` and ``lca``) and the ``free`` command. Static site
    data comes from the ``tower_sites`` table, scanned tower data from the
    ``towers`` table created in :meth:`pre_start`.
    """

    # Markup shown next to the level range for each known site timing mode.
    _TIMING_LABELS = {
        0: "[<grey>Legacy</grey>]",
        1: "[<grey><black>0</black>4 UTC</grey>]",
        2: "[<grey>20 UTC</grey>]",
    }

    # Exclusive upper QL bounds of control tower types 1..6; anything at or
    # above the last bound is type 7.
    _CT_TYPE_LIMITS = (34, 82, 129, 177, 201, 226)

    def inject(self, registry):
        """Fetch the collaborating services from the instance registry."""
        self.bot: IgnCore = registry.get_instance("bot")
        self.db: DB = registry.get_instance("db")
        self.text: Text = registry.get_instance("text")
        self.util: Util = registry.get_instance("util")
        self.event_service: EventService = registry.get_instance("event_service")
        self.pork_service: PorkService = registry.get_instance("pork_service")
        self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
        self.public_channel_service: PublicChannelService = registry.get_instance("public_channel_service")
        self.command_alias_service: CommandAliasService = registry.get_instance("command_alias_service")
        self.tac: TowerAttackController = registry.get_instance("tower_attack_controller")

    def pre_start(self):
        """Register the command aliases and make sure the tower tables exist."""
        for alias in ("towers", "tower", "lca"):
            self.command_alias_service.add_alias(alias, "lc")

        self.db.load_sql_file(f"{self.module_dir}/tower_sites.sql", pre_optimized=True)
        # The table uses the MEMORY engine, so it is (re)created on demand.
        self.db.shared.exec("CREATE TABLE IF NOT EXISTS `towers` ("
                            "`tower_id` INT(11) NOT NULL,"
                            "`pf_id` INT(11) NOT NULL,"
                            "`site_number` INT(11) NOT NULL,"
                            "`x_coord` INT(11) NOT NULL,"
                            "`y_coord` INT(11) NOT NULL,"
                            "`high_id` INT(11) NOT NULL,"
                            "`ql` INT(11) NOT NULL,"
                            "`org_id` INT(11),"
                            "`faction` VARCHAR(11),"
                            "`planted` INT(11),"
                            "`close_time` INT(11),"
                            "`penalty_until` INT(11),"
                            "PRIMARY KEY (`tower_id`) USING HASH,"
                            "INDEX `site` (`pf_id`, `site_number`, `org_id`, `close_time`, `penalty_until`) USING HASH,"
                            "INDEX `tower` (`planted`, `ql`, `x_coord`, `y_coord`) USING HASH) "
                            "COLLATE='utf8mb4_general_ci' ENGINE=MEMORY;")
        self.db.create_view("towers")

    @setting(name="lc_cmd_full", value="false", description="Toggle the verbosity of !lc commands")
    def lc_full(self) -> BooleanSettingType:
        """Setting that enables the per-tower listing in single-site output."""
        return BooleanSettingType()

    @event(event_type=TowerEventController.TOWER_VICTORY_EVENT, description="Purge sites which got wiped from DB",
           is_enabled=False)
    def tower_victory_event(self, _, event_data):
        """Delete the stored towers of the site that was just destroyed.

        The site is identified through the most recent matching attack known
        to the tower attack controller; nothing happens if none is found.
        The handler is registered with ``is_enabled=False``.
        """
        row = self.tac.get_last_attack(event_data.winner.faction, event_data.winner.org_name,
                                       event_data.loser.faction, event_data.loser.org_name,
                                       event_data.location.playfield.id, time.time())
        if row:
            self.db.exec("DELETE FROM towers where pf_id=? and site_number=?", [row.pf_id, row.site])

    @command(command="lc", params=[], access_level="member",
             description="See a list of playfields containing land control tower sites")
    def lc_list_cmd(self, request):
        """List every playfield that has at least one entry in ``tower_sites``."""
        data = self.db.query(
            "SELECT id, long_name, short_name FROM playfields "
            "WHERE id IN (SELECT DISTINCT playfield_id FROM tower_sites) ORDER BY short_name")

        blob = "".join(
            f"[{row.id:d}] {self.text.make_tellcmd(row.long_name, f'lc {row.short_name}')} "
            f"<highlight>{row.short_name}</highlight>\n"
            for row in data)

        return ChatBlob(f"Land Control Playfields ({len(data):d})", blob)

    @command(command="lc", params=[Const("org"), Any("search")], access_level="member",
             description="See a list of land control tower sites owned by a particular org")
    def sites_org_cmd(self, request, _, search):
        """Show all sites owned by an org.

        ``search`` is resolved in this order: a purely numeric value is taken
        as an org id; otherwise it is matched against the names of orgs that
        own towers; if no org matches, it is looked up as a character name
        and that character's org is used.

        Returns a plain message when nothing can be resolved, a selection
        blob when several orgs match, or the site overview blob.
        """
        # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
        if search.isdecimal():
            org_id, result = int(search), search
        else:
            orgs = self.find_orgs(search)
            if len(orgs) > 1:
                choices = "".join(
                    self.text.make_tellcmd(f"{org.org_name} ({org.org_id})", f"lc org {org.org_id}") + "\n"
                    for org in orgs)
                return ChatBlob(f"Orgs matching your search criteria ({len(orgs)})", choices)

            if orgs:
                org_id, result = orgs[0].org_id, orgs[0].org_name
            else:
                # No tower-owning org matched; fall back to a character lookup.
                char_info = self.pork_service.get_character_info(search)
                if not char_info:
                    return f"Character or org <highlight>{search}</highlight> does not own any sites."
                if not char_info.org_id:
                    return f"<highlight>{search.capitalize()}</highlight> does not appear to belong to an org."
                org_id, result = char_info.org_id, char_info.org_name

        data = self.get_towers_by_org(org_id)
        if not data:
            # Name the resolved org, not the raw search term (which may be a character).
            return f"The org <highlight>{result}</highlight> does not own any sites or does not exist."

        # One integer timestamp for the whole reply, as in lc_playfield_cmd.
        t = int(time.time())
        site_count = len(data)
        total_ql = 0
        parts = []
        for site in data:
            parts.append(f"<pagebreak>{self.format_site_info(site, t, site_count)}")
            parts.append(f"<tab>Dist: <highlight>{site.guard}</highlight> Conductors and "
                         f"<highlight>{site.turrets}</highlight> Turrets planted\n\n")
            total_ql += site.ql
        parts.append(f"Stats: QL<highlight>{total_ql}</highlight>, "
                     f"contracts up to QL<highlight>{total_ql * 2}</highlight>")
        return ChatBlob(f"Sites owned by {result}", "".join(parts))

    @command(command="lc", params=[Any("playfield"), Int("site_number", is_optional=True)], access_level="member",
             description="See a list of land control tower sites in a particular playfield")
    def lc_playfield_cmd(self, request, playfield_name, site_number):
        """Show one site, or all sites, of a playfield.

        With ``site_number`` the reply covers that single site; without it
        every site of the playfield is listed.
        """
        playfield = self.playfield_controller.get_playfield_by_name_or_id(playfield_name)
        if not playfield:
            return f"Could not find playfield <highlight>{playfield_name}</highlight>."

        data = self.get_towers(playfield.id, site_number)
        if not data:
            if site_number:
                return (f"The Site <highlight>{site_number}</highlight> does not exist in playfield "
                        f"<highlight>{playfield_name}</highlight>.")
            return f"Playfield <highlight>{playfield_name}</highlight> does not contain any tower sites."

        t = int(time.time())
        if site_number:
            # count is left at 0 so format_site_info treats data as the site's tower rows.
            blob = "<pagebreak>" + self.format_site_info(data, t)
            title = f"Tower Info: {playfield.long_name} x{site_number}"
        else:
            site_count = len(data)
            blob = "".join(f"<pagebreak>{self.format_site_info(row, t, site_count)}\n" for row in data)
            title = f"Tower Info: {playfield.long_name} ({site_count})"

        return ChatBlob(title, blob)

    @command(command="free", params=[],
             access_level="member",
             description="Shows potentially free towerfields")
    def free(self, _):
        """List enabled sites for which no owning org is stored."""
        data = self.get_free()
        if not data:
            return "Currently there are no free sites. Go kill some."

        t = int(time.time())
        site_count = len(data)
        blob = "".join(f"<pagebreak>{self.format_site_info(row, t, site_count)}\n" for row in data)
        return ChatBlob(f"FREE Towersites ({site_count})", blob)

    def format_site_info(self, row, t, count=0):
        """Render the description of one tower site as chat markup.

        :param row: a single site row when ``count`` is non-zero; when
            ``count`` is 0, the list of rows of one site, whose first entry
            supplies the site and control tower data.
        :param t: current unix timestamp.
        :param count: number of sites in the surrounding listing, or 0 for
            the detailed single-site view.
        :return: the markup, or ``"Site does not exist."`` for empty input.
        """
        towers = row
        if count == 0 and towers:
            row = towers[0]
        if not row:
            return "Site does not exist."

        waypoint = self.text.make_chatcmd(f"{row.x_coord:d}x{row.y_coord:d}",
                                          f"/waypoint {row.x_coord:d} {row.y_coord:d} {row.pf_id:d}")
        parts = [
            f"<highlight>{row.short_name} x{row.site_number}</highlight> ({row.site_name})\n",
            f"<tab>Level Range: <white>{row.min_ql} - {row.max_ql}</white> ",
            # An unknown timing mode still ends the line instead of running into the next one.
            self._TIMING_LABELS.get(row.timing, "") + "\n",
            "<tab>Coordinates: " + waypoint + "\n",
        ]

        if not row.get("org_name", None):
            if row.enabled:
                parts.append("<tab><red>This site is potentially unplanted</red>\n")
            else:
                parts.append("<tab><red>Disabled</red>\n")
            return "".join(parts)

        # NOTE(review): close_time and planted are assumed to be set for owned sites.
        faction = self.text.get_formatted_faction(row.faction, row.org_name)
        parts.append(f"<tab>CT: QL<highlight>{row.ql}</highlight> ({faction}) T{self.get_ct_type(row.ql)}"
                     f" - Planted {self.util.time_to_readable(t - row.planted)} ago\n")
        parts.append(f"<tab>Gas: {self._format_gas_status(row, t)}\n")

        # The per-tower listing only exists in the single-site view.
        if count == 0 and self.lc_full().get_value():
            parts.append(self._format_tower_breakdown(towers))

        return "".join(parts)

    def _format_gas_status(self, row, t):
        """Return the gas/penalty status markup of an owned site at time ``t``."""
        close_time = row.close_time
        if row.timing in FIXED_TIMES:
            # Fixed-timing sites close at the fixed hour plus the minute/second offset of planting.
            close_time = FIXED_TIMES[row.timing] * 3600 + row.planted % 3600

        until_close = close_time - t % 86400
        if until_close < 0:
            until_close += 86400

        if until_close <= 3600:
            status = f"<red>5%</red> (closes in {self.util.time_to_readable(until_close)})"
        elif until_close <= 3600 * 6:
            status = f"<orange>25%</orange> (closes in {self.util.time_to_readable(until_close)})"
        else:
            status = f"<green>75%</green> (opens in {self.util.time_to_readable(until_close - 3600 * 6)})"

        # penalty_until is a nullable column; NULL means there is nothing to report.
        if row.penalty_until and row.penalty_until > t:
            status += f" <red>Penalty</red> (for {self.util.time_to_readable(row.penalty_until - t)})"
        return status

    def _format_tower_breakdown(self, towers):
        """Return the conductor/turret counts followed by one line per tower."""
        conductors = 0
        turrets = 0
        lines = []
        for tower in towers:
            if "Turret" in tower.name:
                turrets += 1
            elif "Conductor" in tower.name:
                conductors += 1
            lines.append(f" - QL<highlight>{self.text.zfill(tower.ql, 220)}</highlight> {tower.name}\n")

        header = (f"<tab>Dist: <highlight>{conductors}</highlight> Conductors and "
                  f"<highlight>{turrets}</highlight> Turrets\n"
                  "\n Towers:\n")
        return header + "".join(lines)

    def get_ct_type(self, ql):
        """Return the control tower type (1-7) for a control tower of the given QL."""
        for ct_type, limit in enumerate(self._CT_TYPE_LIMITS, start=1):
            if ql < limit:
                return ct_type
        return len(self._CT_TYPE_LIMITS) + 1

    def get_free(self):
        """Query all enabled sites that have no tower row with an org assigned."""
        return self.db.query(
            """SELECT d.playfield_id AS pf_id,d.site_number, d.site_name, d.min_ql, d.max_ql, d.x_coord, d.y_coord, d.timing, d.enabled, a.tower_id, a.ql, a.close_time, a.penalty_until, a.planted, b.*, c.*, e.* FROM tower_sites d
            LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number
            LEFT JOIN aodb b ON a.high_id = b.highid
            LEFT JOIN playfields c on d.playfield_id = c.id
            LEFT JOIN all_orgs e on a.org_id = e.org_id
            WHERE a.org_id IS NULL AND d.enabled = 1 GROUP BY d.playfield_id, d.site_number """)

    def get_towers(self, pf, site=None):
        """Query tower data for a playfield.

        With ``site`` the result holds every stored tower row of that site,
        rows with a close time first. Without it the result holds one row per
        site of the playfield, joined only to tower rows that have a close time.
        """
        if site:
            return self.db.query(
                """SELECT d.playfield_id AS pf_id,d.site_number, d.site_name, d.min_ql, d.max_ql,
                d.x_coord, d.y_coord, d.timing, d.enabled, a.tower_id, a.ql,
                a.close_time, a.penalty_until, a.planted, b.*, c.*, e.*
                FROM tower_sites d
                LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number
                LEFT JOIN aodb b ON a.high_id = b.highid
                LEFT JOIN playfields c on d.playfield_id = c.id
                LEFT JOIN all_orgs e on a.org_id = e.org_id
                WHERE playfield_id=? AND d.site_number=? ORDER BY close_time IS NULL, ql desc""",
                [pf, site])

        return self.db.query(
            """SELECT d.playfield_id AS pf_id, d.site_number, d.site_name, d.min_ql, d.max_ql,
            d.x_coord, d.y_coord, d.timing, d.enabled, a.tower_id, a.ql,
            a.close_time, a.penalty_until, a.planted, b.*, c.*, e.*
            FROM tower_sites d
            LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number AND a.close_time IS NOT NULL
            LEFT JOIN aodb b ON a.high_id = b.highid
            LEFT JOIN playfields c on d.playfield_id = c.id
            LEFT JOIN all_orgs e on a.org_id = e.org_id
            WHERE playfield_id=?
            GROUP BY playfield_id, site_number
            ORDER BY site_number, ql DESC
            """, [pf])

    def get_towers_by_org(self, org_id):
        """Query one row per site owned by ``org_id``.

        Each row carries the ``turrets`` and ``guard`` tower counts of the
        site, the site and playfield data, and the columns of the site's
        tower row that has a close time.
        """
        return self.db.query(
            """SELECT * FROM (SELECT COUNT(CASE WHEN name LIKE '%Turret%' THEN 1 WHEN name LIKE '%SAM Battery%' THEN 1 END) turrets,
            COUNT(CASE WHEN name LIKE '%Guard%' THEN 1 END) guard, a.pf_id, a.org_id, e.org_name, c.*, d.* FROM towers a
            LEFT JOIN aodb b ON a.high_id = b.highid
            LEFT JOIN playfields c on a.pf_id = c.id
            LEFT JOIN tower_sites d on a.pf_id = d.playfield_id and a.site_number = d.site_number
            LEFT JOIN all_orgs e on a.org_id = e.org_id
            WHERE a.org_id=? GROUP BY a.pf_id, a.site_number ORDER BY ql, close_time IS NOT NULL) t1
            LEFT JOIN
            (SELECT * from towers a WHERE a.org_id=? AND close_time IS NOT NULL) t2
            ON t1.pf_id=t2.pf_id AND t1.site_number = t2.site_number
            ORDER BY ql""", [org_id, org_id])

    def find_orgs(self, search):
        """Return name and id of every org matching ``search`` that owns at least one stored tower."""
        return self.db.query(
            "SELECT DISTINCT a.org_name, a.org_id FROM all_orgs a "
            "LEFT JOIN towers b ON a.org_id = b.org_id WHERE org_name <EXTENDED_LIKE=0> ? AND b.org_id IS NOT NULL",
            [search], extended_like=True)
|