Java
Stanislav Markin, 2016-04-14 04:39:54

How to manage threads in Java when working with chains of asynchronous tasks?

For the last six months at work, I have been building a service on a Java + Netty + MySQL + Redis stack, and I am trying to get the highest possible throughput out of it.
It is built on an asynchronous model and, roughly, it does the following:
- receives a request via HTTP (or takes it from AMQP),
- queries Redis,
- fetches data from the database,
- based on that data, decides which service to send the processing to (these services are third-party, so I have to plan for their worst-case behavior),
- may go to the database again,
- does some other things along the way,
- and returns an HTTP response (or publishes the result to another AMQP queue).
The CPU load is low. The time of a single request breaks down into 1% CPU time (intermediate processing), 4% waiting for the database, and 95% waiting for the third-party service.
In heavily simplified form, the code looks like this:

void onRequest(Context ctx) {
    // here: a netty-worker thread
    Request request = parseRequest(ctx);

    sqlExecutor.submit(request)
        .thenCompose(model -> {
            // here: an sqlExecutor thread, because the future is completed in that thread pool
            val something = doSomething(model, ctx);
            switch (pickService(something)) {
                case SERVICE_1: return service1.handleSomething(model);
                // ...
                case SERVICE_N: return serviceN.handleSomething(model);
                // a block-bodied lambda must return (or throw) on every path
                default: throw new IllegalStateException("no service picked");
            }
        })

        .thenApply(serviceResponse -> {
            // here: a thread from serviceX
            val somethingElse = doSomethingElse(serviceResponse);
            return toHttpResponse(somethingElse);
        })

        .thenAccept(httpResponse -> {
            // here: a thread from serviceX
            ctx.getResponse().send(httpResponse);
        })

        .exceptionally(e -> {
            // here: could be a netty-worker, sqlExecutor or serviceX thread
            log.warn("request failed", e);
            ctx.getResponse().send(500, "failed");
            return null; // exceptionally() must return a value; the chain is CompletableFuture<Void>
        });
}

Simplifying somewhat, this is roughly what happens:
A netty-worker thread enters onRequest(). This thread does some initial processing (parseRequest),
then submits the request to sqlExecutor and builds the rest of the call chain.
Some time later, the model -> {} function passed to thenCompose is called.
That work is done by an sqlExecutor thread, because the future is completed in sqlExecutor's thread pool.
In other words, a thread meant for running SQL queries then also calls doSomething(model, ctx) and pickService(something), and submits a task via serviceX.handleSomething(model).
After that, a serviceX thread picks up the task, executes it, calls doSomethingElse(serviceResponse) and toHttpResponse(somethingElse), and puts the response back into the original request context.
Some time later, one of the netty workers takes that response and writes it to the channel.
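A minimal, self-contained sketch of the threading described above, using plain Java 8 `CompletableFuture` (no Netty or Lombok). `sql-1` and `serviceX-1` are hypothetical single-thread pools standing in for sqlExecutor and serviceX, and the latches only exist to guarantee the chain is fully attached before anything completes. Each non-async stage (`thenCompose`, `thenApply`, `thenAccept`) runs on whichever thread completes the stage before it:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StageThreads {
    // Single-thread pool whose only thread has a recognizable name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Builds a chain shaped like onRequest() and records which thread ran each stage.
    static List<String> trace() {
        ExecutorService sqlExecutor = named("sql-1");
        ExecutorService serviceX = named("serviceX-1");
        CountDownLatch sqlGate = new CountDownLatch(1);
        CountDownLatch serviceGate = new CountDownLatch(1);
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> { await(sqlGate); return "model"; }, sqlExecutor)
                .thenCompose(model -> {
                    seen.add("thenCompose=" + Thread.currentThread().getName());
                    return CompletableFuture.supplyAsync(
                        () -> { await(serviceGate); return model + "+response"; }, serviceX);
                })
                .thenApply(response -> {
                    seen.add("thenApply=" + Thread.currentThread().getName());
                    return response.length();
                })
                .thenAccept(length -> seen.add("thenAccept=" + Thread.currentThread().getName()));

            sqlGate.countDown();                                    // the "SQL query" completes on sql-1
            CompletableFuture.runAsync(() -> { }, sqlExecutor).join(); // FIFO on one thread: thenCompose has run
            serviceGate.countDown();                                // the "third-party call" completes on serviceX-1
            done.join();
            return seen;
        } finally {
            sqlExecutor.shutdown();
            serviceX.shutdown();
        }
    }

    public static void main(String[] args) {
        // [thenCompose=sql-1, thenApply=serviceX-1, thenAccept=serviceX-1]
        System.out.println(trace());
    }
}
```

If a stage is attached to a future that has already completed, its function instead runs immediately on the thread doing the attaching, which makes the executing thread even harder to predict.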
I have run into the following problems, which I would like to solve:
1) Pool threads end up doing work they were not created for.
In my example, the sqlExecutor thread calls several more methods besides processing the SQL request.
If those methods suddenly have a lot of work to do, or something in them blocks, sqlExecutor degrades sharply.
One would think sqlExecutor and some serviceN have nothing to do with each other. But a poorly written serviceN implementation (locks or extra work) can block the entire sqlExecutor.
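One common way to keep SQL threads out of business logic is to use the `*Async` variants with an explicit executor, so each continuation is always submitted to a pool chosen for it, whichever thread completed the previous stage. A sketch under that assumption; `business-1` is a hypothetical pool, and the latch simulates a `doSomething()` that gets stuck:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HopOffSqlPool {
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns {thread that ran the slow continuation, result of a second SQL query}.
    static String[] run() {
        ExecutorService sqlExecutor = named("sql-1");
        ExecutorService businessExecutor = named("business-1"); // hypothetical pool for app logic
        CountDownLatch slowWork = new CountDownLatch(1);
        try {
            CompletableFuture<String> first = CompletableFuture
                .supplyAsync(() -> "model-1", sqlExecutor)
                // *Async + explicit executor: the function is always submitted to businessExecutor.
                .thenApplyAsync(model -> {
                    await(slowWork); // a doSomething() that is stuck for a while
                    return Thread.currentThread().getName();
                }, businessExecutor);

            // The only SQL thread is not held by the stuck continuation,
            // so the next query still completes.
            String second = CompletableFuture.supplyAsync(() -> "model-2", sqlExecutor).join();

            slowWork.countDown();
            return new String[] { first.join(), second };
        } finally {
            sqlExecutor.shutdown();
            businessExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println(result[0] + " " + result[1]); // business-1 model-2
    }
}
```

The cost is an extra queue hop per stage, and the business pool itself then needs its own sizing and saturation policy.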
2) Suppose serviceN has an order of magnitude lower throughput than sqlExecutor.
Then problems keep occurring at the boundary between them: sqlExecutor has already prepared the model for further processing but cannot put the task into serviceN.
I see only 4 ways this can go:
- block, which degrades sqlExecutor;
- throw an exception, which loses the request;
- bypass the service queue and do the work in the current thread, which puts inappropriate work on the SQL thread pool but does not cause a sharp degradation;
- keep some kind of fallback queue in which the task can be placed more reliably.
  This does not solve the problem, but it buys some time, as long as the new queue has enough capacity.
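These options map fairly directly onto a `ThreadPoolExecutor` with a bounded queue plus a `RejectedExecutionHandler`: `AbortPolicy` throws (the second option), `CallerRunsPolicy` runs the task on the submitting thread (the third), and a custom handler can forward to a fallback executor (the fourth). Blocking is not shown. The pool and queue sizes and the `fallback-1` executor are illustrative assumptions:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class SaturationPolicies {
    // Hypothetical serviceN: one worker thread and room for one queued task.
    static ThreadPoolExecutor serviceN(RejectedExecutionHandler onFull) {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1), onFull);
    }

    // Fallback option: hand the overflow task to a separate executor.
    static RejectedExecutionHandler toFallback(Executor fallback) {
        return (task, pool) -> fallback.execute(task);
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Saturates serviceN, submits one more task and reports which thread ran it.
    static String overflow(RejectedExecutionHandler onFull) {
        ThreadPoolExecutor pool = serviceN(onFull);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch extraDone = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>("rejected");
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker
            pool.execute(() -> { });                   // fills the queue
            try {
                pool.execute(() -> {
                    ranOn.set(Thread.currentThread().getName());
                    extraDone.countDown();
                });
            } catch (RejectedExecutionException e) {
                extraDone.countDown(); // AbortPolicy: the request is lost unless handled here
            }
            awaitQuietly(extraDone);
            return ranOn.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(overflow(new ThreadPoolExecutor.AbortPolicy()));      // rejected
        System.out.println(overflow(new ThreadPoolExecutor.CallerRunsPolicy())); // main
        ExecutorService fallback = Executors.newSingleThreadExecutor(r -> new Thread(r, "fallback-1"));
        try {
            System.out.println(overflow(toFallback(fallback)));                  // fallback-1
        } finally {
            fallback.shutdown();
        }
    }
}
```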
3) If we accept degradation of sqlExecutor (that is, we do not throw an exception when we fail to put a task into a service), then it would be wiser to redistribute the load somehow,
so that sqlExecutor does not get stuck on a half-dead service but keeps feeding the other services just as efficiently.
I can see how to hack this together, but I want a simple and elegant solution.
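One possible approach (a bulkhead; this is not from the original code) is to cap in-flight calls per service with a `Semaphore` and use `tryAcquire()`, so a saturated or half-dead service is skipped immediately instead of blocking the sqlExecutor thread. `GuardedService` and `Dispatcher` are hypothetical names, and trying services in a fixed preference order is a simplifying assumption:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical wrapper: a third-party service with a cap on concurrent in-flight calls.
final class GuardedService<T> {
    private final Semaphore inFlight;
    private final Supplier<CompletableFuture<T>> call;

    GuardedService(int maxInFlight, Supplier<CompletableFuture<T>> call) {
        this.inFlight = new Semaphore(maxInFlight);
        this.call = call;
    }

    // Starts a call only if a permit is free; never blocks the calling thread.
    Optional<CompletableFuture<T>> tryCall() {
        if (!inFlight.tryAcquire()) {
            return Optional.empty(); // saturated or half-dead: refuse immediately
        }
        try {
            // The permit is returned when the call finishes, successfully or not.
            return Optional.of(call.get().whenComplete((result, error) -> inFlight.release()));
        } catch (RuntimeException e) {
            inFlight.release();
            throw e;
        }
    }
}

final class Dispatcher {
    // Uses the first service, in preference order, that has spare capacity.
    static <T> CompletableFuture<T> dispatch(List<GuardedService<T>> services) {
        for (GuardedService<T> service : services) {
            Optional<CompletableFuture<T>> started = service.tryCall();
            if (started.isPresent()) {
                return started.get();
            }
        }
        CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("all services saturated"));
        return failed;
    }

    public static void main(String[] args) {
        CompletableFuture<String> hung = new CompletableFuture<>(); // never completes
        GuardedService<String> slow = new GuardedService<>(1, () -> hung);
        GuardedService<String> healthy =
            new GuardedService<>(10, () -> CompletableFuture.completedFuture("ok"));
        List<GuardedService<String>> services = Arrays.asList(slow, healthy);

        CompletableFuture<String> first = dispatch(services);  // takes slow's only permit
        CompletableFuture<String> second = dispatch(services); // slow is full, goes to healthy
        System.out.println(first.isDone() + " " + second.join()); // false ok
    }
}
```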
4) For every CompletableFuture created in .then...(), I need to know which executor will run the function.
If the whole composition happens in one method, this is not a big problem. But if building the chain is spread over several classes,
then every public method needs a note in its javadoc saying which thread from which pool completes the returned future.
This is complexity that appeared out of nowhere, and I don't see how to get rid of it.
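One way to reduce this is to make the executor part of each public method's contract: the method returns a future that is always completed by a task on a known executor, so non-async stages attached by callers run there. A sketch with a hypothetical `Futures.completeOn` helper; a stage attached after the returned future has already completed still runs on the attaching thread, so this narrows the problem rather than eliminating it:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class Futures {
    // Hypothetical helper: same outcome as `source`, but the returned future
    // is always completed by a task running on `executor`.
    static <T> CompletableFuture<T> completeOn(CompletableFuture<T> source, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.whenCompleteAsync((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        }, executor);
        return result;
    }

    // Shows a caller's non-async stage running on the promised executor.
    static String demo() {
        ExecutorService sqlExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "sql-1"));
        ExecutorService appExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "app-1"));
        CountDownLatch gate = new CountDownLatch(1);
        try {
            CompletableFuture<String> row = CompletableFuture.supplyAsync(() -> {
                try {
                    gate.await(); // keeps the query pending until the caller has attached its stage
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "row";
            }, sqlExecutor);
            // A public method documented as "completes on appExecutor" would return this.
            CompletableFuture<String> promised = completeOn(row, appExecutor);
            CompletableFuture<String> seenBy = promised.thenApply(v -> Thread.currentThread().getName());
            gate.countDown();
            return seenBy.join();
        } finally {
            sqlExecutor.shutdown();
            appExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // app-1
    }
}
```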
