/*
** 2005 December 14
**
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** This file contains an example implementation of an asynchronous IO
** backend for SQLite.
**
** WHAT IS ASYNCHRONOUS I/O?
**
** With asynchronous I/O, write requests are handled by a separate thread
** running in the background.  This means that the thread that initiates
** a database write does not have to wait for (sometimes slow) disk I/O
** to occur.  The write seems to happen very quickly, though in reality
** it is happening at its usual slow pace in the background.
**
** Asynchronous I/O appears to give better responsiveness, but at a price.
** You lose the Durable property.  With the default I/O backend of SQLite,
** once a write completes, you know that the information you wrote is
** safely on disk.  With the asynchronous I/O, this is not the case.  If
** your program crashes or if a power loss occurs after the database
** write but before the asynchronous write thread has completed, then the
** database change might never make it to disk and the next user of the
** database might not see your change.
**
** You lose Durability with asynchronous I/O, but you still retain the
** other parts of ACID:  Atomic, Consistent, and Isolated.  Many
** applications get along fine without the Durability.
**
** HOW IT WORKS
**
** Asynchronous I/O works by creating a special SQLite "vfs" structure
** and registering it with sqlite3_vfs_register().
** When files opened via
** this vfs are written to (using sqlite3OsWrite()), the data is not
** written directly to disk, but is placed in the "write-queue" to be
** handled by the background thread.
**
** When files opened with the asynchronous vfs are read from
** (using sqlite3OsRead()), the data is read from the file on
** disk and the write-queue, so that from the point of view of
** the vfs reader the OsWrite() appears to have already completed.
**
** The special vfs is registered (and unregistered) by calls to
** function asyncEnable() (see below).
**
** LIMITATIONS
**
** This demonstration code is deliberately kept simple in order to keep
** the main ideas clear and easy to understand.  Real applications that
** want to do asynchronous I/O might want to add additional capabilities.
** For example, in this demonstration if writes are happening at a steady
** stream that exceeds the I/O capability of the background writer thread,
** the queue of pending write operations will grow without bound until we
** run out of memory.  Users of this technique may want to keep track of
** the quantity of pending writes and stop accepting new write requests
** when the buffer gets to be too big.
**
** LOCKING + CONCURRENCY
**
** Multiple connections from within a single process that use this
** implementation of asynchronous IO may access a single database
** file concurrently. From the point of view of the user, if all
** connections are from within a single process, there is no difference
** between the concurrency offered by "normal" SQLite and SQLite
** using the asynchronous backend.
**
** If connections from within multiple processes may access the
** database file, the ENABLE_FILE_LOCKING symbol (see below) must be
** defined. If it is not defined, then no locks are established on
** the database file.
In this case, if multiple processes access 78 ** the database file, corruption will quickly result. 79 ** 80 ** If ENABLE_FILE_LOCKING is defined (the default), then connections 81 ** from within multiple processes may access a single database file 82 ** without risking corruption. However concurrency is reduced as 83 ** follows: 84 ** 85 ** * When a connection using asynchronous IO begins a database 86 ** transaction, the database is locked immediately. However the 87 ** lock is not released until after all relevant operations 88 ** in the write-queue have been flushed to disk. This means 89 ** (for example) that the database may remain locked for some 90 ** time after a "COMMIT" or "ROLLBACK" is issued. 91 ** 92 ** * If an application using asynchronous IO executes transactions 93 ** in quick succession, other database users may be effectively 94 ** locked out of the database. This is because when a BEGIN 95 ** is executed, a database lock is established immediately. But 96 ** when the corresponding COMMIT or ROLLBACK occurs, the lock 97 ** is not released until the relevant part of the write-queue 98 ** has been flushed through. As a result, if a COMMIT is followed 99 ** by a BEGIN before the write-queue is flushed through, the database 100 ** is never unlocked,preventing other processes from accessing 101 ** the database. 102 ** 103 ** Defining ENABLE_FILE_LOCKING when using an NFS or other remote 104 ** file-system may slow things down, as synchronous round-trips to the 105 ** server may be required to establish database file locks. 106 */ 107 #define ENABLE_FILE_LOCKING 108 109 #ifndef SQLITE_AMALGAMATION 110 # include "sqliteInt.h" 111 #endif 112 #include <tcl.h> 113 114 /* 115 ** This test uses pthreads and hence only works on unix and with 116 ** a threadsafe build of SQLite. 117 */ 118 #if OS_UNIX && SQLITE_THREADSAFE 119 120 /* 121 ** This demo uses pthreads. 
If you do not have a pthreads implementation 122 ** for your operating system, you will need to recode the threading 123 ** logic. 124 */ 125 #include <pthread.h> 126 #include <sched.h> 127 128 /* Useful macros used in several places */ 129 #define MIN(x,y) ((x)<(y)?(x):(y)) 130 #define MAX(x,y) ((x)>(y)?(x):(y)) 131 132 /* Forward references */ 133 typedef struct AsyncWrite AsyncWrite; 134 typedef struct AsyncFile AsyncFile; 135 typedef struct AsyncFileData AsyncFileData; 136 typedef struct AsyncFileLock AsyncFileLock; 137 typedef struct AsyncLock AsyncLock; 138 139 /* Enable for debugging */ 140 static int sqlite3async_trace = 0; 141 # define ASYNC_TRACE(X) if( sqlite3async_trace ) asyncTrace X 142 static void asyncTrace(const char *zFormat, ...){ 143 char *z; 144 va_list ap; 145 va_start(ap, zFormat); 146 z = sqlite3_vmprintf(zFormat, ap); 147 va_end(ap); 148 fprintf(stderr, "[%d] %s", (int)pthread_self(), z); 149 sqlite3_free(z); 150 } 151 152 /* 153 ** THREAD SAFETY NOTES 154 ** 155 ** Basic rules: 156 ** 157 ** * Both read and write access to the global write-op queue must be 158 ** protected by the async.queueMutex. As are the async.ioError and 159 ** async.nFile variables. 160 ** 161 ** * The async.aLock hash-table and all AsyncLock and AsyncFileLock 162 ** structures must be protected by the async.lockMutex mutex. 163 ** 164 ** * The file handles from the underlying system are assumed not to 165 ** be thread safe. 166 ** 167 ** * See the last two paragraphs under "The Writer Thread" for 168 ** an assumption to do with file-handle synchronization by the Os. 169 ** 170 ** Deadlock prevention: 171 ** 172 ** There are three mutex used by the system: the "writer" mutex, 173 ** the "queue" mutex and the "lock" mutex. Rules are: 174 ** 175 ** * It is illegal to block on the writer mutex when any other mutex 176 ** are held, and 177 ** 178 ** * It is illegal to block on the queue mutex when the lock mutex 179 ** is held. 180 ** 181 ** i.e. 
**     mutexes must be grabbed in the order "writer", "queue", "lock".
**
** File system operations (invoked by SQLite thread):
**
**     xOpen
**     xDelete
**     xFileExists
**
** File handle operations (invoked by SQLite thread):
**
**         asyncWrite, asyncClose, asyncTruncate, asyncSync
**
**     The operations above add an entry to the global write-op list. They
**     prepare the entry, acquire the async.queueMutex momentarily while
**     list pointers are manipulated to insert the new entry, then release
**     the mutex and signal the writer thread to wake up in case it happens
**     to be asleep.
**
**
**         asyncRead, asyncFileSize.
**
**     Read operations. Both of these read from the underlying file
**     first, then adjust their result based on pending writes in the
**     write-op queue. So async.queueMutex is held for the duration
**     of these operations to prevent other threads from changing the
**     queue in mid operation.
**
**
**         asyncLock, asyncUnlock, asyncCheckReservedLock
**
**     These primitives implement in-process locking using a hash table
**     on the file name.  Files are locked correctly for connections coming
**     from the same process.  But other processes cannot see these locks
**     and will therefore not honor them.
**
**
** The writer thread:
**
**     The async.writerMutex is used to make sure there is only
**     a single writer thread running at a time.
**
**     Inside the writer thread is a loop that works like this:
**
**         WHILE (write-op list is not empty)
**             Do IO operation at head of write-op list
**             Remove entry from head of write-op list
**         END WHILE
**
**     The async.queueMutex is always held during the <write-op list is
**     not empty> test, and when the entry is removed from the head
**     of the write-op list.
Sometimes it is held for the interim 232 ** period (while the IO is performed), and sometimes it is 233 ** relinquished. It is relinquished if (a) the IO op is an 234 ** ASYNC_CLOSE or (b) when the file handle was opened, two of 235 ** the underlying systems handles were opened on the same 236 ** file-system entry. 237 ** 238 ** If condition (b) above is true, then one file-handle 239 ** (AsyncFile.pBaseRead) is used exclusively by sqlite threads to read the 240 ** file, the other (AsyncFile.pBaseWrite) by sqlite3_async_flush() 241 ** threads to perform write() operations. This means that read 242 ** operations are not blocked by asynchronous writes (although 243 ** asynchronous writes may still be blocked by reads). 244 ** 245 ** This assumes that the OS keeps two handles open on the same file 246 ** properly in sync. That is, any read operation that starts after a 247 ** write operation on the same file system entry has completed returns 248 ** data consistent with the write. We also assume that if one thread 249 ** reads a file while another is writing it all bytes other than the 250 ** ones actually being written contain valid data. 251 ** 252 ** If the above assumptions are not true, set the preprocessor symbol 253 ** SQLITE_ASYNC_TWO_FILEHANDLES to 0. 254 */ 255 256 #ifndef SQLITE_ASYNC_TWO_FILEHANDLES 257 /* #define SQLITE_ASYNC_TWO_FILEHANDLES 0 */ 258 #define SQLITE_ASYNC_TWO_FILEHANDLES 1 259 #endif 260 261 /* 262 ** State information is held in the static variable "async" defined 263 ** as the following structure. 264 ** 265 ** Both async.ioError and async.nFile are protected by async.queueMutex. 
*/
/*
** NOTE: the three mutexes must remain the first three members, in the
** order lock/queue/writer.  The debugging wrappers (async_mutex_lock() and
** friends) treat the start of this structure as an array of three
** pthread_mutex_t objects and assert() that layout.
*/
static struct TestAsyncStaticData {
  pthread_mutex_t lockMutex;   /* For access to aLock hash table */
  pthread_mutex_t queueMutex;  /* Mutex for access to write operation queue */
  pthread_mutex_t writerMutex; /* Prevents multiple writer threads */
  pthread_cond_t queueSignal;  /* For waking up sleeping writer thread */
  pthread_cond_t emptySignal;  /* Notify when the write queue is empty */
  AsyncWrite *pQueueFirst;     /* Next write operation to be processed */
  AsyncWrite *pQueueLast;      /* Last write operation on the list */
  Hash aLock;                  /* Files locked */
  volatile int ioDelay;             /* Extra delay between write operations */
  volatile int writerHaltWhenIdle;  /* Writer thread halts when queue empty */
  volatile int writerHaltNow;       /* Writer thread halts after next op */
  int ioError;                 /* True if an IO error has occurred */
  int nFile;                   /* Number of open files (from sqlite pov) */
} async = {
  PTHREAD_MUTEX_INITIALIZER,
  PTHREAD_MUTEX_INITIALIZER,
  PTHREAD_MUTEX_INITIALIZER,
  PTHREAD_COND_INITIALIZER,
  PTHREAD_COND_INITIALIZER,
};

