xref: /sqlite-3.40.0/ext/lsm1/lsm_tree.c (revision 1be50519)
1 /*
2 ** 2011-08-18
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 **
13 ** This file contains the implementation of an in-memory tree structure.
14 **
15 ** Technically the tree is a B-tree of order 4 (in the Knuth sense - each
16 ** node may have up to 4 children). Keys are stored within B-tree nodes by
17 ** reference. This may be slightly slower than a conventional red-black
18 ** tree, but it is simpler. It is also an easier structure to modify to
19 ** create a version that supports nested transaction rollback.
20 **
21 ** This tree does not currently support a delete operation. One is not
22 ** required. When LSM deletes a key from a database, it inserts a DELETE
23 ** marker into the data structure. As a result, although the value associated
24 ** with a key stored in the in-memory tree structure may be modified, no
25 ** keys are ever removed.
26 */
27 
28 /*
29 ** MVCC NOTES
30 **
31 **   The in-memory tree structure supports SQLite-style MVCC. This means
32 **   that while one client is writing to the tree structure, other clients
33 **   may still be querying an older snapshot of the tree.
34 **
35 **   One way to implement this is to use an append-only b-tree. In this
36 **   case instead of modifying nodes in-place, a copy of the node is made
37 **   and the required modifications made to the copy. The parent of the
38 **   node is then modified (to update the pointer so that it points to
39 **   the new copy), which causes a copy of the parent to be made, and so on.
40 **   This means that each time the tree is written to a new root node is
41 **   created. A snapshot is identified by the root node that it uses.
42 **
43 **   The problem with the above is that each time the tree is written to,
44 **   a copy of the node structure modified and all of its ancestor nodes
45 **   is made. This may prove excessive with large tree structures.
46 **
47 **   To reduce this overhead, the data structure used for a tree node is
48 **   designed so that it may be edited in place exactly once without
49 **   affecting existing users. In other words, the node structure is capable
50 **   of storing two separate versions of the node at the same time.
51 **   When a node is to be edited, if the node structure already contains
52 **   two versions, a copy is made as in the append-only approach. Or, if
53 **   it only contains a single version, it is edited in place.
54 **
55 **   This reduces the overhead so that, roughly, one new node structure
56 **   must be allocated for each write (on top of those allocations that
57 **   would have been required by a non-MVCC tree). Logic: Assume that at
58 **   any time, 50% of nodes in the tree already contain 2 versions. When
59 **   a new entry is written to a node, there is a 50% chance that a copy
60 **   of the node will be required. And a 25% chance that a copy of its
61 **   parent is required. And so on.
62 **
63 ** ROLLBACK
64 **
65 **   The in-memory tree also supports transaction and sub-transaction
66 **   rollback. In order to rollback to point in time X, the following is
67 **   necessary:
68 **
69 **     1. All memory allocated since X must be freed, and
70 **     2. All "v2" data adding to nodes that existed at X should be zeroed.
71 **     3. The root node must be restored to its X value.
72 **
73 **   The Mempool object used to allocate memory for the tree supports
74 **   operation (1) - see the lsmPoolMark() and lsmPoolRevert() functions.
75 **
76 **   To support (2), all nodes that have v2 data are part of a singly linked
77 **   list, sorted by the age of the v2 data (nodes that have had data added
78 **   most recently are at the end of the list). So to zero all v2 data added
79 **   since X, the linked list is traversed from the first node added following
80 **   X onwards.
81 **
82 */
83 
84 #ifndef _LSM_INT_H
85 # include "lsmInt.h"
86 #endif
87 
88 #include <string.h>
89 
90 #define MAX_DEPTH 32
91 
92 typedef struct TreeKey TreeKey;
93 typedef struct TreeNode TreeNode;
94 typedef struct TreeLeaf TreeLeaf;
95 typedef struct NodeVersion NodeVersion;
96 
97 struct TreeOld {
98   u32 iShmid;                     /* Last shared-memory chunk in use by old */
99   u32 iRoot;                      /* Offset of root node in shm file */
100   u32 nHeight;                    /* Height of tree structure */
101 };
102 
103 #if 0
104 /*
105 ** assert() that a TreeKey.flags value is sane. Usage:
106 **
107 **   assert( lsmAssertFlagsOk(pTreeKey->flags) );
108 */
109 static int lsmAssertFlagsOk(u8 keyflags){
110   /* At least one flag must be set. Otherwise, what is this key doing? */
111   assert( keyflags!=0 );
112 
113   /* The POINT_DELETE and INSERT flags cannot both be set. */
114   assert( (keyflags & LSM_POINT_DELETE)==0 || (keyflags & LSM_INSERT)==0 );
115 
116   /* If both the START_DELETE and END_DELETE flags are set, then the INSERT
117   ** flag must also be set. In other words - the three DELETE flags cannot
118   ** all be set */
119   assert( (keyflags & LSM_END_DELETE)==0
120        || (keyflags & LSM_START_DELETE)==0
121        || (keyflags & LSM_POINT_DELETE)==0
122   );
123 
124   return 1;
125 }
126 #endif
127 static int assert_delete_ranges_match(lsm_db *);
128 static int treeCountEntries(lsm_db *db);
129 
130 /*
131 ** Container for a key-value pair. Within the *-shm file, each key/value
132 ** pair is stored in a single allocation (which may not actually be
133 ** contiguous in memory). Layout is the TreeKey structure, followed by
134 ** the nKey bytes of key blob, followed by the nValue bytes of value blob
135 ** (if nValue is non-negative).
136 */
137 struct TreeKey {
138   int nKey;                       /* Size of pKey in bytes */
139   int nValue;                     /* Size of pValue. Or negative. */
140   u8 flags;                       /* Various LSM_XXX flags */
141 };
142 
143 #define TKV_KEY(p) ((void *)&(p)[1])
144 #define TKV_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey))
145 
146 /*
147 ** A single tree node. A node structure may contain up to 3 key/value
148 ** pairs. Internal (non-leaf) nodes have up to 4 children.
149 **
150 ** TODO: Update the format of this to be more compact. Get it working
151 ** first though...
152 */
153 struct TreeNode {
154   u32 aiKeyPtr[3];                /* Array of pointers to TreeKey objects */
155 
156   /* The following fields are present for interior nodes only, not leaves. */
157   u32 aiChildPtr[4];              /* Array of pointers to child nodes */
158 
159   /* The extra child pointer slot. */
160   u32 iV2;                        /* Transaction number of v2 */
161   u8 iV2Child;                    /* apChild[] entry replaced by pV2Ptr */
162   u32 iV2Ptr;                     /* Substitute pointer */
163 };
164 
165 struct TreeLeaf {
166   u32 aiKeyPtr[3];                /* Array of pointers to TreeKey objects */
167 };
168 
169 typedef struct TreeBlob TreeBlob;
170 struct TreeBlob {
171   int n;
172   u8 *a;
173 };
174 
175 /*
176 ** Cursor for searching a tree structure.
177 **
178 ** If a cursor does not point to any element (a.k.a. EOF), then the
179 ** TreeCursor.iNode variable is set to a negative value. Otherwise, the
180 ** cursor currently points to key aiCell[iNode] on node apTreeNode[iNode].
181 **
182 ** Entries in the apTreeNode[] and aiCell[] arrays contain the node and
183 ** index of the TreeNode.apChild[] pointer followed to descend to the
184 ** current element. Hence apTreeNode[0] always contains the root node of
185 ** the tree.
186 */
187 struct TreeCursor {
188   lsm_db *pDb;                    /* Database handle for this cursor */
189   TreeRoot *pRoot;                /* Root node and height of tree to access */
190   int iNode;                      /* Cursor points at apTreeNode[iNode] */
191   TreeNode *apTreeNode[MAX_DEPTH];/* Current position in tree */
192   u8 aiCell[MAX_DEPTH];           /* Current position in tree */
193   TreeKey *pSave;                 /* Saved key */
194   TreeBlob blob;                  /* Dynamic storage for a key */
195 };
196 
197 /*
198 ** A value guaranteed to be larger than the largest possible transaction
199 ** id (TreeHeader.iTransId).
200 */
201 #define WORKING_VERSION (1<<30)
202 
tblobGrow(lsm_db * pDb,TreeBlob * p,int n,int * pRc)203 static int tblobGrow(lsm_db *pDb, TreeBlob *p, int n, int *pRc){
204   if( n>p->n ){
205     lsmFree(pDb->pEnv, p->a);
206     p->a = lsmMallocRc(pDb->pEnv, n, pRc);
207     p->n = n;
208   }
209   return (p->a==0);
210 }
tblobFree(lsm_db * pDb,TreeBlob * p)211 static void tblobFree(lsm_db *pDb, TreeBlob *p){
212   lsmFree(pDb->pEnv, p->a);
213 }
214 
215 
216 /***********************************************************************
217 ** Start of IntArray methods.  */
218 /*
219 ** Append value iVal to the contents of IntArray *p. Return LSM_OK if
220 ** successful, or LSM_NOMEM if an OOM condition is encountered.
221 */
intArrayAppend(lsm_env * pEnv,IntArray * p,u32 iVal)222 static int intArrayAppend(lsm_env *pEnv, IntArray *p, u32 iVal){
223   assert( p->nArray<=p->nAlloc );
224   if( p->nArray>=p->nAlloc ){
225     u32 *aNew;
226     int nNew = p->nArray ? p->nArray*2 : 128;
227     aNew = lsmRealloc(pEnv, p->aArray, nNew*sizeof(u32));
228     if( !aNew ) return LSM_NOMEM_BKPT;
229     p->aArray = aNew;
230     p->nAlloc = nNew;
231   }
232 
233   p->aArray[p->nArray++] = iVal;
234   return LSM_OK;
235 }
236 
237 /*
238 ** Zero the IntArray object.
239 */
intArrayFree(lsm_env * pEnv,IntArray * p)240 static void intArrayFree(lsm_env *pEnv, IntArray *p){
241   p->nArray = 0;
242 }
243 
244 /*
245 ** Return the number of entries currently in the int-array object.
246 */
intArraySize(IntArray * p)247 static int intArraySize(IntArray *p){
248   return p->nArray;
249 }
250 
251 /*
252 ** Return a copy of the iIdx'th entry in the int-array.
253 */
intArrayEntry(IntArray * p,int iIdx)254 static u32 intArrayEntry(IntArray *p, int iIdx){
255   return p->aArray[iIdx];
256 }
257 
258 /*
259 ** Truncate the int-array so that all but the first nVal values are
260 ** discarded.
261 */
intArrayTruncate(IntArray * p,int nVal)262 static void intArrayTruncate(IntArray *p, int nVal){
263   p->nArray = nVal;
264 }
265 /* End of IntArray methods.
266 ***********************************************************************/
267 
treeKeycmp(void * p1,int n1,void * p2,int n2)268 static int treeKeycmp(void *p1, int n1, void *p2, int n2){
269   int res;
270   res = memcmp(p1, p2, LSM_MIN(n1, n2));
271   if( res==0 ) res = (n1-n2);
272   return res;
273 }
274 
275 /*
276 ** The pointer passed as the first argument points to an interior node,
277 ** not a leaf. This function returns the offset of the iCell'th child
278 ** sub-tree of the node.
279 */
getChildPtr(TreeNode * p,int iVersion,int iCell)280 static u32 getChildPtr(TreeNode *p, int iVersion, int iCell){
281   assert( iVersion>=0 );
282   assert( iCell>=0 && iCell<=array_size(p->aiChildPtr) );
283   if( p->iV2 && p->iV2<=(u32)iVersion && iCell==p->iV2Child ) return p->iV2Ptr;
284   return p->aiChildPtr[iCell];
285 }
286 
287 /*
288 ** Given an offset within the *-shm file, return the associated chunk number.
289 */
treeOffsetToChunk(u32 iOff)290 static int treeOffsetToChunk(u32 iOff){
291   assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
292   return (int)(iOff>>15);
293 }
294 
295 #define treeShmptrUnsafe(pDb, iPtr) \
296 (&((u8*)((pDb)->apShm[(iPtr)>>15]))[(iPtr) & (LSM_SHM_CHUNK_SIZE-1)])
297 
298 /*
299 ** Return a pointer to the mapped memory location associated with *-shm
300 ** file offset iPtr.
301 */
treeShmptr(lsm_db * pDb,u32 iPtr)302 static void *treeShmptr(lsm_db *pDb, u32 iPtr){
303 
304   assert( (iPtr>>15)<(u32)pDb->nShm );
305   assert( pDb->apShm[iPtr>>15] );
306 
307   return iPtr ? treeShmptrUnsafe(pDb, iPtr) : 0;
308 }
309 
treeShmChunk(lsm_db * pDb,int iChunk)310 static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
311   return (ShmChunk *)(pDb->apShm[iChunk]);
312 }
313 
treeShmChunkRc(lsm_db * pDb,int iChunk,int * pRc)314 static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){
315   assert( *pRc==LSM_OK );
316   if( iChunk<pDb->nShm || LSM_OK==(*pRc = lsmShmCacheChunks(pDb, iChunk+1)) ){
317     return (ShmChunk *)(pDb->apShm[iChunk]);
318   }
319   return 0;
320 }
321 
322 
323 #ifndef NDEBUG
assertIsWorkingChild(lsm_db * db,TreeNode * pNode,TreeNode * pParent,int iCell)324 static void assertIsWorkingChild(
325   lsm_db *db,
326   TreeNode *pNode,
327   TreeNode *pParent,
328   int iCell
329 ){
330   TreeNode *p;
331   u32 iPtr = getChildPtr(pParent, WORKING_VERSION, iCell);
332   p = treeShmptr(db, iPtr);
333   assert( p==pNode );
334 }
335 #else
336 # define assertIsWorkingChild(w,x,y,z)
337 #endif
338 
339 /* Values for the third argument to treeShmkey(). */
340 #define TKV_LOADKEY  1
341 #define TKV_LOADVAL  2
342 
treeShmkey(lsm_db * pDb,u32 iPtr,int eLoad,TreeBlob * pBlob,int * pRc)343 static TreeKey *treeShmkey(
344   lsm_db *pDb,                    /* Database handle */
345   u32 iPtr,                       /* Shmptr to TreeKey struct */
346   int eLoad,                      /* Either zero or a TREEKEY_LOADXXX value */
347   TreeBlob *pBlob,                /* Used if dynamic memory is required */
348   int *pRc                        /* IN/OUT: Error code */
349 ){
350   TreeKey *pRet;
351 
352   assert( eLoad==TKV_LOADKEY || eLoad==TKV_LOADVAL );
353   pRet = (TreeKey *)treeShmptr(pDb, iPtr);
354   if( pRet ){
355     int nReq;                     /* Bytes of space required at pRet */
356     int nAvail;                   /* Bytes of space available at pRet */
357 
358     nReq = sizeof(TreeKey) + pRet->nKey;
359     if( eLoad==TKV_LOADVAL && pRet->nValue>0 ){
360       nReq += pRet->nValue;
361     }
362     assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
363     nAvail = LSM_SHM_CHUNK_SIZE - (iPtr & (LSM_SHM_CHUNK_SIZE-1));
364 
365     if( nAvail<nReq ){
366       if( tblobGrow(pDb, pBlob, nReq, pRc)==0 ){
367         int nLoad = 0;
368         while( *pRc==LSM_OK ){
369           ShmChunk *pChunk;
370           void *p = treeShmptr(pDb, iPtr);
371           int n = LSM_MIN(nAvail, nReq-nLoad);
372 
373           memcpy(&pBlob->a[nLoad], p, n);
374           nLoad += n;
375           if( nLoad==nReq ) break;
376 
377           pChunk = treeShmChunk(pDb, treeOffsetToChunk(iPtr));
378           assert( pChunk );
379           iPtr = (pChunk->iNext * LSM_SHM_CHUNK_SIZE) + LSM_SHM_CHUNK_HDR;
380           nAvail = LSM_SHM_CHUNK_SIZE - LSM_SHM_CHUNK_HDR;
381         }
382       }
383       pRet = (TreeKey *)(pBlob->a);
384     }
385   }
386 
387   return pRet;
388 }
389 
390 #if defined(LSM_DEBUG) && defined(LSM_EXPENSIVE_ASSERT)
assert_leaf_looks_ok(TreeNode * pNode)391 void assert_leaf_looks_ok(TreeNode *pNode){
392   assert( pNode->apKey[1] );
393 }
394 
assert_node_looks_ok(TreeNode * pNode,int nHeight)395 void assert_node_looks_ok(TreeNode *pNode, int nHeight){
396   if( pNode ){
397     assert( pNode->apKey[1] );
398     if( nHeight>1 ){
399       int i;
400       assert( getChildPtr(pNode, WORKING_VERSION, 1) );
401       assert( getChildPtr(pNode, WORKING_VERSION, 2) );
402       for(i=0; i<4; i++){
403         assert_node_looks_ok(getChildPtr(pNode, WORKING_VERSION, i), nHeight-1);
404       }
405     }
406   }
407 }
408 
409 /*
410 ** Run various assert() statements to check that the working-version of the
411 ** tree is correct in the following respects:
412 **
413 **   * todo...
414 */
assert_tree_looks_ok(int rc,Tree * pTree)415 void assert_tree_looks_ok(int rc, Tree *pTree){
416 }
417 #else
418 # define assert_tree_looks_ok(x,y)
419 #endif
420 
lsmFlagsToString(int flags,char * zFlags)421 void lsmFlagsToString(int flags, char *zFlags){
422 
423   zFlags[0] = (flags & LSM_END_DELETE)   ? ']' : '.';
424 
425   /* Only one of LSM_POINT_DELETE, LSM_INSERT and LSM_SEPARATOR should ever
426   ** be set. If this is not true, write a '?' to the output.  */
427   switch( flags & (LSM_POINT_DELETE|LSM_INSERT|LSM_SEPARATOR) ){
428     case 0:                zFlags[1] = '.'; break;
429     case LSM_POINT_DELETE: zFlags[1] = '-'; break;
430     case LSM_INSERT:       zFlags[1] = '+'; break;
431     case LSM_SEPARATOR:    zFlags[1] = '^'; break;
432     default:               zFlags[1] = '?'; break;
433   }
434 
435   zFlags[2] = (flags & LSM_SYSTEMKEY)    ? '*' : '.';
436   zFlags[3] = (flags & LSM_START_DELETE) ? '[' : '.';
437   zFlags[4] = '\0';
438 }
439 
440 #ifdef LSM_DEBUG
441 
442 /*
443 ** Pointer pBlob points to a buffer containing a blob of binary data
444 ** nBlob bytes long. Append the contents of this blob to *pStr, with
445 ** each octet represented by a 2-digit hexadecimal number. For example,
446 ** if the input blob is three bytes in size and contains {0x01, 0x44, 0xFF},
447 ** then "0144ff" is appended to *pStr.
448 */
lsmAppendStrBlob(LsmString * pStr,void * pBlob,int nBlob)449 static void lsmAppendStrBlob(LsmString *pStr, void *pBlob, int nBlob){
450   int i;
451   lsmStringExtend(pStr, nBlob*2);
452   if( pStr->nAlloc==0 ) return;
453   for(i=0; i<nBlob; i++){
454     u8 c = ((u8*)pBlob)[i];
455     if( c>='a' && c<='z' ){
456       pStr->z[pStr->n++] = c;
457     }else if( c!=0 || nBlob==1 || i!=(nBlob-1) ){
458       pStr->z[pStr->n++] = "0123456789abcdef"[(c>>4)&0xf];
459       pStr->z[pStr->n++] = "0123456789abcdef"[c&0xf];
460     }
461   }
462   pStr->z[pStr->n] = 0;
463 }
464 
465 #if 0  /* NOT USED */
466 /*
467 ** Append nIndent space (0x20) characters to string *pStr.
468 */
469 static void lsmAppendIndent(LsmString *pStr, int nIndent){
470   int i;
471   lsmStringExtend(pStr, nIndent);
472   for(i=0; i<nIndent; i++) lsmStringAppend(pStr, " ", 1);
473 }
474 #endif
475 
strAppendFlags(LsmString * pStr,u8 flags)476 static void strAppendFlags(LsmString *pStr, u8 flags){
477   char zFlags[8];
478 
479   lsmFlagsToString(flags, zFlags);
480   zFlags[4] = ':';
481 
482   lsmStringAppend(pStr, zFlags, 5);
483 }
484 
dump_node_contents(lsm_db * pDb,u32 iNode,char * zPath,int nPath,int nHeight)485 void dump_node_contents(
486   lsm_db *pDb,
487   u32 iNode,                      /* Print out the contents of this node */
488   char *zPath,                    /* Path from root to this node */
489   int nPath,                      /* Number of bytes in zPath */
490   int nHeight                     /* Height: (0==leaf) (1==parent-of-leaf) */
491 ){
492   const char *zSpace = "                                           ";
493   int i;
494   int rc = LSM_OK;
495   LsmString s;
496   TreeNode *pNode;
497   TreeBlob b = {0, 0};
498 
499   pNode = (TreeNode *)treeShmptr(pDb, iNode);
500 
501   if( nHeight==0 ){
502     /* Append the nIndent bytes of space to string s. */
503     lsmStringInit(&s, pDb->pEnv);
504 
505     /* Append each key to string s. */
506     for(i=0; i<3; i++){
507       u32 iPtr = pNode->aiKeyPtr[i];
508       if( iPtr ){
509         TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i],TKV_LOADKEY, &b,&rc);
510         strAppendFlags(&s, pKey->flags);
511         lsmAppendStrBlob(&s, TKV_KEY(pKey), pKey->nKey);
512         lsmStringAppend(&s, "     ", -1);
513       }
514     }
515 
516     printf("% 6d %.*sleaf%.*s: %s\n",
517         iNode, nPath, zPath, 20-nPath-4, zSpace, s.z
518     );
519     lsmStringClear(&s);
520   }else{
521     for(i=0; i<4 && nHeight>0; i++){
522       u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
523       zPath[nPath] = (char)(i+'0');
524       zPath[nPath+1] = '/';
525 
526       if( iPtr ){
527         dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1);
528       }
529       if( i!=3 && pNode->aiKeyPtr[i] ){
530         TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TKV_LOADKEY,&b,&rc);
531         lsmStringInit(&s, pDb->pEnv);
532         strAppendFlags(&s, pKey->flags);
533         lsmAppendStrBlob(&s, TKV_KEY(pKey), pKey->nKey);
534         printf("% 6d %.*s%.*s: %s\n",
535             iNode, nPath+1, zPath, 20-nPath-1, zSpace, s.z);
536         lsmStringClear(&s);
537       }
538     }
539   }
540 
541   tblobFree(pDb, &b);
542 }
543 
dump_tree_contents(lsm_db * pDb,const char * zCaption)544 void dump_tree_contents(lsm_db *pDb, const char *zCaption){
545   char zPath[64];
546   TreeRoot *p = &pDb->treehdr.root;
547   printf("\n%s\n", zCaption);
548   zPath[0] = '/';
549   if( p->iRoot ){
550     dump_node_contents(pDb, p->iRoot, zPath, 1, p->nHeight-1);
551   }
552   fflush(stdout);
553 }
554 
555 #endif
556 
557 /*
558 ** Initialize a cursor object, the space for which has already been
559 ** allocated.
560 */
treeCursorInit(lsm_db * pDb,int bOld,TreeCursor * pCsr)561 static void treeCursorInit(lsm_db *pDb, int bOld, TreeCursor *pCsr){
562   memset(pCsr, 0, sizeof(TreeCursor));
563   pCsr->pDb = pDb;
564   if( bOld ){
565     pCsr->pRoot = &pDb->treehdr.oldroot;
566   }else{
567     pCsr->pRoot = &pDb->treehdr.root;
568   }
569   pCsr->iNode = -1;
570 }
571 
572 /*
573 ** Return a pointer to the mapping of the TreeKey object that the cursor
574 ** is pointing to.
575 */
csrGetKey(TreeCursor * pCsr,TreeBlob * pBlob,int * pRc)576 static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){
577   TreeKey *pRet;
578   lsm_db *pDb = pCsr->pDb;
579   u32 iPtr = pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]];
580 
581   assert( iPtr );
582   pRet = (TreeKey*)treeShmptrUnsafe(pDb, iPtr);
583   if( !(pRet->flags & LSM_CONTIGUOUS) ){
584     pRet = treeShmkey(pDb, iPtr, TKV_LOADVAL, pBlob, pRc);
585   }
586 
587   return pRet;
588 }
589 
590 /*
591 ** Save the current position of tree cursor pCsr.
592 */
lsmTreeCursorSave(TreeCursor * pCsr)593 int lsmTreeCursorSave(TreeCursor *pCsr){
594   int rc = LSM_OK;
595   if( pCsr && pCsr->pSave==0 ){
596     int iNode = pCsr->iNode;
597     if( iNode>=0 ){
598       pCsr->pSave = csrGetKey(pCsr, &pCsr->blob, &rc);
599     }
600     pCsr->iNode = -1;
601   }
602   return rc;
603 }
604 
605 /*
606 ** Restore the position of a saved tree cursor.
607 */
treeCursorRestore(TreeCursor * pCsr,int * pRes)608 static int treeCursorRestore(TreeCursor *pCsr, int *pRes){
609   int rc = LSM_OK;
610   if( pCsr->pSave ){
611     TreeKey *pKey = pCsr->pSave;
612     pCsr->pSave = 0;
613     if( pRes ){
614       rc = lsmTreeCursorSeek(pCsr, TKV_KEY(pKey), pKey->nKey, pRes);
615     }
616   }
617   return rc;
618 }
619 
620 /*
621 ** Allocate nByte bytes of space within the *-shm file. If successful,
622 ** return LSM_OK and set *piPtr to the offset within the file at which
623 ** the allocated space is located.
624 */
treeShmalloc(lsm_db * pDb,int bAlign,int nByte,int * pRc)625 static u32 treeShmalloc(lsm_db *pDb, int bAlign, int nByte, int *pRc){
626   u32 iRet = 0;
627   if( *pRc==LSM_OK ){
628     const static int CHUNK_SIZE = LSM_SHM_CHUNK_SIZE;
629     const static int CHUNK_HDR = LSM_SHM_CHUNK_HDR;
630     u32 iWrite;                   /* Current write offset */
631     u32 iEof;                     /* End of current chunk */
632     int iChunk;                   /* Current chunk */
633 
634     assert( nByte <= (CHUNK_SIZE-CHUNK_HDR) );
635 
636     /* Check if there is enough space on the current chunk to fit the
637     ** new allocation. If not, link in a new chunk and put the new
638     ** allocation at the start of it.  */
639     iWrite = pDb->treehdr.iWrite;
640     if( bAlign ){
641       iWrite = (iWrite + 3) & ~0x0003;
642       assert( (iWrite % 4)==0 );
643     }
644 
645     assert( iWrite );
646     iChunk = treeOffsetToChunk(iWrite-1);
647     iEof = (iChunk+1) * CHUNK_SIZE;
648     assert( iEof>=iWrite && (iEof-iWrite)<(u32)CHUNK_SIZE );
649     if( (iWrite+nByte)>iEof ){
650       ShmChunk *pHdr;           /* Header of chunk just finished (iChunk) */
651       ShmChunk *pFirst;         /* Header of chunk treehdr.iFirst */
652       ShmChunk *pNext;          /* Header of new chunk */
653       int iNext = 0;            /* Next chunk */
654       int rc = LSM_OK;
655 
656       pFirst = treeShmChunk(pDb, pDb->treehdr.iFirst);
657 
658       assert( shm_sequence_ge(pDb->treehdr.iUsedShmid, pFirst->iShmid) );
659       assert( (pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk)==pFirst->iShmid );
660 
661       /* Check if the chunk at the start of the linked list is still in
662       ** use. If not, reuse it. If so, allocate a new chunk by appending
663       ** to the *-shm file.  */
664       if( pDb->treehdr.iUsedShmid!=pFirst->iShmid ){
665         int bInUse;
666         rc = lsmTreeInUse(pDb, pFirst->iShmid, &bInUse);
667         if( rc!=LSM_OK ){
668           *pRc = rc;
669           return 0;
670         }
671         if( bInUse==0 ){
672           iNext = pDb->treehdr.iFirst;
673           pDb->treehdr.iFirst = pFirst->iNext;
674           assert( pDb->treehdr.iFirst );
675         }
676       }
677       if( iNext==0 ) iNext = pDb->treehdr.nChunk++;
678 
679       /* Set the header values for the new chunk */
680       pNext = treeShmChunkRc(pDb, iNext, &rc);
681       if( pNext ){
682         pNext->iNext = 0;
683         pNext->iShmid = (pDb->treehdr.iNextShmid++);
684       }else{
685         *pRc = rc;
686         return 0;
687       }
688 
689       /* Set the header values for the chunk just finished */
690       pHdr = (ShmChunk *)treeShmptr(pDb, iChunk*CHUNK_SIZE);
691       pHdr->iNext = iNext;
692 
693       /* Advance to the next chunk */
694       iWrite = iNext * CHUNK_SIZE + CHUNK_HDR;
695     }
696 
697     /* Allocate space at iWrite. */
698     iRet = iWrite;
699     pDb->treehdr.iWrite = iWrite + nByte;
700     pDb->treehdr.root.nByte += nByte;
701   }
702   return iRet;
703 }
704 
705 /*
706 ** Allocate and zero nByte bytes of space within the *-shm file.
707 */
treeShmallocZero(lsm_db * pDb,int nByte,u32 * piPtr,int * pRc)708 static void *treeShmallocZero(lsm_db *pDb, int nByte, u32 *piPtr, int *pRc){
709   u32 iPtr;
710   void *p;
711   iPtr = treeShmalloc(pDb, 1, nByte, pRc);
712   p = treeShmptr(pDb, iPtr);
713   if( p ){
714     assert( *pRc==LSM_OK );
715     memset(p, 0, nByte);
716     *piPtr = iPtr;
717   }
718   return p;
719 }
720 
newTreeNode(lsm_db * pDb,u32 * piPtr,int * pRc)721 static TreeNode *newTreeNode(lsm_db *pDb, u32 *piPtr, int *pRc){
722   return treeShmallocZero(pDb, sizeof(TreeNode), piPtr, pRc);
723 }
724 
newTreeLeaf(lsm_db * pDb,u32 * piPtr,int * pRc)725 static TreeLeaf *newTreeLeaf(lsm_db *pDb, u32 *piPtr, int *pRc){
726   return treeShmallocZero(pDb, sizeof(TreeLeaf), piPtr, pRc);
727 }
728 
newTreeKey(lsm_db * pDb,u32 * piPtr,void * pKey,int nKey,void * pVal,int nVal,int * pRc)729 static TreeKey *newTreeKey(
730   lsm_db *pDb,
731   u32 *piPtr,
732   void *pKey, int nKey,           /* Key data */
733   void *pVal, int nVal,           /* Value data (or nVal<0 for delete) */
734   int *pRc
735 ){
736   TreeKey *p;
737   u32 iPtr;
738   u32 iEnd;
739   int nRem;
740   u8 *a;
741   int n;
742 
743   /* Allocate space for the TreeKey structure itself */
744   *piPtr = iPtr = treeShmalloc(pDb, 1, sizeof(TreeKey), pRc);
745   p = treeShmptr(pDb, iPtr);
746   if( *pRc ) return 0;
747   p->nKey = nKey;
748   p->nValue = nVal;
749 
750   /* Allocate and populate the space required for the key and value. */
751   n = nRem = nKey;
752   a = (u8 *)pKey;
753   while( a ){
754     while( nRem>0 ){
755       u8 *aAlloc;
756       int nAlloc;
757       u32 iWrite;
758 
759       iWrite = (pDb->treehdr.iWrite & (LSM_SHM_CHUNK_SIZE-1));
760       iWrite = LSM_MAX(iWrite, LSM_SHM_CHUNK_HDR);
761       nAlloc = LSM_MIN((LSM_SHM_CHUNK_SIZE-iWrite), (u32)nRem);
762 
763       aAlloc = treeShmptr(pDb, treeShmalloc(pDb, 0, nAlloc, pRc));
764       if( aAlloc==0 ) break;
765       memcpy(aAlloc, &a[n-nRem], nAlloc);
766       nRem -= nAlloc;
767     }
768     a = pVal;
769     n = nRem = nVal;
770     pVal = 0;
771   }
772 
773   iEnd = iPtr + sizeof(TreeKey) + nKey + LSM_MAX(0, nVal);
774   if( (iPtr & ~(LSM_SHM_CHUNK_SIZE-1))!=(iEnd & ~(LSM_SHM_CHUNK_SIZE-1)) ){
775     p->flags = 0;
776   }else{
777     p->flags = LSM_CONTIGUOUS;
778   }
779 
780   if( *pRc ) return 0;
781 #if 0
782   printf("store: %d %s\n", (int)iPtr, (char *)pKey);
783 #endif
784   return p;
785 }
786 
copyTreeNode(lsm_db * pDb,TreeNode * pOld,u32 * piNew,int * pRc)787 static TreeNode *copyTreeNode(
788   lsm_db *pDb,
789   TreeNode *pOld,
790   u32 *piNew,
791   int *pRc
792 ){
793   TreeNode *pNew;
794 
795   pNew = newTreeNode(pDb, piNew, pRc);
796   if( pNew ){
797     memcpy(pNew->aiKeyPtr, pOld->aiKeyPtr, sizeof(pNew->aiKeyPtr));
798     memcpy(pNew->aiChildPtr, pOld->aiChildPtr, sizeof(pNew->aiChildPtr));
799     if( pOld->iV2 ) pNew->aiChildPtr[pOld->iV2Child] = pOld->iV2Ptr;
800   }
801   return pNew;
802 }
803 
copyTreeLeaf(lsm_db * pDb,TreeLeaf * pOld,u32 * piNew,int * pRc)804 static TreeNode *copyTreeLeaf(
805   lsm_db *pDb,
806   TreeLeaf *pOld,
807   u32 *piNew,
808   int *pRc
809 ){
810   TreeLeaf *pNew;
811   pNew = newTreeLeaf(pDb, piNew, pRc);
812   if( pNew ){
813     memcpy(pNew, pOld, sizeof(TreeLeaf));
814   }
815   return (TreeNode *)pNew;
816 }
817 
818 /*
819 ** The tree cursor passed as the second argument currently points to an
820 ** internal node (not a leaf). Specifically, to a sub-tree pointer. This
821 ** function replaces the sub-tree that the cursor currently points to
822 ** with sub-tree pNew.
823 **
824 ** The sub-tree may be replaced either by writing the "v2 data" on the
825 ** internal node, or by allocating a new TreeNode structure and then
826 ** calling this function on the parent of the internal node.
827 */
treeUpdatePtr(lsm_db * pDb,TreeCursor * pCsr,u32 iNew)828 static int treeUpdatePtr(lsm_db *pDb, TreeCursor *pCsr, u32 iNew){
829   int rc = LSM_OK;
830   if( pCsr->iNode<0 ){
831     /* iNew is the new root node */
832     pDb->treehdr.root.iRoot = iNew;
833   }else{
834     /* If this node already has version 2 content, allocate a copy and
835     ** update the copy with the new pointer value. Otherwise, store the
836     ** new pointer as v2 data within the current node structure.  */
837 
838     TreeNode *p;                  /* The node to be modified */
839     int iChildPtr;                /* apChild[] entry to modify */
840 
841     p = pCsr->apTreeNode[pCsr->iNode];
842     iChildPtr = pCsr->aiCell[pCsr->iNode];
843 
844     if( p->iV2 ){
845       /* The "allocate new TreeNode" option */
846       u32 iCopy;
847       TreeNode *pCopy;
848       pCopy = copyTreeNode(pDb, p, &iCopy, &rc);
849       if( pCopy ){
850         assert( rc==LSM_OK );
851         pCopy->aiChildPtr[iChildPtr] = iNew;
852         pCsr->iNode--;
853         rc = treeUpdatePtr(pDb, pCsr, iCopy);
854       }
855     }else{
856       /* The "v2 data" option */
857       u32 iPtr;
858       assert( pDb->treehdr.root.iTransId>0 );
859 
860       if( pCsr->iNode ){
861         iPtr = getChildPtr(
862             pCsr->apTreeNode[pCsr->iNode-1],
863             pDb->treehdr.root.iTransId, pCsr->aiCell[pCsr->iNode-1]
864         );
865       }else{
866         iPtr = pDb->treehdr.root.iRoot;
867       }
868       rc = intArrayAppend(pDb->pEnv, &pDb->rollback, iPtr);
869 
870       if( rc==LSM_OK ){
871         p->iV2 = pDb->treehdr.root.iTransId;
872         p->iV2Child = (u8)iChildPtr;
873         p->iV2Ptr = iNew;
874       }
875     }
876   }
877 
878   return rc;
879 }
880 
881 /*
882 ** Cursor pCsr points at a node that is part of pTree. This function
883 ** inserts a new key and optionally child node pointer into that node.
884 **
885 ** The position into which the new key and pointer are inserted is
886 ** determined by the iSlot parameter. The new key will be inserted to
887 ** the left of the key currently stored in apKey[iSlot]. Or, if iSlot is
888 ** greater than the index of the rightmost key in the node.
889 **
890 ** Pointer pLeftPtr points to a child tree that contains keys that are
891 ** smaller than pTreeKey.
892 */
treeInsert(lsm_db * pDb,TreeCursor * pCsr,u32 iLeftPtr,u32 iTreeKey,u32 iRightPtr,int iSlot)893 static int treeInsert(
894   lsm_db *pDb,                    /* Database handle */
895   TreeCursor *pCsr,               /* Cursor indicating path to insert at */
896   u32 iLeftPtr,                   /* Left child pointer */
897   u32 iTreeKey,                   /* Location of key to insert */
898   u32 iRightPtr,                  /* Right child pointer */
899   int iSlot                       /* Position to insert key into */
900 ){
901   int rc = LSM_OK;
902   TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
903 
904   /* Check if the node is currently full. If so, split pNode in two and
905   ** call this function recursively to add a key to the parent. Otherwise,
906   ** insert the new key directly into pNode.  */
907   assert( pNode->aiKeyPtr[1] );
908   if( pNode->aiKeyPtr[0] && pNode->aiKeyPtr[2] ){
909     u32 iLeft; TreeNode *pLeft;   /* New left-hand sibling node */
910     u32 iRight; TreeNode *pRight; /* New right-hand sibling node */
911 
912     pLeft = newTreeNode(pDb, &iLeft, &rc);
913     pRight = newTreeNode(pDb, &iRight, &rc);
914     if( rc ) return rc;
915 
916     pLeft->aiChildPtr[1] = getChildPtr(pNode, WORKING_VERSION, 0);
917     pLeft->aiKeyPtr[1] = pNode->aiKeyPtr[0];
918     pLeft->aiChildPtr[2] = getChildPtr(pNode, WORKING_VERSION, 1);
919 
920     pRight->aiChildPtr[1] = getChildPtr(pNode, WORKING_VERSION, 2);
921     pRight->aiKeyPtr[1] = pNode->aiKeyPtr[2];
922     pRight->aiChildPtr[2] = getChildPtr(pNode, WORKING_VERSION, 3);
923 
924     if( pCsr->iNode==0 ){
925       /* pNode is the root of the tree. Grow the tree by one level. */
926       u32 iRoot; TreeNode *pRoot; /* New root node */
927 
928       pRoot = newTreeNode(pDb, &iRoot, &rc);
929       pRoot->aiKeyPtr[1] = pNode->aiKeyPtr[1];
930       pRoot->aiChildPtr[1] = iLeft;
931       pRoot->aiChildPtr[2] = iRight;
932 
933       pDb->treehdr.root.iRoot = iRoot;
934       pDb->treehdr.root.nHeight++;
935     }else{
936 
937       pCsr->iNode--;
938       rc = treeInsert(pDb, pCsr,
939           iLeft, pNode->aiKeyPtr[1], iRight, pCsr->aiCell[pCsr->iNode]
940       );
941     }
942 
943     assert( pLeft->iV2==0 );
944     assert( pRight->iV2==0 );
945     switch( iSlot ){
946       case 0:
947         pLeft->aiKeyPtr[0] = iTreeKey;
948         pLeft->aiChildPtr[0] = iLeftPtr;
949         if( iRightPtr ) pLeft->aiChildPtr[1] = iRightPtr;
950         break;
951       case 1:
952         pLeft->aiChildPtr[3] = (iRightPtr ? iRightPtr : pLeft->aiChildPtr[2]);
953         pLeft->aiKeyPtr[2] = iTreeKey;
954         pLeft->aiChildPtr[2] = iLeftPtr;
955         break;
956       case 2:
957         pRight->aiKeyPtr[0] = iTreeKey;
958         pRight->aiChildPtr[0] = iLeftPtr;
959         if( iRightPtr ) pRight->aiChildPtr[1] = iRightPtr;
960         break;
961       case 3:
962         pRight->aiChildPtr[3] = (iRightPtr ? iRightPtr : pRight->aiChildPtr[2]);
963         pRight->aiKeyPtr[2] = iTreeKey;
964         pRight->aiChildPtr[2] = iLeftPtr;
965         break;
966     }
967 
968   }else{
969     TreeNode *pNew;
970     u32 *piKey;
971     u32 *piChild;
972     u32 iStore = 0;
973     u32 iNew = 0;
974     int i;
975 
976     /* Allocate a new version of node pNode. */
977     pNew = newTreeNode(pDb, &iNew, &rc);
978     if( rc ) return rc;
979 
980     piKey = pNew->aiKeyPtr;
981     piChild = pNew->aiChildPtr;
982 
983     for(i=0; i<iSlot; i++){
984       if( pNode->aiKeyPtr[i] ){
985         *(piKey++) = pNode->aiKeyPtr[i];
986         *(piChild++) = getChildPtr(pNode, WORKING_VERSION, i);
987       }
988     }
989 
990     *piKey++ = iTreeKey;
991     *piChild++ = iLeftPtr;
992 
993     iStore = iRightPtr;
994     for(i=iSlot; i<3; i++){
995       if( pNode->aiKeyPtr[i] ){
996         *(piKey++) = pNode->aiKeyPtr[i];
997         *(piChild++) = iStore ? iStore : getChildPtr(pNode, WORKING_VERSION, i);
998         iStore = 0;
999       }
1000     }
1001 
1002     if( iStore ){
1003       *piChild = iStore;
1004     }else{
1005       *piChild = getChildPtr(pNode, WORKING_VERSION,
1006           (pNode->aiKeyPtr[2] ? 3 : 2)
1007       );
1008     }
1009     pCsr->iNode--;
1010     rc = treeUpdatePtr(pDb, pCsr, iNew);
1011   }
1012 
1013   return rc;
1014 }
1015 
treeInsertLeaf(lsm_db * pDb,TreeCursor * pCsr,u32 iTreeKey,int iSlot)1016 static int treeInsertLeaf(
1017   lsm_db *pDb,                    /* Database handle */
1018   TreeCursor *pCsr,               /* Cursor structure */
1019   u32 iTreeKey,                   /* Key pointer to insert */
1020   int iSlot                       /* Insert key to the left of this */
1021 ){
1022   int rc = LSM_OK;                /* Return code */
1023   TreeNode *pLeaf = pCsr->apTreeNode[pCsr->iNode];
1024   TreeLeaf *pNew;
1025   u32 iNew;
1026 
1027   assert( iSlot>=0 && iSlot<=4 );
1028   assert( pCsr->iNode>0 );
1029   assert( pLeaf->aiKeyPtr[1] );
1030 
1031   pCsr->iNode--;
1032 
1033   pNew = newTreeLeaf(pDb, &iNew, &rc);
1034   if( pNew ){
1035     if( pLeaf->aiKeyPtr[0] && pLeaf->aiKeyPtr[2] ){
1036       /* The leaf is full. Split it in two. */
1037       TreeLeaf *pRight;
1038       u32 iRight;
1039       pRight = newTreeLeaf(pDb, &iRight, &rc);
1040       if( pRight ){
1041         assert( rc==LSM_OK );
1042         pNew->aiKeyPtr[1] = pLeaf->aiKeyPtr[0];
1043         pRight->aiKeyPtr[1] = pLeaf->aiKeyPtr[2];
1044         switch( iSlot ){
1045           case 0: pNew->aiKeyPtr[0] = iTreeKey; break;
1046           case 1: pNew->aiKeyPtr[2] = iTreeKey; break;
1047           case 2: pRight->aiKeyPtr[0] = iTreeKey; break;
1048           case 3: pRight->aiKeyPtr[2] = iTreeKey; break;
1049         }
1050 
1051         rc = treeInsert(pDb, pCsr, iNew, pLeaf->aiKeyPtr[1], iRight,
1052             pCsr->aiCell[pCsr->iNode]
1053         );
1054       }
1055     }else{
1056       int iOut = 0;
1057       int i;
1058       for(i=0; i<4; i++){
1059         if( i==iSlot ) pNew->aiKeyPtr[iOut++] = iTreeKey;
1060         if( i<3 && pLeaf->aiKeyPtr[i] ){
1061           pNew->aiKeyPtr[iOut++] = pLeaf->aiKeyPtr[i];
1062         }
1063       }
1064       rc = treeUpdatePtr(pDb, pCsr, iNew);
1065     }
1066   }
1067 
1068   return rc;
1069 }
1070 
lsmTreeMakeOld(lsm_db * pDb)1071 void lsmTreeMakeOld(lsm_db *pDb){
1072 
1073   /* A write transaction must be open. Otherwise the code below that
1074   ** assumes (pDb->pClient->iLogOff) is current may malfunction.
1075   **
1076   ** Update: currently this assert fails due to lsm_flush(), which does
1077   ** not set nTransOpen.
1078   */
1079   assert( /* pDb->nTransOpen>0 && */ pDb->iReader>=0 );
1080 
1081   if( pDb->treehdr.iOldShmid==0 ){
1082     pDb->treehdr.iOldLog = (pDb->treehdr.log.aRegion[2].iEnd << 1);
1083     pDb->treehdr.iOldLog |= (~(pDb->pClient->iLogOff) & (i64)0x0001);
1084 
1085     pDb->treehdr.oldcksum0 = pDb->treehdr.log.cksum0;
1086     pDb->treehdr.oldcksum1 = pDb->treehdr.log.cksum1;
1087     pDb->treehdr.iOldShmid = pDb->treehdr.iNextShmid-1;
1088     memcpy(&pDb->treehdr.oldroot, &pDb->treehdr.root, sizeof(TreeRoot));
1089 
1090     pDb->treehdr.root.iTransId = 1;
1091     pDb->treehdr.root.iRoot = 0;
1092     pDb->treehdr.root.nHeight = 0;
1093     pDb->treehdr.root.nByte = 0;
1094   }
1095 }
1096 
lsmTreeDiscardOld(lsm_db * pDb)1097 void lsmTreeDiscardOld(lsm_db *pDb){
1098   assert( lsmShmAssertLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL)
1099        || lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL)
1100   );
1101   pDb->treehdr.iUsedShmid = pDb->treehdr.iOldShmid;
1102   pDb->treehdr.iOldShmid = 0;
1103 }
1104 
lsmTreeHasOld(lsm_db * pDb)1105 int lsmTreeHasOld(lsm_db *pDb){
1106   return pDb->treehdr.iOldShmid!=0;
1107 }
1108 
1109 /*
1110 ** This function is called during recovery to initialize the
1111 ** tree header. Only the database connections private copy of the tree-header
1112 ** is initialized here - it will be copied into shared memory if log file
1113 ** recovery is successful.
1114 */
lsmTreeInit(lsm_db * pDb)1115 int lsmTreeInit(lsm_db *pDb){
1116   ShmChunk *pOne;
1117   int rc = LSM_OK;
1118 
1119   memset(&pDb->treehdr, 0, sizeof(TreeHeader));
1120   pDb->treehdr.root.iTransId = 1;
1121   pDb->treehdr.iFirst = 1;
1122   pDb->treehdr.nChunk = 2;
1123   pDb->treehdr.iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR;
1124   pDb->treehdr.iNextShmid = 2;
1125   pDb->treehdr.iUsedShmid = 1;
1126 
1127   pOne = treeShmChunkRc(pDb, 1, &rc);
1128   if( pOne ){
1129     pOne->iNext = 0;
1130     pOne->iShmid = 1;
1131   }
1132   return rc;
1133 }
1134 
treeHeaderChecksum(TreeHeader * pHdr,u32 * aCksum)1135 static void treeHeaderChecksum(
1136   TreeHeader *pHdr,
1137   u32 *aCksum
1138 ){
1139   u32 cksum1 = 0x12345678;
1140   u32 cksum2 = 0x9ABCDEF0;
1141   u32 *a = (u32 *)pHdr;
1142   int i;
1143 
1144   assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) );
1145   assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 );
1146 
1147   for(i=0; i<(offsetof(TreeHeader, aCksum) / sizeof(u32)); i+=2){
1148     cksum1 += a[i];
1149     cksum2 += (cksum1 + a[i+1]);
1150   }
1151   aCksum[0] = cksum1;
1152   aCksum[1] = cksum2;
1153 }
1154 
1155 /*
1156 ** Return true if the checksum stored in TreeHeader object *pHdr is
1157 ** consistent with the contents of its other fields.
1158 */
treeHeaderChecksumOk(TreeHeader * pHdr)1159 static int treeHeaderChecksumOk(TreeHeader *pHdr){
1160   u32 aCksum[2];
1161   treeHeaderChecksum(pHdr, aCksum);
1162   return (0==memcmp(aCksum, pHdr->aCksum, sizeof(aCksum)));
1163 }
1164 
1165 /*
1166 ** This type is used by functions lsmTreeRepair() and treeSortByShmid() to
1167 ** make relinking the linked list of shared-memory chunks easier.
1168 */
1169 typedef struct ShmChunkLoc ShmChunkLoc;
1170 struct ShmChunkLoc {
1171   ShmChunk *pShm;
1172   u32 iLoc;
1173 };
1174 
1175 /*
1176 ** This function checks that the linked list of shared memory chunks
1177 ** that starts at chunk db->treehdr.iFirst:
1178 **
1179 **   1) Includes all chunks in the shared-memory region, and
1180 **   2) Links them together in order of ascending shm-id.
1181 **
1182 ** If no error occurs and the conditions above are met, LSM_OK is returned.
1183 **
1184 ** If either of the conditions are untrue, LSM_CORRUPT is returned. Or, if
1185 ** an error is encountered before the checks are completed, another LSM error
1186 ** code (i.e. LSM_IOERR or LSM_NOMEM) may be returned.
1187 */
treeCheckLinkedList(lsm_db * db)1188 static int treeCheckLinkedList(lsm_db *db){
1189   int rc = LSM_OK;
1190   int nVisit = 0;
1191   ShmChunk *p;
1192 
1193   p = treeShmChunkRc(db, db->treehdr.iFirst, &rc);
1194   while( rc==LSM_OK && p ){
1195     if( p->iNext ){
1196       if( p->iNext>=db->treehdr.nChunk ){
1197         rc = LSM_CORRUPT_BKPT;
1198       }else{
1199         ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc);
1200         if( rc==LSM_OK ){
1201           if( pNext->iShmid!=p->iShmid+1 ){
1202             rc = LSM_CORRUPT_BKPT;
1203           }
1204           p = pNext;
1205         }
1206       }
1207     }else{
1208       p = 0;
1209     }
1210     nVisit++;
1211   }
1212 
1213   if( rc==LSM_OK && (u32)nVisit!=db->treehdr.nChunk-1 ){
1214     rc = LSM_CORRUPT_BKPT;
1215   }
1216   return rc;
1217 }
1218 
1219 /*
1220 ** Iterate through the current in-memory tree. If there are any v2-pointers
1221 ** with transaction ids larger than db->treehdr.iTransId, zero them.
1222 */
treeRepairPtrs(lsm_db * db)1223 static int treeRepairPtrs(lsm_db *db){
1224   int rc = LSM_OK;
1225 
1226   if( db->treehdr.root.nHeight>1 ){
1227     TreeCursor csr;               /* Cursor used to iterate through tree */
1228     u32 iTransId = db->treehdr.root.iTransId;
1229 
1230     /* Initialize the cursor structure. Also decrement the nHeight variable
1231     ** in the tree-header. This will prevent the cursor from visiting any
1232     ** leaf nodes.  */
1233     db->treehdr.root.nHeight--;
1234     treeCursorInit(db, 0, &csr);
1235 
1236     rc = lsmTreeCursorEnd(&csr, 0);
1237     while( rc==LSM_OK && lsmTreeCursorValid(&csr) ){
1238       TreeNode *pNode = csr.apTreeNode[csr.iNode];
1239       if( pNode->iV2>iTransId ){
1240         pNode->iV2Child = 0;
1241         pNode->iV2Ptr = 0;
1242         pNode->iV2 = 0;
1243       }
1244       rc = lsmTreeCursorNext(&csr);
1245     }
1246     tblobFree(csr.pDb, &csr.blob);
1247 
1248     db->treehdr.root.nHeight++;
1249   }
1250 
1251   return rc;
1252 }
1253 
treeRepairList(lsm_db * db)1254 static int treeRepairList(lsm_db *db){
1255   int rc = LSM_OK;
1256   int i;
1257   ShmChunk *p;
1258   ShmChunk *pMin = 0;
1259   u32 iMin = 0;
1260 
1261   /* Iterate through all shm chunks. Find the smallest shm-id present in
1262   ** the shared-memory region. */
1263   for(i=1; rc==LSM_OK && (u32)i<db->treehdr.nChunk; i++){
1264     p = treeShmChunkRc(db, i, &rc);
1265     if( p && (pMin==0 || shm_sequence_ge(pMin->iShmid, p->iShmid)) ){
1266       pMin = p;
1267       iMin = i;
1268     }
1269   }
1270 
1271   /* Fix the shm-id values on any chunks with a shm-id greater than or
1272   ** equal to treehdr.iNextShmid. Then do a merge-sort of all chunks to
1273   ** fix the ShmChunk.iNext pointers.
1274   */
1275   if( rc==LSM_OK ){
1276     int nSort;
1277     int nByte;
1278     u32 iPrevShmid;
1279     ShmChunkLoc *aSort;
1280 
1281     /* Allocate space for a merge sort. */
1282     nSort = 1;
1283     while( (u32)nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
1284     nByte = sizeof(ShmChunkLoc) * nSort * 2;
1285     aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
1286     iPrevShmid = pMin->iShmid;
1287 
1288     /* Fix all shm-ids, if required. */
1289     if( rc==LSM_OK ){
1290       iPrevShmid = pMin->iShmid-1;
1291       for(i=1; (u32)i<db->treehdr.nChunk; i++){
1292         p = treeShmChunk(db, i);
1293         aSort[i-1].pShm = p;
1294         aSort[i-1].iLoc = i;
1295         if( (u32)i!=db->treehdr.iFirst ){
1296           if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
1297             p->iShmid = iPrevShmid--;
1298           }
1299         }
1300       }
1301       if( iMin!=db->treehdr.iFirst ){
1302         p = treeShmChunk(db, db->treehdr.iFirst);
1303         p->iShmid = iPrevShmid;
1304       }
1305     }
1306 
1307     if( rc==LSM_OK ){
1308       ShmChunkLoc *aSpace = &aSort[nSort];
1309       for(i=0; i<nSort; i++){
1310         if( aSort[i].pShm ){
1311           assert( shm_sequence_ge(aSort[i].pShm->iShmid, iPrevShmid) );
1312           assert( aSpace[aSort[i].pShm->iShmid - iPrevShmid].pShm==0 );
1313           aSpace[aSort[i].pShm->iShmid - iPrevShmid] = aSort[i];
1314         }
1315       }
1316 
1317       if( aSpace[nSort-1].pShm ) aSpace[nSort-1].pShm->iNext = 0;
1318       for(i=0; i<nSort-1; i++){
1319         if( aSpace[i].pShm ){
1320           aSpace[i].pShm->iNext = aSpace[i+1].iLoc;
1321         }
1322       }
1323 
1324       rc = treeCheckLinkedList(db);
1325       lsmFree(db->pEnv, aSort);
1326     }
1327   }
1328 
1329   return rc;
1330 }
1331 
1332 /*
1333 ** This function is called as part of opening a write-transaction if the
1334 ** writer-flag is already set - indicating that the previous writer
1335 ** failed before ending its transaction.
1336 */
lsmTreeRepair(lsm_db * db)1337 int lsmTreeRepair(lsm_db *db){
1338   int rc = LSM_OK;
1339   TreeHeader hdr;
1340   ShmHeader *pHdr = db->pShmhdr;
1341 
1342   /* Ensure that the two tree-headers are consistent. Copy one over the other
1343   ** if necessary. Prefer the data from a tree-header for which the checksum
1344   ** computes. Or, if they both compute, prefer tree-header-1.  */
1345   if( memcmp(&pHdr->hdr1, &pHdr->hdr2, sizeof(TreeHeader)) ){
1346     if( treeHeaderChecksumOk(&pHdr->hdr1) ){
1347       memcpy(&pHdr->hdr2, &pHdr->hdr1, sizeof(TreeHeader));
1348     }else{
1349       memcpy(&pHdr->hdr1, &pHdr->hdr2, sizeof(TreeHeader));
1350     }
1351   }
1352 
1353   /* Save the connections current copy of the tree-header. It will be
1354   ** restored before returning.  */
1355   memcpy(&hdr, &db->treehdr, sizeof(TreeHeader));
1356 
1357   /* Walk the tree. Zero any v2 pointers with a transaction-id greater than
1358   ** the transaction-id currently in the tree-headers.  */
1359   rc = treeRepairPtrs(db);
1360 
1361   /* Repair the linked list of shared-memory chunks. */
1362   if( rc==LSM_OK ){
1363     rc = treeRepairList(db);
1364   }
1365 
1366   memcpy(&db->treehdr, &hdr, sizeof(TreeHeader));
1367   return rc;
1368 }
1369 
treeOverwriteKey(lsm_db * db,TreeCursor * pCsr,u32 iKey,int * pRc)1370 static void treeOverwriteKey(lsm_db *db, TreeCursor *pCsr, u32 iKey, int *pRc){
1371   if( *pRc==LSM_OK ){
1372     TreeRoot *p = &db->treehdr.root;
1373     TreeNode *pNew;
1374     u32 iNew;
1375     TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
1376     int iCell = pCsr->aiCell[pCsr->iNode];
1377 
1378     /* Create a copy of this node */
1379     if( (pCsr->iNode>0 && (u32)pCsr->iNode==(p->nHeight-1)) ){
1380       pNew = copyTreeLeaf(db, (TreeLeaf *)pNode, &iNew, pRc);
1381     }else{
1382       pNew = copyTreeNode(db, pNode, &iNew, pRc);
1383     }
1384 
1385     if( pNew ){
1386       /* Modify the value in the new version */
1387       pNew->aiKeyPtr[iCell] = iKey;
1388 
1389       /* Change the pointer in the parent (if any) to point at the new
1390        ** TreeNode */
1391       pCsr->iNode--;
1392       treeUpdatePtr(db, pCsr, iNew);
1393     }
1394   }
1395 }
1396 
treeNextIsEndDelete(lsm_db * db,TreeCursor * pCsr)1397 static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){
1398   int iNode = pCsr->iNode;
1399   int iCell = pCsr->aiCell[iNode]+1;
1400 
1401   /* Cursor currently points to a leaf node. */
1402   assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );
1403 
1404   while( iNode>=0 ){
1405     TreeNode *pNode = pCsr->apTreeNode[iNode];
1406     if( iCell<3 && pNode->aiKeyPtr[iCell] ){
1407       int rc = LSM_OK;
1408       TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
1409       assert( rc==LSM_OK );
1410       return ((pKey->flags & LSM_END_DELETE) ? 1 : 0);
1411     }
1412     iNode--;
1413     iCell = pCsr->aiCell[iNode];
1414   }
1415 
1416   return 0;
1417 }
1418 
treePrevIsStartDelete(lsm_db * db,TreeCursor * pCsr)1419 static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){
1420   int iNode = pCsr->iNode;
1421 
1422   /* Cursor currently points to a leaf node. */
1423   assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );
1424 
1425   while( iNode>=0 ){
1426     TreeNode *pNode = pCsr->apTreeNode[iNode];
1427     int iCell = pCsr->aiCell[iNode]-1;
1428     if( iCell>=0 && pNode->aiKeyPtr[iCell] ){
1429       int rc = LSM_OK;
1430       TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
1431       assert( rc==LSM_OK );
1432       return ((pKey->flags & LSM_START_DELETE) ? 1 : 0);
1433     }
1434     iNode--;
1435   }
1436 
1437   return 0;
1438 }
1439 
1440 
treeInsertEntry(lsm_db * pDb,int flags,void * pKey,int nKey,void * pVal,int nVal)1441 static int treeInsertEntry(
1442   lsm_db *pDb,                    /* Database handle */
1443   int flags,                      /* Flags associated with entry */
1444   void *pKey,                     /* Pointer to key data */
1445   int nKey,                       /* Size of key data in bytes */
1446   void *pVal,                     /* Pointer to value data (or NULL) */
1447   int nVal                        /* Bytes in value data (or -ve for delete) */
1448 ){
1449   int rc = LSM_OK;                /* Return Code */
1450   TreeKey *pTreeKey;              /* New key-value being inserted */
1451   u32 iTreeKey;
1452   TreeRoot *p = &pDb->treehdr.root;
1453   TreeCursor csr;                 /* Cursor to seek to pKey/nKey */
1454   int res = 0;                    /* Result of seek operation on csr */
1455 
1456   assert( nVal>=0 || pVal==0 );
1457   assert_tree_looks_ok(LSM_OK, pTree);
1458   assert( flags==LSM_INSERT       || flags==LSM_POINT_DELETE
1459        || flags==LSM_START_DELETE || flags==LSM_END_DELETE
1460   );
1461   assert( (flags & LSM_CONTIGUOUS)==0 );
1462 #if 0
1463   dump_tree_contents(pDb, "before");
1464 #endif
1465 
1466   if( p->iRoot ){
1467     TreeKey *pRes;                /* Key at end of seek operation */
1468     treeCursorInit(pDb, 0, &csr);
1469 
1470     /* Seek to the leaf (or internal node) that the new key belongs on */
1471     rc = lsmTreeCursorSeek(&csr, pKey, nKey, &res);
1472     pRes = csrGetKey(&csr, &csr.blob, &rc);
1473     if( rc!=LSM_OK ) return rc;
1474     assert( pRes );
1475 
1476     if( flags==LSM_START_DELETE ){
1477       /* When inserting a start-delete-range entry, if the key that
1478       ** occurs immediately before the new entry is already a START_DELETE,
1479       ** then the new entry is not required.  */
1480       if( (res<=0 && (pRes->flags & LSM_START_DELETE))
1481        || (res>0  && treePrevIsStartDelete(pDb, &csr))
1482       ){
1483         goto insert_entry_out;
1484       }
1485     }else if( flags==LSM_END_DELETE ){
1486       /* When inserting an start-delete-range entry, if the key that
1487       ** occurs immediately after the new entry is already an END_DELETE,
1488       ** then the new entry is not required.  */
1489       if( (res<0  && treeNextIsEndDelete(pDb, &csr))
1490        || (res>=0 && (pRes->flags & LSM_END_DELETE))
1491       ){
1492         goto insert_entry_out;
1493       }
1494     }
1495 
1496     if( res==0 && (flags & (LSM_END_DELETE|LSM_START_DELETE)) ){
1497       if( pRes->flags & LSM_INSERT ){
1498         nVal = pRes->nValue;
1499         pVal = TKV_VAL(pRes);
1500       }
1501       flags = flags | pRes->flags;
1502     }
1503 
1504     if( flags & (LSM_INSERT|LSM_POINT_DELETE) ){
1505       if( (res<0 && (pRes->flags & LSM_START_DELETE))
1506        || (res>0 && (pRes->flags & LSM_END_DELETE))
1507       ){
1508         flags = flags | (LSM_END_DELETE|LSM_START_DELETE);
1509       }else if( res==0 ){
1510         flags = flags | (pRes->flags & (LSM_END_DELETE|LSM_START_DELETE));
1511       }
1512     }
1513   }else{
1514     memset(&csr, 0, sizeof(TreeCursor));
1515   }
1516 
1517   /* Allocate and populate a new key-value pair structure */
1518   pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc);
1519   if( rc!=LSM_OK ) return rc;
1520   assert( pTreeKey->flags==0 || pTreeKey->flags==LSM_CONTIGUOUS );
1521   pTreeKey->flags |= flags;
1522 
1523   if( p->iRoot==0 ){
1524     /* The tree is completely empty. Add a new root node and install
1525     ** (pKey/nKey) as the middle entry. Even though it is a leaf at the
1526     ** moment, use newTreeNode() to allocate the node (i.e. allocate enough
1527     ** space for the fields used by interior nodes). This is because the
1528     ** treeInsert() routine may convert this node to an interior node. */
1529     TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc);
1530     if( rc==LSM_OK ){
1531       assert( p->nHeight==0 );
1532       pRoot->aiKeyPtr[1] = iTreeKey;
1533       p->nHeight = 1;
1534     }
1535   }else{
1536     if( res==0 ){
1537       /* The search found a match within the tree. */
1538       treeOverwriteKey(pDb, &csr, iTreeKey, &rc);
1539     }else{
1540       /* The cursor now points to the leaf node into which the new entry should
1541       ** be inserted. There may or may not be a free slot within the leaf for
1542       ** the new key-value pair.
1543       **
1544       ** iSlot is set to the index of the key within pLeaf that the new key
1545       ** should be inserted to the left of (or to a value 1 greater than the
1546       ** index of the rightmost key if the new key is larger than all keys
1547       ** currently stored in the node).
1548       */
1549       int iSlot = csr.aiCell[csr.iNode] + (res<0);
1550       if( csr.iNode==0 ){
1551         rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot);
1552       }else{
1553         rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot);
1554       }
1555     }
1556   }
1557 
1558 #if 0
1559   dump_tree_contents(pDb, "after");
1560 #endif
1561  insert_entry_out:
1562   tblobFree(pDb, &csr.blob);
1563   assert_tree_looks_ok(rc, pTree);
1564   return rc;
1565 }
1566 
1567 /*
1568 ** Insert a new entry into the in-memory tree.
1569 **
1570 ** If the value of the 5th parameter, nVal, is negative, then a delete-marker
1571 ** is inserted into the tree. In this case the value pointer, pVal, must be
1572 ** NULL.
1573 */
lsmTreeInsert(lsm_db * pDb,void * pKey,int nKey,void * pVal,int nVal)1574 int lsmTreeInsert(
1575   lsm_db *pDb,                    /* Database handle */
1576   void *pKey,                     /* Pointer to key data */
1577   int nKey,                       /* Size of key data in bytes */
1578   void *pVal,                     /* Pointer to value data (or NULL) */
1579   int nVal                        /* Bytes in value data (or -ve for delete) */
1580 ){
1581   int flags;
1582   if( nVal<0 ){
1583     flags = LSM_POINT_DELETE;
1584   }else{
1585     flags = LSM_INSERT;
1586   }
1587 
1588   return treeInsertEntry(pDb, flags, pKey, nKey, pVal, nVal);
1589 }
1590 
treeDeleteEntry(lsm_db * db,TreeCursor * pCsr,u32 iNewptr)1591 static int treeDeleteEntry(lsm_db *db, TreeCursor *pCsr, u32 iNewptr){
1592   TreeRoot *p = &db->treehdr.root;
1593   TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
1594   int iSlot = pCsr->aiCell[pCsr->iNode];
1595   int bLeaf;
1596   int rc = LSM_OK;
1597 
1598   assert( pNode->aiKeyPtr[1] );
1599   assert( pNode->aiKeyPtr[iSlot] );
1600   assert( iSlot==0 || iSlot==1 || iSlot==2 );
1601   assert( ((u32)pCsr->iNode==(db->treehdr.root.nHeight-1))==(iNewptr==0) );
1602 
1603   bLeaf = ((u32)pCsr->iNode==(p->nHeight-1) && p->nHeight>1);
1604 
1605   if( pNode->aiKeyPtr[0] || pNode->aiKeyPtr[2] ){
1606     /* There are currently at least 2 keys on this node. So just create
1607     ** a new copy of the node with one of the keys removed. If the node
1608     ** happens to be the root node of the tree, allocate an entire
1609     ** TreeNode structure instead of just a TreeLeaf.  */
1610     TreeNode *pNew;
1611     u32 iNew;
1612 
1613     if( bLeaf ){
1614       pNew = (TreeNode *)newTreeLeaf(db, &iNew, &rc);
1615     }else{
1616       pNew = newTreeNode(db, &iNew, &rc);
1617     }
1618     if( pNew ){
1619       int i;
1620       int iOut = 1;
1621       for(i=0; i<4; i++){
1622         if( i==iSlot ){
1623           i++;
1624           if( bLeaf==0 ) pNew->aiChildPtr[iOut] = iNewptr;
1625           if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
1626           iOut++;
1627         }else if( bLeaf || p->nHeight==1 ){
1628           if( i<3 && pNode->aiKeyPtr[i] ){
1629             pNew->aiKeyPtr[iOut++] = pNode->aiKeyPtr[i];
1630           }
1631         }else{
1632           if( getChildPtr(pNode, WORKING_VERSION, i) ){
1633             pNew->aiChildPtr[iOut] = getChildPtr(pNode, WORKING_VERSION, i);
1634             if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
1635             iOut++;
1636           }
1637         }
1638       }
1639       assert( iOut<=4 );
1640       assert( bLeaf || pNew->aiChildPtr[0]==0 );
1641       pCsr->iNode--;
1642       rc = treeUpdatePtr(db, pCsr, iNew);
1643     }
1644 
1645   }else if( pCsr->iNode==0 ){
1646     /* Removing the only key in the root node. iNewptr is the new root. */
1647     assert( iSlot==1 );
1648     db->treehdr.root.iRoot = iNewptr;
1649     db->treehdr.root.nHeight--;
1650 
1651   }else{
1652     /* There is only one key on this node and the node is not the root
1653     ** node. Find a peer for this node. Then redistribute the contents of
1654     ** the peer and the parent cell between the parent and either one or
1655     ** two new nodes.  */
1656     TreeNode *pParent;            /* Parent tree node */
1657     int iPSlot;
1658     u32 iPeer;                    /* Pointer to peer leaf node */
1659     int iDir;
1660     TreeNode *pPeer;              /* The peer leaf node */
1661     TreeNode *pNew1; u32 iNew1;   /* First new leaf node */
1662 
1663     assert( iSlot==1 );
1664 
1665     pParent = pCsr->apTreeNode[pCsr->iNode-1];
1666     iPSlot = pCsr->aiCell[pCsr->iNode-1];
1667 
1668     if( iPSlot>0 && getChildPtr(pParent, WORKING_VERSION, iPSlot-1) ){
1669       iDir = -1;
1670     }else{
1671       iDir = +1;
1672     }
1673     iPeer = getChildPtr(pParent, WORKING_VERSION, iPSlot+iDir);
1674     pPeer = (TreeNode *)treeShmptr(db, iPeer);
1675     assertIsWorkingChild(db, pNode, pParent, iPSlot);
1676 
1677     /* Allocate the first new leaf node. This is always required. */
1678     if( bLeaf ){
1679       pNew1 = (TreeNode *)newTreeLeaf(db, &iNew1, &rc);
1680     }else{
1681       pNew1 = (TreeNode *)newTreeNode(db, &iNew1, &rc);
1682     }
1683 
1684     if( pPeer->aiKeyPtr[0] && pPeer->aiKeyPtr[2] ){
1685       /* Peer node is completely full. This means that two new leaf nodes
1686       ** and a new parent node are required. */
1687 
1688       TreeNode *pNew2; u32 iNew2; /* Second new leaf node */
1689       TreeNode *pNewP; u32 iNewP; /* New parent node */
1690 
1691       if( bLeaf ){
1692         pNew2 = (TreeNode *)newTreeLeaf(db, &iNew2, &rc);
1693       }else{
1694         pNew2 = (TreeNode *)newTreeNode(db, &iNew2, &rc);
1695       }
1696       pNewP = copyTreeNode(db, pParent, &iNewP, &rc);
1697 
1698       if( iDir==-1 ){
1699         pNew1->aiKeyPtr[1] = pPeer->aiKeyPtr[0];
1700         if( bLeaf==0 ){
1701           pNew1->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 0);
1702           pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 1);
1703         }
1704 
1705         pNewP->aiChildPtr[iPSlot-1] = iNew1;
1706         pNewP->aiKeyPtr[iPSlot-1] = pPeer->aiKeyPtr[1];
1707         pNewP->aiChildPtr[iPSlot] = iNew2;
1708 
1709         pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[2];
1710         pNew2->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot-1];
1711         if( bLeaf==0 ){
1712           pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 2);
1713           pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 3);
1714           pNew2->aiChildPtr[2] = iNewptr;
1715         }
1716       }else{
1717         pNew1->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot];
1718         if( bLeaf==0 ){
1719           pNew1->aiChildPtr[1] = iNewptr;
1720           pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 0);
1721         }
1722 
1723         pNewP->aiChildPtr[iPSlot] = iNew1;
1724         pNewP->aiKeyPtr[iPSlot] = pPeer->aiKeyPtr[0];
1725         pNewP->aiChildPtr[iPSlot+1] = iNew2;
1726 
1727         pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[1];
1728         pNew2->aiKeyPtr[1] = pPeer->aiKeyPtr[2];
1729         if( bLeaf==0 ){
1730           pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 1);
1731           pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 2);
1732           pNew2->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 3);
1733         }
1734       }
1735       assert( pCsr->iNode>=1 );
1736       pCsr->iNode -= 2;
1737       if( rc==LSM_OK ){
1738         assert( pNew1->aiKeyPtr[1] && pNew2->aiKeyPtr[1] );
1739         rc = treeUpdatePtr(db, pCsr, iNewP);
1740       }
1741     }else{
1742       int iKOut = 0;
1743       int iPOut = 0;
1744       int i;
1745 
1746       pCsr->iNode--;
1747 
1748       if( iDir==1 ){
1749         pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
1750         if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
1751       }
1752       for(i=0; i<3; i++){
1753         if( pPeer->aiKeyPtr[i] ){
1754           pNew1->aiKeyPtr[iKOut++] = pPeer->aiKeyPtr[i];
1755         }
1756       }
1757       if( bLeaf==0 ){
1758         for(i=0; i<4; i++){
1759           if( getChildPtr(pPeer, WORKING_VERSION, i) ){
1760             pNew1->aiChildPtr[iPOut++] = getChildPtr(pPeer, WORKING_VERSION, i);
1761           }
1762         }
1763       }
1764       if( iDir==-1 ){
1765         iPSlot--;
1766         pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
1767         if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
1768         pCsr->aiCell[pCsr->iNode] = (u8)iPSlot;
1769       }
1770 
1771       rc = treeDeleteEntry(db, pCsr, iNew1);
1772     }
1773   }
1774 
1775   return rc;
1776 }
1777 
1778 /*
1779 ** Delete a range of keys from the tree structure (i.e. the lsm_delete_range()
1780 ** function, not lsm_delete()).
1781 **
1782 ** This is a two step process:
1783 **
1784 **     1) Remove all entries currently stored in the tree that have keys
1785 **        that fall into the deleted range.
1786 **
1787 **        TODO: There are surely good ways to optimize this step - removing
1788 **        a range of keys from a b-tree. But for now, this function removes
1789 **        them one at a time using the usual approach.
1790 **
1791 **     2) Unless the largest key smaller than or equal to (pKey1/nKey1) is
1792 **        already marked as START_DELETE, insert a START_DELETE key.
1793 **        Similarly, unless the smallest key greater than or equal to
1794 **        (pKey2/nKey2) is already START_END, insert a START_END key.
1795 */
lsmTreeDelete(lsm_db * db,void * pKey1,int nKey1,void * pKey2,int nKey2)1796 int lsmTreeDelete(
1797   lsm_db *db,
1798   void *pKey1, int nKey1,         /* Start of range */
1799   void *pKey2, int nKey2          /* End of range */
1800 ){
1801   int rc = LSM_OK;
1802   int bDone = 0;
1803   TreeRoot *p = &db->treehdr.root;
1804   TreeBlob blob = {0, 0};
1805 
1806   /* The range must be sensible - that (key1 < key2). */
1807   assert( treeKeycmp(pKey1, nKey1, pKey2, nKey2)<0 );
1808   assert( assert_delete_ranges_match(db) );
1809 
1810 #if 0
1811   static int nCall = 0;
1812   printf("\n");
1813   nCall++;
1814   printf("%d delete %s .. %s\n", nCall, (char *)pKey1, (char *)pKey2);
1815   dump_tree_contents(db, "before delete");
1816 #endif
1817 
1818   /* Step 1. This loop runs until the tree contains no keys within the
1819   ** range being deleted. Or until an error occurs. */
1820   while( bDone==0 && rc==LSM_OK ){
1821     int res;
1822     TreeCursor csr;               /* Cursor to seek to first key in range */
1823     void *pDel; int nDel;         /* Key to (possibly) delete this iteration */
1824 #ifndef NDEBUG
1825     int nEntry = treeCountEntries(db);
1826 #endif
1827 
1828     /* Seek the cursor to the first entry in the tree greater than pKey1. */
1829     treeCursorInit(db, 0, &csr);
1830     lsmTreeCursorSeek(&csr, pKey1, nKey1, &res);
1831     if( res<=0 && lsmTreeCursorValid(&csr) ) lsmTreeCursorNext(&csr);
1832 
1833     /* If there is no such entry, or if it is greater than pKey2, then the
1834     ** tree now contains no keys in the range being deleted. In this case
1835     ** break out of the loop.  */
1836     bDone = 1;
1837     if( lsmTreeCursorValid(&csr) ){
1838       lsmTreeCursorKey(&csr, 0, &pDel, &nDel);
1839       if( treeKeycmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
1840     }
1841 
1842     if( bDone==0 ){
1843       if( (u32)csr.iNode==(p->nHeight-1) ){
1844         /* The element to delete already lies on a leaf node */
1845         rc = treeDeleteEntry(db, &csr, 0);
1846       }else{
1847         /* 1. Overwrite the current key with a copy of the next key in the
1848         **    tree (key N).
1849         **
1850         ** 2. Seek to key N (cursor will stop at the internal node copy of
1851         **    N). Move to the next key (original copy of N). Delete
1852         **    this entry.
1853         */
1854         u32 iKey;
1855         TreeKey *pKey;
1856         int iNode = csr.iNode;
1857         lsmTreeCursorNext(&csr);
1858         assert( (u32)csr.iNode==(p->nHeight-1) );
1859 
1860         iKey = csr.apTreeNode[csr.iNode]->aiKeyPtr[csr.aiCell[csr.iNode]];
1861         lsmTreeCursorPrev(&csr);
1862 
1863         treeOverwriteKey(db, &csr, iKey, &rc);
1864         pKey = treeShmkey(db, iKey, TKV_LOADKEY, &blob, &rc);
1865         if( pKey ){
1866           rc = lsmTreeCursorSeek(&csr, TKV_KEY(pKey), pKey->nKey, &res);
1867         }
1868         if( rc==LSM_OK ){
1869           assert( res==0 && csr.iNode==iNode );
1870           rc = lsmTreeCursorNext(&csr);
1871           if( rc==LSM_OK ){
1872             rc = treeDeleteEntry(db, &csr, 0);
1873           }
1874         }
1875       }
1876     }
1877 
1878     /* Clean up any memory allocated by the cursor. */
1879     tblobFree(db, &csr.blob);
1880 #if 0
1881     dump_tree_contents(db, "ddd delete");
1882 #endif
1883     assert( bDone || treeCountEntries(db)==(nEntry-1) );
1884   }
1885 
1886 #if 0
1887   dump_tree_contents(db, "during delete");
1888 #endif
1889 
1890   /* Now insert the START_DELETE and END_DELETE keys. */
1891   if( rc==LSM_OK ){
1892     rc = treeInsertEntry(db, LSM_START_DELETE, pKey1, nKey1, 0, -1);
1893   }
1894 #if 0
1895   dump_tree_contents(db, "during delete 2");
1896 #endif
1897   if( rc==LSM_OK ){
1898     rc = treeInsertEntry(db, LSM_END_DELETE, pKey2, nKey2, 0, -1);
1899   }
1900 
1901 #if 0
1902   dump_tree_contents(db, "after delete");
1903 #endif
1904 
1905   tblobFree(db, &blob);
1906   assert( assert_delete_ranges_match(db) );
1907   return rc;
1908 }
1909 
1910 /*
1911 ** Return, in bytes, the amount of memory currently used by the tree
1912 ** structure.
1913 */
lsmTreeSize(lsm_db * pDb)1914 int lsmTreeSize(lsm_db *pDb){
1915   return pDb->treehdr.root.nByte;
1916 }
1917 
1918 /*
1919 ** Open a cursor on the in-memory tree pTree.
1920 */
lsmTreeCursorNew(lsm_db * pDb,int bOld,TreeCursor ** ppCsr)1921 int lsmTreeCursorNew(lsm_db *pDb, int bOld, TreeCursor **ppCsr){
1922   TreeCursor *pCsr;
1923   *ppCsr = pCsr = lsmMalloc(pDb->pEnv, sizeof(TreeCursor));
1924   if( pCsr ){
1925     treeCursorInit(pDb, bOld, pCsr);
1926     return LSM_OK;
1927   }
1928   return LSM_NOMEM_BKPT;
1929 }
1930 
1931 /*
1932 ** Close an in-memory tree cursor.
1933 */
lsmTreeCursorDestroy(TreeCursor * pCsr)1934 void lsmTreeCursorDestroy(TreeCursor *pCsr){
1935   if( pCsr ){
1936     tblobFree(pCsr->pDb, &pCsr->blob);
1937     lsmFree(pCsr->pDb->pEnv, pCsr);
1938   }
1939 }
1940 
lsmTreeCursorReset(TreeCursor * pCsr)1941 void lsmTreeCursorReset(TreeCursor *pCsr){
1942   if( pCsr ){
1943     pCsr->iNode = -1;
1944     pCsr->pSave = 0;
1945   }
1946 }
1947 
1948 #ifndef NDEBUG
treeCsrCompare(TreeCursor * pCsr,void * pKey,int nKey,int * pRc)1949 static int treeCsrCompare(TreeCursor *pCsr, void *pKey, int nKey, int *pRc){
1950   TreeKey *p;
1951   int cmp = 0;
1952   assert( pCsr->iNode>=0 );
1953   p = csrGetKey(pCsr, &pCsr->blob, pRc);
1954   if( p ){
1955     cmp = treeKeycmp(TKV_KEY(p), p->nKey, pKey, nKey);
1956   }
1957   return cmp;
1958 }
1959 #endif
1960 
1961 
1962 /*
1963 ** Attempt to seek the cursor passed as the first argument to key (pKey/nKey)
1964 ** in the tree structure. If an exact match for the key is found, leave the
1965 ** cursor pointing to it and set *pRes to zero before returning. If an
1966 ** exact match cannot be found, do one of the following:
1967 **
1968 **   * Leave the cursor pointing to the smallest element in the tree that
1969 **     is larger than the key and set *pRes to +1, or
1970 **
1971 **   * Leave the cursor pointing to the largest element in the tree that
1972 **     is smaller than the key and set *pRes to -1, or
1973 **
1974 **   * If the tree is empty, leave the cursor at EOF and set *pRes to -1.
1975 */
lsmTreeCursorSeek(TreeCursor * pCsr,void * pKey,int nKey,int * pRes)1976 int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes){
1977   int rc = LSM_OK;                /* Return code */
1978   lsm_db *pDb = pCsr->pDb;
1979   TreeRoot *pRoot = pCsr->pRoot;
1980   u32 iNodePtr;                   /* Location of current node in search */
1981 
1982   /* Discard any saved position data */
1983   treeCursorRestore(pCsr, 0);
1984 
1985   iNodePtr = pRoot->iRoot;
1986   if( iNodePtr==0 ){
1987     /* Either an error occurred or the tree is completely empty. */
1988     assert( rc!=LSM_OK || pRoot->iRoot==0 );
1989     *pRes = -1;
1990     pCsr->iNode = -1;
1991   }else{
1992     TreeBlob b = {0, 0};
1993     int res = 0;                  /* Result of comparison function */
1994     int iNode = -1;
1995     while( iNodePtr ){
1996       TreeNode *pNode;            /* Node at location iNodePtr */
1997       int iTest;                  /* Index of second key to test (0 or 2) */
1998       u32 iTreeKey;
1999       TreeKey *pTreeKey;          /* Key to compare against */
2000 
2001       pNode = (TreeNode *)treeShmptrUnsafe(pDb, iNodePtr);
2002       iNode++;
2003       pCsr->apTreeNode[iNode] = pNode;
2004 
2005       /* Compare (pKey/nKey) with the key in the middle slot of B-tree node
2006       ** pNode. The middle slot is never empty. If the comparison is a match,
2007       ** then the search is finished. Break out of the loop. */
2008       pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, pNode->aiKeyPtr[1]);
2009       if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
2010         pTreeKey = treeShmkey(pDb, pNode->aiKeyPtr[1], TKV_LOADKEY, &b, &rc);
2011         if( rc!=LSM_OK ) break;
2012       }
2013       res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
2014       if( res==0 ){
2015         pCsr->aiCell[iNode] = 1;
2016         break;
2017       }
2018 
2019       /* Based on the results of the previous comparison, compare (pKey/nKey)
2020       ** to either the left or right key of the B-tree node, if such a key
2021       ** exists. */
2022       iTest = (res>0 ? 0 : 2);
2023       iTreeKey = pNode->aiKeyPtr[iTest];
2024       if( iTreeKey ){
2025         pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, iTreeKey);
2026         if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
2027           pTreeKey = treeShmkey(pDb, iTreeKey, TKV_LOADKEY, &b, &rc);
2028           if( rc ) break;
2029         }
2030         res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
2031         if( res==0 ){
2032           pCsr->aiCell[iNode] = (u8)iTest;
2033           break;
2034         }
2035       }else{
2036         iTest = 1;
2037       }
2038 
2039       if( (u32)iNode<(pRoot->nHeight-1) ){
2040         iNodePtr = getChildPtr(pNode, pRoot->iTransId, iTest + (res<0));
2041       }else{
2042         iNodePtr = 0;
2043       }
2044       pCsr->aiCell[iNode] = (u8)(iTest + (iNodePtr && (res<0)));
2045     }
2046 
2047     *pRes = res;
2048     pCsr->iNode = iNode;
2049     tblobFree(pDb, &b);
2050   }
2051 
2052   /* assert() that *pRes has been set properly */
2053 #ifndef NDEBUG
2054   if( rc==LSM_OK && lsmTreeCursorValid(pCsr) ){
2055     int cmp = treeCsrCompare(pCsr, pKey, nKey, &rc);
2056     assert( rc!=LSM_OK || *pRes==cmp || (*pRes ^ cmp)>0 );
2057   }
2058 #endif
2059 
2060   return rc;
2061 }
2062 
lsmTreeCursorNext(TreeCursor * pCsr)2063 int lsmTreeCursorNext(TreeCursor *pCsr){
2064 #ifndef NDEBUG
2065   TreeKey *pK1;
2066   TreeBlob key1 = {0, 0};
2067 #endif
2068   lsm_db *pDb = pCsr->pDb;
2069   TreeRoot *pRoot = pCsr->pRoot;
2070   const int iLeaf = pRoot->nHeight-1;
2071   int iCell;
2072   int rc = LSM_OK;
2073   TreeNode *pNode;
2074 
2075   /* Restore the cursor position, if required */
2076   int iRestore = 0;
2077   treeCursorRestore(pCsr, &iRestore);
2078   if( iRestore>0 ) return LSM_OK;
2079 
2080   /* Save a pointer to the current key. This is used in an assert() at the
2081   ** end of this function - to check that the 'next' key really is larger
2082   ** than the current key. */
2083 #ifndef NDEBUG
2084   pK1 = csrGetKey(pCsr, &key1, &rc);
2085   if( rc!=LSM_OK ) return rc;
2086 #endif
2087 
2088   assert( lsmTreeCursorValid(pCsr) );
2089   assert( pCsr->aiCell[pCsr->iNode]<3 );
2090 
2091   pNode = pCsr->apTreeNode[pCsr->iNode];
2092   iCell = ++pCsr->aiCell[pCsr->iNode];
2093 
2094   /* If the current node is not a leaf, and the current cell has sub-tree
2095   ** associated with it, descend to the left-most key on the left-most
2096   ** leaf of the sub-tree.  */
2097   if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
2098     do {
2099       u32 iNodePtr;
2100       pCsr->iNode++;
2101       iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
2102       pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
2103       pCsr->apTreeNode[pCsr->iNode] = pNode;
2104       iCell = pCsr->aiCell[pCsr->iNode] = (pNode->aiKeyPtr[0]==0);
2105     }while( pCsr->iNode < iLeaf );
2106   }
2107 
2108   /* Otherwise, the next key is found by following pointer up the tree
2109   ** until there is a key immediately to the right of the pointer followed
2110   ** to reach the sub-tree containing the current key. */
2111   else if( iCell>=3 || pNode->aiKeyPtr[iCell]==0 ){
2112     while( (--pCsr->iNode)>=0 ){
2113       iCell = pCsr->aiCell[pCsr->iNode];
2114       if( iCell<3 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
2115     }
2116   }
2117 
2118 #ifndef NDEBUG
2119   if( pCsr->iNode>=0 ){
2120     TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
2121     assert( rc||treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)>=0 );
2122   }
2123   tblobFree(pDb, &key1);
2124 #endif
2125 
2126   return rc;
2127 }
2128 
lsmTreeCursorPrev(TreeCursor * pCsr)2129 int lsmTreeCursorPrev(TreeCursor *pCsr){
2130 #ifndef NDEBUG
2131   TreeKey *pK1;
2132   TreeBlob key1 = {0, 0};
2133 #endif
2134   lsm_db *pDb = pCsr->pDb;
2135   TreeRoot *pRoot = pCsr->pRoot;
2136   const int iLeaf = pRoot->nHeight-1;
2137   int iCell;
2138   int rc = LSM_OK;
2139   TreeNode *pNode;
2140 
2141   /* Restore the cursor position, if required */
2142   int iRestore = 0;
2143   treeCursorRestore(pCsr, &iRestore);
2144   if( iRestore<0 ) return LSM_OK;
2145 
2146   /* Save a pointer to the current key. This is used in an assert() at the
2147   ** end of this function - to check that the 'next' key really is smaller
2148   ** than the current key. */
2149 #ifndef NDEBUG
2150   pK1 = csrGetKey(pCsr, &key1, &rc);
2151   if( rc!=LSM_OK ) return rc;
2152 #endif
2153 
2154   assert( lsmTreeCursorValid(pCsr) );
2155   pNode = pCsr->apTreeNode[pCsr->iNode];
2156   iCell = pCsr->aiCell[pCsr->iNode];
2157   assert( iCell>=0 && iCell<3 );
2158 
2159   /* If the current node is not a leaf, and the current cell has sub-tree
2160   ** associated with it, descend to the right-most key on the right-most
2161   ** leaf of the sub-tree.  */
2162   if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
2163     do {
2164       u32 iNodePtr;
2165       pCsr->iNode++;
2166       iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
2167       pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
2168       if( rc!=LSM_OK ) break;
2169       pCsr->apTreeNode[pCsr->iNode] = pNode;
2170       iCell = 1 + (pNode->aiKeyPtr[2]!=0) + (pCsr->iNode < iLeaf);
2171       pCsr->aiCell[pCsr->iNode] = (u8)iCell;
2172     }while( pCsr->iNode < iLeaf );
2173   }
2174 
2175   /* Otherwise, the next key is found by following pointer up the tree until
2176   ** there is a key immediately to the left of the pointer followed to reach
2177   ** the sub-tree containing the current key. */
2178   else{
2179     do {
2180       iCell = pCsr->aiCell[pCsr->iNode]-1;
2181       if( iCell>=0 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
2182     }while( (--pCsr->iNode)>=0 );
2183     pCsr->aiCell[pCsr->iNode] = (u8)iCell;
2184   }
2185 
2186 #ifndef NDEBUG
2187   if( pCsr->iNode>=0 ){
2188     TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
2189     assert( rc || treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)<0 );
2190   }
2191   tblobFree(pDb, &key1);
2192 #endif
2193 
2194   return rc;
2195 }
2196 
2197 /*
2198 ** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the
2199 ** in-memory tree.
2200 */
lsmTreeCursorEnd(TreeCursor * pCsr,int bLast)2201 int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){
2202   lsm_db *pDb = pCsr->pDb;
2203   TreeRoot *pRoot = pCsr->pRoot;
2204   int rc = LSM_OK;
2205 
2206   u32 iNodePtr;
2207   pCsr->iNode = -1;
2208 
2209   /* Discard any saved position data */
2210   treeCursorRestore(pCsr, 0);
2211 
2212   iNodePtr = pRoot->iRoot;
2213   while( iNodePtr ){
2214     int iCell;
2215     TreeNode *pNode;
2216 
2217     pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
2218     if( rc ) break;
2219 
2220     if( bLast ){
2221       iCell = ((pNode->aiKeyPtr[2]==0) ? 2 : 3);
2222     }else{
2223       iCell = ((pNode->aiKeyPtr[0]==0) ? 1 : 0);
2224     }
2225     pCsr->iNode++;
2226     pCsr->apTreeNode[pCsr->iNode] = pNode;
2227 
2228     if( (u32)pCsr->iNode<pRoot->nHeight-1 ){
2229       iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
2230     }else{
2231       iNodePtr = 0;
2232     }
2233     pCsr->aiCell[pCsr->iNode] = (u8)(iCell - (iNodePtr==0 && bLast));
2234   }
2235 
2236   return rc;
2237 }
2238 
lsmTreeCursorFlags(TreeCursor * pCsr)2239 int lsmTreeCursorFlags(TreeCursor *pCsr){
2240   int flags = 0;
2241   if( pCsr && pCsr->iNode>=0 ){
2242     int rc = LSM_OK;
2243     TreeKey *pKey = (TreeKey *)treeShmptrUnsafe(pCsr->pDb,
2244         pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]]
2245     );
2246     assert( rc==LSM_OK );
2247     flags = (pKey->flags & ~LSM_CONTIGUOUS);
2248   }
2249   return flags;
2250 }
2251 
lsmTreeCursorKey(TreeCursor * pCsr,int * pFlags,void ** ppKey,int * pnKey)2252 int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey){
2253   TreeKey *pTreeKey;
2254   int rc = LSM_OK;
2255 
2256   assert( lsmTreeCursorValid(pCsr) );
2257 
2258   pTreeKey = pCsr->pSave;
2259   if( !pTreeKey ){
2260     pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
2261   }
2262   if( rc==LSM_OK ){
2263     *pnKey = pTreeKey->nKey;
2264     if( pFlags ) *pFlags = pTreeKey->flags;
2265     *ppKey = (void *)&pTreeKey[1];
2266   }
2267 
2268   return rc;
2269 }
2270 
lsmTreeCursorValue(TreeCursor * pCsr,void ** ppVal,int * pnVal)2271 int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal){
2272   int res = 0;
2273   int rc;
2274 
2275   rc = treeCursorRestore(pCsr, &res);
2276   if( res==0 ){
2277     TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
2278     if( rc==LSM_OK ){
2279       if( pTreeKey->flags & LSM_INSERT ){
2280         *pnVal = pTreeKey->nValue;
2281         *ppVal = TKV_VAL(pTreeKey);
2282       }else{
2283         *ppVal = 0;
2284         *pnVal = -1;
2285       }
2286     }
2287   }else{
2288     *ppVal = 0;
2289     *pnVal = 0;
2290   }
2291 
2292   return rc;
2293 }
2294 
2295 /*
2296 ** Return true if the cursor currently points to a valid entry.
2297 */
lsmTreeCursorValid(TreeCursor * pCsr)2298 int lsmTreeCursorValid(TreeCursor *pCsr){
2299   return (pCsr && (pCsr->pSave || pCsr->iNode>=0));
2300 }
2301 
2302 /*
2303 ** Store a mark in *pMark. Later on, a call to lsmTreeRollback() with a
2304 ** pointer to the same TreeMark structure may be used to roll the tree
2305 ** contents back to their current state.
2306 */
lsmTreeMark(lsm_db * pDb,TreeMark * pMark)2307 void lsmTreeMark(lsm_db *pDb, TreeMark *pMark){
2308   pMark->iRoot = pDb->treehdr.root.iRoot;
2309   pMark->nHeight = pDb->treehdr.root.nHeight;
2310   pMark->iWrite = pDb->treehdr.iWrite;
2311   pMark->nChunk = pDb->treehdr.nChunk;
2312   pMark->iNextShmid = pDb->treehdr.iNextShmid;
2313   pMark->iRollback = intArraySize(&pDb->rollback);
2314 }
2315 
2316 /*
2317 ** Roll back to mark pMark. Structure *pMark should have been previously
2318 ** populated by a call to lsmTreeMark().
2319 */
lsmTreeRollback(lsm_db * pDb,TreeMark * pMark)2320 void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark){
2321   int iIdx;
2322   int nIdx;
2323   u32 iNext;
2324   ShmChunk *pChunk;
2325   u32 iChunk;
2326   u32 iShmid;
2327 
2328   /* Revert all required v2 pointers. */
2329   nIdx = intArraySize(&pDb->rollback);
2330   for(iIdx = pMark->iRollback; iIdx<nIdx; iIdx++){
2331     TreeNode *pNode;
2332     pNode = treeShmptr(pDb, intArrayEntry(&pDb->rollback, iIdx));
2333     assert( pNode );
2334     pNode->iV2 = 0;
2335     pNode->iV2Child = 0;
2336     pNode->iV2Ptr = 0;
2337   }
2338   intArrayTruncate(&pDb->rollback, pMark->iRollback);
2339 
2340   /* Restore the free-chunk list. */
2341   assert( pMark->iWrite!=0 );
2342   iChunk = treeOffsetToChunk(pMark->iWrite-1);
2343   pChunk = treeShmChunk(pDb, iChunk);
2344   iNext = pChunk->iNext;
2345   pChunk->iNext = 0;
2346 
2347   pChunk = treeShmChunk(pDb, pDb->treehdr.iFirst);
2348   iShmid = pChunk->iShmid-1;
2349 
2350   while( iNext ){
2351     u32 iFree = iNext;            /* Current chunk being rollback-freed */
2352     ShmChunk *pFree;              /* Pointer to chunk iFree */
2353 
2354     pFree = treeShmChunk(pDb, iFree);
2355     iNext = pFree->iNext;
2356 
2357     if( iFree<pMark->nChunk ){
2358       pFree->iNext = pDb->treehdr.iFirst;
2359       pFree->iShmid = iShmid--;
2360       pDb->treehdr.iFirst = iFree;
2361     }
2362   }
2363 
2364   /* Restore the tree-header fields */
2365   pDb->treehdr.root.iRoot = pMark->iRoot;
2366   pDb->treehdr.root.nHeight = pMark->nHeight;
2367   pDb->treehdr.iWrite = pMark->iWrite;
2368   pDb->treehdr.nChunk = pMark->nChunk;
2369   pDb->treehdr.iNextShmid = pMark->iNextShmid;
2370 }
2371 
2372 /*
2373 ** Load the in-memory tree header from shared-memory into pDb->treehdr.
2374 ** If the header cannot be loaded, return LSM_PROTOCOL.
2375 **
2376 ** If the header is successfully loaded and parameter piRead is not NULL,
2377 ** is is set to 1 if the header was loaded from ShmHeader.hdr1, or 2 if
2378 ** the header was loaded from ShmHeader.hdr2.
2379 */
lsmTreeLoadHeader(lsm_db * pDb,int * piRead)2380 int lsmTreeLoadHeader(lsm_db *pDb, int *piRead){
2381   int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL;
2382   while( (nRem--)>0 ){
2383     ShmHeader *pShm = pDb->pShmhdr;
2384 
2385     memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
2386     if( treeHeaderChecksumOk(&pDb->treehdr) ){
2387       if( piRead ) *piRead = 1;
2388       return LSM_OK;
2389     }
2390     memcpy(&pDb->treehdr, &pShm->hdr2, sizeof(TreeHeader));
2391     if( treeHeaderChecksumOk(&pDb->treehdr) ){
2392       if( piRead ) *piRead = 2;
2393       return LSM_OK;
2394     }
2395 
2396     lsmShmBarrier(pDb);
2397   }
2398   return LSM_PROTOCOL_BKPT;
2399 }
2400 
lsmTreeLoadHeaderOk(lsm_db * pDb,int iRead)2401 int lsmTreeLoadHeaderOk(lsm_db *pDb, int iRead){
2402   TreeHeader *p = (iRead==1) ? &pDb->pShmhdr->hdr1 : &pDb->pShmhdr->hdr2;
2403   assert( iRead==1 || iRead==2 );
2404   return (0==memcmp(pDb->treehdr.aCksum, p->aCksum, sizeof(u32)*2));
2405 }
2406 
2407 /*
2408 ** This function is called to conclude a transaction. If argument bCommit
2409 ** is true, the transaction is committed. Otherwise it is rolled back.
2410 */
lsmTreeEndTransaction(lsm_db * pDb,int bCommit)2411 int lsmTreeEndTransaction(lsm_db *pDb, int bCommit){
2412   ShmHeader *pShm = pDb->pShmhdr;
2413 
2414   treeHeaderChecksum(&pDb->treehdr, pDb->treehdr.aCksum);
2415   memcpy(&pShm->hdr2, &pDb->treehdr, sizeof(TreeHeader));
2416   lsmShmBarrier(pDb);
2417   memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader));
2418   pShm->bWriter = 0;
2419   intArrayFree(pDb->pEnv, &pDb->rollback);
2420 
2421   return LSM_OK;
2422 }
2423 
2424 #ifndef NDEBUG
assert_delete_ranges_match(lsm_db * db)2425 static int assert_delete_ranges_match(lsm_db *db){
2426   int prev = 0;
2427   TreeBlob blob = {0, 0};
2428   TreeCursor csr;               /* Cursor used to iterate through tree */
2429   int rc;
2430 
2431   treeCursorInit(db, 0, &csr);
2432   for( rc = lsmTreeCursorEnd(&csr, 0);
2433        rc==LSM_OK && lsmTreeCursorValid(&csr);
2434        rc = lsmTreeCursorNext(&csr)
2435   ){
2436     TreeKey *pKey = csrGetKey(&csr, &blob, &rc);
2437     if( rc!=LSM_OK ) break;
2438     assert( ((prev&LSM_START_DELETE)==0)==((pKey->flags&LSM_END_DELETE)==0) );
2439     prev = pKey->flags;
2440   }
2441 
2442   tblobFree(csr.pDb, &csr.blob);
2443   tblobFree(csr.pDb, &blob);
2444 
2445   return 1;
2446 }
2447 
treeCountEntries(lsm_db * db)2448 static int treeCountEntries(lsm_db *db){
2449   TreeCursor csr;               /* Cursor used to iterate through tree */
2450   int rc;
2451   int nEntry = 0;
2452 
2453   treeCursorInit(db, 0, &csr);
2454   for( rc = lsmTreeCursorEnd(&csr, 0);
2455        rc==LSM_OK && lsmTreeCursorValid(&csr);
2456        rc = lsmTreeCursorNext(&csr)
2457   ){
2458     nEntry++;
2459   }
2460 
2461   tblobFree(csr.pDb, &csr.blob);
2462 
2463   return nEntry;
2464 }
2465 #endif
2466