from datetime import datetime from abc import ABC, abstractmethod import re from ipaddress import IPv4Address class SSHLogEntry(ABC): timestamp: str timestamp_datetime: datetime hostname: str | None pid: int message: str _raw_entry: str _line_pattern = re.compile(r"(?P\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s(?P\S+)\ssshd\[(?P\d+)\]:\s(?P.+)") _ipv4_pattern = re.compile(r'(?P
(?:[0-9]{1,3}\.){3}[0-9]{1,3})') _user_pattern = re.compile(r"([Ii]nvalid user |Failed password for(?: invalid user)? |Too many authentication failures for |Accepted password for |user[= ])(?P\w+)") def __init__(self, line: str) -> None: data = self._line_pattern.match(line) if data is None: raise Exception(f"invalid data: {data}") timestamp = data.group("timestamp") self.timestamp = timestamp self.hostname = data.group("hostname") self.pid = int(data.group("pid")) self.message = data.group("message") self._raw_entry = line.strip() dt = datetime.strptime(self.timestamp, "%b %d %H:%M:%S") self.timestamp_datetime = dt def __str__(self) -> str: return self._raw_entry def ipv4(self) -> IPv4Address | None: addresses = self._ipv4_pattern.findall(self.message) if len(addresses) == 0: return None return IPv4Address(addresses[0]) @abstractmethod def validate(self) -> bool: raise NotImplementedError() @property def has_ip(self) -> bool: return self.ipv4() is not None def __repr__(self) -> str: return (f"SSHLogEntry(timestamp='{self.timestamp}', hostname={self.hostname}, pid={self.pid}, message='{self.message}')") def __eq__(self, other: object) -> bool: if not isinstance(other, SSHLogEntry): return False return self._raw_entry == other._raw_entry def __lt__(self, other: object) -> bool: if not isinstance(other, SSHLogEntry): return False return self.timestamp_datetime < other.timestamp_datetime def __gt__(self, other: object) -> bool: if not isinstance(other, SSHLogEntry): return False return self.timestamp_datetime > other.timestamp_datetime class SSHRejectedPassword(SSHLogEntry): user: str def __init__(self, line: str) -> None: super().__init__(line) user = self._user_pattern.search(self.message) if user is None: raise Exception(f"invalid data: {line}") self.user = user.group("user") def validate(self) -> bool: match = self._line_pattern.match(self._raw_entry) user = self._user_pattern.search(self.message) return (match is not None and user is not None and self.timestamp == match.group("timestamp") and self.hostname == match.group("hostname") and self.pid == int(match.group("pid")) and self.message == match.group("message") and self.user == user.group("user") ) class SSHAcceptedPassword(SSHLogEntry): user: str def __init__(self, line: str) -> None: super().__init__(line) user = self._user_pattern.search(self.message) if user is None: raise Exception(f"invalid data: {line}") self.user = user.group("user") def validate(self) -> bool: match = self._line_pattern.match(self._raw_entry) user = self._user_pattern.search(self.message) return (match is not None and user is not None and self.timestamp == match.group("timestamp") and self.hostname == match.group("hostname") and self.pid == int(match.group("pid")) and self.message == match.group("message") and self.user == user.group("user") ) class SSHError(SSHLogEntry): def __init__(self, line: str) -> None: super().__init__(line) def validate(self) -> bool: match = self._line_pattern.match(self._raw_entry) return (match is not None and self.timestamp == match.group("timestamp") and self.hostname == match.group("hostname") and self.pid == int(match.group("pid")) and self.message == match.group("message") ) class SSHOther(SSHLogEntry): def __init__(self, line: str) -> None: super().__init__(line) def validate(self) -> bool: return True