Files
Minidodo d3461ef462 Moved org finder to the core
Allowed viewing the most recent account logs by using !account log recent
Allowed removing of chatbots
Moved the so far hardcoded org prefixing into the DB, which allows dynamically changing them.
Fix for orgrosters; org leaves are being detected now.
2022-02-11 15:43:48 +01:00

295 lines
15 KiB
Python

import time
from datetime import datetime
import pytz
from core.chat_blob import ChatBlob
from core.command_alias_service import CommandAliasService
from core.command_param_types import Any, Int, Const
from core.db import DB
from core.decorators import instance, command, event, setting
from core.event_service import EventService
from core.igncore import IgnCore
from core.lookup.pork_service import PorkService
from core.public_channel_service import PublicChannelService
from core.setting_types import BooleanSettingType
from core.text import Text
from core.util import Util
from modules.standard.helpbot.playfield_controller import PlayfieldController
from modules.standard.tower.tower_attack_controller import TowerAttackController
from modules.standard.tower.tower_events import TowerEventController
# legacy(0), EU - friendly(1) or US - friendly(2) => timing
FIXED_TIMES = {1: 4,
2: 20}
@instance()
class LandController:
def inject(self, registry):
self.bot: IgnCore = registry.get_instance("bot")
self.db: DB = registry.get_instance("db")
self.text: Text = registry.get_instance("text")
self.util: Util = registry.get_instance("util")
self.event_service: EventService = registry.get_instance("event_service")
self.pork_service: PorkService = registry.get_instance("pork_service")
self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
self.public_channel_service: PublicChannelService = registry.get_instance("public_channel_service")
self.command_alias_service: CommandAliasService = registry.get_instance("command_alias_service")
self.tac: TowerAttackController = registry.get_instance("tower_attack_controller")
def pre_start(self):
self.command_alias_service.add_alias('towers', "lc")
self.command_alias_service.add_alias('tower', "lc")
self.command_alias_service.add_alias('lca', "lc")
self.db.load_sql_file(self.module_dir + "/" + "tower_sites.sql", pre_optimized=True)
self.db.shared.exec("CREATE TABLE IF NOT EXISTS `towers` ("
"`tower_id` INT(11) NOT NULL,"
"`pf_id` INT(11) NOT NULL,"
"`site_number` INT(11) NOT NULL,"
"`x_coord` INT(11) NOT NULL,"
"`y_coord` INT(11) NOT NULL,"
"`high_id` INT(11) NOT NULL,"
"`ql` INT(11) NOT NULL,"
"`org_id` INT(11),"
"`faction` VARCHAR(11),"
"`planted` INT(11),"
"`close_time` INT(11),"
"`penalty_until` INT(11),"
"PRIMARY KEY (`tower_id`) USING HASH,"
"INDEX `site` (`pf_id`, `site_number`, `org_id`, `close_time`, `penalty_until`) USING HASH,"
"INDEX `tower` (`planted`, `ql`, `x_coord`, `y_coord`) USING HASH) "
"COLLATE='utf8mb4_general_ci' ENGINE=MEMORY;")
self.db.create_view("towers")
@setting(name="lc_cmd_full", value="false", description="Toggle the verbosity of !lc commands")
def lc_full(self) -> BooleanSettingType:
return BooleanSettingType()
@event(event_type=TowerEventController.TOWER_VICTORY_EVENT, description="Purge sites which got wiped from DB",
is_enabled=False)
def tower_victory_event(self, _, event_data):
row = self.tac.get_last_attack(event_data.winner.faction, event_data.winner.org_name, event_data.loser.faction,
event_data.loser.org_name, event_data.location.playfield.id, time.time())
if row:
self.db.exec("DELETE FROM towers where pf_id=? and site_number=?", [row.pf_id, row.site])
@command(command="lc", params=[], access_level="member",
description="See a list of playfields containing land control tower sites")
def lc_list_cmd(self, request):
data = self.db.query(
"SELECT id, long_name, short_name FROM playfields "
"WHERE id IN (SELECT DISTINCT playfield_id FROM tower_sites) ORDER BY short_name")
blob = ""
for row in data:
blob += f"[{row.id:d}] {self.text.make_tellcmd(row.long_name, f'lc {row.short_name}')} <highlight>{row.short_name}</highlight>\n"
return ChatBlob(f"Land Control Playfields ({len(data):d})", blob)
@command(command="lc", params=[Const("org"), Any("search")], access_level="member",
description="See a list of land control tower sites in a particular playfield")
def sites_org_cmd(self, request, _, search):
if search.isdigit():
org_id = int(search)
result = search
else:
orgs = self.find_orgs(search)
num_orgs = len(orgs)
if num_orgs == 0:
char_info = self.pork_service.get_character_info(search)
if char_info:
if not char_info.org_id:
return f"<highlight>{search.capitalize()}</highlight> does not appear to belong to an org."
else:
org_id = char_info.org_id
result = char_info.org_name
else:
return f"Character or org <highlight>{search}</highlight> does not own any sites."
elif num_orgs == 1:
result = orgs[0].org_name
org_id = orgs[0].org_id
else:
blob = ""
for org in orgs:
blob += self.text.make_tellcmd(f"{org.org_name} ({org.org_id})",
f"lc org {org.org_id}") + "\n"
return ChatBlob(f"Orgs matching your search criteria ({num_orgs})", blob)
data = self.get_towers_by_org(org_id)
if not data:
return f"The org <highlight>{search}</highlight> does not own any sites or does not exist."
blob = ""
ql = 0
for x in data:
blob += f"<pagebreak>{self.format_site_info(x, time.time(), len(data))}"
blob += f"<tab>Dist: <highlight>{x.guard}</highlight> Conductors and <highlight>{x.turrets}</highlight> Turrets planted\n\n"
ql += x.ql
blob += f"Stats: QL<highlight>{ql}</highlight>, contracts up to QL<highlight>{ql * 2}</highlight>"
return ChatBlob(f"Sites owned by {result}", blob)
@command(command="lc", params=[Any("playfield"), Int("site_number", is_optional=True)], access_level="member",
description="See a list of land control tower sites in a particular playfield")
def lc_playfield_cmd(self, request, playfield_name, site_number):
playfield = self.playfield_controller.get_playfield_by_name_or_id(playfield_name)
if not playfield:
return f"Could not find playfield <highlight>{playfield_name}</highlight>."
data = self.get_towers(playfield.id, site_number)
if not data:
return f"The Site <highlight>{site_number}</highlight> does not exist in playfield <highlight>{playfield_name}</highlight>."
blob = ""
t = int(time.time())
if site_number:
data = self.get_towers(playfield.id, site_number)
blob += "<pagebreak>" + self.format_site_info(data, t)
else:
for row in data:
blob += f"<pagebreak>{self.format_site_info(row, t, len(data))}\n"
if site_number:
title = f"Tower Info: {playfield.long_name} x{site_number}"
else:
title = f"Tower Info: {playfield.long_name} ({len(data)})"
return ChatBlob(title, blob)
@command(command="free", params=[],
access_level="member",
description="Shows potentially free towerfields")
def free(self, _, ):
blob = ""
data = self.get_free()
if not data:
return "Currently there are no free sites. Go kill some."
for row in data:
blob += f"<pagebreak>{self.format_site_info(row, time.time(), len(data))}\n"
return ChatBlob(f"FREE Towersites ({len(data)})", blob)
def format_site_info(self, row, t, count=0):
data = row
if count == 0 and data:
row = data[0]
if not row:
return "Site does not exist."
blob = f"<highlight>{row.short_name} x{row.site_number}</highlight> ({row.site_name})\n"
blob += f"<tab>Level Range: <white>{row.min_ql} - {row.max_ql}</white> "
if row.timing == 0:
blob += f"[<grey>Legacy</grey>]\n"
if row.timing == 1:
blob += f"[<grey><black>0</black>4 UTC</grey>]\n"
if row.timing == 2:
blob += f"[<grey>20 UTC</grey>]\n"
blob += f"<tab>Coordinates: "
blob += self.text.make_chatcmd(f"{row.x_coord:d}x{row.y_coord:d}",
f"/waypoint {row.x_coord:d} {row.y_coord:d} {row.pf_id:d}") + "\n"
if row.get("org_name", None):
current_day_time = t % 86400
if row.timing > 0:
row.close_time = FIXED_TIMES[row.timing] * 3600 + row.planted % 3600
value = datetime.fromtimestamp(row.close_time, tz=pytz.UTC)
current_status_time = row.close_time - current_day_time
if current_status_time < 0:
current_status_time += 86400
status = ""
if current_status_time <= 3600:
status += f"<red>5%</red> (closes in {self.util.time_to_readable(current_status_time)})"
elif current_status_time <= (3600 * 6):
status += f"<orange>25%</orange> (closes in {self.util.time_to_readable(current_status_time)})"
else:
status += f"<green>75%</green> (opens in {self.util.time_to_readable(current_status_time - (3600 * 6))})"
if row.penalty_until > t:
status += f" <red>Penalty</red> (for {self.util.time_to_readable(row.penalty_until - t)})"
blob += f"<tab>CT: QL<highlight>{row.ql}</highlight> ({self.text.get_formatted_faction(row.faction, row.org_name)}) T{self.get_ct_type(row.ql)} - Planted {self.util.time_to_readable(t - row.planted)} ago\n"
blob += f"<tab>Gas: {status}\n"
if self.lc_full().get_value():
towers = ""
cond, turret = 0, 0
if count == 0:
for tower in data:
if tower.name.__contains__("Turret"):
turret += 1
elif tower.name.__contains__("Conductor"):
cond += 1
towers += f" - QL<highlight>{self.text.zfill(tower.ql, 220)}</highlight> {tower.name}\n"
blob += f"<tab>Dist: <highlight>{cond}</highlight> Conductors and <highlight>{turret}</highlight> Turrets\n"
blob += "\n Towers:\n"
blob += towers
else:
if not row.enabled:
blob += "<tab><red>Disabled</red>\n"
else:
blob += "<tab><red>This site is potentially unplanted</red>\n"
return blob
def get_ct_type(self, ql):
if ql < 34:
return 1
elif ql < 82:
return 2
elif ql < 129:
return 3
elif ql < 177:
return 4
elif ql < 201:
return 5
elif ql < 226:
return 6
else:
return 7
def get_free(self):
return self.db.query("""SELECT d.playfield_id AS pf_id,d.site_number, d.site_name, d.min_ql, d.max_ql, d.x_coord, d.y_coord, d.timing, d.enabled, a.tower_id, a.ql, a.close_time, a.penalty_until, a.planted, b.*, c.*, e.* FROM tower_sites d
LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number
LEFT JOIN aodb b ON a.high_id = b.highid
LEFT JOIN playfields c on d.playfield_id = c.id
LEFT JOIN all_orgs e on a.org_id = e.org_id
WHERE a.org_id IS NULL AND d.enabled = 1 GROUP BY d.playfield_id, d.site_number """)
def get_towers(self, pf, site=None):
if site:
return self.db.query("""SELECT d.playfield_id AS pf_id,d.site_number, d.site_name, d.min_ql, d.max_ql,
d.x_coord, d.y_coord, d.timing, d.enabled, a.tower_id, a.ql,
a.close_time, a.penalty_until, a.planted, b.*, c.*, e.*
FROM tower_sites d
LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number
LEFT JOIN aodb b ON a.high_id = b.highid
LEFT JOIN playfields c on d.playfield_id = c.id
LEFT JOIN all_orgs e on a.org_id = e.org_id
WHERE playfield_id=? AND d.site_number=? ORDER BY close_time IS NULL, ql desc""",
[pf, site])
else:
return self.db.query("""SELECT d.playfield_id AS pf_id, d.site_number, d.site_name, d.min_ql, d.max_ql,
d.x_coord, d.y_coord, d.timing, d.enabled, a.tower_id, a.ql,
a.close_time, a.penalty_until, a.planted, b.*, c.*, e.*
FROM tower_sites d
LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number AND a.close_time IS NOT NULL
LEFT JOIN aodb b ON a.high_id = b.highid
LEFT JOIN playfields c on d.playfield_id = c.id
LEFT JOIN all_orgs e on a.org_id = e.org_id
WHERE playfield_id=?
GROUP BY playfield_id, site_number
ORDER BY site_number, ql DESC
""", [pf])
def get_towers_by_org(self, org_id):
return self.db.query("""SELECT * FROM (SELECT COUNT(CASE WHEN name LIKE '%Turret%' THEN 1 WHEN name LIKE '%SAM Battery%' THEN 1 END) turrets,
COUNT(CASE WHEN name LIKE '%Guard%' THEN 1 END) guard, a.pf_id, a.org_id, e.org_name, c.*, d.* FROM towers a
LEFT JOIN aodb b ON a.high_id = b.highid
LEFT JOIN playfields c on a.pf_id = c.id
LEFT JOIN tower_sites d on a.pf_id = d.playfield_id and a.site_number = d.site_number
LEFT JOIN all_orgs e on a.org_id = e.org_id
WHERE a.org_id=? GROUP BY a.pf_id, a.site_number ORDER BY ql, close_time IS NOT NULL) t1
LEFT JOIN
(SELECT * from towers a WHERE a.org_id=? AND close_time IS NOT NULL) t2
ON t1.pf_id=t2.pf_id AND t1.site_number = t2.site_number
ORDER BY ql""", [org_id, org_id])
def find_orgs(self, search):
return self.db.query("SELECT DISTINCT a.org_name, a.org_id FROM all_orgs a "
"LEFT JOIN towers b ON a.org_id = b.org_id WHERE org_name <EXTENDED_LIKE=0> ? AND b.org_id IS NOT NULL",
[search], extended_like=True)