Python
Andrey_Dolg, 2020-06-19 19:30:15

How to monitor a script's operation?

There is a script on the server running 24/7, but there is a rare situation when it literally freezes: memory consumption stays normal and the process is not marked as a zombie in the process list. Judging by the logs (if they are not lying), it stops somewhere in the chain of aiohttp URL polling (inside a try block) in an asynchronous function. There are no errors and no failures, which is the sad part: the hang even slips past the timeouts without raising anything. For now I was thinking of solving this in a slightly crooked way: monitor the CPU load on the server and, if it stays below 10% for more than 5-10 minutes, restart the script. Since the event loop is now parallelized across processes, it should also be possible to hang an execution timeout on the pool's map call, but since I don't fully trust the logs I would still like to tie process usage monitoring to the main thread. So the actual question is: how is this usually done, and what should be taken into account?
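
A minimal sketch of the watchdog idea described above, assuming the script can simply be killed and started again and that psutil is installed; worker.py, the 30-second poll interval and the 10% / 10-minute thresholds are placeholders, not taken from the original code:

# External watchdog sketch: restart the script when system-wide CPU load stays
# below IDLE_CPU for longer than IDLE_LIMIT (the "crooked" plan from the question).
import subprocess
import time

import psutil

SCRIPT = ["python", "worker.py"]   # hypothetical entry point
IDLE_CPU = 10.0                    # percent; below this counts as "idle"
IDLE_LIMIT = 10 * 60               # seconds of continuous idling before restart


def run_forever():
    proc = subprocess.Popen(SCRIPT)
    idle_since = None
    while True:
        time.sleep(30)
        if proc.poll() is not None:                 # script exited on its own
            proc = subprocess.Popen(SCRIPT)
            idle_since = None
            continue
        cpu = psutil.cpu_percent(interval=1.0)      # system-wide load, as in the question
        if cpu < IDLE_CPU:
            idle_since = idle_since or time.monotonic()
            if time.monotonic() - idle_since > IDLE_LIMIT:
                # presumed hang: kill and restart (with a multiprocessing pool
                # you may want to kill the whole process group instead)
                proc.kill()
                proc.wait()
                proc = subprocess.Popen(SCRIPT)
                idle_since = None
        else:
            idle_since = None


if __name__ == "__main__":
    run_forever()

The obvious caveat is that system-wide CPU below 10% is a weak signal on a shared server; a per-process check (psutil.Process(proc.pid).cpu_percent() summed over the children) or a timeout on the pool call itself is more direct.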

# #############################  Always-running loop inside a single child thread
# init a pool of 2 worker processes
pool = multiprocessing.Pool(processes=2)
while True:


    # create a list of proxies, size == chunk size
    len_of_proxy = self.n * len(areas_of_lock)
    proxy_optimize = random.choices([i for i in self.weight_dict.keys() if i in self.proxy_list],
                                    [v for k, v in self.weight_dict.items() if k in self.proxy_list],
                                    k=len_of_proxy)

    # Split ids into chunks:
    # one chunk per process
    ids_chunks = [(ids[i:i + self.n], proxy_optimize, self.tcp_speed) for i in range(0, len(ids), self.n)]

    results = []
    proxy_analize = []

    del proxy_optimize
    
    # Send chunks to the multiprocessing pool
    unpuck_results = pool.map(get_results, ids_chunks)

    for chunk_results, chunk_proxies in unpuck_results:
        results.extend(chunk_results)
        proxy_analize.extend(chunk_proxies)


    self.analize(results, self.extend_id, self.proxy_list)
    self.proxy_balancer(proxy_analize, self.weight_dict)

# ######################################
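
If an execution timeout on the pool's map is wanted, as mentioned in the question, one option is to swap the blocking pool.map for map_async and fetch the result with a timeout; multiprocessing.TimeoutError then flags a stuck chunk. This is only a sketch against the loop above: pool, get_results, ids_chunks and unpuck_results come from the original code, while CHUNK_TIMEOUT and the recovery strategy (terminate and rebuild the pool) are assumptions.

# Inside the while True loop, replacing the plain pool.map call:
CHUNK_TIMEOUT = 600  # seconds; assumed placeholder, tune to a realistic chunk duration

try:
    async_result = pool.map_async(get_results, ids_chunks)
    unpuck_results = async_result.get(timeout=CHUNK_TIMEOUT)
except multiprocessing.TimeoutError:
    # A worker is presumably stuck in the aiohttp polling chain.
    # Throw the pool away so the hung worker cannot poison later iterations.
    pool.terminate()
    pool.join()
    pool = multiprocessing.Pool(processes=2)
    unpuck_results = []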


# ######################################## multiprocessing + aiohttp part: request a sequence of urls

def get_results(ids):
    """
    Worker-process function: creates a new event loop and runs the requests
    for one chunk in a separate process.
    :param ids: tuple of (ids chunk, proxy list, tcp_speed)
    :return: (results, proxy_analize)
    """
    proxy_list = ids[1]
    # weight_dict = ids[2]
    tcp_speed = ids[2]
    loop = asyncio.new_event_loop()
    try:
        results, proxy_analize = loop.run_until_complete(
            requests_new_items(ids[0], proxy_list, tcp_speed))
    finally:
        loop.close()  # do not leak one event loop per chunk in a reused worker
    return results, proxy_analize


async def requests_new_items(ids,param_proxy_list, tcp_speed):
    """
    Generate tasks
    :param ids:
    :param param_proxy_list:
    :param tcp_speed:
    :return:
    """
    connector = aiohttp.TCPConnector(limit=tcp_speed, force_close=True, ssl=False)  # limit on simultaneous connections
    client_timeout = aiohttp.ClientTimeout(connect=13, total=14)
    urls_sub_sets_optimizer = []

    # generate subset indexes: each urls_sub_sets index is repeated len(ids) times,
    # e.g. [0,0,0,0,0,0,0,1,1,1,1,1,1,1,2,2,2,2,2]
    for i in range(len(urls_sub_sets)):
        urls_sub_sets_optimizer.extend([i] * len(ids))
    async with aiohttp.ClientSession(connector=connector, timeout=client_timeout) as session:
        responce_list = await asyncio.gather(*[fetch_one(session, param_proxy_list[current_index], id_of_url,
                                                     urls_sub_sets_optimizer[current_index]) for
                                               current_index, id_of_url in enumerate(ids * len(urls_sub_sets))])

    new_item_or_except = [i for i in responce_list if i[0]]
    proxy_analize = [(i[1], i[2]) for i in responce_list if not i[0]]
    return new_item_or_except, proxy_analize

async def fetch_one(session, proxy, id_of_url,sub_url):
    """
     Request to server
    :param session:
    :param proxy:
    :param id_of_url:
    :param sub_url:
    :return:
    """
    try:

        await_time = time.perf_counter()
        result = await session.head(''.join((url_part_1,url_part_2,url_part_3,url_part_4)),
                                    proxy= ''.join(('http://',proxy)),
                                    headers=headers_const)

        if result.status != status_number:
            return (id_of_url,result.status,sub_url)

        return (None,proxy, time.perf_counter()-await_time)
    except Exception as err:
        print(err)
        return (id_of_url,proxy)
# ############################################
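
Since the question notes that the hang slips past aiohttp's own timeouts, a belt-and-braces option is to also cap each whole chunk inside the worker with asyncio.wait_for. The wrapper below is a sketch under the assumption that losing an entire chunk on timeout is acceptable; requests_new_items_capped and BATCH_TIMEOUT are hypothetical names, only requests_new_items comes from the code above.

BATCH_TIMEOUT = 120  # seconds for one whole chunk; assumed placeholder


async def requests_new_items_capped(ids, param_proxy_list, tcp_speed):
    """Run one chunk, but never let it take longer than BATCH_TIMEOUT."""
    try:
        return await asyncio.wait_for(
            requests_new_items(ids, param_proxy_list, tcp_speed),
            timeout=BATCH_TIMEOUT)
    except asyncio.TimeoutError:
        # The chunk as a whole got stuck; report it empty so the main loop keeps going.
        return [], []

get_results would then call requests_new_items_capped instead of requests_new_items. Note that this only helps while the event loop itself is still running; if the worker is blocked outside an await, only an external watchdog or the pool timeout will catch it.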


1 answer
bkosun, 2020-06-20
@Andrey_Dolg

Use a tool that allows you to monitor processes, such as Supervisor:

autorestart=true - restart the worker if it exits (crashes) for any reason;
stopsignal=KILL - the signal used to stop (kill) the process. If not defined, the default signal, TERM, is used.

https://ruhighload.com/%D0%97%D0%B0%D0%BF%D1%83%D1...
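
For context, a minimal supervisord program section along these lines might look like the following; the program name, command and log paths are placeholders, only autorestart and stopsignal come from the answer above.

[program:myscript]
; placeholder command and paths, adjust to the real script
command=/usr/bin/python3 /opt/myscript/worker.py
directory=/opt/myscript
autostart=true
; restart the worker if it exits for any reason
autorestart=true
; send SIGKILL instead of the default TERM when stopping
stopsignal=KILL
stdout_logfile=/var/log/myscript.out.log
stderr_logfile=/var/log/myscript.err.log

Keep in mind that autorestart only fires when the process actually exits; for the silent hang described in the question it needs to be combined with something that makes the process die or get killed, such as the pool timeout or watchdog sketched earlier.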
