xref: /sqlite-3.40.0/ext/lsm1/lsm_file.c (revision 34aebb88)
1 /*
2 ** 2011-08-26
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 **
13 ** NORMAL DATABASE FILE FORMAT
14 **
15 ** The following database file format concepts are used by the code in
16 ** this file to read and write the database file.
17 **
18 ** Pages:
19 **
20 **   A database file is divided into pages. The first 8KB of the file consists
21 **   of two 4KB meta-pages. The meta-page size is not configurable. The
22 **   remainder of the file is made up of database pages. The default database
23 **   page size is 4KB. Database pages are aligned to page-size boundaries,
24 **   so if the database page size is larger than 8KB there is a gap between
25 **   the end of the meta pages and the start of the database pages.
26 **
27 **   Database pages are numbered based on their position in the file. Page N
28 **   begins at byte offset ((N-1)*pgsz). This means that page 1 does not
29 **   exist - since it would always overlap with the meta pages. If the
30 **   page-size is (say) 512 bytes, then the first usable page in the database
31 **   is page 33.
32 **
33 **   It is assumed that the first two meta pages and the data that follows
34 **   them are located on different disk sectors. So that if a power failure
35 **   while writing to a meta page there is no risk of damage to the other
36 **   meta page or any other part of the database file. TODO: This may need
37 **   to be revisited.
38 **
39 ** Blocks:
40 **
41 **   The database file is also divided into blocks. The default block size is
42 **   1MB. When writing to the database file, an attempt is made to write data
43 **   in contiguous block-sized chunks.
44 **
45 **   The first and last page on each block are special in that they are 4
46 **   bytes smaller than all other pages. This is because the last four bytes
47 **   of space on the first and last pages of each block are reserved for
48 **   pointers to other blocks (i.e. a 32-bit block number).
49 **
50 ** Runs:
51 **
52 **   A run is a sequence of pages that the upper layer uses to store a
53 **   sorted array of database keys (and accompanying data - values, FC
54 **   pointers and so on). Given a page within a run, it is possible to
55 **   navigate to the next page in the run as follows:
56 **
57 **     a) if the current page is not the last in a block, the next page
58 **        in the run is located immediately after the current page, OR
59 **
60 **     b) if the current page is the last page in a block, the next page
61 **        in the run is the first page on the block identified by the
62 **        block pointer stored in the last 4 bytes of the current block.
63 **
64 **   It is possible to navigate to the previous page in a similar fashion,
65 **   using the block pointer embedded in the last 4 bytes of the first page
66 **   of each block as required.
67 **
68 **   The upper layer is responsible for identifying by page number the
69 **   first and last page of any run that it needs to navigate - there are
70 **   no "end-of-run" markers stored or identified by this layer. This is
71 **   necessary as clients reading different database snapshots may access
72 **   different subsets of a run.
73 **
74 ** THE LOG FILE
75 **
76 ** This file opens and closes the log file. But it does not contain any
77 ** logic related to the log file format. Instead, it exports the following
78 ** functions that are used by the code in lsm_log.c to read and write the
79 ** log file:
80 **
81 **     lsmFsOpenLog
82 **     lsmFsWriteLog
83 **     lsmFsSyncLog
84 **     lsmFsReadLog
85 **     lsmFsTruncateLog
86 **     lsmFsCloseAndDeleteLog
87 **
88 ** COMPRESSED DATABASE FILE FORMAT
89 **
90 ** The compressed database file format is very similar to the normal format.
91 ** The file still begins with two 4KB meta-pages (which are never compressed).
92 ** It is still divided into blocks.
93 **
94 ** The first and last four bytes of each block are reserved for 32-bit
95 ** pointer values. Similar to the way four bytes are carved from the end of
96 ** the first and last page of each block in uncompressed databases. From
97 ** the point of view of the upper layer, all pages are the same size - this
98 ** is different from the uncompressed format where the first and last pages
99 ** on each block are 4 bytes smaller than the others.
100 **
101 ** Pages are stored in variable length compressed form, as follows:
102 **
103 **     * 3-byte size field containing the size of the compressed page image
104 **       in bytes. The most significant bit of each byte of the size field
105 **       is always set. The remaining 7 bits are used to store a 21-bit
106 **       integer value (in big-endian order - the first byte in the field
107 **       contains the most significant 7 bits). Since the maximum allowed
108 **       size of a compressed page image is (2^17 - 1) bytes, there are
109 **       actually 4 unused bits in the size field.
110 **
111 **       In other words, if the size of the compressed page image is nSz,
112 **       the header can be serialized as follows:
113 **
114 **         u8 aHdr[3]
115 **         aHdr[0] = 0x80 | (u8)(nSz >> 14);
116 **         aHdr[1] = 0x80 | (u8)(nSz >>  7);
117 **         aHdr[2] = 0x80 | (u8)(nSz >>  0);
118 **
119 **     * Compressed page image.
120 **
121 **     * A second copy of the 3-byte record header.
122 **
123 ** A page number is a byte offset into the database file. So the smallest
124 ** possible page number is 8192 (immediately after the two meta-pages).
125 ** The first and root page of a segment are identified by a page number
126 ** corresponding to the byte offset of the first byte in the corresponding
127 ** page record. The last page of a segment is identified by the byte offset
128 ** of the last byte in its record.
129 **
130 ** Unlike uncompressed pages, compressed page records may span blocks.
131 **
132 ** Sometimes, in order to avoid touching sectors that contain synced data
133 ** when writing, it is necessary to insert unused space between compressed
134 ** page records. This can be done as follows:
135 **
136 **     * For less than 6 bytes of empty space, the first and last byte
137 **       of the free space contain the total number of free bytes. For
138 **       example:
139 **
140 **         Block of 4 free bytes: 0x04 0x?? 0x?? 0x04
141 **         Block of 2 free bytes: 0x02 0x02
142 **         A single free byte:    0x01
143 **
144 **     * For 6 or more bytes of empty space, a record similar to a
145 **       compressed page record is added to the segment. A padding record
146 **       is distinguished from a compressed page record by the most
147 **       significant bit of the second byte of the size field, which is
148 **       cleared instead of set.
149 */
150 #include "lsmInt.h"
151 
152 #include <sys/types.h>
153 #include <sys/stat.h>
154 #include <fcntl.h>
155 
156 /*
157 ** File-system object. Each database connection allocates a single instance
158 ** of the following structure. It is used for all access to the database and
159 ** log files.
160 **
161 ** The database file may be accessed via two methods - using mmap() or using
162 ** read() and write() calls. In the general case both methods are used - a
163 ** prefix of the file is mapped into memory and the remainder accessed using
164 ** read() and write(). This is helpful when accessing very large files (or
165 ** files that may grow very large during the lifetime of a database
166 ** connection) on systems with 32-bit address spaces. However, it also requires
167 ** that this object manage two distinct types of Page objects simultaneously -
168 ** those that carry pointers to the mapped file and those that carry arrays
169 ** populated by read() calls.
170 **
171 ** pFree:
172 **   The head of a singly-linked list that containing currently unused Page
173 **   structures suitable for use as mmap-page handles. Connected by the
174 **   Page.pFreeNext pointers.
175 **
176 ** pMapped:
177 **   The head of a singly-linked list that contains all pages that currently
178 **   carry pointers to the mapped region. This is used if the region is
179 **   every remapped - the pointers carried by existing pages can be adjusted
180 **   to account for the remapping. Connected by the Page.pMappedNext pointers.
181 **
182 ** pWaiting:
183 **   When the upper layer wishes to append a new b-tree page to a segment,
184 **   it allocates a Page object that carries a malloc'd block of memory -
185 **   regardless of the mmap-related configuration. The page is not assigned
186 **   a page number at first. When the upper layer has finished constructing
187 **   the page contents, it calls lsmFsPagePersist() to assign a page number
188 **   to it. At this point it is likely that N pages have been written to the
189 **   segment, the (N+1)th page is still outstanding and the b-tree page is
190 **   assigned page number (N+2). To avoid writing page (N+2) before page
191 **   (N+1), the recently completed b-tree page is held in the singly linked
192 **   list headed by pWaiting until page (N+1) has been written.
193 **
194 **   Function lsmFsFlushWaiting() is responsible for eventually writing
195 **   waiting pages to disk.
196 **
197 ** apHash/nHash:
198 **   Hash table used to store all Page objects that carry malloc'd arrays,
199 **   except those b-tree pages that have not yet been assigned page numbers.
200 **   Once they have been assigned page numbers - they are added to this
201 **   hash table.
202 **
203 **   Hash table overflow chains are connected using the Page.pHashNext
204 **   pointers.
205 **
206 ** pLruFirst, pLruLast:
207 **   The first and last entries in a doubly-linked list of pages. This
208 **   list contains all pages with malloc'd data that are present in the
209 **   hash table and have a ref-count of zero.
210 */
211 struct FileSystem {
212   lsm_db *pDb;                    /* Database handle that owns this object */
213   lsm_env *pEnv;                  /* Environment pointer */
214   char *zDb;                      /* Database file name */
215   char *zLog;                     /* Database file name */
216   int nMetasize;                  /* Size of meta pages in bytes */
217   int nMetaRwSize;                /* Read/written size of meta pages in bytes */
218   int nPagesize;                  /* Database page-size in bytes */
219   int nBlocksize;                 /* Database block-size in bytes */
220 
221   /* r/w file descriptors for both files. */
222   LsmFile *pLsmFile;              /* Used after lsm_close() to link into list */
223   lsm_file *fdDb;                 /* Database file */
224   lsm_file *fdLog;                /* Log file */
225   int szSector;                   /* Database file sector size */
226 
227   /* If this is a compressed database, a pointer to the compression methods.
228   ** For an uncompressed database, a NULL pointer.  */
229   lsm_compress *pCompress;
230   u8 *aIBuffer;                   /* Buffer to compress to */
231   u8 *aOBuffer;                   /* Buffer to uncompress from */
232   int nBuffer;                    /* Allocated size of above buffers in bytes */
233 
234   /* mmap() page related things */
235   i64 nMapLimit;                  /* Maximum bytes of file to map */
236   void *pMap;                     /* Current mapping of database file */
237   i64 nMap;                       /* Bytes mapped at pMap */
238   Page *pFree;                    /* Unused Page structures */
239   Page *pMapped;                  /* List of Page structs that point to pMap */
240 
241   /* Page cache parameters for non-mmap() pages */
242   int nCacheMax;                  /* Configured cache size (in pages) */
243   int nCacheAlloc;                /* Current cache size (in pages) */
244   Page *pLruFirst;                /* Head of the LRU list */
245   Page *pLruLast;                 /* Tail of the LRU list */
246   int nHash;                      /* Number of hash slots in hash table */
247   Page **apHash;                  /* nHash Hash slots */
248   Page *pWaiting;                 /* b-tree pages waiting to be written */
249 
250   /* Statistics */
251   int nOut;                       /* Number of outstanding pages */
252   int nWrite;                     /* Total number of pages written */
253   int nRead;                      /* Total number of pages read */
254 };
255 
256 /*
257 ** Database page handle.
258 **
259 ** pSeg:
260 **   When lsmFsSortedAppend() is called on a compressed database, the new
261 **   page is not assigned a page number or location in the database file
262 **   immediately. Instead, these are assigned by the lsmFsPagePersist() call
263 **   right before it writes the compressed page image to disk.
264 **
265 **   The lsmFsSortedAppend() function sets the pSeg pointer to point to the
266 **   segment that the new page will be a part of. It is unset by
267 **   lsmFsPagePersist() after the page is written to disk.
268 */
269 struct Page {
270   u8 *aData;                      /* Buffer containing page data */
271   int nData;                      /* Bytes of usable data at aData[] */
272   LsmPgno iPg;                    /* Page number */
273   int nRef;                       /* Number of outstanding references */
274   int flags;                      /* Combination of PAGE_XXX flags */
275   Page *pHashNext;                /* Next page in hash table slot */
276   Page *pLruNext;                 /* Next page in LRU list */
277   Page *pLruPrev;                 /* Previous page in LRU list */
278   FileSystem *pFS;                /* File system that owns this page */
279 
280   /* Only used in compressed database mode: */
281   int nCompress;                  /* Compressed size (or 0 for uncomp. db) */
282   int nCompressPrev;              /* Compressed size of prev page */
283   Segment *pSeg;                  /* Segment this page will be written to */
284 
285   /* Pointers for singly linked lists */
286   Page *pWaitingNext;             /* Next page in FileSystem.pWaiting list */
287   Page *pFreeNext;                /* Next page in FileSystem.pFree list */
288   Page *pMappedNext;              /* Next page in FileSystem.pMapped list */
289 };
290 
291 /*
292 ** Meta-data page handle. There are two meta-data pages at the start of
293 ** the database file, each FileSystem.nMetasize bytes in size.
294 */
295 struct MetaPage {
296   int iPg;                        /* Either 1 or 2 */
297   int bWrite;                     /* Write back to db file on release */
298   u8 *aData;                      /* Pointer to buffer */
299   FileSystem *pFS;                /* FileSystem that owns this page */
300 };
301 
302 /*
303 ** Values for LsmPage.flags
304 */
305 #define PAGE_DIRTY   0x00000001   /* Set if page is dirty */
306 #define PAGE_FREE    0x00000002   /* Set if Page.aData requires lsmFree() */
307 #define PAGE_HASPREV 0x00000004   /* Set if page is first on uncomp. block */
308 
309 /*
310 ** Number of pgsz byte pages omitted from the start of block 1. The start
311 ** of block 1 contains two 4096 byte meta pages (8192 bytes in total).
312 */
313 #define BLOCK1_HDR_SIZE(pgsz)  LSM_MAX(1, 8192/(pgsz))
314 
315 /*
316 ** If NDEBUG is not defined, set a breakpoint in function lsmIoerrBkpt()
317 ** to catch IO errors (any error returned by a VFS method).
318 */
319 #ifndef NDEBUG
lsmIoerrBkpt(void)320 static void lsmIoerrBkpt(void){
321   static int nErr = 0;
322   nErr++;
323 }
IOERR_WRAPPER(int rc)324 static int IOERR_WRAPPER(int rc){
325   if( rc!=LSM_OK ) lsmIoerrBkpt();
326   return rc;
327 }
328 #else
329 # define IOERR_WRAPPER(rc) (rc)
330 #endif
331 
332 #ifdef NDEBUG
333 # define assert_lists_are_ok(x)
334 #else
335 static Page *fsPageFindInHash(FileSystem *pFS, LsmPgno iPg, int *piHash);
336 
assert_lists_are_ok(FileSystem * pFS)337 static void assert_lists_are_ok(FileSystem *pFS){
338 #if 0
339   Page *p;
340 
341   assert( pFS->nMapLimit>=0 );
342 
343   /* Check that all pages in the LRU list have nRef==0, pointers to buffers
344   ** in heap memory, and corresponding entries in the hash table.  */
345   for(p=pFS->pLruFirst; p; p=p->pLruNext){
346     assert( p==pFS->pLruFirst || p->pLruPrev!=0 );
347     assert( p==pFS->pLruLast || p->pLruNext!=0 );
348     assert( p->pLruPrev==0 || p->pLruPrev->pLruNext==p );
349     assert( p->pLruNext==0 || p->pLruNext->pLruPrev==p );
350     assert( p->nRef==0 );
351     assert( p->flags & PAGE_FREE );
352     assert( p==fsPageFindInHash(pFS, p->iPg, 0) );
353   }
354 #endif
355 }
356 #endif
357 
358 /*
359 ** Wrappers around the VFS methods of the lsm_env object:
360 **
361 **     lsmEnvOpen()
362 **     lsmEnvRead()
363 **     lsmEnvWrite()
364 **     lsmEnvSync()
365 **     lsmEnvSectorSize()
366 **     lsmEnvClose()
367 **     lsmEnvTruncate()
368 **     lsmEnvUnlink()
369 **     lsmEnvRemap()
370 */
lsmEnvOpen(lsm_env * pEnv,const char * zFile,int flags,lsm_file ** ppNew)371 int lsmEnvOpen(lsm_env *pEnv, const char *zFile, int flags, lsm_file **ppNew){
372   return pEnv->xOpen(pEnv, zFile, flags, ppNew);
373 }
374 
lsmEnvRead(lsm_env * pEnv,lsm_file * pFile,lsm_i64 iOff,void * pRead,int nRead)375 static int lsmEnvRead(
376   lsm_env *pEnv,
377   lsm_file *pFile,
378   lsm_i64 iOff,
379   void *pRead,
380   int nRead
381 ){
382   return IOERR_WRAPPER( pEnv->xRead(pFile, iOff, pRead, nRead) );
383 }
384 
lsmEnvWrite(lsm_env * pEnv,lsm_file * pFile,lsm_i64 iOff,const void * pWrite,int nWrite)385 static int lsmEnvWrite(
386   lsm_env *pEnv,
387   lsm_file *pFile,
388   lsm_i64 iOff,
389   const void *pWrite,
390   int nWrite
391 ){
392   return IOERR_WRAPPER( pEnv->xWrite(pFile, iOff, (void *)pWrite, nWrite) );
393 }
394 
lsmEnvSync(lsm_env * pEnv,lsm_file * pFile)395 static int lsmEnvSync(lsm_env *pEnv, lsm_file *pFile){
396   return IOERR_WRAPPER( pEnv->xSync(pFile) );
397 }
398 
lsmEnvSectorSize(lsm_env * pEnv,lsm_file * pFile)399 static int lsmEnvSectorSize(lsm_env *pEnv, lsm_file *pFile){
400   return pEnv->xSectorSize(pFile);
401 }
402 
lsmEnvClose(lsm_env * pEnv,lsm_file * pFile)403 int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile){
404   return IOERR_WRAPPER( pEnv->xClose(pFile) );
405 }
406 
lsmEnvTruncate(lsm_env * pEnv,lsm_file * pFile,lsm_i64 nByte)407 static int lsmEnvTruncate(lsm_env *pEnv, lsm_file *pFile, lsm_i64 nByte){
408   return IOERR_WRAPPER( pEnv->xTruncate(pFile, nByte) );
409 }
410 
lsmEnvUnlink(lsm_env * pEnv,const char * zDel)411 static int lsmEnvUnlink(lsm_env *pEnv, const char *zDel){
412   return IOERR_WRAPPER( pEnv->xUnlink(pEnv, zDel) );
413 }
414 
lsmEnvRemap(lsm_env * pEnv,lsm_file * pFile,i64 szMin,void ** ppMap,i64 * pszMap)415 static int lsmEnvRemap(
416   lsm_env *pEnv,
417   lsm_file *pFile,
418   i64 szMin,
419   void **ppMap,
420   i64 *pszMap
421 ){
422   return pEnv->xRemap(pFile, szMin, ppMap, pszMap);
423 }
424 
lsmEnvLock(lsm_env * pEnv,lsm_file * pFile,int iLock,int eLock)425 int lsmEnvLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int eLock){
426   if( pFile==0 ) return LSM_OK;
427   return pEnv->xLock(pFile, iLock, eLock);
428 }
429 
lsmEnvTestLock(lsm_env * pEnv,lsm_file * pFile,int iLock,int nLock,int eLock)430 int lsmEnvTestLock(
431   lsm_env *pEnv,
432   lsm_file *pFile,
433   int iLock,
434   int nLock,
435   int eLock
436 ){
437   return pEnv->xTestLock(pFile, iLock, nLock, eLock);
438 }
439 
lsmEnvShmMap(lsm_env * pEnv,lsm_file * pFile,int iChunk,int sz,void ** ppOut)440 int lsmEnvShmMap(
441   lsm_env *pEnv,
442   lsm_file *pFile,
443   int iChunk,
444   int sz,
445   void **ppOut
446 ){
447   return pEnv->xShmMap(pFile, iChunk, sz, ppOut);
448 }
449 
lsmEnvShmBarrier(lsm_env * pEnv)450 void lsmEnvShmBarrier(lsm_env *pEnv){
451   pEnv->xShmBarrier();
452 }
453 
lsmEnvShmUnmap(lsm_env * pEnv,lsm_file * pFile,int bDel)454 void lsmEnvShmUnmap(lsm_env *pEnv, lsm_file *pFile, int bDel){
455   pEnv->xShmUnmap(pFile, bDel);
456 }
457 
lsmEnvSleep(lsm_env * pEnv,int nUs)458 void lsmEnvSleep(lsm_env *pEnv, int nUs){
459   pEnv->xSleep(pEnv, nUs);
460 }
461 
462 
463 /*
464 ** Write the contents of string buffer pStr into the log file, starting at
465 ** offset iOff.
466 */
lsmFsWriteLog(FileSystem * pFS,i64 iOff,LsmString * pStr)467 int lsmFsWriteLog(FileSystem *pFS, i64 iOff, LsmString *pStr){
468   assert( pFS->fdLog );
469   return lsmEnvWrite(pFS->pEnv, pFS->fdLog, iOff, pStr->z, pStr->n);
470 }
471 
472 /*
473 ** fsync() the log file.
474 */
lsmFsSyncLog(FileSystem * pFS)475 int lsmFsSyncLog(FileSystem *pFS){
476   assert( pFS->fdLog );
477   return lsmEnvSync(pFS->pEnv, pFS->fdLog);
478 }
479 
480 /*
481 ** Read nRead bytes of data starting at offset iOff of the log file. Append
482 ** the results to string buffer pStr.
483 */
lsmFsReadLog(FileSystem * pFS,i64 iOff,int nRead,LsmString * pStr)484 int lsmFsReadLog(FileSystem *pFS, i64 iOff, int nRead, LsmString *pStr){
485   int rc;                         /* Return code */
486   assert( pFS->fdLog );
487   rc = lsmStringExtend(pStr, nRead);
488   if( rc==LSM_OK ){
489     rc = lsmEnvRead(pFS->pEnv, pFS->fdLog, iOff, &pStr->z[pStr->n], nRead);
490     pStr->n += nRead;
491   }
492   return rc;
493 }
494 
495 /*
496 ** Truncate the log file to nByte bytes in size.
497 */
lsmFsTruncateLog(FileSystem * pFS,i64 nByte)498 int lsmFsTruncateLog(FileSystem *pFS, i64 nByte){
499   if( pFS->fdLog==0 ) return LSM_OK;
500   return lsmEnvTruncate(pFS->pEnv, pFS->fdLog, nByte);
501 }
502 
503 /*
504 ** Truncate the db file to nByte bytes in size.
505 */
lsmFsTruncateDb(FileSystem * pFS,i64 nByte)506 int lsmFsTruncateDb(FileSystem *pFS, i64 nByte){
507   if( pFS->fdDb==0 ) return LSM_OK;
508   return lsmEnvTruncate(pFS->pEnv, pFS->fdDb, nByte);
509 }
510 
511 /*
512 ** Close the log file. Then delete it from the file-system. This function
513 ** is called during database shutdown only.
514 */
lsmFsCloseAndDeleteLog(FileSystem * pFS)515 int lsmFsCloseAndDeleteLog(FileSystem *pFS){
516   char *zDel;
517 
518   if( pFS->fdLog ){
519     lsmEnvClose(pFS->pEnv, pFS->fdLog );
520     pFS->fdLog = 0;
521   }
522 
523   zDel = lsmMallocPrintf(pFS->pEnv, "%s-log", pFS->zDb);
524   if( zDel ){
525     lsmEnvUnlink(pFS->pEnv, zDel);
526     lsmFree(pFS->pEnv, zDel);
527   }
528   return LSM_OK;
529 }
530 
531 /*
532 ** Return true if page iReal of the database should be accessed using mmap.
533 ** False otherwise.
534 */
fsMmapPage(FileSystem * pFS,LsmPgno iReal)535 static int fsMmapPage(FileSystem *pFS, LsmPgno iReal){
536   return ((i64)iReal*pFS->nPagesize <= pFS->nMapLimit);
537 }
538 
539 /*
540 ** Given that there are currently nHash slots in the hash table, return
541 ** the hash key for file iFile, page iPg.
542 */
fsHashKey(int nHash,LsmPgno iPg)543 static int fsHashKey(int nHash, LsmPgno iPg){
544   return (iPg % nHash);
545 }
546 
547 /*
548 ** This is a helper function for lsmFsOpen(). It opens a single file on
549 ** disk (either the database or log file).
550 */
fsOpenFile(FileSystem * pFS,int bReadonly,int bLog,int * pRc)551 static lsm_file *fsOpenFile(
552   FileSystem *pFS,                /* File system object */
553   int bReadonly,                  /* True to open this file read-only */
554   int bLog,                       /* True for log, false for db */
555   int *pRc                        /* IN/OUT: Error code */
556 ){
557   lsm_file *pFile = 0;
558   if( *pRc==LSM_OK ){
559     int flags = (bReadonly ? LSM_OPEN_READONLY : 0);
560     const char *zPath = (bLog ? pFS->zLog : pFS->zDb);
561 
562     *pRc = lsmEnvOpen(pFS->pEnv, zPath, flags, &pFile);
563   }
564   return pFile;
565 }
566 
567 /*
568 ** If it is not already open, this function opens the log file. It returns
569 ** LSM_OK if successful (or if the log file was already open) or an LSM
570 ** error code otherwise.
571 **
572 ** The log file must be opened before any of the following may be called:
573 **
574 **     lsmFsWriteLog
575 **     lsmFsSyncLog
576 **     lsmFsReadLog
577 */
lsmFsOpenLog(lsm_db * db,int * pbOpen)578 int lsmFsOpenLog(lsm_db *db, int *pbOpen){
579   int rc = LSM_OK;
580   FileSystem *pFS = db->pFS;
581 
582   if( 0==pFS->fdLog ){
583     pFS->fdLog = fsOpenFile(pFS, db->bReadonly, 1, &rc);
584 
585     if( rc==LSM_IOERR_NOENT && db->bReadonly ){
586       rc = LSM_OK;
587     }
588   }
589 
590   if( pbOpen ) *pbOpen = (pFS->fdLog!=0);
591   return rc;
592 }
593 
594 /*
595 ** Close the log file, if it is open.
596 */
lsmFsCloseLog(lsm_db * db)597 void lsmFsCloseLog(lsm_db *db){
598   FileSystem *pFS = db->pFS;
599   if( pFS->fdLog ){
600     lsmEnvClose(pFS->pEnv, pFS->fdLog);
601     pFS->fdLog = 0;
602   }
603 }
604 
605 /*
606 ** Open a connection to a database stored within the file-system.
607 **
608 ** If parameter bReadonly is true, then open a read-only file-descriptor
609 ** on the database file. It is possible that bReadonly will be false even
610 ** if the user requested that pDb be opened read-only. This is because the
611 ** file-descriptor may later on be recycled by a read-write connection.
612 ** If the db file can be opened for read-write access, it always is. Parameter
613 ** bReadonly is only ever true if it has already been determined that the
614 ** db can only be opened for read-only access.
615 **
616 ** Return LSM_OK if successful or an lsm error code otherwise.
617 */
lsmFsOpen(lsm_db * pDb,const char * zDb,int bReadonly)618 int lsmFsOpen(
619   lsm_db *pDb,                    /* Database connection to open fd for */
620   const char *zDb,                /* Full path to database file */
621   int bReadonly                   /* True to open db file read-only */
622 ){
623   FileSystem *pFS;
624   int rc = LSM_OK;
625   int nDb = strlen(zDb);
626   int nByte;
627 
628   assert( pDb->pFS==0 );
629   assert( pDb->pWorker==0 && pDb->pClient==0 );
630 
631   nByte = sizeof(FileSystem) + nDb+1 + nDb+4+1;
632   pFS = (FileSystem *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
633   if( pFS ){
634     LsmFile *pLsmFile;
635     pFS->zDb = (char *)&pFS[1];
636     pFS->zLog = &pFS->zDb[nDb+1];
637     pFS->nPagesize = LSM_DFLT_PAGE_SIZE;
638     pFS->nBlocksize = LSM_DFLT_BLOCK_SIZE;
639     pFS->nMetasize = LSM_META_PAGE_SIZE;
640     pFS->nMetaRwSize = LSM_META_RW_PAGE_SIZE;
641     pFS->pDb = pDb;
642     pFS->pEnv = pDb->pEnv;
643 
644     /* Make a copy of the database and log file names. */
645     memcpy(pFS->zDb, zDb, nDb+1);
646     memcpy(pFS->zLog, zDb, nDb);
647     memcpy(&pFS->zLog[nDb], "-log", 5);
648 
649     /* Allocate the hash-table here. At some point, it should be changed
650     ** so that it can grow dynamicly. */
651     pFS->nCacheMax = 2048*1024 / pFS->nPagesize;
652     pFS->nHash = 4096;
653     pFS->apHash = lsmMallocZeroRc(pDb->pEnv, sizeof(Page *) * pFS->nHash, &rc);
654 
655     /* Open the database file */
656     pLsmFile = lsmDbRecycleFd(pDb);
657     if( pLsmFile ){
658       pFS->pLsmFile = pLsmFile;
659       pFS->fdDb = pLsmFile->pFile;
660       memset(pLsmFile, 0, sizeof(LsmFile));
661     }else{
662       pFS->pLsmFile = lsmMallocZeroRc(pDb->pEnv, sizeof(LsmFile), &rc);
663       if( rc==LSM_OK ){
664         pFS->fdDb = fsOpenFile(pFS, bReadonly, 0, &rc);
665       }
666     }
667 
668     if( rc!=LSM_OK ){
669       lsmFsClose(pFS);
670       pFS = 0;
671     }else{
672       pFS->szSector = lsmEnvSectorSize(pFS->pEnv, pFS->fdDb);
673     }
674   }
675 
676   pDb->pFS = pFS;
677   return rc;
678 }
679 
680 /*
681 ** Configure the file-system object according to the current values of
682 ** the LSM_CONFIG_MMAP and LSM_CONFIG_SET_COMPRESSION options.
683 */
lsmFsConfigure(lsm_db * db)684 int lsmFsConfigure(lsm_db *db){
685   FileSystem *pFS = db->pFS;
686   if( pFS ){
687     lsm_env *pEnv = pFS->pEnv;
688     Page *pPg;
689 
690     assert( pFS->nOut==0 );
691     assert( pFS->pWaiting==0 );
692     assert( pFS->pMapped==0 );
693 
694     /* Reset any compression/decompression buffers already allocated */
695     lsmFree(pEnv, pFS->aIBuffer);
696     lsmFree(pEnv, pFS->aOBuffer);
697     pFS->nBuffer = 0;
698 
699     /* Unmap the file, if it is currently mapped */
700     if( pFS->pMap ){
701       lsmEnvRemap(pEnv, pFS->fdDb, -1, &pFS->pMap, &pFS->nMap);
702       pFS->nMapLimit = 0;
703     }
704 
705     /* Free all allocated page structures */
706     pPg = pFS->pLruFirst;
707     while( pPg ){
708       Page *pNext = pPg->pLruNext;
709       assert( pPg->flags & PAGE_FREE );
710       lsmFree(pEnv, pPg->aData);
711       lsmFree(pEnv, pPg);
712       pPg = pNext;
713     }
714 
715     pPg = pFS->pFree;
716     while( pPg ){
717       Page *pNext = pPg->pFreeNext;
718       lsmFree(pEnv, pPg);
719       pPg = pNext;
720     }
721 
722     /* Zero pointers that point to deleted page objects */
723     pFS->nCacheAlloc = 0;
724     pFS->pLruFirst = 0;
725     pFS->pLruLast = 0;
726     pFS->pFree = 0;
727     if( pFS->apHash ){
728       memset(pFS->apHash, 0, pFS->nHash*sizeof(pFS->apHash[0]));
729     }
730 
731     /* Configure the FileSystem object */
732     if( db->compress.xCompress ){
733       pFS->pCompress = &db->compress;
734       pFS->nMapLimit = 0;
735     }else{
736       pFS->pCompress = 0;
737       if( db->iMmap==1 ){
738         /* Unlimited */
739         pFS->nMapLimit = (i64)1 << 60;
740       }else{
741         /* iMmap is a limit in KB. Set nMapLimit to the same value in bytes. */
742         pFS->nMapLimit = (i64)db->iMmap * 1024;
743       }
744     }
745   }
746 
747   return LSM_OK;
748 }
749 
750 /*
751 ** Close and destroy a FileSystem object.
752 */
lsmFsClose(FileSystem * pFS)753 void lsmFsClose(FileSystem *pFS){
754   if( pFS ){
755     Page *pPg;
756     lsm_env *pEnv = pFS->pEnv;
757 
758     assert( pFS->nOut==0 );
759     pPg = pFS->pLruFirst;
760     while( pPg ){
761       Page *pNext = pPg->pLruNext;
762       if( pPg->flags & PAGE_FREE ) lsmFree(pEnv, pPg->aData);
763       lsmFree(pEnv, pPg);
764       pPg = pNext;
765     }
766 
767     pPg = pFS->pFree;
768     while( pPg ){
769       Page *pNext = pPg->pFreeNext;
770       if( pPg->flags & PAGE_FREE ) lsmFree(pEnv, pPg->aData);
771       lsmFree(pEnv, pPg);
772       pPg = pNext;
773     }
774 
775     if( pFS->fdDb ) lsmEnvClose(pFS->pEnv, pFS->fdDb );
776     if( pFS->fdLog ) lsmEnvClose(pFS->pEnv, pFS->fdLog );
777     lsmFree(pEnv, pFS->pLsmFile);
778     lsmFree(pEnv, pFS->apHash);
779     lsmFree(pEnv, pFS->aIBuffer);
780     lsmFree(pEnv, pFS->aOBuffer);
781     lsmFree(pEnv, pFS);
782   }
783 }
784 
785 /*
786 ** This function is called when closing a database handle (i.e. lsm_close())
787 ** if there exist other connections to the same database within this process.
788 ** In that case the file-descriptor open on the database file is not closed
789 ** when the FileSystem object is destroyed, as this would cause any POSIX
790 ** locks held by the other connections to be silently dropped (see "man close"
791 ** for details). Instead, the file-descriptor is stored in a list by the
792 ** lsm_shared.c module until it is either closed or reused.
793 **
794 ** This function returns a pointer to an object that can be linked into
795 ** the list described above. The returned object now 'owns' the database
796 ** file descriptr, so that when the FileSystem object is destroyed, it
797 ** will not be closed.
798 **
799 ** This function may be called at most once in the life-time of a
800 ** FileSystem object. The results of any operations involving the database
801 ** file descriptor are undefined once this function has been called.
802 **
803 ** None of this is necessary on non-POSIX systems. But we do it anyway in
804 ** the name of using as similar code as possible on all platforms.
805 */
lsmFsDeferClose(FileSystem * pFS)806 LsmFile *lsmFsDeferClose(FileSystem *pFS){
807   LsmFile *p = pFS->pLsmFile;
808   assert( p->pNext==0 );
809   p->pFile = pFS->fdDb;
810   pFS->fdDb = 0;
811   pFS->pLsmFile = 0;
812   return p;
813 }
814 
815 /*
816 ** Allocate a buffer and populate it with the output of the xFileid()
817 ** method of the database file handle. If successful, set *ppId to point
818 ** to the buffer and *pnId to the number of bytes in the buffer and return
819 ** LSM_OK. Otherwise, set *ppId and *pnId to zero and return an LSM
820 ** error code.
821 */
lsmFsFileid(lsm_db * pDb,void ** ppId,int * pnId)822 int lsmFsFileid(lsm_db *pDb, void **ppId, int *pnId){
823   lsm_env *pEnv = pDb->pEnv;
824   FileSystem *pFS = pDb->pFS;
825   int rc;
826   int nId = 0;
827   void *pId;
828 
829   rc = pEnv->xFileid(pFS->fdDb, 0, &nId);
830   pId = lsmMallocZeroRc(pEnv, nId, &rc);
831   if( rc==LSM_OK ) rc = pEnv->xFileid(pFS->fdDb, pId, &nId);
832 
833   if( rc!=LSM_OK ){
834     lsmFree(pEnv, pId);
835     pId = 0;
836     nId = 0;
837   }
838 
839   *ppId = pId;
840   *pnId = nId;
841   return rc;
842 }
843 
844 /*
845 ** Return the nominal page-size used by this file-system. Actual pages
846 ** may be smaller or larger than this value.
847 */
lsmFsPageSize(FileSystem * pFS)848 int lsmFsPageSize(FileSystem *pFS){
849   return pFS->nPagesize;
850 }
851 
852 /*
853 ** Return the block-size used by this file-system.
854 */
lsmFsBlockSize(FileSystem * pFS)855 int lsmFsBlockSize(FileSystem *pFS){
856   return pFS->nBlocksize;
857 }
858 
859 /*
860 ** Configure the nominal page-size used by this file-system. Actual
861 ** pages may be smaller or larger than this value.
862 */
lsmFsSetPageSize(FileSystem * pFS,int nPgsz)863 void lsmFsSetPageSize(FileSystem *pFS, int nPgsz){
864   pFS->nPagesize = nPgsz;
865   pFS->nCacheMax = 2048*1024 / pFS->nPagesize;
866 }
867 
868 /*
869 ** Configure the block-size used by this file-system.
870 */
lsmFsSetBlockSize(FileSystem * pFS,int nBlocksize)871 void lsmFsSetBlockSize(FileSystem *pFS, int nBlocksize){
872   pFS->nBlocksize = nBlocksize;
873 }
874 
875 /*
876 ** Return the page number of the first page on block iBlock. Blocks are
877 ** numbered starting from 1.
878 **
879 ** For a compressed database, page numbers are byte offsets. The first
880 ** page on each block is the byte offset immediately following the 4-byte
881 ** "previous block" pointer at the start of each block.
882 */
fsFirstPageOnBlock(FileSystem * pFS,int iBlock)883 static LsmPgno fsFirstPageOnBlock(FileSystem *pFS, int iBlock){
884   LsmPgno iPg;
885   if( pFS->pCompress ){
886     if( iBlock==1 ){
887       iPg = pFS->nMetasize * 2 + 4;
888     }else{
889       iPg = pFS->nBlocksize * (LsmPgno)(iBlock-1) + 4;
890     }
891   }else{
892     const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
893     if( iBlock==1 ){
894       iPg = 1 + ((pFS->nMetasize*2 + pFS->nPagesize - 1) / pFS->nPagesize);
895     }else{
896       iPg = 1 + (iBlock-1) * nPagePerBlock;
897     }
898   }
899   return iPg;
900 }
901 
902 /*
903 ** Return the page number of the last page on block iBlock. Blocks are
904 ** numbered starting from 1.
905 **
906 ** For a compressed database, page numbers are byte offsets. The first
907 ** page on each block is the byte offset of the byte immediately before
908 ** the 4-byte "next block" pointer at the end of each block.
909 */
fsLastPageOnBlock(FileSystem * pFS,int iBlock)910 static LsmPgno fsLastPageOnBlock(FileSystem *pFS, int iBlock){
911   if( pFS->pCompress ){
912     return pFS->nBlocksize * (LsmPgno)iBlock - 1 - 4;
913   }else{
914     const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
915     return iBlock * nPagePerBlock;
916   }
917 }
918 
919 /*
920 ** Return the block number of the block that page iPg is located on.
921 ** Blocks are numbered starting from 1.
922 */
fsPageToBlock(FileSystem * pFS,LsmPgno iPg)923 static int fsPageToBlock(FileSystem *pFS, LsmPgno iPg){
924   if( pFS->pCompress ){
925     return (int)((iPg / pFS->nBlocksize) + 1);
926   }else{
927     return (int)(1 + ((iPg-1) / (pFS->nBlocksize / pFS->nPagesize)));
928   }
929 }
930 
931 /*
932 ** Return true if page iPg is the last page on its block.
933 **
934 ** This function is only called in non-compressed database mode.
935 */
fsIsLast(FileSystem * pFS,LsmPgno iPg)936 static int fsIsLast(FileSystem *pFS, LsmPgno iPg){
937   const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
938   assert( !pFS->pCompress );
939   return ( iPg && (iPg % nPagePerBlock)==0 );
940 }
941 
942 /*
943 ** Return true if page iPg is the first page on its block.
944 **
945 ** This function is only called in non-compressed database mode.
946 */
fsIsFirst(FileSystem * pFS,LsmPgno iPg)947 static int fsIsFirst(FileSystem *pFS, LsmPgno iPg){
948   const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
949   assert( !pFS->pCompress );
950   return ( (iPg % nPagePerBlock)==1
951         || (iPg<nPagePerBlock && iPg==fsFirstPageOnBlock(pFS, 1))
952   );
953 }
954 
955 /*
956 ** Given a page reference, return a pointer to the buffer containing the
957 ** pages contents. If parameter pnData is not NULL, set *pnData to the size
958 ** of the buffer in bytes before returning.
959 */
lsmFsPageData(Page * pPage,int * pnData)960 u8 *lsmFsPageData(Page *pPage, int *pnData){
961   if( pnData ){
962     *pnData = pPage->nData;
963   }
964   return pPage->aData;
965 }
966 
967 /*
968 ** Return the page number of a page.
969 */
lsmFsPageNumber(Page * pPage)970 LsmPgno lsmFsPageNumber(Page *pPage){
971   /* assert( (pPage->flags & PAGE_DIRTY)==0 ); */
972   return pPage ? pPage->iPg : 0;
973 }
974 
975 /*
976 ** Page pPg is currently part of the LRU list belonging to pFS. Remove
977 ** it from the list. pPg->pLruNext and pPg->pLruPrev are cleared by this
978 ** operation.
979 */
fsPageRemoveFromLru(FileSystem * pFS,Page * pPg)980 static void fsPageRemoveFromLru(FileSystem *pFS, Page *pPg){
981   assert( pPg->pLruNext || pPg==pFS->pLruLast );
982   assert( pPg->pLruPrev || pPg==pFS->pLruFirst );
983   if( pPg->pLruNext ){
984     pPg->pLruNext->pLruPrev = pPg->pLruPrev;
985   }else{
986     pFS->pLruLast = pPg->pLruPrev;
987   }
988   if( pPg->pLruPrev ){
989     pPg->pLruPrev->pLruNext = pPg->pLruNext;
990   }else{
991     pFS->pLruFirst = pPg->pLruNext;
992   }
993   pPg->pLruPrev = 0;
994   pPg->pLruNext = 0;
995 }
996 
997 /*
998 ** Page pPg is not currently part of the LRU list belonging to pFS. Add it.
999 */
fsPageAddToLru(FileSystem * pFS,Page * pPg)1000 static void fsPageAddToLru(FileSystem *pFS, Page *pPg){
1001   assert( pPg->pLruNext==0 && pPg->pLruPrev==0 );
1002   pPg->pLruPrev = pFS->pLruLast;
1003   if( pPg->pLruPrev ){
1004     pPg->pLruPrev->pLruNext = pPg;
1005   }else{
1006     pFS->pLruFirst = pPg;
1007   }
1008   pFS->pLruLast = pPg;
1009 }
1010 
1011 /*
1012 ** Page pPg is currently stored in the apHash/nHash hash table. Remove it.
1013 */
fsPageRemoveFromHash(FileSystem * pFS,Page * pPg)1014 static void fsPageRemoveFromHash(FileSystem *pFS, Page *pPg){
1015   int iHash;
1016   Page **pp;
1017 
1018   iHash = fsHashKey(pFS->nHash, pPg->iPg);
1019   for(pp=&pFS->apHash[iHash]; *pp!=pPg; pp=&(*pp)->pHashNext);
1020   *pp = pPg->pHashNext;
1021   pPg->pHashNext = 0;
1022 }
1023 
1024 /*
1025 ** Free a Page object allocated by fsPageBuffer().
1026 */
fsPageBufferFree(Page * pPg)1027 static void fsPageBufferFree(Page *pPg){
1028   pPg->pFS->nCacheAlloc--;
1029   lsmFree(pPg->pFS->pEnv, pPg->aData);
1030   lsmFree(pPg->pFS->pEnv, pPg);
1031 }
1032 
1033 
1034 /*
1035 ** Purge the cache of all non-mmap pages with nRef==0.
1036 */
lsmFsPurgeCache(FileSystem * pFS)1037 void lsmFsPurgeCache(FileSystem *pFS){
1038   Page *pPg;
1039 
1040   pPg = pFS->pLruFirst;
1041   while( pPg ){
1042     Page *pNext = pPg->pLruNext;
1043     assert( pPg->flags & PAGE_FREE );
1044     fsPageRemoveFromHash(pFS, pPg);
1045     fsPageBufferFree(pPg);
1046     pPg = pNext;
1047   }
1048   pFS->pLruFirst = 0;
1049   pFS->pLruLast = 0;
1050 
1051   assert( pFS->nCacheAlloc<=pFS->nOut && pFS->nCacheAlloc>=0 );
1052 }
1053 
1054 /*
1055 ** Search the hash-table for page iPg. If an entry is round, return a pointer
1056 ** to it. Otherwise, return NULL.
1057 **
1058 ** Either way, if argument piHash is not NULL set *piHash to the hash slot
1059 ** number that page iPg would be stored in before returning.
1060 */
fsPageFindInHash(FileSystem * pFS,LsmPgno iPg,int * piHash)1061 static Page *fsPageFindInHash(FileSystem *pFS, LsmPgno iPg, int *piHash){
1062   Page *p;                        /* Return value */
1063   int iHash = fsHashKey(pFS->nHash, iPg);
1064 
1065   if( piHash ) *piHash = iHash;
1066   for(p=pFS->apHash[iHash]; p; p=p->pHashNext){
1067     if( p->iPg==iPg) break;
1068   }
1069   return p;
1070 }
1071 
1072 /*
1073 ** Allocate and return a non-mmap Page object. If there are already
1074 ** nCacheMax such Page objects outstanding, try to recycle an existing
1075 ** Page instead.
1076 */
fsPageBuffer(FileSystem * pFS,Page ** ppOut)1077 static int fsPageBuffer(
1078   FileSystem *pFS,
1079   Page **ppOut
1080 ){
1081   int rc = LSM_OK;
1082   Page *pPage = 0;
1083   if( pFS->pLruFirst==0 || pFS->nCacheAlloc<pFS->nCacheMax ){
1084     /* Allocate a new Page object */
1085     pPage = lsmMallocZero(pFS->pEnv, sizeof(Page));
1086     if( !pPage ){
1087       rc = LSM_NOMEM_BKPT;
1088     }else{
1089       pPage->aData = (u8 *)lsmMalloc(pFS->pEnv, pFS->nPagesize);
1090       if( !pPage->aData ){
1091         lsmFree(pFS->pEnv, pPage);
1092         rc = LSM_NOMEM_BKPT;
1093         pPage = 0;
1094       }else{
1095         pFS->nCacheAlloc++;
1096       }
1097     }
1098   }else{
1099     /* Reuse an existing Page object */
1100     u8 *aData;
1101     pPage = pFS->pLruFirst;
1102     aData = pPage->aData;
1103     fsPageRemoveFromLru(pFS, pPage);
1104     fsPageRemoveFromHash(pFS, pPage);
1105 
1106     memset(pPage, 0, sizeof(Page));
1107     pPage->aData = aData;
1108   }
1109 
1110   if( pPage ){
1111     pPage->flags = PAGE_FREE;
1112   }
1113   *ppOut = pPage;
1114   return rc;
1115 }
1116 
1117 /*
1118 ** Assuming *pRc is initially LSM_OK, attempt to ensure that the
1119 ** memory-mapped region is at least iSz bytes in size. If it is not already,
1120 ** iSz bytes in size, extend it and update the pointers associated with any
1121 ** outstanding Page objects.
1122 **
1123 ** If *pRc is not LSM_OK when this function is called, it is a no-op.
1124 ** Otherwise, *pRc is set to an lsm error code if an error occurs, or
1125 ** left unmodified otherwise.
1126 **
1127 ** This function is never called in compressed database mode.
1128 */
fsGrowMapping(FileSystem * pFS,i64 iSz,int * pRc)1129 static void fsGrowMapping(
1130   FileSystem *pFS,                /* File system object */
1131   i64 iSz,                        /* Minimum size to extend mapping to */
1132   int *pRc                        /* IN/OUT: Error code */
1133 ){
1134   assert( pFS->pCompress==0 );
1135   assert( PAGE_HASPREV==4 );
1136 
1137   if( *pRc==LSM_OK && iSz>pFS->nMap ){
1138     int rc;
1139     u8 *aOld = pFS->pMap;
1140     rc = lsmEnvRemap(pFS->pEnv, pFS->fdDb, iSz, &pFS->pMap, &pFS->nMap);
1141     if( rc==LSM_OK && pFS->pMap!=aOld ){
1142       Page *pFix;
1143       i64 iOff = (u8 *)pFS->pMap - aOld;
1144       for(pFix=pFS->pMapped; pFix; pFix=pFix->pMappedNext){
1145         pFix->aData += iOff;
1146       }
1147       lsmSortedRemap(pFS->pDb);
1148     }
1149     *pRc = rc;
1150   }
1151 }
1152 
1153 /*
1154 ** If it is mapped, unmap the database file.
1155 */
lsmFsUnmap(FileSystem * pFS)1156 int lsmFsUnmap(FileSystem *pFS){
1157   int rc = LSM_OK;
1158   if( pFS ){
1159     rc = lsmEnvRemap(pFS->pEnv, pFS->fdDb, -1, &pFS->pMap, &pFS->nMap);
1160   }
1161   return rc;
1162 }
1163 
1164 /*
1165 ** fsync() the database file.
1166 */
lsmFsSyncDb(FileSystem * pFS,int nBlock)1167 int lsmFsSyncDb(FileSystem *pFS, int nBlock){
1168   return lsmEnvSync(pFS->pEnv, pFS->fdDb);
1169 }
1170 
1171 /*
1172 ** If block iBlk has been redirected according to the redirections in the
1173 ** object passed as the first argument, return the destination block to
1174 ** which it is redirected. Otherwise, return a copy of iBlk.
1175 */
fsRedirectBlock(Redirect * p,int iBlk)1176 static int fsRedirectBlock(Redirect *p, int iBlk){
1177   if( p ){
1178     int i;
1179     for(i=0; i<p->n; i++){
1180       if( iBlk==p->a[i].iFrom ) return p->a[i].iTo;
1181     }
1182   }
1183   assert( iBlk!=0 );
1184   return iBlk;
1185 }
1186 
1187 /*
1188 ** If page iPg has been redirected according to the redirections in the
1189 ** object passed as the second argument, return the destination page to
1190 ** which it is redirected. Otherwise, return a copy of iPg.
1191 */
lsmFsRedirectPage(FileSystem * pFS,Redirect * pRedir,LsmPgno iPg)1192 LsmPgno lsmFsRedirectPage(FileSystem *pFS, Redirect *pRedir, LsmPgno iPg){
1193   LsmPgno iReal = iPg;
1194 
1195   if( pRedir ){
1196     const int nPagePerBlock = (
1197         pFS->pCompress ? pFS->nBlocksize : (pFS->nBlocksize / pFS->nPagesize)
1198     );
1199     int iBlk = fsPageToBlock(pFS, iPg);
1200     int i;
1201     for(i=0; i<pRedir->n; i++){
1202       int iFrom = pRedir->a[i].iFrom;
1203       if( iFrom>iBlk ) break;
1204       if( iFrom==iBlk ){
1205         int iTo = pRedir->a[i].iTo;
1206         iReal = iPg - (LsmPgno)(iFrom - iTo) * nPagePerBlock;
1207         if( iTo==1 ){
1208           iReal += (fsFirstPageOnBlock(pFS, 1)-1);
1209         }
1210         break;
1211       }
1212     }
1213   }
1214 
1215   assert( iReal!=0 );
1216   return iReal;
1217 }
1218 
1219 /* Required by the circular fsBlockNext<->fsPageGet dependency. */
1220 static int fsPageGet(FileSystem *, Segment *, LsmPgno, int, Page **, int *);
1221 
1222 /*
1223 ** Parameter iBlock is a database file block. This function reads the value
1224 ** stored in the blocks "next block" pointer and stores it in *piNext.
1225 ** LSM_OK is returned if everything is successful, or an LSM error code
1226 ** otherwise.
1227 */
fsBlockNext(FileSystem * pFS,Segment * pSeg,int iBlock,int * piNext)1228 static int fsBlockNext(
1229   FileSystem *pFS,                /* File-system object handle */
1230   Segment *pSeg,                  /* Use this segment for block redirects */
1231   int iBlock,                     /* Read field from this block */
1232   int *piNext                     /* OUT: Next block in linked list */
1233 ){
1234   int rc;
1235   int iRead;                      /* Read block from here */
1236 
1237   if( pSeg ){
1238     iRead = fsRedirectBlock(pSeg->pRedirect, iBlock);
1239   }else{
1240     iRead = iBlock;
1241   }
1242 
1243   assert( pFS->nMapLimit==0 || pFS->pCompress==0 );
1244   if( pFS->pCompress ){
1245     i64 iOff;                     /* File offset to read data from */
1246     u8 aNext[4];                  /* 4-byte pointer read from db file */
1247 
1248     iOff = (i64)iRead * pFS->nBlocksize - sizeof(aNext);
1249     rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aNext, sizeof(aNext));
1250     if( rc==LSM_OK ){
1251       *piNext = (int)lsmGetU32(aNext);
1252     }
1253   }else{
1254     const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
1255     Page *pLast;
1256     rc = fsPageGet(pFS, 0, iRead*nPagePerBlock, 0, &pLast, 0);
1257     if( rc==LSM_OK ){
1258       *piNext = lsmGetU32(&pLast->aData[pFS->nPagesize-4]);
1259       lsmFsPageRelease(pLast);
1260     }
1261   }
1262 
1263   if( pSeg ){
1264     *piNext = fsRedirectBlock(pSeg->pRedirect, *piNext);
1265   }
1266   return rc;
1267 }
1268 
1269 /*
1270 ** Return the page number of the last page on the same block as page iPg.
1271 */
fsLastPageOnPagesBlock(FileSystem * pFS,LsmPgno iPg)1272 LsmPgno fsLastPageOnPagesBlock(FileSystem *pFS, LsmPgno iPg){
1273   return fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iPg));
1274 }
1275 
1276 /*
1277 ** Read nData bytes of data from offset iOff of the database file into
1278 ** buffer aData. If this means reading past the end of a block, follow
1279 ** the block pointer to the next block and continue reading.
1280 **
1281 ** Offset iOff is an absolute offset - not subject to any block redirection.
1282 ** However any block pointer followed is. Use pSeg->pRedirect in this case.
1283 **
1284 ** This function is only called in compressed database mode.
1285 */
fsReadData(FileSystem * pFS,Segment * pSeg,i64 iOff,u8 * aData,int nData)1286 static int fsReadData(
1287   FileSystem *pFS,                /* File-system handle */
1288   Segment *pSeg,                  /* Block redirection */
1289   i64 iOff,                       /* Read data from this offset */
1290   u8 *aData,                      /* Buffer to read data into */
1291   int nData                       /* Number of bytes to read */
1292 ){
1293   i64 iEob;                       /* End of block */
1294   int nRead;
1295   int rc;
1296 
1297   assert( pFS->pCompress );
1298 
1299   iEob = fsLastPageOnPagesBlock(pFS, iOff) + 1;
1300   nRead = (int)LSM_MIN(iEob - iOff, nData);
1301 
1302   rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nRead);
1303   if( rc==LSM_OK && nRead!=nData ){
1304     int iBlk;
1305 
1306     rc = fsBlockNext(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk);
1307     if( rc==LSM_OK ){
1308       i64 iOff2 = fsFirstPageOnBlock(pFS, iBlk);
1309       rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff2, &aData[nRead], nData-nRead);
1310     }
1311   }
1312 
1313   return rc;
1314 }
1315 
1316 /*
1317 ** Parameter iBlock is a database file block. This function reads the value
1318 ** stored in the blocks "previous block" pointer and stores it in *piPrev.
1319 ** LSM_OK is returned if everything is successful, or an LSM error code
1320 ** otherwise.
1321 */
fsBlockPrev(FileSystem * pFS,Segment * pSeg,int iBlock,int * piPrev)1322 static int fsBlockPrev(
1323   FileSystem *pFS,                /* File-system object handle */
1324   Segment *pSeg,                  /* Use this segment for block redirects */
1325   int iBlock,                     /* Read field from this block */
1326   int *piPrev                     /* OUT: Previous block in linked list */
1327 ){
1328   int rc = LSM_OK;                /* Return code */
1329 
1330   assert( pFS->nMapLimit==0 || pFS->pCompress==0 );
1331   assert( iBlock>0 );
1332 
1333   if( pFS->pCompress ){
1334     i64 iOff = fsFirstPageOnBlock(pFS, iBlock) - 4;
1335     u8 aPrev[4];                  /* 4-byte pointer read from db file */
1336     rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aPrev, sizeof(aPrev));
1337     if( rc==LSM_OK ){
1338       Redirect *pRedir = (pSeg ? pSeg->pRedirect : 0);
1339       *piPrev = fsRedirectBlock(pRedir, (int)lsmGetU32(aPrev));
1340     }
1341   }else{
1342     assert( 0 );
1343   }
1344   return rc;
1345 }
1346 
1347 /*
1348 ** Encode and decode routines for record size fields.
1349 */
putRecordSize(u8 * aBuf,int nByte,int bFree)1350 static void putRecordSize(u8 *aBuf, int nByte, int bFree){
1351   aBuf[0] = (u8)(nByte >> 14) | 0x80;
1352   aBuf[1] = ((u8)(nByte >>  7) & 0x7F) | (bFree ? 0x00 : 0x80);
1353   aBuf[2] = (u8)nByte | 0x80;
1354 }
getRecordSize(u8 * aBuf,int * pbFree)1355 static int getRecordSize(u8 *aBuf, int *pbFree){
1356   int nByte;
1357   nByte  = (aBuf[0] & 0x7F) << 14;
1358   nByte += (aBuf[1] & 0x7F) << 7;
1359   nByte += (aBuf[2] & 0x7F);
1360   *pbFree = !(aBuf[1] & 0x80);
1361   return nByte;
1362 }
1363 
1364 /*
1365 ** Subtract iSub from database file offset iOff and set *piRes to the
1366 ** result. If doing so means passing the start of a block, follow the
1367 ** block pointer stored in the first 4 bytes of the block.
1368 **
1369 ** Offset iOff is an absolute offset - not subject to any block redirection.
1370 ** However any block pointer followed is. Use pSeg->pRedirect in this case.
1371 **
1372 ** Return LSM_OK if successful or an lsm error code if an error occurs.
1373 */
fsSubtractOffset(FileSystem * pFS,Segment * pSeg,i64 iOff,int iSub,i64 * piRes)1374 static int fsSubtractOffset(
1375   FileSystem *pFS,
1376   Segment *pSeg,
1377   i64 iOff,
1378   int iSub,
1379   i64 *piRes
1380 ){
1381   i64 iStart;
1382   int iBlk = 0;
1383   int rc;
1384 
1385   assert( pFS->pCompress );
1386 
1387   iStart = fsFirstPageOnBlock(pFS, fsPageToBlock(pFS, iOff));
1388   if( (iOff-iSub)>=iStart ){
1389     *piRes = (iOff-iSub);
1390     return LSM_OK;
1391   }
1392 
1393   rc = fsBlockPrev(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk);
1394   *piRes = fsLastPageOnBlock(pFS, iBlk) - iSub + (iOff - iStart + 1);
1395   return rc;
1396 }
1397 
1398 /*
1399 ** Add iAdd to database file offset iOff and set *piRes to the
1400 ** result. If doing so means passing the end of a block, follow the
1401 ** block pointer stored in the last 4 bytes of the block.
1402 **
1403 ** Offset iOff is an absolute offset - not subject to any block redirection.
1404 ** However any block pointer followed is. Use pSeg->pRedirect in this case.
1405 **
1406 ** Return LSM_OK if successful or an lsm error code if an error occurs.
1407 */
fsAddOffset(FileSystem * pFS,Segment * pSeg,i64 iOff,int iAdd,i64 * piRes)1408 static int fsAddOffset(
1409   FileSystem *pFS,
1410   Segment *pSeg,
1411   i64 iOff,
1412   int iAdd,
1413   i64 *piRes
1414 ){
1415   i64 iEob;
1416   int iBlk;
1417   int rc;
1418 
1419   assert( pFS->pCompress );
1420 
1421   iEob = fsLastPageOnPagesBlock(pFS, iOff);
1422   if( (iOff+iAdd)<=iEob ){
1423     *piRes = (iOff+iAdd);
1424     return LSM_OK;
1425   }
1426 
1427   rc = fsBlockNext(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk);
1428   *piRes = fsFirstPageOnBlock(pFS, iBlk) + iAdd - (iEob - iOff + 1);
1429   return rc;
1430 }
1431 
1432 /*
1433 ** If it is not already allocated, allocate either the FileSystem.aOBuffer (if
1434 ** bWrite is true) or the FileSystem.aIBuffer (if bWrite is false). Return
1435 ** LSM_OK if successful if the attempt to allocate memory fails.
1436 */
fsAllocateBuffer(FileSystem * pFS,int bWrite)1437 static int fsAllocateBuffer(FileSystem *pFS, int bWrite){
1438   u8 **pp;                        /* Pointer to either aIBuffer or aOBuffer */
1439 
1440   assert( pFS->pCompress );
1441 
1442   /* If neither buffer has been allocated, figure out how large they
1443   ** should be. Store this value in FileSystem.nBuffer.  */
1444   if( pFS->nBuffer==0 ){
1445     assert( pFS->aIBuffer==0 && pFS->aOBuffer==0 );
1446     pFS->nBuffer = pFS->pCompress->xBound(pFS->pCompress->pCtx, pFS->nPagesize);
1447     if( pFS->nBuffer<(pFS->szSector+6) ){
1448       pFS->nBuffer = pFS->szSector+6;
1449     }
1450   }
1451 
1452   pp = (bWrite ? &pFS->aOBuffer : &pFS->aIBuffer);
1453   if( *pp==0 ){
1454     *pp = lsmMalloc(pFS->pEnv, LSM_MAX(pFS->nBuffer, pFS->nPagesize));
1455     if( *pp==0 ) return LSM_NOMEM_BKPT;
1456   }
1457 
1458   return LSM_OK;
1459 }
1460 
1461 /*
1462 ** This function is only called in compressed database mode. It reads and
1463 ** uncompresses the compressed data for page pPg from the database and
1464 ** populates the pPg->aData[] buffer and pPg->nCompress field.
1465 **
1466 ** It is possible that instead of a page record, there is free space
1467 ** at offset pPg->iPgno. In this case no data is read from the file, but
1468 ** output variable *pnSpace is set to the total number of free bytes.
1469 **
1470 ** LSM_OK is returned if successful, or an LSM error code otherwise.
1471 */
fsReadPagedata(FileSystem * pFS,Segment * pSeg,Page * pPg,int * pnSpace)1472 static int fsReadPagedata(
1473   FileSystem *pFS,                /* File-system handle */
1474   Segment *pSeg,                  /* pPg is part of this segment */
1475   Page *pPg,                      /* Page to read and uncompress data for */
1476   int *pnSpace                    /* OUT: Total bytes of free space */
1477 ){
1478   lsm_compress *p = pFS->pCompress;
1479   i64 iOff = pPg->iPg;
1480   u8 aSz[3];
1481   int rc;
1482 
1483   assert( p && pPg->nCompress==0 );
1484 
1485   if( fsAllocateBuffer(pFS, 0) ) return LSM_NOMEM;
1486 
1487   rc = fsReadData(pFS, pSeg, iOff, aSz, sizeof(aSz));
1488 
1489   if( rc==LSM_OK ){
1490     int bFree;
1491     if( aSz[0] & 0x80 ){
1492       pPg->nCompress = (int)getRecordSize(aSz, &bFree);
1493     }else{
1494       pPg->nCompress = (int)aSz[0] - sizeof(aSz)*2;
1495       bFree = 1;
1496     }
1497     if( bFree ){
1498       if( pnSpace ){
1499         *pnSpace = pPg->nCompress + sizeof(aSz)*2;
1500       }else{
1501         rc = LSM_CORRUPT_BKPT;
1502       }
1503     }else{
1504       rc = fsAddOffset(pFS, pSeg, iOff, 3, &iOff);
1505       if( rc==LSM_OK ){
1506         if( pPg->nCompress>pFS->nBuffer ){
1507           rc = LSM_CORRUPT_BKPT;
1508         }else{
1509           rc = fsReadData(pFS, pSeg, iOff, pFS->aIBuffer, pPg->nCompress);
1510         }
1511         if( rc==LSM_OK ){
1512           int n = pFS->nPagesize;
1513           rc = p->xUncompress(p->pCtx,
1514               (char *)pPg->aData, &n,
1515               (const char *)pFS->aIBuffer, pPg->nCompress
1516           );
1517           if( rc==LSM_OK && n!=pPg->pFS->nPagesize ){
1518             rc = LSM_CORRUPT_BKPT;
1519           }
1520         }
1521       }
1522     }
1523   }
1524   return rc;
1525 }
1526 
1527 /*
1528 ** Return a handle for a database page.
1529 **
1530 ** If this file-system object is accessing a compressed database it may be
1531 ** that there is no page record at database file offset iPg. Instead, there
1532 ** may be a free space record. In this case, set *ppPg to NULL and *pnSpace
1533 ** to the total number of free bytes before returning.
1534 **
1535 ** If no error occurs, LSM_OK is returned. Otherwise, an lsm error code.
1536 */
fsPageGet(FileSystem * pFS,Segment * pSeg,LsmPgno iPg,int noContent,Page ** ppPg,int * pnSpace)1537 static int fsPageGet(
1538   FileSystem *pFS,                /* File-system handle */
1539   Segment *pSeg,                  /* Block redirection to use (or NULL) */
1540   LsmPgno iPg,                    /* Page id */
1541   int noContent,                  /* True to not load content from disk */
1542   Page **ppPg,                    /* OUT: New page handle */
1543   int *pnSpace                    /* OUT: Bytes of free space */
1544 ){
1545   Page *p;
1546   int iHash;
1547   int rc = LSM_OK;
1548 
1549   /* In most cases iReal is the same as iPg. Except, if pSeg->pRedirect is
1550   ** not NULL, and the block containing iPg has been redirected, then iReal
1551   ** is the page number after redirection.  */
1552   LsmPgno iReal = lsmFsRedirectPage(pFS, (pSeg ? pSeg->pRedirect : 0), iPg);
1553 
1554   assert_lists_are_ok(pFS);
1555   assert( iPg>=fsFirstPageOnBlock(pFS, 1) );
1556   assert( iReal>=fsFirstPageOnBlock(pFS, 1) );
1557   *ppPg = 0;
1558 
1559   /* Search the hash-table for the page */
1560   p = fsPageFindInHash(pFS, iReal, &iHash);
1561 
1562   if( p ){
1563     assert( p->flags & PAGE_FREE );
1564     if( p->nRef==0 ) fsPageRemoveFromLru(pFS, p);
1565   }else{
1566 
1567     if( fsMmapPage(pFS, iReal) ){
1568       i64 iEnd = (i64)iReal * pFS->nPagesize;
1569       fsGrowMapping(pFS, iEnd, &rc);
1570       if( rc!=LSM_OK ) return rc;
1571 
1572       if( pFS->pFree ){
1573         p = pFS->pFree;
1574         pFS->pFree = p->pFreeNext;
1575         assert( p->nRef==0 );
1576       }else{
1577         p = lsmMallocZeroRc(pFS->pEnv, sizeof(Page), &rc);
1578         if( rc ) return rc;
1579         p->pFS = pFS;
1580       }
1581       p->aData = &((u8 *)pFS->pMap)[pFS->nPagesize * (iReal-1)];
1582       p->iPg = iReal;
1583 
1584       /* This page now carries a pointer to the mapping. Link it in to
1585       ** the FileSystem.pMapped list.  */
1586       assert( p->pMappedNext==0 );
1587       p->pMappedNext = pFS->pMapped;
1588       pFS->pMapped = p;
1589 
1590       assert( pFS->pCompress==0 );
1591       assert( (p->flags & PAGE_FREE)==0 );
1592     }else{
1593       rc = fsPageBuffer(pFS, &p);
1594       if( rc==LSM_OK ){
1595         int nSpace = 0;
1596         p->iPg = iReal;
1597         p->nRef = 0;
1598         p->pFS = pFS;
1599         assert( p->flags==0 || p->flags==PAGE_FREE );
1600 
1601 #ifdef LSM_DEBUG
1602         memset(p->aData, 0x56, pFS->nPagesize);
1603 #endif
1604         assert( p->pLruNext==0 && p->pLruPrev==0 );
1605         if( noContent==0 ){
1606           if( pFS->pCompress ){
1607             rc = fsReadPagedata(pFS, pSeg, p, &nSpace);
1608           }else{
1609             int nByte = pFS->nPagesize;
1610             i64 iOff = (i64)(iReal-1) * pFS->nPagesize;
1611             rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, p->aData, nByte);
1612           }
1613           pFS->nRead++;
1614         }
1615 
1616         /* If the xRead() call was successful (or not attempted), link the
1617         ** page into the page-cache hash-table. Otherwise, if it failed,
1618         ** free the buffer. */
1619         if( rc==LSM_OK && nSpace==0 ){
1620           p->pHashNext = pFS->apHash[iHash];
1621           pFS->apHash[iHash] = p;
1622         }else{
1623           fsPageBufferFree(p);
1624           p = 0;
1625           if( pnSpace ) *pnSpace = nSpace;
1626         }
1627       }
1628     }
1629 
1630     assert( (rc==LSM_OK && (p || (pnSpace && *pnSpace)))
1631          || (rc!=LSM_OK && p==0)
1632     );
1633   }
1634 
1635   if( rc==LSM_OK && p ){
1636     if( pFS->pCompress==0 && (fsIsLast(pFS, iReal) || fsIsFirst(pFS, iReal)) ){
1637       p->nData = pFS->nPagesize - 4;
1638       if( fsIsFirst(pFS, iReal) && p->nRef==0 ){
1639         p->aData += 4;
1640         p->flags |= PAGE_HASPREV;
1641       }
1642     }else{
1643       p->nData = pFS->nPagesize;
1644     }
1645     pFS->nOut += (p->nRef==0);
1646     p->nRef++;
1647   }
1648   *ppPg = p;
1649   return rc;
1650 }
1651 
1652 /*
1653 ** Read the 64-bit checkpoint id of the checkpoint currently stored on meta
1654 ** page iMeta of the database file. If no error occurs, store the id value
1655 ** in *piVal and return LSM_OK. Otherwise, return an LSM error code and leave
1656 ** *piVal unmodified.
1657 **
1658 ** If a checkpointer connection is currently updating meta-page iMeta, or an
1659 ** earlier checkpointer crashed while doing so, the value read into *piVal
1660 ** may be garbage. It is the callers responsibility to deal with this.
1661 */
lsmFsReadSyncedId(lsm_db * db,int iMeta,i64 * piVal)1662 int lsmFsReadSyncedId(lsm_db *db, int iMeta, i64 *piVal){
1663   FileSystem *pFS = db->pFS;
1664   int rc = LSM_OK;
1665 
1666   assert( iMeta==1 || iMeta==2 );
1667   if( pFS->nMapLimit>0 ){
1668     fsGrowMapping(pFS, iMeta*LSM_META_PAGE_SIZE, &rc);
1669     if( rc==LSM_OK ){
1670       *piVal = (i64)lsmGetU64(&((u8 *)pFS->pMap)[(iMeta-1)*LSM_META_PAGE_SIZE]);
1671     }
1672   }else{
1673     MetaPage *pMeta = 0;
1674     rc = lsmFsMetaPageGet(pFS, 0, iMeta, &pMeta);
1675     if( rc==LSM_OK ){
1676       *piVal = (i64)lsmGetU64(pMeta->aData);
1677       lsmFsMetaPageRelease(pMeta);
1678     }
1679   }
1680 
1681   return rc;
1682 }
1683 
1684 
1685 /*
1686 ** Return true if the first or last page of segment pRun falls between iFirst
1687 ** and iLast, inclusive, and pRun is not equal to pIgnore.
1688 */
fsRunEndsBetween(Segment * pRun,Segment * pIgnore,LsmPgno iFirst,LsmPgno iLast)1689 static int fsRunEndsBetween(
1690   Segment *pRun,
1691   Segment *pIgnore,
1692   LsmPgno iFirst,
1693   LsmPgno iLast
1694 ){
1695   return (pRun!=pIgnore && (
1696         (pRun->iFirst>=iFirst && pRun->iFirst<=iLast)
1697      || (pRun->iLastPg>=iFirst && pRun->iLastPg<=iLast)
1698   ));
1699 }
1700 
1701 /*
1702 ** Return true if level pLevel contains a segment other than pIgnore for
1703 ** which the first or last page is between iFirst and iLast, inclusive.
1704 */
fsLevelEndsBetween(Level * pLevel,Segment * pIgnore,LsmPgno iFirst,LsmPgno iLast)1705 static int fsLevelEndsBetween(
1706   Level *pLevel,
1707   Segment *pIgnore,
1708   LsmPgno iFirst,
1709   LsmPgno iLast
1710 ){
1711   int i;
1712 
1713   if( fsRunEndsBetween(&pLevel->lhs, pIgnore, iFirst, iLast) ){
1714     return 1;
1715   }
1716   for(i=0; i<pLevel->nRight; i++){
1717     if( fsRunEndsBetween(&pLevel->aRhs[i], pIgnore, iFirst, iLast) ){
1718       return 1;
1719     }
1720   }
1721 
1722   return 0;
1723 }
1724 
1725 /*
1726 ** Block iBlk is no longer in use by segment pIgnore. If it is not in use
1727 ** by any other segment, move it to the free block list.
1728 */
fsFreeBlock(FileSystem * pFS,Snapshot * pSnapshot,Segment * pIgnore,int iBlk)1729 static int fsFreeBlock(
1730   FileSystem *pFS,                /* File system object */
1731   Snapshot *pSnapshot,            /* Worker snapshot */
1732   Segment *pIgnore,               /* Ignore this run when searching */
1733   int iBlk                        /* Block number of block to free */
1734 ){
1735   int rc = LSM_OK;                /* Return code */
1736   LsmPgno iFirst;                 /* First page on block iBlk */
1737   LsmPgno iLast;                  /* Last page on block iBlk */
1738   Level *pLevel;                  /* Used to iterate through levels */
1739 
1740   int iIn;                        /* Used to iterate through append points */
1741   int iOut = 0;                   /* Used to output append points */
1742   LsmPgno *aApp = pSnapshot->aiAppend;
1743 
1744   iFirst = fsFirstPageOnBlock(pFS, iBlk);
1745   iLast = fsLastPageOnBlock(pFS, iBlk);
1746 
1747   /* Check if any other run in the snapshot has a start or end page
1748   ** within this block. If there is such a run, return early. */
1749   for(pLevel=lsmDbSnapshotLevel(pSnapshot); pLevel; pLevel=pLevel->pNext){
1750     if( fsLevelEndsBetween(pLevel, pIgnore, iFirst, iLast) ){
1751       return LSM_OK;
1752     }
1753   }
1754 
1755   /* Remove any entries that lie on this block from the append-list. */
1756   for(iIn=0; iIn<LSM_APPLIST_SZ; iIn++){
1757     if( aApp[iIn]<iFirst || aApp[iIn]>iLast ){
1758       aApp[iOut++] = aApp[iIn];
1759     }
1760   }
1761   while( iOut<LSM_APPLIST_SZ ) aApp[iOut++] = 0;
1762 
1763   if( rc==LSM_OK ){
1764     rc = lsmBlockFree(pFS->pDb, iBlk);
1765   }
1766   return rc;
1767 }
1768 
1769 /*
1770 ** Delete or otherwise recycle the blocks currently occupied by run pDel.
1771 */
lsmFsSortedDelete(FileSystem * pFS,Snapshot * pSnapshot,int bZero,Segment * pDel)1772 int lsmFsSortedDelete(
1773   FileSystem *pFS,
1774   Snapshot *pSnapshot,
1775   int bZero,                      /* True to zero the Segment structure */
1776   Segment *pDel
1777 ){
1778   if( pDel->iFirst ){
1779     int rc = LSM_OK;
1780 
1781     int iBlk;
1782     int iLastBlk;
1783 
1784     iBlk = fsPageToBlock(pFS, pDel->iFirst);
1785     iLastBlk = fsPageToBlock(pFS, pDel->iLastPg);
1786 
1787     /* Mark all blocks currently used by this sorted run as free */
1788     while( iBlk && rc==LSM_OK ){
1789       int iNext = 0;
1790       if( iBlk!=iLastBlk ){
1791         rc = fsBlockNext(pFS, pDel, iBlk, &iNext);
1792       }else if( bZero==0 && pDel->iLastPg!=fsLastPageOnBlock(pFS, iLastBlk) ){
1793         break;
1794       }
1795       rc = fsFreeBlock(pFS, pSnapshot, pDel, iBlk);
1796       iBlk = iNext;
1797     }
1798 
1799     if( pDel->pRedirect ){
1800       assert( pDel->pRedirect==&pSnapshot->redirect );
1801       pSnapshot->redirect.n = 0;
1802     }
1803 
1804     if( bZero ) memset(pDel, 0, sizeof(Segment));
1805   }
1806   return LSM_OK;
1807 }
1808 
1809 /*
1810 ** aPgno is an array containing nPgno page numbers. Return the smallest page
1811 ** number from the array that falls on block iBlk. Or, if none of the pages
1812 ** in aPgno[] fall on block iBlk, return 0.
1813 */
firstOnBlock(FileSystem * pFS,int iBlk,LsmPgno * aPgno,int nPgno)1814 static LsmPgno firstOnBlock(
1815   FileSystem *pFS,
1816   int iBlk,
1817   LsmPgno *aPgno,
1818   int nPgno
1819 ){
1820   LsmPgno iRet = 0;
1821   int i;
1822   for(i=0; i<nPgno; i++){
1823     LsmPgno iPg = aPgno[i];
1824     if( fsPageToBlock(pFS, iPg)==iBlk && (iRet==0 || iPg<iRet) ){
1825       iRet = iPg;
1826     }
1827   }
1828   return iRet;
1829 }
1830 
1831 #ifndef NDEBUG
1832 /*
1833 ** Return true if page iPg, which is a part of segment p, lies on
1834 ** a redirected block.
1835 */
fsPageRedirects(FileSystem * pFS,Segment * p,LsmPgno iPg)1836 static int fsPageRedirects(FileSystem *pFS, Segment *p, LsmPgno iPg){
1837   return (iPg!=0 && iPg!=lsmFsRedirectPage(pFS, p->pRedirect, iPg));
1838 }
1839 
1840 /*
1841 ** Return true if the second argument is not NULL and any of the first
1842 ** last or root pages lie on a redirected block.
1843 */
fsSegmentRedirects(FileSystem * pFS,Segment * p)1844 static int fsSegmentRedirects(FileSystem *pFS, Segment *p){
1845   return (p && (
1846       fsPageRedirects(pFS, p, p->iFirst)
1847    || fsPageRedirects(pFS, p, p->iRoot)
1848    || fsPageRedirects(pFS, p, p->iLastPg)
1849   ));
1850 }
1851 #endif
1852 
1853 /*
1854 ** Argument aPgno is an array of nPgno page numbers. All pages belong to
1855 ** the segment pRun. This function gobbles from the start of the run to the
1856 ** first page that appears in aPgno[] (i.e. so that the aPgno[] entry is
1857 ** the new first page of the run).
1858 */
lsmFsGobble(lsm_db * pDb,Segment * pRun,LsmPgno * aPgno,int nPgno)1859 void lsmFsGobble(
1860   lsm_db *pDb,
1861   Segment *pRun,
1862   LsmPgno *aPgno,
1863   int nPgno
1864 ){
1865   int rc = LSM_OK;
1866   FileSystem *pFS = pDb->pFS;
1867   Snapshot *pSnapshot = pDb->pWorker;
1868   int iBlk;
1869 
1870   assert( pRun->nSize>0 );
1871   assert( 0==fsSegmentRedirects(pFS, pRun) );
1872   assert( nPgno>0 && 0==fsPageRedirects(pFS, pRun, aPgno[0]) );
1873 
1874   iBlk = fsPageToBlock(pFS, pRun->iFirst);
1875   pRun->nSize += (int)(pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
1876 
1877   while( rc==LSM_OK ){
1878     int iNext = 0;
1879     LsmPgno iFirst = firstOnBlock(pFS, iBlk, aPgno, nPgno);
1880     if( iFirst ){
1881       pRun->iFirst = iFirst;
1882       break;
1883     }
1884     rc = fsBlockNext(pFS, pRun, iBlk, &iNext);
1885     if( rc==LSM_OK ) rc = fsFreeBlock(pFS, pSnapshot, pRun, iBlk);
1886     pRun->nSize -= (int)(
1887         1 + fsLastPageOnBlock(pFS, iBlk) - fsFirstPageOnBlock(pFS, iBlk)
1888     );
1889     iBlk = iNext;
1890   }
1891 
1892   pRun->nSize -= (int)(pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
1893   assert( pRun->nSize>0 );
1894 }
1895 
1896 /*
1897 ** This function is only used in compressed database mode.
1898 **
1899 ** Argument iPg is the page number (byte offset) of a page within segment
1900 ** pSeg. The page record, including all headers, is nByte bytes in size.
1901 ** Before returning, set *piNext to the page number of the next page in
1902 ** the segment, or to zero if iPg is the last.
1903 **
1904 ** In other words, do:
1905 **
1906 **   *piNext = iPg + nByte;
1907 **
1908 ** But take block overflow and redirection into account.
1909 */
fsNextPageOffset(FileSystem * pFS,Segment * pSeg,LsmPgno iPg,int nByte,LsmPgno * piNext)1910 static int fsNextPageOffset(
1911   FileSystem *pFS,                /* File system object */
1912   Segment *pSeg,                  /* Segment to move within */
1913   LsmPgno iPg,                    /* Offset of current page */
1914   int nByte,                      /* Size of current page including headers */
1915   LsmPgno *piNext                 /* OUT: Offset of next page. Or zero (EOF) */
1916 ){
1917   LsmPgno iNext;
1918   int rc;
1919 
1920   assert( pFS->pCompress );
1921 
1922   rc = fsAddOffset(pFS, pSeg, iPg, nByte-1, &iNext);
1923   if( pSeg && iNext==pSeg->iLastPg ){
1924     iNext = 0;
1925   }else if( rc==LSM_OK ){
1926     rc = fsAddOffset(pFS, pSeg, iNext, 1, &iNext);
1927   }
1928 
1929   *piNext = iNext;
1930   return rc;
1931 }
1932 
1933 /*
1934 ** This function is only used in compressed database mode.
1935 **
1936 ** Argument iPg is the page number of a pagethat appears in segment pSeg.
1937 ** This function determines the page number of the previous page in the
1938 ** same run. *piPrev is set to the previous page number before returning.
1939 **
1940 ** LSM_OK is returned if no error occurs. Otherwise, an lsm error code.
1941 ** If any value other than LSM_OK is returned, then the final value of
1942 ** *piPrev is undefined.
1943 */
fsGetPageBefore(FileSystem * pFS,Segment * pSeg,LsmPgno iPg,LsmPgno * piPrev)1944 static int fsGetPageBefore(
1945   FileSystem *pFS,
1946   Segment *pSeg,
1947   LsmPgno iPg,
1948   LsmPgno *piPrev
1949 ){
1950   u8 aSz[3];
1951   int rc;
1952   i64 iRead;
1953 
1954   assert( pFS->pCompress );
1955 
1956   rc = fsSubtractOffset(pFS, pSeg, iPg, sizeof(aSz), &iRead);
1957   if( rc==LSM_OK ) rc = fsReadData(pFS, pSeg, iRead, aSz, sizeof(aSz));
1958 
1959   if( rc==LSM_OK ){
1960     int bFree;
1961     int nSz;
1962     if( aSz[2] & 0x80 ){
1963       nSz = getRecordSize(aSz, &bFree) + sizeof(aSz)*2;
1964     }else{
1965       nSz = (int)(aSz[2] & 0x7F);
1966       bFree = 1;
1967     }
1968     rc = fsSubtractOffset(pFS, pSeg, iPg, nSz, piPrev);
1969   }
1970 
1971   return rc;
1972 }
1973 
1974 /*
1975 ** The first argument to this function is a valid reference to a database
1976 ** file page that is part of a sorted run. If parameter eDir is -1, this
1977 ** function attempts to locate and load the previous page in the same run.
1978 ** Or, if eDir is +1, it attempts to find the next page in the same run.
1979 ** The results of passing an eDir value other than positive or negative one
1980 ** are undefined.
1981 **
1982 ** If parameter pRun is not NULL then it must point to the run that page
1983 ** pPg belongs to. In this case, if pPg is the first or last page of the
1984 ** run, and the request is for the previous or next page, respectively,
1985 ** *ppNext is set to NULL before returning LSM_OK. If pRun is NULL, then it
1986 ** is assumed that the next or previous page, as requested, exists.
1987 **
1988 ** If the previous/next page does exist and is successfully loaded, *ppNext
1989 ** is set to point to it and LSM_OK is returned. Otherwise, if an error
1990 ** occurs, *ppNext is set to NULL and and lsm error code returned.
1991 **
1992 ** Page references returned by this function should be released by the
1993 ** caller using lsmFsPageRelease().
1994 */
lsmFsDbPageNext(Segment * pRun,Page * pPg,int eDir,Page ** ppNext)1995 int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){
1996   int rc = LSM_OK;
1997   FileSystem *pFS = pPg->pFS;
1998   LsmPgno iPg = pPg->iPg;
1999 
2000   assert( 0==fsSegmentRedirects(pFS, pRun) );
2001   if( pFS->pCompress ){
2002     int nSpace = pPg->nCompress + 2*3;
2003 
2004     do {
2005       if( eDir>0 ){
2006         rc = fsNextPageOffset(pFS, pRun, iPg, nSpace, &iPg);
2007       }else{
2008         if( iPg==pRun->iFirst ){
2009           iPg = 0;
2010         }else{
2011           rc = fsGetPageBefore(pFS, pRun, iPg, &iPg);
2012         }
2013       }
2014 
2015       nSpace = 0;
2016       if( iPg!=0 ){
2017         rc = fsPageGet(pFS, pRun, iPg, 0, ppNext, &nSpace);
2018         assert( (*ppNext==0)==(rc!=LSM_OK || nSpace>0) );
2019       }else{
2020         *ppNext = 0;
2021       }
2022     }while( nSpace>0 && rc==LSM_OK );
2023 
2024   }else{
2025     Redirect *pRedir = pRun ? pRun->pRedirect : 0;
2026     assert( eDir==1 || eDir==-1 );
2027     if( eDir<0 ){
2028       if( pRun && iPg==pRun->iFirst ){
2029         *ppNext = 0;
2030         return LSM_OK;
2031       }else if( fsIsFirst(pFS, iPg) ){
2032         assert( pPg->flags & PAGE_HASPREV );
2033         iPg = fsLastPageOnBlock(pFS, lsmGetU32(&pPg->aData[-4]));
2034       }else{
2035         iPg--;
2036       }
2037     }else{
2038       if( pRun ){
2039         if( iPg==pRun->iLastPg ){
2040           *ppNext = 0;
2041           return LSM_OK;
2042         }
2043       }
2044 
2045       if( fsIsLast(pFS, iPg) ){
2046         int iBlk = fsRedirectBlock(
2047             pRedir, lsmGetU32(&pPg->aData[pFS->nPagesize-4])
2048         );
2049         iPg = fsFirstPageOnBlock(pFS, iBlk);
2050       }else{
2051         iPg++;
2052       }
2053     }
2054     rc = fsPageGet(pFS, pRun, iPg, 0, ppNext, 0);
2055   }
2056 
2057   return rc;
2058 }
2059 
2060 /*
2061 ** This function is called when creating a new segment to determine if the
2062 ** first part of it can be written following an existing segment on an
2063 ** already allocated block. If it is possible, the page number of the first
2064 ** page to use for the new segment is returned. Otherwise zero.
2065 **
2066 ** If argument pLvl is not NULL, then this function will not attempt to
2067 ** start the new segment immediately following any segment that is part
2068 ** of the right-hand-side of pLvl.
2069 */
findAppendPoint(FileSystem * pFS,Level * pLvl)2070 static LsmPgno findAppendPoint(FileSystem *pFS, Level *pLvl){
2071   int i;
2072   LsmPgno *aiAppend = pFS->pDb->pWorker->aiAppend;
2073   LsmPgno iRet = 0;
2074 
2075   for(i=LSM_APPLIST_SZ-1; iRet==0 && i>=0; i--){
2076     if( (iRet = aiAppend[i]) ){
2077       if( pLvl ){
2078         int iBlk = fsPageToBlock(pFS, iRet);
2079         int j;
2080         for(j=0; iRet && j<pLvl->nRight; j++){
2081           if( fsPageToBlock(pFS, pLvl->aRhs[j].iLastPg)==iBlk ){
2082             iRet = 0;
2083           }
2084         }
2085       }
2086       if( iRet ) aiAppend[i] = 0;
2087     }
2088   }
2089   return iRet;
2090 }
2091 
2092 /*
2093 ** Append a page to the left-hand-side of pLvl. Set the ref-count to 1 and
2094 ** return a pointer to it. The page is writable until either
2095 ** lsmFsPagePersist() is called on it or the ref-count drops to zero.
2096 */
lsmFsSortedAppend(FileSystem * pFS,Snapshot * pSnapshot,Level * pLvl,int bDefer,Page ** ppOut)2097 int lsmFsSortedAppend(
2098   FileSystem *pFS,
2099   Snapshot *pSnapshot,
2100   Level *pLvl,
2101   int bDefer,
2102   Page **ppOut
2103 ){
2104   int rc = LSM_OK;
2105   Page *pPg = 0;
2106   LsmPgno iApp = 0;
2107   LsmPgno iNext = 0;
2108   Segment *p = &pLvl->lhs;
2109   LsmPgno iPrev = p->iLastPg;
2110 
2111   *ppOut = 0;
2112   assert( p->pRedirect==0 );
2113 
2114   if( pFS->pCompress || bDefer ){
2115     /* In compressed database mode the page is not assigned a page number
2116     ** or location in the database file at this point. This will be done
2117     ** by the lsmFsPagePersist() call.  */
2118     rc = fsPageBuffer(pFS, &pPg);
2119     if( rc==LSM_OK ){
2120       pPg->pFS = pFS;
2121       pPg->pSeg = p;
2122       pPg->iPg = 0;
2123       pPg->flags |= PAGE_DIRTY;
2124       pPg->nData = pFS->nPagesize;
2125       assert( pPg->aData );
2126       if( pFS->pCompress==0 ) pPg->nData -= 4;
2127 
2128       pPg->nRef = 1;
2129       pFS->nOut++;
2130     }
2131   }else{
2132     if( iPrev==0 ){
2133       iApp = findAppendPoint(pFS, pLvl);
2134     }else if( fsIsLast(pFS, iPrev) ){
2135       int iNext2;
2136       rc = fsBlockNext(pFS, 0, fsPageToBlock(pFS, iPrev), &iNext2);
2137       if( rc!=LSM_OK ) return rc;
2138       iApp = fsFirstPageOnBlock(pFS, iNext2);
2139     }else{
2140       iApp = iPrev + 1;
2141     }
2142 
2143     /* If this is the first page allocated, or if the page allocated is the
2144     ** last in the block, also allocate the next block here.  */
2145     if( iApp==0 || fsIsLast(pFS, iApp) ){
2146       int iNew;                     /* New block number */
2147 
2148       rc = lsmBlockAllocate(pFS->pDb, 0, &iNew);
2149       if( rc!=LSM_OK ) return rc;
2150       if( iApp==0 ){
2151         iApp = fsFirstPageOnBlock(pFS, iNew);
2152       }else{
2153         iNext = fsFirstPageOnBlock(pFS, iNew);
2154       }
2155     }
2156 
2157     /* Grab the new page. */
2158     pPg = 0;
2159     rc = fsPageGet(pFS, 0, iApp, 1, &pPg, 0);
2160     assert( rc==LSM_OK || pPg==0 );
2161 
2162     /* If this is the first or last page of a block, fill in the pointer
2163      ** value at the end of the new page. */
2164     if( rc==LSM_OK ){
2165       p->nSize++;
2166       p->iLastPg = iApp;
2167       if( p->iFirst==0 ) p->iFirst = iApp;
2168       pPg->flags |= PAGE_DIRTY;
2169 
2170       if( fsIsLast(pFS, iApp) ){
2171         lsmPutU32(&pPg->aData[pFS->nPagesize-4], fsPageToBlock(pFS, iNext));
2172       }else if( fsIsFirst(pFS, iApp) ){
2173         lsmPutU32(&pPg->aData[-4], fsPageToBlock(pFS, iPrev));
2174       }
2175     }
2176   }
2177 
2178   *ppOut = pPg;
2179   return rc;
2180 }
2181 
2182 /*
2183 ** Mark the segment passed as the second argument as finished. Once a segment
2184 ** is marked as finished it is not possible to append any further pages to
2185 ** it.
2186 **
2187 ** Return LSM_OK if successful or an lsm error code if an error occurs.
2188 */
lsmFsSortedFinish(FileSystem * pFS,Segment * p)2189 int lsmFsSortedFinish(FileSystem *pFS, Segment *p){
2190   int rc = LSM_OK;
2191   if( p && p->iLastPg ){
2192     assert( p->pRedirect==0 );
2193 
2194     /* Check if the last page of this run happens to be the last of a block.
2195     ** If it is, then an extra block has already been allocated for this run.
2196     ** Shift this extra block back to the free-block list.
2197     **
2198     ** Otherwise, add the first free page in the last block used by the run
2199     ** to the lAppend list.
2200     */
2201     if( fsLastPageOnPagesBlock(pFS, p->iLastPg)!=p->iLastPg ){
2202       int i;
2203       LsmPgno *aiAppend = pFS->pDb->pWorker->aiAppend;
2204       for(i=0; i<LSM_APPLIST_SZ; i++){
2205         if( aiAppend[i]==0 ){
2206           aiAppend[i] = p->iLastPg+1;
2207           break;
2208         }
2209       }
2210     }else if( pFS->pCompress==0 ){
2211       Page *pLast;
2212       rc = fsPageGet(pFS, 0, p->iLastPg, 0, &pLast, 0);
2213       if( rc==LSM_OK ){
2214         int iBlk = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]);
2215         lsmBlockRefree(pFS->pDb, iBlk);
2216         lsmFsPageRelease(pLast);
2217       }
2218     }else{
2219       int iBlk = 0;
2220       rc = fsBlockNext(pFS, p, fsPageToBlock(pFS, p->iLastPg), &iBlk);
2221       if( rc==LSM_OK ){
2222         lsmBlockRefree(pFS->pDb, iBlk);
2223       }
2224     }
2225   }
2226   return rc;
2227 }
2228 
2229 /*
2230 ** Obtain a reference to page number iPg.
2231 **
2232 ** Return LSM_OK if successful, or an lsm error code if an error occurs.
2233 */
lsmFsDbPageGet(FileSystem * pFS,Segment * pSeg,LsmPgno iPg,Page ** ppPg)2234 int lsmFsDbPageGet(FileSystem *pFS, Segment *pSeg, LsmPgno iPg, Page **ppPg){
2235   return fsPageGet(pFS, pSeg, iPg, 0, ppPg, 0);
2236 }
2237 
2238 /*
2239 ** Obtain a reference to the last page in the segment passed as the
2240 ** second argument.
2241 **
2242 ** Return LSM_OK if successful, or an lsm error code if an error occurs.
2243 */
lsmFsDbPageLast(FileSystem * pFS,Segment * pSeg,Page ** ppPg)2244 int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){
2245   int rc;
2246   LsmPgno iPg = pSeg->iLastPg;
2247   if( pFS->pCompress ){
2248     int nSpace;
2249     iPg++;
2250     do {
2251       nSpace = 0;
2252       rc = fsGetPageBefore(pFS, pSeg, iPg, &iPg);
2253       if( rc==LSM_OK ){
2254         rc = fsPageGet(pFS, pSeg, iPg, 0, ppPg, &nSpace);
2255       }
2256     }while( rc==LSM_OK && nSpace>0 );
2257 
2258   }else{
2259     rc = fsPageGet(pFS, pSeg, iPg, 0, ppPg, 0);
2260   }
2261   return rc;
2262 }
2263 
2264 /*
2265 ** Return a reference to meta-page iPg. If successful, LSM_OK is returned
2266 ** and *ppPg populated with the new page reference. The reference should
2267 ** be released by the caller using lsmFsPageRelease().
2268 **
2269 ** Otherwise, if an error occurs, *ppPg is set to NULL and an LSM error
2270 ** code is returned.
2271 */
lsmFsMetaPageGet(FileSystem * pFS,int bWrite,int iPg,MetaPage ** ppPg)2272 int lsmFsMetaPageGet(
2273   FileSystem *pFS,                /* File-system connection */
2274   int bWrite,                     /* True for write access, false for read */
2275   int iPg,                        /* Either 1 or 2 */
2276   MetaPage **ppPg                 /* OUT: Pointer to MetaPage object */
2277 ){
2278   int rc = LSM_OK;
2279   MetaPage *pPg;
2280   assert( iPg==1 || iPg==2 );
2281 
2282   pPg = lsmMallocZeroRc(pFS->pEnv, sizeof(Page), &rc);
2283 
2284   if( pPg ){
2285     i64 iOff = (iPg-1) * pFS->nMetasize;
2286     if( pFS->nMapLimit>0 ){
2287       fsGrowMapping(pFS, 2*pFS->nMetasize, &rc);
2288       pPg->aData = (u8 *)(pFS->pMap) + iOff;
2289     }else{
2290       pPg->aData = lsmMallocRc(pFS->pEnv, pFS->nMetasize, &rc);
2291       if( rc==LSM_OK && bWrite==0 ){
2292         rc = lsmEnvRead(
2293             pFS->pEnv, pFS->fdDb, iOff, pPg->aData, pFS->nMetaRwSize
2294         );
2295       }
2296 #ifndef NDEBUG
2297       /* pPg->aData causes an uninitialized access via a downstreadm write().
2298          After discussion on this list, this memory should not, for performance
2299          reasons, be memset. However, tracking down "real" misuse is more
2300          difficult with this "false" positive, so it is set when NDEBUG.
2301       */
2302       else if( rc==LSM_OK ){
2303         memset( pPg->aData, 0x77, pFS->nMetasize );
2304       }
2305 #endif
2306     }
2307 
2308     if( rc!=LSM_OK ){
2309       if( pFS->nMapLimit==0 ) lsmFree(pFS->pEnv, pPg->aData);
2310       lsmFree(pFS->pEnv, pPg);
2311       pPg = 0;
2312     }else{
2313       pPg->iPg = iPg;
2314       pPg->bWrite = bWrite;
2315       pPg->pFS = pFS;
2316     }
2317   }
2318 
2319   *ppPg = pPg;
2320   return rc;
2321 }
2322 
2323 /*
2324 ** Release a meta-page reference obtained via a call to lsmFsMetaPageGet().
2325 */
lsmFsMetaPageRelease(MetaPage * pPg)2326 int lsmFsMetaPageRelease(MetaPage *pPg){
2327   int rc = LSM_OK;
2328   if( pPg ){
2329     FileSystem *pFS = pPg->pFS;
2330 
2331     if( pFS->nMapLimit==0 ){
2332       if( pPg->bWrite ){
2333         i64 iOff = (pPg->iPg==2 ? pFS->nMetasize : 0);
2334         int nWrite = pFS->nMetaRwSize;
2335         rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, pPg->aData, nWrite);
2336       }
2337       lsmFree(pFS->pEnv, pPg->aData);
2338     }
2339 
2340     lsmFree(pFS->pEnv, pPg);
2341   }
2342   return rc;
2343 }
2344 
2345 /*
2346 ** Return a pointer to a buffer containing the data associated with the
2347 ** meta-page passed as the first argument. If parameter pnData is not NULL,
2348 ** set *pnData to the size of the meta-page in bytes before returning.
2349 */
lsmFsMetaPageData(MetaPage * pPg,int * pnData)2350 u8 *lsmFsMetaPageData(MetaPage *pPg, int *pnData){
2351   if( pnData ) *pnData = pPg->pFS->nMetaRwSize;
2352   return pPg->aData;
2353 }
2354 
2355 /*
2356 ** Return true if page is currently writable. This is used in assert()
2357 ** statements only.
2358 */
2359 #ifndef NDEBUG
lsmFsPageWritable(Page * pPg)2360 int lsmFsPageWritable(Page *pPg){
2361   return (pPg->flags & PAGE_DIRTY) ? 1 : 0;
2362 }
2363 #endif
2364 
2365 /*
2366 ** This is called when block iFrom is being redirected to iTo. If page
2367 ** number (*piPg) lies on block iFrom, then calculate the equivalent
2368 ** page on block iTo and set *piPg to this value before returning.
2369 */
fsMovePage(FileSystem * pFS,int iTo,int iFrom,LsmPgno * piPg)2370 static void fsMovePage(
2371   FileSystem *pFS,                /* File system object */
2372   int iTo,                        /* Destination block */
2373   int iFrom,                      /* Source block */
2374   LsmPgno *piPg                   /* IN/OUT: Page number */
2375 ){
2376   LsmPgno iPg = *piPg;
2377   if( iFrom==fsPageToBlock(pFS, iPg) ){
2378     const int nPagePerBlock = (
2379         pFS->pCompress ? pFS ->nBlocksize : (pFS->nBlocksize / pFS->nPagesize)
2380     );
2381     *piPg = iPg - (LsmPgno)(iFrom - iTo) * nPagePerBlock;
2382   }
2383 }
2384 
2385 /*
2386 ** Copy the contents of block iFrom to block iTo.
2387 **
2388 ** It is safe to assume that there are no outstanding references to pages
2389 ** on block iTo. And that block iFrom is not currently being written. In
2390 ** other words, the data can be read and written directly.
2391 */
lsmFsMoveBlock(FileSystem * pFS,Segment * pSeg,int iTo,int iFrom)2392 int lsmFsMoveBlock(FileSystem *pFS, Segment *pSeg, int iTo, int iFrom){
2393   Snapshot *p = pFS->pDb->pWorker;
2394   int rc = LSM_OK;
2395   int i;
2396   i64 nMap;
2397 
2398   i64 iFromOff = (i64)(iFrom-1) * pFS->nBlocksize;
2399   i64 iToOff = (i64)(iTo-1) * pFS->nBlocksize;
2400 
2401   assert( iTo!=1 );
2402   assert( iFrom>iTo );
2403 
2404   /* Grow the mapping as required. */
2405   nMap = LSM_MIN(pFS->nMapLimit, (i64)iFrom * pFS->nBlocksize);
2406   fsGrowMapping(pFS, nMap, &rc);
2407 
2408   if( rc==LSM_OK ){
2409     const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
2410     int nSz = pFS->nPagesize;
2411     u8 *aBuf = 0;
2412     u8 *aData = 0;
2413 
2414     for(i=0; rc==LSM_OK && i<nPagePerBlock; i++){
2415       i64 iOff = iFromOff + i*nSz;
2416 
2417       /* Set aData to point to a buffer containing the from page */
2418       if( (iOff+nSz)<=pFS->nMapLimit ){
2419         u8 *aMap = (u8 *)(pFS->pMap);
2420         aData = &aMap[iOff];
2421       }else{
2422         if( aBuf==0 ){
2423           aBuf = (u8 *)lsmMallocRc(pFS->pEnv, nSz, &rc);
2424           if( aBuf==0 ) break;
2425         }
2426         aData = aBuf;
2427         rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nSz);
2428       }
2429 
2430       /* Copy aData to the to page */
2431       if( rc==LSM_OK ){
2432         iOff = iToOff + i*nSz;
2433         if( (iOff+nSz)<=pFS->nMapLimit ){
2434           u8 *aMap = (u8 *)(pFS->pMap);
2435           memcpy(&aMap[iOff], aData, nSz);
2436         }else{
2437           rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, nSz);
2438         }
2439       }
2440     }
2441     lsmFree(pFS->pEnv, aBuf);
2442     lsmFsPurgeCache(pFS);
2443   }
2444 
2445   /* Update append-point list if necessary */
2446   for(i=0; i<LSM_APPLIST_SZ; i++){
2447     fsMovePage(pFS, iTo, iFrom, &p->aiAppend[i]);
2448   }
2449 
2450   /* Update the Segment structure itself */
2451   fsMovePage(pFS, iTo, iFrom, &pSeg->iFirst);
2452   fsMovePage(pFS, iTo, iFrom, &pSeg->iLastPg);
2453   fsMovePage(pFS, iTo, iFrom, &pSeg->iRoot);
2454 
2455   return rc;
2456 }
2457 
2458 /*
2459 ** Append raw data to a segment. Return the database file offset that the
2460 ** data is written to (this may be used as the page number if the data
2461 ** being appended is a new page record).
2462 **
2463 ** This function is only used in compressed database mode.
2464 */
fsAppendData(FileSystem * pFS,Segment * pSeg,const u8 * aData,int nData,int * pRc)2465 static LsmPgno fsAppendData(
2466   FileSystem *pFS,                /* File-system handle */
2467   Segment *pSeg,                  /* Segment to append to */
2468   const u8 *aData,                /* Buffer containing data to write */
2469   int nData,                      /* Size of buffer aData[] in bytes */
2470   int *pRc                        /* IN/OUT: Error code */
2471 ){
2472   LsmPgno iRet = 0;
2473   int rc = *pRc;
2474   assert( pFS->pCompress );
2475   if( rc==LSM_OK ){
2476     int nRem = 0;
2477     int nWrite = 0;
2478     LsmPgno iLastOnBlock;
2479     LsmPgno iApp = pSeg->iLastPg+1;
2480 
2481     /* If this is the first data written into the segment, find an append-point
2482     ** or allocate a new block.  */
2483     if( iApp==1 ){
2484       pSeg->iFirst = iApp = findAppendPoint(pFS, 0);
2485       if( iApp==0 ){
2486         int iBlk;
2487         rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk);
2488         pSeg->iFirst = iApp = fsFirstPageOnBlock(pFS, iBlk);
2489       }
2490     }
2491     iRet = iApp;
2492 
2493     /* Write as much data as is possible at iApp (usually all of it). */
2494     iLastOnBlock = fsLastPageOnPagesBlock(pFS, iApp);
2495     if( rc==LSM_OK ){
2496       int nSpace = (int)(iLastOnBlock - iApp + 1);
2497       nWrite = LSM_MIN(nData, nSpace);
2498       nRem = nData - nWrite;
2499       assert( nWrite>=0 );
2500       if( nWrite!=0 ){
2501         rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite);
2502       }
2503       iApp += nWrite;
2504     }
2505 
2506     /* If required, allocate a new block and write the rest of the data
2507     ** into it. Set the next and previous block pointers to link the new
2508     ** block to the old.  */
2509     assert( nRem<=0 || (iApp-1)==iLastOnBlock );
2510     if( rc==LSM_OK && (iApp-1)==iLastOnBlock ){
2511       u8 aPtr[4];                 /* Space to serialize a u32 */
2512       int iBlk;                   /* New block number */
2513 
2514       if( nWrite>0 ){
2515         /* Allocate a new block. */
2516         rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk);
2517 
2518         /* Set the "next" pointer on the old block */
2519         if( rc==LSM_OK ){
2520           assert( iApp==(fsPageToBlock(pFS, iApp)*pFS->nBlocksize)-4 );
2521           lsmPutU32(aPtr, iBlk);
2522           rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aPtr, sizeof(aPtr));
2523         }
2524 
2525         /* Set the "prev" pointer on the new block */
2526         if( rc==LSM_OK ){
2527           LsmPgno iWrite;
2528           lsmPutU32(aPtr, fsPageToBlock(pFS, iApp));
2529           iWrite = fsFirstPageOnBlock(pFS, iBlk);
2530           rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iWrite-4, aPtr, sizeof(aPtr));
2531           if( nRem>0 ) iApp = iWrite;
2532         }
2533       }else{
2534         /* The next block is already allocated. */
2535         assert( nRem>0 );
2536         assert( pSeg->pRedirect==0 );
2537         rc = fsBlockNext(pFS, 0, fsPageToBlock(pFS, iApp), &iBlk);
2538         iRet = iApp = fsFirstPageOnBlock(pFS, iBlk);
2539       }
2540 
2541       /* Write the remaining data into the new block */
2542       if( rc==LSM_OK && nRem>0 ){
2543         rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, &aData[nWrite], nRem);
2544         iApp += nRem;
2545       }
2546     }
2547 
2548     pSeg->iLastPg = iApp-1;
2549     *pRc = rc;
2550   }
2551 
2552   return iRet;
2553 }
2554 
2555 /*
2556 ** This function is only called in compressed database mode. It
2557 ** compresses the contents of page pPg and writes the result to the
2558 ** buffer at pFS->aOBuffer. The size of the compressed data is stored in
2559 ** pPg->nCompress.
2560 **
2561 ** If buffer pFS->aOBuffer[] has not been allocated then this function
2562 ** allocates it. If this fails, LSM_NOMEM is returned. Otherwise, LSM_OK.
2563 */
fsCompressIntoBuffer(FileSystem * pFS,Page * pPg)2564 static int fsCompressIntoBuffer(FileSystem *pFS, Page *pPg){
2565   lsm_compress *p = pFS->pCompress;
2566 
2567   if( fsAllocateBuffer(pFS, 1) ) return LSM_NOMEM;
2568   assert( pPg->nData==pFS->nPagesize );
2569 
2570   pPg->nCompress = pFS->nBuffer;
2571   return p->xCompress(p->pCtx,
2572       (char *)pFS->aOBuffer, &pPg->nCompress,
2573       (const char *)pPg->aData, pPg->nData
2574   );
2575 }
2576 
2577 /*
2578 ** Append a new page to segment pSeg. Set output variable *piNew to the
2579 ** page number of the new page before returning.
2580 **
2581 ** If the new page is the last on its block, then the 'next' block that
2582 ** will be used by the segment is allocated here too. In this case output
2583 ** variable *piNext is set to the block number of the next block.
2584 **
2585 ** If the new page is the first on its block but not the first in the
2586 ** entire segment, set output variable *piPrev to the block number of
2587 ** the previous block in the segment.
2588 **
2589 ** LSM_OK is returned if successful, or an lsm error code otherwise. If
2590 ** any value other than LSM_OK is returned, then the final value of all
2591 ** output variables is undefined.
2592 */
fsAppendPage(FileSystem * pFS,Segment * pSeg,LsmPgno * piNew,int * piPrev,int * piNext)2593 static int fsAppendPage(
2594   FileSystem *pFS,
2595   Segment *pSeg,
2596   LsmPgno *piNew,
2597   int *piPrev,
2598   int *piNext
2599 ){
2600   LsmPgno iPrev = pSeg->iLastPg;
2601   int rc;
2602   assert( iPrev!=0 );
2603 
2604   *piPrev = 0;
2605   *piNext = 0;
2606 
2607   if( fsIsLast(pFS, iPrev) ){
2608     /* Grab the first page on the next block (which has already be
2609     ** allocated). In this case set *piPrev to tell the caller to set
2610     ** the "previous block" pointer in the first 4 bytes of the page.
2611     */
2612     int iNext;
2613     int iBlk = fsPageToBlock(pFS, iPrev);
2614     assert( pSeg->pRedirect==0 );
2615     rc = fsBlockNext(pFS, 0, iBlk, &iNext);
2616     if( rc!=LSM_OK ) return rc;
2617     *piNew = fsFirstPageOnBlock(pFS, iNext);
2618     *piPrev = iBlk;
2619   }else{
2620     *piNew = iPrev+1;
2621     if( fsIsLast(pFS, *piNew) ){
2622       /* Allocate the next block here. */
2623       int iBlk;
2624       rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk);
2625       if( rc!=LSM_OK ) return rc;
2626       *piNext = iBlk;
2627     }
2628   }
2629 
2630   pSeg->nSize++;
2631   pSeg->iLastPg = *piNew;
2632   return LSM_OK;
2633 }
2634 
2635 /*
2636 ** Flush all pages in the FileSystem.pWaiting list to disk.
2637 */
lsmFsFlushWaiting(FileSystem * pFS,int * pRc)2638 void lsmFsFlushWaiting(FileSystem *pFS, int *pRc){
2639   int rc = *pRc;
2640   Page *pPg;
2641 
2642   pPg = pFS->pWaiting;
2643   pFS->pWaiting = 0;
2644 
2645   while( pPg ){
2646     Page *pNext = pPg->pWaitingNext;
2647     if( rc==LSM_OK ) rc = lsmFsPagePersist(pPg);
2648     assert( pPg->nRef==1 );
2649     lsmFsPageRelease(pPg);
2650     pPg = pNext;
2651   }
2652   *pRc = rc;
2653 }
2654 
2655 /*
2656 ** If there exists a hash-table entry associated with page iPg, remove it.
2657 */
fsRemoveHashEntry(FileSystem * pFS,LsmPgno iPg)2658 static void fsRemoveHashEntry(FileSystem *pFS, LsmPgno iPg){
2659   Page *p;
2660   int iHash = fsHashKey(pFS->nHash, iPg);
2661 
2662   for(p=pFS->apHash[iHash]; p && p->iPg!=iPg; p=p->pHashNext);
2663 
2664   if( p ){
2665     assert( p->nRef==0 || (p->flags & PAGE_FREE)==0 );
2666     fsPageRemoveFromHash(pFS, p);
2667     p->iPg = 0;
2668     iHash = fsHashKey(pFS->nHash, 0);
2669     p->pHashNext = pFS->apHash[iHash];
2670     pFS->apHash[iHash] = p;
2671   }
2672 }
2673 
2674 /*
2675 ** If the page passed as an argument is dirty, update the database file
2676 ** (or mapping of the database file) with its current contents and mark
2677 ** the page as clean.
2678 **
2679 ** Return LSM_OK if the operation is a success, or an LSM error code
2680 ** otherwise.
2681 */
lsmFsPagePersist(Page * pPg)2682 int lsmFsPagePersist(Page *pPg){
2683   int rc = LSM_OK;
2684   if( pPg && (pPg->flags & PAGE_DIRTY) ){
2685     FileSystem *pFS = pPg->pFS;
2686 
2687     if( pFS->pCompress ){
2688       int iHash;                  /* Hash key of assigned page number */
2689       u8 aSz[3];                  /* pPg->nCompress as a 24-bit big-endian */
2690       assert( pPg->pSeg && pPg->iPg==0 && pPg->nCompress==0 );
2691 
2692       /* Compress the page image. */
2693       rc = fsCompressIntoBuffer(pFS, pPg);
2694 
2695       /* Serialize the compressed size into buffer aSz[] */
2696       putRecordSize(aSz, pPg->nCompress, 0);
2697 
2698       /* Write the serialized page record into the database file. */
2699       pPg->iPg = fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc);
2700       fsAppendData(pFS, pPg->pSeg, pFS->aOBuffer, pPg->nCompress, &rc);
2701       fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc);
2702 
2703       /* Now that it has a page number, insert the page into the hash table */
2704       iHash = fsHashKey(pFS->nHash, pPg->iPg);
2705       pPg->pHashNext = pFS->apHash[iHash];
2706       pFS->apHash[iHash] = pPg;
2707 
2708       pPg->pSeg->nSize += (sizeof(aSz) * 2) + pPg->nCompress;
2709 
2710       pPg->flags &= ~PAGE_DIRTY;
2711       pFS->nWrite++;
2712     }else{
2713 
2714       if( pPg->iPg==0 ){
2715         /* No page number has been assigned yet. This occurs with pages used
2716         ** in the b-tree hierarchy. They were not assigned page numbers when
2717         ** they were created as doing so would cause this call to
2718         ** lsmFsPagePersist() to write an out-of-order page. Instead a page
2719         ** number is assigned here so that the page data will be appended
2720         ** to the current segment.
2721         */
2722         Page **pp;
2723         int iPrev = 0;
2724         int iNext = 0;
2725         int iHash;
2726 
2727         assert( pPg->pSeg->iFirst );
2728         assert( pPg->flags & PAGE_FREE );
2729         assert( (pPg->flags & PAGE_HASPREV)==0 );
2730         assert( pPg->nData==pFS->nPagesize-4 );
2731 
2732         rc = fsAppendPage(pFS, pPg->pSeg, &pPg->iPg, &iPrev, &iNext);
2733         if( rc!=LSM_OK ) return rc;
2734 
2735         assert( pPg->flags & PAGE_FREE );
2736         iHash = fsHashKey(pFS->nHash, pPg->iPg);
2737         fsRemoveHashEntry(pFS, pPg->iPg);
2738         pPg->pHashNext = pFS->apHash[iHash];
2739         pFS->apHash[iHash] = pPg;
2740         assert( pPg->pHashNext==0 || pPg->pHashNext->iPg!=pPg->iPg );
2741 
2742         if( iPrev ){
2743           assert( iNext==0 );
2744           memmove(&pPg->aData[4], pPg->aData, pPg->nData);
2745           lsmPutU32(pPg->aData, iPrev);
2746           pPg->flags |= PAGE_HASPREV;
2747           pPg->aData += 4;
2748         }else if( iNext ){
2749           assert( iPrev==0 );
2750           lsmPutU32(&pPg->aData[pPg->nData], iNext);
2751         }else{
2752           int nData = pPg->nData;
2753           pPg->nData += 4;
2754           lsmSortedExpandBtreePage(pPg, nData);
2755         }
2756 
2757         pPg->nRef++;
2758         for(pp=&pFS->pWaiting; *pp; pp=&(*pp)->pWaitingNext);
2759         *pp = pPg;
2760         assert( pPg->pWaitingNext==0 );
2761 
2762       }else{
2763         i64 iOff;                   /* Offset to write within database file */
2764 
2765         iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1);
2766         if( fsMmapPage(pFS, pPg->iPg)==0 ){
2767           u8 *aData = pPg->aData - (pPg->flags & PAGE_HASPREV);
2768           rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, pFS->nPagesize);
2769         }else if( pPg->flags & PAGE_FREE ){
2770           fsGrowMapping(pFS, iOff + pFS->nPagesize, &rc);
2771           if( rc==LSM_OK ){
2772             u8 *aTo = &((u8 *)(pFS->pMap))[iOff];
2773             u8 *aFrom = pPg->aData - (pPg->flags & PAGE_HASPREV);
2774             memcpy(aTo, aFrom, pFS->nPagesize);
2775             lsmFree(pFS->pEnv, aFrom);
2776             pFS->nCacheAlloc--;
2777             pPg->aData = aTo + (pPg->flags & PAGE_HASPREV);
2778             pPg->flags &= ~PAGE_FREE;
2779             fsPageRemoveFromHash(pFS, pPg);
2780             pPg->pMappedNext = pFS->pMapped;
2781             pFS->pMapped = pPg;
2782           }
2783         }
2784 
2785         lsmFsFlushWaiting(pFS, &rc);
2786         pPg->flags &= ~PAGE_DIRTY;
2787         pFS->nWrite++;
2788       }
2789     }
2790   }
2791 
2792   return rc;
2793 }
2794 
2795 /*
2796 ** For non-compressed databases, this function is a no-op. For compressed
2797 ** databases, it adds a padding record to the segment passed as the third
2798 ** argument.
2799 **
2800 ** The size of the padding records is selected so that the last byte
2801 ** written is the last byte of a disk sector. This means that if a
2802 ** snapshot is taken and checkpointed, subsequent worker processes will
2803 ** not write to any sector that contains checkpointed data.
2804 */
lsmFsSortedPadding(FileSystem * pFS,Snapshot * pSnapshot,Segment * pSeg)2805 int lsmFsSortedPadding(
2806   FileSystem *pFS,
2807   Snapshot *pSnapshot,
2808   Segment *pSeg
2809 ){
2810   int rc = LSM_OK;
2811   if( pFS->pCompress && pSeg->iFirst ){
2812     LsmPgno iLast2;
2813     LsmPgno iLast = pSeg->iLastPg;  /* Current last page of segment */
2814     int nPad;                       /* Bytes of padding required */
2815     u8 aSz[3];
2816 
2817     iLast2 = (1 + iLast/pFS->szSector) * pFS->szSector - 1;
2818     assert( fsPageToBlock(pFS, iLast)==fsPageToBlock(pFS, iLast2) );
2819     nPad = (int)(iLast2 - iLast);
2820 
2821     if( iLast2>fsLastPageOnPagesBlock(pFS, iLast) ){
2822       nPad -= 4;
2823     }
2824     assert( nPad>=0 );
2825 
2826     if( nPad>=6 ){
2827       pSeg->nSize += nPad;
2828       nPad -= 6;
2829       putRecordSize(aSz, nPad, 1);
2830       fsAppendData(pFS, pSeg, aSz, sizeof(aSz), &rc);
2831       memset(pFS->aOBuffer, 0, nPad);
2832       fsAppendData(pFS, pSeg, pFS->aOBuffer, nPad, &rc);
2833       fsAppendData(pFS, pSeg, aSz, sizeof(aSz), &rc);
2834     }else if( nPad>0 ){
2835       u8 aBuf[5] = {0,0,0,0,0};
2836       aBuf[0] = (u8)nPad;
2837       aBuf[nPad-1] = (u8)nPad;
2838       fsAppendData(pFS, pSeg, aBuf, nPad, &rc);
2839     }
2840 
2841     assert( rc!=LSM_OK
2842         || pSeg->iLastPg==fsLastPageOnPagesBlock(pFS, pSeg->iLastPg)
2843         || ((pSeg->iLastPg + 1) % pFS->szSector)==0
2844     );
2845   }
2846 
2847   return rc;
2848 }
2849 
2850 
2851 /*
2852 ** Increment the reference count on the page object passed as the first
2853 ** argument.
2854 */
lsmFsPageRef(Page * pPg)2855 void lsmFsPageRef(Page *pPg){
2856   if( pPg ){
2857     pPg->nRef++;
2858   }
2859 }
2860 
2861 /*
2862 ** Release a page-reference obtained using fsPageGet().
2863 */
lsmFsPageRelease(Page * pPg)2864 int lsmFsPageRelease(Page *pPg){
2865   int rc = LSM_OK;
2866   if( pPg ){
2867     assert( pPg->nRef>0 );
2868     pPg->nRef--;
2869     if( pPg->nRef==0 ){
2870       FileSystem *pFS = pPg->pFS;
2871       rc = lsmFsPagePersist(pPg);
2872       pFS->nOut--;
2873 
2874       assert( pPg->pFS->pCompress
2875            || fsIsFirst(pPg->pFS, pPg->iPg)==0
2876            || (pPg->flags & PAGE_HASPREV)
2877       );
2878       pPg->aData -= (pPg->flags & PAGE_HASPREV);
2879       pPg->flags &= ~PAGE_HASPREV;
2880 
2881       if( (pPg->flags & PAGE_FREE)==0 ){
2882         /* Removed from mapped list */
2883         Page **pp;
2884         for(pp=&pFS->pMapped; (*pp)!=pPg; pp=&(*pp)->pMappedNext);
2885         *pp = pPg->pMappedNext;
2886         pPg->pMappedNext = 0;
2887 
2888         /* Add to free list */
2889         pPg->pFreeNext = pFS->pFree;
2890         pFS->pFree = pPg;
2891       }else{
2892         fsPageAddToLru(pFS, pPg);
2893       }
2894     }
2895   }
2896 
2897   return rc;
2898 }
2899 
2900 /*
2901 ** Return the total number of pages read from the database file.
2902 */
lsmFsNRead(FileSystem * pFS)2903 int lsmFsNRead(FileSystem *pFS){ return pFS->nRead; }
2904 
2905 /*
2906 ** Return the total number of pages written to the database file.
2907 */
lsmFsNWrite(FileSystem * pFS)2908 int lsmFsNWrite(FileSystem *pFS){ return pFS->nWrite; }
2909 
2910 /*
2911 ** Return a copy of the environment pointer used by the file-system object.
2912 */
lsmFsEnv(FileSystem * pFS)2913 lsm_env *lsmFsEnv(FileSystem *pFS){
2914   return pFS->pEnv;
2915 }
2916 
2917 /*
2918 ** Return a copy of the environment pointer used by the file-system object
2919 ** to which this page belongs.
2920 */
lsmPageEnv(Page * pPg)2921 lsm_env *lsmPageEnv(Page *pPg) {
2922   return pPg->pFS->pEnv;
2923 }
2924 
2925 /*
2926 ** Return a pointer to the file-system object associated with the Page
2927 ** passed as the only argument.
2928 */
lsmPageFS(Page * pPg)2929 FileSystem *lsmPageFS(Page *pPg){
2930   return pPg->pFS;
2931 }
2932 
2933 /*
2934 ** Return the sector-size as reported by the log file handle.
2935 */
lsmFsSectorSize(FileSystem * pFS)2936 int lsmFsSectorSize(FileSystem *pFS){
2937   return pFS->szSector;
2938 }
2939 
2940 /*
2941 ** Helper function for lsmInfoArrayStructure().
2942 */
startsWith(Segment * pRun,LsmPgno iFirst)2943 static Segment *startsWith(Segment *pRun, LsmPgno iFirst){
2944   return (iFirst==pRun->iFirst) ? pRun : 0;
2945 }
2946 
2947 /*
2948 ** Return the segment that starts with page iFirst, if any. If no such segment
2949 ** can be found, return NULL.
2950 */
findSegment(Snapshot * pWorker,LsmPgno iFirst)2951 static Segment *findSegment(Snapshot *pWorker, LsmPgno iFirst){
2952   Level *pLvl;                    /* Used to iterate through db levels */
2953   Segment *pSeg = 0;              /* Pointer to segment to return */
2954 
2955   for(pLvl=lsmDbSnapshotLevel(pWorker); pLvl && pSeg==0; pLvl=pLvl->pNext){
2956     if( 0==(pSeg = startsWith(&pLvl->lhs, iFirst)) ){
2957       int i;
2958       for(i=0; i<pLvl->nRight; i++){
2959         if( (pSeg = startsWith(&pLvl->aRhs[i], iFirst)) ) break;
2960       }
2961     }
2962   }
2963 
2964   return pSeg;
2965 }
2966 
2967 /*
2968 ** This function implements the lsm_info(LSM_INFO_ARRAY_STRUCTURE) request.
2969 ** If successful, *pzOut is set to point to a nul-terminated string
2970 ** containing the array structure and LSM_OK is returned. The caller should
2971 ** eventually free the string using lsmFree().
2972 **
2973 ** If an error occurs, *pzOut is set to NULL and an LSM error code returned.
2974 */
lsmInfoArrayStructure(lsm_db * pDb,int bBlock,LsmPgno iFirst,char ** pzOut)2975 int lsmInfoArrayStructure(
2976   lsm_db *pDb,
2977   int bBlock,                     /* True for block numbers only */
2978   LsmPgno iFirst,
2979   char **pzOut
2980 ){
2981   int rc = LSM_OK;
2982   Snapshot *pWorker;              /* Worker snapshot */
2983   Segment *pArray = 0;            /* Array to report on */
2984   int bUnlock = 0;
2985 
2986   *pzOut = 0;
2987   if( iFirst==0 ) return LSM_ERROR;
2988 
2989   /* Obtain the worker snapshot */
2990   pWorker = pDb->pWorker;
2991   if( !pWorker ){
2992     rc = lsmBeginWork(pDb);
2993     if( rc!=LSM_OK ) return rc;
2994     pWorker = pDb->pWorker;
2995     bUnlock = 1;
2996   }
2997 
2998   /* Search for the array that starts on page iFirst */
2999   pArray = findSegment(pWorker, iFirst);
3000 
3001   if( pArray==0 ){
3002     /* Could not find the requested array. This is an error. */
3003     rc = LSM_ERROR;
3004   }else{
3005     FileSystem *pFS = pDb->pFS;
3006     LsmString str;
3007     int iBlk;
3008     int iLastBlk;
3009 
3010     iBlk = fsPageToBlock(pFS, pArray->iFirst);
3011     iLastBlk = fsPageToBlock(pFS, pArray->iLastPg);
3012 
3013     lsmStringInit(&str, pDb->pEnv);
3014     if( bBlock ){
3015       lsmStringAppendf(&str, "%d", iBlk);
3016       while( iBlk!=iLastBlk ){
3017         fsBlockNext(pFS, pArray, iBlk, &iBlk);
3018         lsmStringAppendf(&str, " %d", iBlk);
3019       }
3020     }else{
3021       lsmStringAppendf(&str, "%d", pArray->iFirst);
3022       while( iBlk!=iLastBlk ){
3023         lsmStringAppendf(&str, " %d", fsLastPageOnBlock(pFS, iBlk));
3024         fsBlockNext(pFS, pArray, iBlk, &iBlk);
3025         lsmStringAppendf(&str, " %d", fsFirstPageOnBlock(pFS, iBlk));
3026       }
3027       lsmStringAppendf(&str, " %d", pArray->iLastPg);
3028     }
3029 
3030     *pzOut = str.z;
3031   }
3032 
3033   if( bUnlock ){
3034     int rcwork = LSM_BUSY;
3035     lsmFinishWork(pDb, 0, &rcwork);
3036   }
3037   return rc;
3038 }
3039 
lsmFsSegmentContainsPg(FileSystem * pFS,Segment * pSeg,LsmPgno iPg,int * pbRes)3040 int lsmFsSegmentContainsPg(
3041   FileSystem *pFS,
3042   Segment *pSeg,
3043   LsmPgno iPg,
3044   int *pbRes
3045 ){
3046   Redirect *pRedir = pSeg->pRedirect;
3047   int rc = LSM_OK;
3048   int iBlk;
3049   int iLastBlk;
3050   int iPgBlock;                   /* Block containing page iPg */
3051 
3052   iPgBlock = fsPageToBlock(pFS, pSeg->iFirst);
3053   iBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iFirst));
3054   iLastBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iLastPg));
3055 
3056   while( iBlk!=iLastBlk && iBlk!=iPgBlock && rc==LSM_OK ){
3057     rc = fsBlockNext(pFS, pSeg, iBlk, &iBlk);
3058   }
3059 
3060   *pbRes = (iBlk==iPgBlock);
3061   return rc;
3062 }
3063 
3064 /*
3065 ** This function implements the lsm_info(LSM_INFO_ARRAY_PAGES) request.
3066 ** If successful, *pzOut is set to point to a nul-terminated string
3067 ** containing the array structure and LSM_OK is returned. The caller should
3068 ** eventually free the string using lsmFree().
3069 **
3070 ** If an error occurs, *pzOut is set to NULL and an LSM error code returned.
3071 */
lsmInfoArrayPages(lsm_db * pDb,LsmPgno iFirst,char ** pzOut)3072 int lsmInfoArrayPages(lsm_db *pDb, LsmPgno iFirst, char **pzOut){
3073   int rc = LSM_OK;
3074   Snapshot *pWorker;              /* Worker snapshot */
3075   Segment *pSeg = 0;              /* Array to report on */
3076   int bUnlock = 0;
3077 
3078   *pzOut = 0;
3079   if( iFirst==0 ) return LSM_ERROR;
3080 
3081   /* Obtain the worker snapshot */
3082   pWorker = pDb->pWorker;
3083   if( !pWorker ){
3084     rc = lsmBeginWork(pDb);
3085     if( rc!=LSM_OK ) return rc;
3086     pWorker = pDb->pWorker;
3087     bUnlock = 1;
3088   }
3089 
3090   /* Search for the array that starts on page iFirst */
3091   pSeg = findSegment(pWorker, iFirst);
3092 
3093   if( pSeg==0 ){
3094     /* Could not find the requested array. This is an error. */
3095     rc = LSM_ERROR;
3096   }else{
3097     Page *pPg = 0;
3098     FileSystem *pFS = pDb->pFS;
3099     LsmString str;
3100 
3101     lsmStringInit(&str, pDb->pEnv);
3102     rc = lsmFsDbPageGet(pFS, pSeg, iFirst, &pPg);
3103     while( rc==LSM_OK && pPg ){
3104       Page *pNext = 0;
3105       lsmStringAppendf(&str, " %lld", lsmFsPageNumber(pPg));
3106       rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
3107       lsmFsPageRelease(pPg);
3108       pPg = pNext;
3109     }
3110 
3111     if( rc!=LSM_OK ){
3112       lsmFree(pDb->pEnv, str.z);
3113     }else{
3114       *pzOut = str.z;
3115     }
3116   }
3117 
3118   if( bUnlock ){
3119     int rcwork = LSM_BUSY;
3120     lsmFinishWork(pDb, 0, &rcwork);
3121   }
3122   return rc;
3123 }
3124 
3125 /*
3126 ** The following macros are used by the integrity-check code. Associated with
3127 ** each block in the database is an 8-bit bit mask (the entry in the aUsed[]
3128 ** array). As the integrity-check meanders through the database, it sets the
3129 ** following bits to indicate how each block is used.
3130 **
3131 ** INTEGRITY_CHECK_FIRST_PG:
3132 **   First page of block is in use by sorted run.
3133 **
3134 ** INTEGRITY_CHECK_LAST_PG:
3135 **   Last page of block is in use by sorted run.
3136 **
3137 ** INTEGRITY_CHECK_USED:
3138 **   At least one page of the block is in use by a sorted run.
3139 **
3140 ** INTEGRITY_CHECK_FREE:
3141 **   The free block list contains an entry corresponding to this block.
3142 */
3143 #define INTEGRITY_CHECK_FIRST_PG 0x01
3144 #define INTEGRITY_CHECK_LAST_PG  0x02
3145 #define INTEGRITY_CHECK_USED     0x04
3146 #define INTEGRITY_CHECK_FREE     0x08
3147 
3148 /*
3149 ** Helper function for lsmFsIntegrityCheck()
3150 */
checkBlocks(FileSystem * pFS,Segment * pSeg,int bExtra,int nUsed,u8 * aUsed)3151 static void checkBlocks(
3152   FileSystem *pFS,
3153   Segment *pSeg,
3154   int bExtra,                     /* If true, count the "next" block if any */
3155   int nUsed,
3156   u8 *aUsed
3157 ){
3158   if( pSeg ){
3159     if( pSeg && pSeg->nSize>0 ){
3160       int rc;
3161       int iBlk;                   /* Current block (during iteration) */
3162       int iLastBlk;               /* Last block of segment */
3163       int iFirstBlk;              /* First block of segment */
3164       int bLastIsLastOnBlock;     /* True iLast is the last on its block */
3165 
3166       assert( 0==fsSegmentRedirects(pFS, pSeg) );
3167       iBlk = iFirstBlk = fsPageToBlock(pFS, pSeg->iFirst);
3168       iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);
3169 
3170       bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==pSeg->iLastPg);
3171       assert( iBlk>0 );
3172 
3173       do {
3174         /* iBlk is a part of this sorted run. */
3175         aUsed[iBlk-1] |= INTEGRITY_CHECK_USED;
3176 
3177         /* If the first page of this block is also part of the segment,
3178         ** set the flag to indicate that the first page of iBlk is in use.
3179         */
3180         if( fsFirstPageOnBlock(pFS, iBlk)==pSeg->iFirst || iBlk!=iFirstBlk ){
3181           assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 );
3182           aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG;
3183         }
3184 
3185         /* Unless the sorted run finishes before the last page on this block,
3186         ** the last page of this block is also in use.  */
3187         if( iBlk!=iLastBlk || bLastIsLastOnBlock ){
3188           assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_LAST_PG)==0 );
3189           aUsed[iBlk-1] |= INTEGRITY_CHECK_LAST_PG;
3190         }
3191 
3192         /* Special case. The sorted run being scanned is the output run of
3193         ** a level currently undergoing an incremental merge. The sorted
3194         ** run ends on the last page of iBlk, but the next block has already
3195         ** been allocated. So mark it as in use as well.  */
3196         if( iBlk==iLastBlk && bLastIsLastOnBlock && bExtra ){
3197           int iExtra = 0;
3198           rc = fsBlockNext(pFS, pSeg, iBlk, &iExtra);
3199           assert( rc==LSM_OK );
3200 
3201           assert( aUsed[iExtra-1]==0 );
3202           aUsed[iExtra-1] |= INTEGRITY_CHECK_USED;
3203           aUsed[iExtra-1] |= INTEGRITY_CHECK_FIRST_PG;
3204           aUsed[iExtra-1] |= INTEGRITY_CHECK_LAST_PG;
3205         }
3206 
3207         /* Move on to the next block in the sorted run. Or set iBlk to zero
3208         ** in order to break out of the loop if this was the last block in
3209         ** the run.  */
3210         if( iBlk==iLastBlk ){
3211           iBlk = 0;
3212         }else{
3213           rc = fsBlockNext(pFS, pSeg, iBlk, &iBlk);
3214           assert( rc==LSM_OK );
3215         }
3216       }while( iBlk );
3217     }
3218   }
3219 }
3220 
3221 typedef struct CheckFreelistCtx CheckFreelistCtx;
3222 struct CheckFreelistCtx {
3223   u8 *aUsed;
3224   int nBlock;
3225 };
checkFreelistCb(void * pCtx,int iBlk,i64 iSnapshot)3226 static int checkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
3227   CheckFreelistCtx *p = (CheckFreelistCtx *)pCtx;
3228 
3229   assert( iBlk>=1 );
3230   assert( iBlk<=p->nBlock );
3231   assert( p->aUsed[iBlk-1]==0 );
3232   p->aUsed[iBlk-1] = INTEGRITY_CHECK_FREE;
3233   return 0;
3234 }
3235 
3236 /*
3237 ** This function checks that all blocks in the database file are accounted
3238 ** for. For each block, exactly one of the following must be true:
3239 **
3240 **   + the block is part of a sorted run, or
3241 **   + the block is on the free-block list
3242 **
3243 ** This function also checks that there are no references to blocks with
3244 ** out-of-range block numbers.
3245 **
3246 ** If no errors are found, non-zero is returned. If an error is found, an
3247 ** assert() fails.
3248 */
lsmFsIntegrityCheck(lsm_db * pDb)3249 int lsmFsIntegrityCheck(lsm_db *pDb){
3250   CheckFreelistCtx ctx;
3251   FileSystem *pFS = pDb->pFS;
3252   int i;
3253   int rc;
3254   Freelist freelist = {0, 0, 0};
3255   u8 *aUsed;
3256   Level *pLevel;
3257   Snapshot *pWorker = pDb->pWorker;
3258   int nBlock = pWorker->nBlock;
3259 
3260 #if 0
3261   static int nCall = 0;
3262   nCall++;
3263   printf("%d calls\n", nCall);
3264 #endif
3265 
3266   aUsed = lsmMallocZero(pDb->pEnv, nBlock);
3267   if( aUsed==0 ){
3268     /* Malloc has failed. Since this function is only called within debug
3269     ** builds, this probably means the user is running an OOM injection test.
3270     ** Regardless, it will not be possible to run the integrity-check at this
3271     ** time, so assume the database is Ok and return non-zero. */
3272     return 1;
3273   }
3274 
3275   for(pLevel=pWorker->pLevel; pLevel; pLevel=pLevel->pNext){
3276     int j;
3277     checkBlocks(pFS, &pLevel->lhs, (pLevel->nRight!=0), nBlock, aUsed);
3278     for(j=0; j<pLevel->nRight; j++){
3279       checkBlocks(pFS, &pLevel->aRhs[j], 0, nBlock, aUsed);
3280     }
3281   }
3282 
3283   /* Mark all blocks in the free-list as used */
3284   ctx.aUsed = aUsed;
3285   ctx.nBlock = nBlock;
3286   rc = lsmWalkFreelist(pDb, 0, checkFreelistCb, (void *)&ctx);
3287 
3288   if( rc==LSM_OK ){
3289     for(i=0; i<nBlock; i++) assert( aUsed[i]!=0 );
3290   }
3291 
3292   lsmFree(pDb->pEnv, aUsed);
3293   lsmFree(pDb->pEnv, freelist.aEntry);
3294 
3295   return 1;
3296 }
3297 
3298 #ifndef NDEBUG
3299 /*
3300 ** Return true if pPg happens to be the last page in segment pSeg. Or false
3301 ** otherwise. This function is only invoked as part of assert() conditions.
3302 */
lsmFsDbPageIsLast(Segment * pSeg,Page * pPg)3303 int lsmFsDbPageIsLast(Segment *pSeg, Page *pPg){
3304   if( pPg->pFS->pCompress ){
3305     LsmPgno iNext = 0;
3306     int rc;
3307     rc = fsNextPageOffset(pPg->pFS, pSeg, pPg->iPg, pPg->nCompress+6, &iNext);
3308     return (rc!=LSM_OK || iNext==0);
3309   }
3310   return (pPg->iPg==pSeg->iLastPg);
3311 }
3312 #endif
3313