Django
Maxim, 2017-02-24 15:07:35

Why don't Celery periodic tasks run asynchronously?

Hello everyone. I'm working with Celery and have run into a problem.
A periodic task is executed every minute:

# List of messages to send
import json
import time

import requests
# Assumed imports for the Celery version in use at the time of the post.
from celery.schedules import crontab
from celery.task import periodic_task, task

# Message, UserProfile and get_token come from the project; their imports are not shown in the post.


@periodic_task(run_every=crontab(minute='*/1'))
def message_delayed():
    mes = Message.objects.filter(status='scheduled').values('owner_id', 'message_id', 'timestamp')
    # Collect the ids of the messages whose send time falls within the last minute.
    now = round(time.time())
    window = range(now - 60, now)
    send_dlist = {i['message_id']: get_token(i['owner_id'])
                  for i in mes if i['timestamp'] in window}
    print('TIME ', window)
    print('SEND LIST ', send_dlist)

    user_id = 259
    user = UserProfile.objects.get(id=user_id)  # not used below

    scheduled_req.delay(send_dlist)


@task
def scheduled_req(send_dlist):
    from api.models import Scheduled
    # One blocking POST per message, sent one after another.
    for message_id, token in send_dlist.items():
        url = 'http://######8080/api/v1/message/send'
        data = dict(token=token, message_id=message_id)
        req = requests.post(url, data=json.dumps(data))

        try:
            Scheduled.objects.create(response=req.json())
        except Exception as e:
            print(str(e))

I must have missed something: it looks like the worker waits for scheduled_req to finish sending. Here are the logs:
11:50: checked, nothing to send yet
11:51: same
11:52: picked up the messages that need to be sent
11:53: as you can see, the task was received, but the print never fires, as if it were waiting for the previous tasks to finish.
I can't figure it out...
[2017-02-24 11:50:00,042: WARNING/PoolWorker-1] TIME
[2017-02-24 11:50:00,043: WARNING/PoolWorker-1] range(1487936940, 1487937000)
[2017-02-24 11:50:00,043: WARNING/PoolWorker-1] SEND LIST
[2017-02-24 11:50:00,043: WARNING/PoolWorker-1] {}
[2017-02-24 11:50:00,057: INFO/MainProcess] Received task: api.controllers.message.scheduled_req[00362657-8e66-4e0a-852e-50bfe71468ed]  
[2017-02-24 11:50:00,061: INFO/PoolWorker-1] Task api.controllers.message.message_delayed[deb42bc9-5736-4b6e-97c6-6ffe7499d35a] succeeded in 0.021350613998947665s: None
[2017-02-24 11:50:00,064: INFO/PoolWorker-1] Task api.controllers.message.scheduled_req[00362657-8e66-4e0a-852e-50bfe71468ed] succeeded in 0.0007557420176453888s: None
[2017-02-24 11:51:00,027: INFO/MainProcess] Received task: api.controllers.message.message_delayed[3a265703-5083-4003-ad0f-a856fe729e1d]  
[2017-02-24 11:51:00,035: WARNING/PoolWorker-1] TIME
[2017-02-24 11:51:00,035: WARNING/PoolWorker-1] range(1487937000, 1487937060)
[2017-02-24 11:51:00,036: WARNING/PoolWorker-1] SEND LIST
[2017-02-24 11:51:00,036: WARNING/PoolWorker-1] {}
[2017-02-24 11:51:00,040: INFO/MainProcess] Received task: api.controllers.message.scheduled_req[2692fa0f-c6fa-43b8-bab8-c7b397e2c1bd]  
[2017-02-24 11:51:00,041: INFO/PoolWorker-1] Task api.controllers.message.message_delayed[3a265703-5083-4003-ad0f-a856fe729e1d] succeeded in 0.010261352988891304s: None
[2017-02-24 11:51:00,044: INFO/PoolWorker-1] Task api.controllers.message.scheduled_req[2692fa0f-c6fa-43b8-bab8-c7b397e2c1bd] succeeded in 0.0005564790044445544s: None
[2017-02-24 11:52:00,043: INFO/MainProcess] Received task: api.controllers.message.message_delayed[b082c5f4-7ba5-41a3-be65-ea06c627dc53]  
[2017-02-24 11:52:00,072: WARNING/PoolWorker-1] TIME
[2017-02-24 11:52:00,072: WARNING/PoolWorker-1] range(1487937060, 1487937120)
[2017-02-24 11:52:00,073: WARNING/PoolWorker-1] SEND LIST
[2017-02-24 11:52:00,073: WARNING/PoolWorker-1] {'33_10': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_17': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_16': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_18': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_14': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_5': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_6': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_13': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_15': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_8': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_3': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_0': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_19': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_12': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_4': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_1': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_2': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_9': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_11': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_7': 'e276bd68-f9b7-11e6-a095-52540010ddb4'}
[2017-02-24 11:52:00,079: INFO/MainProcess] Received task: api.controllers.message.scheduled_req[8003eb93-cf6a-4ee2-977c-bcd81b758486]  
[2017-02-24 11:52:00,080: INFO/PoolWorker-1] Task api.controllers.message.message_delayed[b082c5f4-7ba5-41a3-be65-ea06c627dc53] succeeded in 0.035682129993801937s: None
[2017-02-24 11:52:00,086: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:52:17,179: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:52:34,951: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:52:50,855: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:00,031: INFO/MainProcess] Received task: api.controllers.message.message_delayed[42b9e874-62de-4a0a-a976-02c1fe29f5c9]  
[2017-02-24 11:53:06,045: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:20,860: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:36,361: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:51,889: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:54:00,047: INFO/MainProcess] Received task: api.controllers.message.message_delayed[36a575ac-cade-481d-897f-f7bf722a4d15]  
[2017-02-24 11:54:13,885: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:54:30,652: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:54:46,091: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:00,008: INFO/MainProcess] Received task: api.controllers.message.message_delayed[a1ae26d3-61b7-402c-97a6-d798a89d7ad9]  
[2017-02-24 11:55:01,357: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:16,274: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:31,450: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:46,190: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:56:00,035: INFO/MainProcess] Received task: api.controllers.message.message_delayed[7f0e576a-0004-4837-a973-ce76e5ed5854]  
[2017-02-24 11:56:01,688: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:56:16,942: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
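
A rough reading of the log above: every line comes from PoolWorker-1, and the "Starting new HTTP connection" entries are roughly 16 seconds apart. If that holds, one scheduled_req call with 20 messages would keep that single worker process busy for several minutes, which would explain why the later message_delayed runs are received but print nothing. The sketch below only does that arithmetic on timestamps copied from the log; the helper name average_gap_seconds is made up for illustration.

```python
from datetime import datetime

# Timestamps of the "Starting new HTTP connection" lines, copied from the log above.
CONNECTION_TIMES = [
    "11:52:00,086", "11:52:17,179", "11:52:34,951", "11:52:50,855",
    "11:53:06,045", "11:53:20,860", "11:53:36,361", "11:53:51,889",
    "11:54:13,885", "11:54:30,652", "11:54:46,091", "11:55:01,357",
    "11:55:16,274", "11:55:31,450", "11:55:46,190", "11:56:01,688",
    "11:56:16,942",
]


def average_gap_seconds(stamps):
    """Return the mean gap in seconds between consecutive timestamps."""
    times = [datetime.strptime(s, "%H:%M:%S,%f") for s in stamps]
    gaps = [(b - a).total_seconds() for a, b in zip(times, times[1:])]
    return sum(gaps) / len(gaps)


avg = average_gap_seconds(CONNECTION_TIMES)
# 20 is the number of entries in the SEND LIST printed at 11:52.
estimated_total = avg * 20
print(f"average gap: {avg:.1f}s, estimated time for 20 messages: {estimated_total / 60:.1f} min")
```

This is only an estimate from one log excerpt; the actual time per request depends on the server behind the URL.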

1 answer(s)
iMrDron, 2017-02-24
@iMrDron

In addition to Maxim's answer: I remembered a talk I attended where they explained how to use asyncio to download data from all over VK in about a day. See whether it could be useful for your implementation: https://youtu.be/8wvQGRJiKdY?t=1699
You don't even have to rewrite everything at once. Start by making the tasks themselves use asyncio, so that instead of 10k tasks you have 10 tasks, each making 1000 requests with asyncio. And if 10 workers are running at the same time, then in theory everything will run concurrently.
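
A minimal sketch of the batching idea above, assuming Python 3.7+ and that the HTTP call can be expressed as a coroutine. The names fake_send, chunked and send_batch are hypothetical, and fake_send only sleeps instead of talking to a real server; in practice it would be swapped for a call to an async HTTP client. The point is only that one task awaits many requests concurrently instead of sending them one by one.

```python
import asyncio
import time


async def fake_send(message_id, token):
    """Hypothetical stand-in for the HTTP POST; sleeps instead of doing real I/O."""
    await asyncio.sleep(0.1)
    return {"message_id": message_id, "ok": True}


def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


async def send_batch(batch, limit=100):
    """Send every (message_id, token) pair in `batch`, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(message_id, token):
        async with semaphore:
            return await fake_send(message_id, token)

    return await asyncio.gather(*(guarded(m, t) for m, t in batch))


# Same shape as the SEND LIST in the question: message_id -> token.
send_dlist = {f"33_{n}": "token" for n in range(20)}
batches = chunked(list(send_dlist.items()), 10)

started = time.monotonic()
# Each batch stands for the body of one task; here they simply run one after another.
results = [asyncio.run(send_batch(batch)) for batch in batches]
elapsed = time.monotonic() - started
print(f"{sum(len(r) for r in results)} messages in {elapsed:.2f}s")
```

Whether this helps in practice depends on the receiving server handling concurrent requests; the limit argument is there so a batch does not open too many connections at once.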
