irc.Client(): implement sasl authentication and send queue
All checks were successful
Publish to PyPI / Publish to PyPI (push) Successful in 1m31s

This commit is contained in:
Stefan Harmuth 2024-11-30 10:26:21 +01:00
parent bfac2b9433
commit 23283d3b66

View File

@ -1,9 +1,11 @@
from __future__ import annotations
from time import sleep
from .schedule import Scheduler
from .simplesocket import ClientSocket
from base64 import b64encode
from collections import deque
from datetime import timedelta
from enum import Enum
from time import sleep
from typing import Callable
@ -155,6 +157,15 @@ class ServerMessage(str, Enum):
ERR_NOOPERHOST = "491"
ERR_UMODEUNKNOWNFLAG = "501"
ERR_USERSDONTMATCH = "502"
RPL_LOGGEDIN = "900"
RPL_LOGGEDOUT = "901"
ERR_NICKLOCKED = "902"
RPL_SASLSUCCESS = "903"
ERR_SASLFAIL = "904"
ERR_SASLTOOLONG = "905"
ERR_SASLABORTED = "906"
ERR_SASLALREADY = "907"
RPL_SASLMECHS = "908"
MSG_NICK = "NICK"
MSG_TOPIC = "TOPIC"
MSG_MODE = "MODE"
@ -221,20 +232,30 @@ class Client:
nick: str,
username: str,
realname: str = "Python Bot",
sasl_password: str = None,
):
self.__connected = False
self.__send_queue = deque()
self.__userlist = {}
self.__channellist = {}
self.__server_socket = ClientSocket(server, port)
self.__server_socket.sendline(
"USER %s ignore ignore :%s" % (username, realname)
)
if sasl_password is not None:
self.__server_socket.sendline("CAP REQ :sasl")
self.__server_socket.sendline("USER %s ignore ignore :%s" % (username, realname))
self.__server_socket.sendline("NICK %s" % nick)
if sasl_password is not None:
self.__server_socket.sendline("AUTHENTICATE PLAIN")
auth_string = b64encode(bytes("%s\0%s\0%s" % (nick, nick, sasl_password), "utf-8")).decode("ascii")
self.__server_socket.sendline("AUTHENTICATE %s" % auth_string)
self.__server_caps = {"MAXLEN": 255}
self.__function_register = {
ServerMessage.RPL_WELCOME: [self.on_rpl_welcome],
ServerMessage.RPL_TOPIC: [self.on_rpl_topic],
ServerMessage.RPL_ISUPPORT: [self.on_rpl_isupport],
ServerMessage.ERR_NICKNAMEINUSE: [self.on_err_nicknameinuse],
ServerMessage.RPL_LOGGEDIN: [self.on_auth],
ServerMessage.MSG_JOIN: [self.on_join],
ServerMessage.MSG_PART: [self.on_part],
ServerMessage.MSG_QUIT: [self.on_quit],
@ -243,6 +264,17 @@ class Client:
}
self.receive()
def send_raw(self, msg: str):
self.__send_queue.append(msg)
def send_queue(self):
if not self.__connected or not self.__send_queue:
return
msg = self.__send_queue.popleft()
print(f"-> {msg}")
self.__server_socket.sendline(msg)
def receive(self):
while line := self.__server_socket.recvline():
line = line.strip()
@ -267,10 +299,7 @@ class Client:
if "!" in msg_from and msg_from not in self.__userlist:
self.__userlist[msg_from] = User(msg_from)
if (
self.__userlist[msg_from].nickname
== self.__userlist[self.__my_user].nickname
):
if self.__userlist[msg_from].nickname == self.__userlist[self.__my_user].nickname:
del self.__userlist[self.__my_user]
self.__my_user = msg_from
@ -284,9 +313,14 @@ class Client:
else:
self.__function_register[msg_type] = [func]
def on_auth(self, msg_from: str, msg_to: str, message: str):
print("authed")
self.__server_socket.sendline("CAP END")
def on_rpl_welcome(self, msg_from: str, msg_to: str, message: str):
self.__my_user = message.split()[-1]
self.__userlist[self.__my_user] = User(self.__my_user)
self.__connected = True
def on_rpl_isupport(self, msg_from: str, msg_to: str, message: str):
for cap in message.split():
@ -310,9 +344,7 @@ class Client:
def on_nick(self, msg_from: str, msg_to: str, message: str):
self.__userlist[msg_from].nick(msg_to)
self.__userlist[self.__userlist[msg_from].identifier] = self.__userlist[
msg_from
]
self.__userlist[self.__userlist[msg_from].identifier] = self.__userlist[msg_from]
del self.__userlist[msg_from]
def on_join(self, msg_from: str, msg_to: str, message: str):
@ -342,21 +374,21 @@ class Client:
print(msg_from, msg_type, msg_to, message)
def nick(self, new_nick: str):
self.__server_socket.sendline("NICK %s" % new_nick)
self.send_raw("NICK %s" % new_nick)
def join(self, channel: str):
self.__server_socket.sendline("JOIN %s" % channel)
self.send_raw("JOIN %s" % channel)
self.receive()
def part(self, channel: str):
self.__server_socket.sendline("PART %s" % channel)
self.send_raw("PART %s" % channel)
self.receive()
def privmsg(self, target: str, message: str):
self.__server_socket.sendline("PRIVMSG %s :%s" % (target, message))
self.send_raw("PRIVMSG %s :%s" % (target, message))
def quit(self, message: str = "Elvis has left the building!"):
self.__server_socket.sendline("QUIT :%s" % message)
self.send_raw("QUIT :%s" % message)
self.receive()
self.__server_socket.close()
@ -389,8 +421,9 @@ class IrcBot(Client):
nick: str,
username: str,
realname: str = "Python Bot",
sasl_password: str = None,
):
super().__init__(server, port, nick, username, realname)
super().__init__(server, port, nick, username, realname, sasl_password)
self._scheduler = Scheduler()
self._channel_commands = {}
self._privmsg_commands = {}
@ -415,13 +448,8 @@ class IrcBot(Client):
if not message:
return
command = message.split()[0]
if (
msg_to in self._channel_commands
and command in self._channel_commands[msg_to]
):
self._channel_commands[msg_to][command](
msg_from, " ".join(message.split()[1:])
)
if msg_to in self._channel_commands and command in self._channel_commands[msg_to]:
self._channel_commands[msg_to][command](msg_from, " ".join(message.split()[1:]))
if msg_to == self.getUser().nickname and command in self._privmsg_commands:
self._privmsg_commands[command](msg_from, " ".join(message.split()[1:]))
@ -429,5 +457,6 @@ class IrcBot(Client):
def run(self):
while True:
self._scheduler.run_pending()
self.send_queue()
self.receive()
sleep(0.01)