chore(cleanup): remove contact-related files and update JWT expiration time settings

This commit is contained in:
Davidson Gomes
2025-05-07 11:43:00 -03:00
parent 8979251541
commit 0e3c331a72
21 changed files with 151 additions and 744 deletions

View File

@@ -38,11 +38,11 @@ router = APIRouter(
)
@router.websocket("/ws/{agent_id}/{contact_id}")
@router.websocket("/ws/{agent_id}/{external_id}")
async def websocket_chat(
websocket: WebSocket,
agent_id: str,
contact_id: str,
external_id: str,
db: Session = Depends(get_db),
):
try:
@@ -81,7 +81,7 @@ async def websocket_chat(
await verify_user_client(payload, db, agent.client_id)
logger.info(
f"WebSocket connection established for agent {agent_id} and contact {contact_id}"
f"WebSocket connection established for agent {agent_id} and external_id {external_id}"
)
while True:
@@ -95,7 +95,7 @@ async def websocket_chat(
async for chunk in run_agent_stream(
agent_id=agent_id,
contact_id=contact_id,
external_id=external_id,
message=message,
session_service=session_service,
artifacts_service=artifacts_service,
@@ -162,7 +162,7 @@ async def chat(
try:
final_response_text = await run_agent(
request.agent_id,
request.contact_id,
request.external_id,
request.message,
session_service,
artifacts_service,

View File

@@ -1,122 +0,0 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from src.config.database import get_db
from typing import List
import uuid
from src.core.jwt_middleware import (
get_jwt_token,
verify_user_client,
)
from src.schemas.schemas import (
Contact,
ContactCreate,
)
from src.services import (
contact_service,
)
import logging
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/contacts",
tags=["contacts"],
responses={404: {"description": "Not found"}},
)
@router.post("/", response_model=Contact, status_code=status.HTTP_201_CREATED)
async def create_contact(
contact: ContactCreate,
db: Session = Depends(get_db),
payload: dict = Depends(get_jwt_token),
):
# Verify if the user has access to the contact's client
await verify_user_client(payload, db, contact.client_id)
return contact_service.create_contact(db, contact)
@router.get("/{client_id}", response_model=List[Contact])
async def read_contacts(
client_id: uuid.UUID,
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
payload: dict = Depends(get_jwt_token),
):
# Verify if the user has access to this client's data
await verify_user_client(payload, db, client_id)
return contact_service.get_contacts_by_client(db, client_id, skip, limit)
@router.get("/{contact_id}", response_model=Contact)
async def read_contact(
contact_id: uuid.UUID,
db: Session = Depends(get_db),
payload: dict = Depends(get_jwt_token),
):
db_contact = contact_service.get_contact(db, contact_id)
if db_contact is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Contact not found"
)
# Verify if the user has access to the contact's client
await verify_user_client(payload, db, db_contact.client_id)
return db_contact
@router.put("/{contact_id}", response_model=Contact)
async def update_contact(
contact_id: uuid.UUID,
contact: ContactCreate,
db: Session = Depends(get_db),
payload: dict = Depends(get_jwt_token),
):
# Get the current contact
db_current_contact = contact_service.get_contact(db, contact_id)
if db_current_contact is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Contact not found"
)
# Verify if the user has access to the contact's client
await verify_user_client(payload, db, db_current_contact.client_id)
# Verify if the user is trying to change the client
if contact.client_id != db_current_contact.client_id:
# Verify if the user has access to the new client as well
await verify_user_client(payload, db, contact.client_id)
db_contact = contact_service.update_contact(db, contact_id, contact)
if db_contact is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Contact not found"
)
return db_contact
@router.delete("/{contact_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_contact(
contact_id: uuid.UUID,
db: Session = Depends(get_db),
payload: dict = Depends(get_jwt_token),
):
# Get the contact
db_contact = contact_service.get_contact(db, contact_id)
if db_contact is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Contact not found"
)
# Verify if the user has access to the contact's client
await verify_user_client(payload, db, db_contact.client_id)
if not contact_service.delete_contact(db, contact_id):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Contact not found"
)

View File

