Java
TheRoSS, 2018-12-08 14:58:13

Java + netty + kafka: how to move from multithreading to multiplexing?

Dear experts in multiplexed I/O programming for java (in particular, netty),
I have the following task:
- there is an external apache kafka server (simplified: a binary message queue with its own cursor for each connection; if you are not familiar with kafka you can think of, say, a mysql database instead, it does not change the essence of the question)
- there is a previously unknown number of external consumers of this data from kafka (we expect 100+), independent of each other (that is, the data flow of one cannot be reused for another)
- I need a module that accepts connections from these consumers, opens a separate kafka connection for each of them (to get independent cursors), reads data from kafka, processes it, passes the converted data to the consumer, and manages the kafka cursor (commits) once the consumer has acknowledged the data
Head-on solution:
- Create a listening tcp socket and/or a unix domain socket
- Accept a connection from a consumer
- Create a new thread to work with this consumer
- In this thread, listen for and process incoming commands from the consumer
- Create another thread for the connection to kafka (I use the standard apache driver)
- In this thread, poll data batches from kafka, process them and send them to the consumer
The disadvantage of this approach is the huge number of threads (two per consumer) and the context-switching overhead (I have not measured it). Estimated load: about 5-10k messages per second per consumer.
The task has two features that simplify the solution:
- in the module itself, message processing is very fast
- messages are processed only in batches, and the next batch will be read by the module only after the previous one has been completely processed by the consumer, and a commit has been received from it
That is, ideally I see the module architecture as several multiplexer threads, each serving several dozen consumers. I come from the node.js world, where this is trivial to do. Unfortunately, using node.js is not an option here.
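For illustration, the "several multiplexer threads" layout can be sketched in plain Java as a small group of single-threaded executors, with each new consumer pinned to one of them round-robin. LoopGroup and assign are hypothetical names used only for this sketch; in netty this role is played by an EventLoopGroup, so treat it as a picture of the idea rather than netty code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: a fixed set of single-threaded loops shared by many consumers. */
class LoopGroup {
    private final ExecutorService[] loops;
    private final AtomicInteger next = new AtomicInteger();

    LoopGroup(int threads) {
        loops = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            // One thread per loop; every task of a consumer pinned to a loop runs on that thread.
            loops[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Picks the loop for a newly connected consumer, round-robin. */
    ExecutorService assign() {
        return loops[Math.floorMod(next.getAndIncrement(), loops.length)];
    }

    void shutdown() {
        for (ExecutorService loop : loops) {
            loop.shutdown();
        }
    }
}
```

Because all work for one consumer runs on a single thread, per-consumer state needs no locking, but any blocking call made on a loop stalls every consumer assigned to it.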
I tried to use netty for this... but I keep running into pitfalls.
In particular, I would like to hand the kafka connections over to the netty multiplexers without elaborate workarounds or rewriting the kafka driver (I suspect that will not work).
Even in the simplest case, when I try to poll kafka in the same thread that processes client commands, the thread blocks:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    switch (messageName) {
        case "init":
            initKafkaConsumer(message);
            consume(ctx);
            break;
        case "commit":
            commit(ctx);
            consume(ctx);
            break;
        default:
            throw new BotlaneException("No handler for message " + messageName);
    }
}

private void consume(ChannelHandlerContext ctx) {
    ConsumerRecords<String, KafkaAvroRawDeserializer.Result> records;
    do {
        records = consumer.poll(Duration.ofMillis(pollTimeout));
    } while (records.isEmpty() && !closed);

    ...
}

Here, for example, after the connection is initialized and the first batch is sent, neither a commit from the client nor even a disconnect in channelUnregistered gets processed, because the poll loop keeps the thread busy. I have not yet come up with another way to solve it.
How have you solved such problems? A code example would be much appreciated.


2 answer(s)
Sergey Gornostaev, 2018-12-09
@TheRoSS

I haven't worked with Kafka, but as far as I know its client is thoroughly synchronous, at least where subscription is concerned. Two ways of integrating it with asynchronous Netty come to mind.
You can start a periodic task in the pipeline initializer or in the client connection handler that polls the queue with a zero timeout:

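// schedule() would run the task only once; a fixed-delay schedule repeats it.
eventLoop.scheduleWithFixedDelay(() -> {
   // Zero timeout: do not wait for new records
   ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
   // Process the records here
}, 0, 100, TimeUnit.MILLISECONDS);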

But with many clients this option will flood the Kafka server with requests.
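One possible way to soften that, sketched here in plain Java: poll without blocking, double the delay while the queue stays empty, and stop polling altogether while a batch is waiting for the client's commit (the question says the next batch is only read after a commit). BackoffPoller and all its members are hypothetical names; the poll supplier stands in for consumer.poll(Duration.ZERO) and the executor stands in for the channel's event loop.

```java
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: non-blocking polling with exponential backoff. Not thread-safe;
 *  step(), commit() and runOn() are meant to be called on the loop thread only. */
class BackoffPoller {
    private final Supplier<List<String>> poll;    // stand-in for consumer.poll(Duration.ZERO)
    private final Consumer<List<String>> onBatch; // e.g. write the batch to the client
    private final long minDelayMs;
    private final long maxDelayMs;
    private long delayMs;
    private boolean awaitingCommit;

    BackoffPoller(Supplier<List<String>> poll, Consumer<List<String>> onBatch,
                  long minDelayMs, long maxDelayMs) {
        this.poll = poll;
        this.onBatch = onBatch;
        this.minDelayMs = minDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.delayMs = minDelayMs;
    }

    /** One non-blocking step. Returns the delay in ms before the next step,
     *  or -1 if a batch is out and polling must pause until commit(). */
    long step() {
        if (awaitingCommit) {
            return -1;
        }
        List<String> batch = poll.get();
        if (batch.isEmpty()) {
            long wait = delayMs;
            delayMs = Math.min(delayMs * 2, maxDelayMs); // back off while idle
            return wait;
        }
        awaitingCommit = true;
        delayMs = minDelayMs; // data is flowing again, reset the backoff
        onBatch.accept(batch);
        return -1;
    }

    /** The client acknowledged the batch; polling may resume. */
    void commit() {
        awaitingCommit = false;
    }

    /** Drives step() on a single-threaded loop, rescheduling itself while idle. */
    void runOn(ScheduledExecutorService loop, long delay) {
        loop.schedule(() -> {
            long wait = step();
            if (wait >= 0) {
                runOn(loop, wait);
            }
        }, delay, TimeUnit.MILLISECONDS);
    }
}
```

With Netty, the channel's event loop could be passed as the loop, and the "commit" handler would call commit() followed by runOn(loop, 0). The thread never blocks in poll, so commits and disconnects are still processed between steps.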
Another option is a workaround in the form of an additional notification queue, which receives a message whenever a new message appears in any of the client queues. Then a single thread can block waiting on this queue and, when something arrives, fire an event into the Netty event loop:
class MessageListener implements Runnable {
    private final ChannelGroup group;
    private volatile boolean run = true;

    public MessageListener(ChannelGroup group) {
        this.group = group;
    }

    public void run() {
        while(run) {
            ConsumerRecords<String, String> records = notificationConsumer.poll(Duration.ofSeconds(5));
            if (!records.isEmpty())
                group.forEach(c -> c.pipeline().fireUserEventTriggered(new NewMsgEvent()));
        }
    }

    public void stop() {
        run = false;
    }
}

class SomeHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof NewMsgEvent) {
            ConsumerRecords<String, String> records = clientConsumer.poll(Duration.ZERO);
            records.forEach(record -> {
                ctx.write(Unpooled.wrappedBuffer(record.value().getBytes(StandardCharsets.UTF_8)));
            });
            ctx.flush();
        }
        else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

You can refine this idea a bit by including in the notification message which client queue received the new message, so that MessageListener sends the event only to the one pipeline that needs it, or so that only the relevant handler reacts to the event.
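A minimal sketch of that refinement in plain Java, assuming each notification carries the id of the client queue it refers to. NotificationRouter and its methods are hypothetical names; with Netty, the registered callback would be something like firing NewMsgEvent into that one client's pipeline.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: routes a "new message" notification to a single handler. */
class NotificationRouter {
    // client queue id -> callback that wakes up the handler of exactly that client
    private final Map<String, Runnable> handlers = new ConcurrentHashMap<>();

    /** Called when a client connects and its queue is known. */
    void register(String queueId, Runnable onNewMessage) {
        handlers.put(queueId, onNewMessage);
    }

    /** Called when the client disconnects. */
    void unregister(String queueId) {
        handlers.remove(queueId);
    }

    /** Called by the listener thread; returns false if no client is waiting on that queue. */
    boolean dispatch(String queueId) {
        Runnable handler = handlers.get(queueId);
        if (handler == null) {
            return false;
        }
        handler.run();
        return true;
    }
}
```

The listener thread would then call dispatch with the queue id taken from each notification record instead of iterating over the whole ChannelGroup.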

AlexHell, 2019-01-05
@AlexHell

I read Sergey Gornostaev's answer. In principle I agree with the 2nd option, if I understood it correctly, but I will add how I would do it:
- someone sends a "write" message to the netty server
- netty adds it to the InMemory queue for kafka (instantly)
- netty notifies the other interested clients (if a client is already connected; otherwise it will receive all its messages when it connects) with a TCP "notify" packet (instantly), so that the client can later send "readAll" if the data should not be pushed right away; alternatively, a packet with all the data intended for the client is sent to it immediately
- (if the client was not yet connected) the interested client connects and netty gives it all its messages from InMemory (without committing yet)
- the client sends "commit": netty records in InMemory which message the client has read, and also sends this to the queue in kafka
So in effect you get one extra queue that holds the KafkaTask items and the current InMemory state (presumably with deletion once the client has definitely read a message).
Writes from InMemory into this queue are made in a separate thread or even a pool, as you would do with a database. Is the write to kafka really there for persistence?
Did I understand the goals correctly?
If persistence were not needed, you could do without kafka altogether: a client sends "write", and all other clients are sent "data" if they are already online; otherwise the data is added to InMemory and delivered only when the client connects.
When persistence is needed, only the extra queue is added so that the data is eventually saved to it; the rest stays the same. When the server restarts, the data from kafka (or the database) is restored into InMemory (if there is not too much of it; otherwise this can be a deferred task in another thread).
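A rough plain-Java sketch of the InMemory part, assuming one shared message list with a separate read position per client. InMemoryLog and its methods are hypothetical names; persisting to kafka and trimming fully read messages are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: in-memory message list with an independent cursor per client. */
class InMemoryLog {
    private final List<String> messages = new ArrayList<>();
    // client id -> index of the first message that client has not committed yet
    private final Map<String, Integer> cursors = new HashMap<>();

    /** Handles "write": stores the message and returns its index. */
    synchronized int append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    /** Handles "readAll": everything the client has not committed yet. Does not commit. */
    synchronized List<String> readPending(String clientId) {
        int from = cursors.getOrDefault(clientId, 0);
        return new ArrayList<>(messages.subList(from, messages.size()));
    }

    /** Handles "commit": the client confirms it has processed `count` more messages. */
    synchronized void commit(String clientId, int count) {
        int from = cursors.getOrDefault(clientId, 0);
        cursors.put(clientId, Math.min(from + count, messages.size()));
    }
}
```

A client that connects late simply starts from position 0, and an uncommitted batch is handed out again on the next readPending call.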