/* Possible values of AsyncWrite.op */
#define ASYNC_NOOP          0
#define ASYNC_WRITE         1
#define ASYNC_SYNC          2
#define ASYNC_TRUNCATE      3
#define ASYNC_CLOSE         4
#define ASYNC_DELETE        5
#define ASYNC_OPENEXCLUSIVE 6
#define ASYNC_UNLOCK        7

/* Names of opcodes.  Used for debugging only.
** Make sure these stay in sync with the macros above!
** (The array is indexed directly by AsyncWrite.op.)
*/
static const char *azOpcodeName[] = {
  "NOOP", "WRITE", "SYNC", "TRUNCATE", "CLOSE", "DELETE", "OPENEX", "UNLOCK"
};

/*
** Entries on the write-op queue are instances of the AsyncWrite
** structure, defined here.
**
** The interpretation of the iOffset and nByte variables varies depending
** on the value of AsyncWrite.op:
**
** ASYNC_NOOP:
**     No values used.
**
** ASYNC_WRITE:
**     iOffset -> Offset in file to write to.
**     nByte   -> Number of bytes of data to write (pointed to by zBuf).
**
** ASYNC_SYNC:
**     nByte   -> flags to pass to sqlite3OsSync().
**
** ASYNC_TRUNCATE:
**     iOffset -> Size to truncate file to.
**     nByte   -> Unused.
**
** ASYNC_CLOSE:
**     iOffset -> Unused.
**     nByte   -> Unused.
**
** ASYNC_DELETE:
**     iOffset -> Contains the "syncDir" flag.
**     nByte   -> Number of bytes zBuf points to (file name).
**
** ASYNC_OPENEXCLUSIVE:
**     iOffset -> Value of "delflag".
**     nByte   -> Number of bytes zBuf points to (file name).
**
** ASYNC_UNLOCK:
**     nByte   -> Argument to sqlite3OsUnlock().
**
**
** For an ASYNC_WRITE operation, zBuf points to the data to write to the file.
** This space is sqlite3_malloc()d along with the AsyncWrite structure in a
** single blob, so is deleted when sqlite3_free() is called on the parent
** structure.
*/
struct AsyncWrite {
  AsyncFileData *pFileData;    /* File to write data to or sync */
  int op;                      /* One of ASYNC_xxx etc. */
  i64 iOffset;                 /* See above */
  int nByte;          /* See above */
  char *zBuf;         /* Payload: data for ASYNC_WRITE, file name for
                      ** ASYNC_DELETE/ASYNC_OPENEXCLUSIVE (see above);
                      ** NULL when the operation carries no payload */
  AsyncWrite *pNext;  /* Next write operation (to any file) */
};

/*
** An instance of this structure is created for each distinct open file
** (i.e. if two handles are opened on the one file, only one of these
** structures is allocated) and stored in the async.aLock hash table. The
** keys for async.aLock are the full pathnames of the opened files.
**
** AsyncLock.pList points to the head of a linked list of AsyncFileLock
** structures, one for each handle currently open on the file.
**
** If the opened file is not a main-database (the SQLITE_OPEN_MAIN_DB is
** not passed to the sqlite3OsOpen() call), or if ENABLE_FILE_LOCKING is
** not defined at compile time, variables AsyncLock.pFile and
** AsyncLock.eLock are never used. Otherwise, pFile is a file handle
** opened on the file in question and used to obtain the file-system
** locks required by database connections within this process.
**
** See comments above the asyncLock() function for more details on
** the implementation of database locking used by this backend.
*/
struct AsyncLock {
  sqlite3_file *pFile;      /* Handle used for file-system locks (may be 0) */
  int eLock;                /* Lock level currently held on pFile */
  AsyncFileLock *pList;     /* List of per-handle lock records */
};

/*
** An instance of the following structure is allocated along with each
** AsyncFileData structure (see AsyncFileData.lock), but is only used if the
** file was opened with the SQLITE_OPEN_MAIN_DB.
*/
struct AsyncFileLock {
  int eLock;                /* Internally visible lock state (sqlite pov) */
  int eAsyncLock;           /* Lock-state with write-queue unlock */
  AsyncFileLock *pNext;     /* Next entry in AsyncLock.pList */
};

/*
** The AsyncFile structure is a subclass of sqlite3_file used for
** asynchronous IO.
**
** All of the actual data for the structure is stored in the structure
** pointed to by AsyncFile.pData, which is allocated as part of the
** sqlite3OsOpen() using sqlite3_malloc(). The reason for this is that the
** lifetime of the AsyncFile structure is ended by the caller after OsClose()
** is called, but the data in AsyncFileData may be required by the
** writer thread after that point.
*/
struct AsyncFile {
  sqlite3_io_methods *pMethod;   /* Method table (set by asyncOpen()) */
  AsyncFileData *pData;          /* Heap-allocated per-file state */
};
struct AsyncFileData {
  char *zName;               /* Underlying OS filename - used for debugging */
  int nName;                 /* Size of zName in bytes; asyncOpen() stores
                             ** strlen()+1, i.e. including the terminator */
  sqlite3_file *pBaseRead;   /* Read handle to the underlying Os file */
  sqlite3_file *pBaseWrite;  /* Write handle to the underlying Os file */
  AsyncFileLock lock;        /* Lock record linked into AsyncLock.pList */
  AsyncWrite close;          /* Pre-built ASYNC_CLOSE entry queued by
                             ** asyncClose() */
};

/*
** The following async_XXX functions are debugging wrappers around the
** corresponding pthread_XXX functions:
**
**     pthread_mutex_lock();
**     pthread_mutex_unlock();
**     pthread_mutex_trylock();
**     pthread_cond_wait();
**
** It is illegal to pass any mutex other than those stored in the
** following global variables to these functions.
**
**     async.queueMutex
**     async.writerMutex
**     async.lockMutex
**
** If NDEBUG is defined, these wrappers do nothing except call the
** corresponding pthreads function. If NDEBUG is not defined, then the
** following variables are used to store the thread-id (as returned
** by pthread_self()) currently holding the mutex, or 0 otherwise:
**
**     asyncdebug.queueMutexHolder
**     asyncdebug.writerMutexHolder
**     asyncdebug.lockMutexHolder
**
** These variables are used by some assert() statements that verify
** the statements made in the "Deadlock Prevention" notes earlier
** in this file.
*/
#ifndef NDEBUG

