1 /* 2 ** 2014-12-11 3 ** 4 ** The author disclaims copyright to this source code. In place of 5 ** a legal notice, here is a blessing: 6 ** 7 ** May you do good and not evil. 8 ** May you find forgiveness for yourself and forgive others. 9 ** May you share freely, never taking more than you give. 10 ** 11 ************************************************************************* 12 ** This file implements a simple standalone program used to stress the 13 ** SQLite library when accessing the same set of databases simultaneously 14 ** from multiple threads in shared-cache mode. 15 ** 16 ** This test program runs on unix-like systems only. It uses pthreads. 17 ** To compile: 18 ** 19 ** gcc -o tt4 -I. threadtest4.c sqlite3.c -ldl -lpthread 20 ** 21 ** To run: 22 ** 23 ** ./tt4 10 24 ** 25 ** The argument is the number of threads. 26 */ 27 #include "sqlite3.h" 28 #include <pthread.h> 29 #include <sched.h> 30 #include <stdio.h> 31 #include <stdlib.h> 32 #include <string.h> 33 #include <unistd.h> 34 #include <stdarg.h> 35 36 /* 37 ** An instance of the following structure is passed into each worker 38 ** thread. 39 */ 40 typedef struct WorkerInfo WorkerInfo; 41 struct WorkerInfo { 42 int tid; /* Thread ID */ 43 unsigned wkrFlags; /* Flags */ 44 sqlite3 *mainDb; /* Database connection of the main thread */ 45 sqlite3 *db; /* Database connection of this thread */ 46 int nErr; /* Number of errors seen by this thread */ 47 int nTest; /* Number of tests run by this thread */ 48 char *zMsg; /* Message returned by this thread */ 49 pthread_t id; /* Thread id */ 50 pthread_mutex_t *pWrMutex; /* Hold this mutex while writing */ 51 }; 52 53 /* 54 ** Allowed values for WorkerInfo.wkrFlags 55 */ 56 #define TT4_SERIALIZED 0x0000001 /* The --serialized option is used */ 57 #define TT4_WAL 0x0000002 /* WAL mode in use */ 58 #define TT4_TRACE 0x0000004 /* Trace activity */ 59 60 61 /* 62 ** Report an OOM error and die if the argument is NULL 63 */ 64 static void check_oom(void *x){ 65 if( x==0 ){ 66 fprintf(stderr, "out of memory\n"); 67 exit(1); 68 } 69 } 70 71 /* 72 ** Allocate memory. If the allocation fails, print an error message and 73 ** kill the process. 74 */ 75 static void *safe_malloc(int sz){ 76 void *x = sqlite3_malloc(sz>0?sz:1); 77 check_oom(x); 78 return x; 79 } 80 81 /* 82 ** Print a trace message for a worker 83 */ 84 static void worker_trace(WorkerInfo *p, const char *zFormat, ...){ 85 va_list ap; 86 char *zMsg; 87 if( (p->wkrFlags & TT4_TRACE)==0 ) return; 88 va_start(ap, zFormat); 89 zMsg = sqlite3_vmprintf(zFormat, ap); 90 check_oom(zMsg); 91 va_end(ap); 92 fprintf(stderr, "TRACE(%02d): %s\n", p->tid, zMsg); 93 sqlite3_free(zMsg); 94 } 95 96 /* 97 ** Prepare a single SQL query 98 */ 99 static sqlite3_stmt *prep_sql(sqlite3 *db, const char *zFormat, ...){ 100 va_list ap; 101 char *zSql; 102 int rc; 103 sqlite3_stmt *pStmt = 0; 104 105 va_start(ap, zFormat); 106 zSql = sqlite3_vmprintf(zFormat, ap); 107 va_end(ap); 108 check_oom(zSql); 109 rc = sqlite3_prepare_v2(db, zSql, -1, &pStmt, 0); 110 if( rc!=SQLITE_OK ){ 111 fprintf(stderr, "SQL error (%d,%d): %s\nWhile preparing: [%s]\n", 112 rc, sqlite3_extended_errcode(db), sqlite3_errmsg(db), zSql); 113 exit(1); 114 } 115 sqlite3_free(zSql); 116 return pStmt; 117 } 118 119 /* 120 ** Run a SQL statements. Panic if unable. 121 */ 122 static void run_sql(WorkerInfo *p, const char *zFormat, ...){ 123 va_list ap; 124 char *zSql; 125 int rc; 126 sqlite3_stmt *pStmt = 0; 127 int nRetry = 0; 128 129 va_start(ap, zFormat); 130 zSql = sqlite3_vmprintf(zFormat, ap); 131 va_end(ap); 132 check_oom(zSql); 133 rc = sqlite3_prepare_v2(p->db, zSql, -1, &pStmt, 0); 134 if( rc!=SQLITE_OK ){ 135 fprintf(stderr, "SQL error (%d,%d): %s\nWhile preparing: [%s]\n", 136 rc, sqlite3_extended_errcode(p->db), sqlite3_errmsg(p->db), zSql); 137 exit(1); 138 } 139 worker_trace(p, "running [%s]", zSql); 140 while( (rc = sqlite3_step(pStmt))!=SQLITE_DONE ){ 141 if( (rc&0xff)==SQLITE_BUSY || (rc&0xff)==SQLITE_LOCKED ){ 142 sqlite3_reset(pStmt); 143 nRetry++; 144 if( nRetry<10 ){ 145 worker_trace(p, "retry %d for [%s]", nRetry, zSql); 146 sched_yield(); 147 continue; 148 }else{ 149 fprintf(stderr, "Deadlock in thread %d while running [%s]\n", 150 p->tid, zSql); 151 exit(1); 152 } 153 } 154 if( rc!=SQLITE_ROW ){ 155 fprintf(stderr, "SQL error (%d,%d): %s\nWhile running [%s]\n", 156 rc, sqlite3_extended_errcode(p->db), sqlite3_errmsg(p->db), zSql); 157 exit(1); 158 } 159 } 160 sqlite3_free(zSql); 161 sqlite3_finalize(pStmt); 162 } 163 164 165 /* 166 ** Open the database connection for WorkerInfo. The order in which 167 ** the files are opened is a function of the tid value. 168 */ 169 static void worker_open_connection(WorkerInfo *p, int iCnt){ 170 char *zFile; 171 int x; 172 int rc; 173 static const unsigned char aOrder[6][3] = { 174 { 1, 2, 3}, 175 { 1, 3, 2}, 176 { 2, 1, 3}, 177 { 2, 3, 1}, 178 { 3, 1, 2}, 179 { 3, 2, 1} 180 }; 181 x = (p->tid + iCnt) % 6; 182 zFile = sqlite3_mprintf("tt4-test%d.db", aOrder[x][0]); 183 check_oom(zFile); 184 worker_trace(p, "open %s", zFile); 185 rc = sqlite3_open_v2(zFile, &p->db, 186 SQLITE_OPEN_READWRITE|SQLITE_OPEN_SHAREDCACHE, 0); 187 if( rc!=SQLITE_OK ){ 188 fprintf(stderr, "sqlite_open_v2(%s) failed on thread %d\n", 189 zFile, p->tid); 190 exit(1); 191 } 192 sqlite3_free(zFile); 193 run_sql(p, "PRAGMA read_uncommitted=ON;"); 194 sqlite3_busy_timeout(p->db, 10000); 195 run_sql(p, "PRAGMA synchronous=OFF;"); 196 run_sql(p, "ATTACH 'tt4-test%d.db' AS aux1", aOrder[x][1]); 197 run_sql(p, "ATTACH 'tt4-test%d.db' AS aux2", aOrder[x][2]); 198 } 199 200 /* 201 ** Close the worker database connection 202 */ 203 static void worker_close_connection(WorkerInfo *p){ 204 if( p->db ){ 205 worker_trace(p, "close"); 206 sqlite3_close(p->db); 207 p->db = 0; 208 } 209 } 210 211 /* 212 ** Delete all content in the three databases associated with a 213 ** single thread. Make this happen all in a single transaction if 214 ** inTrans is true, or separately for each database if inTrans is 215 ** false. 216 */ 217 static void worker_delete_all_content(WorkerInfo *p, int inTrans){ 218 if( inTrans ){ 219 pthread_mutex_lock(p->pWrMutex); 220 run_sql(p, "BEGIN"); 221 run_sql(p, "DELETE FROM t1 WHERE tid=%d", p->tid); 222 run_sql(p, "DELETE FROM t2 WHERE tid=%d", p->tid); 223 run_sql(p, "DELETE FROM t3 WHERE tid=%d", p->tid); 224 run_sql(p, "COMMIT"); 225 pthread_mutex_unlock(p->pWrMutex); 226 p->nTest++; 227 }else{ 228 pthread_mutex_lock(p->pWrMutex); 229 run_sql(p, "DELETE FROM t1 WHERE tid=%d", p->tid); 230 pthread_mutex_unlock(p->pWrMutex); 231 p->nTest++; 232 pthread_mutex_lock(p->pWrMutex); 233 run_sql(p, "DELETE FROM t2 WHERE tid=%d", p->tid); 234 pthread_mutex_unlock(p->pWrMutex); 235 p->nTest++; 236 pthread_mutex_lock(p->pWrMutex); 237 run_sql(p, "DELETE FROM t3 WHERE tid=%d", p->tid); 238 pthread_mutex_unlock(p->pWrMutex); 239 p->nTest++; 240 } 241 } 242 243 /* 244 ** Create rows mn through mx in table iTab for the given worker 245 */ 246 static void worker_add_content(WorkerInfo *p, int mn, int mx, int iTab){ 247 char *zTabDef; 248 switch( iTab ){ 249 case 1: zTabDef = "t1(tid,sp,a,b,c)"; break; 250 case 2: zTabDef = "t2(tid,sp,d,e,f)"; break; 251 case 3: zTabDef = "t3(tid,sp,x,y,z)"; break; 252 } 253 pthread_mutex_lock(p->pWrMutex); 254 run_sql(p, 255 "WITH RECURSIVE\n" 256 " c(i) AS (VALUES(%d) UNION ALL SELECT i+1 FROM c WHERE i<%d)\n" 257 "INSERT INTO %s SELECT %d, zeroblob(3000), i, printf('%%d',i), i FROM c;", 258 mn, mx, zTabDef, p->tid 259 ); 260 pthread_mutex_unlock(p->pWrMutex); 261 p->nTest++; 262 } 263 264 /* 265 ** Set an error message on a worker 266 */ 267 static void worker_error(WorkerInfo *p, const char *zFormat, ...){ 268 va_list ap; 269 p->nErr++; 270 sqlite3_free(p->zMsg); 271 va_start(ap, zFormat); 272 p->zMsg = sqlite3_vmprintf(zFormat, ap); 273 va_end(ap); 274 } 275 276 /* 277 ** Each thread runs the following function. 278 */ 279 static void *worker_thread(void *pArg){ 280 WorkerInfo *p = (WorkerInfo*)pArg; 281 int iOuter; 282 int i; 283 int rc; 284 sqlite3_stmt *pStmt; 285 286 printf("worker %d startup\n", p->tid); fflush(stdout); 287 for(iOuter=1; iOuter<=4; iOuter++){ 288 worker_open_connection(p, iOuter); 289 for(i=0; i<4; i++){ 290 worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter)%3 + 1); 291 worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter+1)%3 + 1); 292 worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter+2)%3 + 1); 293 } 294 295 pStmt = prep_sql(p->db, "SELECT count(a) FROM t1 WHERE tid=%d", p->tid); 296 worker_trace(p, "query [%s]", sqlite3_sql(pStmt)); 297 rc = sqlite3_step(pStmt); 298 if( rc!=SQLITE_ROW ){ 299 worker_error(p, "Failed to step: %s", sqlite3_sql(pStmt)); 300 }else if( sqlite3_column_int(pStmt, 0)!=400 ){ 301 worker_error(p, "Wrong result: %d", sqlite3_column_int(pStmt,0)); 302 } 303 if( p->nErr ) break; 304 sqlite3_finalize(pStmt); 305 306 worker_delete_all_content(p, (p->tid+iOuter)%2); 307 worker_close_connection(p); 308 p->db = 0; 309 } 310 worker_close_connection(p); 311 printf("worker %d finished\n", p->tid); fflush(stdout); 312 return 0; 313 } 314 315 int main(int argc, char **argv){ 316 int nWorker = 0; /* Number of worker threads */ 317 int i; /* Loop counter */ 318 WorkerInfo *aInfo; /* Information for each worker */ 319 unsigned wkrFlags = 0; /* Default worker flags */ 320 int nErr = 0; /* Number of errors */ 321 int nTest = 0; /* Number of tests */ 322 int rc; /* Return code */ 323 sqlite3 *db = 0; /* Main database connection */ 324 pthread_mutex_t wrMutex; /* The write serialization mutex */ 325 WorkerInfo infoTop; /* WorkerInfo for the main thread */ 326 WorkerInfo *p; /* Pointer to infoTop */ 327 328 sqlite3_config(SQLITE_CONFIG_MULTITHREAD); 329 for(i=1; i<argc; i++){ 330 const char *z = argv[i]; 331 if( z[0]=='-' ){ 332 if( z[1]=='-' && z[2]!=0 ) z++; 333 if( strcmp(z,"-multithread")==0 ){ 334 sqlite3_config(SQLITE_CONFIG_MULTITHREAD); 335 wkrFlags &= ~TT4_SERIALIZED; 336 }else if( strcmp(z,"-serialized")==0 ){ 337 sqlite3_config(SQLITE_CONFIG_SERIALIZED); 338 wkrFlags |= TT4_SERIALIZED; 339 }else if( strcmp(z,"-wal")==0 ){ 340 wkrFlags |= TT4_WAL; 341 }else if( strcmp(z,"-trace")==0 ){ 342 wkrFlags |= TT4_TRACE; 343 }else{ 344 fprintf(stderr, "unknown command-line option: %s\n", argv[i]); 345 exit(1); 346 } 347 }else if( z[0]>='1' && z[0]<='9' && nWorker==0 ){ 348 nWorker = atoi(z); 349 if( nWorker<2 ){ 350 fprintf(stderr, "minimum of 2 threads\n"); 351 exit(1); 352 } 353 }else{ 354 fprintf(stderr, "extra command-line argument: \"%s\"\n", argv[i]); 355 exit(1); 356 } 357 } 358 if( nWorker==0 ){ 359 fprintf(stderr, 360 "usage: %s ?OPTIONS? N\n" 361 "N is the number of threads and must be at least 2.\n" 362 "Options:\n" 363 " --serialized\n" 364 " --multithread\n" 365 ,argv[0] 366 ); 367 exit(1); 368 } 369 if( !sqlite3_threadsafe() ){ 370 fprintf(stderr, "requires a threadsafe build of SQLite\n"); 371 exit(1); 372 } 373 sqlite3_initialize(); 374 sqlite3_enable_shared_cache(1); 375 pthread_mutex_init(&wrMutex, 0); 376 377 /* Initialize the test database files */ 378 (void)unlink("tt4-test1.db"); 379 (void)unlink("tt4-test2.db"); 380 (void)unlink("tt4-test3.db"); 381 rc = sqlite3_open("tt4-test1.db", &db); 382 if( rc!=SQLITE_OK ){ 383 fprintf(stderr, "Unable to open test database: tt4-test2.db\n"); 384 exit(1); 385 } 386 memset(&infoTop, 0, sizeof(infoTop)); 387 infoTop.db = db; 388 infoTop.wkrFlags = wkrFlags; 389 p = &infoTop; 390 if( wkrFlags & TT4_WAL ){ 391 run_sql(p, "PRAGMA journal_mode=WAL"); 392 } 393 run_sql(p, "PRAGMA synchronous=OFF"); 394 run_sql(p, "CREATE TABLE IF NOT EXISTS t1(tid INTEGER, sp, a, b, c)"); 395 run_sql(p, "CREATE INDEX t1tid ON t1(tid)"); 396 run_sql(p, "CREATE INDEX t1ab ON t1(a,b)"); 397 run_sql(p, "ATTACH 'tt4-test2.db' AS 'test2'"); 398 run_sql(p, "CREATE TABLE IF NOT EXISTS test2.t2(tid INTEGER, sp, d, e, f)"); 399 run_sql(p, "CREATE INDEX test2.t2tid ON t2(tid)"); 400 run_sql(p, "CREATE INDEX test2.t2de ON t2(d,e)"); 401 run_sql(p, "ATTACH 'tt4-test3.db' AS 'test3'"); 402 run_sql(p, "CREATE TABLE IF NOT EXISTS test3.t3(tid INTEGER, sp, x, y, z)"); 403 run_sql(p, "CREATE INDEX test3.t3tid ON t3(tid)"); 404 run_sql(p, "CREATE INDEX test3.t3xy ON t3(x,y)"); 405 aInfo = safe_malloc( sizeof(*aInfo)*nWorker ); 406 memset(aInfo, 0, sizeof(*aInfo)*nWorker); 407 for(i=0; i<nWorker; i++){ 408 aInfo[i].tid = i+1; 409 aInfo[i].wkrFlags = wkrFlags; 410 aInfo[i].mainDb = db; 411 aInfo[i].pWrMutex = &wrMutex; 412 rc = pthread_create(&aInfo[i].id, 0, worker_thread, &aInfo[i]); 413 if( rc!=0 ){ 414 fprintf(stderr, "thread creation failed for thread %d\n", i+1); 415 exit(1); 416 } 417 sched_yield(); 418 } 419 for(i=0; i<nWorker; i++){ 420 pthread_join(aInfo[i].id, 0); 421 printf("Joined thread %d: %d errors in %d tests", 422 aInfo[i].tid, aInfo[i].nErr, aInfo[i].nTest); 423 if( aInfo[i].zMsg ){ 424 printf(": %s\n", aInfo[i].zMsg); 425 }else{ 426 printf("\n"); 427 } 428 nErr += aInfo[i].nErr; 429 nTest += aInfo[i].nTest; 430 fflush(stdout); 431 } 432 sqlite3_close(db); 433 sqlite3_free(aInfo); 434 printf("Total %d errors in %d tests\n", nErr, nTest); 435 return nErr; 436 } 437