xref: /sqlite-3.40.0/ext/lsm1/lsm-test/lsmtest8.c (revision 067b92ba)
1 
/*
** This file contains test cases to verify that "live-recovery" works
** correctly following a mid-transaction failure of a writer process.
*/
6 
7 
8 /*
9 ** This test file includes lsmInt.h to get access to the definition of the
10 ** ShmHeader structure. This is required to cause strategic damage to the
11 ** shared memory header as part of recovery testing.
12 */
13 #include "lsmInt.h"
14 
15 #include "lsmtest.h"
16 
/*
** One entry in a database setup script. doSetupStep() inserts nIns
** datasource entries starting at index iInsStart, then deletes nDel keys
** starting at index iDelStart.
**
** NOTE(review): no code in this file reads bFlush; doSetupStep() performs
** its flush/work/checkpoint sequence for every step regardless of it.
*/
typedef struct SetupStep {
  int bFlush;        /* Flush to disk and checkpoint */
  int iInsStart;     /* First key-value from ds to insert */
  int nIns;          /* Number of rows to insert */
  int iDelStart;     /* First key from ds to delete */
  int nDel;          /* Number of rows to delete */
} SetupStep;
25 
/*
** Apply a single setup step to database pDb, drawing keys and values
** from datasource pData:
**
**   1) Write pStep->nIns entries starting at index pStep->iInsStart.
**   2) Delete pStep->nDel keys starting at index pStep->iDelStart.
**   3) If no error has occurred, temporarily set LSM_CONFIG_AUTOFLUSH to
**      64, run an empty write transaction, restore the saved setting, and
**      then call lsm_work() followed by lsm_checkpoint().
**
** *pRc is both input and output. Step 3 is skipped if it is non-zero
** after steps 1 and 2; otherwise it receives the result of lsm_work()
** and, if that succeeded, of lsm_checkpoint().
**
** NOTE(review): pStep->bFlush is not consulted here, so step 3 runs for
** every step. Confirm whether that is intentional before changing it, as
** the resulting database layout is what the crash tests operate on.
*/
static void doSetupStep(
  TestDb *pDb,
  Datasource *pData,
  const SetupStep *pStep,
  int *pRc
){
  lsm_db *pLsm;
  int nOrigFlush = -1;            /* Presumably -1 queries the current value */
  int nTmpFlush = 64;

  testWriteDatasourceRange(pDb, pData, pStep->iInsStart, pStep->nIns, pRc);
  testDeleteDatasourceRange(pDb, pData, pStep->iDelStart, pStep->nDel, pRc);
  if( *pRc!=0 ) return;

  pLsm = tdb_lsm(pDb);

  /* Save the autoflush setting, shrink it, run an empty write transaction
  ** and then put the saved setting back. */
  lsm_config(pLsm, LSM_CONFIG_AUTOFLUSH, &nOrigFlush);
  lsm_config(pLsm, LSM_CONFIG_AUTOFLUSH, &nTmpFlush);
  lsm_begin(pLsm, 1);
  lsm_commit(pLsm, 0);
  lsm_config(pLsm, LSM_CONFIG_AUTOFLUSH, &nOrigFlush);

  *pRc = lsm_work(pLsm, 0, 0, 0);
  if( *pRc!=0 ) return;
  *pRc = lsm_checkpoint(pLsm, 0);
}
51 
/*
** Run each of the nStep setup steps in array aStep[], in order, against
** database pDb using datasource pData. Each step starts with a fresh
** zero return code; a failing step trips an assert().
*/
static void doSetupStepArray(
  TestDb *pDb,
  Datasource *pData,
  const SetupStep *aStep,
  int nStep
){
  int iStep = 0;
  while( iStep<nStep ){
    int rc = 0;
    doSetupStep(pDb, pData, aStep + iStep, &rc);
    assert( rc==0 );
    iStep++;
  }
}
65 
/*
** Populate database pDb using a fixed three-step script:
**
**   1) Insert 2000 datasource entries starting at index 1.
**   2) A step that inserts and deletes nothing (bFlush set).
**   3) Insert 1000 datasource entries starting at index 10001.
**
** The datasource used is created here. If ppData is not NULL, ownership
** of the datasource passes to the caller via *ppData (to be released with
** testDatasourceFree()). Otherwise it is freed before returning.
*/
static void setupDatabase1(TestDb *pDb, Datasource **ppData){
  const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 100, 500};
  const SetupStep aStep[] = {
    { 0, 1,     2000, 0, 0 },
    { 1, 0,     0,    0, 0 },
    { 0, 10001, 1000, 0, 0 },
  };
  Datasource *pNew = testDatasourceNew(&defn);

  doSetupStepArray(pDb, pNew, aStep, ArraySize(aStep));

  if( ppData==0 ){
    /* Nobody wants the datasource - discard it. */
    testDatasourceFree(pNew);
    return;
  }
  *ppData = pNew;
}
83 
84 #include <stdio.h>
/*
** Read nByte bytes from offset iOff of file zFile into buffer pOut.
**
** This is a no-op if *pRc is non-zero on entry. Otherwise *pRc is set to
** 1 if the file cannot be opened, the seek fails, or fewer than nByte
** bytes could be read. It is left unchanged (zero) on success.
*/
void testReadFile(const char *zFile, int iOff, void *pOut, int nByte, int *pRc){
  FILE *pFile;

  if( *pRc!=0 ) return;

  pFile = fopen(zFile, "rb");
  if( pFile==0 ){
    *pRc = 1;
    return;
  }

  if( fseek(pFile, iOff, SEEK_SET)!=0 ){
    *pRc = 1;
  }else{
    assert( nByte>=0 );
    if( fread(pOut, 1, nByte, pFile)!=(size_t)nByte ) *pRc = 1;
  }
  fclose(pFile);
}
104 
/*
** Overwrite nByte bytes at offset iOff of existing file zFile with the
** contents of buffer pOut. The file is opened in "r+b" mode, so it is
** neither created nor truncated by this call.
**
** This is a no-op if *pRc is non-zero on entry. Otherwise *pRc is set to
** 1 if the file cannot be opened, the seek fails, fewer than nByte bytes
** are accepted by fwrite(), or fclose() reports an error. It is left
** unchanged (zero) on success.
*/
void testWriteFile(
  const char *zFile,
  int iOff,
  void *pOut,
  int nByte,
  int *pRc
){
  if( *pRc==0 ){
    FILE *fd;
    fd = fopen(zFile, "r+b");
    if( fd==0 ){
      *pRc = 1;
    }else{
      if( 0!=fseek(fd, iOff, SEEK_SET) ){
        *pRc = 1;
      }else{
        assert( nByte>=0 );
        if( (size_t)nByte!=fwrite(pOut, 1, nByte, fd) ){
          *pRc = 1;
        }
      }
      /* stdio may buffer the data, in which case it is only handed to the
      ** OS when the stream is flushed by fclose(). A failure at that point
      ** means the write did not happen, so it must be reported too. */
      if( 0!=fclose(fd) ){
        *pRc = 1;
      }
    }
  }
}
130 
getShmHeader(const char * zDb)131 static ShmHeader *getShmHeader(const char *zDb){
132   int rc = 0;
133   char *zShm = testMallocPrintf("%s-shm", zDb);
134   ShmHeader *pHdr;
135 
136   pHdr = testMalloc(sizeof(ShmHeader));
137   testReadFile(zShm, 0, (void *)pHdr, sizeof(ShmHeader), &rc);
138   assert( rc==0 );
139 
140   return pHdr;
141 }
142 
143 /*
144 ** This function makes a copy of the three files associated with LSM
145 ** database zDb (i.e. if zDb is "test.db", it makes copies of "test.db",
146 ** "test.db-log" and "test.db-shm").
147 **
148 ** It then opens a new database connection to the copy with the xLock() call
149 ** instrumented so that it appears that some other process already connected
150 ** to the db (holding a shared lock on DMS2). This prevents recovery from
151 ** running. Then:
152 **
153 **    1) Check that the checksum of the database is zCksum.
154 **    2) Write a few keys to the database. Then delete the same keys.
155 **    3) Check that the checksum is zCksum.
156 **    4) Flush the db to disk and run a checkpoint.
157 **    5) Check once more that the checksum is still zCksum.
158 */
doLiveRecovery(const char * zDb,const char * zCksum,int * pRc)159 static void doLiveRecovery(const char *zDb, const char *zCksum, int *pRc){
160   if( *pRc==LSM_OK ){
161     const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 20, 25, 100, 500};
162     Datasource *pData;
163     const char *zCopy = "testcopy.lsm";
164     char zCksum2[TEST_CKSUM_BYTES];
165     TestDb *pDb = 0;
166     int rc;
167 
168     pData = testDatasourceNew(&defn);
169 
170     testCopyLsmdb(zDb, zCopy);
171     rc = tdb_lsm_open("test_no_recovery=1", zCopy, 0, &pDb);
172     if( rc==0 ){
173       ShmHeader *pHdr;
174       lsm_db *db;
175       testCksumDatabase(pDb, zCksum2);
176       testCompareStr(zCksum, zCksum2, &rc);
177 
178       testWriteDatasourceRange(pDb, pData, 1, 10, &rc);
179       testDeleteDatasourceRange(pDb, pData, 1, 10, &rc);
180 
181       /* Test that the two tree-headers are now consistent. */
182       pHdr = getShmHeader(zCopy);
183       if( rc==0 && memcmp(&pHdr->hdr1, &pHdr->hdr2, sizeof(pHdr->hdr1)) ){
184         rc = 1;
185       }
186       testFree(pHdr);
187 
188       if( rc==0 ){
189         int nBuf = 64;
190         db = tdb_lsm(pDb);
191         lsm_config(db, LSM_CONFIG_AUTOFLUSH, &nBuf);
192         lsm_begin(db, 1);
193         lsm_commit(db, 0);
194         rc = lsm_work(db, 0, 0, 0);
195       }
196 
197       testCksumDatabase(pDb, zCksum2);
198       testCompareStr(zCksum, zCksum2, &rc);
199     }
200 
201     testDatasourceFree(pData);
202     testClose(&pDb);
203     testDeleteLsmdb(zCopy);
204     *pRc = rc;
205   }
206 }
207 
doWriterCrash1(int * pRc)208 static void doWriterCrash1(int *pRc){
209   const int nWrite = 2000;
210   const int nStep = 10;
211   const int iWriteStart = 20000;
212   int rc = 0;
213   TestDb *pDb = 0;
214   Datasource *pData = 0;
215 
216   rc = tdb_lsm_open("autowork=0", "testdb.lsm", 1, &pDb);
217   if( rc==0 ){
218     int iDot = 0;
219     char zCksum[TEST_CKSUM_BYTES];
220     int i;
221     setupDatabase1(pDb, &pData);
222     testCksumDatabase(pDb, zCksum);
223     testBegin(pDb, 2, &rc);
224     for(i=0; rc==0 && i<nWrite; i+=nStep){
225       testCaseProgress(i, nWrite, testCaseNDot(), &iDot);
226       testWriteDatasourceRange(pDb, pData, iWriteStart+i, nStep, &rc);
227       doLiveRecovery("testdb.lsm", zCksum, &rc);
228     }
229   }
230   testCommit(pDb, 0, &rc);
231   testClose(&pDb);
232   testDatasourceFree(pData);
233   *pRc = rc;
234 }
235 
236 /*
237 ** This test case verifies that inconsistent tree-headers in shared-memory
238 ** are resolved correctly.
239 */
doWriterCrash2(int * pRc)240 static void doWriterCrash2(int *pRc){
241   int rc = 0;
242   TestDb *pDb = 0;
243   Datasource *pData = 0;
244 
245   rc = tdb_lsm_open("autowork=0", "testdb.lsm", 1, &pDb);
246   if( rc==0 ){
247     ShmHeader *pHdr1;
248     ShmHeader *pHdr2;
249     char zCksum1[TEST_CKSUM_BYTES];
250     char zCksum2[TEST_CKSUM_BYTES];
251 
252     pHdr1 = testMalloc(sizeof(ShmHeader));
253     pHdr2 = testMalloc(sizeof(ShmHeader));
254     setupDatabase1(pDb, &pData);
255 
256     /* Grab a copy of the shared-memory header. And the db checksum */
257     testReadFile("testdb.lsm-shm", 0, (void *)pHdr1, sizeof(ShmHeader), &rc);
258     testCksumDatabase(pDb, zCksum1);
259 
260     /* Modify the database */
261     testBegin(pDb, 2, &rc);
262     testWriteDatasourceRange(pDb, pData, 30000, 200, &rc);
263     testCommit(pDb, 0, &rc);
264 
265     /* Grab a second copy of the shared-memory header. And the db checksum */
266     testReadFile("testdb.lsm-shm", 0, (void *)pHdr2, sizeof(ShmHeader), &rc);
267     testCksumDatabase(pDb, zCksum2);
268     doLiveRecovery("testdb.lsm", zCksum2, &rc);
269 
270     /* If both tree-headers are valid, tree-header-1 is used. */
271     memcpy(&pHdr2->hdr1, &pHdr1->hdr1, sizeof(pHdr1->hdr1));
272     pHdr2->bWriter = 1;
273     testWriteFile("testdb.lsm-shm", 0, (void *)pHdr2, sizeof(ShmHeader), &rc);
274     doLiveRecovery("testdb.lsm", zCksum1, &rc);
275 
276     /* If both tree-headers are valid, tree-header-1 is used. */
277     memcpy(&pHdr2->hdr1, &pHdr2->hdr2, sizeof(pHdr1->hdr1));
278     memcpy(&pHdr2->hdr2, &pHdr1->hdr1, sizeof(pHdr1->hdr1));
279     pHdr2->bWriter = 1;
280     testWriteFile("testdb.lsm-shm", 0, (void *)pHdr2, sizeof(ShmHeader), &rc);
281     doLiveRecovery("testdb.lsm", zCksum2, &rc);
282 
283     /* If tree-header 1 is invalid, tree-header-2 is used */
284     memcpy(&pHdr2->hdr2, &pHdr2->hdr1, sizeof(pHdr1->hdr1));
285     pHdr2->hdr1.aCksum[0] = 5;
286     pHdr2->hdr1.aCksum[0] = 6;
287     pHdr2->bWriter = 1;
288     testWriteFile("testdb.lsm-shm", 0, (void *)pHdr2, sizeof(ShmHeader), &rc);
289     doLiveRecovery("testdb.lsm", zCksum2, &rc);
290 
291     /* If tree-header 2 is invalid, tree-header-1 is used */
292     memcpy(&pHdr2->hdr1, &pHdr2->hdr2, sizeof(pHdr1->hdr1));
293     pHdr2->hdr2.aCksum[0] = 5;
294     pHdr2->hdr2.aCksum[0] = 6;
295     pHdr2->bWriter = 1;
296     testWriteFile("testdb.lsm-shm", 0, (void *)pHdr2, sizeof(ShmHeader), &rc);
297     doLiveRecovery("testdb.lsm", zCksum2, &rc);
298 
299     testFree(pHdr1);
300     testFree(pHdr2);
301     testClose(&pDb);
302   }
303 
304   *pRc = rc;
305 }
306 
do_writer_crash_test(const char * zPattern,int * pRc)307 void do_writer_crash_test(const char *zPattern, int *pRc){
308   struct Test {
309     const char *zName;
310     void (*xFunc)(int *);
311   } aTest[] = {
312     { "writercrash1.lsm", doWriterCrash1 },
313     { "writercrash2.lsm", doWriterCrash2 },
314   };
315   int i;
316   for(i=0; i<ArraySize(aTest); i++){
317     struct Test *p = &aTest[i];
318     if( testCaseBegin(pRc, zPattern, p->zName) ){
319       p->xFunc(pRc);
320       testCaseFinish(*pRc);
321     }
322   }
323 
324 }
325