/* Member order must mirror the order of the mutexes at the start of
** the "async" structure; the wrappers index both as parallel arrays. */
static struct TestAsyncDebugData {
  pthread_t lockMutexHolder;
  pthread_t queueMutexHolder;
  pthread_t writerMutexHolder;
} asyncdebug = {0, 0, 0};

/*
** Wrapper around pthread_mutex_lock(). Checks that we have not violated
** the anti-deadlock rules (see "Deadlock prevention" above).
457 */ 458 static int async_mutex_lock(pthread_mutex_t *pMutex){ 459 int iIdx; 460 int rc; 461 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 462 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 463 464 /* The code in this 'ifndef NDEBUG' block depends on a certain alignment 465 * of the variables in TestAsyncStaticData and TestAsyncDebugData. The 466 * following assert() statements check that this has not been changed. 467 * 468 * Really, these only need to be run once at startup time. 469 */ 470 assert(&(aMutex[0])==&async.lockMutex); 471 assert(&(aMutex[1])==&async.queueMutex); 472 assert(&(aMutex[2])==&async.writerMutex); 473 assert(&(aHolder[0])==&asyncdebug.lockMutexHolder); 474 assert(&(aHolder[1])==&asyncdebug.queueMutexHolder); 475 assert(&(aHolder[2])==&asyncdebug.writerMutexHolder); 476 477 assert( pthread_self()!=0 ); 478 479 for(iIdx=0; iIdx<3; iIdx++){ 480 if( pMutex==&aMutex[iIdx] ) break; 481 482 /* This is the key assert(). Here we are checking that if the caller 483 * is trying to block on async.writerMutex, neither of the other two 484 * mutex are held. If the caller is trying to block on async.queueMutex, 485 * lockMutex is not held. 486 */ 487 assert(!pthread_equal(aHolder[iIdx], pthread_self())); 488 } 489 assert(iIdx<3); 490 491 rc = pthread_mutex_lock(pMutex); 492 if( rc==0 ){ 493 assert(aHolder[iIdx]==0); 494 aHolder[iIdx] = pthread_self(); 495 } 496 return rc; 497 } 498 499 /* 500 ** Wrapper around pthread_mutex_unlock(). 
501 */ 502 static int async_mutex_unlock(pthread_mutex_t *pMutex){ 503 int iIdx; 504 int rc; 505 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 506 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 507 508 for(iIdx=0; iIdx<3; iIdx++){ 509 if( pMutex==&aMutex[iIdx] ) break; 510 } 511 assert(iIdx<3); 512 513 assert(pthread_equal(aHolder[iIdx], pthread_self())); 514 aHolder[iIdx] = 0; 515 rc = pthread_mutex_unlock(pMutex); 516 assert(rc==0); 517 518 return 0; 519 } 520 521 /* 522 ** Wrapper around pthread_mutex_trylock(). 523 */ 524 static int async_mutex_trylock(pthread_mutex_t *pMutex){ 525 int iIdx; 526 int rc; 527 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 528 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 529 530 for(iIdx=0; iIdx<3; iIdx++){ 531 if( pMutex==&aMutex[iIdx] ) break; 532 } 533 assert(iIdx<3); 534 535 rc = pthread_mutex_trylock(pMutex); 536 if( rc==0 ){ 537 assert(aHolder[iIdx]==0); 538 aHolder[iIdx] = pthread_self(); 539 } 540 return rc; 541 } 542 543 /* 544 ** Wrapper around pthread_cond_wait(). 545 */ 546 static int async_cond_wait(pthread_cond_t *pCond, pthread_mutex_t *pMutex){ 547 int iIdx; 548 int rc; 549 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 550 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 551 552 for(iIdx=0; iIdx<3; iIdx++){ 553 if( pMutex==&aMutex[iIdx] ) break; 554 } 555 assert(iIdx<3); 556 557 assert(pthread_equal(aHolder[iIdx],pthread_self())); 558 aHolder[iIdx] = 0; 559 rc = pthread_cond_wait(pCond, pMutex); 560 if( rc==0 ){ 561 aHolder[iIdx] = pthread_self(); 562 } 563 return rc; 564 } 565 566 /* Call our async_XX wrappers instead of selected pthread_XX functions */ 567 #define pthread_mutex_lock async_mutex_lock 568 #define pthread_mutex_unlock async_mutex_unlock 569 #define pthread_mutex_trylock async_mutex_trylock 570 #define pthread_cond_wait async_cond_wait 571 572 #endif /* !defined(NDEBUG) */ 573 574 /* 575 ** Add an entry to the end of the global write-op list. 
pWrite should point 576 ** to an AsyncWrite structure allocated using sqlite3_malloc(). The writer 577 ** thread will call sqlite3_free() to free the structure after the specified 578 ** operation has been completed. 579 ** 580 ** Once an AsyncWrite structure has been added to the list, it becomes the 581 ** property of the writer thread and must not be read or modified by the 582 ** caller. 583 */ 584 static void addAsyncWrite(AsyncWrite *pWrite){ 585 /* We must hold the queue mutex in order to modify the queue pointers */ 586 pthread_mutex_lock(&async.queueMutex); 587 588 /* Add the record to the end of the write-op queue */ 589 assert( !pWrite->pNext ); 590 if( async.pQueueLast ){ 591 assert( async.pQueueFirst ); 592 async.pQueueLast->pNext = pWrite; 593 }else{ 594 async.pQueueFirst = pWrite; 595 } 596 async.pQueueLast = pWrite; 597 ASYNC_TRACE(("PUSH %p (%s %s %d)\n", pWrite, azOpcodeName[pWrite->op], 598 pWrite->pFileData ? pWrite->pFileData->zName : "-", pWrite->iOffset)); 599 600 if( pWrite->op==ASYNC_CLOSE ){ 601 async.nFile--; 602 } 603 604 /* Drop the queue mutex */ 605 pthread_mutex_unlock(&async.queueMutex); 606 607 /* The writer thread might have been idle because there was nothing 608 ** on the write-op queue for it to do. So wake it up. */ 609 pthread_cond_signal(&async.queueSignal); 610 } 611 612 /* 613 ** Increment async.nFile in a thread-safe manner. 614 */ 615 static void incrOpenFileCount(){ 616 /* We must hold the queue mutex in order to modify async.nFile */ 617 pthread_mutex_lock(&async.queueMutex); 618 if( async.nFile==0 ){ 619 async.ioError = SQLITE_OK; 620 } 621 async.nFile++; 622 pthread_mutex_unlock(&async.queueMutex); 623 } 624 625 /* 626 ** This is a utility function to allocate and populate a new AsyncWrite 627 ** structure and insert it (via addAsyncWrite() ) into the global list. 
628 */ 629 static int addNewAsyncWrite( 630 AsyncFileData *pFileData, 631 int op, 632 i64 iOffset, 633 int nByte, 634 const char *zByte 635 ){ 636 AsyncWrite *p; 637 if( op!=ASYNC_CLOSE && async.ioError ){ 638 return async.ioError; 639 } 640 p = sqlite3_malloc(sizeof(AsyncWrite) + (zByte?nByte:0)); 641 if( !p ){ 642 /* The upper layer does not expect operations like OsWrite() to 643 ** return SQLITE_NOMEM. This is partly because under normal conditions 644 ** SQLite is required to do rollback without calling malloc(). So 645 ** if malloc() fails here, treat it as an I/O error. The above 646 ** layer knows how to handle that. 647 */ 648 return SQLITE_IOERR; 649 } 650 p->op = op; 651 p->iOffset = iOffset; 652 p->nByte = nByte; 653 p->pFileData = pFileData; 654 p->pNext = 0; 655 if( zByte ){ 656 p->zBuf = (char *)&p[1]; 657 memcpy(p->zBuf, zByte, nByte); 658 }else{ 659 p->zBuf = 0; 660 } 661 addAsyncWrite(p); 662 return SQLITE_OK; 663 } 664 665 /* 666 ** Close the file. This just adds an entry to the write-op list, the file is 667 ** not actually closed. 668 */ 669 static int asyncClose(sqlite3_file *pFile){ 670 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 671 672 /* Unlock the file, if it is locked */ 673 pthread_mutex_lock(&async.lockMutex); 674 p->lock.eLock = 0; 675 pthread_mutex_unlock(&async.lockMutex); 676 677 addAsyncWrite(&p->close); 678 return SQLITE_OK; 679 } 680 681 /* 682 ** Implementation of sqlite3OsWrite() for asynchronous files. Instead of 683 ** writing to the underlying file, this function adds an entry to the end of 684 ** the global AsyncWrite list. Either SQLITE_OK or SQLITE_NOMEM may be 685 ** returned. 686 */ 687 static int asyncWrite(sqlite3_file *pFile, const void *pBuf, int amt, i64 iOff){ 688 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 689 return addNewAsyncWrite(p, ASYNC_WRITE, iOff, amt, pBuf); 690 } 691 692 /* 693 ** Read data from the file. 
First we read from the filesystem, then adjust 694 ** the contents of the buffer based on ASYNC_WRITE operations in the 695 ** write-op queue. 696 ** 697 ** This method holds the mutex from start to finish. 698 */ 699 static int asyncRead(sqlite3_file *pFile, void *zOut, int iAmt, i64 iOffset){ 700 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 701 int rc = SQLITE_OK; 702 i64 filesize; 703 int nRead; 704 sqlite3_file *pBase = p->pBaseRead; 705 706 /* Grab the write queue mutex for the duration of the call */ 707 pthread_mutex_lock(&async.queueMutex); 708 709 /* If an I/O error has previously occurred in this virtual file 710 ** system, then all subsequent operations fail. 711 */ 712 if( async.ioError!=SQLITE_OK ){ 713 rc = async.ioError; 714 goto asyncread_out; 715 } 716 717 if( pBase->pMethods ){ 718 rc = sqlite3OsFileSize(pBase, &filesize); 719 if( rc!=SQLITE_OK ){ 720 goto asyncread_out; 721 } 722 nRead = MIN(filesize - iOffset, iAmt); 723 if( nRead>0 ){ 724 rc = sqlite3OsRead(pBase, zOut, nRead, iOffset); 725 ASYNC_TRACE(("READ %s %d bytes at %d\n", p->zName, nRead, iOffset)); 726 } 727 } 728 729 if( rc==SQLITE_OK ){ 730 AsyncWrite *pWrite; 731 char *zName = p->zName; 732 733 for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){ 734 if( pWrite->op==ASYNC_WRITE && pWrite->pFileData->zName==zName ){ 735 int iBeginOut = (pWrite->iOffset-iOffset); 736 int iBeginIn = -iBeginOut; 737 int nCopy; 738 739 if( iBeginIn<0 ) iBeginIn = 0; 740 if( iBeginOut<0 ) iBeginOut = 0; 741 nCopy = MIN(pWrite->nByte-iBeginIn, iAmt-iBeginOut); 742 743 if( nCopy>0 ){ 744 memcpy(&((char *)zOut)[iBeginOut], &pWrite->zBuf[iBeginIn], nCopy); 745 ASYNC_TRACE(("OVERREAD %d bytes at %d\n", nCopy, iBeginOut+iOffset)); 746 } 747 } 748 } 749 } 750 751 asyncread_out: 752 pthread_mutex_unlock(&async.queueMutex); 753 return rc; 754 } 755 756 /* 757 ** Truncate the file to nByte bytes in length. This just adds an entry to 758 ** the write-op list, no IO actually takes place. 
759 */ 760 static int asyncTruncate(sqlite3_file *pFile, i64 nByte){ 761 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 762 return addNewAsyncWrite(p, ASYNC_TRUNCATE, nByte, 0, 0); 763 } 764 765 /* 766 ** Sync the file. This just adds an entry to the write-op list, the 767 ** sync() is done later by sqlite3_async_flush(). 768 */ 769 static int asyncSync(sqlite3_file *pFile, int flags){ 770 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 771 return addNewAsyncWrite(p, ASYNC_SYNC, 0, flags, 0); 772 } 773 774 /* 775 ** Read the size of the file. First we read the size of the file system 776 ** entry, then adjust for any ASYNC_WRITE or ASYNC_TRUNCATE operations 777 ** currently in the write-op list. 778 ** 779 ** This method holds the mutex from start to finish. 780 */ 781 int asyncFileSize(sqlite3_file *pFile, i64 *piSize){ 782 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 783 int rc = SQLITE_OK; 784 i64 s = 0; 785 sqlite3_file *pBase; 786 787 pthread_mutex_lock(&async.queueMutex); 788 789 /* Read the filesystem size from the base file. If pBaseRead is NULL, this 790 ** means the file hasn't been opened yet. In this case all relevant data 791 ** must be in the write-op queue anyway, so we can omit reading from the 792 ** file-system. 793 */ 794 pBase = p->pBaseRead; 795 if( pBase->pMethods ){ 796 rc = sqlite3OsFileSize(pBase, &s); 797 } 798 799 if( rc==SQLITE_OK ){ 800 AsyncWrite *pWrite; 801 for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){ 802 if( pWrite->op==ASYNC_DELETE && strcmp(p->zName, pWrite->zBuf)==0 ){ 803 s = 0; 804 }else if( pWrite->pFileData && pWrite->pFileData->zName==p->zName){ 805 switch( pWrite->op ){ 806 case ASYNC_WRITE: 807 s = MAX(pWrite->iOffset + (i64)(pWrite->nByte), s); 808 break; 809 case ASYNC_TRUNCATE: 810 s = MIN(s, pWrite->iOffset); 811 break; 812 } 813 } 814 } 815 *piSize = s; 816 } 817 pthread_mutex_unlock(&async.queueMutex); 818 return rc; 819 } 820 821 /* 822 ** Lock or unlock the actual file-system entry. 
**
** The lock level required on the file-system entry is the largest
** eAsyncLock value of any handle in pLock->pList.  If that is greater
** than the level currently held (pLock->eLock) the lock is upgraded with
** sqlite3OsLock().  If it is smaller, and no greater than
** SQLITE_LOCK_SHARED, the lock is downgraded with sqlite3OsUnlock().
** pLock->eLock is updated only when the underlying call succeeds.
**
** Nothing is done if pLock->pFile is NULL.  Returns SQLITE_OK or the
** error code from the underlying lock/unlock call.
*/
static int getFileLock(AsyncLock *pLock){
  int rc = SQLITE_OK;
  AsyncFileLock *pIter;
  int eRequired = 0;

  if( pLock->pFile ){
    /* Find the strongest lock any handle still needs */
    for(pIter=pLock->pList; pIter; pIter=pIter->pNext){
      assert(pIter->eAsyncLock>=pIter->eLock);
      if( pIter->eAsyncLock>eRequired ){
        eRequired = pIter->eAsyncLock;
        assert(eRequired>=0 && eRequired<=SQLITE_LOCK_EXCLUSIVE);
      }
    }

    if( eRequired>pLock->eLock ){
      rc = sqlite3OsLock(pLock->pFile, eRequired);
      if( rc==SQLITE_OK ){
        pLock->eLock = eRequired;
      }
    }
    else if( eRequired<pLock->eLock && eRequired<=SQLITE_LOCK_SHARED ){
      rc = sqlite3OsUnlock(pLock->pFile, eRequired);
      if( rc==SQLITE_OK ){
        pLock->eLock = eRequired;
      }
    }
  }

  return rc;
}