@@ -34,21 +34,6 @@ def get_redis_config():
}
def get_a2a_config():
"""
Get A2A-specific cache TTL values from environment variables.
Returns:
dict: A2A TTL configuration parameters
"""
return {
"task_ttl": int(os.getenv("A2A_TASK_TTL", 3600)),
"history_ttl": int(os.getenv("A2A_HISTORY_TTL", 86400)),
"push_notification_ttl": int(os.getenv("A2A_PUSH_NOTIFICATION_TTL", 3600)),
"sse_client_ttl": int(os.getenv("A2A_SSE_CLIENT_TTL", 300)),
}
def create_redis_pool(config=None):
"""
Create and return a Redis connection pool.

View File

@@ -47,7 +47,7 @@ class Settings(BaseSettings):
# JWT settings
JWT_SECRET_KEY: str = os.getenv("JWT_SECRET_KEY", secrets.token_urlsafe(32))
JWT_ALGORITHM: str = os.getenv("JWT_ALGORITHM", "HS256")
JWT_EXPIRATION_TIME: int = int(os.getenv("JWT_EXPIRATION_TIME", 30))
JWT_EXPIRATION_TIME: int = int(os.getenv("JWT_EXPIRATION_TIME", 3600))
# SendGrid settings
SENDGRID_API_KEY: str = os.getenv("SENDGRID_API_KEY", "")

View File

@@ -18,7 +18,6 @@ import src.api.admin_routes
import src.api.chat_routes
import src.api.session_routes
import src.api.agent_routes
import src.api.contact_routes
import src.api.mcp_server_routes
import src.api.tool_routes
import src.api.client_routes
@@ -70,7 +69,6 @@ admin_router = src.api.admin_routes.router
chat_router = src.api.chat_routes.router
session_router = src.api.session_routes.router
agent_router = src.api.agent_routes.router
contact_router = src.api.contact_routes.router
mcp_server_router = src.api.mcp_server_routes.router
tool_router = src.api.tool_routes.router
client_router = src.api.client_routes.router
@@ -85,7 +83,6 @@ app.include_router(client_router, prefix=API_PREFIX)
app.include_router(chat_router, prefix=API_PREFIX)
app.include_router(session_router, prefix=API_PREFIX)
app.include_router(agent_router, prefix=API_PREFIX)
app.include_router(contact_router, prefix=API_PREFIX)
app.include_router(a2a_router, prefix=API_PREFIX)

View File

@@ -51,18 +51,6 @@ class User(Base):
)
class Contact(Base):
__tablename__ = "contacts"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
client_id = Column(UUID(as_uuid=True), ForeignKey("clients.id", ondelete="CASCADE"))
ext_id = Column(String)
name = Column(String)
meta = Column(JSON, default={})
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class Agent(Base):
__tablename__ = "agents"

View File

@@ -8,8 +8,8 @@ class ChatRequest(BaseModel):
agent_id: str = Field(
..., description="ID of the agent that will process the message"
)
contact_id: str = Field(
..., description="ID of the contact that will process the message"
external_id: str = Field(
..., description="ID of the external_id that will process the message"
)
message: str = Field(..., description="User message")

View File

@@ -33,24 +33,6 @@ class Client(ClientBase):
from_attributes = True
class ContactBase(BaseModel):
ext_id: Optional[str] = None
name: Optional[str] = None
meta: Optional[Dict[str, Any]] = Field(default_factory=dict)
class ContactCreate(ContactBase):
client_id: UUID
class Contact(ContactBase):
id: UUID
client_id: UUID
class Config:
from_attributes = True
class AgentBase(BaseModel):
name: Optional[str] = Field(
None, description="Agent name (no spaces or special characters)"

View File

@@ -315,13 +315,13 @@ class A2ATaskManager:
)
# Collect the chunks of the agent's response
contact_id = task_params.sessionId
external_id = task_params.sessionId
full_response = ""
# We use the same streaming function used in the WebSocket
async for chunk in run_agent_stream(
agent_id=str(agent.id),
contact_id=contact_id,
external_id=external_id,
message=query,
session_service=session_service,
artifacts_service=artifacts_service,
@@ -438,13 +438,13 @@ class A2ATaskManager:
async def _run_agent(self, agent: Agent, query: str, session_id: str) -> str:
"""Executes the agent to process the user query."""
try:
# We use the session_id as contact_id to maintain the conversation continuity
contact_id = session_id
# We use the session_id as external_id to maintain the conversation continuity
external_id = session_id
# We call the same function used in the chat API
final_response = await run_agent(
agent_id=str(agent.id),
contact_id=contact_id,
external_id=external_id,
message=query,
session_service=session_service,
artifacts_service=artifacts_service,

View File

@@ -16,7 +16,7 @@ logger = setup_logger(__name__)
async def run_agent(
agent_id: str,
contact_id: str,
external_id: str,
message: str,
session_service: DatabaseSessionService,
artifacts_service: InMemoryArtifactService,
@@ -27,7 +27,9 @@ async def run_agent(
):
exit_stack = None
try:
logger.info(f"Starting execution of agent {agent_id} for contact {contact_id}")
logger.info(
f"Starting execution of agent {agent_id} for external_id {external_id}"
)
logger.info(f"Received message: {message}")
get_root_agent = get_agent(db, agent_id)
@@ -50,22 +52,22 @@ async def run_agent(
artifact_service=artifacts_service,
memory_service=memory_service,
)
adk_session_id = contact_id + "_" + agent_id
adk_session_id = external_id + "_" + agent_id
if session_id is None:
session_id = adk_session_id
logger.info(f"Searching session for contact {contact_id}")
logger.info(f"Searching session for external_id {external_id}")
session = session_service.get_session(
app_name=agent_id,
user_id=contact_id,
user_id=external_id,
session_id=adk_session_id,
)
if session is None:
logger.info(f"Creating new session for contact {contact_id}")
logger.info(f"Creating new session for external_id {external_id}")
session = session_service.create_session(
app_name=agent_id,
user_id=contact_id,
user_id=external_id,
session_id=adk_session_id,
)
@@ -80,7 +82,7 @@ async def run_agent(
async def process_events():
try:
events_async = agent_runner.run_async(
user_id=contact_id,
user_id=external_id,
session_id=adk_session_id,
new_message=content,
)
@@ -139,7 +141,7 @@ async def run_agent(
# Add the session to memory after completion
completed_session = session_service.get_session(
app_name=agent_id,
user_id=contact_id,
user_id=external_id,
session_id=adk_session_id,
)
@@ -180,7 +182,7 @@ async def run_agent(
async def run_agent_stream(
agent_id: str,
contact_id: str,
external_id: str,
message: str,
session_service: DatabaseSessionService,
artifacts_service: InMemoryArtifactService,
@@ -190,7 +192,7 @@ async def run_agent_stream(
) -> AsyncGenerator[str, None]:
try:
logger.info(
f"Starting streaming execution of agent {agent_id} for contact {contact_id}"
f"Starting streaming execution of agent {agent_id} for external_id {external_id}"
)
logger.info(f"Received message: {message}")
@@ -214,22 +216,22 @@ async def run_agent_stream(
artifact_service=artifacts_service,
memory_service=memory_service,
)
adk_session_id = contact_id + "_" + agent_id
adk_session_id = external_id + "_" + agent_id
if session_id is None:
session_id = adk_session_id
logger.info(f"Searching session for contact {contact_id}")
logger.info(f"Searching session for external_id {external_id}")
session = session_service.get_session(
app_name=agent_id,
user_id=contact_id,
user_id=external_id,
session_id=adk_session_id,
)
if session is None:
logger.info(f"Creating new session for contact {contact_id}")
logger.info(f"Creating new session for external_id {external_id}")
session = session_service.create_session(
app_name=agent_id,
user_id=contact_id,
user_id=external_id,
session_id=adk_session_id,
)
@@ -238,7 +240,7 @@ async def run_agent_stream(
try:
events_async = agent_runner.run_async(
user_id=contact_id,
user_id=external_id,
session_id=adk_session_id,
new_message=content,
)
@@ -252,7 +254,7 @@ async def run_agent_stream(
completed_session = session_service.get_session(
app_name=agent_id,
user_id=contact_id,
user_id=external_id,
session_id=adk_session_id,
)

View File

@@ -1,109 +0,0 @@
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from fastapi import HTTPException, status
from src.models.models import Contact
from src.schemas.schemas import ContactCreate
from typing import List, Optional
import uuid
import logging
logger = logging.getLogger(__name__)
def get_contact(db: Session, contact_id: uuid.UUID) -> Optional[Contact]:
"""Search for a contact by ID"""
try:
contact = db.query(Contact).filter(Contact.id == contact_id).first()
if not contact:
logger.warning(f"Contact not found: {contact_id}")
return None
return contact
except SQLAlchemyError as e:
logger.error(f"Error searching for contact {contact_id}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error searching for contact",
)
def get_contacts_by_client(
db: Session, client_id: uuid.UUID, skip: int = 0, limit: int = 100
) -> List[Contact]:
"""Search for contacts of a client with pagination"""
try:
return (
db.query(Contact)
.filter(Contact.client_id == client_id)
.offset(skip)
.limit(limit)
.all()
)
except SQLAlchemyError as e:
logger.error(f"Error searching for contacts of client {client_id}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error searching for contacts",
)
def create_contact(db: Session, contact: ContactCreate) -> Contact:
"""Create a new contact"""
try:
db_contact = Contact(**contact.model_dump())
db.add(db_contact)
db.commit()
db.refresh(db_contact)
logger.info(f"Contact created successfully: {db_contact.id}")
return db_contact
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error creating contact: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error creating contact",
)
def update_contact(
db: Session, contact_id: uuid.UUID, contact: ContactCreate
) -> Optional[Contact]:
"""Update an existing contact"""
try:
db_contact = get_contact(db, contact_id)
if not db_contact:
return None
for key, value in contact.model_dump().items():
setattr(db_contact, key, value)
db.commit()
db.refresh(db_contact)
logger.info(f"Contact updated successfully: {contact_id}")
return db_contact
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error updating contact {contact_id}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error updating contact",
)
def delete_contact(db: Session, contact_id: uuid.UUID) -> bool:
"""Remove a contact"""
try:
db_contact = get_contact(db, contact_id)
if not db_contact:
return False
db.delete(db_contact)
db.commit()
logger.info(f"Contact removed successfully: {contact_id}")
return True
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error removing contact {contact_id}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error removing contact",
)

View File

@@ -197,10 +197,32 @@ class WorkflowAgent(BaseAgent):
print(f"\n🔄 CONDITION: {label} (Cycle {cycle_count})")
content = state.get("content", [])
print(f"Evaluating condition for content: '{content}'")
conversation_history = state.get("conversation_history", [])
# Obter apenas o evento mais recente para avaliação
latest_event = None
if content and len(content) > 0:
# Ignorar eventos gerados por nós de condição para avaliação
for event in reversed(content):
# Verificar se é um evento gerado pelo condition_node
if (
event.author != "agent"
or not hasattr(event.content, "parts")
or not event.content.parts
):
latest_event = event
break
if latest_event:
print(
f"Avaliando condição apenas para o evento mais recente: '{latest_event}'"
)
# Usar apenas o evento mais recente para avaliação de condição
evaluation_state = state.copy()
if latest_event:
evaluation_state["content"] = [latest_event]
session_id = state.get("session_id", "")
conversation_history = state.get("conversation_history", [])
# Check all conditions
conditions_met = []
@@ -213,9 +235,9 @@ class WorkflowAgent(BaseAgent):
expected_value = condition_data.get("value")
print(
f" Checking if {field} {operator} '{expected_value}' (current value: '{state.get(field, '')}')"
f" Checking if {field} {operator} '{expected_value}' (current value: '{evaluation_state.get(field, '')}')"
)
if self._evaluate_condition(condition, state):
if self._evaluate_condition(condition, evaluation_state):
conditions_met.append(condition_id)
condition_details.append(
f"{field} {operator} '{expected_value}'"
@@ -474,14 +496,35 @@ class WorkflowAgent(BaseAgent):
# If it's a condition node, evaluate the conditions
if node_id in condition_nodes:
conditions = condition_nodes[node_id]
any_condition_met = False
for condition in conditions:
condition_id = condition.get("id")
# Get latest event for evaluation, ignoring condition node informational events
content = state.get("content", [])
latest_event = None
for event in reversed(content):
# Skip events generated by condition nodes
if (
event.author != "agent"
or not hasattr(event.content, "parts")
or not event.content.parts
):
latest_event = event
break
evaluation_state = state.copy()
if latest_event:
evaluation_state["content"] = [latest_event]
# Check if the condition is met
is_condition_met = self._evaluate_condition(condition, state)
is_condition_met = self._evaluate_condition(
condition, evaluation_state
)
if is_condition_met:
any_condition_met = True
print(
f"Condition {condition_id} met. Moving to the next node."
)
@@ -498,12 +541,20 @@ class WorkflowAgent(BaseAgent):
)
# If no condition is met, use the bottom-handle if available
if node_id in edges_map and "bottom-handle" in edges_map[node_id]:
print("No condition met. Using default path (bottom-handle).")
return edges_map[node_id]["bottom-handle"]
else:
print("No condition met and no default path. Closing the flow.")
return END
if not any_condition_met:
if (
node_id in edges_map
and "bottom-handle" in edges_map[node_id]
):
print(
"No condition met. Using default path (bottom-handle)."
)
return edges_map[node_id]["bottom-handle"]
else:
print(
"No condition met and no default path. Closing the flow."
)
return END
# For regular nodes, simply follow the first available connection
if node_id in edges_map: