Python
kikukuvota, 2015-07-01 05:53:19

Python multithreading Requests, what's blocking processing?

Hello.
I have only recently started programming in Python, so my understanding is still limited.
Please explain why the following happens.
Task: there is a file with URLs. I want to process them (get the content, or just check the status/availability).
All of this is currently implemented with threads.
If the domains exist, processing is fast. If the file contains a domain that is inaccessible or does not exist, processing becomes several times slower. It seems that the threads are blocked, and as soon as the bad domain has been processed, everything is fast again. If there are many bad domains, the execution time increases greatly.

from threading import Thread
import threading
import subprocess
import requests
import time
import os

theardCount = 25

domain_file = "domains.txt"
domain_temp = "temp/"

def CheckRequest(host, step=0):

    fr_success = domain_temp+"/req-good-"+str(step)+".txt"
    fr_errors  = domain_temp+"/req-error-"+str(step)+".txt" 

    url = "http://"+host
    
    try:
        s = requests.Session()
        r = s.get(url)
        f = open(fr_success, "a+")
        f.write(host+'\n')
        f.close() 
    except Exception:
        f = open(fr_errors, "a+")
        f.write(host+'\n')
        f.close()   

    return 0

class GetDomainThread(Thread):
    def __init__(self, step):
        self.step = step
        self.body = None
        super(GetDomainThread, self).__init__()
    def run(self):
        step = self.step+1
        fdom  = open(domain_file, "r") 
        i=1
        f=step
        for line in fdom.readlines():
            if i==f:
                strLine = line.split("\n")
                domain = strLine[0]
                CheckRequest(domain, step)
                   
                f=f+theardCount
            i=i+1


# MAIN 

def main():

    start = time.time()

    threads = [GetDomainThread(i) for i in range(theardCount)]
  
    for thread in threads:
        thread.start()
  
    for thread in threads:
        thread.join()
    
           
    end = time.time()

    print end-start

if __name__ == '__main__':
    main()


3 answer(s)
bzzzzzz, 2015-07-02
@kikukuvota

It is not the GIL that blocks processing in your case, but the way you distribute tasks between threads. Before starting work, you split all the URLs equally between the threads, so some threads finish early and sit idle while one of them is still checking its remaining sites. In your program, you should use queues to distribute tasks and, of course, reduce the number of I/O operations.
The code will be something like this (see my comments to understand what and why):

# coding=utf-8
import requests
import time
import os
from threading import Thread, current_thread
from Queue import Queue


theard_count = 25


domain_file = "domains.txt"
domain_temp = "temp"


def check_url(host):
    url = 'http://' + host

    try:
        requests.get(url, timeout=5)
    except Exception:
        return False
    else:
        return True


def run(queue, result_queue):
    # The loop runs until the task queue is empty
    while not queue.empty():
        # take the next task from the queue
        host = queue.get_nowait()
        print '{} checking in thread {}'.format(host, current_thread())
        # check the URL
        status = check_url(host)
        # store the result for later processing
        result_queue.put_nowait((status, host))
        # report that the task has been completed
        queue.task_done()
        print '{} finished in thread {}. Result={}'.format(host, current_thread(), status)

    print '{} closing'.format(current_thread())


# MAIN
def main():
    start_time = time.time()

    # Use queues both for handing out tasks and for collecting results
    queue = Queue()
    result_queue = Queue()

    fr_success = os.path.join(domain_temp, "req-good.txt")
    fr_errors  = os.path.join(domain_temp, "req-error.txt")

    # First, load all URLs from the file into the task queue
    with open(domain_file) as f:
        for line in f:
            queue.put(line.strip())

    # Then start the required number of threads
    for i in range(theard_count):
        thread = Thread(target=run, args=(queue, result_queue))
        thread.daemon = True
        thread.start()

    # And wait until all tasks are done
    queue.join()

    # After that, write the results to the files
    with open(fr_success, 'w') as fs, open(fr_errors, 'w') as fe:
        while not result_queue.empty():
            status, host = result_queue.get_nowait()

            if status:
                f = fs
            else:
                f = fe

            f.write(host)
            f.write('\n')

    print time.time() - start_time

if __name__ == '__main__':
    main()

It checks 500 sites, 150 of which do not work, in 35 seconds.
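
For readers on Python 3, the same queue-based scheme could be sketched roughly as below. This is only an illustration, not a drop-in replacement: `check_hosts`, its `check` parameter and `worker_count` are hypothetical names, and `check` is assumed to be any callable that returns True or False for a host, such as a wrapper around `requests.get(url, timeout=5)` like `check_url` above. Each worker simply calls `get_nowait()` and stops on `queue.Empty`, so no separate `empty()` check is needed.

```python
import queue
import threading


def check_hosts(hosts, check, worker_count=25):
    """Run check(host) for every host, handing out work from a shared queue.

    `check` is a caller-supplied callable (an assumption of this sketch);
    a host counts as bad if check() returns a falsy value or raises.
    """
    tasks = queue.Queue()
    results = queue.Queue()
    for host in hosts:
        tasks.put(host)

    def worker():
        while True:
            try:
                # Raises queue.Empty as soon as the work runs out.
                host = tasks.get_nowait()
            except queue.Empty:
                return
            try:
                ok = bool(check(host))
            except Exception:
                ok = False
            results.put((host, ok))

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # All workers have finished, so the result queue is no longer changing.
    good, bad = [], []
    while not results.empty():
        host, ok = results.get_nowait()
        (good if ok else bad).append(host)
    return sorted(good), sorted(bad)
```

Because a free thread always takes the next unprocessed host, one slow or dead domain delays only the thread that happens to be checking it, by at most the timeout that `check` itself enforces.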

Stanislav Fateev, 2015-07-01
@svfat

That's right: the GIL is blocking the threads.
In general, your code is not written very efficiently. There are too many unnecessary I/O operations: you open files every time, which wastes a lot of time, and the way you pick a domain from the file for processing is especially costly. Try to rewrite it so that the threads perform no file operations at all and work only with memory.
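
One possible way to keep file operations out of the threads, sketched for Python 3: read the domains into a list once, let the worker threads only return results, and write each output file a single time at the end. The names `split_by_status`, `write_lines`, `check` and `max_workers` are made up for this sketch, and `check` is assumed to be a callable that returns True or False for a host.

```python
from concurrent.futures import ThreadPoolExecutor


def split_by_status(hosts, check, max_workers=25):
    """Check every host in worker threads and return (good, bad) lists.

    No files are touched here: hosts come in as an in-memory sequence
    and results go out as two lists, in the same order as the input.
    """
    hosts = list(hosts)

    def safe_check(host):
        # Treat any exception from the check as a failed host.
        try:
            return bool(check(host))
        except Exception:
            return False

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in input order.
        statuses = list(pool.map(safe_check, hosts))

    good = [h for h, ok in zip(hosts, statuses) if ok]
    bad = [h for h, ok in zip(hosts, statuses) if not ok]
    return good, bad


def write_lines(path, lines):
    """Write all lines at once, opening the file a single time."""
    with open(path, "w") as f:
        f.writelines(line + "\n" for line in lines)
```

A caller would read `domains.txt` into a list first, pass it to `split_by_status`, and then call `write_lines` once for the good hosts and once for the bad ones.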

SlivTime, 2015-07-01
@SlivTime

If you have Python 3.3 or later, you can try aiohttp and forget about the hassle of threading in Python.

import asyncio
import aiohttp

@asyncio.coroutine
def fetch_status(session, url):
    status = None
    try:
        response = yield from session.get(url)
        response.close()
        status = response.status
    except Exception as e:
        status = str(e)
    return status


@asyncio.coroutine
def run():
    session = aiohttp.ClientSession()
    with open('domains.txt', mode='r') as f:
        for url in f:
            url = url.strip()
            status = yield from fetch_status(session, url)
            print(url, ": ", status, sep='')
    session.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(run())
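
Note that the loop above waits for each URL before starting the next one, so the requests do not overlap. A rough sketch of running them concurrently with `asyncio.gather`, written in the newer `async`/`await` syntax (`asyncio.run` needs Python 3.7 or later), might look like this. To keep it self-contained, `fake_fetch` is a stand-in that only simulates a request; in real use it would be replaced by a coroutine that performs the aiohttp call. `check_all`, `fetch` and `max_in_flight` are likewise names invented for the sketch.

```python
import asyncio


async def fake_fetch(url):
    """Stand-in for a real HTTP request (an assumption of this sketch)."""
    await asyncio.sleep(0.01)
    if url.endswith(".invalid"):
        raise OSError("cannot connect to " + url)
    return 200


async def fetch_status(url, fetch, limit):
    """Return the status for one URL, or the error text if it fails."""
    async with limit:  # cap the number of requests in flight
        try:
            return await fetch(url)
        except Exception as e:
            return str(e)


async def check_all(urls, fetch=fake_fetch, max_in_flight=25):
    """Check all URLs concurrently and return a {url: status} dict."""
    urls = list(urls)
    limit = asyncio.Semaphore(max_in_flight)
    # gather() schedules every coroutine at once and keeps input order.
    statuses = await asyncio.gather(
        *(fetch_status(url, fetch, limit) for url in urls)
    )
    return dict(zip(urls, statuses))
```

It can be driven with `asyncio.run(check_all(urls))`; the semaphore is there only so that a very long domain list does not open every connection at the same moment.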
