Multithreaded form processing on pages in Python3. How?
Good evening, Toaster.
For more than a day now I have not been able to get form processing working in Python3 with multithreading (strictly speaking, in parallel; "multithreading" is not really the correct term here).
My code is quite simple and looks like this:
It takes the search settings from the database (PostgreSQL), as well as the links that need to be processed, then creates N threads and runs the same function with different arguments.
But when the database holds 100,000 links, a script running in 20 threads starts to choke after roughly 2,400 links and throws the following error:
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Task exception was never retrieved
future: <Task finished coro=<main() done, defined at async.py:173> exception=BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.',)>
yield from loop.run_in_executor(p, operation, item)
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
from grab import Grab
import random
import psycopg2

# Open connection to the database
connection = psycopg2.connect(database="<....>",
                              user="<....>",
                              password="<....>",
                              host="127.0.0.1",
                              port="5432")
# Create a new cursor for it
c = connection.cursor()
# Select settings from database
c.execute("SELECT * FROM <....> WHERE id=1;")
data = c.fetchall()
# Get time starting script
start_time = time.time()

def operation(link):
    # init grab framework
    g = Grab()
    # try to find some elements on the page
    try:
        # open link
        g.go(link)
        # some link processing
        <....>
    except:
        pass

@asyncio.coroutine
def main(item):
    yield from loop.run_in_executor(p, operation, item)

# Create async loop, declare number of threads
loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(data[0][13])  # =20
# Init tasks list - empty
tasks = []
# Select all urls which need to process
c.execute("SELECT url FROM <....> ORDER BY id;")
# Forming tasks
for item in c.fetchall():
    tasks.append(main(item[0]))
# Close main connection to the database
connection.close()
# Run async tasks
loop.run_until_complete(asyncio.wait(tasks))
# Close loop
loop.close()
# Get script finish time
print("--- %s seconds ---" % (time.time() - start_time))
You are not creating parallel threads for processing, but parallel processes (see the sketch after the ulimit output below).
And the maximum number of processes you are allowed to have is limited by the system, for example:
$ ulimit -a
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 32089
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files (-n) 1024
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 32089
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
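If threads are really what you want, a minimal sketch of the swap is below: it keeps the structure from the question (an executor, the operation function, run_in_executor) but uses ThreadPoolExecutor from the same concurrent.futures module, so no extra OS processes are spawned. The operation body and the links list here are placeholders standing in for the question's page processing and SELECT result, not the actual code; on newer Python versions async def/await would replace @asyncio.coroutine/yield from.

import asyncio
from concurrent.futures import ThreadPoolExecutor

# Placeholder for the page-processing function from the question
def operation(link):
    print("processing", link)

@asyncio.coroutine
def main(item):
    # The blocking call runs in a worker thread instead of a child process
    yield from loop.run_in_executor(p, operation, item)

loop = asyncio.get_event_loop()
p = ThreadPoolExecutor(20)  # 20 worker threads instead of 20 processes
links = ["http://example.com/1", "http://example.com/2"]  # stands in for the SELECT result
tasks = [main(link) for link in links]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()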