Python

Ivan Yakushenko, 2019-07-10 13:46:51

Why doesn't MongoDB write from multiple threads?

While studying MongoDB, I came across the possibility of using sequential values (1, 2, 3, ...) instead of the auto-generated _id:

import random

from pymongo import MongoClient


def get_next_sequence(collection, name):
    # Atomically increment the counter document and return the new value.
    # find_and_modify is the legacy PyMongo 2.x API; PyMongo 3+ renamed it
    # to find_one_and_update(..., return_document=ReturnDocument.AFTER).
    return collection.find_and_modify({'_id': name}, update={'$inc': {'seq': 1}}, new=True).get('seq')


def insert_in_db():
    client = MongoClient(mongo_url)  # mongo_url as defined elsewhere in the script
    db = client['']
    collection = db['']
    print(collection.insert_one({'_id': get_next_sequence(collection, 'userid'), 'value': f'{random.randint(10000, 2147483647)}'}))
    client.close()
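One detail worth noting: find_and_modify is called here without upsert=True, so the counter document has to exist before the first insert, otherwise the call returns None and .get('seq') fails. A minimal one-time setup sketch (the counter name 'userid' comes from the code above; the connection string and db/collection names are placeholders, since the question leaves them blank):

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')  # assumption: a local instance
collection = client['mydb']['data']  # placeholder names
# Create the counter document that get_next_sequence() increments.
collection.insert_one({'_id': 'userid', 'seq': 0})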

The approach works, but I wasn't sure it is safe to use from several threads. If I understand correctly, the script executes 2 queries against the database:
1. Finds out the value of the last element
2. Writes a new element with that value +1
What happens if several queries hit it simultaneously? I started checking:
from multiprocessing import Pool

with Pool(processes=200) as pool:
    for _ in range(100000):
        pool.apply_async(insert_in_db)
    pool.close()
    pool.join()

I gradually increased the number of processes from 50 to 1000.
Up to 200-250 processes there were no problems at all: everything was written. Beyond that, writes started to be skipped, especially noticeably past 400 processes. With 1000 processes, out of 100,000 write requests only a little more than 90,000 made it into the database, and there were no errors. Now I have 2 hypotheses:
1. The server running MongoDB (3 cores, 4 GB of RAM) can't handle the load. While the script was running there were moments when it simply stalled for a few seconds, sometimes for 10-30 seconds. Apparently some processes simply could not connect, so their writes never happened.
2. Several processes read the counter at the same time, each learns that there are currently, say, 1389 records in the database, and each tries to write a record with that same number. What confuses me is that there are no write errors in the output, although in theory there should be (see the sketch below).
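If hypothesis 2 were right, the duplicate inserts would have to fail visibly, because _id is backed by a unique index and a second insert with the same value raises DuplicateKeyError. A minimal sketch of the expected symptom (connection string and names are placeholders, not from the question):

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

client = MongoClient('mongodb://localhost:27017')  # assumption: a local instance
collection = client['test']['test']  # placeholder names

collection.insert_one({'_id': 1389, 'value': 'a'})
try:
    # A second insert with the same _id violates the unique index on _id.
    collection.insert_one({'_id': 1389, 'value': 'b'})
except DuplicateKeyError as exc:
    print('duplicate _id rejected:', exc)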


1 answer
Taus, 2019-07-10
@kshnkvn

You don't retrieve the execution results from the processes created via pool.apply_async. This is bad practice: when code runs in child processes, exceptions may be raised there that should be handled in the main process. Read the documentation for more details. Example:

import multiprocessing

def f():
    raise ValueError()

with multiprocessing.Pool() as pool:
    for _ in range(10):
        pool.apply_async(f)  # no errors: the exception is silently swallowed

with multiprocessing.Pool() as pool:
    for _ in range(10):
        result = pool.apply_async(f)
        result.get(timeout=1)  # re-raises ValueError from the worker
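The difference between the two loops is that AsyncResult.get() re-raises in the parent any exception that escaped the worker, while a bare apply_async() call stores the exception in a discarded AsyncResult and it is never seen.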

Since you don't have such handling, we can assume the following: for some number of spawned processes, either the creation of the MongoClient or the .insert_one / .find_and_modify query raises an exception related to an exceeded timeout (see the optional MongoClient arguments, e.g. serverSelectionTimeoutMS, and the pymongo exceptions documentation).
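Applied to the code from the question, a sketch might look like this (insert_in_db and the pool size are taken from the question; the specific exception types are an assumption about what could be raised):

from multiprocessing import Pool

from pymongo.errors import DuplicateKeyError, PyMongoError

failures = 0
with Pool(processes=200) as pool:
    # Keep the AsyncResult objects instead of discarding them.
    results = [pool.apply_async(insert_in_db) for _ in range(100000)]
    pool.close()
    pool.join()
    for result in results:
        try:
            result.get()  # re-raises whatever the worker raised
        except DuplicateKeyError:  # would point to the counter race (hypothesis 2)
            failures += 1
        except PyMongoError:  # timeouts/connection errors (hypothesis 1)
            failures += 1

print(f'failed inserts: {failures}')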
