listy skryptowe

This commit is contained in:
Dariusz Majnert 2024-06-14 16:53:58 +02:00
commit 8f6d2a37f2
59 changed files with 2636 additions and 0 deletions

3
jezyki-skryptowe/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
__pycache__
*.csv
*.png

View File

@ -0,0 +1,20 @@
import sys
from database import init_database, get_database
from models.station import Station
from models.rental import Rental
def create_tables(db):
with db:
db.create_tables([Station, Rental])
if __name__ == "__main__":
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <database_name>")
sys.exit(1)
database_name = sys.argv[1]
db = init_database(database_name + ".sqlite3")
create_tables(db)
print("Tables 'stations' and 'rentals' created successfully.")

View File

@ -0,0 +1,27 @@
from peewee import SqliteDatabase
from models.rental import Rental
from models.station import Station
_db = None
def init_database(db_name):
global _db
if _db is None:
_db = SqliteDatabase(db_name)
Station._meta.database = _db
Rental._meta.database = _db
return _db
def get_database():
global _db
if _db is None:
raise ValueError("Database is not initialized. Call init_database() first.")
return _db
def close_database():
global _db
if _db is not None:
_db.close()
Station._meta.database = None
Rental._meta.database = None
_db = None

View File

@ -0,0 +1,64 @@
from database import init_database, get_database
from models.station import Station
from models.rental import Rental
import sys
import csv
import os
def load_csv_to_db(csv_file):
try:
with open(csv_file) as f:
reader = csv.DictReader(f)
stations = dict()
rentals = list()
try:
for row in reader:
station_names = [row['Stacja wynajmu'].strip(), row['Stacja zwrotu'].strip()]
for station_name in station_names:
if not station_name in stations:
(station, _) = Station.get_or_create(name=station_name)
stations[station_name] = station
rentals.append(Rental(
uid=int(row['UID wynajmu']),
bike_id=int(row['Numer roweru']),
rent_date=row['Data wynajmu'],
ret_date=row['Data zwrotu'],
rent_station=stations[station_names[0]],
ret_station=stations[station_names[1]],
duration=int(row['Czas trwania'])
))
Rental.bulk_create(rentals, batch_size=500)
print(f"Sucessfully added {len(rentals)} rental entries.")
except Exception as ex:
print(f"There was an error while inserting data from the file: {str(ex)}")
except Exception as ex:
print(f"There was an error while reading the csv file {str(ex)}")
def main():
if len(sys.argv) != 3:
print("Usage: load_data.py <path_to_csv_file> <database_name>")
sys.exit(1)
csv_file = sys.argv[1]
db_name = sys.argv[2] + ".sqlite3"
if not os.path.exists(db_name):
print("Database does not exist! First create it using create_database.py")
sys.exit(1)
if not os.path.exists(csv_file):
print("CSV file does not exist!")
sys.exit(1)
init_database(db_name)
load_csv_to_db(csv_file)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,2 @@
from .station import Station
from .rental import Rental

View File

@ -0,0 +1,15 @@
from peewee import Model, AutoField, IntegerField, DateTimeField, ForeignKeyField
from .station import Station
class Rental(Model):
uid = IntegerField(primary_key=True)
bike_id = IntegerField()
rent_date = DateTimeField()
ret_date = DateTimeField()
rent_station = ForeignKeyField(Station, backref='rentals_rent', on_delete='CASCADE')
ret_station = ForeignKeyField(Station, backref='rentals_return', on_delete='CASCADE')
duration = IntegerField()
class Meta:
database = None
table_name = 'rentals'

View File

@ -0,0 +1,10 @@
from peewee import Model, CharField, AutoField
class Station(Model):
id = AutoField()
name = CharField(unique = True)
class Meta:
database = None
table_name = 'stations'

View File

@ -0,0 +1,204 @@
from datetime import datetime, timedelta
import sys
from PyQt6.QtWidgets import (
QApplication,
QWidget,
QVBoxLayout,
QDateEdit,
QGridLayout,
QHBoxLayout,
QPushButton,
QLabel,
QListWidget,
QListWidgetItem,
QFrame,
QFileDialog,
QMessageBox
)
from database import init_database, get_database, close_database
from models.station import Station
from models.rental import Rental
from peewee import fn, SQL
DATA_ROLE = 1001
LABELS = [
"Avg. rent duration when starting there",
"Avg. rent duration when stopping there",
"Count of different bikes stopped there",
"Most popular destination from there"
]
class DatabaseViewer(QWidget):
def __init__(self):
self.value_labels = {}
super().__init__()
self.init()
self.stations = []
def init(self):
self.setWindowTitle("Database stats")
self.setGeometry(100, 100, 900, 500)
mainLayout = QVBoxLayout()
topLayout = QHBoxLayout()
self.pathLabel = QLabel("Select database file...")
self.openButton = QPushButton("Open")
self.openButton.clicked.connect(self.open_file_dialog)
topLayout.addWidget(self.pathLabel)
topLayout.addWidget(self.openButton)
middleLayout = QHBoxLayout()
self.stationsListWidget = QListWidget()
self.stationsListWidget.itemClicked.connect(self.select_station)
middleLayout.addWidget(self.stationsListWidget, 2)
self.statsFrame = QFrame()
statsLayout = QVBoxLayout(self.statsFrame)
self.create_stats_widget(statsLayout)
middleLayout.addWidget(self.statsFrame, 2)
mainLayout.addLayout(topLayout)
mainLayout.addLayout(middleLayout)
self.setLayout(mainLayout)
def reset_ui(self):
self.pathLabel.setText("Select database file...")
self.stationsListWidget.clear()
for label in LABELS:
self.value_labels[label].setText("")
def select_station(self, item):
station_id = item.data(DATA_ROLE)
try:
avg_time_start = (Rental
.select(fn.AVG(Rental.duration))
.where(Rental.rent_station == station_id)
.scalar()
) or 0.0
avg_time_stopped = (Rental
.select(fn.AVG(Rental.duration))
.where(Rental.ret_station == station_id)
.scalar()
) or 0.0
bikes_count = (Rental
.select(fn.COUNT(fn.DISTINCT(Rental.bike_id)))
.where(Rental.ret_station == station_id)
.scalar()
) or 0
(most_popular, count) = (Rental
.select(Rental.ret_station, fn.COUNT(Rental.ret_station).alias("count"))
.where(Rental.rent_station == station_id)
.group_by(Rental.ret_station)
.order_by(SQL("count").desc())
.scalar(as_tuple=True)
) or (0,0)
most_popular_station = (Station
.select()
.where(Station.id == most_popular)
.get()
) if count != 0 else None
self.value_labels[LABELS[0]].setText(f"{avg_time_start:.2f} minutes")
self.value_labels[LABELS[1]].setText(f"{avg_time_stopped:.2f} minutes")
self.value_labels[LABELS[2]].setText(str(bikes_count))
self.value_labels[LABELS[3]].setText(f"{most_popular_station.name if most_popular_station is not None else '-'} ({count} trips)")
except Exception as ex:
self.show_error(f"Failed when executing database queries: {str(ex)}")
def create_stats_widget(self, layout):
gridLayout = QGridLayout()
for index, label in enumerate(LABELS):
labelWidget = QLabel(f"{label}:")
valueLabel = QLabel()
gridLayout.addWidget(labelWidget, index, 0)
gridLayout.addWidget(valueLabel, index, 1)
self.value_labels[label] = valueLabel
layout.addLayout(gridLayout)
def open_file_dialog(self):
if len(self.stations) > 0:
self.openButton.setText("Open")
self.stations = []
self.reset_ui()
try:
close_database()
except:
self.show_error("Failed to close database!")
else:
fileName, _ = QFileDialog.getOpenFileName(
self,
"Open Database File",
"",
"Sqlite Files (*.sqlite3);;All Files (*)",
)
if fileName:
self.load_database(fileName)
def load_database(self, file_path):
self.reset_ui()
self.openButton.setText("Close")
self.pathLabel.setText(file_path)
try:
init_database(file_path)
self.stations = Station.select().order_by(Station.name)
self.update_list()
except:
self.show_error("Failed to load database!")
def update_list(self):
self.stationsListWidget.clear()
for station in self.stations:
item = QListWidgetItem(station.name)
item.setData(DATA_ROLE, station.id)
self.stationsListWidget.addItem(item)
def show_error(self, text):
message_box = QMessageBox()
message_box.setIcon(QMessageBox.Icon.Critical)
message_box.setText("Error")
message_box.setWindowTitle("Error")
message_box.setInformativeText(text)
message_box.exec()
def main():
app = QApplication(sys.argv)
ex = DatabaseViewer()
ex.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
import sys
for line in sys.stdin:
print(line.rstrip())

View File

@ -0,0 +1,19 @@
import sys
def require_int_arg(pos):
if len(sys.argv) <= pos:
print('nieprawidlowa liczba argumentow')
exit(2)
try:
arg = int(sys.argv[pos])
except ValueError:
print(f'nieprawidlowy argument - na pozycji {pos} oczekiwana jest liczba calkowita')
exit(2)
return arg
def require_str_arg(pos):
if len(sys.argv) <= pos:
print('nieprawidlowa liczba argumentow')
exit(2)
return sys.argv[pos]

View File

