import time
from datetime import datetime
import pytz
from core.chat_blob import ChatBlob
from core.command_alias_service import CommandAliasService
from core.command_param_types import Any, Int, Const
from core.db import DB
from core.decorators import instance, command, event, setting
from core.event_service import EventService
from core.igncore import IgnCore
from core.lookup.pork_service import PorkService
from core.public_channel_service import PublicChannelService
from core.setting_types import BooleanSettingType
from core.text import Text
from core.util import Util
from modules.standard.helpbot.playfield_controller import PlayfieldController
from modules.standard.tower.tower_attack_controller import TowerAttackController
from modules.standard.tower.tower_events import TowerEventController
# Site timing modes: legacy (0), EU-friendly (1) or US-friendly (2).
# For the two fixed modes this maps the timing value to the hour of day used
# when computing the site's close time (shown to users as "04 UTC" / "20 UTC").
# Legacy sites have no entry; their close time is taken from the towers row.
FIXED_TIMES = {
    1: 4,
    2: 20,
}
@instance()
class LandController:
def inject(self, registry):
    """Look up and store every service instance this controller uses.

    :param registry: object exposing ``get_instance(name)``; each dependency
                     is fetched by its registered name and kept on ``self``.
    """
    lookup = registry.get_instance

    # Core services
    self.bot: IgnCore = lookup("bot")
    self.db: DB = lookup("db")
    self.text: Text = lookup("text")
    self.util: Util = lookup("util")
    self.event_service: EventService = lookup("event_service")
    self.pork_service: PorkService = lookup("pork_service")

    # Other controllers / services
    self.playfield_controller: PlayfieldController = lookup("playfield_controller")
    self.public_channel_service: PublicChannelService = lookup("public_channel_service")
    self.command_alias_service: CommandAliasService = lookup("command_alias_service")
    self.tac: TowerAttackController = lookup("tower_attack_controller")
def pre_start(self):
    """Register command aliases and prepare the tower tables.

    Adds the ``towers``, ``tower`` and ``lca`` aliases for ``lc``, loads
    ``tower_sites.sql`` from this module's directory, creates the in-memory
    ``towers`` table on the shared DB connection if it is missing, and then
    asks the DB layer to create a view named ``towers``.
    """
    for alias in ('towers', 'tower', 'lca'):
        self.command_alias_service.add_alias(alias, "lc")

    sites_file = self.module_dir + "/" + "tower_sites.sql"
    self.db.load_sql_file(sites_file, pre_optimized=True)

    create_towers_table = (
        "CREATE TABLE IF NOT EXISTS `towers` ("
        "`tower_id` INT(11) NOT NULL,"
        "`pf_id` INT(11) NOT NULL,"
        "`site_number` INT(11) NOT NULL,"
        "`x_coord` INT(11) NOT NULL,"
        "`y_coord` INT(11) NOT NULL,"
        "`high_id` INT(11) NOT NULL,"
        "`ql` INT(11) NOT NULL,"
        "`org_id` INT(11),"
        "`faction` VARCHAR(11),"
        "`planted` INT(11),"
        "`close_time` INT(11),"
        "`penalty_until` INT(11),"
        "PRIMARY KEY (`tower_id`) USING HASH,"
        "INDEX `site` (`pf_id`, `site_number`, `org_id`, `close_time`, `penalty_until`) USING HASH,"
        "INDEX `tower` (`planted`, `ql`, `x_coord`, `y_coord`) USING HASH) "
        "COLLATE='utf8mb4_general_ci' ENGINE=MEMORY;"
    )
    self.db.shared.exec(create_towers_table)
    self.db.create_view("towers")
@setting(name="lc_cmd_full", value="false", description="Toggle the verbosity of !lc commands")
def lc_full(self) -> BooleanSettingType:
    """Declare the boolean ``lc_cmd_full`` setting.

    format_site_info reads this setting to decide whether the per-tower
    listing is appended to a single-site view.
    """
    setting_type = BooleanSettingType()
    return setting_type
@event(event_type=TowerEventController.TOWER_VICTORY_EVENT, description="Purge sites which got wiped from DB",
       is_enabled=False)
def tower_victory_event(self, _, event_data):
    """Delete the towers of a site once a tower victory has been reported.

    The most recent attack matching winner, loser and playfield is looked up
    through the tower attack controller; if one is found, all rows of that
    playfield/site are removed from ``towers``. Nothing happens otherwise.
    """
    winner = event_data.winner
    loser = event_data.loser
    last_attack = self.tac.get_last_attack(winner.faction, winner.org_name,
                                           loser.faction, loser.org_name,
                                           event_data.location.playfield.id, time.time())
    if not last_attack:
        return

    self.db.exec("DELETE FROM towers where pf_id=? and site_number=?",
                 [last_attack.pf_id, last_attack.site])
