Friday, 29 April 2022

C++ Google Test failure with multiple threads

I have the following C++ code and the corresponding unit tests in Google Test. I am working through the book Modern C++ Programming with Test-Driven Development. The code below crashes in the scale tests. From my analysis, the problem is this part of the code; if it is removed, the program runs. Another observation: the same code works in another test case, "HandlesLargeNumbersOfUsers", with either a single thread or multiple threads. Your inputs will help me move forward.

Work work{ [&] {
           if (isDifferentUserInBounds(each, user, box))
              listener->updated(User{each.first, each.second});
        } }; 

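To make my suspicion concrete, here is a minimal standalone sketch of the lifetime pattern I think is involved (the names here are mine, not from the book): lambdas capture locals by reference, but another thread may run them only after the enclosing function has already returned.

#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

std::vector<std::function<void()>> queue_;

void enqueueWork() {
    std::string box{ "local data" };              // dies when enqueueWork returns
    for (int i = 0; i < 3; i++) {
        queue_.push_back([&] {                    // captures box and i by reference
            std::cout << box << " " << i << "\n"; // may read objects that no longer exist
        });
    }
}                                                 // box and i are gone here

int main() {
    enqueueWork();
    // intentionally demonstrates the hazard: the work runs after enqueueWork returned,
    // so the captured references dangle (undefined behavior)
    std::thread worker{ [] { for (auto& work : queue_) work(); } };
    worker.join();
}

Whether this is actually what happens in usersInBox is exactly what I am unsure about.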

class GeoServerUsersInBoxTests : public testing::Test {
public:
    GeoServer server;

    const double TenMeters{ 10 };
    const double Width{ 2000 + TenMeters };
    const double Height{ 4000 + TenMeters };
    const string aUser{ "auser" };
    const string bUser{ "buser" };
    const string cUser{ "cuser" };

    Location aUserLocation{ 38, -103 };

    shared_ptr<ThreadPool> pool;

    virtual void SetUp() override {
        server.useThreadPool(pool);

        server.track(aUser);
        server.track(bUser);
        server.track(cUser);

        server.updateLocation(aUser, aUserLocation);
    }

    string userName(unsigned int i) {
        return string{ "user" + to_string(i) };
    }

    void addUsersAt(unsigned int number, const Location& location) {
        for (unsigned int i{ 0 }; i < number; i++) {
            string user = userName(i);
            server.track(user);
            server.updateLocation(user, location);
        }
    }
};

class AGeoServer_ScaleTests : public GeoServerUsersInBoxTests {

public:

    class GeoServerCountingListener : public GeoServerListener {
    public:
        void updated(const User& user) override {
            unique_lock<std::mutex> lock(mutex_);
            Count++;
            wasExecuted_.notify_all();
        }

        void waitForCountAndFailOnTimeout(unsigned int expectedCount,
            const milliseconds& time = milliseconds(10000)) {
            unique_lock<mutex> lock(mutex_);
            ASSERT_TRUE(wasExecuted_.wait_for(lock, time, [&]
                { return expectedCount == Count; }));
        }

        condition_variable wasExecuted_;
        unsigned int Count{ 0 };
        mutex mutex_;
    };

    GeoServerCountingListener countingListener;
    shared_ptr<thread> t;

    void SetUp() override {
        pool = make_shared<ThreadPool>();
        GeoServerUsersInBoxTests::SetUp();
    }

    void TearDown() override {
        t->join();
    }
};


TEST_F(AGeoServer_ScaleTests, HandlesLargeNumbersOfUsers) {
    pool->start(4);
    const unsigned int lots{ 5000 };
    addUsersAt(lots, Location{ aUserLocation.go(TenMeters, West) });

    t = make_shared<thread>(
        [&] { server.usersInBox(aUser, Width, Height, &countingListener); });

    countingListener.waitForCountAndFailOnTimeout(lots);
}

ThreadPool.h

class ThreadPool {
public:
    virtual ~ThreadPool() {
        stop();
    }

    void stop() {
        done_ = true;
        for (auto& thread : threads_) thread.join();
    }

    void start(unsigned int numberOfThreads = 1) {
        for (unsigned int i{ 0u }; i < numberOfThreads; i++)
            threads_.push_back(std::thread(&ThreadPool::worker, this));
    }

    bool hasWork() {
        std::lock_guard<std::mutex> block(mutex_);
        return !workQueue_.empty();
    }

    virtual void add(Work work) {
        std::lock_guard<std::mutex> block(mutex_);
        workQueue_.push_front(work);
    }

    Work pullWork() {
        std::lock_guard<std::mutex> block(mutex_);

        if (workQueue_.empty()) return Work{};

        auto work = workQueue_.back();
        workQueue_.pop_back();
        return work;
    }

private:
    void worker() {
        while (!done_) {
            while (!done_ && !hasWork())
                ;
            if (done_) break;
            pullWork().execute();
        }
    }

    std::atomic<bool> done_{ false };
    std::deque<Work> workQueue_;
    std::shared_ptr<std::thread> workThread_;
    std::mutex mutex_;
    std::vector<std::thread> threads_;
};
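ThreadPool.h relies on a Work type with an execute() member. I have not reproduced it above; it is essentially the book's version, roughly this (treat the details as approximate):

// requires <functional>
class Work {
public:
    static const int DefaultId{ 0 };

    Work(int id = DefaultId) : id_{ id }, executeFunction_{ [] {} } {}

    Work(std::function<void()> executeFunction, int id = DefaultId)
        : id_{ id }, executeFunction_{ executeFunction } {}

    void execute() { executeFunction_(); }   // runs the stored callable

    int id() const { return id_; }

private:
    int id_;
    std::function<void()> executeFunction_;
};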

GeoServer.cpp

void GeoServer::track(const string& user) {
    locations_[user] = Location();
}

void GeoServer::stopTracking(const string& user) {
    locations_.erase(user);
}

bool GeoServer::isTracking(const string& user) const {
    return find(user) != locations_.end();
}

void GeoServer::updateLocation(const string& user, const Location& location) {
    locations_[user] = location;
}

Location GeoServer::locationOf(const string& user) const {
    if (!isTracking(user)) return Location{}; // TODO performance cost?

    return find(user)->second;
}

std::unordered_map<std::string, Location>::const_iterator
GeoServer::find(const std::string& user) const {
    return locations_.find(user);
}

bool GeoServer::isDifferentUserInBounds(
    const pair<string, Location>& each,
    const string& user,
    const Area& box) const {
    if (each.first == user) return false;
    return box.inBounds(each.second);
}

void GeoServer::usersInBox(
    const string& user, double widthInMeters, double heightInMeters,
    GeoServerListener* listener) const {
    auto location = locations_.find(user)->second;
    Area box{ location, widthInMeters, heightInMeters };

    for (auto& each : locations_) {
        Work work{ [&] {
           if (isDifferentUserInBounds(each, user, box))
              listener->updated(User{each.first, each.second});
        } };
        pool_->add(work);
    }
}
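The GeoServer header is not shown above; reconstructed from the calls in the tests and in GeoServer.cpp, my declarations look roughly like this (Location, Area, User, and GeoServerListener are the book's types and are omitted here):

class GeoServer {
public:
    void track(const std::string& user);
    void stopTracking(const std::string& user);
    bool isTracking(const std::string& user) const;
    void updateLocation(const std::string& user, const Location& location);
    Location locationOf(const std::string& user) const;

    bool isDifferentUserInBounds(const std::pair<std::string, Location>& each,
                                 const std::string& user, const Area& box) const;
    void usersInBox(const std::string& user,
                    double widthInMeters, double heightInMeters,
                    GeoServerListener* listener = nullptr) const;

    void useThreadPool(std::shared_ptr<ThreadPool> pool);

private:
    std::unordered_map<std::string, Location>::const_iterator
        find(const std::string& user) const;

    std::unordered_map<std::string, Location> locations_;
    std::shared_ptr<ThreadPool> pool_;
};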
