xref: /sqlite-3.40.0/test/threadtest4.c (revision ef15c6e9)
1 /*
2 ** 2014-12-11
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 ** This file implements a simple standalone program used to stress the
13 ** SQLite library when accessing the same set of databases simultaneously
14 ** from multiple threads in shared-cache mode.
15 **
16 ** This test program runs on unix-like systems only.  It uses pthreads.
17 ** To compile:
18 **
19 **     gcc -g -Wall -I. threadtest4.c sqlite3.c -ldl -lpthread
20 **
21 ** To run:
22 **
23 **     ./a.out 10
24 **
25 ** The argument is the number of threads.  There are also options, such
26 ** as -wal and -multithread and -serialized.
27 **
28 ** Consider also compiling with clang instead of gcc and adding the
29 ** -fsanitize=thread option.
30 */
31 #include "sqlite3.h"
32 #include <pthread.h>
33 #include <sched.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <unistd.h>
38 #include <stdarg.h>
39 
40 /*
41 ** An instance of the following structure is passed into each worker
42 ** thread.
43 */
44 typedef struct WorkerInfo WorkerInfo;
45 struct WorkerInfo {
46   int tid;                    /* Thread ID */
47   int nWorker;                /* Total number of workers */
48   unsigned wkrFlags;          /* Flags */
49   sqlite3 *mainDb;            /* Database connection of the main thread */
50   sqlite3 *db;                /* Database connection of this thread */
51   int nErr;                   /* Number of errors seen by this thread */
52   int nTest;                  /* Number of tests run by this thread */
53   char *zMsg;                 /* Message returned by this thread */
54   pthread_t id;               /* Thread id */
55   pthread_mutex_t *pWrMutex;  /* Hold this mutex while writing */
56 };
57 
58 /*
59 ** Allowed values for WorkerInfo.wkrFlags
60 */
61 #define TT4_SERIALIZED    0x0000001   /* The --serialized option is used */
62 #define TT4_WAL           0x0000002   /* WAL mode in use */
63 #define TT4_TRACE         0x0000004   /* Trace activity */
64 
65 
66 /*
67 ** Report an OOM error and die if the argument is NULL
68 */
69 static void check_oom(void *x){
70   if( x==0 ){
71     fprintf(stderr, "out of memory\n");
72     exit(1);
73   }
74 }
75 
76 /*
77 ** Allocate memory.  If the allocation fails, print an error message and
78 ** kill the process.
79 */
80 static void *safe_malloc(int sz){
81   void *x = sqlite3_malloc(sz>0?sz:1);
82   check_oom(x);
83   return x;
84 }
85 
86 /*
87 ** Print a trace message for a worker
88 */
89 static void worker_trace(WorkerInfo *p, const char *zFormat, ...){
90   va_list ap;
91   char *zMsg;
92   if( (p->wkrFlags & TT4_TRACE)==0 ) return;
93   va_start(ap, zFormat);
94   zMsg = sqlite3_vmprintf(zFormat, ap);
95   check_oom(zMsg);
96   va_end(ap);
97   fprintf(stderr, "TRACE(%02d): %s\n", p->tid, zMsg);
98   sqlite3_free(zMsg);
99 }
100 
101 /*
102 ** Prepare a single SQL query
103 */
104 static sqlite3_stmt *prep_sql(sqlite3 *db, const char *zFormat, ...){
105   va_list ap;
106   char *zSql;
107   int rc;
108   sqlite3_stmt *pStmt = 0;
109 
110   va_start(ap, zFormat);
111   zSql = sqlite3_vmprintf(zFormat, ap);
112   va_end(ap);
113   check_oom(zSql);
114   rc = sqlite3_prepare_v2(db, zSql, -1, &pStmt, 0);
115   if( rc!=SQLITE_OK ){
116     fprintf(stderr, "SQL error (%d,%d): %s\nWhile preparing: [%s]\n",
117             rc, sqlite3_extended_errcode(db), sqlite3_errmsg(db), zSql);
118     exit(1);
119   }
120   sqlite3_free(zSql);
121   return pStmt;
122 }
123 
124 /*
125 ** Run a SQL statements.  Panic if unable.
126 */
127 static void run_sql(WorkerInfo *p, const char *zFormat, ...){
128   va_list ap;
129   char *zSql;
130   int rc;
131   sqlite3_stmt *pStmt = 0;
132   int nRetry = 0;
133 
134   va_start(ap, zFormat);
135   zSql = sqlite3_vmprintf(zFormat, ap);
136   va_end(ap);
137   check_oom(zSql);
138   rc = sqlite3_prepare_v2(p->db, zSql, -1, &pStmt, 0);
139   if( rc!=SQLITE_OK ){
140     fprintf(stderr, "SQL error (%d,%d): %s\nWhile preparing: [%s]\n",
141             rc, sqlite3_extended_errcode(p->db), sqlite3_errmsg(p->db), zSql);
142     exit(1);
143   }
144   worker_trace(p, "running [%s]", zSql);
145   while( (rc = sqlite3_step(pStmt))!=SQLITE_DONE ){
146     if( (rc&0xff)==SQLITE_BUSY || (rc&0xff)==SQLITE_LOCKED ){
147       sqlite3_reset(pStmt);
148       nRetry++;
149       if( nRetry<10 ){
150         worker_trace(p, "retry %d for [%s]", nRetry, zSql);
151         sched_yield();
152         continue;
153       }else{
154         fprintf(stderr, "Deadlock in thread %d while running [%s]\n",
155                 p->tid, zSql);
156         exit(1);
157       }
158     }
159     if( rc!=SQLITE_ROW ){
160       fprintf(stderr, "SQL error (%d,%d): %s\nWhile running [%s]\n",
161               rc, sqlite3_extended_errcode(p->db), sqlite3_errmsg(p->db), zSql);
162       exit(1);
163     }
164   }
165   sqlite3_free(zSql);
166   sqlite3_finalize(pStmt);
167 }
168 
169 
170 /*
171 ** Open the database connection for WorkerInfo.  The order in which
172 ** the files are opened is a function of the tid value.
173 */
174 static void worker_open_connection(WorkerInfo *p, int iCnt){
175   char *zFile;
176   int x;
177   int rc;
178   static const unsigned char aOrder[6][3] = {
179     { 1, 2, 3},
180     { 1, 3, 2},
181     { 2, 1, 3},
182     { 2, 3, 1},
183     { 3, 1, 2},
184     { 3, 2, 1}
185   };
186   x = (p->tid + iCnt) % 6;
187   zFile = sqlite3_mprintf("tt4-test%d.db", aOrder[x][0]);
188   check_oom(zFile);
189   worker_trace(p, "open %s", zFile);
190   rc = sqlite3_open_v2(zFile, &p->db,
191                        SQLITE_OPEN_READWRITE|SQLITE_OPEN_SHAREDCACHE, 0);
192   if( rc!=SQLITE_OK ){
193     fprintf(stderr, "sqlite_open_v2(%s) failed on thread %d\n",
194             zFile, p->tid);
195     exit(1);
196   }
197   sqlite3_free(zFile);
198   run_sql(p, "PRAGMA read_uncommitted=ON;");
199   sqlite3_busy_timeout(p->db, 10000);
200   run_sql(p, "PRAGMA synchronous=OFF;");
201   run_sql(p, "ATTACH 'tt4-test%d.db' AS aux1", aOrder[x][1]);
202   run_sql(p, "ATTACH 'tt4-test%d.db' AS aux2", aOrder[x][2]);
203 }
204 
205 /*
206 ** Close the worker database connection
207 */
208 static void worker_close_connection(WorkerInfo *p){
209   if( p->db ){
210     worker_trace(p, "close");
211     sqlite3_close(p->db);
212     p->db = 0;
213   }
214 }
215 
216 /*
217 ** Delete all content in the three databases associated with a
218 ** single thread.  Make this happen all in a single transaction if
219 ** inTrans is true, or separately for each database if inTrans is
220 ** false.
221 */
222 static void worker_delete_all_content(WorkerInfo *p, int inTrans){
223   if( inTrans ){
224     pthread_mutex_lock(p->pWrMutex);
225     run_sql(p, "BEGIN");
226     run_sql(p, "DELETE FROM t1 WHERE tid=%d", p->tid);
227     run_sql(p, "DELETE FROM t2 WHERE tid=%d", p->tid);
228     run_sql(p, "DELETE FROM t3 WHERE tid=%d", p->tid);
229     run_sql(p, "COMMIT");
230     pthread_mutex_unlock(p->pWrMutex);
231     p->nTest++;
232   }else{
233     pthread_mutex_lock(p->pWrMutex);
234     run_sql(p, "DELETE FROM t1 WHERE tid=%d", p->tid);
235     pthread_mutex_unlock(p->pWrMutex);
236     p->nTest++;
237     pthread_mutex_lock(p->pWrMutex);
238     run_sql(p, "DELETE FROM t2 WHERE tid=%d", p->tid);
239     pthread_mutex_unlock(p->pWrMutex);
240     p->nTest++;
241     pthread_mutex_lock(p->pWrMutex);
242     run_sql(p, "DELETE FROM t3 WHERE tid=%d", p->tid);
243     pthread_mutex_unlock(p->pWrMutex);
244     p->nTest++;
245   }
246 }
247 
248 /*
249 ** Create rows mn through mx in table iTab for the given worker
250 */
251 static void worker_add_content(WorkerInfo *p, int mn, int mx, int iTab){
252   char *zTabDef;
253   switch( iTab ){
254     case 1:  zTabDef = "t1(tid,sp,a,b,c)";  break;
255     case 2:  zTabDef = "t2(tid,sp,d,e,f)";  break;
256     case 3:  zTabDef = "t3(tid,sp,x,y,z)";  break;
257   }
258   pthread_mutex_lock(p->pWrMutex);
259   run_sql(p,
260      "WITH RECURSIVE\n"
261      " c(i) AS (VALUES(%d) UNION ALL SELECT i+1 FROM c WHERE i<%d)\n"
262      "INSERT INTO %s SELECT %d, zeroblob(3000), i, printf('%%d',i), i FROM c;",
263      mn, mx, zTabDef, p->tid
264   );
265   pthread_mutex_unlock(p->pWrMutex);
266   p->nTest++;
267 }
268 
269 /*
270 ** Set an error message on a worker
271 */
272 static void worker_error(WorkerInfo *p, const char *zFormat, ...){
273   va_list ap;
274   p->nErr++;
275   sqlite3_free(p->zMsg);
276   va_start(ap, zFormat);
277   p->zMsg = sqlite3_vmprintf(zFormat, ap);
278   va_end(ap);
279 }
280 
281 /*
282 ** Each thread runs the following function.
283 */
284 static void *worker_thread(void *pArg){
285   WorkerInfo *p = (WorkerInfo*)pArg;
286   int iOuter;
287   int i;
288   int rc;
289   sqlite3_stmt *pStmt;
290 
291   printf("worker %d startup\n", p->tid);  fflush(stdout);
292   for(iOuter=1; iOuter<=p->nWorker; iOuter++){
293     worker_open_connection(p, iOuter);
294     for(i=0; i<4; i++){
295       worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter)%3 + 1);
296       worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter+1)%3 + 1);
297       worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter+2)%3 + 1);
298     }
299 
300     pStmt = prep_sql(p->db, "SELECT count(a) FROM t1 WHERE tid=%d", p->tid);
301     worker_trace(p, "query [%s]", sqlite3_sql(pStmt));
302     rc = sqlite3_step(pStmt);
303     if( rc!=SQLITE_ROW ){
304       worker_error(p, "Failed to step: %s", sqlite3_sql(pStmt));
305     }else if( sqlite3_column_int(pStmt, 0)!=400 ){
306       worker_error(p, "Wrong result: %d", sqlite3_column_int(pStmt,0));
307     }
308     if( p->nErr ) break;
309     sqlite3_finalize(pStmt);
310 
311     if( ((iOuter+p->tid)%3)==0 ){
312       sqlite3_db_release_memory(p->db);
313       p->nTest++;
314     }
315 
316     if( iOuter==p->tid ){
317       pthread_mutex_lock(p->pWrMutex);
318       run_sql(p, "VACUUM");
319       pthread_mutex_unlock(p->pWrMutex);
320     }
321 
322     worker_delete_all_content(p, (p->tid+iOuter)%2);
323     worker_close_connection(p);
324     p->db = 0;
325   }
326   worker_close_connection(p);
327   printf("worker %d finished\n", p->tid); fflush(stdout);
328   return 0;
329 }
330 
331 int main(int argc, char **argv){
332   int nWorker = 0;         /* Number of worker threads */
333   int i;                   /* Loop counter */
334   WorkerInfo *aInfo;       /* Information for each worker */
335   unsigned wkrFlags = 0;   /* Default worker flags */
336   int nErr = 0;            /* Number of errors */
337   int nTest = 0;           /* Number of tests */
338   int rc;                  /* Return code */
339   sqlite3 *db = 0;         /* Main database connection */
340   pthread_mutex_t wrMutex; /* The write serialization mutex */
341   WorkerInfo infoTop;      /* WorkerInfo for the main thread */
342   WorkerInfo *p;           /* Pointer to infoTop */
343 
344   sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
345   for(i=1; i<argc; i++){
346     const char *z = argv[i];
347     if( z[0]=='-' ){
348       if( z[1]=='-' && z[2]!=0 ) z++;
349       if( strcmp(z,"-multithread")==0 ){
350         sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
351         wkrFlags &= ~TT4_SERIALIZED;
352       }else if( strcmp(z,"-serialized")==0 ){
353         sqlite3_config(SQLITE_CONFIG_SERIALIZED);
354         wkrFlags |= TT4_SERIALIZED;
355       }else if( strcmp(z,"-wal")==0 ){
356         wkrFlags |= TT4_WAL;
357       }else if( strcmp(z,"-trace")==0 ){
358         wkrFlags |= TT4_TRACE;
359       }else{
360         fprintf(stderr, "unknown command-line option: %s\n", argv[i]);
361         exit(1);
362       }
363     }else if( z[0]>='1' && z[0]<='9' && nWorker==0 ){
364       nWorker = atoi(z);
365       if( nWorker<2 ){
366         fprintf(stderr, "minimum of 2 threads\n");
367         exit(1);
368       }
369     }else{
370       fprintf(stderr, "extra command-line argument: \"%s\"\n", argv[i]);
371       exit(1);
372     }
373   }
374   if( nWorker==0 ){
375     fprintf(stderr,
376        "usage:  %s ?OPTIONS? N\n"
377        "N is the number of threads and must be at least 2.\n"
378        "Options:\n"
379        "  --serialized\n"
380        "  --multithread\n"
381        "  --wal\n"
382        "  --trace\n"
383        ,argv[0]
384     );
385     exit(1);
386   }
387   if( !sqlite3_threadsafe() ){
388     fprintf(stderr, "requires a threadsafe build of SQLite\n");
389     exit(1);
390   }
391   sqlite3_initialize();
392   sqlite3_enable_shared_cache(1);
393   pthread_mutex_init(&wrMutex, 0);
394 
395   /* Initialize the test database files */
396   (void)unlink("tt4-test1.db");
397   (void)unlink("tt4-test2.db");
398   (void)unlink("tt4-test3.db");
399   rc = sqlite3_open("tt4-test1.db", &db);
400   if( rc!=SQLITE_OK ){
401     fprintf(stderr, "Unable to open test database: tt4-test2.db\n");
402     exit(1);
403   }
404   memset(&infoTop, 0, sizeof(infoTop));
405   infoTop.db = db;
406   infoTop.wkrFlags = wkrFlags;
407   p = &infoTop;
408   if( wkrFlags & TT4_WAL ){
409     run_sql(p, "PRAGMA journal_mode=WAL");
410   }
411   run_sql(p, "PRAGMA synchronous=OFF");
412   run_sql(p, "CREATE TABLE IF NOT EXISTS t1(tid INTEGER, sp, a, b, c)");
413   run_sql(p, "CREATE INDEX t1tid ON t1(tid)");
414   run_sql(p, "CREATE INDEX t1ab ON t1(a,b)");
415   run_sql(p, "ATTACH 'tt4-test2.db' AS 'test2'");
416   run_sql(p, "CREATE TABLE IF NOT EXISTS test2.t2(tid INTEGER, sp, d, e, f)");
417   run_sql(p, "CREATE INDEX test2.t2tid ON t2(tid)");
418   run_sql(p, "CREATE INDEX test2.t2de ON t2(d,e)");
419   run_sql(p, "ATTACH 'tt4-test3.db' AS 'test3'");
420   run_sql(p, "CREATE TABLE IF NOT EXISTS test3.t3(tid INTEGER, sp, x, y, z)");
421   run_sql(p, "CREATE INDEX test3.t3tid ON t3(tid)");
422   run_sql(p, "CREATE INDEX test3.t3xy ON t3(x,y)");
423   aInfo = safe_malloc( sizeof(*aInfo)*nWorker );
424   memset(aInfo, 0, sizeof(*aInfo)*nWorker);
425   for(i=0; i<nWorker; i++){
426     aInfo[i].tid = i+1;
427     aInfo[i].nWorker = nWorker;
428     aInfo[i].wkrFlags = wkrFlags;
429     aInfo[i].mainDb = db;
430     aInfo[i].pWrMutex = &wrMutex;
431     rc = pthread_create(&aInfo[i].id, 0, worker_thread, &aInfo[i]);
432     if( rc!=0 ){
433       fprintf(stderr, "thread creation failed for thread %d\n", i+1);
434       exit(1);
435     }
436     sched_yield();
437   }
438   for(i=0; i<nWorker; i++){
439     pthread_join(aInfo[i].id, 0);
440     printf("Joined thread %d: %d errors in %d tests",
441            aInfo[i].tid, aInfo[i].nErr, aInfo[i].nTest);
442     if( aInfo[i].zMsg ){
443       printf(": %s\n", aInfo[i].zMsg);
444     }else{
445       printf("\n");
446     }
447     nErr += aInfo[i].nErr;
448     nTest += aInfo[i].nTest;
449     fflush(stdout);
450   }
451   sqlite3_close(db);
452   sqlite3_free(aInfo);
453   printf("Total %d errors in %d tests\n", nErr, nTest);
454   return nErr;
455 }
456