How to make sure all child processes have exited?
The task is to receive data via SSH from several servers, process it a bit, and write it to a file. I decided to parallelize the solution because in the single-threaded version most of the time is spent waiting for a response from the server.
To do this, I created a multiprocessing.Queue, built a list of multiprocessing.Process objects, started all these processes one by one, and passed the queue to each as an argument. This way each process can write to the queue, and I can read from it in the main process.
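Roughly, the setup described above might look like this (fetch_and_process() and the hosts list are hypothetical placeholders):

```python
import multiprocessing as mp

def fetch_and_process(host):
    # hypothetical: SSH to the host, grab the data, process it a bit
    return f"data from {host}"

def worker(host, queue):
    queue.put(fetch_and_process(host))   # each child writes its result to the shared queue

if __name__ == "__main__":
    hosts = ["server1", "server2", "server3"]   # hypothetical server list
    queue = mp.Queue()
    processes = [mp.Process(target=worker, args=(host, queue)) for host in hosts]
    for p in processes:
        p.start()
    # ...and the question is how the main process knows when all of them are done
```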
The question is: how do I know that all the child processes have finished? On the Internet I found a recipe of calling .join() on each of them in turn, but that blocks the main process, and I want to write to the file from the main process.
I also tried checking .is_alive() on the processes, but a process is considered alive until I call .join() on it.
For now I use a crutch: it checks whether there is anything in the queue, and if it finds nothing, it waits 20 seconds, checks again and, if the queue is still empty, exits. But this is obviously a crutch.
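For context, a sketch of what that polling workaround amounts to, assuming q is the multiprocessing.Queue from the setup above:

```python
import queue  # only for the Empty exception that multiprocessing.Queue.get() raises

results = []
misses = 0
while misses < 2:                           # "repeated emptiness" -> give up
    try:
        results.append(q.get(timeout=20))   # wait up to 20 seconds for new data
        misses = 0
    except queue.Empty:
        misses += 1
```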
How to do it right?
The most convenient thing is to write from another process. Its job is to wait for data from the queue with get() and write everything it receives to a file. Obviously there is a problem here: that process would never end, but it is easily solved. In the main process you wait for all the worker processes to finish with join(), then push some "interesting" value into the queue (I would send None), and then wait for the writer process to finish, again with join(). When the writer process sees the "interesting" value, it exits. And the main process exits after it, accordingly.
It may not look very pretty, BUT that's normal: you need some mechanism that says "nothing more will arrive in the queue", and that is exactly what we do in the main process. You could subclass the queue or throw exceptions, but I think None is enough. By the way, you could use Pipe() for this in the case of processes, but I definitely would not, because why? :)
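A minimal sketch of that scheme, assuming the same worker() and hosts as in the sketch under the question, with a hypothetical results.txt as the output file:

```python
import multiprocessing as mp

def writer(queue, path):
    with open(path, "w") as f:
        while True:
            item = queue.get()        # blocks until something arrives
            if item is None:          # the "interesting" value: nothing else will come
                break
            f.write(item + "\n")

if __name__ == "__main__":
    q = mp.Queue()
    workers = [mp.Process(target=worker, args=(host, q)) for host in hosts]
    w = mp.Process(target=writer, args=(q, "results.txt"))

    w.start()
    for p in workers:
        p.start()
    for p in workers:
        p.join()          # wait until all producers have finished
    q.put(None)           # signal the writer that nothing more is coming
    w.join()              # wait for the writer to flush everything and exit
```

The order matters: join() the workers first, then put the sentinel, so the writer cannot exit while producers are still running.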
By the way, I was recently puzzling over something similar, went looking for information, and realized that this task is very close to the producer-consumer pattern, except that here the producing process also processes the data and the receiving process only writes. In C#, by the way, there is a special collection that can be "closed". And here is what the Java documentation says about it:
Special end-of-stream or poison objects, KARL! That's me justifying that what I'm suggesting is a normal solution. :)))
Writing from the main process is inconvenient. In that case we cannot call join() on the workers, so we need some other way to know that the tasks are finished. It seems you could use the queue's task_done() and join() methods for this (a multiprocessing.JoinableQueue): push the server addresses into one queue (call it q_in), do q_in.get() in a worker, process the data, put the result into another queue (call it q_out), and then call q_in.task_done(). BUT we again get a blocking q_in.join() that waits for all tasks to complete, so that option doesn't work here either. And even if it didn't block, you would still have to sleep() in a loop, which is completely ugly.
The right thing is to call a blocking get() in the writing process and finish when some signal arrives. That signal is either the "interesting" value, as I said above (when writing in the main process there is nobody left to send it, though you could make a separate process just for that, says the pervert), or you introduce another entity, call it a "global counter". It gets decremented after a result is added to the queue, and the main process checks whether it has reached zero after each get() on the queue; if it has, exit the infinite loop, join the workers, and finish. But this is less pretty than a separate writer process: you have to create a global variable, and while with threads you could get away with a simple lock, with processes it is 100% some kind of hemorrhoids; I have never done it that way, because global variables are evil.
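A hedged sketch of the counter idea in its simplest form: since each worker puts exactly one result, the counter can live in the main process and just count down from the number of hosts (worker() and hosts are assumed as before), which avoids the shared-variable hemorrhoids entirely:

```python
import multiprocessing as mp

if __name__ == "__main__":
    q = mp.Queue()
    workers = [mp.Process(target=worker, args=(host, q)) for host in hosts]
    for p in workers:
        p.start()

    remaining = len(workers)              # how many results are still expected
    with open("results.txt", "w") as f:
        while remaining:
            f.write(q.get() + "\n")       # blocking get() in the main process
            remaining -= 1                # one more result has arrived

    for p in workers:
        p.join()                          # everything is already in, so this returns quickly
```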
Generally, I want to note a couple more things here.
Do you really need to write the data as it arrives? Maybe not. Constantly opening/closing the file is its own kind of overhead; is it really necessary? Also, do you really need processes? How much time does the data processing take relative to the I/O? If it is negligibly small, threads are a better fit, and everything may well be faster with them. Threads also give you the ability to use global objects, which I still use even though they are evil. You can, for example, use a list/set/dict instead of a Queue. In CPython they are thread-safe, but it's better to use locks just in case: they add a tiny overhead but protect you 100% from interesting problems (in this case I would write a LockedIterator class so that it works for everything). The main plus is that they are noticeably faster than Queue, even with locks (according to my tests, though I think you can google it). And you don't really need the niceties of Queue if you use threads. So: are you sure that processes plus writing to the file as you go is faster/more convenient/better than simply spawning threads without a Queue, waiting for them to finish, and writing everything to the file at the end? Although there may be trouble with RAM if you need to write a lot.
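A sketch of that threads-without-Queue variant, with fetch_and_process() again a hypothetical stand-in for the SSH plus processing step:

```python
import threading

def fetch_and_process(host):
    # hypothetical: SSH to the host, grab the data, process it a bit
    return f"data from {host}"

results = []
results_lock = threading.Lock()

def worker(host):
    data = fetch_and_process(host)     # slow part: do it outside the lock
    with results_lock:                 # cheap insurance, even though list.append is atomic in CPython
        results.append(data)

hosts = ["server1", "server2", "server3"]   # hypothetical server list
threads = [threading.Thread(target=worker, args=(host,)) for host in hosts]
for t in threads:
    t.start()
for t in threads:
    t.join()           # blocking here is fine: there is nothing to do until all data is in

with open("results.txt", "w") as f:
    f.write("\n".join(results))
```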
Besides, why not use, for example, Ansible? It can query hosts and accepts Python plugins. There are also asynchronous tasks; I have not used them, but I think they would do the trick.