Python multithreading with requests: what is blocking processing?
Hello.
I started programming in Python recently and am still learning.
Please explain why the following happens.
Task: there is a file with URLs. I want to process them (fetch the content, or simply check the status/availability). For now this is implemented with threads.
If the domains exist, processing is fast. But if the file contains a domain that is unreachable or does not exist, processing becomes several times slower: the threads appear to block, and only once the bad domain has been processed does everything speed up again. With many bad domains, the total run time grows dramatically.
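For example, a single unreachable host is enough to see the delay (a minimal sketch, not from my script; 10.255.255.1 is just a placeholder for a non-routable address):

import time
import requests

url = "http://10.255.255.1"  # placeholder: a non-routable address

start = time.time()
try:
    # no timeout is given, so the call waits for the OS-level TCP
    # connect timeout, which can take tens of seconds or more
    requests.get(url)
except Exception as e:
    print("failed after %.1f s: %s" % (time.time() - start, e))

With requests.get(url, timeout=5) the same call would fail after about five seconds instead.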
from threading import Thread
import threading
import subprocess
import requests
import time
import os

theardCount = 25
domain_file = "domains.txt"
domain_temp = "temp/"

def CheckRequest(host, step=0):
    fr_success = domain_temp+"/req-good-"+str(step)+".txt"
    fr_errors = domain_temp+"/req-error-"+str(step)+".txt"
    url = "http://"+host
    try:
        s = requests.Session()
        r = s.get(url)
        f = open(fr_success, "a+")
        f.write(host+'\n')
        f.close()
    except Exception:
        f = open(fr_errors, "a+")
        f.write(host+'\n')
        f.close()
    return 0

class GetDomainThread(Thread):
    def __init__(self, step):
        self.step = step
        self.body = None
        super(GetDomainThread, self).__init__()

    def run(self):
        step = self.step+1
        fdom = open(domain_file, "r")
        i = 1
        f = step
        for line in fdom.readlines():
            if i == f:
                strLine = line.split("\n")
                domain = strLine[0]
                CheckRequest(domain, step)
                f = f+theardCount
            i = i+1

# MAIN
def main():
    start = time.time()
    threads = [GetDomainThread(i) for i in range(theardCount)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    end = time.time()
    print end-start

if __name__ == '__main__':
    main()
It is not the GIL that blocks processing in your case, but the way you distribute tasks between threads: you split all URLs evenly between the threads before starting, so you end up with some threads sitting idle, waiting while one of them is still checking all of its remaining sites. You should use queues to distribute the tasks and, of course, reduce the number of I/O operations.
The code would look something like this (see the comments for what is done and why):
import requests
import time
import os

from threading import Thread, current_thread
from Queue import Queue, Empty

theard_count = 25
domain_file = "domains.txt"
domain_temp = "temp"

def check_url(host):
    url = 'http://' + host
    try:
        requests.get(url, timeout=5)
    except Exception:
        return False
    else:
        return True

def run(queue, result_queue):
    # Loop until the task queue is empty
    while not queue.empty():
        # take the next task from the queue
        # (another thread may have emptied the queue since the check
        # above, in which case get_nowait() raises Empty)
        try:
            host = queue.get_nowait()
        except Empty:
            break
        print '{} checking in thread {}'.format(host, current_thread())
        # check the URL
        status = check_url(host)
        # save the result for further processing
        result_queue.put_nowait((status, host))
        # report that the task has been completed
        queue.task_done()
        print '{} finished in thread {}. Result={}'.format(host, current_thread(), status)
    print '{} closing'.format(current_thread())

# MAIN
def main():
    start_time = time.time()

    # Queues are used to hand out the tasks and to collect the results
    queue = Queue()
    result_queue = Queue()

    fr_success = os.path.join(domain_temp, "req-good.txt")
    fr_errors = os.path.join(domain_temp, "req-error.txt")

    # First, load all URLs from the file into the task queue
    with open(domain_file) as f:
        for line in f:
            queue.put(line.strip())

    # Then start the required number of threads
    for i in range(theard_count):
        thread = Thread(target=run, args=(queue, result_queue))
        thread.daemon = True
        thread.start()

    # And wait for all tasks to be completed
    queue.join()

    # After that, write the results to the files
    with open(fr_success, 'w') as fs, open(fr_errors, 'w') as fe:
        while not result_queue.empty():
            status, host = result_queue.get_nowait()
            if status:
                f = fs
            else:
                f = fe
            f.write(host)
            f.write('\n')

    print time.time() - start_time

if __name__ == '__main__':
    main()
So it is: the GIL is blocking the thread.
In general, your code is written inefficiently; there are far too many unnecessary I/O operations. A lot of time is lost reopening the files on every request, and the way each thread re-reads the whole file just to pick out its own domains is especially wasteful. Try to rewrite it so that there are no file operations in the threads at all, working only with memory.
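A sketch of just that idea (the file names and the check helper are only examples, and the checking loop is left sequential here for brevity; your threads would fill the same in-memory results list):

import requests

def check(host):  # example helper: one request per host, bounded by a timeout
    try:
        requests.get("http://" + host, timeout=5)
        return True
    except Exception:
        return False

# read the input file once, up front
with open("domains.txt") as f:
    domains = [line.strip() for line in f if line.strip()]

# keep results in memory; this loop is what the threads would do instead
results = [(host, check(host)) for host in domains]

# open and write each output file exactly once, at the very end
with open("req-good.txt", "w") as good, open("req-error.txt", "w") as bad:
    for host, ok in results:
        (good if ok else bad).write(host + "\n")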
If you have Python >= 3.3, you can try aiohttp and forget about the hassle of threading in Python.
import asyncio
import aiohttp

@asyncio.coroutine
def fetch_status(session, url):
    status = None
    try:
        response = yield from session.get(url)
        response.close()
        status = response.status
    except Exception as e:
        status = e.__str__()
    return status

@asyncio.coroutine
def run():
    session = aiohttp.ClientSession()
    with open('domains.txt', mode='r') as f:
        for url in f:
            url = 'http://' + url.strip()  # the file holds bare domains, as in the question
            status = yield from fetch_status(session, url)
            print(url, ": ", status, sep='')
    session.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(run())
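Note that the loop above waits for each URL before starting the next one. To actually overlap the requests, the same coroutine can be scheduled for all URLs at once, for example with asyncio.gather (a sketch in the same old-style coroutine syntax; check_all is just an example name):

import asyncio
import aiohttp

@asyncio.coroutine
def fetch_status(session, url):
    try:
        response = yield from session.get(url)
        response.close()
        return response.status
    except Exception as e:
        return str(e)

@asyncio.coroutine
def check_all():
    session = aiohttp.ClientSession()
    with open('domains.txt') as f:
        urls = ['http://' + line.strip() for line in f if line.strip()]
    # schedule all checks at once and wait for every result
    statuses = yield from asyncio.gather(*(fetch_status(session, url) for url in urls))
    for url, status in zip(urls, statuses):
        print(url, ": ", status, sep='')
    session.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(check_all())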