Big data messaging with Kafka, Part 2

May 11, 2016

In Part 1 you developed a couple of small-scale producer/consumer applications using Kafka. From these exercises you should be familiar with the basics of the Kafka messaging system. In Part 2 you'll learn how to use partitions to distribute load and scale your application horizontally, handling up to millions of messages per day. You'll also learn how Kafka uses message offsets to track and manage complex message processing, and how to protect your Kafka messaging system against failure should a consumer go down. We'll develop the example application from Part 1 for both publish-subscribe and point-to-point use cases.

Partitions in Kafka

Topics in Kafka can be subdivided into partitions. For example, while creating a topic named Demo, you might configure it to have three partitions. The server would create three log files, one for each partition. When a producer published a message to the topic, the producer would assign a partition ID to that message. The server would then append the message to the log file for that partition only.

If you then started two consumers, the server might assign partitions 1 and 2 to the first consumer, and partition 3 to the second consumer. Each consumer would read only from its assigned partitions. You can see the Demo topic configured for three partitions in Figure 1.

Figure 1. A partitioned topic in Apache Kafka

To expand the scenario, imagine a Kafka cluster with two brokers, hosted on two machines. When you partitioned the Demo topic, you would configure it to have two partitions and two replicas. For this type of configuration, the Kafka server would assign the two partitions to the two brokers in your cluster. Each broker would be the leader for one of the partitions.

When a producer published a message, it would go to the partition leader. The leader would take the message and append it to the log file on the local machine. The second broker would passively replicate that commit log to its own machine. If the partition leader went down, the second broker would become the new leader and start serving client requests. In the same way, when a consumer sent a request to a partition, that request would go first to the partition leader, which would return the requested messages.

Benefits of partitioning

Consider the benefits of partitioning a Kafka-based messaging system:

  1. Scalability: In a system with just one partition, messages published to a topic are stored in a log file, which exists on a single machine. The number of messages for a topic must fit into a single commit log file, and the size of messages stored can never be more than that machine's disk space. Partitioning a topic lets you scale your system by storing messages on different machines in a cluster. If you wanted to store 30 gigabytes (GB) of messages for the Demo topic, for instance, you could build a Kafka cluster of three machines, each with 10 GB of disk space. Then you would configure the topic to have three partitions.
  2. Server-load balancing: Having multiple partitions lets you spread message requests across brokers. For example, if you had a topic that processed 1 million messages per second, you could divide it into 100 partitions and add 100 brokers to your cluster. Each broker would be the leader for a single partition, responsible for responding to just 10,000 client requests per second.
  3. Consumer-load balancing: Similar to server-load balancing, hosting multiple consumers on different machines lets you spread the consumer load. Let's say you wanted to consume 1 million messages per second from a topic with 100 partitions. You could create 100 consumers and run them in parallel. The Kafka server would assign one partition to each of the consumers, and each consumer would process 10,000 messages per second in parallel. Since Kafka assigns each partition to only one consumer, within the partition each message would be consumed in order.

Two ways to partition

The producer is responsible for deciding what partition a message will go to. The producer has two options for controlling this assignment:

  • Custom partitioner: You can create a class implementing the org.apache.kafka.clients.producer.Partitioner interface. This custom Partitioner will implement the business logic to decide where messages are sent.
  • DefaultPartitioner: If you don't create a custom partitioner class, then by default the org.apache.kafka.clients.producer.internals.DefaultPartitioner class will be used. The default partitioner is good enough for most cases, providing three options:
    1. Manual: When you create a ProducerRecord, use the overloaded constructor new ProducerRecord(topicName, partitionId, messageKey, message) to specify a partition ID.
    2. Hashing (locality-sensitive): When you create a ProducerRecord, specify a messageKey by calling new ProducerRecord(topicName, messageKey, message). DefaultPartitioner will use the hash of the key to ensure that all messages for the same key go to the same partition. This is the easiest and most common approach.
    3. Spraying (random load balancing): If you don't want to control which partition messages go to, simply call new ProducerRecord(topicName, message) to create your ProducerRecord. In this case the partitioner will send messages to all the partitions in round-robin fashion, ensuring a balanced server load. All three options are sketched below.
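
Here's a minimal sketch of all three options; the topic name, partition ID, key, and message values below are placeholders, and the records would be sent with an already-configured producer:

    //1. Manual: pin the message to partition 1
    ProducerRecord<String, String> manual =
            new ProducerRecord<String, String>("demo-topic", 1, "user-42", "First order");
    //2. Hashing: every message with key "user-42" lands on the same partition
    ProducerRecord<String, String> hashed =
            new ProducerRecord<String, String>("demo-topic", "user-42", "First order");
    //3. Spraying: no key, so messages are spread round-robin across partitions
    ProducerRecord<String, String> sprayed =
            new ProducerRecord<String, String>("demo-topic", "First order");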

Partitioning a Kafka application

For the simple producer/consumer example in Part 1, we used a DefaultPartitioner. Now we'll try creating a custom partitioner instead. For this example, let's assume that we have a retail site that customers can use to order products from anywhere in the world. Based on usage, we know that most customers are in either the United States or India. We want to partition our application to send orders from the US or India to their own respective consumers, while orders from anywhere else go to a third consumer.

To start, we'll create a CountryPartitioner that implements the org.apache.kafka.clients.producer.Partitioner interface. We must implement the following methods:

  1. Kafka will call configure() when we initialize the Partitioner class, with a Map of configuration properties. This method initializes functions specific to the application's business logic, such as connecting to a database. In this case we want a fairly generic partitioner that takes countryName as a property. We can then use configProperties.put("partitions.0","USA") to map the flow of messages to partitions. In the future we can use this format to change which countries get their own partition.
  2. The Producer API calls partition() once for every message. In this case we'll use it to read the message and parse the name of the country from the message. If the name of the country is in the countryToPartitionMap, it will return partitionId stored in the Map. If not, it will hash the value of the country and use it to calculate which partition it should go to.
  3. Kafka calls close() when shutting down the partitioner. Implementing this method ensures that any resources acquired during initialization are cleaned up during shutdown.

Note that when Kafka calls configure(), the Kafka producer will pass all the properties that we've configured for the producer to the Partitioner class. It is essential that we read only those properties that start with partitions., parse them to get the partitionId, and store the ID in countryToPartitionMap.

Below is our custom implementation of the Partitioner interface.

Listing 1. CountryPartitioner

    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;

    public class CountryPartitioner implements Partitioner {
        private Map<String, Integer> countryToPartitionMap;

        public void configure(Map<String, ?> configs) {
            System.out.println("Inside CountryPartitioner.configure " + configs);
            countryToPartitionMap = new HashMap<String, Integer>();
            for (Map.Entry<String, ?> entry : configs.entrySet()) {
                if (entry.getKey().startsWith("partitions.")) {
                    String keyName = entry.getKey();
                    String value = (String) entry.getValue();
                    //"partitions.".length() == 11, so everything after it is the partition ID
                    int partitionId = Integer.parseInt(keyName.substring(11));
                    countryToPartitionMap.put(value, partitionId);
                }
            }
        }

        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                             Cluster cluster) {
            List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
            String countryName = ((String) value).split(":")[0];
            if (countryToPartitionMap.containsKey(countryName)) {
                //If the country is mapped to a particular partition, return it
                return countryToPartitionMap.get(countryName);
            } else {
                //Otherwise distribute the message among the partitions that have no
                //country mapped to them (assumes at least one partition is unmapped)
                int noOfPartitions = partitions.size();
                int unmappedPartitions = noOfPartitions - countryToPartitionMap.size();
                return countryToPartitionMap.size()
                        + Math.abs(countryName.hashCode()) % unmappedPartitions;
            }
        }

        public void close() {}
    }
    

The Producer class in Listing 2 (below) is very similar to our simple producer from Part 1, with two changes:

  1. We set a config property with a key equal to the value of ProducerConfig.PARTITIONER_CLASS_CONFIG, which matches the fully qualified name of our CountryPartitioner class. We also set partitions.<partitionId> properties mapping each partition ID to a country name; these are the properties that get passed through to CountryPartitioner.
  2. We pass an instance of a class implementing the org.apache.kafka.clients.producer.Callback interface as a second argument to the producer.send() method. The Kafka client will call its onCompletion() method once a message is successfully published, attaching a RecordMetadata object. We'll be able to use this object to find out which partition a message was sent to, as well as the offset assigned to the published message.

Listing 2. A partitioned producer


public class Producer {
    private static Scanner in;

    public static void main(String[] argv) throws Exception {
        if (argv.length != 1) {
            System.err.println("Please specify 1 parameter: the topic name");
            System.exit(-1);
        }
        String topicName = argv[0];
        in = new Scanner(System.in);
        System.out.println("Enter message (type exit to quit)");

        //Configure the Producer
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName());
        //Map partition IDs to countries; CountryPartitioner reads the "partitions." properties
        configProperties.put("partitions.0", "USA");
        configProperties.put("partitions.1", "India");

        org.apache.kafka.clients.producer.Producer<String, String> producer =
                new KafkaProducer<String, String>(configProperties);
        String line = in.nextLine();
        while (!line.equals("exit")) {
            ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, null, line);
            producer.send(rec, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("Message sent to topic ->" + metadata.topic() +
                            ", partition ->" + metadata.partition() +
                            ", stored at offset ->" + metadata.offset());
                }
            });
            line = in.nextLine();
        }
        in.close();
        producer.close();
    }
}

Assigning partitions to consumers

The Kafka server guarantees that a partition is assigned to only one consumer within a consumer group, thereby guaranteeing the order of message consumption within that partition. You can manually assign partitions or have them assigned automatically.

If your business logic demands more control, then you'll need to assign partitions manually. In this case you use KafkaConsumer.assign(<listOfPartitions>) to pass the Kafka server a list of the partitions each consumer is interested in, as shown in the sketch below.
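
Here's a minimal sketch of manual assignment, reusing the configProperties from our earlier listings; the topic name and partition numbers are placeholders. Note that a consumer using assign() bypasses Kafka's group management entirely, so its partitions are never rebalanced:

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
    //Manually claim partitions 0 and 2 of the part-demo topic
    kafkaConsumer.assign(Arrays.asList(
            new TopicPartition("part-demo", 0),
            new TopicPartition("part-demo", 2)));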

Having partitions assigned automatically is the default and most common choice. In this case the Kafka server assigns a partition to each consumer in the group, and reassigns partitions as consumers join or leave.

Say you're creating a new topic with three partitions. When you start the first consumer for the new topic, Kafka will assign all three partitions to that consumer. If you then start a second consumer, Kafka will reassign the partitions, giving one partition to the first consumer and the remaining two to the second. If you add a third consumer, Kafka will reassign the partitions again, so that each consumer has a single partition. Finally, if you start fourth and fifth consumers, three of the consumers will have an assigned partition, but the others won't receive any messages. If one of the initial three consumers goes down, Kafka will use the same assignment logic to reassign that consumer's partition to one of the waiting consumers.

We'll use automatic assignment for the example application. Most of our consumer code will be the same as it was for the simple consumer in Part 1. The only difference is that we'll pass an instance of ConsumerRebalanceListener as a second argument to the KafkaConsumer.subscribe() method. Kafka will call methods of this listener every time it assigns partitions to or revokes them from this consumer. We'll override ConsumerRebalanceListener's onPartitionsRevoked() and onPartitionsAssigned() methods and print the list of partitions that were assigned to or revoked from this subscriber.

Listing 3. A partitioned consumer

  
   private static class ConsumerThread extends Thread {
       private String topicName;
       private String groupId;
       private KafkaConsumer<String, String> kafkaConsumer;

       public ConsumerThread(String topicName, String groupId) {
           this.topicName = topicName;
           this.groupId = groupId;
       }

       public void run() {
           Properties configProperties = new Properties();
           configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
           configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
           configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

           //Figure out where to start processing messages from
           kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
           kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
               public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                   System.out.printf("%s topic-partitions are revoked from this consumer%n", Arrays.toString(partitions.toArray()));
               }
               public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                   System.out.printf("%s topic-partitions are assigned to this consumer%n", Arrays.toString(partitions.toArray()));
               }
           });
           //Start processing messages
           try {
               while (true) {
                   ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                   for (ConsumerRecord<String, String> record : records)
                       System.out.println(record.value());
               }
           } catch (WakeupException ex) {
               System.out.println("Exception caught " + ex.getMessage());
           } finally {
               kafkaConsumer.close();
               System.out.println("After closing KafkaConsumer");
           }
       }

       public KafkaConsumer<String, String> getKafkaConsumer() {
           return this.kafkaConsumer;
       }
   }
   
   

Test the application

We're ready to run and test the current iteration of our producer/consumer application. As you've done previously, you can use the code in Listings 1 through 3, or download the complete source code on GitHub.

  1. Compile and create a fat JAR by invoking: mvn compile assembly:single.
  2. Create a topic named part-demo with three partitions and a replication factor of 1:
    
        <KAFKA_HOME>/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic part-demo
      
    
  3. Start a producer:
    
                java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer part-demo
    
    
  4. Start three consumers, then watch the console to see how your partitions are assigned and revoked every time you start a new instance of the consumer:
    
                java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Consumer part-demo group1
    
    
  5. Type some messages into your producer console and verify whether the messages are routed to the correct consumer:
    
                USA: First order
                India: First order
                USA: Second order
                France: First order
    
    

Figure 2 shows producer/consumer output in the partitioned topic.

Figure 2. Producer/consumer output

Being able to partition a single topic into multiple parts is essential to Kafka's scalability. Partitioning lets you scale your messaging infrastructure horizontally while maintaining order within each partition. Next we'll look at how Kafka uses message offsets to track and manage complex messaging scenarios.

Managing message offsets

I mentioned in Part 1 that whenever a producer publishes a message, the Kafka server assigns an offset to that message. A consumer is able to control which messages it wants to consume by setting or resetting the message offset. When developing a consumer you have two options for managing the offset: automatic and manual.

Two types of offset

When you start a consumer in the Kafka client, it reads the value of your ConsumerConfig.AUTO_OFFSET_RESET_CONFIG (auto.offset.reset) configuration. If that config is set to earliest, the consumer will start with the smallest offset available for the topic. In its first request to Kafka, the consumer will effectively say: give me all the messages in this partition with an offset greater than the smallest one available. It will also specify a batch size, and the Kafka server will return the matching messages in batches of that size.
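
As a minimal sketch, here's how you might configure a consumer to start from the earliest available offset; the group ID is hypothetical, and note that auto.offset.reset only takes effect when the group has no committed offset to resume from:

    Properties configProperties = new Properties();
    configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-group");
    //With no committed offset for this group, start from the smallest offset available
    configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");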

The consumer keeps track of the offset of the last message it has processed, so it will always request messages with an offset higher than the last one. This setup works while a consumer is functioning normally, but what happens if the consumer crashes, or you stop it for maintenance? In that case you would want the consumer to remember the offset of the last message processed, so that it can start with the first unprocessed message.

To make that recovery possible, Kafka tracks two types of offset: the current offset tracks the messages consumed while the consumer is running normally; the committed offset is the last offset the consumer has sent to the Kafka server for persistent storage.

If the consumer goes down or is taken down for some reason, it can query the Kafka server for the last committed offset and resume message consumption as if nothing was missed. For its part, the Kafka broker stores this information in a topic called __consumer_offsets, which is replicated across multiple brokers so that the offsets survive a broker failure.
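
You can observe both offsets from the client side. Here's a minimal sketch, assuming a consumer that has already been assigned topicPartition:

    //Current offset: the position this consumer will read from next
    long currentOffset = kafkaConsumer.position(topicPartition);
    //Committed offset: the last position persisted to the Kafka server
    OffsetAndMetadata committedOffset = kafkaConsumer.committed(topicPartition);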

Committing offset data

You have a choice about how often to commit offset data, and the choice is a tradeoff: commit frequently and you pay a performance penalty, but if the consumer goes down you'll have fewer messages to reprocess; commit less frequently for better performance, but accept reprocessing more messages after a failure. In either case, the consumer has two options for committing the offset:

  1. Auto commits: You can set enable.auto.commit to true and set the auto.commit.interval.ms property with a value in milliseconds. Once you've enabled this, the Kafka consumer will commit the offset of the last message returned by poll(); the commits are issued during subsequent poll() calls, whenever the configured interval has elapsed.
  2. Manual commits: You can call the commitSync() or commitAsync() method on the KafkaConsumer at any time. When you issue the call, the consumer will take the offset of the last message returned by poll() and commit it to the Kafka server (see the sketch after this list).
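
Here's a minimal sketch of the manual approach, assuming enable.auto.commit is set to false and a hypothetical process() method holds the application logic:

    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            process(record); //hypothetical application logic
        }
        //Commit only after the whole batch is processed; if the consumer crashes
        //mid-batch, the uncommitted messages will be redelivered and reprocessed
        kafkaConsumer.commitSync();
    }

commitAsync() works the same way without blocking the poll loop, at the cost of not knowing immediately whether the commit succeeded.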

Three use cases for manual offsets

Let's consider three use cases where you wouldn't want to use Kafka's default offset management infrastructure. Instead, you'll manually decide which message to start from.

  1. Start from the beginning: In this use case, you are capturing database changes in Kafka. The first record for a given key is the full record; thereafter you only get the columns whose values have changed (a delta). In this case you always need to read all the messages in a topic from the beginning, in order to reconstruct the full state of the record. To do this, configure the consumer to read from the beginning by calling the kafkaConsumer.seekToBeginning(topicPartition) method. Remember that by default Kafka deletes messages more than seven days old, so you need to configure log.retention.hours to a higher value for this use case.
  2. Go to the end: Now let's say you're building a stock recommendation application by analyzing trades in realtime. The worst case happens and your consumer application goes down. In this case you can call kafkaConsumer.seekToEnd(topicPartition) so that the consumer ignores the messages posted during its downtime and only processes trades happening from the instant it restarts.
  3. Start at a given offset: Finally, say that you just released a new version of the producer in your production environment. After watching it produce a few messages, you realize that it is generating bad messages. You fix the producer and start it again. You don't want your consumer to consume those bad messages, so you manually set the offset to the first good message produced, by calling kafkaConsumer.seek(topicPartition, startingOffset).

Manual offsets in the consumer app

The consumer code that we've developed so far auto-commits offsets every five seconds. Now let's update the consumer to take a third argument that sets the starting offset manually.

If you pass 0 as the last argument, the consumer will assume that you want to start from the beginning, so it will call kafkaConsumer.seekToBeginning() for each of its partitions. If you pass -1, it will assume that you want to ignore the existing messages and only consume messages published after the consumer restarts; in this case it will call kafkaConsumer.seekToEnd() on each of the partitions. Finally, if you specify any value other than 0 or -1, the consumer will assume you have specified the offset that you want it to start from; for example, if you pass 5, then on restart the consumer will consume messages starting at offset 5. For this it calls kafkaConsumer.seek(topicPartition, startingOffset).

Listing 4. Adding a third argument to the consumer


  private static class ConsumerThread extends Thread{
    private String topicName;
    private String groupId;
    private long startingOffset;
    private KafkaConsumer<String,String> kafkaConsumer;

    public ConsumerThread(String topicName, String groupId, long startingOffset){
        this.topicName = topicName;
        this.groupId = groupId;
        this.startingOffset=startingOffset;
    }
    public void run() {
        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset123");
        configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        //Figure out where to start processing messages from
        kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
        kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.printf("%s topic-partitions are revoked from this consumern", Arrays.toString(partitions.toArray()));
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.printf("%s topic-partitions are assigned to this consumern", Arrays.toString(partitions.toArray()));
                Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
                while(topicPartitionIterator.hasNext()){
                    TopicPartition topicPartition = topicPartitionIterator.next();
                    System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is ->" + kafkaConsumer.committed(topicPartition) );
                    if(startingOffset ==0){
                        System.out.println("Setting offset to beginning");
                        kafkaConsumer.seekToBeginning(topicPartition);
                    }else if(startingOffset == -1){
                        System.out.println("Setting it to the end ");
                        kafkaConsumer.seekToEnd(topicPartition);
                    }else {
                        System.out.println("Resetting offset to " + startingOffset);
                        kafkaConsumer.seek(topicPartition, startingOffset);
                    }
                }
            }
        });
        //Start processing messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }

            }
        }catch(WakeupException ex){
            System.out.println("Exception caught " + ex.getMessage());
        }finally{
            kafkaConsumer.close();
            System.out.println("After closing KafkaConsumer");
        }
    }
    public KafkaConsumer<String,String> getKafkaConsumer(){
        return this.kafkaConsumer;
    }
}

Once your code is ready, you can test it by executing the following command:


        java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.offset.Consumer part-demo group1 0
    

The Kafka client should print all the messages from offset 0, or you can change the value of the last argument to jump around in the message log.

Consumer groups in Kafka

Traditional messaging use cases can be divided into two main types: point to point and publish-subscribe. In a point-to-point scenario, one consumer consumes one message. When a message relays a bank transaction, only one consumer should respond by updating the bank account. In a publish-subscribe scenario, multiple consumers will consume a single message but respond differently to it. When a web server goes down, you want the alert to go to consumers programmed to respond in different ways.

Queue refers to a point-to-point scenario, where a message is consumed by only one consumer. Topic refers to a publish-subscribe scenario, where a message is consumed by every consumer. Kafka doesn't define a separate API for the queue and topic use cases; instead, when you start your consumer you need to specify the ConsumerConfig.GROUP_ID_CONFIG property.

If you use the same GROUP_ID_CONFIG for more than one consumer, Kafka will assume that they are all part of a single group, and it will deliver each message to only one of the consumers. If you start two consumers with separate group IDs, Kafka will assume they are not related, and each consumer will get its own copy of every message.

Recall that the partitioned consumer in Listing 3 takes groupId as its second parameter. Now we'll use the groupId parameter to implement both queue and topic use cases for the consumer.

  1. Create a topic named group-test with two partitions:
    
      bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic group-test
    
    
  2. Start a producer for publishing messages to the group-test topic that you just created:
    
      java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer group-test
    
    
  3. Start three consumers that listen for messages published to the group-test topic. Use group1 for the value of your group id. This will give you three consumers in group1:
    
      java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group1
    
    
  4. Start a fourth consumer, but this time change the value of the group id to group2. This will give you three consumers in group1 and a single consumer in group2:
    
      java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group2
    
    
  5. Return to the producer console and start typing messages. Every new message you publish should appear once in the group2 consumer window and once in one of the three group1 consumer windows, as shown in Figure 3.

Figure 3. Consumer group output

Conclusion to Part 2

Early use cases for big data message systems called for batch processing, such as running a nightly ETL process or moving data from the RDBMS to a NoSQL datastore at regular intervals. In the past few years the demand for realtime processing has increased, especially for fraud detection and emergency response systems. Kafka was built for just these types of realtime scenarios.

Kafka is a great open source product but it does have some limitations; for instance you can't query data from inside a topic before it reaches its destination, or replicate data across multiple geographically distributed clusters. You could combine MapR Streams (a commercial product) with the Kafka API for these and other more complex publish-subscribe scenarios.

Source: Javaworld.com
