c04f76c0db
Fixed command & event threading Events are now threaded by event_type (i.e. all buddy_logon events get ran in the same one) Added default preferences Fixed recipe loading for multiple installs (i.e. on different machines)
360 lines
14 KiB
Python
360 lines
14 KiB
Python
import math
|
|
import re
|
|
from html.parser import HTMLParser
|
|
|
|
from core.chat_blob import ChatBlob
|
|
from core.decorators import instance
|
|
from core.logger import Logger
|
|
from core.setting_service import SettingService
|
|
|
|
|
|
class MLStripper(HTMLParser):
|
|
def error(self, message):
|
|
pass
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.reset()
|
|
self.strict = False
|
|
self.convert_charrefs = True
|
|
self.fed = []
|
|
self.chat_commands = []
|
|
|
|
def handle_data(self, d):
|
|
self.fed.append(d)
|
|
|
|
def get_data(self):
|
|
return "".join(self.fed)
|
|
|
|
|
|
@instance()
|
|
class Text:
|
|
separators = [{"symbol": "<pagebreak>", "include": False}, {"symbol": "\n", "include": True},
|
|
{"symbol": "<br>", "include": True}, {"symbol": " ", "include": True}]
|
|
|
|
# taken from IGN bot
|
|
pixel_mapping = {'i': 3, 'l': 3, 'K': 10, 'R': 10, "'": 3, 'e': 8, 'U': 10, 'j': 5, 'I': 5, '|': 6, 'N': 10, 'f': 5,
|
|
'.': 5, ' ': 5,
|
|
',': 5, 'J': 6, 'r': 6, 't': 6, '!': 6, '(': 6, ')': 6, '[': 6, ']': 6, '/': 6, ':': 6, ';': 6,
|
|
'"': 6, 'c': 7,
|
|
'-': 7, 's': 8, 'v': 8, 'k': 8, 'a': 8, 'y': 8, 'z': 8, 'F': 8, 'L': 8, 'P': 8, 'n': 9, '3': 9,
|
|
'b': 9, 'd': 9,
|
|
'g': 9, 'h': 9, 'Y': 9, 'S': 10, 'Q': 11, 'w': 11, '<': 11, '>': 11, '=': 11, 'q': 9, 'u': 9,
|
|
'x': 9, '0': 9,
|
|
'1': 9, '2': 9, '4': 9, '5': 9, '6': 9, '7': 9, '8': 9, '9': 9, 'E': 9, 'T': 9, '$': 9, '*': 9,
|
|
'{': 9, '}': 9,
|
|
'_': 9, '`': 9, 'A': 10, 'B': 10, 'C': 10, 'H': 10, 'V': 10, 'X': 10, 'Z': 10, '&': 10, 'D': 11,
|
|
'G': 11, 'M': 11,
|
|
'O': 11, '+': 11, '~': 11, '%': 15, 'p': 9, 'm': 13, 'o': 9, '@': 14, 'W': 15}
|
|
|
|
def __init__(self):
|
|
self.logger = Logger(__name__)
|
|
self.items_regex = re.compile(r"<a href=\"itemref://(\d+)/(\d+)/(\d+)\">(.+?)</a>")
|
|
|
|
def inject(self, registry):
|
|
self.setting_service: SettingService = registry.get_instance("setting_service")
|
|
self.ban = registry.get_instance("ban_service")
|
|
self.bot = registry.get_instance("bot")
|
|
self.public_channel_service = registry.get_instance("public_channel_service")
|
|
|
|
def make_chatcmd(self, name, msg, style=""):
|
|
msg = msg.strip()
|
|
msg = msg.replace("'", "'")
|
|
return f"<a {style} href='chatcmd://{msg}'>{name}</a>"
|
|
|
|
def make_tellcmd(self, name, msg, style="", char="<myname>"):
|
|
return self.make_chatcmd(name, f"/tell {char} {msg}", style)
|
|
|
|
def make_charlink(self, char, style=""):
|
|
return f"<a {style} href='user://{char}'>{char}</a>"
|
|
|
|
def make_item(self, low_id, high_id, ql, name):
|
|
return f"<a href='itemref://{low_id:d}/{high_id:d}/{ql:d}'>{name}</a>"
|
|
|
|
def make_image(self, image_id, image_db="rdb"):
|
|
return f"<img src='{image_db}://{image_id}'>"
|
|
|
|
def format_item(self, item, ql=None, with_icon=True):
|
|
if not item:
|
|
return None
|
|
|
|
ql = ql or item["highql"]
|
|
|
|
result = self.make_item(item["lowid"], item["highid"], ql, item["name"])
|
|
|
|
if with_icon:
|
|
result = self.make_image(item["icon"]) + "\n" + result
|
|
|
|
return result
|
|
|
|
def generate_item(self, item, ql, synonym=None):
|
|
if synonym:
|
|
return {f"icon_{synonym}": self.make_item(item.lowid, item.highid, ql, self.make_image(item.icon)),
|
|
f"text_{synonym}": self.make_item(item.lowid, item.highid, ql, item.name)}
|
|
else:
|
|
return {"icon": self.make_item(item.lowid, item.highid, ql, self.make_image(item.icon)),
|
|
"text": self.make_item(item.lowid, item.highid, ql, item.name)}
|
|
|
|
def get_count_digits(self, number: int):
|
|
"""Return number of digits in a number."""
|
|
|
|
if number == 0:
|
|
return 1
|
|
|
|
number = abs(number)
|
|
|
|
if number <= 999999999999997:
|
|
return math.floor(math.log10(number)) + 1
|
|
|
|
count = 0
|
|
while number:
|
|
count += 1
|
|
number //= 10
|
|
return count
|
|
|
|
def zfill(self, numb, highest_number):
|
|
return f"<black>{(self.get_count_digits(highest_number) - self.get_count_digits(numb)) * '0'}</black>{numb}"
|
|
|
|
def format_pagination(self, data, offset, page, formatter, title, no_data_msg, cmd, page_size=10, headline=""):
|
|
selected = data[offset:offset + page_size]
|
|
count = len(selected)
|
|
pages = ""
|
|
if page > 1:
|
|
pages += "Pages: " + self.make_tellcmd(f"«« Page {page - 1:d}", f'{cmd} --page={page - 1}')
|
|
if offset + page_size < len(data):
|
|
pages += f" Page {page}/{math.ceil(len(data) / page_size)}"
|
|
pages += " " + self.make_tellcmd(f"Page {page + 1:d} »»", f'{cmd} --page={page + 1}')
|
|
pages += "\n"
|
|
if count == 0:
|
|
return no_data_msg
|
|
else:
|
|
blob = "<font color=CCInfoText>"
|
|
blob += "" + pages + "\n"
|
|
blob += headline
|
|
index = offset
|
|
for entry in selected:
|
|
index += 1
|
|
blob += formatter(entry, index, data)
|
|
blob += pages
|
|
blob += "</font>"
|
|
return ChatBlob(title, blob)
|
|
|
|
def format_char_info(self, char_info, online_status=None, check_ban=False):
|
|
banned = ""
|
|
|
|
if char_info.org_name and char_info.org_rank_name:
|
|
msg = f"<{char_info.faction.lower()}>{char_info.name}</{char_info.faction.lower()}> :: " \
|
|
f"{char_info.level}/<green>{char_info.ai_level}</green> {char_info.profession} :: " \
|
|
f"<{char_info.faction.lower()}>{char_info.org_name}</{char_info.faction.lower()}>"
|
|
elif char_info.get("level", None):
|
|
msg = f"<{char_info.faction.lower()}>{char_info.name}</{char_info.faction.lower()}> :: " \
|
|
f"{char_info.level}/<green>{char_info.ai_level}</green> {char_info.profession}"
|
|
elif char_info.name:
|
|
msg = f"<highlight>{char_info.name}</highlight>"
|
|
else:
|
|
msg = f"<highlight>CharId({char_info.char_id:d})</highlight>"
|
|
if check_ban:
|
|
banned = f" :: <red>Banned!</red>" if self.ban.get_ban(char_info.char_id) else ""
|
|
msg += banned
|
|
if online_status is not None:
|
|
msg += " :: " + ("<green>Online</green>" if online_status else "<red>Offline</red>")
|
|
|
|
return msg
|
|
|
|
def get_formatted_faction(self, faction, contents=None):
|
|
if not contents:
|
|
contents = faction.capitalize()
|
|
faction = faction.lower()
|
|
if faction in ["omni", "clan", "neutral"]:
|
|
return f"<{faction}>{contents}</{faction}>"
|
|
return f"<unknown>{contents}</unknown>"
|
|
|
|
def paginate_single(self, chatblob):
|
|
return self.paginate(chatblob, 8000)[0]
|
|
|
|
def paginate(self, chatblob, max_page_length=None, max_num_pages=None, footer=None):
|
|
label = chatblob.title
|
|
msg = chatblob.msg
|
|
|
|
msg = msg.strip()
|
|
|
|
# chat blobs with empty messages are rendered as simple strings instead of links
|
|
if not msg:
|
|
return [label]
|
|
|
|
msg = self.items_regex.sub(r"<a href='itemref://\1/\2/\3'>\4</a>", msg)
|
|
|
|
color = self.setting_service.get("blob_color").get_font_color()
|
|
msg = ("<header>" + label + "</header>\n\n" + color + msg).replace("\"", """)
|
|
msg = self.format_message(msg)
|
|
|
|
if footer:
|
|
footer = "\n\n" + self.format_message(footer.replace("\"", """).strip())
|
|
else:
|
|
footer = ""
|
|
|
|
adjusted_max_page_length = None
|
|
if max_page_length:
|
|
adjusted_max_page_length = max_page_length - len(footer)
|
|
pages = self.split_by_separators(msg, adjusted_max_page_length, max_num_pages)
|
|
pages = list(map(lambda p: p + footer, pages))
|
|
|
|
num_pages = len(pages)
|
|
|
|
def mapper(tup):
|
|
page, index = tup
|
|
if num_pages == 1:
|
|
label2 = self.format_message(label)
|
|
else:
|
|
label2 = self.format_message(label) + " (Page " + str(index) + " / " + str(num_pages) + ")"
|
|
return self.format_message(chatblob.page_prefix) + self.format_page(label2, page) + self.format_message(chatblob.page_postfix)
|
|
|
|
return list(map(mapper, zip(pages, range(1, num_pages + 1))))
|
|
|
|
def split_by_separators(self, content, max_page_length=None, max_num_pages=None):
|
|
separators = iter(self.separators)
|
|
|
|
separator = next(separators)
|
|
rest = content
|
|
current_page = ""
|
|
pages = []
|
|
|
|
while len(rest) > 0:
|
|
line, rest = self.get_next_line(rest, separator)
|
|
line_length = len(line)
|
|
|
|
# if separator is not sufficient, try the next one
|
|
if max_page_length and line_length > max_page_length:
|
|
try:
|
|
separator = next(separators)
|
|
rest = line + rest
|
|
continue
|
|
except StopIteration:
|
|
# this is thrown when there are no more separators in the iterator
|
|
raise Exception("Could not paginate: page is too large")
|
|
|
|
if max_num_pages == len(pages) + 1:
|
|
if max_page_length and (len(current_page) + line_length > max_page_length):
|
|
break
|
|
else:
|
|
if max_page_length and len(current_page) + line_length > max_page_length:
|
|
pages.append(current_page.strip())
|
|
current_page = ""
|
|
|
|
current_page += line
|
|
|
|
current_page = current_page.strip()
|
|
if max_page_length and len(current_page) > max_page_length:
|
|
pages.append(current_page)
|
|
else:
|
|
pages.append(current_page)
|
|
|
|
return pages
|
|
|
|
def format_page(self, label, msg):
|
|
return "<a href=\"text://%s\">%s</a>" % (msg, label)
|
|
|
|
def get_next_line(self, msg, separator):
|
|
result = msg.split(separator["symbol"], 1)
|
|
line = result[0]
|
|
if len(result) == 1:
|
|
rest = ""
|
|
else:
|
|
rest = result[1:][0]
|
|
|
|
if separator["include"]:
|
|
line += separator["symbol"]
|
|
|
|
return line, rest
|
|
|
|
def strip_html_tags(self, s):
|
|
if not s:
|
|
return None
|
|
|
|
stripper = MLStripper()
|
|
stripper.feed(s)
|
|
return stripper.get_data()
|
|
|
|
def pad_table(self, rows, fill=" "):
|
|
max_width = {}
|
|
for columns in rows:
|
|
for i, column in enumerate(columns[:-1]):
|
|
w = self.get_pixel_width(column)
|
|
if i not in max_width or max_width[i] < w:
|
|
max_width[i] = w
|
|
|
|
for columns in rows:
|
|
adjustment = 0
|
|
num_cols = len(columns)
|
|
for i, column in enumerate(columns):
|
|
if i == num_cols - 1:
|
|
continue
|
|
|
|
s, new_adjustment = self.pad_string(column, adjustment + max_width[i], fill)
|
|
columns[i] = s
|
|
adjustment += new_adjustment
|
|
|
|
return rows
|
|
|
|
def pad_string(self, s, length, fill=" "):
|
|
if s is None:
|
|
s = ""
|
|
|
|
s_pixel_width = self.get_pixel_width(s)
|
|
spacer_pixel_width = self.get_pixel_width(fill)
|
|
fill_width = length - s_pixel_width
|
|
if fill_width > 0:
|
|
num_spacers = round(fill_width / spacer_pixel_width)
|
|
else:
|
|
num_spacers = 0
|
|
adjustment = fill_width - (spacer_pixel_width * num_spacers)
|
|
return s + (num_spacers * fill), adjustment
|
|
|
|
def get_pixel_width(self, s):
|
|
if not s:
|
|
return 0
|
|
|
|
s = self.strip_html_tags(s)
|
|
|
|
width = 0
|
|
for c in s:
|
|
pixel_width = self.pixel_mapping.get(c, None)
|
|
if not pixel_width:
|
|
self.logger.warning(f"Unknown pixel width mapping for char '{c}'")
|
|
pixel_width = 8
|
|
width += pixel_width or 8
|
|
return width
|
|
|
|
def format_message(self, msg, replace_br=True):
|
|
for t in ["</header>", "</header2>", "</highlight>", "</notice>", "</black>", "</white>", "</yellow>",
|
|
"</blue>", "</green>", "</red>", "</orange>", "</grey>", "</cyan>",
|
|
"</violet>", "</neutral>", "</omni>", "</clan>", "</unknown>"]:
|
|
msg = msg.replace(t, "</font>")
|
|
if replace_br:
|
|
msg = msg.replace("<br>", "\n")
|
|
return msg \
|
|
.replace("<header>", self.setting_service.get("header_color").get_font_color()) \
|
|
.replace("<header2>", self.setting_service.get("header2_color").get_font_color()) \
|
|
.replace("<highlight>", self.setting_service.get("highlight_color").get_font_color()) \
|
|
.replace("<notice>", self.setting_service.get("notice_color").get_font_color()) \
|
|
.replace("<black>", "<font color=#000000>") \
|
|
.replace("<white>", "<font color=#FFFFFF>") \
|
|
.replace("<yellow>", "<font color=#FFFF00>") \
|
|
.replace("<blue>", "<font color=#8CB5FF>") \
|
|
.replace("<green>", "<font color=#00DE42>") \
|
|
.replace("<red>", "<font color=#FF0000>") \
|
|
.replace("<orange>", "<font color=#FCA712>") \
|
|
.replace("<grey>", "<font color=#C3C3C3>") \
|
|
.replace("<cyan>", "<font color=#00FFFF>") \
|
|
.replace("<violet>", "<font color=#8F00FF>") \
|
|
.replace("<neutral>", self.setting_service.get("neutral_color").get_font_color()) \
|
|
.replace("<omni>", self.setting_service.get("omni_color").get_font_color()) \
|
|
.replace("<clan>", self.setting_service.get("clan_color").get_font_color()) \
|
|
.replace("<unknown>", self.setting_service.get("unknown_color").get_font_color()) \
|
|
.replace("<myname>", self.bot.get_char_name()) \
|
|
.replace("<myorg>", self.public_channel_service.get_org_name() or "Unknown Org") \
|
|
.replace("<tab>", " ") \
|
|
.replace("<end>", "</font>") \
|
|
.replace("<symbol>", self.setting_service.get("symbol").get_value()) \
|
|
.replace("\n", "<br>")
|