From e43a9d5daf5a46477887cebc350f0422286e1114 Mon Sep 17 00:00:00 2001 From: Josh Deck Date: Thu, 21 Mar 2024 12:23:29 -0400 Subject: [PATCH] Overhaul of architecture to act as a Flask web API, provide results as a JSON including event logs, and provide threading functionality. --- .env | 1 - .gitignore | 5 +++ PreCET.pyproj | 6 ++-- agents.py | 1 - api.py | 79 ++++++++++++++++++++++++++++++++++++++++++++ crew.py | 52 +++++++++++++++++++++++++++++ job_manager.py | 37 +++++++++++++++++++++ main.py | 89 -------------------------------------------------- tasks.py | 5 ++- 9 files changed, 181 insertions(+), 94 deletions(-) delete mode 100644 .env create mode 100644 .gitignore create mode 100644 api.py create mode 100644 crew.py create mode 100644 job_manager.py delete mode 100644 main.py diff --git a/.env b/.env deleted file mode 100644 index 537bad4..0000000 --- a/.env +++ /dev/null @@ -1 +0,0 @@ -SERPER_API_KEY = 84c996f21aff6fac44c2610888b4e0f3422487e9 \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4a231e0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +################################################################################ +# This .gitignore file was automatically created by Microsoft(R) Visual Studio. +################################################################################ + +/.env diff --git a/PreCET.pyproj b/PreCET.pyproj index bd0c47f..80eb922 100644 --- a/PreCET.pyproj +++ b/PreCET.pyproj @@ -4,7 +4,7 @@ 2.0 f31e605a-2c1b-40be-9c5d-e09f28ab8fe1 . - main.py + api.py . @@ -22,7 +22,9 @@ - + + + diff --git a/agents.py b/agents.py index 559b6bb..79da032 100644 --- a/agents.py +++ b/agents.py @@ -1,7 +1,6 @@ from crewai import Agent from textwrap import dedent from langchain_community.llms import ollama -from langchain_openai import ChatOpenAI from tools.search_tools import SearchTools diff --git a/api.py b/api.py new file mode 100644 index 0000000..2930647 --- /dev/null +++ b/api.py @@ -0,0 +1,79 @@ +from threading import Thread +from uuid import uuid4 as uuid +from flask import Flask, jsonify, request, abort +from crew import ChatCrew +from job_manager import jobs_lock, append_event, jobs, Event +from datetime import datetime +import json + +app = Flask(__name__) + +def kickoff_crew(job_id, company, contact, interests, city): + print(f"Running crew for {job_id} for company {company} with point of contact {contact}") + + # Setup the crew, run, and process results + results = None + try: + crew = ChatCrew(job_id=job_id) + crew.setup_crew(company=company, contact=contact, interests=interests, city=city) + result = crew.kickoff() + except Exception as e: + print(f"CREW FALED: {str(e)}") + append_event(job_id, f"CREW FAILED: {str(e)}") + with jobs_lock: + jobs[job_id].status = "ERROR" + jobs[job_id].result = str(e) + + with jobs_lock: + jobs[job_id].status = "COMPLETE" + jobs[job_id].result = result + jobs[job_id].events.append(Event( + data="CREW COMPLETED", timestamp=datetime.now() + )) + + +@app.route('/api/crew', methods=['POST']) +def run_crew(): + data = request.json + if not data or 'company' not in data or 'contact' not in data or 'interests' not in data or 'city' not in data: + abort(400, description='Invalid request') + + job_id = str(uuid()) + company = data['company'] + contact = data['contact'] + interests = data['interests'] + city = data['city'] + + # Run the crew + thread = Thread(target=kickoff_crew, args=(job_id, company, contact, interests, city)) + thread.start() + + return jsonify({'job_id': job_id}), 200 + + + +@app.route('/api/crew/', methods=['GET']) +def get_status(job_id): + # Lock the job and check if it exists + with jobs_lock: + job = jobs.get(job_id) + if not job: + abort(404, description="Job not found") + + try: + result_json = json.loads(job.result) + except: + result_json = job.result + + print(f"{job_id} | {job.status} | {result_json}") + return jsonify({ + 'job_id': job_id, + 'status': job.status, + 'result': result_json, + 'events': [{"timestamp": event.timestamp.isoformat(), "data": event.data} for event in job.events] + }), 200 + + + +if __name__ == '__main__': + app.run(debug=True, port=3001) \ No newline at end of file diff --git a/crew.py b/crew.py new file mode 100644 index 0000000..d83a575 --- /dev/null +++ b/crew.py @@ -0,0 +1,52 @@ +from crewai import Crew +from textwrap import dedent +from agents import TalkingAgents +from tasks import TalkingTasks +from job_manager import append_event + +from dotenv import load_dotenv +load_dotenv() + + +class ChatCrew: + def __init__(self, job_id): + # Crew variables + self.job_id = job_id + self.crew = None + + def setup_crew(self, company, contact, interests, city): + print(f'Initializing crew for {self.job_id} for comapny {company} with point of contact {contact}') + + # Initialize agents + agents = TalkingAgents() + master_networker = agents.master_networker() + local_expert = agents.local_expert() + + # Initialize tasks + tasks = TalkingTasks(self.job_id) + chat_tasks = [ + tasks.research_topics(master_networker, company), tasks.gather_city_info(local_expert, city, interests) + ] + + # Initialize crew + self.crew = Crew( + agents=[master_networker, local_expert], + tasks=chat_tasks, + verbose=2 + ) + + def kickoff(self): + if not self.crew: + print(f"No crew found for {self.job_id}") + return + + append_event(self.job_id, "CREW STARTED") + try: + print(f'Running crew for {self.job_id}') + result = self.crew.kickoff() + append_event(self.job_id, "CREW COMPLETED") + return result + + except Exception as e: + append_event(self.job_id, "ERROR ENCOUNTERED") + return str(e) \ No newline at end of file diff --git a/job_manager.py b/job_manager.py new file mode 100644 index 0000000..f4ba732 --- /dev/null +++ b/job_manager.py @@ -0,0 +1,37 @@ +from typing import List, Dict +from datetime import datetime +from dataclasses import dataclass +from threading import Lock + +@dataclass +class Event: + timestamp: datetime + data: str + +@dataclass +class Job: + status: str + events: List[Event] + result: str + +jobs_lock = Lock() +jobs: Dict[str, "Job"] = {} + +def append_event(job_id, event_data): + # This is our job status db; so we need to lock it + with jobs_lock: + if job_id not in jobs: # If this is the first action, initialize defaults and flag as started + print(f"Start job: {job_id}") + jobs[job_id] = Job( + status="STARTED", + events=[], + result="" + ) + else: # If the job already exists, append to list + print("Appending event for job") + jobs[job_id].events.append( + Event( + timestamp=datetime.now(), + data=event_data + ) + ) \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index a250a79..0000000 --- a/main.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -Client Engagement Tool - - -""" -from doctest import master -from crewai import Crew -from textwrap import dedent -from agents import TalkingAgents -from tasks import TalkingTasks - -from dotenv import load_dotenv -load_dotenv() - - -class TripCrew: - def __init__(self, customer, contact): - self.customer = customer - self.contact = contact - self.interests = 'Art, hiking, animals' - self.city = 'Canton, Michigan' - self.team_affinity = '' - - def run(self): - # Define your custom agents and tasks in agents.py and tasks.py - agents = TalkingAgents() - tasks = TalkingTasks() - - # Define your custom agents and tasks here - master_networker = agents.master_networker() - local_guide = agents.local_expert() - sports_analyst = agents.sports_analyst() - - # Custom tasks include agent name and variables as input - research_topics = tasks.research_topics( - master_networker, - self.customer, - #self.contact - ) - gather_sports_info = tasks.gather_sports_info( - sports_analyst, - # These two will be replaced programmatically, presumably from a db/excel lookup - self.city, - self.team_affinity - ) - - gather_city_info = tasks.gather_city_info( - local_guide, - self.city, - self.interests - ) - - # Define your custom crew here - crew = Crew( - agents=[master_networker, - sports_analyst, - local_guide - ], - tasks=[ - research_topics, - gather_sports_info, - gather_city_info - ], - verbose=True, - ) - - result = crew.kickoff() - return result - - -# This is the main function that you will use to run your custom crew. -if __name__ == "__main__": - print("## Welcome to PreCET") - print('-------------------------------') - customer = input( - dedent(""" - Which company are you going to be contacting? - """)) - contact = input( - dedent(""" - Who are you going to be talking with from {}? - """.format(customer))) - - trip_crew = TripCrew(customer, contact) - result = trip_crew.run() - print("\n\n########################") - print("## Here are your Topics of Conversation") - print("########################\n") - print(result) \ No newline at end of file diff --git a/tasks.py b/tasks.py index 1364356..ddbd828 100644 --- a/tasks.py +++ b/tasks.py @@ -46,7 +46,10 @@ Key Steps for Task Creation: """ -class TalkingTasks: +class TalkingTasks(): + def __init__(self, job_id): + self.job_id = job_id + def __tip_section(self): return "You work on commission, so securing the sale is your number one priority."