Java + netty + kafka: how to move from multithreading to multiplexing?
Dear experts in multiplexed I/O programming for Java (Netty in particular),
I have the following task:
- there is an external Apache Kafka server (simplified: a binary message queue with its own cursor for each connection; those unfamiliar with Kafka can think of it as, say, a MySQL database, which does not change the essence of the question)
- there is a previously unknown number of external consumers of this data from Kafka (we plan for 100+), independent of each other (that is, we cannot reuse one consumer's data stream for another)
- we need a module that accepts connections from these consumers, subscribes to Kafka with a separate connection for each of them (to create independent cursors), reads the data from Kafka, does something with it, passes the transformed data to the consumers, and manages the Kafka cursor (makes a commit) once it receives confirmation from the consumer
The straightforward solution (a minimal sketch follows the list below):
- create a listening TCP socket and/or a Unix domain socket
- accept a connection from a consumer
- create a new thread to work with this consumer
- in this thread, listen for and process incoming commands from the consumer
- create another thread to connect to Kafka (I use the standard Apache client)
- in that thread, poll batches of data from Kafka, process them and send them to the consumer
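For illustration, a minimal sketch of this thread-per-consumer layout, assuming a plain ServerSocket and the standard Kafka client (the port, topic and class names are made up, and the command thread and the Kafka thread are collapsed into one for brevity):

import java.net.ServerSocket;
import java.net.Socket;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ThreadPerConsumerServer {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(9000)) {        // made-up port
            while (true) {
                Socket client = server.accept();
                new Thread(() -> handleClient(client)).start();     // one thread per consumer
            }
        }
    }

    private static void handleClient(Socket client) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");           // made-up address
        props.put("group.id", "client-" + client.getPort());        // separate cursor per client
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("some-topic"));               // made-up topic
            while (!client.isClosed()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // transform the records, write them to the client socket,
                // wait for the client's commit, then consumer.commitSync()
            }
        }
    }
}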
The disadvantage of this approach is the huge number of threads and the cost of context switching (I have not measured it). The expected load is roughly 5-10k messages per second per consumer.
The task has two features that simplify the solution:
- message processing inside the module itself is very fast
- messages are processed only in batches, and the module reads the next batch only after the previous one has been fully processed by the consumer and a commit has been received from it
That is, ideally I see the module's architecture as several multiplexer threads, each handling a few dozen consumers. I come from the Node.js world, where this is trivial to do; unfortunately, using Node.js is not an option here.
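To make the target picture concrete, a minimal sketch of what I mean in Netty terms, with a small fixed number of event-loop threads shared by all consumer connections (the port, thread counts and the stub handler are only placeholders, not working code from my project):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MultiplexedServer {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup workers = new NioEventLoopGroup(4);   // a few multiplexer threads for all consumers
        try {
            new ServerBootstrap()
                    .group(boss, workers)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // the handler that parses the "init"/"commit" commands would go here
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
                        }
                    })
                    .bind(9000)                                  // made-up port
                    .sync()
                    .channel()
                    .closeFuture()
                    .sync();
        } finally {
            boss.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}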
I tried to use Netty for this purpose, but I keep running into various pitfalls.
In particular, I would like to hand the Kafka connections over to Netty's multiplexers without jumping through hoops and rewriting the Kafka client (I suspect that will not work).
Or take the simplest case: when I try to poll Kafka in the same thread that processes client commands, I block the thread:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    switch (messageName) {
        case "init":
            initKafkaConsumer(message);
            consume(ctx);
            break;
        case "commit":
            commit(ctx);
            consume(ctx);
            break;
        default:
            throw new BotlaneException("No handler for message " + messageName);
    }
}

private void consume(ChannelHandlerContext ctx) {
    ConsumerRecords<String, KafkaAvroRawDeserializer.Result> records;
    do {
        records = consumer.poll(Duration.ofMillis(pollTimeout));
    } while (records.isEmpty() && !closed);
    ...
}
I haven't worked with Kafka, but as far as I know, it is outrageously synchronous, at least as far as subscriptions go. Two ways to solve the integration with asynchronous Netty come to mind.
The first is to run a periodic task in the pipeline initializer or in the client connection handler that polls the queue with a zero timeout:
eventLoop.scheduleAtFixedRate(() -> {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
    // handle the records
}, 0, 100, TimeUnit.MILLISECONDS);
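A sketch of how that might be hooked up, assuming the scheduling is done from channelActive and that clientConsumer is a per-client KafkaConsumer created and subscribed elsewhere (the names and the 100 ms period are just examples):

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class PollingHandler extends ChannelInboundHandlerAdapter {
    private final KafkaConsumer<String, String> clientConsumer;  // per-client consumer, created elsewhere
    private Future<?> pollTask;

    PollingHandler(KafkaConsumer<String, String> clientConsumer) {
        this.clientConsumer = clientConsumer;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // the consumer is only ever touched from this channel's event loop, so no extra locking is needed
        pollTask = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
            ConsumerRecords<String, String> records = clientConsumer.poll(Duration.ZERO);
            records.forEach(r -> ctx.write(
                    Unpooled.wrappedBuffer(r.value().getBytes(StandardCharsets.UTF_8))));
            ctx.flush();
        }, 0, 100, TimeUnit.MILLISECONDS);
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (pollTask != null) {
            pollTask.cancel(false);  // stop polling when the client disconnects
        }
        ctx.fireChannelInactive();
    }
}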
The second is to run the Kafka subscription in a separate thread and notify the interested channels through Netty user events:

class MessageListener implements Runnable {
    private final ChannelGroup group;
    private volatile boolean run = true;

    public MessageListener(ChannelGroup group) {
        this.group = group;
    }

    public void run() {
        while (run) {
            ConsumerRecords<String, String> records = notificationConsumer.poll(Duration.ofSeconds(5));
            if (!records.isEmpty())
                group.forEach(c -> c.pipeline().fireUserEventTriggered(new NewMsgEvent()));
        }
    }

    public void stop() {
        run = false;
    }
}
class SomeHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof NewMsgEvent) {
            ConsumerRecords<String, String> records = clientConsumer.poll(Duration.ZERO);
            records.forEach(record -> {
                ctx.write(Unpooled.wrappedBuffer(record.value().getBytes(StandardCharsets.UTF_8)));
            });
            ctx.flush();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
MessageListener can also fire the event into only the one pipeline that needs it, or you can arrange things so that only the relevant handler reacts to the event.
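Roughly, the wiring could look like this (the group and thread names are arbitrary, NewMsgEvent is assumed to be a simple marker class, and giving each SomeHandler its own consumer is left out):

import io.netty.channel.ChannelInitializer;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;

class ServerWiring {
    // all connected client channels; DefaultChannelGroup removes closed channels automatically
    static final ChannelGroup CLIENTS = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    static void startListener() {
        // the blocking poll lives in its own thread, outside the event loops
        new Thread(new MessageListener(CLIENTS), "kafka-notification-listener").start();
    }

    static ChannelInitializer<SocketChannel> initializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                CLIENTS.add(ch);                           // the listener will fire NewMsgEvent into this pipeline
                ch.pipeline().addLast(new SomeHandler());  // per-client consumer wiring omitted
            }
        };
    }
}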
I read the answer from Sergey Gornostaev and, if I understood it correctly, I basically agree with the second option, but let me add how I would do it:
- someone sends a "write" message to the Netty server
- Netty adds it to an InMemory queue for Kafka (instantly)
- Netty sends the other interested clients a "notify" TCP packet (instantly) if they are already connected (otherwise they will receive all packets at connection time), so that the client can later send "readAll" if you don't want to push the data right away, or it is immediately sent a packet with all the data intended for it
- (if the client was not yet connected) the interested client connects and Netty gives it all of its messages from InMemory (without committing yet)
- the client sends "commit": Netty records in InMemory which message the client has read, and also sends the commit to the Kafka queue
In effect, you end up with one extra queue where the Kafka task and the current InMemory state are kept (apparently deleting entries once the client has definitely read them). Writing from this InMemory queue to Kafka happens in a separate thread, or even a pool, just as it would with a database. Is the record in Kafka really only there for persistence? Did I understand the goals correctly?
If persistence were not needed, you could do without Kafka entirely: the client sends "write", and every other client that is already online is sent "data"; otherwise the data is added to InMemory and handed over only when the client connects.
When persistence is needed, only this extra queue is added so that the data is eventually saved to it; the rest stays the same, and when the server restarts, the data from Kafka (or a database) is restored into InMemory (if there is not too much of it; otherwise that can be a deferred task in a separate thread).
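Purely as an illustration of what I mean by InMemory (all the names here are hypothetical, not an existing API), something like:

import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical per-client outbox: messages wait here until the client commits them,
// after which the same commit is also forwarded to Kafka (or another persistent store).
class InMemoryOutbox {
    private final Map<String, Deque<byte[]>> pending = new ConcurrentHashMap<>();

    // "write": enqueue the message for an interested client
    void enqueue(String clientId, byte[] message) {
        pending.computeIfAbsent(clientId, id -> new ConcurrentLinkedDeque<>()).addLast(message);
    }

    // "readAll" or a new connection: everything the client has not committed yet
    Deque<byte[]> pendingFor(String clientId) {
        return pending.getOrDefault(clientId, new ConcurrentLinkedDeque<>());
    }

    // "commit": drop the acknowledged messages; the caller also commits in Kafka
    void commit(String clientId, int acknowledged) {
        Deque<byte[]> queue = pending.get(clientId);
        for (int i = 0; queue != null && i < acknowledged && !queue.isEmpty(); i++) {
            queue.removeFirst();
        }
    }
}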