-> !wants
-> !orgs info
-> special cmd's
-> !assist
-> "afk" for players without active account
-> !loot add <item_ref> <count> => nolonger breaks !account
Changes:
-> grouped !tara, !gaunt, .. into !wb
-> Display the most recent news entry on logon (default: enabled)
-> improved grouping of !items
-> Added the option to authentificate WS connections (Datanet module). This is used in special cases, where the Websocket Server requires the clien tto authentificate itself. (Server sends "#auth", client responds with the auth string)
-> Add main name to relaying (priv <-> org) [default: disabled]
-> Added logon/logoff messages back
-> restricted default access to "dangerous" commands to moderator
-> Added optional logging (Private Channel, Org Channel, Tells, ... disabled by default)

Rewrite of the Tower Module.
-> More verbosity, if enabled in config. by default, GAS and Hot timer only.
-> !hot displays currently hot (and in penalty) sites, and these which go hot in < 60 minutes
-> !attacks filterable by PF and Site
-> display current contract QL's grouped by org: !contracts (requires managed cache)
This commit is contained in:
2021-11-25 14:09:43 +01:00
parent 2d7ecf4883
commit 17c776faec
44 changed files with 1669 additions and 1249 deletions
@@ -1,81 +0,0 @@
import math
from core.aochat.BaseModule import BaseModule
from core.chat_blob import ChatBlob
from core.command_alias_service import CommandAliasService
from core.command_param_types import Int, NamedParameters
from core.db import DB
from core.decorators import instance, command
from core.event_service import EventService
from core.lookup.pork_service import PorkService
from core.public_channel_service import PublicChannelService
from core.text import Text
from core.igncore import IgnCore
from core.util import Util
from modules.raidbot.tower.tower_service import TowerService
from modules.standard.helpbot.playfield_controller import PlayfieldController
# noinspection DuplicatedCode
@instance()
class ContractController(BaseModule):
PAGE_SIZE = 40
# noinspection DuplicatedCode
def inject(self, registry):
self.bot: IgnCore = registry.get_instance("bot")
self.db: DB = registry.get_instance("db")
self.util: Util = registry.get_instance("util")
self.text: Text = registry.get_instance("text")
self.event_service: EventService = registry.get_instance("event_service")
self.pork_service: PorkService = registry.get_instance("pork_service")
self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
self.public_channel_service: PublicChannelService = registry.get_instance("public_channel_service")
self.towercache: TowerService = registry.get_instance("tower_service")
self.command_alias_service: CommandAliasService = registry.get_instance("command_alias_service")
@command(command="contracts",
params=[Int('mininum', is_optional=True), NamedParameters(['page'])],
access_level="member",
description="Shows contracts")
def cotracts(self, _, min_ql, named_params):
if not min_ql:
min_ql = 200
data = self.db.query("SELECT CAST(SUM(ql)*2 AS INTEGER) AS contracts, "
"COUNT(*) as sites, org_name, org_id, faction FROM towers "
"where org_name IS NOT NULL GROUP BY org_name ORDER BY contracts desc", [])
page = int(named_params.page or "1")
offset = (page - 1) * self.PAGE_SIZE
data = [x for x in data if x.contracts and x.contracts > min_ql]
return self.format_pagination(data, offset, page, f"Tower Contracts ({len(data)})",
f"There are no orgs with more than "
f"<highlight>{min_ql}</highlight> contract points.",
f'contracts {min_ql}')
def format_pagination(self, data, offset, page, title, nullmsg, cmd):
selected = data[offset:offset + self.PAGE_SIZE]
count = len(selected)
pages = ""
if page > 1:
pages += "Pages: " + self.text.make_tellcmd("«« Page %d" % (page - 1), f'{cmd} --page={page - 1}')
if offset + self.PAGE_SIZE < len(data):
pages += f" Page {page}/{math.ceil(len(data) / self.PAGE_SIZE)}"
pages += " " + self.text.make_tellcmd("Page %d »»" % (page + 1), f'{cmd} --page={page + 1}')
pages += "\n"
if count == 0:
return nullmsg
else:
blob = "<font color=CCInfoText>"
blob += "" + pages + "\n"
index = offset
for entry in selected:
index += 1
blob += self.row_formatter(entry, index, data)
blob += "\n" + pages
blob += "</font>"
return ChatBlob(title, blob)
def row_formatter(self, entry, index, data):
return f"{self.text.zfill(index, len(data))}. " \
f"<white>{self.text.zfill(entry.contracts, data[0].contracts)}</white> " \
f"<{entry.faction.lower()}>{entry.org_name}</{entry.faction.lower()}>\n"
-76
View File
@@ -1,76 +0,0 @@
import time
from core.aochat.BaseModule import BaseModule
from core.chat_blob import ChatBlob
from core.command_alias_service import CommandAliasService
from core.db import DB
from core.decorators import instance, command
from core.event_service import EventService
from core.lookup.pork_service import PorkService
from core.public_channel_service import PublicChannelService
from core.text import Text
from core.igncore import IgnCore
from core.util import Util
from modules.raidbot.tower.tower_service import TowerService
from modules.standard.helpbot.playfield_controller import PlayfieldController
@instance()
class PlantController(BaseModule):
# noinspection DuplicatedCode
def inject(self, registry):
self.bot: IgnCore = registry.get_instance("bot")
self.db: DB = registry.get_instance("db")
self.util: Util = registry.get_instance("util")
self.text: Text = registry.get_instance("text")
self.event_service: EventService = registry.get_instance("event_service")
self.pork_service: PorkService = registry.get_instance("pork_service")
self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
self.public_channel_service: PublicChannelService = registry.get_instance("public_channel_service")
self.towercache: TowerService = registry.get_instance("tower_service")
self.command_alias_service: CommandAliasService = registry.get_instance("command_alias_service")
@command(command="penalty", params=[], access_level="member", description="Shows planttimers")
def penalty(self, _):
blob = "<font color=CCInfoText>"
self.towercache.attack_hot.sort(key=lambda k: k['org_name'])
rem = []
for key, value in enumerate(sorted(self.towercache.attack_hot, key=lambda k: k['org_name'])):
if value["hot"] < time.time():
rem.append(key)
continue
lca = self.towercache.get_towers_by_org_name(value["org_name"])
for site in lca:
blob += self.towercache.format_entry(site, 10)
blob += "</font>"
for i in reversed(rem):
self.towercache.attack_hot.pop(i)
if len(self.towercache.attack_hot) == 0:
blob = ""
return ChatBlob(f"Warhot tower sites", blob.strip("\n")) if blob != "" else f"There are no orgs in penalty."
@command(command="plant", params=[], access_level="member", description="Shows planttimers")
def plant(self, _):
blob = "<font color=CCInfoText>"
rem = []
for key, value in enumerate(sorted(self.towercache.plant, key=lambda k: k['pf'])):
print(key, value)
if value["plant"] < time.time():
rem.append(key)
continue
lca = self.playfield_controller.get_playfield_by_id(value['pf'])
blob += f"[<cyan>{lca.short_name}</cyan>] <cyan>x{value['site']}</cyan> in " \
f"<white>{self.util.format_time(value['plant'] - time.time())}</white>\n"
blob += "</font>"
for i in reversed(rem):
self.towercache.plant.pop(i)
if len(self.towercache.plant) == 0:
blob = ""
if blob != "":
return ChatBlob(f"Awaiting plants ({len(self.towercache.plant)})", blob.strip("\n"))
else:
return f"There are no sites awaiting plants."
@@ -1,368 +0,0 @@
import time
from core.chat_blob import ChatBlob
from core.command_param_types import Const, Int, NamedParameters
from core.decorators import instance, command, event, setting
from core.job_scheduler import JobScheduler
from core.logger import Logger
from core.public_channel_service import PublicChannelService
from core.setting_service import SettingService
from core.setting_types import BooleanSettingType
from core.text import Text
from core.igncore import IgnCore
from modules.core.accounting.services.account_service import AccountService
from modules.raidbot.tower.tower_controller import TowerController
from modules.raidbot.tower.tower_service import TowerService
from modules.standard.helpbot.playfield_controller import PlayfieldController
# TODO: This module should get split again in the future, allowing tower-tracking in orgbots, or other types.
#
@instance()
class TowerAttackController:
def __init__(self):
self.logger = Logger(__name__)
def inject(self, registry):
self.bot: IgnCore = registry.get_instance("bot")
self.db = registry.get_instance("db")
self.text: Text = registry.get_instance("text")
self.settings: SettingService = registry.get_instance("setting_service")
self.util = registry.get_instance("util")
self.tower: TowerController = registry.get_instance("tower_controller")
self.towerservice: TowerService = registry.get_instance("tower_service")
self.event_service = registry.get_instance("event_service")
self.command_alias_service = registry.get_instance("command_alias_service")
self.public_channel_service: PublicChannelService = registry.get_instance("public_channel_service")
self.job_scheduler: JobScheduler = registry.get_instance("job_scheduler")
self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
self.account_service: AccountService = registry.get_instance("account_service")
def start(self):
self.db.exec(
"CREATE TABLE IF NOT EXISTS tower_attacker ("
"id INT PRIMARY KEY AUTO_INCREMENT, "
"att_org_name VARCHAR(50) NOT NULL, "
"att_faction VARCHAR(10) NOT NULL, "
"att_char_id INT, att_char_name VARCHAR(20) NOT NULL, "
"att_level INT NOT NULL, "
"att_ai_level INT NOT NULL, "
"att_profession VARCHAR(15) NOT NULL, "
"x_coord INT NOT NULL, "
"y_coord INT NOT NULL, "
"is_victory SMALLINT NOT NULL, "
"tower_battle_id INT NOT NULL, "
"created_at INT NOT NULL)")
self.db.exec(
"CREATE TABLE IF NOT EXISTS tower_battle ("
"id INT PRIMARY KEY AUTO_INCREMENT, "
"playfield_id INT NOT NULL, "
"site_number INT NOT NULL, "
"def_org_name VARCHAR(50) NOT NULL, "
"def_faction VARCHAR(10) NOT NULL, "
"is_finished INT NOT NULL, "
"battle_type VARCHAR(20) NOT NULL, "
"last_updated INT NOT NULL)")
self.command_alias_service.add_alias("victory", "attacks")
@command(command="attacks", params=[NamedParameters(["page"])], access_level="member",
description="Show recent tower attacks and victories")
def attacks_cmd(self, _, named_params):
page = int(named_params.page or "1")
page_size = 30
offset = (page - 1) * page_size
sql = """
SELECT
b.*,
a.*,
COALESCE(a.att_level, 0) AS att_level,
COALESCE(a.att_ai_level, 0) AS att_ai_level,
p.short_name,
b.id AS battle_id
FROM
tower_battle b
LEFT JOIN tower_attacker a ON
a.tower_battle_id = b.id
LEFT JOIN playfields p ON
p.id = b.playfield_id
ORDER BY
b.last_updated DESC,
a.created_at DESC
LIMIT %d, %d
""" % (offset, page_size)
data = self.db.query(sql)
t = int(time.time())
blob = self.check_for_all_towers_channel()
if page > 1:
blob += " " + self.text.make_chatcmd("<< Page %d" % (page - 1), self.get_chat_command(page - 1))
if len(data) > 0:
blob += " Page " + str(page)
blob += " " + self.text.make_chatcmd("Page %d >>" % (page + 1), self.get_chat_command(page + 1))
blob += "\n"
current_battle_id = -1
for row in data:
if current_battle_id != row.battle_id:
blob += "\n<pagebreak>"
current_battle_id = row.battle_id
blob += self.format_battle_info(row, t)
blob += self.text.make_tellcmd("More Info", "attacks battle %d" % row.battle_id) + "\n"
blob += "<header2>Attackers:</header2>\n"
blob += "<tab>" + self.format_attacker(row) + "\n"
return ChatBlob("Tower Attacks", blob)
@command(command="attacks", params=[Const("battle"), Int("battle_id")], access_level="member",
description="Show battle info for a specific battle")
def attacks_battle_cmd(self, _, _1, battle_id):
battle = self.db.query_single(
"SELECT b.*, p.short_name FROM tower_battle b "
"LEFT JOIN playfields p ON p.id = b.playfield_id WHERE b.id = ?",
[battle_id])
if not battle:
return "Could not find battle with ID <highlight>%d</highlight>." % battle_id
t = int(time.time())
attackers = self.db.query("SELECT * FROM tower_attacker WHERE tower_battle_id = ? ORDER BY created_at DESC",
[battle_id])
first_activity = attackers[-1].created_at if len(attackers) > 0 else battle.last_updated
blob = self.check_for_all_towers_channel()
blob += self.format_battle_info(battle, t)
blob += f"Duration: <highlight>" \
f"{self.util.time_to_readable(battle.last_updated - first_activity)}</highlight>\n\n"
blob += "<header2>Attackers:</header2>\n"
for row in attackers:
blob += "<tab>" + self.format_attacker(row)
blob += " " + self.format_timestamp(row.created_at, t)
blob += "\n"
return ChatBlob(f"Battle Info {battle_id}", blob)
@event(event_type=TowerController.TOWER_ATTACK_EVENT, description="Create logentries for tower attacks", is_hidden=True)
def tower_attack_event(self, _, event_data):
t = int(time.time())
site_number = self.find_closest_site_number(event_data.location.playfield.id, event_data.location.x_coord,
event_data.location.y_coord)
attacker = event_data.attacker or {}
defender = event_data.defender
battle = self.find_or_create_battle(event_data.location.playfield.id, site_number, defender.org_name,
defender.faction, "attack", t)
# print(battle)
self.db.exec(
"INSERT INTO tower_attacker (att_org_name, att_faction, att_char_id, att_char_name, "
"att_level, att_ai_level, att_profession, "
"x_coord, y_coord, is_victory, tower_battle_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
[attacker.get("org_name", ""), attacker.get("faction", ""), attacker.get("char_id", 0),
attacker.get("name", ""), attacker.get("level", 0),
attacker.get("ai_level", 0), attacker.get("profession", ""), event_data.location.x_coord,
event_data.location.y_coord, 0, battle.id, t])
@setting(name="tower_notify_type", value=False, description="Only notify when our orgs are involved")
def tower_notify_type(self) -> BooleanSettingType:
return BooleanSettingType()
@event(event_type=TowerController.TOWER_ATTACK_EVENT, description="Notify whenever a tower attack happens")
def tower_def_event(self, _, event_data):
if self.tower_notify_type().get_value():
if not (event_data.attacker.get("org_name", None) in self.account_service.get_org_names() or event_data.defender.org_name in self.account_service.get_org_names()):
return
if event_data.attacker.get("name", None) is not None:
field_id = self.find_closest_site_number(event_data.location.playfield.id, event_data.location.x_coord,
event_data.location.y_coord)
row = self.db.query_single(
"SELECT t.*, p.short_name, p.long_name FROM tower_site t "
"JOIN playfields p ON t.playfield_id = p.id WHERE t.playfield_id = ? AND site_number = ?",
[event_data.location.playfield.id, field_id])
lca = self.text.format_page(f"{event_data.location.playfield.long_name} - {field_id:d}",
self.tower.format_site_info(row))
attacker = self.text.format_char_info(event_data.attacker)
add = ""
# Disable for now...
# if account := self.account_service.get_account(event_data.attacker.char_id) and event_data.defender.org_name in self.account_service.get_org_names():
# if self.account_service.simple_checks(account):
# add = " :: <red>He's a <myname> Raider!</red>"
self.bot.send_private_channel_message(
f"[<cyan>NW</cyan>] "
f"<{event_data.defender.faction.lower()}>"
f"{event_data.defender.org_name}"
f"</{event_data.defender.faction.lower()}> "
f"attacked by {attacker} in {lca}{add}")
@event(event_type=TowerController.TOWER_VICTORY_EVENT, description="Record tower victories", is_hidden=True)
def tower_victory_event(self, _, event_data):
t = int(time.time())
if event_data.type == "attack":
row = self.get_last_attack(event_data.winner.faction, event_data.winner.org_name, event_data.loser.faction,
event_data.loser.org_name, event_data.location.playfield.id, t)
if not row:
site_number = 0
is_finished = 1
self.db.exec(
"INSERT INTO tower_battle (playfield_id, site_number, def_org_name, def_faction, "
"is_finished, battle_type, last_updated) VALUES (?, ?, ?, ?, ?, ?, ?)",
[event_data.location.playfield.id, site_number, event_data.loser.org_name, event_data.loser.faction,
is_finished, event_data.type, t])
battle_id = self.db.last_insert_id()
attacker = event_data.winner or {}
self.db.exec(
"INSERT INTO tower_attacker (att_org_name, att_faction, att_char_id, "
"att_char_name, att_level, att_ai_level, att_profession, "
"x_coord, y_coord, is_victory, tower_battle_id, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
[attacker.get("org_name", ""), attacker.get("faction", ""), attacker.get("char_id", 0),
attacker.get("name", ""), attacker.get("level", 0),
attacker.get("ai_level", 0), attacker.get("profession", ""), 0, 0, 0, battle_id, t])
else:
is_victory = 1
self.db.exec("UPDATE tower_attacker SET is_victory = ? WHERE id = ?", [is_victory, row.attack_id])
is_finished = 1
self.db.exec("UPDATE tower_battle SET is_finished = ?, last_updated = ? WHERE id = ?",
[is_finished, t, row.battle_id])
elif event_data.type == "terminated":
site_number = 0
is_finished = 1
self.db.exec(
"INSERT INTO tower_battle (playfield_id, site_number, def_org_name, def_faction, "
"is_finished, battle_type, last_updated) VALUES (?, ?, ?, ?, ?, ?, ?)",
[event_data.location.playfield.id, site_number, event_data.loser.org_name, event_data.loser.faction,
is_finished, event_data.type, t])
else:
raise Exception("Unknown victory event type: '%s'" % event_data.type)
def format_attacker(self, row):
level = f"{row.att_level}/<green>{row.att_ai_level}</green>" if row.att_ai_level > 0 else f"{row.att_level}"
org = row.att_org_name + " " if row.att_org_name else ""
victor = " - <notice>Winner!</notice>" if row.is_victory else ""
return f"{row.att_char_name or 'Unknown attacker'} ({level} {row.att_profession})" \
f" {org}({row.att_faction}){victor}"
def find_closest_site_number(self, playfield_id, x_coord, y_coord):
# noinspection SqlUnused
sql = """
SELECT
site_number,
((x_distance * x_distance) + (y_distance * y_distance)) radius
FROM
(SELECT
playfield_id,
site_number,
min_ql,
max_ql,
x_coord,
y_coord,
site_name,
(x_coord - ?) as x_distance,
(y_coord - ?) as y_distance
FROM
tower_site
WHERE
playfield_id = ?) t
ORDER BY
radius
LIMIT 1"""
row = self.db.query_single(sql, [x_coord, y_coord, playfield_id])
if row:
return row.site_number
else:
return 0
def find_or_create_battle(self, playfield_id, site_number, org_name, faction, battle_type, t):
last_updated = t - (8 * 3600)
is_finished = 0
sql = """
SELECT
*
FROM
tower_battle
WHERE
playfield_id = ?
AND site_number = ?
AND is_finished = ?
AND def_org_name = ?
AND def_faction = ?
AND last_updated >= ?
"""
battle = self.db.query_single(sql, [playfield_id, site_number, is_finished, org_name, faction, last_updated])
if battle:
self.db.exec("UPDATE tower_battle SET last_updated = ? WHERE id = ?", [t, battle.id])
return battle
else:
self.db.exec(
"INSERT INTO tower_battle (playfield_id, site_number, def_org_name, def_faction, "
"is_finished, battle_type, last_updated) VALUES (?, ?, ?, ?, ?, ?, ?)",
[playfield_id, site_number, org_name, faction, is_finished, battle_type, t])
return self.db.query_single(sql, [playfield_id, site_number, is_finished, org_name, faction, last_updated])
def get_last_attack(self, att_faction, att_org_name, def_faction, def_org_name, playfield_id, t):
last_updated = t - (8 * 3600)
is_finished = 0
sql = """
SELECT
b.id AS battle_id,
a.id AS attack_id,
b.playfield_id as pf_id,
b.site_number as site
FROM
tower_battle b
JOIN tower_attacker a ON
a.tower_battle_id = b.id
WHERE
a.att_faction = ?
AND a.att_org_name = ?
AND b.def_faction = ?
AND b.def_org_name = ?
AND b.playfield_id = ?
AND b.is_finished = ?
AND b.last_updated >= ?
ORDER BY
last_updated DESC
LIMIT 1"""
return self.db.query_single(sql,
[att_faction, att_org_name, def_faction, def_org_name, playfield_id, is_finished,
last_updated])
def format_battle_info(self, row, t):
blob = ""
defeated = " - <notice>Defeated!</notice>" if row.is_finished else ""
blob += f"Site: <highlight>{row.short_name} {row.site_number or '?'}</highlight>\n"
blob += f"Defender: <highlight>{row.def_org_name}</highlight> ({row.def_faction}){defeated}\n"
blob += f"Last Activity: {self.format_timestamp(row.last_updated, t)}\n"
return blob
def format_timestamp(self, t, current_t):
return f"<highlight>{self.util.format_datetime(t)}</highlight> " \
f"({self.util.time_to_readable(current_t - t)} ago)"
def get_chat_command(self, page):
return f"/tell <myname> attacks --page={page}"
def check_for_all_towers_channel(self):
if not self.public_channel_service.get_channel_name(TowerController.ALL_TOWERS_ID):
return "Notice: The bot must belong to an org and be promoted to a rank that is high enough " \
"to have the All Towers channel (e.g., Squad Commander) in order for the " \
"<symbol>attacks command to work correctly.\n\n"
else:
return ""
-207
View File
@@ -1,207 +0,0 @@
import re
from core.aochat import server_packets
from core.conn import Conn
from core.db import DB
from core.decorators import instance, event
from core.dict_object import DictObject
from core.event_service import EventService
from core.logger import Logger
from core.lookup.pork_service import PorkService
from core.public_channel_service import PublicChannelService
from core.text import Text
from core.igncore import IgnCore
from modules.standard.helpbot.playfield_controller import PlayfieldController
@instance()
class TowerController:
TOWER_ATTACK_EVENT = "tower_attack"
TOWER_VICTORY_EVENT = "tower_victory"
TOWER_BATTLE_OUTCOME_ID = 42949672962
ALL_TOWERS_ID = 42949672960
# The %s organization %s just entered a state of war!
# %s attacked the %s organization %s's tower in %s at location (%d,%d).
ATTACK_1 = [506, 12753364]
ATTACK_2 = re.compile(r"^(.+) just attacked the (clan|neutral|omni) organization (.+)'s tower in (.+) "
r"at location \((\d+), (\d+)\).\n$")
VICTORY_1 = re.compile(r"^Notum Wars Update: Victory to the (Clan|Neutral|Omni)s!!!$")
VICTORY_2 = re.compile(r"^The (Clan|Neutral|Omni) organization (.+) attacked the (Clan|Neutral|Omni) (.+) "
r"at their base in (.+). The attackers won!!$")
VICTORY_3 = [506, 147506468] # 'Notum Wars Update: The %s organization %s lost their base in %s.'
def __init__(self):
self.logger = Logger(__name__)
def inject(self, registry):
self.bot: IgnCore = registry.get_instance("bot")
self.db: DB = registry.get_instance("db")
self.text: Text = registry.get_instance("text")
self.event_service: EventService = registry.get_instance("event_service")
self.pork_service: PorkService = registry.get_instance("pork_service")
self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
self.public_channel_service: PublicChannelService = registry.get_instance("public_channel_service")
def pre_start(self):
self.event_service.register_event_type(self.TOWER_ATTACK_EVENT)
self.event_service.register_event_type(self.TOWER_VICTORY_EVENT)
self.bot.register_packet_handler(server_packets.PublicChannelMessage.id, self.handle_public_channel_message)
self.db.load_sql_file(self.module_dir + "/" + "tower_site.sql", pre_optimized=True)
self.db.create_view("tower_site")
@event(event_type="connect", description="Check if All Towers channel is available", is_hidden=True)
def handle_connect_event(self, _, _1):
if self.public_channel_service.org_id and not self.public_channel_service.get_channel_id("All Towers"):
self.logger.warning("This bot is a member of an org but does not have access to 'All Towers' channel and "
"therefore will not receive tower attack messages")
def format_site_info(self, row):
blob = f"Short name: <highlight>{row.short_name} {row.site_number:d}</highlight>\n"
blob += f"Long name: <highlight>{row.site_name}, {row.long_name}</highlight>\n"
blob += f"Level range: <highlight>{row.min_ql:d}-{row.max_ql:d}</highlight>\n"
blob += "Coordinates: %s\n" % self.text.make_chatcmd(f"{row.x_coord:d}x{row.y_coord:d}",
f"/waypoint {row.x_coord:d} "
f"{row.y_coord:d} "
f"{row.playfield_id:d}")
return blob
def handle_public_channel_message(self, conn: Conn, packet: server_packets.PublicChannelMessage):
if conn.id != "main":
return
if packet.channel_id == self.TOWER_BATTLE_OUTCOME_ID:
victory = self.get_victory_event(packet)
if victory:
# self.logger.debug("tower victory packet: %s" % str(packet))
# lookup playfield
playfield_name = victory.location.playfield.long_name
victory.location.playfield = self.playfield_controller.get_playfield_by_name(playfield_name) or \
DictObject()
victory.location.playfield.long_name = playfield_name
# print(victory)
self.event_service.fire_event(self.TOWER_VICTORY_EVENT, victory)
elif packet.channel_id == self.ALL_TOWERS_ID:
attack = self.get_attack_event(packet)
if attack:
# self.logger.debug("tower attack packet: %s" % str(packet))
# lookup playfield
playfield_name = attack.location.playfield.long_name
attack.location.playfield = self.playfield_controller.get_playfield_by_name(playfield_name) or \
DictObject()
attack.location.playfield.long_name = playfield_name
# lookup attacker
name = attack.attacker.name
faction = attack.attacker.faction
org_name = attack.attacker.org_name
char_info = self.pork_service.get_character_info(name)
attack.attacker = char_info or DictObject()
attack.attacker.name = name
attack.attacker.faction = faction or attack.attacker.get("faction", "Unknown")
attack.attacker.org_name = org_name
self.event_service.fire_event(self.TOWER_ATTACK_EVENT, attack)
def get_attack_event(self, packet: server_packets.PublicChannelMessage):
if packet.extended_message and \
[packet.extended_message.category_id, packet.extended_message.instance_id] == self.ATTACK_1:
params = packet.extended_message.params
return DictObject({
"attacker": {
"name": params[2],
"faction": params[0].capitalize(),
"org_name": params[1]
},
"defender": {
"faction": params[3].capitalize(),
"org_name": params[4]
},
"location": {
"playfield": {
"long_name": params[5]
},
"x_coord": params[6],
"y_coord": params[7]
}
})
else:
match = self.ATTACK_2.match(packet.message)
if match:
return DictObject({
"attacker": {
"name": match.group(1),
"faction": "",
"org_name": ""
},
"defender": {
"faction": match.group(2).capitalize(),
"org_name": match.group(3)
},
"location": {
"playfield": {
"long_name": match.group(4)
},
"x_coord": match.group(5),
"y_coord": match.group(6)
}
})
# Unknown attack
self.logger.warning("Unknown tower attack: " + str(packet))
return None
def get_victory_event(self, packet: server_packets.PublicChannelMessage):
match = self.VICTORY_1.match(packet.message)
if match:
return None
match = self.VICTORY_2.match(packet.message)
if match:
return DictObject({
"type": "attack",
"winner": {
"faction": match.group(1).capitalize(),
"org_name": match.group(2)
},
"loser": {
"faction": match.group(3).capitalize(),
"org_name": match.group(4)
},
"location": {
"playfield": {
"long_name": match.group(5)
}
}
})
if packet.extended_message and \
[packet.extended_message.category_id, packet.extended_message.instance_id] == self.VICTORY_3:
params = packet.extended_message.params
return DictObject({
"type": "terminated",
"winner": {
"faction": params[0].capitalize(),
"org_name": params[1]
},
"loser": {
"faction": params[0].capitalize(),
"org_name": params[1]
},
"location": {
"playfield": {
"long_name": params[2]
}
}
})
# Unknown victory
self.logger.warning("Unknown tower victory: " + str(packet))
return None
@@ -1,204 +0,0 @@
from core.aochat.BaseModule import BaseModule
from core.chat_blob import ChatBlob
from core.command_alias_service import CommandAliasService
from core.command_param_types import Options, Int, Any, Const, NamedParameters
from core.db import DB
from core.decorators import instance, command
from core.dict_object import DictObject
from core.event_service import EventService
from core.lookup.pork_service import PorkService
from core.public_channel_service import PublicChannelService
from core.text import Text
from core.igncore import IgnCore
from core.util import Util
from modules.raidbot.tower.tower_service import TowerService
from modules.standard.helpbot.playfield_controller import PlayfieldController
@instance()
class TowerHotController(BaseModule):
PAGE_SIZE = 9
# noinspection DuplicatedCode
def inject(self, registry):
self.bot: IgnCore = registry.get_instance("bot")
self.db: DB = registry.get_instance("db")
self.util: Util = registry.get_instance("util")
self.text: Text = registry.get_instance("text")
self.event_service: EventService = registry.get_instance("event_service")
self.pork_service: PorkService = registry.get_instance("pork_service")
self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
self.public_channel_service: PublicChannelService = registry.get_instance("public_channel_service")
self.towercache: TowerService = registry.get_instance("tower_service")
self.command_alias_service: CommandAliasService = registry.get_instance("command_alias_service")
def pre_start(self):
self.command_alias_service.add_alias('towers', "lc")
self.command_alias_service.add_alias('tower', "lc")
@command(command="hot",
params=[Options(['tl1', 'tl2', 'tl3', 'tl4', 'tl5', 'tl6', 'tl7']), Any('faction', is_optional=True),
NamedParameters(["page"])],
access_level="member",
description="Shows hot playfields")
def hot_tl(self, _, tl, faction: str, named_params):
if faction:
if faction.startswith("--page="):
named_params = DictObject({'page': faction[7:]})
faction = None
if faction is not None and faction.lower() not in ['omni', 'clan', 'neut', 'neutral']:
return f"Unknown faction: {faction}"
tl = tl[2:]
page = int(named_params.page or "1")
offset = (page - 1) * self.PAGE_SIZE
towers = self.towercache.get_towers_hot_tl(int(tl), faction)
return self.text.format_pagination(towers, offset, page, self.formatter, f"Hot Sites TL{tl} ({len(towers)})",
f"There are no hot sites for TL <highlight>{tl}</highlight>.",
f'hot tl{tl} {faction or ""}', 9)
def formatter(self, row, _, data):
return self.towercache.format_entry(row, len(data))
@command(command="hot",
params=[Int('level', is_optional=True), Any('faction', is_optional=True), NamedParameters(["page"])],
access_level="member",
description="Shows hot playfields by level")
def hot_level(self, _, level, faction, named_params):
if faction:
if faction.startswith("--page="):
named_params = DictObject({'page': faction[7:]})
faction = None
if faction is not None and faction.lower() not in ['omni', 'clan', 'neut', 'neutral']:
return f"Unknown faction: {faction}"
if level:
if level < 0 | level > 220:
return f"Level out of range: {level}"
page = int(named_params.page or "1")
offset = (page - 1) * self.PAGE_SIZE
towers = self.towercache.get_towers_hot_level(level, faction)
level = f"{level}" if level else ""
faction = f"{faction} " if faction else ""
return self.text.format_pagination(towers, offset, page, self.formatter, f"Hot Towersites ({len(towers)})",
f"There are no hot sites.", f'hot {level}{faction}', 9)
@command(command="free", params=[],
access_level="member",
description="Shows hot playfields by level")
def free(self, _, ):
blob = ""
towers = self.towercache.get_free()
for row in towers:
blob += self.towercache.format_entry(row, len(towers))
return ChatBlob(f"FREE Towersites ({len(towers)})", blob) if blob else f"No free towersites found."
@command(command="lc", params=[], access_level="member",
description="See a list of land control tower sites in a particular playfield")
def lc(self, _):
hot, cold, unplanted = 0, 0, 0
clan, omni, neut = 0, 0, 0
last_pf = 0
previous = {}
blob = ""
def number(numb):
if numb < 10:
return f"<black>0</black>{numb}"
return numb
for tower in self.towercache.get_towers_all():
if tower.id != last_pf:
if last_pf == 0:
previous = tower
last_pf = tower.id
continue
blob += f"<red>{number(hot)}</red> <cyan>{number(cold)}</cyan> <grey>{number(unplanted)}</grey> :: " \
f"<clan>{number(clan)}</clan> <neutral>{number(neut)}</neutral> <omni>{number(omni)}</omni> " \
f"[{self.text.make_tellcmd(previous.short_name, f'lc {previous.long_name}')}] " \
f"{previous.long_name}\n"
hot, cold, unplanted = 0, 0, 0
clan, omni, neut = 0, 0, 0
previous = tower
last_pf = tower.id
site = self.towercache.is_hot(tower)
if site == 0:
cold += 1
elif site == -1:
unplanted += 1
elif site == 1:
hot += 1
faction = tower.get('faction', None)
if faction:
faction = faction.lower
if faction == "omni":
omni += 1
elif faction == "clan":
clan += 1
else:
neut += 1
return ChatBlob('All Tower Sites', blob)
@command(command="lc", params=[Const("org"), Any("org_name", is_optional=True)], access_level="member",
description="See a list of land control tower sites in a particular playfield")
def lc_org(self, _, _1, org):
towers = []
try:
org = int(org)
except ValueError:
pass
if type(org) == str:
orgs = self.db.query(
"SELECT * from all_orgs where org_name LIKE ? and org_id in "
"(SELECT org_id from towers group by org_id)",
[f"%{org.replace(' ', '%')}%"])
if len(orgs) == 1:
towers = self.towercache.get_towers_by_org(orgs[0].org_id)
elif len(orgs) == 0:
return "Your search returned no orgs."
else:
blob = "Your search had multiple results; please pick an org:<br>"
for org in orgs:
blob += "[%s] <highlight>%s<end> (<highlight>%s<end>) <%s>%s<end> [<highlight>%s<end> " \
"members]<br><pagebreak>" \
% (self.text.make_chatcmd("Towers", "/tell <myname> lc org %s" % org.org_id),
org.org_name, org.org_id, org.faction.lower(), org.faction, org.member_count)
return ChatBlob("Pick an Org", blob)
elif type(org) == int:
if len(self.db.query("SELECT org_id from all_orgs where org_id=?", [int(org)])) == 0:
return "Your search returned no orgs."
else:
towers = self.towercache.get_towers_by_org(org)
title = f"Towersites of the org {org}"
blob = ""
for tower in towers:
blob += self.towercache.format_entry(tower, len(towers))
return ChatBlob(title, blob)
@command(command="lc", params=[Any("playfield"), Int("site_number", is_optional=True)], access_level="member",
description="See a list of land control tower sites in a particular playfield")
def lc_playfield(self, _, playfield_name, site_number):
playfield = self.playfield_controller.get_playfield_by_name(playfield_name)
if not playfield:
return "Could not find playfield <highlight>%s</highlight>." % playfield_name
if site_number:
title = f"Tower site x{site_number} in {playfield.long_name}"
towers = self.towercache.get_towers_by_pf_site(playfield.id, site_number)
else:
title = f"Tower sites in {playfield.long_name}"
towers = self.towercache.get_towers_by_pf(playfield.id)
blob = ""
for tower in towers:
blob += self.towercache.format_entry(tower, len(towers))
if site_number:
blob += "More to come... stay tuned."
return ChatBlob(title, blob)
-298
View File
@@ -1,298 +0,0 @@
import time
from core.aochat.BaseModule import BaseModule
from core.db import DB
from core.decorators import instance, event, setting
from core.igncore import IgnCore
from core.job_scheduler import JobScheduler
from core.setting_types import BooleanSettingType
from core.text import Text
from core.util import Util
from modules.core.accounting.services.account_service import AccountService
from modules.raidbot.tower.tower_controller import TowerController
from modules.standard.helpbot.playfield_controller import PlayfieldController
@instance()
class TowerService(BaseModule):
# For this Module to work properly you might need to
# contact the API host of your choice to whitelist your IP addresses.
attack_hot = []
plant = []
def inject(self, registry):
self.bot: IgnCore = registry.get_instance("bot")
self.db: DB = registry.get_instance("db")
self.util: Util = registry.get_instance("util")
self.text: Text = registry.get_instance("text")
self.playfield_controller: PlayfieldController = registry.get_instance("playfield_controller")
self.job_scheduler: JobScheduler = registry.get_instance("job_scheduler")
self.account_service: AccountService = registry.get_instance("account_service")
def pre_start(self):
self.db.shared.exec("CREATE TABLE IF NOT EXISTS towers("
"pf_id int not null, "
"site_number int not null,"
"ql int,"
"x_coord int not null,"
"y_coord int not null,"
"org_id int,"
"org_name varchar(255), "
"faction varchar(32), "
"close_time int,"
"planted int,"
"enabled tinyint, "
"PRIMARY KEY (pf_id, site_number), "
"INDEX ql(ql), INDEX close(close_time), "
"INDEX planted(planted), INDEX enabled(enabled)) ENGINE MEMORY")
self.db.create_view("towers")
@event(event_type=TowerController.TOWER_ATTACK_EVENT, description="Track planthot", is_hidden=True)
def tower_attack(self, _, event_data):
if event_data.attacker.get("org_id", None):
self.attack_hot.append({'org_name': event_data.attacker.org_name, 'hot': time.time() + 60 * 60})
@setting(name="tower_notify_type", value=False, description="Only notify when our orgs are involved")
def tower_notify_type(self) -> BooleanSettingType:
return BooleanSettingType()
@event(event_type=TowerController.TOWER_VICTORY_EVENT, description="Send NW warnings")
def victory(self, _, event_data):
t = int(time.time())
if event_data.type == "attack":
if self.tower_notify_type().get_value():
if not (event_data.winner.get("org_name", None) in self.account_service.get_org_names()
or event_data.loser.org_name in self.account_service.get_org_names()):
return
row = self.get_last_attack(event_data.winner.faction, event_data.winner.org_name, event_data.loser.faction,
event_data.loser.org_name, event_data.location.playfield.id, t)
self.send_nw_warn(0,
f'<{event_data.loser.faction.lower()}>'
f'{event_data.loser.org_name}'
f'</{event_data.loser.faction.lower()}>'
f' Lost their Site at {event_data.location.playfield.short_name} x{row.site} - '
f'<{event_data.winner.faction.lower()}>'
f'{event_data.winner.org_name}'
f'</{event_data.winner.faction.lower()}> <highlight>won!!</highlight>')
self.plant.append({'pf': row.pf_id, 'site': row.site, 'plant': time.time() + 20 * 60 - 1})
self.prepare_nw_warn(row.pf_id, row.site)
elif event_data.type == "terminated":
# DEBUG: terminated sites.. behave strange.
# for that reason, we'll just output these events to the console, but not the log.
print(event_data)
field = self.db.query("SELECT * FROM towers t where t.org_name=? and t.pf_id=?",
[event_data.loser.org_name, event_data.playfield.id])
if len(field) == 1:
field = field[0]
self.plant.append({'pf': event_data.location.playfield.id,
'site': field.site_number,
'plant': time.time() + 20 * 60 - 1})
self.send_nw_warn(0, f'<{event_data.loser.faction.lower()}>'
f'{event_data.loser.org_name}'
f'</{event_data.loser.faction.lower()}> Lost their Site at '
f'{event_data.location.playfield.short_name} x{field.site_number}')
self.prepare_nw_warn(event_data.location.playfield.id, field.site_number)
return
self.plant.append({'pf': event_data.location.playfield.id,
'site': f"(<red>UKN</red>) PO: {event_data.loser.org_name}|{event_data.loser.faction}",
'plant': time.time() + 20 * 60 - 1})
self.send_nw_warn(0,
f'<{event_data.loser.faction.lower()}>'
f'{event_data.loser.org_name}'
f'</{event_data.loser.faction.lower()}> '
f'Lost their Site in {event_data.location.playfield.long_name}')
self.prepare_nw_warn(event_data.playfield.id, "(<red>UKN</red>)",
f"(<red>UKN</red>) PO: {event_data.loser.org_name}|{event_data.loser.faction}")
def day_time(self, day_t):
if day_t > 86400:
day_t -= 86400
elif day_t < 0:
day_t += 86400
return day_t
def get(self, where_order="", param=None):
if param is None:
param = []
return self.db.query(f"SELECT a.org_id, a.planted, a.close_time, a.ql, a.org_name, a.faction, "
f"b.min_ql, b.max_ql, b.site_name, b.site_number, b.x_coord, b.y_coord, "
f"c.id, c.long_name, c.short_name, "
f"e.pvp_min, e.pvp_max "
f"FROM towers a "
f"INNER JOIN tower_site b ON a.site_number=b.site_number "
f"LEFT JOIN playfields c ON a.pf_id = c.id "
f"LEFT JOIN level e ON e.level = a.ql "
f"WHERE a.pf_id = b.playfield_id and enabled = 1 "
f"{where_order}", param)
def get_towers_by_tl(self, tl, faction=None):
min_ql, max_ql = self.util.get_level_range_tl(tl)
if faction:
return self.get("and ql between ? and ? and faction LIKE ? order by c.short_name",
[min_ql, max_ql, faction])
return self.get("and ql between ? and ? order by c.short_name", [min_ql, max_ql])
def get_towers_all(self):
return self.get("order by c.long_name", [])
def get_towers_by_pf(self, pf):
return self.get("and a.pf_id=? order by a.site_number", [pf])
def get_towers_by_pf_site(self, pf, site):
return self.get("and a.pf_id=? and a.site_number=?", [pf, site])
def get_towers_by_org(self, org):
return self.get("and a.org_id"
"=? order by c.long_name", [org])
def get_towers_by_org_name(self, org):
return self.get("and a.org_name=? order by c.long_name", [org])
def get_free(self):
return self.get("and a.org_id IS NULL order by a.site_number", [])
def get_towers_hot_tl(self, tl, faction=None):
towers = self.get_towers_by_tl(tl, faction)
out = []
for tower in towers:
if self.is_hot(tower) in [1, 2]:
out.append(tower)
return out
def get_towers_hot_level(self, level, faction=None):
out = []
if level:
if faction:
towers = self.get("and pvp_min <= ? and pvp_max >= ? and faction LIKE ?", [level, level, faction])
else:
towers = self.get("and pvp_min <= ? and pvp_max >= ?", [level, level])
else:
if faction:
towers = self.get("and faction LIKE ?", [faction])
else:
towers = self.get()
for tower in towers:
if self.is_hot(tower) in [1, 2]:
out.append(tower)
return out
def format_entry(self, entry, _):
h3 = ""
now = self.day_time(int(time.time()) % 86400)
row0 = f"<font color=CCInfoText>Site: <blue>{entry.short_name}</blue> <red>x{entry.site_number}</red> " \
f"[R:<red>{entry.min_ql} - {entry.max_ql}</red>] " \
f"[{self.text.make_tellcmd('More', f'lc {entry.short_name} {entry.site_number}')}]\n"
row1 = f'<grey>UKN</grey> :: <red>No Owner -> Unplanted</red>\n'
row2 = ""
row3 = "</font>\n"
hot = self.is_hot(entry)
if hot != -1:
h1 = "<cyan>COLD</cyan>"
if hot == 2:
h1 = "<red>WARHOT</red>"
for org in self.attack_hot:
if org['org_name'] == entry.org_name:
hot_normal = self.is_hot(entry, False)
# print(hot, hot_normal, org['hot'] - now)
if hot_normal == 1:
h3 = f"<cyan>COLD</cyan> in " \
f"{self.util.format_time(self.day_time(int(entry.close_time - now)))}"
if hot_normal == 0:
h3 = f"<cyan>COLD</cyan> in {self.util.format_time(org['hot'] - now)}"
elif hot == 1:
h1 = '<red>HOT</red>'
h3 = f"<cyan>COLD</cyan> in {self.util.format_time(self.day_time(int(entry.close_time - now)))}"
else:
hot_time = self.day_time(entry.close_time - 6 * 60 * 60)
h3 = f"<red>HOT</red> in " \
f"{self.util.format_time((18 * 80 * 60 - hot_time) if hot_time > 18 * 60 * 60 else hot_time)}"
org = f"<{entry.faction.lower()}>{entry.org_name}</{entry.faction.lower()}> " \
f"[{self.text.make_tellcmd('View org', f'lc org {entry.org_name}')}]"
row1 = f"{h1} :: {h3} :: {org} :: \n"
row3 = f" » Planted: <grey>{self.util.format_datetime(entry.planted)}</grey></font>\n\n"
if entry.pvp_min:
pvp = f"[<red>{entry.pvp_min} - {entry.pvp_max}</red>]"
else:
pvp = f"[<red>175 - 220</red>]"
# noinspection LongLine
row2 = f" » QL: <red>{entry.ql}</red> PvP: {pvp} " \
f"[{self.text.make_chatcmd(f'{entry.x_coord} x {entry.y_coord}', f'/waypoint {entry.x_coord} {entry.y_coord} {entry.id}')}]\n"
return row0 + row1 + row2 + row3 + "<pagebreak>"
def is_hot(self, entry, with_war=True) -> int:
if entry.get("close_time", None):
now = self.day_time((time.time()) % 86400)
self.attack_hot.sort(key=lambda k: k['org_name'])
rem = []
inside = False
for index, i in enumerate(self.attack_hot):
if i['hot'] < time.time():
rem.append(index)
continue
if i['org_name'] == entry.org_name:
inside = True
for index in reversed(rem):
self.attack_hot.pop(index)
if inside and with_war:
return 2
if self.day_time(entry.close_time - int(now)) > 6 * 60 * 60:
return 0
return 1
return -1
def get_last_attack(self, att_faction, att_org_name, def_faction, def_org_name, playfield_id, t):
last_updated = t - (8 * 3600)
is_finished = 1
sql = """
SELECT
b.id AS battle_id,
a.id AS attack_id,
b.playfield_id as pf_id,
b.site_number as site
FROM
tower_battle b
JOIN tower_attacker a ON
a.tower_battle_id = b.id
WHERE
a.att_faction = ?
AND a.att_org_name = ?
AND b.def_faction = ?
AND b.def_org_name = ?
AND b.playfield_id = ?
AND b.is_finished = ?
AND b.last_updated >= ?
ORDER BY
last_updated DESC
LIMIT 1"""
return self.db.query_single(sql,
[att_faction, att_org_name, def_faction, def_org_name, playfield_id, is_finished,
last_updated])
def prepare_nw_warn(self, pf_id, site, bonus=""):
pf = self.playfield_controller.get_playfield_by_id(pf_id)
site = f"{pf.short_name} <cyan>»</cyan> x{site}" + bonus
self.job_scheduler.delayed_job(self.send_nw_warn, 0, f"{site} plantable in 20 minutes!")
self.job_scheduler.delayed_job(self.send_nw_warn, 10 * 60 - 1, f"{site} plantable in 10 minutes!")
self.job_scheduler.delayed_job(self.send_nw_warn, 15 * 60 - 1, f"{site} plantable in 5 minutes!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 - 1, f"{site} plantable in 1 minute!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 30 - 1, f"{site} plantable in 30 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 45 - 1, f"{site} plantable in 15 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 50 - 1, f"{site} plantable in 10 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 55 - 1, f"{site} plantable in 5 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 56 - 1, f"{site} plantable in 4 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 57 - 1, f"{site} plantable in 3 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 58 - 1, f"{site} plantable in 2 seconds!")
self.job_scheduler.delayed_job(self.send_nw_warn, 19 * 60 + 59 - 1, f"{site} plantable in 1 second!")
self.job_scheduler.delayed_job(self.send_nw_warn, 20 * 60 - 1, f"{site} plantable <green>NOW</green>!")
def send_nw_warn(self, _, msg):
self.bot.send_private_channel_message(f"[<cyan>NW</cyan>] {msg}")
File diff suppressed because one or more lines are too long