Apache Kafka - 简单的生产者示例

让我们创建一个使用 Java 客户端发布和消费消息的应用程序。 Kafka 生产者客户端由以下 API 组成。


KafkaProducer API

让我们了解本节中最重要的一组 Kafka 生产者 API。 KafkaProducer API 的核心部分是 KafkaProducer 类。 KafkaProducer 类提供了一个选项,可以在其构造函数中使用以下方法连接 Kafka 代理。

  • KafkaProducer 类提供了 send 方法来将消息异步发送到主题。 send() 的签名如下

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord − 生产者管理等待发送的记录缓冲区。

  • Callback − 当记录已被服务器确认时执行的用户提供的回调(null 表示没有回调)。

  • KafkaProducer 类提供了一个 flush 方法来确保所有之前发送的消息都已经真正完成。 flush方法的语法如下 −

public void flush()
  • KafkaProducer 类提供 partitionFor 方法,该方法有助于获取给定主题的分区元数据。 这可用于自定义分区。 该方法的签名如下 −

public Map metrics()

它返回生产者维护的内部指标映射。

  • public void close() − KafkaProducer 类提供 close 方法块,直到完成所有先前发送的请求。


Producer API

Producer API 的核心部分是 Producer 类。 Producer 类提供了一个选项,通过以下方法在其构造函数中连接 Kafka 代理。

Producer 生产者类

Producer 生产者类使用以下签名提供发送方法来发送消息到单个或多个主题。


public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

有两种类型的生产者 - SyncAsync

相同的 API 配置也适用于 Sync 生产者。 它们之间的区别是同步生产者直接发送消息,但在后台发送消息。 当您需要更高的吞吐量时,首选异步生产者。 在 0.8 等以前的版本中,异步生产者没有用于 send() 的回调来注册错误处理程序。 这仅在当前版本的 0.9 中可用。

public void close()

Producer 类提供 close 方法来关闭与所有 Kafka 代理的生产者池连接。


配置设置

Producer API 的主要配置设置如下表所示,便于理解 −

S.No 配置设置和说明
1

client.id

标识生产者应用程序

2

producer.type

同步或异步

3

acks

acks 配置控制生产者请求下的标准被认为是完整的。

4

retries

如果生产者请求失败,则自动使用特定值重试。

5

bootstrap.servers

经纪人的引导列表。

6

linger.ms

如果您想减少请求的数量,您可以将 linger.ms 设置为大于某个值的值。

7

key.serializer

序列化器接口的键。

8

value.serializer

串行器接口的值。

9

batch.size

缓冲区大小。

10

buffer.memory

控制生产者可用于缓冲的内存总量。

ProducerRecord API

ProducerRecord 是发送到 Kafka 集群的键/值对。ProducerRecord 类构造函数用于使用以下签名创建具有分区、键和值对的记录。

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic − 用户定义的主题名称,将附加到记录中。

  • Partition − 分区数

  • Key − 将包含在记录中的键。

  • Value − 记录内容
public ProducerRecord (string topic, k key, v value)

ProducerRecord 类构造函数用于创建具有键值对且没有分区的记录。

  • Topic − 创建主题以分配记录。

  • Key − 记录的关键。

  • Value − 记录内容。

public ProducerRecord (string topic, v value)

ProducerRecord 类创建没有分区和键的记录。

  • Topic − 创建一个主题。

  • Value − 记录内容。

ProducerRecord 类方法如下表所示 −

S.No 类方法和描述
1

public string topic()

主题将附加到记录中。

2

public K key()

将包含在记录中的键。 如果没有这样的键,这里会返回null。

3

public V value()

记录内容。

4

partition()

记录的分区计数


SimpleProducer 应用程序

在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建您自己的主题。 之后创建一个名为 Sim-pleProducer.java 的 java 类并输入以下代码。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation − 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − 可以使用以下命令执行应用程序。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

输出

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

