I am trying to write a single producer- single consumer queue. But to me it feels that it's not the best approach that I have used, I am new to multi-threading. Can some one please take a look and suggest few improvements. Also it is throwing segmentation fault. I have attached the case in main body it self with gdb output.
compiling the code like this
clang++ -std=c++11 ConcurrentQueue.cpp -o test -lpthread
Below is my concurrent queue code. The failed case is also given in main block.
#include <cmath>
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>
template<typename T, uint64_t SIZE = 2048, uint64_t MAX_SPIN_ON_BUSY = 40000000>
class ConcurrentQueue {
private:
static constexpr unsigned Log2(unsigned n, unsigned p = 0) {
return (n <= 1) ? p : Log2(n / 2, p + 1);
}
static constexpr uint64_t closestExponentOf2(uint64_t x) {
return (1UL << ((uint64_t) (Log2(SIZE - 1)) + 1));
}
static constexpr uint64_t mRingModMask = closestExponentOf2(SIZE) - 1;
static constexpr uint64_t mSize = closestExponentOf2(SIZE);
static const T mEmpty;
T mMem[mSize];
std::mutex mLock;
uint64_t mReadPtr = 0;
uint64_t mWritePtr = 0;
public:
const T& pop() {
if (!peek()) {
return mEmpty;
}
std::lock_guard<std::mutex> lock(mLock);
if (!peek()) {
return mEmpty;
}
T& ret = mMem[mReadPtr & mRingModMask];
mReadPtr++;
return ret;
}
bool peek() const {
return (mWritePtr != mReadPtr);
}
uint64_t getCount() const {
return mWritePtr > mReadPtr ? mWritePtr - mReadPtr : mReadPtr - mWritePtr;
}
bool busyWaitForPush() {
uint64_t start = 0;
while (getCount() == mSize) {
if (start++ > MAX_SPIN_ON_BUSY) {
return false;
}
}
return true;
}
void push(const T& pItem) {
if (!busyWaitForPush()) {
throw std::runtime_error("Concurrent queue full cannot write to it!");
}
std::lock_guard<std::mutex> lock(mLock);
mMem[mWritePtr & mRingModMask] = pItem;
mWritePtr++;
}
void push(T&& pItem) {
if (!busyWaitForPush()) {
throw std::runtime_error("Concurrent queue full cannot write to it!");
}
std::lock_guard<std::mutex> lock(mLock);
mMem[mWritePtr & mRingModMask] = std::move(pItem);
mWritePtr++;
}
};
template<typename T, uint64_t SIZE, uint64_t MAX_SPIN_ON_BUSY>
const T ConcurrentQueue<T, SIZE, MAX_SPIN_ON_BUSY>::mEmpty = T{ };
int main(int, char**) {
using Functor = std::function<void()>;
ConcurrentQueue<Functor*> queue;
std::thread consumer([ & ] {
while (true) {
if (queue.peek()) {
auto task = queue.pop();
(*task)();
delete task;
}
}
});
std::thread producer([ & ] {
uint64_t counter = 0;
while (true) {
auto taskId = counter++;
auto newTask = new Functor([ = ] {
std::cout << "Running task " << taskId << std::endl << std::flush;
});
queue.push(newTask);
}
});
consumer.join();
producer.join();
return 0;
}
Below is gdb output
Reading symbols from test...done.
[New LWP 3242]
[New LWP 3240]
[New LWP 3241]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
Core was generated by `./test'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0 _int_malloc (av=av@entry=0x7f6500000020, bytes=bytes@entry=32) at malloc.c:3378
3378 malloc.c: No such file or directory.
[Current thread is 1 (Thread 0x7f6505f34700 (LWP 3242))]
(gdb) bt
#0 _int_malloc (av=av@entry=0x7f6500000020, bytes=bytes@entry=32) at malloc.c:3378
#1 0x00007f65067ba184 in __GI___libc_malloc (bytes=32) at malloc.c:2913
#2 0x00007f65070ace78 in operator new(unsigned long) () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#3 0x00000000004034da in main::$_1::operator() (this=0x1fc7db8) at ConcurrentQueue.cpp:108
#4 0x0000000000403475 in std::_Bind_simple<main::$_1 ()>::_M_invoke<>(std::_Index_tuple<>) (this=0x1fc7db8)
at /usr/bin/../lib/gcc/x86_64-linux-gnu/5.4.0/../../../../include/c++/5.4.0/functional:1530
#5 0x0000000000403445 in std::_Bind_simple<main::$_1 ()>::operator()() (this=0x1fc7db8)
at /usr/bin/../lib/gcc/x86_64-linux-gnu/5.4.0/../../../../include/c++/5.4.0/functional:1520
#6 0x0000000000403339 in std::thread::_Impl<std::_Bind_simple<main::$_1 ()> >::_M_run() (this=0x1fc7da0)
at /usr/bin/../lib/gcc/x86_64-linux-gnu/5.4.0/../../../../include/c++/5.4.0/thread:115
#7 0x00007f65070d7c80 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#8 0x00007f65073a86ba in start_thread (arg=0x7f6505f34700) at pthread_create.c:333
#9 0x00007f650683d41d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
(gdb)
Aucun commentaire:
Enregistrer un commentaire