xref: /sqlite-3.40.0/ext/lsm1/lsm_shared.c (revision a3bc8425)
1 /*
2 ** 2012-01-23
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 **
13 ** Utilities used to help multiple LSM clients to coexist within the
14 ** same process space.
15 */
16 #include "lsmInt.h"
17 
18 /*
19 ** Global data. All global variables used by code in this file are grouped
20 ** into the following structure instance.
21 **
22 ** pDatabase:
23 **   Linked list of all Database objects allocated within this process.
24 **   This list may not be traversed without holding the global mutex (see
25 **   functions enterGlobalMutex() and leaveGlobalMutex()).
26 */
static struct SharedData {
  Database *pDatabase;            /* Linked list of all Database objects */
} gShared;                        /* The single process-wide instance */
30 
31 /*
32 ** Database structure. There is one such structure for each distinct
33 ** database accessed by this process. They are stored in the singly linked
34 ** list starting at global variable gShared.pDatabase. Database objects are
35 ** reference counted. Once the number of connections to the associated
36 ** database drops to zero, they are removed from the linked list and deleted.
37 **
38 ** pFile:
39 **   In multi-process mode, this file descriptor is used to obtain locks
40 **   and to access shared-memory. In single process mode, its only job is
41 **   to hold the exclusive lock on the file.
42 **
43 */
struct Database {
  /* Protected by the global mutex (enterGlobalMutex/leaveGlobalMutex): */
  char *zName;                    /* Canonical path to database file */
  int nName;                      /* strlen(zName) */
  int nDbRef;                     /* Number of associated lsm_db handles */
  Database *pDbNext;              /* Next Database structure in global list */

