Mastering Real-Time Data Streaming with Apache Kafka and Avro

In the world of real-time data streaming and processing, Apache Kafka has emerged as a powerful tool. When combined with Avro, a compact binary serialization format, Kafka becomes even more efficient at handling complex structured data such as files and documents. In this guide, we’ll explore how to use Kafka and Avro to produce and consume files, and we’ll also integrate a REST API to trigger the Kafka producer.

What is Apache Avro?

Avro is a binary serialization format developed within the Apache Hadoop project. It provides a compact, fast, and schema-driven serialization system that integrates well with Kafka. Avro’s schema evolution capabilities make it ideal for scenarios where data schemas may evolve over time without breaking compatibility.
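
To make “compact, fast, and schema-driven” concrete, here is a minimal, Kafka-free sketch that serializes a record to Avro’s binary encoding and reads it back. The inline schema and field names are purely illustrative:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroRoundTripExample {

    public static void main(String[] args) throws IOException {
        // Illustrative schema: a record with a single string field.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Greeting\",\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("message", "hello avro");

        // Serialize to Avro's compact binary encoding.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        // Deserialize using the same schema.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord roundTripped = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        System.out.println(roundTripped.get("message"));
    }
}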

Why Use Apache Avro with Kafka?

Apache Kafka is widely known for its ability to handle high-throughput, fault-tolerant, and scalable message streaming. However, the format in which the data is serialized plays a critical role in its performance and flexibility. This is where Apache Avro comes in as a powerful tool when working with Kafka.

Key Benefits of Using Avro with Kafka:

  1. Efficient Serialization: Avro's compact binary format reduces message size, improving Kafka's performance and reducing network and storage costs.
  2. Schema-Driven: Avro uses schemas to define data structure, ensuring that Kafka producers and consumers understand and process data consistently, even across different programming languages.
  3. Schema Evolution: Avro supports forward and backward schema compatibility, allowing you to modify data models without breaking existing Kafka pipelines (see the example after this list).
  4. Integration with Schema Registry: Avro integrates with the Confluent Schema Registry to store, retrieve, and validate schemas, ensuring data integrity and version control across producers and consumers.
  5. Language-Agnostic: Avro's standardized format makes it easy to exchange data between applications written in different languages, enhancing interoperability in Kafka-based systems.
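
As a quick illustration of point 3: if a later version of a schema adds a field that is nullable and has a default, consumers holding the old schema can still read new records, and consumers holding the new schema can still read old ones. The UserEvent record and its fields below are purely illustrative:

{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.example.events",
  "fields": [
    { "name": "userId", "type": "string" },
    {
      "name": "source",
      "type": ["null", "string"],
      "default": null,
      "doc": "Added in a later schema version; nullable with a default, so the change stays backward and forward compatible"
    }
  ]
}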

Setting Up Your Environment

Before diving into producing and consuming files with Kafka and Avro, ensure you have the following setup:

Step 1: Import Kafka and Avro Dependencies

Include the following dependencies in your project’s pom.xml:

<dependencies>
    <!-- Kafka dependencies -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.1</version>
    </dependency>
    <!-- Avro dependencies -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-compiler</artifactId>
        <version>1.11.0</version>
    </dependency>
    <!-- Kafka Avro Serializer -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>6.1.1</version>
    </dependency>
    <!-- Kafka Avro Schema Registry -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-client</artifactId>
        <version>6.1.1</version>
    </dependency>
</dependencies>
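
The io.confluent artifacts above are not published to Maven Central, so you also need to add the Confluent repository to your pom.xml:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>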

Step 2: Generate Avro Schema from Java Objects

To generate an Avro schema (.avsc file) from a Java class, you can use Jackson’s AvroMapper and AvroSchemaGenerator. Note that these come from the com.fasterxml.jackson.dataformat:jackson-dataformat-avro module, which you will need in addition to the dependencies listed above:

// Imports below assume the jackson-dataformat-avro module is on the classpath.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import com.fasterxml.jackson.dataformat.avro.jsr310.AvroJavaTimeModule;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;

public class AvroSchemaFileGenerator {

    public static void main(String[] args) throws IOException {
        // Keep declared field order and register java.time support.
        AvroMapper avroMapper = AvroMapper.builder()
                .disable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
                .addModule(new AvroJavaTimeModule())
                .build();
        createAvroSchemaFromClass(DocumentRecord.class, avroMapper);
        createAvroSchemaFromClass(Earth.class, avroMapper);
        createAvroSchemaFromClass(Mars.class, avroMapper);
    }

    private static void createAvroSchemaFromClass(Class<?> clazz, AvroMapper avroMapper) throws IOException {
        AvroSchemaGenerator gen = new AvroSchemaGenerator();
        gen.enableLogicalTypes();
        // Walk the class with Jackson and collect the equivalent Avro schema.
        avroMapper.acceptJsonFormatVisitor(clazz, gen);
        AvroSchema schemaWrapper = gen.getGeneratedSchema();
        org.apache.avro.Schema avroSchema = schemaWrapper.getAvroSchema();
        String avroSchemaInJSON = avroSchema.toString(true);
        // Write the pretty-printed schema to <ClassName>.avsc
        Path fileName = Path.of(clazz.getSimpleName() + ".avsc");
        Files.writeString(fileName, avroSchemaInJSON);
    }
}

Step 3: Define Your Avro Schema

An Avro schema is written in JSON and defines the structure of your data. For example, here is a simple schema for a document with title and body fields:

{
  "type": "record",
  "name": "DocumentRecord",
  "namespace": "com.example.document",
  "fields": [
    {
      "name": "title",
      "type": [
        "null",
        "string"
      ],
      "default": null,
      "doc": "The title of the document"
    },
    {
      "name": "body",
      "type": [
        "null",
        "string"
      ],
      "default": null,
      "doc": "The content of the document"
    }
  ]
}
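
Assuming DocumentRecord is generated from this schema (for example with the avro-maven-plugin, which uses the avro-compiler dependency above), the generated class extends SpecificRecordBase and exposes a builder. A minimal sketch of creating an instance:

import com.example.document.DocumentRecord;

public class DocumentRecordFactory {

    public static DocumentRecord sampleDocument() {
        // Generated Avro classes provide a builder with one setter per schema field.
        return DocumentRecord.newBuilder()
                .setTitle("Sample Document")
                .setBody("This is the content of the sample document.")
                .build();
    }
}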

Step 4: Kafka and Avro Integration

Key Components

  1. Kafka Broker: The Apache Kafka server.
  2. Schema Registry: A centralized registry for Avro schemas.
  3. Producer: An application that sends Avro-encoded messages to Kafka.
  4. Consumer: An application that reads Avro-encoded messages from Kafka.

Spring Boot Configuration (application.yml)

Ensure your application.yml file is configured correctly for Kafka and Avro:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      # Passed through to the Confluent serializer and deserializer
      schema.registry.url: http://localhost:8081
      # Deserialize into the generated SpecificRecord classes instead of GenericRecord
      specific.avro.reader: true
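
Before producing any messages, make sure the target topic exists. One option is to let the application create it; here is a minimal sketch using Spring Kafka’s TopicBuilder (the partition and replica counts are just example values):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Spring Boot's auto-configured KafkaAdmin picks up NewTopic beans and creates missing topics on startup.
    @Bean
    public NewTopic documentsEventTopic() {
        return TopicBuilder.name("documents-event-topic")
                .partitions(3)
                .replicas(1)
                .build();
    }
}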

Step 5: Producing Avro Files to Kafka

To produce Avro-encoded messages to Kafka, use Spring’s KafkaTemplate, which wraps the Kafka Producer API and applies the Avro serializer configured above:

@Service
public class KafkaProducerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);

    @Autowired
    private KafkaTemplate<String, SpecificRecordBase> kafkaTemplate;

    public CompletableFuture<SendResult<String, SpecificRecordBase>> sendEvent(String topic,
            String key, SpecificRecordBase documentRecord) {
        return kafkaTemplate.send(topic, key, documentRecord)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        handleFailure(key, topic, ex);
                    } else {
                        handleSuccess(key, topic, result);
                    }
                });
    }

    private void handleFailure(String key, String topic, Throwable ex) {
        // Add error handling logic here (logging, retries, dead-letter topic, etc.)
        log.error("Failed to send message to Kafka - Topic: {}, Key: {}", topic, key, ex);
    }

    private void handleSuccess(String key, String topic, SendResult<String, SpecificRecordBase> result) {
        // Add success handling logic here (e.g., logging, metrics)
        log.info("Message sent to Kafka - Topic: {}, Partition: {}, Key: {}",
                topic, result.getRecordMetadata().partition(), key);
    }
}

Step 6: Integrating a REST API to Call the Producer

To provide a way to trigger the Kafka producer through a REST API, you can create a simple Spring Boot controller.

Define the REST API (Controller)

This controller allows you to send a DocumentRecord object to Kafka via a POST request:

@RestController
@RequestMapping("/api/documents")
public class DocumentController {
    
    @Autowired
    private KafkaProducerService kafkaProducerService;

    @PostMapping("/send")
    public ResponseEntity<String> sendDocumentToKafka(@RequestBody DocumentRecord documentRecord) {
        String topic = "documents-event-topic";
        String key = documentRecord.getTitle(); // Using the title as the key
        try {
            // sendEvent is asynchronous; this catch only covers synchronous failures.
            // Asynchronous send errors are reported via the producer's handleFailure callback.
            kafkaProducerService.sendEvent(topic, key, documentRecord);
            return ResponseEntity.ok("Document sent to Kafka successfully.");
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                                 .body("Failed to send document to Kafka: " + e.getMessage());
        }
    }
}

Request Payload Example

Here’s an example of the JSON payload you can send via the POST API:

{
    "title": "Sample Document",
    "body": "This is the content of the sample document."
}

Step 7: Consuming Files from Kafka

To consume these messages from Kafka, configure a Kafka listener that deserializes the Avro data back into the DocumentRecord type (note the specific.avro.reader setting in application.yml):

@Service
public class ConsumerService {

    private static final Logger log = LoggerFactory.getLogger(ConsumerService.class);

    @KafkaListener(topics = "documents-event-topic", groupId = "document.event.consumer.group-id")
    public void consumeDocumentEvent(ConsumerRecord<String, DocumentRecord> rec) {
        DocumentRecord document = rec.value();
        log.info("Consumed Document event from topic {}: partition {}, key = {}, title = {}",
                rec.topic(), rec.partition(), rec.key(), document.getTitle());
    }
}
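
If the goal is to end up with an actual file on the consumer side, a minimal sketch of a listener that writes each document’s body to disk might look like this (the file-naming scheme and the document-file-writer group id are just assumptions):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.example.document.DocumentRecord;

@Service
public class DocumentFileWriter {

    @KafkaListener(topics = "documents-event-topic", groupId = "document-file-writer")
    public void writeDocumentToFile(ConsumerRecord<String, DocumentRecord> rec) throws IOException {
        DocumentRecord document = rec.value();

        // Fall back to the record offset when the title is missing.
        String baseName = document.getTitle() != null
                ? document.getTitle().toString()
                : "document-" + rec.offset();

        Path target = Path.of(baseName + ".txt");
        Files.writeString(target, String.valueOf(document.getBody()));
    }
}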

Conclusion

Apache Kafka and Avro together provide a robust solution for handling files in real-time data pipelines. Avro’s efficient serialization and schema evolution capabilities complement Kafka’s scalability and fault-tolerant message delivery, making them ideal for building modern data architectures.

By integrating Kafka with Avro, organizations can achieve faster data processing, better scalability, and seamless integration across diverse data sources and applications. The added REST API allows easy interaction with the Kafka producer, making it simple to send Avro-encoded messages to Kafka from any application.
