# Files
# igncore/WS-S/discord_client.py
#
# 148 lines
# 5.7 KiB
# Python

import asyncio
import datetime
import json
import websockets
from websockets.legacy.client import WebSocketClientProtocol
from core.dict_object import DictObject
# Some calculation is required. The easiest way to explore the protocol is to connect to the websocket from a dev PC and look at the output; I believe it is JSON.
# ws://<You-KNOW-THIS>:8887 offers basic chat communication. When you connect you have to identify yourself (the server sends "#auth") - reply with the following message or you get disconnected: "#auth 13r9Zh9qd10%§SD29#31+". After login you can use the following commands: "#timers" to get all timers (tara, gaunt, ...)
#
# "#all" sends the complete tower database, so only request it every 24h...
#
# You automatically receive tower updates: "#tower+ [data]" when a tower is planted and "#tower- [towerID]" when one is destroyed
# Timers are also broadcast: "#timer Tarasque killable", "#timer Vizaresh spawn", "#timer Tarasque 30min", "#timer Tarasque 15min", etc.
# Example: "#tower+ [{asddasdasdaqqd}]"
class TimerObj:
    """State of a single boss timer, built from a decoded timer message entry.

    All timestamps are compared against :meth:`getTime`, i.e. they are handled
    as Unix epoch seconds.
    """

    # Respawn interval in seconds, keyed by the exact boss name. Names that are
    # not listed have no known interval (get_respawn_timer() returns 0).
    _RESPAWN_SECONDS = {
        "Tarasque": 9 * 60 * 60,
        "Loren Warr": 9 * 60 * 60,
        "The Hollow Reaper": 9 * 60 * 60,
        "Cerubin The Reborn": 9 * 60 * 60,
        "Vizaresh": 17 * 60 * 60,
        "Atma": 3 * 60 * 60,
        "T.A.M.": 6 * 60 * 60,
        "Zaal The Immortal": 6 * 60 * 60,
        "Abmouth Indomitus": 9 * 60 * 60,
    }

    name: str = ""
    # Timestamp printed as "spawn" by __str__.
    spawn: int = 0
    # Printed as "mortal delta" by __str__.
    mortal: int = 0
    # Timestamp printed as "mortal" by __str__; the alive_* checks compare it
    # with the current time.
    time: int = 0
    # Only read by the alive_* checks when spam=True and never written here;
    # presumably the time of the last announcement - confirm against callers.
    spam: int = 0

    def __init__(self, obj):
        """Create a timer from ``obj`` (needs ``name``, ``spawn``, ``mortal``, ``time``)."""
        self.name = obj.name
        self.update(obj)

    def nick(self):
        """Return a short display name.

        'The Desert Rider' maps to 'Rider'; every other name is cut down to
        its first whitespace-separated word. A blank name yields ``""``
        instead of raising ``IndexError``.
        """
        if self.name == 'The Desert Rider':
            return 'Rider'
        words = self.name.split()
        return words[0] if words else ""

    def update(self, obj):
        """Copy ``spawn``, ``time`` and ``mortal`` from ``obj``; the name is kept."""
        self.spawn = obj.spawn
        self.time = obj.time
        self.mortal = obj.mortal

    def __str__(self):
        """Return a multi-line debug dump; clock times are rendered in UTC."""
        utc = datetime.timezone.utc
        clock_format = '%H:%M:%S'
        spawn_clock = datetime.datetime.fromtimestamp(self.spawn, tz=utc).strftime(clock_format)
        mortal_clock = datetime.datetime.fromtimestamp(self.time, tz=utc).strftime(clock_format)
        return (
            f"{self.name} "
            f"\n\tspawn: {spawn_clock}"
            f"\n\tmortal delta: {self.mortal}"
            f"\n\tmortal: {mortal_clock}"
            f"\n\tImmortal: {self.alive_immortal()}"
            f"\n\tMortal killable: {self.alive_mortal()}"
            f"\n\t{self.name} - {self.spawn} - {self.mortal} - {self.time} - {self.spam} - {self.getTime()}"
        )

    def alive_immortal(self, spam=False) -> bool:
        """Return True while ``time`` is still in the future.

        With ``spam=True`` the result additionally requires
        ``spam < spawn < time``.
        """
        if spam:
            return self.spam < self.spawn < self.time and self.time > self.getTime()
        return self.time > self.getTime()

    def alive_mortal(self, spam=False) -> bool:
        """Return True once ``time`` has passed.

        With ``spam=True`` the result additionally requires ``spam < time``
        and that ``time`` passed less than 60 seconds ago.
        """
        if spam:
            return self.spam < self.time < (self.getTime()) and (self.time + 60) > self.getTime()
        return self.time < self.getTime()

    @classmethod
    def getTime(cls) -> int:
        """Return the current Unix timestamp truncated to whole seconds."""
        return int(datetime.datetime.now().timestamp())

    def get_respawn_timer(self):
        """Return the respawn interval in seconds, or 0 when none is known."""
        return self._RESPAWN_SECONDS.get(self.name, 0)
