Sunday, 31 October 2021

Getting a SIGSEGV in my concurrent queue code, and how can I improve its performance once the segfault is fixed?

I am trying to write a single-producer, single-consumer queue. I am new to multithreading, and I suspect the approach I have taken is not the best one. Could someone please take a look and suggest a few improvements? The code also crashes with a segmentation fault. I have included the failing case in the main function itself, along with the gdb output.

I compile the code like this:

clang++ -std=c++11 ConcurrentQueue.cpp -o test -lpthread

Below is my concurrent queue code. The failing case is in the main function.

#include <cstdint>
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>

template<typename T, uint64_t SIZE = 2048, uint64_t MAX_SPIN_ON_BUSY = 40000000>
class ConcurrentQueue {
private:
    static constexpr unsigned Log2(unsigned n, unsigned p = 0) {
        return (n <= 1) ? p : Log2(n / 2, p + 1);
    }

    static constexpr uint64_t closestExponentOf2(uint64_t x) {
        return (uint64_t{1} << ((uint64_t) (Log2(x - 1)) + 1));
    }

    static constexpr uint64_t mRingModMask = closestExponentOf2(SIZE) - 1;
    static constexpr uint64_t mSize = closestExponentOf2(SIZE);

    static const T mEmpty;

    T mMem[mSize];
    std::mutex mLock;
    uint64_t mReadPtr = 0;
    uint64_t mWritePtr = 0;

public:
    const T& pop() {
        if (!peek()) {
            return mEmpty;
        }

        std::lock_guard<std::mutex> lock(mLock);

        if (!peek()) {
            return mEmpty;
        }

        T& ret = mMem[mReadPtr & mRingModMask];

        mReadPtr++;
        return ret;
    }

    bool peek() const {
        return (mWritePtr != mReadPtr);
    }

    uint64_t getCount() const {
        return mWritePtr > mReadPtr ? mWritePtr - mReadPtr : mReadPtr - mWritePtr;
    }

    bool busyWaitForPush() {
        uint64_t start = 0;
        while (getCount() == mSize) {
            if (start++ > MAX_SPIN_ON_BUSY) {
                return false;
            }
        }
        return true;
    }

    void push(const T& pItem) {
        if (!busyWaitForPush()) {
            throw std::runtime_error("Concurrent queue full cannot write to it!");
        }

        std::lock_guard<std::mutex> lock(mLock);
        mMem[mWritePtr & mRingModMask] = pItem;
        mWritePtr++;
    }

    void push(T&& pItem) {
        if (!busyWaitForPush()) {
            throw std::runtime_error("Concurrent queue full cannot write to it!");
        }

        std::lock_guard<std::mutex> lock(mLock);
        mMem[mWritePtr & mRingModMask] = std::move(pItem);
        mWritePtr++;
    }
};

template<typename T, uint64_t SIZE, uint64_t MAX_SPIN_ON_BUSY>
const T ConcurrentQueue<T, SIZE, MAX_SPIN_ON_BUSY>::mEmpty = T{ };

int main(int, char**) {
    using Functor = std::function<void()>;

    ConcurrentQueue<Functor*> queue;

    std::thread consumer([ & ] {
        while (true) {
            if (queue.peek()) {
                auto task = queue.pop();
                (*task)();
                delete task;
            }
        }
    });

    std::thread producer([ & ] {
        uint64_t counter = 0;
        while (true) {
            auto taskId = counter++;
            auto newTask = new Functor([ = ] {
                std::cout << "Running task " << taskId << std::endl << std::flush;
            });
            queue.push(newTask);
        }
    });

    consumer.join();
    producer.join();
    return 0;
}

Below is the gdb output:

Reading symbols from test...done.
[New LWP 3242]
[New LWP 3240]
[New LWP 3241]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
Core was generated by `./test'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0  _int_malloc (av=av@entry=0x7f6500000020, bytes=bytes@entry=32) at malloc.c:3378
3378    malloc.c: No such file or directory.
[Current thread is 1 (Thread 0x7f6505f34700 (LWP 3242))]
(gdb) bt
#0  _int_malloc (av=av@entry=0x7f6500000020, bytes=bytes@entry=32) at malloc.c:3378
#1  0x00007f65067ba184 in __GI___libc_malloc (bytes=32) at malloc.c:2913
#2  0x00007f65070ace78 in operator new(unsigned long) () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#3  0x00000000004034da in main::$_1::operator() (this=0x1fc7db8) at ConcurrentQueue.cpp:108
#4  0x0000000000403475 in std::_Bind_simple<main::$_1 ()>::_M_invoke<>(std::_Index_tuple<>) (this=0x1fc7db8)
    at /usr/bin/../lib/gcc/x86_64-linux-gnu/5.4.0/../../../../include/c++/5.4.0/functional:1530
#5  0x0000000000403445 in std::_Bind_simple<main::$_1 ()>::operator()() (this=0x1fc7db8)
    at /usr/bin/../lib/gcc/x86_64-linux-gnu/5.4.0/../../../../include/c++/5.4.0/functional:1520
#6  0x0000000000403339 in std::thread::_Impl<std::_Bind_simple<main::$_1 ()> >::_M_run() (this=0x1fc7da0)
    at /usr/bin/../lib/gcc/x86_64-linux-gnu/5.4.0/../../../../include/c++/5.4.0/thread:115
#7  0x00007f65070d7c80 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#8  0x00007f65073a86ba in start_thread (arg=0x7f6505f34700) at pthread_create.c:333
#9  0x00007f650683d41d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
(gdb)
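
The backtrace ends inside malloc rather than in the queue itself, which often points to heap corruption that happened earlier. One plausible cause in the code above: pop() returns a reference to a slot in mMem, and the caller copies the value only after the lock has been released. When the ring is full, the producer can overwrite that same slot in between, so the consumer may read the same pointer twice and delete it twice. In addition, peek() and getCount() read mReadPtr and mWritePtr without any synchronization, which is a data race. The following is a minimal sketch of a different technique, not a fix of the original class: a lock-free ring for exactly one producer and one consumer that uses std::atomic indices and hands elements out by value. The names SpscRing, tryPush, tryPop, and transferSum are hypothetical and chosen only for this illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <utility>

// Hypothetical name; not the ConcurrentQueue from the question.
// Safe only for exactly one producer thread and one consumer thread.
template <typename T, std::size_t CAPACITY = 2048>
class SpscRing {
    static_assert(CAPACITY > 0 && (CAPACITY & (CAPACITY - 1)) == 0,
                  "CAPACITY must be a power of two");

    T mMem[CAPACITY];
    std::atomic<std::uint64_t> mReadPtr{0};   // written only by the consumer
    std::atomic<std::uint64_t> mWritePtr{0};  // written only by the producer

public:
    // Producer side. Returns false instead of throwing when the ring is full.
    bool tryPush(T item) {
        const std::uint64_t w = mWritePtr.load(std::memory_order_relaxed);
        const std::uint64_t r = mReadPtr.load(std::memory_order_acquire);
        if (w - r == CAPACITY) {
            return false;  // full
        }
        mMem[w & (CAPACITY - 1)] = std::move(item);
        // Release: the slot write above becomes visible before the new index.
        mWritePtr.store(w + 1, std::memory_order_release);
        return true;
    }

    // Consumer side. Moves the element out *before* publishing the new read
    // index, so the producer cannot overwrite a slot that is still being read.
    bool tryPop(T& out) {
        const std::uint64_t r = mReadPtr.load(std::memory_order_relaxed);
        const std::uint64_t w = mWritePtr.load(std::memory_order_acquire);
        if (r == w) {
            return false;  // empty
        }
        out = std::move(mMem[r & (CAPACITY - 1)]);
        mReadPtr.store(r + 1, std::memory_order_release);
        return true;
    }
};

// Pushes 0..count-1 through a small ring and returns the sum the consumer saw.
std::uint64_t transferSum(std::uint64_t count) {
    SpscRing<std::uint64_t, 8> ring;  // tiny capacity to force wrap-around
    std::uint64_t sum = 0;

    std::thread consumer([&] {
        std::uint64_t value = 0;
        for (std::uint64_t expected = 0; expected < count; ++expected) {
            while (!ring.tryPop(value)) {
                std::this_thread::yield();
            }
            assert(value == expected);  // FIFO order is preserved
            sum += value;
        }
    });

    std::thread producer([&] {
        for (std::uint64_t i = 0; i < count; ++i) {
            while (!ring.tryPush(i)) {
                std::this_thread::yield();
            }
        }
    });

    producer.join();
    consumer.join();
    return sum;
}
```

Because each index is written by only one thread, no mutex is needed; the acquire/release pairs are what make the slot contents visible to the other side. Returning the element through an out-parameter also avoids the sentinel mEmpty value, since an empty queue is reported by the bool result instead.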
