37 lines
974 B
Python
37 lines
974 B
Python
from typing import List, Dict
|
|
from datetime import datetime
|
|
from dataclasses import dataclass
|
|
from threading import Lock
|
|
|
|
@dataclass
class Event:
    """One timestamped entry in a job's event log."""

    # Moment associated with the event.
    timestamp: datetime
    # Payload text carried by the event.
    data: str
|
|
|
|
@dataclass
class Job:
    """State tracked for a single job: its status, event log and result."""

    # Status label for the job (e.g. "STARTED").
    status: str
    # Events recorded for this job, in the order they were appended.
    events: List[Event]
    # Result text for the job.
    # NOTE(review): presumably filled in when the job finishes - confirm
    # against the code that completes jobs.
    result: str
|
|
|
|
# In-memory job registry keyed by job id (module-level, shared state).
jobs: Dict[str, "Job"] = {}
# Lock to hold while reading or modifying ``jobs``.
jobs_lock = Lock()
|
|
|
|
def append_event(job_id: str, event_data: str) -> None:
    """Record an event for a job, registering the job on its first event.

    If ``job_id`` is not yet present in ``jobs``, a new ``Job`` with status
    ``"STARTED"``, an empty event list and an empty result is created first.
    In every case a new ``Event`` stamped with ``datetime.now()`` and
    carrying ``event_data`` is then appended to the job's event list, so the
    event that creates a job is recorded as well.

    Args:
        job_id: Key identifying the job in the module-level ``jobs`` dict.
        event_data: Payload stored on the new ``Event``.
    """
    # ``jobs`` is our job status db, so the existence check, the creation and
    # the append are all performed while holding ``jobs_lock``.
    with jobs_lock:
        if job_id not in jobs:
            # First action for this id: initialize defaults, flag as started.
            print(f"Start job: {job_id}")
            jobs[job_id] = Job(
                status="STARTED",
                events=[],
                result="",
            )
        else:
            print("Appending event for job")

        # Deliberately outside the if/else: the very first event must be
        # stored too, not only events for jobs that already exist.
        jobs[job_id].events.append(
            Event(
                timestamp=datetime.now(),
                data=event_data,
            )
        )