#include #include #include #include #include #include #include #include #include #include #include #include #include const int NUM_THREADS = 2; const int NUM_OPS_PER_TRANSACTION = 25; const int NUM_TRANSACTIONS = 100; // TODO: set user, password and DSN const char *USER = ""; const char *PASSWORD = ""; const char *DSN = ""; struct tester_work_area { int thread_num; bool success; tester_work_area(int thread_num): thread_num(thread_num) { success = false; } }; typedef std::shared_ptr tester_work_area_ptr; const int NUM_COLS = 6; typedef struct { int attr1; int attr2; int attr3; double attr4; timeval attr5; char attr5_str[40]; char attr6[257]; } row_t; void *tester(void *work_area); void generate_row(row_t *row); void to_sql_timestamp(SQL_TIMESTAMP_STRUCT *sql_timestamp, const timeval *tv); std::string error_info(SQLSMALLINT handle_type, SQLHANDLE handle); int main() { std::vector threads; pthread_t thread_id; pthread_attr_t attr; int rc; void *status; std::vector work_areas; tester_work_area_ptr wa; struct timeval start_time; struct timeval end_time; struct timeval diff; double total_time; int total_transactions = NUM_THREADS * NUM_TRANSACTIONS; double tps_overall; srand(time(NULL)); std::cout << "Number of transactions: " << NUM_TRANSACTIONS << std::endl; std::cout << "Number of operations per transaction: " << NUM_OPS_PER_TRANSACTION << std::endl; std::cout << "Number of threads: " << NUM_THREADS << std::endl; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); gettimeofday(&start_time, NULL); for (int t = 0; t < NUM_THREADS; t++) { wa.reset(new tester_work_area(t)); work_areas.push_back(wa); rc = pthread_create(&thread_id, &attr, tester, (void *)wa.get()); if (rc != 0) { std::cerr << "Unable to create thread " << t << ": pthread_create returned: " << rc << std::endl; return 1; } threads.push_back(thread_id); } pthread_attr_destroy(&attr); for (int t = 0; t < NUM_THREADS; t++) { rc = pthread_join(threads[t], &status); if (rc != 0) { std::cerr << "Unable to join thread " << t << ": pthread_join returned: " << rc << std::endl; return 1; } } gettimeofday(&end_time, NULL); for (std::vector::const_iterator it = work_areas.begin(); it != work_areas.end(); ++it) { if (!(*it)->success) { std::cerr << "Unable to compute performance results as one or " << "more threads did not complete successfully" << std::endl; return 1; } } diff.tv_sec = end_time.tv_sec - start_time.tv_sec; diff.tv_usec = end_time.tv_usec - end_time.tv_usec; while (diff.tv_usec < 0) { diff.tv_usec += 1000000; diff.tv_sec--; } while (diff.tv_usec >= 1000000) { diff.tv_usec -= 1000000; diff.tv_sec++; } total_time = (double)diff.tv_sec + (double)diff.tv_usec / 1000000.0; tps_overall = (double)total_transactions / total_time; std::cout << "Total time: " << total_time << "s" << std::endl; std::cout << "Transaction throughput: " << tps_overall << "tps" << std::endl; return 0; } #define ON_ODBC_ERROR(rc, fmt, ...) \ if (!SQL_SUCCEEDED(rc)) { \ fprintf(stderr, fmt, ##__VA_ARGS__); \ pthread_exit(NULL); \ } void *tester(void *work_area) { tester_work_area *my_work_area = (tester_work_area *)work_area; SQLHENV env_handle; SQLHDBC db_handle; SQLHSTMT stmt_handle; SQLRETURN ret; row_t row; SQL_TIMESTAMP_STRUCT attr5; // Prepare environment handle ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env_handle); ON_ODBC_ERROR(ret, "Thread %d: unable to allocate environment handle", my_work_area->thread_num); ret = SQLSetEnvAttr(env_handle, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, 0); ON_ODBC_ERROR(ret, "Thread %d: unable to set environment attributes", my_work_area->thread_num); // Prepare connection handle ret = SQLAllocHandle(SQL_HANDLE_DBC, env_handle, &db_handle); ON_ODBC_ERROR(ret, "Thread %d: unable to allocate connection handle", my_work_area->thread_num); ret = SQLSetConnectAttr(db_handle, SQL_ATTR_LOGIN_TIMEOUT, (SQLPOINTER)5, 0); ON_ODBC_ERROR(ret, "Thread %d: unable to set connection attributes", my_work_area->thread_num); ret = SQLSetConnectAttr(db_handle, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF, 0); ON_ODBC_ERROR(ret, "Thread %d: unable to set connection attributes", my_work_area->thread_num); // Connect to the database ret = SQLConnect(db_handle, (SQLCHAR *)DSN, SQL_NTS, (SQLCHAR *)USER, SQL_NTS, (SQLCHAR *)PASSWORD, SQL_NTS); ON_ODBC_ERROR(ret, "Thread %d: unable to connect to the database", my_work_area->thread_num); // Prepare the SQL statement handle ret = SQLAllocHandle(SQL_HANDLE_STMT, db_handle, &stmt_handle); ON_ODBC_ERROR(ret, "Thread %d: unable to allocate statement handle", my_work_area->thread_num); // Prepare the SQL statement ret = SQLPrepare(stmt_handle, (SQLCHAR *)"insert into test_table(attr1_int, attr2_int, " "attr3_int, attr4_double, attr5_time, attr6_str) values ( " "?, ?, ?, ?, ?, ?)", SQL_NTS); ON_ODBC_ERROR(ret, "Thread %d: unable to prepare statement", my_work_area->thread_num); // Bind the parameters ret = SQLBindParameter(stmt_handle, 1, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER, 0, 0, (SQLPOINTER)&row.attr1, 0, NULL); ON_ODBC_ERROR(ret, "Thread %d: unable to bind parameter 1", my_work_area->thread_num); ret = SQLBindParameter(stmt_handle, 2, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER, 0, 0, (SQLPOINTER)&row.attr2, 0, NULL); ON_ODBC_ERROR(ret, "Thread %d: unable to bind parameter 2", my_work_area->thread_num); ret = SQLBindParameter(stmt_handle, 3, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER, 0, 0, (SQLPOINTER)&row.attr3, 0, NULL); ON_ODBC_ERROR(ret, "Thread %d: unable to bind parameter 3", my_work_area->thread_num); ret = SQLBindParameter(stmt_handle, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, (SQLPOINTER)&row.attr4, 0, NULL); ON_ODBC_ERROR(ret, "Thread %d: unable to bind parameter 4", my_work_area->thread_num); ret = SQLBindParameter(stmt_handle, 5, SQL_PARAM_INPUT, SQL_C_TYPE_TIMESTAMP, SQL_TYPE_TIMESTAMP, 0, 0, (SQLPOINTER)&attr5, sizeof(attr5), NULL); ON_ODBC_ERROR(ret, "Thread %d: unable to bind parameter 5", my_work_area->thread_num); ret = SQLBindParameter(stmt_handle, 6, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, 0, 0, (SQLPOINTER)row.attr6, sizeof(row.attr6), NULL); ON_ODBC_ERROR(ret, "Thread %d: unable to bind parameter 6", my_work_area->thread_num); for (int i = 0; i < NUM_TRANSACTIONS; i++) { for (int j = 0; j < NUM_OPS_PER_TRANSACTION; j++) { generate_row(&row); to_sql_timestamp(&attr5, &row.attr5); ret = SQLExecute(stmt_handle); ON_ODBC_ERROR(ret, "Thread %d: unable to execute statement; ODBC " "error info:\n%s", my_work_area->thread_num, error_info(SQL_HANDLE_STMT, stmt_handle).c_str()); } ret = SQLEndTran(SQL_HANDLE_DBC, db_handle, SQL_COMMIT); ON_ODBC_ERROR(ret, "Thread %d: unable to commit transaction; ODBC " "error info:\n%s", my_work_area->thread_num, error_info(SQL_HANDLE_STMT, stmt_handle).c_str()); } my_work_area->success = true; pthread_exit(NULL); return NULL; } void generate_row(row_t *row) { std::ostringstream oss; std::size_t len; row->attr1 = rand(); row->attr2 = rand(); row->attr3 = rand(); row->attr4 = rand() / (double)RAND_MAX; gettimeofday(&row->attr5, NULL); strftime(row->attr5_str, sizeof(row->attr5_str), "%F %T", localtime(&row->attr5.tv_sec)); oss << "New row generated at time: " << row->attr5_str; len = oss.str().copy(row->attr6, sizeof(row->attr6) - 1); row->attr6[len] = '\0'; } void to_sql_timestamp(SQL_TIMESTAMP_STRUCT *sql_timestamp, const timeval *tv) { tm *time_info = localtime(&tv->tv_sec); sql_timestamp->year = time_info->tm_year + 1900; sql_timestamp->month = time_info->tm_mon + 1; sql_timestamp->day = time_info->tm_mday; sql_timestamp->hour = time_info->tm_hour; sql_timestamp->minute = time_info->tm_min; sql_timestamp->second = time_info->tm_sec; sql_timestamp->fraction = tv->tv_usec * 1000; } std::string error_info(SQLSMALLINT handle_type, SQLHANDLE handle) { std::ostringstream ss; SQLCHAR sql_state[6]; SQLINTEGER native_error; SQLCHAR msg[SQL_MAX_MESSAGE_LENGTH]; SQLSMALLINT msg_len; int i = 1; int ret; ret = SQLGetDiagRec(handle_type, handle, i, sql_state, &native_error, msg, sizeof(msg), &msg_len); if (!(SQL_SUCCEEDED(ret))) { return std::string("unable to retrieve ODBC error information"); } do { ss << "Status record: " << i << std::endl; ss << "SQL state: " << sql_state << std::endl; ss << "Native error: " << native_error << std::endl; ss << "Message: " << msg << std::endl; i++; ret = SQLGetDiagRec(handle_type, handle, i, sql_state, &native_error, msg, sizeof(msg), &msg_len); } while (SQL_SUCCEEDED(ret)); return ss.str(); }