# Kafka Events

Event-driven messaging between services — topics, producers, consumers, and message schemas.
Apache Kafka decouples the API Gateway from the Agno agents and downstream services. All state transitions publish events; consumers react asynchronously without tight coupling. Kafka is optional for local development — events are logged but not published if the broker is unavailable.
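The logging-only fallback can be sketched as a thin wrapper around the producer's send call. The `publishOrLog` helper and the injected `send` function are hypothetical names for illustration, not the gateway's actual API:

```typescript
// Sketch of the broker-optional publish path (names are assumptions).
// `send` is null when the broker was unreachable at startup.
type SendFn = (topic: string, message: string) => Promise<void>;

async function publishOrLog(
  send: SendFn | null,
  topic: string,
  message: object,
): Promise<boolean> {
  const value = JSON.stringify(message);
  if (send === null) {
    // Fallback mode: event is logged, never published.
    console.warn(`[kafka disabled] ${topic}: ${value}`);
    return false;
  }
  try {
    await send(topic, value);
    return true;
  } catch (err) {
    // Broker went away after startup: degrade to logging instead of failing.
    console.warn(`[kafka publish failed] ${topic}: ${value}`, err);
    return false;
  }
}
```

Returning a boolean lets callers record whether the event actually reached the broker without having to catch publish errors themselves.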
## Topic Reference
| Topic | Published By | Consumed By | Trigger |
|---|---|---|---|
| interview.info_needed | API Gateway | Notification Service | Validation fails — missing fields |
| interview.info_completed | API Gateway | Notification Service | HITL completes missing data |
| skill.validation.requested | API Gateway | Agno Planner Agent | Data complete, start plan generation |
| plan.generated | Agno Agent | API Gateway / Workflow Engine | Plan generation complete |
| approval.approved | API Gateway | Candidate Access Service | Recruiter approves plan |
| approval.rejected | API Gateway | Agno Agent (optional retry) | Recruiter rejects plan |
| interview.modification_requested | API Gateway | Agno Agent | Recruiter requests plan changes |
| interview.completed | Vert.x Edge | Agno Assessor Agent | Interview session ends |
| assessment.completed | Agno Agent | API Gateway / Workflow Engine | AI assessment ready |
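For use in code (startup topic creation, routing, monitoring), the table above can be captured as a constant map from topic to its consumers. The service identifiers below are illustrative slugs, not the actual deployment names:

```typescript
// Topic catalogue mirroring the table above (consumer slugs are illustrative).
const TOPIC_CONSUMERS: Record<string, string[]> = {
  'interview.info_needed': ['notification-service'],
  'interview.info_completed': ['notification-service'],
  'skill.validation.requested': ['agno-planner-agent'],
  'plan.generated': ['api-gateway', 'workflow-engine'],
  'approval.approved': ['candidate-access-service'],
  'approval.rejected': ['agno-agent'],
  'interview.modification_requested': ['agno-agent'],
  'interview.completed': ['agno-assessor-agent'],
  'assessment.completed': ['api-gateway', 'workflow-engine'],
};

// Full topic list, e.g. for idempotent topic creation at startup.
const ALL_TOPICS = Object.keys(TOPIC_CONSUMERS);
```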
## Message Schema
All Kafka messages share a common envelope with a topic-specific payload field.
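The shared envelope can be sketched as a TypeScript type inferred from the examples below. The `KafkaEnvelope` name and the runtime guard are assumptions for illustration; the codebase may name the type differently (e.g. `KafkaMessage`):

```typescript
// Shared envelope shape, inferred from the example messages (name assumed).
interface KafkaEnvelope<P = Record<string, unknown>> {
  eventType: string;
  tenantId: string;
  runId: string;
  interviewId: string;
  timestamp: string; // ISO-8601
  payload: P;
}

// Runtime guard a consumer could apply before trusting a parsed message.
function isKafkaEnvelope(value: unknown): value is KafkaEnvelope {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.eventType === 'string' &&
    typeof v.tenantId === 'string' &&
    typeof v.runId === 'string' &&
    typeof v.interviewId === 'string' &&
    typeof v.timestamp === 'string' &&
    typeof v.payload === 'object' &&
    v.payload !== null
  );
}
```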
### skill.validation.requested

```json
{
  "eventType": "skill.validation.requested",
  "tenantId": "tenant-uuid",
  "runId": "agno-run-uuid",
  "interviewId": "interview-uuid",
  "timestamp": "2024-01-15T10:01:00.000Z",
  "payload": {
    "skills": ["TypeScript", "React", "Node.js"],
    "jobDescription": "Build scalable platform services...",
    "level": "SENIOR",
    "callbackUrl": "https://ats.example.com/webhook"
  }
}
```

### plan.generated
```json
{
  "eventType": "plan.generated",
  "tenantId": "tenant-uuid",
  "runId": "agno-run-uuid",
  "interviewId": "interview-uuid",
  "timestamp": "2024-01-15T10:45:00.000Z",
  "payload": {
    "planId": "plan-uuid",
    "validatedSkills": ["TypeScript", "React", "Node.js"],
    "duration": 60
  }
}
```

## Producer Implementation
```typescript
import { Injectable } from '@nestjs/common';
import { Producer } from 'kafkajs';
// KafkaMessage is the shared event envelope (see "Message Schema" above).

@Injectable()
export class KafkaProducerService {
  // kafkajs Producer instance; connection wiring elided.
  constructor(private readonly producer: Producer) {}

  async publishSkillValidationRequest(data: {
    runId: string;
    interviewId: string;
    tenantId: string;
    skills: string[];
    jobDescription: string;
    level: string;
    callbackUrl?: string;
  }): Promise<void> {
    // Wrap the topic-specific payload in the common envelope.
    const message: KafkaMessage = {
      eventType: 'skill.validation.requested',
      tenantId: data.tenantId,
      runId: data.runId,
      interviewId: data.interviewId,
      timestamp: new Date().toISOString(),
      payload: {
        skills: data.skills,
        jobDescription: data.jobDescription,
        level: data.level,
        callbackUrl: data.callbackUrl,
      },
    };

    await this.producer.send({
      topic: 'skill.validation.requested',
      messages: [{ value: JSON.stringify(message) }],
    });
  }
}
```

## Consumer Implementation
```typescript
// A2A consumer: subscribes to plan.generated. This handler lives inside a
// @Controller-decorated class; the surrounding class is elided here.
@EventPattern('plan.generated')
async handlePlanGenerated(
  @Payload() data: PlanGeneratedEvent,
): Promise<void> {
  const { runId, interviewId, tenantId, payload } = data;

  // Update interview state to PENDING and attach the generated plan.
  await this.interviewService.updateState(
    tenantId,
    interviewId,
    'PENDING',
    { planId: payload.planId },
  );

  // Notify the ATS via its registered callback URL, if one exists.
  const interview = await this.interviewService.findById(tenantId, interviewId);
  if (interview.callbackUrl) {
    await this.webhookService.sendPlanGeneratedWebhook(
      interview.callbackUrl,
      runId,
      interviewId,
      tenantId,
    );
  }
}
```

## Local Development
Kafka is optional — the API Gateway detects connectivity on startup and falls back to logging-only mode. Start Kafka with Docker Compose:
```bash
# Start Kafka + ZooKeeper
docker-compose up -d kafka zookeeper

# List the existing topics
docker exec -it kafka kafka-topics.sh \
  --list --bootstrap-server localhost:9092

# Consume messages from a topic
docker exec -it kafka kafka-console-consumer.sh \
  --topic skill.validation.requested \
  --bootstrap-server localhost:9092 \
  --from-beginning
```

Point the gateway at the broker via `.env`:

```
KAFKA_BROKER=localhost:9092
KAFKA_CLIENT_ID=ai-interview-gateway
KAFKA_GROUP_ID=api-gateway-consumer
```

Kafka lag for each consumer group is visible at `/api/v1/admin/kafka/status`. High lag on `skill.validation.requested` indicates the Agno Python service may be down or overloaded.
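Per-partition lag is simply the broker's high-water mark minus the group's committed offset. A minimal sketch of the aggregation the status endpoint presumably performs; the `PartitionOffsets` shape and `totalLag` helper are assumptions, not the endpoint's actual code:

```typescript
// Hypothetical offset snapshot for one partition of a topic.
interface PartitionOffsets {
  partition: number;
  latest: number;    // high-water mark on the broker
  committed: number; // consumer group's committed offset
}

// Total lag across partitions; clamped at zero per partition, since a
// committed offset can briefly exceed a stale high-water-mark reading.
function totalLag(partitions: PartitionOffsets[]): number {
  return partitions.reduce(
    (sum, p) => sum + Math.max(0, p.latest - p.committed),
    0,
  );
}
```

A sustained, growing `totalLag` on `skill.validation.requested` is the signal described above: messages are arriving faster than the Agno planner consumes them.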