ConsumerConfig.scala
儲存Consumer的配置
按照我的理解,0.10的Kafka沒有專門的SimpleConsumer,仍然是沿用0.8版本的。
1.從poll開始
消費的規(guī)則如下:
一個partition只能被同一個ConsumersGroup的一個線程所消費.
線程數(shù)小于partition數(shù),某些線程會消費多個partition.
線程數(shù)等于partition數(shù),一個線程正好消費一個線程.
當(dāng)添加消費者線程時,會觸發(fā)rebalance,partition的分配發(fā)送變化.
同一個partition的offset保證消費有序,不同的partition消費不保證順序.
Consumers編程的用法:
private final KafkaConsumer<Long, String> consumer; // 與Kafka進(jìn)行通信的consumer... consumer = new KafkaConsumer<Long, String>(props); consumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords<Long, String> records = consumer.poll(512); ...
consumer,是一個純粹的單線程程序,后面所講的所有機(jī)制(包括coordinator,rebalance, heartbeat等),都是在這個單線程的poll函數(shù)里面完成的。也因此,在consumer的代碼內(nèi)部,沒有鎖的出現(xiàn)。