Architecture

Kafka Events

Event-driven messaging between services — topics, producers, consumers, and message schemas.

Apache Kafka decouples the API Gateway from the Agno agents and downstream services. All state transitions publish events; consumers react asynchronously without tight coupling. Kafka is optional for local development — events are logged but not published if the broker is unavailable.

Topic Reference

| Topic | Published By | Consumed By | Trigger |
| --- | --- | --- | --- |
| interview.info_needed | API Gateway | Notification Service | Validation fails (missing fields) |
| interview.info_completed | API Gateway | Notification Service | HITL completes missing data |
| skill.validation.requested | API Gateway | Agno Planner Agent | Data complete, start plan generation |
| plan.generated | Agno Agent | API Gateway / Workflow Engine | Plan generation complete |
| approval.approved | API Gateway | Candidate Access Service | Recruiter approves plan |
| approval.rejected | API Gateway | Agno Agent (optional retry) | Recruiter rejects plan |
| interview.modification_requested | API Gateway | Agno Agent | Recruiter requests plan changes |
| interview.completed | Vert.x Edge | Agno Assessor Agent | Interview session ends |
| assessment.completed | Agno Agent | API Gateway / Workflow Engine | AI assessment ready |
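
Keeping the topic names from the table in one shared constant prevents producers and consumers from drifting apart on spelling. A minimal sketch; the `KAFKA_TOPICS` and `KafkaTopic` names are illustrative, not from the codebase:

```typescript
// All topic names from the table above, as a single source of truth.
// The constant and type names here are illustrative.
export const KAFKA_TOPICS = [
  'interview.info_needed',
  'interview.info_completed',
  'skill.validation.requested',
  'plan.generated',
  'approval.approved',
  'approval.rejected',
  'interview.modification_requested',
  'interview.completed',
  'assessment.completed',
] as const;

// Union type of valid topic names, derived from the constant.
export type KafkaTopic = (typeof KAFKA_TOPICS)[number];
```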

Message Schema

All Kafka messages share a common envelope with a topic-specific payload field.
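
Based on the schema examples in this section, the envelope might be modeled as a TypeScript interface with a small factory. Field names match the JSON samples; the `makeEnvelope` helper is illustrative, not part of the codebase:

```typescript
// Common envelope shared by all Kafka messages (field names taken
// from the schema examples in this section).
export interface KafkaMessage<P = Record<string, unknown>> {
  eventType: string;
  tenantId: string;
  runId: string;
  interviewId: string;
  timestamp: string; // ISO-8601
  payload: P;
}

// Illustrative factory: stamps the timestamp so each producer
// doesn't have to repeat that boilerplate.
export function makeEnvelope<P>(
  eventType: string,
  ids: { tenantId: string; runId: string; interviewId: string },
  payload: P,
): KafkaMessage<P> {
  return { eventType, ...ids, timestamp: new Date().toISOString(), payload };
}
```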

skill.validation.requested

```json
{
  "eventType": "skill.validation.requested",
  "tenantId": "tenant-uuid",
  "runId": "agno-run-uuid",
  "interviewId": "interview-uuid",
  "timestamp": "2024-01-15T10:01:00.000Z",
  "payload": {
    "skills": ["TypeScript", "React", "Node.js"],
    "jobDescription": "Build scalable platform services...",
    "level": "SENIOR",
    "callbackUrl": "https://ats.example.com/webhook"
  }
}
```

plan.generated

```json
{
  "eventType": "plan.generated",
  "tenantId": "tenant-uuid",
  "runId": "agno-run-uuid",
  "interviewId": "interview-uuid",
  "timestamp": "2024-01-15T10:45:00.000Z",
  "payload": {
    "planId": "plan-uuid",
    "validatedSkills": ["TypeScript", "React", "Node.js"],
    "duration": 60
  }
}
```

Producer Implementation

```typescript
// KafkaProducerService example
@Injectable()
export class KafkaProducerService {
  async publishSkillValidationRequest(data: {
    runId: string;
    interviewId: string;
    tenantId: string;
    skills: string[];
    jobDescription: string;
    level: string;
    callbackUrl?: string;
  }): Promise<void> {
    // Wrap the payload in the common envelope
    const message: KafkaMessage = {
      eventType: 'skill.validation.requested',
      tenantId: data.tenantId,
      runId: data.runId,
      interviewId: data.interviewId,
      timestamp: new Date().toISOString(),
      payload: {
        skills: data.skills,
        jobDescription: data.jobDescription,
        level: data.level,
        callbackUrl: data.callbackUrl,
      },
    };

    await this.producer.send({
      topic: 'skill.validation.requested',
      messages: [{ value: JSON.stringify(message) }],
    });
  }
}
```
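
The logging-only fallback described above (events are logged but not published when the broker is unavailable) could be implemented as a thin wrapper around the producer. This is a sketch under the assumption of a kafkajs-style `send()` signature; the `safePublish` name is illustrative, not from the codebase:

```typescript
// Minimal producer shape (kafkajs-style send signature), for illustration.
interface MinimalProducer {
  send(record: { topic: string; messages: { value: string }[] }): Promise<unknown>;
}

// Illustrative fallback: publish when a producer is connected, otherwise
// log the event and carry on, as described for local development.
export async function safePublish(
  producer: MinimalProducer | null,
  topic: string,
  message: object,
  log: (msg: string) => void = console.warn,
): Promise<boolean> {
  const value = JSON.stringify(message);
  if (!producer) {
    // Broker never connected: logging-only mode.
    log(`[kafka disabled] ${topic}: ${value}`);
    return false;
  }
  try {
    await producer.send({ topic, messages: [{ value }] });
    return true;
  } catch {
    // Broker went away mid-session: degrade to logging.
    log(`[kafka unavailable] ${topic}: ${value}`);
    return false;
  }
}
```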

Consumer Implementation

```typescript
// A2A Consumer subscribes to plan.generated
@Controller()
export class PlanGeneratedConsumer {
  @EventPattern('plan.generated')
  async handlePlanGenerated(
    @Payload() data: PlanGeneratedEvent,
  ): Promise<void> {
    const { runId, interviewId, tenantId, payload } = data;

    // Update interview state to PENDING
    await this.interviewService.updateState(
      tenantId,
      interviewId,
      'PENDING',
      { planId: payload.planId },
    );

    // Send webhook to callback URL
    const interview = await this.interviewService.findById(tenantId, interviewId);
    if (interview.callbackUrl) {
      await this.webhookService.sendPlanGeneratedWebhook(
        interview.callbackUrl,
        runId,
        interviewId,
        tenantId,
      );
    }
  }
}
```
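
A single failed HTTP call shouldn't lose the callback the consumer delivers, so webhook sends are commonly retried with backoff. A hedged sketch; the `retryWithBackoff` helper and its delays are assumptions, not from the codebase:

```typescript
// Illustrative retry helper for webhook delivery: retries a failing
// async operation with exponential backoff before giving up.
export async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 200,
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Exponential backoff: baseDelayMs, 2x, 4x, ...
      await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** i));
    }
  }
  throw lastError;
}
```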

Local Development

Kafka is optional — the API Gateway detects connectivity on startup and falls back to logging-only mode. Start Kafka with Docker Compose:

```bash
# Start Kafka + Zookeeper
docker-compose up -d kafka zookeeper

# Check topic list
docker exec -it kafka kafka-topics.sh \
  --list --bootstrap-server localhost:9092

# Consume messages from a topic
docker exec -it kafka kafka-console-consumer.sh \
  --topic skill.validation.requested \
  --bootstrap-server localhost:9092 \
  --from-beginning
```

.env

```bash
KAFKA_BROKER=localhost:9092
KAFKA_CLIENT_ID=ai-interview-gateway
KAFKA_GROUP_ID=api-gateway-consumer
```
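
A client configuration derived from these variables might look like the following sketch; `buildKafkaConfig` and its defaults are illustrative, not the gateway's actual bootstrap code:

```typescript
// Illustrative helper: read broker settings from the environment,
// falling back to the local-development defaults shown above.
export interface KafkaEnvConfig {
  brokers: string[];
  clientId: string;
  groupId: string;
}

export function buildKafkaConfig(
  env: Record<string, string | undefined> = process.env,
): KafkaEnvConfig {
  return {
    // Assumes KAFKA_BROKER may hold a comma-separated host:port list.
    brokers: (env.KAFKA_BROKER ?? 'localhost:9092').split(','),
    clientId: env.KAFKA_CLIENT_ID ?? 'ai-interview-gateway',
    groupId: env.KAFKA_GROUP_ID ?? 'api-gateway-consumer',
  };
}
```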
Consumer-group lag is exposed at /api/v1/admin/kafka/status. High lag on skill.validation.requested indicates that the Agno Python service may be down or overloaded.
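
Lag for a partition is simply the broker's latest offset minus the group's committed offset. A hedged sketch of that arithmetic; `computeLag` and the `PartitionOffsets` shape are illustrative (offsets are modeled as strings, which is how Kafka admin APIs typically report the int64 values):

```typescript
// Illustrative offset shape: Kafka offsets are int64 and usually
// surfaced as strings by client admin APIs.
interface PartitionOffsets {
  partition: number;
  latest: string;    // broker high-watermark offset
  committed: string; // consumer group's committed offset
}

// Illustrative lag computation: sum (latest - committed) per partition,
// clamped at zero in case of stale committed-offset reads.
export function computeLag(partitions: PartitionOffsets[]): number {
  return partitions.reduce(
    (sum, p) => sum + Math.max(0, Number(p.latest) - Number(p.committed)),
    0,
  );
}
```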