V
V
vikholodov2018-03-05 14:34:37
PostgreSQL
vikholodov, 2018-03-05 14:34:37

Why is the import so slow?

Good afternoon, colleagues!
I import products with api, limit 9000 requests per minute, took a cloud VDS with 12 cores, installed 16 processes using an experimental method and fired it up. If you believe the console (I decided to measure the processing time of each "mow" by each process separately, I got about 3-5 seconds, maximum, for each), the lines run very cheerfully in the console. But in reality, about 2000 - 3000 items per minute get into the database, which is sooooo slow (you need 30,000 per minute). What am I doing wrong? I have already sacrificed importing additional images and checking for the presence of goods in other categories (stupidly doing duplicates with reference to other categories), which is already very bad, but there is still no speed.

import random

import time
from django.core.management import BaseCommand
from decimal import Decimal
import multiprocessing
from multiprocessing import Pool
from django.db.models import Count
import zeep
from django.db import transaction
from pytils.translit import slugify

from core.models import Product, Category, ProductImage


def import_category_products(category):
  print('import category %s' % category.name)
  time.sleep(random.uniform(0.1, 3))
  lock = multiprocessing.Lock()
  from django.db import connection
  connection.close()

  allegro_api_key = 'key'

  prod_web_api = 'https://webapi.allegro.pl/service.php?wsdl'
  offset = 0
  while True:
    client = zeep.Client(wsdl=prod_web_api)
    options = {'item': {'filterId': 'category', 'filterValueId': [str(category.catId)]}}

    try:
      result = client.service.doGetItemsList(
        allegro_api_key, 1, filterOptions=options, resultSize=1000, resultOffset=offset, resultScope=2
      )
    except Exception as e:
      print('result exc: %s' % e)
      time.sleep(300)
      continue

    if not result or not result['itemsList']:
      print('finish(%s)' % offset)
      break

    print('products: %s' % len(result['itemsList']['item']))
    lock.acquire()
    print (multiprocessing.current_process())
    print('time loop start:%s' %time.clock() )
    lock.release()
    for product in result['itemsList']['item']:
      
      #if product['priceInfo']['item'][0]['priceType'] == 'bidding':  # priceType, исключить bidding
        #continue

      try:
        images = product['photosInfo']['item']
    
        general_image = None
        for p in images:
          if p['photoSize'] == 'large' and p['photoIsMain'] is True:
            general_image = p['photoUrl']  # большое фото
        if not general_image:
          continue
      except Exception as e:
        continue

      sku = product['itemId']
      category_id = product['categoryId']

      try:
        product_object = Product.objects.get(sku=sku)

      except Product.DoesNotExist:
        product_object = None

      if product_object:
        continue            

      is_disabled = product['endingTime'] 
      if is_disabled == u'Zako\u0144czona':
        continue  # Если статус закончился, то не добавляем

      condition = product['conditionInfo']  # condition new used
      if condition == 'new':
        condition = 'Новый'
      elif condition == 'used':
        condition = 'Б.у.'

      # Польская цена в злотых
      price_pl = Decimal(product['priceInfo']['item'][0]['priceValue']).quantize(Decimal('.00'))

      # умножаем на дефолтный курс, курс каждый день по крону обновляется
      price_ru = (price_pl * Decimal(17.00))

      # Price with shipping
      p_shipping = Decimal(product['priceInfo']['item'][1]['priceValue']).quantize(Decimal('.00'))

      shipping_price_ru = (p_shipping - price_pl) * Decimal(17.00)  # стоимость доставки в рублях
      product_count = product['leftCount']  # Товарный остаток
      sales_count = product['bidsCount']  # Количество продаж
      buyers_count = product['biddersCount']  # Количество покупателей
      name_poland = product['itemTitle']  # Заголовок
      slug = slugify(name_poland) + '-' + str(category_id) + '-' + (str(sku))
      try:
        cat = Category.objects.get(catId=category_id)
      except:
        continue		
      with transaction.atomic():
        product_object = Product.objects.create(
          slug=slug,
          sku=sku,
          price_pl=price_pl,
          price_ru=price_ru,
          name_poland=name_poland,
          condition=condition,
          buyers_count=buyers_count,
          general_image=general_image,
          product_count=product_count,
          sales_count=sales_count,
          pl_shipping_ru=shipping_price_ru,
          is_disabled=False,
          category_id=cat
        )
    lock.acquire()	
    print (multiprocessing.current_process())	
    print('time loop end:%s' %time.clock() )
    lock.release()

    offset += 1000
    

class Command(BaseCommand):
  def handle(self, *args, **options):
    categories = Category.objects.filter(parent=None).exclude(is_main=True).annotate(products_count=Count('categories_products__id')).order_by('products_count').distinct().iterator()
    #categories = Category.objects.filter(parent=None).exclude(is_main=True).iterator()
    with Pool(processes=16) as pool:
      pool.map(import_category_products, categories)

Answer the question

In order to leave comments, you need to log in

1 answer(s)
P
Pasechnik Kuzmich, 2018-03-05
@Hivemaster

When inserting a large number of records into the database, the first thing to try to speed up is bulk-create .

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question