/*
** 2005 December 14
**
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** $Id: test_async.c,v 1.45 2008/06/26 10:41:19 danielk1977 Exp $
**
** This file contains an example implementation of an asynchronous IO
** backend for SQLite.
**
** WHAT IS ASYNCHRONOUS I/O?
**
** With asynchronous I/O, write requests are handled by a separate thread
** running in the background.  This means that the thread that initiates
** a database write does not have to wait for (sometimes slow) disk I/O
** to occur.  The write seems to happen very quickly, though in reality
** it is happening at its usual slow pace in the background.
**
** Asynchronous I/O appears to give better responsiveness, but at a price.
** You lose the Durable property.  With the default I/O backend of SQLite,
** once a write completes, you know that the information you wrote is
** safely on disk.  With the asynchronous I/O, this is not the case.  If
** your program crashes or if a power loss occurs after the database
** write but before the asynchronous write thread has completed, then the
** database change might never make it to disk and the next user of the
** database might not see your change.
**
** You lose Durability with asynchronous I/O, but you still retain the
** other parts of ACID:  Atomic, Consistent, and Isolated.  Many
** applications get along fine without the Durability.
**
** HOW IT WORKS
**
** Asynchronous I/O works by creating a special SQLite "vfs" structure
** and registering it with sqlite3_vfs_register().
** When files opened via
** this vfs are written to (using sqlite3OsWrite()), the data is not
** written directly to disk, but is placed in the "write-queue" to be
** handled by the background thread.
**
** When files opened with the asynchronous vfs are read from
** (using sqlite3OsRead()), the data is read from the file on
** disk and the write-queue, so that from the point of view of
** the vfs reader the OsWrite() appears to have already completed.
**
** The special vfs is registered (and unregistered) by calls to
** function asyncEnable() (see below).
**
** LIMITATIONS
**
** This demonstration code is deliberately kept simple in order to keep
** the main ideas clear and easy to understand.  Real applications that
** want to do asynchronous I/O might want to add additional capabilities.
** For example, in this demonstration if writes are happening at a steady
** stream that exceeds the I/O capability of the background writer thread,
** the queue of pending write operations will grow without bound until we
** run out of memory.  Users of this technique may want to keep track of
** the quantity of pending writes and stop accepting new write requests
** when the buffer gets to be too big.
**
** LOCKING + CONCURRENCY
**
** Multiple connections from within a single process that use this
** implementation of asynchronous IO may access a single database
** file concurrently. From the point of view of the user, if all
** connections are from within a single process, there is no difference
** between the concurrency offered by "normal" SQLite and SQLite
** using the asynchronous backend.
**
** If connections from within multiple processes may access the
** database file, the ENABLE_FILE_LOCKING symbol (see below) must be
** defined. If it is not defined, then no locks are established on
** the database file.
In this case, if multiple processes access 80 ** the database file, corruption will quickly result. 81 ** 82 ** If ENABLE_FILE_LOCKING is defined (the default), then connections 83 ** from within multiple processes may access a single database file 84 ** without risking corruption. However concurrency is reduced as 85 ** follows: 86 ** 87 ** * When a connection using asynchronous IO begins a database 88 ** transaction, the database is locked immediately. However the 89 ** lock is not released until after all relevant operations 90 ** in the write-queue have been flushed to disk. This means 91 ** (for example) that the database may remain locked for some 92 ** time after a "COMMIT" or "ROLLBACK" is issued. 93 ** 94 ** * If an application using asynchronous IO executes transactions 95 ** in quick succession, other database users may be effectively 96 ** locked out of the database. This is because when a BEGIN 97 ** is executed, a database lock is established immediately. But 98 ** when the corresponding COMMIT or ROLLBACK occurs, the lock 99 ** is not released until the relevant part of the write-queue 100 ** has been flushed through. As a result, if a COMMIT is followed 101 ** by a BEGIN before the write-queue is flushed through, the database 102 ** is never unlocked,preventing other processes from accessing 103 ** the database. 104 ** 105 ** Defining ENABLE_FILE_LOCKING when using an NFS or other remote 106 ** file-system may slow things down, as synchronous round-trips to the 107 ** server may be required to establish database file locks. 108 */ 109 #define ENABLE_FILE_LOCKING 110 111 #ifndef SQLITE_AMALGAMATION 112 # include "sqliteInt.h" 113 #endif 114 #include <tcl.h> 115 116 /* 117 ** This test uses pthreads and hence only works on unix and with 118 ** a threadsafe build of SQLite. 119 */ 120 #if SQLITE_OS_UNIX && SQLITE_THREADSAFE 121 122 /* 123 ** This demo uses pthreads. 
If you do not have a pthreads implementation 124 ** for your operating system, you will need to recode the threading 125 ** logic. 126 */ 127 #include <pthread.h> 128 #include <sched.h> 129 130 /* Useful macros used in several places */ 131 #define MIN(x,y) ((x)<(y)?(x):(y)) 132 #define MAX(x,y) ((x)>(y)?(x):(y)) 133 134 /* Forward references */ 135 typedef struct AsyncWrite AsyncWrite; 136 typedef struct AsyncFile AsyncFile; 137 typedef struct AsyncFileData AsyncFileData; 138 typedef struct AsyncFileLock AsyncFileLock; 139 typedef struct AsyncLock AsyncLock; 140 141 /* Enable for debugging */ 142 static int sqlite3async_trace = 0; 143 # define ASYNC_TRACE(X) if( sqlite3async_trace ) asyncTrace X 144 static void asyncTrace(const char *zFormat, ...){ 145 char *z; 146 va_list ap; 147 va_start(ap, zFormat); 148 z = sqlite3_vmprintf(zFormat, ap); 149 va_end(ap); 150 fprintf(stderr, "[%d] %s", (int)pthread_self(), z); 151 sqlite3_free(z); 152 } 153 154 /* 155 ** THREAD SAFETY NOTES 156 ** 157 ** Basic rules: 158 ** 159 ** * Both read and write access to the global write-op queue must be 160 ** protected by the async.queueMutex. As are the async.ioError and 161 ** async.nFile variables. 162 ** 163 ** * The async.aLock hash-table and all AsyncLock and AsyncFileLock 164 ** structures must be protected by the async.lockMutex mutex. 165 ** 166 ** * The file handles from the underlying system are assumed not to 167 ** be thread safe. 168 ** 169 ** * See the last two paragraphs under "The Writer Thread" for 170 ** an assumption to do with file-handle synchronization by the Os. 171 ** 172 ** Deadlock prevention: 173 ** 174 ** There are three mutex used by the system: the "writer" mutex, 175 ** the "queue" mutex and the "lock" mutex. Rules are: 176 ** 177 ** * It is illegal to block on the writer mutex when any other mutex 178 ** are held, and 179 ** 180 ** * It is illegal to block on the queue mutex when the lock mutex 181 ** is held. 182 ** 183 ** i.e. 
**   the mutexes must be grabbed in the order "writer", "queue", "lock".
**
** File system operations (invoked by SQLite thread):
**
**     xOpen
**     xDelete
**     xFileExists
**
** File handle operations (invoked by SQLite thread):
**
**         asyncWrite, asyncClose, asyncTruncate, asyncSync
**
**     The operations above add an entry to the global write-op list. They
**     prepare the entry, acquire the async.queueMutex momentarily while
**     list pointers are manipulated to insert the new entry, then release
**     the mutex and signal the writer thread to wake up in case it happens
**     to be asleep.
**
**
**         asyncRead, asyncFileSize.
**
**     Read operations. Both of these read from the underlying file
**     first, then adjust their result based on pending writes in the
**     write-op queue.  So async.queueMutex is held for the duration
**     of these operations to prevent other threads from changing the
**     queue in mid operation.
**
**
**         asyncLock, asyncUnlock, asyncCheckReservedLock
**
**     These primitives implement in-process locking using a hash table
**     on the file name.  Files are locked correctly for connections coming
**     from the same process.  But other processes cannot see these locks
**     and will therefore not honor them.
**
**
** The writer thread:
**
**     The async.writerMutex is used to make sure there is only
**     a single writer thread running at a time.
**
**     Inside the writer thread is a loop that works like this:
**
**         WHILE (write-op list is not empty)
**             Do IO operation at head of write-op list
**             Remove entry from head of write-op list
**         END WHILE
**
**     The async.queueMutex is always held during the <write-op list is
**     not empty> test, and when the entry is removed from the head
**     of the write-op list.
Sometimes it is held for the interim 234 ** period (while the IO is performed), and sometimes it is 235 ** relinquished. It is relinquished if (a) the IO op is an 236 ** ASYNC_CLOSE or (b) when the file handle was opened, two of 237 ** the underlying systems handles were opened on the same 238 ** file-system entry. 239 ** 240 ** If condition (b) above is true, then one file-handle 241 ** (AsyncFile.pBaseRead) is used exclusively by sqlite threads to read the 242 ** file, the other (AsyncFile.pBaseWrite) by sqlite3_async_flush() 243 ** threads to perform write() operations. This means that read 244 ** operations are not blocked by asynchronous writes (although 245 ** asynchronous writes may still be blocked by reads). 246 ** 247 ** This assumes that the OS keeps two handles open on the same file 248 ** properly in sync. That is, any read operation that starts after a 249 ** write operation on the same file system entry has completed returns 250 ** data consistent with the write. We also assume that if one thread 251 ** reads a file while another is writing it all bytes other than the 252 ** ones actually being written contain valid data. 253 ** 254 ** If the above assumptions are not true, set the preprocessor symbol 255 ** SQLITE_ASYNC_TWO_FILEHANDLES to 0. 256 */ 257 258 #ifndef SQLITE_ASYNC_TWO_FILEHANDLES 259 /* #define SQLITE_ASYNC_TWO_FILEHANDLES 0 */ 260 #define SQLITE_ASYNC_TWO_FILEHANDLES 1 261 #endif 262 263 /* 264 ** State information is held in the static variable "async" defined 265 ** as the following structure. 266 ** 267 ** Both async.ioError and async.nFile are protected by async.queueMutex. 
268 */ 269 static struct TestAsyncStaticData { 270 pthread_mutex_t lockMutex; /* For access to aLock hash table */ 271 pthread_mutex_t queueMutex; /* Mutex for access to write operation queue */ 272 pthread_mutex_t writerMutex; /* Prevents multiple writer threads */ 273 pthread_cond_t queueSignal; /* For waking up sleeping writer thread */ 274 pthread_cond_t emptySignal; /* Notify when the write queue is empty */ 275 AsyncWrite *pQueueFirst; /* Next write operation to be processed */ 276 AsyncWrite *pQueueLast; /* Last write operation on the list */ 277 Hash aLock; /* Files locked */ 278 volatile int ioDelay; /* Extra delay between write operations */ 279 volatile int writerHaltWhenIdle; /* Writer thread halts when queue empty */ 280 volatile int writerHaltNow; /* Writer thread halts after next op */ 281 int ioError; /* True if an IO error has occured */ 282 int nFile; /* Number of open files (from sqlite pov) */ 283 } async = { 284 PTHREAD_MUTEX_INITIALIZER, 285 PTHREAD_MUTEX_INITIALIZER, 286 PTHREAD_MUTEX_INITIALIZER, 287 PTHREAD_COND_INITIALIZER, 288 PTHREAD_COND_INITIALIZER, 289 }; 290 291 /* Possible values of AsyncWrite.op */ 292 #define ASYNC_NOOP 0 293 #define ASYNC_WRITE 1 294 #define ASYNC_SYNC 2 295 #define ASYNC_TRUNCATE 3 296 #define ASYNC_CLOSE 4 297 #define ASYNC_DELETE 5 298 #define ASYNC_OPENEXCLUSIVE 6 299 #define ASYNC_UNLOCK 7 300 301 /* Names of opcodes. Used for debugging only. 302 ** Make sure these stay in sync with the macros above! 303 */ 304 static const char *azOpcodeName[] = { 305 "NOOP", "WRITE", "SYNC", "TRUNCATE", "CLOSE", "DELETE", "OPENEX", "UNLOCK" 306 }; 307 308 /* 309 ** Entries on the write-op queue are instances of the AsyncWrite 310 ** structure, defined here. 311 ** 312 ** The interpretation of the iOffset and nByte variables varies depending 313 ** on the value of AsyncWrite.op: 314 ** 315 ** ASYNC_NOOP: 316 ** No values used. 317 ** 318 ** ASYNC_WRITE: 319 ** iOffset -> Offset in file to write to. 
320 ** nByte -> Number of bytes of data to write (pointed to by zBuf). 321 ** 322 ** ASYNC_SYNC: 323 ** nByte -> flags to pass to sqlite3OsSync(). 324 ** 325 ** ASYNC_TRUNCATE: 326 ** iOffset -> Size to truncate file to. 327 ** nByte -> Unused. 328 ** 329 ** ASYNC_CLOSE: 330 ** iOffset -> Unused. 331 ** nByte -> Unused. 332 ** 333 ** ASYNC_DELETE: 334 ** iOffset -> Contains the "syncDir" flag. 335 ** nByte -> Number of bytes of zBuf points to (file name). 336 ** 337 ** ASYNC_OPENEXCLUSIVE: 338 ** iOffset -> Value of "delflag". 339 ** nByte -> Number of bytes of zBuf points to (file name). 340 ** 341 ** ASYNC_UNLOCK: 342 ** nByte -> Argument to sqlite3OsUnlock(). 343 ** 344 ** 345 ** For an ASYNC_WRITE operation, zBuf points to the data to write to the file. 346 ** This space is sqlite3_malloc()d along with the AsyncWrite structure in a 347 ** single blob, so is deleted when sqlite3_free() is called on the parent 348 ** structure. 349 */ 350 struct AsyncWrite { 351 AsyncFileData *pFileData; /* File to write data to or sync */ 352 int op; /* One of ASYNC_xxx etc. */ 353 i64 iOffset; /* See above */ 354 int nByte; /* See above */ 355 char *zBuf; /* Data to write to file (or NULL if op!=ASYNC_WRITE) */ 356 AsyncWrite *pNext; /* Next write operation (to any file) */ 357 }; 358 359 /* 360 ** An instance of this structure is created for each distinct open file 361 ** (i.e. if two handles are opened on the one file, only one of these 362 ** structures is allocated) and stored in the async.aLock hash table. The 363 ** keys for async.aLock are the full pathnames of the opened files. 364 ** 365 ** AsyncLock.pList points to the head of a linked list of AsyncFileLock 366 ** structures, one for each handle currently open on the file. 
367 ** 368 ** If the opened file is not a main-database (the SQLITE_OPEN_MAIN_DB is 369 ** not passed to the sqlite3OsOpen() call), or if ENABLE_FILE_LOCKING is 370 ** not defined at compile time, variables AsyncLock.pFile and 371 ** AsyncLock.eLock are never used. Otherwise, pFile is a file handle 372 ** opened on the file in question and used to obtain the file-system 373 ** locks required by database connections within this process. 374 ** 375 ** See comments above the asyncLock() function for more details on 376 ** the implementation of database locking used by this backend. 377 */ 378 struct AsyncLock { 379 sqlite3_file *pFile; 380 int eLock; 381 AsyncFileLock *pList; 382 }; 383 384 /* 385 ** An instance of the following structure is allocated along with each 386 ** AsyncFileData structure (see AsyncFileData.lock), but is only used if the 387 ** file was opened with the SQLITE_OPEN_MAIN_DB. 388 */ 389 struct AsyncFileLock { 390 int eLock; /* Internally visible lock state (sqlite pov) */ 391 int eAsyncLock; /* Lock-state with write-queue unlock */ 392 AsyncFileLock *pNext; 393 }; 394 395 /* 396 ** The AsyncFile structure is a subclass of sqlite3_file used for 397 ** asynchronous IO. 398 ** 399 ** All of the actual data for the structure is stored in the structure 400 ** pointed to by AsyncFile.pData, which is allocated as part of the 401 ** sqlite3OsOpen() using sqlite3_malloc(). The reason for this is that the 402 ** lifetime of the AsyncFile structure is ended by the caller after OsClose() 403 ** is called, but the data in AsyncFileData may be required by the 404 ** writer thread after that point. 
405 */ 406 struct AsyncFile { 407 sqlite3_io_methods *pMethod; 408 AsyncFileData *pData; 409 }; 410 struct AsyncFileData { 411 char *zName; /* Underlying OS filename - used for debugging */ 412 int nName; /* Number of characters in zName */ 413 sqlite3_file *pBaseRead; /* Read handle to the underlying Os file */ 414 sqlite3_file *pBaseWrite; /* Write handle to the underlying Os file */ 415 AsyncFileLock lock; 416 AsyncWrite close; 417 }; 418 419 /* 420 ** The following async_XXX functions are debugging wrappers around the 421 ** corresponding pthread_XXX functions: 422 ** 423 ** pthread_mutex_lock(); 424 ** pthread_mutex_unlock(); 425 ** pthread_mutex_trylock(); 426 ** pthread_cond_wait(); 427 ** 428 ** It is illegal to pass any mutex other than those stored in the 429 ** following global variables of these functions. 430 ** 431 ** async.queueMutex 432 ** async.writerMutex 433 ** async.lockMutex 434 ** 435 ** If NDEBUG is defined, these wrappers do nothing except call the 436 ** corresponding pthreads function. If NDEBUG is not defined, then the 437 ** following variables are used to store the thread-id (as returned 438 ** by pthread_self()) currently holding the mutex, or 0 otherwise: 439 ** 440 ** asyncdebug.queueMutexHolder 441 ** asyncdebug.writerMutexHolder 442 ** asyncdebug.lockMutexHolder 443 ** 444 ** These variables are used by some assert() statements that verify 445 ** the statements made in the "Deadlock Prevention" notes earlier 446 ** in this file. 447 */ 448 #ifndef NDEBUG 449 450 static struct TestAsyncDebugData { 451 pthread_t lockMutexHolder; 452 pthread_t queueMutexHolder; 453 pthread_t writerMutexHolder; 454 } asyncdebug = {0, 0, 0}; 455 456 /* 457 ** Wrapper around pthread_mutex_lock(). Checks that we have not violated 458 ** the anti-deadlock rules (see "Deadlock prevention" above). 
459 */ 460 static int async_mutex_lock(pthread_mutex_t *pMutex){ 461 int iIdx; 462 int rc; 463 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 464 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 465 466 /* The code in this 'ifndef NDEBUG' block depends on a certain alignment 467 * of the variables in TestAsyncStaticData and TestAsyncDebugData. The 468 * following assert() statements check that this has not been changed. 469 * 470 * Really, these only need to be run once at startup time. 471 */ 472 assert(&(aMutex[0])==&async.lockMutex); 473 assert(&(aMutex[1])==&async.queueMutex); 474 assert(&(aMutex[2])==&async.writerMutex); 475 assert(&(aHolder[0])==&asyncdebug.lockMutexHolder); 476 assert(&(aHolder[1])==&asyncdebug.queueMutexHolder); 477 assert(&(aHolder[2])==&asyncdebug.writerMutexHolder); 478 479 assert( pthread_self()!=0 ); 480 481 for(iIdx=0; iIdx<3; iIdx++){ 482 if( pMutex==&aMutex[iIdx] ) break; 483 484 /* This is the key assert(). Here we are checking that if the caller 485 * is trying to block on async.writerMutex, neither of the other two 486 * mutex are held. If the caller is trying to block on async.queueMutex, 487 * lockMutex is not held. 488 */ 489 assert(!pthread_equal(aHolder[iIdx], pthread_self())); 490 } 491 assert(iIdx<3); 492 493 rc = pthread_mutex_lock(pMutex); 494 if( rc==0 ){ 495 assert(aHolder[iIdx]==0); 496 aHolder[iIdx] = pthread_self(); 497 } 498 return rc; 499 } 500 501 /* 502 ** Wrapper around pthread_mutex_unlock(). 
503 */ 504 static int async_mutex_unlock(pthread_mutex_t *pMutex){ 505 int iIdx; 506 int rc; 507 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 508 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 509 510 for(iIdx=0; iIdx<3; iIdx++){ 511 if( pMutex==&aMutex[iIdx] ) break; 512 } 513 assert(iIdx<3); 514 515 assert(pthread_equal(aHolder[iIdx], pthread_self())); 516 aHolder[iIdx] = 0; 517 rc = pthread_mutex_unlock(pMutex); 518 assert(rc==0); 519 520 return 0; 521 } 522 523 /* 524 ** Wrapper around pthread_mutex_trylock(). 525 */ 526 static int async_mutex_trylock(pthread_mutex_t *pMutex){ 527 int iIdx; 528 int rc; 529 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 530 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 531 532 for(iIdx=0; iIdx<3; iIdx++){ 533 if( pMutex==&aMutex[iIdx] ) break; 534 } 535 assert(iIdx<3); 536 537 rc = pthread_mutex_trylock(pMutex); 538 if( rc==0 ){ 539 assert(aHolder[iIdx]==0); 540 aHolder[iIdx] = pthread_self(); 541 } 542 return rc; 543 } 544 545 /* 546 ** Wrapper around pthread_cond_wait(). 547 */ 548 static int async_cond_wait(pthread_cond_t *pCond, pthread_mutex_t *pMutex){ 549 int iIdx; 550 int rc; 551 pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); 552 pthread_t *aHolder = (pthread_t *)(&asyncdebug); 553 554 for(iIdx=0; iIdx<3; iIdx++){ 555 if( pMutex==&aMutex[iIdx] ) break; 556 } 557 assert(iIdx<3); 558 559 assert(pthread_equal(aHolder[iIdx],pthread_self())); 560 aHolder[iIdx] = 0; 561 rc = pthread_cond_wait(pCond, pMutex); 562 if( rc==0 ){ 563 aHolder[iIdx] = pthread_self(); 564 } 565 return rc; 566 } 567 568 /* Call our async_XX wrappers instead of selected pthread_XX functions */ 569 #define pthread_mutex_lock async_mutex_lock 570 #define pthread_mutex_unlock async_mutex_unlock 571 #define pthread_mutex_trylock async_mutex_trylock 572 #define pthread_cond_wait async_cond_wait 573 574 #endif /* !defined(NDEBUG) */ 575 576 /* 577 ** Add an entry to the end of the global write-op list. 
pWrite should point 578 ** to an AsyncWrite structure allocated using sqlite3_malloc(). The writer 579 ** thread will call sqlite3_free() to free the structure after the specified 580 ** operation has been completed. 581 ** 582 ** Once an AsyncWrite structure has been added to the list, it becomes the 583 ** property of the writer thread and must not be read or modified by the 584 ** caller. 585 */ 586 static void addAsyncWrite(AsyncWrite *pWrite){ 587 /* We must hold the queue mutex in order to modify the queue pointers */ 588 pthread_mutex_lock(&async.queueMutex); 589 590 /* Add the record to the end of the write-op queue */ 591 assert( !pWrite->pNext ); 592 if( async.pQueueLast ){ 593 assert( async.pQueueFirst ); 594 async.pQueueLast->pNext = pWrite; 595 }else{ 596 async.pQueueFirst = pWrite; 597 } 598 async.pQueueLast = pWrite; 599 ASYNC_TRACE(("PUSH %p (%s %s %d)\n", pWrite, azOpcodeName[pWrite->op], 600 pWrite->pFileData ? pWrite->pFileData->zName : "-", pWrite->iOffset)); 601 602 if( pWrite->op==ASYNC_CLOSE ){ 603 async.nFile--; 604 } 605 606 /* Drop the queue mutex */ 607 pthread_mutex_unlock(&async.queueMutex); 608 609 /* The writer thread might have been idle because there was nothing 610 ** on the write-op queue for it to do. So wake it up. */ 611 pthread_cond_signal(&async.queueSignal); 612 } 613 614 /* 615 ** Increment async.nFile in a thread-safe manner. 616 */ 617 static void incrOpenFileCount(){ 618 /* We must hold the queue mutex in order to modify async.nFile */ 619 pthread_mutex_lock(&async.queueMutex); 620 if( async.nFile==0 ){ 621 async.ioError = SQLITE_OK; 622 } 623 async.nFile++; 624 pthread_mutex_unlock(&async.queueMutex); 625 } 626 627 /* 628 ** This is a utility function to allocate and populate a new AsyncWrite 629 ** structure and insert it (via addAsyncWrite() ) into the global list. 
630 */ 631 static int addNewAsyncWrite( 632 AsyncFileData *pFileData, 633 int op, 634 i64 iOffset, 635 int nByte, 636 const char *zByte 637 ){ 638 AsyncWrite *p; 639 if( op!=ASYNC_CLOSE && async.ioError ){ 640 return async.ioError; 641 } 642 p = sqlite3_malloc(sizeof(AsyncWrite) + (zByte?nByte:0)); 643 if( !p ){ 644 /* The upper layer does not expect operations like OsWrite() to 645 ** return SQLITE_NOMEM. This is partly because under normal conditions 646 ** SQLite is required to do rollback without calling malloc(). So 647 ** if malloc() fails here, treat it as an I/O error. The above 648 ** layer knows how to handle that. 649 */ 650 return SQLITE_IOERR; 651 } 652 p->op = op; 653 p->iOffset = iOffset; 654 p->nByte = nByte; 655 p->pFileData = pFileData; 656 p->pNext = 0; 657 if( zByte ){ 658 p->zBuf = (char *)&p[1]; 659 memcpy(p->zBuf, zByte, nByte); 660 }else{ 661 p->zBuf = 0; 662 } 663 addAsyncWrite(p); 664 return SQLITE_OK; 665 } 666 667 /* 668 ** Close the file. This just adds an entry to the write-op list, the file is 669 ** not actually closed. 670 */ 671 static int asyncClose(sqlite3_file *pFile){ 672 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 673 674 /* Unlock the file, if it is locked */ 675 pthread_mutex_lock(&async.lockMutex); 676 p->lock.eLock = 0; 677 pthread_mutex_unlock(&async.lockMutex); 678 679 addAsyncWrite(&p->close); 680 return SQLITE_OK; 681 } 682 683 /* 684 ** Implementation of sqlite3OsWrite() for asynchronous files. Instead of 685 ** writing to the underlying file, this function adds an entry to the end of 686 ** the global AsyncWrite list. Either SQLITE_OK or SQLITE_NOMEM may be 687 ** returned. 688 */ 689 static int asyncWrite(sqlite3_file *pFile, const void *pBuf, int amt, i64 iOff){ 690 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 691 return addNewAsyncWrite(p, ASYNC_WRITE, iOff, amt, pBuf); 692 } 693 694 /* 695 ** Read data from the file. 
First we read from the filesystem, then adjust 696 ** the contents of the buffer based on ASYNC_WRITE operations in the 697 ** write-op queue. 698 ** 699 ** This method holds the mutex from start to finish. 700 */ 701 static int asyncRead(sqlite3_file *pFile, void *zOut, int iAmt, i64 iOffset){ 702 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 703 int rc = SQLITE_OK; 704 i64 filesize; 705 int nRead; 706 sqlite3_file *pBase = p->pBaseRead; 707 708 /* Grab the write queue mutex for the duration of the call */ 709 pthread_mutex_lock(&async.queueMutex); 710 711 /* If an I/O error has previously occurred in this virtual file 712 ** system, then all subsequent operations fail. 713 */ 714 if( async.ioError!=SQLITE_OK ){ 715 rc = async.ioError; 716 goto asyncread_out; 717 } 718 719 if( pBase->pMethods ){ 720 rc = sqlite3OsFileSize(pBase, &filesize); 721 if( rc!=SQLITE_OK ){ 722 goto asyncread_out; 723 } 724 nRead = MIN(filesize - iOffset, iAmt); 725 if( nRead>0 ){ 726 rc = sqlite3OsRead(pBase, zOut, nRead, iOffset); 727 ASYNC_TRACE(("READ %s %d bytes at %d\n", p->zName, nRead, iOffset)); 728 } 729 } 730 731 if( rc==SQLITE_OK ){ 732 AsyncWrite *pWrite; 733 char *zName = p->zName; 734 735 for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){ 736 if( pWrite->op==ASYNC_WRITE && pWrite->pFileData->zName==zName ){ 737 int iBeginOut = (pWrite->iOffset-iOffset); 738 int iBeginIn = -iBeginOut; 739 int nCopy; 740 741 if( iBeginIn<0 ) iBeginIn = 0; 742 if( iBeginOut<0 ) iBeginOut = 0; 743 nCopy = MIN(pWrite->nByte-iBeginIn, iAmt-iBeginOut); 744 745 if( nCopy>0 ){ 746 memcpy(&((char *)zOut)[iBeginOut], &pWrite->zBuf[iBeginIn], nCopy); 747 ASYNC_TRACE(("OVERREAD %d bytes at %d\n", nCopy, iBeginOut+iOffset)); 748 } 749 } 750 } 751 } 752 753 asyncread_out: 754 pthread_mutex_unlock(&async.queueMutex); 755 return rc; 756 } 757 758 /* 759 ** Truncate the file to nByte bytes in length. This just adds an entry to 760 ** the write-op list, no IO actually takes place. 
761 */ 762 static int asyncTruncate(sqlite3_file *pFile, i64 nByte){ 763 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 764 return addNewAsyncWrite(p, ASYNC_TRUNCATE, nByte, 0, 0); 765 } 766 767 /* 768 ** Sync the file. This just adds an entry to the write-op list, the 769 ** sync() is done later by sqlite3_async_flush(). 770 */ 771 static int asyncSync(sqlite3_file *pFile, int flags){ 772 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 773 return addNewAsyncWrite(p, ASYNC_SYNC, 0, flags, 0); 774 } 775 776 /* 777 ** Read the size of the file. First we read the size of the file system 778 ** entry, then adjust for any ASYNC_WRITE or ASYNC_TRUNCATE operations 779 ** currently in the write-op list. 780 ** 781 ** This method holds the mutex from start to finish. 782 */ 783 int asyncFileSize(sqlite3_file *pFile, i64 *piSize){ 784 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 785 int rc = SQLITE_OK; 786 i64 s = 0; 787 sqlite3_file *pBase; 788 789 pthread_mutex_lock(&async.queueMutex); 790 791 /* Read the filesystem size from the base file. If pBaseRead is NULL, this 792 ** means the file hasn't been opened yet. In this case all relevant data 793 ** must be in the write-op queue anyway, so we can omit reading from the 794 ** file-system. 795 */ 796 pBase = p->pBaseRead; 797 if( pBase->pMethods ){ 798 rc = sqlite3OsFileSize(pBase, &s); 799 } 800 801 if( rc==SQLITE_OK ){ 802 AsyncWrite *pWrite; 803 for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){ 804 if( pWrite->op==ASYNC_DELETE && strcmp(p->zName, pWrite->zBuf)==0 ){ 805 s = 0; 806 }else if( pWrite->pFileData && pWrite->pFileData->zName==p->zName){ 807 switch( pWrite->op ){ 808 case ASYNC_WRITE: 809 s = MAX(pWrite->iOffset + (i64)(pWrite->nByte), s); 810 break; 811 case ASYNC_TRUNCATE: 812 s = MIN(s, pWrite->iOffset); 813 break; 814 } 815 } 816 } 817 *piSize = s; 818 } 819 pthread_mutex_unlock(&async.queueMutex); 820 return rc; 821 } 822 823 /* 824 ** Lock or unlock the actual file-system entry. 
825 */ 826 static int getFileLock(AsyncLock *pLock){ 827 int rc = SQLITE_OK; 828 AsyncFileLock *pIter; 829 int eRequired = 0; 830 831 if( pLock->pFile ){ 832 for(pIter=pLock->pList; pIter; pIter=pIter->pNext){ 833 assert(pIter->eAsyncLock>=pIter->eLock); 834 if( pIter->eAsyncLock>eRequired ){ 835 eRequired = pIter->eAsyncLock; 836 assert(eRequired>=0 && eRequired<=SQLITE_LOCK_EXCLUSIVE); 837 } 838 } 839 840 if( eRequired>pLock->eLock ){ 841 rc = sqlite3OsLock(pLock->pFile, eRequired); 842 if( rc==SQLITE_OK ){ 843 pLock->eLock = eRequired; 844 } 845 } 846 else if( eRequired<pLock->eLock && eRequired<=SQLITE_LOCK_SHARED ){ 847 rc = sqlite3OsUnlock(pLock->pFile, eRequired); 848 if( rc==SQLITE_OK ){ 849 pLock->eLock = eRequired; 850 } 851 } 852 } 853 854 return rc; 855 } 856 857 /* 858 ** The following two methods - asyncLock() and asyncUnlock() - are used 859 ** to obtain and release locks on database files opened with the 860 ** asynchronous backend. 861 */ 862 static int asyncLock(sqlite3_file *pFile, int eLock){ 863 int rc = SQLITE_OK; 864 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 865 866 pthread_mutex_lock(&async.lockMutex); 867 if( p->lock.eLock<eLock ){ 868 AsyncLock *pLock; 869 AsyncFileLock *pIter; 870 pLock = (AsyncLock *)sqlite3HashFind(&async.aLock, p->zName, p->nName); 871 assert(pLock && pLock->pList); 872 for(pIter=pLock->pList; pIter; pIter=pIter->pNext){ 873 if( pIter!=&p->lock && ( 874 (eLock==SQLITE_LOCK_EXCLUSIVE && pIter->eLock>=SQLITE_LOCK_SHARED) || 875 (eLock==SQLITE_LOCK_PENDING && pIter->eLock>=SQLITE_LOCK_RESERVED) || 876 (eLock==SQLITE_LOCK_RESERVED && pIter->eLock>=SQLITE_LOCK_RESERVED) || 877 (eLock==SQLITE_LOCK_SHARED && pIter->eLock>=SQLITE_LOCK_PENDING) 878 )){ 879 rc = SQLITE_BUSY; 880 } 881 } 882 if( rc==SQLITE_OK ){ 883 p->lock.eLock = eLock; 884 p->lock.eAsyncLock = MAX(p->lock.eAsyncLock, eLock); 885 } 886 assert(p->lock.eAsyncLock>=p->lock.eLock); 887 if( rc==SQLITE_OK ){ 888 rc = getFileLock(pLock); 889 } 890 } 891 
pthread_mutex_unlock(&async.lockMutex); 892 893 ASYNC_TRACE(("LOCK %d (%s) rc=%d\n", eLock, p->zName, rc)); 894 return rc; 895 } 896 static int asyncUnlock(sqlite3_file *pFile, int eLock){ 897 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 898 AsyncFileLock *pLock = &p->lock; 899 pthread_mutex_lock(&async.lockMutex); 900 pLock->eLock = MIN(pLock->eLock, eLock); 901 pthread_mutex_unlock(&async.lockMutex); 902 return addNewAsyncWrite(p, ASYNC_UNLOCK, 0, eLock, 0); 903 } 904 905 /* 906 ** This function is called when the pager layer first opens a database file 907 ** and is checking for a hot-journal. 908 */ 909 static int asyncCheckReservedLock(sqlite3_file *pFile, int *pResOut){ 910 int ret = 0; 911 AsyncFileLock *pIter; 912 AsyncLock *pLock; 913 AsyncFileData *p = ((AsyncFile *)pFile)->pData; 914 915 pthread_mutex_lock(&async.lockMutex); 916 pLock = (AsyncLock *)sqlite3HashFind(&async.aLock, p->zName, p->nName); 917 for(pIter=pLock->pList; pIter; pIter=pIter->pNext){ 918 if( pIter->eLock>=SQLITE_LOCK_RESERVED ){ 919 ret = 1; 920 } 921 } 922 pthread_mutex_unlock(&async.lockMutex); 923 924 ASYNC_TRACE(("CHECK-LOCK %d (%s)\n", ret, p->zName)); 925 *pResOut = ret; 926 return SQLITE_OK; 927 } 928 929 /* 930 ** This is a no-op, as the asynchronous backend does not support locking. 931 */ 932 static int asyncFileControl(sqlite3_file *id, int op, void *pArg){ 933 switch( op ){ 934 case SQLITE_FCNTL_LOCKSTATE: { 935 pthread_mutex_lock(&async.lockMutex); 936 *(int*)pArg = ((AsyncFile*)id)->pData->lock.eLock; 937 pthread_mutex_unlock(&async.lockMutex); 938 return SQLITE_OK; 939 } 940 } 941 return SQLITE_ERROR; 942 } 943 944 /* 945 ** Return the device characteristics and sector-size of the device. It 946 ** is not tricky to implement these correctly, as this backend might 947 ** not have an open file handle at this point. 
948 */ 949 static int asyncSectorSize(sqlite3_file *pFile){ 950 return 512; 951 } 952 static int asyncDeviceCharacteristics(sqlite3_file *pFile){ 953 return 0; 954 } 955 956 static int unlinkAsyncFile(AsyncFileData *pData){ 957 AsyncLock *pLock; 958 AsyncFileLock **ppIter; 959 int rc = SQLITE_OK; 960 961 pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName); 962 for(ppIter=&pLock->pList; *ppIter; ppIter=&((*ppIter)->pNext)){ 963 if( (*ppIter)==&pData->lock ){ 964 *ppIter = pData->lock.pNext; 965 break; 966 } 967 } 968 if( !pLock->pList ){ 969 if( pLock->pFile ){ 970 sqlite3OsClose(pLock->pFile); 971 } 972 sqlite3_free(pLock); 973 sqlite3HashInsert(&async.aLock, pData->zName, pData->nName, 0); 974 if( !sqliteHashFirst(&async.aLock) ){ 975 sqlite3HashClear(&async.aLock); 976 } 977 }else{ 978 rc = getFileLock(pLock); 979 } 980 981 return rc; 982 } 983 984 /* 985 ** Open a file. 986 */ 987 static int asyncOpen( 988 sqlite3_vfs *pAsyncVfs, 989 const char *zName, 990 sqlite3_file *pFile, 991 int flags, 992 int *pOutFlags 993 ){ 994 static sqlite3_io_methods async_methods = { 995 1, /* iVersion */ 996 asyncClose, /* xClose */ 997 asyncRead, /* xRead */ 998 asyncWrite, /* xWrite */ 999 asyncTruncate, /* xTruncate */ 1000 asyncSync, /* xSync */ 1001 asyncFileSize, /* xFileSize */ 1002 asyncLock, /* xLock */ 1003 asyncUnlock, /* xUnlock */ 1004 asyncCheckReservedLock, /* xCheckReservedLock */ 1005 asyncFileControl, /* xFileControl */ 1006 asyncSectorSize, /* xSectorSize */ 1007 asyncDeviceCharacteristics /* xDeviceCharacteristics */ 1008 }; 1009 1010 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1011 AsyncFile *p = (AsyncFile *)pFile; 1012 int nName = strlen(zName)+1; 1013 int rc = SQLITE_OK; 1014 int nByte; 1015 AsyncFileData *pData; 1016 AsyncLock *pLock = 0; 1017 char *z; 1018 int isExclusive = (flags&SQLITE_OPEN_EXCLUSIVE); 1019 1020 nByte = ( 1021 sizeof(AsyncFileData) + /* AsyncFileData structure */ 1022 2 * pVfs->szOsFile + /* 
AsyncFileData.pBaseRead and pBaseWrite */ 1023 nName /* AsyncFileData.zName */ 1024 ); 1025 z = sqlite3_malloc(nByte); 1026 if( !z ){ 1027 return SQLITE_NOMEM; 1028 } 1029 memset(z, 0, nByte); 1030 pData = (AsyncFileData*)z; 1031 z += sizeof(pData[0]); 1032 pData->pBaseRead = (sqlite3_file*)z; 1033 z += pVfs->szOsFile; 1034 pData->pBaseWrite = (sqlite3_file*)z; 1035 z += pVfs->szOsFile; 1036 pData->zName = z; 1037 pData->nName = nName; 1038 pData->close.pFileData = pData; 1039 pData->close.op = ASYNC_CLOSE; 1040 memcpy(pData->zName, zName, nName); 1041 1042 if( !isExclusive ){ 1043 rc = sqlite3OsOpen(pVfs, zName, pData->pBaseRead, flags, pOutFlags); 1044 if( rc==SQLITE_OK && ((*pOutFlags)&SQLITE_OPEN_READWRITE) ){ 1045 rc = sqlite3OsOpen(pVfs, zName, pData->pBaseWrite, flags, 0); 1046 } 1047 } 1048 1049 pthread_mutex_lock(&async.lockMutex); 1050 1051 if( rc==SQLITE_OK ){ 1052 pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName); 1053 if( !pLock ){ 1054 pLock = sqlite3MallocZero(pVfs->szOsFile + sizeof(AsyncLock)); 1055 if( pLock ){ 1056 AsyncLock *pDelete; 1057 #ifdef ENABLE_FILE_LOCKING 1058 if( flags&SQLITE_OPEN_MAIN_DB ){ 1059 pLock->pFile = (sqlite3_file *)&pLock[1]; 1060 rc = sqlite3OsOpen(pVfs, zName, pLock->pFile, flags, 0); 1061 if( rc!=SQLITE_OK ){ 1062 sqlite3_free(pLock); 1063 pLock = 0; 1064 } 1065 } 1066 #endif 1067 pDelete = sqlite3HashInsert( 1068 &async.aLock, pData->zName, pData->nName, (void *)pLock 1069 ); 1070 if( pDelete ){ 1071 rc = SQLITE_NOMEM; 1072 sqlite3_free(pLock); 1073 } 1074 }else{ 1075 rc = SQLITE_NOMEM; 1076 } 1077 } 1078 } 1079 1080 if( rc==SQLITE_OK ){ 1081 HashElem *pElem; 1082 p->pMethod = &async_methods; 1083 p->pData = pData; 1084 1085 /* Link AsyncFileData.lock into the linked list of 1086 ** AsyncFileLock structures for this file. 
1087 */ 1088 pData->lock.pNext = pLock->pList; 1089 pLock->pList = &pData->lock; 1090 1091 pElem = sqlite3HashFindElem(&async.aLock, pData->zName, pData->nName); 1092 pData->zName = (char *)sqliteHashKey(pElem); 1093 }else{ 1094 sqlite3OsClose(pData->pBaseRead); 1095 sqlite3OsClose(pData->pBaseWrite); 1096 sqlite3_free(pData); 1097 } 1098 1099 pthread_mutex_unlock(&async.lockMutex); 1100 1101 if( rc==SQLITE_OK ){ 1102 incrOpenFileCount(); 1103 } 1104 1105 if( rc==SQLITE_OK && isExclusive ){ 1106 rc = addNewAsyncWrite(pData, ASYNC_OPENEXCLUSIVE, (i64)flags, 0, 0); 1107 if( rc==SQLITE_OK ){ 1108 if( pOutFlags ) *pOutFlags = flags; 1109 }else{ 1110 pthread_mutex_lock(&async.lockMutex); 1111 unlinkAsyncFile(pData); 1112 pthread_mutex_unlock(&async.lockMutex); 1113 sqlite3_free(pData); 1114 } 1115 } 1116 return rc; 1117 } 1118 1119 /* 1120 ** Implementation of sqlite3OsDelete. Add an entry to the end of the 1121 ** write-op queue to perform the delete. 1122 */ 1123 static int asyncDelete(sqlite3_vfs *pAsyncVfs, const char *z, int syncDir){ 1124 return addNewAsyncWrite(0, ASYNC_DELETE, syncDir, strlen(z)+1, z); 1125 } 1126 1127 /* 1128 ** Implementation of sqlite3OsAccess. This method holds the mutex from 1129 ** start to finish. 
1130 */ 1131 static int asyncAccess( 1132 sqlite3_vfs *pAsyncVfs, 1133 const char *zName, 1134 int flags, 1135 int *pResOut 1136 ){ 1137 int rc; 1138 int ret; 1139 AsyncWrite *p; 1140 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1141 1142 assert(flags==SQLITE_ACCESS_READWRITE 1143 || flags==SQLITE_ACCESS_READ 1144 || flags==SQLITE_ACCESS_EXISTS 1145 ); 1146 1147 pthread_mutex_lock(&async.queueMutex); 1148 rc = sqlite3OsAccess(pVfs, zName, flags, &ret); 1149 if( rc==SQLITE_OK && flags==SQLITE_ACCESS_EXISTS ){ 1150 for(p=async.pQueueFirst; p; p = p->pNext){ 1151 if( p->op==ASYNC_DELETE && 0==strcmp(p->zBuf, zName) ){ 1152 ret = 0; 1153 }else if( p->op==ASYNC_OPENEXCLUSIVE 1154 && 0==strcmp(p->pFileData->zName, zName) 1155 ){ 1156 ret = 1; 1157 } 1158 } 1159 } 1160 ASYNC_TRACE(("ACCESS(%s): %s = %d\n", 1161 flags==SQLITE_ACCESS_READWRITE?"read-write": 1162 flags==SQLITE_ACCESS_READ?"read":"exists" 1163 , zName, ret) 1164 ); 1165 pthread_mutex_unlock(&async.queueMutex); 1166 *pResOut = ret; 1167 return rc; 1168 } 1169 1170 /* 1171 ** Fill in zPathOut with the full path to the file identified by zPath. 1172 */ 1173 static int asyncFullPathname( 1174 sqlite3_vfs *pAsyncVfs, 1175 const char *zPath, 1176 int nPathOut, 1177 char *zPathOut 1178 ){ 1179 int rc; 1180 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1181 rc = sqlite3OsFullPathname(pVfs, zPath, nPathOut, zPathOut); 1182 1183 /* Because of the way intra-process file locking works, this backend 1184 ** needs to return a canonical path. The following block assumes the 1185 ** file-system uses unix style paths. 
1186 */ 1187 if( rc==SQLITE_OK ){ 1188 int iIn; 1189 int iOut = 0; 1190 int nPathOut = strlen(zPathOut); 1191 1192 for(iIn=0; iIn<nPathOut; iIn++){ 1193 1194 /* Replace any occurences of "//" with "/" */ 1195 if( iIn<=(nPathOut-2) && zPathOut[iIn]=='/' && zPathOut[iIn+1]=='/' 1196 ){ 1197 continue; 1198 } 1199 1200 /* Replace any occurences of "/./" with "/" */ 1201 if( iIn<=(nPathOut-3) 1202 && zPathOut[iIn]=='/' && zPathOut[iIn+1]=='.' && zPathOut[iIn+2]=='/' 1203 ){ 1204 iIn++; 1205 continue; 1206 } 1207 1208 /* Replace any occurences of "<path-component>/../" with "" */ 1209 if( iOut>0 && iIn<=(nPathOut-4) 1210 && zPathOut[iIn]=='/' && zPathOut[iIn+1]=='.' 1211 && zPathOut[iIn+2]=='.' && zPathOut[iIn+3]=='/' 1212 ){ 1213 iIn += 3; 1214 iOut--; 1215 for( ; iOut>0 && zPathOut[iOut-1]!='/'; iOut--); 1216 continue; 1217 } 1218 1219 zPathOut[iOut++] = zPathOut[iIn]; 1220 } 1221 zPathOut[iOut] = '\0'; 1222 } 1223 1224 return rc; 1225 } 1226 static void *asyncDlOpen(sqlite3_vfs *pAsyncVfs, const char *zPath){ 1227 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1228 return pVfs->xDlOpen(pVfs, zPath); 1229 } 1230 static void asyncDlError(sqlite3_vfs *pAsyncVfs, int nByte, char *zErrMsg){ 1231 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1232 pVfs->xDlError(pVfs, nByte, zErrMsg); 1233 } 1234 static void *asyncDlSym( 1235 sqlite3_vfs *pAsyncVfs, 1236 void *pHandle, 1237 const char *zSymbol 1238 ){ 1239 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1240 return pVfs->xDlSym(pVfs, pHandle, zSymbol); 1241 } 1242 static void asyncDlClose(sqlite3_vfs *pAsyncVfs, void *pHandle){ 1243 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1244 pVfs->xDlClose(pVfs, pHandle); 1245 } 1246 static int asyncRandomness(sqlite3_vfs *pAsyncVfs, int nByte, char *zBufOut){ 1247 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1248 return pVfs->xRandomness(pVfs, nByte, zBufOut); 1249 } 1250 static int asyncSleep(sqlite3_vfs *pAsyncVfs, int nMicro){ 1251 
sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1252 return pVfs->xSleep(pVfs, nMicro); 1253 } 1254 static int asyncCurrentTime(sqlite3_vfs *pAsyncVfs, double *pTimeOut){ 1255 sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; 1256 return pVfs->xCurrentTime(pVfs, pTimeOut); 1257 } 1258 1259 static sqlite3_vfs async_vfs = { 1260 1, /* iVersion */ 1261 sizeof(AsyncFile), /* szOsFile */ 1262 0, /* mxPathname */ 1263 0, /* pNext */ 1264 "async", /* zName */ 1265 0, /* pAppData */ 1266 asyncOpen, /* xOpen */ 1267 asyncDelete, /* xDelete */ 1268 asyncAccess, /* xAccess */ 1269 asyncFullPathname, /* xFullPathname */ 1270 asyncDlOpen, /* xDlOpen */ 1271 asyncDlError, /* xDlError */ 1272 asyncDlSym, /* xDlSym */ 1273 asyncDlClose, /* xDlClose */ 1274 asyncRandomness, /* xDlError */ 1275 asyncSleep, /* xDlSym */ 1276 asyncCurrentTime /* xDlClose */ 1277 }; 1278 1279 /* 1280 ** Call this routine to enable or disable the 1281 ** asynchronous IO features implemented in this file. 1282 ** 1283 ** This routine is not even remotely threadsafe. Do not call 1284 ** this routine while any SQLite database connections are open. 1285 */ 1286 static void asyncEnable(int enable){ 1287 if( enable ){ 1288 if( !async_vfs.pAppData ){ 1289 static int hashTableInit = 0; 1290 async_vfs.pAppData = (void *)sqlite3_vfs_find(0); 1291 async_vfs.mxPathname = ((sqlite3_vfs *)async_vfs.pAppData)->mxPathname; 1292 sqlite3_vfs_register(&async_vfs, 1); 1293 if( !hashTableInit ){ 1294 sqlite3HashInit(&async.aLock, SQLITE_HASH_BINARY, 1); 1295 hashTableInit = 1; 1296 } 1297 } 1298 }else{ 1299 if( async_vfs.pAppData ){ 1300 sqlite3_vfs_unregister(&async_vfs); 1301 async_vfs.pAppData = 0; 1302 } 1303 } 1304 } 1305 1306 /* 1307 ** This procedure runs in a separate thread, reading messages off of the 1308 ** write queue and processing them one by one. 1309 ** 1310 ** If async.writerHaltNow is true, then this procedure exits 1311 ** after processing a single message. 
1312 ** 1313 ** If async.writerHaltWhenIdle is true, then this procedure exits when 1314 ** the write queue is empty. 1315 ** 1316 ** If both of the above variables are false, this procedure runs 1317 ** indefinately, waiting for operations to be added to the write queue 1318 ** and processing them in the order in which they arrive. 1319 ** 1320 ** An artifical delay of async.ioDelay milliseconds is inserted before 1321 ** each write operation in order to simulate the effect of a slow disk. 1322 ** 1323 ** Only one instance of this procedure may be running at a time. 1324 */ 1325 static void *asyncWriterThread(void *pIsStarted){ 1326 sqlite3_vfs *pVfs = (sqlite3_vfs *)(async_vfs.pAppData); 1327 AsyncWrite *p = 0; 1328 int rc = SQLITE_OK; 1329 int holdingMutex = 0; 1330 1331 if( pthread_mutex_trylock(&async.writerMutex) ){ 1332 return 0; 1333 } 1334 (*(int *)pIsStarted) = 1; 1335 while( async.writerHaltNow==0 ){ 1336 int doNotFree = 0; 1337 sqlite3_file *pBase = 0; 1338 1339 if( !holdingMutex ){ 1340 pthread_mutex_lock(&async.queueMutex); 1341 } 1342 while( (p = async.pQueueFirst)==0 ){ 1343 pthread_cond_broadcast(&async.emptySignal); 1344 if( async.writerHaltWhenIdle ){ 1345 pthread_mutex_unlock(&async.queueMutex); 1346 break; 1347 }else{ 1348 ASYNC_TRACE(("IDLE\n")); 1349 pthread_cond_wait(&async.queueSignal, &async.queueMutex); 1350 ASYNC_TRACE(("WAKEUP\n")); 1351 } 1352 } 1353 if( p==0 ) break; 1354 holdingMutex = 1; 1355 1356 /* Right now this thread is holding the mutex on the write-op queue. 1357 ** Variable 'p' points to the first entry in the write-op queue. In 1358 ** the general case, we hold on to the mutex for the entire body of 1359 ** the loop. 1360 ** 1361 ** However in the cases enumerated below, we relinquish the mutex, 1362 ** perform the IO, and then re-request the mutex before removing 'p' from 1363 ** the head of the write-op queue. The idea is to increase concurrency with 1364 ** sqlite threads. 1365 ** 1366 ** * An ASYNC_CLOSE operation. 
1367 ** * An ASYNC_OPENEXCLUSIVE operation. For this one, we relinquish 1368 ** the mutex, call the underlying xOpenExclusive() function, then 1369 ** re-aquire the mutex before seting the AsyncFile.pBaseRead 1370 ** variable. 1371 ** * ASYNC_SYNC and ASYNC_WRITE operations, if 1372 ** SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two 1373 ** file-handles are open for the particular file being "synced". 1374 */ 1375 if( async.ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){ 1376 p->op = ASYNC_NOOP; 1377 } 1378 if( p->pFileData ){ 1379 pBase = p->pFileData->pBaseWrite; 1380 if( 1381 p->op==ASYNC_CLOSE || 1382 p->op==ASYNC_OPENEXCLUSIVE || 1383 (pBase->pMethods && (p->op==ASYNC_SYNC || p->op==ASYNC_WRITE) ) 1384 ){ 1385 pthread_mutex_unlock(&async.queueMutex); 1386 holdingMutex = 0; 1387 } 1388 if( !pBase->pMethods ){ 1389 pBase = p->pFileData->pBaseRead; 1390 } 1391 } 1392 1393 switch( p->op ){ 1394 case ASYNC_NOOP: 1395 break; 1396 1397 case ASYNC_WRITE: 1398 assert( pBase ); 1399 ASYNC_TRACE(("WRITE %s %d bytes at %d\n", 1400 p->pFileData->zName, p->nByte, p->iOffset)); 1401 rc = sqlite3OsWrite(pBase, (void *)(p->zBuf), p->nByte, p->iOffset); 1402 break; 1403 1404 case ASYNC_SYNC: 1405 assert( pBase ); 1406 ASYNC_TRACE(("SYNC %s\n", p->pFileData->zName)); 1407 rc = sqlite3OsSync(pBase, p->nByte); 1408 break; 1409 1410 case ASYNC_TRUNCATE: 1411 assert( pBase ); 1412 ASYNC_TRACE(("TRUNCATE %s to %d bytes\n", 1413 p->pFileData->zName, p->iOffset)); 1414 rc = sqlite3OsTruncate(pBase, p->iOffset); 1415 break; 1416 1417 case ASYNC_CLOSE: { 1418 AsyncFileData *pData = p->pFileData; 1419 ASYNC_TRACE(("CLOSE %s\n", p->pFileData->zName)); 1420 sqlite3OsClose(pData->pBaseWrite); 1421 sqlite3OsClose(pData->pBaseRead); 1422 1423 /* Unlink AsyncFileData.lock from the linked list of AsyncFileLock 1424 ** structures for this file. Obtain the async.lockMutex mutex 1425 ** before doing so. 
1426 */ 1427 pthread_mutex_lock(&async.lockMutex); 1428 rc = unlinkAsyncFile(pData); 1429 pthread_mutex_unlock(&async.lockMutex); 1430 1431 async.pQueueFirst = p->pNext; 1432 sqlite3_free(pData); 1433 doNotFree = 1; 1434 break; 1435 } 1436 1437 case ASYNC_UNLOCK: { 1438 AsyncLock *pLock; 1439 AsyncFileData *pData = p->pFileData; 1440 int eLock = p->nByte; 1441 pthread_mutex_lock(&async.lockMutex); 1442 pData->lock.eAsyncLock = MIN( 1443 pData->lock.eAsyncLock, MAX(pData->lock.eLock, eLock) 1444 ); 1445 assert(pData->lock.eAsyncLock>=pData->lock.eLock); 1446 pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName); 1447 rc = getFileLock(pLock); 1448 pthread_mutex_unlock(&async.lockMutex); 1449 break; 1450 } 1451 1452 case ASYNC_DELETE: 1453 ASYNC_TRACE(("DELETE %s\n", p->zBuf)); 1454 rc = sqlite3OsDelete(pVfs, p->zBuf, (int)p->iOffset); 1455 break; 1456 1457 case ASYNC_OPENEXCLUSIVE: { 1458 int flags = (int)p->iOffset; 1459 AsyncFileData *pData = p->pFileData; 1460 ASYNC_TRACE(("OPEN %s flags=%d\n", p->zBuf, (int)p->iOffset)); 1461 assert(pData->pBaseRead->pMethods==0 && pData->pBaseWrite->pMethods==0); 1462 rc = sqlite3OsOpen(pVfs, pData->zName, pData->pBaseRead, flags, 0); 1463 assert( holdingMutex==0 ); 1464 pthread_mutex_lock(&async.queueMutex); 1465 holdingMutex = 1; 1466 break; 1467 } 1468 1469 default: assert(!"Illegal value for AsyncWrite.op"); 1470 } 1471 1472 /* If we didn't hang on to the mutex during the IO op, obtain it now 1473 ** so that the AsyncWrite structure can be safely removed from the 1474 ** global write-op queue. 1475 */ 1476 if( !holdingMutex ){ 1477 pthread_mutex_lock(&async.queueMutex); 1478 holdingMutex = 1; 1479 } 1480 /* ASYNC_TRACE(("UNLINK %p\n", p)); */ 1481 if( p==async.pQueueLast ){ 1482 async.pQueueLast = 0; 1483 } 1484 if( !doNotFree ){ 1485 async.pQueueFirst = p->pNext; 1486 sqlite3_free(p); 1487 } 1488 assert( holdingMutex ); 1489 1490 /* An IO error has occured. 
We cannot report the error back to the 1491 ** connection that requested the I/O since the error happened 1492 ** asynchronously. The connection has already moved on. There 1493 ** really is nobody to report the error to. 1494 ** 1495 ** The file for which the error occured may have been a database or 1496 ** journal file. Regardless, none of the currently queued operations 1497 ** associated with the same database should now be performed. Nor should 1498 ** any subsequently requested IO on either a database or journal file 1499 ** handle for the same database be accepted until the main database 1500 ** file handle has been closed and reopened. 1501 ** 1502 ** Furthermore, no further IO should be queued or performed on any file 1503 ** handle associated with a database that may have been part of a 1504 ** multi-file transaction that included the database associated with 1505 ** the IO error (i.e. a database ATTACHed to the same handle at some 1506 ** point in time). 1507 */ 1508 if( rc!=SQLITE_OK ){ 1509 async.ioError = rc; 1510 } 1511 1512 if( async.ioError && !async.pQueueFirst ){ 1513 pthread_mutex_lock(&async.lockMutex); 1514 if( 0==sqliteHashFirst(&async.aLock) ){ 1515 async.ioError = SQLITE_OK; 1516 } 1517 pthread_mutex_unlock(&async.lockMutex); 1518 } 1519 1520 /* Drop the queue mutex before continuing to the next write operation 1521 ** in order to give other threads a chance to work with the write queue. 1522 */ 1523 if( !async.pQueueFirst || !async.ioError ){ 1524 pthread_mutex_unlock(&async.queueMutex); 1525 holdingMutex = 0; 1526 if( async.ioDelay>0 ){ 1527 sqlite3OsSleep(pVfs, async.ioDelay); 1528 }else{ 1529 sched_yield(); 1530 } 1531 } 1532 } 1533 1534 pthread_mutex_unlock(&async.writerMutex); 1535 return 0; 1536 } 1537 1538 /************************************************************************** 1539 ** The remaining code defines a Tcl interface for testing the asynchronous 1540 ** IO implementation in this file. 
1541 ** 1542 ** To adapt the code to a non-TCL environment, delete or comment out 1543 ** the code that follows. 1544 */ 1545 1546 /* 1547 ** sqlite3async_enable ?YES/NO? 1548 ** 1549 ** Enable or disable the asynchronous I/O backend. This command is 1550 ** not thread-safe. Do not call it while any database connections 1551 ** are open. 1552 */ 1553 static int testAsyncEnable( 1554 void * clientData, 1555 Tcl_Interp *interp, 1556 int objc, 1557 Tcl_Obj *CONST objv[] 1558 ){ 1559 if( objc!=1 && objc!=2 ){ 1560 Tcl_WrongNumArgs(interp, 1, objv, "?YES/NO?"); 1561 return TCL_ERROR; 1562 } 1563 if( objc==1 ){ 1564 Tcl_SetObjResult(interp, Tcl_NewBooleanObj(async_vfs.pAppData!=0)); 1565 }else{ 1566 int en; 1567 if( Tcl_GetBooleanFromObj(interp, objv[1], &en) ) return TCL_ERROR; 1568 asyncEnable(en); 1569 } 1570 return TCL_OK; 1571 } 1572 1573 /* 1574 ** sqlite3async_halt "now"|"idle"|"never" 1575 ** 1576 ** Set the conditions at which the writer thread will halt. 1577 */ 1578 static int testAsyncHalt( 1579 void * clientData, 1580 Tcl_Interp *interp, 1581 int objc, 1582 Tcl_Obj *CONST objv[] 1583 ){ 1584 const char *zCond; 1585 if( objc!=2 ){ 1586 Tcl_WrongNumArgs(interp, 1, objv, "\"now\"|\"idle\"|\"never\""); 1587 return TCL_ERROR; 1588 } 1589 zCond = Tcl_GetString(objv[1]); 1590 if( strcmp(zCond, "now")==0 ){ 1591 async.writerHaltNow = 1; 1592 pthread_cond_broadcast(&async.queueSignal); 1593 }else if( strcmp(zCond, "idle")==0 ){ 1594 async.writerHaltWhenIdle = 1; 1595 async.writerHaltNow = 0; 1596 pthread_cond_broadcast(&async.queueSignal); 1597 }else if( strcmp(zCond, "never")==0 ){ 1598 async.writerHaltWhenIdle = 0; 1599 async.writerHaltNow = 0; 1600 }else{ 1601 Tcl_AppendResult(interp, 1602 "should be one of: \"now\", \"idle\", or \"never\"", (char*)0); 1603 return TCL_ERROR; 1604 } 1605 return TCL_OK; 1606 } 1607 1608 /* 1609 ** sqlite3async_delay ?MS? 
1610 ** 1611 ** Query or set the number of milliseconds of delay in the writer 1612 ** thread after each write operation. The default is 0. By increasing 1613 ** the memory delay we can simulate the effect of slow disk I/O. 1614 */ 1615 static int testAsyncDelay( 1616 void * clientData, 1617 Tcl_Interp *interp, 1618 int objc, 1619 Tcl_Obj *CONST objv[] 1620 ){ 1621 if( objc!=1 && objc!=2 ){ 1622 Tcl_WrongNumArgs(interp, 1, objv, "?MS?"); 1623 return TCL_ERROR; 1624 } 1625 if( objc==1 ){ 1626 Tcl_SetObjResult(interp, Tcl_NewIntObj(async.ioDelay)); 1627 }else{ 1628 int ioDelay; 1629 if( Tcl_GetIntFromObj(interp, objv[1], &ioDelay) ) return TCL_ERROR; 1630 async.ioDelay = ioDelay; 1631 } 1632 return TCL_OK; 1633 } 1634 1635 /* 1636 ** sqlite3async_start 1637 ** 1638 ** Start a new writer thread. 1639 */ 1640 static int testAsyncStart( 1641 void * clientData, 1642 Tcl_Interp *interp, 1643 int objc, 1644 Tcl_Obj *CONST objv[] 1645 ){ 1646 pthread_t x; 1647 int rc; 1648 volatile int isStarted = 0; 1649 rc = pthread_create(&x, 0, asyncWriterThread, (void *)&isStarted); 1650 if( rc ){ 1651 Tcl_AppendResult(interp, "failed to create the thread", 0); 1652 return TCL_ERROR; 1653 } 1654 pthread_detach(x); 1655 while( isStarted==0 ){ 1656 sched_yield(); 1657 } 1658 return TCL_OK; 1659 } 1660 1661 /* 1662 ** sqlite3async_wait 1663 ** 1664 ** Wait for the current writer thread to terminate. 1665 ** 1666 ** If the current writer thread is set to run forever then this 1667 ** command would block forever. To prevent that, an error is returned. 
1668 */ 1669 static int testAsyncWait( 1670 void * clientData, 1671 Tcl_Interp *interp, 1672 int objc, 1673 Tcl_Obj *CONST objv[] 1674 ){ 1675 int cnt = 10; 1676 if( async.writerHaltNow==0 && async.writerHaltWhenIdle==0 ){ 1677 Tcl_AppendResult(interp, "would block forever", (char*)0); 1678 return TCL_ERROR; 1679 } 1680 1681 while( cnt-- && !pthread_mutex_trylock(&async.writerMutex) ){ 1682 pthread_mutex_unlock(&async.writerMutex); 1683 sched_yield(); 1684 } 1685 if( cnt>=0 ){ 1686 ASYNC_TRACE(("WAIT\n")); 1687 pthread_mutex_lock(&async.queueMutex); 1688 pthread_cond_broadcast(&async.queueSignal); 1689 pthread_mutex_unlock(&async.queueMutex); 1690 pthread_mutex_lock(&async.writerMutex); 1691 pthread_mutex_unlock(&async.writerMutex); 1692 }else{ 1693 ASYNC_TRACE(("NO-WAIT\n")); 1694 } 1695 return TCL_OK; 1696 } 1697 1698 1699 #endif /* SQLITE_OS_UNIX and SQLITE_THREADSAFE */ 1700 1701 /* 1702 ** This routine registers the custom TCL commands defined in this 1703 ** module. This should be the only procedure visible from outside 1704 ** of this module. 1705 */ 1706 int Sqlitetestasync_Init(Tcl_Interp *interp){ 1707 #if SQLITE_OS_UNIX && SQLITE_THREADSAFE 1708 Tcl_CreateObjCommand(interp,"sqlite3async_enable",testAsyncEnable,0,0); 1709 Tcl_CreateObjCommand(interp,"sqlite3async_halt",testAsyncHalt,0,0); 1710 Tcl_CreateObjCommand(interp,"sqlite3async_delay",testAsyncDelay,0,0); 1711 Tcl_CreateObjCommand(interp,"sqlite3async_start",testAsyncStart,0,0); 1712 Tcl_CreateObjCommand(interp,"sqlite3async_wait",testAsyncWait,0,0); 1713 Tcl_LinkVar(interp, "sqlite3async_trace", 1714 (char*)&sqlite3async_trace, TCL_LINK_INT); 1715 #endif /* SQLITE_OS_UNIX and SQLITE_THREADSAFE */ 1716 return TCL_OK; 1717 } 1718