@ -0,0 +1,20 @@
import re
date_pattern = r"""
^(\d{2})/(.+)/(\d{4}):(\d{2}):(\d{2}):(\d{2})$
"""
def parseDate(date):
data = re.search(date_pattern, date, re.VERBOSE)
if data is None:
raise Exception(f"invalid date: {date}")
return {
"day": int(data.group(1)),
"month": data.group(2),
"year": int(data.group(3)),
"hour": int(data.group(4)),
"minute": int(data.group(5)),
"second": int(data.group(6))
}

View File

@ -0,0 +1,31 @@
import re
log_pattern = r"""
(.+)\s-\s-\s
\[(.*)\s(.*)\]
\s"(.*)"
\s(\d{3})
\s(\d+|-)$
"""
def parseData(line):
data = re.search(log_pattern, line, re.VERBOSE)
if data is None:
raise Exception(f"invalid data: {line}")
req = data.group(4).split(' ')
return {
"address": data.group(1),
"timestamp": data.group(2),
"tz": data.group(3),
"method": req[0],
"path": req[1],
"status_code": int(data.group(5)),
"size": 0 if data.group(6) == '-' else int(data.group(6))
}

View File

@ -0,0 +1,12 @@
import sys
import parser
def processLines(lines):
data = []
for line in lines:
try:
data.append(parser.parseData(line.rstrip()))
except Exception:
print(f"invalid data: {line.rstrip()}, ignoring", file=sys.stderr)
return data

View File

@ -0,0 +1,13 @@
import sys
import processor
import cli
def main():
status_code = cli.require_int_arg(1)
data = processor.processLines(sys.stdin)
requests_count = len([x for x in data if x['status_code'] == status_code])
print(requests_count)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,12 @@
import sys
import processor
def main():
data = processor.processLines(sys.stdin)
total_sent = sum(x['size'] for x in data)
print(requests_count)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,12 @@
import sys
import processor
def main():
data = processor.processLines(sys.stdin)
biggest_req = max(data, key=lambda x: x['size'])
print(f"{biggest_req['path']} {biggest_req['size']}")
if __name__ == '__main__':
main()

View File

@ -0,0 +1,20 @@
import os
import sys
import processor
graphics_formats = [".gif", ".jpg", ".jpeg", ".xbm"]
def pathExtension(path):
(name, extension) = os.path.splitext(path)
return extension
def main():
data = processor.processLines(sys.stdin)
graphics_reqs_count = len([x for x in data if pathExtension(x['path']) in graphics_formats])
print(f"wszystkie: {len(data)}")
print(f"graficzne: {graphics_reqs_count}")
if __name__ == '__main__':
main()

View File

@ -0,0 +1,19 @@
import sys
import parser
import cli
def main():
status_code = cli.require_int_arg(1)
for line in sys.stdin:
try:
data = parser.parseData(line.rstrip())
except Exception:
print(f"invalid data: {line.rstrip()}, ignoring", file=sys.stderr)
if data['status_code'] == status_code:
print(line.rstrip())
if __name__ == '__main__':
main()

View File

@ -0,0 +1,27 @@
import sys
import parser
import dateParser
import cli
def main():
lower_bound = cli.require_int_arg(1)
upper_bound = cli.require_int_arg(2)
for line in sys.stdin:
try:
data = parser.parseData(line.rstrip())
date = dateParser.parseDate(data['timestamp'])
except Exception:
print(f"invalid data: {line.rstrip()}, ignoring", file=sys.stderr)
if lower_bound > upper_bound:
if date['hour'] >= lower_bound or date['hour'] <= upper_bound:
print(line.rstrip())
else:
if date['hour'] >= lower_bound and date['hour'] <= upper_bound:
print(line.rstrip())
if __name__ == '__main__':
main()

View File

@ -0,0 +1,24 @@
import sys
import parser
import dateParser
import cli
def main():
day = cli.require_int_arg(1)
month = cli.require_str_arg(2)
year = cli.require_int_arg(3)
for line in sys.stdin:
try:
data = parser.parseData(line.rstrip())
date = dateParser.parseDate(data['timestamp'])
except Exception:
print(f"invalid data: {line.rstrip()}, ignoring", file=sys.stderr)
if day == date['day'] and month == date['month'] and year == date['year']:
print(line.rstrip())
if __name__ == '__main__':
main()

View File

@ -0,0 +1,19 @@
import sys
import parser
import cli
def main():
country_code = cli.require_str_arg(1)
for line in sys.stdin:
try:
data = parser.parseData(line.rstrip())
except Exception:
print(f"invalid data: {line.rstrip()}, ignoring", file=sys.stderr)
if data['address'].endswith(country_code):
print(line.rstrip())
if __name__ == '__main__':
main()

View File

@ -0,0 +1,21 @@
import re
import sys
ip_regex = r"""^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)(\.(?!$)|$)){4}$"""
domain_regex = r"""^(?:[a-zA-Z0-9][a-zA-Z0-9-]{0,61}[a-zA-Z0-9]\.)+[a-zA-Z]{2,}$"""
def get_entries_by_addr(logs, address):
if not (re.match(ip_regex,address) or re.match(domain_regex, address)):
raise ValueError("invalid address")
return [log for log in logs if log[0] == address]
# test
if __name__ == "__main__":
from read_log import read_log
logs = read_log()
sorted_logs = get_entries_by_addr(logs, sys.argv[1].rstrip())
print("[",*sorted_logs, "]", sep="\n")

View File

@ -0,0 +1,18 @@
import os
import sys
def pathExtension(path):
(name, extension) = os.path.splitext(path)
return extension.lower()
def get_entries_by_extension(logs, extension):
return [log for log in logs if pathExtension(log[3]) == extension]
# test
if __name__ == "__main__":
from read_log import read_log
logs = read_log()
logs_by_extension = get_entries_by_extension(logs, sys.argv[1].lower())
print("[",*logs_by_extension, "]", sep="\n")

View File

