129 lines
4.4 KiB
Python
129 lines
4.4 KiB
Python
from datetime import datetime
|
|
from abc import ABC, abstractmethod
|
|
import re
|
|
from ipaddress import IPv4Address
|
|
|
|
class SSHLogEntry(ABC):
|
|
timestamp: str
|
|
timestamp_datetime: datetime
|
|
hostname: str | None
|
|
pid: int
|
|
message: str
|
|
_raw_entry: str
|
|
|
|
_line_pattern = re.compile(r"(?P<timestamp>\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s(?P<hostname>\S+)\ssshd\[(?P<pid>\d+)\]:\s(?P<message>.+)")
|
|
_ipv4_pattern = re.compile(r'(?P<address>(?:[0-9]{1,3}\.){3}[0-9]{1,3})')
|
|
_user_pattern = re.compile(r"([Ii]nvalid user |Failed password for(?: invalid user)? |Too many authentication failures for |Accepted password for |user[= ])(?P<user>\w+)")
|
|
|
|
def __init__(self, line: str) -> None:
|
|
data = self._line_pattern.match(line)
|
|
if data is None:
|
|
raise Exception(f"invalid data: {data}")
|
|
timestamp = data.group("timestamp")
|
|
self.timestamp = timestamp
|
|
self.hostname = data.group("hostname")
|
|
self.pid = int(data.group("pid"))
|
|
self.message = data.group("message")
|
|
self._raw_entry = line.strip()
|
|
|
|
dt = datetime.strptime(self.timestamp, "%b %d %H:%M:%S")
|
|
self.timestamp_datetime = dt
|
|
|
|
def __str__(self) -> str:
|
|
return self._raw_entry
|
|
|
|
def ipv4(self) -> IPv4Address | None:
|
|
addresses = self._ipv4_pattern.findall(self.message)
|
|
if len(addresses) == 0:
|
|
return None
|
|
return IPv4Address(addresses[0])
|
|
|
|
@abstractmethod
|
|
def validate(self) -> bool:
|
|
raise NotImplementedError()
|
|
|
|
@property
|
|
def has_ip(self) -> bool:
|
|
return self.ipv4() is not None
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
return (f"SSHLogEntry(timestamp='{self.timestamp}', hostname={self.hostname}, pid={self.pid}, message='{self.message}')")
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
if not isinstance(other, SSHLogEntry):
|
|
return False
|
|
return self._raw_entry == other._raw_entry
|
|
|
|
def __lt__(self, other: object) -> bool:
|
|
if not isinstance(other, SSHLogEntry):
|
|
return False
|
|
return self.timestamp_datetime < other.timestamp_datetime
|
|
|
|
def __gt__(self, other: object) -> bool:
|
|
if not isinstance(other, SSHLogEntry):
|
|
return False
|
|
return self.timestamp_datetime > other.timestamp_datetime
|
|
|
|
class SSHRejectedPassword(SSHLogEntry):
    """Log entry that also carries the user name found in its message.

    The name suggests this models rejected-password lines; the class itself
    only requires that the inherited ``_user_pattern`` finds a user.
    """

    user: str

    def __init__(self, line: str) -> None:
        """Parse *line* and record the user named in its message.

        Raises:
            Exception: if no user can be extracted from the message.
        """
        super().__init__(line)
        found = self._user_pattern.search(self.message)
        if found is None:
            raise Exception(f"invalid data: {line}")
        self.user = found.group("user")

    def validate(self) -> bool:
        """Return True when re-parsing the raw entry reproduces every stored field."""
        header = self._line_pattern.match(self._raw_entry)
        found = self._user_pattern.search(self.message)
        if header is None or found is None:
            return False

        stored = (
            self.timestamp,
            self.hostname,
            self.pid,
            self.message,
            self.user,
        )
        reparsed = (
            header.group("timestamp"),
            header.group("hostname"),
            int(header.group("pid")),
            header.group("message"),
            found.group("user"),
        )
        return stored == reparsed
|
|
|
|
class SSHAcceptedPassword(SSHLogEntry):
    """Log entry that also carries the user name found in its message.

    The name suggests this models accepted-password lines; the class itself
    only requires that the inherited ``_user_pattern`` finds a user.
    """

    user: str

    def __init__(self, line: str) -> None:
        """Parse *line* and record the user named in its message.

        Raises:
            Exception: if no user can be extracted from the message.
        """
        super().__init__(line)
        found = self._user_pattern.search(self.message)
        if found is None:
            raise Exception(f"invalid data: {line}")
        self.user = found.group("user")

    def validate(self) -> bool:
        """Return True when re-parsing the raw entry reproduces every stored field."""
        header = self._line_pattern.match(self._raw_entry)
        found = self._user_pattern.search(self.message)
        if header is None or found is None:
            return False

        stored = (
            self.timestamp,
            self.hostname,
            self.pid,
            self.message,
            self.user,
        )
        reparsed = (
            header.group("timestamp"),
            header.group("hostname"),
            int(header.group("pid")),
            header.group("message"),
            found.group("user"),
        )
        return stored == reparsed
|
|
|
|
class SSHError(SSHLogEntry):
    """Log entry type for error lines; adds no fields beyond the base class."""

    def __init__(self, line: str) -> None:
        """Parse *line* using the base-class constructor."""
        super().__init__(line)

    def validate(self) -> bool:
        """Return True when re-parsing the raw entry reproduces the stored fields."""
        header = self._line_pattern.match(self._raw_entry)
        if header is None:
            return False

        stored = (self.timestamp, self.hostname, self.pid, self.message)
        reparsed = (
            header.group("timestamp"),
            header.group("hostname"),
            int(header.group("pid")),
            header.group("message"),
        )
        return stored == reparsed
|
|
|
|
class SSHOther(SSHLogEntry):
    """Catch-all log entry type; any line the base class can parse is accepted."""

    def __init__(self, line: str) -> None:
        """Parse *line* using the base-class constructor."""
        super().__init__(line)

    def validate(self) -> bool:
        """Always report the entry as valid."""
        return True
|