/*
** The following two methods - asyncLock() and asyncUnlock() - are used
** to obtain and release locks on database files opened with the
** asynchronous backend.
**
** asyncLock() first checks the requested level against the eLock value of
** every other handle open on the same file and returns SQLITE_BUSY if
** there is a conflict.  Otherwise the handle's eLock is raised (and
** eAsyncLock is raised to at least the same level) and getFileLock() is
** called to adjust the lock on the file-system entry.
**
** asyncUnlock() lowers the handle's eLock immediately, then queues an
** ASYNC_UNLOCK operation carrying the requested level.
*/
static int asyncLock(sqlite3_file *pFile, int eLock){
  int rc = SQLITE_OK;
  AsyncFileData *p = ((AsyncFile *)pFile)->pData;

  pthread_mutex_lock(&async.lockMutex);
  if( p->lock.eLock<eLock ){
    AsyncLock *pLock;
    AsyncFileLock *pIter;
    pLock = (AsyncLock *)sqlite3HashFind(&async.aLock, p->zName, p->nName);
    assert(pLock && pLock->pList);
    /* Look for a conflicting lock held via any other handle */
    for(pIter=pLock->pList; pIter; pIter=pIter->pNext){
      if( pIter!=&p->lock && (
        (eLock==SQLITE_LOCK_EXCLUSIVE && pIter->eLock>=SQLITE_LOCK_SHARED) ||
        (eLock==SQLITE_LOCK_PENDING && pIter->eLock>=SQLITE_LOCK_RESERVED) ||
        (eLock==SQLITE_LOCK_RESERVED && pIter->eLock>=SQLITE_LOCK_RESERVED) ||
        (eLock==SQLITE_LOCK_SHARED && pIter->eLock>=SQLITE_LOCK_PENDING)
      )){
        rc = SQLITE_BUSY;
      }
    }
    if( rc==SQLITE_OK ){
      p->lock.eLock = eLock;
      p->lock.eAsyncLock = MAX(p->lock.eAsyncLock, eLock);
    }
    assert(p->lock.eAsyncLock>=p->lock.eLock);
    if( rc==SQLITE_OK ){
      rc = getFileLock(pLock);
    }
  }
  pthread_mutex_unlock(&async.lockMutex);

  ASYNC_TRACE(("LOCK %d (%s) rc=%d\n", eLock, p->zName, rc));
  return rc;
}
static int asyncUnlock(sqlite3_file *pFile, int eLock){
  AsyncFileData *p = ((AsyncFile *)pFile)->pData;
  AsyncFileLock *pLock = &p->lock;
  pthread_mutex_lock(&async.lockMutex);
  pLock->eLock = MIN(pLock->eLock, eLock);
  pthread_mutex_unlock(&async.lockMutex);
  /* lockMutex is released before queuing: the queue mutex must not be
  ** blocked on while the lock mutex is held. */
  return addNewAsyncWrite(p, ASYNC_UNLOCK, 0, eLock, 0);
}

