N
N
nuker2014-04-16 14:25:39
Python
nuker, 2014-04-16 14:25:39

Python: Why doesn't socket multiprocessing pool of workers work?

Hi all!

There is such a task: to accept socket connections, receive input data for complex calculations, calculate and give an answer. There can be a lot of requests at the same time.
At first, I realized that because of the GIL I would not be able to use threads normally, and tried to wrap everything in C ++, including boost: threads and boost: python, starting threads in C ++ and performing calculations in each thread in the python subinterpreter, but it didn’t work out , vseravno percent with 8 cores did not use 100%.

As a result, I decided that I would have to do it only through multiprocessing, and it would be optimal to create n workers at once (number of cores * 2) so that they sit and wait for tasks (queue). Thus, no time is wasted creating a process, and they will not breed more than n in number.

With sockets and streams, I worked more in C ++, so in python I only come across this.
There is this script:

#!/usr/bin/env python   
import os
import sys
import SocketServer
import Queue
import time
import socket
import multiprocessing
from multiprocessing.reduction import reduce_handle
from multiprocessing.reduction import rebuild_handle 

class MultiprocessWorker(multiprocessing.Process):

    def __init__(self, sq):

        self.SLEEP_INTERVAL = 1

        # base class initialization
        multiprocessing.Process.__init__(self)

        # job management stuff
        self.socket_queue = sq
        self.kill_received = False

    def run(self):
        while not self.kill_received:
            try:     
                h = self.socket_queue.get_nowait()          
                fd=rebuild_handle(h)
                client_socket=socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM)
                #client_socket.send("hellofromtheworkerprocess\r\n")
                received = client_socket.recv(1024)
                print "Recieved on client: ",received
                client_socket.close()

            except Queue.Empty:
                pass

            #Dummy timer
            time.sleep(self.SLEEP_INTERVAL)

class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """

    def handle(self):
        # self.request is the TCP socket connected to the client
        #self.data = self.request.recv(1024).strip()
        #print "{} wrote:".format(self.client_address[0])
        #print self.data
        # just send back the same data, but upper-cased
        #self.request.sendall(self.data.upper())

        #Either pipe it to worker directly like this
        #pipe_to_worker.send(h) #instanceofmultiprocessing.Pipe
        #or use a Queue :)

        h = reduce_handle(self.request.fileno())
        socket_queue.put(h)


if __name__ == "__main__":

    #Mainprocess
    address =  ('localhost', 8082)
    server = SocketServer.TCPServer(address, MyTCPHandler)
    socket_queue = multiprocessing.Queue()

    for i in range(5):
        worker = MultiprocessWorker(socket_queue)
        worker.start()

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        sys.exit(0)

But it receives and immediately closes the connection... it's not clear why.

Answer the question

In order to leave comments, you need to log in

1 answer(s)
A
Alexey Cheremisin, 2014-04-16
@leahch

I recommend looking at twisted, (although the threshold for entering it is a little high, in my opinion) you make your server in it, and process it in threads. There are a lot of helpers for twisted, for example, for working with threads (if tasks are short-lived) - https://twistedmatrix.com/documents/current/core/h...
or for working with processes (if tasks are long-lived) - https:/ /twistedmatrix.com/documents/current/core/h...
and for working with the network in almost any manifestation - https://twistedmatrix.com/documents/current/core/h...
well, a link to all sorts of howtos - https://twistedmatrix.com/documents/current/core/howto
Ask, I'll try to answer..

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question