I'm trying to add concurrency to my code, but keep getting a 'debug' error whenever I'm running it with more than one thread. I've looked around on SO and google for some sort of solution, but to no avail.
Error message:
My initial thought was that I might need a mutex to lock the read/write permissions of my vector, but since I'm iterating through an array of threads, their access to the vector should already be restricted.
Basically I need to iterate through a vector> and send each vector to a database (using ODBC). Running it on the main thread works fine.
Can someone give me an idea of what I'm doing wrong and how to correct it?
Here is the code: (Please excuse any uglyness to the code..)
For clarity, the imporant parts of the code are in the constructor and in assignThread
#include "stdafx.h"
#include "database_con.h"
////////////////////////////////////////////////////////////////////////
// Show errors from the SQLHANDLE
void database_con::show_error(unsigned int handletype, const SQLHANDLE& handle)
{
SQLWCHAR sqlstate[1024];
SQLWCHAR message[1024];
if (SQL_SUCCESS == SQLGetDiagRec(handletype, handle, 1, sqlstate, NULL, message, 1024, NULL))
wcout << "Message: " << message << "\nSQLSTATE: " << sqlstate << endl;
}
std::wstring database_con::StringToWString(const std::string& s)
{
std::wstring temp(s.length(), L' ');
std::copy(s.begin(), s.end(), temp.begin());
return temp;
}
////////////////////////////////////////////////////////////////////////
// Builds the stored procedure query.
std::wstring database_con::buildQuery(vector<std::wstring> input, string symbol)
{
std::wstringstream builder;
builder << L"EXEC sp_addHistorical " << "@Symbol='" << L"" << StringToWString(symbol) << "'," <<
"@Date='" << (wstring)L"" << input.at(0) << "'," <<
"@Open=" << (wstring)L"" << input.at(1) << "," <<
"@Close=" << (wstring)L"" << input.at(2) << "," <<
"@MaxPrice=" << (wstring)L"" << input.at(3) << "," <<
"@MinPrice=" << (wstring)L"" << input.at(4) << "," <<
"@Volume=" << (wstring)L"" << input.at(5) << ";";
return builder.str();
}
void database_con::executeQuery(wstring query) {
if (SQL_SUCCESS != SQLExecDirectW(stmt, const_cast<SQLWCHAR*>(query.c_str()), SQL_NTS)) {
std::cout << "Execute error " << std::endl;
show_error(SQL_HANDLE_STMT, stmt);
std::wcout << L"Unsuccessful Query: " << query << std::endl;
}
// Close Cursor before next iteration starts:
SQLRETURN closeCursRet = SQLFreeStmt(stmt, SQL_CLOSE);
if (!SQL_SUCCEEDED(closeCursRet))
{
show_error(SQL_HANDLE_STMT, stmt);
// maybe add some handling for the case that closing failed.
}
}
void database_con::assignThread(std::vector<vector<std::wstring>> historical, std::string symbol) {
int nThreads = 5;
std::thread threads[5];
if (historical.size() < 5) {
threads[historical.size()];
nThreads = historical.size();
}
for (int i = 0; i < nThreads; i++) {
threads[i] = std::thread(&database_con::executeQuery, this, buildQuery(historical.at(historical.size()), symbol));
historical.pop_back();
}
for (auto& th : threads) th.join();
}
////////////////////////////////////////////////////////////////////////
// Constructs a database connector object with the historical data and its symbol
database_con::database_con(std::vector<std::vector<std::wstring>> historical, string symbol){
/*
Set up the handlers
*/
/* Allocate an environment handle */
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
/* We want ODBC 3 support */
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (void *)SQL_OV_ODBC3, 0);
/* Allocate a connection handle */
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
/* Connect to the DSN */
SQLDriverConnectW(dbc, NULL, L"DRIVER={SQL Server};SERVER=DESKTOP-L5OT4OH\\SQLEXPRESS;DATABASE=stocks;UID=geo;PWD=geostocks;", SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);
/* Check for success */
if (SQL_SUCCESS != SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt))
{
show_error(SQL_HANDLE_DBC, dbc);
std::cout << "Failed to connect";
}
std::cout << "Building and executing the query" << std::endl;
while (!historical.empty()) {
assignThread(historical, symbol);
}
/*for (_mVecHistIter = historical.begin();
_mVecHistIter != historical.end();
_mVecHistIter++) {
assignThread(*_mVecHistIter, symbol);
//std::thread t(&database_con::executeQuery, this, buildQuery(*_mVecHistIter, symbol));
//std::thread t2(&database_con::executeQuery, this, buildQuery(historical.at(_mVecHistIter - historical.begin()+1), symbol));
std::thread t3(&database_con::executeQuery, this, buildQuery(*_mVecHistIter, symbol));
std::thread t4(&database_con::executeQuery, this, buildQuery(*_mVecHistIter, symbol));
std::thread t5(&database_con::executeQuery, this, buildQuery(*_mVecHistIter, symbol));
*/
//t.join();
//t2.join();
/*
t3.join();
t4.join();
t5.join();
*/
//executeQuery(buildQuery(*_mVecHistIter, symbol));
/*_mSymbol = symbol;
std::wstringstream stream(StringToWString(historical));
std::wstring line;
int row = 0;
while (std::getline(stream, line)) {
if (row > 0) {
vector<wstring> vHistorical = parseData(L"" + line, ',');
std::wstring SQL = buildQuery(vHistorical, _mSymbol);
if (SQL_SUCCESS != SQLExecDirectW(stmt, const_cast<SQLWCHAR*>(SQL.c_str()), SQL_NTS)) {
std::cout << "Execute error " << std::endl;
show_error(SQL_HANDLE_STMT, stmt);
std::wcout << L"Unsuccessful Query: " << SQL << std::endl;
}
// Close Cursor before next iteration starts:
SQLRETURN closeCursRet = SQLFreeStmt(stmt, SQL_CLOSE);
if (!SQL_SUCCEEDED(closeCursRet))
{
show_error(SQL_HANDLE_STMT, stmt);
// maybe add some handling for the case that closing failed.
}
}
row++;
}*/
std::cout << "Query " << _mSymbol << " ready" << std::endl;
}
database_con::~database_con() {
std::cout << "The database object has been deleted" << std::endl;
}
Here is the stored procedure that is used:
USE [stocks]
GO
/****** Object: StoredProcedure [dbo].[sp_addHistorical] Script Date: 02/04/2016 15:04:50 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
ALTER PROCEDURE [dbo].[sp_addHistorical]
@Symbol nchar(10),@Date datetime,
@Open decimal(8,2),@Close decimal(8,2),@MinPrice decimal(8,2),
@MaxPrice decimal(8,2),@Volume int
AS
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED
BEGIN TRANSACTION
MERGE HistoricalStock WITH (UPDLOCK) AS myTarget
USING (SELECT @Symbol AS Symbol,
@Date AS Date, @Open AS [Open], @Close AS [Close],
@MinPrice AS MinPrice, @MaxPrice AS MaxPrice,@Volume AS Volume) AS mySource
ON mySource.Symbol = myTarget.Symbol AND mySource.Date = myTarget.Date
WHEN MATCHED
THEN UPDATE
SET [Open] = mySource.[Open], [Close] = mySource.[Close],
MinPrice = mySource.MinPrice, MaxPrice = mySource.MaxPrice, Volume = mySource.Volume
WHEN NOT MATCHED
THEN
INSERT(Symbol,Date,[Open],[Close],MinPrice,MaxPrice,Volume)
VALUES(@Symbol,@Date,@Open,@Close,@MinPrice,@MaxPrice,@Volume);
COMMIT
Any help would be greatly appriciated! :)
Aucun commentaire:
Enregistrer un commentaire