  /* Protected by the local mutex (pClientMutex) */
  int bReadonly;                  /* True if Database.pFile is read-only */
  int bMultiProc;                 /* True if running in multi-process mode */
  lsm_file *pFile;                /* Used for locks/shm in multi-proc mode */
  LsmFile *pLsmFile;              /* List of deferred closes */
  lsm_mutex *pClientMutex;        /* Protects the apShmChunk[] and pConn */
  int nShmChunk;                  /* Number of entries in apShmChunk[] array */
  void **apShmChunk;              /* Array of "shared" memory regions */
  lsm_db *pConn;                  /* List of connections to this db. */
};
61 
62 /*
63 ** Functions to enter and leave the global mutex. This mutex is used
64 ** to protect the global linked-list headed at gShared.pDatabase.
65 */
/*
** Obtain the static global mutex and, if that succeeds, enter it.
** Returns LSM_OK on success or an error code from lsmMutexStatic().
*/
static int enterGlobalMutex(lsm_env *pEnv){
  lsm_mutex *pMutex;
  int rc;

  rc = lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &pMutex);
  if( rc==LSM_OK ){
    lsmMutexEnter(pEnv, pMutex);
  }
  return rc;
}
/*
** Release the global mutex previously obtained by enterGlobalMutex().
*/
static void leaveGlobalMutex(lsm_env *pEnv){
  lsm_mutex *pMutex;

  lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &pMutex);
  lsmMutexLeave(pEnv, pMutex);
}
77 
#ifdef LSM_DEBUG
/*
** Return true if the caller currently holds the global mutex. Debug
** builds only - used in assert() statements (e.g. freeDatabase()).
*/
static int holdingGlobalMutex(lsm_env *pEnv){
  lsm_mutex *p;
  lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &p);
  return lsmMutexHeld(pEnv, p);
}
#endif
85 
#if 0
/*
** Debug helper: assert that block iBlk does not appear in free-list p.
** Currently compiled out; the macro below replaces calls with a no-op.
*/
static void assertNotInFreelist(Freelist *p, int iBlk){
  int i;
  for(i=0; i<p->nEntry; i++){
    assert( p->aEntry[i].iBlk!=iBlk );
  }
}
#else
# define assertNotInFreelist(x,y)
#endif
96 
97 /*
98 ** Append an entry to the free-list. If (iId==-1), this is a delete.
99 */
/*
** Append an entry to the free-list. If (iId==-1), this is a delete.
**
** The free-list is kept sorted in ascending order of block number. If an
** entry for block iBlk already exists, its snapshot id is overwritten.
** Returns LSM_OK, or LSM_NOMEM_BKPT if growing the array fails.
*/
int freelistAppend(lsm_db *db, u32 iBlk, i64 iId){
  lsm_env *pEnv = db->pEnv;
  Freelist *pList;                /* Free-list to update */
  int iIns;                       /* Index at which to insert or overwrite */

  assert( iId==-1 || iId>=0 );
  pList = db->bUseFreelist ? db->pFreelist : &db->pWorker->freelist;

  /* If the aEntry[] array is full, double its size (minimum 4 slots). */
  assert( pList->nAlloc>=pList->nEntry );
  if( pList->nEntry==pList->nAlloc ){
    FreelistEntry *aGrown;
    int nGrown = (pList->nAlloc==0) ? 4 : pList->nAlloc*2;
    int nGrownByte = sizeof(FreelistEntry) * nGrown;

    aGrown = (FreelistEntry *)lsmRealloc(pEnv, pList->aEntry, nGrownByte);
    if( aGrown==0 ) return LSM_NOMEM_BKPT;
    pList->aEntry = aGrown;
    pList->nAlloc = nGrown;
  }

  /* Locate the first entry with a block number >= iBlk. */
  for(iIns=0; iIns<pList->nEntry; iIns++){
    assert( iIns==0 || pList->aEntry[iIns].iBlk > pList->aEntry[iIns-1].iBlk );
    if( pList->aEntry[iIns].iBlk>=iBlk ) break;
  }

  if( iIns<pList->nEntry && pList->aEntry[iIns].iBlk==iBlk ){
    /* Clobber the existing entry for this block */
    pList->aEntry[iIns].iId = iId;
  }else{
    /* Shift the tail of the array up one slot and insert the new entry */
    memmove(&pList->aEntry[iIns+1], &pList->aEntry[iIns],
            sizeof(FreelistEntry)*(pList->nEntry-iIns));
    pList->aEntry[iIns].iBlk = iBlk;
    pList->aEntry[iIns].iId = iId;
    pList->nEntry++;
  }

  return LSM_OK;
}
142 
143 /*
144 ** This function frees all resources held by the Database structure passed
145 ** as the only argument.
146 */
/*
** Free all resources held by Database object p: its client mutex, the
** shared file descriptor (if open), the shm-chunk pointer array and the
** structure itself. The caller must hold the global mutex. A NULL p is
** a no-op.
*/
static void freeDatabase(lsm_env *pEnv, Database *p){
  assert( holdingGlobalMutex(pEnv) );
  if( p==0 ) return;

  /* Mutex, then file handle, then heap memory. */
  lsmMutexDel(pEnv, p->pClientMutex);
  if( p->pFile ) lsmEnvClose(pEnv, p->pFile);
  lsmFree(pEnv, p->apShmChunk);
  lsmFree(pEnv, p);
}
164 
typedef struct DbTruncateCtx DbTruncateCtx;
struct DbTruncateCtx {
  int nBlock;                     /* In/out: number of blocks to keep */
  i64 iInUse;                     /* Snapshot id that must remain usable (-1 for none) */
};
170 
/*
** lsmWalkFreelist() callback used while scanning the free-list in reverse.
** While the visited block is the current last block of the file and is
** not pinned by snapshot iInUse, shrink nBlock by one and keep scanning.
** Return non-zero to stop the walk as soon as either condition fails.
*/
static int dbTruncateCb(void *pCtx, int iBlk, i64 iSnapshot){
  DbTruncateCtx *pTC = (DbTruncateCtx *)pCtx;
  if( iBlk==pTC->nBlock && (pTC->iInUse<0 || iSnapshot<pTC->iInUse) ){
    pTC->nBlock--;
    return 0;
  }
  return 1;
}
177 
/*
** NOTE(review): the body of this function is compiled out (#if 0), so it
** currently always returns LSM_OK and has no side effects. The disabled
** code trimmed trailing free blocks (those not pinned by snapshot iInUse)
** from the worker snapshot's block count.
*/
static int dbTruncate(lsm_db *pDb, i64 iInUse){
  int rc = LSM_OK;
#if 0
  int i;
  DbTruncateCtx ctx;

  assert( pDb->pWorker );
  ctx.nBlock = pDb->pWorker->nBlock;
  ctx.iInUse = iInUse;

  rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
  for(i=ctx.nBlock+1; rc==LSM_OK && i<=pDb->pWorker->nBlock; i++){
    rc = freelistAppend(pDb, i, -1);
  }

  if( rc==LSM_OK ){
#ifdef LSM_LOG_FREELIST
    if( ctx.nBlock!=pDb->pWorker->nBlock ){
      lsmLogMessage(pDb, 0,
          "dbTruncate(): truncated db to %d blocks",ctx.nBlock
      );
    }
#endif
    pDb->pWorker->nBlock = ctx.nBlock;
  }
#endif
  return rc;
}
206 
207 
208 /*
209 ** This function is called during database shutdown (when the number of
210 ** connections drops from one to zero). It truncates the database file
211 ** to as small a size as possible without truncating away any blocks that
212 ** contain data.
213 */
static int dbTruncateFile(lsm_db *pDb){
  int rc;

  /* Caller must hold DMS1 exclusively and must not already have a
  ** worker snapshot loaded - one is loaded (and freed) locally here. */
  assert( pDb->pWorker==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL) );
  rc = lsmCheckpointLoadWorker(pDb);

  if( rc==LSM_OK ){
    DbTruncateCtx ctx;

    /* Walk the database free-block-list in reverse order. Set ctx.nBlock
    ** to the block number of the last block in the database that actually
    ** contains data. */
    ctx.nBlock = pDb->pWorker->nBlock;
    ctx.iInUse = -1;
    rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);

    /* If the last block that contains data is not already the last block in
    ** the database file, truncate the database file so that it is. */
    if( rc==LSM_OK ){
      rc = lsmFsTruncateDb(
          pDb->pFS, (i64)ctx.nBlock*lsmFsBlockSize(pDb->pFS)
      );
    }
  }

  /* Release the locally loaded worker snapshot in all cases. */
  lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
  pDb->pWorker = 0;
  return rc;
}
244 
/*
** Disconnect database handle pDb from the shared system. If pDb is a
** read-only connection, this just drops its DMS3 lock. Otherwise, under
** an exclusive DMS1 lock, it detects whether this is the last read-write
** connection and, if so, flushes the in-memory tree, writes a checkpoint
** and (when no read-only clients exist) deletes the log and truncates
** the database file. Always clears pDb->pShmhdr before returning.
*/
static void doDbDisconnect(lsm_db *pDb){
  int rc;

  if( pDb->bReadonly ){
    lsmShmLock(pDb, LSM_LOCK_DMS3, LSM_LOCK_UNLOCK, 0);
  }else{
    /* Block for an exclusive lock on DMS1. This lock serializes all calls
    ** to doDbConnect() and doDbDisconnect() across all processes.  */
    rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
    if( rc==LSM_OK ){

      lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);

      /* Try an exclusive lock on DMS2. If successful, this is the last
      ** connection to the database. In this case flush the contents of the
      ** in-memory tree to disk and write a checkpoint.  */
      rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 1, LSM_LOCK_EXCL);
      if( rc==LSM_OK ){
        rc = lsmShmTestLock(pDb, LSM_LOCK_CHECKPOINTER, 1, LSM_LOCK_EXCL);
      }
      if( rc==LSM_OK ){
        int bReadonly = 0;        /* True if there exist read-only conns. */

        /* Flush the in-memory tree, if required. If there is data to flush,
        ** this will create a new client snapshot in Database.pClient. The
        ** checkpoint (serialization) of this snapshot may be written to disk
        ** by the following block.
        **
        ** There is no need to take a WRITER lock here. That there are no
        ** other locks on DMS2 guarantees that there are no other read-write
        ** connections at this time (and the lock on DMS1 guarantees that
        ** no new ones may appear).
        */
        rc = lsmTreeLoadHeader(pDb, 0);
        if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){
          rc = lsmFlushTreeToDisk(pDb);
        }

        /* Now check if there are any read-only connections. If there are,
        ** then do not truncate the db file or unlink the shared-memory
        ** region.  */
        if( rc==LSM_OK ){
          rc = lsmShmTestLock(pDb, LSM_LOCK_DMS3, 1, LSM_LOCK_EXCL);
          if( rc==LSM_BUSY ){
            bReadonly = 1;
            rc = LSM_OK;
          }
        }

        /* Write a checkpoint to disk. */
        if( rc==LSM_OK ){
          rc = lsmCheckpointWrite(pDb, 0);
        }

        /* If the checkpoint was written successfully, delete the log file
        ** and, if possible, truncate the database file.  */
        if( rc==LSM_OK ){
          int bRotrans = 0;
          Database *p = pDb->pDatabase;

          /* The log file may only be deleted if there are no clients
          ** read-only clients running rotrans transactions.  */
          rc = lsmDetectRoTrans(pDb, &bRotrans);
          if( rc==LSM_OK && bRotrans==0 ){
            lsmFsCloseAndDeleteLog(pDb->pFS);
          }

          /* The database may only be truncated if there exist no read-only
          ** clients - either connected or running rotrans transactions. */
          if( bReadonly==0 && bRotrans==0 ){
            lsmFsUnmap(pDb->pFS);
            dbTruncateFile(pDb);
            if( p->pFile && p->bMultiProc ){
              lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
            }
          }
        }
      }
    }

    /* Release this connection's RWCLIENT slot lock, if it holds one. */
    if( pDb->iRwclient>=0 ){
      lsmShmLock(pDb, LSM_LOCK_RWCLIENT(pDb->iRwclient), LSM_LOCK_UNLOCK, 0);
      pDb->iRwclient = -1;
    }

    lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
  }
  pDb->pShmhdr = 0;
}
334 
/*
** Connect the read-write database handle pDb to the system: map the
** shared-memory header, run recovery if this is the first connection,
** take a shared lock on DMS2 and attempt to claim one of the
** LSM_LOCK_RWCLIENT() slot locks. Returns LSM_OK or an error code.
*/
static int doDbConnect(lsm_db *pDb){
  const int nUsMax = 100000;      /* Max value for nUs */
  int nUs = 1000;                 /* us to wait between DMS1 attempts */
  int rc;

  /* Obtain a pointer to the shared-memory header */
  assert( pDb->pShmhdr==0 );
  assert( pDb->bReadonly==0 );

  /* Block for an exclusive lock on DMS1. This lock serializes all calls
  ** to doDbConnect() and doDbDisconnect() across all processes. Retry on
  ** LSM_BUSY with exponential backoff, capped at nUsMax microseconds. */
  while( 1 ){
    rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
    if( rc!=LSM_BUSY ) break;
    lsmEnvSleep(pDb->pEnv, nUs);
    nUs = nUs * 2;
    if( nUs>nUsMax ) nUs = nUsMax;
  }
  if( rc==LSM_OK ){
    rc = lsmShmCacheChunks(pDb, 1);
  }
  if( rc!=LSM_OK ) return rc;
  pDb->pShmhdr = (ShmHeader *)pDb->apShm[0];

  /* Try an exclusive lock on DMS2/DMS3. If successful, this is the first
  ** and only connection to the database. In this case initialize the
  ** shared-memory and run log file recovery.  */
  assert( LSM_LOCK_DMS3==1+LSM_LOCK_DMS2 );
  rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 2, LSM_LOCK_EXCL);
  if( rc==LSM_OK ){
    memset(pDb->pShmhdr, 0, sizeof(ShmHeader));
    rc = lsmCheckpointRecover(pDb);
    if( rc==LSM_OK ){
      rc = lsmLogRecover(pDb);
    }
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      pShm->aReader[0].iLsmId = lsmCheckpointId(pShm->aSnap1, 0);
      pShm->aReader[0].iTreeId = pDb->treehdr.iUsedShmid;
    }
  }else if( rc==LSM_BUSY ){
    /* Other connections exist - no recovery required. */
    rc = LSM_OK;
  }

  /* Take a shared lock on DMS2. In multi-process mode this lock "cannot"
  ** fail, as connections may only hold an exclusive lock on DMS2 if they
  ** first hold an exclusive lock on DMS1. And this connection is currently
  ** holding the exclusive lock on DSM1.
  **
  ** However, if some other connection has the database open in single-process
  ** mode, this operation will fail. In this case, return the error to the
  ** caller - the attempt to connect to the db has failed.
  */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_SHARED, 0);
  }

  /* If anything went wrong, unlock DMS2. Otherwise, try to take an exclusive
  ** lock on one of the LSM_LOCK_RWCLIENT() locks. Unlock DMS1 in any case. */
  if( rc!=LSM_OK ){
    pDb->pShmhdr = 0;
  }else{
    int i;
    for(i=0; i<LSM_LOCK_NRWCLIENT; i++){
      int rc2 = lsmShmLock(pDb, LSM_LOCK_RWCLIENT(i), LSM_LOCK_EXCL, 0);
      if( rc2==LSM_OK ) pDb->iRwclient = i;
      if( rc2!=LSM_BUSY ){
        /* Slot claimed (LSM_OK) or a real error - stop searching. */
        rc = rc2;
        break;
      }
    }
  }
  lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);

  return rc;
}
411 
/*
** Open the shared file descriptor for Database object p (path p->zName).
** If the read-write open fails with LSM_IOERR and bRoOk is true, retry
** read-only and mark the Database object accordingly. Returns LSM_OK or
** an error code from the final open attempt.
*/
static int dbOpenSharedFd(lsm_env *pEnv, Database *p, int bRoOk){
  int rc = lsmEnvOpen(pEnv, p->zName, 0, &p->pFile);
  if( bRoOk && rc==LSM_IOERR ){
    rc = lsmEnvOpen(pEnv, p->zName, LSM_OPEN_READONLY, &p->pFile);
    p->bReadonly = 1;
  }
  return rc;
}
423 
424 /*
425 ** Return a reference to the shared Database handle for the database
426 ** identified by canonical path zName. If this is the first connection to
427 ** the named database, a new Database object is allocated. Otherwise, a
428 ** pointer to an existing object is returned.
429 **
430 ** If successful, *ppDatabase is set to point to the shared Database
431 ** structure and LSM_OK returned. Otherwise, *ppDatabase is set to NULL
** and an LSM error code returned.
433 **
434 ** Each successful call to this function should be (eventually) matched
435 ** by a call to lsmDbDatabaseRelease().
436 */
int lsmDbDatabaseConnect(
  lsm_db *pDb,                    /* Database handle */
  const char *zName               /* Full-path to db file */
){
  lsm_env *pEnv = pDb->pEnv;
  int rc;                         /* Return code */
  Database *p = 0;                /* Pointer returned via *ppDatabase */
  int nName = lsmStrlen(zName);

  assert( pDb->pDatabase==0 );
  rc = enterGlobalMutex(pEnv);
  if( rc==LSM_OK ){

    /* Search the global list for an existing object. TODO: Need something
    ** better than the memcmp() below to figure out if a given Database
    ** object represents the requested file.  */
    for(p=gShared.pDatabase; p; p=p->pDbNext){
      if( nName==p->nName && 0==memcmp(zName, p->zName, nName) ) break;
    }

    /* If no suitable Database object was found, allocate a new one. */
    if( p==0 ){
      /* zName is stored in the same allocation, directly after the struct. */
      p = (Database *)lsmMallocZeroRc(pEnv, sizeof(Database)+nName+1, &rc);

      /* If the allocation was successful, fill in other fields and
      ** allocate the client mutex. */
      if( rc==LSM_OK ){
        p->bMultiProc = pDb->bMultiProc;
        p->zName = (char *)&p[1];
        p->nName = nName;
        memcpy((void *)p->zName, zName, nName+1);
        rc = lsmMutexNew(pEnv, &p->pClientMutex);
      }

      /* If nothing has gone wrong so far, open the shared fd. And if that
      ** succeeds and this connection requested single-process mode,
      ** attempt to take the exclusive lock on DMS2.  */
      if( rc==LSM_OK ){
        /* Read-only file opens are only permitted in multi-process mode. */
        int bReadonly = (pDb->bReadonly && pDb->bMultiProc);
        rc = dbOpenSharedFd(pDb->pEnv, p, bReadonly);
      }

      if( rc==LSM_OK && p->bMultiProc==0 ){
        /* Hold an exclusive lock DMS1 while grabbing DMS2. This ensures
        ** that any ongoing call to doDbDisconnect() (even one in another
        ** process) is finished before proceeding.  */
        assert( p->bReadonly==0 );
        rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS1, LSM_LOCK_EXCL);
        if( rc==LSM_OK ){
          rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS2, LSM_LOCK_EXCL);
          lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK);
        }
      }

      if( rc==LSM_OK ){
        /* Link the new object into the global list. */
        p->pDbNext = gShared.pDatabase;
        gShared.pDatabase = p;
      }else{
        freeDatabase(pEnv, p);
        p = 0;
      }
    }

    if( p ){
      p->nDbRef++;
    }
    leaveGlobalMutex(pEnv);

    /* Add this handle to the Database object's connection list. */
    if( p ){
      lsmMutexEnter(pDb->pEnv, p->pClientMutex);
      pDb->pNext = p->pConn;
      p->pConn = pDb;
      lsmMutexLeave(pDb->pEnv, p->pClientMutex);
    }
  }

  pDb->pDatabase = p;
  if( rc==LSM_OK ){
    assert( p );
    rc = lsmFsOpen(pDb, zName, p->bReadonly);
  }

  /* If the db handle is read-write, then connect to the system now. Run
  ** recovery as necessary. Or, if this is a read-only database handle,
  ** defer attempting to connect to the system until a read-transaction
  ** is opened.  */
  if( rc==LSM_OK ){
    rc = lsmFsConfigure(pDb);
  }
  if( rc==LSM_OK && pDb->bReadonly==0 ){
    rc = doDbConnect(pDb);
  }

  return rc;
}
532 
/*
** Detach handle pDb's OS file from its file-system object and push it
** onto the Database object's list of deferred closes. The descriptor is
** eventually closed by lsmDbDatabaseRelease() when the last reference to
** the Database object is dropped. No-op if pDb has no file-system object.
** The caller must hold pClientMutex.
*/
static void dbDeferClose(lsm_db *pDb){
  if( pDb->pFS ){
    Database *pShared = pDb->pDatabase;
    LsmFile *pDefer = lsmFsDeferClose(pDb->pFS);
    pDefer->pNext = pShared->pLsmFile;
    pShared->pLsmFile = pDefer;
  }
}
542 
lsmDbRecycleFd(lsm_db * db)543 LsmFile *lsmDbRecycleFd(lsm_db *db){
544   LsmFile *pRet;
545   Database *p = db->pDatabase;
546   lsmMutexEnter(db->pEnv, p->pClientMutex);
547   if( (pRet = p->pLsmFile)!=0 ){
548     p->pLsmFile = pRet->pNext;
549   }
550   lsmMutexLeave(db->pEnv, p->pClientMutex);
551   return pRet;
552 }
553 
554 /*
555 ** Release a reference to a Database object obtained from
556 ** lsmDbDatabaseConnect(). There should be exactly one call to this function
557 ** for each successful call to Find().
558 */
void lsmDbDatabaseRelease(lsm_db *pDb){
  Database *p = pDb->pDatabase;
  if( p ){
    lsm_db **ppDb;

    /* Disconnect from the shared system first, if connected. */
    if( pDb->pShmhdr ){
      doDbDisconnect(pDb);
    }

    /* Unlink this handle from the Database connection list and push its
    ** file descriptor onto the deferred-close list. */
    lsmFsUnmap(pDb->pFS);
    lsmMutexEnter(pDb->pEnv, p->pClientMutex);
    for(ppDb=&p->pConn; *ppDb!=pDb; ppDb=&((*ppDb)->pNext));
    *ppDb = pDb->pNext;
    dbDeferClose(pDb);
    lsmMutexLeave(pDb->pEnv, p->pClientMutex);

    /* Drop the reference. If it was the last, tear the object down. */
    enterGlobalMutex(pDb->pEnv);
    p->nDbRef--;
    if( p->nDbRef==0 ){
      LsmFile *pIter;
      LsmFile *pNext;
      Database **pp;

      /* Remove the Database structure from the linked list. */
      for(pp=&gShared.pDatabase; *pp!=p; pp=&((*pp)->pDbNext));
      *pp = p->pDbNext;

      /* If they were allocated from the heap, free the shared memory chunks */
      if( p->bMultiProc==0 ){
        int i;
        for(i=0; i<p->nShmChunk; i++){
          lsmFree(pDb->pEnv, p->apShmChunk[i]);
        }
      }

      /* Close any outstanding file descriptors */
      for(pIter=p->pLsmFile; pIter; pIter=pNext){
        pNext = pIter->pNext;
        lsmEnvClose(pDb->pEnv, pIter->pFile);
        lsmFree(pDb->pEnv, pIter);
      }
      freeDatabase(pDb->pEnv, p);
    }
    leaveGlobalMutex(pDb->pEnv);
  }
}
605 
/* Return the head of snapshot pSnapshot's Level list. */
Level *lsmDbSnapshotLevel(Snapshot *pSnapshot){
  return pSnapshot->pLevel;
}
609 
/* Set the head of snapshot pSnap's Level list to pLevel. */
void lsmDbSnapshotSetLevel(Snapshot *pSnap, Level *pLevel){
  pSnap->pLevel = pLevel;
}
613 
614 /* TODO: Shuffle things around to get rid of this */
615 static int firstSnapshotInUse(lsm_db *, i64 *);
616 
617 /*
618 ** Context object used by the lsmWalkFreelist() utility.
619 */
typedef struct WalkFreelistCtx WalkFreelistCtx;
struct WalkFreelistCtx {
  lsm_db *pDb;                    /* Database handle */
  int bReverse;                   /* True to iterate from largest to smallest */
  Freelist *pFreelist;            /* In-memory free-list to merge in (or NULL) */
  int iFree;                      /* Current index into pFreelist->aEntry[] */
  int (*xUsr)(void *, int, i64);  /* User callback function */
  void *pUsrctx;                  /* User callback context */
  int bDone;                      /* Set to true after xUsr() returns true */
};
630 
631 /*
632 ** Callback used by lsmWalkFreelist().
633 */
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
  const int iDir = (p->bReverse ? -1 : 1);
  Freelist *pFree = p->pFreelist;

  assert( p->bDone==0 );
  assert( iBlk>=0 );
  if( pFree ){
    /* Emit any in-memory entries that sort before (or equal to) iBlk in
    ** the current iteration direction, keeping the merged output ordered. */
    while( (p->iFree < pFree->nEntry) && p->iFree>=0 ){
      FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
      if( (p->bReverse==0 && pEntry->iBlk>(u32)iBlk)
       || (p->bReverse!=0 && pEntry->iBlk<(u32)iBlk)
      ){
        break;
      }else{
        p->iFree += iDir;
        /* Entries with iId<0 are deletes - skip them. */
        if( pEntry->iId>=0
            && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId)
          ){
          p->bDone = 1;
          return 1;
        }
        /* An in-memory entry for iBlk shadows the on-disk one. */
        if( pEntry->iBlk==(u32)iBlk ) return 0;
      }
    }
  }

  /* Pass the on-disk entry through to the user callback. */
  if( p->xUsr(p->pUsrctx, iBlk, iSnapshot) ){
    p->bDone = 1;
    return 1;
  }
  return 0;
}
667 
668 /*
669 ** The database handle passed as the first argument must be the worker
670 ** connection. This function iterates through the contents of the current
671 ** free block list, invoking the supplied callback once for each list
672 ** element.
673 **
674 ** The difference between this function and lsmSortedWalkFreelist() is
675 ** that lsmSortedWalkFreelist() only considers those free-list elements
676 ** stored within the LSM. This function also merges in any in-memory
677 ** elements.
678 */
int lsmWalkFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  int bReverse,                   /* True to iterate from largest to smallest */
  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){
  const int iDir = (bReverse ? -1 : 1);
  int rc;
  int iCtx;

  /* Two chained merge contexts: ctx[0] merges the worker snapshot's
  ** in-memory list into the LSM-stored list, and forwards to ctx[1],
  ** which merges in pDb->pFreelist before calling the user callback. */
  WalkFreelistCtx ctx[2];

  ctx[0].pDb = pDb;
  ctx[0].bReverse = bReverse;
  ctx[0].pFreelist = &pDb->pWorker->freelist;
  if( ctx[0].pFreelist && bReverse ){
    ctx[0].iFree = ctx[0].pFreelist->nEntry-1;
  }else{
    ctx[0].iFree = 0;
  }
  ctx[0].xUsr = walkFreelistCb;
  ctx[0].pUsrctx = (void *)&ctx[1];
  ctx[0].bDone = 0;

  ctx[1].pDb = pDb;
  ctx[1].bReverse = bReverse;
  ctx[1].pFreelist = pDb->pFreelist;
  if( ctx[1].pFreelist && bReverse ){
    ctx[1].iFree = ctx[1].pFreelist->nEntry-1;
  }else{
    ctx[1].iFree = 0;
  }
  ctx[1].xUsr = x;
  ctx[1].pUsrctx = pCtx;
  ctx[1].bDone = 0;

  rc = lsmSortedWalkFreelist(pDb, bReverse, walkFreelistCb, (void *)&ctx[0]);

  /* Flush any in-memory entries that sort after the last LSM entry,
  ** unless the callback already terminated the walk. */
  if( ctx[0].bDone==0 ){
    for(iCtx=0; iCtx<2; iCtx++){
      int i;
      WalkFreelistCtx *p = &ctx[iCtx];
      for(i=p->iFree;
          p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry && i>=0;
          i += iDir
         ){
        FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
        if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
          /* Callback requested termination - stop immediately. */
          return LSM_OK;
        }
      }
    }
  }

  return rc;
}
735 
736 
typedef struct FindFreeblockCtx FindFreeblockCtx;
struct FindFreeblockCtx {
  i64 iInUse;                     /* Candidate must predate this snapshot id */
  int iRet;                       /* Out: block number found, or 0 for none */
  int bNotOne;                    /* If true, never return block number 1 */
};
743 
/*
** lsmWalkFreelist() callback for findFreeblock(). Accept the first block
** freed before snapshot p->iInUse, skipping block 1 when p->bNotOne is
** set. Returns non-zero to stop the walk once a block is accepted.
*/
static int findFreeblockCb(void *pCtx, int iBlk, i64 iSnapshot){
  FindFreeblockCtx *p = (FindFreeblockCtx *)pCtx;
  int bUsable = (iSnapshot<p->iInUse) && (p->bNotOne==0 || iBlk!=1);

  if( bUsable ){
    p->iRet = iBlk;
    return 1;
  }
  return 0;
}
752 
/*
** Search the free-list for a block that may be reused (one freed by a
** snapshot older than iInUse). If bNotOne is true, block 1 is excluded
** from consideration. On success *piRet is set to the block number found,
** or to 0 if no suitable block exists. Returns LSM_OK or an error code.
*/
static int findFreeblock(lsm_db *pDb, i64 iInUse, int bNotOne, int *piRet){
  FindFreeblockCtx sCtx;          /* Context passed to findFreeblockCb() */
  int rc;                         /* Return code */

  sCtx.iInUse = iInUse;
  sCtx.bNotOne = bNotOne;
  sCtx.iRet = 0;

  rc = lsmWalkFreelist(pDb, 0, findFreeblockCb, (void *)&sCtx);
  *piRet = sCtx.iRet;
  return rc;
}
765 
766 /*
767 ** Allocate a new database file block to write data to, either by extending
768 ** the database file or by recycling a free-list entry. The worker snapshot
769 ** must be held in order to call this function.
770 **
771 ** If successful, *piBlk is set to the block number allocated and LSM_OK is
772 ** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
773 */
int lsmBlockAllocate(lsm_db *pDb, int iBefore, int *piBlk){
  Snapshot *p = pDb->pWorker;
  int iRet = 0;                   /* Block number of allocated block */
  int rc = LSM_OK;
  i64 iInUse = 0;                 /* Snapshot id still in use */
  i64 iSynced = 0;                /* Snapshot id synced to disk */

  assert( p );

#ifdef LSM_LOG_FREELIST
  {
    static int nCall = 0;
    char *zFree = 0;
    nCall++;
    rc = lsmInfoFreelist(pDb, &zFree);
    if( rc!=LSM_OK ) return rc;
    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): %d freelist: %s", nCall, zFree);
    lsmFree(pDb->pEnv, zFree);
  }
