from typing import List, Dict from datetime import datetime from dataclasses import dataclass from threading import Lock @dataclass class Event: timestamp: datetime data: str @dataclass class Job: status: str events: List[Event] result: str jobs_lock = Lock() jobs: Dict[str, "Job"] = {} def append_event(job_id, event_data): # This is our job status db; so we need to lock it with jobs_lock: if job_id not in jobs: # If this is the first action, initialize defaults and flag as started print(f"Start job: {job_id}") jobs[job_id] = Job( status="STARTED", events=[], result="" ) else: # If the job already exists, append to list print("Appending event for job") jobs[job_id].events.append( Event( timestamp=datetime.now(), data=event_data ) )