Scala
PaulTes, 2018-09-12 16:19:16

How to perform a condition check in Akka Streams before sending data via BroadcastHub?

Hello, I am learning the socket library https://github.com/playframework/play-socket.io
My question is: how can I use it to process entities before they are sent to all subscribers?
This is what I am currently trying:

Pair<Sink<Brand, NotUsed>, Source<Brand, NotUsed>> pair = MergeHub.of(Brand.class)
        .toMat(BroadcastHub.of(Brand.class), Keep.both())
        .run(materializer);

Source<Brand, NotUsed> modifiedSource = pair.second().flatMapConcat((brand) -> {
    Logger.warn(brand.getName());
    return Source.fromCompletionStage(
            brandRepository.put(brand)
    );
});

Flow<Brand, Brand, NotUsed> chatFlow = Flow.fromSinkAndSourceCoupled(
        pair.first(),
        modifiedSource
);

But there is a problem: the brand-saving code is executed for every message as many times as there are connections.
I would also be grateful if someone could explain how MergeHub and BroadcastHub actually work.
Thank you.
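
To make the symptom easier to reason about, here is a minimal plain-Java model of it. It deliberately does not use Akka Streams: `Broadcast` and `CountingRepository` are hypothetical stand-ins for the BroadcastHub and for `brandRepository`, and brands are reduced to strings. The assumption behind the model is that `modifiedSource` is materialized once per connection, so every connection gets its own copy of the `flatMapConcat` stage attached after the broadcast point.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of a fan-out point; this is NOT the Akka Streams API.
final class FanOutModel {

    // Hypothetical stand-in for the hub: pushes each element to every subscriber.
    static final class Broadcast<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(T element) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(element);
            }
        }
    }

    // Hypothetical stand-in for brandRepository: only counts how often put is called.
    static final class CountingRepository {
        int saves = 0;

        String put(String brand) {
            saves++;
            return brand;
        }
    }

    // Saving stage attached AFTER the fan-out: every connection runs its own copy.
    static int savesWhenStageIsAfterBroadcast(int connections, List<String> brands) {
        CountingRepository repository = new CountingRepository();
        Broadcast<String> hub = new Broadcast<>();
        for (int i = 0; i < connections; i++) {
            hub.subscribe(brand -> repository.put(brand));
        }
        for (String brand : brands) {
            hub.publish(brand);
        }
        return repository.saves;
    }

    // Saving stage placed BEFORE the fan-out: it runs once per element.
    static int savesWhenStageIsBeforeBroadcast(int connections, List<String> brands) {
        CountingRepository repository = new CountingRepository();
        Broadcast<String> hub = new Broadcast<>();
        for (int i = 0; i < connections; i++) {
            hub.subscribe(brand -> { /* the connection only receives the saved brand */ });
        }
        for (String brand : brands) {
            hub.publish(repository.put(brand));
        }
        return repository.saves;
    }

    public static void main(String[] args) {
        List<String> brands = Arrays.asList("a", "b");
        System.out.println(savesWhenStageIsAfterBroadcast(3, brands));  // prints 6
        System.out.println(savesWhenStageIsBeforeBroadcast(3, brands)); // prints 2
    }
}
```

If the model matches what is happening, the corresponding change in the real code would be to move the saving step between `MergeHub.of(Brand.class)` and `.toMat(BroadcastHub.of(Brand.class), Keep.both())`, for example with `mapAsync`, so that it runs once per message before the fan-out. That is a suggestion under the assumption above, not a verified fix.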