#endif

  /* Set iInUse to the smallest snapshot id that is either:
  **
  **   * Currently in use by a database client,
  **   * May be used by a database client in the future, or
  **   * Is the most recently checkpointed snapshot (i.e. the one that will
  **     be used following recovery if a failure occurs at this point).
  */
  rc = lsmCheckpointSynced(pDb, &iSynced, 0, 0);
  if( rc==LSM_OK && iSynced==0 ) iSynced = p->iId;
  iInUse = iSynced;
  if( rc==LSM_OK && pDb->iReader>=0 ){
    assert( pDb->pClient );
    iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
  }
  if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);

#ifdef LSM_LOG_FREELIST
  {
    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): "
        "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)",
        iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0)
    );
  }
#endif


  /* Unless there exists a read-only transaction (which prevents us from
  ** recycling any blocks regardless), query the free block list for a
  ** suitable block to reuse.
  **
  ** It might seem more natural to check for a read-only transaction at
  ** the start of this function. However, it is better to wait until after
  ** the call to lsmCheckpointSynced() to do so.
  */
  if( rc==LSM_OK ){
    int bRotrans;
    rc = lsmDetectRoTrans(pDb, &bRotrans);

    if( rc==LSM_OK && bRotrans==0 ){
      rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet);
    }
  }

  /* If the caller requires a block numbered below iBefore, fail the
  ** allocation (iRet=0) rather than returning a block past that limit. */
  if( iBefore>0 && (iRet<=0 || iRet>=iBefore) ){
    iRet = 0;

  }else if( rc==LSM_OK ){
    /* If a block was found in the free block list, use it and remove it from
    ** the list. Otherwise, if no suitable block was found, allocate one from
    ** the end of the file.  */
    if( iRet>0 ){
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0,
          "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse);
#endif
      rc = freelistAppend(pDb, iRet, -1);
      if( rc==LSM_OK ){
        rc = dbTruncate(pDb, iInUse);
      }
    }else{
      iRet = ++(p->nBlock);
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
    }
  }

  assert( iBefore>0 || iRet>0 || rc!=LSM_OK );
  *piBlk = iRet;
  return rc;
}
866 
867 /*
868 ** Free a database block. The worker snapshot must be held in order to call
869 ** this function.
870 **
871 ** If successful, LSM_OK is returned. Otherwise, an lsm error code (e.g.
872 ** LSM_NOMEM).
873 */
/*
** Free database block iBlk by appending it to the free-list, tagged with
** the current worker snapshot id (so it cannot be reused until that
** snapshot is no longer required). The worker snapshot must be held.
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  Snapshot *pWorker = pDb->pWorker;
  assert( lsmShmAssertWorker(pDb) );

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk);
#endif

  return freelistAppend(pDb, iBlk, pWorker->iId);
}
884 
885 /*
886 ** Refree a database block. The worker snapshot must be held in order to call
887 ** this function.
888 **
889 ** Refreeing is required when a block is allocated using lsmBlockAllocate()
890 ** but then not used. This function is used to push the block back onto
** the freelist. Refreeing a block is different from freeing one, as a refreed
** block may be reused immediately. Whereas a freed block can not be reused
** until (at least) after the next checkpoint.
894 */
/*
** Push block iBlk back onto the free-list with snapshot id 0, making it
** immediately eligible for reuse (unlike lsmBlockFree(), which defers
** reuse until after the next checkpoint). Returns LSM_OK or an error
** code from freelistAppend().
*/
int lsmBlockRefree(lsm_db *pDb, int iBlk){
#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk);
#endif

  return freelistAppend(pDb, iBlk, 0);
}
905 
/*
** If required, copy a database checkpoint from shared memory into the
** database itself.
**
** The WORKER lock must not be held when this is called. This is because
** this function may indirectly call fsync(). And the WORKER lock should
** not be held that long (in case it is required by a client flushing an
** in-memory tree to disk).
**
** If successful, LSM_OK is returned and, if pnWrite is not NULL, *pnWrite
** is set to the value of the snapshot's write-counter minus the counter
** value already stored in the db meta page (i.e. pages written since the
** last stored checkpoint). Otherwise, an LSM error code is returned.
*/
int lsmCheckpointWrite(lsm_db *pDb, u32 *pnWrite){
  int rc;                         /* Return Code */
  u32 nWrite = 0;                 /* Write-counter delta reported via pnWrite */

  assert( pDb->pWorker==0 );
  assert( 1 || pDb->pClient==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );

  /* Serialize checkpointers: only one connection may write a checkpoint
  ** at a time.  */
  rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
  if( rc!=LSM_OK ) return rc;

  rc = lsmCheckpointLoad(pDb, 0);
  if( rc==LSM_OK ){
    int nBlock = lsmCheckpointNBlock(pDb->aSnapshot);
    ShmHeader *pShm = pDb->pShmhdr;
    int bDone = 0;                /* True if checkpoint is already stored */

    /* Check if this checkpoint has already been written to the database
    ** file. If so, set variable bDone to true.  */
    if( pShm->iMetaPage ){
      MetaPage *pPg;              /* Meta page */
      u8 *aData;                  /* Meta-page data buffer */
      int nData;                  /* Size of aData[] in bytes */
      i64 iCkpt;                  /* Id of checkpoint just loaded */
      i64 iDisk = 0;              /* Id of checkpoint already stored in db */
      iCkpt = lsmCheckpointId(pDb->aSnapshot, 0);
      rc = lsmFsMetaPageGet(pDb->pFS, 0, pShm->iMetaPage, &pPg);
      if( rc==LSM_OK ){
        aData = lsmFsMetaPageData(pPg, &nData);
        iDisk = lsmCheckpointId((u32 *)aData, 1);
        nWrite = lsmCheckpointNWrite((u32 *)aData, 1);
        lsmFsMetaPageRelease(pPg);
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      /* Checkpoints alternate between meta pages 1 and 2. */
      int iMeta = (pShm->iMetaPage % 2) + 1;
      /* Sync the database before storing the checkpoint, and sync again
      ** afterwards, unless safety is set to OFF.  */
      if( pDb->eSafety!=LSM_SAFETY_OFF ){
        rc = lsmFsSyncDb(pDb->pFS, nBlock);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS, 0);
      }
      if( rc==LSM_OK ){
        /* Publish the new meta page and compute the write-counter delta. */
        pShm->iMetaPage = iMeta;
        nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
      }
#ifdef LSM_LOG_WORK
      lsmLogMessage(pDb, 0, "finish checkpoint %d",
          (int)lsmCheckpointId(pDb->aSnapshot, 0)
      );
#endif
    }
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite;
  return rc;
}
976 
/*
** Open a worker "transaction": take the exclusive WORKER lock and, if
** successful, deserialize the current worker snapshot. Returns LSM_OK on
** success or an LSM error code (e.g. LSM_BUSY if the lock is held by
** another connection).
*/
int lsmBeginWork(lsm_db *pDb){
  /* Attempt to take the WORKER lock. Only on success is the worker
  ** snapshot loaded.  */
  int rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL, 0);
  if( rc==LSM_OK ){
    rc = lsmCheckpointLoadWorker(pDb);
  }
  return rc;
}
989 
/*
** Free all memory associated with Snapshot object p. A no-op if p is NULL.
*/
void lsmFreeSnapshot(lsm_env *pEnv, Snapshot *p){
  if( p==0 ) return;
  lsmSortedFreeLevel(pEnv, p->pLevel);
  lsmFree(pEnv, p->freelist.aEntry);
  lsmFree(pEnv, p->redirect.a);
  lsmFree(pEnv, p);
}
998 
/*
** Attempt to populate one of the read-lock slots to contain lock values
** iLsm/iShm. Or, if such a slot exists already, this function is a no-op.
**
** It is not an error if no slot can be populated because the write-lock
** cannot be obtained. If any other error occurs, return an LSM error code.
** Otherwise, LSM_OK.
**
** This function is called at various points to try to ensure that there
** always exists at least one read-lock slot that can be used by a read-only
** client. And so that, in the usual case, there is an "exact match" available
** whenever a read transaction is opened by any client. At present this
** function is called when:
**
**    * A write transaction that called lsmTreeDiscardOld() is committed, and
**    * Whenever the working snapshot is updated (i.e. lsmFinishWork()).
*/
static int dbSetReadLock(lsm_db *db, i64 iLsm, u32 iShm){
  int rc = LSM_OK;
  ShmHeader *pShm = db->pShmhdr;
  int i;

  /* Check if there is already a slot containing the required values. */
  for(i=0; i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId==iLsm && p->iTreeId==iShm ) return LSM_OK;
  }

  /* Iterate through all read-lock slots, attempting to take a write-lock
  ** on each of them. If a write-lock succeeds, populate the locked slot
  ** with the required values and break out of the loop.  */
  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      /* Slot i is currently protected by some reader - try the next. */
      rc = LSM_OK;
    }else{
      /* EXCL lock obtained, so no reader is using slot i. It is safe to
      ** overwrite it. Release the lock again once done.  */
      ShmReader *p = &pShm->aReader[i];
      p->iLsmId = iLsm;
      p->iTreeId = iShm;
      lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
      break;
    }
  }

  return rc;
}
1045 
1046 /*
1047 ** Release the read-lock currently held by connection db.
1048 */
dbReleaseReadlock(lsm_db * db)1049 int dbReleaseReadlock(lsm_db *db){
1050   int rc = LSM_OK;
1051   if( db->iReader>=0 ){
1052     rc = lsmShmLock(db, LSM_LOCK_READER(db->iReader), LSM_LOCK_UNLOCK, 0);
1053     db->iReader = -1;
1054   }
1055   db->bRoTrans = 0;
1056   return rc;
1057 }
1058 
1059 
/*
** Argument bFlush is true if the contents of the in-memory tree has just
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
**
** *pRc is an in/out error code: if it is LSM_OK on entry the worker
** snapshot is serialized and a read-lock slot refreshed; in all cases the
** snapshot object is freed and the WORKER lock released.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
  int rc = *pRc;
  assert( rc!=0 || pDb->pWorker );
  if( pDb->pWorker ){
    /* If no error has occurred, serialize the worker snapshot and write
    ** it to shared memory.  */
    if( rc==LSM_OK ){
      rc = lsmSaveWorker(pDb, bFlush);
    }

    /* Assuming no error has occurred, update a read lock slot with the
    ** new snapshot id (see comments above function dbSetReadLock()).  */
    if( rc==LSM_OK ){
      if( pDb->iReader<0 ){
        /* No read transaction open - refresh the cached tree header so
        ** that treehdr.iUsedShmid below is current.  */
        rc = lsmTreeLoadHeader(pDb, 0);
      }
      if( rc==LSM_OK ){
        rc = dbSetReadLock(pDb, pDb->pWorker->iId, pDb->treehdr.iUsedShmid);
      }
    }

    /* Free the snapshot object. */
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  /* Release the WORKER lock taken by lsmBeginWork(). */
  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
  *pRc = rc;
}
1095 
1096 /*
1097 ** Called when recovery is finished.
1098 */
lsmFinishRecovery(lsm_db * pDb)1099 int lsmFinishRecovery(lsm_db *pDb){
1100   lsmTreeEndTransaction(pDb, 1);
1101   return LSM_OK;
1102 }
1103 
1104 /*
1105 ** Check if the currently configured compression functions
1106 ** (LSM_CONFIG_SET_COMPRESSION) are compatible with a database that has its
1107 ** compression id set to iReq. Compression routines are compatible if iReq
1108 ** is zero (indicating the database is empty), or if it is equal to the
1109 ** compression id of the configured compression routines.
1110 **
1111 ** If the check shows that the current compression are incompatible and there
1112 ** is a compression factory registered, give it a chance to install new
1113 ** compression routines.
1114 **
1115 ** If, after any registered factory is invoked, the compression functions
1116 ** are still incompatible, return LSM_MISMATCH. Otherwise, LSM_OK.
1117 */
lsmCheckCompressionId(lsm_db * pDb,u32 iReq)1118 int lsmCheckCompressionId(lsm_db *pDb, u32 iReq){
1119   if( iReq!=LSM_COMPRESSION_EMPTY && pDb->compress.iId!=iReq ){
1120     if( pDb->factory.xFactory ){
1121       pDb->bInFactory = 1;
1122       pDb->factory.xFactory(pDb->factory.pCtx, pDb, iReq);
1123       pDb->bInFactory = 0;
1124     }
1125     if( pDb->compress.iId!=iReq ){
1126       /* Incompatible */
1127       return LSM_MISMATCH;
1128     }
1129   }
1130   /* Compatible */
1131   return LSM_OK;
1132 }
1133 
/*
** Begin a read transaction. This function is a no-op if the connection
** passed as the only argument already has an open read transaction.
**
** Returns LSM_OK if a read transaction is open on return, LSM_BUSY if a
** consistent snapshot could not be obtained within the permitted number
** of attempts, or another LSM error code if an error occurs.
*/
int lsmBeginReadTrans(lsm_db *pDb){
  const int MAX_READLOCK_ATTEMPTS = 10;
  /* A read-only (bRoTrans) connection makes a single attempt only. */
  const int nMaxAttempt = (pDb->bRoTrans ? 1 : MAX_READLOCK_ATTEMPTS);

  int rc = LSM_OK;                /* Return code */
  int iAttempt = 0;

  assert( pDb->pWorker==0 );

  while( rc==LSM_OK && pDb->iReader<0 && (iAttempt++)<nMaxAttempt ){
    int iTreehdr = 0;
    int iSnap = 0;
    assert( pDb->pCsr==0 && pDb->nTransOpen==0 );

    /* Load the in-memory tree header. */
    rc = lsmTreeLoadHeader(pDb, &iTreehdr);

    /* Load the database snapshot */
    if( rc==LSM_OK ){
      if( lsmCheckpointClientCacheOk(pDb)==0 ){
        /* Cached client snapshot is stale. Discard it, along with any
        ** cached cursors and file-system pages, then reload.  */
        lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
        pDb->pClient = 0;
        lsmMCursorFreeCache(pDb);
        lsmFsPurgeCache(pDb->pFS);
        rc = lsmCheckpointLoad(pDb, &iSnap);
      }else{
        iSnap = 1;
      }
    }

    /* Take a read-lock on the tree and snapshot just loaded. Then check
    ** that the shared-memory still contains the same values. If so, proceed.
    ** Otherwise, relinquish the read-lock and retry the whole procedure
    ** (starting with loading the in-memory tree header).  */
    if( rc==LSM_OK ){
      u32 iShmMax = pDb->treehdr.iUsedShmid;
      u32 iShmMin = pDb->treehdr.iNextShmid+1-LSM_MAX_SHMCHUNKS;
      rc = lsmReadlock(
          pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax
      );
      if( rc==LSM_OK ){
        if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
         && lsmCheckpointLoadOk(pDb, iSnap)
        ){
          /* Read lock has been successfully obtained. Deserialize the
          ** checkpoint just loaded. TODO: This will be removed after
          ** lsm_sorted.c is changed to work directly from the serialized
          ** version of the snapshot.  */
          if( pDb->pClient==0 ){
            rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot,&pDb->pClient);
          }
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          assert( pDb->iReader>=0 );

          /* Check that the client has the right compression hooks loaded.
          ** If not, set rc to LSM_MISMATCH.  */
          if( rc==LSM_OK ){
            rc = lsmCheckCompressionId(pDb, pDb->pClient->iCmpId);
          }
        }else{
          /* The header or snapshot changed between load and lock. Drop
          ** the read-lock and go around the loop again.  */
          rc = dbReleaseReadlock(pDb);
        }
      }

      if( rc==LSM_BUSY ){
        /* LSM_BUSY here means "retry", not a hard error. */
        rc = LSM_OK;
      }
    }