@command(command="lc", params=[], access_level="member",
         description="See a list of playfields containing land control tower sites")
def lc_list_cmd(self, request):
    """List every playfield that has at least one entry in ``tower_sites``.

    Each line shows the playfield id, a clickable ``lc <short_name>`` link
    labelled with the long name, and the short name.
    """
    playfields = self.db.query(
        "SELECT id, long_name, short_name FROM playfields "
        "WHERE id IN (SELECT DISTINCT playfield_id FROM tower_sites) ORDER BY short_name")

    lines = [
        f"[{pf.id:d}] {self.text.make_tellcmd(pf.long_name, f'lc {pf.short_name}')} {pf.short_name}\n"
        for pf in playfields
    ]
    return ChatBlob(f"Land Control Playfields ({len(playfields):d})", "".join(lines))
@command(command="lc", params=[Const("org"), Any("search")], access_level="member",
         description="See a list of land control tower sites owned by a particular org")
def sites_org_cmd(self, request, _, search):
    """Show all tower sites owned by an org.

    ``search`` is resolved in this order:

    * all digits -> used directly as the org id;
    * otherwise orgs that own towers are searched by name; several matches
      produce a pick-list, exactly one match is used;
    * with no org match, ``search`` is treated as a character name and that
      character's org is used.

    :param request: the command request (unused).
    :param _: the literal ``org`` keyword (unused).
    :param search: org id, org name pattern or character name.
    :return: a ChatBlob with the sites, or a plain message when nothing is found.
    """
    if search.isdigit():
        org_id = int(search)
        result = search
    else:
        orgs = self.find_orgs(search)
        num_orgs = len(orgs)
        if num_orgs > 1:
            # Ambiguous name: let the user pick the org by id.
            blob = "".join(
                self.text.make_tellcmd(f"{org.org_name} ({org.org_id})", f"lc org {org.org_id}") + "\n"
                for org in orgs)
            return ChatBlob(f"Orgs matching your search criteria ({num_orgs})", blob)

        if num_orgs == 1:
            result = orgs[0].org_name
            org_id = orgs[0].org_id
        else:
            # No tower-owning org matched; fall back to a character lookup.
            char_info = self.pork_service.get_character_info(search)
            if not char_info:
                return f"Character or org {search} does not own any sites."
            if not char_info.org_id:
                return f"{search.capitalize()} does not appear to belong to an org."
            org_id = char_info.org_id
            result = char_info.org_name

    data = self.get_towers_by_org(org_id)
    if not data:
        return f"The org {search} does not own any sites or does not exist."

    # One integer timestamp for the whole listing, as lc_playfield_cmd does.
    t = int(time.time())
    site_count = len(data)
    total_ql = 0
    blob = ""
    for site in data:
        blob += f"{self.format_site_info(site, t, site_count)}"
        blob += f"Dist: {site.guard} Conductors and {site.turrets} Turrets planted\n\n"
        # get_towers_by_org LEFT JOINs the row carrying `ql`, so it may be NULL;
        # treat a missing value as 0 instead of raising TypeError.
        total_ql += site.ql or 0
    blob += f"Stats: QL{total_ql}, contracts up to QL{total_ql * 2}"
    return ChatBlob(f"Sites owned by {result}", blob)
@command(command="lc", params=[Any("playfield"), Int("site_number", is_optional=True)], access_level="member",
         description="See a list of land control tower sites in a particular playfield")
def lc_playfield_cmd(self, request, playfield_name, site_number):
    """Show the tower sites of a playfield, or one site in detail.

    :param request: the command request (unused).
    :param playfield_name: playfield name or id as typed by the user.
    :param site_number: optional site number; when given only that site is shown.
    :return: a ChatBlob with the site info, or a plain message when the
             playfield or site cannot be found.
    """
    playfield = self.playfield_controller.get_playfield_by_name_or_id(playfield_name)
    if not playfield:
        return f"Could not find playfield {playfield_name}."

    # Queried once; the result serves both the single-site and the listing view.
    data = self.get_towers(playfield.id, site_number)
    if not data:
        if site_number is not None:
            return f"The Site {site_number} does not exist in playfield {playfield_name}."
        # Without a site number the old message read "The Site None does not exist".
        return f"No tower sites found in playfield {playfield_name}."

    t = int(time.time())
    if site_number:
        # count is left at 0: format_site_info then treats `data` as the tower list of one site.
        blob = "" + self.format_site_info(data, t)
        title = f"Tower Info: {playfield.long_name} x{site_number}"
    else:
        site_count = len(data)
        blob = "".join(f"{self.format_site_info(row, t, site_count)}\n" for row in data)
        title = f"Tower Info: {playfield.long_name} ({site_count})"
    return ChatBlob(title, blob)