/*
** This function is called when the pager layer first opens a database file
** and is checking for a hot-journal.
**
** Returns 1 if any handle open on the file within this process has an
** eLock of SQLITE_LOCK_RESERVED or greater, 0 otherwise.
*/
static int asyncCheckReservedLock(sqlite3_file *pFile){
  int ret = 0;
  AsyncFileLock *pIter;
  AsyncLock *pLock;
  AsyncFileData *p = ((AsyncFile *)pFile)->pData;

  pthread_mutex_lock(&async.lockMutex);
  pLock = (AsyncLock *)sqlite3HashFind(&async.aLock, p->zName, p->nName);
  for(pIter=pLock->pList; pIter; pIter=pIter->pNext){
    if( pIter->eLock>=SQLITE_LOCK_RESERVED ){
      ret = 1;
    }
  }
  pthread_mutex_unlock(&async.lockMutex);

  ASYNC_TRACE(("CHECK-LOCK %d (%s)\n", ret, p->zName));
  return ret;
}

/*
** File-control handler.  The only opcode handled is
** SQLITE_FCNTL_LOCKSTATE, which stores the handle's current eLock value
** in the int that pArg points to and returns SQLITE_OK.  SQLITE_ERROR is
** returned for every other opcode.
*/
static int asyncFileControl(sqlite3_file *id, int op, void *pArg){
  switch( op ){
    case SQLITE_FCNTL_LOCKSTATE: {
      pthread_mutex_lock(&async.lockMutex);
      *(int*)pArg = ((AsyncFile*)id)->pData->lock.eLock;
      pthread_mutex_unlock(&async.lockMutex);
      return SQLITE_OK;
    }
  }
  return SQLITE_ERROR;
}

