一、disruptor基本概念
二、disruptor入门程序
导入disruptor包
com.lmax disruptor 3.3.2
实现disruptor的四部操作
1.建立一个工厂Event类,用于创建Event类实例对象
public class OrderEvent { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; }}
public class OrderEventFactory implements EventFactory{ @Override public OrderEvent newInstance() { return new OrderEvent(); //这个方法就是为了返回空的数据对象Event }}
2.创建事件监听类,用于处理数据
import com.lmax.disruptor.EventHandler;public class OrderEventHandler implements EventHandler{ @Override public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception { System.out.println("消费者:"+orderEvent.getValue()); }}
3.实例化Disruptor实例,配置一系列的参数,编写Disruptory核心组件
public static void main(String[] args) { OrderEventFactory orderEventFactory = new OrderEventFactory(); int ringBufferSize = 1024 * 1024;//指定容器的大小 ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //指定线程,建议使用自定义线程池 /** * 1.实例化disruptor对象 * 参数一:orderEventFactory 消息(event)工厂对象 * 参数二:ringBufferSize 容器的长度 * 参数三:线程池(建议使用自定义线程池)RejectedExecutionHandler * 参数四:ProducerType 单生产者还是多生产者 * 参数五:waitStrategy 等待策略 */ Disruptordisruptor = new Disruptor (orderEventFactory, ringBufferSize, executorService, ProducerType.SINGLE, new BlockingWaitStrategy()); //2.添加消费者的监听 disruptor.handleEventsWith(new OrderEventHandler()); //3.启动disruptor disruptor.start(); //4.获取实际存储数据的容器: RingBuffer RingBuffer ringBuffer = disruptor.getRingBuffer(); OrderEventProducer producer = new OrderEventProducer(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (long i = 0; i < 100; i++) { byteBuffer.putLong(0, i); producer.sendData(byteBuffer); } disruptor.shutdown(); executorService.shutdown(); }
4.编写生产者组件,向Disruptor容器中去投递数据 (此步骤对应上述main方法中的4)
import com.lmax.disruptor.RingBuffer;import java.nio.ByteBuffer;public class OrderEventProducer { private RingBufferringBuffer; public OrderEventProducer(RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(ByteBuffer data) { //1 在生产者发送消息的时候,首先需要从 ringBuffer 中获取一个可用的序号 long sequence = ringBuffer.next(); try { //2 根据这个序号找到具体的 OrderEvent 元素,草地上获取的OrderEvent对象是一个没有被赋值的空对象 OrderEvent orderEvent = ringBuffer.get(sequence); //3 进行时间赋值处理 orderEvent.setValue(data.getLong(0)); } finally { //4 提交操作 ringBuffer.publish(sequence); } }}