from abc import ABC, abstractmethod
from datetime import datetime
from queue import Empty, Queue
from threading import Thread, Event
class LoggerBase(ABC):
    """Abstract interface shared by every logger implementation.

    Concrete subclasses must provide both :meth:`log` and :meth:`stop`;
    the class cannot be instantiated until they do.
    """

    @abstractmethod
    def log(self, source, message, task_id=None):
        """Record *message* as coming from *source*.

        Args:
            source: Identifier of the component emitting the message.
            message: The text to record.
            task_id: Optional task identifier to associate with the message.
        """

    @abstractmethod
    def stop(self):
        """Shut the logger down."""
class ThreadLoggerBase(LoggerBase):
    """Logger base that hands queued items to a background worker thread.

    Subclasses implement ``log`` (the producer side, which puts items on
    ``self.queue``) and :meth:`worker` (the consumer side, which runs on
    ``self.thread``).

    Worker contract: keep consuming until ``self.stop_event`` is set *and*
    the queue is empty, then return. :meth:`stop` relies on this to know
    that everything queued before the stop request has been handled.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`stop`.
    """

    def __init__(self, log_file):
        """Create the queue and stop flag, then start the worker thread.

        Args:
            log_file: Destination passed through to the subclass worker;
                stored as ``self.log_file``.

        Note:
            The worker starts running before ``__init__`` returns, so a
            subclass must set any attribute its ``worker`` reads *before*
            calling ``super().__init__()``.
        """
        self.log_file = log_file
        self.queue = Queue()
        self.stop_event = Event()
        # Non-daemon thread: the interpreter will not exit until the worker
        # returns, so stop() (or the context manager) must be used.
        self.thread = Thread(target=self.worker, daemon=False)
        self.thread.start()

    @abstractmethod
    def worker(self):
        """Consume ``self.queue`` on the background thread (see class docs)."""

    def stop(self):
        """Ask the worker to finish and block until its thread has exited.

        Safe to call more than once. Only the thread is joined, not the
        queue: a worker that follows the drain contract has already emptied
        the queue when it returns, whereas ``Queue.join()`` would block
        forever if the worker had died or if an item was enqueued after the
        worker's final check.
        """
        self.stop_event.set()
        self.thread.join()

    def __enter__(self):
        """Return the logger itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the logger on leaving the ``with`` block; never suppresses."""
        self.stop()
class FileLogger(ThreadLoggerBase):
    """Logger that appends formatted lines to a text file from a worker thread."""

    def __init__(self, log_file="app.log"):
        """Start the worker that appends to *log_file* (default ``app.log``)."""
        super().__init__(log_file)

    def log(self, source, message, task_id=None):
        """Queue one timestamped line for the worker to write.

        The line has the form ``[YYYY-MM-DD HH:MM:SS] [source] message``,
        with ``[task_id=...]`` inserted before the message when *task_id*
        is not ``None``. The timestamp is taken here, at call time, using
        the local clock.

        Args:
            source: Identifier of the component emitting the message.
            message: The text to record.
            task_id: Optional task identifier to include in the line.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        task_part = f" [task_id={task_id}]" if task_id is not None else ""
        self.queue.put(f"[{timestamp}] [{source}]{task_part} {message}")

    def worker(self):
        """Write queued lines to ``self.log_file`` until stopped and drained.

        If the file cannot be opened or a write/flush fails with
        ``OSError``, the error is reported on stderr and the worker keeps
        consuming the queue in discard mode. Every queued item is therefore
        still acknowledged with ``task_done()``, so a caller blocked in
        ``queue.join()`` is released instead of hanging on a dead worker.
        """
        import sys  # local import: only needed on the failure path

        try:
            # backslashreplace: characters that cannot be encoded as UTF-8
            # (e.g. lone surrogates) are escaped instead of raising
            # UnicodeEncodeError and killing the worker.
            with open(
                self.log_file, "a", encoding="utf-8", errors="backslashreplace"
            ) as f:
                self._consume(f)
        except OSError as exc:
            print(
                f"FileLogger: writing to {self.log_file!r} failed: {exc}",
                file=sys.stderr,
            )
            self._consume(None)

    def _consume(self, sink):
        """Drain the queue into *sink* until stop is requested and it is empty.

        Args:
            sink: Open text file to write to, or ``None`` to discard lines
                while still acknowledging them.
        """
        unflushed = False
        while not self.stop_event.is_set() or not self.queue.empty():
            try:
                line = self.queue.get(timeout=0.1)
            except Empty:
                # Queue went idle: push buffered lines to the file so they
                # are visible without waiting for shutdown.
                if unflushed:
                    sink.flush()
                    unflushed = False
                continue
            try:
                if sink is not None:
                    sink.write(line + "\n")
                    unflushed = True
            finally:
                # Acknowledge even when the write failed, so that
                # queue.join() cannot block on an item nobody will retry.
                self.queue.task_done()