My testing code
#include <iostream>
#include <thread>
#include <functional>
#include "LockFreeQueue.hpp"
using namespace std;
template<typename T>
void pusher(int data, LockFreeQueue2<T>& cnd){
int counter = 10000, i =0;
while( i < counter ) {
cnd.push(data);
i++;
}
}
template<typename T>
void popper(LockFreeQueue2<T>& cnd){
int counter = 10000, i =0;
while( i < counter ) {
T t;
std::unique_ptr<T> ptr = cnd.pop();
if( ptr.get()) std::cout << *ptr << std::endl;
else std::cout << "EMPTY\n";
i++;
}
}
int main() {
LockFreeQueue2<int> cnd;
std::thread pusher1(std::bind(pusher<int>, 1, std::ref(cnd)));
std::thread pusher2(std::bind(pusher<int>, 2, std::ref(cnd)));
std::thread popper1(std::bind(popper<int>, std::ref(cnd)));
std::thread popper10(std::bind(popper<int>, std::ref(cnd)));
std::thread popper11(std::bind(popper<int>, std::ref(cnd)));
std::thread popper12(std::bind(popper<int>, std::ref(cnd)));
std::thread popper13(std::bind(popper<int>, std::ref(cnd)));
std::thread pusher3(std::bind(pusher<int>, 3, std::ref(cnd)));
std::thread pusher4(std::bind(pusher<int>, 4, std::ref(cnd)));
std::thread pusher8(std::bind(pusher<int>, 4, std::ref(cnd)));
std::thread popper9(std::bind(popper<int>, std::ref(cnd)));
std::thread pusher5(std::bind(pusher<int>, 5, std::ref(cnd)));
std::thread pusher7(std::bind(pusher<int>, 7, std::ref(cnd)));
std::thread pusher9(std::bind(pusher<int>, 5, std::ref(cnd)));
std::thread pusher10(std::bind(pusher<int>, 6, std::ref(cnd)));
std::thread popper2(std::bind(popper<int>, std::ref(cnd)));
std::thread popper3(std::bind(popper<int>, std::ref(cnd)));
std::thread popper4(std::bind(popper<int>, std::ref(cnd)));
std::thread popper5(std::bind(popper<int>, std::ref(cnd)));
pusher1.join();
pusher3.join();
pusher2.join();
pusher4.join();
popper10.join();
popper11.join();
pusher5.join();
pusher7.join();
pusher8.join();
popper1.join();
popper2.join();
popper3.join();
popper4.join();
popper5.join();
popper9.join();
popper12.join();
popper13.join();
pusher9.join();
pusher10.join();
return 0;
}
And implementation of LockFreeQueue: LockFreeQueue.hpp
#include <stdint.h>
#include <memory>
#include <atomic>
#include <iostream>
template<typename T>
class LockFreeQueue2 {
private:
struct Node;
struct CountedNode {
CountedNode() noexcept : node(nullptr), external_counter(0) {}
CountedNode(const T& data) noexcept : node(new Node(data, this)), external_counter(0) {}
CountedNode(Node* node) noexcept : node(node), external_counter(0) {}
int64_t external_counter;
Node* node;
};
struct Node {
Node(const T& data, CountedNode* n) : data( new T(data)), next(nullptr), internal_counter(0), backNode(n), derived(0), tail_external(0) {}
Node() : data(nullptr), next(nullptr), internal_counter(0), backNode(nullptr), tail_external(0), derived(0) {}
std::atomic<T*> data;
std::atomic<CountedNode*> next;
std::atomic<CountedNode*> backNode;
std::atomic<int64_t> internal_counter;
std::atomic<int64_t> tail_external;
std::atomic<int64_t> derived;
};
std::atomic<CountedNode> head;
std::atomic<CountedNode> tail;
public:
LockFreeQueue2() {
Node* node = new Node();
head.store(CountedNode(node));
tail.store(CountedNode(node));
}
void push(const T& data) {
CountedNode* newNode = new CountedNode(data), *nullNode, currTail, incrementedCurrTail;
while(true) {
nullNode = nullptr;
currTail = tail.load();
do {
incrementedCurrTail = currTail;
incrementedCurrTail.external_counter++;
} while( !tail.compare_exchange_strong(currTail, incrementedCurrTail));
if( currTail.node->next.compare_exchange_strong(nullNode, newNode)) {
moveNext(incrementedCurrTail);
return;
} else
moveNext(incrementedCurrTail);
}
}
void moveNext(CountedNode& currTail) {
CountedNode* next = currTail.node->next.load();
CountedNode copyCurrTail = currTail;
int64_t curr = currTail.node->tail_external.load();
do {
if( currTail.external_counter <= curr) break;
} while( !currTail.node->tail_external.compare_exchange_strong(curr, currTail.external_counter));
tail.compare_exchange_strong(copyCurrTail, *next);
decreaseNode(currTail.node);
}
void pop_node(CountedNode &countedNode) {
int64_t sum = countedNode.node->tail_external.load() + countedNode.external_counter;
countedNode.node->internal_counter += sum;
decreaseNode(countedNode.node);
}
void decreaseNode(Node *node) {
if( node->internal_counter.fetch_sub(1) == 1) {
delete node;
}
}
std::unique_ptr<T> pop() {
CountedNode currHead, incrementedCurrHead, *nullCountedNode = nullptr;
while(true) {
nullCountedNode = nullptr;
currHead = head.load();
do {
incrementedCurrHead = currHead;
incrementedCurrHead.external_counter++;
} while (!head.compare_exchange_strong(currHead, incrementedCurrHead));
CountedNode copyIncrementedHead = incrementedCurrHead;
if(!currHead.node->next.compare_exchange_strong(nullCountedNode, nullCountedNode)) {
CountedNode next = *nullCountedNode;
int64_t diff = incrementedCurrHead.external_counter - incrementedCurrHead.node->derived.load();
next.external_counter = diff;
int64_t curr = next.node->derived.load();
do {
if( diff <= curr) break;
} while( !next.node->derived.compare_exchange_strong(curr, diff));
if (head.compare_exchange_strong(incrementedCurrHead, next)) {
T *ptr = next.node->data.load();
decreaseNode(next.node);
pop_node(copyIncrementedHead);
return std::unique_ptr<T>(ptr);
} else {
decreaseNode(next.node);
decreaseNode(copyIncrementedHead.node);
}
} else {
decreaseNode(copyIncrementedHead.node);
return std::unique_ptr<T>();
}
}
}
};
I know about memory leak- I have to write destructor for that, I haven't done it yet because I have other problems:
When I ignore memory freeing ( I mean when I comment thedelete node in the decreaseNode function it seems to work and there is no memory problems ( like access to freed memory) indicated by valgrind ( I skip memory leak).
But, as you can see I try to free memory so I use counters ( every thread before dereferencing must increment counter).
What is the problem? It usually work without problem but sometimes it got SIGSEGV or something similar. Valgrind indicates a problems ( see below) but I cannot imagine what situation can lead to that faulty situation and in a result I am not able to repair it. So, I am asking to help me find that situation and suggest me how to repair it. I tried find it within some days.
==24750== Invalid read of size 8
==24750== at 0x418971: fetch_sub (atomic_base.h:524)
==24750== by 0x418971: LockFreeQueue2<int>::decreaseNode(LockFreeQueue2<int>::Node*) (LockFreeQueue2.hpp:105)
==24750== by 0x418754: LockFreeQueue2<int>::moveNext(LockFreeQueue2<int>::CountedNode&) (LockFreeQueue2.hpp:90)
==24750== by 0x4173DC: LockFreeQueue2<int>::push(int const&) (LockFreeQueue2.hpp:73)
==24750== by 0x4169E2: void pusher<int>(int, LockFreeQueue2<int>&) (main.cpp:100)
==24750== by 0x41A9E6: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::__call<void, , 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) (functional:1074)
==24750== by 0x41A836: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::operator()<, void>() (functional:1133)
==24750== by 0x41A7A3: void std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::_M_invoke<>(std::_Index_tuple<>) (functional:1531)
==24750== by 0x41A667: std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::operator()() (functional:1520)
==24750== by 0x41A571: std::thread::_Impl<std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()> >::_M_run() (thread:115)
==24750== by 0x4EF504F: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==24750== by 0x53DF6F9: start_thread (pthread_create.c:333)
==24750== by 0x56FBB5C: clone (clone.S:109)
==24750== Address 0x100072f8 is 24 bytes inside a block of size 48 free'd
==24750== at 0x4C2F144: operator delete(void*) (vg_replace_malloc.c:575)
==24750== by 0x4189A1: LockFreeQueue2<int>::decreaseNode(LockFreeQueue2<int>::Node*) (LockFreeQueue2.hpp:109)
==24750== by 0x418754: LockFreeQueue2<int>::moveNext(LockFreeQueue2<int>::CountedNode&) (LockFreeQueue2.hpp:90)
==24750== by 0x417401: LockFreeQueue2<int>::push(int const&) (LockFreeQueue2.hpp:76)
==24750== by 0x4169E2: void pusher<int>(int, LockFreeQueue2<int>&) (main.cpp:100)
==24750== by 0x41A9E6: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::__call<void, , 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) (functional:1074)
==24750== by 0x41A836: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::operator()<, void>() (functional:1133)
==24750== by 0x41A7A3: void std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::_M_invoke<>(std::_Index_tuple<>) (functional:1531)
==24750== by 0x41A667: std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::operator()() (functional:1520)
==24750== by 0x41A571: std::thread::_Impl<std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()> >::_M_run() (thread:115)
==24750== by 0x4EF504F: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==24750== by 0x53DF6F9: start_thread (pthread_create.c:333)
==24750== Block was alloc'd at
==24750== at 0x4C2E118: operator new(unsigned long) (vg_replace_malloc.c:333)
==24750== by 0x41843B: LockFreeQueue2<int>::CountedNode::CountedNode(int const&) (LockFreeQueue2.hpp:19)
==24750== by 0x41731D: LockFreeQueue2<int>::push(int const&) (LockFreeQueue2.hpp:63)
==24750== by 0x4169E2: void pusher<int>(int, LockFreeQueue2<int>&) (main.cpp:100)
==24750== by 0x41A9E6: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::__call<void, , 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) (functional:1074)
==24750== by 0x41A836: void std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)>::operator()<, void>() (functional:1133)
==24750== by 0x41A7A3: void std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::_M_invoke<>(std::_Index_tuple<>) (functional:1531)
==24750== by 0x41A667: std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()>::operator()() (functional:1520)
==24750== by 0x41A571: std::thread::_Impl<std::_Bind_simple<std::_Bind<void (*(int, std::reference_wrapper<LockFreeQueue2<int> >))(int, LockFreeQueue2<int>&)> ()> >::_M_run() (thread:115)
==24750== by 0x4EF504F: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==24750== by 0x53DF6F9: start_thread (pthread_create.c:333)
==24750== by 0x56FBB5C: clone (clone.S:109)
P.S. When I uncomment delete node it sometimes loop infinitelly. When I comment it always finishes.
Aucun commentaire:
Enregistrer un commentaire