How to implement a parser with aiohttp using a dynamic queue and a limit on simultaneous connections through a proxy?
Greetings.
I'm writing a page parser in Python. It needs to crawl a large number of pages continuously, for example 1M per day.
Previously I used ThreadPool multithreading for this, but when the number of required simultaneous connections grew to 80-100, the script started running out of memory.
I've read that in this situation it's recommended to switch to asyncio.
I threw together a simple script that reads proxies from a file, takes URLs, and crawls them, limiting concurrency with a semaphore.
But I ran into a problem: if a proxy allows, say, 50 simultaneous connections, then when 200 URLs are selected, the first 50 are processed correctly and the rest end up with errors. It feels like the semaphore doesn't work, or I'm using it the wrong way.
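As far as I understand, a semaphore on its own should already throttle 200 tasks down to 50 at a time. Below is a minimal, self-contained sketch of how I expect it to behave, with dummy sleeps in place of real requests (this is only an illustration of my mental model, not the real script):

import asyncio

# 200 dummy "requests", at most 50 of them running at the same time
async def fake_fetch(sem, i):
    async with sem:               # no more than 50 coroutines are inside this block at once
        await asyncio.sleep(0.1)  # stands in for session.get(...)
        return i

async def main():
    sem = asyncio.Semaphore(50)   # created inside the running loop
    results = await asyncio.gather(*(fake_fetch(sem, i) for i in range(200)))
    print(len(results))           # expect 200, none of them fail

loop = asyncio.get_event_loop()
loop.run_until_complete(main())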
I would like to understand how to do this properly, or to see a similar code example.
My example code:
#!/usr/bin/python3.6
# modified fetch function with semaphore
import random
import asyncio
from aiohttp import ClientSession
from random import shuffle
from bs4 import BeautifulSoup

count = 0

# load the proxy list and shuffle it
with open('proxy2.txt') as f:
    proxy = f.readlines()
proxy = [x.strip() for x in proxy]
shuffle(proxy)

def geturls(num):
    # take the first `num` URLs from the sitemap
    urls = []
    f = open('sitemap.xml', 'r')
    soup = BeautifulSoup(f, "lxml")
    arr = soup.findAll('loc')
    i = 0
    for url in arr:
        if i < num:
            urls.append(url.contents[0])
            i = i + 1
        else:
            break
    return urls

async def fetch(sem, url, session):
    global proxy
    global count
    try:
        async with session.get(url, proxy="http://" + random.choice(proxy)) as response:
            count = count + 1
            body = await response.read()
            print(str(count) + " " + url)
            return body
    except Exception as e:
        print(e)

async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        return await fetch(sem, url, session)

async def run(r):
    urls = geturls(r)
    tasks = []
    # up to 50 simultaneous connections
    sem = asyncio.Semaphore(50)
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(bound_fetch(sem, url, session))
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses

# 200 - take 200 URLs from the sitemap for the test
number = 200
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(number))
loop.run_until_complete(future)
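To clarify what I mean by a "dynamic queue": roughly, I imagine a fixed pool of workers pulling URLs from an asyncio.Queue (so new URLs can be added while the crawl is running), with a separate semaphore per proxy. Below is a rough sketch of that idea, not working production code; the names worker, run_queue, WORKERS and PER_PROXY_LIMIT are my own:

import asyncio
import random
from aiohttp import ClientSession

PER_PROXY_LIMIT = 50   # assumed number of simultaneous connections one proxy allows
WORKERS = 100          # total number of worker coroutines

async def worker(queue, session, sem_by_proxy, proxies):
    # pull URLs from the queue until a None sentinel arrives
    while True:
        url = await queue.get()
        if url is None:
            queue.task_done()
            break
        p = random.choice(proxies)
        try:
            async with sem_by_proxy[p]:   # limit simultaneous connections per proxy
                async with session.get(url, proxy="http://" + p) as resp:
                    body = await resp.read()
                    print(resp.status, url, len(body))
        except Exception as e:
            print("error", url, e)
        finally:
            queue.task_done()

async def run_queue(urls, proxies):
    queue = asyncio.Queue()
    for url in urls:
        queue.put_nowait(url)          # more URLs can be added later the same way
    # one semaphore per proxy so each proxy has its own limit
    sem_by_proxy = {p: asyncio.Semaphore(PER_PROXY_LIMIT) for p in proxies}
    async with ClientSession() as session:
        workers = [asyncio.ensure_future(worker(queue, session, sem_by_proxy, proxies))
                   for _ in range(WORKERS)]
        await queue.join()             # wait until every queued URL is processed
        for _ in workers:
            queue.put_nowait(None)     # tell the workers to stop
        await asyncio.gather(*workers)

Something like loop.run_until_complete(run_queue(geturls(200), proxy)) would then start it, if this is the right direction at all.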