@ -0,0 +1,24 @@
import sys
def get_failed_reads(logs, action):
logs_4xx = [log for log in logs if (log[4] // 100) == 4]
logs_5xx = [log for log in logs if (log[4] // 100) == 5]
if action == "4xx":
return logs_4xx
elif action == "5xx":
return logs_5xx
elif action == "join":
return logs_4xx + logs_5xx
elif action == "separate":
return (logs_4xx, logs_5xx)
else:
raise ValueError("invalid argument")
# test
if __name__ == "__main__":
from read_log import read_log
logs = read_log()
failed_reads = get_failed_reads(logs, sys.argv[1].lower())
print("[",*failed_reads, "]", sep="\n")

View File

@ -0,0 +1,11 @@
import sys
def print_entries(logs, N):
print(*logs[:N], sep="\n")
# test
if __name__ == "__main__":
from read_log import read_log
logs = read_log()
print_entries(logs, int(sys.argv[1]))

View File

@ -0,0 +1,39 @@
import re
import datetime
import sys
log_pattern = r"""
(.+)\s-\s-\s
\[(.*)\s(.*)\]
\s"(.*)"
\s(\d{3})
\s(\d+|-)$
"""
def parseData(line):
data = re.search(log_pattern, line, re.VERBOSE)
if data is None:
raise Exception(f"invalid data: {line}")
date = datetime.datetime.strptime(data.group(2), "%d/%b/%Y:%H:%M:%S")
req = data.group(4).split(' ')
return (data.group(1), date, req[0], req[1], int(data.group(5)), data.group(6))
def read_log():
data = []
for line in sys.stdin:
try:
data.append(parseData(line.rstrip()))
except Exception as e:
print(f"invalid data: {line.rstrip()}, ignoring", file=sys.stderr)
return data
# test
if __name__ == "__main__":
print("[",*read_log(), "]", sep="\n")

View File

@ -0,0 +1,11 @@
import sys
def sort_log(logs, sort_by):
return sorted(logs, key=lambda x: x[sort_by])
# test
if __name__ == "__main__":
from read_log import read_log
logs = read_log()
sorted_logs = sort_log(logs, int(sys.argv[1]))
print("[",*sorted_logs, "]", sep="\n")

View File

@ -0,0 +1,17 @@
def entry_to_dict(entry):
return {
"ip": entry[0],
"datetime": entry[1],
"method": entry[2],
"path": entry[3],
"status_code": entry[4],
"size": entry[5]
}
# test
if __name__ == "__main__":
from read_log import read_log
print("[",*[entry_to_dict(x) for x in read_log()], "]", sep="\n")

View File

@ -0,0 +1,12 @@
def get_addrs(log_dict):
return [*log_dict.keys()]
# test
if __name__ == "__main__":
from read_log import read_log
from log_to_dict import log_to_dict
log_dict = log_to_dict(read_log())
print(get_addrs(log_dict))

View File

@ -0,0 +1,16 @@
from entry_to_dict import entry_to_dict
def log_to_dict(logs):
result = {}
for log in logs:
result.setdefault(log[0], []).append(entry_to_dict(log))
return result
# test
if __name__ == "__main__":
from read_log import read_log
logs = read_log()
for k,v in log_to_dict(logs).items():
print(k + ": [", *v, "]", sep="\n")

View File

@ -0,0 +1,35 @@
PADDING = [30, 10, 20, 20, 10, 1, 0]
def strftime(datetime):
return datetime.strftime("%d/%b/%Y:%H:%M:%S")
def print_padded(*args):
for i, arg in enumerate(args):
print(str(arg).rjust(0 if i >= len(PADDING) else PADDING[i]), end=" ")
print()
def print_dict_entry_dates(log_dict, status_code):
print_padded("address", "req_count","first_req_time", "last_req_time", f"code_{status_code}", "/", "other_codes")
for ip,logs in log_dict.items():
first_req = min(logs, key=lambda log: log['datetime'])
first_req_time = strftime(first_req['datetime'])
last_req = max(logs, key=lambda log: log['datetime'])
last_req_time = strftime(last_req['datetime'])
searched_codes_count = len([x for x in logs if x['status_code'] == status_code])
print_padded(ip, len(logs), first_req_time, last_req_time, searched_codes_count,"/", len(logs) - searched_codes_count)
# test
if __name__ == "__main__":
from read_log import read_log
from log_to_dict import log_to_dict
import sys
log_dict = log_to_dict(read_log())
print_dict_entry_dates(log_dict, int(sys.argv[1]))

View File

@ -0,0 +1,39 @@
import re
import datetime
import sys
log_pattern = r"""
(.+)\s-\s-\s
\[(.*)\s(.*)\]
\s"(.*)"
\s(\d{3})
\s(\d+|-)$
"""
def parseData(line):
data = re.search(log_pattern, line, re.VERBOSE)
if data is None:
raise Exception(f"invalid data: {line}")
date = datetime.datetime.strptime(data.group(2), "%d/%b/%Y:%H:%M:%S")
req = data.group(4).split(' ')
return (data.group(1), date, req[0], req[1], int(data.group(5)), data.group(6))
def read_log():
data = []
for line in sys.stdin:
try:
data.append(parseData(line.rstrip()))
except Exception as e:
print(f"invalid data: {line.rstrip()}, ignoring", file=sys.stderr)
return data
# test
if __name__ == "__main__":
print("[",*read_log(), "]", sep="\n")

View File

@ -0,0 +1,59 @@
import subprocess
import os
from datetime import datetime
import utils
import sys
def append_history(log):
try:
with open(utils.history_filepath(), "a") as f:
f.write(f"{log['date']};{log['path']};{log['filename']}\n")
except:
print("Could not append to history file")
def main(archive_path):
backup_directory = utils.backup_dir()
if not os.path.exists(backup_directory):
try:
os.makedirs(backup_directory)
except:
print("Could not create backup directory")
exit(1)
if not os.path.isdir(backup_directory):
print("Backup directory path is not a directory!")
exit(1)
name = os.path.basename(archive_path)
datenow = datetime.now()
date = datenow.strftime("%Y%m%d%H%M%S")
filename = f"{date}-{name}.tar.gz"
destination = os.path.join(backup_directory, filename)
try:
subprocess.run(["tar", "-czf", destination, "-C", archive_path, os.path.curdir])
except:
print("Failed to create archive of the directory")
log = {
'date': datenow.isoformat(),
'path': archive_path,
'filename': filename
}
append_history(log)
if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <directory>")
exit(1)
if not os.path.exists(sys.argv[1]):
print(f"Path {sys.argv[1]} does not exist!")
exit(1)
if not os.path.isdir(sys.argv[1]):
print(f"Path {sys.argv[1]} is not a directory!")
exit(1)
main(os.path.abspath(sys.argv[1]))

View File

@ -0,0 +1,86 @@
import subprocess
import os
from datetime import datetime
import utils
import sys
import shutil
def extract_backup(from_file, target_directory):
files = os.listdir(target_directory)
try:
for file in files:
file_path = os.path.join(target_directory, file)
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print(f"failed to remove file from target directory {str(e)}")
return
try:
subprocess.run(["tar", "-xzf", from_file, "--absolute-names", "-C", target_directory])
except:
print("Failed to extract backup from the archive")
def read_history():
entries = []
try:
with open(utils.history_filepath(), 'r') as f:
lines = f.readlines()
for line in lines:
data = line.strip().split(';')
try:
if len(data) != 3:
raise ValueError()
entries.append({
'date': datetime.fromisoformat(data[0]),
'path': data[1],
'filename': data[2]
})
except :
print(f"history entry {line} malformed, ignoring")
except:
print("Could not read history file")
exit(1)
return sorted(entries, key=lambda entry: entry['date'], reverse=True)
def main(restore_path):
backup_directory = utils.backup_dir()
entries = read_history()
print("List of backups possible to restore")
for i, entry in enumerate(entries):
print(f"{i+1} {entry['date']} {entry['filename']} (from {entry['path']})")
try:
backup_num = int(input("Number of the backup to restore: ")) - 1
if backup_num < 0 or backup_num >= len(entries):
raise ValueError()
except:
print("Invalid backup number")
return
backup = entries[backup_num]
extract_backup(os.path.join(backup_directory, backup['filename']), restore_path)
if __name__ == "__main__":
workdir = os.getcwd()
if len(sys.argv) > 2:
print(f"Usage: {sys.argv[0]} <directory>")
exit(1)
if len(sys.argv) == 2:
if not os.path.exists(sys.argv[1]):
print(f"Path {sys.argv[1]} does not exist!")
exit(1)
if not os.path.isdir(sys.argv[1]):
print(f"Path {sys.argv[1]} is not a directory!")
exit(1)
workdir = sys.argv[1]
main(os.path.abspath(workdir))

View File

@ -0,0 +1,6 @@
import os
def backup_dir():
return os.path.abspath(os.environ.get("BACKUPS_DIR", os.path.join(os.path.expanduser("~"), ".backups")))
def history_filepath():
return os.path.join(backup_dir(), "history.csv")

View File

@ -0,0 +1,31 @@
import os
import sys
#1.3
ENV = sorted(os.environ.items())
def filter_dict(filtered_dict, strings):
result = dict()
for k,v in filtered_dict.items():
if any(x in k for x in strings):
result[k] = v
return result
def print_dict(printed_dict):
for k,v in printed_dict.items():
print(k + ": " + v, sep="\n")
def main():
print(ENV)
#1.1
print('zmienne środowiskowe:')
print_dict(ENV)
#1.2
filter_strings = sys.argv[1:]
print('zmienne środowiskowe filtrowane przez:', ", ".join(filter_strings))
print_dict(filter_dict(ENV, filter_strings))
if __name__ == "__main__":
main()

View File

@ -0,0 +1,29 @@
import os
import sys
PATH = os.environ['PATH'].split(os.pathsep)
def is_executable(filepath):
return os.access(filepath, os.X_OK)
def main():
# 2.1
print('katalogi PATH:')
print(*PATH, sep='\n')
print()
#2.2
for d in PATH:
if not os.path.exists(d):
print(f'katalog {d} w zmiennej srodowiskowej PATH nie istnieje')
else:
print(f'katalog {d} w zmiennej srodowiskowej PATH')
print('pliki wykonywalne wewnatrz:')
files = os.listdir(d)
for f in files:
filepath = os.path.join(d,f)
if is_executable(filepath):
print(filepath)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,66 @@
import sys
import argparse
from collections import deque
def handle_print(source, count, is_bytes):
if is_bytes:
decoded = bytearray(deque(source, count)).decode('utf-8')
print(decoded)
else:
for l in deque(source, count):
print(l, end="")
def tail(filename, count, is_bytes=False):
if filename:
try:
if is_bytes:
with open(filename, 'rb') as f:
handle_print(f.read(), count, is_bytes)
else:
with open(filename, 'r') as f:
handle_print(f, count, is_bytes)
except FileNotFoundError:
print("File not found!")
exit(1)
else:
source = sys.stdin
if is_bytes:
source = source.buffer.read()
handle_print(source, count, is_bytes)
def positive_int(arg):
try:
val = int(arg)
except ValueError:
raise argparse.ArgumentTypeError("Must be an integer")
if val < 0:
raise argparse.ArgumentTypeError("Must be greater than or equal to 0")
return val
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("--lines", type=positive_int, default=10, help="Number of lines to read from the end")
parser.add_argument("--bytes", type=positive_int, nargs="?", help="Number of bytes to read from the end", )
parser.add_argument("filename", nargs="?", help="Path of the file to be read")
return parser.parse_args()
def main(args):
if args.bytes is not None:
tail(args.filename,args.bytes, True)
else:
tail(args.filename, args.lines)
if __name__ == "__main__":
args = parse_arguments()
main(args)

View File

@ -0,0 +1,86 @@
import subprocess
import sys
import os
from collections import Counter
SUBPROGRAM_PATH = "./file-analyzer"
def most_common_in_list(searched_list, key):
return Counter(key(el) for el in searched_list).most_common(1)
def summarize(analyses):
char_counter = most_common_in_list(analyses, lambda x: x['most_common_char'])
word_counter = most_common_in_list(analyses, lambda x: x['most_common_word'])
return {
'total_files_read': len(analyses),
'chars_total': sum(analysis['chars_total'] for analysis in analyses),
'words_total': sum(analysis['words_total'] for analysis in analyses),
'rows_total': sum(analysis['rows_total'] for analysis in analyses),
'most_common_char': char_counter[0][0] if len(char_counter) != 0 else " ",
'most_common_word': word_counter[0][0] if len(word_counter) != 0 else "",
}
def main(path):
analyzedFiles = []
try:
process = subprocess.Popen(
[SUBPROGRAM_PATH],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True
)
except:
print("Could not open analyzer!")
exit(1)
for dirpath, dirs, files in os.walk(path):
for filename in files:
path = os.path.join(dirpath, filename)
print(path, file=process.stdin)
process.stdin.flush()
output = process.stdout.readline()
data = output.strip().split("\t")
if len(data) != 6:
# error response
print(data[0])
continue
analysis = {
'path': data[0],
'chars_total': int(data[1]),
'words_total': int(data[2]),
'rows_total': int(data[3]),
'most_common_char': data[4][1:-1],
'most_common_word': data[5][1:-1]
}
analyzedFiles.append(analysis)
process.terminate()
summary = summarize(analyzedFiles)
print(f"Number of read files: {summary['total_files_read']}" )
print(f"Total characters in all files: {summary['chars_total']}" )
print(f"Total words in all files: {summary['words_total']}" )
print(f"Total rows in all files: {summary['rows_total']}" )
print(f"Most common character between files: '{summary['most_common_char']}'" )
print(f"Most common word between files: \"{summary['most_common_word']}\"" )
if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <directory>")
exit(1)
if not os.path.exists(sys.argv[1]):
print(f"Path {sys.argv[1]} does not exist!")
exit(1)
if not os.path.isdir(sys.argv[1]):
print(f"Path {sys.argv[1]} is not a directory!")
exit(1)
main(sys.argv[1])

View File

@ -0,0 +1,66 @@
import random
import statistics
import datetime
from parser import Line, get_user_from_log, get_message_type, MessageType
from typing import Iterable, List, Tuple, Dict, Optional
from collections import Counter
#1.3.1
def random_user_logs(logs: List[Line], n: int) -> List[Line]:
users_set = set(get_user_from_log(log) for log in logs)
users_set.discard(None)
users = list(users_set)
user = random.choice(users)
user_logs = [log for log in logs if get_user_from_log(log) == user]
if n > len(user_logs):
return user_logs
else:
return random.sample(user_logs, n)
#1.3.2
def get_average_session_duration(logs: List[Line]) -> Tuple[float, float]:
sessions = dict()
session_durations = []
for log in logs:
if "session opened" in log.message:
sessions[(log.pid)] = log.timestamp
elif "session closed" in log.message:
# find a session opened with a given pid
start_time = sessions.pop(log.pid, None)
if start_time:
# if a session was opened in previous year
if start_time > log.timestamp:
start_time = start_time.replace(year=start_time.year - 1)
# calculate duration in seconds
duration = (log.timestamp - start_time).total_seconds()
session_durations.append(duration)
measured_durations = len(session_durations)
if measured_durations > 0:
mean = statistics.mean(session_durations)
stdev = statistics.stdev(session_durations) if measured_durations >= 2 else 0.0
return mean, stdev
else:
return 0.0, 0.0
def get_average_session_duration_by_user(logs: List[Line]) -> Dict[str, Tuple[float, float]]:
user_logs = [(log, get_user_from_log(log)) for log in logs]
user_stats = dict()
users = set(user for (log, user) in user_logs)
users.discard(None)
for user in users:
user_stats[user] = get_average_session_duration([log for (log, searched_user) in user_logs if user == searched_user])
return user_stats
#1.3.3
def get_most_and_least_frequent_user(logs: List[Line]) -> Tuple[Optional[Tuple[str, int]], Optional[Tuple[str, int]]]:
login_logs = [log for log in logs if "session opened" in log.message]
login_counter = Counter(get_user_from_log(log) for log in login_logs)
most_common = login_counter.most_common()
if len(most_common) == 0:
return None, None
return most_common[0], most_common[-1]

View File

@ -0,0 +1,4 @@
import logging
logger = logging.getLogger("main")

129
jezyki-skryptowe/lista5/main.py Executable file
View File

@ -0,0 +1,129 @@
#!/usr/bin/env python3
import sys
import logging
from logger import logger
import log_statistics
import reader
import parser
import argparse
from enum import StrEnum
class Commands(StrEnum):
PARSE = "parse"
IPV4 = "ipv4",
USER = "user",
TYPE = "type",
RANDOM = "random",
DURATION = "duration",
FREQ = "freq"
def main(args):
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.addFilter(lambda log: log.levelno <= logging.WARNING)
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(logging.Formatter('%(levelname)s - %(name)s - %(message)s'))
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(logging.ERROR)
stderr_handler.setFormatter(logging.Formatter('%(levelname)s - %(name)s - %(message)s'))
logger.addHandler(stdout_handler)
logger.addHandler(stderr_handler)
if args.log_level == "NONE":
logger.disabled = True
else:
logger.setLevel(args.log_level)
try:
logs = reader.read_logs(args.filename)
except:
logger.critical(f"Could not open file {args.filename}")
exit(1)
if args.subcommand == Commands.PARSE:
for line in logs:
print(f"{line.timestamp.strftime('%d/%m:%H:%M:%S')};{line.hostname};{line.pid};{line.message}")
#1.1.2
if args.subcommand == Commands.IPV4:
for line in logs:
print(*parser.get_ipv4s_from_log(line), sep=", ")
#1.1.3
elif args.subcommand == Commands.USER:
for line in logs:
print(parser.get_user_from_log(line) or "")
#1.1.4
elif args.subcommand == Commands.TYPE:
for line in logs:
print(parser.get_message_type(line.message))
#1.3.1
elif args.subcommand == Commands.RANDOM:
random_logs = log_statistics.random_user_logs(list(logs), args.n)
for line in random_logs:
print(line.original_line)
#1.3.2
elif args.subcommand == Commands.DURATION:
log_list = list(logs)
if args.group_by_users:
durations_by_user = log_statistics.get_average_session_duration_by_user(log_list)
for (username, (avg_time, stdev)) in sorted(durations_by_user.items(), key=lambda x: x[1], reverse=True):
print(f"{username}: average={avg_time:.4f}s, stdev={stdev:.4f}s")
else:
(avg_time, stdev) = log_statistics.get_average_session_duration(log_list)
print(f"average={avg_time:.4f}s, stdev={stdev:.4f}s")
#1.3.3
elif args.subcommand == Commands.FREQ:
log_list = list(logs)
(most, least) = log_statistics.get_most_and_least_frequent_user(log_list)
if most and least:
print(f"most frequent user - {most[0]} - {most[1]} login(s)")
print(f"least frequent user - {least[0]} - {least[1]} login(s)")
else:
print("could not determine most and least frequent user")
def positive_int(arg):
try:
val = int(arg)
except ValueError:
raise argparse.ArgumentTypeError("Must be an integer")
if val < 0:
raise argparse.ArgumentTypeError("Must be greater than or equal to 0")
return val
def parse_args():
parser = argparse.ArgumentParser(description="Script for analyzing sshd logs")
#1.4.1
parser.add_argument("filename", help="Path of the file to be read")
#1.4.2
parser.add_argument("--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "NONE"], default="INFO", help="Level of logging, default is info")
#1.4.3
subparsers = parser.add_subparsers(dest="subcommand", title="subcommands", required=True)
subparsers.add_parser(Commands.PARSE, help="Parse logs into semicolon separated structure")
subparsers.add_parser(Commands.IPV4, help="Get all IPv4 addresses from log messsages")
subparsers.add_parser(Commands.USER, help="Extract user from log messages")
subparsers.add_parser(Commands.TYPE, help="Get type of log message")
random_parser = subparsers.add_parser(Commands.RANDOM, help="Get specified amount (default is 3) of random logs from random user")
random_parser.add_argument("-n", type=positive_int, default=3, help="Amount of random logs to be returned")
duration_parser = subparsers.add_parser(Commands.DURATION, help="Get average session duration and standard deviation")
duration_parser.add_argument("--group-by-users", action="store_true", help="Whether session durations should be calculated for each user separately")
subparsers.add_parser(Commands.FREQ, help="Get most and least frequently logging users")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
main(args)

View File

@ -0,0 +1,74 @@
import re
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
from enum import StrEnum
line_pattern = re.compile(r"(?P<timestamp>\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s(?P<hostname>\S+)\ssshd\[(?P<pid>\d+)\]:\s(?P<message>.+)")
ipv4_pattern = re.compile(r"(?P<address>(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))")
user_pattern = re.compile(r"([Ii]nvalid user |Failed password for(?: invalid user)? |Too many authentication failures for |Accepted password for |user[= ])(?P<user>\w+)")
@dataclass
class Line:
timestamp: datetime
hostname: str
pid: int
message: str
original_line: str
class MessageType(StrEnum):
SUCCESSFUL_LOGIN = "successful login",
INVALID_USERNAME = "invalid username",
INVALID_PASSWORD = "invalid password",
FAILED_LOGIN = "failed login",
CLOSED_CONNECTION = "closed connection",
BREAK_IN_ATTEMPT = "break-in attempt",
OTHER = "other"
#1.1.1
def parse_line(line: str) -> Line:
data = line_pattern.match(line)
if data is None:
raise Exception(f"invalid data: {data}")
return Line (
timestamp=datetime.strptime(data.group("timestamp"), "%b %d %H:%M:%S"),
hostname=data.group("hostname"),
pid=int(data.group("pid")),
message=data.group("message"),
original_line=line
)
#1.1.2
def get_ipv4s_from_log(log: Line) -> List[str]:
return ipv4_pattern.findall(log.message)
#1.1.3
def get_user_from_log(log: Line) -> Optional[str]:
user = user_pattern.search(log.message)
return user.group("user") if user else None
#1.1.4
def get_message_type(message: str) -> MessageType:
msg = message.lower()
if ("accepted password for" in msg
or "session opened" in msg):
return MessageType.SUCCESSFUL_LOGIN
elif "invalid user" in msg:
return MessageType.INVALID_USERNAME
elif "failed password for" in msg:
return MessageType.INVALID_PASSWORD
elif ("authentication failure" in msg
or "did not receive identification string" in msg):
return MessageType.FAILED_LOGIN
elif ("connection closed" in msg
or "received disconnect" in msg
or "session closed" in msg
or "connection reset by peer" in msg):
return MessageType.CLOSED_CONNECTION
elif "break-in attempt" in msg:
return MessageType.BREAK_IN_ATTEMPT
else:
return MessageType.OTHER

View File

@ -0,0 +1,35 @@
from parser import get_message_type, parse_line, Line, MessageType
import logging
from logger import logger
def read_logs(path: str) -> Line:
for line in open(path, 'r'):
#1.2.1
logger.debug(f"read {len(line.encode('utf-8'))} bytes")
try:
parsed_line = parse_line(line.strip())
except ValueError as e:
logger.error(str(e))
continue
msg_type = get_message_type(parsed_line.message)
if msg_type == MessageType.SUCCESSFUL_LOGIN:
logger.info("read successful login message")
#1.2.2
elif msg_type == MessageType.CLOSED_CONNECTION:
logger.info("read closed connection message")
#1.2.3
elif msg_type == MessageType.FAILED_LOGIN:
logger.warning("read failed login message")
#1.2.4
elif msg_type == MessageType.INVALID_PASSWORD:
logger.error("read invalid password message")
elif msg_type == MessageType.INVALID_USERNAME:
logger.error("read invalid username message")
#1.2.5
elif msg_type == MessageType.BREAK_IN_ATTEMPT:
logger.critical("read break-in attempt message")
yield parsed_line

View File

@ -0,0 +1,131 @@
from datetime import datetime
from abc import ABC, abstractmethod
import re
class SSHLogEntry(ABC):
timestamp: str
hostname: str | None
pid: int
message: str
# 4
_raw_entry: str
_line_pattern = re.compile(r"(?P<timestamp>\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s(?P<hostname>\S+)\ssshd\[(?P<pid>\d+)\]:\s(?P<message>.+)")
_ipv4_pattern = re.compile(r"(?P<address>(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))")
_user_pattern = re.compile(r"([Ii]nvalid user |Failed password for(?: invalid user)? |Too many authentication failures for |Accepted password for |user[= ])(?P<user>\w+)")
#1.a
def __init__(self, line: str):
data = self._line_pattern.match(line)
if data is None:
raise Exception(f"invalid data: {data}")
timestamp = data.group("timestamp")
self.timestamp = timestamp
self.hostname = data.group("hostname")
self.pid = int(data.group("pid"))
self.message = data.group("message")
self._raw_entry = line.strip()
#1.b
def __str__(self) -> str:
return self._raw_entry
#1.c
def ipv4(self):
addresses = self._ipv4_pattern.findall(self.message)
if len(addresses) == 0:
return None
return addresses[0]
# 3
@abstractmethod
def validate(self) -> bool:
raise NotImplementedError()
# 5
@property
def has_ip(self):
return self.ipv4() is not None
# 6
def __repr__(self) -> str:
return (f"SSHLogEntry(timestamp='{self.timestamp}', hostname={self.hostname}, pid={self.pid}, message='{self.message}')")
def __eq__(self, other: "SSHLogEntry") -> bool:
return self._raw_entry == other._raw_entry
def __lt__(self, other: "SSHLogEntry") -> bool:
return self.timestamp_datetime < other.timestamp_datetime
def __gt__(self, other: "SSHLogEntry") -> bool:
return self.timestamp_datetime > other.timestamp_datetime
@property
def timestamp_datetime(self):
return datetime.strptime(self.timestamp, "%b %d %H:%M:%S"),
#2.a
class SSHRejectedPassword(SSHLogEntry):
user: str
def __init__(self, line: str):
super().__init__(line)
user = self._user_pattern.search(self.message)
if user is None:
raise Exception(f"invalid data {data}")
self.user = user.group("user")
def validate(self):
match = self._line_pattern.match(self._raw_entry)
user = self._user_pattern.search(self.message)
return (match and user
and self.timestamp == match.group("timestamp")
and self.hostname == match.group("hostname")
and self.pid == int(match.group("pid"))
and self.message == match.group("message")
and self.user == user.group("user")
)
#2.b
class SSHAcceptedPassword(SSHLogEntry):
user: str
def __init__(self, line: str):
super().__init__(line)
user = self._user_pattern.search(self.message)
if user is None:
raise Exception(f"invalid data {data}")
self.user = user.group("user")
def validate(self):
match = self._line_pattern.match(self._raw_entry)
user = self._user_pattern.search(self.message)
return (match and user
and self.timestamp == match.group("timestamp")
and self.hostname == match.group("hostname")
and self.pid == int(match.group("pid"))
and self.message == match.group("message")
and self.user == user.group("user")
)
#2.c
class SSHError(SSHLogEntry):
def validate(self):
match = self._line_pattern.match(self._raw_entry)
return (match
and self.timestamp == match.group("timestamp")
and self.hostname == match.group("hostname")
and self.pid == int(match.group("pid"))
and self.message == match.group("message")
)
#2.d
class SSHOther(SSHLogEntry):
def validate(self):
return True

View File

@ -0,0 +1,42 @@
from SSHLogEntry import (
SSHRejectedPassword,
SSHAcceptedPassword,
SSHError,
SSHOther,
SSHLogEntry
)
from typing import List
class SSHLogJournal:
_entries: List[SSHLogEntry]
def __init__(self):
self._entries = []
def __len__(self):
return len(self._entries)
def __iter__(self):
return iter(self._entries)
def __contains__(self, value):
return value in self._entries
def append(self, log: str):
if "Failed password for invalid user " in log:
entry = SSHRejectedPassword(log)
elif "Accepted password for " in log:
entry = SSHAcceptedPassword(log)
elif "error: " in log:
entry = SSHError(log)
else:
entry = SSHOther(log)
if not entry.validate():
raise Exception("entry data validation failed!")
self._entries.append(entry)
def get_logs_by_ip(self, ipv4: str) -> List[SSHLogEntry]:
return [log for log in self._entries if log.ipv4() == ipv4]

View File

@ -0,0 +1,18 @@
from datetime import datetime
import re
USER_PATTERN = re.compile(r"^[a-z_][a-z0-9_-]{0,31}$")
class SSHUser:
user: str
last_login: datetime | None
def __repr__(self):
return f"SSHUser(user='{self.user}', last_login={self.last_login})"
def __init__(self, user: str):
self.user = user
self.last_login = None
def validate(self):
return USER_PATTERN.match(self.user) is not None

View File

@ -0,0 +1,126 @@
from SSHLogJournal import SSHLogJournal
from SSHUser import SSHUser
from SSHLogEntry import SSHError, SSHOther
logs = [
"Dec 10 06:55:46 LabSZ sshd[24200]: reverse mapping checking getaddrinfo for ns.marryaldkfaczcz.com [173.234.31.186] failed - POSSIBLE BREAK-IN ATTEMPT!",
"Dec 10 06:55:46 LabSZ sshd[24200]: Invalid user webmaster from 173.234.31.186",
"Dec 10 06:55:46 LabSZ sshd[24200]: input_userauth_request: invalid user webmaster [preauth]",
"Dec 10 06:55:46 LabSZ sshd[24200]: pam_unix(sshd:auth): check pass; user unknown",
"Dec 10 06:55:46 LabSZ sshd[24200]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=173.234.31.186 ",
"Dec 10 06:55:48 LabSZ sshd[24200]: Failed password for invalid user webmaster from 173.234.31.186 port 38926 ssh2",
"Dec 10 06:55:48 LabSZ sshd[24200]: Connection closed by 173.234.31.186 [preauth]",
"Dec 10 07:02:47 LabSZ sshd[24203]: Connection closed by 212.47.254.145 [preauth]",
"Dec 10 07:07:38 LabSZ sshd[24206]: Invalid user test9 from 52.80.34.196",
"Dec 10 07:07:38 LabSZ sshd[24206]: input_userauth_request: invalid user test9 [preauth]",
"Dec 10 07:07:38 LabSZ sshd[24206]: pam_unix(sshd:auth): check pass; user unknown",
"Dec 10 07:07:38 LabSZ sshd[24206]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=ec2-52-80-34-196.cn-north-1.compute.amazonaws.com.cn ",
"Dec 10 07:07:45 LabSZ sshd[24206]: Failed password for invalid user test9 from 52.80.34.196 port 36060 ssh2",
"Dec 10 07:07:45 LabSZ sshd[24206]: Received disconnect from 52.80.34.196: 11: Bye Bye [preauth]",
"Dec 10 07:08:28 LabSZ sshd[24208]: reverse mapping checking getaddrinfo for ns.marryaldkfaczcz.com [173.234.31.186] failed - POSSIBLE BREAK-IN ATTEMPT!",
"Dec 10 07:08:28 LabSZ sshd[24208]: Invalid user webmaster from 173.234.31.186",
"Dec 10 07:08:28 LabSZ sshd[24208]: input_userauth_request: invalid user webmaster [preauth]",
"Dec 10 07:08:28 LabSZ sshd[24208]: pam_unix(sshd:auth): check pass; user unknown",
"Dec 10 07:08:28 LabSZ sshd[24208]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=173.234.31.186 ",
"Dec 10 07:08:30 LabSZ sshd[24208]: Failed password for invalid user webmaster from 173.234.31.186 port 39257 ssh2",
"Dec 10 07:51:15 LabSZ sshd[24324]: error: Received disconnect from 195.154.37.122: 3: com.jcraft.jsch.JSchException: Auth fail [preauth]",
"Dec 10 09:32:20 LabSZ sshd[24680]: Accepted password for fztu from 119.137.62.142 port 49116 ssh2"
]
error_log = SSHError("Dec 10 07:51:15 LabSZ sshd[24324]: error: Received disconnect from 195.154.37.122: 3: com.jcraft.jsch.JSchException: Auth fail [preauth]")
standalone_log = SSHOther("Jan 7 17:07:14 LabSZ sshd[30222]: Received disconnect from 185.165.29.69: 11: Bye Bye [preauth]")
def main():
searched_ips = ["173.234.31.186", "119.137.62.142"]
journal = SSHLogJournal()
for log in logs:
journal.append(log)
# 3.
print(f"error log validate before changing = {error_log.validate()}")
error_log.hostname = "test1234"
print("error log hostname changed to test1234")
print(f"error log validate after changing = {error_log.validate()}")
print()
print(f"other log validate before changing = {standalone_log.validate()}")
standalone_log.hostname = "test1234"
print("other log hostname changed to test1234")
print(f"other log validate after changing = {standalone_log.validate()}")
print()
# 5
print("entries that have an ip:")
has_ip_entries = [log for log in journal if log.has_ip]
print(*has_ip_entries, sep="\n")
print()
# 6
print("__repr__ of entries:")
print(*[repr(log) for log in journal], sep="\n")
print()
magic_entry = journal.get_logs_by_ip(searched_ips[0])[3]
print(f"entries __lt__ {magic_entry}:")
print(*[log for log in journal if magic_entry < log], sep="\n")
print()
print(f"entries __gt__ {magic_entry}:")
print(*[log for log in journal if magic_entry > log], sep="\n")
print()
print(f"entries __eq__ {magic_entry}:")
print(*[log for log in journal if magic_entry == log], sep="\n")
print()
# SSHLogJournal
for searched_ip in searched_ips:
ip_entries = journal.get_logs_by_ip(searched_ip)
print(f"entries with ip {searched_ip}:")
print(*ip_entries, sep="\n")
print()
print("journal iter test:")
print(f"len of journal: {len(journal)}")
print(f"{repr(magic_entry)} in journal: {magic_entry in journal}")
print(f"standalone log {repr(standalone_log)} in journal: {standalone_log in journal}")
print()
print("error entries:")
failed_passwords = [log for log in journal if type(log) == SSHError]
print(*failed_passwords, sep="\n")
print()
# 7
users = [SSHUser("fztu"), SSHUser("root"), SSHUser("9test9"), SSHUser("-invalid")]
merged = journal.get_logs_by_ip(searched_ips[0]) + users
print("duck typing test: ")
for item in merged:
print(item)
if not item.validate():
print(f"validation failed for this item")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,83 @@
# a
def liczba(numbers):
return len([x for x in numbers if x % 2 == 0])
# b
def median(numbers):
sorted_numbers = sorted(numbers)
return (
0
if len(numbers) == 0
else
sorted_numbers[len(numbers) // 2]
if len(numbers) % 2 == 1
else (sorted_numbers[len(numbers) // 2 - 1] + sorted_numbers[len(numbers) // 2]) / 2
)
def pierwiastek(x, epsilon, guess = None):
guess = x / 2.0 if guess is None else guess
next_guess = (guess + x / guess) / 2.0
return (
next_guess
if abs(next_guess**2 - x) < epsilon
else pierwiastek(x, epsilon, next_guess)
)
# d
def make_alpha_dict(string: str):
return {
char: [word for word in string.split() if char in word] for char in filter(lambda x: x.isalpha(), string)
}
def flatten(lst):
return [
element for sublist in lst for element in (
flatten(sublist) if isinstance(sublist, (list, tuple)) else [sublist]
)
]
if __name__ == "__main__":
lists = [
[1,2,3,4],
[],
[4,4,5,4,4],
[3,3,1,3,2],
]
print("1.a")
for array in lists:
print(f"liczba liczb parzystych w {array}: {liczba(array)}")
print("1.b")
for array in lists:
print(f"mediana w {array}: {median(array)}")
print("1.c")
sqrt_list = [0.5, 2,3,10,25,50,100]
for x in sqrt_list:
print(f"pierwiastek z {x}: {pierwiastek(x, 0.01)}")
strings_list = [
"",
"test test test",
"ala ma kota",
"kluczami są znaki występujące alfabetyczne występujące ciągu"
]
print("1.d")
for string in strings_list:
print(f"make_alpha_dict \"{string}\": {make_alpha_dict(string)}")
flatten_lists = [
[],
[1,2,3],
[[1,2], [1,2]],
[[[[[1]]]]],
[1,2,3, [4], [[5], 6]]
]
print("1.e")
for l in flatten_lists:
print(f"flatten {l}: {flatten(l)}")

View File

@ -0,0 +1,42 @@
# a
def forall(pred, iterable):
return all(pred(x) for x in iterable)
def exists(pred, iterable):
return any(pred(x) for x in iterable)
def atleast(n, pred, iterable):
return sum(1 for x in iterable if pred(x)) >= n
def atmost(n, pred, iterable):
return sum(1 for x in iterable if pred(x)) <= n
if __name__ == "__main__":
lists = [
[],
[1],
[2,4,6,8],
[2,2,2,2,2],
[1,1,1,1,1],
[1,2,3,4,5],
[10,20,30],
]
print("2.a")
for l in lists:
print(f"{l} all even: {forall(lambda x: x % 2 == 0, l)}")
print("2.b")
for l in lists:
print(f"{l} exists value equal to 1: {exists(lambda x: x == 1, l)}")
print("2.c")
for l in lists:
print(f"{l} at least 2 even values: {atleast(2, lambda x: x % 2 == 0, l)}")
print("2.d")
for l in lists:
print(f"{l} at most 2 even values: {atmost(2, lambda x: x % 2 == 0, l)}")

View File

@ -0,0 +1,53 @@
import random
class PasswordGenerator:
def __init__(self, length, count, charset = list("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")):
if count < 0:
raise ValueError("Count must be non-negative")
if length < 0:
raise ValueError("Length must be non-negative")
if not charset:
raise ValueError("Charset must not be empty")
self.length = length
self.count = count
self.charset = charset
def __iter__(self):
return self
def __next__(self):
if self.count == 0:
raise StopIteration
self.count -= 1
return "".join(random.choices(self.charset, k=self.length))
if __name__ == "__main__":
generator = PasswordGenerator(10, count=10)
print("for loop")
for password in generator:
print(password)
print()
print("charset = 1234")
generator = PasswordGenerator(10, count=3, charset=list("1234"))
for password in generator:
print(password)
print()
generator = PasswordGenerator(10, count=2)
print("__next__(), generator count = 2")
print(generator.__next__())
print(generator.__next__())
print("what happens when you call __next__ too many times:")
# exception # b
print(generator.__next__())

View File

@ -0,0 +1,28 @@
def make_generator(f):
n = 1
while True:
yield f(n)
n += 1
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
if __name__ == "__main__":
# a
print("fibonacci 10 next terms")
fib_generator = make_generator(fibonacci)
for x in range(10):
print(next(fib_generator))
# b
print("square generator")
square_generator = make_generator(lambda x : x**2)
for x in range(10):
print(next(square_generator))
print("1 generator")
one_generator = make_generator(lambda x : 1)
for x in range(10):
print(next(one_generator))

View File

@ -0,0 +1,35 @@
from functools import cache
def make_generator(f):
n = 1
while True:
yield f(n)
n += 1
def make_generator_mem(f):
@cache
def cached_f(n):
return f(n)
return make_generator(f)
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
if __name__ == "__main__":
print("fibonacci 10 next terms")
fib_generator = make_generator_mem(fibonacci)
for x in range(10):
print(next(fib_generator))
print("square generator")
square_generator = make_generator_mem(lambda x : x**2)
for x in range(10):
print(next(square_generator))
print("ones generator")
one_generator = make_generator_mem(lambda x : 1)
for x in range(10):
print(next(one_generator))

View File

@ -0,0 +1,54 @@
import logging
from functools import wraps
from time import time
def log(level):
def decorator(func_or_class):
logging.basicConfig(level=level)
if isinstance(func_or_class, type):
@wraps(func_or_class)
def wrapper_class(*args, **kwargs):
logging.log(
level, f"Creating instance of class {func_or_class.__name__}"
)
return func_or_class(*args, **kwargs)
return wrapper_class
else:
@wraps(func_or_class)
def wrapper(*args, **kwargs):
start_time = time()
result = func_or_class(*args, **kwargs)
duration = time() - start_time
logging.log(
level,
f"Invoked {func_or_class.__name__} with arguments {args}, {kwargs}. "
f"elapsed: {duration:.4f}s, return value: {result}",
)
return result
return wrapper
return decorator
@log(logging.DEBUG)
def example_function(x, y):
return x + y
@log(logging.INFO)
class ExampleClass:
def __init__(self, value):
self.value = value
if __name__ == "__main__":
example_function(1, 2)
ExampleClass(42)
example_function(3, 4)
ExampleClass(1337)
example_function(5, 6)
ExampleClass(9001)

View File

@ -0,0 +1,249 @@
from datetime import datetime, timedelta
import sys
from PyQt6.QtWidgets import (
QApplication,
QWidget,
QVBoxLayout,
QDateEdit,
QGridLayout,
QHBoxLayout,
QPushButton,
QLabel,
QListWidget,
QListWidgetItem,
QFrame,
QFileDialog,
QMessageBox
)
from parser import ApacheLogEntry
DATA_ROLE = 1001
LINES_PER_PAGE = 100
LABELS = [
"Host",
"Date",
"Time",
"Timezone",
"Status code",
"Method",
"Path",
"Size",
]
class LogViewer(QWidget):
def __init__(self):
self.value_labels = {}
super().__init__()
self.init()
self.current_page = 0
self.total_pages = 0
self.file_lines = []
def init(self):
self.setWindowTitle("Log Viewer")
self.setGeometry(100, 100, 900, 500)
mainLayout = QVBoxLayout()
topLayout = QHBoxLayout()
self.pathLabel = QLabel("Select working file...")
self.openButton = QPushButton("Open")
self.openButton.clicked.connect(self.open_file_dialog)
topLayout.addWidget(self.pathLabel)
topLayout.addWidget(self.openButton)
filterLayout = QHBoxLayout()
self.fromDateEdit = QDateEdit()
self.fromDateEdit.setDate(datetime.now() - timedelta(days=7))
self.toDateEdit = QDateEdit()
self.toDateEdit.setDate(datetime.now())
self.filterButton = QPushButton("Filter")
self.filterButton.clicked.connect(self.apply_date_filter)
filterLayout.addWidget(QLabel("From"))
filterLayout.addWidget(self.fromDateEdit)
filterLayout.addWidget(QLabel("To"))
filterLayout.addWidget(self.toDateEdit)
filterLayout.addWidget(self.filterButton)
middleLayout = QHBoxLayout()
self.logListWidget = QListWidget()
self.logListWidget.itemClicked.connect(self.select_log)
middleLayout.addWidget(self.logListWidget, 2)
self.detailsFrame = QFrame()
detailsLayout = QVBoxLayout(self.detailsFrame)
self.create_details_widget(detailsLayout)
middleLayout.addWidget(self.detailsFrame, 2)
bottomLayout = QHBoxLayout()
self.prevButton = QPushButton("Previous")
self.nextButton = QPushButton("Next")
self.prevButton.clicked.connect(self.show_previous_page)
self.nextButton.clicked.connect(self.show_next_page)
self.pageLabel = QLabel("")
bottomLayout.addWidget(self.prevButton)
bottomLayout.addStretch()
bottomLayout.addWidget(self.pageLabel)
bottomLayout.addStretch()
bottomLayout.addWidget(self.nextButton)
mainLayout.addLayout(topLayout)
mainLayout.addLayout(filterLayout)
mainLayout.addLayout(middleLayout)
mainLayout.addLayout(bottomLayout)
self.setLayout(mainLayout)
def reset_ui(self):
self.pathLabel.setText("Select working file...")
self.prevButton.setEnabled(False)
self.nextButton.setEnabled(True)
self.pageLabel.setText("")
self.logListWidget.clear()
for label in LABELS:
self.value_labels[label].setText("")
def apply_date_filter(self):
from_date = self.fromDateEdit.date().toPyDate()
to_date = self.toDateEdit.date().toPyDate()
if from_date > to_date:
self.show_error("Date 'From' must not be after date 'To'")
self.filtered_lines = []
for line in self.file_lines:
log = ApacheLogEntry.from_log(line)
if not log:
self.show_parse_error(line)
continue
date = datetime.strptime(log.timestamp.strftime("%Y-%m-%d"), "%Y-%m-%d").date()
if from_date <= date:
if date <= to_date:
self.filtered_lines.append(line)
else:
break
self.total_pages = (len(self.filtered_lines) - 1) // LINES_PER_PAGE + 1
self.current_page = 0
self.update_list()
def select_log(self, item):
log = item.data(DATA_ROLE)
self.value_labels["Host"].setText(log.host)
self.value_labels["Date"].setText(log.timestamp.strftime("%Y-%m-%d"))
self.value_labels["Time"].setText(log.timestamp.strftime("%H:%M:%S"))
self.value_labels["Timezone"].setText(log.timestamp.strftime("%z"))
self.value_labels["Status code"].setText(str(log.status_code))
self.value_labels["Method"].setText(log.method)
self.value_labels["Path"].setText(log.path)
self.value_labels["Size"].setText(str(log.bytes_sent or "-"))
def create_details_widget(self, layout):
gridLayout = QGridLayout()
for index, label in enumerate(LABELS):
labelWidget = QLabel(f"{label}:")
valueLabel = QLabel()
gridLayout.addWidget(labelWidget, index, 0)
gridLayout.addWidget(valueLabel, index, 1)
self.value_labels[label] = valueLabel
layout.addLayout(gridLayout)
def open_file_dialog(self):
fileName, _ = QFileDialog.getOpenFileName(
self,
"Open Log File",
"",
"All Files (*)",
)
if fileName:
self.pathLabel.setText(fileName)
self.load_log_file(fileName)
def load_log_file(self, file_path):
self.file_lines = []
self.reset_ui()
try:
with open(file_path, "r") as file:
self.file_lines = file.readlines()
self.filtered_lines = self.file_lines
except:
self.show_error(f"could not open file {file_path}")
return
self.total_pages = (len(self.file_lines) - 1) // LINES_PER_PAGE + 1
if self.total_pages == 1:
self.nextButton.setEnabled(False)
self.current_page = 0
self.update_list()
def update_list(self):
self.logListWidget.clear()
start_index = self.current_page * LINES_PER_PAGE
end_index = start_index + LINES_PER_PAGE
self.pageLabel.setText(f"{self.current_page + 1} / {max(1, self.total_pages)}")
for line in self.filtered_lines[start_index:end_index]:
parsed_data = ApacheLogEntry.from_log(line.strip())
if not parsed_data:
self.show_parse_error(line)
continue
item = QListWidgetItem(line.strip())
item.setData(DATA_ROLE, parsed_data)
self.logListWidget.addItem(item)
def show_next_page(self):
if self.current_page < self.total_pages - 1:
self.current_page += 1
self.update_list()
self.prevButton.setEnabled(True)
if self.current_page == self.total_pages - 1:
self.nextButton.setEnabled(False)
def show_previous_page(self):
if self.current_page > 0:
self.current_page -= 1
self.update_list()
self.nextButton.setEnabled(True)
if self.current_page == 0:
self.prevButton.setEnabled(False)
def show_error(self, text):
message_box = QMessageBox()
message_box.setIcon(QMessageBox.Icon.Critical)
message_box.setText("Error")
message_box.setWindowTitle("Error")
message_box.setInformativeText(text)
message_box.exec()
def show_parse_error(self, line):
self.show_error(f"Failed to parse data from line: {line}, ignoring.")
def main():
app = QApplication(sys.argv)
ex = LogViewer()
ex.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

View File

@ -0,0 +1,40 @@
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import locale
LOG_PATTERN = re.compile(
r'(?P<host>\S+) - - \[(?P<time>.+)\] "(?P<request>.*)" (?P<status_code>\d+) (?P<bytes_sent>\d+|-)'
)
@dataclass
class ApacheLogEntry:
host: str
timestamp: datetime
method: str
path: str
status_code: int
bytes_sent: Optional[int]
original_line: str
@classmethod
def from_log(cls, log_string):
match = LOG_PATTERN.match(log_string.strip())
if not match:
return None
try:
locale.setlocale(locale.LC_ALL, "en_US.UTF-8")
request = match.group("request").split(" ")
host = match.group('host')
timestamp = datetime.strptime(match.group("time"), "%d/%b/%Y:%H:%M:%S %z")
method = request[0]
path = request[1]
status_code = int(match.group('status_code'))
bytes_sent = int(match.group('bytes_sent')) if match.group('bytes_sent') != '-' else None
except:
return None
return cls(host, timestamp, method, path, status_code, bytes_sent, log_string)

View File

@ -0,0 +1,128 @@
from datetime import datetime
from abc import ABC, abstractmethod
import re
from ipaddress import IPv4Address
class SSHLogEntry(ABC):
timestamp: str
timestamp_datetime: datetime
hostname: str | None
pid: int
message: str
_raw_entry: str
_line_pattern = re.compile(r"(?P<timestamp>\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s(?P<hostname>\S+)\ssshd\[(?P<pid>\d+)\]:\s(?P<message>.+)")
_ipv4_pattern = re.compile(r'(?P<address>(?:[0-9]{1,3}\.){3}[0-9]{1,3})')
_user_pattern = re.compile(r"([Ii]nvalid user |Failed password for(?: invalid user)? |Too many authentication failures for |Accepted password for |user[= ])(?P<user>\w+)")
def __init__(self, line: str) -> None:
data = self._line_pattern.match(line)
if data is None:
raise Exception(f"invalid data: {data}")
timestamp = data.group("timestamp")
self.timestamp = timestamp
self.hostname = data.group("hostname")
self.pid = int(data.group("pid"))
self.message = data.group("message")
self._raw_entry = line.strip()
dt = datetime.strptime(self.timestamp, "%b %d %H:%M:%S")
self.timestamp_datetime = dt
def __str__(self) -> str:
return self._raw_entry
def ipv4(self) -> IPv4Address | None:
addresses = self._ipv4_pattern.findall(self.message)
if len(addresses) == 0:
return None
return IPv4Address(addresses[0])
@abstractmethod
def validate(self) -> bool:
raise NotImplementedError()
@property
def has_ip(self) -> bool:
return self.ipv4() is not None
def __repr__(self) -> str:
return (f"SSHLogEntry(timestamp='{self.timestamp}', hostname={self.hostname}, pid={self.pid}, message='{self.message}')")
def __eq__(self, other: object) -> bool:
if not isinstance(other, SSHLogEntry):
return False
return self._raw_entry == other._raw_entry
def __lt__(self, other: object) -> bool:
if not isinstance(other, SSHLogEntry):
return False
return self.timestamp_datetime < other.timestamp_datetime
def __gt__(self, other: object) -> bool:
if not isinstance(other, SSHLogEntry):
return False
return self.timestamp_datetime > other.timestamp_datetime
class SSHRejectedPassword(SSHLogEntry):
user: str
def __init__(self, line: str) -> None:
super().__init__(line)
user = self._user_pattern.search(self.message)
if user is None:
raise Exception(f"invalid data: {line}")
self.user = user.group("user")
def validate(self) -> bool:
match = self._line_pattern.match(self._raw_entry)
user = self._user_pattern.search(self.message)
return (match is not None and user is not None
and self.timestamp == match.group("timestamp")
and self.hostname == match.group("hostname")
and self.pid == int(match.group("pid"))
and self.message == match.group("message")
and self.user == user.group("user")
)
class SSHAcceptedPassword(SSHLogEntry):
user: str
def __init__(self, line: str) -> None:
super().__init__(line)
user = self._user_pattern.search(self.message)
if user is None:
raise Exception(f"invalid data: {line}")
self.user = user.group("user")
def validate(self) -> bool:
match = self._line_pattern.match(self._raw_entry)
user = self._user_pattern.search(self.message)
return (match is not None and user is not None
and self.timestamp == match.group("timestamp")
and self.hostname == match.group("hostname")
and self.pid == int(match.group("pid"))
and self.message == match.group("message")
and self.user == user.group("user")
)
class SSHError(SSHLogEntry):
def __init__(self, line: str) -> None:
super().__init__(line)
def validate(self) -> bool:
match = self._line_pattern.match(self._raw_entry)
return (match is not None
and self.timestamp == match.group("timestamp")
and self.hostname == match.group("hostname")
and self.pid == int(match.group("pid"))
and self.message == match.group("message")
)
class SSHOther(SSHLogEntry):
def __init__(self, line: str) -> None:
super().__init__(line)
def validate(self) -> bool:
return True

View File

@ -0,0 +1,45 @@
from SSHLogEntry import (
SSHRejectedPassword,
SSHAcceptedPassword,
SSHError,
SSHOther,
SSHLogEntry
)
from typing import List, Iterator
from ipaddress import IPv4Address
class SSHLogJournal:
_entries: List[SSHLogEntry]
def __init__(self) -> None:
self._entries = []
def __len__(self) -> int:
return len(self._entries)
def __iter__(self) -> Iterator[SSHLogEntry]:
return iter(self._entries)
def __contains__(self, value: SSHLogEntry) -> bool:
return value in self._entries
def append(self, log: str) -> None:
entry: SSHLogEntry
if "Failed password for invalid user " in log:
entry = SSHRejectedPassword(log)
elif "Accepted password for " in log:
entry = SSHAcceptedPassword(log)
elif "error: " in log:
entry = SSHError(log)
else:
entry = SSHOther(log)
if not entry.validate():
raise Exception("entry data validation failed!")
self._entries.append(entry)
def get_logs_by_ip(self, ipv4: IPv4Address) -> List[SSHLogEntry]:
return [log for log in self._entries if log.ipv4() == ipv4]

View File

@ -0,0 +1,71 @@
import pytest
from datetime import datetime
from SSHLogEntry import SSHLogEntry, SSHRejectedPassword, SSHOther, SSHAcceptedPassword, SSHError
from SSHLogJournal import SSHLogJournal
from ipaddress import IPv4Address, AddressValueError
def test_extract_timestamp_rejected_password():
entry = SSHRejectedPassword("Dec 10 07:08:30 LabSZ sshd[24208]: Failed password for invalid user webmaster from 173.234.31.186 port 39257 ssh2")
assert entry.timestamp_datetime == datetime.strptime("Dec 10 07:08:30", '%b %d %H:%M:%S')
def test_extract_timestamp_other():
entry = SSHOther("Dec 10 07:08:28 LabSZ sshd[24208]: input_userauth_request: invalid user webmaster [preauth]")
assert entry.timestamp_datetime == datetime.strptime("Dec 10 07:08:28", '%b %d %H:%M:%S')
def test_extract_timestamp_accepted_password():
entry = SSHAcceptedPassword("Dec 10 09:32:20 LabSZ sshd[24680]: Accepted password for fztu from 119.137.62.142 port 49116 ssh2")
assert entry.timestamp_datetime == datetime.strptime("Dec 10 09:32:20", '%b %d %H:%M:%S')
def test_extract_timestamp_error():
entry = SSHError("Dec 10 07:51:15 LabSZ sshd[24324]: error: Received disconnect from 195.154.37.122: 3: com.jcraft.jsch.JSchException: Auth fail [preauth]")
assert entry.timestamp_datetime == datetime.strptime("Dec 10 07:51:15", '%b %d %H:%M:%S')
def test_extract_timestamp_invalid_month():
with pytest.raises(ValueError) as exc_info:
entry = SSHRejectedPassword("Dce 10 07:08:30 LabSZ sshd[24208]: Failed password for invalid user webmaster from 173.234.31.186 port 39257 ssh2")
def test_extract_timestamp_invalid_day():
with pytest.raises(ValueError) as exc_info:
entry = SSHRejectedPassword("Dec 32 07:08:30 LabSZ sshd[24208]: Failed password for invalid user webmaster from 173.234.31.186 port 39257 ssh2")
def test_extract_timestamp_invalid_hour():
with pytest.raises(ValueError) as exc_info:
entry = SSHRejectedPassword("Dec 10 25:08:30 LabSZ sshd[24208]: Failed password for invalid user webmaster from 173.234.31.186 port 39257 ssh2")
def test_extract_timestamp_invalid_minute():
with pytest.raises(ValueError) as exc_info:
entry = SSHRejectedPassword("Dec 10 07:60:30 LabSZ sshd[24208]: Failed password for invalid user webmaster from 173.234.31.186 port 39257 ssh2")
def test_extract_timestamp_invalid_second():
with pytest.raises(ValueError) as exc_info:
entry = SSHRejectedPassword("Dec 10 07:08:60 LabSZ sshd[24208]: Failed password for invalid user webmaster from 173.234.31.186 port 39257 ssh2")
def test_extract_ipv4_correct():
entry = SSHRejectedPassword("Dec 10 06:55:48 LabSZ sshd[24200]: Failed password for invalid user webmaster from 173.234.31.186 port 38926 ssh2")
assert entry.ipv4() == IPv4Address("173.234.31.186")
def test_extract_ipv4_incorrect():
with pytest.raises(AddressValueError) as exc_info:
entry = SSHOther("Dec 10 06:55:48 LabSZ sshd[24200]: Failed password for invalid user webma ster from 666.777.88.213 port 38926 ssh2")
entry.ipv4()
def test_extract_ipv4_empty():
entry = SSHAcceptedPassword("Dec 10 07:07:38 LabSZ sshd[24206]: input_userauth_request: invalid user test9 [preauth]")
assert entry.ipv4() is None
@pytest.mark.parametrize("entry, expected_type", [
("Dec 10 07:08:30 LabSZ sshd[24208]: Failed password for invalid user webmaster from 173.234.31.186 port 39257 ssh2", SSHRejectedPassword),
("Dec 10 09:32:20 LabSZ sshd[24680]: Accepted password for fztu from 119.137.62.142 port 49116 ssh2", SSHAcceptedPassword),
("Dec 10 07:51:15 LabSZ sshd[24324]: error: Received disconnect from 195.154.37.122: 3: com.jcraft.jsch.JSchException: Auth fail [preauth]", SSHError),
("Dec 10 07:08:28 LabSZ sshd[24208]: input_userauth_request: invalid user webmaster [preauth]", SSHOther)
])
def test_journal_append(entry, expected_type):
journal = SSHLogJournal()
journal.append(entry)
assert isinstance(journal._entries[0], expected_type)