diff --git a/tools/irc.py b/tools/irc.py index 1636ba2..492a558 100644 --- a/tools/irc.py +++ b/tools/irc.py @@ -1,6 +1,6 @@ from .simplesocket import ClientSocket -from typing import Callable from enum import Enum +from typing import Callable, Dict, List, Union class ServerMessage(str, Enum): @@ -8,7 +8,9 @@ class ServerMessage(str, Enum): RPL_YOURHOST = "002" RPL_CREATED = "003" RPL_MYINFO = "004" - RPL_BOUNCE = "005" + RPL_ISUPPORT = "005" + RPL_BOUNCE = "010" + RPL_UNIQID = "042" RPL_TRACELINK = "200" RPL_TRACECONNECTING = "201" RPL_TRACEHANDSHAKE = "202" @@ -67,6 +69,7 @@ class ServerMessage(str, Enum): RPL_UNIQOPIS = "325" RPL_NOTOPIC = "331" RPL_TOPIC = "332" + RPL_TOPICBY = "333" RPL_INVITING = "341" RPL_SUMMONING = "342" RPL_INVITELIST = "346" @@ -148,26 +151,87 @@ class ServerMessage(str, Enum): ERR_NOOPERHOST = "491" ERR_UMODEUNKNOWNFLAG = "501" ERR_USERSDONTMATCH = "502" + MSG_NICK = "NICK" + MSG_TOPIC = "TOPIC" + MSG_MODE = "MODE" + MSG_PRIVMSG = "PRIVMSG" + MSG_JOIN = "JOIN" + MSG_PART = "PART" + MSG_QUIT = "QUIT" + RAW = "RAW" + + +class User: + user: str + nickname: str + username: str + hostname: str + + def __init__(self, user: str): + self.user = user + user, self.hostname = self.user.split("@") + self.nickname, self.username = user.split("!") + + def nick(self, new_nick: str): + self.user.replace("%s!" % self.nickname, "%s!" % new_nick) + self.nickname = new_nick + + +class Channel: + name: str + topic: str + userlist: Dict[str, User] + + def __init__(self, name: str): + self.name = name + self.topic = "" + self.userlist = {} + + def join(self, user: User): + if user.user not in self.userlist: + self.userlist[user.user] = user + + def quit(self, user: User): + if user.user in self.userlist: + del self.userlist[user.user] class Client: + __function_register: Dict[str, List[Callable]] + __server_socket: ClientSocket + __server_caps: Dict[str, Union[str, int]] + __userlist: Dict[str, User] + __channellist: Dict[str, Channel] + __server_name: str = None + __my_user: str = None + def __init__(self, server: str, port: int, nick: str, username: str, realname: str = "Python Bot"): - self.nickname = None - self.__server = ClientSocket(server, port) - self.__server.sendline("USER %s ignore ignore :%s" % (username, realname)) - self.__server.sendline("NICK %s" % nick) + self.__userlist = {} + self.__channellist = {} + self.__server_socket = ClientSocket(server, port) + self.__server_socket.sendline("USER %s ignore ignore :%s" % (username, realname)) + self.__server_socket.sendline("NICK %s" % nick) + self.__server_caps = { + 'MAXLEN': 255 + } self.__function_register = { - ServerMessage.RPL_WELCOME: self.on_welcome, - ServerMessage.ERR_NICKNAMEINUSE: self.on_nickname_in_use, - 'NICK': self.on_nick, - '__default__': self.unhandled_server_message, + ServerMessage.RPL_WELCOME: [self.on_rpl_welcome], + ServerMessage.RPL_TOPIC: [self.on_rpl_topic], + ServerMessage.RPL_ISUPPORT: [self.on_rpl_isupport], + ServerMessage.ERR_NICKNAMEINUSE: [self.on_err_nicknameinuse], + ServerMessage.MSG_JOIN: [self.on_join], + ServerMessage.MSG_PART: [self.on_part], + ServerMessage.MSG_QUIT: [self.on_quit], + ServerMessage.MSG_NICK: [self.on_nick], + ServerMessage.MSG_TOPIC: [self.on_topic], + ServerMessage.RAW: [self.on_raw], } self.receive() def receive(self): - while line := self.__server.recvline(): + while line := self.__server_socket.recvline(): if line.startswith("PING"): - self.__server.sendline("PONG " + line.split()[1]) + self.__server_socket.sendline("PONG " + line.split()[1]) continue try: @@ -181,43 +245,109 @@ class Client: message = " ".join(msg) if msg_type in self.__function_register: - self.__function_register[msg_type](msg_from, msg_to, message) - else: - self.__function_register['__default__'](msg_from, msg_type, msg_to, message) + for func in self.__function_register[msg_type]: + func(msg_from, msg_to, message) + + for func in self.__function_register['RAW']: + func(msg_from, msg_type, msg_to, message) def register(self, msg_type: str, func: Callable[..., None]): - self.__function_register[msg_type] = func + if msg_type in self.__function_register: + self.__function_register[msg_type].append(func) + else: + self.__function_register[msg_type] = [func] - def on_welcome(self, msg_from: str, msg_to: str, message: str): - self.nickname = msg_to + def on_rpl_welcome(self, msg_from: str, msg_to: str, message: str): + self.__server_name = msg_from + self.__my_user = message.split()[-1] + self.__userlist[self.__my_user] = User(self.__my_user) - def on_nickname_in_use(self, msg_from: str, msg_to: str, message: str): - if self.nickname is None: + def on_rpl_isupport(self, msg_from: str, msg_to: str, message: str): + for cap in message.split(): + if "=" not in cap: + self.__server_caps[cap] = True + else: + (a, b) = cap.split("=") + self.__server_caps[a] = b + + def on_rpl_topic(self, msg_from: str, msg_to: str, message: str): + channel, *topic = message.split() + if len(topic) > 0: + topic[0] = topic[0][1:] + + new_topic = " ".join(topic) + self.__channellist[channel].topic = new_topic + + def on_err_nicknameinuse(self, msg_from: str, msg_to: str, message: str): + if self.__my_user is None: self.nick(message.split()[0] + "_") - def on_nick(self, old_nick: str, new_nick: str, message: str): - old_nick = old_nick.split("!")[0] - if old_nick == self.nickname: - self.nickname = new_nick[1:] + def on_nick(self, msg_from: str, msg_to: str, message: str): + self.__userlist[msg_from].nick(msg_to) + self.__userlist[self.__userlist[msg_from].user] = self.__userlist[msg_from] + del self.__userlist[msg_from] - def unhandled_server_message(self, msg_from: str, msg_type: str, msg_to: str, message: str): + def on_join(self, msg_from: str, msg_to: str, message: str): + channel = msg_to[1:] + if msg_from == self.__my_user: + self.__channellist[channel] = Channel(channel) + # FIXME: get user list (NAMES just returns nicknames, not nick!user@host !!!) + + if msg_from not in self.__userlist: + self.__userlist[msg_from] = User(msg_from) + + self.__channellist[channel].join(self.__userlist[msg_from]) + + def on_topic(self, msg_from: str, msg_to: str, message: str): + self.__channellist[msg_to].topic = message + + def on_part(self, msg_from: str, msg_to: str, message: str): + self.__channellist[msg_to].quit(self.__userlist[msg_from]) + + def on_quit(self, msg_from: str, msg_to: str, message: str): + for c in self.__channellist: + self.__channellist[c].quit(self.__userlist[msg_from]) + + del self.__userlist[msg_from] + + def on_raw(self, msg_from: str, msg_type: str, msg_to: str, message: str): print(msg_from, msg_type, msg_to, message) def nick(self, new_nick: str): - self.__server.sendline("NICK %s" % new_nick) + self.__server_socket.sendline("NICK %s" % new_nick) def join(self, channel: str): - self.__server.sendline("JOIN %s" % channel) + self.__server_socket.sendline("JOIN %s" % channel) self.receive() - def leave(self, channel: str): - self.__server.sendline("LEAVE %s" % channel) + def part(self, channel: str): + self.__server_socket.sendline("PART %s" % channel) self.receive() def privmsg(self, target: str, message: str): - self.__server.sendline("PRIVMSG %s :%s" % (target, message)) + self.__server_socket.sendline("PRIVMSG %s :%s" % (target, message)) def quit(self, message: str = "Elvis has left the building!"): - self.__server.sendline("QUIT :%s" % message) + self.__server_socket.sendline("QUIT :%s" % message) self.receive() - self.__server.close() + self.__server_socket.close() + + def getUser(self, user: str = None) -> Union[User, None]: + if user is None: + return self.__userlist[self.__my_user] + elif user in self.__userlist: + return self.__userlist[user] + else: + return None + + def getUserList(self) -> List[User]: + return list(self.__userlist.values()) + + def getChannel(self, channel: str) -> Union[Channel, None]: + if channel in self.__channellist: + return self.__channellist[channel] + else: + return None + + def getChannelList(self) -> List[Channel]: + return list(self.__channellist.values())