1 
2 #include "lsmtest_tdb.h"
3 #include "lsm.h"
4 #include "lsmtest.h"
5 
6 #include <stdlib.h>
7 #include <string.h>
8 #include <assert.h>
9 #ifndef _WIN32
10 # include <unistd.h>
11 #endif
12 #include <stdio.h>
13 
14 #ifndef _WIN32
15 # include <sys/time.h>
16 #endif
17 
18 typedef struct LsmDb LsmDb;
19 typedef struct LsmWorker LsmWorker;
20 typedef struct LsmFile LsmFile;
21 
22 #define LSMTEST_DFLT_MT_MAX_CKPT (8*1024)
23 #define LSMTEST_DFLT_MT_MIN_CKPT (2*1024)
24 
25 #ifdef LSM_MUTEX_PTHREADS
26 #include <pthread.h>
27 
28 #define LSMTEST_THREAD_CKPT      1
29 #define LSMTEST_THREAD_WORKER    2
30 #define LSMTEST_THREAD_WORKER_AC 3
31 
32 /*
33 ** There are several different types of worker threads that run in different
34 ** test configurations, depending on the value of LsmWorker.eType.
35 **
36 **   1. Checkpointer.
37 **   2. Worker with auto-checkpoint.
38 **   3. Worker without auto-checkpoint.
39 */
struct LsmWorker {
  LsmDb *pDb;                     /* Main database structure */
  lsm_db *pWorker;                /* Worker database handle */
  pthread_t worker_thread;        /* Worker thread */
  pthread_cond_t worker_cond;     /* Condition var the worker waits on */
  pthread_mutex_t worker_mutex;   /* Mutex used with worker_cond */
  int bDoWork;                    /* Set to true by client when there is work */
  int worker_rc;                  /* Store error code here (test_lsm_close()
                                  ** returns the first non-OK value) */
  int eType;                      /* LSMTEST_THREAD_XXX constant */
  int bBlock;                     /* NOTE(review): purpose not visible in this
                                  ** part of the file - TODO document */
};
51 #else
52 struct LsmWorker { int worker_rc; int bBlock; };
53 #endif
54 
55 static void mt_shutdown(LsmDb *);
56 
tdb_lsm_env(void)57 lsm_env *tdb_lsm_env(void){
58   static int bInit = 0;
59   static lsm_env env;
60   if( bInit==0 ){
61     memcpy(&env, lsm_default_env(), sizeof(env));
62     bInit = 1;
63   }
64   return &env;
65 }
66 
67 typedef struct FileSector FileSector;
68 typedef struct FileData FileData;
69 
/*
** Crash-simulation state for a single file sector. testEnvWrite() reads
** the existing sector contents into aOld before a sector is first written;
** testEnvSync() and doSystemCrash() free the buffer and reset it to 0.
*/
struct FileSector {
  u8 *aOld;                       /* Old data for this sector */
};
73 
/*
** Crash-simulation state for one file (LsmDb.aFile[] holds one of these
** for the database file and one for the log file).
*/
struct FileData {
  int nSector;                    /* Allocated size of aSector[] array */
  FileSector *aSector;            /* Array of file sectors */
};
78 
79 /*
80 ** bPrepareCrash:
81 **   If non-zero, the file wrappers maintain enough in-memory data to
82 **   simulate the effect of a power-failure on the file-system (i.e. that
83 **   unsynced sectors may be written, not written, or overwritten with
84 **   arbitrary data when the crash occurs).
85 **
86 ** bCrashed:
87 **   Set to true after a crash is simulated. Once this variable is true, all
88 **   VFS methods other than xClose() return LSM_IOERR as soon as they are
89 **   called (without affecting the contents of the file-system).
90 **
91 ** env:
92 **   The environment object used by all lsm_db* handles opened by this
93 **   object (i.e. LsmDb.db plus any worker connections). Variable env.pVfsCtx
94 **   always points to the containing LsmDb structure.
95 */
struct LsmDb {
  TestDb base;                    /* Base class - methods table */
  lsm_env env;                    /* Environment used by connection db */
  char *zName;                    /* Database file name */
  lsm_db *db;                     /* LSM database handle */

  lsm_cursor *pCsr;               /* Cursor held open during read transaction */
  void *pBuf;                     /* Buffer for tdb_fetch() output */
  int nBuf;                       /* Allocated (not used) size of pBuf */

  /* Crash testing related state */
  int bCrashed;                   /* True once a crash has occurred */
  int nAutoCrash;                 /* Number of syncs until a crash */
  int bPrepareCrash;              /* True to store writes in memory */

  /* Unsynced data (while crash testing) */
  int szSector;                   /* Assumed size of disk sectors in bytes
                                  ** (testLsmOpen() sets this to 256) */
  FileData aFile[2];              /* Database and log file data. Indexed by
                                  ** LsmFile.bLog: 0 -> db, 1 -> log */

  /* Other test instrumentation */
  int bNoRecovery;                /* If true, assume DMS2 is locked */

  /* Work hook redirection */
  void (*xWork)(lsm_db *, void *);  /* If not NULL, invoked by xWorkHook() */
  void *pWorkCtx;                   /* Second argument passed to xWork */

  /* IO logging hook. Invoked by testEnvWrite() and testEnvSync() as
  ** xWriteHook(pWriteCtx, bLog, iOff, nData, nUs), where nUs is the elapsed
  ** time of the underlying call in microseconds. For a sync, iOff and
  ** nData are both passed as 0. */
  void (*xWriteHook)(void *, int, lsm_i64, int, int);
  void *pWriteCtx;

