studia/jezyki-skryptowe/lista9/SSHLogEntry.py
2024-06-14 16:53:58 +02:00

129 lines
4.4 KiB
Python

from datetime import datetime
from abc import ABC, abstractmethod
import re
from ipaddress import IPv4Address
class SSHLogEntry(ABC):
timestamp: str
timestamp_datetime: datetime
hostname: str | None
pid: int
message: str
_raw_entry: str
_line_pattern = re.compile(r"(?P<timestamp>\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s(?P<hostname>\S+)\ssshd\[(?P<pid>\d+)\]:\s(?P<message>.+)")
_ipv4_pattern = re.compile(r'(?P<address>(?:[0-9]{1,3}\.){3}[0-9]{1,3})')
_user_pattern = re.compile(r"([Ii]nvalid user |Failed password for(?: invalid user)? |Too many authentication failures for |Accepted password for |user[= ])(?P<user>\w+)")
def __init__(self, line: str) -> None:
data = self._line_pattern.match(line)
if data is None:
raise Exception(f"invalid data: {data}")
timestamp = data.group("timestamp")
self.timestamp = timestamp
self.hostname = data.group("hostname")
self.pid = int(data.group("pid"))
self.message = data.group("message")
self._raw_entry = line.strip()
dt = datetime.strptime(self.timestamp, "%b %d %H:%M:%S")
self.timestamp_datetime = dt
def __str__(self) -> str:
return self._raw_entry
def ipv4(self) -> IPv4Address | None:
addresses = self._ipv4_pattern.findall(self.message)
if len(addresses) == 0:
return None
return IPv4Address(addresses[0])
@abstractmethod
def validate(self) -> bool:
raise NotImplementedError()
@property
def has_ip(self) -> bool:
return self.ipv4() is not None
def __repr__(self) -> str:
return (f"SSHLogEntry(timestamp='{self.timestamp}', hostname={self.hostname}, pid={self.pid}, message='{self.message}')")
def __eq__(self, other: object) -> bool:
if not isinstance(other, SSHLogEntry):
return False
return self._raw_entry == other._raw_entry
def __lt__(self, other: object) -> bool:
if not isinstance(other, SSHLogEntry):
return False
return self.timestamp_datetime < other.timestamp_datetime
def __gt__(self, other: object) -> bool:
if not isinstance(other, SSHLogEntry):
return False
return self.timestamp_datetime > other.timestamp_datetime
class SSHRejectedPassword(SSHLogEntry):
    """Log entry that must carry a user name (rejected-password category).

    NOTE(review): construction only requires that the inherited user
    pattern matches the message; the category itself is implied by the
    class name and is presumably decided by the caller.
    """

    user: str

    def __init__(self, line: str) -> None:
        """Parse ``line`` and extract the user name from its message.

        Raises:
            Exception: If no user name can be found in the message (the
                base-class constructor may raise for a malformed line).
        """
        super().__init__(line)
        found = self._user_pattern.search(self.message)
        if found is None:
            raise Exception(f"invalid data: {line}")
        self.user = found.group("user")

    def validate(self) -> bool:
        """Re-parse the raw entry and check every stored field against it.

        Returns:
            ``True`` only if the raw line still matches the line layout,
            a user is still found in the message, and timestamp, hostname,
            pid, message and user all equal the freshly parsed values.
        """
        parsed = self._line_pattern.match(self._raw_entry)
        found = self._user_pattern.search(self.message)
        if parsed is None or found is None:
            return False

        # Compare field by field, bailing out on the first mismatch.
        if self.timestamp != parsed.group("timestamp"):
            return False
        if self.hostname != parsed.group("hostname"):
            return False
        if self.pid != int(parsed.group("pid")):
            return False
        if self.message != parsed.group("message"):
            return False
        return self.user == found.group("user")
class SSHAcceptedPassword(SSHLogEntry):
    """Log entry that must carry a user name (accepted-password category).

    NOTE(review): construction only requires that the inherited user
    pattern matches the message; the category itself is implied by the
    class name and is presumably decided by the caller.
    """

    user: str

    def __init__(self, line: str) -> None:
        """Parse ``line`` and extract the user name from its message.

        Raises:
            Exception: If no user name can be found in the message (the
                base-class constructor may raise for a malformed line).
        """
        super().__init__(line)
        user_match = self._user_pattern.search(self.message)
        if user_match is None:
            raise Exception(f"invalid data: {line}")
        self.user = user_match.group("user")

    def validate(self) -> bool:
        """Re-parse the raw entry and check every stored field against it.

        Returns:
            ``True`` only if the raw line still matches the line layout,
            a user is still found in the message, and timestamp, hostname,
            pid, message and user all equal the freshly parsed values.
        """
        line_match = self._line_pattern.match(self._raw_entry)
        user_match = self._user_pattern.search(self.message)
        if line_match is None or user_match is None:
            return False

        # Compare field by field, bailing out on the first mismatch.
        if self.timestamp != line_match.group("timestamp"):
            return False
        if self.hostname != line_match.group("hostname"):
            return False
        if self.pid != int(line_match.group("pid")):
            return False
        if self.message != line_match.group("message"):
            return False
        return self.user == user_match.group("user")
class SSHError(SSHLogEntry):
    """Log entry for the error category; adds no fields of its own."""

    def __init__(self, line: str) -> None:
        """Parse ``line`` using the base-class constructor only."""
        super().__init__(line)

    def validate(self) -> bool:
        """Re-parse the raw entry and check the stored fields against it.

        Returns:
            ``True`` only if the raw line still matches the line layout and
            timestamp, hostname, pid and message equal the parsed values.
        """
        parsed = self._line_pattern.match(self._raw_entry)
        if parsed is None:
            return False

        # Compare field by field, bailing out on the first mismatch.
        if self.timestamp != parsed.group("timestamp"):
            return False
        if self.hostname != parsed.group("hostname"):
            return False
        if self.pid != int(parsed.group("pid")):
            return False
        return self.message == parsed.group("message")
class SSHOther(SSHLogEntry):
    """Catch-all log entry; any line the base class can parse is accepted."""

    def __init__(self, line: str) -> None:
        """Parse ``line`` using the base-class constructor only."""
        super().__init__(line)

    def validate(self) -> bool:
        """Always report the entry as valid; no extra checks are made."""
        return True