.NET
Sergey Tsatsatsa, 2018-03-13 08:26:11

Multithreading ON DEMAND, or single-threaded multithreading?

I have the following task:

  • There is a main thread that generates data for handlers.
  • Handlers are small pieces of code; they can differ in the code itself or, when the code is the same, in their context.
  • There can be many handlers (up to 500 thousand).
  • The number of handlers is not known in advance; they are discovered while the data for them is being generated.
  • Data for a given handler can arrive several times, at different moments (for example, a data queue for handlers: A, B, C, A, A, C, B, A, C, etc.).
  • Thus, only one handler can (and should) work at a time. Handlers must not run in parallel with each other or with the main thread, because handlers can feed results back and affect the work of both the main thread and other handlers.
  • The whole system (main thread plus handlers, including handler initialization) must be fast.
  • The target platform is .NET Standard 2.x (.NET Core 2.x).

On the one hand, I could simply use Thread objects for this, which is very convenient, but initializing a large number of them takes time. If I use Task, initialization is fast, but the tasks compete with each other for execution, and that spoils everything.
So the question is: how do I make a handler sleep while it waits for its next data packet, so that it does not take part in that competition?
It seems convenient to use a single thread and swap different handler contexts into it, but how?
In theory, the following code should do the job, but because of how Task works, it blocks and everything stops:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Diagnostics;

namespace ThreadsTests
{
    public class WorkData
    {
        public int Id;
    }

    public abstract class ProtoBase<T> : IDisposable
    {
        public volatile bool IsFinish = false;

        ManualResetEventSlim WaitData = new ManualResetEventSlim(false);
        ManualResetEventSlim WaitRequest = new ManualResetEventSlim(false);
        static int NextId = 0;
        T NextData;

        public int give_count = 1;
        public ProtoBase()
        {
            ID = ++NextId;
        }
        public int ID;

        public void Finish()
        {
            IsFinish = true;
            WaitData.Dispose();
            WaitRequest.Dispose();
        }


        internal void GiveData(T bext_data)
        {
            WaitRequest.Wait();
            WaitRequest.Reset();
            NextData = bext_data;
            WaitData.Set();
            
        }

        public Task<T> GetNextData()
        {
            Func<T> DataWatingAction = () =>
            {
                WaitRequest.Set();
                WaitData.Wait();
                WaitData.Reset();
                T b = NextData;
                NextData = default(T);
                return b;
            };
            Task<T> DataWaitngTask = new Task<T>(DataWatingAction);
            DataWaitngTask.Start();

            return DataWaitngTask;


        }

        public abstract void Parse();

        public virtual void Dispose()
        {

        }
    }

    public class TestWorker : ProtoBase<WorkData>
    {
        public override async void Parse()
        {

            WorkData data = await GetNextData();
            Trace.TraceInformation("{0:N0} taked 1 [id={1:N0}]", ID, data.Id);
            data = await GetNextData();
            Trace.TraceInformation("{0:N0} taked 2 [id={1:N0}]", ID, data.Id);
            data = await GetNextData();
            Trace.TraceInformation("{0:N0} taked 3 [id={1:N0}]", ID, data.Id);
            Finish();
            Trace.TraceInformation("{0:N0} Finish ххх", ID);
        }
    }


    public class Main
    {
        //workers count
        int test_count = 2; // 100, 1000, 10000, 100000
        List<TestWorker> Workers = new List<TestWorker>();
        public void AsyncWorkersTest()
        {
            LinkedList<TestWorker> wrks = new LinkedList<TestWorker>();
            for (int i = 0; i < test_count; i++)
            {
                TestWorker tp = new TestWorker();
                wrks.AddLast(tp);
                tp.Parse();
            }
            Workers = wrks.ToList();
            Random rnd = new Random();

            int getDataCount = test_count * 3;
            for (int i = 0; i < getDataCount; i++)
            {
                int ind = rnd.Next(0, Workers.Count);
                WorkData wd = new WorkData() { Id = i };
                
                if (Workers[ind].IsFinish) continue;
                Trace.TraceInformation("{0:N0} give  {1} [id={2:N0}]", ind, Workers[ind].give_count++, wd.Id);
                Workers[ind].GiveData(wd);
            }
        }

    }
}

Here is the log this code produces:
testhost Information: 0 : 0 give  1 [id=0]
testhost Information: 0 : 0 give  2 [id=1]
testhost Information: 0 : 1 taked 1 [id=0]
testhost Information: 0 : 1 give  1 [id=2]
testhost Information: 0 : 0 give  3 [id=3]
testhost Information: 0 : 1 taked 2 [id=1]
testhost Information: 0 : 0 give  4 [id=4]
testhost Information: 0 : 2 taked 1 [id=2]
testhost Information: 0 : 1 taked 3 [id=3]
testhost Information: 0 : 1 Finish ххх
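
The "one thread, different handler contexts" idea from the question can be pictured with plain C# iterators, with no Task or Thread involved. The following is only a minimal sketch under assumptions: the names `Mailbox<T>`, `IteratorHandlersSketch`, `Handler` and `Run` are hypothetical and not part of the original code, and each handler is assumed to consume exactly three packets, as `TestWorker` does. Each `yield return` marks the point where a handler "sleeps" until its next packet; the main loop resumes it with `MoveNext()` on its own thread, so the main thread and the handlers never run at the same time.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical one-slot container the main loop uses to hand a packet to a handler.
public sealed class Mailbox<T>
{
    public T Value;
}

public static class IteratorHandlersSketch
{
    // A handler written as an iterator: every `yield return` means
    // "suspend me until the next packet arrives". No thread is blocked meanwhile.
    private static IEnumerator<bool> Handler(string name, Mailbox<int> box, List<string> log)
    {
        for (int n = 1; n <= 3; n++)
        {
            yield return true;                       // wait for data
            log.Add($"{name} take {n} [id={box.Value}]");
        }
        log.Add($"{name} finish");
    }

    // The "main thread": walks the data queue, creates handlers lazily,
    // and resumes exactly one handler per packet.
    public static List<string> Run(string[] queue)
    {
        var log = new List<string>();
        var handlers = new Dictionary<string, (IEnumerator<bool> Step, Mailbox<int> Box)>();

        for (int id = 0; id < queue.Length; id++)
        {
            string name = queue[id];
            if (!handlers.TryGetValue(name, out var h))
            {
                var box = new Mailbox<int>();
                var step = Handler(name, box, log);
                step.MoveNext();                     // run the handler up to its first wait
                h = (step, box);
                handlers[name] = h;
            }

            h.Box.Value = id;                        // deliver the packet
            h.Step.MoveNext();                       // resume the handler until its next wait or its end
        }
        return log;
    }
}
```

Calling `IteratorHandlersSketch.Run(new[] { "A", "B", "A", "A" })` yields the log `A take 1 [id=0]`, `B take 1 [id=1]`, `A take 2 [id=2]`, `A take 3 [id=3]`, `A finish`. A handler that has already finished simply ignores further packets, because `MoveNext()` returns false without running any code.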

3 answer(s)
lam0x86, 2018-03-16
@capiev

I ended up with this homegrown solution:

using System;
using System.Collections.Generic;
using System.Threading; // needed for CancellationToken
using System.Threading.Tasks;
using System.Diagnostics;

namespace ThreadsTests
{
    public class WorkData
    {
        public int Id;
    }

    public abstract class ProtoBase<T>
    {
        private static int NextId;
        
        private TaskCompletionSource<T> _dataTaskCompletionSource = new TaskCompletionSource<T>();

        public int give_count = 1;

        protected ProtoBase()
        {
            Id = ++NextId;
            new Task(Parse).Start();
        }
        
        public int Id { get; }

        public bool IsFinish { get; private set; }

        protected void Finish()
        {
            IsFinish = true;
        }

        public async Task PushData(T data)
        {
            _dataTaskCompletionSource.SetResult(data);
            await Task.Yield();
        }

        protected async Task<T> GetNextData()
        {
            var taskResult = await _dataTaskCompletionSource.Task;
            _dataTaskCompletionSource = new TaskCompletionSource<T>();
            return taskResult;
        }

        protected abstract void Parse();
    }

    public class TestWorker : ProtoBase<WorkData>
    {
        protected override async void Parse()
        {

            WorkData data = await GetNextData();
            Trace.TraceInformation("{0:N0} take 1 [id={1:N0}]", Id, data.Id);
            data = await GetNextData();
            Trace.TraceInformation("{0:N0} take 2 [id={1:N0}]", Id, data.Id);
            data = await GetNextData();
            Trace.TraceInformation("{0:N0} take 3 [id={1:N0}]", Id, data.Id);
            Finish();
            Trace.TraceInformation("{0:N0} Finish ххх", Id);
        }
    }


    public class Program
    {
        public static async Task Main(string[] args)
        {
            var schedulerPair = new ConcurrentExclusiveSchedulerPair();
            await await Task.Factory.StartNew(
                AsyncWorkersTest,
                CancellationToken.None,
                TaskCreationOptions.None,
                schedulerPair.ExclusiveScheduler);
            Console.WriteLine("FINISHED");
            Console.ReadKey();
        }
        
        public static async Task AsyncWorkersTest()
        {
            //workers count
            const int testCount = 1000; // 100, 1000, 10000, 100000
            var Workers = new List<TestWorker>();

            for (int i = 0; i < testCount; i++)
            {
                Workers.Add(new TestWorker());
            }

            Random rnd = new Random();

            int getDataCount = testCount * 3;
            for (int i = 0; i < getDataCount; i++)
            {
                int ind = rnd.Next(0, Workers.Count);
                WorkData wd = new WorkData() { Id = i };
                
                if (Workers[ind].IsFinish) continue;
                Trace.TraceInformation("{0:N0} push {1} [id={2:N0}]", Workers[ind].Id, Workers[ind].give_count++, wd.Id);
                await Workers[ind].PushData(wd);
            }
        }
    }
}

Output:
ConsoleApp1 Information: 0 : 1 push 1 [id=0]
ConsoleApp1 Information: 0 : 1 take 1 [id=0]
ConsoleApp1 Information: 0 : 2 push 1 [id=1]
ConsoleApp1 Information: 0 : 2 take 1 [id=1]
ConsoleApp1 Information: 0 : 1 push 2 [id=2]
ConsoleApp1 Information: 0 : 1 take 2 [id=2]
ConsoleApp1 Information: 0 : 1 push 3 [id=3]
ConsoleApp1 Information: 0 : 1 take 3 [id=3]
ConsoleApp1 Information: 0 : 1 Finish ххх
ConsoleApp1 Information: 0 : 2 push 2 [id=4]
ConsoleApp1 Information: 0 : 2 take 2 [id=4]
ConsoleApp1 Information: 0 : 2 push 3 [id=5]
ConsoleApp1 Information: 0 : 2 take 3 [id=5]
ConsoleApp1 Information: 0 : 2 Finish ххх
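
The strict push/take alternation in this output relies on everything being scheduled through `ConcurrentExclusiveSchedulerPair.ExclusiveScheduler`, which runs at most one task at a time. The sketch below isolates just that property; it is an illustration under assumptions, not part of the answer's code, and the names `ExclusiveSchedulerSketch` and `Run` are hypothetical. It starts many tasks on the exclusive scheduler and records how many were ever inside the task body simultaneously.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ExclusiveSchedulerSketch
{
    // Starts taskCount tasks on an exclusive scheduler and returns the largest
    // number of tasks that were observed running at the same moment.
    public static int Run(int taskCount)
    {
        var pair = new ConcurrentExclusiveSchedulerPair();
        var factory = new TaskFactory(pair.ExclusiveScheduler);

        int active = 0;     // tasks currently inside the body
        int maxActive = 0;  // highest value of `active` seen so far

        var tasks = new Task[taskCount];
        for (int i = 0; i < taskCount; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref active);
                if (now > maxActive) maxActive = now;
                Thread.SpinWait(1000);               // simulate a little work
                Interlocked.Decrement(ref active);
            });
        }

        Task.WaitAll(tasks);
        return maxActive;
    }
}
```

With the exclusive scheduler, `ExclusiveSchedulerSketch.Run(100)` returns 1. The tasks may still be executed by different thread-pool threads over time; the scheduler only guarantees that they never overlap.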

Alexander Yudakov, 2018-03-13
@AlexanderYudakov

To clear up all this confusion, I suggest the RTFM approach:
Thread Pooling in C#:
https://docs.microsoft.com/en-us/dotnet/csharp/pro...
Thread Synchronization in C#:
https://docs.microsoft.com/en-us/dotnet/csharp/pro...
Asynchronous Programming (C#):
https://docs.microsoft.com/en-us/dotnet/csharp/async

Ivan Arxont, 2018-03-13
@arxont

"only one handler can (and should) work at a time" - something tells me that the lock statement will save you
See this Stack Overflow answer for an example: https://stackoverflow.com/questions/37242257/why-i...
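
A minimal sketch of what the lock suggestion could look like, assuming handlers are invoked from several threads at once. The names `LockSketch`, `Gate`, `Handle` and `Run` are hypothetical and only serve the illustration. Note that `lock` gives mutual exclusion only: it does nothing about the cost of creating or parking hundreds of thousands of waiting threads, which is the other half of the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LockSketch
{
    // Hypothetical shared gate: whoever holds it is the only handler allowed to run.
    private static readonly object Gate = new object();

    private static int _active;     // handlers currently inside the critical section
    private static int _maxActive;  // highest value _active ever reached

    public static int MaxActive => _maxActive;

    // Body of a handler call; the lock serializes all handlers.
    public static void Handle(int handlerId, int dataId, List<string> log)
    {
        lock (Gate)
        {
            _active++;
            if (_active > _maxActive) _maxActive = _active;
            log.Add($"{handlerId} take [id={dataId}]"); // safe: List<T> is touched only under the lock
            _active--;
        }
    }

    // Fires `calls` handler invocations from multiple threads and
    // returns how many log entries were written.
    public static int Run(int calls)
    {
        var log = new List<string>();
        Parallel.For(0, calls, i => Handle(i % 3, i, log));
        return log.Count;
    }
}
```

After `LockSketch.Run(1000)`, the log contains 1000 entries and `LockSketch.MaxActive` is 1, so no two handlers were ever inside the critical section together. The order in which handlers acquire the lock is not defined.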
