Answer the question
In order to leave comments, you need to log in
Why is the import so slow?
Good afternoon, colleagues!
I import products with api, limit 9000 requests per minute, took a cloud VDS with 12 cores, installed 16 processes using an experimental method and fired it up. If you believe the console (I decided to measure the processing time of each "mow" by each process separately, I got about 3-5 seconds, maximum, for each), the lines run very cheerfully in the console. But in reality, about 2000 - 3000 items per minute get into the database, which is sooooo slow (you need 30,000 per minute). What am I doing wrong? I have already sacrificed importing additional images and checking for the presence of goods in other categories (stupidly doing duplicates with reference to other categories), which is already very bad, but there is still no speed.
import random
import time
from django.core.management import BaseCommand
from decimal import Decimal
import multiprocessing
from multiprocessing import Pool
from django.db.models import Count
import zeep
from django.db import transaction
from pytils.translit import slugify
from core.models import Product, Category, ProductImage
def import_category_products(category):
print('import category %s' % category.name)
time.sleep(random.uniform(0.1, 3))
lock = multiprocessing.Lock()
from django.db import connection
connection.close()
allegro_api_key = 'key'
prod_web_api = 'https://webapi.allegro.pl/service.php?wsdl'
offset = 0
while True:
client = zeep.Client(wsdl=prod_web_api)
options = {'item': {'filterId': 'category', 'filterValueId': [str(category.catId)]}}
try:
result = client.service.doGetItemsList(
allegro_api_key, 1, filterOptions=options, resultSize=1000, resultOffset=offset, resultScope=2
)
except Exception as e:
print('result exc: %s' % e)
time.sleep(300)
continue
if not result or not result['itemsList']:
print('finish(%s)' % offset)
break
print('products: %s' % len(result['itemsList']['item']))
lock.acquire()
print (multiprocessing.current_process())
print('time loop start:%s' %time.clock() )
lock.release()
for product in result['itemsList']['item']:
#if product['priceInfo']['item'][0]['priceType'] == 'bidding': # priceType, исключить bidding
#continue
try:
images = product['photosInfo']['item']
general_image = None
for p in images:
if p['photoSize'] == 'large' and p['photoIsMain'] is True:
general_image = p['photoUrl'] # большое фото
if not general_image:
continue
except Exception as e:
continue
sku = product['itemId']
category_id = product['categoryId']
try:
product_object = Product.objects.get(sku=sku)
except Product.DoesNotExist:
product_object = None
if product_object:
continue
is_disabled = product['endingTime']
if is_disabled == u'Zako\u0144czona':
continue # Если статус закончился, то не добавляем
condition = product['conditionInfo'] # condition new used
if condition == 'new':
condition = 'Новый'
elif condition == 'used':
condition = 'Б.у.'
# Польская цена в злотых
price_pl = Decimal(product['priceInfo']['item'][0]['priceValue']).quantize(Decimal('.00'))
# умножаем на дефолтный курс, курс каждый день по крону обновляется
price_ru = (price_pl * Decimal(17.00))
# Price with shipping
p_shipping = Decimal(product['priceInfo']['item'][1]['priceValue']).quantize(Decimal('.00'))
shipping_price_ru = (p_shipping - price_pl) * Decimal(17.00) # стоимость доставки в рублях
product_count = product['leftCount'] # Товарный остаток
sales_count = product['bidsCount'] # Количество продаж
buyers_count = product['biddersCount'] # Количество покупателей
name_poland = product['itemTitle'] # Заголовок
slug = slugify(name_poland) + '-' + str(category_id) + '-' + (str(sku))
try:
cat = Category.objects.get(catId=category_id)
except:
continue
with transaction.atomic():
product_object = Product.objects.create(
slug=slug,
sku=sku,
price_pl=price_pl,
price_ru=price_ru,
name_poland=name_poland,
condition=condition,
buyers_count=buyers_count,
general_image=general_image,
product_count=product_count,
sales_count=sales_count,
pl_shipping_ru=shipping_price_ru,
is_disabled=False,
category_id=cat
)
lock.acquire()
print (multiprocessing.current_process())
print('time loop end:%s' %time.clock() )
lock.release()
offset += 1000
class Command(BaseCommand):
def handle(self, *args, **options):
categories = Category.objects.filter(parent=None).exclude(is_main=True).annotate(products_count=Count('categories_products__id')).order_by('products_count').distinct().iterator()
#categories = Category.objects.filter(parent=None).exclude(is_main=True).iterator()
with Pool(processes=16) as pool:
pool.map(import_category_products, categories)
Answer the question
In order to leave comments, you need to log in
When inserting a large number of records into the database, the first thing to try to speed up is bulk-create .
Didn't find what you were looking for?
Ask your questionAsk a Question
731 491 924 answers to any question