mercredi 24 août 2016

Locking-Free queue- memory corruption

My testing code

#include <iostream>
#include <thread>
#include <functional>
#include "LockFreeQueue.hpp"

using namespace std;

template<typename T>
void pusher(int data, LockFreeQueue2<T>& cnd){
    int counter = 10000, i =0;
       while( i < counter )   {
        cnd.push(data);
        i++;
    }
}

template<typename T>
void popper(LockFreeQueue2<T>& cnd){
    int counter = 10000, i =0;
    while( i < counter )   {
        T t;
        std::unique_ptr<T> ptr = cnd.pop();
        if( ptr.get()) std::cout << *ptr << std::endl;
        else std::cout << "EMPTY\n";
        i++;
    }
}

int main() {

   LockFreeQueue2<int> cnd;

   std::thread pusher1(std::bind(pusher<int>, 1, std::ref(cnd)));
   std::thread pusher2(std::bind(pusher<int>, 2, std::ref(cnd)));


    std::thread popper1(std::bind(popper<int>, std::ref(cnd)));
    std::thread popper10(std::bind(popper<int>, std::ref(cnd)));
    std::thread popper11(std::bind(popper<int>, std::ref(cnd)));
    std::thread popper12(std::bind(popper<int>, std::ref(cnd)));
    std::thread popper13(std::bind(popper<int>, std::ref(cnd)));
    std::thread pusher3(std::bind(pusher<int>, 3, std::ref(cnd)));
    std::thread pusher4(std::bind(pusher<int>, 4, std::ref(cnd)));

    std::thread pusher8(std::bind(pusher<int>, 4, std::ref(cnd)));

    std::thread popper9(std::bind(popper<int>, std::ref(cnd)));

    std::thread pusher5(std::bind(pusher<int>, 5, std::ref(cnd)));
    std::thread pusher7(std::bind(pusher<int>, 7, std::ref(cnd)));
    std::thread pusher9(std::bind(pusher<int>, 5, std::ref(cnd)));
    std::thread pusher10(std::bind(pusher<int>, 6, std::ref(cnd)));


    std::thread popper2(std::bind(popper<int>, std::ref(cnd)));
    std::thread popper3(std::bind(popper<int>, std::ref(cnd)));
    std::thread popper4(std::bind(popper<int>, std::ref(cnd)));
    std::thread popper5(std::bind(popper<int>, std::ref(cnd)));



    pusher1.join();
    pusher3.join();
    pusher2.join();
    pusher4.join();
    popper10.join();
    popper11.join();
    pusher5.join();
    pusher7.join();
    pusher8.join();


    popper1.join();
    popper2.join();
    popper3.join();
    popper4.join();
    popper5.join();

    popper9.join();
    popper12.join();
    popper13.join();


    pusher9.join();
    pusher10.join();

    return 0;
}

And implementation of LockFreeQueue: LockFreeQueue.hpp

#include <stdint.h>
#include <memory>
#include <atomic>
#include <iostream>

template<typename T>
class LockFreeQueue2 {
private:
    struct Node;
    struct CountedNode {
        CountedNode() noexcept : node(nullptr), external_counter(0)   {}
        CountedNode(const T& data) noexcept : node(new Node(data, this)), external_counter(0)  {}
        CountedNode(Node* node) noexcept : node(node), external_counter(0)  {}
        int64_t external_counter;
        Node* node;
    };
    struct Node {
        Node(const T& data, CountedNode* n) : data( new T(data)), next(nullptr), internal_counter(0), backNode(n), derived(0), tail_external(0) {}
        Node() : data(nullptr), next(nullptr), internal_counter(0), backNode(nullptr), tail_external(0), derived(0) {}
        std::atomic<T*> data;
        std::atomic<CountedNode*> next;
        std::atomic<CountedNode*> backNode;
        std::atomic<int64_t> internal_counter;
        std::atomic<int64_t> tail_external;
        std::atomic<int64_t> derived;


    };
    std::atomic<CountedNode> head;
    std::atomic<CountedNode> tail;


public:
    LockFreeQueue2()  {
        Node* node = new Node();
        head.store(CountedNode(node));
        tail.store(CountedNode(node));
    }

