mercredi 29 mars 2017

Using Wt::Dbo causes a MySQL prepared statement leak

I have a project that uses Wt::Dbo for object-relational database management with MySQL. For about a week I have noticed a leak of database prepared statements. Previously this project used SQLite.

I tried adding flush() calls in various places without success, and I can't figure out exactly what is causing this leak. What is certain is that once the number of prepared statements grows past a certain point, MySQL stops answering.

Here is how I monitor the leaked prepared statements:

$ mysql -uroot -p -e "SHOW SESSION STATUS LIKE '%prepare%';" | grep stmt_count
Enter password: 
Prepared_stmt_count 260

The leaked statements are cleared when the program is restarted.

All database operations are centralized inside a class called DataBase. Here are some of the functions that are known to leak:

DataBase::initialize()

/**
 * One-time setup of the two Wt::Dbo sessions used by DataBase.
 *
 * The work is done only on the first call (guarded by m_bInitialized while
 * m_oMutex is held):
 *  - m_oSessionConfigurations is connected to the SQLite file
 *    "databases/configuration.db" and maps InformationSite;
 *  - m_oSession is connected to SQLite or MySQL depending on the
 *    DATABASE_TYPE_* macro and maps RFNode and NodeMeasure.
 *
 * Failures are logged through ajouterLog() and not rethrown, except that a
 * failure to open the MySQL connection terminates the process.
 */
void DataBase::initialize(void)
{
    m_oMutex.lock();

    if(!m_bInitialized)
    {
        // Raised before any work is done: a failed attempt is logged but not retried by later calls
        m_bInitialized = true;

        try
        {
            m_oSessionConfigurations.setConnection(*new Wt::Dbo::backend::Sqlite3("databases/configuration.db"));
            m_oSessionConfigurations.mapClass<InformationSite>("informationsite");

            m_oSessionConfigurations.createTables();
        }
        catch(const std::exception &e)
        {
            ajouterLog("error",std::string("DataBase::initialize() : ") + e.what());
        }

        try
        {
            #if defined(DATABASE_TYPE_SQLITE)
                Wt::Dbo::backend::Sqlite3 *pBackend = new Wt::Dbo::backend::Sqlite3("databases/dataBase.db");
            #elif defined(DATABASE_TYPE_MYSQL)
                Wt::Dbo::backend::MySQL *pBackend = nullptr;

                try
                {
                    pBackend = new Wt::Dbo::backend::MySQL(DATABASE_NAME,DATABASE_USERNAME,DATABASE_PASSWORD,"localhost");
                }
                catch(const Wt::Dbo::Exception &e)
                {
                    ajouterLog("error",std::string("DataBase::initialize() : ") + e.what());

                    // If MySQL is not available, this cause issue to program until restart.
                    exit(1);
                }
            #endif

            pBackend->setProperty("show-queries","true");

            m_oSession.setConnection(*pBackend);

            m_oSession.setFlushMode(Wt::Dbo::FlushMode::Auto);

            m_oSession.mapClass<RFNode>("rfnode");
            m_oSession.mapClass<NodeMeasure>("nodemeasure");

            // Override the default InnoDB from Wt, MyISAM is easier to repair in case of hardware failure with database corruption
            #if defined(DATABASE_TYPE_MYSQL)
                try
                {
                    Wt::Dbo::Transaction oTransaction(m_oSession);

                    m_oSession.execute("SET default_storage_engine=MYISAM;");

                    oTransaction.commit();
                }
                catch(const Wt::Dbo::Exception &e)
                {
                    // Best effort: table creation below still runs with the server's default engine
                    ajouterLog("error",std::string("DataBase::initialize() : ") + e.what());
                }
            #endif

            m_oSession.createTables();
        }
        // Same handler type as the configuration block above, so that no standard
        // exception can leave this function while m_oMutex is still locked.
        catch(const std::exception &e)
        {
            ajouterLog("error",std::string("DataBase::initialize() : ") + e.what());
        }
    }

    m_oMutex.unlock();
}

DataBase::addNodeMeasure()

