/*
** 2005 December 14
**
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** This file contains an example implementation of an asynchronous IO
** backend for SQLite.
**
** WHAT IS ASYNCHRONOUS I/O?
**
** With asynchronous I/O, write requests are handled by a separate thread
** running in the background.  This means that the thread that initiates
** a database write does not have to wait for (sometimes slow) disk I/O
** to occur.  The write seems to happen very quickly, though in reality
** it is happening at its usual slow pace in the background.
**
** Asynchronous I/O appears to give better responsiveness, but at a price.
** You lose the Durable property.  With the default I/O backend of SQLite,
** once a write completes, you know that the information you wrote is
** safely on disk.  With the asynchronous I/O, this is not the case.  If
** your program crashes or if a power loss occurs after the database
** write but before the asynchronous write thread has completed, then the
** database change might never make it to disk and the next user of the
** database might not see your change.
**
** You lose Durability with asynchronous I/O, but you still retain the
** other parts of ACID:  Atomic,  Consistent, and Isolated.  Many
** applications get along fine without the Durability.
**
** HOW IT WORKS
**
** Asynchronous I/O works by creating a special SQLite "vfs" structure
** and registering it with sqlite3_vfs_register().
When files opened via
** this vfs are written to (using sqlite3OsWrite()), the data is not
** written directly to disk, but is placed in the "write-queue" to be
** handled by the background thread.
**
** When files opened with the asynchronous vfs are read from
** (using sqlite3OsRead()), the data is read from the file on
** disk and the write-queue, so that from the point of view of
** the vfs reader the OsWrite() appears to have already completed.
**
** The special vfs is registered (and unregistered) by calls to
** function asyncEnable() (see below).
**
** LIMITATIONS
**
** This demonstration code is deliberately kept simple in order to keep
** the main ideas clear and easy to understand.  Real applications that
** want to do asynchronous I/O might want to add additional capabilities.
** For example, in this demonstration if writes are happening at a steady
** stream that exceeds the I/O capability of the background writer thread,
** the queue of pending write operations will grow without bound until we
** run out of memory.  Users of this technique may want to keep track of
** the quantity of pending writes and stop accepting new write requests
** when the buffer gets to be too big.
**
** LOCKING + CONCURRENCY
**
** Multiple connections from within a single process that use this
** implementation of asynchronous IO may access a single database
** file concurrently. From the point of view of the user, if all
** connections are from within a single process, there is no difference
** between the concurrency offered by "normal" SQLite and SQLite
** using the asynchronous backend.
**
** If connections from within multiple processes may access the
** database file, the ENABLE_FILE_LOCKING symbol (see below) must be
** defined. If it is not defined, then no locks are established on
** the database file.
In this case, if multiple processes access 78 ** the database file, corruption will quickly result. 79 ** 80 ** If ENABLE_FILE_LOCKING is defined (the default), then connections 81 ** from within multiple processes may access a single database file 82 ** without risking corruption. However concurrency is reduced as 83 ** follows: 84 ** 85 ** * When a connection using asynchronous IO begins a database 86 ** transaction, the database is locked immediately. However the 87 ** lock is not released until after all relevant operations 88 ** in the write-queue have been flushed to disk. This means 89 ** (for example) that the database may remain locked for some 90 ** time after a "COMMIT" or "ROLLBACK" is issued. 91 ** 92 ** * If an application using asynchronous IO executes transactions 93 ** in quick succession, other database users may be effectively 94 ** locked out of the database. This is because when a BEGIN 95 ** is executed, a database lock is established immediately. But 96 ** when the corresponding COMMIT or ROLLBACK occurs, the lock 97 ** is not released until the relevant part of the write-queue 98 ** has been flushed through. As a result, if a COMMIT is followed 99 ** by a BEGIN before the write-queue is flushed through, the database 100 ** is never unlocked,preventing other processes from accessing 101 ** the database. 102 ** 103 ** Defining ENABLE_FILE_LOCKING when using an NFS or other remote 104 ** file-system may slow things down, as synchronous round-trips to the 105 ** server may be required to establish database file locks. 106 */ 107 #define ENABLE_FILE_LOCKING 108 109 #ifndef SQLITE_AMALGAMATION 110 # include "sqliteInt.h" 111 #endif 112 #include <tcl.h> 113 114 /* 115 ** This test uses pthreads and hence only works on unix and with 116 ** a threadsafe build of SQLite. 117 */ 118 #if OS_UNIX && SQLITE_THREADSAFE 119 120 /* 121 ** This demo uses pthreads. 
If you do not have a pthreads implementation 122 ** for your operating system, you will need to recode the threading 123 ** logic. 124 */ 125 #include <pthread.h> 126 #include <sched.h> 127 128 /* Useful macros used in several places */ 129 #define MIN(x,y) ((x)<(y)?(x):(y)) 130 #define MAX(x,y) ((x)>(y)?(x):(y)) 131 132 /* Forward references */ 133 typedef struct AsyncWrite AsyncWrite; 134 typedef struct AsyncFile AsyncFile; 135 typedef struct AsyncFileData AsyncFileData; 136 typedef struct AsyncFileLock AsyncFileLock; 137 typedef struct AsyncLock AsyncLock; 138 139 /* Enable for debugging */ 140 static int sqlite3async_trace = 0; 141 # define ASYNC_TRACE(X) if( sqlite3async_trace ) asyncTrace X 142 static void asyncTrace(const char *zFormat, ...){ 143 char *z; 144 va_list ap; 145 va_start(ap, zFormat); 146 z = sqlite3_vmprintf(zFormat, ap); 147 va_end(ap); 148 fprintf(stderr, "[%d] %s", (int)pthread_self(), z); 149 sqlite3_free(z); 150 } 151 152 /* 153 ** THREAD SAFETY NOTES 154 ** 155 ** Basic rules: 156 ** 157 ** * Both read and write access to the global write-op queue must be 158 ** protected by the async.queueMutex. As are the async.ioError and 159 ** async.nFile variables. 160 ** 161 ** * The async.aLock hash-table and all AsyncLock and AsyncFileLock 162 ** structures must be protected by the async.lockMutex mutex. 163 ** 164 ** * The file handles from the underlying system are assumed not to 165 ** be thread safe. 166 ** 167 ** * See the last two paragraphs under "The Writer Thread" for 168 ** an assumption to do with file-handle synchronization by the Os. 169 ** 170 ** Deadlock prevention: 171 ** 172 ** There are three mutex used by the system: the "writer" mutex, 173 ** the "queue" mutex and the "lock" mutex. Rules are: 174 ** 175 ** * It is illegal to block on the writer mutex when any other mutex 176 ** are held, and 177 ** 178 ** * It is illegal to block on the queue mutex when the lock mutex 179 ** is held. 180 ** 181 ** i.e. 
mutexes must be grabbed in the order "writer", "queue", "lock".
**
** File system operations (invoked by SQLite thread):
**
**     xOpen
**     xDelete
**     xFileExists
**
** File handle operations (invoked by SQLite thread):
**
**         asyncWrite, asyncClose, asyncTruncate, asyncSync
**
**     The operations above add an entry to the global write-op list. They
**     prepare the entry, acquire the async.queueMutex momentarily while
**     list pointers are manipulated to insert the new entry, then release
**     the mutex and signal the writer thread to wake up in case it happens
**     to be asleep.
**
**
**         asyncRead, asyncFileSize.
**
**     Read operations. Both of these read from the underlying file
**     first, then adjust their result based on pending writes in the
**     write-op queue. So async.queueMutex is held for the duration
**     of these operations to prevent other threads from changing the
**     queue in mid operation.
**
**
**         asyncLock, asyncUnlock, asyncCheckReservedLock
**
**     These primitives implement in-process locking using a hash table
**     on the file name. Files are locked correctly for connections coming
**     from the same process. But other processes cannot see these locks
**     and will therefore not honor them.
**
**
** The writer thread:
**
**     The async.writerMutex is used to make sure there is only
**     a single writer thread running at a time.
**
**     Inside the writer thread is a loop that works like this:
**
**         WHILE (write-op list is not empty)
**             Do IO operation at head of write-op list
**             Remove entry from head of write-op list
**         END WHILE
**
**     The async.queueMutex is always held during the <write-op list is
**     not empty> test, and when the entry is removed from the head
**     of the write-op list.
Sometimes it is held for the interim 232 ** period (while the IO is performed), and sometimes it is 233 ** relinquished. It is relinquished if (a) the IO op is an 234 ** ASYNC_CLOSE or (b) when the file handle was opened, two of 235 ** the underlying systems handles were opened on the same 236 ** file-system entry. 237 ** 238 ** If condition (b) above is true, then one file-handle 239 ** (AsyncFile.pBaseRead) is used exclusively by sqlite threads to read the 240 ** file, the other (AsyncFile.pBaseWrite) by sqlite3_async_flush() 241 ** threads to perform write() operations. This means that read 242 ** operations are not blocked by asynchronous writes (although 243 ** asynchronous writes may still be blocked by reads). 244 ** 245 ** This assumes that the OS keeps two handles open on the same file 246 ** properly in sync. That is, any read operation that starts after a 247 ** write operation on the same file system entry has completed returns 248 ** data consistent with the write. We also assume that if one thread 249 ** reads a file while another is writing it all bytes other than the 250 ** ones actually being written contain valid data. 251 ** 252 ** If the above assumptions are not true, set the preprocessor symbol 253 ** SQLITE_ASYNC_TWO_FILEHANDLES to 0. 254 */ 255 256 #ifndef SQLITE_ASYNC_TWO_FILEHANDLES 257 /* #define SQLITE_ASYNC_TWO_FILEHANDLES 0 */ 258 #define SQLITE_ASYNC_TWO_FILEHANDLES 1 259 #endif 260 261 /* 262 ** State information is held in the static variable "async" defined 263 ** as the following structure. 264 ** 265 ** Both async.ioError and async.nFile are protected by async.queueMutex. 
266 */ 267 static struct TestAsyncStaticData { 268 pthread_mutex_t lockMutex; /* For access to aLock hash table */ 269 pthread_mutex_t queueMutex; /* Mutex for access to write operation queue */ 270 pthread_mutex_t writerMutex; /* Prevents multiple writer threads */ 271 pthread_cond_t queueSignal; /* For waking up sleeping writer thread */ 272 pthread_cond_t emptySignal; /* Notify when the write queue is empty */ 273 AsyncWrite *pQueueFirst; /* Next write operation to be processed */ 274 AsyncWrite *pQueueLast; /* Last write operation on the list */ 275 Hash aLock; /* Files locked */ 276 volatile int ioDelay; /* Extra delay between write operations */ 277 volatile int writerHaltWhenIdle; /* Writer thread halts when queue empty */ 278 volatile int writerHaltNow; /* Writer thread halts after next op */ 279 int ioError; /* True if an IO error has occured */ 280 int nFile; /* Number of open files (from sqlite pov) */ 281 } async = { 282 PTHREAD_MUTEX_INITIALIZER, 283 PTHREAD_MUTEX_INITIALIZER, 284 PTHREAD_MUTEX_INITIALIZER, 285 PTHREAD_COND_INITIALIZER, 286 PTHREAD_COND_INITIALIZER, 287 }; 288 289 /* Possible values of AsyncWrite.op */ 290 #define ASYNC_NOOP 0 291 #define ASYNC_WRITE 1 292 #define ASYNC_SYNC 2 293 #define ASYNC_TRUNCATE 3 294 #define ASYNC_CLOSE 4 295 #define ASYNC_DELETE 5 296 #define ASYNC_OPENEXCLUSIVE 6 297 #define ASYNC_UNLOCK 7 298 299 /* Names of opcodes. Used for debugging only. 300 ** Make sure these stay in sync with the macros above! 301 */ 302 static const char *azOpcodeName[] = { 303 "NOOP", "WRITE", "SYNC", "TRUNCATE", "CLOSE", "DELETE", "OPENEX", "UNLOCK" 304 }; 305 306 /* 307 ** Entries on the write-op queue are instances of the AsyncWrite 308 ** structure, defined here. 309 ** 310 ** The interpretation of the iOffset and nByte variables varies depending 311 ** on the value of AsyncWrite.op: 312 ** 313 ** ASYNC_NOOP: 314 ** No values used. 315 ** 316 ** ASYNC_WRITE: 317 ** iOffset -> Offset in file to write to. 
318 ** nByte -> Number of bytes of data to write (pointed to by zBuf). 319 ** 320 ** ASYNC_SYNC: 321 ** nByte -> flags to pass to sqlite3OsSync(). 322 ** 323 ** ASYNC_TRUNCATE: 324 ** iOffset -> Size to truncate file to. 325 ** nByte -> Unused. 326 ** 327 ** ASYNC_CLOSE: 328 ** iOffset -> Unused. 329 ** nByte -> Unused. 330 ** 331 ** ASYNC_DELETE: 332 ** iOffset -> Contains the "syncDir" flag. 333 ** nByte -> Number of bytes of zBuf points to (file name). 334 ** 335 ** ASYNC_OPENEXCLUSIVE: 336 ** iOffset -> Value of "delflag". 337 ** nByte -> Number of bytes of zBuf points to (file name). 338 ** 339 ** ASYNC_UNLOCK: 340 ** nByte -> Argument to sqlite3OsUnlock(). 341 ** 342 ** 343 ** For an ASYNC_WRITE operation, zBuf points to the data to write to the file. 344 ** This space is sqlite3_malloc()d along with the AsyncWrite structure in a 345 ** single blob, so is deleted when sqlite3_free() is called on the parent 346 ** structure. 347 */ 348 struct AsyncWrite { 349 AsyncFileData *pFileData; /* File to write data to or sync */ 350 int op; /* One of ASYNC_xxx etc. */ 351 i64 iOffset; /* See above */ 352 int nByte; /* See above */ 353 char *zBuf; /* Data to write to file (or NULL if op!=ASYNC_WRITE) */ 354 AsyncWrite *pNext; /* Next write operation (to any file) */ 355 }; 356 357 /* 358 ** An instance of this structure is created for each distinct open file 359 ** (i.e. if two handles are opened on the one file, only one of these 360 ** structures is allocated) and stored in the async.aLock hash table. The 361 ** keys for async.aLock are the full pathnames of the opened files. 362 ** 363 ** AsyncLock.pList points to the head of a linked list of AsyncFileLock 364 ** structures, one for each handle currently open on the file. 
365 ** 366 ** If the opened file is not a main-database (the SQLITE_OPEN_MAIN_DB is 367 ** not passed to the sqlite3OsOpen() call), or if ENABLE_FILE_LOCKING is 368 ** not defined at compile time, variables AsyncLock.pFile and 369 ** AsyncLock.eLock are never used. Otherwise, pFile is a file handle 370 ** opened on the file in question and used to obtain the file-system 371 ** locks required by database connections within this process. 372 ** 373 ** See comments above the asyncLock() function for more details on 374 ** the implementation of database locking used by this backend. 375 */ 376 struct AsyncLock { 377 sqlite3_file *pFile; 378 int eLock; 379 AsyncFileLock *pList; 380 }; 381 382 /* 383 ** An instance of the following structure is allocated along with each 384 ** AsyncFileData structure (see AsyncFileData.lock), but is only used if the 385 ** file was opened with the SQLITE_OPEN_MAIN_DB. 386 */ 387 struct AsyncFileLock { 388 int eLock; /* Internally visible lock state (sqlite pov) */ 389 int eAsyncLock; /* Lock-state with write-queue unlock */ 390 AsyncFileLock *pNext; 391 }; 392 393 /* 394 ** The AsyncFile structure is a subclass of sqlite3_file used for 395 ** asynchronous IO. 396 ** 397 ** All of the actual data for the structure is stored in the structure 398 ** pointed to by AsyncFile.pData, which is allocated as part of the 399 ** sqlite3OsOpen() using sqlite3_malloc(). The reason for this is that the 400 ** lifetime of the AsyncFile structure is ended by the caller after OsClose() 401 ** is called, but the data in AsyncFileData may be required by the 402 ** writer thread after that point. 
403 */ 404 struct AsyncFile { 405 sqlite3_io_methods *pMethod; 406 AsyncFileData *pData; 407 }; 408 struct AsyncFileData { 409 char *zName; /* Underlying OS filename - used for debugging */ 410 int nName; /* Number of characters in zName */ 411 sqlite3_file *pBaseRead; /* Read handle to the underlying Os file */ 412 sqlite3_file *pBaseWrite; /* Write handle to the underlying Os file */ 413 AsyncFileLock lock; 414 AsyncWrite close; 415 }; 416 417 /* 418 ** The following async_XXX functions are debugging wrappers around the 419 ** corresponding pthread_XXX functions: 420 ** 421 ** pthread_mutex_lock(); 422 ** pthread_mutex_unlock(); 423 ** pthread_mutex_trylock(); 424 ** pthread_cond_wait(); 425 ** 426 ** It is illegal to pass any mutex other than those stored in the 427 ** following global variables of these functions. 428 ** 429 ** async.queueMutex 430 ** async.writerMutex 431 ** async.lockMutex 432 ** 433 ** If NDEBUG is defined, these wrappers do nothing except call the 434 ** corresponding pthreads function. If NDEBUG is not defined, then the 435 ** following variables are used to store the thread-id (as returned 436 ** by pthread_self()) currently holding the mutex, or 0 otherwise: 437 ** 438 ** asyncdebug.queueMutexHolder 439 ** asyncdebug.writerMutexHolder 440 ** asyncdebug.lockMutexHolder 441 ** 442 ** These variables are used by some assert() statements that verify 443 ** the statements made in the "Deadlock Prevention" notes earlier 444 ** in this file. 445 */ 446 #ifndef NDEBUG 447 448 static struct TestAsyncDebugData { 449 pthread_t lockMutexHolder; 450 pthread_t queueMutexHolder; 451 pthread_t writerMutexHolder; 452 } asyncdebug = {0, 0, 0}; 453 454 /* 455 ** Wrapper around pthread_mutex_lock(). Checks that we have not violated 456 ** the anti-deadlock rules (see "Deadlock prevention" above). 
457 */ 458 static int async_mutex_lock(pthread_mutex_t *pMutex){ 459 int iIdx; 460 int rc; 461 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 462 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 463 464 /* The code in this 'ifndef NDEBUG' block depends on a certain alignment 465 * of the variables in TestAsyncStaticData and TestAsyncDebugData. The 466 * following assert() statements check that this has not been changed. 467 * 468 * Really, these only need to be run once at startup time. 469 */ 470 assert(&(aMutex[0])==&async.lockMutex); 471 assert(&(aMutex[1])==&async.queueMutex); 472 assert(&(aMutex[2])==&async.writerMutex); 473 assert(&(aHolder[0])==&asyncdebug.lockMutexHolder); 474 assert(&(aHolder[1])==&asyncdebug.queueMutexHolder); 475 assert(&(aHolder[2])==&asyncdebug.writerMutexHolder); 476 477 assert( pthread_self()!=0 ); 478 479 for(iIdx=0; iIdx<3; iIdx++){ 480 if( pMutex==&aMutex[iIdx] ) break; 481 482 /* This is the key assert(). Here we are checking that if the caller 483 * is trying to block on async.writerMutex, neither of the other two 484 * mutex are held. If the caller is trying to block on async.queueMutex, 485 * lockMutex is not held. 486 */ 487 assert(!pthread_equal(aHolder[iIdx], pthread_self())); 488 } 489 assert(iIdx<3); 490 491 rc = pthread_mutex_lock(pMutex); 492 if( rc==0 ){ 493 assert(aHolder[iIdx]==0); 494 aHolder[iIdx] = pthread_self(); 495 } 496 return rc; 497 } 498 499 /* 500 ** Wrapper around pthread_mutex_unlock(). 
501 */ 502 static int async_mutex_unlock(pthread_mutex_t *pMutex){ 503 int iIdx; 504 int rc; 505 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 506 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 507 508 for(iIdx=0; iIdx<3; iIdx++){ 509 if( pMutex==&aMutex[iIdx] ) break; 510 } 511 assert(iIdx<3); 512 513 assert(pthread_equal(aHolder[iIdx], pthread_self())); 514 aHolder[iIdx] = 0; 515 rc = pthread_mutex_unlock(pMutex); 516 assert(rc==0); 517 518 return 0; 519 } 520 521 /* 522 ** Wrapper around pthread_mutex_trylock(). 523 */ 524 static int async_mutex_trylock(pthread_mutex_t *pMutex){ 525 int iIdx; 526 int rc; 527 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 528 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 529 530 for(iIdx=0; iIdx<3; iIdx++){ 531 if( pMutex==&aMutex[iIdx] ) break; 532 } 533 assert(iIdx<3); 534 535 rc = pthread_mutex_trylock(pMutex); 536 if( rc==0 ){ 537 assert(aHolder[iIdx]==0); 538 aHolder[iIdx] = pthread_self(); 539 } 540 return rc; 541 } 542 543 /* 544 ** Wrapper around pthread_cond_wait(). 545 */ 546 static int async_cond_wait(pthread_cond_t *pCond, pthread_mutex_t *pMutex){ 547 int iIdx; 548 int rc; 549 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 550 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 551 552 for(iIdx=0; iIdx<3; iIdx++){ 553 if( pMutex==&aMutex[iIdx] ) break; 554 } 555 assert(iIdx<3); 556 557 assert(pthread_equal(aHolder[iIdx],pthread_self())); 558 aHolder[iIdx] = 0; 559 rc = pthread_cond_wait(pCond, pMutex); 560 if( rc==0 ){ 561 aHolder[iIdx] = pthread_self(); 562 } 563 return rc; 564 } 565 566 /* Call our async_XX wrappers instead of selected pthread_XX functions */ 567 #define pthread_mutex_lock async_mutex_lock 568 #define pthread_mutex_unlock async_mutex_unlock 569 #define pthread_mutex_trylock async_mutex_trylock 570 #define pthread_cond_wait async_cond_wait 571 572 #endif /* !defined(NDEBUG) */ 573 574 /* 575 ** Add an entry to the end of the global write-op list. 
pWrite should point 576 ** to an AsyncWrite structure allocated using sqlite3_malloc(). The writer 577 ** thread will call sqlite3_free() to free the structure after the specified 578 ** operation has been completed. 579 ** 580 ** Once an AsyncWrite structure has been added to the list, it becomes the 581 ** property of the writer thread and must not be read or modified by the 582 ** caller. 583 */ 584 static void addAsyncWrite(AsyncWrite *pWrite){ 585 /* We must hold the queue mutex in order to modify the queue pointers */ 586 pthread_mutex_lock(&async.queueMutex); 587 588 /* Add the record to the end of the write-op queue */ 589 assert( !pWrite->pNext ); 590 if( async.pQueueLast ){ 591 assert( async.pQueueFirst ); 592 async.pQueueLast->pNext = pWrite; 593 }else{ 594 async.pQueueFirst = pWrite; 595 } 596 async.pQueueLast = pWrite; 597 ASYNC_TRACE(("PUSH %p (%s %s %d)\n", pWrite, azOpcodeName[pWrite->op], 598 pWrite->pFileData ? pWrite->pFileData->zName : "-", pWrite->iOffset)); 599 600 if( pWrite->op==ASYNC_CLOSE ){ 601 async.nFile--; 602 } 603 604 /* Drop the queue mutex */ 605 pthread_mutex_unlock(&async.queueMutex); 606 607 /* The writer thread might have been idle because there was nothing 608 ** on the write-op queue for it to do. So wake it up. */ 609 pthread_cond_signal(&async.queueSignal); 610 } 611 612 /* 613 ** Increment async.nFile in a thread-safe manner. 614 */ 615 static void incrOpenFileCount(){ 616 /* We must hold the queue mutex in order to modify async.nFile */ 617 pthread_mutex_lock(&async.queueMutex); 618 if( async.nFile==0 ){ 619 async.ioError = SQLITE_OK; 620 } 621 async.nFile++; 622 pthread_mutex_unlock(&async.queueMutex); 623 } 624 625 /* 626 ** This is a utility function to allocate and populate a new AsyncWrite 627 ** structure and insert it (via addAsyncWrite() ) into the global list. 
628 */ 629 static int addNewAsyncWrite( 630 AsyncFileData *pFileData, 631 int op, 632 i64 iOffset, 633 int nByte, 634 const char *zByte 635 ){ 636 AsyncWrite *p; 637 if( op!=ASYNC_CLOSE && async.ioError ){ 638 return async.ioError; 639 } 640 p = sqlite3_malloc(sizeof(AsyncWrite) + (zByte?nByte:0)); 641 if( !p ){ 642 /* The upper layer does not expect operations like OsWrite() to 643 ** return SQLITE_NOMEM. This is partly because under normal conditions 644 ** SQLite is required to do rollback without calling malloc(). So 645 ** if malloc() fails here, treat it as an I/O error. The above 646 ** layer knows how to handle that. 647 */ 648 return SQLITE_IOERR; 649 } 650 p->op = op; 651 p->iOffset = iOffset; 652 p->nByte = nByte; 653 p->pFileData = pFileData; 654 p->pNext = 0; 655 if( zByte ){ 656 p->zBuf = (char *)&p[1]; 657 memcpy(p->zBuf, zByte, nByte); 658 }else{ 659 p->zBuf = 0; 660 } 661 addAsyncWrite(p); 662 return SQLITE_OK; 663 } 664 665 /* 666 ** Close the file. This just adds an entry to the write-op list, the file is 667 ** not actually closed. 668 */ 669 static int asyncClose(sqlite3_file *pFile){ 670 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 671 672 /* Unlock the file, if it is locked */ 673 pthread_mutex_lock(&async.lockMutex); 674 p->lock.eLock = 0; 675 pthread_mutex_unlock(&async.lockMutex); 676 677 addAsyncWrite(&p->close); 678 return SQLITE_OK; 679 } 680 681 /* 682 ** Implementation of sqlite3OsWrite() for asynchronous files. Instead of 683 ** writing to the underlying file, this function adds an entry to the end of 684 ** the global AsyncWrite list. Either SQLITE_OK or SQLITE_NOMEM may be 685 ** returned. 686 */ 687 static int asyncWrite(sqlite3_file *pFile, const void *pBuf, int amt, i64 iOff){ 688 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 689 return addNewAsyncWrite(p, ASYNC_WRITE, iOff, amt, pBuf); 690 } 691 692 /* 693 ** Read data from the file. 
First we read from the filesystem, then adjust 694 ** the contents of the buffer based on ASYNC_WRITE operations in the 695 ** write-op queue. 696 ** 697 ** This method holds the mutex from start to finish. 698 */ 699 static int asyncRead(sqlite3_file *pFile, void *zOut, int iAmt, i64 iOffset){ 700 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 701 int rc = SQLITE_OK; 702 i64 filesize; 703 int nRead; 704 sqlite3_file *pBase = p->pBaseRead; 705 706 /* Grab the write queue mutex for the duration of the call */ 707 pthread_mutex_lock(&async.queueMutex); 708 709 /* If an I/O error has previously occurred in this virtual file 710 ** system, then all subsequent operations fail. 711 */ 712 if( async.ioError!=SQLITE_OK ){ 713 rc = async.ioError; 714 goto asyncread_out; 715 } 716 717 if( pBase->pMethods ){ 718 rc = sqlite3OsFileSize(pBase, &filesize); 719 if( rc!=SQLITE_OK ){ 720 goto asyncread_out; 721 } 722 nRead = MIN(filesize - iOffset, iAmt); 723 if( nRead>0 ){ 724 rc = sqlite3OsRead(pBase, zOut, nRead, iOffset); 725 ASYNC_TRACE(("READ %s %d bytes at %d\n", p->zName, nRead, iOffset)); 726 } 727 } 728 729 if( rc==SQLITE_OK ){ 730 AsyncWrite *pWrite; 731 char *zName = p->zName; 732 733 for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){ 734 if( pWrite->op==ASYNC_WRITE && pWrite->pFileData->zName==zName ){ 735 int iBeginOut = (pWrite->iOffset-iOffset); 736 int iBeginIn = -iBeginOut; 737 int nCopy; 738 739 if( iBeginIn<0 ) iBeginIn = 0; 740 if( iBeginOut<0 ) iBeginOut = 0; 741 nCopy = MIN(pWrite->nByte-iBeginIn, iAmt-iBeginOut); 742 743 if( nCopy>0 ){ 744 memcpy(&((char *)zOut)[iBeginOut], &pWrite->zBuf[iBeginIn], nCopy); 745 ASYNC_TRACE(("OVERREAD %d bytes at %d\n", nCopy, iBeginOut+iOffset)); 746 } 747 } 748 } 749 } 750 751 asyncread_out: 752 pthread_mutex_unlock(&async.queueMutex); 753 return rc; 754 } 755 756 /* 757 ** Truncate the file to nByte bytes in length. This just adds an entry to 758 ** the write-op list, no IO actually takes place. 
759 */ 760 static int asyncTruncate(sqlite3_file *pFile, i64 nByte){ 761 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 762 return addNewAsyncWrite(p, ASYNC_TRUNCATE, nByte, 0, 0); 763 } 764 765 /* 766 ** Sync the file. This just adds an entry to the write-op list, the 767 ** sync() is done later by sqlite3_async_flush(). 768 */ 769 static int asyncSync(sqlite3_file *pFile, int flags){ 770 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 771 return addNewAsyncWrite(p, ASYNC_SYNC, 0, flags, 0); 772 } 773 774 /* 775 ** Read the size of the file. First we read the size of the file system 776 ** entry, then adjust for any ASYNC_WRITE or ASYNC_TRUNCATE operations 777 ** currently in the write-op list. 778 ** 779 ** This method holds the mutex from start to finish. 780 */ 781 int asyncFileSize(sqlite3_file *pFile, i64 *piSize){ 782 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 783 int rc = SQLITE_OK; 784 i64 s = 0; 785 sqlite3_file *pBase; 786 787 pthread_mutex_lock(&async.queueMutex); 788 789 /* Read the filesystem size from the base file. If pBaseRead is NULL, this 790 ** means the file hasn't been opened yet. In this case all relevant data 791 ** must be in the write-op queue anyway, so we can omit reading from the 792 ** file-system. 793 */ 794 pBase = p->pBaseRead; 795 if( pBase->pMethods ){ 796 rc = sqlite3OsFileSize(pBase, &s); 797 } 798 799 if( rc==SQLITE_OK ){ 800 AsyncWrite *pWrite; 801 for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){ 802 if( pWrite->op==ASYNC_DELETE && strcmp(p->zName, pWrite->zBuf)==0 ){ 803 s = 0; 804 }else if( pWrite->pFileData && pWrite->pFileData->zName==p->zName){ 805 switch( pWrite->op ){ 806 case ASYNC_WRITE: 807 s = MAX(pWrite->iOffset + (i64)(pWrite->nByte), s); 808 break; 809 case ASYNC_TRUNCATE: 810 s = MIN(s, pWrite->iOffset); 811 break; 812 } 813 } 814 } 815 *piSize = s; 816 } 817 pthread_mutex_unlock(&async.queueMutex); 818 return rc; 819 } 820 821 /* 822 ** Lock or unlock the actual file-system entry. 
823 */ 824 static int getFileLock(AsyncLock *pLock){ 825 int rc = SQLITE_OK; 826 AsyncFileLock *pIter; 827 int eRequired = 0; 828 829 if( pLock->pFile ){ 830 for(pIter=pLock->pList; pIter; pIter=pIter->pNext){ 831 assert(pIter->eAsyncLock>=pIter->eLock); 832 if( pIter->eAsyncLock>eRequired ){ 833 eRequired = pIter->eAsyncLock; 834 assert(eRequired>=0 && eRequired<=SQLITE_LOCK_EXCLUSIVE); 835 } 836 } 837 838 if( eRequired>pLock->eLock ){ 839 rc = sqlite3OsLock(pLock->pFile, eRequired); 840 if( rc==SQLITE_OK ){ 841 pLock->eLock = eRequired; 842 } 843 } 844 else if( eRequired<pLock->eLock && eRequired<=SQLITE_LOCK_SHARED ){ 845 rc = sqlite3OsUnlock(pLock->pFile, eRequired); 846 if( rc==SQLITE_OK ){ 847 pLock->eLock = eRequired; 848 } 849 } 850 } 851 852 return rc; 853 } 854 855 /* 856 ** The following two methods - asyncLock() and asyncUnlock() - are used 857 ** to obtain and release locks on database files opened with the 858 ** asynchronous backend. 859 */ 860 static int asyncLock(sqlite3_file *pFile, int eLock){ 861 int rc = SQLITE_OK; 862 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 863 864 pthread_mutex_lock(&async.lockMutex); 865 if( p->lock.eLock<eLock ){ 866 AsyncLock *pLock; 867 AsyncFileLock *pIter; 868 pLock = (AsyncLock *)sqlite3HashFind(&async.aLock, p->zName, p->nName); 869 assert(pLock && pLock->pList); 870 for(pIter=pLock->pList; pIter; pIter=pIter->pNext){ 871 if( pIter!=&p->lock && ( 872 (eLock==SQLITE_LOCK_EXCLUSIVE && pIter->eLock>=SQLITE_LOCK_SHARED) || 873 (eLock==SQLITE_LOCK_PENDING && pIter->eLock>=SQLITE_LOCK_RESERVED) || 874 (eLock==SQLITE_LOCK_RESERVED && pIter->eLock>=SQLITE_LOCK_RESERVED) || 875 (eLock==SQLITE_LOCK_SHARED && pIter->eLock>=SQLITE_LOCK_PENDING) 876 )){ 877 rc = SQLITE_BUSY; 878 } 879 } 880 if( rc==SQLITE_OK ){ 881 p->lock.eLock = eLock; 882 p->lock.eAsyncLock = MAX(p->lock.eAsyncLock, eLock); 883 } 884 assert(p->lock.eAsyncLock>=p->lock.eLock); 885 if( rc==SQLITE_OK ){ 886 rc = getFileLock(pLock); 887 } 888 } 889 
pthread_mutex_unlock(&async.lockMutex); 890 891 ASYNC_TRACE(("LOCK %d (%s) rc=%d\n", eLock, p->zName, rc)); 892 return rc; 893 } 894 static int asyncUnlock(sqlite3_file *pFile, int eLock){ 895 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 896 AsyncFileLock *pLock = &p->lock; 897 pthread_mutex_lock(&async.lockMutex); 898 pLock->eLock = MIN(pLock->eLock, eLock); 899 pthread_mutex_unlock(&async.lockMutex); 900 return addNewAsyncWrite(p, ASYNC_UNLOCK, 0, eLock, 0); 901 } 902 903 /* 904 ** This function is called when the pager layer first opens a database file 905 ** and is checking for a hot-journal. 906 */ 907 static int asyncCheckReservedLock(sqlite3_file *pFile){ 908 int ret = 0; 909 AsyncFileLock *pIter; 910 AsyncLock *pLock; 911 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 912 913 pthread_mutex_lock(&async.lockMutex); 914 pLock = (AsyncLock *)sqlite3HashFind(&async.aLock, p->zName, p->nName); 915 for(pIter=pLock->pList; pIter; pIter=pIter->pNext){ 916 if( pIter->eLock>=SQLITE_LOCK_RESERVED ){ 917 ret = 1; 918 } 919 } 920 pthread_mutex_unlock(&async.lockMutex); 921 922 ASYNC_TRACE(("CHECK-LOCK %d (%s)\n", ret, p->zName)); 923 return ret; 924 } 925 926 /* 927 ** This is a no-op, as the asynchronous backend does not support locking. 928 */ 929 static int asyncFileControl(sqlite3_file *id, int op, void *pArg){ 930 switch( op ){ 931 case SQLITE_FCNTL_LOCKSTATE: { 932 pthread_mutex_lock(&async.lockMutex); 933 *(int*)pArg = ((AsyncFile*)id)->pData->lock.eLock; 934 pthread_mutex_unlock(&async.lockMutex); 935 return SQLITE_OK; 936 } 937 } 938 return SQLITE_ERROR; 939 } 940 941 /* 942 ** Return the device characteristics and sector-size of the device. It 943 ** is not tricky to implement these correctly, as this backend might 944 ** not have an open file handle at this point. 
945 */ 946 static int asyncSectorSize(sqlite3_file *pFile){ 947 return 512; 948 } 949 static int asyncDeviceCharacteristics(sqlite3_file *pFile){ 950 return 0; 951 } 952 953 static int unlinkAsyncFile(AsyncFileData *pData){ 954 AsyncLock *pLock; 955 AsyncFileLock **ppIter; 956 int rc = SQLITE_OK; 957 958 pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName); 959 for(ppIter=&pLock->pList; *ppIter; ppIter=&((*ppIter)->pNext)){ 960 if( (*ppIter)==&pData->lock ){ 961 *ppIter = pData->lock.pNext; 962 break; 963 } 964 } 965 if( !pLock->pList ){ 966 if( pLock->pFile ){ 967 sqlite3OsClose(pLock->pFile); 968 } 969 sqlite3_free(pLock); 970 sqlite3HashInsert(&async.aLock, pData->zName, pData->nName, 0); 971 if( !sqliteHashFirst(&async.aLock) ){ 972 sqlite3HashClear(&async.aLock); 973 } 974 }else{ 975 rc = getFileLock(pLock); 976 } 977 978 return rc; 979 } 980 981 /* 982 ** Open a file. 983 */ 984 static int asyncOpen( 985 sqlite3_vfs *pAsyncVfs, 986 const char *zName, 987 sqlite3_file *pFile, 988 int flags, 989 int *pOutFlags 990 ){ 991 static sqlite3_io_methods async_methods = { 992 1, /* iVersion */ 993 asyncClose, /* xClose */ 994 asyncRead, /* xRead */ 995 asyncWrite, /* xWrite */ 996 asyncTruncate, /* xTruncate */ 997 asyncSync, /* xSync */ 998 asyncFileSize, /* xFileSize */ 999 asyncLock, /* xLock */ 1000 asyncUnlock, /* xUnlock */ 1001 asyncCheckReservedLock, /* xCheckReservedLock */ 1002 asyncFileControl, /* xFileControl */ 1003 asyncSectorSize, /* xSectorSize */ 1004 asyncDeviceCharacteristics /* xDeviceCharacteristics */ 1005 }; 1006 1007 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1008 AsyncFile *p = (AsyncFile *)pFile; 1009 int nName = strlen(zName)+1; 1010 int rc = SQLITE_OK; 1011 int nByte; 1012 AsyncFileData *pData; 1013 AsyncLock *pLock = 0; 1014 char *z; 1015 int isExclusive = (flags&SQLITE_OPEN_EXCLUSIVE); 1016 1017 nByte = ( 1018 sizeof(AsyncFileData) + /* AsyncFileData structure */ 1019 2 * pVfs->szOsFile + /* 
AsyncFileData.pBaseRead and pBaseWrite */ 1020 nName /* AsyncFileData.zName */ 1021 ); 1022 z = sqlite3_malloc(nByte); 1023 if( !z ){ 1024 return SQLITE_NOMEM; 1025 } 1026 memset(z, 0, nByte); 1027 pData = (AsyncFileData*)z; 1028 z += sizeof(pData[0]); 1029 pData->pBaseRead = (sqlite3_file*)z; 1030 z += pVfs->szOsFile; 1031 pData->pBaseWrite = (sqlite3_file*)z; 1032 z += pVfs->szOsFile; 1033 pData->zName = z; 1034 pData->nName = nName; 1035 pData->close.pFileData = pData; 1036 pData->close.op = ASYNC_CLOSE; 1037 memcpy(pData->zName, zName, nName); 1038 1039 if( !isExclusive ){ 1040 rc = sqlite3OsOpen(pVfs, zName, pData->pBaseRead, flags, pOutFlags); 1041 if( rc==SQLITE_OK && ((*pOutFlags)&SQLITE_OPEN_READWRITE) ){ 1042 rc = sqlite3OsOpen(pVfs, zName, pData->pBaseWrite, flags, 0); 1043 } 1044 } 1045 1046 pthread_mutex_lock(&async.lockMutex); 1047 1048 if( rc==SQLITE_OK ){ 1049 pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName); 1050 if( !pLock ){ 1051 pLock = sqlite3MallocZero(pVfs->szOsFile + sizeof(AsyncLock)); 1052 if( pLock ){ 1053 AsyncLock *pDelete; 1054 #ifdef ENABLE_FILE_LOCKING 1055 if( flags&SQLITE_OPEN_MAIN_DB ){ 1056 pLock->pFile = (sqlite3_file *)&pLock[1]; 1057 rc = sqlite3OsOpen(pVfs, zName, pLock->pFile, flags, 0); 1058 if( rc!=SQLITE_OK ){ 1059 sqlite3_free(pLock); 1060 pLock = 0; 1061 } 1062 } 1063 #endif 1064 pDelete = sqlite3HashInsert( 1065 &async.aLock, pData->zName, pData->nName, (void *)pLock 1066 ); 1067 if( pDelete ){ 1068 rc = SQLITE_NOMEM; 1069 sqlite3_free(pLock); 1070 } 1071 }else{ 1072 rc = SQLITE_NOMEM; 1073 } 1074 } 1075 } 1076 1077 if( rc==SQLITE_OK ){ 1078 HashElem *pElem; 1079 p->pMethod = &async_methods; 1080 p->pData = pData; 1081 1082 /* Link AsyncFileData.lock into the linked list of 1083 ** AsyncFileLock structures for this file. 
1084 */ 1085 pData->lock.pNext = pLock->pList; 1086 pLock->pList = &pData->lock; 1087 1088 pElem = sqlite3HashFindElem(&async.aLock, pData->zName, pData->nName); 1089 pData->zName = (char *)sqliteHashKey(pElem); 1090 }else{ 1091 sqlite3OsClose(pData->pBaseRead); 1092 sqlite3OsClose(pData->pBaseWrite); 1093 sqlite3_free(pData); 1094 } 1095 1096 pthread_mutex_unlock(&async.lockMutex); 1097 1098 if( rc==SQLITE_OK ){ 1099 incrOpenFileCount(); 1100 } 1101 1102 if( rc==SQLITE_OK && isExclusive ){ 1103 rc = addNewAsyncWrite(pData, ASYNC_OPENEXCLUSIVE, (i64)flags, 0, 0); 1104 if( rc==SQLITE_OK ){ 1105 if( pOutFlags ) *pOutFlags = flags; 1106 }else{ 1107 pthread_mutex_lock(&async.lockMutex); 1108 unlinkAsyncFile(pData); 1109 pthread_mutex_unlock(&async.lockMutex); 1110 sqlite3_free(pData); 1111 } 1112 } 1113 return rc; 1114 } 1115 1116 /* 1117 ** Implementation of sqlite3OsDelete. Add an entry to the end of the 1118 ** write-op queue to perform the delete. 1119 */ 1120 static int asyncDelete(sqlite3_vfs *pAsyncVfs, const char *z, int syncDir){ 1121 return addNewAsyncWrite(0, ASYNC_DELETE, syncDir, strlen(z)+1, z); 1122 } 1123 1124 /* 1125 ** Implementation of sqlite3OsAccess. This method holds the mutex from 1126 ** start to finish. 
1127 */ 1128 static int asyncAccess(sqlite3_vfs *pAsyncVfs, const char *zName, int flags){ 1129 int ret; 1130 AsyncWrite *p; 1131 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1132 1133 assert(flags==SQLITE_ACCESS_READWRITE 1134 || flags==SQLITE_ACCESS_READ 1135 || flags==SQLITE_ACCESS_EXISTS 1136 ); 1137 1138 pthread_mutex_lock(&async.queueMutex); 1139 ret = sqlite3OsAccess(pVfs, zName, flags); 1140 if( flags==SQLITE_ACCESS_EXISTS ){ 1141 for(p=async.pQueueFirst; p; p = p->pNext){ 1142 if( p->op==ASYNC_DELETE && 0==strcmp(p->zBuf, zName) ){ 1143 ret = 0; 1144 }else if( p->op==ASYNC_OPENEXCLUSIVE 1145 && 0==strcmp(p->pFileData->zName, zName) 1146 ){ 1147 ret = 1; 1148 } 1149 } 1150 } 1151 ASYNC_TRACE(("ACCESS(%s): %s = %d\n", 1152 flags==SQLITE_ACCESS_READWRITE?"read-write": 1153 flags==SQLITE_ACCESS_READ?"read":"exists" 1154 , zName, ret) 1155 ); 1156 pthread_mutex_unlock(&async.queueMutex); 1157 return ret; 1158 } 1159 1160 static int asyncGetTempname(sqlite3_vfs *pAsyncVfs, int nBufOut, char *zBufOut){ 1161 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1162 return pVfs->xGetTempname(pVfs, nBufOut, zBufOut); 1163 } 1164 1165 /* 1166 ** Fill in zPathOut with the full path to the file identified by zPath. 1167 */ 1168 static int asyncFullPathname( 1169 sqlite3_vfs *pAsyncVfs, 1170 const char *zPath, 1171 int nPathOut, 1172 char *zPathOut 1173 ){ 1174 int rc; 1175 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1176 rc = sqlite3OsFullPathname(pVfs, zPath, nPathOut, zPathOut); 1177 1178 /* Because of the way intra-process file locking works, this backend 1179 ** needs to return a canonical path. The following block assumes the 1180 ** file-system uses unix style paths. 
1181 */ 1182 if( rc==SQLITE_OK ){ 1183 int iIn; 1184 int iOut = 0; 1185 int nPathOut = strlen(zPathOut); 1186 1187 for(iIn=0; iIn<nPathOut; iIn++){ 1188 1189 /* Replace any occurences of "//" with "/" */ 1190 if( iIn<=(nPathOut-2) && zPathOut[iIn]=='/' && zPathOut[iIn+1]=='/' 1191 ){ 1192 continue; 1193 } 1194 1195 /* Replace any occurences of "/./" with "/" */ 1196 if( iIn<=(nPathOut-3) 1197 && zPathOut[iIn]=='/' && zPathOut[iIn+1]=='.' && zPathOut[iIn+2]=='/' 1198 ){ 1199 iIn++; 1200 continue; 1201 } 1202 1203 /* Replace any occurences of "<path-component>/../" with "" */ 1204 if( iOut>0 && iIn<=(nPathOut-4) 1205 && zPathOut[iIn]=='/' && zPathOut[iIn+1]=='.' 1206 && zPathOut[iIn+2]=='.' && zPathOut[iIn+3]=='/' 1207 ){ 1208 iIn += 3; 1209 iOut--; 1210 for( ; iOut>0 && zPathOut[iOut-1]!='/'; iOut--); 1211 continue; 1212 } 1213 1214 zPathOut[iOut++] = zPathOut[iIn]; 1215 } 1216 zPathOut[iOut] = '\0'; 1217 } 1218 1219 return rc; 1220 } 1221 static void *asyncDlOpen(sqlite3_vfs *pAsyncVfs, const char *zPath){ 1222 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1223 return pVfs->xDlOpen(pVfs, zPath); 1224 } 1225 static void asyncDlError(sqlite3_vfs *pAsyncVfs, int nByte, char *zErrMsg){ 1226 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1227 pVfs->xDlError(pVfs, nByte, zErrMsg); 1228 } 1229 static void *asyncDlSym( 1230 sqlite3_vfs *pAsyncVfs, 1231 void *pHandle, 1232 const char *zSymbol 1233 ){ 1234 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1235 return pVfs->xDlSym(pVfs, pHandle, zSymbol); 1236 } 1237 static void asyncDlClose(sqlite3_vfs *pAsyncVfs, void *pHandle){ 1238 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1239 pVfs->xDlClose(pVfs, pHandle); 1240 } 1241 static int asyncRandomness(sqlite3_vfs *pAsyncVfs, int nByte, char *zBufOut){ 1242 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1243 return pVfs->xRandomness(pVfs, nByte, zBufOut); 1244 } 1245 static int asyncSleep(sqlite3_vfs *pAsyncVfs, int nMicro){ 1246 
sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1247 return pVfs->xSleep(pVfs, nMicro); 1248 } 1249 static int asyncCurrentTime(sqlite3_vfs *pAsyncVfs, double *pTimeOut){ 1250 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1251 return pVfs->xCurrentTime(pVfs, pTimeOut); 1252 } 1253 1254 static sqlite3_vfs async_vfs = { 1255 1, /* iVersion */ 1256 sizeof(AsyncFile), /* szOsFile */ 1257 0, /* mxPathname */ 1258 0, /* pNext */ 1259 "async", /* zName */ 1260 0, /* pAppData */ 1261 asyncOpen, /* xOpen */ 1262 asyncDelete, /* xDelete */ 1263 asyncAccess, /* xAccess */ 1264 asyncGetTempname, /* xGetTempName */ 1265 asyncFullPathname, /* xFullPathname */ 1266 asyncDlOpen, /* xDlOpen */ 1267 asyncDlError, /* xDlError */ 1268 asyncDlSym, /* xDlSym */ 1269 asyncDlClose, /* xDlClose */ 1270 asyncRandomness, /* xDlError */ 1271 asyncSleep, /* xDlSym */ 1272 asyncCurrentTime /* xDlClose */ 1273 }; 1274 1275 /* 1276 ** Call this routine to enable or disable the 1277 ** asynchronous IO features implemented in this file. 1278 ** 1279 ** This routine is not even remotely threadsafe. Do not call 1280 ** this routine while any SQLite database connections are open. 1281 */ 1282 static void asyncEnable(int enable){ 1283 if( enable ){ 1284 if( !async_vfs.pAppData ){ 1285 static int hashTableInit = 0; 1286 async_vfs.pAppData = (void *)sqlite3_vfs_find(0); 1287 async_vfs.mxPathname = ((sqlite3_vfs *)async_vfs.pAppData)->mxPathname; 1288 sqlite3_vfs_register(&async_vfs, 1); 1289 if( !hashTableInit ){ 1290 sqlite3HashInit(&async.aLock, SQLITE_HASH_BINARY, 1); 1291 hashTableInit = 1; 1292 } 1293 } 1294 }else{ 1295 if( async_vfs.pAppData ){ 1296 sqlite3_vfs_unregister(&async_vfs); 1297 async_vfs.pAppData = 0; 1298 } 1299 } 1300 } 1301 1302 /* 1303 ** This procedure runs in a separate thread, reading messages off of the 1304 ** write queue and processing them one by one. 
1305 ** 1306 ** If async.writerHaltNow is true, then this procedure exits 1307 ** after processing a single message. 1308 ** 1309 ** If async.writerHaltWhenIdle is true, then this procedure exits when 1310 ** the write queue is empty. 1311 ** 1312 ** If both of the above variables are false, this procedure runs 1313 ** indefinately, waiting for operations to be added to the write queue 1314 ** and processing them in the order in which they arrive. 1315 ** 1316 ** An artifical delay of async.ioDelay milliseconds is inserted before 1317 ** each write operation in order to simulate the effect of a slow disk. 1318 ** 1319 ** Only one instance of this procedure may be running at a time. 1320 */ 1321 static void *asyncWriterThread(void *pIsStarted){ 1322 sqlite3_vfs *pVfs = (sqlite3_vfs *)(async_vfs.pAppData); 1323 AsyncWrite *p = 0; 1324 int rc = SQLITE_OK; 1325 int holdingMutex = 0; 1326 1327 if( pthread_mutex_trylock(&async.writerMutex) ){ 1328 return 0; 1329 } 1330 (*(int *)pIsStarted) = 1; 1331 while( async.writerHaltNow==0 ){ 1332 int doNotFree = 0; 1333 sqlite3_file *pBase = 0; 1334 1335 if( !holdingMutex ){ 1336 pthread_mutex_lock(&async.queueMutex); 1337 } 1338 while( (p = async.pQueueFirst)==0 ){ 1339 pthread_cond_broadcast(&async.emptySignal); 1340 if( async.writerHaltWhenIdle ){ 1341 pthread_mutex_unlock(&async.queueMutex); 1342 break; 1343 }else{ 1344 ASYNC_TRACE(("IDLE\n")); 1345 pthread_cond_wait(&async.queueSignal, &async.queueMutex); 1346 ASYNC_TRACE(("WAKEUP\n")); 1347 } 1348 } 1349 if( p==0 ) break; 1350 holdingMutex = 1; 1351 1352 /* Right now this thread is holding the mutex on the write-op queue. 1353 ** Variable 'p' points to the first entry in the write-op queue. In 1354 ** the general case, we hold on to the mutex for the entire body of 1355 ** the loop. 
1356 ** 1357 ** However in the cases enumerated below, we relinquish the mutex, 1358 ** perform the IO, and then re-request the mutex before removing 'p' from 1359 ** the head of the write-op queue. The idea is to increase concurrency with 1360 ** sqlite threads. 1361 ** 1362 ** * An ASYNC_CLOSE operation. 1363 ** * An ASYNC_OPENEXCLUSIVE operation. For this one, we relinquish 1364 ** the mutex, call the underlying xOpenExclusive() function, then 1365 ** re-aquire the mutex before seting the AsyncFile.pBaseRead 1366 ** variable. 1367 ** * ASYNC_SYNC and ASYNC_WRITE operations, if 1368 ** SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two 1369 ** file-handles are open for the particular file being "synced". 1370 */ 1371 if( async.ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){ 1372 p->op = ASYNC_NOOP; 1373 } 1374 if( p->pFileData ){ 1375 pBase = p->pFileData->pBaseWrite; 1376 if( 1377 p->op==ASYNC_CLOSE || 1378 p->op==ASYNC_OPENEXCLUSIVE || 1379 (pBase->pMethods && (p->op==ASYNC_SYNC || p->op==ASYNC_WRITE) ) 1380 ){ 1381 pthread_mutex_unlock(&async.queueMutex); 1382 holdingMutex = 0; 1383 } 1384 if( !pBase->pMethods ){ 1385 pBase = p->pFileData->pBaseRead; 1386 } 1387 } 1388 1389 switch( p->op ){ 1390 case ASYNC_NOOP: 1391 break; 1392 1393 case ASYNC_WRITE: 1394 assert( pBase ); 1395 ASYNC_TRACE(("WRITE %s %d bytes at %d\n", 1396 p->pFileData->zName, p->nByte, p->iOffset)); 1397 rc = sqlite3OsWrite(pBase, (void *)(p->zBuf), p->nByte, p->iOffset); 1398 break; 1399 1400 case ASYNC_SYNC: 1401 assert( pBase ); 1402 ASYNC_TRACE(("SYNC %s\n", p->pFileData->zName)); 1403 rc = sqlite3OsSync(pBase, p->nByte); 1404 break; 1405 1406 case ASYNC_TRUNCATE: 1407 assert( pBase ); 1408 ASYNC_TRACE(("TRUNCATE %s to %d bytes\n", 1409 p->pFileData->zName, p->iOffset)); 1410 rc = sqlite3OsTruncate(pBase, p->iOffset); 1411 break; 1412 1413 case ASYNC_CLOSE: { 1414 AsyncFileData *pData = p->pFileData; 1415 ASYNC_TRACE(("CLOSE %s\n", p->pFileData->zName)); 1416 
sqlite3OsClose(pData->pBaseWrite); 1417 sqlite3OsClose(pData->pBaseRead); 1418 1419 /* Unlink AsyncFileData.lock from the linked list of AsyncFileLock 1420 ** structures for this file. Obtain the async.lockMutex mutex 1421 ** before doing so. 1422 */ 1423 pthread_mutex_lock(&async.lockMutex); 1424 rc = unlinkAsyncFile(pData); 1425 pthread_mutex_unlock(&async.lockMutex); 1426 1427 async.pQueueFirst = p->pNext; 1428 sqlite3_free(pData); 1429 doNotFree = 1; 1430 break; 1431 } 1432 1433 case ASYNC_UNLOCK: { 1434 AsyncLock *pLock; 1435 AsyncFileData *pData = p->pFileData; 1436 int eLock = p->nByte; 1437 pthread_mutex_lock(&async.lockMutex); 1438 pData->lock.eAsyncLock = MIN( 1439 pData->lock.eAsyncLock, MAX(pData->lock.eLock, eLock) 1440 ); 1441 assert(pData->lock.eAsyncLock>=pData->lock.eLock); 1442 pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName); 1443 rc = getFileLock(pLock); 1444 pthread_mutex_unlock(&async.lockMutex); 1445 break; 1446 } 1447 1448 case ASYNC_DELETE: 1449 ASYNC_TRACE(("DELETE %s\n", p->zBuf)); 1450 rc = sqlite3OsDelete(pVfs, p->zBuf, (int)p->iOffset); 1451 break; 1452 1453 case ASYNC_OPENEXCLUSIVE: { 1454 int flags = (int)p->iOffset; 1455 AsyncFileData *pData = p->pFileData; 1456 ASYNC_TRACE(("OPEN %s flags=%d\n", p->zBuf, (int)p->iOffset)); 1457 assert(pData->pBaseRead->pMethods==0 && pData->pBaseWrite->pMethods==0); 1458 rc = sqlite3OsOpen(pVfs, pData->zName, pData->pBaseRead, flags, 0); 1459 assert( holdingMutex==0 ); 1460 pthread_mutex_lock(&async.queueMutex); 1461 holdingMutex = 1; 1462 break; 1463 } 1464 1465 default: assert(!"Illegal value for AsyncWrite.op"); 1466 } 1467 1468 /* If we didn't hang on to the mutex during the IO op, obtain it now 1469 ** so that the AsyncWrite structure can be safely removed from the 1470 ** global write-op queue. 
1471 */ 1472 if( !holdingMutex ){ 1473 pthread_mutex_lock(&async.queueMutex); 1474 holdingMutex = 1; 1475 } 1476 /* ASYNC_TRACE(("UNLINK %p\n", p)); */ 1477 if( p==async.pQueueLast ){ 1478 async.pQueueLast = 0; 1479 } 1480 if( !doNotFree ){ 1481 async.pQueueFirst = p->pNext; 1482 sqlite3_free(p); 1483 } 1484 assert( holdingMutex ); 1485 1486 /* An IO error has occured. We cannot report the error back to the 1487 ** connection that requested the I/O since the error happened 1488 ** asynchronously. The connection has already moved on. There 1489 ** really is nobody to report the error to. 1490 ** 1491 ** The file for which the error occured may have been a database or 1492 ** journal file. Regardless, none of the currently queued operations 1493 ** associated with the same database should now be performed. Nor should 1494 ** any subsequently requested IO on either a database or journal file 1495 ** handle for the same database be accepted until the main database 1496 ** file handle has been closed and reopened. 1497 ** 1498 ** Furthermore, no further IO should be queued or performed on any file 1499 ** handle associated with a database that may have been part of a 1500 ** multi-file transaction that included the database associated with 1501 ** the IO error (i.e. a database ATTACHed to the same handle at some 1502 ** point in time). 1503 */ 1504 if( rc!=SQLITE_OK ){ 1505 async.ioError = rc; 1506 } 1507 1508 if( async.ioError && !async.pQueueFirst ){ 1509 pthread_mutex_lock(&async.lockMutex); 1510 if( 0==sqliteHashFirst(&async.aLock) ){ 1511 async.ioError = SQLITE_OK; 1512 } 1513 pthread_mutex_unlock(&async.lockMutex); 1514 } 1515 1516 /* Drop the queue mutex before continuing to the next write operation 1517 ** in order to give other threads a chance to work with the write queue. 
1518 */ 1519 if( !async.pQueueFirst || !async.ioError ){ 1520 pthread_mutex_unlock(&async.queueMutex); 1521 holdingMutex = 0; 1522 if( async.ioDelay>0 ){ 1523 sqlite3OsSleep(pVfs, async.ioDelay); 1524 }else{ 1525 sched_yield(); 1526 } 1527 } 1528 } 1529 1530 pthread_mutex_unlock(&async.writerMutex); 1531 return 0; 1532 } 1533 1534 /************************************************************************** 1535 ** The remaining code defines a Tcl interface for testing the asynchronous 1536 ** IO implementation in this file. 1537 ** 1538 ** To adapt the code to a non-TCL environment, delete or comment out 1539 ** the code that follows. 1540 */ 1541 1542 /* 1543 ** sqlite3async_enable ?YES/NO? 1544 ** 1545 ** Enable or disable the asynchronous I/O backend. This command is 1546 ** not thread-safe. Do not call it while any database connections 1547 ** are open. 1548 */ 1549 static int testAsyncEnable( 1550 void * clientData, 1551 Tcl_Interp *interp, 1552 int objc, 1553 Tcl_Obj *CONST objv[] 1554 ){ 1555 if( objc!=1 && objc!=2 ){ 1556 Tcl_WrongNumArgs(interp, 1, objv, "?YES/NO?"); 1557 return TCL_ERROR; 1558 } 1559 if( objc==1 ){ 1560 Tcl_SetObjResult(interp, Tcl_NewBooleanObj(async_vfs.pAppData!=0)); 1561 }else{ 1562 int en; 1563 if( Tcl_GetBooleanFromObj(interp, objv[1], &en) ) return TCL_ERROR; 1564 asyncEnable(en); 1565 } 1566 return TCL_OK; 1567 } 1568 1569 /* 1570 ** sqlite3async_halt "now"|"idle"|"never" 1571 ** 1572 ** Set the conditions at which the writer thread will halt. 
1573 */ 1574 static int testAsyncHalt( 1575 void * clientData, 1576 Tcl_Interp *interp, 1577 int objc, 1578 Tcl_Obj *CONST objv[] 1579 ){ 1580 const char *zCond; 1581 if( objc!=2 ){ 1582 Tcl_WrongNumArgs(interp, 1, objv, "\"now\"|\"idle\"|\"never\""); 1583 return TCL_ERROR; 1584 } 1585 zCond = Tcl_GetString(objv[1]); 1586 if( strcmp(zCond, "now")==0 ){ 1587 async.writerHaltNow = 1; 1588 pthread_cond_broadcast(&async.queueSignal); 1589 }else if( strcmp(zCond, "idle")==0 ){ 1590 async.writerHaltWhenIdle = 1; 1591 async.writerHaltNow = 0; 1592 pthread_cond_broadcast(&async.queueSignal); 1593 }else if( strcmp(zCond, "never")==0 ){ 1594 async.writerHaltWhenIdle = 0; 1595 async.writerHaltNow = 0; 1596 }else{ 1597 Tcl_AppendResult(interp, 1598 "should be one of: \"now\", \"idle\", or \"never\"", (char*)0); 1599 return TCL_ERROR; 1600 } 1601 return TCL_OK; 1602 } 1603 1604 /* 1605 ** sqlite3async_delay ?MS? 1606 ** 1607 ** Query or set the number of milliseconds of delay in the writer 1608 ** thread after each write operation. The default is 0. By increasing 1609 ** the memory delay we can simulate the effect of slow disk I/O. 1610 */ 1611 static int testAsyncDelay( 1612 void * clientData, 1613 Tcl_Interp *interp, 1614 int objc, 1615 Tcl_Obj *CONST objv[] 1616 ){ 1617 if( objc!=1 && objc!=2 ){ 1618 Tcl_WrongNumArgs(interp, 1, objv, "?MS?"); 1619 return TCL_ERROR; 1620 } 1621 if( objc==1 ){ 1622 Tcl_SetObjResult(interp, Tcl_NewIntObj(async.ioDelay)); 1623 }else{ 1624 int ioDelay; 1625 if( Tcl_GetIntFromObj(interp, objv[1], &ioDelay) ) return TCL_ERROR; 1626 async.ioDelay = ioDelay; 1627 } 1628 return TCL_OK; 1629 } 1630 1631 /* 1632 ** sqlite3async_start 1633 ** 1634 ** Start a new writer thread. 
1635 */ 1636 static int testAsyncStart( 1637 void * clientData, 1638 Tcl_Interp *interp, 1639 int objc, 1640 Tcl_Obj *CONST objv[] 1641 ){ 1642 pthread_t x; 1643 int rc; 1644 volatile int isStarted = 0; 1645 rc = pthread_create(&x, 0, asyncWriterThread, (void *)&isStarted); 1646 if( rc ){ 1647 Tcl_AppendResult(interp, "failed to create the thread", 0); 1648 return TCL_ERROR; 1649 } 1650 pthread_detach(x); 1651 while( isStarted==0 ){ 1652 sched_yield(); 1653 } 1654 return TCL_OK; 1655 } 1656 1657 /* 1658 ** sqlite3async_wait 1659 ** 1660 ** Wait for the current writer thread to terminate. 1661 ** 1662 ** If the current writer thread is set to run forever then this 1663 ** command would block forever. To prevent that, an error is returned. 1664 */ 1665 static int testAsyncWait( 1666 void * clientData, 1667 Tcl_Interp *interp, 1668 int objc, 1669 Tcl_Obj *CONST objv[] 1670 ){ 1671 int cnt = 10; 1672 if( async.writerHaltNow==0 && async.writerHaltWhenIdle==0 ){ 1673 Tcl_AppendResult(interp, "would block forever", (char*)0); 1674 return TCL_ERROR; 1675 } 1676 1677 while( cnt-- && !pthread_mutex_trylock(&async.writerMutex) ){ 1678 pthread_mutex_unlock(&async.writerMutex); 1679 sched_yield(); 1680 } 1681 if( cnt>=0 ){ 1682 ASYNC_TRACE(("WAIT\n")); 1683 pthread_mutex_lock(&async.queueMutex); 1684 pthread_cond_broadcast(&async.queueSignal); 1685 pthread_mutex_unlock(&async.queueMutex); 1686 pthread_mutex_lock(&async.writerMutex); 1687 pthread_mutex_unlock(&async.writerMutex); 1688 }else{ 1689 ASYNC_TRACE(("NO-WAIT\n")); 1690 } 1691 return TCL_OK; 1692 } 1693 1694 1695 #endif /* OS_UNIX and SQLITE_THREADSAFE */ 1696 1697 /* 1698 ** This routine registers the custom TCL commands defined in this 1699 ** module. This should be the only procedure visible from outside 1700 ** of this module. 
1701 */ 1702 int Sqlitetestasync_Init(Tcl_Interp *interp){ 1703 #if OS_UNIX && SQLITE_THREADSAFE 1704 Tcl_CreateObjCommand(interp,"sqlite3async_enable",testAsyncEnable,0,0); 1705 Tcl_CreateObjCommand(interp,"sqlite3async_halt",testAsyncHalt,0,0); 1706 Tcl_CreateObjCommand(interp,"sqlite3async_delay",testAsyncDelay,0,0); 1707 Tcl_CreateObjCommand(interp,"sqlite3async_start",testAsyncStart,0,0); 1708 Tcl_CreateObjCommand(interp,"sqlite3async_wait",testAsyncWait,0,0); 1709 Tcl_LinkVar(interp, "sqlite3async_trace", 1710 (char*)&sqlite3async_trace, TCL_LINK_INT); 1711 #endif /* OS_UNIX and SQLITE_THREADSAFE */ 1712 return TCL_OK; 1713 } 1714