    void push(const T& data) {
        CountedNode* newNode = new CountedNode(data), *nullNode, currTail, incrementedCurrTail;
        while(true) {
            nullNode = nullptr;
            currTail = tail.load();
            do {
                incrementedCurrTail = currTail;
                incrementedCurrTail.external_counter++;
            } while( !tail.compare_exchange_strong(currTail, incrementedCurrTail));

            if( currTail.node->next.compare_exchange_strong(nullNode, newNode)) {
                moveNext(incrementedCurrTail);
                return;
            } else
                moveNext(incrementedCurrTail);
        }
    }
    void moveNext(CountedNode& currTail) {
        CountedNode* next = currTail.node->next.load();
        CountedNode copyCurrTail = currTail;

        int64_t curr = currTail.node->tail_external.load();
        do {
            if( currTail.external_counter <= curr) break;
        } while( !currTail.node->tail_external.compare_exchange_strong(curr, currTail.external_counter));

        tail.compare_exchange_strong(copyCurrTail, *next);

        decreaseNode(currTail.node);
    }

    void pop_node(CountedNode &countedNode) {
        int64_t sum = countedNode.node->tail_external.load() + countedNode.external_counter;

        countedNode.node->internal_counter += sum;
        decreaseNode(countedNode.node);
    }

    void decreaseNode(Node *node) {
        if( node->internal_counter.fetch_sub(1) == 1) {
        delete node;
        }
    }


    std::unique_ptr<T> pop() {
        CountedNode currHead, incrementedCurrHead, *nullCountedNode = nullptr;
        while(true) {
            nullCountedNode = nullptr;
            currHead = head.load();
            do {
                incrementedCurrHead = currHead;
                incrementedCurrHead.external_counter++;
            } while (!head.compare_exchange_strong(currHead, incrementedCurrHead));

            CountedNode copyIncrementedHead = incrementedCurrHead;

            if(!currHead.node->next.compare_exchange_strong(nullCountedNode, nullCountedNode)) {

                CountedNode next = *nullCountedNode;
                int64_t diff = incrementedCurrHead.external_counter - incrementedCurrHead.node->derived.load();
                next.external_counter = diff;
                int64_t curr = next.node->derived.load();
                do {
                    if( diff  <= curr) break;
                } while( !next.node->derived.compare_exchange_strong(curr, diff));

                if (head.compare_exchange_strong(incrementedCurrHead, next)) {
                    T *ptr = next.node->data.load();
                    decreaseNode(next.node);
                    pop_node(copyIncrementedHead);
                    return std::unique_ptr<T>(ptr);
                } else {
                    decreaseNode(next.node);
                    decreaseNode(copyIncrementedHead.node);
                }
            } else {
                decreaseNode(copyIncrementedHead.node);
                return std::unique_ptr<T>();
            }
        }
    }
};

I know about memory leak- I have to write destructor for that, I haven't done it yet because I have other problems:

When I ignore memory freeing ( I mean when I comment thedelete node in the decreaseNode function it seems to work and there is no memory problems ( like access to freed memory) indicated by valgrind ( I skip memory leak).

But, as you can see I try to free memory so I use counters ( every thread before dereferencing must increment counter).

What is the problem? It usually work without problem but sometimes it got SIGSEGV or something similar. Valgrind indicates a problems ( see below) but I cannot imagine what situation can lead to that faulty situation and in a result I am not able to repair it. So, I am asking to help me find that situation and suggest me how to repair it. I tried find it within some days.

    ==24750== Invalid read of size 8
==24750==    at 0x418971: fetch_sub (atomic_base.h:524)
==24750==    by 0x418971: LockFreeQueue2<int>::decreaseNode(LockFreeQueue2<int>::Node*) (LockFreeQueue2.hpp:105)
==24750==    by 0x418754: LockFreeQueue2<int>::moveNext(LockFreeQueue2<int>::CountedNode&) (LockFreeQueue2.hpp:90)
==24750==    by 0x4173DC: LockFreeQueue2<int>::push(int const&) (LockFreeQueue2.hpp:73)
==24750==    by 0x4169E2: void pusher<int>(int, LockFreeQueue2<int>&) (main.cpp:100)
==24750==    by 0x41A9E6: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::__call<void, , 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) (functional:1074)
==24750==    by 0x41A836: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::operator()<, void>() (functional:1133)
==24750==    by 0x41A7A3: void std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::_M_invoke<>(std::_Index_tuple<>) (functional:1531)
==24750==    by 0x41A667: std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::operator()() (functional:1520)
==24750==    by 0x41A571: std::thread::_Impl<std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()> >::_M_run() (thread:115)
==24750==    by 0x4EF504F: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==24750==    by 0x53DF6F9: start_thread (pthread_create.c:333)
==24750==    by 0x56FBB5C: clone (clone.S:109)
==24750==  Address 0x100072f8 is 24 bytes inside a block of size 48 free'd
==24750==    at 0x4C2F144: operator delete(void*) (vg_replace_malloc.c:575)
==24750==    by 0x4189A1: LockFreeQueue2<int>::decreaseNode(LockFreeQueue2<int>::Node*) (LockFreeQueue2.hpp:109)
==24750==    by 0x418754: LockFreeQueue2<int>::moveNext(LockFreeQueue2<int>::CountedNode&) (LockFreeQueue2.hpp:90)
==24750==    by 0x417401: LockFreeQueue2<int>::push(int const&) (LockFreeQueue2.hpp:76)
==24750==    by 0x4169E2: void pusher<int>(int, LockFreeQueue2<int>&) (main.cpp:100)
==24750==    by 0x41A9E6: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::__call<void, , 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) (functional:1074)
==24750==    by 0x41A836: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::operator()<, void>() (functional:1133)
==24750==    by 0x41A7A3: void std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::_M_invoke<>(std::_Index_tuple<>) (functional:1531)
==24750==    by 0x41A667: std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::operator()() (functional:1520)
==24750==    by 0x41A571: std::thread::_Impl<std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()> >::_M_run() (thread:115)
==24750==    by 0x4EF504F: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==24750==    by 0x53DF6F9: start_thread (pthread_create.c:333)
==24750==  Block was alloc'd at
==24750==    at 0x4C2E118: operator new(unsigned long) (vg_replace_malloc.c:333)
==24750==    by 0x41843B: LockFreeQueue2<int>::CountedNode::CountedNode(int const&) (LockFreeQueue2.hpp:19)
==24750==    by 0x41731D: LockFreeQueue2<int>::push(int const&) (LockFreeQueue2.hpp:63)
==24750==    by 0x4169E2: void pusher<int>(int, LockFreeQueue2<int>&) (main.cpp:100)
==24750==    by 0x41A9E6: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::__call<void, , 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) (functional:1074)
==24750==    by 0x41A836: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::operator()<, void>() (functional:1133)
==24750==    by 0x41A7A3: void std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::_M_invoke<>(std::_Index_tuple<>) (functional:1531)
==24750==    by 0x41A667: std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::operator()() (functional:1520)
==24750==    by 0x41A571: std::thread::_Impl<std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()> >::_M_run() (thread:115)
==24750==    by 0x4EF504F: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==24750==    by 0x53DF6F9: start_thread (pthread_create.c:333)
==24750==    by 0x56FBB5C: clone (clone.S:109)

P.S. When I uncomment delete node it sometimes loop infinitelly. When I comment it always finishes.

Aucun commentaire:

Enregistrer un commentaire