/**
 * Stores one measure through the shared session, inside its own transaction.
 *
 * Errors are logged through ajouterLog() and not propagated to the caller.
 * The prepared statement count is printed once the mutex has been released.
 *
 * NOTE(review): Session::add() presumably takes ownership of p_pItem - confirm
 * against the Wt version in use.
 *
 * @param p_pItem measure to persist
 */
void DataBase::addNodeMeasure(NodeMeasure *p_pItem)
{
    m_oMutex.lock();

    try
    {
        Wt::Dbo::Transaction oInsertTransaction(m_oSession);

        // Keep the persistent handle alive until the commit is done
        Wt::Dbo::ptr<NodeMeasure> oStoredMeasure = m_oSession.add(p_pItem);
        oStoredMeasure.flush();

        oInsertTransaction.commit();
    }
    catch(const std::exception &oError)
    {
        std::string sMessage("Exception DataBase::addNodeMeasure() : ");
        sMessage += oError.what();

        ajouterLog("error",sMessage);
    }

    m_oMutex.unlock();

    printPreparedStatementCount("DataBase::addNodeMeasure()");
}

DataBase::updateNode()

/**
 * Copies the fields of p_pItem onto the stored RFNode that has the same mac.
 *
 * p_pItem is only read. When no stored node matches, nothing is written and an
 * error is logged. Any exception raised by the database layer is logged through
 * ajouterLog() and not propagated. The prepared statement count is printed
 * before taking and after releasing the mutex.
 *
 * @param p_pItem source of the new field values; its mac selects the row
 */
void DataBase::updateNode(RFNode *p_pItem)
{
    printPreparedStatementCount("DataBase::updateNode() Before");

    m_oMutex.lock();

    try
    {
        Wt::Dbo::Transaction oTransaction(m_oSession);

        Wt::Dbo::ptr<RFNode> oItem = m_oSession.find<RFNode>().where("mac = ?").bind(p_pItem->mac);

        if(oItem)
        {
            oItem.modify()->zone                    = p_pItem->zone;
            oItem.modify()->subZone                 = p_pItem->subZone;
            oItem.modify()->unit                    = p_pItem->unit;
            oItem.modify()->pwm                     = p_pItem->pwm;
            oItem.modify()->led                     = p_pItem->led;
            oItem.modify()->network                 = p_pItem->network;
            oItem.modify()->lastContact             = p_pItem->lastContact;
            oItem.modify()->ioConfiguration         = p_pItem->ioConfiguration;
            oItem.modify()->networkAddress          = p_pItem->networkAddress;
            oItem.modify()->type                    = p_pItem->type;
            oItem.modify()->functionality           = p_pItem->functionality;
            oItem.modify()->transmitPowerLevel      = p_pItem->transmitPowerLevel;
            oItem.modify()->lastNetworkRoute        = p_pItem->lastNetworkRoute;
            oItem.modify()->lastNetworkJumpsCount   = p_pItem->lastNetworkJumpsCount;
            oItem.modify()->lastRequestDuration     = p_pItem->lastRequestDuration;
            oItem.modify()->hardwareVersion         = p_pItem->hardwareVersion;
            oItem.modify()->softwareVersion         = p_pItem->softwareVersion;

            oItem.flush();
        }
        else
        {
            // The lookup returned a null ptr: report it explicitly instead of dereferencing it
            ajouterLog("error","DataBase::updateNode() : no stored node matches the given mac, nothing updated");
        }

        oTransaction.commit();
    }
    catch(const std::exception &e)
    {
        ajouterLog("error",std::string("Exception DataBase::updateNode() : ") + e.what());
    }

    m_oMutex.unlock();

    printPreparedStatementCount("DataBase::updateNode() After");
}

DataBase::getNodeMeasures()