简单的消费者示例

到目前为止,我们已经创建了一个生产者来向 Kafka 集群发送消息。 现在让我们创建一个消费者来消费来自 Kafka 集群的消息。 KafkaConsumer API 用于消费来自 Kafka 集群的消息。 KafkaConsumer 类构造函数定义如下。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs − 返回消费者配置图。

KafkaConsumer 类具有下表列出的以下重要方法。

S.No 方法及说明
1

public java.util.Set<TopicPar-tition> assignment()

获取消费者当前分配的分区集。

2

public string subscription()

订阅给定的主题列表以获取动态分配的分区。

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

订阅给定的主题列表以获取动态分配的分区。

4

public void unsubscribe()

从给定的分区列表中取消订阅主题。

5

public void sub-scribe(java.util.List<java.lang.String> topics)

订阅给定的主题列表以获取动态分配的分区。 如果给定的主题列表为空,则将其视为与 unsubscribe() 相同。

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

参数模式是指正则表达式格式的订阅模式,监听器参数从订阅模式中获取通知。

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

手动将分区列表分配给客户。

8

poll()

获取使用订阅/分配 API 之一指定的主题或分区的数据。 如果在轮询数据之前没有订阅主题,这将返回错误。

9

public void commitSync()

提交在最后一次 poll() 上返回的所有订阅的主题和分区列表的偏移量。 相同的操作适用于 commitAsyn()。

10

public void seek(TopicPartition partition, long offset)

获取消费者将在下一个 poll() 方法中使用的当前偏移值。

11

public void resume()

恢复暂停的分区。

12

public void wakeup()

唤醒消费者。


ConsumerRecord API

ConsumerRecord API 用于接收来自 Kafka 集群的记录。 该 API 由主题名称、分区号(从中接收记录)和指向 Kafka 分区中记录的偏移量组成。 ConsumerRecord 类用于创建具有特定主题名称、分区计数和 <key, value> 对的消费者记录。 它具有以下签名。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic − 从 Kafka 集群接收的消费者记录的主题名称。

  • Partition − 主题的分区。

  • Key − 记录的键,如果不存在键,则返回null。

  • Value − 记录内容。


ConsumerRecords API

ConsumerRecords API 充当 ConsumerRecord 的容器。 此 API 用于保存特定主题的每个分区的 ConsumerRecord 列表。 它的构造函数定义如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition − 返回特定主题的分区映射。

  • Records − ConsumerRecord 的返回列表。

ConsumerRecords 类定义了以下方法。

S.No 方法和描述
1

public int count()

所有主题的记录数。

2

public Set partitions()

此记录集中具有数据的分区集(如果没有返回数据,则该集为空)。

3

public Iterator iterator()

迭代器使您能够循环通过集合、获取或重新移动元素。

4

public List records()

获取给定分区的记录列表。


配置设置

Consumer 客户端 API 主要配置设置的配置设置如下所示 −

S.No 设置和说明
1

bootstrap.servers

Bootstrapping list of brokers.

2

group.id

将单个使用者分配给组。

3

enable.auto.commit

如果值为 true,则启用偏移量的自动提交,否则不提交。

4

auto.commit.interval.ms

返回将更新的消耗偏移量写入 ZooKeeper 的频率。

5

session.timeout.ms

表示 Kafka 在放弃并继续消费消息之前将等待 ZooKeeper 响应请求(读取或写入)的毫秒数。


简单消费者应用

生产者申请步骤在这里保持不变。 首先,启动 ZooKeeper 和 Kafka 代理。 然后使用名为 SimpleCon-sumer.java 的 java 类创建一个 SimpleConsumer 应用程序并键入以下代码。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

编译 − 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行 − 应用程序可以使用以下命令执行

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

输入 − 打开生产者 CLI 并向主题发送一些消息。 您可以将简单输入作为"Hello Consumer"。

输出 − 以下将是输出。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer