1. 首页
  2. kafka实践

6. kafka拦截器

  • 拦截器定义
    拦截器参数命名为:interceptor.classes。官方文档解析如下:

A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.

即拦截发送到kafka服务器之前的消息,且在序列化&反序列化之前调用,序列化&反序列化又在分区策略之前调用,这个调用顺序在[kafka生产者&消费者]已经分析过了。拦截器允许修改key和value,同时拦截器可以指定多个,多个拦截器形成拦截器链,且有先后顺序,假定按照如下方式配置2个拦截器,那么会先调用AProducerInterceptor,再调用BProducerInterceptor,且调用BProducerInterceptor时的ProducerRecord是经过AProducerInterceptor修改过的ProducerRecord(如果在AProducerInterceptor中修改过ProducerRecord的话):

  props.put("interceptor.classes", "com.afei.kafka.interceptor.AProducerInterceptor,com.afei.kafka.interceptor.BProducerInterceptor");

  • ProducerInterceptor接口定义源码解读
  public interface ProducerInterceptor<K, V> extends Configurable {
        /**
         * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
         * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
         * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
         * 即这个方法调用在KafkaProducer.send()之后,但是在key&value序列化之前,以及分配分区之前。
         * <p>
         * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
         * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
         * 这个方法中允许修改ProducerRecord即发送的消息,接下来的分区分配,根据修改后的key(如果会在onSend()中修改key的话)进行计算.
         */
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

        /**
         * 不管消息发送成功还是发送过程抛出异常,这个方法都会执行
         */
        public void onAcknowledgement(RecordMetadata metadata, Exception exception);

        /**
         * This is called when interceptor is closed
         */
        public void close();
    }

  • 自定义拦截器实现
  /**
     * @author afei
     * @version 1.0.0
     * @since 2018年06月27日
     */
    public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

        /**
         * 统计发送成功数
         */
        private static AtomicLong sendSuccess = new AtomicLong(0);
        /**
         * 统计发送失败数
         */
        private static AtomicLong sendFailure = new AtomicLong(0);

        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

            this.outputSendStat();

            // 改写ProducerRecord, 将key置为null, 分区全部交给kafka去决定
            return new ProducerRecord<>(record.topic(),
                    record.partition(), record.timestamp(), null, record.value(),
                    record.headers());
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // 如果没有异常表示发送成功, 那么发送成功数+1, 否则发送失败数+1
            if (exception!=null){
                sendFailure.getAndIncrement();
            }else{
                sendSuccess.getAndIncrement();
            }
        }

        /**
         * 打印出发送的成功&失败次数的统计信息
         */
        private void outputSendStat(){
            long successCount = sendSuccess.get();
            long  failedCount = sendFailure.get();
            System.out.println("success count: "+successCount+", failed count:"+failedCount);
        }

        @Override
        public void close() {
            this.outputSendStat();
        }

        @Override
        public void configure(Map<String, ?> configs) {

        }
    }

作者:阿飞的博客

来源:https://www.jianshu.com/p/771eae1be734


看完两件小事

如果你觉得这篇文章对你挺有启发,我想请你帮我两个小忙:

  1. 关注我们的 GitHub 博客,让我们成为长期关系
  2. 把这篇文章分享给你的朋友 / 交流群,让更多的人看到,一起进步,一起成长!
  3. 关注公众号 「方志朋」,公众号后台回复「666」 免费领取我精心整理的进阶资源教程
  4. JS中文网,Javascriptc中文网是中国领先的新一代开发者社区和专业的技术媒体,一个帮助开发者成长的社区,是给开发者用的 Hacker News,技术文章由为你筛选出最优质的干货,其中包括:Android、iOS、前端、后端等方面的内容。目前已经覆盖和服务了超过 300 万开发者,你每天都可以在这里找到技术世界的头条内容。

    本文著作权归作者所有,如若转载,请注明出处

    转载请注明:文章转载自「 Java极客技术学习 」https://www.javajike.com

    标题:6. kafka拦截器

    链接:https://www.javajike.com/article/1790.html

« 7. kafka序列化&反序列化
5. ActiveMQ平滑迁移到kafka»

相关推荐

QR code