dimanche 5 novembre 2017

Apparent Threading Performance Differences between macOS and Linux using std::thread

So I have been starting to learn about threading, using threading in C++, and, in particular, using std::thread to accomplish this. To practice a little bit, I wrote a threaded version of a basic Particle Swarm Optimization code and was comparing it to the non-threaded counterpart.

What I found interesting was that running the comparison on my MacBook with 4 useable threads showed a speed-up relative to the non-threaded implementation, but when I did the same comparison on a Linux environment where I have 96 useable threads, the threaded implementation was much slower. Below is an example of the run-time comparisons on each platform from my basic unit test outputs:

macOS macOS testing output for PSO

Linux Linux testing output for PSO

I want to suspect the issue is my code, so I will share my main code below in case someone can spot some noob mistakes in my threading implementation. For the full project source (which isn't much code anyway), just go here. Any thoughts about what could be going on will be much appreciated.

//
// Created by C. Howard on 10/29/17.
//

#ifndef SRC_OPT_PSO_HPP
#define SRC_OPT_PSO_HPP

#include <cstdint>
#include <cmath>
#include <random>
#include <limits>
#include <thread>
#include <mutex>
#include <array>
#include "opt_solver.hpp"
#include "../utility/text_color.hpp"
#include "../utility/UnitTest.hpp"
#include "../test/opt_test_rosenbrock2d.hpp"

namespace opt {

    // forward declare pso iteration callback
    template<template<typename> class costfunc> class pso_iter;


    /*
     * This code assumes you are solving minimization problems
     *
     */
    template< template<typename> class costfunc >
    class pso : public solver<pso<costfunc>> {
    public:

        // useful typedefs
        typedef solver<pso<costfunc>>   parent;
        typedef typename parent::num_t  num_t;

        // constructor
        pso():nparticles(10),mc(0.8),lc(0.5),gc(0.3),
              seed(17),U(0.0,1.0),
              miter(100),niter(0),useThreads(false){}

        // setters
        void setUseThreading( bool useThreads_ )    { useThreads = useThreads_; }
        void setNumParticles( uint32_t np )         { nparticles = np; }
        void setMomentumCoefficient( num_t mom)     { mc = mom; }
        void setLocalCoefficient(num_t lcoef )      { lc = lcoef; }
        void setGlobalCoefficient( num_t gcoef )    { gc = gcoef; }
        void setSeed( uint32_t s )                  { seed = s; }
        void setMaxIterations( uint32_t miter_ )    { miter = miter_; }
        void setSearchBounds(   const std::vector<num_t> & lb_,
                                const std::vector<num_t> & ub_ )
        {
            lb = lb_; ub = ub_;
        }

        // getters
        const std::vector<num_t> & getBestSolution() { return gb; }
        const num_t              & getBestCost() { return gb_cost; }
        const std::vector<num_t> & getBestSolutionFor(uint32_t particle_num){ return pb[particle_num]; }
        const num_t              & getBestCostFor(uint32_t particle_num){ return pb_cost[particle_num]; }

        costfunc<num_t> & getCostFunction(){ return cfunc; }

    private:
        friend class pso_iter<costfunc>;
        friend solver<pso<costfunc>>;

        bool useThreads;
        uint32_t nparticles, niter, miter, seed, ndim;
        std::default_random_engine rng;
        std::uniform_real_distribution<num_t> U;
        double mc, lc, gc;
        std::vector<std::vector<num_t>> p;  // particle positions
        std::vector<std::vector<num_t>> pb; // a particle's best position
        std::vector<num_t>              pb_cost; // personal best costs
        std::vector<num_t>              gb; // the global best position
        num_t                           gb_cost; // global best cost
        std::mutex                      gb_mutex;
        std::vector<std::vector<num_t>> v;  // particle velocities
        std::vector<num_t> lb; // lower bound of domain
        std::vector<num_t> ub; // upper bound of domain
        costfunc<num_t> cfunc;

        void init() {
            niter = 0;
            ndim = costfunc<num_t>::numDims();
            rng.seed(seed);
            randomInit();
        }

        bool isNotComplete() {
            return miter > niter;
        }

        void update() {
            if( useThreads ){   update_threaded(); }
            else{               update_serial(); }
        }

        void update_serial() {
            // init temp variable for cost
            num_t tcost = 0.0;

            // loop through and update all particles
            // and their associated local/global info
            for(uint32_t n = 0; n < nparticles; ++n){
                updateParticle(n);
                tcost = cfunc(p[n]);

                if( tcost < pb_cost[n] ){
                    pb_cost[n]  = tcost;
                    pb[n].assign(p[n].begin(),p[n].end());
                    if( tcost < gb_cost ){
                        gb_cost = tcost;
                        gb.assign(p[n].begin(),p[n].end());
                    }// end if tcost is less than best global cost
                }// end if tcost is less than best local cost

            }// end for n

            // update iteration count
            niter++;
        }

        void update_chunk_threaded(uint32_t sidx, uint32_t eidx){

            // init temp variable for cost
            num_t tcost = 0.0;

            // loop through and update all particles
            // and their associated local/global info
            for(uint32_t n = sidx; n < eidx; ++n){
                updateParticle(n);
                tcost = cfunc(p[n]);

                if( tcost < pb_cost[n] ){
                    pb_cost[n]  = tcost;
                    pb[n].assign(p[n].begin(),p[n].end());
                    if( tcost < gb_cost ){
                        std::lock_guard<std::mutex> guard(gb_mutex);
                        gb_cost = tcost;
                        gb.assign(p[n].begin(),p[n].end());
                    }// end if tcost is less than best global cost
                }// end if tcost is less than best local cost

            }// end for n
        }

        void update_chunk_threaded2(uint32_t sidx, uint32_t eidx, num_t & gcost, uint32_t & gcost_idx){

            // init temp variable for cost
            num_t tcost = 0.0;

            // loop through and update all particles
            // and their associated local/global info
            for(uint32_t n = sidx; n < eidx; ++n){
                updateParticle(n);
                tcost = cfunc(p[n]);

                if( tcost < pb_cost[n] ){
                    pb_cost[n]  = tcost;
                    pb[n].assign(p[n].begin(),p[n].end());
                    if( tcost < gcost ){
                        gcost = tcost;
                        gcost_idx = n;
                    }// end if tcost is less than best global cost
                }// end if tcost is less than best local cost
            }// end for n
        }

        void update_threaded(){

            // define static arrays for threads related to best cost stuff
            // with upper bound on number of threads expected to be used
            static std::array<num_t,128>    gcost;
            static std::array<uint32_t,128> gcost_idx;

            // get number of threads
            uint32_t nthreads = std::thread::hardware_concurrency();

            // initialize threads
            std::vector<std::thread> threads;

            // get the number of particles to update per thread
            uint32_t npt = nparticles / nthreads;
            uint32_t sidx = 0, eidx = 0;
            if( npt == 0 ){ npt = 1; nthreads = nparticles; }
            for(uint32_t n = 0; n < nthreads; ++n){
                gcost[n] = gb_cost;
                sidx = n*npt;
                if( n == nthreads - 1 ){    eidx = nparticles; }
                else{                       eidx = sidx + npt;}
                threads.push_back(std::thread( [&]
                                               { update_chunk_threaded2(sidx,eidx,
                                                                        gcost[n],gcost_idx[n]);
                                               }
                ));
            }

            // join each thread
            for(auto & thread: threads){ thread.join(); }

            // set the global particle value
            int bidx = -1;
            for(int n = 0; n < nthreads; ++n){
                if( gcost[n] < gb_cost ){ bidx = n;}
            }
            if( bidx >= 0 ){
                gb_cost = gcost[bidx];
                gb.assign(p[gcost_idx[bidx]].begin(),
                          p[gcost_idx[bidx]].end());
            }

            // update iteration count
            niter++;
        }


        // initialize particles using Monte Carlo sampling
        void randomInit(){

            // init variables
            num_t s = 0.0, dv = 0.0;
            gb_cost = std::numeric_limits<num_t>::max();
            p.resize(nparticles);
            pb.resize(nparticles);
            pb_cost.resize(nparticles);
            v.resize(nparticles);

            // loop through and initialize all the particles
            // and their associated local/global data
            for(int n = 0; n < nparticles; ++n){
                p[n].resize(ndim);
                pb[n].resize(ndim);
                v[n].resize(ndim);

                // construct particle values and velocities via monte carlo
                for(int d = 0; d < ndim; ++d){
                    s           = U(rng);
                    p[n][d]     = lb[d]*(1-s) + ub[d]*s;
                    pb[n][d]    = p[n][d];

                    s           = U(rng);
                    dv          = std::abs(ub[d] - lb[d]);
                    v[n][d]     = dv*(1.0 - 2*s);
                }// end for d

                // evaluate particle positions to see what the
                // local and global best particles are
                pb_cost[n] = cfunc(pb[n]);
                if( pb_cost[n] < gb_cost ){
                    gb.assign(pb[n].begin(),pb[n].end());
                    gb_cost = pb_cost[n];
                }
            }// end for i
        }

        void updateParticle(uint32_t idx){
            num_t r1 = 0.0,r2 = 0.0;
            for(int d = 0; d < ndim; ++d){
                r1          = U(rng);
                r2          = U(rng);

                // update velocity via stochastic formula
                v[idx][d]   = mc*v[idx][d]
                              + lc*r1*(pb[idx][d]   - p[idx][d])
                              + gc*r2*(gb[d]        - p[idx][d]);

                // update position via velocity
                p[idx][d]    += v[idx][d];

                // do energy loss reflection if particle leaves some hyperrectangle boundary
                if      ( p[idx][d] > ub[d] ){ p[idx][d] = ub[d]; v[idx][d] *= -0.8; }
                else if ( p[idx][d] < lb[d] ){ p[idx][d] = lb[d]; v[idx][d] *= -0.8; }
            }// end for d
        }

    };


    // define iteration progress callback for PSO
    template<template<typename> class costfunc>
    class pso_iter : public callback<pso<costfunc>> {
    public:
        typedef pso<costfunc> opt_t;

        // main callback method
        void run() {
            const opt_t* opt_ = this->getOptimizer();
            printf("Iteration(");
            text::printf_color(text::Yellow,"%6u",opt_->niter);
            printf(") | Best Cost so far is ");
            text::printf_color(text::Cyan,"%lf",static_cast<double>(opt_->gb_cost));
            printf("\n");
        }
    };


    // define some unit tests
    TEST(PSO,rosenbrock2d_serial,{
        typedef opt::pso<test::rosenbrock2d> pso_t;
        typedef typename pso_t::num_t num_t;
        std::vector<num_t> lb(test::rosenbrock2d<double>::numDims(),-5);
        std::vector<num_t> ub(test::rosenbrock2d<double>::numDims(),5);
        pso_t solver;
        solver.setMaxIterations(1000);
        solver.setNumParticles(10000);
        solver.setSeed(17);
        solver.setSearchBounds(lb,ub);
        solver.solve();

        auto soln = solver.getBestSolution();

        // return true if the solution is close to the true solution
        return (std::abs(soln[0] - 1.0) + std::abs(soln[1]-1.0)) < 1e-5;
    })


    TEST(PSO,rosenbrock2d_threaded,{
        typedef opt::pso<test::rosenbrock2d> pso_t;
        typedef typename pso_t::num_t num_t;
        std::vector<num_t> lb(test::rosenbrock2d<double>::numDims(),-5);
        std::vector<num_t> ub(test::rosenbrock2d<double>::numDims(),5);
        pso_t solver;
        solver.setMaxIterations(1000);
        solver.setNumParticles(10000);
        solver.setSeed(17);
        solver.setSearchBounds(lb,ub);
        solver.setUseThreading(true);
        solver.solve();

        auto soln = solver.getBestSolution();

        // return true if the solution is close to the true solution
        return (std::abs(soln[0] - 1.0) + std::abs(soln[1]-1.0)) < 1e-5;
    })

}

#endif //SRC_OPT_PSO_HPP

Aucun commentaire:

Enregistrer un commentaire