1 /* 2 ** 2011-07-09 3 ** 4 ** The author disclaims copyright to this source code. In place of 5 ** a legal notice, here is a blessing: 6 ** 7 ** May you do good and not evil. 8 ** May you find forgiveness for yourself and forgive others. 9 ** May you share freely, never taking more than you give. 10 ** 11 ************************************************************************* 12 ** This file contains code for the VdbeSorter object, used in concert with 13 ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements 14 ** or by SELECT statements with ORDER BY clauses that cannot be satisfied 15 ** using indexes and without LIMIT clauses. 16 ** 17 ** The VdbeSorter object implements a multi-threaded external merge sort 18 ** algorithm that is efficient even if the number of elements being sorted 19 ** exceeds the available memory. 20 ** 21 ** Here is the (internal, non-API) interface between this module and the 22 ** rest of the SQLite system: 23 ** 24 ** sqlite3VdbeSorterInit() Create a new VdbeSorter object. 25 ** 26 ** sqlite3VdbeSorterWrite() Add a single new row to the VdbeSorter 27 ** object. The row is a binary blob in the 28 ** OP_MakeRecord format that contains both 29 ** the ORDER BY key columns and result columns 30 ** in the case of a SELECT w/ ORDER BY, or 31 ** the complete record for an index entry 32 ** in the case of a CREATE INDEX. 33 ** 34 ** sqlite3VdbeSorterRewind() Sort all content previously added. 35 ** Position the read cursor on the 36 ** first sorted element. 37 ** 38 ** sqlite3VdbeSorterNext() Advance the read cursor to the next sorted 39 ** element. 40 ** 41 ** sqlite3VdbeSorterRowkey() Return the complete binary blob for the 42 ** row currently under the read cursor. 43 ** 44 ** sqlite3VdbeSorterCompare() Compare the binary blob for the row 45 ** currently under the read cursor against 46 ** another binary blob X and report if 47 ** X is strictly less than the read cursor. 48 ** Used to enforce uniqueness in a 49 ** CREATE UNIQUE INDEX statement. 50 ** 51 ** sqlite3VdbeSorterClose() Close the VdbeSorter object and reclaim 52 ** all resources. 53 ** 54 ** sqlite3VdbeSorterReset() Refurbish the VdbeSorter for reuse. This 55 ** is like Close() followed by Init() only 56 ** much faster. 57 ** 58 ** The interfaces above must be called in a particular order. Write() can 59 ** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and 60 ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e. 61 ** 62 ** Init() 63 ** for each record: Write() 64 ** Rewind() 65 ** Rowkey()/Compare() 66 ** Next() 67 ** Close() 68 ** 69 ** Algorithm: 70 ** 71 ** Records passed to the sorter via calls to Write() are initially held 72 ** unsorted in main memory. Assuming the amount of memory used never exceeds 73 ** a threshold, when Rewind() is called the set of records is sorted using 74 ** an in-memory merge sort. In this case, no temporary files are required 75 ** and subsequent calls to Rowkey(), Next() and Compare() read records 76 ** directly from main memory. 77 ** 78 ** If the amount of space used to store records in main memory exceeds the 79 ** threshold, then the set of records currently in memory are sorted and 80 ** written to a temporary file in "Packed Memory Array" (PMA) format. 81 ** A PMA created at this point is known as a "level-0 PMA". Higher levels 82 ** of PMAs may be created by merging existing PMAs together - for example 83 ** merging two or more level-0 PMAs together creates a level-1 PMA. 84 ** 85 ** The threshold for the amount of main memory to use before flushing 86 ** records to a PMA is roughly the same as the limit configured for the 87 ** page-cache of the main database. Specifically, the threshold is set to 88 ** the value returned by "PRAGMA main.page_size" multipled by 89 ** that returned by "PRAGMA main.cache_size", in bytes. 90 ** 91 ** If the sorter is running in single-threaded mode, then all PMAs generated 92 ** are appended to a single temporary file. Or, if the sorter is running in 93 ** multi-threaded mode then up to (N+1) temporary files may be opened, where 94 ** N is the configured number of worker threads. In this case, instead of 95 ** sorting the records and writing the PMA to a temporary file itself, the 96 ** calling thread usually launches a worker thread to do so. Except, if 97 ** there are already N worker threads running, the main thread does the work 98 ** itself. 99 ** 100 ** The sorter is running in multi-threaded mode if (a) the library was built 101 ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater 102 ** than zero, and (b) worker threads have been enabled at runtime by calling 103 ** "PRAGMA threads=N" with some value of N greater than 0. 104 ** 105 ** When Rewind() is called, any data remaining in memory is flushed to a 106 ** final PMA. So at this point the data is stored in some number of sorted 107 ** PMAs within temporary files on disk. 108 ** 109 ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the 110 ** sorter is running in single-threaded mode, then these PMAs are merged 111 ** incrementally as keys are retreived from the sorter by the VDBE. The 112 ** MergeEngine object, described in further detail below, performs this 113 ** merge. 114 ** 115 ** Or, if running in multi-threaded mode, then a background thread is 116 ** launched to merge the existing PMAs. Once the background thread has 117 ** merged T bytes of data into a single sorted PMA, the main thread 118 ** begins reading keys from that PMA while the background thread proceeds 119 ** with merging the next T bytes of data. And so on. 120 ** 121 ** Parameter T is set to half the value of the memory threshold used 122 ** by Write() above to determine when to create a new PMA. 123 ** 124 ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when 125 ** Rewind() is called, then a hierarchy of incremental-merges is used. 126 ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on 127 ** disk are merged together. Then T bytes of data from the second set, and 128 ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT 129 ** PMAs at a time. This done is to improve locality. 130 ** 131 ** If running in multi-threaded mode and there are more than 132 ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more 133 ** than one background thread may be created. Specifically, there may be 134 ** one background thread for each temporary file on disk, and one background 135 ** thread to merge the output of each of the others to a single PMA for 136 ** the main thread to read from. 137 */ 138 #include "sqliteInt.h" 139 #include "vdbeInt.h" 140 141 /* 142 ** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various 143 ** messages to stderr that may be helpful in understanding the performance 144 ** characteristics of the sorter in multi-threaded mode. 145 */ 146 #if 0 147 # define SQLITE_DEBUG_SORTER_THREADS 1 148 #endif 149 150 /* 151 ** Hard-coded maximum amount of data to accumulate in memory before flushing 152 ** to a level 0 PMA. The purpose of this limit is to prevent various integer 153 ** overflows. 512MiB. 154 */ 155 #define SQLITE_MAX_PMASZ (1<<29) 156 157 /* 158 ** Private objects used by the sorter 159 */ 160 typedef struct MergeEngine MergeEngine; /* Merge PMAs together */ 161 typedef struct PmaReader PmaReader; /* Incrementally read one PMA */ 162 typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */ 163 typedef struct SorterRecord SorterRecord; /* A record being sorted */ 164 typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ 165 typedef struct SorterFile SorterFile; /* Temporary file object wrapper */ 166 typedef struct SorterList SorterList; /* In-memory list of records */ 167 typedef struct IncrMerger IncrMerger; /* Read & merge multiple PMAs */ 168 169 /* 170 ** A container for a temp file handle and the current amount of data 171 ** stored in the file. 172 */ 173 struct SorterFile { 174 sqlite3_file *pFd; /* File handle */ 175 i64 iEof; /* Bytes of data stored in pFd */ 176 }; 177 178 /* 179 ** An in-memory list of objects to be sorted. 180 ** 181 ** If aMemory==0 then each object is allocated separately and the objects 182 ** are connected using SorterRecord.u.pNext. If aMemory!=0 then all objects 183 ** are stored in the aMemory[] bulk memory, one right after the other, and 184 ** are connected using SorterRecord.u.iNext. 185 */ 186 struct SorterList { 187 SorterRecord *pList; /* Linked list of records */ 188 u8 *aMemory; /* If non-NULL, bulk memory to hold pList */ 189 int szPMA; /* Size of pList as PMA in bytes */ 190 }; 191 192 /* 193 ** The MergeEngine object is used to combine two or more smaller PMAs into 194 ** one big PMA using a merge operation. Separate PMAs all need to be 195 ** combined into one big PMA in order to be able to step through the sorted 196 ** records in order. 197 ** 198 ** The aReadr[] array contains a PmaReader object for each of the PMAs being 199 ** merged. An aReadr[] object either points to a valid key or else is at EOF. 200 ** ("EOF" means "End Of File". When aReadr[] is at EOF there is no more data.) 201 ** For the purposes of the paragraphs below, we assume that the array is 202 ** actually N elements in size, where N is the smallest power of 2 greater 203 ** to or equal to the number of PMAs being merged. The extra aReadr[] elements 204 ** are treated as if they are empty (always at EOF). 205 ** 206 ** The aTree[] array is also N elements in size. The value of N is stored in 207 ** the MergeEngine.nTree variable. 208 ** 209 ** The final (N/2) elements of aTree[] contain the results of comparing 210 ** pairs of PMA keys together. Element i contains the result of 211 ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the 212 ** aTree element is set to the index of it. 213 ** 214 ** For the purposes of this comparison, EOF is considered greater than any 215 ** other key value. If the keys are equal (only possible with two EOF 216 ** values), it doesn't matter which index is stored. 217 ** 218 ** The (N/4) elements of aTree[] that precede the final (N/2) described 219 ** above contains the index of the smallest of each block of 4 PmaReaders 220 ** And so on. So that aTree[1] contains the index of the PmaReader that 221 ** currently points to the smallest key value. aTree[0] is unused. 222 ** 223 ** Example: 224 ** 225 ** aReadr[0] -> Banana 226 ** aReadr[1] -> Feijoa 227 ** aReadr[2] -> Elderberry 228 ** aReadr[3] -> Currant 229 ** aReadr[4] -> Grapefruit 230 ** aReadr[5] -> Apple 231 ** aReadr[6] -> Durian 232 ** aReadr[7] -> EOF 233 ** 234 ** aTree[] = { X, 5 0, 5 0, 3, 5, 6 } 235 ** 236 ** The current element is "Apple" (the value of the key indicated by 237 ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will 238 ** be advanced to the next key in its segment. Say the next key is 239 ** "Eggplant": 240 ** 241 ** aReadr[5] -> Eggplant 242 ** 243 ** The contents of aTree[] are updated first by comparing the new PmaReader 244 ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader 245 ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree. 246 ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader 247 ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian), 248 ** so the value written into element 1 of the array is 0. As follows: 249 ** 250 ** aTree[] = { X, 0 0, 6 0, 3, 5, 6 } 251 ** 252 ** In other words, each time we advance to the next sorter element, log2(N) 253 ** key comparison operations are required, where N is the number of segments 254 ** being merged (rounded up to the next power of 2). 255 */ 256 struct MergeEngine { 257 int nTree; /* Used size of aTree/aReadr (power of 2) */ 258 SortSubtask *pTask; /* Used by this thread only */ 259 int *aTree; /* Current state of incremental merge */ 260 PmaReader *aReadr; /* Array of PmaReaders to merge data from */ 261 }; 262 263 /* 264 ** This object represents a single thread of control in a sort operation. 265 ** Exactly VdbeSorter.nTask instances of this object are allocated 266 ** as part of each VdbeSorter object. Instances are never allocated any 267 ** other way. VdbeSorter.nTask is set to the number of worker threads allowed 268 ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). Thus for 269 ** single-threaded operation, there is exactly one instance of this object 270 ** and for multi-threaded operation there are two or more instances. 271 ** 272 ** Essentially, this structure contains all those fields of the VdbeSorter 273 ** structure for which each thread requires a separate instance. For example, 274 ** each thread requries its own UnpackedRecord object to unpack records in 275 ** as part of comparison operations. 276 ** 277 ** Before a background thread is launched, variable bDone is set to 0. Then, 278 ** right before it exits, the thread itself sets bDone to 1. This is used for 279 ** two purposes: 280 ** 281 ** 1. When flushing the contents of memory to a level-0 PMA on disk, to 282 ** attempt to select a SortSubtask for which there is not already an 283 ** active background thread (since doing so causes the main thread 284 ** to block until it finishes). 285 ** 286 ** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call 287 ** to sqlite3ThreadJoin() is likely to block. Cases that are likely to 288 ** block provoke debugging output. 289 ** 290 ** In both cases, the effects of the main thread seeing (bDone==0) even 291 ** after the thread has finished are not dire. So we don't worry about 292 ** memory barriers and such here. 293 */ 294 typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int); 295 struct SortSubtask { 296 SQLiteThread *pThread; /* Background thread, if any */ 297 int bDone; /* Set if thread is finished but not joined */ 298 VdbeSorter *pSorter; /* Sorter that owns this sub-task */ 299 UnpackedRecord *pUnpacked; /* Space to unpack a record */ 300 SorterList list; /* List for thread to write to a PMA */ 301 int nPMA; /* Number of PMAs currently in file */ 302 SorterCompare xCompare; /* Compare function to use */ 303 SorterFile file; /* Temp file for level-0 PMAs */ 304 SorterFile file2; /* Space for other PMAs */ 305 }; 306 307 308 /* 309 ** Main sorter structure. A single instance of this is allocated for each 310 ** sorter cursor created by the VDBE. 311 ** 312 ** mxKeysize: 313 ** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(), 314 ** this variable is updated so as to be set to the size on disk of the 315 ** largest record in the sorter. 316 */ 317 struct VdbeSorter { 318 int mnPmaSize; /* Minimum PMA size, in bytes */ 319 int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ 320 int mxKeysize; /* Largest serialized key seen so far */ 321 int pgsz; /* Main database page size */ 322 PmaReader *pReader; /* Readr data from here after Rewind() */ 323 MergeEngine *pMerger; /* Or here, if bUseThreads==0 */ 324 sqlite3 *db; /* Database connection */ 325 KeyInfo *pKeyInfo; /* How to compare records */ 326 UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */ 327 SorterList list; /* List of in-memory records */ 328 int iMemory; /* Offset of free space in list.aMemory */ 329 int nMemory; /* Size of list.aMemory allocation in bytes */ 330 u8 bUsePMA; /* True if one or more PMAs created */ 331 u8 bUseThreads; /* True to use background threads */ 332 u8 iPrev; /* Previous thread used to flush PMA */ 333 u8 nTask; /* Size of aTask[] array */ 334 u8 typeMask; 335 SortSubtask aTask[1]; /* One or more subtasks */ 336 }; 337 338 #define SORTER_TYPE_INTEGER 0x01 339 #define SORTER_TYPE_TEXT 0x02 340 341 /* 342 ** An instance of the following object is used to read records out of a 343 ** PMA, in sorted order. The next key to be read is cached in nKey/aKey. 344 ** aKey might point into aMap or into aBuffer. If neither of those locations 345 ** contain a contiguous representation of the key, then aAlloc is allocated 346 ** and the key is copied into aAlloc and aKey is made to poitn to aAlloc. 347 ** 348 ** pFd==0 at EOF. 349 */ 350 struct PmaReader { 351 i64 iReadOff; /* Current read offset */ 352 i64 iEof; /* 1 byte past EOF for this PmaReader */ 353 int nAlloc; /* Bytes of space at aAlloc */ 354 int nKey; /* Number of bytes in key */ 355 sqlite3_file *pFd; /* File handle we are reading from */ 356 u8 *aAlloc; /* Space for aKey if aBuffer and pMap wont work */ 357 u8 *aKey; /* Pointer to current key */ 358 u8 *aBuffer; /* Current read buffer */ 359 int nBuffer; /* Size of read buffer in bytes */ 360 u8 *aMap; /* Pointer to mapping of entire file */ 361 IncrMerger *pIncr; /* Incremental merger */ 362 }; 363 364 /* 365 ** Normally, a PmaReader object iterates through an existing PMA stored 366 ** within a temp file. However, if the PmaReader.pIncr variable points to 367 ** an object of the following type, it may be used to iterate/merge through 368 ** multiple PMAs simultaneously. 369 ** 370 ** There are two types of IncrMerger object - single (bUseThread==0) and 371 ** multi-threaded (bUseThread==1). 372 ** 373 ** A multi-threaded IncrMerger object uses two temporary files - aFile[0] 374 ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in 375 ** size. When the IncrMerger is initialized, it reads enough data from 376 ** pMerger to populate aFile[0]. It then sets variables within the 377 ** corresponding PmaReader object to read from that file and kicks off 378 ** a background thread to populate aFile[1] with the next mxSz bytes of 379 ** sorted record data from pMerger. 380 ** 381 ** When the PmaReader reaches the end of aFile[0], it blocks until the 382 ** background thread has finished populating aFile[1]. It then exchanges 383 ** the contents of the aFile[0] and aFile[1] variables within this structure, 384 ** sets the PmaReader fields to read from the new aFile[0] and kicks off 385 ** another background thread to populate the new aFile[1]. And so on, until 386 ** the contents of pMerger are exhausted. 387 ** 388 ** A single-threaded IncrMerger does not open any temporary files of its 389 ** own. Instead, it has exclusive access to mxSz bytes of space beginning 390 ** at offset iStartOff of file pTask->file2. And instead of using a 391 ** background thread to prepare data for the PmaReader, with a single 392 ** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with 393 ** keys from pMerger by the calling thread whenever the PmaReader runs out 394 ** of data. 395 */ 396 struct IncrMerger { 397 SortSubtask *pTask; /* Task that owns this merger */ 398 MergeEngine *pMerger; /* Merge engine thread reads data from */ 399 i64 iStartOff; /* Offset to start writing file at */ 400 int mxSz; /* Maximum bytes of data to store */ 401 int bEof; /* Set to true when merge is finished */ 402 int bUseThread; /* True to use a bg thread for this object */ 403 SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */ 404 }; 405 406 /* 407 ** An instance of this object is used for writing a PMA. 408 ** 409 ** The PMA is written one record at a time. Each record is of an arbitrary 410 ** size. But I/O is more efficient if it occurs in page-sized blocks where 411 ** each block is aligned on a page boundary. This object caches writes to 412 ** the PMA so that aligned, page-size blocks are written. 413 */ 414 struct PmaWriter { 415 int eFWErr; /* Non-zero if in an error state */ 416 u8 *aBuffer; /* Pointer to write buffer */ 417 int nBuffer; /* Size of write buffer in bytes */ 418 int iBufStart; /* First byte of buffer to write */ 419 int iBufEnd; /* Last byte of buffer to write */ 420 i64 iWriteOff; /* Offset of start of buffer in file */ 421 sqlite3_file *pFd; /* File handle to write to */ 422 }; 423 424 /* 425 ** This object is the header on a single record while that record is being 426 ** held in memory and prior to being written out as part of a PMA. 427 ** 428 ** How the linked list is connected depends on how memory is being managed 429 ** by this module. If using a separate allocation for each in-memory record 430 ** (VdbeSorter.list.aMemory==0), then the list is always connected using the 431 ** SorterRecord.u.pNext pointers. 432 ** 433 ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0), 434 ** then while records are being accumulated the list is linked using the 435 ** SorterRecord.u.iNext offset. This is because the aMemory[] array may 436 ** be sqlite3Realloc()ed while records are being accumulated. Once the VM 437 ** has finished passing records to the sorter, or when the in-memory buffer 438 ** is full, the list is sorted. As part of the sorting process, it is 439 ** converted to use the SorterRecord.u.pNext pointers. See function 440 ** vdbeSorterSort() for details. 441 */ 442 struct SorterRecord { 443 int nVal; /* Size of the record in bytes */ 444 union { 445 SorterRecord *pNext; /* Pointer to next record in list */ 446 int iNext; /* Offset within aMemory of next record */ 447 } u; 448 /* The data for the record immediately follows this header */ 449 }; 450 451 /* Return a pointer to the buffer containing the record data for SorterRecord 452 ** object p. Should be used as if: 453 ** 454 ** void *SRVAL(SorterRecord *p) { return (void*)&p[1]; } 455 */ 456 #define SRVAL(p) ((void*)((SorterRecord*)(p) + 1)) 457 458 459 /* Maximum number of PMAs that a single MergeEngine can merge */ 460 #define SORTER_MAX_MERGE_COUNT 16 461 462 static int vdbeIncrSwap(IncrMerger*); 463 static void vdbeIncrFree(IncrMerger *); 464 465 /* 466 ** Free all memory belonging to the PmaReader object passed as the 467 ** argument. All structure fields are set to zero before returning. 468 */ 469 static void vdbePmaReaderClear(PmaReader *pReadr){ 470 sqlite3_free(pReadr->aAlloc); 471 sqlite3_free(pReadr->aBuffer); 472 if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap); 473 vdbeIncrFree(pReadr->pIncr); 474 memset(pReadr, 0, sizeof(PmaReader)); 475 } 476 477 /* 478 ** Read the next nByte bytes of data from the PMA p. 479 ** If successful, set *ppOut to point to a buffer containing the data 480 ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite 481 ** error code. 482 ** 483 ** The buffer returned in *ppOut is only valid until the 484 ** next call to this function. 485 */ 486 static int vdbePmaReadBlob( 487 PmaReader *p, /* PmaReader from which to take the blob */ 488 int nByte, /* Bytes of data to read */ 489 u8 **ppOut /* OUT: Pointer to buffer containing data */ 490 ){ 491 int iBuf; /* Offset within buffer to read from */ 492 int nAvail; /* Bytes of data available in buffer */ 493 494 if( p->aMap ){ 495 *ppOut = &p->aMap[p->iReadOff]; 496 p->iReadOff += nByte; 497 return SQLITE_OK; 498 } 499 500 assert( p->aBuffer ); 501 502 /* If there is no more data to be read from the buffer, read the next 503 ** p->nBuffer bytes of data from the file into it. Or, if there are less 504 ** than p->nBuffer bytes remaining in the PMA, read all remaining data. */ 505 iBuf = p->iReadOff % p->nBuffer; 506 if( iBuf==0 ){ 507 int nRead; /* Bytes to read from disk */ 508 int rc; /* sqlite3OsRead() return code */ 509 510 /* Determine how many bytes of data to read. */ 511 if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){ 512 nRead = p->nBuffer; 513 }else{ 514 nRead = (int)(p->iEof - p->iReadOff); 515 } 516 assert( nRead>0 ); 517 518 /* Readr data from the file. Return early if an error occurs. */ 519 rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff); 520 assert( rc!=SQLITE_IOERR_SHORT_READ ); 521 if( rc!=SQLITE_OK ) return rc; 522 } 523 nAvail = p->nBuffer - iBuf; 524 525 if( nByte<=nAvail ){ 526 /* The requested data is available in the in-memory buffer. In this 527 ** case there is no need to make a copy of the data, just return a 528 ** pointer into the buffer to the caller. */ 529 *ppOut = &p->aBuffer[iBuf]; 530 p->iReadOff += nByte; 531 }else{ 532 /* The requested data is not all available in the in-memory buffer. 533 ** In this case, allocate space at p->aAlloc[] to copy the requested 534 ** range into. Then return a copy of pointer p->aAlloc to the caller. */ 535 int nRem; /* Bytes remaining to copy */ 536 537 /* Extend the p->aAlloc[] allocation if required. */ 538 if( p->nAlloc<nByte ){ 539 u8 *aNew; 540 sqlite3_int64 nNew = MAX(128, 2*(sqlite3_int64)p->nAlloc); 541 while( nByte>nNew ) nNew = nNew*2; 542 aNew = sqlite3Realloc(p->aAlloc, nNew); 543 if( !aNew ) return SQLITE_NOMEM_BKPT; 544 p->nAlloc = nNew; 545 p->aAlloc = aNew; 546 } 547 548 /* Copy as much data as is available in the buffer into the start of 549 ** p->aAlloc[]. */ 550 memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail); 551 p->iReadOff += nAvail; 552 nRem = nByte - nAvail; 553 554 /* The following loop copies up to p->nBuffer bytes per iteration into 555 ** the p->aAlloc[] buffer. */ 556 while( nRem>0 ){ 557 int rc; /* vdbePmaReadBlob() return code */ 558 int nCopy; /* Number of bytes to copy */ 559 u8 *aNext; /* Pointer to buffer to copy data from */ 560 561 nCopy = nRem; 562 if( nRem>p->nBuffer ) nCopy = p->nBuffer; 563 rc = vdbePmaReadBlob(p, nCopy, &aNext); 564 if( rc!=SQLITE_OK ) return rc; 565 assert( aNext!=p->aAlloc ); 566 memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy); 567 nRem -= nCopy; 568 } 569 570 *ppOut = p->aAlloc; 571 } 572 573 return SQLITE_OK; 574 } 575 576 /* 577 ** Read a varint from the stream of data accessed by p. Set *pnOut to 578 ** the value read. 579 */ 580 static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){ 581 int iBuf; 582 583 if( p->aMap ){ 584 p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut); 585 }else{ 586 iBuf = p->iReadOff % p->nBuffer; 587 if( iBuf && (p->nBuffer-iBuf)>=9 ){ 588 p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut); 589 }else{ 590 u8 aVarint[16], *a; 591 int i = 0, rc; 592 do{ 593 rc = vdbePmaReadBlob(p, 1, &a); 594 if( rc ) return rc; 595 aVarint[(i++)&0xf] = a[0]; 596 }while( (a[0]&0x80)!=0 ); 597 sqlite3GetVarint(aVarint, pnOut); 598 } 599 } 600 601 return SQLITE_OK; 602 } 603 604 /* 605 ** Attempt to memory map file pFile. If successful, set *pp to point to the 606 ** new mapping and return SQLITE_OK. If the mapping is not attempted 607 ** (because the file is too large or the VFS layer is configured not to use 608 ** mmap), return SQLITE_OK and set *pp to NULL. 609 ** 610 ** Or, if an error occurs, return an SQLite error code. The final value of 611 ** *pp is undefined in this case. 612 */ 613 static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){ 614 int rc = SQLITE_OK; 615 if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){ 616 sqlite3_file *pFd = pFile->pFd; 617 if( pFd->pMethods->iVersion>=3 ){ 618 rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp); 619 testcase( rc!=SQLITE_OK ); 620 } 621 } 622 return rc; 623 } 624 625 /* 626 ** Attach PmaReader pReadr to file pFile (if it is not already attached to 627 ** that file) and seek it to offset iOff within the file. Return SQLITE_OK 628 ** if successful, or an SQLite error code if an error occurs. 629 */ 630 static int vdbePmaReaderSeek( 631 SortSubtask *pTask, /* Task context */ 632 PmaReader *pReadr, /* Reader whose cursor is to be moved */ 633 SorterFile *pFile, /* Sorter file to read from */ 634 i64 iOff /* Offset in pFile */ 635 ){ 636 int rc = SQLITE_OK; 637 638 assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 ); 639 640 if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ; 641 if( pReadr->aMap ){ 642 sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap); 643 pReadr->aMap = 0; 644 } 645 pReadr->iReadOff = iOff; 646 pReadr->iEof = pFile->iEof; 647 pReadr->pFd = pFile->pFd; 648 649 rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap); 650 if( rc==SQLITE_OK && pReadr->aMap==0 ){ 651 int pgsz = pTask->pSorter->pgsz; 652 int iBuf = pReadr->iReadOff % pgsz; 653 if( pReadr->aBuffer==0 ){ 654 pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz); 655 if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM_BKPT; 656 pReadr->nBuffer = pgsz; 657 } 658 if( rc==SQLITE_OK && iBuf ){ 659 int nRead = pgsz - iBuf; 660 if( (pReadr->iReadOff + nRead) > pReadr->iEof ){ 661 nRead = (int)(pReadr->iEof - pReadr->iReadOff); 662 } 663 rc = sqlite3OsRead( 664 pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff 665 ); 666 testcase( rc!=SQLITE_OK ); 667 } 668 } 669 670 return rc; 671 } 672 673 /* 674 ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if 675 ** no error occurs, or an SQLite error code if one does. 676 */ 677 static int vdbePmaReaderNext(PmaReader *pReadr){ 678 int rc = SQLITE_OK; /* Return Code */ 679 u64 nRec = 0; /* Size of record in bytes */ 680 681 682 if( pReadr->iReadOff>=pReadr->iEof ){ 683 IncrMerger *pIncr = pReadr->pIncr; 684 int bEof = 1; 685 if( pIncr ){ 686 rc = vdbeIncrSwap(pIncr); 687 if( rc==SQLITE_OK && pIncr->bEof==0 ){ 688 rc = vdbePmaReaderSeek( 689 pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff 690 ); 691 bEof = 0; 692 } 693 } 694 695 if( bEof ){ 696 /* This is an EOF condition */ 697 vdbePmaReaderClear(pReadr); 698 testcase( rc!=SQLITE_OK ); 699 return rc; 700 } 701 } 702 703 if( rc==SQLITE_OK ){ 704 rc = vdbePmaReadVarint(pReadr, &nRec); 705 } 706 if( rc==SQLITE_OK ){ 707 pReadr->nKey = (int)nRec; 708 rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey); 709 testcase( rc!=SQLITE_OK ); 710 } 711 712 return rc; 713 } 714 715 /* 716 ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile 717 ** starting at offset iStart and ending at offset iEof-1. This function 718 ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the 719 ** PMA is empty). 720 ** 721 ** If the pnByte parameter is NULL, then it is assumed that the file 722 ** contains a single PMA, and that that PMA omits the initial length varint. 723 */ 724 static int vdbePmaReaderInit( 725 SortSubtask *pTask, /* Task context */ 726 SorterFile *pFile, /* Sorter file to read from */ 727 i64 iStart, /* Start offset in pFile */ 728 PmaReader *pReadr, /* PmaReader to populate */ 729 i64 *pnByte /* IN/OUT: Increment this value by PMA size */ 730 ){ 731 int rc; 732 733 assert( pFile->iEof>iStart ); 734 assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 ); 735 assert( pReadr->aBuffer==0 ); 736 assert( pReadr->aMap==0 ); 737 738 rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart); 739 if( rc==SQLITE_OK ){ 740 u64 nByte = 0; /* Size of PMA in bytes */ 741 rc = vdbePmaReadVarint(pReadr, &nByte); 742 pReadr->iEof = pReadr->iReadOff + nByte; 743 *pnByte += nByte; 744 } 745 746 if( rc==SQLITE_OK ){ 747 rc = vdbePmaReaderNext(pReadr); 748 } 749 return rc; 750 } 751 752 /* 753 ** A version of vdbeSorterCompare() that assumes that it has already been 754 ** determined that the first field of key1 is equal to the first field of 755 ** key2. 756 */ 757 static int vdbeSorterCompareTail( 758 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ 759 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ 760 const void *pKey1, int nKey1, /* Left side of comparison */ 761 const void *pKey2, int nKey2 /* Right side of comparison */ 762 ){ 763 UnpackedRecord *r2 = pTask->pUnpacked; 764 if( *pbKey2Cached==0 ){ 765 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); 766 *pbKey2Cached = 1; 767 } 768 return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1); 769 } 770 771 /* 772 ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, 773 ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences 774 ** used by the comparison. Return the result of the comparison. 775 ** 776 ** If IN/OUT parameter *pbKey2Cached is true when this function is called, 777 ** it is assumed that (pTask->pUnpacked) contains the unpacked version 778 ** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked 779 ** version of key2 and *pbKey2Cached set to true before returning. 780 ** 781 ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set 782 ** to SQLITE_NOMEM. 783 */ 784 static int vdbeSorterCompare( 785 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ 786 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ 787 const void *pKey1, int nKey1, /* Left side of comparison */ 788 const void *pKey2, int nKey2 /* Right side of comparison */ 789 ){ 790 UnpackedRecord *r2 = pTask->pUnpacked; 791 if( !*pbKey2Cached ){ 792 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); 793 *pbKey2Cached = 1; 794 } 795 return sqlite3VdbeRecordCompare(nKey1, pKey1, r2); 796 } 797 798 /* 799 ** A specially optimized version of vdbeSorterCompare() that assumes that 800 ** the first field of each key is a TEXT value and that the collation 801 ** sequence to compare them with is BINARY. 802 */ 803 static int vdbeSorterCompareText( 804 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ 805 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ 806 const void *pKey1, int nKey1, /* Left side of comparison */ 807 const void *pKey2, int nKey2 /* Right side of comparison */ 808 ){ 809 const u8 * const p1 = (const u8 * const)pKey1; 810 const u8 * const p2 = (const u8 * const)pKey2; 811 const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */ 812 const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */ 813 814 int n1; 815 int n2; 816 int res; 817 818 getVarint32NR(&p1[1], n1); 819 getVarint32NR(&p2[1], n2); 820 res = memcmp(v1, v2, (MIN(n1, n2) - 13)/2); 821 if( res==0 ){ 822 res = n1 - n2; 823 } 824 825 if( res==0 ){ 826 if( pTask->pSorter->pKeyInfo->nKeyField>1 ){ 827 res = vdbeSorterCompareTail( 828 pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2 829 ); 830 } 831 }else{ 832 assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) ); 833 if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){ 834 res = res * -1; 835 } 836 } 837 838 return res; 839 } 840 841 /* 842 ** A specially optimized version of vdbeSorterCompare() that assumes that 843 ** the first field of each key is an INTEGER value. 844 */ 845 static int vdbeSorterCompareInt( 846 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ 847 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ 848 const void *pKey1, int nKey1, /* Left side of comparison */ 849 const void *pKey2, int nKey2 /* Right side of comparison */ 850 ){ 851 const u8 * const p1 = (const u8 * const)pKey1; 852 const u8 * const p2 = (const u8 * const)pKey2; 853 const int s1 = p1[1]; /* Left hand serial type */ 854 const int s2 = p2[1]; /* Right hand serial type */ 855 const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */ 856 const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */ 857 int res; /* Return value */ 858 859 assert( (s1>0 && s1<7) || s1==8 || s1==9 ); 860 assert( (s2>0 && s2<7) || s2==8 || s2==9 ); 861 862 if( s1==s2 ){ 863 /* The two values have the same sign. Compare using memcmp(). */ 864 static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8, 0, 0, 0 }; 865 const u8 n = aLen[s1]; 866 int i; 867 res = 0; 868 for(i=0; i<n; i++){ 869 if( (res = v1[i] - v2[i])!=0 ){ 870 if( ((v1[0] ^ v2[0]) & 0x80)!=0 ){ 871 res = v1[0] & 0x80 ? -1 : +1; 872 } 873 break; 874 } 875 } 876 }else if( s1>7 && s2>7 ){ 877 res = s1 - s2; 878 }else{ 879 if( s2>7 ){ 880 res = +1; 881 }else if( s1>7 ){ 882 res = -1; 883 }else{ 884 res = s1 - s2; 885 } 886 assert( res!=0 ); 887 888 if( res>0 ){ 889 if( *v1 & 0x80 ) res = -1; 890 }else{ 891 if( *v2 & 0x80 ) res = +1; 892 } 893 } 894 895 if( res==0 ){ 896 if( pTask->pSorter->pKeyInfo->nKeyField>1 ){ 897 res = vdbeSorterCompareTail( 898 pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2 899 ); 900 } 901 }else if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){ 902 assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) ); 903 res = res * -1; 904 } 905 906 return res; 907 } 908 909 /* 910 ** Initialize the temporary index cursor just opened as a sorter cursor. 911 ** 912 ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nKeyField) 913 ** to determine the number of fields that should be compared from the 914 ** records being sorted. However, if the value passed as argument nField 915 ** is non-zero and the sorter is able to guarantee a stable sort, nField 916 ** is used instead. This is used when sorting records for a CREATE INDEX 917 ** statement. In this case, keys are always delivered to the sorter in 918 ** order of the primary key, which happens to be make up the final part 919 ** of the records being sorted. So if the sort is stable, there is never 920 ** any reason to compare PK fields and they can be ignored for a small 921 ** performance boost. 922 ** 923 ** The sorter can guarantee a stable sort when running in single-threaded 924 ** mode, but not in multi-threaded mode. 925 ** 926 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. 927 */ 928 int sqlite3VdbeSorterInit( 929 sqlite3 *db, /* Database connection (for malloc()) */ 930 int nField, /* Number of key fields in each record */ 931 VdbeCursor *pCsr /* Cursor that holds the new sorter */ 932 ){ 933 int pgsz; /* Page size of main database */ 934 int i; /* Used to iterate through aTask[] */ 935 VdbeSorter *pSorter; /* The new sorter */ 936 KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */ 937 int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ 938 int sz; /* Size of pSorter in bytes */ 939 int rc = SQLITE_OK; 940 #if SQLITE_MAX_WORKER_THREADS==0 941 # define nWorker 0 942 #else 943 int nWorker; 944 #endif 945 946 /* Initialize the upper limit on the number of worker threads */ 947 #if SQLITE_MAX_WORKER_THREADS>0 948 if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){ 949 nWorker = 0; 950 }else{ 951 nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS]; 952 } 953 #endif 954 955 /* Do not allow the total number of threads (main thread + all workers) 956 ** to exceed the maximum merge count */ 957 #if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT 958 if( nWorker>=SORTER_MAX_MERGE_COUNT ){ 959 nWorker = SORTER_MAX_MERGE_COUNT-1; 960 } 961 #endif 962 963 assert( pCsr->pKeyInfo ); 964 assert( !pCsr->isEphemeral ); 965 assert( pCsr->eCurType==CURTYPE_SORTER ); 966 szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nKeyField-1)*sizeof(CollSeq*); 967 sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask); 968 969 pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo); 970 pCsr->uc.pSorter = pSorter; 971 if( pSorter==0 ){ 972 rc = SQLITE_NOMEM_BKPT; 973 }else{ 974 Btree *pBt = db->aDb[0].pBt; 975 pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz); 976 memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); 977 pKeyInfo->db = 0; 978 if( nField && nWorker==0 ){ 979 pKeyInfo->nKeyField = nField; 980 } 981 sqlite3BtreeEnter(pBt); 982 pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(pBt); 983 sqlite3BtreeLeave(pBt); 984 pSorter->nTask = nWorker + 1; 985 pSorter->iPrev = (u8)(nWorker - 1); 986 pSorter->bUseThreads = (pSorter->nTask>1); 987 pSorter->db = db; 988 for(i=0; i<pSorter->nTask; i++){ 989 SortSubtask *pTask = &pSorter->aTask[i]; 990 pTask->pSorter = pSorter; 991 } 992 993 if( !sqlite3TempInMemory(db) ){ 994 i64 mxCache; /* Cache size in bytes*/ 995 u32 szPma = sqlite3GlobalConfig.szPma; 996 pSorter->mnPmaSize = szPma * pgsz; 997 998 mxCache = db->aDb[0].pSchema->cache_size; 999 if( mxCache<0 ){ 1000 /* A negative cache-size value C indicates that the cache is abs(C) 1001 ** KiB in size. */ 1002 mxCache = mxCache * -1024; 1003 }else{ 1004 mxCache = mxCache * pgsz; 1005 } 1006 mxCache = MIN(mxCache, SQLITE_MAX_PMASZ); 1007 pSorter->mxPmaSize = MAX(pSorter->mnPmaSize, (int)mxCache); 1008 1009 /* Avoid large memory allocations if the application has requested 1010 ** SQLITE_CONFIG_SMALL_MALLOC. */ 1011 if( sqlite3GlobalConfig.bSmallMalloc==0 ){ 1012 assert( pSorter->iMemory==0 ); 1013 pSorter->nMemory = pgsz; 1014 pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz); 1015 if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM_BKPT; 1016 } 1017 } 1018 1019 if( pKeyInfo->nAllField<13 1020 && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl) 1021 && (pKeyInfo->aSortFlags[0] & KEYINFO_ORDER_BIGNULL)==0 1022 ){ 1023 pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT; 1024 } 1025 } 1026 1027 return rc; 1028 } 1029 #undef nWorker /* Defined at the top of this function */ 1030 1031 /* 1032 ** Free the list of sorted records starting at pRecord. 1033 */ 1034 static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){ 1035 SorterRecord *p; 1036 SorterRecord *pNext; 1037 for(p=pRecord; p; p=pNext){ 1038 pNext = p->u.pNext; 1039 sqlite3DbFree(db, p); 1040 } 1041 } 1042 1043 /* 1044 ** Free all resources owned by the object indicated by argument pTask. All 1045 ** fields of *pTask are zeroed before returning. 1046 */ 1047 static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ 1048 sqlite3DbFree(db, pTask->pUnpacked); 1049 #if SQLITE_MAX_WORKER_THREADS>0 1050 /* pTask->list.aMemory can only be non-zero if it was handed memory 1051 ** from the main thread. That only occurs SQLITE_MAX_WORKER_THREADS>0 */ 1052 if( pTask->list.aMemory ){ 1053 sqlite3_free(pTask->list.aMemory); 1054 }else 1055 #endif 1056 { 1057 assert( pTask->list.aMemory==0 ); 1058 vdbeSorterRecordFree(0, pTask->list.pList); 1059 } 1060 if( pTask->file.pFd ){ 1061 sqlite3OsCloseFree(pTask->file.pFd); 1062 } 1063 if( pTask->file2.pFd ){ 1064 sqlite3OsCloseFree(pTask->file2.pFd); 1065 } 1066 memset(pTask, 0, sizeof(SortSubtask)); 1067 } 1068 1069 #ifdef SQLITE_DEBUG_SORTER_THREADS 1070 static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){ 1071 i64 t; 1072 int iTask = (pTask - pTask->pSorter->aTask); 1073 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); 1074 fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent); 1075 } 1076 static void vdbeSorterRewindDebug(const char *zEvent){ 1077 i64 t = 0; 1078 sqlite3_vfs *pVfs = sqlite3_vfs_find(0); 1079 if( ALWAYS(pVfs) ) sqlite3OsCurrentTimeInt64(pVfs, &t); 1080 fprintf(stderr, "%lld:X %s\n", t, zEvent); 1081 } 1082 static void vdbeSorterPopulateDebug( 1083 SortSubtask *pTask, 1084 const char *zEvent 1085 ){ 1086 i64 t; 1087 int iTask = (pTask - pTask->pSorter->aTask); 1088 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); 1089 fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent); 1090 } 1091 static void vdbeSorterBlockDebug( 1092 SortSubtask *pTask, 1093 int bBlocked, 1094 const char *zEvent 1095 ){ 1096 if( bBlocked ){ 1097 i64 t; 1098 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); 1099 fprintf(stderr, "%lld:main %s\n", t, zEvent); 1100 } 1101 } 1102 #else 1103 # define vdbeSorterWorkDebug(x,y) 1104 # define vdbeSorterRewindDebug(y) 1105 # define vdbeSorterPopulateDebug(x,y) 1106 # define vdbeSorterBlockDebug(x,y,z) 1107 #endif 1108 1109 #if SQLITE_MAX_WORKER_THREADS>0 1110 /* 1111 ** Join thread pTask->thread. 1112 */ 1113 static int vdbeSorterJoinThread(SortSubtask *pTask){ 1114 int rc = SQLITE_OK; 1115 if( pTask->pThread ){ 1116 #ifdef SQLITE_DEBUG_SORTER_THREADS 1117 int bDone = pTask->bDone; 1118 #endif 1119 void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR); 1120 vdbeSorterBlockDebug(pTask, !bDone, "enter"); 1121 (void)sqlite3ThreadJoin(pTask->pThread, &pRet); 1122 vdbeSorterBlockDebug(pTask, !bDone, "exit"); 1123 rc = SQLITE_PTR_TO_INT(pRet); 1124 assert( pTask->bDone==1 ); 1125 pTask->bDone = 0; 1126 pTask->pThread = 0; 1127 } 1128 return rc; 1129 } 1130 1131 /* 1132 ** Launch a background thread to run xTask(pIn). 1133 */ 1134 static int vdbeSorterCreateThread( 1135 SortSubtask *pTask, /* Thread will use this task object */ 1136 void *(*xTask)(void*), /* Routine to run in a separate thread */ 1137 void *pIn /* Argument passed into xTask() */ 1138 ){ 1139 assert( pTask->pThread==0 && pTask->bDone==0 ); 1140 return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn); 1141 } 1142 1143 /* 1144 ** Join all outstanding threads launched by SorterWrite() to create 1145 ** level-0 PMAs. 1146 */ 1147 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ 1148 int rc = rcin; 1149 int i; 1150 1151 /* This function is always called by the main user thread. 1152 ** 1153 ** If this function is being called after SorterRewind() has been called, 1154 ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread 1155 ** is currently attempt to join one of the other threads. To avoid a race 1156 ** condition where this thread also attempts to join the same object, join 1157 ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */ 1158 for(i=pSorter->nTask-1; i>=0; i--){ 1159 SortSubtask *pTask = &pSorter->aTask[i]; 1160 int rc2 = vdbeSorterJoinThread(pTask); 1161 if( rc==SQLITE_OK ) rc = rc2; 1162 } 1163 return rc; 1164 } 1165 #else 1166 # define vdbeSorterJoinAll(x,rcin) (rcin) 1167 # define vdbeSorterJoinThread(pTask) SQLITE_OK 1168 #endif 1169 1170 /* 1171 ** Allocate a new MergeEngine object capable of handling up to 1172 ** nReader PmaReader inputs. 1173 ** 1174 ** nReader is automatically rounded up to the next power of two. 1175 ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up. 1176 */ 1177 static MergeEngine *vdbeMergeEngineNew(int nReader){ 1178 int N = 2; /* Smallest power of two >= nReader */ 1179 int nByte; /* Total bytes of space to allocate */ 1180 MergeEngine *pNew; /* Pointer to allocated object to return */ 1181 1182 assert( nReader<=SORTER_MAX_MERGE_COUNT ); 1183 1184 while( N<nReader ) N += N; 1185 nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader)); 1186 1187 pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte); 1188 if( pNew ){ 1189 pNew->nTree = N; 1190 pNew->pTask = 0; 1191 pNew->aReadr = (PmaReader*)&pNew[1]; 1192 pNew->aTree = (int*)&pNew->aReadr[N]; 1193 } 1194 return pNew; 1195 } 1196 1197 /* 1198 ** Free the MergeEngine object passed as the only argument. 1199 */ 1200 static void vdbeMergeEngineFree(MergeEngine *pMerger){ 1201 int i; 1202 if( pMerger ){ 1203 for(i=0; i<pMerger->nTree; i++){ 1204 vdbePmaReaderClear(&pMerger->aReadr[i]); 1205 } 1206 } 1207 sqlite3_free(pMerger); 1208 } 1209 1210 /* 1211 ** Free all resources associated with the IncrMerger object indicated by 1212 ** the first argument. 1213 */ 1214 static void vdbeIncrFree(IncrMerger *pIncr){ 1215 if( pIncr ){ 1216 #if SQLITE_MAX_WORKER_THREADS>0 1217 if( pIncr->bUseThread ){ 1218 vdbeSorterJoinThread(pIncr->pTask); 1219 if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); 1220 if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); 1221 } 1222 #endif 1223 vdbeMergeEngineFree(pIncr->pMerger); 1224 sqlite3_free(pIncr); 1225 } 1226 } 1227 1228 /* 1229 ** Reset a sorting cursor back to its original empty state. 1230 */ 1231 void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ 1232 int i; 1233 (void)vdbeSorterJoinAll(pSorter, SQLITE_OK); 1234 assert( pSorter->bUseThreads || pSorter->pReader==0 ); 1235 #if SQLITE_MAX_WORKER_THREADS>0 1236 if( pSorter->pReader ){ 1237 vdbePmaReaderClear(pSorter->pReader); 1238 sqlite3DbFree(db, pSorter->pReader); 1239 pSorter->pReader = 0; 1240 } 1241 #endif 1242 vdbeMergeEngineFree(pSorter->pMerger); 1243 pSorter->pMerger = 0; 1244 for(i=0; i<pSorter->nTask; i++){ 1245 SortSubtask *pTask = &pSorter->aTask[i]; 1246 vdbeSortSubtaskCleanup(db, pTask); 1247 pTask->pSorter = pSorter; 1248 } 1249 if( pSorter->list.aMemory==0 ){ 1250 vdbeSorterRecordFree(0, pSorter->list.pList); 1251 } 1252 pSorter->list.pList = 0; 1253 pSorter->list.szPMA = 0; 1254 pSorter->bUsePMA = 0; 1255 pSorter->iMemory = 0; 1256 pSorter->mxKeysize = 0; 1257 sqlite3DbFree(db, pSorter->pUnpacked); 1258 pSorter->pUnpacked = 0; 1259 } 1260 1261 /* 1262 ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines. 1263 */ 1264 void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ 1265 VdbeSorter *pSorter; 1266 assert( pCsr->eCurType==CURTYPE_SORTER ); 1267 pSorter = pCsr->uc.pSorter; 1268 if( pSorter ){ 1269 sqlite3VdbeSorterReset(db, pSorter); 1270 sqlite3_free(pSorter->list.aMemory); 1271 sqlite3DbFree(db, pSorter); 1272 pCsr->uc.pSorter = 0; 1273 } 1274 } 1275 1276 #if SQLITE_MAX_MMAP_SIZE>0 1277 /* 1278 ** The first argument is a file-handle open on a temporary file. The file 1279 ** is guaranteed to be nByte bytes or smaller in size. This function 1280 ** attempts to extend the file to nByte bytes in size and to ensure that 1281 ** the VFS has memory mapped it. 1282 ** 1283 ** Whether or not the file does end up memory mapped of course depends on 1284 ** the specific VFS implementation. 1285 */ 1286 static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){ 1287 if( nByte<=(i64)(db->nMaxSorterMmap) && pFd->pMethods->iVersion>=3 ){ 1288 void *p = 0; 1289 int chunksize = 4*1024; 1290 sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &chunksize); 1291 sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte); 1292 sqlite3OsFetch(pFd, 0, (int)nByte, &p); 1293 if( p ) sqlite3OsUnfetch(pFd, 0, p); 1294 } 1295 } 1296 #else 1297 # define vdbeSorterExtendFile(x,y,z) 1298 #endif 1299 1300 /* 1301 ** Allocate space for a file-handle and open a temporary file. If successful, 1302 ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK. 1303 ** Otherwise, set *ppFd to 0 and return an SQLite error code. 1304 */ 1305 static int vdbeSorterOpenTempFile( 1306 sqlite3 *db, /* Database handle doing sort */ 1307 i64 nExtend, /* Attempt to extend file to this size */ 1308 sqlite3_file **ppFd 1309 ){ 1310 int rc; 1311 if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS; 1312 rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd, 1313 SQLITE_OPEN_TEMP_JOURNAL | 1314 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | 1315 SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc 1316 ); 1317 if( rc==SQLITE_OK ){ 1318 i64 max = SQLITE_MAX_MMAP_SIZE; 1319 sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max); 1320 if( nExtend>0 ){ 1321 vdbeSorterExtendFile(db, *ppFd, nExtend); 1322 } 1323 } 1324 return rc; 1325 } 1326 1327 /* 1328 ** If it has not already been allocated, allocate the UnpackedRecord 1329 ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or 1330 ** if no allocation was required), or SQLITE_NOMEM otherwise. 1331 */ 1332 static int vdbeSortAllocUnpacked(SortSubtask *pTask){ 1333 if( pTask->pUnpacked==0 ){ 1334 pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pTask->pSorter->pKeyInfo); 1335 if( pTask->pUnpacked==0 ) return SQLITE_NOMEM_BKPT; 1336 pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nKeyField; 1337 pTask->pUnpacked->errCode = 0; 1338 } 1339 return SQLITE_OK; 1340 } 1341 1342 1343 /* 1344 ** Merge the two sorted lists p1 and p2 into a single list. 1345 */ 1346 static SorterRecord *vdbeSorterMerge( 1347 SortSubtask *pTask, /* Calling thread context */ 1348 SorterRecord *p1, /* First list to merge */ 1349 SorterRecord *p2 /* Second list to merge */ 1350 ){ 1351 SorterRecord *pFinal = 0; 1352 SorterRecord **pp = &pFinal; 1353 int bCached = 0; 1354 1355 assert( p1!=0 && p2!=0 ); 1356 for(;;){ 1357 int res; 1358 res = pTask->xCompare( 1359 pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal 1360 ); 1361 1362 if( res<=0 ){ 1363 *pp = p1; 1364 pp = &p1->u.pNext; 1365 p1 = p1->u.pNext; 1366 if( p1==0 ){ 1367 *pp = p2; 1368 break; 1369 } 1370 }else{ 1371 *pp = p2; 1372 pp = &p2->u.pNext; 1373 p2 = p2->u.pNext; 1374 bCached = 0; 1375 if( p2==0 ){ 1376 *pp = p1; 1377 break; 1378 } 1379 } 1380 } 1381 return pFinal; 1382 } 1383 1384 /* 1385 ** Return the SorterCompare function to compare values collected by the 1386 ** sorter object passed as the only argument. 1387 */ 1388 static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){ 1389 if( p->typeMask==SORTER_TYPE_INTEGER ){ 1390 return vdbeSorterCompareInt; 1391 }else if( p->typeMask==SORTER_TYPE_TEXT ){ 1392 return vdbeSorterCompareText; 1393 } 1394 return vdbeSorterCompare; 1395 } 1396 1397 /* 1398 ** Sort the linked list of records headed at pTask->pList. Return 1399 ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 1400 ** an error occurs. 1401 */ 1402 static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){ 1403 int i; 1404 SorterRecord *p; 1405 int rc; 1406 SorterRecord *aSlot[64]; 1407 1408 rc = vdbeSortAllocUnpacked(pTask); 1409 if( rc!=SQLITE_OK ) return rc; 1410 1411 p = pList->pList; 1412 pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter); 1413 memset(aSlot, 0, sizeof(aSlot)); 1414 1415 while( p ){ 1416 SorterRecord *pNext; 1417 if( pList->aMemory ){ 1418 if( (u8*)p==pList->aMemory ){ 1419 pNext = 0; 1420 }else{ 1421 assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) ); 1422 pNext = (SorterRecord*)&pList->aMemory[p->u.iNext]; 1423 } 1424 }else{ 1425 pNext = p->u.pNext; 1426 } 1427 1428 p->u.pNext = 0; 1429 for(i=0; aSlot[i]; i++){ 1430 p = vdbeSorterMerge(pTask, p, aSlot[i]); 1431 aSlot[i] = 0; 1432 } 1433 aSlot[i] = p; 1434 p = pNext; 1435 } 1436 1437 p = 0; 1438 for(i=0; i<ArraySize(aSlot); i++){ 1439 if( aSlot[i]==0 ) continue; 1440 p = p ? vdbeSorterMerge(pTask, p, aSlot[i]) : aSlot[i]; 1441 } 1442 pList->pList = p; 1443 1444 assert( pTask->pUnpacked->errCode==SQLITE_OK 1445 || pTask->pUnpacked->errCode==SQLITE_NOMEM 1446 ); 1447 return pTask->pUnpacked->errCode; 1448 } 1449 1450 /* 1451 ** Initialize a PMA-writer object. 1452 */ 1453 static void vdbePmaWriterInit( 1454 sqlite3_file *pFd, /* File handle to write to */ 1455 PmaWriter *p, /* Object to populate */ 1456 int nBuf, /* Buffer size */ 1457 i64 iStart /* Offset of pFd to begin writing at */ 1458 ){ 1459 memset(p, 0, sizeof(PmaWriter)); 1460 p->aBuffer = (u8*)sqlite3Malloc(nBuf); 1461 if( !p->aBuffer ){ 1462 p->eFWErr = SQLITE_NOMEM_BKPT; 1463 }else{ 1464 p->iBufEnd = p->iBufStart = (iStart % nBuf); 1465 p->iWriteOff = iStart - p->iBufStart; 1466 p->nBuffer = nBuf; 1467 p->pFd = pFd; 1468 } 1469 } 1470 1471 /* 1472 ** Write nData bytes of data to the PMA. Return SQLITE_OK 1473 ** if successful, or an SQLite error code if an error occurs. 1474 */ 1475 static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){ 1476 int nRem = nData; 1477 while( nRem>0 && p->eFWErr==0 ){ 1478 int nCopy = nRem; 1479 if( nCopy>(p->nBuffer - p->iBufEnd) ){ 1480 nCopy = p->nBuffer - p->iBufEnd; 1481 } 1482 1483 memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy); 1484 p->iBufEnd += nCopy; 1485 if( p->iBufEnd==p->nBuffer ){ 1486 p->eFWErr = sqlite3OsWrite(p->pFd, 1487 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, 1488 p->iWriteOff + p->iBufStart 1489 ); 1490 p->iBufStart = p->iBufEnd = 0; 1491 p->iWriteOff += p->nBuffer; 1492 } 1493 assert( p->iBufEnd<p->nBuffer ); 1494 1495 nRem -= nCopy; 1496 } 1497 } 1498 1499 /* 1500 ** Flush any buffered data to disk and clean up the PMA-writer object. 1501 ** The results of using the PMA-writer after this call are undefined. 1502 ** Return SQLITE_OK if flushing the buffered data succeeds or is not 1503 ** required. Otherwise, return an SQLite error code. 1504 ** 1505 ** Before returning, set *piEof to the offset immediately following the 1506 ** last byte written to the file. 1507 */ 1508 static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){ 1509 int rc; 1510 if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){ 1511 p->eFWErr = sqlite3OsWrite(p->pFd, 1512 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, 1513 p->iWriteOff + p->iBufStart 1514 ); 1515 } 1516 *piEof = (p->iWriteOff + p->iBufEnd); 1517 sqlite3_free(p->aBuffer); 1518 rc = p->eFWErr; 1519 memset(p, 0, sizeof(PmaWriter)); 1520 return rc; 1521 } 1522 1523 /* 1524 ** Write value iVal encoded as a varint to the PMA. Return 1525 ** SQLITE_OK if successful, or an SQLite error code if an error occurs. 1526 */ 1527 static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){ 1528 int nByte; 1529 u8 aByte[10]; 1530 nByte = sqlite3PutVarint(aByte, iVal); 1531 vdbePmaWriteBlob(p, aByte, nByte); 1532 } 1533 1534 /* 1535 ** Write the current contents of in-memory linked-list pList to a level-0 1536 ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if 1537 ** successful, or an SQLite error code otherwise. 1538 ** 1539 ** The format of a PMA is: 1540 ** 1541 ** * A varint. This varint contains the total number of bytes of content 1542 ** in the PMA (not including the varint itself). 1543 ** 1544 ** * One or more records packed end-to-end in order of ascending keys. 1545 ** Each record consists of a varint followed by a blob of data (the 1546 ** key). The varint is the number of bytes in the blob of data. 1547 */ 1548 static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){ 1549 sqlite3 *db = pTask->pSorter->db; 1550 int rc = SQLITE_OK; /* Return code */ 1551 PmaWriter writer; /* Object used to write to the file */ 1552 1553 #ifdef SQLITE_DEBUG 1554 /* Set iSz to the expected size of file pTask->file after writing the PMA. 1555 ** This is used by an assert() statement at the end of this function. */ 1556 i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof; 1557 #endif 1558 1559 vdbeSorterWorkDebug(pTask, "enter"); 1560 memset(&writer, 0, sizeof(PmaWriter)); 1561 assert( pList->szPMA>0 ); 1562 1563 /* If the first temporary PMA file has not been opened, open it now. */ 1564 if( pTask->file.pFd==0 ){ 1565 rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd); 1566 assert( rc!=SQLITE_OK || pTask->file.pFd ); 1567 assert( pTask->file.iEof==0 ); 1568 assert( pTask->nPMA==0 ); 1569 } 1570 1571 /* Try to get the file to memory map */ 1572 if( rc==SQLITE_OK ){ 1573 vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9); 1574 } 1575 1576 /* Sort the list */ 1577 if( rc==SQLITE_OK ){ 1578 rc = vdbeSorterSort(pTask, pList); 1579 } 1580 1581 if( rc==SQLITE_OK ){ 1582 SorterRecord *p; 1583 SorterRecord *pNext = 0; 1584 1585 vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz, 1586 pTask->file.iEof); 1587 pTask->nPMA++; 1588 vdbePmaWriteVarint(&writer, pList->szPMA); 1589 for(p=pList->pList; p; p=pNext){ 1590 pNext = p->u.pNext; 1591 vdbePmaWriteVarint(&writer, p->nVal); 1592 vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal); 1593 if( pList->aMemory==0 ) sqlite3_free(p); 1594 } 1595 pList->pList = p; 1596 rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof); 1597 } 1598 1599 vdbeSorterWorkDebug(pTask, "exit"); 1600 assert( rc!=SQLITE_OK || pList->pList==0 ); 1601 assert( rc!=SQLITE_OK || pTask->file.iEof==iSz ); 1602 return rc; 1603 } 1604 1605 /* 1606 ** Advance the MergeEngine to its next entry. 1607 ** Set *pbEof to true there is no next entry because 1608 ** the MergeEngine has reached the end of all its inputs. 1609 ** 1610 ** Return SQLITE_OK if successful or an error code if an error occurs. 1611 */ 1612 static int vdbeMergeEngineStep( 1613 MergeEngine *pMerger, /* The merge engine to advance to the next row */ 1614 int *pbEof /* Set TRUE at EOF. Set false for more content */ 1615 ){ 1616 int rc; 1617 int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */ 1618 SortSubtask *pTask = pMerger->pTask; 1619 1620 /* Advance the current PmaReader */ 1621 rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]); 1622 1623 /* Update contents of aTree[] */ 1624 if( rc==SQLITE_OK ){ 1625 int i; /* Index of aTree[] to recalculate */ 1626 PmaReader *pReadr1; /* First PmaReader to compare */ 1627 PmaReader *pReadr2; /* Second PmaReader to compare */ 1628 int bCached = 0; 1629 1630 /* Find the first two PmaReaders to compare. The one that was just 1631 ** advanced (iPrev) and the one next to it in the array. */ 1632 pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)]; 1633 pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)]; 1634 1635 for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){ 1636 /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */ 1637 int iRes; 1638 if( pReadr1->pFd==0 ){ 1639 iRes = +1; 1640 }else if( pReadr2->pFd==0 ){ 1641 iRes = -1; 1642 }else{ 1643 iRes = pTask->xCompare(pTask, &bCached, 1644 pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey 1645 ); 1646 } 1647 1648 /* If pReadr1 contained the smaller value, set aTree[i] to its index. 1649 ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this 1650 ** case there is no cache of pReadr2 in pTask->pUnpacked, so set 1651 ** pKey2 to point to the record belonging to pReadr2. 1652 ** 1653 ** Alternatively, if pReadr2 contains the smaller of the two values, 1654 ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare() 1655 ** was actually called above, then pTask->pUnpacked now contains 1656 ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent 1657 ** vdbeSorterCompare() from decoding pReadr2 again. 1658 ** 1659 ** If the two values were equal, then the value from the oldest 1660 ** PMA should be considered smaller. The VdbeSorter.aReadr[] array 1661 ** is sorted from oldest to newest, so pReadr1 contains older values 1662 ** than pReadr2 iff (pReadr1<pReadr2). */ 1663 if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){ 1664 pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr); 1665 pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; 1666 bCached = 0; 1667 }else{ 1668 if( pReadr1->pFd ) bCached = 0; 1669 pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr); 1670 pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; 1671 } 1672 } 1673 *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0); 1674 } 1675 1676 return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc); 1677 } 1678 1679 #if SQLITE_MAX_WORKER_THREADS>0 1680 /* 1681 ** The main routine for background threads that write level-0 PMAs. 1682 */ 1683 static void *vdbeSorterFlushThread(void *pCtx){ 1684 SortSubtask *pTask = (SortSubtask*)pCtx; 1685 int rc; /* Return code */ 1686 assert( pTask->bDone==0 ); 1687 rc = vdbeSorterListToPMA(pTask, &pTask->list); 1688 pTask->bDone = 1; 1689 return SQLITE_INT_TO_PTR(rc); 1690 } 1691 #endif /* SQLITE_MAX_WORKER_THREADS>0 */ 1692 1693 /* 1694 ** Flush the current contents of VdbeSorter.list to a new PMA, possibly 1695 ** using a background thread. 1696 */ 1697 static int vdbeSorterFlushPMA(VdbeSorter *pSorter){ 1698 #if SQLITE_MAX_WORKER_THREADS==0 1699 pSorter->bUsePMA = 1; 1700 return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list); 1701 #else 1702 int rc = SQLITE_OK; 1703 int i; 1704 SortSubtask *pTask = 0; /* Thread context used to create new PMA */ 1705 int nWorker = (pSorter->nTask-1); 1706 1707 /* Set the flag to indicate that at least one PMA has been written. 1708 ** Or will be, anyhow. */ 1709 pSorter->bUsePMA = 1; 1710 1711 /* Select a sub-task to sort and flush the current list of in-memory 1712 ** records to disk. If the sorter is running in multi-threaded mode, 1713 ** round-robin between the first (pSorter->nTask-1) tasks. Except, if 1714 ** the background thread from a sub-tasks previous turn is still running, 1715 ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy, 1716 ** fall back to using the final sub-task. The first (pSorter->nTask-1) 1717 ** sub-tasks are prefered as they use background threads - the final 1718 ** sub-task uses the main thread. */ 1719 for(i=0; i<nWorker; i++){ 1720 int iTest = (pSorter->iPrev + i + 1) % nWorker; 1721 pTask = &pSorter->aTask[iTest]; 1722 if( pTask->bDone ){ 1723 rc = vdbeSorterJoinThread(pTask); 1724 } 1725 if( rc!=SQLITE_OK || pTask->pThread==0 ) break; 1726 } 1727 1728 if( rc==SQLITE_OK ){ 1729 if( i==nWorker ){ 1730 /* Use the foreground thread for this operation */ 1731 rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list); 1732 }else{ 1733 /* Launch a background thread for this operation */ 1734 u8 *aMem; 1735 void *pCtx; 1736 1737 assert( pTask!=0 ); 1738 assert( pTask->pThread==0 && pTask->bDone==0 ); 1739 assert( pTask->list.pList==0 ); 1740 assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 ); 1741 1742 aMem = pTask->list.aMemory; 1743 pCtx = (void*)pTask; 1744 pSorter->iPrev = (u8)(pTask - pSorter->aTask); 1745 pTask->list = pSorter->list; 1746 pSorter->list.pList = 0; 1747 pSorter->list.szPMA = 0; 1748 if( aMem ){ 1749 pSorter->list.aMemory = aMem; 1750 pSorter->nMemory = sqlite3MallocSize(aMem); 1751 }else if( pSorter->list.aMemory ){ 1752 pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory); 1753 if( !pSorter->list.aMemory ) return SQLITE_NOMEM_BKPT; 1754 } 1755 1756 rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx); 1757 } 1758 } 1759 1760 return rc; 1761 #endif /* SQLITE_MAX_WORKER_THREADS!=0 */ 1762 } 1763 1764 /* 1765 ** Add a record to the sorter. 1766 */ 1767 int sqlite3VdbeSorterWrite( 1768 const VdbeCursor *pCsr, /* Sorter cursor */ 1769 Mem *pVal /* Memory cell containing record */ 1770 ){ 1771 VdbeSorter *pSorter; 1772 int rc = SQLITE_OK; /* Return Code */ 1773 SorterRecord *pNew; /* New list element */ 1774 int bFlush; /* True to flush contents of memory to PMA */ 1775 int nReq; /* Bytes of memory required */ 1776 int nPMA; /* Bytes of PMA space required */ 1777 int t; /* serial type of first record field */ 1778 1779 assert( pCsr->eCurType==CURTYPE_SORTER ); 1780 pSorter = pCsr->uc.pSorter; 1781 getVarint32NR((const u8*)&pVal->z[1], t); 1782 if( t>0 && t<10 && t!=7 ){ 1783 pSorter->typeMask &= SORTER_TYPE_INTEGER; 1784 }else if( t>10 && (t & 0x01) ){ 1785 pSorter->typeMask &= SORTER_TYPE_TEXT; 1786 }else{ 1787 pSorter->typeMask = 0; 1788 } 1789 1790 assert( pSorter ); 1791 1792 /* Figure out whether or not the current contents of memory should be 1793 ** flushed to a PMA before continuing. If so, do so. 1794 ** 1795 ** If using the single large allocation mode (pSorter->aMemory!=0), then 1796 ** flush the contents of memory to a new PMA if (a) at least one value is 1797 ** already in memory and (b) the new value will not fit in memory. 1798 ** 1799 ** Or, if using separate allocations for each record, flush the contents 1800 ** of memory to a PMA if either of the following are true: 1801 ** 1802 ** * The total memory allocated for the in-memory list is greater 1803 ** than (page-size * cache-size), or 1804 ** 1805 ** * The total memory allocated for the in-memory list is greater 1806 ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true. 1807 */ 1808 nReq = pVal->n + sizeof(SorterRecord); 1809 nPMA = pVal->n + sqlite3VarintLen(pVal->n); 1810 if( pSorter->mxPmaSize ){ 1811 if( pSorter->list.aMemory ){ 1812 bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize; 1813 }else{ 1814 bFlush = ( 1815 (pSorter->list.szPMA > pSorter->mxPmaSize) 1816 || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) 1817 ); 1818 } 1819 if( bFlush ){ 1820 rc = vdbeSorterFlushPMA(pSorter); 1821 pSorter->list.szPMA = 0; 1822 pSorter->iMemory = 0; 1823 assert( rc!=SQLITE_OK || pSorter->list.pList==0 ); 1824 } 1825 } 1826 1827 pSorter->list.szPMA += nPMA; 1828 if( nPMA>pSorter->mxKeysize ){ 1829 pSorter->mxKeysize = nPMA; 1830 } 1831 1832 if( pSorter->list.aMemory ){ 1833 int nMin = pSorter->iMemory + nReq; 1834 1835 if( nMin>pSorter->nMemory ){ 1836 u8 *aNew; 1837 sqlite3_int64 nNew = 2 * (sqlite3_int64)pSorter->nMemory; 1838 int iListOff = -1; 1839 if( pSorter->list.pList ){ 1840 iListOff = (u8*)pSorter->list.pList - pSorter->list.aMemory; 1841 } 1842 while( nNew < nMin ) nNew = nNew*2; 1843 if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; 1844 if( nNew < nMin ) nNew = nMin; 1845 aNew = sqlite3Realloc(pSorter->list.aMemory, nNew); 1846 if( !aNew ) return SQLITE_NOMEM_BKPT; 1847 if( iListOff>=0 ){ 1848 pSorter->list.pList = (SorterRecord*)&aNew[iListOff]; 1849 } 1850 pSorter->list.aMemory = aNew; 1851 pSorter->nMemory = nNew; 1852 } 1853 1854 pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory]; 1855 pSorter->iMemory += ROUND8(nReq); 1856 if( pSorter->list.pList ){ 1857 pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory); 1858 } 1859 }else{ 1860 pNew = (SorterRecord *)sqlite3Malloc(nReq); 1861 if( pNew==0 ){ 1862 return SQLITE_NOMEM_BKPT; 1863 } 1864 pNew->u.pNext = pSorter->list.pList; 1865 } 1866 1867 memcpy(SRVAL(pNew), pVal->z, pVal->n); 1868 pNew->nVal = pVal->n; 1869 pSorter->list.pList = pNew; 1870 1871 return rc; 1872 } 1873 1874 /* 1875 ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format 1876 ** of the data stored in aFile[1] is the same as that used by regular PMAs, 1877 ** except that the number-of-bytes varint is omitted from the start. 1878 */ 1879 static int vdbeIncrPopulate(IncrMerger *pIncr){ 1880 int rc = SQLITE_OK; 1881 int rc2; 1882 i64 iStart = pIncr->iStartOff; 1883 SorterFile *pOut = &pIncr->aFile[1]; 1884 SortSubtask *pTask = pIncr->pTask; 1885 MergeEngine *pMerger = pIncr->pMerger; 1886 PmaWriter writer; 1887 assert( pIncr->bEof==0 ); 1888 1889 vdbeSorterPopulateDebug(pTask, "enter"); 1890 1891 vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart); 1892 while( rc==SQLITE_OK ){ 1893 int dummy; 1894 PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ]; 1895 int nKey = pReader->nKey; 1896 i64 iEof = writer.iWriteOff + writer.iBufEnd; 1897 1898 /* Check if the output file is full or if the input has been exhausted. 1899 ** In either case exit the loop. */ 1900 if( pReader->pFd==0 ) break; 1901 if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break; 1902 1903 /* Write the next key to the output. */ 1904 vdbePmaWriteVarint(&writer, nKey); 1905 vdbePmaWriteBlob(&writer, pReader->aKey, nKey); 1906 assert( pIncr->pMerger->pTask==pTask ); 1907 rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy); 1908 } 1909 1910 rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); 1911 if( rc==SQLITE_OK ) rc = rc2; 1912 vdbeSorterPopulateDebug(pTask, "exit"); 1913 return rc; 1914 } 1915 1916 #if SQLITE_MAX_WORKER_THREADS>0 1917 /* 1918 ** The main routine for background threads that populate aFile[1] of 1919 ** multi-threaded IncrMerger objects. 1920 */ 1921 static void *vdbeIncrPopulateThread(void *pCtx){ 1922 IncrMerger *pIncr = (IncrMerger*)pCtx; 1923 void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); 1924 pIncr->pTask->bDone = 1; 1925 return pRet; 1926 } 1927 1928 /* 1929 ** Launch a background thread to populate aFile[1] of pIncr. 1930 */ 1931 static int vdbeIncrBgPopulate(IncrMerger *pIncr){ 1932 void *p = (void*)pIncr; 1933 assert( pIncr->bUseThread ); 1934 return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p); 1935 } 1936 #endif 1937 1938 /* 1939 ** This function is called when the PmaReader corresponding to pIncr has 1940 ** finished reading the contents of aFile[0]. Its purpose is to "refill" 1941 ** aFile[0] such that the PmaReader should start rereading it from the 1942 ** beginning. 1943 ** 1944 ** For single-threaded objects, this is accomplished by literally reading 1945 ** keys from pIncr->pMerger and repopulating aFile[0]. 1946 ** 1947 ** For multi-threaded objects, all that is required is to wait until the 1948 ** background thread is finished (if it is not already) and then swap 1949 ** aFile[0] and aFile[1] in place. If the contents of pMerger have not 1950 ** been exhausted, this function also launches a new background thread 1951 ** to populate the new aFile[1]. 1952 ** 1953 ** SQLITE_OK is returned on success, or an SQLite error code otherwise. 1954 */ 1955 static int vdbeIncrSwap(IncrMerger *pIncr){ 1956 int rc = SQLITE_OK; 1957 1958 #if SQLITE_MAX_WORKER_THREADS>0 1959 if( pIncr->bUseThread ){ 1960 rc = vdbeSorterJoinThread(pIncr->pTask); 1961 1962 if( rc==SQLITE_OK ){ 1963 SorterFile f0 = pIncr->aFile[0]; 1964 pIncr->aFile[0] = pIncr->aFile[1]; 1965 pIncr->aFile[1] = f0; 1966 } 1967 1968 if( rc==SQLITE_OK ){ 1969 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ 1970 pIncr->bEof = 1; 1971 }else{ 1972 rc = vdbeIncrBgPopulate(pIncr); 1973 } 1974 } 1975 }else 1976 #endif 1977 { 1978 rc = vdbeIncrPopulate(pIncr); 1979 pIncr->aFile[0] = pIncr->aFile[1]; 1980 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ 1981 pIncr->bEof = 1; 1982 } 1983 } 1984 1985 return rc; 1986 } 1987 1988 /* 1989 ** Allocate and return a new IncrMerger object to read data from pMerger. 1990 ** 1991 ** If an OOM condition is encountered, return NULL. In this case free the 1992 ** pMerger argument before returning. 1993 */ 1994 static int vdbeIncrMergerNew( 1995 SortSubtask *pTask, /* The thread that will be using the new IncrMerger */ 1996 MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */ 1997 IncrMerger **ppOut /* Write the new IncrMerger here */ 1998 ){ 1999 int rc = SQLITE_OK; 2000 IncrMerger *pIncr = *ppOut = (IncrMerger*) 2001 (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr))); 2002 if( pIncr ){ 2003 pIncr->pMerger = pMerger; 2004 pIncr->pTask = pTask; 2005 pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2); 2006 pTask->file2.iEof += pIncr->mxSz; 2007 }else{ 2008 vdbeMergeEngineFree(pMerger); 2009 rc = SQLITE_NOMEM_BKPT; 2010 } 2011 assert( *ppOut!=0 || rc!=SQLITE_OK ); 2012 return rc; 2013 } 2014 2015 #if SQLITE_MAX_WORKER_THREADS>0 2016 /* 2017 ** Set the "use-threads" flag on object pIncr. 2018 */ 2019 static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){ 2020 pIncr->bUseThread = 1; 2021 pIncr->pTask->file2.iEof -= pIncr->mxSz; 2022 } 2023 #endif /* SQLITE_MAX_WORKER_THREADS>0 */ 2024 2025 2026 2027 /* 2028 ** Recompute pMerger->aTree[iOut] by comparing the next keys on the 2029 ** two PmaReaders that feed that entry. Neither of the PmaReaders 2030 ** are advanced. This routine merely does the comparison. 2031 */ 2032 static void vdbeMergeEngineCompare( 2033 MergeEngine *pMerger, /* Merge engine containing PmaReaders to compare */ 2034 int iOut /* Store the result in pMerger->aTree[iOut] */ 2035 ){ 2036 int i1; 2037 int i2; 2038 int iRes; 2039 PmaReader *p1; 2040 PmaReader *p2; 2041 2042 assert( iOut<pMerger->nTree && iOut>0 ); 2043 2044 if( iOut>=(pMerger->nTree/2) ){ 2045 i1 = (iOut - pMerger->nTree/2) * 2; 2046 i2 = i1 + 1; 2047 }else{ 2048 i1 = pMerger->aTree[iOut*2]; 2049 i2 = pMerger->aTree[iOut*2+1]; 2050 } 2051 2052 p1 = &pMerger->aReadr[i1]; 2053 p2 = &pMerger->aReadr[i2]; 2054 2055 if( p1->pFd==0 ){ 2056 iRes = i2; 2057 }else if( p2->pFd==0 ){ 2058 iRes = i1; 2059 }else{ 2060 SortSubtask *pTask = pMerger->pTask; 2061 int bCached = 0; 2062 int res; 2063 assert( pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */ 2064 res = pTask->xCompare( 2065 pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey 2066 ); 2067 if( res<=0 ){ 2068 iRes = i1; 2069 }else{ 2070 iRes = i2; 2071 } 2072 } 2073 2074 pMerger->aTree[iOut] = iRes; 2075 } 2076 2077 /* 2078 ** Allowed values for the eMode parameter to vdbeMergeEngineInit() 2079 ** and vdbePmaReaderIncrMergeInit(). 2080 ** 2081 ** Only INCRINIT_NORMAL is valid in single-threaded builds (when 2082 ** SQLITE_MAX_WORKER_THREADS==0). The other values are only used 2083 ** when there exists one or more separate worker threads. 2084 */ 2085 #define INCRINIT_NORMAL 0 2086 #define INCRINIT_TASK 1 2087 #define INCRINIT_ROOT 2 2088 2089 /* 2090 ** Forward reference required as the vdbeIncrMergeInit() and 2091 ** vdbePmaReaderIncrInit() routines are called mutually recursively when 2092 ** building a merge tree. 2093 */ 2094 static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode); 2095 2096 /* 2097 ** Initialize the MergeEngine object passed as the second argument. Once this 2098 ** function returns, the first key of merged data may be read from the 2099 ** MergeEngine object in the usual fashion. 2100 ** 2101 ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge 2102 ** objects attached to the PmaReader objects that the merger reads from have 2103 ** already been populated, but that they have not yet populated aFile[0] and 2104 ** set the PmaReader objects up to read from it. In this case all that is 2105 ** required is to call vdbePmaReaderNext() on each PmaReader to point it at 2106 ** its first key. 2107 ** 2108 ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use 2109 ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data 2110 ** to pMerger. 2111 ** 2112 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. 2113 */ 2114 static int vdbeMergeEngineInit( 2115 SortSubtask *pTask, /* Thread that will run pMerger */ 2116 MergeEngine *pMerger, /* MergeEngine to initialize */ 2117 int eMode /* One of the INCRINIT_XXX constants */ 2118 ){ 2119 int rc = SQLITE_OK; /* Return code */ 2120 int i; /* For looping over PmaReader objects */ 2121 int nTree; /* Number of subtrees to merge */ 2122 2123 /* Failure to allocate the merge would have been detected prior to 2124 ** invoking this routine */ 2125 assert( pMerger!=0 ); 2126 2127 /* eMode is always INCRINIT_NORMAL in single-threaded mode */ 2128 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL ); 2129 2130 /* Verify that the MergeEngine is assigned to a single thread */ 2131 assert( pMerger->pTask==0 ); 2132 pMerger->pTask = pTask; 2133 2134 nTree = pMerger->nTree; 2135 for(i=0; i<nTree; i++){ 2136 if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){ 2137 /* PmaReaders should be normally initialized in order, as if they are 2138 ** reading from the same temp file this makes for more linear file IO. 2139 ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is 2140 ** in use it will block the vdbePmaReaderNext() call while it uses 2141 ** the main thread to fill its buffer. So calling PmaReaderNext() 2142 ** on this PmaReader before any of the multi-threaded PmaReaders takes 2143 ** better advantage of multi-processor hardware. */ 2144 rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]); 2145 }else{ 2146 rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL); 2147 } 2148 if( rc!=SQLITE_OK ) return rc; 2149 } 2150 2151 for(i=pMerger->nTree-1; i>0; i--){ 2152 vdbeMergeEngineCompare(pMerger, i); 2153 } 2154 return pTask->pUnpacked->errCode; 2155 } 2156 2157 /* 2158 ** The PmaReader passed as the first argument is guaranteed to be an 2159 ** incremental-reader (pReadr->pIncr!=0). This function serves to open 2160 ** and/or initialize the temp file related fields of the IncrMerge 2161 ** object at (pReadr->pIncr). 2162 ** 2163 ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders 2164 ** in the sub-tree headed by pReadr are also initialized. Data is then 2165 ** loaded into the buffers belonging to pReadr and it is set to point to 2166 ** the first key in its range. 2167 ** 2168 ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed 2169 ** to be a multi-threaded PmaReader and this function is being called in a 2170 ** background thread. In this case all PmaReaders in the sub-tree are 2171 ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to 2172 ** pReadr is populated. However, pReadr itself is not set up to point 2173 ** to its first key. A call to vdbePmaReaderNext() is still required to do 2174 ** that. 2175 ** 2176 ** The reason this function does not call vdbePmaReaderNext() immediately 2177 ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has 2178 ** to block on thread (pTask->thread) before accessing aFile[1]. But, since 2179 ** this entire function is being run by thread (pTask->thread), that will 2180 ** lead to the current background thread attempting to join itself. 2181 ** 2182 ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed 2183 ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all 2184 ** child-trees have already been initialized using IncrInit(INCRINIT_TASK). 2185 ** In this case vdbePmaReaderNext() is called on all child PmaReaders and 2186 ** the current PmaReader set to point to the first key in its range. 2187 ** 2188 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. 2189 */ 2190 static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){ 2191 int rc = SQLITE_OK; 2192 IncrMerger *pIncr = pReadr->pIncr; 2193 SortSubtask *pTask = pIncr->pTask; 2194 sqlite3 *db = pTask->pSorter->db; 2195 2196 /* eMode is always INCRINIT_NORMAL in single-threaded mode */ 2197 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL ); 2198 2199 rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode); 2200 2201 /* Set up the required files for pIncr. A multi-theaded IncrMerge object 2202 ** requires two temp files to itself, whereas a single-threaded object 2203 ** only requires a region of pTask->file2. */ 2204 if( rc==SQLITE_OK ){ 2205 int mxSz = pIncr->mxSz; 2206 #if SQLITE_MAX_WORKER_THREADS>0 2207 if( pIncr->bUseThread ){ 2208 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd); 2209 if( rc==SQLITE_OK ){ 2210 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd); 2211 } 2212 }else 2213 #endif 2214 /*if( !pIncr->bUseThread )*/{ 2215 if( pTask->file2.pFd==0 ){ 2216 assert( pTask->file2.iEof>0 ); 2217 rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd); 2218 pTask->file2.iEof = 0; 2219 } 2220 if( rc==SQLITE_OK ){ 2221 pIncr->aFile[1].pFd = pTask->file2.pFd; 2222 pIncr->iStartOff = pTask->file2.iEof; 2223 pTask->file2.iEof += mxSz; 2224 } 2225 } 2226 } 2227 2228 #if SQLITE_MAX_WORKER_THREADS>0 2229 if( rc==SQLITE_OK && pIncr->bUseThread ){ 2230 /* Use the current thread to populate aFile[1], even though this 2231 ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object, 2232 ** then this function is already running in background thread 2233 ** pIncr->pTask->thread. 2234 ** 2235 ** If this is the INCRINIT_ROOT object, then it is running in the 2236 ** main VDBE thread. But that is Ok, as that thread cannot return 2237 ** control to the VDBE or proceed with anything useful until the 2238 ** first results are ready from this merger object anyway. 2239 */ 2240 assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK ); 2241 rc = vdbeIncrPopulate(pIncr); 2242 } 2243 #endif 2244 2245 if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){ 2246 rc = vdbePmaReaderNext(pReadr); 2247 } 2248 2249 return rc; 2250 } 2251 2252 #if SQLITE_MAX_WORKER_THREADS>0 2253 /* 2254 ** The main routine for vdbePmaReaderIncrMergeInit() operations run in 2255 ** background threads. 2256 */ 2257 static void *vdbePmaReaderBgIncrInit(void *pCtx){ 2258 PmaReader *pReader = (PmaReader*)pCtx; 2259 void *pRet = SQLITE_INT_TO_PTR( 2260 vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK) 2261 ); 2262 pReader->pIncr->pTask->bDone = 1; 2263 return pRet; 2264 } 2265 #endif 2266 2267 /* 2268 ** If the PmaReader passed as the first argument is not an incremental-reader 2269 ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes 2270 ** the vdbePmaReaderIncrMergeInit() function with the parameters passed to 2271 ** this routine to initialize the incremental merge. 2272 ** 2273 ** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1), 2274 ** then a background thread is launched to call vdbePmaReaderIncrMergeInit(). 2275 ** Or, if the IncrMerger is single threaded, the same function is called 2276 ** using the current thread. 2277 */ 2278 static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){ 2279 IncrMerger *pIncr = pReadr->pIncr; /* Incremental merger */ 2280 int rc = SQLITE_OK; /* Return code */ 2281 if( pIncr ){ 2282 #if SQLITE_MAX_WORKER_THREADS>0 2283 assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK ); 2284 if( pIncr->bUseThread ){ 2285 void *pCtx = (void*)pReadr; 2286 rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx); 2287 }else 2288 #endif 2289 { 2290 rc = vdbePmaReaderIncrMergeInit(pReadr, eMode); 2291 } 2292 } 2293 return rc; 2294 } 2295 2296 /* 2297 ** Allocate a new MergeEngine object to merge the contents of nPMA level-0 2298 ** PMAs from pTask->file. If no error occurs, set *ppOut to point to 2299 ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut 2300 ** to NULL and return an SQLite error code. 2301 ** 2302 ** When this function is called, *piOffset is set to the offset of the 2303 ** first PMA to read from pTask->file. Assuming no error occurs, it is 2304 ** set to the offset immediately following the last byte of the last 2305 ** PMA before returning. If an error does occur, then the final value of 2306 ** *piOffset is undefined. 2307 */ 2308 static int vdbeMergeEngineLevel0( 2309 SortSubtask *pTask, /* Sorter task to read from */ 2310 int nPMA, /* Number of PMAs to read */ 2311 i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */ 2312 MergeEngine **ppOut /* OUT: New merge-engine */ 2313 ){ 2314 MergeEngine *pNew; /* Merge engine to return */ 2315 i64 iOff = *piOffset; 2316 int i; 2317 int rc = SQLITE_OK; 2318 2319 *ppOut = pNew = vdbeMergeEngineNew(nPMA); 2320 if( pNew==0 ) rc = SQLITE_NOMEM_BKPT; 2321 2322 for(i=0; i<nPMA && rc==SQLITE_OK; i++){ 2323 i64 nDummy = 0; 2324 PmaReader *pReadr = &pNew->aReadr[i]; 2325 rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy); 2326 iOff = pReadr->iEof; 2327 } 2328 2329 if( rc!=SQLITE_OK ){ 2330 vdbeMergeEngineFree(pNew); 2331 *ppOut = 0; 2332 } 2333 *piOffset = iOff; 2334 return rc; 2335 } 2336 2337 /* 2338 ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of 2339 ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes. 2340 ** 2341 ** i.e. 2342 ** 2343 ** nPMA<=16 -> TreeDepth() == 0 2344 ** nPMA<=256 -> TreeDepth() == 1 2345 ** nPMA<=65536 -> TreeDepth() == 2 2346 */ 2347 static int vdbeSorterTreeDepth(int nPMA){ 2348 int nDepth = 0; 2349 i64 nDiv = SORTER_MAX_MERGE_COUNT; 2350 while( nDiv < (i64)nPMA ){ 2351 nDiv = nDiv * SORTER_MAX_MERGE_COUNT; 2352 nDepth++; 2353 } 2354 return nDepth; 2355 } 2356 2357 /* 2358 ** pRoot is the root of an incremental merge-tree with depth nDepth (according 2359 ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the 2360 ** tree, counting from zero. This function adds pLeaf to the tree. 2361 ** 2362 ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error 2363 ** code is returned and pLeaf is freed. 2364 */ 2365 static int vdbeSorterAddToTree( 2366 SortSubtask *pTask, /* Task context */ 2367 int nDepth, /* Depth of tree according to TreeDepth() */ 2368 int iSeq, /* Sequence number of leaf within tree */ 2369 MergeEngine *pRoot, /* Root of tree */ 2370 MergeEngine *pLeaf /* Leaf to add to tree */ 2371 ){ 2372 int rc = SQLITE_OK; 2373 int nDiv = 1; 2374 int i; 2375 MergeEngine *p = pRoot; 2376 IncrMerger *pIncr; 2377 2378 rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr); 2379 2380 for(i=1; i<nDepth; i++){ 2381 nDiv = nDiv * SORTER_MAX_MERGE_COUNT; 2382 } 2383 2384 for(i=1; i<nDepth && rc==SQLITE_OK; i++){ 2385 int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT; 2386 PmaReader *pReadr = &p->aReadr[iIter]; 2387 2388 if( pReadr->pIncr==0 ){ 2389 MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); 2390 if( pNew==0 ){ 2391 rc = SQLITE_NOMEM_BKPT; 2392 }else{ 2393 rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr); 2394 } 2395 } 2396 if( rc==SQLITE_OK ){ 2397 p = pReadr->pIncr->pMerger; 2398 nDiv = nDiv / SORTER_MAX_MERGE_COUNT; 2399 } 2400 } 2401 2402 if( rc==SQLITE_OK ){ 2403 p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr; 2404 }else{ 2405 vdbeIncrFree(pIncr); 2406 } 2407 return rc; 2408 } 2409 2410 /* 2411 ** This function is called as part of a SorterRewind() operation on a sorter 2412 ** that has already written two or more level-0 PMAs to one or more temp 2413 ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that 2414 ** can be used to incrementally merge all PMAs on disk. 2415 ** 2416 ** If successful, SQLITE_OK is returned and *ppOut set to point to the 2417 ** MergeEngine object at the root of the tree before returning. Or, if an 2418 ** error occurs, an SQLite error code is returned and the final value 2419 ** of *ppOut is undefined. 2420 */ 2421 static int vdbeSorterMergeTreeBuild( 2422 VdbeSorter *pSorter, /* The VDBE cursor that implements the sort */ 2423 MergeEngine **ppOut /* Write the MergeEngine here */ 2424 ){ 2425 MergeEngine *pMain = 0; 2426 int rc = SQLITE_OK; 2427 int iTask; 2428 2429 #if SQLITE_MAX_WORKER_THREADS>0 2430 /* If the sorter uses more than one task, then create the top-level 2431 ** MergeEngine here. This MergeEngine will read data from exactly 2432 ** one PmaReader per sub-task. */ 2433 assert( pSorter->bUseThreads || pSorter->nTask==1 ); 2434 if( pSorter->nTask>1 ){ 2435 pMain = vdbeMergeEngineNew(pSorter->nTask); 2436 if( pMain==0 ) rc = SQLITE_NOMEM_BKPT; 2437 } 2438 #endif 2439 2440 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){ 2441 SortSubtask *pTask = &pSorter->aTask[iTask]; 2442 assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 ); 2443 if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){ 2444 MergeEngine *pRoot = 0; /* Root node of tree for this task */ 2445 int nDepth = vdbeSorterTreeDepth(pTask->nPMA); 2446 i64 iReadOff = 0; 2447 2448 if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){ 2449 rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot); 2450 }else{ 2451 int i; 2452 int iSeq = 0; 2453 pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); 2454 if( pRoot==0 ) rc = SQLITE_NOMEM_BKPT; 2455 for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){ 2456 MergeEngine *pMerger = 0; /* New level-0 PMA merger */ 2457 int nReader; /* Number of level-0 PMAs to merge */ 2458 2459 nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT); 2460 rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger); 2461 if( rc==SQLITE_OK ){ 2462 rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger); 2463 } 2464 } 2465 } 2466 2467 if( rc==SQLITE_OK ){ 2468 #if SQLITE_MAX_WORKER_THREADS>0 2469 if( pMain!=0 ){ 2470 rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr); 2471 }else 2472 #endif 2473 { 2474 assert( pMain==0 ); 2475 pMain = pRoot; 2476 } 2477 }else{ 2478 vdbeMergeEngineFree(pRoot); 2479 } 2480 } 2481 } 2482 2483 if( rc!=SQLITE_OK ){ 2484 vdbeMergeEngineFree(pMain); 2485 pMain = 0; 2486 } 2487 *ppOut = pMain; 2488 return rc; 2489 } 2490 2491 /* 2492 ** This function is called as part of an sqlite3VdbeSorterRewind() operation 2493 ** on a sorter that has written two or more PMAs to temporary files. It sets 2494 ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader 2495 ** (for multi-threaded sorters) so that it can be used to iterate through 2496 ** all records stored in the sorter. 2497 ** 2498 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. 2499 */ 2500 static int vdbeSorterSetupMerge(VdbeSorter *pSorter){ 2501 int rc; /* Return code */ 2502 SortSubtask *pTask0 = &pSorter->aTask[0]; 2503 MergeEngine *pMain = 0; 2504 #if SQLITE_MAX_WORKER_THREADS 2505 sqlite3 *db = pTask0->pSorter->db; 2506 int i; 2507 SorterCompare xCompare = vdbeSorterGetCompare(pSorter); 2508 for(i=0; i<pSorter->nTask; i++){ 2509 pSorter->aTask[i].xCompare = xCompare; 2510 } 2511 #endif 2512 2513 rc = vdbeSorterMergeTreeBuild(pSorter, &pMain); 2514 if( rc==SQLITE_OK ){ 2515 #if SQLITE_MAX_WORKER_THREADS 2516 assert( pSorter->bUseThreads==0 || pSorter->nTask>1 ); 2517 if( pSorter->bUseThreads ){ 2518 int iTask; 2519 PmaReader *pReadr = 0; 2520 SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1]; 2521 rc = vdbeSortAllocUnpacked(pLast); 2522 if( rc==SQLITE_OK ){ 2523 pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader)); 2524 pSorter->pReader = pReadr; 2525 if( pReadr==0 ) rc = SQLITE_NOMEM_BKPT; 2526 } 2527 if( rc==SQLITE_OK ){ 2528 rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr); 2529 if( rc==SQLITE_OK ){ 2530 vdbeIncrMergerSetThreads(pReadr->pIncr); 2531 for(iTask=0; iTask<(pSorter->nTask-1); iTask++){ 2532 IncrMerger *pIncr; 2533 if( (pIncr = pMain->aReadr[iTask].pIncr) ){ 2534 vdbeIncrMergerSetThreads(pIncr); 2535 assert( pIncr->pTask!=pLast ); 2536 } 2537 } 2538 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){ 2539 /* Check that: 2540 ** 2541 ** a) The incremental merge object is configured to use the 2542 ** right task, and 2543 ** b) If it is using task (nTask-1), it is configured to run 2544 ** in single-threaded mode. This is important, as the 2545 ** root merge (INCRINIT_ROOT) will be using the same task 2546 ** object. 2547 */ 2548 PmaReader *p = &pMain->aReadr[iTask]; 2549 assert( p->pIncr==0 || ( 2550 (p->pIncr->pTask==&pSorter->aTask[iTask]) /* a */ 2551 && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0) /* b */ 2552 )); 2553 rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK); 2554 } 2555 } 2556 pMain = 0; 2557 } 2558 if( rc==SQLITE_OK ){ 2559 rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT); 2560 } 2561 }else 2562 #endif 2563 { 2564 rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL); 2565 pSorter->pMerger = pMain; 2566 pMain = 0; 2567 } 2568 } 2569 2570 if( rc!=SQLITE_OK ){ 2571 vdbeMergeEngineFree(pMain); 2572 } 2573 return rc; 2574 } 2575 2576 2577 /* 2578 ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite, 2579 ** this function is called to prepare for iterating through the records 2580 ** in sorted order. 2581 */ 2582 int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){ 2583 VdbeSorter *pSorter; 2584 int rc = SQLITE_OK; /* Return code */ 2585 2586 assert( pCsr->eCurType==CURTYPE_SORTER ); 2587 pSorter = pCsr->uc.pSorter; 2588 assert( pSorter ); 2589 2590 /* If no data has been written to disk, then do not do so now. Instead, 2591 ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly 2592 ** from the in-memory list. */ 2593 if( pSorter->bUsePMA==0 ){ 2594 if( pSorter->list.pList ){ 2595 *pbEof = 0; 2596 rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list); 2597 }else{ 2598 *pbEof = 1; 2599 } 2600 return rc; 2601 } 2602 2603 /* Write the current in-memory list to a PMA. When the VdbeSorterWrite() 2604 ** function flushes the contents of memory to disk, it immediately always 2605 ** creates a new list consisting of a single key immediately afterwards. 2606 ** So the list is never empty at this point. */ 2607 assert( pSorter->list.pList ); 2608 rc = vdbeSorterFlushPMA(pSorter); 2609 2610 /* Join all threads */ 2611 rc = vdbeSorterJoinAll(pSorter, rc); 2612 2613 vdbeSorterRewindDebug("rewind"); 2614 2615 /* Assuming no errors have occurred, set up a merger structure to 2616 ** incrementally read and merge all remaining PMAs. */ 2617 assert( pSorter->pReader==0 ); 2618 if( rc==SQLITE_OK ){ 2619 rc = vdbeSorterSetupMerge(pSorter); 2620 *pbEof = 0; 2621 } 2622 2623 vdbeSorterRewindDebug("rewinddone"); 2624 return rc; 2625 } 2626 2627 /* 2628 ** Advance to the next element in the sorter. Return value: 2629 ** 2630 ** SQLITE_OK success 2631 ** SQLITE_DONE end of data 2632 ** otherwise some kind of error. 2633 */ 2634 int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr){ 2635 VdbeSorter *pSorter; 2636 int rc; /* Return code */ 2637 2638 assert( pCsr->eCurType==CURTYPE_SORTER ); 2639 pSorter = pCsr->uc.pSorter; 2640 assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) ); 2641 if( pSorter->bUsePMA ){ 2642 assert( pSorter->pReader==0 || pSorter->pMerger==0 ); 2643 assert( pSorter->bUseThreads==0 || pSorter->pReader ); 2644 assert( pSorter->bUseThreads==1 || pSorter->pMerger ); 2645 #if SQLITE_MAX_WORKER_THREADS>0 2646 if( pSorter->bUseThreads ){ 2647 rc = vdbePmaReaderNext(pSorter->pReader); 2648 if( rc==SQLITE_OK && pSorter->pReader->pFd==0 ) rc = SQLITE_DONE; 2649 }else 2650 #endif 2651 /*if( !pSorter->bUseThreads )*/ { 2652 int res = 0; 2653 assert( pSorter->pMerger!=0 ); 2654 assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) ); 2655 rc = vdbeMergeEngineStep(pSorter->pMerger, &res); 2656 if( rc==SQLITE_OK && res ) rc = SQLITE_DONE; 2657 } 2658 }else{ 2659 SorterRecord *pFree = pSorter->list.pList; 2660 pSorter->list.pList = pFree->u.pNext; 2661 pFree->u.pNext = 0; 2662 if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree); 2663 rc = pSorter->list.pList ? SQLITE_OK : SQLITE_DONE; 2664 } 2665 return rc; 2666 } 2667 2668 /* 2669 ** Return a pointer to a buffer owned by the sorter that contains the 2670 ** current key. 2671 */ 2672 static void *vdbeSorterRowkey( 2673 const VdbeSorter *pSorter, /* Sorter object */ 2674 int *pnKey /* OUT: Size of current key in bytes */ 2675 ){ 2676 void *pKey; 2677 if( pSorter->bUsePMA ){ 2678 PmaReader *pReader; 2679 #if SQLITE_MAX_WORKER_THREADS>0 2680 if( pSorter->bUseThreads ){ 2681 pReader = pSorter->pReader; 2682 }else 2683 #endif 2684 /*if( !pSorter->bUseThreads )*/{ 2685 pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]]; 2686 } 2687 *pnKey = pReader->nKey; 2688 pKey = pReader->aKey; 2689 }else{ 2690 *pnKey = pSorter->list.pList->nVal; 2691 pKey = SRVAL(pSorter->list.pList); 2692 } 2693 return pKey; 2694 } 2695 2696 /* 2697 ** Copy the current sorter key into the memory cell pOut. 2698 */ 2699 int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){ 2700 VdbeSorter *pSorter; 2701 void *pKey; int nKey; /* Sorter key to copy into pOut */ 2702 2703 assert( pCsr->eCurType==CURTYPE_SORTER ); 2704 pSorter = pCsr->uc.pSorter; 2705 pKey = vdbeSorterRowkey(pSorter, &nKey); 2706 if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){ 2707 return SQLITE_NOMEM_BKPT; 2708 } 2709 pOut->n = nKey; 2710 MemSetTypeFlag(pOut, MEM_Blob); 2711 memcpy(pOut->z, pKey, nKey); 2712 2713 return SQLITE_OK; 2714 } 2715 2716 /* 2717 ** Compare the key in memory cell pVal with the key that the sorter cursor 2718 ** passed as the first argument currently points to. For the purposes of 2719 ** the comparison, ignore the rowid field at the end of each record. 2720 ** 2721 ** If the sorter cursor key contains any NULL values, consider it to be 2722 ** less than pVal. Even if pVal also contains NULL values. 2723 ** 2724 ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM). 2725 ** Otherwise, set *pRes to a negative, zero or positive value if the 2726 ** key in pVal is smaller than, equal to or larger than the current sorter 2727 ** key. 2728 ** 2729 ** This routine forms the core of the OP_SorterCompare opcode, which in 2730 ** turn is used to verify uniqueness when constructing a UNIQUE INDEX. 2731 */ 2732 int sqlite3VdbeSorterCompare( 2733 const VdbeCursor *pCsr, /* Sorter cursor */ 2734 Mem *pVal, /* Value to compare to current sorter key */ 2735 int nKeyCol, /* Compare this many columns */ 2736 int *pRes /* OUT: Result of comparison */ 2737 ){ 2738 VdbeSorter *pSorter; 2739 UnpackedRecord *r2; 2740 KeyInfo *pKeyInfo; 2741 int i; 2742 void *pKey; int nKey; /* Sorter key to compare pVal with */ 2743 2744 assert( pCsr->eCurType==CURTYPE_SORTER ); 2745 pSorter = pCsr->uc.pSorter; 2746 r2 = pSorter->pUnpacked; 2747 pKeyInfo = pCsr->pKeyInfo; 2748 if( r2==0 ){ 2749 r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo); 2750 if( r2==0 ) return SQLITE_NOMEM_BKPT; 2751 r2->nField = nKeyCol; 2752 } 2753 assert( r2->nField==nKeyCol ); 2754 2755 pKey = vdbeSorterRowkey(pSorter, &nKey); 2756 sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2); 2757 for(i=0; i<nKeyCol; i++){ 2758 if( r2->aMem[i].flags & MEM_Null ){ 2759 *pRes = -1; 2760 return SQLITE_OK; 2761 } 2762 } 2763 2764 *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2); 2765 return SQLITE_OK; 2766 } 2767