"""
This file contains a custom logger with different print levels and specific formatting.
It also contains a progress bar function that can be used to show progress of a loop.
The decorator_provide_time_information can be used to wrap a function and provide time
together with a starting and finished message, that works congruently with the logger.
Example usage:
logger = CustomLogger("test_logger", "logs")
logger.info("This is an info message", print_level=3)
logger.starting("This is a starting message", print_level=3)
logger.finished("This is a finished message", print_level=3)
Created by Jelle Bonthuis on 2025-03-24
"""
import atexit
import cProfile
import io
import pstats
import re
import tracemalloc
from collections import defaultdict
from datetime import datetime
from functools import partial, wraps
from inspect import getsourcelines, stack
from logging import (
CRITICAL,
DEBUG,
ERROR,
INFO,
WARNING,
FileHandler,
Filter,
Formatter,
Logger,
StreamHandler,
addLevelName,
getLogger,
)
from os import getpid
from pathlib import Path
from time import perf_counter
from typing import Any, Callable, Optional, cast
import line_profiler
import pandas as pd
import psutil
from memory_profiler import memory_usage
# todo add function nesting for color adjustments (so starting and finished will
# have different colors depending on what level they are called from)
DEFAULT_DECORATOR_PRINT_LEVEL = 4
DEFAULT_DECORATOR_TIME_DECIMALS = 4
DEFAULT_BACKUP_LOGGER_PRINT_LEVEL = 3
[docs]
def parse_log_file(log_path: str | Path) -> pd.DataFrame: # noqa: C901
"""Generated: validation needed.
Description:
Parse a log file written by :class:`CustomLogger` and return a DataFrame
pairing each STARTING entry with its corresponding FINISHED entry.
Args:
log_path (str | Path): Path to the ``.log`` file to parse.
Returns:
pd.DataFrame: DataFrame with columns ``function``, ``file``, ``start_time``,
``end_time``, ``duration``, ``lineno``, and ``calls``.
"""
pattern_start = re.compile(r"STARTING - Starting: (.*?) \((.*?):(\d+)\)")
pattern_finish = re.compile(
r"FINISHED - Finished: (.*?) in: ([\d.]+) seconds \((.*?):(\d+)\)"
)
calls = []
call_counts = defaultdict(int)
with open(log_path, "r") as f:
for line in f:
timestamp_str = line.split(" - ")[0].strip()
timestamp_str = timestamp_str[
: timestamp_str.rfind(",")
].strip() # Extract timestamp part
try:
timestamp = datetime.strptime(
timestamp_str,
"%Y-%m-%d %H:%M:%S",
)
except ValueError:
continue # skip malformed lines
if "STARTING - Starting" in line:
match = pattern_start.search(line)
if match:
func, file, lineno = match.groups()
calls.append(
{
"function": func,
"file": file,
"lineno": int(lineno),
"timestamp": timestamp,
"type": "start",
}
)
call_counts[func] += 1
elif "FINISHED - Finished" in line:
match = pattern_finish.search(line)
if match:
func, duration, file, lineno = match.groups()
calls.append(
{
"function": func,
"file": file,
"lineno": int(lineno),
"timestamp": timestamp,
"type": "end",
"duration": float(duration),
}
)
# Combine start and end pairs
stack = []
data = []
for call in calls:
if call["type"] == "start":
stack.append(call)
elif call["type"] == "end":
for i in range(len(stack) - 1, -1, -1):
if stack[i]["function"] == call["function"]:
start_call = stack.pop(i)
data.append(
{
"function": call["function"],
"file": call["file"],
"start_time": start_call["timestamp"],
"end_time": call["timestamp"],
"duration": call["duration"],
"lineno": call["lineno"],
"calls": call_counts[call["function"]],
}
)
break
df = pd.DataFrame(data)
return df
[docs]
class CustomLogger:
VALID_LEVEL = 25
HIGH_DETAIL_LEVEL = 24
LOW_DETAIL_LEVEL = 23
FINISHED_LEVEL = 22
STARTING_LEVEL = 21
def __init__(
self,
name: str,
log_files_location: str | Path | None = None,
print_level: int = 2,
auto_parse: bool = False,
):
"""Generated: validation needed.
Description:
Initialise logger, attach console and file handlers, optionally
register an atexit log-parser.
Args:
name (str): Logger name; also used as log filename stem.
log_files_location (str | Path | None): Directory for log files.
Defaults to ``utils/logs/`` when None.
print_level (int): Maximum print-level passed through to console handler.
auto_parse (bool): When True register :meth:`_run_log_parser` at exit.
"""
if log_files_location is None:
log_files_location_path = Path(__file__).resolve().parent / "logs"
else:
log_files_location_path = Path(log_files_location)
self.logger: Logger = getLogger(name)
self.logger.setLevel(DEBUG)
self.logger.propagate = False
self.print_level = print_level
self.filter = PrintLevelFilter(print_level)
addLevelName(CustomLogger.VALID_LEVEL, "VALID")
addLevelName(CustomLogger.FINISHED_LEVEL, "FINISHED")
addLevelName(CustomLogger.STARTING_LEVEL, "STARTING")
# Keep runtime compatibility for code using `custom_logger.logger.starting(...)`.
logger_with_custom_levels = cast(Any, self.logger)
logger_with_custom_levels.valid = self.valid
logger_with_custom_levels.starting = self.starting
logger_with_custom_levels.finished = self.finished
# todo add extra ones that could be used
console_handler = StreamHandler()
console_handler.setLevel(DEBUG)
console_handler.setFormatter(CustomFormatter())
console_handler.addFilter(self.filter)
self.logger.addHandler(console_handler)
log_filename = name + ".log"
log_files_location_path.mkdir(parents=True, exist_ok=True)
log_file_path = log_files_location_path / log_filename
file_handler = FileHandler(log_file_path, mode="w")
file_handler.setFormatter(
Formatter("%(asctime)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)")
)
self.logger.addHandler(file_handler)
self.log_file_path = log_file_path
self.auto_parse = auto_parse
if self.auto_parse:
atexit.register(self._run_log_parser)
[docs]
def set_log_files_location(self, log_files_location: str, mode: str = "w") -> None:
"""Generated: validation needed.
Description:
Rebind the file handler to a new directory while preserving console logging.
Args:
log_files_location (str): New directory path for the log file.
mode (str): File open mode passed to :class:`~logging.FileHandler`.
"""
log_files_location_path = Path(log_files_location)
log_files_location_path.mkdir(parents=True, exist_ok=True)
log_filename = (
self.log_file_path.name if hasattr(self, "log_file_path") else "run.log"
)
log_file_path = log_files_location_path / log_filename
# Remove existing file handlers only; keep stream handlers and filters intact.
handlers = list(self.logger.handlers)
for handler in handlers:
if isinstance(handler, FileHandler):
self.logger.removeHandler(handler)
try:
handler.close()
except Exception:
pass
file_handler = FileHandler(log_file_path, mode=mode)
file_handler.setFormatter(
Formatter("%(asctime)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)")
)
self.logger.addHandler(file_handler)
self.log_file_path = log_file_path
[docs]
@staticmethod
def fix_non_ascii_messages_decorator(func: Callable) -> Callable:
"""Generated: validation needed.
Description:
Decorator that sanitises log messages to cp1252-safe strings before
passing them to the wrapped logging method.
Args:
func (Callable): Logging method to wrap.
Returns:
Callable: Wrapped method with message sanitisation.
"""
@wraps(func)
def wrapper(self, message, *args, **kwargs):
if not isinstance(message, str):
try:
message = str(message)
except Exception as e:
print(e)
message = repr(message)
# Step 2: sanitize for cp1252
try:
message = message.encode("cp1252", errors="replace").decode("cp1252")
except Exception as e:
print(e)
message = message.encode("ascii", errors="replace").decode("ascii")
return func(self, message, *args, **kwargs)
return wrapper
[docs]
def process_stack(self, stack_: list) -> tuple[str, int]:
"""Generated: validation needed.
Description:
Walk the call stack and return filename and line number of the first
frame outside this module.
Args:
stack_ (list): Frame list from :func:`inspect.stack`.
Returns:
tuple[str, int]: Caller filename (basename) and line number.
Raises:
ValueError: When no caller frame outside this module is found.
"""
caller_frame = None
for frame in stack_:
name = __name__.replace(".", "\\")
if name not in frame.filename:
caller_frame = frame
break
if caller_frame is None:
raise ValueError("No caller frame found")
lineno = caller_frame.lineno
filename = caller_frame.filename
filename = filename.split("/")[-1]
filename = filename.split("\\")[-1]
return filename, lineno
@fix_non_ascii_messages_decorator
def info(self, message, print_level=2, *args, **kwargs):
stack_ = stack()
filename, lineno = self.process_stack(stack_)
self.logger.info(
message,
extra={
"print_level": print_level,
"custom_filename": filename,
"custom_lineno": lineno,
},
)
@fix_non_ascii_messages_decorator
def debug(self, message, print_level=3, *args, **kwargs):
stack_ = stack()
filename, lineno = self.process_stack(stack_)
self.logger.debug(
message,
extra={
"print_level": print_level,
"custom_filename": filename,
"custom_lineno": lineno,
},
)
@fix_non_ascii_messages_decorator
def warning(self, message, print_level=2, *args, **kwargs):
stack_ = stack()
filename, lineno = self.process_stack(stack_)
self.logger.warning(
message,
extra={
"print_level": print_level,
"custom_filename": filename,
"custom_lineno": lineno,
},
)
@fix_non_ascii_messages_decorator
def error(self, message, print_level=1, *args, **kwargs):
stack_ = stack()
filename, lineno = self.process_stack(stack_)
self.logger.error(
message,
extra={
"print_level": print_level,
"custom_filename": filename,
"custom_lineno": lineno,
},
)
@fix_non_ascii_messages_decorator
def critical(self, message, print_level=0, *args, **kwargs):
stack_ = stack()
filename, lineno = self.process_stack(stack_)
self.logger.critical(
message,
extra={
"print_level": print_level,
"custom_filename": filename,
"custom_lineno": lineno,
},
)
[docs]
def set_print_level(self, level: int) -> None:
"""Generated: validation needed.
Description:
Change the console handler print level dynamically.
Args:
level (int): New maximum print level to pass through to console.
"""
self.print_level = level
self.filter.print_level = level
# def log(self, level, message, print_level=2):
# """Log messages with an additional print_level argument"""
# self.logger.log(level, message, extra={"print_level": print_level})
@fix_non_ascii_messages_decorator
def starting(self, message, print_level=4, *args, **kwargs):
if self.logger.isEnabledFor(CustomLogger.STARTING_LEVEL):
stack_ = stack()
filename, lineno = self.process_stack(stack_)
extra_new = {"custom_filename": filename, "print_level": print_level}
# merge with new from args if it exists
extra_new.update(kwargs.get("extra", {}))
self.logger._log(
CustomLogger.STARTING_LEVEL,
message,
args,
extra=extra_new,
)
@fix_non_ascii_messages_decorator
def finished(self, message, print_level=3, *args, **kwargs):
if self.logger.isEnabledFor(CustomLogger.FINISHED_LEVEL):
stack_ = stack()
filename, lineno = self.process_stack(stack_)
self.logger._log(
CustomLogger.FINISHED_LEVEL,
message,
args,
extra={
"custom_lineno": lineno,
"custom_filename": filename,
"print_level": print_level,
},
)
@fix_non_ascii_messages_decorator
def valid(self, message, print_level=3, *args, **kwargs):
if self.logger.isEnabledFor(CustomLogger.VALID_LEVEL):
stack_ = stack()
filename, lineno = self.process_stack(stack_)
self.logger._log(
CustomLogger.VALID_LEVEL,
message,
args,
extra={
"custom_lineno": lineno,
"custom_filename": filename,
"print_level": print_level,
},
)
def _run_log_parser(self):
df = parse_log_file(self.log_file_path)
parsed_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_parsed.csv")
df.to_csv(parsed_path, index=False)
if self.print_level <= 1:
print(f"[Logger] Parsed log file saved to: {parsed_path}")
[docs]
def progressBar(
iterable,
prefix="",
suffix="",
decimals=1,
length=100,
fill="█",
printEnd="\n",
start_time=None,
current_print_level=2,
print_level=2,
):
"""
Taken from https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
Then modified by Jelle Bonthuis on 2025-01-23 to include time elapsed in the suffix
Further modified by Jelle Bonthuis on 2025-03-24 to include print levels that
works congruently with the CustomLogger class
Call in a loop to create terminal progress bar
@params:
iterable - Required : iterable object (Iterable)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str)
start_time - Optional : start time of the process (Float)
current_print_level - Optional : current print level of the logger (Int)
print_level - Optional : print level of the logger (Int)
"""
total = len(iterable)
if total == 0:
return
# Progress Bar Printing Function
def printProgressBar(iteration, suffix_ori=suffix, start_time=start_time):
suffix = suffix_ori
if start_time is not None:
time_elapsed = perf_counter() - start_time
time_elapsed = round(time_elapsed, decimals)
suffix = f"{suffix_ori} Time elapsed: {time_elapsed}s "
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + "-" * (length - filledLength)
print(f"{prefix} |{bar}| {percent}% {suffix} ", end=printEnd)
# Initial Call
if int(print_level) <= int(current_print_level):
printProgressBar(0, suffix_ori=suffix, start_time=start_time)
# Update Progress Bar
for i, item in enumerate(iterable):
yield item
if int(print_level) <= int(current_print_level):
printProgressBar(i + 1, suffix_ori=suffix, start_time=start_time)
# Print New Line on Complete
# print(f"{prefix} |{'█' * length}| 100% {suffix} ", end = printEnd)
print()
[docs]
class PrintLevelFilter(Filter):
def __init__(self, print_level):
super().__init__()
self.print_level = int(print_level)
[docs]
def filter(self, record):
record_print_level = getattr(record, "print_level", 2)
return record_print_level <= self.print_level
_backup_logger = CustomLogger(
"backup_logger",
Path(__file__).resolve().parent / "logs",
print_level=DEFAULT_BACKUP_LOGGER_PRINT_LEVEL,
auto_parse=True,
)
[docs]
def profile_time(func: Callable) -> Callable:
"""Generated: validation needed.
Description:
Decorator that profiles a function using both cProfile and line_profiler,
printing timing statistics to stdout.
Args:
func (Callable): Function to profile.
Returns:
Callable: Wrapped function that prints timing stats on each call.
"""
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
func_name = cast(Any, func).__name__
print(f"\n--- Profiling (Time) for {func_name} ---")
# Line profiler
profiler = line_profiler.LineProfiler()
profiler.add_function(func)
# CProfile
cprof = cProfile.Profile()
cprof.enable()
profiler.enable_by_count()
result = profiler(func)(*args, **kwargs)
profiler.disable_by_count()
cprof.disable()
# Print cProfile stats
print("\n[cProfile Stats]")
s = io.StringIO()
ps = pstats.Stats(cprof, stream=s).sort_stats("cumtime")
ps.print_stats(20)
print(s.getvalue())
# Print line_profiler stats
print("[line_profiler Stats]")
profiler.print_stats()
return result
return wrapper
[docs]
def profile_memory_full(func: Callable) -> Callable:
"""Generated: validation needed.
Description:
Decorator that profiles memory usage of a function using tracemalloc,
psutil, and memory_profiler, printing a full memory report to stdout.
Args:
func (Callable): Function to profile.
Returns:
Callable: Wrapped function that prints memory stats on each call.
"""
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
func_name = cast(Any, func).__name__
print(f"\n--- Profiling (Memory) for {func_name} ---")
start_time = perf_counter()
# Start tracemalloc
tracemalloc.start()
snapshot_before = tracemalloc.take_snapshot()
# Start psutil measurement
process = psutil.Process(getpid())
mem_before = process.memory_info().rss / (1024**2)
# Run with memory_profiler
def _target():
return func(*args, **kwargs)
mem_usage, result = memory_usage(
(_target,),
retval=True,
interval=0.05,
timeout=None,
)
# End tracemalloc
snapshot_after = tracemalloc.take_snapshot()
top_stats = snapshot_after.compare_to(snapshot_before, "lineno")
# End psutil measurement
mem_after = process.memory_info().rss / (1024**2)
print(f"\n[memory_profiler] Peak memory usage: {max(mem_usage):.2f} MiB")
print(
f"[psutil] Start: {mem_before:.2f} MiB → End:"
f" {mem_after:.2f} MiB (Δ {(mem_after - mem_before):.2f} MiB)"
)
print(f"[Execution time] {perf_counter() - start_time:.4f} seconds")
print("\n[tracemalloc top 10 differences]")
for stat in top_stats[:10]:
print(stat)
tracemalloc.stop()
return result
return wrapper
[docs]
@profile_memory_full
def memory_checker(func, *args, **kwargs):
return func(*args, **kwargs)
[docs]
@profile_time
def time_checker(func, *args, **kwargs):
return func(*args, **kwargs)