import sys
import time
from mysql.connector.cursor import CursorBase
from requests import Session
from conf.config import BotConfig
from core.aochat.BaseModule import BaseModule
from core.db import DB
from core.decorators import instance, timerevent, event, setting
from core.job_scheduler import JobScheduler
from core.setting_types import BooleanSettingType
from core.text import Text
from core.igncore import IgnCore
from core.util import Util
from modules.core.accounting.services.account_service import AccountService
from modules.raidbot.tower.tower_controller import TowerController
from modules.standard.helpbot.playfield_controller import PlayfieldController
@instance()
class TowerService(BaseModule):
# For this Module to work properly you might need to
# contact the API host of your choice to whitelist your IP addresses.
attack_hot = []
plant = []
def inject(self, registry):
self.bot: IgnCore = registry.get_instance("bot")
self.db: DB = registry.get_instance("db")
self.util: Util = registry.get_instance("util")
self.text: Text = registry.get_instance("text")
self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
self.job_scheduler: JobScheduler = registry.get_instance("job_scheduler")
self.account_service: AccountService = registry.get_instance("account_service")
mod = __import__(f'conf.{sys.argv[1]}', fromlist=['BotConfig'])
config: BotConfig = getattr(mod, 'BotConfig')
if hasattr(config, "tower_url"):
self.tower_url = config.tower_url
else:
self.tower_url = None
def pre_start(self):
self.db.shared.exec("CREATE TABLE IF NOT EXISTS towers("
"pf_id int not null, "
"site_number int not null,"
"ql int,"
"x_coord int not null,"
"y_coord int not null,"
"org_id int,"
"org_name varchar(255), "
"faction varchar(32), "
"close_time int,"
"planted int,"
"enabled tinyint, "
"PRIMARY KEY (pf_id, site_number), "
"INDEX ql(ql), INDEX close(close_time), "
"INDEX planted(planted), INDEX enabled(enabled)) ENGINE MEMORY")
self.db.create_view("towers")
@event(event_type=TowerController.TOWER_ATTACK_EVENT, description="Track planthot", is_hidden=True)
def tower_attack(self, _, event_data):
if event_data.attacker.get("org_id", None):
self.attack_hot.append({'org_name': event_data.attacker.org_name, 'hot': time.time() + 60 * 60})
@setting(name="tower_notify_type", value=False, description="Only notify when our orgs are involved")
def tower_notify_type(self) -> BooleanSettingType:
return BooleanSettingType()
@event(event_type=TowerController.TOWER_VICTORY_EVENT, description="Send NW warnings")
def victory(self, _, event_data):
t = int(time.time())
if event_data.type == "attack":
if self.tower_notify_type().get_value():
if not (event_data.winner.get("org_name", None) in self.account_service.get_org_names()
or event_data.loser.org_name in self.account_service.get_org_names()):
return
row = self.get_last_attack(event_data.winner.faction, event_data.winner.org_name, event_data.loser.faction,
event_data.loser.org_name, event_data.location.playfield.id, t)
self.send_nw_warn(0,
f'<{event_data.loser.faction.lower()}>'
f'{event_data.loser.org_name}'
f'{event_data.loser.faction.lower()}>'
f' Lost their Site at {event_data.location.playfield.short_name} x{row.site} - '
f'<{event_data.winner.faction.lower()}>'
f'{event_data.winner.org_name}'
f'{event_data.winner.faction.lower()}> won!!')
self.plant.append({'pf': row.pf_id, 'site': row.site, 'plant': time.time() + 20 * 60 - 1})
self.prepare_nw_warn(row.pf_id, row.site)
elif event_data.type == "terminated":
field = self.db.query("SELECT * FROM towers t where t.org_name=? and t.pf_id=?",
[event_data.loser.org_name, event_data.playfield.id])
if len(field) == 1:
field = field[0]
self.plant.append({'pf': event_data.location.playfield.id,
'site': field.site_number,
'plant': time.time() + 20 * 60 - 1})
self.send_nw_warn(0, f'<{event_data.loser.faction.lower()}>'
f'{event_data.loser.org_name}'
f'{event_data.loser.faction.lower()}> Lost their Site at '
f'{event_data.location.playfield.short_name} x{field.site_number}')
self.prepare_nw_warn(event_data.location.playfield.id, field.site_number)
return
self.plant.append({'pf': event_data.location.playfield.id,
'site': f"(UKN) PO: {event_data.loser.org_name}|{event_data.loser.faction}",
'plant': time.time() + 20 * 60 - 1})
self.send_nw_warn(0,
f'<{event_data.loser.faction.lower()}>'
f'{event_data.loser.org_name}'
f'{event_data.loser.faction.lower()}> '
f'Lost their Site in {event_data.location.playfield.long_name}')
self.prepare_nw_warn(event_data.playfield.id, "(UKN)",
f"(UKN) PO: {event_data.loser.org_name}|{event_data.loser.faction}")
@timerevent(budatime="4h", description="fetch the towerAPI cache", is_enabled=False)
def fetch_tower_update(self, _1, _2):
if self.tower_url:
#
# ONLY Access the API's via Tor... Tyrence is shadow-banning every IP Subnet
# accessing it multiple times/hour whenever the limit parameter is being used....
# ?limit is a parameter not documented anywhere, but allows pulling the whole list
# On other API implementations this parameter has no effect,
# as the server always responds with the full list.
# In the Future, this Event will get moved to an external process,
# which maintains the tower Cache for all connected bots,
# like it has been done with the Worldboss timers.
from torpy.http.requests import TorRequests
with TorRequests() as tor_request:
with tor_request.get_session(1) as session:
session: Session
r = session.get(self.tower_url)
if not r:
return
if data := r.json()["results"]:
blob = []
for row in data:
blob.append((row["playfield_id"], row["site_number"], row["ql"],
row["x_coord"], row["y_coord"],
row["org_id"], row['org_name'], row['faction'],
row["close_time"], row["created_at"],
row["enabled"]))
with self.db.lock:
with self.db.pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cur:
cur: CursorBase
cur.executemany(
"INSERT INTO towers (pf_id, site_number, "
"ql, x_coord, y_coord, org_id, org_name, "
"faction, close_time, planted, enabled) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE "
"org_id=VALUE(org_id), "
"org_name=VALUE(org_name), "
"faction=VALUE(faction), "
"close_time=VALUE(close_time),"
"ql=VALUE(ql),"
"planted=VALUE(planted) ", blob)
rem = []
for key, value in enumerate(sorted(self.attack_hot, key=lambda k: k['hot'])):
if value['hot'] < time.time():
rem.append(key)
for key in reversed(rem):
self.attack_hot.pop(key)
def day_time(self, day_t):
if day_t > 86400:
day_t -= 86400
elif day_t < 0:
day_t += 86400
return day_t
def get(self, where_order="", param=None):
if param is None:
param = []
return self.db.query(f"SELECT a.org_id, a.planted, a.close_time, a.ql, a.org_name, a.faction, "
f"b.min_ql, b.max_ql, b.site_name, b.site_number, b.x_coord, b.y_coord, "
f"c.id, c.long_name, c.short_name, "
f"e.pvp_min, e.pvp_max "
f"FROM towers a "
f"INNER JOIN tower_site b ON a.site_number=b.site_number "
f"LEFT JOIN playfields c ON a.pf_id = c.id "
f"LEFT JOIN level e ON e.level = a.ql "
f"WHERE a.pf_id = b.playfield_id and enabled = 1 "
f"{where_order}", param)
def get_towers_by_tl(self, tl, faction=None):
min_ql, max_ql = self.util.get_level_range_tl(tl)
if faction:
return self.get("and ql between ? and ? and faction LIKE ? order by c.short_name",
[min_ql, max_ql, faction])
return self.get("and ql between ? and ? order by c.short_name", [min_ql, max_ql])
def get_towers_all(self):
return self.get("order by c.long_name", [])
def get_towers_by_pf(self, pf):
return self.get("and a.pf_id=? order by a.site_number", [pf])
def get_towers_by_pf_site(self, pf, site):
return self.get("and a.pf_id=? and a.site_number=?", [pf, site])
def get_towers_by_org(self, org):
return self.get("and a.org_id"
"=? order by c.long_name", [org])
def get_towers_by_org_name(self, org):
return self.get("and a.org_name=? order by c.long_name", [org])
def get_free(self):
return self.get("and a.org_id IS NULL order by a.site_number", [])
def get_towers_hot_tl(self, tl, faction=None):
towers = self.get_towers_by_tl(tl, faction)
out = []
for tower in towers:
if self.is_hot(tower) in [1, 2]:
out.append(tower)
return out
def get_towers_hot_level(self, level, faction=None):
out = []
if level:
if faction:
towers = self.get("and pvp_min <= ? and pvp_max >= ? and faction LIKE ?", [level, level, faction])
else:
towers = self.get("and pvp_min <= ? and pvp_max >= ?", [level, level])
else:
if faction:
towers = self.get("and faction LIKE ?", [faction])
else:
towers = self.get()
for tower in towers:
if self.is_hot(tower) in [1, 2]:
out.append(tower)
return out
def format_entry(self, entry, _):
h3 = ""
now = self.day_time(int(time.time()) % 86400)
row0 = f"Site: {entry.short_name} x{entry.site_number} " \
f"[R:{entry.min_ql} - {entry.max_ql}] " \
f"[{self.text.make_tellcmd('More', f'lc {entry.short_name} {entry.site_number}')}]\n"
row1 = f'UKN :: No Owner -> Unplanted\n'
row2 = ""
row3 = "\n"
hot = self.is_hot(entry)
if hot != -1:
h1 = "COLD"
if hot == 2:
h1 = "WARHOT"
for org in self.attack_hot:
if org['org_name'] == entry.org_name:
hot_normal = self.is_hot(entry, False)
# print(hot, hot_normal, org['hot'] - now)
if hot_normal == 1:
h3 = f"COLD in " \
f"{self.util.format_time(self.day_time(int(entry.close_time - now)))}"
if hot_normal == 0:
h3 = f"COLD in {self.util.format_time(org['hot'] - now)}"
elif hot == 1:
h1 = 'HOT'
h3 = f"COLD in {self.util.format_time(self.day_time(int(entry.close_time - now)))}"
else:
hot_time = self.day_time(entry.close_time - 6 * 60 * 60)
h3 = f"HOT in " \
f"{self.util.format_time((18 * 80 * 60 - hot_time) if hot_time > 18 * 60 * 60 else hot_time)}"
org = f"<{entry.faction.lower()}>{entry.org_name}{entry.faction.lower()}> " \
f"[{self.text.make_tellcmd('View org', f'lc org {entry.org_name}')}]"
row1 = f"{h1} :: {h3} :: {org} :: \n"
row3 = f" » Planted: {self.util.format_datetime(entry.planted)}\n\n"
if entry.pvp_min:
pvp = f"[{entry.pvp_min} - {entry.pvp_max}]"
else:
pvp = f"[175 - 220]"
# noinspection LongLine
row2 = f" » QL: {entry.ql} PvP: {pvp} " \
f"[{self.text.make_chatcmd(f'{entry.x_coord} x {entry.y_coord}', f'/waypoint {entry.x_coord} {entry.y_coord} {entry.id}')}]\n"
return row0 + row1 + row2 + row3 + ""
def is_hot(self, entry, with_war=True) -> int:
if entry.get("close_time", None):
now = self.day_time((time.time()) % 86400)
self.attack_hot.sort(key=lambda k: k['org_name'])
rem = []
inside = False
for index, i in enumerate(self.attack_hot):
if i['hot'] < time.time():
rem.append(index)
continue
if i['org_name'] == entry.org_name:
inside = True
for index in reversed(rem):
self.attack_hot.pop(index)
if inside and with_war:
return 2
if self.day_time(entry.close_time - int(now)) > 6 * 60 * 60:
return 0
return 1
return -1
def get_last_attack(self, att_faction, att_org_name, def_faction, def_org_name, playfield_id, t):
last_updated = t - (8 * 3600)
is_finished = 1
sql = """
SELECT
b.id AS battle_id,
a.id AS attack_id,
b.playfield_id as pf_id,
b.site_number as site
FROM
tower_battle b
JOIN tower_attacker a ON
a.tower_battle_id = b.id
WHERE
a.att_faction = ?
AND a.att_org_name = ?
AND b.def_faction = ?
AND b.def_org_name = ?
AND b.playfield_id = ?
AND b.is_finished = ?
AND b.last_updated >= ?
ORDER BY
last_updated DESC
LIMIT 1"""
return self.db.query_single(sql,
[att_faction, att_org_name, def_faction, def_org_name, playfield_id, is_finished,
last_updated])
def prepare_nw_warn(self, pf_id, site, bonus=""):
pf = self.playfield_controller.get_playfield_by_id(pf_id)
site = f"{pf.short_name} » x{site}" + bonus
self.job_scheduler.delayed_job(self.send_nw_warn, 0, f"{site} plantable in 20 minutes!")
self.job_scheduler.delayed_job(self.send_nw_warn, 10 * 60 - 1, f"{site} plantable in 10 minutes!")
self.job_scheduler.delayed_job(self.send_nw_warn, 15 * 60 - 1, f"{site} plantable in 5 minutes!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 - 1, f"{site} plantable in 1 minute!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 30 - 1, f"{site} plantable in 30 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 45 - 1, f"{site} plantable in 15 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 50 - 1, f"{site} plantable in 10 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 55 - 1, f"{site} plantable in 5 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 56 - 1, f"{site} plantable in 4 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 57 - 1, f"{site} plantable in 3 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 58 - 1, f"{site} plantable in 2 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 59 - 1, f"{site} plantable in 1 second!")
self.job_scheduler.delayed_job(self.send_nw_warn, 20 * 60 - 1, f"{site} plantable NOW!")
def send_nw_warn(self, _, msg):
self.bot.send_private_channel_message(f"[NW] {msg}")