How to get values from threads without using future?
Greetings!
I wrote a multi-threaded program that counts the total number of words in the .log files of a given directory.
The first command-line argument is the path to the directory in which to look for .log files and count their words.
The second argument is the number of threads.
I wrote the following code for this task:
ThreadPool.h
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <boost/thread/condition_variable.hpp>
#include <boost/thread.hpp>
#include <future> // std::future, std::packaged_task (I don't know how to work with boost::future)
#include <memory> // std::make_shared
#include <queue>
#include <vector>
#include <functional>
class ThreadPool
{
public:
    using Task = std::function<void()>; // A queued unit of work
    explicit ThreadPool(int num_threads)
    {
        start(num_threads);
    }
    ~ThreadPool()
    {
        stop();
    }
    template<class T>
    auto enqueue(T task) -> std::future<decltype(task())>
    {
        // packaged_task wraps any Callable target; shared_ptr makes it copyable into std::function
        auto wrapper = std::make_shared<std::packaged_task<decltype(task()) ()>>(std::move(task));
        // Take the future before a worker can start running the task
        auto result = wrapper->get_future();
        {
            boost::unique_lock<boost::mutex> lock{ mutex_p };
            tasks_p.emplace([wrapper] {
                (*wrapper)();
            });
        }
        event_p.notify_one();
        return result;
    }
    //void enqueue(Task task)
    //{
    //    {
    //        boost::unique_lock<boost::mutex> lock { mutex_p };
    //        tasks_p.emplace(std::move(task));
    //        event_p.notify_one();
    //    }
    //}
private:
    std::vector<boost::thread> threads_p; // Worker threads
    std::queue<Task> tasks_p; // Tasks waiting to be executed
    boost::condition_variable event_p;
    boost::mutex mutex_p;
    bool isStop = false;
    void start(int num_threads)
    {
        for (int i = 0; i < num_threads; ++i)
        {
            // Add a new worker thread to the end of the vector
            threads_p.emplace_back([this] {
                while (true)
                {
                    // Task to execute
                    Task task;
                    {
                        boost::unique_lock<boost::mutex> lock(mutex_p);
                        event_p.wait(lock, [this] { return isStop || !tasks_p.empty(); });
                        // Exit once stop is requested and the queue is drained
                        if (isStop && tasks_p.empty())
                            break;
                        // Take the next task from the queue
                        task = std::move(tasks_p.front());
                        tasks_p.pop();
                    }
                    // Execute the task outside the lock
                    task();
                }
            });
        }
    }
    void stop() noexcept
    {
        {
            boost::unique_lock<boost::mutex> lock(mutex_p);
            isStop = true;
            event_p.notify_all();
        }
        for (auto& thread : threads_p)
        {
            thread.join();
        }
    }
};
#endif
#include "ThreadPool.h"
#include <iostream>
#include <fstream>   // std::ifstream
#include <string>
#include <cstdlib>   // std::atoi
#include <Windows.h> // GetCurrentThreadId
#include <vector>
#include <map>
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
namespace bfs = boost::filesystem;
int count_words(const std::string& filename)
{
    int counter = 0;
    std::ifstream file(filename);
    std::string buffer;
    // Every whitespace-separated token counts as one word
    while (file >> buffer)
    {
        ++counter;
    }
    return counter;
}
int main(int argc, const char* argv[])
{
    if (argc < 2)
    {
        std::cerr << "Usage: program <directory> [threads]\n";
        return 1;
    }
    bfs::path path = argv[1];
    // Check that the path exists and is a directory
    if (bfs::exists(path) && bfs::is_directory(path))
    {
        // Number of threads. Default = 4 (also used if the argument is not a positive number)
        int n = (argc == 3 ? std::atoi(argv[2]) : 4);
        if (n <= 0)
            n = 4;
        ThreadPool pool(n);
        // Container to store all file names and the number of words in each
        std::map<bfs::path, int> all_files_and_sums;
        // Iterate over all files in the directory
        for (auto& p : bfs::directory_iterator(path)) {
            // Take only .log files
            if (p.path().extension() == ".log") {
                // Future used to retrieve the task's result
                auto fut = pool.enqueue([&p, &all_files_and_sums]() {
                    // Count all words in the file and return the count
                    int result = count_words(p.path().string());
                    std::cout << "TID " << GetCurrentThreadId() << "\n";
                    return result;
                });
                // "file name = number of words in this .log file"
                all_files_and_sums[p.path()] = fut.get();
            }
        }
        int result = 0;
        for (auto& k : all_files_and_sums)
        {
            std::cout << k.first << "- " << k.second << "\n";
            result += k.second;
        }
        std::cout << "Result: " << result << "\n";
    }
    else
        std::cerr << "Path does not exist or is not a directory\n";
}
If you read your code carefully, you will see a lot of problems. If I'm not mistaken, you capture [&p, &all_files_and_sums] by reference. Will p definitely still exist after the current iteration ends? Let's think: will it still exist once the loop has moved on?
I am not at all confident that after the iterator is advanced, the reference it returned still refers to the same entry. I would rather capture a copy in the lambda.
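A minimal sketch of capturing a copy, assuming C++17 std::filesystem and using std::async as a stand-in for the question's ThreadPool::enqueue (the helper name schedule_filename_tasks is hypothetical). The path is copied while the directory entry is still valid, and each lambda owns its own copy:

```cpp
#include <filesystem>
#include <future>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// Hypothetical helper: schedules one task per directory entry and returns the futures.
// Each lambda captures its own copy of the path, so nothing it uses points into
// the loop variable or the iterator's internal state.
std::vector<std::future<std::string>> schedule_filename_tasks(const fs::path& dir)
{
    std::vector<std::future<std::string>> futures;
    for (const auto& entry : fs::directory_iterator(dir)) {
        fs::path file = entry.path();  // copy made while the entry is still valid
        futures.push_back(std::async(std::launch::async, [file] {
            return file.filename().string();  // uses the lambda's own copy
        }));
    }
    return futures;
}
```

The caller can call get() on the returned futures whenever it likes; the paths stay valid because each task owns them.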
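You also never actually wait for the thread pool to finish its work: there is no synchronization with the completion of the scheduled tasks. More precisely... well, there is some: you have hard synchronization through all_files_and_sums[p.path()] = fut.get(). It blocks right after each task is enqueued, so the files are effectively processed one at a time.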
You should store the std::future objects returned for your tasks. At the moment a task is scheduled, its result is not yet known, so get should not be called then. You must wait until all tasks have been taken from the queue and all threads in the pool have finished them. For that, your pool needs a well-thought-out notification mechanism.
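One possible notification mechanism, sketched with std::thread, std::mutex and std::condition_variable only. WaitablePool and wait_all() are hypothetical names, not part of the question's code: the pool counts the tasks currently running and signals a second condition variable when the queue is empty and nothing is running. The sketch assumes tasks do not throw.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical std-only pool: wait_all() blocks until the queue is empty
// and no worker is executing a task.
class WaitablePool {
public:
    explicit WaitablePool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i)
            workers_.emplace_back([this] { run(); });
    }
    ~WaitablePool() {
        {
            std::lock_guard<std::mutex> lock(m_);
            stop_ = true;
        }
        task_cv_.notify_all();
        for (auto& t : workers_) t.join();
    }
    void enqueue(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(m_);
            tasks_.push(std::move(task));
        }
        task_cv_.notify_one();
    }
    // Blocks until every task enqueued so far has finished.
    void wait_all() {
        std::unique_lock<std::mutex> lock(m_);
        done_cv_.wait(lock, [this] { return tasks_.empty() && active_ == 0; });
    }
private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_);
                task_cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
                ++active_;  // counted under the lock, so wait_all never sees a gap
            }
            task();
            {
                std::lock_guard<std::mutex> lock(m_);
                --active_;
                if (tasks_.empty() && active_ == 0) done_cv_.notify_all();
            }
        }
    }
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex m_;
    std::condition_variable task_cv_;  // "a task is available / stop requested"
    std::condition_variable done_cv_;  // "queue drained and all workers idle"
    std::size_t active_ = 0;
    bool stop_ = false;
};
```

Usage: enqueue everything, call wait_all(), and only then read the results the tasks produced.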
Once all the tasks have been processed, you can call std::future::get, collect the results and perform your operations on them.
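A sketch of this two-phase approach, assuming C++17: phase one schedules every .log file and keeps its std::future, phase two calls get(). std::async(std::launch::async, ...) stands in for the question's ThreadPool::enqueue; it may start a new thread per file, so with many files a pool is the better fit. The helper count_words_in_logs is hypothetical.

```cpp
#include <filesystem>
#include <fstream>
#include <future>
#include <map>
#include <string>
#include <utility>
#include <vector>

namespace fs = std::filesystem;

// Same idea as the question's count_words: one word per whitespace-separated token.
int count_words(const fs::path& file)
{
    std::ifstream in(file);
    std::string word;
    int counter = 0;
    while (in >> word) ++counter;
    return counter;
}

std::map<fs::path, int> count_words_in_logs(const fs::path& dir)
{
    // Phase 1: schedule every task and keep its future; get() is NOT called here.
    std::vector<std::pair<fs::path, std::future<int>>> pending;
    for (const auto& entry : fs::directory_iterator(dir)) {
        if (entry.path().extension() != ".log") continue;
        fs::path file = entry.path();  // each task gets its own copy
        pending.emplace_back(file, std::async(std::launch::async,
                                              [file] { return count_words(file); }));
    }
    // Phase 2: all tasks are already scheduled; now collect the results.
    std::map<fs::path, int> results;
    for (auto& item : pending)
        results[item.first] = item.second.get();
    return results;
}
```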
Alternatively, you can react to the completion of each individual task as soon as its std::future has a result. That can be done too; you just have to implement it yourself and think through how well such a mechanism scales.
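A sketch of reacting to each completion: every task pushes its result into a small mutex-protected queue, and the consumer pops results in the order the tasks finish rather than the order they were scheduled. CompletionQueue and total_length are hypothetical names, and string length stands in for the real per-file work. The consumer has to know how many results to expect, which is one of the scalability questions to think through.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical completion queue: producers push finished results,
// the consumer blocks in pop() until the next one arrives.
template <class T>
class CompletionQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(m_);
            items_.push(std::move(value));
        }
        cv_.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop();
        return value;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> items_;
};

// Demo: one thread per input; each result is handled as soon as it arrives.
std::size_t total_length(const std::vector<std::string>& inputs)
{
    CompletionQueue<std::pair<std::string, std::size_t>> done;
    std::vector<std::thread> threads;
    for (const auto& s : inputs)
        threads.emplace_back([s, &done] { done.push({s, s.size()}); });
    std::size_t total = 0;
    for (std::size_t i = 0; i < inputs.size(); ++i)
        total += done.pop().second;  // completion order, not scheduling order
    for (auto& t : threads) t.join();  // done must outlive every producer
    return total;
}
```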
One more thing: why do you need Boost? You already use std::future and lambdas, so you are writing in C++11 at least. You have std::thread and all the synchronization primitives from std; the entire Thread Support Library is at your disposal. Boost is clearly superfluous here.
If you switch to C++17, you will not need boost::filesystem either, because the Filesystem Library (std::filesystem) becomes available.
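With C++17 the boost::filesystem loop maps almost one-to-one onto std::filesystem. A minimal sketch (the function name find_log_files is hypothetical) that collects the regular .log files of one directory, non-recursively:

```cpp
#include <algorithm>
#include <filesystem>
#include <vector>

namespace fs = std::filesystem;

// Collect regular files with a ".log" extension directly inside dir.
std::vector<fs::path> find_log_files(const fs::path& dir)
{
    std::vector<fs::path> logs;
    if (!fs::is_directory(dir)) return logs;  // false (no throw) for a missing path
    for (const auto& entry : fs::directory_iterator(dir))
        if (entry.is_regular_file() && entry.path().extension() == ".log")
            logs.push_back(entry.path());
    std::sort(logs.begin(), logs.end());  // directory iteration order is unspecified
    return logs;
}
```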