E
E
Elnurhan2020-08-18 18:03:34
C++ / C#
Elnurhan, 2020-08-18 18:03:34

How to get values ​​from streams without using future?

Greetings!
I wrote a program that counts the total number of words in .log files in a specified directory in multi-threaded mode.
The first argument on the command line is the path to the directory in which to look for .log files and count the words in them.
The second argument is the number of threads.
I have written the following code to solve this task

ThreadPool.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <boost/thread/condition_variable.hpp>
#include <boost/thread.hpp>

#include <future> // I don't how to work with boost future
#include <queue>
#include <vector>
#include <functional>

class ThreadPool
{
public:
    using Task = std::function<void()>; // Our task

    explicit ThreadPool(int num_threads)
    {
        start(num_threads);
    }

    ~ThreadPool()
    {
        stop();
    }

    template<class T>
    auto enqueue(T task)->std::future<decltype(task())>
    {
        // packaged_task wraps any Callable target
        auto wrapper = std::make_shared<std::packaged_task<decltype(task()) ()>>(std::move(task));

        {
            boost::unique_lock<boost::mutex> lock{ mutex_p };
            tasks_p.emplace([=] {
                (*wrapper)();
            });
        }

        event_p.notify_one();
        return wrapper->get_future();
    }

    //void enqueue(Task task)
    //{
    //  {
    //      boost::unique_lock<boost::mutex> lock { mutex_p };
    //      tasks_p.emplace(std::move(task));
    //      event_p.notify_one();
    //  }
    //}

private:
    std::vector<boost::thread> threads_p; // num of threads
    std::queue<Task>           tasks_p;   // Tasks to make
    boost::condition_variable  event_p; 
    boost::mutex               mutex_p;

    bool                       isStop = false;

    void start(int num_threads)
    {
        for (int i = 0; i < num_threads; ++i)
        {
            // Add to the end our thread
            threads_p.emplace_back([=] {
                while (true)
                {
                    // Task to do
                    Task task;

                    {
                        boost::unique_lock<boost::mutex> lock(mutex_p);

                        event_p.wait(lock, [=] { return isStop || !tasks_p.empty(); });

                        // If we make all tasks
                        if (isStop && tasks_p.empty())
                            break;

                        // Take new task from queue
                        task = std::move(tasks_p.front());
                        tasks_p.pop();
                    }

                    // Execute our task
                    task();
                }
            });
        }
    }

    void stop() noexcept
    {
        {
            boost::unique_lock<boost::mutex> lock(mutex_p);
            isStop = true;
            event_p.notify_all();
        }

        for (auto& thread : threads_p)
        {
            thread.join();
        }
    }
};

#endif


main.cpp
#include "ThreadPool.h"

#include <iostream>
#include <iomanip>
#include <Windows.h>

#include <vector>
#include <map>

#include <boost/filesystem.hpp>
#include <boost/thread.hpp>


namespace bfs = boost::filesystem;

int count_words(const std::string& filename)
{
    int counter = 0;
    std::ifstream file(filename);
    std::string buffer;
    while (file >> buffer)
    {
        ++counter;
    }
    
    return counter;
}

int main(int argc, const char* argv[])
{
    bfs::path path = argv[1];
    // If this path is exist and if this is dir
    if (bfs::exists(path) && bfs::is_directory(path))
    {
        // Number of threads. Default = 4
        int n = (argc == 3 ? atoi(argv[2]) : 4);
        ThreadPool pool(n);

        // Container to store all filenames and number of words inside them
        std::map<bfs::path, int> all_files_and_sums;
        
        // Iterate all files in dir
        for (auto& p : bfs::directory_iterator(path)) {
            // Takes only .txt files
            if (p.path().extension() == ".log") {
                // Future for taking value from here
                auto fut = pool.enqueue([&p, &all_files_and_sums]() {
                    // In this lambda function I count all words in file and return this value
                    int result = count_words(p.path().string());
                    std::cout << "TID " << GetCurrentThreadId() << "\n";
                    return result;
                });
                // "filename = words in this .txt file"
                all_files_and_sums[p.path()] = fut.get();
            }
        }

        int result = 0;

        for (auto& k : all_files_and_sums)
        {
            std::cout << k.first << "- " << k.second << "\n";
            result += k.second;
        }

        std::cout << "Result: " << result << "\n";
    }
    else
        std::perror("Dir is not exist");
}


This solution works correctly. But if there are a lot of .log files in the directory, the program works slowly, and some threads (with a large number of threads) simply exist and do nothing.
I think the problem is in the futures. How can you get values ​​from streams without a future?

Answer the question

In order to leave comments, you need to log in

1 answer(s)
E
Evgeny Shatunov, 2020-08-19
@Elnurhan

If you read your code carefully, you will see a lot of problems. If I don’t argue
[&p, &all_files_and_sums]
with capture by reference, then will it definitely continue to exist after exiting the iteration? Let's think. Will it continue after exiting the cycle? I have no confidence at all that after the offset, the address returned by it will change. I would rather capture the copy in the lambda. You don't have to wait for the thread pool to complete its work - there is no synchronization with the completion of scheduled tasks. More precisely ... well, how not ... You have a hard synchronization through [ ?all_files_and_sumsp
You should save yourself std::futurefrom tasks. At the time of planning, their result has not yet been determined and getshould not be called. We must wait for the completion of all threads in the pool and the exhaustion of all tasks. To do this, you must have notification mechanisms thought out in your pool.
After processing all the tasks, you can call std::future::get, get the results and perform your operations on them.
Alternatively, you may be more sensitive to the completion of each task and the appearance of its std::futureresult. This can be done too. You just need to do it yourself and think over the scalability of such a mechanism.
And in addition. Why do you need boost? You std::futurealso use lambdas, you write within the C++11 standard. You have access to and std::thread, and all the barrier primitives from std. The entire Thread Support Library
is at your disposal . And boost is clearly superfluous here. If you switch to C ++ 17, then you will not need it, because becomes available - Filesystem Library .
boost::filesystemstd::filesystem

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question