75 lines
2.3 KiB
Python
Executable File
75 lines
2.3 KiB
Python
Executable File
import re
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
from enum import StrEnum
|
|
|
|
# One syslog-style sshd entry:
#   "<Mon> <day> <HH:MM:SS> <hostname> sshd[<pid>]: <message>"
line_pattern = re.compile(
    r"(?P<timestamp>\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s"
    r"(?P<hostname>\S+)\s"
    r"sshd\[(?P<pid>\d+)\]:\s"
    r"(?P<message>.+)"
)

# Dotted-quad IPv4 address whose four octets are each in the range 0-255.
# The digit lookarounds keep the pattern from carving an "address" out of a
# longer digit run: without them "999.1.1.1" yields "99.1.1.1" and
# "1.2.3.256" yields "1.2.3.25". The named group stays the only capturing
# group, so findall() keeps returning plain strings.
ipv4_pattern = re.compile(
    r"(?<!\d)"
    r"(?P<address>"
    r"(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}"
    r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
    r")"
    r"(?!\d)"
)

# User name following one of the phrases sshd/PAM put in front of it.
# Group 1 is the introducing phrase, group "user" is the name. The name may
# contain "." or "-" between word characters (e.g. "www-data", "john.doe"),
# which a bare \w+ would cut short; trailing punctuation is never captured.
user_pattern = re.compile(
    r"([Ii]nvalid user |Failed password for(?: invalid user)? "
    r"|Too many authentication failures for |Accepted password for |user[= ])"
    r"(?P<user>\w+(?:[.-]\w+)*)"
)
|
|
|
|
@dataclass
class Line:
    """A single log entry split into its named parts."""

    # Time the entry carries, as a datetime object.
    timestamp: datetime
    # Name of the host that wrote the entry.
    hostname: str
    # Process id taken from the "sshd[<pid>]" tag.
    pid: int
    # Free-text part of the entry that follows the "sshd[<pid>]:" tag.
    message: str
    # The raw, unparsed text this record was built from.
    original_line: str
|
|
|
|
class MessageType(StrEnum):
|
|
SUCCESSFUL_LOGIN = "successful login",
|
|
INVALID_USERNAME = "invalid username",
|
|
INVALID_PASSWORD = "invalid password",
|
|
FAILED_LOGIN = "failed login",
|
|
CLOSED_CONNECTION = "closed connection",
|
|
BREAK_IN_ATTEMPT = "break-in attempt",
|
|
OTHER = "other"
|
|
|
|
#1.1.1
|
|
def parse_line(line: str) -> Line:
    """Parse one raw sshd log line into a ``Line`` record.

    The line is matched against ``line_pattern`` from its first character;
    the captured timestamp, hostname, pid and message are converted and
    stored together with the untouched input text.

    Args:
        line: Raw log line to parse.

    Returns:
        A ``Line`` whose ``timestamp`` has the year 1900, because the log
        format carries no year.

    Raises:
        ValueError: If ``line`` does not match ``line_pattern``, or if the
            captured timestamp is not a valid date in 1900 (this includes
            "Feb 29", since 1900 is not a leap year).
    """
    data = line_pattern.match(line)
    if data is None:
        # Show the offending input; the match object itself is always None
        # on this path and says nothing useful.
        raise ValueError(f"invalid data: {line!r}")

    # Parse with an explicit year. 1900 is the value strptime has always
    # filled in when the format has no year, so results are unchanged, but
    # parsing a day of month without a year is deprecated since Python 3.13.
    timestamp = datetime.strptime(
        f"1900 {data.group('timestamp')}", "%Y %b %d %H:%M:%S"
    )

    return Line(
        timestamp=timestamp,
        hostname=data.group("hostname"),
        pid=int(data.group("pid")),
        message=data.group("message"),
        original_line=line,
    )
|
|
|
|
|
|
#1.1.2
|
|
def get_ipv4s_from_log(log: Line) -> List[str]:
    """Return every IPv4 address found in the message of ``log``.

    Addresses are reported in the order they occur in ``log.message``; the
    list is empty when ``ipv4_pattern`` finds none.
    """
    message_text = log.message
    return [hit.group("address") for hit in ipv4_pattern.finditer(message_text)]
|
|
|
|
#1.1.3
|
|
def get_user_from_log(log: Line) -> Optional[str]:
    """Return the user name mentioned in the message of ``log``.

    Only the first place where ``user_pattern`` matches ``log.message`` is
    considered. Returns ``None`` when the pattern does not match at all.
    """
    found = user_pattern.search(log.message)
    if found is None:
        return None
    return found.group("user")
|
|
|
|
#1.1.4
|
|
def get_message_type(message: str) -> MessageType:
    """Classify a log message by the phrases it contains.

    Matching is case-insensitive substring search. The categories are tried
    in the order listed in the rule table and the first one with a matching
    phrase wins; ``MessageType.OTHER`` is returned when nothing matches.
    """
    lowered = message.lower()

    # Ordered (phrases, category) pairs. Order matters: for example a message
    # containing both "failed password for" and "invalid user" is reported
    # as INVALID_USERNAME because that rule comes first.
    rules = (
        (("accepted password for", "session opened"),
         MessageType.SUCCESSFUL_LOGIN),
        (("invalid user",),
         MessageType.INVALID_USERNAME),
        (("failed password for",),
         MessageType.INVALID_PASSWORD),
        (("authentication failure", "did not receive identification string"),
         MessageType.FAILED_LOGIN),
        (("connection closed", "received disconnect", "session closed",
          "connection reset by peer"),
         MessageType.CLOSED_CONNECTION),
        (("break-in attempt",),
         MessageType.BREAK_IN_ATTEMPT),
    )

    for phrases, category in rules:
        if any(phrase in lowered for phrase in phrases):
            return category
    return MessageType.OTHER
|