Python
Matvey Nosurname, 2021-10-08 21:27:06

How can I fix the code to make this work in multi-threaded mode?

There is code that works in single-threaded mode; I need to compare its running time with a parallelized version of the same function:

from time import process_time


def create_matrix(n):
    res_list = []
    for i_ind in range(n):
        res_list.append(list())
        for j_ind in range(n):
            res_list[i_ind].append(0)

    return res_list


def get_counting_func(q_vector, v_vector):
    q_matrix = [[*q_vector] for i in range(len(q_vector))]
    v_matrix = [[i for j in range(len(v_vector))] for i in v_vector]

    def t_func(i_start, i_len, j_start, j_len):
        global matrix
        for i in range(i_start, i_start + i_len):
            for j in range(j_start, j_start + j_len):
                matrix[i][j] = ((q_matrix[i][j]) ** 2 + (v_matrix[i][j]) ** 2) ** 0.5

    return t_func


n_el = 5000
matrix = create_matrix(n_el)

q_vect = [(i + 11) ** 2 for i in range(n_el)]
p_vect = [(i * 3 + 13) * 17 for i in range(n_el)]

counting_func = get_counting_func(q_vect, p_vect)

start_time = process_time()

counting_func(0, n_el, 0, n_el)

print(process_time() - start_time, "seconds")


But in the multi-threaded version an error occurs (AttributeError: Can't pickle local object 'get_counting_func.<locals>.t_func'):

from time import process_time
from multiprocessing import Process


def create_matrix(n):
    res_list = []
    for i_ind in range(n):
        res_list.append(list())
        for j_ind in range(n):
            res_list[i_ind].append(0)

    return res_list


def get_counting_func(q_vector, v_vector):
    q_matrix = [[*q_vector] for i in range(len(q_vector))]
    v_matrix = [[i for j in range(len(v_vector))] for i in v_vector]

    def t_func(i_start, i_len, j_start, j_len):
        global matrix
        for i in range(i_start, i_start + i_len):
            for j in range(j_start, j_start + j_len):
                matrix[i][j] = (t_func.q_matrix[i][j]**2 + t_func.v_matrix[i][j]**2) ** 0.5

    t_func.q_matrix = q_matrix
    t_func.v_matrix = v_matrix

    return t_func


n_el = 5000
matrix = create_matrix(n_el)

q_vect = [(i + 11) ** 2 for i in range(n_el)]
p_vect = [(i * 3 + 13) * 17 for i in range(n_el)]

counting_func = get_counting_func(q_vect, p_vect)
prepared_data = [(i, j) for i in range(n_el) for j in range(n_el)]

process_part = 1000
processes = [Process(target=counting_func, args=(i, process_part, 0, n_el)) for i in range(0, 5000, process_part)]

start_time = process_time()

for i in processes:
    i.start()

print(process_time() - start_time, "seconds")

ScriptKiddo, 2021-10-09
@matweykai

If the question is about optimization, you can use numpy: the difference is more than an order of magnitude.

Naive  13.342852999999998 seconds
Optimized  0.22429799999999744 seconds
Unique: [ True] Counts: [25000000]

Source

import time
from time import process_time

import numpy

matrix = []


def naive():
    from time import process_time

    def create_matrix(n):
        res_list = []
        for i_ind in range(n):
            res_list.append(list())
            for j_ind in range(n):
                res_list[i_ind].append(0)

        return res_list

    def get_counting_func(q_vector, v_vector):
        q_matrix = [[*q_vector] for i in range(len(q_vector))]
        v_matrix = [[i for j in range(len(v_vector))] for i in v_vector]

        def t_func(i_start, i_len, j_start, j_len):
            global matrix
            for i in range(i_start, i_start + i_len):
                for j in range(j_start, j_start + j_len):
                    matrix[i][j] = ((q_matrix[i][j]) ** 2 + (v_matrix[i][j]) ** 2) ** 0.5

        return t_func

    n_el = 5000
    global matrix
    matrix = create_matrix(n_el)

    q_vect = [(i + 11) ** 2 for i in range(n_el)]
    p_vect = [(i * 3 + 13) * 17 for i in range(n_el)]

    counting_func = get_counting_func(q_vect, p_vect)

    start_time = process_time()

    counting_func(0, n_el, 0, n_el)

    print('Naive ', process_time() - start_time, "seconds")
    return matrix


def optimized():
    start_time = process_time()

    n_el = 5000
    res = numpy.fromfunction(lambda i, j: (((i * 3 + 13) * 17) ** 2 + (j + 11) ** 4) ** 0.5,
                             (n_el, n_el), dtype=numpy.float32)

    print('Optimized ', process_time() - start_time, "seconds")

    return res

if __name__ == '__main__':
    first = naive()
    second = optimized()

    unique, counts = numpy.unique(numpy.isclose(second, numpy.array(first)), return_counts=True)
    print(f'Unique: {unique} Counts: {counts}')
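
The fromfunction lambda above hard-codes both formulas. An equivalent way to write it (my own sketch, not part of the answer) is to build the two vectors as arrays and let broadcasting combine them, so the numpy version stays in sync with whatever q_vect and p_vect actually contain:

import numpy as np

n_el = 5000
q = np.array([(i + 11) ** 2 for i in range(n_el)], dtype=np.float64)
p = np.array([(i * 3 + 13) * 17 for i in range(n_el)], dtype=np.float64)

# matrix[i][j] = sqrt(q[j]**2 + p[i]**2): broadcasting a row of q against
# a column of p produces the full n_el x n_el result in one expression
res = np.hypot(q[np.newaxis, :], p[:, np.newaxis])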

Vindicar, 2021-10-08
@Vindicar

This is multiprocessing, not multithreading. The difference is VERY significant: different processes have different address spaces, so data has to be passed between them by serializing it with pickle.
So functions that run in another process should, if possible, work only with Python primitives, plus lists and dictionaries. I'm not sure how it works with nested functions...
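
To make the original approach work across processes, the target passed to Process (or Pool) has to be a module-level function so it can be pickled, and each worker has to return its results instead of writing into the parent's global matrix, since the parent's memory is not shared. A minimal sketch of that idea (my own illustration, not the answerer's code) using multiprocessing.Pool:

from multiprocessing import Pool


def compute_rows(args):
    # args is a plain tuple of primitives and lists, which pickles fine
    i_start, i_len, q_vector, v_vector = args
    rows = []
    for i in range(i_start, i_start + i_len):
        rows.append([(q_vector[j] ** 2 + v_vector[i] ** 2) ** 0.5
                     for j in range(len(q_vector))])
    return rows


if __name__ == '__main__':
    n_el = 5000
    q_vect = [(i + 11) ** 2 for i in range(n_el)]
    p_vect = [(i * 3 + 13) * 17 for i in range(n_el)]

    part = 1000
    tasks = [(i, part, q_vect, p_vect) for i in range(0, n_el, part)]
    with Pool() as pool:
        # chunks come back in submission order, so concatenating them
        # rebuilds the full matrix row by row
        matrix = [row for chunk in pool.map(compute_rows, tasks)
                  for row in chunk]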
