import time
from core.chat_blob import ChatBlob
from core.command_param_types import Const, Any, Int
from core.decorators import instance, command, event
from modules.core.accounting.services.account_service import AccountService
@instance()
class PollController:
def inject(self, registry):
self.bot = registry.get_instance("bot")
self.db = registry.get_instance("db")
self.util = registry.get_instance("util")
self.text = registry.get_instance("text")
self.job_scheduler = registry.get_instance("job_scheduler")
self.pork_service = registry.get_instance("pork_service")
self.command_alias_service = registry.get_instance("command_alias_service")
self.account_service: AccountService = registry.get_instance("account_service")
def start(self):
self.db.exec("CREATE TABLE IF NOT EXISTS poll ("
"id INT PRIMARY KEY AUTO_INCREMENT, "
"question VARCHAR(1024) NOT NULL, "
"duration INT NOT NULL, "
"min_access_level VARCHAR(20) NOT NULL, "
"char_id INT NOT NULL, "
"created_at INT NOT NULL, "
"finished_at INT NOT NULL, "
"is_finished SMALLINT NOT NULL)")
self.db.exec("CREATE TABLE IF NOT EXISTS poll_choice ("
"id INT PRIMARY KEY AUTO_INCREMENT, "
"poll_id INT NOT NULL, "
"choice VARCHAR(1024))")
self.db.exec("CREATE TABLE IF NOT EXISTS poll_vote ("
"poll_id INT NOT NULL, "
"choice_id INT NOT NULL, "
"char_id INT NOT NULL)")
self.command_alias_service.add_alias("vote", "poll")
@command(command="poll", params=[], access_level="member",
description="List the polls")
def poll_list_cmd(self, request):
blob = ""
t = int(time.time())
state = ""
polls = self.get_polls()
for poll in polls:
if poll.finished_at > t and state != "running":
state = "running"
blob += "\nRunning\n"
elif poll.finished_at <= t and state != "finished":
state = "finished"
blob += "\nFinished\n"
if state == "running":
time_string = self.util.time_to_readable(poll.finished_at - t) + " left"
else:
time_string = self.util.time_to_readable(t - poll.finished_at) + " ago"
blob += f"{poll.id:d}. {self.text.make_tellcmd(poll.question, f'poll {poll.id:d}')} " \
f"({poll.total_cnt:d}) - {time_string}\n"
return ChatBlob(f"Polls ({len(polls):d})", blob)
@command(command="poll", params=[Int("poll_id")], access_level="member",
description="View a poll")
def poll_view_cmd(self, request, poll_id):
poll = self.get_poll(poll_id)
if not poll:
return f"Could not find poll with ID {poll_id:d}."
return self.show_poll_details_blob(poll)
@command(command="poll",
params=[Const("add"), Any("duration|poll_question|option1|option2|...")],
access_level="moderator",
description="Add a poll", sub_command="add")
def poll_add_cmd(self, request, _, options):
options = options.split("|")
if len(options) < 4:
return "You must enter a duration, a poll question, and at least two choices."
time_str = options.pop(0).strip()
question = options.pop(0).strip()
choices = options
duration = self.util.parse_time(time_str)
if duration == 0:
return "You must enter a valid duration."
poll_id = self.add_poll(question, request.sender.char_id, duration)
for choice in choices:
self.add_poll_choice(poll_id, choice.strip())
self.create_scheduled_job(self.get_poll(poll_id))
return self.show_poll_details_blob(self.get_poll(poll_id))
@command(command="poll", params=[Int("poll_id"), Const("vote"), Int("choice_id")], access_level="member",
description="Vote on a poll")
def poll_vote_cmd(self, request, poll_id, _, choice_id):
poll = self.get_poll(poll_id)
if not poll:
return f"Could not find poll with id {poll_id:d}."
if poll.is_finished == 1:
return 'The poll has ended already, your vote has not been registered.'
choice = self.db.query_single("SELECT * FROM poll_choice WHERE poll_id = ? AND id = ?", [poll_id, choice_id])
if not choice:
return f"Could not find choice with id {choice_id:d} " \
f"for poll id {poll_id:d}."
main = self.account_service.get_main(request.sender.char_id)
# retrieve pork info
self.pork_service.get_character_info(main.char_id)
cnt = self.db.exec("DELETE FROM poll_vote "
"WHERE poll_id = ? "
"AND (char_id = ? OR char_id = ?)",
[poll_id, main.char_id, request.sender.char_id])
self.db.exec("INSERT INTO poll_vote (poll_id, choice_id, char_id) "
"VALUES (?, ?, ?)",
[poll_id, choice_id, main.char_id])
if cnt > 0:
return "Your vote has been updated."
else:
return "Your vote has been saved."
@command(command="poll", params=[Int("poll_id"), Const("remvote")], access_level="member",
description="Remove your vote on a poll")
def poll_remvote_cmd(self, request, poll_id, _):
poll = self.get_poll(poll_id)
if not poll:
return f"Could not find poll with id {poll_id:d}."
if poll.is_finished == 1:
return "The poll has ended already you may not remove your vote."
main = self.account_service.get_main(request.sender.char_id)
cnt = self.db.exec("DELETE FROM poll_vote "
"WHERE poll_id = ? "
"AND (char_id = ? OR char_id = ?)",
[poll_id, main.char_id, request.sender.char_id])
if cnt > 0:
return "Your vote has been removed."
else:
return "You have not voted for that choice."
@event(event_type="connect", description="Check for finished polls", is_hidden=True)
def connect_event(self, event_type, event_data):
self.check_for_finished_polls()
self.create_scheduled_jobs_for_polls()
@event(event_type="member_logon",
description="Send active polls to org members logging on")
def poll_on_logon(self, event_type, event_data):
if self.bot.is_ready():
# Only proceed if the "normal" checks are fine
if not self.account_service.simple_checks(event_data.account):
return
if event_data.account.news_spam == 1:
data = self.db.query("SELECT * FROM poll WHERE is_finished != 1 AND "
"id NOT IN (SELECT poll_id FROM poll_vote WHERE char_id = ?) "
"ORDER BY finished_at, id", [event_data.account.char_id])
if data:
row = data[0]
self.bot.send_private_message(event_data.packet.char_id, self.show_poll_details_blob(row))
def create_scheduled_jobs_for_polls(self):
data = self.db.query("SELECT * FROM poll WHERE is_finished != 1")
for row in data:
self.create_scheduled_job(row)
def check_for_finished_polls(self):
data = self.db.query("SELECT * FROM poll WHERE is_finished = 0 AND finished_at <= ?", [int(time.time())])
for row in data:
self.end_poll(row)
def show_poll_details_blob(self, poll):
blob = ""
blob += f"Duration: {self.util.time_to_readable(poll.duration)}\n"
blob += f"Created: {self.util.format_datetime(poll.created_at)}\n"
blob += f"Finished: {self.util.format_datetime(poll.finished_at)}\n"
blob += "\nChoices\n"
idx = 1
for choice in self.get_choices(poll.id):
blob += f"{idx:d}. {self.text.make_tellcmd(choice.choice, f'poll {poll.id:d} vote {choice.id:d}')} " \
f"({choice.cnt:d})\n"
for vote in self.get_votes(choice.id):
blob += f"{self.text.format_char_info(vote)}\n"
idx += 1
return ChatBlob(f"Poll ID {poll.id:d}: {poll.question}", blob)
def get_polls(self):
return self.db.query("SELECT p.*, "
"(SELECT COUNT(1) FROM poll_vote v WHERE v.poll_id = p.id) AS total_cnt "
"FROM poll p "
"ORDER BY finished_at DESC")
def get_poll(self, poll_id):
return self.db.query_single("SELECT * FROM poll WHERE id = ?", [poll_id])
def get_choices(self, poll_id):
return self.db.query("SELECT c.id, c.choice, COUNT(v.char_id) AS cnt FROM poll_choice c "
"LEFT JOIN poll_vote v ON c.id = v.choice_id "
"WHERE c.poll_id = ? "
"GROUP BY c.id, c.choice "
"ORDER BY c.id", [poll_id])
def get_votes(self, choice_id):
return self.db.query("SELECT p.* FROM poll_vote v "
"LEFT JOIN player p ON v.char_id = p.char_id "
"WHERE v.choice_id = ?", [choice_id])
def add_poll(self, question, char_id, duration, min_access_level="member"):
t = int(time.time())
self.db.exec("INSERT INTO poll (question, duration, min_access_level, "
"char_id, created_at, finished_at, is_finished) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
[question, duration, min_access_level, char_id, t, t + duration, 0])
return self.db.last_insert_id()
def add_poll_choice(self, poll_id, choice):
self.db.exec("INSERT INTO poll_choice (poll_id, choice) VALUES (?, ?)", [poll_id, choice])
return self.db.last_insert_id()
def create_scheduled_job(self, poll):
self.job_scheduler.scheduled_job(self.show_results, poll.finished_at, poll.id)
def show_results(self, t, poll_id):
self.end_poll(self.get_poll(poll_id))
def end_poll(self, poll):
self.bot.send_private_message(poll.char_id,
f"Your poll {poll.id:d}. {poll.question} has finished.")
self.db.exec("UPDATE poll SET is_finished = 1 WHERE id = ?", [poll.id])