std::vector<NodeMeasure> DataBase::getNodeMeasures(std::string p_sMAC, int p_nType, Wt::WDateTime p_oStartDate, Wt::WDateTime p_oEndDate, std::string p_sOrder, int p_nLimit)
{
    std::vector<NodeMeasure> lNodeMeasures;

    m_oMutex.lock();

    try
    {
        Wt::Dbo::Transaction oTransaction(m_oSession);

        std::string sWhereClause = "", sOrderClause = "";

        if(!p_sMAC.empty())
        {
            if(!sWhereClause.empty())
            {
                sWhereClause += " AND ";
            }

            sWhereClause += "mac = '" + p_sMAC + "'";
        }

        if(p_nType != -1)
        {
            if(!sWhereClause.empty())
            {
                sWhereClause += " AND ";
            }

            sWhereClause += "type = " + std::to_string(p_nType);
        }

        if(p_oStartDate.isValid())
        {
            if(!sWhereClause.empty())
            {
                sWhereClause += " AND ";
            }

            // When not using type, we usually want nodes measures (not external temperature), so we want to find them using batchDate instead of date
            sWhereClause += (p_nType != -1 ? "date" : "batchDate");
            sWhereClause += " >= '";
            sWhereClause += p_oStartDate.toString("yyyy-MM-ddTHH:mm:ss").toUTF8();
            sWhereClause += "'";
        }

        if(p_oEndDate.isValid())
        {
            if(!sWhereClause.empty())
            {
                sWhereClause += " AND ";
            }

            // When not using type, we usually want nodes measures (not external temperature), so we want to find them using batchDate instead of date
            sWhereClause += (p_nType != -1 ? "date" : "batchDate");
            sWhereClause += " <= '";
            // Add one second because SQLite have microseconds, and we must include results no matter microseconds field
            sWhereClause += p_oEndDate.addSecs(1).toString("yyyy-MM-ddTHH:mm:ss").toUTF8();
            sWhereClause += "'";
        }

        if(!p_sOrder.empty())
        {
            sOrderClause = " ORDER BY " + p_sOrder;
        }

        std::string sQuery = "";

        if(!sWhereClause.empty())
        {
            sQuery += " WHERE ";
            sQuery += sWhereClause;
        }

        if(!sOrderClause.empty())
        {
            sQuery += sOrderClause;
        }

        //std::cout << "**************************************************************************" << std::endl;
        //std::cout << sQuery << std::endl;
        //Wt::WDateTime oStart = Wt::WDateTime::currentDateTime();

        if(Configuration::getParameter(Configuration::PARAMETER_DEBUG).getBooleanValue())
        {
            ajouterLog("debug","DataBase::getNodeMeasures() " + sQuery);
        }

        // TEST : find vs query
        Wt::Dbo::collection<Wt::Dbo::ptr<NodeMeasure>> lMeasures = m_oSession.find<NodeMeasure>(sQuery).limit(p_nLimit).resultList();

        // TODO : Get it cleaner... can't use Wt::Dbo::ptr outside transaction.
        for(Wt::Dbo::collection<Wt::Dbo::ptr<NodeMeasure>>::const_iterator pMeasure = lMeasures.begin();pMeasure != lMeasures.end();pMeasure++)
        {
            lNodeMeasures.push_back(
                    NodeMeasure(
                            (*pMeasure)->mac,
                            (*pMeasure)->type,
                            (*pMeasure)->date,
                            (*pMeasure)->batchDate,
                            (*pMeasure)->value
                    )
            );

            (*pMeasure).flush();
        }

        //lNodeMeasures = m_oSession.find<NodeMeasure>(sQuery).limit(p_nLimit).resultList();

        //std::cout << "Result : " << lNodeMeasures.size() << " in " << oStart.secsTo(Wt::WDateTime::currentDateTime()) << "s" << std::endl;
        //std::cout << "**************************************************************************" << std::endl;

        oTransaction.commit();
    }
    catch(std::exception &e)
    {
        ajouterLog("error",std::string("Exception DataBase::getNodeMeasures() : ") + e.what());
    }

    m_oMutex.unlock();

    printPreparedStatementCount("DataBase::getNodeMeasures()");

    return lNodeMeasures;
}

Aucun commentaire:

Enregistrer un commentaire