这篇文章主要介绍了Disruptor、Kafka、Netty如何整合,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
创新互联主营凤泉网站建设的网络公司,主营网站建设方案,成都app开发,凤泉h5小程序设计搭建,凤泉网站营销推广欢迎凤泉等地区企业咨询
整个网关的核心是一个netty server,各个应用程序(包括web server,手机app等)连到这个netty server上请求数据;关于数据来源,需要监听多个kafka topic(而且这里的topic是可变的,也就是说需要kafka consumer的动态开始和停止),之后需要把所有这些topic的数据整合在一起,通过channel发送给客户端应用程序。
下面把大部分的代码贴出来,有需要的同学可以参考。会对关键的技术点进行说明,偏业务部分大家自行忽略吧。
启动disruptor;监听一个固定的topic,把获取到的msg,交给ConsumerProcessorGroup来完成kafka consumer的创建和停止。
public static void main(String[] args) {
DisruptorHelper.getInstance().start();
Properties props = ConsumerProps.getConsumerProps();
KafkaConsumer
consumer.subscribe(Arrays.asList("uavlst"));
while (true) {
ConsumerRecords
ConsumerRecord
for (ConsumerRecord
lastRecord = record;
if (lastRecord != null){
ConsumerProcessorGroup.getInstance().recieveNewUavLst(lastRecord.value());
}
}
}
DisruptorHelper是一个单例,主要是包含了一个disruptor 对象,在new这个对象的时候,用到了ProducerType.MULTI和new BlockingWaitStrategy(),其中前者意味着我们需要多个producer共同来工作,后者其实是默认的producer的等待策略,后续根据实际情况进行调整。
public class DisruptorHelper {
private static DisruptorHelper instance = null;
public static DisruptorHelper getInstance() {
if (instance == null) {
instance = new DisruptorHelper();
}
return instance;
}
private final int BUFFER_SIZE = 1024;
private Disruptor
private DisruptorHelper() {
MsgEventHandler eventHandler = new MsgEventHandler();
disruptor = new Disruptor(new MsgEventFactory(), BUFFER_SIZE, new ConsumerThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());
disruptor.handleEventsWith(eventHandler);
}
public void start() {
disruptor.start();
}
public void shutdown() {
disruptor.shutdown();
}
public void produce(ConsumerRecord
RingBuffer
long sequence = ringBuffer.next();
try {
ringBuffer.get(sequence).setRecord(record);
} finally {
ringBuffer.publish(sequence);
}
}
}
ConsumerProcessorGroup是一个单例,当中包含一个fixedThreadPool,动态的启动线程来进行kafka topic的消费。
public class ConsumerProcessorGroup {
private static ConsumerProcessorGroup instance = null;
public static ConsumerProcessorGroup getInstance(){
if (instance == null){
instance = new ConsumerProcessorGroup();
}
return instance;
}
private ConsumerProcessorGroup() {
}
private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(20);
public List
public void recieveNewUavLst(String uavIDs){
List
for (String uavID : newUavIDs){
if (!uavIDLst.contains(uavID)){
fixedThreadPool.execute(new ConsumerThread(uavID));
uavIDLst.add(uavID);
}
}
List
for (String uavID : uavIDLst){
if (!newUavIDs.contains(uavID)){
tmpLstForDel.add(uavID);
}
}
uavIDLst.removeAll(tmpLstForDel);
}
}
对kafka topic进行消费,通过DisruptorHelper将获取的record写入disruptor的ring buffer当中。
public class ConsumerThread implements Runnable {
private String uavID;
public ConsumerThread(String uavID) {
this.uavID = uavID;
}
public void run() {
Properties props = ConsumerProps.getConsumerProps();
KafkaConsumer
consumer.subscribe(Arrays.asList(uavID));
System.out.println(uavID + " consumer started! Current thread id is " + Thread.currentThread().getId());
while (ConsumerProcessorGroup.getInstance().uavIDLst.contains(uavID)) {
ConsumerRecords
for (ConsumerRecord
DisruptorHelper.getInstance().produce(record);
}
}
System.out.println(uavID + " consumer finished! Current thread id is " + Thread.currentThread().getId());
}
}
Disruptor的消费者,依次从Ring Buffer当中读取数据并执行相应的处理。
public class MsgEventHandler implements EventHandler
private Map
public void onEvent(MsgEvent event, long sequence, boolean endOfBatch) throws Exception {
ConsumerRecord
System.out.printf("topic = %s, part = %d, offset = %d, key = %s, value = %s \n\r", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
感谢你能够认真阅读完这篇文章,希望小编分享的“Disruptor、Kafka、Netty如何整合”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!