from datetime import datetime from abc import ABC, abstractmethod import re class SSHLogEntry(ABC): timestamp: str hostname: str | None pid: int message: str # 4 _raw_entry: str _line_pattern = re.compile(r"(?P\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s(?P\S+)\ssshd\[(?P\d+)\]:\s(?P.+)") _ipv4_pattern = re.compile(r"(?P
(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))") _user_pattern = re.compile(r"([Ii]nvalid user |Failed password for(?: invalid user)? |Too many authentication failures for |Accepted password for |user[= ])(?P\w+)") #1.a def __init__(self, line: str): data = self._line_pattern.match(line) if data is None: raise Exception(f"invalid data: {data}") timestamp = data.group("timestamp") self.timestamp = timestamp self.hostname = data.group("hostname") self.pid = int(data.group("pid")) self.message = data.group("message") self._raw_entry = line.strip() #1.b def __str__(self) -> str: return self._raw_entry #1.c def ipv4(self): addresses = self._ipv4_pattern.findall(self.message) if len(addresses) == 0: return None return addresses[0] # 3 @abstractmethod def validate(self) -> bool: raise NotImplementedError() # 5 @property def has_ip(self): return self.ipv4() is not None # 6 def __repr__(self) -> str: return (f"SSHLogEntry(timestamp='{self.timestamp}', hostname={self.hostname}, pid={self.pid}, message='{self.message}')") def __eq__(self, other: "SSHLogEntry") -> bool: return self._raw_entry == other._raw_entry def __lt__(self, other: "SSHLogEntry") -> bool: return self.timestamp_datetime < other.timestamp_datetime def __gt__(self, other: "SSHLogEntry") -> bool: return self.timestamp_datetime > other.timestamp_datetime @property def timestamp_datetime(self): return datetime.strptime(self.timestamp, "%b %d %H:%M:%S"), #2.a class SSHRejectedPassword(SSHLogEntry): user: str def __init__(self, line: str): super().__init__(line) user = self._user_pattern.search(self.message) if user is None: raise Exception(f"invalid data {data}") self.user = user.group("user") def validate(self): match = self._line_pattern.match(self._raw_entry) user = self._user_pattern.search(self.message) return (match and user and self.timestamp == match.group("timestamp") and self.hostname == match.group("hostname") and self.pid == int(match.group("pid")) and self.message == match.group("message") and self.user == user.group("user") ) #2.b class SSHAcceptedPassword(SSHLogEntry): user: str def __init__(self, line: str): super().__init__(line) user = self._user_pattern.search(self.message) if user is None: raise Exception(f"invalid data {data}") self.user = user.group("user") def validate(self): match = self._line_pattern.match(self._raw_entry) user = self._user_pattern.search(self.message) return (match and user and self.timestamp == match.group("timestamp") and self.hostname == match.group("hostname") and self.pid == int(match.group("pid")) and self.message == match.group("message") and self.user == user.group("user") ) #2.c class SSHError(SSHLogEntry): def validate(self): match = self._line_pattern.match(self._raw_entry) return (match and self.timestamp == match.group("timestamp") and self.hostname == match.group("hostname") and self.pid == int(match.group("pid")) and self.message == match.group("message") ) #2.d class SSHOther(SSHLogEntry): def validate(self): return True