C++ / C#
Vasily, 2019-12-22 12:18:05

How to write an asynchronous pipeline in C#?

Good day.
I have the following algorithm:
1. An array of image files (streams) is received as input.
2. We convert these images to another format (resource-intensive).
3. Then we upload them to a remote server (the response may take a long time).
4. We return the links to the uploaded files, as a collection, to the caller.
The number of input files is known up front.
How can I write a pipeline so that we do not wait until all the images are converted? As soon as the first one is ready, we should immediately try to send it to the remote server.
I tried BlockingCollection, but I don't seem to be using it correctly.
UPDATED
Thanks to mindtester's advice, this code was written: https://dotnetfiddle.net/fHWZ9n
It seems to work as it should. If possible, take a look and leave your remarks in the comments to this post. (For those more comfortable with GitHub: https://gist.github.com/milovidov983/e305890bfd8fb... )

2 answer(s)
#, 2019-12-22
@mindtester

Well, IEnumerable<> and yield will help you, and of course async / await.
These four concepts exist precisely to simplify writing code over data sets that are stretched out in time and of unknown length. With their help, most of the code looks as if all the input data were available at once, and as if you returned the result all at once too.
P.S. The examples on MSDN (where they exist) or the tutorial at https://metanit.com/sharp/tutorial/ are useful for practice. In any case, you will have to understand the scope and capabilities of these concepts; after that it will become clear how they map onto your specific case.
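To illustrate the "looks as if everything is available at once" point, here is a minimal synchronous sketch. The stage names ConvertImages and UploadImages, the ".webp" suffix and the example.invalid links are made up for the illustration; they only stand in for the real conversion and upload.

```csharp
using System;
using System.Collections.Generic;

public static class LazyPipelineSketch
{
    // Hypothetical stage: stands in for the expensive image conversion.
    public static IEnumerable<string> ConvertImages(IEnumerable<string> files, List<string> log)
    {
        foreach (var file in files)
        {
            log.Add($"convert {file}");
            yield return file + ".webp"; // handed to the next stage immediately
        }
    }

    // Hypothetical stage: stands in for the upload and returns a made-up link.
    public static IEnumerable<string> UploadImages(IEnumerable<string> converted, List<string> log)
    {
        foreach (var file in converted)
        {
            log.Add($"upload {file}");
            yield return "https://example.invalid/" + file;
        }
    }

    // Chains the stages and records the order in which the steps actually run.
    public static List<string> Run(IEnumerable<string> files)
    {
        var log = new List<string>();
        foreach (var link in UploadImages(ConvertImages(files, log), log))
            log.Add($"done {link}");
        return log;
    }
}
```

Because both stages are iterators, the log for two files shows the first file being converted and uploaded before the second one is even read. This version is still sequential; it only shows the shape of the code, not the concurrency.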
P.P.S.

"We return links from where we put them in the form of a collection to the one who called us."
This part does not really fit the concept of a pipeline built on IEnumerable/yield/async/await.
The point is that the resulting links arrive sporadically, and obviously not at the moment the original image is received.
There are several ways to handle this:
- build the collection of links (JSON?) at the end of processing
- return the links to the caller one at a time and assemble the collection there; each link can be matched to its image by name / tag / code / image number (frame?)
- send JSON asynchronously, stretched out in time, or any .NET collection of your choice, if the receiver also runs .NET and you can edit its source code as well
- alternatively, if the number of frames is known up front, notify the caller at the start that at least an array of a given size is coming, in any format it understands, provided of course that it is also capable of asynchronous processing
P.P.P.S.
"Tried with BlockingCollection"
I repeat: as far as I understand your task, nothing apart from IEnumerable<> is needed in such a pipeline in principle. Well, except perhaps for however you decide to form the result. But even for the result, even for accumulating it, the same IEnumerable<> is more than enough among the .NET constructs, and JSON can be built from it if required.
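The "return links one at a time and match them by image number" option could be sketched roughly as follows, using C# 8 async streams (IAsyncEnumerable<>). This is only a sketch under assumptions: ConvertAsync and UploadAsync are placeholders invented here, and the example.invalid links are fake.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncPipelineSketch
{
    // Placeholder for the CPU-heavy conversion; runs on the thread pool.
    static Task<byte[]> ConvertAsync(byte[] image) =>
        Task.Run(() => (byte[])image.Clone());

    // Placeholder for the slow upload; returns a made-up link.
    static async Task<string> UploadAsync(int index, byte[] converted)
    {
        await Task.Delay(10);
        return $"https://example.invalid/{index}";
    }

    // One image goes through both stages without waiting for the others.
    static async Task<(int Index, string Link)> ProcessAsync(int index, byte[] image)
    {
        var converted = await ConvertAsync(image);
        return (index, await UploadAsync(index, converted));
    }

    // Yields each link as soon as its image has been converted and uploaded.
    public static async IAsyncEnumerable<(int Index, string Link)> ProcessAllAsync(
        IReadOnlyList<byte[]> images)
    {
        // ToList() starts every task right away (no throttling in this sketch).
        var pending = images.Select((img, i) => ProcessAsync(i, img)).ToList();
        while (pending.Count > 0)
        {
            var finished = await Task.WhenAny(pending);
            pending.Remove(finished);
            yield return await finished;
        }
    }

    // Caller side: the image number puts every link into its own slot.
    public static async Task<string[]> CollectLinksAsync(IReadOnlyList<byte[]> images)
    {
        var links = new string[images.Count];
        await foreach (var (index, link) in ProcessAllAsync(images))
            links[index] = link;
        return links;
    }
}
```

The links arrive in completion order, but the index carried with each one restores the input order on the caller's side. The sketch starts all conversions at once, and the Task.WhenAny loop rescans the pending list on every iteration, so for large batches you would probably want to limit concurrency, for example with SemaphoreSlim.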

Vladislav, 2019-12-23
@Jewish_Cat

Check out TPL Dataflow. It may be overkill, but it is a very good fit for image processing. Here is an example from my own code (it queries wallet balances rather than processing images):

using System;
using System.Numerics;
using System.Threading.Tasks.Dataflow;
// Web3 and UnitConversion come from the Nethereum library;
// function, Current and TechWallets are defined elsewhere in the project.

BigInteger result = 0;
var inputBlock = new BufferBlock<(string, string)>(); // Block that holds the input data

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 200 // Maximum number of items processed concurrently
};
// Pass completion (and faults) on from each block to the next one
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

// Main worker block: receives (string, string) and produces a BigInteger.
var callBalanceBlock = new TransformBlock<(string, string), BigInteger>(async x =>
{
    BigInteger wallet = await function.CallAsync<BigInteger>(x.Item1);
    BigInteger eth = await Current.Eth.GetBalance.SendRequestAsync(x.Item1);
    // This Add runs concurrently, so TechEURGWallets must be a thread-safe collection.
    TechWallets.TechEURGWallets.Add(new TechWallet
    {
        Address = x.Item1,
        TokenValue = Web3.Convert.FromWei(wallet, 2),
        EthValue = Web3.Convert.FromWei(eth, UnitConversion.EthUnit.Ether),
        PrivateKey = x.Item2
    });
    return wallet;
}, options);

// Receives the TransformBlock output and sums it up. The default
// MaxDegreeOfParallelism of 1 is kept here so that "result += x" cannot race.
var calc = new ActionBlock<BigInteger>(x => result += x);

inputBlock.LinkTo(callBalanceBlock, linkOptions); // BufferBlock feeds the TransformBlock
callBalanceBlock.LinkTo(calc, linkOptions);       // TransformBlock feeds the ActionBlock

foreach (var i in TechWallets.ListEURGTechWallets)
    inputBlock.Post(i); // Fill the BufferBlock; processing starts as soon as items arrive

inputBlock.Complete(); // Signal that no more input will be posted

try
{
    calc.Completion.Wait(); // Wait until the last block has finished
}
catch (AggregateException e)
{
    // Wait() wraps block failures in an AggregateException
    Console.WriteLine(e.Flatten().InnerException?.Message);
}

The code will become much simpler and faster.
You can build your own pipeline like this: BufferBlock (the array of input data) -> TransformBlock (convert the input images here) -> ActionBlock (upload the image to the server here).
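For images, that BufferBlock -> TransformBlock -> ActionBlock chain might look roughly like the sketch below. ConvertImage and UploadAsync are placeholders I made up, the links are fake, and the parallelism limits are arbitrary. On .NET Framework the System.Threading.Tasks.Dataflow NuGet package is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowImageSketch
{
    // Placeholder for the CPU-heavy conversion.
    static byte[] ConvertImage(byte[] image) => (byte[])image.Clone();

    // Placeholder for the slow upload; returns a made-up link.
    static async Task<string> UploadAsync(int index, byte[] data)
    {
        await Task.Delay(10);
        return $"https://example.invalid/{index}";
    }

    public static async Task<string[]> RunAsync(IReadOnlyList<byte[]> images)
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var links = new string[images.Count]; // one slot per input image

        var input = new BufferBlock<(int Index, byte[] Data)>();

        // CPU-bound stage: roughly one conversion per core at a time.
        var convert = new TransformBlock<(int Index, byte[] Data), (int Index, byte[] Data)>(
            item => (item.Index, ConvertImage(item.Data)),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });

        // I/O-bound stage: each upload starts as soon as its image is converted.
        var upload = new ActionBlock<(int Index, byte[] Data)>(
            async item => { links[item.Index] = await UploadAsync(item.Index, item.Data); },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

        input.LinkTo(convert, linkOptions);
        convert.LinkTo(upload, linkOptions);

        for (var i = 0; i < images.Count; i++)
            input.Post((i, images[i]));
        input.Complete(); // no more input; completion flows down the chain

        await upload.Completion;
        return links;
    }
}
```

Carrying the index through both stages lets every upload write its link into its own array slot, so the caller gets the collection back in input order even though the uploads finish in arbitrary order.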
