xref: /sqlite-3.40.0/test/threadtest4.c (revision 18b67f3f)
1 /*
2 ** 2014-12-11
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 ** This file implements a simple standalone program used to stress the
13 ** SQLite library when accessing the same set of databases simultaneously
14 ** from multiple threads in shared-cache mode.
15 **
16 ** This test program runs on unix-like systems only.  It uses pthreads.
17 ** To compile:
18 **
19 **     gcc -o tt4 -I. threadtest4.c sqlite3.c -ldl -lpthread
20 **
21 ** To run:
22 **
23 **     ./tt4 10
24 **
25 ** The argument is the number of threads.
26 */
27 #include "sqlite3.h"
28 #include <pthread.h>
29 #include <sched.h>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <stdarg.h>
35 
36 /*
37 ** An instance of the following structure is passed into each worker
38 ** thread.
39 */
40 typedef struct WorkerInfo WorkerInfo;
41 struct WorkerInfo {
42   int tid;                    /* Thread ID */
43   unsigned wkrFlags;          /* Flags */
44   sqlite3 *mainDb;            /* Database connection of the main thread */
45   sqlite3 *db;                /* Database connection of this thread */
46   int nErr;                   /* Number of errors seen by this thread */
47   int nTest;                  /* Number of tests run by this thread */
48   char *zMsg;                 /* Message returned by this thread */
49   pthread_t id;               /* Thread id */
50   pthread_mutex_t *pWrMutex;  /* Hold this mutex while writing */
51 };
52 
53 /*
54 ** Allowed values for WorkerInfo.wkrFlags
55 */
56 #define TT4_SERIALIZED    0x0000001   /* The --serialized option is used */
57 #define TT4_WAL           0x0000002   /* WAL mode in use */
58 #define TT4_TRACE         0x0000004   /* Trace activity */
59 
60 
61 /*
62 ** Report an OOM error and die if the argument is NULL
63 */
64 static void check_oom(void *x){
65   if( x==0 ){
66     fprintf(stderr, "out of memory\n");
67     exit(1);
68   }
69 }
70 
71 /*
72 ** Allocate memory.  If the allocation fails, print an error message and
73 ** kill the process.
74 */
75 static void *safe_malloc(int sz){
76   void *x = sqlite3_malloc(sz>0?sz:1);
77   check_oom(x);
78   return x;
79 }
80 
81 /*
82 ** Print a trace message for a worker
83 */
84 static void worker_trace(WorkerInfo *p, const char *zFormat, ...){
85   va_list ap;
86   char *zMsg;
87   if( (p->wkrFlags & TT4_TRACE)==0 ) return;
88   va_start(ap, zFormat);
89   zMsg = sqlite3_vmprintf(zFormat, ap);
90   check_oom(zMsg);
91   va_end(ap);
92   fprintf(stderr, "TRACE(%02d): %s\n", p->tid, zMsg);
93   sqlite3_free(zMsg);
94 }
95 
96 /*
97 ** Prepare a single SQL query
98 */
99 static sqlite3_stmt *prep_sql(sqlite3 *db, const char *zFormat, ...){
100   va_list ap;
101   char *zSql;
102   int rc;
103   sqlite3_stmt *pStmt = 0;
104 
105   va_start(ap, zFormat);
106   zSql = sqlite3_vmprintf(zFormat, ap);
107   va_end(ap);
108   check_oom(zSql);
109   rc = sqlite3_prepare_v2(db, zSql, -1, &pStmt, 0);
110   if( rc!=SQLITE_OK ){
111     fprintf(stderr, "SQL error (%d,%d): %s\nWhile preparing: [%s]\n",
112             rc, sqlite3_extended_errcode(db), sqlite3_errmsg(db), zSql);
113     exit(1);
114   }
115   sqlite3_free(zSql);
116   return pStmt;
117 }
118 
119 /*
120 ** Run a SQL statements.  Panic if unable.
121 */
122 static void run_sql(WorkerInfo *p, const char *zFormat, ...){
123   va_list ap;
124   char *zSql;
125   int rc;
126   sqlite3_stmt *pStmt = 0;
127   int nRetry = 0;
128 
129   va_start(ap, zFormat);
130   zSql = sqlite3_vmprintf(zFormat, ap);
131   va_end(ap);
132   check_oom(zSql);
133   rc = sqlite3_prepare_v2(p->db, zSql, -1, &pStmt, 0);
134   if( rc!=SQLITE_OK ){
135     fprintf(stderr, "SQL error (%d,%d): %s\nWhile preparing: [%s]\n",
136             rc, sqlite3_extended_errcode(p->db), sqlite3_errmsg(p->db), zSql);
137     exit(1);
138   }
139   worker_trace(p, "running [%s]", zSql);
140   while( (rc = sqlite3_step(pStmt))!=SQLITE_DONE ){
141     if( (rc&0xff)==SQLITE_BUSY || (rc&0xff)==SQLITE_LOCKED ){
142       sqlite3_reset(pStmt);
143       nRetry++;
144       if( nRetry<10 ){
145         worker_trace(p, "retry %d for [%s]", nRetry, zSql);
146         sched_yield();
147         continue;
148       }else{
149         fprintf(stderr, "Deadlock in thread %d while running [%s]\n",
150                 p->tid, zSql);
151         exit(1);
152       }
153     }
154     if( rc!=SQLITE_ROW ){
155       fprintf(stderr, "SQL error (%d,%d): %s\nWhile running [%s]\n",
156               rc, sqlite3_extended_errcode(p->db), sqlite3_errmsg(p->db), zSql);
157       exit(1);
158     }
159   }
160   sqlite3_free(zSql);
161   sqlite3_finalize(pStmt);
162 }
163 
164 
165 /*
166 ** Open the database connection for WorkerInfo.  The order in which
167 ** the files are opened is a function of the tid value.
168 */
169 static void worker_open_connection(WorkerInfo *p, int iCnt){
170   char *zFile;
171   int x;
172   int rc;
173   static const unsigned char aOrder[6][3] = {
174     { 1, 2, 3},
175     { 1, 3, 2},
176     { 2, 1, 3},
177     { 2, 3, 1},
178     { 3, 1, 2},
179     { 3, 2, 1}
180   };
181   x = (p->tid + iCnt) % 6;
182   zFile = sqlite3_mprintf("tt4-test%d.db", aOrder[x][0]);
183   check_oom(zFile);
184   worker_trace(p, "open %s", zFile);
185   rc = sqlite3_open_v2(zFile, &p->db,
186                        SQLITE_OPEN_READWRITE|SQLITE_OPEN_SHAREDCACHE, 0);
187   if( rc!=SQLITE_OK ){
188     fprintf(stderr, "sqlite_open_v2(%s) failed on thread %d\n",
189             zFile, p->tid);
190     exit(1);
191   }
192   sqlite3_free(zFile);
193   run_sql(p, "PRAGMA read_uncommitted=ON;");
194   sqlite3_busy_timeout(p->db, 10000);
195   run_sql(p, "PRAGMA synchronous=OFF;");
196   run_sql(p, "ATTACH 'tt4-test%d.db' AS aux1", aOrder[x][1]);
197   run_sql(p, "ATTACH 'tt4-test%d.db' AS aux2", aOrder[x][2]);
198 }
199 
200 /*
201 ** Close the worker database connection
202 */
203 static void worker_close_connection(WorkerInfo *p){
204   if( p->db ){
205     worker_trace(p, "close");
206     sqlite3_close(p->db);
207     p->db = 0;
208   }
209 }
210 
211 /*
212 ** Delete all content in the three databases associated with a
213 ** single thread.  Make this happen all in a single transaction if
214 ** inTrans is true, or separately for each database if inTrans is
215 ** false.
216 */
217 static void worker_delete_all_content(WorkerInfo *p, int inTrans){
218   if( inTrans ){
219     pthread_mutex_lock(p->pWrMutex);
220     run_sql(p, "BEGIN");
221     run_sql(p, "DELETE FROM t1 WHERE tid=%d", p->tid);
222     run_sql(p, "DELETE FROM t2 WHERE tid=%d", p->tid);
223     run_sql(p, "DELETE FROM t3 WHERE tid=%d", p->tid);
224     run_sql(p, "COMMIT");
225     pthread_mutex_unlock(p->pWrMutex);
226     p->nTest++;
227   }else{
228     pthread_mutex_lock(p->pWrMutex);
229     run_sql(p, "DELETE FROM t1 WHERE tid=%d", p->tid);
230     pthread_mutex_unlock(p->pWrMutex);
231     p->nTest++;
232     pthread_mutex_lock(p->pWrMutex);
233     run_sql(p, "DELETE FROM t2 WHERE tid=%d", p->tid);
234     pthread_mutex_unlock(p->pWrMutex);
235     p->nTest++;
236     pthread_mutex_lock(p->pWrMutex);
237     run_sql(p, "DELETE FROM t3 WHERE tid=%d", p->tid);
238     pthread_mutex_unlock(p->pWrMutex);
239     p->nTest++;
240   }
241 }
242 
243 /*
244 ** Create rows mn through mx in table iTab for the given worker
245 */
246 static void worker_add_content(WorkerInfo *p, int mn, int mx, int iTab){
247   char *zTabDef;
248   switch( iTab ){
249     case 1:  zTabDef = "t1(tid,sp,a,b,c)";  break;
250     case 2:  zTabDef = "t2(tid,sp,d,e,f)";  break;
251     case 3:  zTabDef = "t3(tid,sp,x,y,z)";  break;
252   }
253   pthread_mutex_lock(p->pWrMutex);
254   run_sql(p,
255      "WITH RECURSIVE\n"
256      " c(i) AS (VALUES(%d) UNION ALL SELECT i+1 FROM c WHERE i<%d)\n"
257      "INSERT INTO %s SELECT %d, zeroblob(3000), i, printf('%%d',i), i FROM c;",
258      mn, mx, zTabDef, p->tid
259   );
260   pthread_mutex_unlock(p->pWrMutex);
261   p->nTest++;
262 }
263 
264 /*
265 ** Set an error message on a worker
266 */
267 static void worker_error(WorkerInfo *p, const char *zFormat, ...){
268   va_list ap;
269   p->nErr++;
270   sqlite3_free(p->zMsg);
271   va_start(ap, zFormat);
272   p->zMsg = sqlite3_vmprintf(zFormat, ap);
273   va_end(ap);
274 }
275 
276 /*
277 ** Each thread runs the following function.
278 */
279 static void *worker_thread(void *pArg){
280   WorkerInfo *p = (WorkerInfo*)pArg;
281   int iOuter;
282   int i;
283   int rc;
284   sqlite3_stmt *pStmt;
285 
286   printf("worker %d startup\n", p->tid);  fflush(stdout);
287   for(iOuter=1; iOuter<=4; iOuter++){
288     worker_open_connection(p, iOuter);
289     for(i=0; i<4; i++){
290       worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter)%3 + 1);
291       worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter+1)%3 + 1);
292       worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter+2)%3 + 1);
293     }
294 
295     pStmt = prep_sql(p->db, "SELECT count(a) FROM t1 WHERE tid=%d", p->tid);
296     worker_trace(p, "query [%s]", sqlite3_sql(pStmt));
297     rc = sqlite3_step(pStmt);
298     if( rc!=SQLITE_ROW ){
299       worker_error(p, "Failed to step: %s", sqlite3_sql(pStmt));
300     }else if( sqlite3_column_int(pStmt, 0)!=400 ){
301       worker_error(p, "Wrong result: %d", sqlite3_column_int(pStmt,0));
302     }
303     if( p->nErr ) break;
304     sqlite3_finalize(pStmt);
305 
306     worker_delete_all_content(p, (p->tid+iOuter)%2);
307     worker_close_connection(p);
308     p->db = 0;
309   }
310   worker_close_connection(p);
311   printf("worker %d finished\n", p->tid); fflush(stdout);
312   return 0;
313 }
314 
315 int main(int argc, char **argv){
316   int nWorker = 0;         /* Number of worker threads */
317   int i;                   /* Loop counter */
318   WorkerInfo *aInfo;       /* Information for each worker */
319   unsigned wkrFlags = 0;   /* Default worker flags */
320   int nErr = 0;            /* Number of errors */
321   int nTest = 0;           /* Number of tests */
322   int rc;                  /* Return code */
323   sqlite3 *db = 0;         /* Main database connection */
324   pthread_mutex_t wrMutex; /* The write serialization mutex */
325   WorkerInfo infoTop;      /* WorkerInfo for the main thread */
326   WorkerInfo *p;           /* Pointer to infoTop */
327 
328   sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
329   for(i=1; i<argc; i++){
330     const char *z = argv[i];
331     if( z[0]=='-' ){
332       if( z[1]=='-' && z[2]!=0 ) z++;
333       if( strcmp(z,"-multithread")==0 ){
334         sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
335         wkrFlags &= ~TT4_SERIALIZED;
336       }else if( strcmp(z,"-serialized")==0 ){
337         sqlite3_config(SQLITE_CONFIG_SERIALIZED);
338         wkrFlags |= TT4_SERIALIZED;
339       }else if( strcmp(z,"-wal")==0 ){
340         wkrFlags |= TT4_WAL;
341       }else if( strcmp(z,"-trace")==0 ){
342         wkrFlags |= TT4_TRACE;
343       }else{
344         fprintf(stderr, "unknown command-line option: %s\n", argv[i]);
345         exit(1);
346       }
347     }else if( z[0]>='1' && z[0]<='9' && nWorker==0 ){
348       nWorker = atoi(z);
349       if( nWorker<2 ){
350         fprintf(stderr, "minimum of 2 threads\n");
351         exit(1);
352       }
353     }else{
354       fprintf(stderr, "extra command-line argument: \"%s\"\n", argv[i]);
355       exit(1);
356     }
357   }
358   if( nWorker==0 ){
359     fprintf(stderr,
360        "usage:  %s ?OPTIONS? N\n"
361        "N is the number of threads and must be at least 2.\n"
362        "Options:\n"
363        "  --serialized\n"
364        "  --multithread\n"
365        ,argv[0]
366     );
367     exit(1);
368   }
369   if( !sqlite3_threadsafe() ){
370     fprintf(stderr, "requires a threadsafe build of SQLite\n");
371     exit(1);
372   }
373   sqlite3_initialize();
374   sqlite3_enable_shared_cache(1);
375   pthread_mutex_init(&wrMutex, 0);
376 
377   /* Initialize the test database files */
378   (void)unlink("tt4-test1.db");
379   (void)unlink("tt4-test2.db");
380   (void)unlink("tt4-test3.db");
381   rc = sqlite3_open("tt4-test1.db", &db);
382   if( rc!=SQLITE_OK ){
383     fprintf(stderr, "Unable to open test database: tt4-test2.db\n");
384     exit(1);
385   }
386   memset(&infoTop, 0, sizeof(infoTop));
387   infoTop.db = db;
388   infoTop.wkrFlags = wkrFlags;
389   p = &infoTop;
390   if( wkrFlags & TT4_WAL ){
391     run_sql(p, "PRAGMA journal_mode=WAL");
392   }
393   run_sql(p, "PRAGMA synchronous=OFF");
394   run_sql(p, "CREATE TABLE IF NOT EXISTS t1(tid INTEGER, sp, a, b, c)");
395   run_sql(p, "CREATE INDEX t1tid ON t1(tid)");
396   run_sql(p, "CREATE INDEX t1ab ON t1(a,b)");
397   run_sql(p, "ATTACH 'tt4-test2.db' AS 'test2'");
398   run_sql(p, "CREATE TABLE IF NOT EXISTS test2.t2(tid INTEGER, sp, d, e, f)");
399   run_sql(p, "CREATE INDEX test2.t2tid ON t2(tid)");
400   run_sql(p, "CREATE INDEX test2.t2de ON t2(d,e)");
401   run_sql(p, "ATTACH 'tt4-test3.db' AS 'test3'");
402   run_sql(p, "CREATE TABLE IF NOT EXISTS test3.t3(tid INTEGER, sp, x, y, z)");
403   run_sql(p, "CREATE INDEX test3.t3tid ON t3(tid)");
404   run_sql(p, "CREATE INDEX test3.t3xy ON t3(x,y)");
405   aInfo = safe_malloc( sizeof(*aInfo)*nWorker );
406   memset(aInfo, 0, sizeof(*aInfo)*nWorker);
407   for(i=0; i<nWorker; i++){
408     aInfo[i].tid = i+1;
409     aInfo[i].wkrFlags = wkrFlags;
410     aInfo[i].mainDb = db;
411     aInfo[i].pWrMutex = &wrMutex;
412     rc = pthread_create(&aInfo[i].id, 0, worker_thread, &aInfo[i]);
413     if( rc!=0 ){
414       fprintf(stderr, "thread creation failed for thread %d\n", i+1);
415       exit(1);
416     }
417     sched_yield();
418   }
419   for(i=0; i<nWorker; i++){
420     pthread_join(aInfo[i].id, 0);
421     printf("Joined thread %d: %d errors in %d tests",
422            aInfo[i].tid, aInfo[i].nErr, aInfo[i].nTest);
423     if( aInfo[i].zMsg ){
424       printf(": %s\n", aInfo[i].zMsg);
425     }else{
426       printf("\n");
427     }
428     nErr += aInfo[i].nErr;
429     nTest += aInfo[i].nTest;
430     fflush(stdout);
431   }
432   sqlite3_close(db);
433   sqlite3_free(aInfo);
434   printf("Total %d errors in %d tests\n", nErr, nTest);
435   return nErr;
436 }
437