dimanche 21 avril 2019

Race conditions in C++ (concurrently reading hash maps and vectors)

I have been working on a C++ application that answers some SQL queries using indices only. In the "first phase" of the following code, I need to build some hash maps (multiple threads write these structures, with a separate mutex for each hash map). Then, in the "second phase", the program modifies the result tables RC and R (again with multiple threads) depending on the data inside the hash maps (at this point I don't use any kind of locking, since the hash maps are only read). Is it okay to read concurrently from structures like hash maps and vectors without locking, and (if this assumption is correct) which part of my code causes the race conditions?

I tried to keep multi-threading in only one of the two "phases" of the program. I can disable multi-threading for the second phase, the one that reads the hash maps without locking. To do that, I put "1" in place of "NUM_THREADS" in lines 422, 425, 431, 433 and 437 of the full file (the five occurrences of NUM_THREADS in the phase-2 thread-partitioning code). With that change I get the correct results, and they are the same every time I execute this version of the code. However, and this troubles me a lot, I can keep the multi-threaded version of phase 2 and lock the whole called function (pthread_A2T2Y_worker) with a mutex. I insert "my_mutex.lock();" at the start of its body (line 155 of the full file) and unlock the mutex right before the return statement. Even then, the program doesn't return correct results, and they are not even stable from run to run.

// Per-slice arguments handed to the worker threads, one entry per thread id.
static args_threading arguments[NUM_THREADS];

// Result arrays shared by all phase-2 workers: the workers increment R[x] and
// set RC[x] to 1 for the same key x.
static uint32_t* R;
static int* RC;

// Decode scratch buffers, indexed as [thread_id][element].
static uint64_t** d1_col0_buffer;
static uint64_t** d1_col1_buffer;
static uint64_t** da1_col0_buffer;
static uint64_t** dt1_col0_buffer;
static uint64_t** d2_col0_buffer;
static uint64_t** d2_col1_buffer;
static uint64_t** da2_col0_buffer;
static uint64_t** dt2_col0_buffer;

// Phase-1 hash maps (key -> list of rows); each has its own mutex that guards
// it while it is being built.
std::unordered_map<uint32_t, std::vector<std::vector<uint32_t>>> dt1_hashmap;
std::mutex dt1_lock;

std::unordered_map<uint32_t, std::vector<std::vector<uint32_t>>> da2_hashmap;
std::mutex da2_lock;

std::unordered_map<uint32_t, std::vector<std::vector<uint32_t>>> dt2_hashmap;
std::mutex dt2_lock;

// Declared for symmetry with the other maps; not referenced in this section.
std::mutex da1_lock;



void* pthread_A2T2Y_SubQuery_1_worker(void* arguments) {

    args_threading* args = (args_threading *) arguments;

    uint32_t d2_it = args->start;
    uint32_t d2_fragment_size = args->end;
    int thread_id = args->thread_id;

    for (; d2_it < d2_fragment_size; d2_it++) {

        uint32_t d2_col0_element = d2_col0_buffer[0][d2_it];
        uint32_t d2_col1_element = d2_col1_buffer[0][d2_it];

        uint32_t* row_da2 = idx[0]->index_map[d2_col1_element];
        uint32_t da2_col0_bytes = idx[0]->index_map[d2_col1_element+1][0] - row_da2[0];
        if(da2_col0_bytes) {

            uint32_t* da2_col0_ptr = reinterpret_cast<uint32_t *>(&(idx[0]->fragment_data[0][row_da2[0]]));
            uint32_t da2_fragment_size = 0;
            A2T2Y_SubQuery_1_da2_col0_decode_UA_threaded(thread_id, da2_col0_ptr, da2_col0_bytes, da2_fragment_size);


            for (uint32_t da2_it = 0; da2_it < da2_fragment_size; da2_it++) {
                uint32_t da2_col0_element = da2_col0_buffer[thread_id][da2_it];
                da2_lock.lock();
                da2_hashmap[da2_col0_element].push_back({d2_col1_element});
                da2_lock.unlock();
            }
        }
    }
    return nullptr;
}


void* pthread_A2T2Y_worker(void* arguments) {

    args_threading* args = (args_threading *) arguments;

    uint32_t d1_it = args->start;
    uint32_t d1_fragment_size = args->end;
    int thread_id = args->thread_id;

    for (; d1_it < d1_fragment_size; d1_it++) {

        uint32_t d1_col0_element = d1_col0_buffer[0][d1_it];
        uint32_t d1_col1_element = d1_col1_buffer[0][d1_it];

        uint32_t* row_da1 = idx[0]->index_map[d1_col1_element];
        uint32_t da1_col0_bytes = idx[0]->index_map[d1_col1_element+1][0] - row_da1[0];
        if(da1_col0_bytes) {

            uint32_t* da1_col0_ptr = reinterpret_cast<uint32_t *>(&(idx[0]->fragment_data[0][row_da1[0]]));
            uint32_t da1_fragment_size = 0;
            A2T2Y_da1_col0_decode_UA_threaded(thread_id, da1_col0_ptr, da1_col0_bytes, da1_fragment_size);

            for (uint32_t da1_it = 0; da1_it < da1_fragment_size; da1_it++) {

                uint32_t da1_col0_element = da1_col0_buffer[0][da1_it];


                dt1_lock.lock();
                bool dt1_flag = !(dt1_hashmap.find(d1_col1_element) == dt1_hashmap.end());
                int dt1_size = dt1_hashmap[d1_col1_element].size();
                dt1_lock.unlock();
                if (dt1_flag) {

                    for(int dt1_iterator=0; dt1_iterator<dt1_size;dt1_iterator++){
                        dt1_lock.lock();
                        vector<uint32_t> dt1_vector = dt1_hashmap[d1_col1_element][dt1_iterator];
                        dt1_lock.unlock();


                        da2_lock.lock();
                        bool da2_flag = !(da2_hashmap.find(da1_col0_element) == da2_hashmap.end());
                        int da2_size = da2_hashmap[da1_col0_element].size();
                        da2_lock.unlock();

                        if (da2_flag) {

                            for(int da2_iterator=0; da2_iterator<da2_size;da2_iterator++){
                                da2_lock.lock();
                                vector<uint32_t> da2_vector = da2_hashmap[da1_col0_element][da2_iterator];

                                uint32_t da2_col0_element = da2_vector[0];
                                da2_lock.unlock();

                                dt2_lock.lock();
                                bool dt2_flag = !(dt2_hashmap.find(da2_col0_element) == dt2_hashmap.end());
                                int dt2_size = dt2_hashmap[da2_col0_element].size();
                                dt2_lock.unlock();

                                if (dt2_flag) {

                                    for(int dt2_iterator=0; dt2_iterator<dt2_size;dt2_iterator++){
                                        dt2_lock.lock();
                                        vector<uint32_t> dt2_vector = dt2_hashmap[da2_col0_element][dt2_iterator];

                                        dt2_lock.unlock();

                                        RC[da1_col0_element] = 1;

                                        pthread_spin_lock(&r_spin_locks[da1_col0_element]);
                                        R[da1_col0_element] += 1;
                                        pthread_spin_unlock(&r_spin_locks[da1_col0_element]);

                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    return nullptr;
}


extern "C" uint32_t* A2T2Y(int** null_checks) {

    benchmark_t1 = chrono::steady_clock::now();

    d1_col0_buffer = new uint64_t*[NUM_THREADS];
    for (int i=0; i<NUM_THREADS; i++) {
        d1_col0_buffer[i] = new uint64_t[1142934];
    }
    d1_col1_buffer = new uint64_t*[NUM_THREADS];
    for (int i=0; i<NUM_THREADS; i++) {
        d1_col1_buffer[i] = new uint64_t[1142934];
    }
    da1_col0_buffer = new uint64_t*[NUM_THREADS];
    for (int i=0; i<NUM_THREADS; i++) {
        da1_col0_buffer[i] = new uint64_t[5425];
    }

    RC = new int[12765017]();
    R = new uint32_t[12765017]();


    dt1_col0_buffer = new uint64_t*[NUM_THREADS];
    for (int i=0; i<NUM_THREADS; i++) {
        dt1_col0_buffer[i] = new uint64_t[1301347];
    }

    uint64_t d04_list[1];
    d04_list[0] = query_parameters[2];

    for (int d04_it = 0; d04_it<1; d04_it++) {

        uint64_t d04_col0_element = d04_list[d04_it];

        uint32_t* row_dt1 = idx[2]->index_map[d04_col0_element];
        uint32_t dt1_col0_bytes = idx[2]->index_map[d04_col0_element+1][0] - row_dt1[0];
        if(dt1_col0_bytes) {

            uint32_t* dt1_col0_ptr = reinterpret_cast<uint32_t *>(&(idx[2]->fragment_data[0][row_dt1[0]]));
            uint32_t dt1_fragment_size = 0;
            A2T2Y_SubQuery_0_dt1_col0_decode_UA(dt1_col0_ptr, dt1_col0_bytes, dt1_fragment_size);


            for (uint32_t dt1_it = 0; dt1_it < dt1_fragment_size; dt1_it++) {
                uint32_t dt1_col0_element = dt1_col0_buffer[0][dt1_it];

                dt1_lock.lock();
                dt1_hashmap[dt1_col0_element].push_back({});
                dt1_lock.unlock();
            }
        }
    }




    d2_col0_buffer = new uint64_t*[NUM_THREADS];
    for (int i=0; i<NUM_THREADS; i++) {
        d2_col0_buffer[i] = new uint64_t[1142934];
    }
    d2_col1_buffer = new uint64_t*[NUM_THREADS];
    for (int i=0; i<NUM_THREADS; i++) {
        d2_col1_buffer[i] = new uint64_t[1142934];
    }
    da2_col0_buffer = new uint64_t*[NUM_THREADS];
    for (int i=0; i<NUM_THREADS; i++) {
        da2_col0_buffer[i] = new uint64_t[5425];
    }

    uint64_t d03_list[1];
    d03_list[0] = query_parameters[1];

    for (int d03_it = 0; d03_it<1; d03_it++) {

        uint64_t d03_col0_element = d03_list[d03_it];

        uint32_t* row_d2 = idx[1]->index_map[d03_col0_element];
        uint32_t d2_col0_bytes = idx[1]->index_map[d03_col0_element+1][0] - row_d2[0];
        if(d2_col0_bytes) {

            uint32_t* d2_col0_ptr = reinterpret_cast<uint32_t *>(&(idx[1]->fragment_data[0][row_d2[0]]));
            uint32_t d2_fragment_size = 0;
            A2T2Y_SubQuery_1_d2_col0_decode_UA(d2_col0_ptr, d2_col0_bytes, d2_fragment_size);

            uint32_t* d2_col1_ptr = reinterpret_cast<uint32_t *>(&(idx[1]->fragment_data[1][row_d2[1]]));
            A2T2Y_SubQuery_1_d2_col1_decode_UA(d2_col1_ptr, d2_fragment_size);

            uint32_t thread_size = d2_fragment_size/NUM_THREADS;
            uint32_t position = 0;

            for (int i=0; i<NUM_THREADS; i++) {
                arguments[i].start = position;
                position += thread_size;
                arguments[i].end = position;
                arguments[i].thread_id = i;
            }
            arguments[NUM_THREADS-1].end = d2_fragment_size;

            for (int i=0; i<NUM_THREADS; i++) {
                pthread_create(&threads[i], NULL, &pthread_A2T2Y_SubQuery_1_worker, (void *) &arguments[i]);
            }

            for (int i=0; i<NUM_THREADS; i++) {
                pthread_join(threads[i], NULL);
            }
        }
    }




    dt2_col0_buffer = new uint64_t*[NUM_THREADS];
    for (int i=0; i<NUM_THREADS; i++) {
        dt2_col0_buffer[i] = new uint64_t[1301347];
    }

    uint64_t d02_list[1];
    d02_list[0] = query_parameters[3];

    for (int d02_it = 0; d02_it<1; d02_it++) {

        uint64_t d02_col0_element = d02_list[d02_it];

        uint32_t* row_dt2 = idx[2]->index_map[d02_col0_element];
        uint32_t dt2_col0_bytes = idx[2]->index_map[d02_col0_element+1][0] - row_dt2[0];
        if(dt2_col0_bytes) {

            uint32_t* dt2_col0_ptr = reinterpret_cast<uint32_t *>(&(idx[2]->fragment_data[0][row_dt2[0]]));
            uint32_t dt2_fragment_size = 0;
            A2T2Y_SubQuery_2_dt2_col0_decode_UA(dt2_col0_ptr, dt2_col0_bytes, dt2_fragment_size);


            for (uint32_t dt2_it = 0; dt2_it < dt2_fragment_size; dt2_it++) {
                uint32_t dt2_col0_element = dt2_col0_buffer[0][dt2_it];

                dt2_lock.lock();
                dt2_hashmap[dt2_col0_element].push_back({});
                dt2_lock.unlock();
            }
        }
    }


    uint64_t d01_list[1]; //Start of Phase 2
    d01_list[0] = query_parameters[0];

    for (int d01_it = 0; d01_it<1; d01_it++) {

        uint64_t d01_col0_element = d01_list[d01_it];

        uint32_t* row_d1 = idx[1]->index_map[d01_col0_element];
        uint32_t d1_col0_bytes = idx[1]->index_map[d01_col0_element+1][0] - row_d1[0];
        if(d1_col0_bytes) {

            uint32_t* d1_col0_ptr = reinterpret_cast<uint32_t *>(&(idx[1]->fragment_data[0][row_d1[0]]));
            uint32_t d1_fragment_size = 0;
            A2T2Y_d1_col0_decode_UA(d1_col0_ptr, d1_col0_bytes, d1_fragment_size);

            uint32_t* d1_col1_ptr = reinterpret_cast<uint32_t *>(&(idx[1]->fragment_data[1][row_d1[1]]));
            A2T2Y_d1_col1_decode_UA(d1_col1_ptr, d1_fragment_size);

            uint32_t thread_size = d1_fragment_size/NUM_THREADS;
            uint32_t position = 0;

            for (int i=0; i<NUM_THREADS; i++) {
                arguments[i].start = position;
                position += thread_size;
                arguments[i].end = position;
                arguments[i].thread_id = i;
            }
            arguments[NUM_THREADS-1].end = d1_fragment_size;

            for (int i=0; i<NUM_THREADS; i++) {
                pthread_create(&threads[i], NULL, &pthread_A2T2Y_worker, (void *) &arguments[i]);
            }

            for (int i=0; i<NUM_THREADS; i++) {
                pthread_join(threads[i], NULL);
            }
        }
    }

    *null_checks = RC;
    return R;

}

Aucun commentaire:

Enregistrer un commentaire