# PreCET/job_manager.py
#
# Listing metadata at capture time: 37 lines, 974 B, Python

from typing import List, Dict
from datetime import datetime
from dataclasses import dataclass
from threading import Lock
@dataclass
class Event:
    """A single timestamped entry in a job's event log.

    Attributes:
        timestamp: Moment associated with the event.
        data: Text payload carried by the event.
    """

    timestamp: datetime
    data: str
@dataclass
class Job:
    """State tracked for one job in the job registry.

    Attributes:
        status: Status label for the job (for example ``"STARTED"``).
        events: Events recorded for the job, in the order they were appended.
        result: Result text for the job; an empty string when none is set.
    """

    status: str
    events: List[Event]
    result: str
# In-memory job registry, keyed by job id.
jobs: Dict[str, "Job"] = dict()
# Lock to hold while working with `jobs`; append_event acquires it before
# touching the registry.
jobs_lock = Lock()
def append_event(job_id: str, event_data: str) -> None:
    """Record an event for a job, creating the job entry on first use.

    If ``job_id`` is not yet in ``jobs``, a new ``Job`` is registered with
    status ``"STARTED"``, an empty event list and an empty result. In every
    case a new ``Event`` stamped with the current local time is appended to
    the job's event list.

    Args:
        job_id: Key identifying the job in the ``jobs`` registry.
        event_data: Text payload stored on the new event.
    """
    # `jobs` is our job status db, so hold the lock across the whole
    # check-then-update sequence.
    with jobs_lock:
        if job_id not in jobs:
            # First action for this job: initialize defaults and flag as started.
            print(f"Start job: {job_id}")
            jobs[job_id] = Job(
                status="STARTED",
                events=[],
                result="",
            )
        else:
            print("Appending event for job")

        # Kept outside the if/else on purpose: the event that creates a job
        # must be recorded too, otherwise the first event would be dropped.
        jobs[job_id].events.append(
            Event(
                timestamp=datetime.now(),
                data=event_data,
            )
        )