/*
** Return the device characteristics and sector-size of the device. It
** is tricky to implement these correctly, as this backend might
** not have an open file handle at this point.
945 */ 946 static int asyncSectorSize(sqlite3_file *pFile){ 947 return 512; 948 } 949 static int asyncDeviceCharacteristics(sqlite3_file *pFile){ 950 return 0; 951 } 952 953 static int unlinkAsyncFile(AsyncFileData *pData){ 954 AsyncLock *pLock; 955 AsyncFileLock **ppIter; 956 int rc = SQLITE_OK; 957 958 pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName); 959 for(ppIter=&pLock->pList; *ppIter; ppIter=&((*ppIter)->pNext)){ 960 if( (*ppIter)==&pData->lock ){ 961 *ppIter = pData->lock.pNext; 962 break; 963 } 964 } 965 if( !pLock->pList ){ 966 if( pLock->pFile ){ 967 sqlite3OsClose(pLock->pFile); 968 } 969 sqlite3_free(pLock); 970 sqlite3HashInsert(&async.aLock, pData->zName, pData->nName, 0); 971 if( !sqliteHashFirst(&async.aLock) ){ 972 sqlite3HashClear(&async.aLock); 973 } 974 }else{ 975 rc = getFileLock(pLock); 976 } 977 978 return rc; 979 } 980 981 /* 982 ** Open a file. 983 */ 984 static int asyncOpen( 985 sqlite3_vfs *pAsyncVfs, 986 const char *zName, 987 sqlite3_file *pFile, 988 int flags, 989 int *pOutFlags 990 ){ 991 static sqlite3_io_methods async_methods = { 992 1, /* iVersion */ 993 asyncClose, /* xClose */ 994 asyncRead, /* xRead */ 995 asyncWrite, /* xWrite */ 996 asyncTruncate, /* xTruncate */ 997 asyncSync, /* xSync */ 998 asyncFileSize, /* xFileSize */ 999 asyncLock, /* xLock */ 1000 asyncUnlock, /* xUnlock */ 1001 asyncCheckReservedLock, /* xCheckReservedLock */ 1002 asyncFileControl, /* xFileControl */ 1003 asyncSectorSize, /* xSectorSize */ 1004 asyncDeviceCharacteristics /* xDeviceCharacteristics */ 1005 }; 1006 1007 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1008 AsyncFile *p = (AsyncFile *)pFile; 1009 int nName = strlen(zName)+1; 1010 int rc = SQLITE_OK; 1011 int nByte; 1012 AsyncFileData *pData; 1013 AsyncLock *pLock = 0; 1014 int isExclusive = (flags&SQLITE_OPEN_EXCLUSIVE); 1015 1016 nByte = ( 1017 sizeof(AsyncFileData) + /* AsyncFileData structure */ 1018 2 * pVfs->szOsFile + /* AsyncFileData.pBaseRead and 
pBaseWrite */ 1019 nName /* AsyncFileData.zName */ 1020 ); 1021 pData = sqlite3_malloc(nByte); 1022 if( !pData ){ 1023 return SQLITE_NOMEM; 1024 } 1025 memset(pData, 0, nByte); 1026 pData->zName = (char *)&pData[1]; 1027 pData->nName = nName; 1028 pData->pBaseRead = (sqlite3_file *)&pData->zName[nName]; 1029 pData->pBaseWrite = (sqlite3_file *)&pData->zName[nName+pVfs->szOsFile]; 1030 pData->close.pFileData = pData; 1031 pData->close.op = ASYNC_CLOSE; 1032 memcpy(pData->zName, zName, nName); 1033 1034 if( !isExclusive ){ 1035 rc = sqlite3OsOpen(pVfs, zName, pData->pBaseRead, flags, pOutFlags); 1036 if( rc==SQLITE_OK && ((*pOutFlags)&SQLITE_OPEN_READWRITE) ){ 1037 rc = sqlite3OsOpen(pVfs, zName, pData->pBaseWrite, flags, 0); 1038 } 1039 } 1040 1041 pthread_mutex_lock(&async.lockMutex); 1042 1043 if( rc==SQLITE_OK ){ 1044 pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName); 1045 if( !pLock ){ 1046 pLock = sqlite3MallocZero(pVfs->szOsFile + sizeof(AsyncLock)); 1047 if( pLock ){ 1048 AsyncLock *pDelete; 1049 #ifdef ENABLE_FILE_LOCKING 1050 if( flags&SQLITE_OPEN_MAIN_DB ){ 1051 pLock->pFile = (sqlite3_file *)&pLock[1]; 1052 rc = sqlite3OsOpen(pVfs, zName, pLock->pFile, flags, 0); 1053 if( rc!=SQLITE_OK ){ 1054 sqlite3_free(pLock); 1055 pLock = 0; 1056 } 1057 } 1058 #endif 1059 pDelete = sqlite3HashInsert( 1060 &async.aLock, pData->zName, pData->nName, (void *)pLock 1061 ); 1062 if( pDelete ){ 1063 rc = SQLITE_NOMEM; 1064 sqlite3_free(pLock); 1065 } 1066 }else{ 1067 rc = SQLITE_NOMEM; 1068 } 1069 } 1070 } 1071 1072 if( rc==SQLITE_OK ){ 1073 HashElem *pElem; 1074 p->pMethod = &async_methods; 1075 p->pData = pData; 1076 1077 /* Link AsyncFileData.lock into the linked list of 1078 ** AsyncFileLock structures for this file. 
1079 */ 1080 pData->lock.pNext = pLock->pList; 1081 pLock->pList = &pData->lock; 1082 1083 pElem = sqlite3HashFindElem(&async.aLock, pData->zName, pData->nName); 1084 pData->zName = (char *)sqliteHashKey(pElem); 1085 }else{ 1086 sqlite3OsClose(pData->pBaseRead); 1087 sqlite3OsClose(pData->pBaseWrite); 1088 sqlite3_free(pData); 1089 } 1090 1091 pthread_mutex_unlock(&async.lockMutex); 1092 1093 if( rc==SQLITE_OK ){ 1094 incrOpenFileCount(); 1095 } 1096 1097 if( rc==SQLITE_OK && isExclusive ){ 1098 rc = addNewAsyncWrite(pData, ASYNC_OPENEXCLUSIVE, (i64)flags, 0, 0); 1099 if( rc==SQLITE_OK ){ 1100 if( pOutFlags ) *pOutFlags = flags; 1101 }else{ 1102 pthread_mutex_lock(&async.lockMutex); 1103 unlinkAsyncFile(pData); 1104 pthread_mutex_unlock(&async.lockMutex); 1105 sqlite3_free(pData); 1106 } 1107 } 1108 return rc; 1109 } 1110 1111 /* 1112 ** Implementation of sqlite3OsDelete. Add an entry to the end of the 1113 ** write-op queue to perform the delete. 1114 */ 1115 static int asyncDelete(sqlite3_vfs *pAsyncVfs, const char *z, int syncDir){ 1116 return addNewAsyncWrite(0, ASYNC_DELETE, syncDir, strlen(z)+1, z); 1117 } 1118 1119 /* 1120 ** Implementation of sqlite3OsAccess. This method holds the mutex from 1121 ** start to finish. 
1122 */ 1123 static int asyncAccess(sqlite3_vfs *pAsyncVfs, const char *zName, int flags){ 1124 int ret; 1125 AsyncWrite *p; 1126 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1127 1128 assert(flags==SQLITE_ACCESS_READWRITE 1129 || flags==SQLITE_ACCESS_READ 1130 || flags==SQLITE_ACCESS_EXISTS 1131 ); 1132 1133 pthread_mutex_lock(&async.queueMutex); 1134 ret = sqlite3OsAccess(pVfs, zName, flags); 1135 if( flags==SQLITE_ACCESS_EXISTS ){ 1136 for(p=async.pQueueFirst; p; p = p->pNext){ 1137 if( p->op==ASYNC_DELETE && 0==strcmp(p->zBuf, zName) ){ 1138 ret = 0; 1139 }else if( p->op==ASYNC_OPENEXCLUSIVE 1140 && 0==strcmp(p->pFileData->zName, zName) 1141 ){ 1142 ret = 1; 1143 } 1144 } 1145 } 1146 ASYNC_TRACE(("ACCESS(%s): %s = %d\n", 1147 flags==SQLITE_ACCESS_READWRITE?"read-write": 1148 flags==SQLITE_ACCESS_READ?"read":"exists" 1149 , zName, ret) 1150 ); 1151 pthread_mutex_unlock(&async.queueMutex); 1152 return ret; 1153 } 1154 1155 static int asyncGetTempname(sqlite3_vfs *pAsyncVfs, int nBufOut, char *zBufOut){ 1156 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1157 return pVfs->xGetTempname(pVfs, nBufOut, zBufOut); 1158 } 1159 1160 /* 1161 ** Fill in zPathOut with the full path to the file identified by zPath. 1162 */ 1163 static int asyncFullPathname( 1164 sqlite3_vfs *pAsyncVfs, 1165 const char *zPath, 1166 int nPathOut, 1167 char *zPathOut 1168 ){ 1169 int rc; 1170 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1171 rc = sqlite3OsFullPathname(pVfs, zPath, nPathOut, zPathOut); 1172 1173 /* Because of the way intra-process file locking works, this backend 1174 ** needs to return a canonical path. The following block assumes the 1175 ** file-system uses unix style paths. 
1176 */ 1177 if( rc==SQLITE_OK ){ 1178 int iIn; 1179 int iOut = 0; 1180 int nPathOut = strlen(zPathOut); 1181 1182 for(iIn=0; iIn<nPathOut; iIn++){ 1183 1184 /* Replace any occurences of "//" with "/" */ 1185 if( iIn<=(nPathOut-2) && zPathOut[iIn]=='/' && zPathOut[iIn+1]=='/' 1186 ){ 1187 continue; 1188 } 1189 1190 /* Replace any occurences of "/./" with "/" */ 1191 if( iIn<=(nPathOut-3) 1192 && zPathOut[iIn]=='/' && zPathOut[iIn+1]=='.' && zPathOut[iIn+2]=='/' 1193 ){ 1194 iIn++; 1195 continue; 1196 } 1197 1198 /* Replace any occurences of "<path-component>/../" with "" */ 1199 if( iOut>0 && iIn<=(nPathOut-4) 1200 && zPathOut[iIn]=='/' && zPathOut[iIn+1]=='.' 1201 && zPathOut[iIn+2]=='.' && zPathOut[iIn+3]=='/' 1202 ){ 1203 iIn += 3; 1204 iOut--; 1205 for( ; iOut>0 && zPathOut[iOut-1]!='/'; iOut--); 1206 continue; 1207 } 1208 1209 zPathOut[iOut++] = zPathOut[iIn]; 1210 } 1211 zPathOut[iOut] = '\0'; 1212 } 1213 1214 return rc; 1215 } 1216 static void *asyncDlOpen(sqlite3_vfs *pAsyncVfs, const char *zPath){ 1217 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1218 return pVfs->xDlOpen(pVfs, zPath); 1219 } 1220 static void asyncDlError(sqlite3_vfs *pAsyncVfs, int nByte, char *zErrMsg){ 1221 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1222 pVfs->xDlError(pVfs, nByte, zErrMsg); 1223 } 1224 static void *asyncDlSym( 1225 sqlite3_vfs *pAsyncVfs, 1226 void *pHandle, 1227 const char *zSymbol 1228 ){ 1229 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1230 return pVfs->xDlSym(pVfs, pHandle, zSymbol); 1231 } 1232 static void asyncDlClose(sqlite3_vfs *pAsyncVfs, void *pHandle){ 1233 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1234 pVfs->xDlClose(pVfs, pHandle); 1235 } 1236 static int asyncRandomness(sqlite3_vfs *pAsyncVfs, int nByte, char *zBufOut){ 1237 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1238 return pVfs->xRandomness(pVfs, nByte, zBufOut); 1239 } 1240 static int asyncSleep(sqlite3_vfs *pAsyncVfs, int nMicro){ 1241 
sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1242 return pVfs->xSleep(pVfs, nMicro); 1243 } 1244 static int asyncCurrentTime(sqlite3_vfs *pAsyncVfs, double *pTimeOut){ 1245 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1246 return pVfs->xCurrentTime(pVfs, pTimeOut); 1247 } 1248 1249 static sqlite3_vfs async_vfs = { 1250 1, /* iVersion */ 1251 sizeof(AsyncFile), /* szOsFile */ 1252 0, /* mxPathname */ 1253 0, /* pNext */ 1254 "async", /* zName */ 1255 0, /* pAppData */ 1256 asyncOpen, /* xOpen */ 1257 asyncDelete, /* xDelete */ 1258 asyncAccess, /* xAccess */ 1259 asyncGetTempname, /* xGetTempName */ 1260 asyncFullPathname, /* xFullPathname */ 1261 asyncDlOpen, /* xDlOpen */ 1262 asyncDlError, /* xDlError */ 1263 asyncDlSym, /* xDlSym */ 1264 asyncDlClose, /* xDlClose */ 1265 asyncRandomness, /* xDlError */ 1266 asyncSleep, /* xDlSym */ 1267 asyncCurrentTime /* xDlClose */ 1268 }; 1269 1270 /* 1271 ** Call this routine to enable or disable the 1272 ** asynchronous IO features implemented in this file. 1273 ** 1274 ** This routine is not even remotely threadsafe. Do not call 1275 ** this routine while any SQLite database connections are open. 1276 */ 1277 static void asyncEnable(int enable){ 1278 if( enable ){ 1279 if( !async_vfs.pAppData ){ 1280 static int hashTableInit = 0; 1281 async_vfs.pAppData = (void *)sqlite3_vfs_find(0); 1282 async_vfs.mxPathname = ((sqlite3_vfs *)async_vfs.pAppData)->mxPathname; 1283 sqlite3_vfs_register(&async_vfs, 1); 1284 if( !hashTableInit ){ 1285 sqlite3HashInit(&async.aLock, SQLITE_HASH_BINARY, 1); 1286 hashTableInit = 1; 1287 } 1288 } 1289 }else{ 1290 if( async_vfs.pAppData ){ 1291 sqlite3_vfs_unregister(&async_vfs); 1292 async_vfs.pAppData = 0; 1293 } 1294 } 1295 } 1296 1297 /* 1298 ** This procedure runs in a separate thread, reading messages off of the 1299 ** write queue and processing them one by one. 
1300 ** 1301 ** If async.writerHaltNow is true, then this procedure exits 1302 ** after processing a single message. 1303 ** 1304 ** If async.writerHaltWhenIdle is true, then this procedure exits when 1305 ** the write queue is empty. 1306 ** 1307 ** If both of the above variables are false, this procedure runs 1308 ** indefinately, waiting for operations to be added to the write queue 1309 ** and processing them in the order in which they arrive. 1310 ** 1311 ** An artifical delay of async.ioDelay milliseconds is inserted before 1312 ** each write operation in order to simulate the effect of a slow disk. 1313 ** 1314 ** Only one instance of this procedure may be running at a time. 1315 */ 1316 static void *asyncWriterThread(void *pIsStarted){ 1317 sqlite3_vfs *pVfs = (sqlite3_vfs *)(async_vfs.pAppData); 1318 AsyncWrite *p = 0; 1319 int rc = SQLITE_OK; 1320 int holdingMutex = 0; 1321 1322 if( pthread_mutex_trylock(&async.writerMutex) ){ 1323 return 0; 1324 } 1325 (*(int *)pIsStarted) = 1; 1326 while( async.writerHaltNow==0 ){ 1327 int doNotFree = 0; 1328 sqlite3_file *pBase = 0; 1329 1330 if( !holdingMutex ){ 1331 pthread_mutex_lock(&async.queueMutex); 1332 } 1333 while( (p = async.pQueueFirst)==0 ){ 1334 pthread_cond_broadcast(&async.emptySignal); 1335 if( async.writerHaltWhenIdle ){ 1336 pthread_mutex_unlock(&async.queueMutex); 1337 break; 1338 }else{ 1339 ASYNC_TRACE(("IDLE\n")); 1340 pthread_cond_wait(&async.queueSignal, &async.queueMutex); 1341 ASYNC_TRACE(("WAKEUP\n")); 1342 } 1343 } 1344 if( p==0 ) break; 1345 holdingMutex = 1; 1346 1347 /* Right now this thread is holding the mutex on the write-op queue. 1348 ** Variable 'p' points to the first entry in the write-op queue. In 1349 ** the general case, we hold on to the mutex for the entire body of 1350 ** the loop. 
1351 ** 1352 ** However in the cases enumerated below, we relinquish the mutex, 1353 ** perform the IO, and then re-request the mutex before removing 'p' from 1354 ** the head of the write-op queue. The idea is to increase concurrency with 1355 ** sqlite threads. 1356 ** 1357 ** * An ASYNC_CLOSE operation. 1358 ** * An ASYNC_OPENEXCLUSIVE operation. For this one, we relinquish 1359 ** the mutex, call the underlying xOpenExclusive() function, then 1360 ** re-aquire the mutex before seting the AsyncFile.pBaseRead 1361 ** variable. 1362 ** * ASYNC_SYNC and ASYNC_WRITE operations, if 1363 ** SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two 1364 ** file-handles are open for the particular file being "synced". 1365 */ 1366 if( async.ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){ 1367 p->op = ASYNC_NOOP; 1368 } 1369 if( p->pFileData ){ 1370 pBase = p->pFileData->pBaseWrite; 1371 if( 1372 p->op==ASYNC_CLOSE || 1373 p->op==ASYNC_OPENEXCLUSIVE || 1374 (pBase->pMethods && (p->op==ASYNC_SYNC || p->op==ASYNC_WRITE) ) 1375 ){ 1376 pthread_mutex_unlock(&async.queueMutex); 1377 holdingMutex = 0; 1378 } 1379 if( !pBase->pMethods ){ 1380 pBase = p->pFileData->pBaseRead; 1381 } 1382 } 1383 1384 switch( p->op ){ 1385 case ASYNC_NOOP: 1386 break; 1387 1388 case ASYNC_WRITE: 1389 assert( pBase ); 1390 ASYNC_TRACE(("WRITE %s %d bytes at %d\n", 1391 p->pFileData->zName, p->nByte, p->iOffset)); 1392 rc = sqlite3OsWrite(pBase, (void *)(p->zBuf), p->nByte, p->iOffset); 1393 break; 1394 1395 case ASYNC_SYNC: 1396 assert( pBase ); 1397 ASYNC_TRACE(("SYNC %s\n", p->pFileData->zName)); 1398 rc = sqlite3OsSync(pBase, p->nByte); 1399 break; 1400 1401 case ASYNC_TRUNCATE: 1402 assert( pBase ); 1403 ASYNC_TRACE(("TRUNCATE %s to %d bytes\n", 1404 p->pFileData->zName, p->iOffset)); 1405 rc = sqlite3OsTruncate(pBase, p->iOffset); 1406 break; 1407 1408 case ASYNC_CLOSE: { 1409 AsyncFileData *pData = p->pFileData; 1410 ASYNC_TRACE(("CLOSE %s\n", p->pFileData->zName)); 1411 
sqlite3OsClose(pData->pBaseWrite); 1412 sqlite3OsClose(pData->pBaseRead); 1413 1414 /* Unlink AsyncFileData.lock from the linked list of AsyncFileLock 1415 ** structures for this file. Obtain the async.lockMutex mutex 1416 ** before doing so. 1417 */ 1418 pthread_mutex_lock(&async.lockMutex); 1419 rc = unlinkAsyncFile(pData); 1420 pthread_mutex_unlock(&async.lockMutex); 1421 1422 async.pQueueFirst = p->pNext; 1423 sqlite3_free(pData); 1424 doNotFree = 1; 1425 break; 1426 } 1427 1428 case ASYNC_UNLOCK: { 1429 AsyncLock *pLock; 1430 AsyncFileData *pData = p->pFileData; 1431 int eLock = p->nByte; 1432 pthread_mutex_lock(&async.lockMutex); 1433 pData->lock.eAsyncLock = MIN( 1434 pData->lock.eAsyncLock, MAX(pData->lock.eLock, eLock) 1435 ); 1436 assert(pData->lock.eAsyncLock>=pData->lock.eLock); 1437 pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName); 1438 rc = getFileLock(pLock); 1439 pthread_mutex_unlock(&async.lockMutex); 1440 break; 1441 } 1442 1443 case ASYNC_DELETE: 1444 ASYNC_TRACE(("DELETE %s\n", p->zBuf)); 1445 rc = sqlite3OsDelete(pVfs, p->zBuf, (int)p->iOffset); 1446 break; 1447 1448 case ASYNC_OPENEXCLUSIVE: { 1449 int flags = (int)p->iOffset; 1450 AsyncFileData *pData = p->pFileData; 1451 ASYNC_TRACE(("OPEN %s flags=%d\n", p->zBuf, (int)p->iOffset)); 1452 assert(pData->pBaseRead->pMethods==0 && pData->pBaseWrite->pMethods==0); 1453 rc = sqlite3OsOpen(pVfs, pData->zName, pData->pBaseRead, flags, 0); 1454 assert( holdingMutex==0 ); 1455 pthread_mutex_lock(&async.queueMutex); 1456 holdingMutex = 1; 1457 break; 1458 } 1459 1460 default: assert(!"Illegal value for AsyncWrite.op"); 1461 } 1462 1463 /* If we didn't hang on to the mutex during the IO op, obtain it now 1464 ** so that the AsyncWrite structure can be safely removed from the 1465 ** global write-op queue. 
1466 */ 1467 if( !holdingMutex ){ 1468 pthread_mutex_lock(&async.queueMutex); 1469 holdingMutex = 1; 1470 } 1471 /* ASYNC_TRACE(("UNLINK %p\n", p)); */ 1472 if( p==async.pQueueLast ){ 1473 async.pQueueLast = 0; 1474 } 1475 if( !doNotFree ){ 1476 async.pQueueFirst = p->pNext; 1477 sqlite3_free(p); 1478 } 1479 assert( holdingMutex ); 1480 1481 /* An IO error has occured. We cannot report the error back to the 1482 ** connection that requested the I/O since the error happened 1483 ** asynchronously. The connection has already moved on. There 1484 ** really is nobody to report the error to. 1485 ** 1486 ** The file for which the error occured may have been a database or 1487 ** journal file. Regardless, none of the currently queued operations 1488 ** associated with the same database should now be performed. Nor should 1489 ** any subsequently requested IO on either a database or journal file 1490 ** handle for the same database be accepted until the main database 1491 ** file handle has been closed and reopened. 1492 ** 1493 ** Furthermore, no further IO should be queued or performed on any file 1494 ** handle associated with a database that may have been part of a 1495 ** multi-file transaction that included the database associated with 1496 ** the IO error (i.e. a database ATTACHed to the same handle at some 1497 ** point in time). 1498 */ 1499 if( rc!=SQLITE_OK ){ 1500 async.ioError = rc; 1501 } 1502 1503 if( async.ioError && !async.pQueueFirst ){ 1504 pthread_mutex_lock(&async.lockMutex); 1505 if( 0==sqliteHashFirst(&async.aLock) ){ 1506 async.ioError = SQLITE_OK; 1507 } 1508 pthread_mutex_unlock(&async.lockMutex); 1509 } 1510 1511 /* Drop the queue mutex before continuing to the next write operation 1512 ** in order to give other threads a chance to work with the write queue. 
1513 */ 1514 if( !async.pQueueFirst || !async.ioError ){ 1515 pthread_mutex_unlock(&async.queueMutex); 1516 holdingMutex = 0; 1517 if( async.ioDelay>0 ){ 1518 sqlite3OsSleep(pVfs, async.ioDelay); 1519 }else{ 1520 sched_yield(); 1521 } 1522 } 1523 } 1524 1525 pthread_mutex_unlock(&async.writerMutex); 1526 return 0; 1527 } 1528 1529 /************************************************************************** 1530 ** The remaining code defines a Tcl interface for testing the asynchronous 1531 ** IO implementation in this file. 1532 ** 1533 ** To adapt the code to a non-TCL environment, delete or comment out 1534 ** the code that follows. 1535 */ 1536 1537 /* 1538 ** sqlite3async_enable ?YES/NO? 1539 ** 1540 ** Enable or disable the asynchronous I/O backend. This command is 1541 ** not thread-safe. Do not call it while any database connections 1542 ** are open. 1543 */ 1544 static int testAsyncEnable( 1545 void * clientData, 1546 Tcl_Interp *interp, 1547 int objc, 1548 Tcl_Obj *CONST objv[] 1549 ){ 1550 if( objc!=1 && objc!=2 ){ 1551 Tcl_WrongNumArgs(interp, 1, objv, "?YES/NO?"); 1552 return TCL_ERROR; 1553 } 1554 if( objc==1 ){ 1555 Tcl_SetObjResult(interp, Tcl_NewBooleanObj(async_vfs.pAppData!=0)); 1556 }else{ 1557 int en; 1558 if( Tcl_GetBooleanFromObj(interp, objv[1], &en) ) return TCL_ERROR; 1559 asyncEnable(en); 1560 } 1561 return TCL_OK; 1562 } 1563 1564 /* 1565 ** sqlite3async_halt "now"|"idle"|"never" 1566 ** 1567 ** Set the conditions at which the writer thread will halt. 
1568 */ 1569 static int testAsyncHalt( 1570 void * clientData, 1571 Tcl_Interp *interp, 1572 int objc, 1573 Tcl_Obj *CONST objv[] 1574 ){ 1575 const char *zCond; 1576 if( objc!=2 ){ 1577 Tcl_WrongNumArgs(interp, 1, objv, "\"now\"|\"idle\"|\"never\""); 1578 return TCL_ERROR; 1579 } 1580 zCond = Tcl_GetString(objv[1]); 1581 if( strcmp(zCond, "now")==0 ){ 1582 async.writerHaltNow = 1; 1583 pthread_cond_broadcast(&async.queueSignal); 1584 }else if( strcmp(zCond, "idle")==0 ){ 1585 async.writerHaltWhenIdle = 1; 1586 async.writerHaltNow = 0; 1587 pthread_cond_broadcast(&async.queueSignal); 1588 }else if( strcmp(zCond, "never")==0 ){ 1589 async.writerHaltWhenIdle = 0; 1590 async.writerHaltNow = 0; 1591 }else{ 1592 Tcl_AppendResult(interp, 1593 "should be one of: \"now\", \"idle\", or \"never\"", (char*)0); 1594 return TCL_ERROR; 1595 } 1596 return TCL_OK; 1597 } 1598 1599 /* 1600 ** sqlite3async_delay ?MS? 1601 ** 1602 ** Query or set the number of milliseconds of delay in the writer 1603 ** thread after each write operation. The default is 0. By increasing 1604 ** the memory delay we can simulate the effect of slow disk I/O. 1605 */ 1606 static int testAsyncDelay( 1607 void * clientData, 1608 Tcl_Interp *interp, 1609 int objc, 1610 Tcl_Obj *CONST objv[] 1611 ){ 1612 if( objc!=1 && objc!=2 ){ 1613 Tcl_WrongNumArgs(interp, 1, objv, "?MS?"); 1614 return TCL_ERROR; 1615 } 1616 if( objc==1 ){ 1617 Tcl_SetObjResult(interp, Tcl_NewIntObj(async.ioDelay)); 1618 }else{ 1619 int ioDelay; 1620 if( Tcl_GetIntFromObj(interp, objv[1], &ioDelay) ) return TCL_ERROR; 1621 async.ioDelay = ioDelay; 1622 } 1623 return TCL_OK; 1624 } 1625 1626 /* 1627 ** sqlite3async_start 1628 ** 1629 ** Start a new writer thread. 
1630 */ 1631 static int testAsyncStart( 1632 void * clientData, 1633 Tcl_Interp *interp, 1634 int objc, 1635 Tcl_Obj *CONST objv[] 1636 ){ 1637 pthread_t x; 1638 int rc; 1639 volatile int isStarted = 0; 1640 rc = pthread_create(&x, 0, asyncWriterThread, (void *)&isStarted); 1641 if( rc ){ 1642 Tcl_AppendResult(interp, "failed to create the thread", 0); 1643 return TCL_ERROR; 1644 } 1645 pthread_detach(x); 1646 while( isStarted==0 ){ 1647 sched_yield(); 1648 } 1649 return TCL_OK; 1650 } 1651 1652 /* 1653 ** sqlite3async_wait 1654 ** 1655 ** Wait for the current writer thread to terminate. 1656 ** 1657 ** If the current writer thread is set to run forever then this 1658 ** command would block forever. To prevent that, an error is returned. 1659 */ 1660 static int testAsyncWait( 1661 void * clientData, 1662 Tcl_Interp *interp, 1663 int objc, 1664 Tcl_Obj *CONST objv[] 1665 ){ 1666 int cnt = 10; 1667 if( async.writerHaltNow==0 && async.writerHaltWhenIdle==0 ){ 1668 Tcl_AppendResult(interp, "would block forever", (char*)0); 1669 return TCL_ERROR; 1670 } 1671 1672 while( cnt-- && !pthread_mutex_trylock(&async.writerMutex) ){ 1673 pthread_mutex_unlock(&async.writerMutex); 1674 sched_yield(); 1675 } 1676 if( cnt>=0 ){ 1677 ASYNC_TRACE(("WAIT\n")); 1678 pthread_mutex_lock(&async.queueMutex); 1679 pthread_cond_broadcast(&async.queueSignal); 1680 pthread_mutex_unlock(&async.queueMutex); 1681 pthread_mutex_lock(&async.writerMutex); 1682 pthread_mutex_unlock(&async.writerMutex); 1683 }else{ 1684 ASYNC_TRACE(("NO-WAIT\n")); 1685 } 1686 return TCL_OK; 1687 } 1688 1689 1690 #endif /* OS_UNIX and SQLITE_THREADSAFE */ 1691 1692 /* 1693 ** This routine registers the custom TCL commands defined in this 1694 ** module. This should be the only procedure visible from outside 1695 ** of this module. 
1696 */ 1697 int Sqlitetestasync_Init(Tcl_Interp *interp){ 1698 #if OS_UNIX && SQLITE_THREADSAFE 1699 Tcl_CreateObjCommand(interp,"sqlite3async_enable",testAsyncEnable,0,0); 1700 Tcl_CreateObjCommand(interp,"sqlite3async_halt",testAsyncHalt,0,0); 1701 Tcl_CreateObjCommand(interp,"sqlite3async_delay",testAsyncDelay,0,0); 1702 Tcl_CreateObjCommand(interp,"sqlite3async_start",testAsyncStart,0,0); 1703 Tcl_CreateObjCommand(interp,"sqlite3async_wait",testAsyncWait,0,0); 1704 Tcl_LinkVar(interp, "sqlite3async_trace", 1705 (char*)&sqlite3async_trace, TCL_LINK_INT); 1706 #endif /* OS_UNIX and SQLITE_THREADSAFE */ 1707 return TCL_OK; 1708 } 1709