What is the LMAX Disruptor Design Pattern?

Can someone tell me what a Disruptor design template is with a simple example? I want to know the basics of this design pattern.

+5
source share
3 answers

Simple Google gives me a lot of information, including this introduction from Martin Fowler

At a rough level, you can think of Disruptor as a multicast schedule where manufacturers place objects on it that are sent to all consumers for parallel consumption through separate queues in turn. When you look inside, you see that this queue network is really a single data structure - a ring buffer. Each producer and consumer has a sequence counter indicating which slot in the buffer he is currently working on. Each producer / consumer writes its own sequence counter, but can read others' sequence counters. In this way, a manufacturer can read consumer counters to ensure that the slot that he wants to write is accessible without blocking the counters. Similarly, a consumer can provideso that it processes messages only after another consumer has it, watching the counters.

GitHub Java + doc.

+4

:

- , (.. ), , .

Fortunately, you do not need to understand the internal details of the disruptor template in order to use it. If it’s easier for you to understand the code, here is the Hello World CoralQueue , the lowest latency interworking queue that implements the destroyer pattern.

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.Builder;

public class Basics {

    public static void main(String[] args) {

        final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(1024, new Builder<StringBuilder>() {
            @Override
            public StringBuilder newInstance() {
                return new StringBuilder(1024);
            }
        });

        Thread producer = new Thread(new Runnable() {

            private final StringBuilder getStringBuilder() {
                StringBuilder sb;
                while((sb = queue.nextToDispatch()) == null) {
                    // queue can be full if the size of the queue
                    // is small and/or the consumer is too slow

                    // busy spin (you can also use a wait strategy instead)
                }
                return sb;
            }

            @Override
            public void run() {

                StringBuilder sb;

                while(true) { // the main loop of the thread

                    // (...) do whatever you have to do here...

                    // and whenever you want to send a message to
                    // the other thread you can just do:
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hello!");
                    queue.flush();

                    // you can also send in batches to increase throughput:
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hi!");

                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hi again!");

                    queue.flush(); // dispatch the two messages above...
                }
            }
        }, "Producer");

        Thread consumer = new Thread(new Runnable() {

            @Override
            public void run() {

                while (true) { // the main loop of the thread

                    // (...) do whatever you have to do here...

                    // and whenever you want to check if the producer
                    // has sent a message you just do:

                    long avail;
                    while((avail = queue.availableToPoll()) == 0) {
                        // queue can be empty!
                        // busy spin (you can also use a wait strategy instead)
                    }

                    for(int i = 0; i < avail; i++) {
                        StringBuilder sb = queue.poll();
                        // (...) do whatever you want to do with the data
                        // just don't call toString() to create garbage...
                        // copy byte-by-byte instead...
                    }
                    queue.donePolling();
                }
            }
        }, "Consumer");

        consumer.start();
        producer.start();
    }
}
+2
source

All Articles