#if 0
if( rc==LSM_OK && pDb->pClient ){
  fprintf(stderr,
      "reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
      (void *)pDb,
      (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid,
      (int)pDb->treehdr.root.iTransId,
      (int)pDb->treehdr.iOldShmid
  );
}
#endif
  }

  if( rc==LSM_OK ){
    /* Make sure pointers to all shm chunks used by the tree are cached. */
    rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
  }
  if( rc!=LSM_OK ){
    dbReleaseReadlock(pDb);
  }
  /* All attempts exhausted without obtaining a consistent snapshot. */
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;
  return rc;
}
1228 
1229 /*
1230 ** This function is used by a read-write connection to determine if there
1231 ** are currently one or more read-only transactions open on the database
1232 ** (in this context a read-only transaction is one opened by a read-only
1233 ** connection on a non-live database).
1234 **
1235 ** If no error occurs, LSM_OK is returned and *pbExists is set to true if
1236 ** some other connection has a read-only transaction open, or false
1237 ** otherwise. If an error occurs an LSM error code is returned and the final
1238 ** value of *pbExist is undefined.
1239 */
lsmDetectRoTrans(lsm_db * db,int * pbExist)1240 int lsmDetectRoTrans(lsm_db *db, int *pbExist){
1241   int rc;
1242 
1243   /* Only a read-write connection may use this function. */
1244   assert( db->bReadonly==0 );
1245 
1246   rc = lsmShmTestLock(db, LSM_LOCK_ROTRANS, 1, LSM_LOCK_EXCL);
1247   if( rc==LSM_BUSY ){
1248     *pbExist = 1;
1249     rc = LSM_OK;
1250   }else{
1251     *pbExist = 0;
1252   }
1253 
1254   return rc;
1255 }
1256 
/*
** db is a read-only database handle in the disconnected state. This function
** attempts to open a read-transaction on the database. This may involve
** connecting to the database system (opening shared memory etc.).
*/
int lsmBeginRoTrans(lsm_db *db){
  int rc = LSM_OK;

  assert( db->bReadonly && db->pShmhdr==0 );
  assert( db->iReader<0 );

  if( db->bRoTrans==0 ){

    /* Attempt a shared-lock on DMS1. */
    rc = lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_SHARED, 0);
    if( rc!=LSM_OK ) return rc;

    /* Test the RWCLIENT locks to determine whether any read-write
    ** connection is attached (i.e. whether the system is "live").
    ** NOTE(review): the count passed here is LSM_LOCK_NREADER, not a
    ** RWCLIENT-specific constant - confirm this is intended.  */
    rc = lsmShmTestLock(
        db, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NREADER, LSM_LOCK_SHARED
    );
    if( rc==LSM_OK ){
      /* System is not live. Take a SHARED lock on the ROTRANS byte and
      ** release DMS1. Locking ROTRANS tells all read-write clients that they
      ** may not recycle any disk space from within the database or log files,
      ** as a read-only client may be using it.  */
      rc = lsmShmLock(db, LSM_LOCK_ROTRANS, LSM_LOCK_SHARED, 0);
      lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);

      if( rc==LSM_OK ){
        /* In bRoTrans mode, "shared memory" chunks are private heap
        ** allocations (see lsmShmCacheChunks()). Initialize the first
        ** chunk and run recovery from the db and log files into it.  */
        db->bRoTrans = 1;
        rc = lsmShmCacheChunks(db, 1);
        if( rc==LSM_OK ){
          db->pShmhdr = (ShmHeader *)db->apShm[0];
          memset(db->pShmhdr, 0, sizeof(ShmHeader));
          rc = lsmCheckpointRecover(db);
          if( rc==LSM_OK ){
            rc = lsmLogRecover(db);
          }
        }
      }
    }else if( rc==LSM_BUSY ){
      /* System is live! Take DMS3 (read-only client present) and attach
      ** to the existing shared memory.  */
      rc = lsmShmLock(db, LSM_LOCK_DMS3, LSM_LOCK_SHARED, 0);
      lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
      if( rc==LSM_OK ){
        rc = lsmShmCacheChunks(db, 1);
        if( rc==LSM_OK ){
          db->pShmhdr = (ShmHeader *)db->apShm[0];
        }
      }
    }

    if( rc==LSM_OK ){
      rc = lsmBeginReadTrans(db);
    }
  }

  return rc;
}
1316 
/*
** Close the currently open read transaction.
*/
void lsmFinishReadTrans(lsm_db *pDb){

  /* Worker connections should not be closing read transactions. And
  ** read transactions should only be closed after all cursors and write
  ** transactions have been closed. Finally pClient should be non-NULL
  ** only iff pDb->iReader>=0.  */
  assert( pDb->pWorker==0 );
  assert( pDb->pCsr==0 && pDb->nTransOpen==0 );

  if( pDb->bRoTrans ){
    /* Read-only transaction: the "shared memory" chunks are private heap
    ** allocations (see lsmShmCacheChunks()). Free them all, then release
    ** the ROTRANS lock taken by lsmBeginRoTrans().  */
    int i;
    for(i=0; i<pDb->nShm; i++){
      lsmFree(pDb->pEnv, pDb->apShm[i]);
    }
    lsmFree(pDb->pEnv, pDb->apShm);
    pDb->apShm = 0;
    pDb->nShm = 0;
    pDb->pShmhdr = 0;

    lsmShmLock(pDb, LSM_LOCK_ROTRANS, LSM_LOCK_UNLOCK, 0);
  }
  dbReleaseReadlock(pDb);
}
1343 
/*
** Open a write transaction.
**
** Returns LSM_OK on success. If the WRITER lock cannot be obtained, or if
** this connection is not reading from the most recent database snapshot,
** LSM_BUSY is returned and no write transaction is opened.
*/
int lsmBeginWriteTrans(lsm_db *pDb){
  int rc = LSM_OK;                /* Return code */
  ShmHeader *pShm = pDb->pShmhdr; /* Shared memory header */

  assert( pDb->nTransOpen==0 );
  assert( pDb->bDiscardOld==0 );
  assert( pDb->bReadonly==0 );

  /* If there is no read-transaction open, open one now. */
  if( pDb->iReader<0 ){
    rc = lsmBeginReadTrans(pDb);
  }

  /* Attempt to take the WRITER lock */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
  }

  /* If the previous writer failed mid-transaction, run emergency rollback. */
  if( rc==LSM_OK && pShm->bWriter ){
    rc = lsmTreeRepair(pDb);
    if( rc==LSM_OK ) pShm->bWriter = 0;
  }

  /* Check that this connection is currently reading from the most recent
  ** version of the database. If not, return LSM_BUSY.  */
  if( rc==LSM_OK && memcmp(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)) ){
    rc = LSM_BUSY;
  }

  if( rc==LSM_OK ){
    rc = lsmLogBegin(pDb);
  }

  /* If everything was successful, set the "transaction-in-progress" flag
  ** and return LSM_OK. Otherwise, if some error occurred, relinquish the
  ** WRITER lock and return an error code.  */
  if( rc==LSM_OK ){
    TreeHeader *p = &pDb->treehdr;
    pShm->bWriter = 1;
    p->root.iTransId++;
    /* If there is an old tree and its log offset matches that of the
    ** current client snapshot, discard it now and remember having done
    ** so (bDiscardOld is consumed by lsmFinishWriteTrans()).  */
    if( lsmTreeHasOld(pDb) && p->iOldLog==pDb->pClient->iLogOff ){
      lsmTreeDiscardOld(pDb);
      pDb->bDiscardOld = 1;
    }
  }else{
    lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
    if( pDb->pCsr==0 ) lsmFinishReadTrans(pDb);
  }
  return rc;
}
1398 
/*
** End the current write transaction. The connection is left with an open
** read transaction. It is an error to call this if there is no open write
** transaction.
**
** If the transaction was committed, then a commit record has already been
** written into the log file when this function is called. Or, if the
** transaction was rolled back, both the log file and in-memory tree
** structure have already been restored. In either case, this function
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
  int rc = LSM_OK;
  int bFlush = 0;                 /* True if tree was marked old (flush due) */

  lsmLogEnd(pDb, bCommit);
  /* On commit, if the in-memory tree has grown past the configured limit,
  ** mark it "old" so that it gets flushed to disk.  */
  if( rc==LSM_OK && bCommit && lsmTreeSize(pDb)>pDb->nTreeLimit ){
    bFlush = 1;
    lsmTreeMakeOld(pDb);
  }
  lsmTreeEndTransaction(pDb, bCommit);

  if( rc==LSM_OK ){
    if( bFlush && pDb->bAutowork ){
      /* Auto-work is enabled: perform the flush/merge work now. */
      rc = lsmSortedAutoWork(pDb, 1);
    }else if( bCommit && pDb->bDiscardOld ){
      /* This transaction discarded the old tree (see lsmBeginWriteTrans())
      ** - refresh a read-lock slot (see dbSetReadLock()).  */
      rc = dbSetReadLock(pDb, pDb->pClient->iId, pDb->treehdr.iUsedShmid);
    }
  }
  pDb->bDiscardOld = 0;
  lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);

  /* If auto-work is disabled, notify the registered work-hook instead. */
  if( bFlush && pDb->bAutowork==0 && pDb->xWork ){
    pDb->xWork(pDb, pDb->pWorkCtx);
  }
  return rc;
}
1438 
1439 
/*
** Return non-zero if the caller is holding the client mutex. Compiled in
** debug builds only - used from within assert() statements.
*/
#ifdef LSM_DEBUG
int lsmHoldingClientMutex(lsm_db *pDb){
  return lsmMutexHeld(pDb->pEnv, pDb->pDatabase->pClientMutex);
}
#endif
1448 
/*
** Return true if read-lock slot p protects database versions compatible
** with snapshot iLsm and the shm sequence-id range [iShmMin..iShmMax].
** An empty slot (iLsmId==0) is never usable.
*/
static int slotIsUsable(ShmReader *p, i64 iLsm, u32 iShmMin, u32 iShmMax){
  if( p->iLsmId==0 || p->iLsmId>iLsm ) return 0;
  if( shm_sequence_ge(iShmMax, p->iTreeId)==0 ) return 0;
  return shm_sequence_ge(p->iTreeId, iShmMin);
}
1456 
/*
** Obtain a read-lock on database version identified by the combination
** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or
** an LSM error code otherwise.
**
** Three strategies are tried in order: (1) share a slot that exactly
** matches iLsm/iShmMax, (2) claim a free slot and set it to iLsm/iShmMax,
** (3) share any slot whose values are compatible (see slotIsUsable()).
** If none succeeds, LSM_BUSY is returned.
*/
int lsmReadlock(lsm_db *db, i64 iLsm, u32 iShmMin, u32 iShmMax){
  int rc = LSM_OK;
  ShmHeader *pShm = db->pShmhdr;
  int i;

  assert( db->iReader<0 );
  assert( shm_sequence_ge(iShmMax, iShmMin) );

  /* This is a no-op if the read-only transaction flag is set. */
  if( db->bRoTrans ){
    db->iReader = 0;
    return LSM_OK;
  }

  /* Search for an exact match. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId==iLsm && p->iTreeId==iShmMax ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      /* Re-check the slot values after the lock is held - they may have
      ** been changed before the SHARED lock was obtained.  */
      if( rc==LSM_OK && p->iLsmId==iLsm && p->iTreeId==iShmMax ){
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  /* Try to obtain a write-lock on each slot, in order. If successful, set
  ** the slot values to iLsm/iTree.  */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      rc = LSM_OK;
    }else{
      /* EXCL lock held, so no other reader is using this slot. Write the
      ** required values, then downgrade to SHARED.  */
      ShmReader *p = &pShm->aReader[i];
      p->iLsmId = iLsm;
      p->iTreeId = iShmMax;
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      assert( rc!=LSM_BUSY );
      if( rc==LSM_OK ) db->iReader = i;
    }
  }

  /* Search for any usable slot */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      /* As above, re-check usability once the SHARED lock is held. */
      if( rc==LSM_OK && slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  if( rc==LSM_OK && db->iReader<0 ){
    rc = LSM_BUSY;
  }
  return rc;
}
1523 
/*
** This is used to check if there exists a read-lock locking a particular
** version of either the in-memory tree or database file.
**
** If iLsmId is non-zero, then it is a snapshot id. If there exists a
** read-lock using this snapshot or newer, set *pbInUse to true. Or,
** if there is no such read-lock, set it to false.
**
** Or, if iLsmId is zero, then iShmid is a shared-memory sequence id.
** Search for a read-lock using this sequence id or newer. etc.
*/
static int isInUse(lsm_db *db, i64 iLsmId, u32 iShmid, int *pbInUse){
  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId ){
      if( (iLsmId!=0 && p->iLsmId!=0 && iLsmId>=p->iLsmId)
       || (iLsmId==0 && shm_sequence_ge(p->iTreeId, iShmid))
      ){
        /* This slot may be protecting the queried version. Try an EXCL
        ** lock: success means no reader holds the slot, so it can be
        ** cleared (iLsmId=0) and ignored. LSM_BUSY (detected after the
        ** loop) means some reader holds it - version is in use.  */
        rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
        if( rc==LSM_OK ){
          p->iLsmId = 0;
          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
        }
      }
    }
  }

  if( rc==LSM_BUSY ){
    *pbInUse = 1;
    return LSM_OK;
  }
  *pbInUse = 0;
  return rc;
}
1562 
/*
** This function is called by worker connections to determine the smallest
** snapshot id that is currently in use by a database client. The worker
** connection uses this result to determine whether or not it is safe to
** recycle a database block.
**
** *piInUse is an in/out parameter: it holds a candidate snapshot id on
** entry, and the smallest in-use id (no larger than the input) on exit.
*/
static int firstSnapshotInUse(
  lsm_db *db,                     /* Database handle */
  i64 *piInUse                    /* IN/OUT: Smallest snapshot id in use */
){
  ShmHeader *pShm = db->pShmhdr;
  i64 iInUse = *piInUse;
  int i;

  assert( iInUse>0 );
  for(i=0; i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId ){
      i64 iThis = p->iLsmId;
      if( iThis!=0 && iInUse>iThis ){
        /* Slot claims a snapshot older than the current candidate. Try an
        ** EXCL lock: success means no reader actually holds the slot, so
        ** clear it. LSM_BUSY means a reader holds it - lower the
        ** candidate to this slot's snapshot id.  */
        int rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
        if( rc==LSM_OK ){
          p->iLsmId = 0;
          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
        }else if( rc==LSM_BUSY ){
          iInUse = iThis;
        }else{
          /* Some error other than LSM_BUSY. Return the error code to
          ** the caller in this case.  */
          return rc;
        }
      }
    }
  }

  *piInUse = iInUse;
  return LSM_OK;
}
1601 
/*
** Set *pbInUse to true if the in-memory tree with shm sequence-id iShmid
** is still in use by this or any other connection, or false otherwise.
** Returns LSM_OK or an LSM error code.
*/
int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){
  int rc = LSM_OK;
  /* This connection itself uses the tree - no need to scan the slots. */
  if( db->treehdr.iUsedShmid==iShmid ){
    *pbInUse = 1;
  }else{
    rc = isInUse(db, 0, iShmid, pbInUse);
  }
  return rc;
}
1609 
/*
** Set *pbInUse to true if snapshot iLsmId (or newer) is still in use by
** this or any other connection, or false otherwise. Returns LSM_OK or an
** LSM error code.
*/
int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse){
  int rc = LSM_OK;
  /* This connection's own client snapshot counts as in use. */
  if( db->pClient && db->pClient->iId<=iLsmId ){
    *pbInUse = 1;
  }else{
    rc = isInUse(db, iLsmId, 0, pbInUse);
  }
  return rc;
}
1617 
1618 /*
1619 ** This function may only be called after a successful call to
1620 ** lsmDbDatabaseConnect(). It returns true if the connection is in
1621 ** multi-process mode, or false otherwise.
1622 */
lsmDbMultiProc(lsm_db * pDb)1623 int lsmDbMultiProc(lsm_db *pDb){
1624   return pDb->pDatabase && pDb->pDatabase->bMultiProc;
1625 }
1626 
1627 
1628 /*************************************************************************
1629 **************************************************************************
1630 **************************************************************************
1631 **************************************************************************
1632 **************************************************************************
1633 *************************************************************************/
1634 
/*
** Ensure that database connection db has cached pointers to at least the
** first nChunk chunks of shared memory. A no-op if nChunk pointers are
** already cached. Returns LSM_OK, LSM_NOMEM, or an error from the
** environment's shm-map method.
*/
int lsmShmCacheChunks(lsm_db *db, int nChunk){
  int rc = LSM_OK;
  if( nChunk>db->nShm ){
    static const int NINCR = 16;
    Database *p = db->pDatabase;
    lsm_env *pEnv = db->pEnv;
    int nAlloc;
    int i;

    /* Ensure that the db->apShm[] array is large enough. If an attempt to
    ** allocate memory fails, return LSM_NOMEM immediately. The apShm[] array
    ** is always extended in multiples of 16 entries - so the actual allocated
    ** size can be inferred from nShm.  */
    nAlloc = ((db->nShm + NINCR - 1) / NINCR) * NINCR;
    while( nChunk>=nAlloc ){
      void **apShm;
      nAlloc += NINCR;
      apShm = lsmRealloc(pEnv, db->apShm, sizeof(void*)*nAlloc);
      if( !apShm ) return LSM_NOMEM_BKPT;
      db->apShm = apShm;
    }

    if( db->bRoTrans ){
      /* Read-only transaction mode: chunks are private zeroed heap
      ** allocations, not real shared memory.  */
      for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
        db->apShm[i] = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
        db->nShm++;
      }

    }else{

      /* Enter the client mutex - Database.apShmChunk[] and nShmChunk are
      ** shared between all connections in this process.  */
      lsmMutexEnter(pEnv, p->pClientMutex);

      /* Extend the Database objects apShmChunk[] array if necessary. Using the
       ** same pattern as for the lsm_db.apShm[] array above.  */
      nAlloc = ((p->nShmChunk + NINCR - 1) / NINCR) * NINCR;
      while( nChunk>=nAlloc ){
        void **apShm;
        nAlloc +=  NINCR;
        apShm = lsmRealloc(pEnv, p->apShmChunk, sizeof(void*)*nAlloc);
        if( !apShm ){
          rc = LSM_NOMEM_BKPT;
          break;
        }
        p->apShmChunk = apShm;
      }

      /* Map (or allocate) any chunks not yet known to the Database object,
      ** then copy the shared pointers into this connection's cache.  */
      for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
        if( i>=p->nShmChunk ){
          void *pChunk = 0;
          if( p->bMultiProc==0 ){
            /* Single process mode */
            pChunk = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
          }else{
            /* Multi-process mode */
            rc = lsmEnvShmMap(pEnv, p->pFile, i, LSM_SHM_CHUNK_SIZE, &pChunk);
          }
          if( rc==LSM_OK ){
            p->apShmChunk[i] = pChunk;
            p->nShmChunk++;
          }
        }
        if( rc==LSM_OK ){
          db->apShm[i] = p->apShmChunk[i];
          db->nShm++;
        }
      }

      /* Release the client mutex */
      lsmMutexLeave(pEnv, p->pClientMutex);
    }
  }

  return rc;
}
1714 
/*
** Apply file-system lock operation eOp to lock iLock of database p.
** In single-process mode there is no file locking to do, so this is a
** no-op that returns LSM_OK.
*/
static int lockSharedFile(lsm_env *pEnv, Database *p, int iLock, int eOp){
  if( p->bMultiProc==0 ) return LSM_OK;
  return lsmEnvLock(pEnv, p->pFile, iLock, eOp);
}
1722 
1723 /*
1724 ** Test if it would be possible for connection db to obtain a lock of type
1725 ** eType on the nLock locks starting at iLock. If so, return LSM_OK. If it
1726 ** would not be possible to obtain the lock due to a lock held by another
1727 ** connection, return LSM_BUSY. If an IO or other error occurs (i.e. in the
1728 ** lsm_env.xTestLock function), return some other LSM error code.
1729 **
1730 ** Note that this function never actually locks the database - it merely
1731 ** queries the system to see if there exists a lock that would prevent
1732 ** it from doing so.
1733 */
lsmShmTestLock(lsm_db * db,int iLock,int nLock,int eOp)1734 int lsmShmTestLock(
1735   lsm_db *db,
1736   int iLock,
1737   int nLock,
1738   int eOp
1739 ){
1740   int rc = LSM_OK;
1741   lsm_db *pIter;
1742   Database *p = db->pDatabase;
1743   int i;
1744   u64 mask = 0;
1745 
1746   for(i=iLock; i<(iLock+nLock); i++){
1747     mask |= ((u64)1 << (iLock-1));
1748     if( eOp==LSM_LOCK_EXCL ) mask |= ((u64)1 << (iLock+32-1));
1749   }
1750 
1751   lsmMutexEnter(db->pEnv, p->pClientMutex);
1752   for(pIter=p->pConn; pIter; pIter=pIter->pNext){
1753     if( pIter!=db && (pIter->mLock & mask) ){
1754       assert( pIter!=db );
1755       break;
1756     }
1757   }
1758 
1759   if( pIter ){
1760     rc = LSM_BUSY;
1761   }else if( p->bMultiProc ){
1762     rc = lsmEnvTestLock(db->pEnv, p->pFile, iLock, nLock, eOp);
1763   }
1764 
1765   lsmMutexLeave(db->pEnv, p->pClientMutex);
1766   return rc;
1767 }
1768 
1769 /*
1770 ** Attempt to obtain the lock identified by the iLock and bExcl parameters.
1771 ** If successful, return LSM_OK. If the lock cannot be obtained because
1772 ** there exists some other conflicting lock, return LSM_BUSY. If some other
1773 ** error occurs, return an LSM error code.
1774 **
1775 ** Parameter iLock must be one of LSM_LOCK_WRITER, WORKER or CHECKPOINTER,
1776 ** or else a value returned by the LSM_LOCK_READER macro.
1777 */
int lsmShmLock(
  lsm_db *db,
  int iLock,
  int eOp,                        /* One of LSM_LOCK_UNLOCK, SHARED or EXCL */
  int bBlock                      /* True for a blocking lock */
){
  lsm_db *pIter;
  /* Within lsm_db.mLock, bit (iLock-1) records an EXCLUSIVE lock on slot
  ** iLock and bit (iLock+32-1) records a SHARED lock on the same slot.  */
  const u64 me = ((u64)1 << (iLock-1));       /* EXCLUSIVE bit for iLock */
  const u64 ms = ((u64)1 << (iLock+32-1));    /* SHARED bit for iLock */
  int rc = LSM_OK;
  Database *p = db->pDatabase;

  /* NOTE(review): parameter bBlock is not referenced anywhere in this
  ** function body - presumably blocking behavior is handled by the caller
  ** or the environment layer; confirm before relying on it.  */
  assert( eOp!=LSM_LOCK_EXCL || p->bReadonly==0 );
  assert( iLock>=1 && iLock<=LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1) );
  assert( LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1)<=32 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );

  /* Check for a no-op. Proceed only if this is not one of those. */
  if( (eOp==LSM_LOCK_UNLOCK && (db->mLock & (me|ms))!=0)
   || (eOp==LSM_LOCK_SHARED && (db->mLock & (me|ms))!=ms)
   || (eOp==LSM_LOCK_EXCL   && (db->mLock & me)==0)
  ){
    int nExcl = 0;                /* Number of connections holding EXCLUSIVE */
    int nShared = 0;              /* Number of connections holding SHARED */
    lsmMutexEnter(db->pEnv, p->pClientMutex);

    /* Figure out the locks currently held by this process on iLock, not
    ** including any held by connection db.  */
    for(pIter=p->pConn; pIter; pIter=pIter->pNext){
      /* A connection holding EXCLUSIVE must also have the SHARED bit set. */
      assert( (pIter->mLock & me)==0 || (pIter->mLock & ms)!=0 );
      if( pIter!=db ){
        if( pIter->mLock & me ){
          nExcl++;
        }else if( pIter->mLock & ms ){
          nShared++;
        }
      }
    }
    /* At most one EXCLUSIVE holder, and it excludes all SHARED holders
    ** (including this connection).  */
    assert( nExcl==0 || nExcl==1 );
    assert( nExcl==0 || nShared==0 );
    assert( nExcl==0 || (db->mLock & (me|ms))==0 );

    switch( eOp ){
      case LSM_LOCK_UNLOCK:
        /* Release the file-system lock only when no other connection in
        ** this process still holds a SHARED lock on the slot.  */
        if( nShared==0 ){
          lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_UNLOCK);
        }
        db->mLock &= ~(me|ms);
        break;

      case LSM_LOCK_SHARED:
        if( nExcl ){
          rc = LSM_BUSY;
        }else{
          /* Take the file-system SHARED lock only for the first SHARED
          ** holder within this process; later holders share it.  */
          if( nShared==0 ){
            rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_SHARED);
          }
          if( rc==LSM_OK ){
            db->mLock |= ms;
            db->mLock &= ~me;
          }
        }
        break;

      default:
        assert( eOp==LSM_LOCK_EXCL );
        if( nExcl || nShared ){
          rc = LSM_BUSY;
        }else{
          rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_EXCL);
          if( rc==LSM_OK ){
            /* EXCLUSIVE implies SHARED, so both bits are set.  */
            db->mLock |= (me|ms);
          }
        }
        break;
    }

    lsmMutexLeave(db->pEnv, p->pClientMutex);
  }

  return rc;
}
1860 
1861 #ifdef LSM_DEBUG
1862 
/*
** Return the type of lock (LSM_LOCK_UNLOCK, LSM_LOCK_SHARED or
** LSM_LOCK_EXCL) that connection db currently holds on lock slot iLock,
** as recorded in the db->mLock bitmask.
*/
int shmLockType(lsm_db *db, int iLock){
  const u64 maskExcl = ((u64)1 << (iLock-1));     /* EXCLUSIVE bit */
  const u64 maskShared = ((u64)1 << (iLock+32-1));/* SHARED bit */
  int eRet = LSM_LOCK_UNLOCK;

  if( db->mLock & maskExcl ){
    eRet = LSM_LOCK_EXCL;
  }else if( db->mLock & maskShared ){
    eRet = LSM_LOCK_SHARED;
  }
  return eRet;
}
1871 
/*
** The arguments passed to this function are similar to those passed to
** the lsmShmLock() function. However, instead of obtaining a new lock
** this function returns true if the specified connection already holds
** (or does not hold) such a lock, depending on the value of eOp. As
** follows:
**
**   (eOp==LSM_LOCK_UNLOCK) -> true if db has no lock on iLock
**   (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
**   (eOp==LSM_LOCK_EXCL)   -> true if db has an EXCLUSIVE lock on iLock.
*/
int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
  int eHave;                      /* Lock type currently held on iLock */
  int bRet;                       /* Value to return */

  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );

  eHave = shmLockType(db, iLock);

  /* Compare the held lock type against the expectation encoded by eOp.  */
  if( eOp==LSM_LOCK_UNLOCK ){
    bRet = (eHave==LSM_LOCK_UNLOCK);
  }else if( eOp==LSM_LOCK_SHARED ){
    /* "At least SHARED" - an EXCLUSIVE lock satisfies this too.  */
    bRet = (eHave!=LSM_LOCK_UNLOCK);
  }else if( eOp==LSM_LOCK_EXCL ){
    bRet = (eHave==LSM_LOCK_EXCL);
  }else{
    assert( !"bad eOp value passed to lsmShmAssertLock()" );
    bRet = 0;
  }

  return bRet;
}
1910 
/*
** Return true if connection db holds the WORKER lock in EXCLUSIVE mode
** and has a worker snapshot (db->pWorker) loaded.
*/
int lsmShmAssertWorker(lsm_db *db){
  if( db->pWorker==0 ) return 0;
  return lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL);
}
1914 
/*
** This function does not contribute to library functionality, and is not
** included in release builds. It is intended to be called from within
** an interactive debugger.
**
** When called, this function prints a single line of human readable output
** to stdout describing the locks currently held by the connection. For
** example:
**
**     (gdb) call print_db_locks(pDb)
**     (shared on dms2) (exclusive on writer)
*/
void print_db_locks(lsm_db *db){
  int iLock;
  int bOne = 0;                   /* True once at least one lock is printed.
                                  ** Hoisted out of the loop - when declared
                                  ** inside, it was reset every iteration and
                                  ** the separator was never emitted.  */
  const char *azLock[] = {0, "shared", "exclusive"};
  const char *azName[] = {
    0, "dms1", "dms2", "writer", "worker", "checkpointer",
    "reader0", "reader1", "reader2", "reader3", "reader4", "reader5"
  };

  /* Lock slots are numbered from 1: shmLockType() shifts by (iLock-1), so
  ** passing 0 would shift by a negative count (undefined behavior). Limit
  ** the loop to the slots that have names so azName[] is never indexed
  ** out of bounds.  */
  for(iLock=1; iLock<(int)(sizeof(azName)/sizeof(azName[0])); iLock++){
    int eHave = shmLockType(db, iLock);
    if( azLock[eHave] ){
      printf("%s(%s on %s)", (bOne?" ":""), azLock[eHave], azName[iLock]);
      bOne = 1;
    }
  }
  printf("\n");
}
/*
** Debugging helper: print the locks held by every connection open on the
** same database as db, one connection per line. The connection passed as
** the argument is marked with a "*".
*/
void print_all_db_locks(lsm_db *db){
  lsm_db *pConn;
  for(pConn=db->pDatabase->pConn; pConn; pConn=pConn->pNext){
    const char *zStar = (pConn==db) ? "*" : "";
    printf("%s connection %p ", zStar, pConn);
    print_db_locks(pConn);
  }
}
1951 #endif
1952 
/*
** Issue a shared-memory barrier on behalf of connection db. The work is
** delegated to the environment layer, which supplies the platform-specific
** primitive.
*/
void lsmShmBarrier(lsm_db *db){
  lsmEnvShmBarrier(db->pEnv);
}
1956 
/*
** Checkpoint the database. If pnKB is not NULL, set *pnKB to the amount of
** data written by the checkpoint in KB (rounded up), or to zero if the
** checkpoint failed or wrote nothing.
*/
int lsm_checkpoint(lsm_db *pDb, int *pnKB){
  u32 nWrite = 0;                 /* Pages written by this checkpoint */
  int rc;                         /* Return code */

  /* Attempt the checkpoint. On success, nWrite is set to the number of
  ** pages written between this and the previous checkpoint.  */
  rc = lsmCheckpointWrite(pDb, &nWrite);

  /* If the caller supplied an output pointer, report the amount of data
  ** checkpointed. The page count is converted to KB using 64-bit
  ** arithmetic to avoid overflow, rounding up.  */
  if( pnKB ){
    if( rc==LSM_OK && nWrite ){
      *pnKB = (int)((((i64)nWrite * lsmFsPageSize(pDb->pFS)) + 1023) / 1024);
    }else{
      *pnKB = 0;
    }
  }

  return rc;
}
1977