  /* Worker threads (for lsm_mt) */
  int nMtMinCkpt;                 /* Value of the "mt_min_ckpt" parameter */
  int nMtMaxCkpt;                 /* Value of "mt_max_ckpt". Writers wait in
                                  ** waitOnCheckpointer() while the checkpoint
                                  ** size is not smaller than this */
  int eMode;                      /* LSMTEST_MODE_XXX constant ("mt_mode") */
  int nWorker;                    /* Number of entries in aWorker[] */
  LsmWorker *aWorker;             /* Array of worker thread objects */
};
133 
134 #define LSMTEST_MODE_SINGLETHREAD    1
135 #define LSMTEST_MODE_BACKGROUND_CKPT 2
136 #define LSMTEST_MODE_BACKGROUND_WORK 3
137 #define LSMTEST_MODE_BACKGROUND_BOTH 4
138 
139 /*************************************************************************
140 **************************************************************************
141 ** Begin test VFS code.
142 */
143 
/*
** File handle type used by the test VFS. Each instance wraps a file handle
** opened with the real environment returned by tdb_lsm_env().
*/
struct LsmFile {
  lsm_file *pReal;                /* Real underlying file */
  int bLog;                       /* True for log file. False for db file.
                                  ** Set by testEnvOpen() if the file name
                                  ** ends with "-log" */
  LsmDb *pDb;                     /* Database handle that uses this file */
};
149 
/*
** Test VFS xFullpath method. The request is passed straight through to
** the real environment; argument pEnv is not used.
*/
static int testEnvFullpath(
  lsm_env *pEnv,                  /* Environment for current LsmDb */
  const char *zFile,              /* Relative path name */
  char *zOut,                     /* Output buffer */
  int *pnOut                      /* IN/OUT: Size of output buffer */
){
  lsm_env *pReal = tdb_lsm_env();
  int rc;

  rc = pReal->xFullpath(pReal, zFile, zOut, pnOut);
  return rc;
}
159 
/*
** Test VFS xOpen method. Allocate an LsmFile wrapper, link it to the LsmDb
** stored in pEnv->pVfsCtx and open the underlying file using the real
** environment. The wrapper is flagged as a log file if zFile ends with
** "-log".
**
** On success *ppFile is set to the new handle. If the real xOpen() fails,
** the wrapper is freed, *ppFile is set to 0 and the error code is returned.
*/
static int testEnvOpen(
  lsm_env *pEnv,                  /* Environment for current LsmDb */
  const char *zFile,              /* Name of file to open */
  int flags,
  lsm_file **ppFile               /* OUT: New file handle object */
){
  LsmDb *pOwner = (LsmDb *)pEnv->pVfsCtx;
  lsm_env *pReal = tdb_lsm_env();
  LsmFile *pNew;                  /* The new file handle */
  int nName;                      /* Length of zFile in bytes */
  int rc;                         /* Return code */

  nName = strlen(zFile);
  pNew = (LsmFile *)testMalloc(sizeof(LsmFile));
  pNew->pDb = pOwner;
  if( nName>4 && memcmp("-log", &zFile[nName-4], 4)==0 ){
    pNew->bLog = 1;
  }else{
    pNew->bLog = 0;
  }

  rc = pReal->xOpen(pReal, zFile, flags, &pNew->pReal);
  if( rc!=LSM_OK ){
    testFree(pNew);
    pNew = 0;
  }

  *ppFile = (lsm_file *)pNew;
  return rc;
}
186 
/*
** Test VFS xRead method. Returns LSM_IOERR without touching the file if a
** crash has been simulated. Otherwise, the read is passed through to the
** real file.
*/
static int testEnvRead(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){
  lsm_env *pReal = tdb_lsm_env();
  LsmFile *pWrap = (LsmFile *)pFile;
  int rc = LSM_IOERR;

  if( pWrap->pDb->bCrashed==0 ){
    rc = pReal->xRead(pWrap->pReal, iOff, pData, nData);
  }
  return rc;
}
193 
/*
** Test VFS xWrite method.
**
** If a crash has already been simulated, return LSM_IOERR immediately.
**
** If LsmDb.bPrepareCrash is set then, before writing, save the current
** contents of each sector touched by the write that does not already have
** a saved image. The per-file sector array is grown in multiples of 64
** entries as required.
**
** If an IO logging hook is registered, the underlying write is timed and
** the hook invoked with the elapsed time in microseconds.
*/
static int testEnvWrite(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){
  lsm_env *pReal = tdb_lsm_env();
  LsmFile *pWrap = (LsmFile *)pFile;
  LsmDb *pDb = pWrap->pDb;

  if( pDb->bCrashed ) return LSM_IOERR;

  if( pDb->bPrepareCrash ){
    FileData *pFd = &pDb->aFile[pWrap->bLog];
    int iFirst = (int)(iOff / pDb->szSector);
    int iLast = (int)((iOff + nData - 1) / pDb->szSector);
    int ii;

    /* Make sure aSector[] has an entry for every sector up to iLast. */
    if( pFd->nSector<(iLast+1) ){
      int nAlloc = ( ((iLast + 1) + 63) / 64 ) * 64;
      assert( nAlloc>iLast );
      pFd->aSector = (FileSector *)testRealloc(
          pFd->aSector, nAlloc*sizeof(FileSector)
      );
      memset(&pFd->aSector[pFd->nSector],
          0, (nAlloc - pFd->nSector) * sizeof(FileSector)
      );
      pFd->nSector = nAlloc;
    }

    /* Capture a pre-image of each sector that does not yet have one. */
    for(ii=iFirst; ii<=iLast; ii++){
      u8 *aSaved;
      if( pFd->aSector[ii].aOld ) continue;
      aSaved = (u8 *)testMalloc(pDb->szSector);
      pReal->xRead(
          pWrap->pReal, (lsm_i64)ii*pDb->szSector, aSaved, pDb->szSector
      );
      pFd->aSector[ii].aOld = aSaved;
    }
  }

  if( pDb->xWriteHook==0 ){
    return pReal->xWrite(pWrap->pReal, iOff, pData, nData);
  }else{
    struct timeval tStart;
    struct timeval tEnd;
    int nUs;
    int rc;

    gettimeofday(&tStart, 0);
    assert( nData>0 );
    rc = pReal->xWrite(pWrap->pReal, iOff, pData, nData);
    gettimeofday(&tEnd, 0);

    nUs = (tEnd.tv_sec - tStart.tv_sec) * 1000000
        + (tEnd.tv_usec - tStart.tv_usec);
    pDb->xWriteHook(pDb->pWriteCtx, pWrap->bLog, iOff, nData, nUs);
    return rc;
  }
}
251 
252 static void doSystemCrash(LsmDb *pDb);
253 
/*
** Test VFS xSync method.
**
** If a crash has already been simulated, return LSM_IOERR. Otherwise, if
** the LsmDb.nAutoCrash countdown is active, decrement it. If it reaches
** zero, simulate a system crash, set the bCrashed flag and return
** LSM_IOERR without syncing anything.
**
** Otherwise, when preparing for a crash, discard the saved sector images
** for this file. Then sync the real file, reporting the elapsed time to
** the IO logging hook if one is registered.
*/
static int testEnvSync(lsm_file *pFile){
  lsm_env *pReal = tdb_lsm_env();
  LsmFile *pWrap = (LsmFile *)pFile;
  LsmDb *pDb = pWrap->pDb;
  FileData *pFd = &pDb->aFile[pWrap->bLog];

  if( pDb->bCrashed ) return LSM_IOERR;

  if( pDb->nAutoCrash && (--pDb->nAutoCrash)==0 ){
    doSystemCrash(pDb);
    pDb->bCrashed = 1;
    return LSM_IOERR;
  }

  if( pDb->bPrepareCrash ){
    int ii;
    for(ii=0; ii<pFd->nSector; ii++){
      FileSector *pSector = &pFd->aSector[ii];
      testFree(pSector->aOld);
      pSector->aOld = 0;
    }
  }

  if( pDb->xWriteHook==0 ){
    return pReal->xSync(pWrap->pReal);
  }else{
    struct timeval tStart;
    struct timeval tEnd;
    int nUs;
    int rc;

    gettimeofday(&tStart, 0);
    rc = pReal->xSync(pWrap->pReal);
    gettimeofday(&tEnd, 0);

    nUs = (tEnd.tv_sec - tStart.tv_sec) * 1000000
        + (tEnd.tv_usec - tStart.tv_usec);
    pDb->xWriteHook(pDb->pWriteCtx, pWrap->bLog, 0, 0, nUs);
    return rc;
  }
}
296 
/*
** Test VFS xTruncate method. Fails with LSM_IOERR once a crash has been
** simulated, otherwise truncates the real file.
*/
static int testEnvTruncate(lsm_file *pFile, lsm_i64 iOff){
  lsm_env *pReal = tdb_lsm_env();
  LsmFile *pWrap = (LsmFile *)pFile;
  int rc = LSM_IOERR;

  if( pWrap->pDb->bCrashed==0 ){
    rc = pReal->xTruncate(pWrap->pReal, iOff);
  }
  return rc;
}
303 
/*
** Test VFS xSectorSize method. Returns whatever the real environment
** reports for the underlying file.
*/
static int testEnvSectorSize(lsm_file *pFile){
  lsm_env *pReal = tdb_lsm_env();
  lsm_file *pUnder = ((LsmFile *)pFile)->pReal;
  return pReal->xSectorSize(pUnder);
}
309 
/*
** Test VFS xRemap method. Passed through to the real file unchanged.
*/
static int testEnvRemap(
  lsm_file *pFile,
  lsm_i64 iMin,
  void **ppOut,
  lsm_i64 *pnOut
){
  lsm_env *pReal = tdb_lsm_env();
  lsm_file *pUnder = ((LsmFile *)pFile)->pReal;
  return pReal->xRemap(pUnder, iMin, ppOut, pnOut);
}
320 
/*
** Test VFS xFileid method. Passed through to the real file unchanged.
*/
static int testEnvFileid(
  lsm_file *pFile,
  void *ppOut,
  int *pnOut
){
  lsm_env *pReal = tdb_lsm_env();
  lsm_file *pUnder = ((LsmFile *)pFile)->pReal;
  return pReal->xFileid(pUnder, ppOut, pnOut);
}
330 
/*
** Test VFS xClose method. Close the real file, then free the wrapper.
** The return value of the real xClose() is discarded; this function
** always returns LSM_OK.
*/
static int testEnvClose(lsm_file *pFile){
  lsm_env *pReal = tdb_lsm_env();
  LsmFile *pWrap = (LsmFile *)pFile;

  pReal->xClose(pWrap->pReal);
  testFree(pWrap);

  return LSM_OK;
}
339 
/*
** Test VFS xUnlink method. Deletes file zFile using the real environment.
*/
static int testEnvUnlink(lsm_env *pEnv, const char *zFile){
  lsm_env *pReal = tdb_lsm_env();
  int rc;

  unused_parameter(pEnv);
  rc = pReal->xUnlink(pReal, zFile);
  return rc;
}
345 
/*
** Test VFS xLock method. If LsmDb.bNoRecovery is set, any attempt to take
** an EXCLUSIVE lock on lock 2 fails with LSM_BUSY without consulting the
** real file. All other requests are passed through.
*/
static int testEnvLock(lsm_file *pFile, int iLock, int eType){
  LsmFile *pWrap = (LsmFile *)pFile;
  lsm_env *pReal = tdb_lsm_env();
  int bRefuse = (iLock==2 && eType==LSM_LOCK_EXCL && pWrap->pDb->bNoRecovery);

  if( bRefuse ) return LSM_BUSY;
  return pReal->xLock(pWrap->pReal, iLock, eType);
}
355 
/*
** Test VFS xTestLock method. As for testEnvLock(), an EXCLUSIVE request
** with iLock==2 reports LSM_BUSY when LsmDb.bNoRecovery is set. Everything
** else is passed through to the real file.
*/
static int testEnvTestLock(lsm_file *pFile, int iLock, int nLock, int eType){
  LsmFile *pWrap = (LsmFile *)pFile;
  lsm_env *pReal = tdb_lsm_env();
  int bRefuse = (iLock==2 && eType==LSM_LOCK_EXCL && pWrap->pDb->bNoRecovery);

  if( bRefuse ) return LSM_BUSY;
  return pReal->xTestLock(pWrap->pReal, iLock, nLock, eType);
}
365 
/*
** Test VFS xShmMap method. Passed through to the real file unchanged.
*/
static int testEnvShmMap(lsm_file *pFile, int iRegion, int sz, void **pp){
  lsm_file *pUnder = ((LsmFile *)pFile)->pReal;
  lsm_env *pReal = tdb_lsm_env();
  return pReal->xShmMap(pUnder, iRegion, sz, pp);
}
371 
/*
** Test VFS xShmBarrier method. This implementation is a no-op.
*/
static void testEnvShmBarrier(void){
  return;
}
374 
/*
** Test VFS xShmUnmap method. Passed through to the real file unchanged.
*/
static int testEnvShmUnmap(lsm_file *pFile, int bDel){
  lsm_file *pUnder = ((LsmFile *)pFile)->pReal;
  lsm_env *pReal = tdb_lsm_env();
  return pReal->xShmUnmap(pUnder, bDel);
}
380 
/*
** Test VFS xSleep method. Sleeps using the real environment; argument
** pEnv is not used.
*/
static int testEnvSleep(lsm_env *pEnv, int us){
  lsm_env *pReal = tdb_lsm_env();
  int rc = pReal->xSleep(pReal, us);
  return rc;
}
385 
/*
** Simulate the effect of a power failure on the database and log files.
**
** For each sector that has a saved pre-image (i.e. has been written since
** the file was last synced), one of three outcomes is selected using the
** test PRNG:
**
**   0. The sector is left as it is on disk (the write "made it").
**   1. The sector is overwritten with pseudo-random data.
**   2. The sector is restored to its old contents (the write was lost).
**
** All saved pre-images are freed and their pointers reset to 0, whether or
** not the file could be opened.
**
** The files are opened directly through the real environment, bypassing
** the test VFS wrappers. If a file cannot be opened (or the log file name
** cannot be allocated), its sectors are left untouched on disk.
*/
static void doSystemCrash(LsmDb *pDb){
  lsm_env *pEnv = tdb_lsm_env();
  int iFile;
  int iSeed = pDb->aFile[0].nSector + pDb->aFile[1].nSector;

  /* Name of the log file. Allocated once here and freed once below, so
  ** that no copy of the string is leaked. */
  char *zLog = sqlite3_mprintf("%s-log", pDb->zName);

  for(iFile=0; iFile<2; iFile++){
    const char *zFile = (iFile==0 ? pDb->zName : zLog);
    FileData *pData = &pDb->aFile[iFile];
    lsm_file *pFile = 0;
    int i;

    /* Only write to the file if it was opened successfully. */
    if( zFile==0 || pEnv->xOpen(pEnv, zFile, 0, &pFile)!=LSM_OK ){
      pFile = 0;
    }

    for(i=0; i<pData->nSector; i++){
      u8 *aOld = pData->aSector[i].aOld;
      if( aOld ){
        /* The PRNG is always consulted so that the seed sequence does not
        ** depend on whether or not the file could be opened. */
        int iOpt = testPrngValue(iSeed++) % 3;
        switch( iOpt ){
          case 0:
            break;

          case 1:
            testPrngArray(iSeed++, (u32 *)aOld, pDb->szSector/4);
            /* Fall-through */

          case 2:
            if( pFile ){
              pEnv->xWrite(
                  pFile, (lsm_i64)i * pDb->szSector, aOld, pDb->szSector
              );
            }
            break;
        }
        testFree(aOld);
        pData->aSector[i].aOld = 0;
      }
    }

    if( pFile ) pEnv->xClose(pFile);
  }

  sqlite3_free(zLog);
}
427 /*
428 ** End test VFS code.
429 **************************************************************************
430 *************************************************************************/
431 
432 /*************************************************************************
433 **************************************************************************
434 ** Begin test compression hooks.
435 */
436 
437 #ifdef HAVE_ZLIB
438 #include <zlib.h>
439 
/*
** Compression hook xBound method. Returns the value of zlib's
** compressBound() for an input of nSrc bytes. pCtx is not used.
*/
static int testZipBound(void *pCtx, int nSrc){
  int nBound = compressBound(nSrc);
  return nBound;
}
443 
testZipCompress(void * pCtx,char * aOut,int * pnOut,const char * aIn,int nIn)444 static int testZipCompress(
445   void *pCtx,                     /* Context pointer */
446   char *aOut, int *pnOut,         /* OUT: Buffer containing compressed data */
447   const char *aIn, int nIn        /* Buffer containing input data */
448 ){
449   uLongf n = *pnOut;              /* In/out buffer size for compress() */
450   int rc;                         /* compress() return code */
451 
452   rc = compress((Bytef*)aOut, &n, (Bytef*)aIn, nIn);
453   *pnOut = n;
454   return (rc==Z_OK ? 0 : LSM_ERROR);
455 }
456 
testZipUncompress(void * pCtx,char * aOut,int * pnOut,const char * aIn,int nIn)457 static int testZipUncompress(
458   void *pCtx,                     /* Context pointer */
459   char *aOut, int *pnOut,         /* OUT: Buffer containing uncompressed data */
460   const char *aIn, int nIn        /* Buffer containing input data */
461 ){
462   uLongf n = *pnOut;              /* In/out buffer size for uncompress() */
463   int rc;                         /* uncompress() return code */
464 
465   rc = uncompress((Bytef*)aOut, &n, (Bytef*)aIn, nIn);
466   *pnOut = n;
467   return (rc==Z_OK ? 0 : LSM_ERROR);
468 }
469 
/*
** Register the zlib based compression hooks above with database handle
** pDb. Returns the value returned by lsm_config().
*/
static int testConfigureCompression(lsm_db *pDb){
  static lsm_compress sZip = {
    0,                            /* Context pointer (unused) */
    1,                            /* Id value */
    testZipBound,                 /* xBound method */
    testZipCompress,              /* xCompress method */
    testZipUncompress             /* xUncompress method */
  };
  int rc;

  rc = lsm_config(pDb, LSM_CONFIG_SET_COMPRESSION, &sZip);
  return rc;
}
480 #endif /* ifdef HAVE_ZLIB */
481 
482 /*
483 ** End test compression hooks.
484 **************************************************************************
485 *************************************************************************/
486 
/*
** TestDb xClose method for LSM databases.
**
** Close the open cursor (if any) and the database handle, shut down any
** worker threads, then free all crash-simulation state, the fetch buffer
** and the LsmDb object itself.
**
** Returns the first value other than LSM_OK found in the worker_rc field
** of the worker objects, or LSM_OK if there is none (or no workers).
*/
static int test_lsm_close(TestDb *pTestDb){
  int i;
  int iFile;
  int rc = LSM_OK;
  LsmDb *pDb = (LsmDb *)pTestDb;

  lsm_csr_close(pDb->pCsr);
  lsm_close(pDb->db);

  /* If this is a multi-threaded database, wait on the worker threads. */
  mt_shutdown(pDb);
  for(i=0; i<pDb->nWorker && rc==LSM_OK; i++){
    rc = pDb->aWorker[i].worker_rc;
  }

  /* Free any sector images still held for crash simulation. */
  for(iFile=0; iFile<2; iFile++){
    FileData *pData = &pDb->aFile[iFile];
    for(i=0; i<pData->nSector; i++){
      testFree(pData->aSector[i].aOld);
    }
    testFree(pData->aSector);
  }

  /* Free the fetch buffer before the structure is poisoned. Then overwrite
  ** the structure with 0x11 bytes so that any use of the handle after it
  ** has been closed is more likely to be noticed. (The arguments to
  ** memset() are: destination, fill value, byte count.) */
  testFree((char *)pDb->pBuf);
  memset(pDb, 0x11, sizeof(LsmDb));
  testFree((char *)pDb);
  return rc;
}
515 
516 static void mt_signal_worker(LsmDb*, int);
517 
/*
** Block until the value reported by LSM_INFO_CHECKPOINT_SIZE for handle db
** is smaller than LsmDb.nMtMaxCkpt. While waiting, signal a worker thread
** (if threads are available) and sleep for 5ms between polls.
**
** Returns LSM_OK, or the error code if lsm_info() fails.
*/
static int waitOnCheckpointer(LsmDb *pDb, lsm_db *db){
  int nSleep = 0;
  int rc;

  for(;;){
    int nKB = 0;
    rc = lsm_info(db, LSM_INFO_CHECKPOINT_SIZE, &nKB);
    if( rc!=LSM_OK ) break;
    if( nKB<pDb->nMtMaxCkpt ) break;
#ifdef LSM_MUTEX_PTHREADS
    if( pDb->eMode==LSMTEST_MODE_BACKGROUND_CKPT ){
      mt_signal_worker(pDb, 0);
    }else{
      mt_signal_worker(pDb, 1);
    }
#endif
    usleep(5000);
    nSleep += 5;
  }

#if 0
    if( nSleep ) printf("# waitOnCheckpointer(): nSleep=%d\n", nSleep);
#endif

  return rc;
}
542 
/*
** Block until the background worker has caught up with the writer.
**
** The current LSM_CONFIG_AUTOFLUSH setting is queried (passing -1 to
** lsm_config()), then LSM_INFO_TREE_SIZE is polled every 5ms until either
** the "old" tree size is zero or the "new" tree size is less than half of
** the autoflush limit. Worker 0 is signalled before each sleep when
** threads are available.
**
** Returns LSM_OK on success, or an error code if lsm_config() or
** lsm_info() fails.
*/
static int waitOnWorker(LsmDb *pDb){
  int rc;
  int nLimit = -1;
  int nSleep = 0;

  rc = lsm_config(pDb->db, LSM_CONFIG_AUTOFLUSH, &nLimit);

  /* If the limit could not be read, nLimit is meaningless. Return the
  ** error now rather than polling against a bogus threshold. */
  if( rc!=LSM_OK ) return rc;

  do {
    int nOld, nNew, rc2;
    rc2 = lsm_info(pDb->db, LSM_INFO_TREE_SIZE, &nOld, &nNew);
    if( rc2!=LSM_OK ) return rc2;
    if( nOld==0 || nNew<(nLimit/2) ) break;
#ifdef LSM_MUTEX_PTHREADS
    mt_signal_worker(pDb, 0);
#endif
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
  if( nSleep ) printf("# waitOnWorker(): nSleep=%d\n", nSleep);
#endif

  return rc;
}
567 
/*
** TestDb xWrite method. Insert the key/value pair into the database.
**
** In the background-checkpointer mode the call first waits on the
** checkpointer, and in the two background-worker modes it first waits on
** the worker. If the wait fails, the error is returned and nothing is
** inserted.
*/
static int test_lsm_write(
  TestDb *pTestDb,
  void *pKey,
  int nKey,
  void *pVal,
  int nVal
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int rc = LSM_OK;

  switch( pDb->eMode ){
    case LSMTEST_MODE_BACKGROUND_CKPT:
      rc = waitOnCheckpointer(pDb, pDb->db);
      break;

    case LSMTEST_MODE_BACKGROUND_WORK:
    case LSMTEST_MODE_BACKGROUND_BOTH:
      rc = waitOnWorker(pDb);
      break;

    default:
      break;
  }
  if( rc!=LSM_OK ) return rc;

  return lsm_insert(pDb->db, pKey, nKey, pVal, nVal);
}
592 
/*
** TestDb xDelete method. Delete the entry with key (pKey/nKey), returning
** the result of lsm_delete().
*/
static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){
  lsm_db *db = ((LsmDb *)pTestDb)->db;
  int rc = lsm_delete(db, pKey, nKey);
  return rc;
}
597 
/*
** TestDb xDeleteRange method. Passes both keys to lsm_delete_range() and
** returns its result.
*/
static int test_lsm_delete_range(
  TestDb *pTestDb,
  void *pKey1, int nKey1,
  void *pKey2, int nKey2
){
  lsm_db *db = ((LsmDb *)pTestDb)->db;
  int rc = lsm_delete_range(db, pKey1, nKey1, pKey2, nKey2);
  return rc;
}
606 
/*
** TestDb xFetch method. Look up key (pKey/nKey) in the database.
**
** If pKey is NULL this is a no-op that returns LSM_OK. Otherwise, if the
** key is present, its value is copied into the buffer LsmDb.pBuf (which is
** grown as required) and *ppVal/*pnVal are set to point to the copy and
** its size. The buffer is overwritten by the next successful fetch. If
** the key is not present, *ppVal is set to 0 and *pnVal to -1.
**
** The cursor held open by a read transaction (LsmDb.pCsr) is used if there
** is one. Otherwise a temporary cursor is opened and closed again before
** returning.
**
** Returns LSM_OK on success. If an error occurs, an LSM error code is
** returned and the output variables are left unmodified.
*/
static int test_lsm_fetch(
  TestDb *pTestDb,
  void *pKey,
  int nKey,
  void **ppVal,
  int *pnVal
){
  int rc;
  LsmDb *pDb = (LsmDb *)pTestDb;
  lsm_cursor *csr;

  if( pKey==0 ) return LSM_OK;

  if( pDb->pCsr==0 ){
    rc = lsm_csr_open(pDb->db, &csr);
    if( rc!=LSM_OK ) return rc;
  }else{
    csr = pDb->pCsr;
  }

  rc = lsm_csr_seek(csr, pKey, nKey, LSM_SEEK_EQ);
  if( rc==LSM_OK ){
    if( lsm_csr_valid(csr) ){
      const void *pVal = 0;
      int nVal = 0;
      rc = lsm_csr_value(csr, &pVal, &nVal);

      /* Only touch the value if it was actually retrieved. */
      if( rc==LSM_OK ){
        if( nVal>pDb->nBuf ){
          testFree(pDb->pBuf);
          pDb->pBuf = testMalloc(nVal*2);
          pDb->nBuf = nVal*2;
        }
        /* pBuf may still be NULL when nVal==0. Do not pass a NULL pointer
        ** to memcpy(), even for a zero byte copy. */
        if( nVal>0 ) memcpy(pDb->pBuf, pVal, nVal);
        *ppVal = pDb->pBuf;
        *pnVal = nVal;
      }
    }else{
      *ppVal = 0;
      *pnVal = -1;
    }
  }
  if( pDb->pCsr==0 ){
    lsm_csr_close(csr);
  }
  return rc;
}
650 
/*
** TestDb xScan method. Invoke xCallback once for each entry in the
** database with a key between pFirst and pLast, inclusive.
**
** If bReverse is zero, entries are visited in ascending key order starting
** at the first key greater than or equal to pFirst. Otherwise they are
** visited in descending order starting at the last key less than or equal
** to pLast. Either of pFirst or pLast may be NULL, in which case the range
** is open-ended on that side. Keys are compared using memcmp(), with the
** shorter key considered smaller if one is a prefix of the other.
**
** The callback is passed pCtx followed by the key and value of the entry.
**
** The cursor held open by a read transaction (LsmDb.pCsr) is used if there
** is one. Otherwise a temporary cursor is opened and closed again before
** returning.
**
** Returns LSM_OK if the scan completes, or an LSM error code if a cursor
** operation fails (in which case the scan is abandoned).
*/
static int test_lsm_scan(
  TestDb *pTestDb,
  void *pCtx,
  int bReverse,
  void *pFirst, int nFirst,
  void *pLast, int nLast,
  void (*xCallback)(void *, void *, int , void *, int)
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  lsm_cursor *csr;
  int rc;

  if( pDb->pCsr==0 ){
    rc = lsm_csr_open(pDb->db, &csr);
    if( rc!=LSM_OK ) return rc;
  }else{
    rc = LSM_OK;
    csr = pDb->pCsr;
  }

  /* To enhance testing, if both pLast and pFirst are defined, seek the
  ** cursor to the "end" boundary here. Then the next block seeks it to
  ** the "start" ready for the scan. The point is to test that cursors
  ** can be reused. The return code of this seek is overwritten below. */
  if( pLast && pFirst ){
    if( bReverse ){
      rc = lsm_csr_seek(csr, pFirst, nFirst, LSM_SEEK_LE);
    }else{
      rc = lsm_csr_seek(csr, pLast, nLast, LSM_SEEK_GE);
    }
  }

  /* Position the cursor on the first entry to visit. */
  if( bReverse ){
    if( pLast ){
      rc = lsm_csr_seek(csr, pLast, nLast, LSM_SEEK_LE);
    }else{
      rc = lsm_csr_last(csr);
    }
  }else{
    if( pFirst ){
      rc = lsm_csr_seek(csr, pFirst, nFirst, LSM_SEEK_GE);
    }else{
      rc = lsm_csr_first(csr);
    }
  }

  while( rc==LSM_OK && lsm_csr_valid(csr) ){
    const void *pKey = 0; int nKey = 0;
    const void *pVal = 0; int nVal = 0;
    int cmp;

    /* Do not use the key or value unless they were actually retrieved. */
    rc = lsm_csr_key(csr, &pKey, &nKey);
    if( rc==LSM_OK ) rc = lsm_csr_value(csr, &pVal, &nVal);
    if( rc!=LSM_OK ) break;

    /* Stop once the cursor has moved past the far end of the range. */
    if( bReverse && pFirst ){
      cmp = memcmp(pFirst, pKey, MIN(nKey, nFirst));
      if( cmp>0 || (cmp==0 && nFirst>nKey) ) break;
    }else if( bReverse==0 && pLast ){
      cmp = memcmp(pLast, pKey, MIN(nKey, nLast));
      if( cmp<0 || (cmp==0 && nLast<nKey) ) break;
    }

    xCallback(pCtx, (void *)pKey, nKey, (void *)pVal, nVal);

    if( bReverse ){
      rc = lsm_csr_prev(csr);
    }else{
      rc = lsm_csr_next(csr);
    }
  }

  if( pDb->pCsr==0 ){
    lsm_csr_close(csr);
  }
  return rc;
}
728 
/*
** TestDb xBegin method.
**
** iLevel==0 is a no-op. Any other value ensures that a cursor is held
** open in LsmDb.pCsr (the "read transaction"). If iLevel is greater than
** one, lsm_begin() is also invoked with level (iLevel-1).
*/
static int test_lsm_begin(TestDb *pTestDb, int iLevel){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int rc = LSM_OK;

  if( iLevel==0 ) return 0;

  if( pDb->pCsr==0 ){
    rc = lsm_csr_open(pDb->db, &pDb->pCsr);
  }
  if( rc!=LSM_OK || iLevel<=1 ) return rc;

  return lsm_begin(pDb->db, iLevel-1);
}
/*
** TestDb xCommit method. If iLevel==0, close the cursor held open for the
** read transaction (if any). Then invoke lsm_commit() with level
** (iLevel-1), or 0 if that would be negative.
*/
static int test_lsm_commit(TestDb *pTestDb, int iLevel){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int bCloseRead = (iLevel==0 && pDb->pCsr);

  if( bCloseRead ){
    lsm_csr_close(pDb->pCsr);
    pDb->pCsr = 0;
  }

  return lsm_commit(pDb->db, MAX(0, iLevel-1));
}
/*
** TestDb xRollback method. If iLevel==0, close the cursor held open for
** the read transaction (if any). Then invoke lsm_rollback() with level
** (iLevel-1), or 0 if that would be negative.
*/
static int test_lsm_rollback(TestDb *pTestDb, int iLevel){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int bCloseRead = (iLevel==0 && pDb->pCsr);

  if( bCloseRead ){
    lsm_csr_close(pDb->pCsr);
    pDb->pCsr = 0;
  }

  return lsm_rollback(pDb->db, MAX(0, iLevel-1));
}
766 
/*
** Log message callback registered with lsm connections. Each message is
** written to stderr on a line of its own and stderr is flushed. If pCtx
** is not NULL it is interpreted as a nul-terminated string and printed,
** followed by ": ", in front of the message. The error code rc is ignored.
*/
static void xLog(void *pCtx, int rc, const char *z){
  const char *zPrefix = (const char *)pCtx;

  unused_parameter(rc);
  if( zPrefix ){
    fprintf(stderr, "%s: ", zPrefix);
  }
  fprintf(stderr, "%s\n", z);
  fflush(stderr);
}
778 
/*
** Work hook registered with lsm connections. pArg is the LsmDb object. If
** a redirect (LsmDb.xWork) is configured, forward the call to it along
** with LsmDb.pWorkCtx. Otherwise do nothing.
*/
static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *pLsm = (LsmDb *)pArg;
  if( pLsm->xWork==0 ) return;
  pLsm->xWork(db, pLsm->pWorkCtx);
}
783 
784 #define TEST_NO_RECOVERY -1
785 #define TEST_COMPRESSION -3
786 
787 #define TEST_MT_MODE     -2
788 #define TEST_MT_MIN_CKPT -4
789 #define TEST_MT_MAX_CKPT -5
790 
791 
/*
** Parse the configuration string zStr and apply it to database handle db.
**
** The string consists of zero or more "name=value" settings separated by
** spaces. Each value is a decimal integer, optionally followed by a
** single suffix character:
**
**     k or K    value is used as is (multiplied by 1)
**     m or M    value is multiplied by 1024
**
** Names are looked up in the aParam[] table below using testArgSelect().
** Parameters with a positive eParam are passed to lsm_config(). Of these,
** an entry with its bWorker column set is only applied if the bWorker
** argument to this function is non-zero. Parameters with a negative
** eParam are handled by the test harness itself. Those that modify the
** LsmDb object are silently ignored if pLsm is NULL.
**
** If pnThread is not NULL, *pnThread is set to the value of the "mt_mode"
** parameter, or to 1 if it was not specified (or pLsm is NULL). It is only
** written if the whole string is parsed successfully.
**
** If pLsm is not NULL and nMtMaxCkpt ends up smaller than nMtMinCkpt,
** nMtMinCkpt is reduced to match.
**
** Returns 0 on success (including when zStr is NULL), the non-zero value
** returned by testArgSelect() if a parameter name is not recognized, or 1
** after printing an error message if the string is malformed. Values
** returned by lsm_config() are not checked.
*/
int test_lsm_config_str(
  LsmDb *pLsm,
  lsm_db *db,
  int bWorker,
  const char *zStr,
  int *pnThread
){
  struct CfgParam {
    const char *zParam;
    int bWorker;
    int eParam;
  } aParam[] = {
    { "autoflush",        0, LSM_CONFIG_AUTOFLUSH },
    { "page_size",        0, LSM_CONFIG_PAGE_SIZE },
    { "block_size",       0, LSM_CONFIG_BLOCK_SIZE },
    { "safety",           0, LSM_CONFIG_SAFETY },
    { "autowork",         0, LSM_CONFIG_AUTOWORK },
    { "autocheckpoint",   0, LSM_CONFIG_AUTOCHECKPOINT },
    { "mmap",             0, LSM_CONFIG_MMAP },
    { "use_log",          0, LSM_CONFIG_USE_LOG },
    { "automerge",        0, LSM_CONFIG_AUTOMERGE },
    { "max_freelist",     0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_automerge", 1, LSM_CONFIG_AUTOMERGE },
    { "test_no_recovery", 0, TEST_NO_RECOVERY },
    /* NOTE(review): "bg_min_ckpt" maps to TEST_NO_RECOVERY, the same as
    ** "test_no_recovery". Confirm that this is intentional. */
    { "bg_min_ckpt",      0, TEST_NO_RECOVERY },

    { "mt_mode",          0, TEST_MT_MODE },
    { "mt_min_ckpt",      0, TEST_MT_MIN_CKPT },
    { "mt_max_ckpt",      0, TEST_MT_MAX_CKPT },

#ifdef HAVE_ZLIB
    { "compression",      0, TEST_COMPRESSION },
#endif
    { 0, 0 }
  };
  const char *z = zStr;
  int nThread = 1;

  if( zStr==0 ) return 0;

  assert( db );
  while( z[0] ){
    const char *zStart;

    /* Skip whitespace */
    while( *z==' ' ) z++;
    zStart = z;

    /* Find the '=' that terminates the parameter name. */
    while( *z && *z!='=' ) z++;
    if( *z ){
      int eParam;
      int i;
      int iVal;
      int iMul = 1;
      int rc;
      char zParam[32];
      int nParam = z-zStart;
      if( nParam==0 || nParam>(int)sizeof(zParam)-1 ) goto syntax_error;

      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      rc = testArgSelect(aParam, "param", zParam, &i);
      if( rc!=0 ) return rc;
      eParam = aParam[i].eParam;

      /* Parse the value: a run of digits and an optional k/K/m/M suffix.
      ** zParam[] is reused to hold a nul-terminated copy of the value. */
      z++;
      zStart = z;
      while( *z>='0' && *z<='9' ) z++;
      if( *z=='k' || *z=='K' ){
        iMul = 1;
        z++;
      }else if( *z=='m' || *z=='M' ){
        iMul = 1024;
        z++;
      }
      nParam = z-zStart;
      if( nParam==0 || nParam>(int)sizeof(zParam)-1 ) goto syntax_error;
      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      iVal = atoi(zParam) * iMul;

      if( eParam>0 ){
        if( bWorker || aParam[i].bWorker==0 ){
          lsm_config(db, eParam, &iVal);
        }
      }else{
        switch( eParam ){
          case TEST_NO_RECOVERY:
            if( pLsm ) pLsm->bNoRecovery = iVal;
            break;
          case TEST_MT_MODE:
            if( pLsm ) nThread = iVal;
            break;
          case TEST_MT_MIN_CKPT:
            if( pLsm && iVal>0 ) pLsm->nMtMinCkpt = iVal*1024;
            break;
          case TEST_MT_MAX_CKPT:
            if( pLsm && iVal>0 ) pLsm->nMtMaxCkpt = iVal*1024;
            break;
#ifdef HAVE_ZLIB
          case TEST_COMPRESSION:
            testConfigureCompression(db);
            break;
#endif
        }
      }
    }else if( z!=zStart ){
      /* Trailing text that is not followed by an '=' character. */
      goto syntax_error;
    }
  }

  if( pnThread ) *pnThread = nThread;
  if( pLsm && pLsm->nMtMaxCkpt < pLsm->nMtMinCkpt ){
    pLsm->nMtMinCkpt = pLsm->nMtMaxCkpt;
  }

  return 0;
 syntax_error:
  testPrintError("syntax error at: \"%s\"\n", z);
  return 1;
}
914 
/*
** Apply configuration string zStr to test database pDb. This is a no-op
** returning 0 unless tdb_lsm(pDb) is true. Otherwise the string is applied
** to the main database handle and then, if threads are available, to the
** handle of each worker until one of them fails.
**
** Returns 0 on success, or the non-zero value returned by
** test_lsm_config_str() otherwise.
*/
int tdb_lsm_config_str(TestDb *pDb, const char *zStr){
  LsmDb *pLsm;
  int rc;
#ifdef LSM_MUTEX_PTHREADS
  int iWorker;
#endif

  if( !tdb_lsm(pDb) ) return 0;
  pLsm = (LsmDb *)pDb;

  rc = test_lsm_config_str(pLsm, pLsm->db, 0, zStr, 0);
#ifdef LSM_MUTEX_PTHREADS
  for(iWorker=0; rc==0 && iWorker<pLsm->nWorker; iWorker++){
    lsm_db *pWorker = pLsm->aWorker[iWorker].pWorker;
    rc = test_lsm_config_str(0, pWorker, 1, zStr, 0);
  }
#endif
  return rc;
}
932 
/*
** Apply configuration string zConfig directly to LSM handle db. There is
** no associated LsmDb object, so harness-only parameters that would
** modify one are ignored.
*/
int tdb_lsm_configure(lsm_db *db, const char *zConfig){
  int rc = test_lsm_config_str(0, db, 0, zConfig, 0);
  return rc;
}
936 
937 static int testLsmStartWorkers(LsmDb *, int, const char *, const char *);
938 
/*
** Open an LSM test database.
**
**   zCfg       Configuration string for test_lsm_config_str(). May be NULL.
**   zFilename  Name of the database file. Must not be NULL.
**   bClear     If true, delete any existing database before opening.
**   ppDb       OUT: The new database handle.
**
** The new LsmDb object uses its own lsm_env, which is a copy of the shared
** test environment with the file-system methods replaced by the test VFS
** wrappers defined above. The database name is stored in the same
** allocation as the LsmDb object.
**
** Returns LSM_OK and sets *ppDb to the new handle if successful. If an
** error occurs, all resources are released, *ppDb is set to 0 and an error
** code is returned.
*/
static int testLsmOpen(
  const char *zCfg,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  static const DatabaseMethods LsmMethods = {
    test_lsm_close,
    test_lsm_write,
    test_lsm_delete,
    test_lsm_delete_range,
    test_lsm_fetch,
    test_lsm_scan,
    test_lsm_begin,
    test_lsm_commit,
    test_lsm_rollback
  };

  int rc;
  int nFilename;
  LsmDb *pDb;

  /* If the bClear flag is set, delete any existing database. */
  assert( zFilename);
  if( bClear ) testDeleteLsmdb(zFilename);
  nFilename = strlen(zFilename);

  pDb = (LsmDb *)testMalloc(sizeof(LsmDb) + nFilename + 1);
  memset(pDb, 0, sizeof(LsmDb));
  pDb->base.pMethods = &LsmMethods;
  pDb->zName = (char *)&pDb[1];
  memcpy(pDb->zName, zFilename, nFilename + 1);

  /* Default the sector size used for crash simulation to 256 bytes.
  ** Todo: There should be an OS method to obtain this value - just as
  ** there is in SQLite. For now, LSM assumes that it is smaller than
  ** the page size (default 4KB).
  */
  pDb->szSector = 256;

  /* Default values for the mt_min_ckpt and mt_max_ckpt parameters. */
  pDb->nMtMinCkpt = LSMTEST_DFLT_MT_MIN_CKPT;
  pDb->nMtMaxCkpt = LSMTEST_DFLT_MT_MAX_CKPT;

  memcpy(&pDb->env, tdb_lsm_env(), sizeof(lsm_env));
  pDb->env.pVfsCtx = (void *)pDb;
  pDb->env.xFullpath = testEnvFullpath;
  pDb->env.xOpen = testEnvOpen;
  pDb->env.xRead = testEnvRead;
  pDb->env.xWrite = testEnvWrite;
  pDb->env.xTruncate = testEnvTruncate;
  pDb->env.xSync = testEnvSync;
  pDb->env.xSectorSize = testEnvSectorSize;
  pDb->env.xRemap = testEnvRemap;
  pDb->env.xFileid = testEnvFileid;
  pDb->env.xClose = testEnvClose;
  pDb->env.xUnlink = testEnvUnlink;
  pDb->env.xLock = testEnvLock;
  pDb->env.xTestLock = testEnvTestLock;
  pDb->env.xShmBarrier = testEnvShmBarrier;
  pDb->env.xShmMap = testEnvShmMap;
  pDb->env.xShmUnmap = testEnvShmUnmap;
  pDb->env.xSleep = testEnvSleep;

  rc = lsm_new(&pDb->env, &pDb->db);
  if( rc==LSM_OK ){
    int nThread = 1;
    lsm_config_log(pDb->db, xLog, 0);
    lsm_config_work_hook(pDb->db, xWorkHook, (void *)pDb);

    rc = test_lsm_config_str(pDb, pDb->db, 0, zCfg, &nThread);
    if( rc==LSM_OK ) rc = lsm_open(pDb->db, zFilename);

    pDb->eMode = nThread;
#ifdef LSM_MUTEX_PTHREADS
    if( rc==LSM_OK && nThread>1 ){
      /* NOTE(review): the return value is ignored here - confirm that a
      ** failure to start the worker threads should not fail the open. */
      testLsmStartWorkers(pDb, nThread, zFilename, zCfg);
    }
#endif
  }else{
    /* Make sure test_lsm_close() does not see a half-initialized handle. */
    pDb->db = 0;
  }

  /* Whatever failed, release everything allocated above and hand a NULL
  ** handle back to the caller. This includes the case where lsm_new()
  ** itself failed. */
  if( rc!=LSM_OK ){
    test_lsm_close((TestDb *)pDb);
    pDb = 0;
  }

  *ppDb = (TestDb *)pDb;
  return rc;
}
1028 
/*
** Open an LSM test database using zSpec as the configuration string.
** All four arguments are forwarded unchanged to testLsmOpen() and its
** return value is passed back.
*/
int test_lsm_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  int rc;
  rc = testLsmOpen(zSpec, zFilename, bClear, ppDb);
  return rc;
}
1037 
/*
** Open an LSM test database with a fixed "small" configuration. The
** zSpec argument is not used; the configuration string below is passed
** to testLsmOpen() instead.
*/
int test_lsm_small_open(
  const char *zSpec,
  const char *zFile,
  int bClear,
  TestDb **ppDb
){
  return testLsmOpen(
      "page_size=256 block_size=64 mmap=1024", zFile, bClear, ppDb
  );
}
1047 
/*
** Open an LSM test database with a fixed low-memory configuration. The
** zSpec argument is not used.
**
** The configuration is assembled from adjacent string literals, which the
** compiler concatenates with nothing in between. Every fragment must
** therefore end in a space so that the "name=value" pairs stay separate.
*/
int test_lsm_lomem_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  const char *zCfg =
    "page_size=256 block_size=64 autoflush=16 "
    "autocheckpoint=32 "
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}
1062 
/*
** Open an LSM test database with a second fixed low-memory configuration
** (512 byte pages, autoflush and mmap both set to 0). The zSpec argument
** is not used.
*/
int test_lsm_lomem2_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  static const char zLomem2Cfg[] =
    "page_size=512 block_size=64 "
    "autoflush=0 mmap=0 ";

  return testLsmOpen(zLomem2Cfg, zFilename, bClear, ppDb);
}
1075 
/*
** Open an LSM test database with a fixed configuration that includes
** "compression=1". The zSpec argument is not used.
*/
int test_lsm_zip_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  static const char zZipCfg[] =
    "page_size=256 block_size=64 "
    "autoflush=16 autocheckpoint=32 "
    "compression=1 mmap=0 ";

  return testLsmOpen(zZipCfg, zFilename, bClear, ppDb);
}
1088 
tdb_lsm(TestDb * pDb)1089 lsm_db *tdb_lsm(TestDb *pDb){
1090   if( pDb->pMethods->xClose==test_lsm_close ){
1091     return ((LsmDb *)pDb)->db;
1092   }
1093   return 0;
1094 }
1095 
/*
** Return non-zero if pDb is an LSM test database whose eMode is anything
** other than LSMTEST_MODE_SINGLETHREAD. Return 0 for single-threaded LSM
** handles and for handles that are not LSM databases at all.
*/
int tdb_lsm_multithread(TestDb *pDb){
  if( tdb_lsm(pDb)==0 ) return 0;
  return ((LsmDb*)pDb)->eMode!=LSMTEST_MODE_SINGLETHREAD;
}
1103 
/*
** Install (bEnable non-zero) or remove (bEnable zero) the xLog logging
** callback on the client connection of pDb. The callback context is the
** string "client" in both cases. No-op if pDb is not an LSM database.
*/
void tdb_lsm_enable_log(TestDb *pDb, int bEnable){
  lsm_db *pLsm = tdb_lsm(pDb);
  if( pLsm==0 ) return;

  if( bEnable ){
    lsm_config_log(pLsm, xLog, (void *)"client");
  }else{
    lsm_config_log(pLsm, 0, (void *)"client");
  }
}
1110 
/*
** Set the bCrashed flag on LSM test database pDb. No-op if pDb is not an
** LSM database.
*/
void tdb_lsm_application_crash(TestDb *pDb){
  LsmDb *pLsm = (LsmDb *)pDb;
  if( tdb_lsm(pDb)==0 ) return;
  pLsm->bCrashed = 1;
}
1117 
/*
** Set the bPrepareCrash flag on LSM test database pDb. No-op if pDb is
** not an LSM database.
*/
void tdb_lsm_prepare_system_crash(TestDb *pDb){
  LsmDb *pLsm = (LsmDb *)pDb;
  if( tdb_lsm(pDb)==0 ) return;
  pLsm->bPrepareCrash = 1;
}
1124 
/*
** Mark LSM test database pDb as crashed and then invoke doSystemCrash()
** on it. No-op if pDb is not an LSM database.
*/
void tdb_lsm_system_crash(TestDb *pDb){
  LsmDb *pLsm = (LsmDb *)pDb;
  if( tdb_lsm(pDb)==0 ) return;

  /* The flag is set before the crash simulation runs. */
  pLsm->bCrashed = 1;
  doSystemCrash(pLsm);
}
1132 
/*
** Set the LSM_CONFIG_SAFETY parameter of the client connection of pDb to
** eMode, which must be one of LSM_SAFETY_OFF, LSM_SAFETY_NORMAL or
** LSM_SAFETY_FULL. No-op if pDb is not an LSM database.
*/
void tdb_lsm_safety(TestDb *pDb, int eMode){
  LsmDb *pLsm = (LsmDb *)pDb;
  int iSafety = eMode;            /* lsm_config() takes a pointer to int */

  assert( eMode==LSM_SAFETY_OFF
       || eMode==LSM_SAFETY_NORMAL
       || eMode==LSM_SAFETY_FULL
  );
  if( tdb_lsm(pDb)==0 ) return;
  lsm_config(pLsm->db, LSM_CONFIG_SAFETY, &iSafety);
}
1144 
/*
** Store iSync (which must be greater than zero) in nAutoCrash and set
** the bPrepareCrash flag of LSM test database pDb. No-op if pDb is not
** an LSM database.
*/
void tdb_lsm_prepare_sync_crash(TestDb *pDb, int iSync){
  LsmDb *pLsm = (LsmDb *)pDb;

  assert( iSync>0 );
  if( tdb_lsm(pDb)==0 ) return;
  pLsm->bPrepareCrash = 1;
  pLsm->nAutoCrash = iSync;
}
1153 
/*
** Register a user-level work hook on LSM test database pDb. The function
** pointer and its context are stored in LsmDb.xWork and LsmDb.pWorkCtx.
** No-op if pDb is not an LSM database.
*/
void tdb_lsm_config_work_hook(
  TestDb *pDb,
  void (*xWork)(lsm_db *, void *),
  void *pWorkCtx
){
  LsmDb *pLsm = (LsmDb *)pDb;
  if( tdb_lsm(pDb)==0 ) return;
  pLsm->pWorkCtx = pWorkCtx;
  pLsm->xWork = xWork;
}
1165 
/*
** Register a write hook on LSM test database pDb. The function pointer
** and its context are stored in LsmDb.xWriteHook and LsmDb.pWriteCtx.
** No-op if pDb is not an LSM database.
*/
void tdb_lsm_write_hook(
  TestDb *pDb,
  void (*xWrite)(void *, int, lsm_i64, int, int),
  void *pWriteCtx
){
  LsmDb *pLsm = (LsmDb *)pDb;
  if( tdb_lsm(pDb)==0 ) return;
  pLsm->pWriteCtx = pWriteCtx;
  pLsm->xWriteHook = xWrite;
}
1177 
/*
** Open LSM test database zDb with configuration string zCfg. A thin
** public wrapper that forwards its arguments to testLsmOpen() and
** returns that function's result.
*/
int tdb_lsm_open(const char *zCfg, const char *zDb, int bClear, TestDb **ppDb){
  int rc;
  rc = testLsmOpen(zCfg, zDb, bClear, ppDb);
  return rc;
}
1181 
1182 #ifdef LSM_MUTEX_PTHREADS
1183 
1184 /*
1185 ** Signal worker thread iWorker that there may be work to do.
1186 */
mt_signal_worker(LsmDb * pDb,int iWorker)1187 static void mt_signal_worker(LsmDb *pDb, int iWorker){
1188   LsmWorker *p = &pDb->aWorker[iWorker];
1189   pthread_mutex_lock(&p->worker_mutex);
1190   p->bDoWork = 1;
1191   pthread_cond_signal(&p->worker_cond);
1192   pthread_mutex_unlock(&p->worker_mutex);
1193 }
1194 
/*
** This routine is used as the main() for all worker threads. pArg is the
** LsmWorker slot that describes this thread.
**
** The thread loops for as long as LsmWorker.pWorker is non-NULL (it is
** cleared by mt_stop_worker()). Each iteration does one round of work
** with worker_mutex released, then re-takes the mutex and sleeps on
** worker_cond unless bDoWork was set in the meantime:
**
**   * LSMTEST_THREAD_CKPT threads query LSM_INFO_CHECKPOINT_SIZE and call
**     lsm_checkpoint() if the value is at least LsmDb.nMtMinCkpt.
**
**   * All other threads call lsm_work() repeatedly until a call reports
**     that nothing was written. LSMTEST_THREAD_WORKER threads additionally
**     call waitOnCheckpointer() before each call and signal worker 1
**     after each call that wrote something.
**
** Any result other than LSM_OK or LSM_BUSY is stored in
** LsmWorker.worker_rc and terminates the thread. Always returns 0.
*/
static void *worker_main(void *pArg){
  LsmWorker *p = (LsmWorker *)pArg;
  lsm_db *pWorker;                /* Connection to access db through */

  pthread_mutex_lock(&p->worker_mutex);
  /* p->pWorker is re-read, under the mutex, at the top of every pass. */
  while( (pWorker = p->pWorker) ){
    int rc = LSM_OK;

    /* Do some work. If an error occurs, exit. */

    /* The mutex is not held while working, so that mt_signal_worker() and
    ** mt_stop_worker() are not blocked for the duration. */
    pthread_mutex_unlock(&p->worker_mutex);
    if( p->eType==LSMTEST_THREAD_CKPT ){
      int nKB = 0;
      rc = lsm_info(pWorker, LSM_INFO_CHECKPOINT_SIZE, &nKB);
      if( rc==LSM_OK && nKB>=p->pDb->nMtMinCkpt ){
        rc = lsm_checkpoint(pWorker, 0);
      }
    }else{
      int nWrite;
      do {

        if( p->eType==LSMTEST_THREAD_WORKER ){
          waitOnCheckpointer(p->pDb, pWorker);
        }

        nWrite = 0;
        rc = lsm_work(pWorker, 0, 256, &nWrite);

        /* Worker 1 is the thread signalled here whenever this thread
        ** wrote anything. */
        if( p->eType==LSMTEST_THREAD_WORKER && nWrite ){
          mt_signal_worker(p->pDb, 1);
        }
      /* NOTE(review): p->pWorker is read here without worker_mutex held. */
      }while( nWrite && p->pWorker );
    }
    pthread_mutex_lock(&p->worker_mutex);

    /* LSM_BUSY is not treated as an error; the loop simply continues. */
    if( rc!=LSM_OK && rc!=LSM_BUSY ){
      p->worker_rc = rc;
      break;
    }

    /* The thread will wake up when it is signaled either because another
    ** thread has created some work for this one or because the connection
    ** is being closed.  */
    if( p->pWorker && p->bDoWork==0 ){
      pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
    }
    p->bDoWork = 0;
  }
  pthread_mutex_unlock(&p->worker_mutex);

  return 0;
}
1250 
1251 
/*
** Stop worker thread iWorker of pDb and release its resources. Does
** nothing if the slot has no worker connection.
**
** The worker is told to exit by clearing LsmWorker.pWorker under the
** worker mutex and signalling the condition variable. Once the thread
** has been joined, the condition variable and mutex are destroyed and the
** worker's database connection is closed.
*/
static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *pSlot = pDb->aWorker + iWorker;
  lsm_db *pConn;                  /* Connection to close after the join */
  void *pExit;                    /* Thread exit value (unused) */

  if( pSlot->pWorker==0 ) return;

  /* Ask the thread to exit, keeping hold of its connection handle. */
  pthread_mutex_lock(&pSlot->worker_mutex);
  pConn = pSlot->pWorker;
  pSlot->pWorker = 0;
  pthread_cond_signal(&pSlot->worker_cond);
  pthread_mutex_unlock(&pSlot->worker_mutex);

  /* Wait for it to finish. */
  pthread_join(pSlot->worker_thread, &pExit);

  /* Release everything set up by mt_start_worker(). */
  pthread_cond_destroy(&pSlot->worker_cond);
  pthread_mutex_destroy(&pSlot->worker_mutex);
  lsm_close(pConn);
}
1274 
/*
** Stop every worker thread of pDb, in slot order, by calling
** mt_stop_worker() for slots 0 to nWorker-1.
*/
static void mt_shutdown(LsmDb *pDb){
  int iWorker = 0;
  while( iWorker<pDb->nWorker ){
    mt_stop_worker(pDb, iWorker);
    iWorker++;
  }
}
1281 
1282 /*
1283 ** This callback is invoked by LSM when the client database writes to
1284 ** the database file (i.e. to flush the contents of the in-memory tree).
1285 ** This implies there may be work to do on the database, so signal
1286 ** the worker threads.
1287 */
mt_client_work_hook(lsm_db * db,void * pArg)1288 static void mt_client_work_hook(lsm_db *db, void *pArg){
1289   LsmDb *pDb = (LsmDb *)pArg;     /* LsmDb database handle */
1290 
1291   /* Invoke the user level work-hook, if any. */
1292   if( pDb->xWork ) pDb->xWork(db, pDb->pWorkCtx);
1293 
1294   /* Wake up worker thread 0. */
1295   mt_signal_worker(pDb, 0);
1296 }
1297 
/*
** Work-hook installed on worker connections. pArg is the LsmDb handle.
** It only forwards to the user-level work hook, if one is registered; no
** thread is signalled.
*/
static void mt_worker_work_hook(lsm_db *db, void *pArg){
  LsmDb *pLsm = (LsmDb *)pArg;

  if( pLsm->xWork!=0 ){
    pLsm->xWork(db, pLsm->pWorkCtx);
  }
}
1304 
1305 /*
1306 ** Launch worker thread iWorker for database connection pDb.
1307 */
mt_start_worker(LsmDb * pDb,int iWorker,const char * zFilename,const char * zCfg,int eType)1308 static int mt_start_worker(
1309   LsmDb *pDb,                     /* Main database structure */
1310   int iWorker,                    /* Worker number to start */
1311   const char *zFilename,          /* File name of database to open */
1312   const char *zCfg,               /* Connection configuration string */
1313   int eType                       /* Type of worker thread */
1314 ){
1315   int rc = 0;                     /* Return code */
1316   LsmWorker *p;                   /* Object to initialize */
1317 
1318   assert( iWorker<pDb->nWorker );
1319   assert( eType==LSMTEST_THREAD_CKPT
1320        || eType==LSMTEST_THREAD_WORKER
1321        || eType==LSMTEST_THREAD_WORKER_AC
1322   );
1323 
1324   p = &pDb->aWorker[iWorker];
1325   p->eType = eType;
1326   p->pDb = pDb;
1327 
1328   /* Open the worker connection */
1329   if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);
1330   if( zCfg ){
1331     test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0);
1332   }
1333   if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);
1334   lsm_config_log(p->pWorker, xLog, (void *)"worker");
1335 
1336   /* Configure the work-hook */
1337   if( rc==0 ){
1338     lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
1339   }
1340 
1341   if( eType==LSMTEST_THREAD_WORKER ){
1342     test_lsm_config_str(0, p->pWorker, 1, "autocheckpoint=0", 0);
1343   }
1344 
1345   /* Kick off the worker thread. */
1346   if( rc==0 ) rc = pthread_cond_init(&p->worker_cond, 0);
1347   if( rc==0 ) rc = pthread_mutex_init(&p->worker_mutex, 0);
1348   if( rc==0 ) rc = pthread_create(&p->worker_thread, 0, worker_main, (void *)p);
1349 
1350   return rc;
1351 }
1352 
1353 
/*
** Start the background threads required by threading model eModel for
** database pDb. zFilename and zCfg are passed on to mt_start_worker() so
** that each thread can open and configure its own connection.
**
**   eModel==1                       No threads; returns 0 immediately.
**   LSMTEST_MODE_BACKGROUND_CKPT    One checkpointer thread. The client
**                                   gets "autocheckpoint=0".
**   LSMTEST_MODE_BACKGROUND_WORK    One LSMTEST_THREAD_WORKER_AC thread.
**                                   The client gets "autowork=0".
**   LSMTEST_MODE_BACKGROUND_BOTH    A LSMTEST_THREAD_WORKER thread in
**                                   slot 0 and a checkpointer in slot 1.
**                                   The client gets "autowork=0".
**
** Returns 0 on success, 1 if eModel is not a recognised model, or the
** error code from mt_start_worker().
*/
static int testLsmStartWorkers(
  LsmDb *pDb, int eModel, const char *zFilename, const char *zCfg
){
  int rc = 1;                     /* Error unless a case below succeeds */

  if( eModel<1 || eModel>4 ) return 1;
  if( eModel==1 ) return 0;

  /* Configure a work-hook for the client connection. Worker 0 is signalled
  ** every time the users connection writes to the database.  */
  lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);

  /* Allocate space for two worker connections. They may not both be
  ** used, but both are allocated.  */
  pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * 2);
  memset(pDb->aWorker, 0, sizeof(LsmWorker) * 2);

  switch( eModel ){
    case LSMTEST_MODE_BACKGROUND_CKPT:
      pDb->nWorker = 1;
      test_lsm_config_str(0, pDb->db, 0, "autocheckpoint=0", 0);
      rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_CKPT);
      break;

    case LSMTEST_MODE_BACKGROUND_WORK:
      pDb->nWorker = 1;
      test_lsm_config_str(0, pDb->db, 0, "autowork=0", 0);
      rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_WORKER_AC);
      break;

    case LSMTEST_MODE_BACKGROUND_BOTH:
      pDb->nWorker = 2;
      test_lsm_config_str(0, pDb->db, 0, "autowork=0", 0);

      /* Start the checkpointer (slot 1) first. The worker in slot 0
      ** signals slot 1 from its own thread as soon as it has written
      ** anything, so slot 1's mutex and condition variable must already
      ** be initialized by the time slot 0 starts running. */
      rc = mt_start_worker(pDb, 1, zFilename, zCfg, LSMTEST_THREAD_CKPT);
      if( rc==0 ){
        rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_WORKER);
      }
      break;

    default:
      /* eModel passed the range check above but matches none of the
      ** LSMTEST_MODE_XXX constants. Report an error rather than return an
      ** indeterminate value. */
      rc = 1;
      break;
  }

  return rc;
}
1396 
1397 
/*
** Open an LSM test database with the fixed configuration "mt_mode=2".
** The zSpec argument is not used.
*/
int test_lsm_mt2(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  return testLsmOpen("mt_mode=2", zFilename, bClear, ppDb);
}
1407 
/*
** Open an LSM test database with the fixed configuration "mt_mode=4".
** The zSpec argument is not used.
*/
int test_lsm_mt3(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  return testLsmOpen("mt_mode=4", zFilename, bClear, ppDb);
}
1417 
1418 #else
/*
** Version of mt_shutdown() used when LSM_MUTEX_PTHREADS is not defined.
** No worker threads exist in such builds, so there is nothing to stop.
*/
static void mt_shutdown(LsmDb *pDb) {
  unused_parameter(pDb);
}
/*
** Stub used when LSM_MUTEX_PTHREADS is not defined. It opens nothing:
** an error message is printed and 1 is returned. *ppDb is not written.
**
** NOTE(review): the pthreads branch of this file defines test_lsm_mt2()
** and test_lsm_mt3() rather than test_lsm_mt() - confirm which names the
** callers of this file expect in a build without pthreads.
*/
int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  unused_parameter(zFilename);
  unused_parameter(bClear);
  unused_parameter(ppDb);
  testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n");
  return 1;
}
1429 #endif
1430