Kafka Explained, Part 5: The Low-Level Kafka Consumer API
Kafka provides two consumer APIs:
  1. The high-level Consumer API
  2. The SimpleConsumer API     

The first is a highly abstracted consumer API that is simple and convenient to use (a minimal sketch of it follows the list below, for contrast), but for certain special requirements we may need the second, lower-level API. So let's first look at what the SimpleConsumer API can do for us:

  • Read a message multiple times
  • Consume only a subset of the messages in a partition within a single processing pass
  • Add your own transaction management so that each message is processed once and only once
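For contrast, here is a minimal sketch of the high-level consumer mentioned above (the Kafka 0.8-era kafka.javaapi.consumer.ConsumerConnector that matches the SimpleConsumer code later in this article). The ZooKeeper address, group id, and topic name are placeholder values chosen for this sketch, not part of the original article; note that offset tracking and leader discovery are handled for you, which is exactly the convenience the low-level API gives up.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class HighLevelExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("zookeeper.connect", "192.168.4.30:2181"); // placeholder ZooKeeper address
    props.put("group.id", "mygroup");                    // placeholder consumer group
    props.put("auto.commit.interval.ms", "1000");        // offsets are committed automatically

    ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Ask for a single stream (thread) for the topic
    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(Collections.singletonMap("mytopic", 1));
    ConsumerIterator<byte[], byte[]> it = streams.get("mytopic").get(0).iterator();

    // Blocks until messages arrive; offsets and broker failover are managed internally
    while (it.hasNext()) {
      System.out.println(new String(it.next().message(), "UTF-8"));
    }
    connector.shutdown();
  }
}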

What are the drawbacks of using SimpleConsumer?

  • You must track offset values in your own application (see the sketch after this list)
  • You must find the lead broker for the target topic partition
  • You must handle broker changes (leader failover)
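As a hypothetical illustration of the first drawback, the application has to remember the next offset to read across restarts itself, for example by persisting it to a small local file. The OffsetStore class and the file naming below are made up for this sketch and are not part of the original example; in the code later in this article, readOffset could be loaded from such a store at startup and saved after each processed batch.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class OffsetStore {
  private final Path file;

  public OffsetStore(String topic, int partition) {
    // One file per topic partition, e.g. offset_mytopic_0.txt
    this.file = Paths.get("offset_" + topic + "_" + partition + ".txt");
  }

  // Load the last saved offset, or fall back to the given default.
  public long load(long defaultOffset) throws IOException {
    if (!Files.exists(file)) {
      return defaultOffset;
    }
    return Long.parseLong(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim());
  }

  // Persist the offset of the next message to read.
  public void save(long nextOffset) throws IOException {
    Files.write(file, Long.toString(nextOffset).getBytes(StandardCharsets.UTF_8));
  }
}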

Steps for using SimpleConsumer

  1. From all the live brokers, find which one is the leader broker for the target topic partition
  2. Find the replica brokers for the target topic partition
  3. Build the fetch request
  4. Send the request and read the data
  5. Handle leader broker changes
Code example:
package bonree.consumer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {
  private List<String> m_replicaBrokers = new ArrayList<String>();

  public SimpleExample() {
    m_replicaBrokers = new ArrayList<String>();
  }

  public static void main(String args[]) {
    SimpleExample example = new SimpleExample();
    // Maximum number of messages to read
    long maxReads = Long.parseLong("3");
    // Topic to subscribe to
    String topic = "mytopic";
    // Partition to read from
    int partition = Integer.parseInt("0");
    // IPs of the seed broker nodes
    List<String> seeds = new ArrayList<String>();
    seeds.add("192.168.4.30");
    seeds.add("192.168.4.31");
    seeds.add("192.168.4.32");
    // Broker port
    int port = Integer.parseInt("9092");
    try {
      example.run(maxReads, topic, partition, seeds, port);
    } catch (Exception e) {
      System.out.println("Oops:" + e);
      e.printStackTrace();
    }
  }

  public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
    // Fetch the metadata for the target topic partition
    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    if (metadata == null) {
      System.out.println("Can't find metadata for Topic and Partition. Exiting");
      return;
    }
    if (metadata.leader() == null) {
      System.out.println("Can't find Leader for Topic and Partition. Exiting");
      return;
    }
    String leadBroker = metadata.leader().host();
    String clientName = "Client_" + a_topic + "_" + a_partition;

    SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    int numErrors = 0;
    while (a_maxReads > 0) {
      if (consumer == null) {
        consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
      }
      FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
      FetchResponse fetchResponse = consumer.fetch(req);

      if (fetchResponse.hasError()) {
        numErrors++;
        // Something went wrong!
        short code = fetchResponse.errorCode(a_topic, a_partition);
        System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
        if (numErrors > 5)
          break;
        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
          // We asked for an invalid offset. For the simple case, ask for
          // the last element to reset.
          readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
          continue;
        }
        consumer.close();
        consumer = null;
        leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
        continue;
      }
      numErrors = 0;

      long numRead = 0;
      for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
          System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
          continue;
        }
        readOffset = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();

        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
        numRead++;
        a_maxReads--;
      }

      if (numRead == 0) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
        }
      }
    }
    if (consumer != null)
      consumer.close();
  }

  public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
      System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
      return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
  }

  /**
   * Find a new leader broker after the old one has failed.
   *
   * @param a_oldLeader
   * @param a_topic
   * @param a_partition
   * @param a_port
   * @return the host of the new leader broker
   * @throws Exception
   */
  private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
      boolean goToSleep = false;
      PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
      if (metadata == null) {
        goToSleep = true;
      } else if (metadata.leader() == null) {
        goToSleep = true;
      } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
        // First time through, if the leader hasn't changed, give
        // ZooKeeper a second to recover.
        // Second time, assume the broker did recover before failover,
        // or it was a non-broker issue.
        goToSleep = true;
      } else {
        return metadata.leader().host();
      }
      if (goToSleep) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
        }
      }
    }
    System.out.println("Unable to find new leader after Broker failure. Exiting");
    throw new Exception("Unable to find new leader after Broker failure. Exiting");
  }

  private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    PartitionMetadata returnMetaData = null;
    loop: for (String seed : a_seedBrokers) {
      SimpleConsumer consumer = null;
      try {
        consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
        List<String> topics = Collections.singletonList(a_topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

        List<TopicMetadata> metaData = resp.topicsMetadata();
        for (TopicMetadata item : metaData) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
            if (part.partitionId() == a_partition) {
              returnMetaData = part;
              break loop;
            }
          }
        }
      } catch (Exception e) {
        System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
      } finally {
        if (consumer != null)
          consumer.close();
      }
    }
    if (returnMetaData != null) {
      m_replicaBrokers.clear();
      for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
        m_replicaBrokers.add(replica.host());
      }
    }
    return returnMetaData;
  }
}