@command(command="free", params=[],
         access_level="member",
         description="Shows potentially free towerfields")
def free(self, _):
    """List enabled tower sites that currently have no owning org on record.

    :param _: the command request (unused).
    :return: a ChatBlob with one entry per free site, or a plain message
             when there are none.
    """
    data = self.get_free()
    if not data:
        return "Currently there are no free sites. Go kill some."

    # One integer timestamp for all rows, consistent with lc_playfield_cmd.
    t = int(time.time())
    site_count = len(data)
    blob = "".join(f"{self.format_site_info(row, t, site_count)}\n" for row in data)
    return ChatBlob(f"FREE Towersites ({site_count})", blob)
def format_site_info(self, row, t, count=0):
    """Render one tower site as chat text.

    :param row: when ``count`` > 0, a single site row; when ``count`` is 0,
                the list of tower rows of one site, whose first element
                supplies the site and control-tower columns.
    :param t: current unix time in seconds.
    :param count: number of sites the caller is listing; 0 selects the
                  single-site mode, which may append the per-tower listing
                  when the ``lc_cmd_full`` setting is on.
    :return: the formatted text, or "Site does not exist." when there is
             nothing to render.
    """
    data = row
    if count == 0 and data:
        row = data[0]
    if not row:
        return "Site does not exist."

    timing_labels = {0: "[Legacy]", 1: "[04 UTC]", 2: "[20 UTC]"}
    blob = f"{row.short_name} x{row.site_number} ({row.site_name})\n"
    # The line is always terminated, even when the timing value is not a known mode.
    blob += f"Level Range: {row.min_ql} - {row.max_ql} {timing_labels.get(row.timing, '')}\n"
    blob += "Coordinates: "
    blob += self.text.make_chatcmd(f"{row.x_coord:d}x{row.y_coord:d}",
                                   f"/waypoint {row.x_coord:d} {row.y_coord:d} {row.pf_id:d}") + "\n"

    if not row.get("org_name", None):
        # No owning org on the row: the site is either disabled or free.
        blob += "This site is potentially unplanted\n" if row.enabled else "Disabled\n"
        return blob

    # Close time as seconds into the day. Fixed-timing sites derive it from the
    # configured hour plus the planting offset within the hour; legacy sites
    # use the stored value. Kept in a local so the caller's row is not modified.
    close_time = row.close_time
    if row.timing > 0:
        close_time = FIXED_TIMES[row.timing] * 3600 + row.planted % 3600

    remaining = close_time - t % 86400
    if remaining < 0:
        remaining += 86400

    if remaining <= 3600:
        status = f"5% (closes in {self.util.time_to_readable(remaining)})"
    elif remaining <= (3600 * 6):
        status = f"25% (closes in {self.util.time_to_readable(remaining)})"
    else:
        status = f"75% (opens in {self.util.time_to_readable(remaining - (3600 * 6))})"

    # penalty_until is a nullable column; NULL means no penalty.
    penalty_until = row.penalty_until or 0
    if penalty_until > t:
        status += f" Penalty (for {self.util.time_to_readable(penalty_until - t)})"

    blob += f"CT: QL{row.ql} ({self.text.get_formatted_faction(row.faction, row.org_name)}) T{self.get_ct_type(row.ql)} - Planted {self.util.time_to_readable(t - row.planted)} ago\n"
    blob += f"Gas: {status}\n"

    if self.lc_full().get_value() and count == 0:
        conductors = 0
        turrets = 0
        tower_lines = []
        for tower in data:
            # name comes from a LEFT JOIN and may be NULL for unknown items.
            name = tower.name or ""
            # "SAM Battery" counts as a turret, matching get_towers_by_org.
            if "Turret" in name or "SAM Battery" in name:
                turrets += 1
            elif "Conductor" in name:
                conductors += 1
            tower_lines.append(f" - QL{self.text.zfill(tower.ql, 220)} {name}\n")
        blob += f"Dist: {conductors} Conductors and {turrets} Turrets\n"
        blob += "\n Towers:\n"
        blob += "".join(tower_lines)
    return blob
def get_ct_type(self, ql):
    """Map a control tower QL to its type number (1-7).

    Type N covers all QLs below the N-th bound in the table; anything at or
    above the last bound is type 7.
    """
    upper_bounds = (34, 82, 129, 177, 201, 226)
    for ct_type, bound in enumerate(upper_bounds, start=1):
        if ql < bound:
            return ct_type
    return len(upper_bounds) + 1
def get_free(self):
    """Query enabled tower sites whose joined ``towers`` row has no org id.

    Sites without any ``towers`` row qualify as well, because the join is a
    LEFT JOIN. One row per playfield/site is returned.
    """
    sql = """SELECT d.playfield_id AS pf_id,d.site_number, d.site_name, d.min_ql, d.max_ql, d.x_coord, d.y_coord, d.timing, d.enabled, a.tower_id, a.ql, a.close_time, a.penalty_until, a.planted, b.*, c.*, e.* FROM tower_sites d
LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number
LEFT JOIN aodb b ON a.high_id = b.highid
LEFT JOIN playfields c on d.playfield_id = c.id
LEFT JOIN all_orgs e on a.org_id = e.org_id
WHERE a.org_id IS NULL AND d.enabled = 1 GROUP BY d.playfield_id, d.site_number """
    return self.db.query(sql)
def get_towers(self, pf, site=None):
    """Fetch tower rows for a playfield.

    :param pf: playfield id.
    :param site: optional site number. When truthy, every tower row of that
                 site is returned, rows with a close_time first, then by QL
                 descending. Otherwise one row per site of the playfield is
                 returned, joined only to the tower row that has a close_time.
    """
    if not site:
        return self.db.query("""SELECT d.playfield_id AS pf_id, d.site_number, d.site_name, d.min_ql, d.max_ql,
d.x_coord, d.y_coord, d.timing, d.enabled, a.tower_id, a.ql,
a.close_time, a.penalty_until, a.planted, b.*, c.*, e.*
FROM tower_sites d
LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number AND a.close_time IS NOT NULL
LEFT JOIN aodb b ON a.high_id = b.highid
LEFT JOIN playfields c on d.playfield_id = c.id
LEFT JOIN all_orgs e on a.org_id = e.org_id
WHERE playfield_id=?
GROUP BY playfield_id, site_number
ORDER BY site_number, ql DESC
""", [pf])

    return self.db.query("""SELECT d.playfield_id AS pf_id,d.site_number, d.site_name, d.min_ql, d.max_ql,
d.x_coord, d.y_coord, d.timing, d.enabled, a.tower_id, a.ql,
a.close_time, a.penalty_until, a.planted, b.*, c.*, e.*
FROM tower_sites d
LEFT JOIN towers a on a.pf_id = d.playfield_id and a.site_number = d.site_number
LEFT JOIN aodb b ON a.high_id = b.highid
LEFT JOIN playfields c on d.playfield_id = c.id
LEFT JOIN all_orgs e on a.org_id = e.org_id
WHERE playfield_id=? AND d.site_number=? ORDER BY close_time IS NULL, ql desc""",
                         [pf, site])
def get_towers_by_org(self, org_id):
    """Fetch one row per site owned by ``org_id``.

    Each row carries the site and playfield columns plus two counts taken
    over the org's towers on that site: ``turrets`` (names containing
    "Turret" or "SAM Battery") and ``guard`` (names containing "Guard").
    It is LEFT JOINed to the org's tower row on that site that has a
    close_time, and the result is ordered by ql.
    """
    sql = """SELECT * FROM (SELECT COUNT(CASE WHEN name LIKE '%Turret%' THEN 1 WHEN name LIKE '%SAM Battery%' THEN 1 END) turrets,
COUNT(CASE WHEN name LIKE '%Guard%' THEN 1 END) guard, a.pf_id, a.org_id, e.org_name, c.*, d.* FROM towers a
LEFT JOIN aodb b ON a.high_id = b.highid
LEFT JOIN playfields c on a.pf_id = c.id
LEFT JOIN tower_sites d on a.pf_id = d.playfield_id and a.site_number = d.site_number
LEFT JOIN all_orgs e on a.org_id = e.org_id
WHERE a.org_id=? GROUP BY a.pf_id, a.site_number ORDER BY ql, close_time IS NOT NULL) t1
LEFT JOIN
(SELECT * from towers a WHERE a.org_id=? AND close_time IS NOT NULL) t2
ON t1.pf_id=t2.pf_id AND t1.site_number = t2.site_number
ORDER BY ql"""
    # The org id is bound twice: once for the grouped site query, once for the joined tower subquery.
    params = [org_id, org_id]
    return self.db.query(sql, params)
def find_orgs(self, search):
    """Find orgs matching ``search`` that own at least one row in ``towers``.

    The query is run with ``extended_like=True``; presumably the DB layer
    expands the bare ``?`` after ``org_name`` into a LIKE match (confirm
    against core.db).

    :return: distinct ``org_name`` / ``org_id`` rows.
    """
    sql = ("SELECT DISTINCT a.org_name, a.org_id FROM all_orgs a "
           "LEFT JOIN towers b ON a.org_id = b.org_id WHERE org_name ? AND b.org_id IS NOT NULL")
    return self.db.query(sql, [search], extended_like=True)