45 lines
1.2 KiB
Python
45 lines
1.2 KiB
Python
from SSHLogEntry import (
|
|
SSHRejectedPassword,
|
|
SSHAcceptedPassword,
|
|
SSHError,
|
|
SSHOther,
|
|
SSHLogEntry
|
|
)
|
|
from typing import List, Iterator
|
|
from ipaddress import IPv4Address
|
|
|
|
|
|
class SSHLogJournal:
|
|
_entries: List[SSHLogEntry]
|
|
|
|
|
|
def __init__(self) -> None:
|
|
self._entries = []
|
|
|
|
def __len__(self) -> int:
|
|
return len(self._entries)
|
|
|
|
def __iter__(self) -> Iterator[SSHLogEntry]:
|
|
return iter(self._entries)
|
|
|
|
def __contains__(self, value: SSHLogEntry) -> bool:
|
|
return value in self._entries
|
|
|
|
def append(self, log: str) -> None:
|
|
entry: SSHLogEntry
|
|
if "Failed password for invalid user " in log:
|
|
entry = SSHRejectedPassword(log)
|
|
elif "Accepted password for " in log:
|
|
entry = SSHAcceptedPassword(log)
|
|
elif "error: " in log:
|
|
entry = SSHError(log)
|
|
else:
|
|
entry = SSHOther(log)
|
|
|
|
if not entry.validate():
|
|
raise Exception("entry data validation failed!")
|
|
|
|
self._entries.append(entry)
|
|
|
|
def get_logs_by_ip(self, ipv4: IPv4Address) -> List[SSHLogEntry]:
|
|
return [log for log in self._entries if log.ipv4() == ipv4] |