class WebsocketClient:
    """Client that keeps a list of boss timers in sync with the timer websocket.

    Constructing an instance connects right away and blocks the calling
    thread: ``__init__`` runs the reconnect loop and does not return.
    """

    # Endpoint, handshake strings and retry delay (seconds).
    DEFAULT_URI = "ws://37.187.118.232:25501/internal/timers"
    AUTH_CHALLENGE = "#auth"
    # SECURITY(review): this credential is committed in source; consider
    # loading it from configuration or the environment instead.
    LOGIN_MESSAGE = "#login dbs nFa*n4+p~#__H)6NVvQ]W.Veg8!`q6h[Pp9q6HKDk"
    RECONNECT_DELAY = 10

    # Timers with these names are never rendered by formatMessage().
    HIDDEN_NAMES = ('Tarasque', 'Vizaresh')

    # Deliberately left as a class-level list shared by all instances:
    # __init__ never returns, so code outside this class may only be able to
    # reach the data through the class - TODO confirm against callers.
    timer_data = []

    def getWBTimer(self, name=None):
        """Return formatted timer text.

        Without ``name`` every known timer is rendered, sorted by name, one
        per line; timers whose message is empty are skipped. With ``name``
        the stripped message of the first timer with exactly that name is
        returned, or ``None`` when no such timer is known.
        """
        if not name:
            messages = (
                self.formatMessage(timer)
                for timer in sorted(self.timer_data, key=lambda entry: entry.name)
            )
            return "".join(f"{message}\n" for message in messages if message)
        for timer in self.timer_data:
            if timer.name == name:
                return self.formatMessage(timer).strip()
        return None

    def formatMessage(self, x: "TimerObj"):
        """Render one timer as a single line, or ``""`` when nothing applies.

        The ``<t:...:R>`` tokens are emitted verbatim; presumably they are
        Discord relative-timestamp markup.
        """
        if x.name in self.HIDDEN_NAMES:
            return ""
        label = f"`{x.nick(): <8}`"
        if x.alive_immortal():
            return f"{label} :: mortal <t:{x.time}:R>"
        rt = x.get_respawn_timer()
        if rt == 0:
            # NOTE(review): indentation was lost in the file this was restored
            # from, so it is ambiguous which `if` the original `else:` belongs
            # to. It is paired with `if rt != 0` here, i.e. this branch handles
            # timers without a known respawn interval. Confirm against the
            # upstream source.
            return f"{label} :: mortal <t:{x.time + rt}:R>"
        now = x.getTime()
        first_respawn = x.time + rt
        if first_respawn < now < (x.time + 1.5 * rt):
            return f"{label} :: should have spawned <t:{abs(first_respawn)}:R>... next one <t:{first_respawn + rt}:R>"
        if x.time < now < (x.time + 10 * rt):
            # Count how many respawn intervals are needed to reach the present.
            skips = 0
            projected = x.time
            while projected < now:
                projected += rt
                skips += 1
            if first_respawn < now:
                return f"{label} :: spawn <t:{x.time + rt * skips}:R> [Skipped: {skips - 1}]"
            return f"{label} :: spawn <t:{first_respawn}:R>"
        # Outside the ten-interval window: nothing is shown.
        return ""

    def _merge_payload(self, payload):
        """Update the known timer matching each entry's name, or append a new one."""
        for entry in payload:
            known = next((timer for timer in self.timer_data if timer.name == entry.name), None)
            if known is None:
                self.timer_data.append(TimerObj(entry))
            else:
                known.update(entry)

    async def internal(self, uri):
        """Connect to ``uri``, answer the auth challenge and consume timer messages.

        Runs until the connection fails; errors propagate to the caller.
        Each message is decoded as JSON and its ``payload`` entries are merged
        into ``timer_data``.
        """
        async with websockets.connect(uri) as websocket:
            websocket: WebSocketClientProtocol
            greeting = await websocket.recv()
            if greeting == self.AUTH_CHALLENGE:
                await websocket.send(self.LOGIN_MESSAGE)
            while True:
                data = DictObject(json.loads(await websocket.recv()))
                print(data.payload)
                self._merge_payload(data.payload)
                print(self.getWBTimer())

    async def monitor(self, uri, method):
        """Run ``method(uri)`` forever, waiting ``RECONNECT_DELAY`` seconds between runs.

        A failing run (refused or closed connection, malformed message) is
        reported and retried, so the reconnect loop survives errors.
        Cancellation is passed through.
        """
        while True:
            try:
                await method(uri)
            except asyncio.CancelledError:
                raise
            except Exception as exc:
                # Top-level boundary of the reconnect loop: report and retry.
                print(f"Timer websocket connection failed: {exc!r}")
            await asyncio.sleep(self.RECONNECT_DELAY)

    def __init__(self, uri=None):
        """Start the reconnect loop against ``uri`` (default ``DEFAULT_URI``).

        Blocks the calling thread; asyncio.run() creates and owns the event loop.
        """
        asyncio.run(self.monitor(uri or self.DEFAULT_URI, self.internal))
# Constructing the client connects and blocks forever, so only do it when this
# file is executed as a script; importing the module stays side-effect free.
if __name__ == "__main__":
    WebsocketClient()