xref: /sqlite-3.40.0/src/vdbesort.c (revision a959bf53)
1 /*
2 ** 2011-07-09
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 ** This file contains code for the VdbeSorter object, used in concert with
13 ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
14 ** or by SELECT statements with ORDER BY clauses that cannot be satisfied
15 ** using indexes and without LIMIT clauses.
16 **
17 ** The VdbeSorter object implements a multi-threaded external merge sort
18 ** algorithm that is efficient even if the number of elements being sorted
19 ** exceeds the available memory.
20 **
21 ** Here is the (internal, non-API) interface between this module and the
22 ** rest of the SQLite system:
23 **
24 **    sqlite3VdbeSorterInit()       Create a new VdbeSorter object.
25 **
26 **    sqlite3VdbeSorterWrite()      Add a single new row to the VdbeSorter
27 **                                  object.  The row is a binary blob in the
28 **                                  OP_MakeRecord format that contains both
29 **                                  the ORDER BY key columns and result columns
30 **                                  in the case of a SELECT w/ ORDER BY, or
31 **                                  the complete record for an index entry
32 **                                  in the case of a CREATE INDEX.
33 **
34 **    sqlite3VdbeSorterRewind()     Sort all content previously added.
35 **                                  Position the read cursor on the
36 **                                  first sorted element.
37 **
38 **    sqlite3VdbeSorterNext()       Advance the read cursor to the next sorted
39 **                                  element.
40 **
41 **    sqlite3VdbeSorterRowkey()     Return the complete binary blob for the
42 **                                  row currently under the read cursor.
43 **
44 **    sqlite3VdbeSorterCompare()    Compare the binary blob for the row
45 **                                  currently under the read cursor against
46 **                                  another binary blob X and report if
47 **                                  X is strictly less than the read cursor.
48 **                                  Used to enforce uniqueness in a
49 **                                  CREATE UNIQUE INDEX statement.
50 **
51 **    sqlite3VdbeSorterClose()      Close the VdbeSorter object and reclaim
52 **                                  all resources.
53 **
54 **    sqlite3VdbeSorterReset()      Refurbish the VdbeSorter for reuse.  This
55 **                                  is like Close() followed by Init() only
56 **                                  much faster.
57 **
58 ** The interfaces above must be called in a particular order.  Write() can
59 ** only occur in between Init()/Reset() and Rewind().  Next(), Rowkey(), and
60 ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
61 **
62 **   Init()
63 **   for each record: Write()
64 **   Rewind()
65 **     Rowkey()/Compare()
66 **   Next()
67 **   Close()
68 **
69 ** Algorithm:
70 **
71 ** Records passed to the sorter via calls to Write() are initially held
72 ** unsorted in main memory. Assuming the amount of memory used never exceeds
73 ** a threshold, when Rewind() is called the set of records is sorted using
74 ** an in-memory merge sort. In this case, no temporary files are required
75 ** and subsequent calls to Rowkey(), Next() and Compare() read records
76 ** directly from main memory.
77 **
78 ** If the amount of space used to store records in main memory exceeds the
79 ** threshold, then the set of records currently in memory are sorted and
80 ** written to a temporary file in "Packed Memory Array" (PMA) format.
81 ** A PMA created at this point is known as a "level-0 PMA". Higher levels
82 ** of PMAs may be created by merging existing PMAs together - for example
83 ** merging two or more level-0 PMAs together creates a level-1 PMA.
84 **
85 ** The threshold for the amount of main memory to use before flushing
86 ** records to a PMA is roughly the same as the limit configured for the
87 ** page-cache of the main database. Specifically, the threshold is set to
88 ** the value returned by "PRAGMA main.page_size" multiplied by
89 ** that returned by "PRAGMA main.cache_size", in bytes.
90 **
91 ** If the sorter is running in single-threaded mode, then all PMAs generated
92 ** are appended to a single temporary file. Or, if the sorter is running in
93 ** multi-threaded mode then up to (N+1) temporary files may be opened, where
94 ** N is the configured number of worker threads. In this case, instead of
95 ** sorting the records and writing the PMA to a temporary file itself, the
96 ** calling thread usually launches a worker thread to do so. Except, if
97 ** there are already N worker threads running, the main thread does the work
98 ** itself.
99 **
100 ** The sorter is running in multi-threaded mode if (a) the library was built
101 ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
102 ** than zero, and (b) worker threads have been enabled at runtime by calling
103 ** "PRAGMA threads=N" with some value of N greater than 0.
104 **
105 ** When Rewind() is called, any data remaining in memory is flushed to a
106 ** final PMA. So at this point the data is stored in some number of sorted
107 ** PMAs within temporary files on disk.
108 **
109 ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
110 ** sorter is running in single-threaded mode, then these PMAs are merged
111 ** incrementally as keys are retrieved from the sorter by the VDBE.  The
112 ** MergeEngine object, described in further detail below, performs this
113 ** merge.
114 **
115 ** Or, if running in multi-threaded mode, then a background thread is
116 ** launched to merge the existing PMAs. Once the background thread has
117 ** merged T bytes of data into a single sorted PMA, the main thread
118 ** begins reading keys from that PMA while the background thread proceeds
119 ** with merging the next T bytes of data. And so on.
120 **
121 ** Parameter T is set to half the value of the memory threshold used
122 ** by Write() above to determine when to create a new PMA.
123 **
124 ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when
125 ** Rewind() is called, then a hierarchy of incremental-merges is used.
126 ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on
127 ** disk are merged together. Then T bytes of data from the second set, and
128 ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
129 ** PMAs at a time. This is done to improve locality.
130 **
131 ** If running in multi-threaded mode and there are more than
132 ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
133 ** than one background thread may be created. Specifically, there may be
134 ** one background thread for each temporary file on disk, and one background
135 ** thread to merge the output of each of the others to a single PMA for
136 ** the main thread to read from.
137 */
138 #include "sqliteInt.h"
139 #include "vdbeInt.h"
140 
141 /*
142 ** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
143 ** messages to stderr that may be helpful in understanding the performance
144 ** characteristics of the sorter in multi-threaded mode.
145 */
146 #if 0
147 # define SQLITE_DEBUG_SORTER_THREADS 1
148 #endif
149 
150 /*
151 ** Hard-coded maximum amount of data to accumulate in memory before flushing
152 ** to a level 0 PMA. The purpose of this limit is to prevent various integer
153 ** overflows. 512MiB.
154 */
155 #define SQLITE_MAX_PMASZ    (1<<29)   /* 2^29 bytes == 512MiB */
156 
157 /*
158 ** Private objects used by the sorter
159 */
160 typedef struct MergeEngine MergeEngine;     /* Merges two or more PMAs */
161 typedef struct PmaReader PmaReader;         /* Reads one PMA, a key at a time */
162 typedef struct PmaWriter PmaWriter;         /* Buffered writer for one PMA */
163 typedef struct SorterRecord SorterRecord;   /* Header of an in-memory record */
164 typedef struct SortSubtask SortSubtask;     /* Per-thread state of the sort */
165 typedef struct SorterFile SorterFile;       /* Temp file handle + data size */
166 typedef struct SorterList SorterList;       /* In-memory list of records */
167 typedef struct IncrMerger IncrMerger;       /* Incremental merge of many PMAs */
168 
169 /*
170 ** A container for a temp file handle and the current amount of data
171 ** stored in the file.
172 */
173 struct SorterFile {
174   sqlite3_file *pFd;              /* Handle of the temp file */
175   i64 iEof;                       /* Bytes of data stored in pFd. Copied to
                                  ** PmaReader.iEof by vdbePmaReaderSeek() */
176 };
177 
178 /*
179 ** An in-memory list of objects to be sorted.
180 **
181 ** If aMemory==0 then each object is allocated separately and the objects
182 ** are connected using SorterRecord.u.pNext.  If aMemory!=0 then all objects
183 ** are stored in the aMemory[] bulk memory, one right after the other, and
184 ** are connected using SorterRecord.u.iNext.
185 */
186 struct SorterList {
187   SorterRecord *pList;            /* First record of the linked list */
188   u8 *aMemory;                    /* If non-NULL, bulk memory holding all of
                                  ** the records in pList back to back */
189   int szPMA;                      /* Size of pList as PMA in bytes */
190 };
191 
192 /*
193 ** The MergeEngine object is used to combine two or more smaller PMAs into
194 ** one big PMA using a merge operation.  Separate PMAs all need to be
195 ** combined into one big PMA in order to be able to step through the sorted
196 ** records in order.
197 **
198 ** The aReadr[] array contains a PmaReader object for each of the PMAs being
199 ** merged.  An aReadr[] object either points to a valid key or else is at EOF.
200 ** ("EOF" means "End Of File".  When aReadr[] is at EOF there is no more data.)
201 ** For the purposes of the paragraphs below, we assume that the array is
202 ** actually N elements in size, where N is the smallest power of 2 greater
203 ** than or equal to the number of PMAs being merged. The extra aReadr[] elements
204 ** are treated as if they are empty (always at EOF).
205 **
206 ** The aTree[] array is also N elements in size. The value of N is stored in
207 ** the MergeEngine.nTree variable.
208 **
209 ** The final (N/2) elements of aTree[] contain the results of comparing
210 ** pairs of PMA keys together. Element i contains the result of
211 ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the
212 ** aTree element is set to the index of it.
213 **
214 ** For the purposes of this comparison, EOF is considered greater than any
215 ** other key value. If the keys are equal (only possible with two EOF
216 ** values), it doesn't matter which index is stored.
217 **
218 ** The (N/4) elements of aTree[] that precede the final (N/2) described
219 ** above contain the index of the smallest of each block of 4 PmaReaders.
220 ** And so on. So that aTree[1] contains the index of the PmaReader that
221 ** currently points to the smallest key value. aTree[0] is unused.
222 **
223 ** Example:
224 **
225 **     aReadr[0] -> Banana
226 **     aReadr[1] -> Feijoa
227 **     aReadr[2] -> Elderberry
228 **     aReadr[3] -> Currant
229 **     aReadr[4] -> Grapefruit
230 **     aReadr[5] -> Apple
231 **     aReadr[6] -> Durian
232 **     aReadr[7] -> EOF
233 **
234 **     aTree[] = { X, 5   0, 5    0, 3, 5, 6 }
235 **
236 ** The current element is "Apple" (the value of the key indicated by
237 ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will
238 ** be advanced to the next key in its segment. Say the next key is
239 ** "Eggplant":
240 **
241 **     aReadr[5] -> Eggplant
242 **
243 ** The contents of aTree[] are updated first by comparing the new PmaReader
244 ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader
245 ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree.
246 ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader
247 ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian),
248 ** so the value written into element 1 of the array is 0. As follows:
249 **
250 **     aTree[] = { X, 0   0, 6    0, 3, 5, 6 }
251 **
252 ** In other words, each time we advance to the next sorter element, log2(N)
253 ** key comparison operations are required, where N is the number of segments
254 ** being merged (rounded up to the next power of 2).
255 */
256 struct MergeEngine {
257   int nTree;                 /* Used size of aTree/aReadr (power of 2) */
258   SortSubtask *pTask;        /* Used by this thread only */
259   int *aTree;                /* Merge state. aTree[1] is the index of the
                             ** PmaReader with the smallest key; aTree[0]
                             ** is unused */
260   PmaReader *aReadr;         /* Array of PmaReaders to merge data from */
261 };
262 
263 /*
264 ** This object represents a single thread of control in a sort operation.
265 ** Exactly VdbeSorter.nTask instances of this object are allocated
266 ** as part of each VdbeSorter object. Instances are never allocated any
267 ** other way. VdbeSorter.nTask is set to the number of worker threads allowed
268 ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).  Thus for
269 ** single-threaded operation, there is exactly one instance of this object
270 ** and for multi-threaded operation there are two or more instances.
271 **
272 ** Essentially, this structure contains all those fields of the VdbeSorter
273 ** structure for which each thread requires a separate instance. For example,
274 ** each thread requires its own UnpackedRecord object to unpack records in
275 ** as part of comparison operations.
276 **
277 ** Before a background thread is launched, variable bDone is set to 0. Then,
278 ** right before it exits, the thread itself sets bDone to 1. This is used for
279 ** two purposes:
280 **
281 **   1. When flushing the contents of memory to a level-0 PMA on disk, to
282 **      attempt to select a SortSubtask for which there is not already an
283 **      active background thread (since doing so causes the main thread
284 **      to block until it finishes).
285 **
286 **   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
287 **      to sqlite3ThreadJoin() is likely to block. Cases that are likely to
288 **      block provoke debugging output.
289 **
290 ** In both cases, the effects of the main thread seeing (bDone==0) even
291 ** after the thread has finished are not dire. So we don't worry about
292 ** memory barriers and such here.
293 */
294 typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int);
295 struct SortSubtask {
296   SQLiteThread *pThread;          /* Background thread, if any */
297   int bDone;                      /* Cleared before a thread is launched; set
                                  ** to 1 by the thread right before it exits */
298   VdbeSorter *pSorter;            /* Sorter that owns this sub-task */
299   UnpackedRecord *pUnpacked;      /* Space to unpack a record */
300   SorterList list;                /* List for thread to write to a PMA */
301   int nPMA;                       /* Number of PMAs currently in file */
302   SorterCompare xCompare;         /* Compare function to use */
303   SorterFile file;                /* Temp file for level-0 PMAs */
304   SorterFile file2;               /* Space for other PMAs. Single-threaded
                                  ** IncrMergers own a region of this file */
305 };
306 
307 
308 /*
309 ** Main sorter structure. A single instance of this is allocated for each
310 ** sorter cursor created by the VDBE.
311 **
312 ** mxKeysize:
313 **   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
314 **   this variable is updated so as to be set to the size on disk of the
315 **   largest record in the sorter.
316 */
317 struct VdbeSorter {
318   int mnPmaSize;                  /* Minimum PMA size, in bytes */
319   int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
320   int mxKeysize;                  /* Largest serialized key seen so far */
321   int pgsz;                       /* Main database page size */
322   PmaReader *pReader;             /* Read data from here after Rewind() */
323   MergeEngine *pMerger;           /* Or here, if bUseThreads==0 */
324   sqlite3 *db;                    /* Database connection */
325   KeyInfo *pKeyInfo;              /* How to compare records */
326   UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
327   SorterList list;                /* List of in-memory records */
328   int iMemory;                    /* Offset of free space in list.aMemory */
329   int nMemory;                    /* Size of list.aMemory allocation in bytes */
330   u8 bUsePMA;                     /* True if one or more PMAs created */
331   u8 bUseThreads;                 /* True to use background threads */
332   u8 iPrev;                       /* Previous thread used to flush PMA */
333   u8 nTask;                       /* Size of aTask[] array */
334   u8 typeMask;                    /* NOTE(review): presumably a mask of the
                                  ** SORTER_TYPE_xxx bits - confirm at users */
335   SortSubtask aTask[1];           /* One or more subtasks */
336 };
337 
338 #define SORTER_TYPE_INTEGER 0x01  /* NOTE(review): looks like a bit flag for */
339 #define SORTER_TYPE_TEXT    0x02  /* VdbeSorter.typeMask - confirm at users  */
340 
341 /*
342 ** An instance of the following object is used to read records out of a
343 ** PMA, in sorted order.  The next key to be read is cached in nKey/aKey.
344 ** aKey might point into aMap or into aBuffer.  If neither of those locations
345 ** contain a contiguous representation of the key, then aAlloc is allocated
346 ** and the key is copied into aAlloc and aKey is made to point to aAlloc.
347 **
348 ** pFd==0 at EOF.
349 */
350 struct PmaReader {
351   i64 iReadOff;               /* Current read offset */
352   i64 iEof;                   /* 1 byte past EOF for this PmaReader */
353   int nAlloc;                 /* Bytes of space at aAlloc */
354   int nKey;                   /* Number of bytes in key */
355   sqlite3_file *pFd;          /* File handle we are reading from. 0 at EOF */
356   u8 *aAlloc;                 /* Space for aKey if aBuffer and aMap won't work */
357   u8 *aKey;                   /* Pointer to current key */
358   u8 *aBuffer;                /* Current read buffer */
359   int nBuffer;                /* Size of read buffer in bytes */
360   u8 *aMap;                   /* Pointer to mapping of entire file, or 0 */
361   IncrMerger *pIncr;          /* Incremental merger, or 0 for a plain PMA */
362 };
363 
364 /*
365 ** Normally, a PmaReader object iterates through an existing PMA stored
366 ** within a temp file. However, if the PmaReader.pIncr variable points to
367 ** an object of the following type, it may be used to iterate/merge through
368 ** multiple PMAs simultaneously.
369 **
370 ** There are two types of IncrMerger object - single (bUseThread==0) and
371 ** multi-threaded (bUseThread==1).
372 **
373 ** A multi-threaded IncrMerger object uses two temporary files - aFile[0]
374 ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in
375 ** size. When the IncrMerger is initialized, it reads enough data from
376 ** pMerger to populate aFile[0]. It then sets variables within the
377 ** corresponding PmaReader object to read from that file and kicks off
378 ** a background thread to populate aFile[1] with the next mxSz bytes of
379 ** sorted record data from pMerger.
380 **
381 ** When the PmaReader reaches the end of aFile[0], it blocks until the
382 ** background thread has finished populating aFile[1]. It then exchanges
383 ** the contents of the aFile[0] and aFile[1] variables within this structure,
384 ** sets the PmaReader fields to read from the new aFile[0] and kicks off
385 ** another background thread to populate the new aFile[1]. And so on, until
386 ** the contents of pMerger are exhausted.
387 **
388 ** A single-threaded IncrMerger does not open any temporary files of its
389 ** own. Instead, it has exclusive access to mxSz bytes of space beginning
390 ** at offset iStartOff of file pTask->file2. And instead of using a
391 ** background thread to prepare data for the PmaReader, with a single
392 ** threaded IncrMerger the allocated part of pTask->file2 is "refilled" with
393 ** keys from pMerger by the calling thread whenever the PmaReader runs out
394 ** of data.
395 */
396 struct IncrMerger {
397   SortSubtask *pTask;             /* Task that owns this merger */
398   MergeEngine *pMerger;           /* Merge engine thread reads data from */
399   i64 iStartOff;                  /* Offset to start writing file at. Also the
                                  ** offset vdbePmaReaderNext() seeks to after
                                  ** a successful vdbeIncrSwap() */
400   int mxSz;                       /* Maximum bytes of data to store */
401   int bEof;                       /* Set to true when merge is finished */
402   int bUseThread;                 /* True to use a bg thread for this object */
403   SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
404 };
405 
406 /*
407 ** An instance of this object is used for writing a PMA.
408 **
409 ** The PMA is written one record at a time.  Each record is of an arbitrary
410 ** size.  But I/O is more efficient if it occurs in page-sized blocks where
411 ** each block is aligned on a page boundary.  This object caches writes to
412 ** the PMA so that aligned, page-size blocks are written.
413 */
414 struct PmaWriter {
415   int eFWErr;                     /* Non-zero if in an error state */
416   u8 *aBuffer;                    /* Pointer to write buffer (nBuffer bytes) */
417   int nBuffer;                    /* Size of write buffer in bytes */
418   int iBufStart;                  /* First byte of buffer to write */
419   int iBufEnd;                    /* Last byte of buffer to write */
420   i64 iWriteOff;                  /* Offset of start of buffer in file */
421   sqlite3_file *pFd;              /* File handle to write to */
422 };
423 
424 /*
425 ** This object is the header on a single record while that record is being
426 ** held in memory and prior to being written out as part of a PMA.
427 **
428 ** How the linked list is connected depends on how memory is being managed
429 ** by this module. If using a separate allocation for each in-memory record
430 ** (VdbeSorter.list.aMemory==0), then the list is always connected using the
431 ** SorterRecord.u.pNext pointers.
432 **
433 ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
434 ** then while records are being accumulated the list is linked using the
435 ** SorterRecord.u.iNext offset. This is because the aMemory[] array may
436 ** be sqlite3Realloc()ed while records are being accumulated. Once the VM
437 ** has finished passing records to the sorter, or when the in-memory buffer
438 ** is full, the list is sorted. As part of the sorting process, it is
439 ** converted to use the SorterRecord.u.pNext pointers. See function
440 ** vdbeSorterSort() for details.
441 */
442 struct SorterRecord {
443   int nVal;                       /* Size of the record in bytes */
444   union {
445     SorterRecord *pNext;          /* Next record, when linked by pointer
                                  ** (separate allocations, or once sorted) */
446     int iNext;                    /* Offset within aMemory of next record,
                                  ** used while accumulating in bulk memory */
447   } u;
448   /* The data for the record immediately follows this header; see SRVAL() */
449 };
450 
451 /* Return a pointer to the buffer containing the record data for SorterRecord
452 ** object p. Should be used as if:
453 **
454 **   void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
455 */
456 #define SRVAL(p) ((void*)((SorterRecord*)(p) + 1))  /* +1 skips the header */
457 
458 
459 /* Maximum number of PMAs that a single MergeEngine can merge */
460 #define SORTER_MAX_MERGE_COUNT 16   /* See "Algorithm" in the file header */
461 
462 static int vdbeIncrSwap(IncrMerger*);     /* Called by vdbePmaReaderNext() */
463 static void vdbeIncrFree(IncrMerger *);   /* Called by vdbePmaReaderClear() */
464 
465 /*
466 ** Free all memory belonging to the PmaReader object passed as the
467 ** argument. All structure fields are set to zero before returning.
468 */
469 static void vdbePmaReaderClear(PmaReader *pReadr){
470   sqlite3_free(pReadr->aAlloc);
471   sqlite3_free(pReadr->aBuffer);
472   if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
473   vdbeIncrFree(pReadr->pIncr);
474   memset(pReadr, 0, sizeof(PmaReader));
475 }
476 
477 /*
478 ** Read the next nByte bytes of data from the PMA p.
479 ** If successful, set *ppOut to point to a buffer containing the data
480 ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
481 ** error code.
482 **
483 ** The buffer returned in *ppOut is only valid until the
484 ** next call to this function.
485 */
486 static int vdbePmaReadBlob(
487   PmaReader *p,                   /* PmaReader from which to take the blob */
488   int nByte,                      /* Bytes of data to read */
489   u8 **ppOut                      /* OUT: Pointer to buffer containing data */
490 ){
491   int iBuf;                       /* Offset within buffer to read from */
492   int nAvail;                     /* Bytes of data available in buffer */
493 
494   if( p->aMap ){
495     *ppOut = &p->aMap[p->iReadOff];
496     p->iReadOff += nByte;
497     return SQLITE_OK;
498   }
499 
500   assert( p->aBuffer );
501 
502   /* If there is no more data to be read from the buffer, read the next
503   ** p->nBuffer bytes of data from the file into it. Or, if there are less
504   ** than p->nBuffer bytes remaining in the PMA, read all remaining data.  */
505   iBuf = p->iReadOff % p->nBuffer;
506   if( iBuf==0 ){
507     int nRead;                    /* Bytes to read from disk */
508     int rc;                       /* sqlite3OsRead() return code */
509 
510     /* Determine how many bytes of data to read. */
511     if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){
512       nRead = p->nBuffer;
513     }else{
514       nRead = (int)(p->iEof - p->iReadOff);
515     }
516     assert( nRead>0 );
517 
518     /* Readr data from the file. Return early if an error occurs. */
519     rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff);
520     assert( rc!=SQLITE_IOERR_SHORT_READ );
521     if( rc!=SQLITE_OK ) return rc;
522   }
523   nAvail = p->nBuffer - iBuf;
524 
525   if( nByte<=nAvail ){
526     /* The requested data is available in the in-memory buffer. In this
527     ** case there is no need to make a copy of the data, just return a
528     ** pointer into the buffer to the caller.  */
529     *ppOut = &p->aBuffer[iBuf];
530     p->iReadOff += nByte;
531   }else{
532     /* The requested data is not all available in the in-memory buffer.
533     ** In this case, allocate space at p->aAlloc[] to copy the requested
534     ** range into. Then return a copy of pointer p->aAlloc to the caller.  */
535     int nRem;                     /* Bytes remaining to copy */
536 
537     /* Extend the p->aAlloc[] allocation if required. */
538     if( p->nAlloc<nByte ){
539       u8 *aNew;
540       sqlite3_int64 nNew = MAX(128, 2*(sqlite3_int64)p->nAlloc);
541       while( nByte>nNew ) nNew = nNew*2;
542       aNew = sqlite3Realloc(p->aAlloc, nNew);
543       if( !aNew ) return SQLITE_NOMEM_BKPT;
544       p->nAlloc = nNew;
545       p->aAlloc = aNew;
546     }
547 
548     /* Copy as much data as is available in the buffer into the start of
549     ** p->aAlloc[].  */
550     memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail);
551     p->iReadOff += nAvail;
552     nRem = nByte - nAvail;
553 
554     /* The following loop copies up to p->nBuffer bytes per iteration into
555     ** the p->aAlloc[] buffer.  */
556     while( nRem>0 ){
557       int rc;                     /* vdbePmaReadBlob() return code */
558       int nCopy;                  /* Number of bytes to copy */
559       u8 *aNext;                  /* Pointer to buffer to copy data from */
560 
561       nCopy = nRem;
562       if( nRem>p->nBuffer ) nCopy = p->nBuffer;
563       rc = vdbePmaReadBlob(p, nCopy, &aNext);
564       if( rc!=SQLITE_OK ) return rc;
565       assert( aNext!=p->aAlloc );
566       memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
567       nRem -= nCopy;
568     }
569 
570     *ppOut = p->aAlloc;
571   }
572 
573   return SQLITE_OK;
574 }
575 
576 /*
577 ** Read a varint from the stream of data accessed by p. Set *pnOut to
578 ** the value read.
579 */
580 static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
581   int iBuf;
582 
583   if( p->aMap ){
584     p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
585   }else{
586     iBuf = p->iReadOff % p->nBuffer;
587     if( iBuf && (p->nBuffer-iBuf)>=9 ){
588       p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
589     }else{
590       u8 aVarint[16], *a;
591       int i = 0, rc;
592       do{
593         rc = vdbePmaReadBlob(p, 1, &a);
594         if( rc ) return rc;
595         aVarint[(i++)&0xf] = a[0];
596       }while( (a[0]&0x80)!=0 );
597       sqlite3GetVarint(aVarint, pnOut);
598     }
599   }
600 
601   return SQLITE_OK;
602 }
603 
604 /*
605 ** Attempt to memory map file pFile. If successful, set *pp to point to the
606 ** new mapping and return SQLITE_OK. If the mapping is not attempted
607 ** (because the file is too large or the VFS layer is configured not to use
608 ** mmap), return SQLITE_OK and set *pp to NULL.
609 **
610 ** Or, if an error occurs, return an SQLite error code. The final value of
611 ** *pp is undefined in this case.
612 */
613 static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
614   int rc = SQLITE_OK;
615   if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
616     sqlite3_file *pFd = pFile->pFd;
617     if( pFd->pMethods->iVersion>=3 ){
618       rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp);
619       testcase( rc!=SQLITE_OK );
620     }
621   }
622   return rc;
623 }
624 
625 /*
626 ** Attach PmaReader pReadr to file pFile (if it is not already attached to
627 ** that file) and seek it to offset iOff within the file.  Return SQLITE_OK
628 ** if successful, or an SQLite error code if an error occurs.
629 */
630 static int vdbePmaReaderSeek(
631   SortSubtask *pTask,             /* Task context */
632   PmaReader *pReadr,              /* Reader whose cursor is to be moved */
633   SorterFile *pFile,              /* Sorter file to read from */
634   i64 iOff                        /* Offset in pFile */
635 ){
636   int rc = SQLITE_OK;
637 
638   assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
639 
640   if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ;
641   if( pReadr->aMap ){
642     sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
643     pReadr->aMap = 0;
644   }
645   pReadr->iReadOff = iOff;
646   pReadr->iEof = pFile->iEof;
647   pReadr->pFd = pFile->pFd;
648 
649   rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
650   if( rc==SQLITE_OK && pReadr->aMap==0 ){
651     int pgsz = pTask->pSorter->pgsz;
652     int iBuf = pReadr->iReadOff % pgsz;
653     if( pReadr->aBuffer==0 ){
654       pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz);
655       if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM_BKPT;
656       pReadr->nBuffer = pgsz;
657     }
658     if( rc==SQLITE_OK && iBuf ){
659       int nRead = pgsz - iBuf;
660       if( (pReadr->iReadOff + nRead) > pReadr->iEof ){
661         nRead = (int)(pReadr->iEof - pReadr->iReadOff);
662       }
663       rc = sqlite3OsRead(
664           pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
665       );
666       testcase( rc!=SQLITE_OK );
667     }
668   }
669 
670   return rc;
671 }
672 
673 /*
674 ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if
675 ** no error occurs, or an SQLite error code if one does.
676 */
677 static int vdbePmaReaderNext(PmaReader *pReadr){
678   int rc = SQLITE_OK;             /* Return Code */
679   u64 nRec = 0;                   /* Size of record in bytes */
680 
681 
682   if( pReadr->iReadOff>=pReadr->iEof ){
683     IncrMerger *pIncr = pReadr->pIncr;
684     int bEof = 1;
685     if( pIncr ){
686       rc = vdbeIncrSwap(pIncr);
687       if( rc==SQLITE_OK && pIncr->bEof==0 ){
688         rc = vdbePmaReaderSeek(
689             pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff
690         );
691         bEof = 0;
692       }
693     }
694 
695     if( bEof ){
696       /* This is an EOF condition */
697       vdbePmaReaderClear(pReadr);
698       testcase( rc!=SQLITE_OK );
699       return rc;
700     }
701   }
702 
703   if( rc==SQLITE_OK ){
704     rc = vdbePmaReadVarint(pReadr, &nRec);
705   }
706   if( rc==SQLITE_OK ){
707     pReadr->nKey = (int)nRec;
708     rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
709     testcase( rc!=SQLITE_OK );
710   }
711 
712   return rc;
713 }
714 
715 /*
716 ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
717 ** starting at offset iStart and ending at offset iEof-1. This function
718 ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the
719 ** PMA is empty).
720 **
721 ** If the pnByte parameter is NULL, then it is assumed that the file
722 ** contains a single PMA, and that that PMA omits the initial length varint.
723 */
724 static int vdbePmaReaderInit(
725   SortSubtask *pTask,             /* Task context */
726   SorterFile *pFile,              /* Sorter file to read from */
727   i64 iStart,                     /* Start offset in pFile */
728   PmaReader *pReadr,              /* PmaReader to populate */
729   i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
730 ){
731   int rc;
732 
733   assert( pFile->iEof>iStart );
734   assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 );
735   assert( pReadr->aBuffer==0 );
736   assert( pReadr->aMap==0 );
737 
738   rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart);
739   if( rc==SQLITE_OK ){
740     u64 nByte = 0;                 /* Size of PMA in bytes */
741     rc = vdbePmaReadVarint(pReadr, &nByte);
742     pReadr->iEof = pReadr->iReadOff + nByte;
743     *pnByte += nByte;
744   }
745 
746   if( rc==SQLITE_OK ){
747     rc = vdbePmaReaderNext(pReadr);
748   }
749   return rc;
750 }
751 
752 /*
753 ** A version of vdbeSorterCompare() that assumes that it has already been
754 ** determined that the first field of key1 is equal to the first field of
755 ** key2.
756 */
757 static int vdbeSorterCompareTail(
758   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
759   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
760   const void *pKey1, int nKey1,   /* Left side of comparison */
761   const void *pKey2, int nKey2    /* Right side of comparison */
762 ){
763   UnpackedRecord *r2 = pTask->pUnpacked;
764   if( *pbKey2Cached==0 ){
765     sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
766     *pbKey2Cached = 1;
767   }
768   return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1);
769 }
770 
771 /*
772 ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
773 ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
774 ** used by the comparison. Return the result of the comparison.
775 **
776 ** If IN/OUT parameter *pbKey2Cached is true when this function is called,
777 ** it is assumed that (pTask->pUnpacked) contains the unpacked version
778 ** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked
779 ** version of key2 and *pbKey2Cached set to true before returning.
780 **
781 ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
782 ** to SQLITE_NOMEM.
783 */
784 static int vdbeSorterCompare(
785   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
786   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
787   const void *pKey1, int nKey1,   /* Left side of comparison */
788   const void *pKey2, int nKey2    /* Right side of comparison */
789 ){
790   UnpackedRecord *r2 = pTask->pUnpacked;
791   if( !*pbKey2Cached ){
792     sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
793     *pbKey2Cached = 1;
794   }
795   return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
796 }
797 
798 /*
799 ** A specially optimized version of vdbeSorterCompare() that assumes that
800 ** the first field of each key is a TEXT value and that the collation
801 ** sequence to compare them with is BINARY.
802 */
803 static int vdbeSorterCompareText(
804   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
805   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
806   const void *pKey1, int nKey1,   /* Left side of comparison */
807   const void *pKey2, int nKey2    /* Right side of comparison */
808 ){
809   const u8 * const p1 = (const u8 * const)pKey1;
810   const u8 * const p2 = (const u8 * const)pKey2;
811   const u8 * const v1 = &p1[ p1[0] ];   /* Pointer to value 1 */
812   const u8 * const v2 = &p2[ p2[0] ];   /* Pointer to value 2 */
813 
814   int n1;
815   int n2;
816   int res;
817 
818   getVarint32NR(&p1[1], n1);
819   getVarint32NR(&p2[1], n2);
820   res = memcmp(v1, v2, (MIN(n1, n2) - 13)/2);
821   if( res==0 ){
822     res = n1 - n2;
823   }
824 
825   if( res==0 ){
826     if( pTask->pSorter->pKeyInfo->nKeyField>1 ){
827       res = vdbeSorterCompareTail(
828           pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
829       );
830     }
831   }else{
832     assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
833     if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){
834       res = res * -1;
835     }
836   }
837 
838   return res;
839 }
840 
/*
** A specially optimized version of vdbeSorterCompare() that assumes that
** the first field of each key is an INTEGER value.
**
** The first fields are compared directly on the serialized records,
** without unpacking either one: byte 1 of each record is taken as the
** serial type of the first field and byte 0 as the offset of its value.
** If the first fields are equal and the key has more than one field, the
** remainder of the comparison is delegated to vdbeSorterCompareTail().
** If they differ and aSortFlags[0] is non-zero, the sign of the result
** is inverted.
*/
static int vdbeSorterCompareInt(
  SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
  int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
  const void *pKey1, int nKey1,   /* Left side of comparison */
  const void *pKey2, int nKey2    /* Right side of comparison */
){
  const u8 * const p1 = (const u8 * const)pKey1;
  const u8 * const p2 = (const u8 * const)pKey2;
  const int s1 = p1[1];                 /* Left hand serial type */
  const int s2 = p2[1];                 /* Right hand serial type */
  const u8 * const v1 = &p1[ p1[0] ];   /* Pointer to value 1 */
  const u8 * const v2 = &p2[ p2[0] ];   /* Pointer to value 2 */
  int res;                              /* Return value */

  /* Only serial types 1..6, 8 and 9 are expected here. */
  assert( (s1>0 && s1<7) || s1==8 || s1==9 );
  assert( (s2>0 && s2<7) || s2==8 || s2==9 );

  if( s1==s2 ){
    /* Same serial type, so both values occupy the same number of bytes
    ** (aLen[] maps a serial type to its size in bytes).  Compare the two
    ** values one byte at a time starting from byte 0.  At the first byte
    ** that differs, if the 0x80 bits of the leading bytes differ then the
    ** value with that bit set sorts first; otherwise the byte difference
    ** is the result. */
    static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8, 0, 0, 0 };
    const u8 n = aLen[s1];
    int i;
    res = 0;
    for(i=0; i<n; i++){
      if( (res = v1[i] - v2[i])!=0 ){
        if( ((v1[0] ^ v2[0]) & 0x80)!=0 ){
          res = v1[0] & 0x80 ? -1 : +1;
        }
        break;
      }
    }
  }else if( s1>7 && s2>7 ){
    /* Both serial types are 8 or 9 (the only values above 7 permitted by
    ** the asserts above).  Neither has any value bytes, so the serial
    ** types themselves are compared. */
    res = s1 - s2;
  }else{
    /* The serial types differ and at least one of them is in the range
    ** 1..6.  Make a provisional decision from the serial types alone:
    ** a value of type 8 or 9 ranks below one of type 1..6, and between
    ** two types in 1..6 the larger serial type ranks higher. */
    if( s2>7 ){
      res = +1;
    }else if( s1>7 ){
      res = -1;
    }else{
      res = s1 - s2;
    }
    assert( res!=0 );

    /* Correct the provisional result for sign: if the value that was
    ** provisionally ranked higher has the 0x80 bit set in its first byte,
    ** it actually sorts first. */
    if( res>0 ){
      if( *v1 & 0x80 ) res = -1;
    }else{
      if( *v2 & 0x80 ) res = +1;
    }
  }

  if( res==0 ){
    /* First fields are equal: any remaining key fields decide. */
    if( pTask->pSorter->pKeyInfo->nKeyField>1 ){
      res = vdbeSorterCompareTail(
          pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
      );
    }
  }else if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){
    /* Non-zero sort flags on the first field: reverse the result. */
    assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
    res = res * -1;
  }

  return res;
}
908 
/*
** Initialize the temporary index cursor just opened as a sorter cursor.
**
** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nKeyField)
** to determine the number of fields that should be compared from the
** records being sorted. However, if the value passed as argument nField
** is non-zero and the sorter is able to guarantee a stable sort, nField
** is used instead. This is used when sorting records for a CREATE INDEX
** statement. In this case, keys are always delivered to the sorter in
** order of the primary key, which happens to make up the final part
** of the records being sorted. So if the sort is stable, there is never
** any reason to compare PK fields and they can be ignored for a small
** performance boost.
**
** The sorter can guarantee a stable sort when running in single-threaded
** mode, but not in multi-threaded mode.
**
** The new VdbeSorter, its array of SortSubtask objects and its private
** copy of the KeyInfo are obtained with a single allocation, which is
** stored in pCsr->uc.pSorter (NULL if the allocation fails).
**
** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
*/
int sqlite3VdbeSorterInit(
  sqlite3 *db,                    /* Database connection (for malloc()) */
  int nField,                     /* Number of key fields in each record */
  VdbeCursor *pCsr                /* Cursor that holds the new sorter */
){
  int pgsz;                       /* Page size of main database */
  int i;                          /* Used to iterate through aTask[] */
  VdbeSorter *pSorter;            /* The new sorter */
  KeyInfo *pKeyInfo;              /* Copy of pCsr->pKeyInfo with db==0 */
  int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
  int sz;                         /* Size of pSorter in bytes */
  int rc = SQLITE_OK;
  /* In builds without worker threads nWorker is a compile-time constant 0;
  ** the macro is removed again by the #undef following this function. */
#if SQLITE_MAX_WORKER_THREADS==0
# define nWorker 0
#else
  int nWorker;
#endif

  /* Initialize the upper limit on the number of worker threads */
#if SQLITE_MAX_WORKER_THREADS>0
  if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){
    nWorker = 0;
  }else{
    nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS];
  }
#endif

  /* Do not allow the total number of threads (main thread + all workers)
  ** to exceed the maximum merge count */
#if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
  if( nWorker>=SORTER_MAX_MERGE_COUNT ){
    nWorker = SORTER_MAX_MERGE_COUNT-1;
  }
#endif

  assert( pCsr->pKeyInfo && pCsr->pBtx==0 );
  assert( pCsr->eCurType==CURTYPE_SORTER );
  /* szKeyInfo: the KeyInfo struct plus (nKeyField-1) further CollSeq
  ** pointers.  sz: the VdbeSorter plus nWorker further SortSubtask
  ** objects. */
  szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nKeyField-1)*sizeof(CollSeq*);
  sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);

  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
  pCsr->uc.pSorter = pSorter;
  if( pSorter==0 ){
    rc = SQLITE_NOMEM_BKPT;
  }else{
    Btree *pBt = db->aDb[0].pBt;
    /* The private KeyInfo copy lives in the same allocation, immediately
    ** after the sz bytes used by the sorter and its subtasks. */
    pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
    memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
    pKeyInfo->db = 0;
    /* Compare only nField fields when requested and no workers are in
    ** use (see the header comment regarding stable sorts). */
    if( nField && nWorker==0 ){
      pKeyInfo->nKeyField = nField;
    }
    sqlite3BtreeEnter(pBt);
    pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(pBt);
    sqlite3BtreeLeave(pBt);
    pSorter->nTask = nWorker + 1;
    pSorter->iPrev = (u8)(nWorker - 1);
    pSorter->bUseThreads = (pSorter->nTask>1);
    pSorter->db = db;
    /* Give every subtask a back-pointer to the sorter */
    for(i=0; i<pSorter->nTask; i++){
      SortSubtask *pTask = &pSorter->aTask[i];
      pTask->pSorter = pSorter;
    }

    if( !sqlite3TempInMemory(db) ){
      i64 mxCache;                /* Cache size in bytes */
      u32 szPma = sqlite3GlobalConfig.szPma;
      pSorter->mnPmaSize = szPma * pgsz;

      mxCache = db->aDb[0].pSchema->cache_size;
      if( mxCache<0 ){
        /* A negative cache-size value C indicates that the cache is abs(C)
        ** KiB in size.  */
        mxCache = mxCache * -1024;
      }else{
        mxCache = mxCache * pgsz;
      }
      /* mxPmaSize is the cache size in bytes, capped at SQLITE_MAX_PMASZ
      ** but never smaller than mnPmaSize. */
      mxCache = MIN(mxCache, SQLITE_MAX_PMASZ);
      pSorter->mxPmaSize = MAX(pSorter->mnPmaSize, (int)mxCache);

      /* Avoid large memory allocations if the application has requested
      ** SQLITE_CONFIG_SMALL_MALLOC. */
      if( sqlite3GlobalConfig.bSmallMalloc==0 ){
        assert( pSorter->iMemory==0 );
        pSorter->nMemory = pgsz;
        pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
        if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM_BKPT;
      }
    }

    /* Both specialized type bits are set initially when the key has fewer
    ** than 13 fields in total, the first field uses no collation or the
    ** default collation, and BIGNULL ordering is not requested for it.
    ** NOTE(review): typeMask is presumably narrowed elsewhere as records
    ** are written; vdbeSorterGetCompare() only selects a specialized
    ** comparator when exactly one bit remains set. */
    if( pKeyInfo->nAllField<13
     && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl)
     && (pKeyInfo->aSortFlags[0] & KEYINFO_ORDER_BIGNULL)==0
    ){
      pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
    }
  }

  return rc;
}
#undef nWorker   /* Defined at the top of this function */
1029 
1030 /*
1031 ** Free the list of sorted records starting at pRecord.
1032 */
1033 static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
1034   SorterRecord *p;
1035   SorterRecord *pNext;
1036   for(p=pRecord; p; p=pNext){
1037     pNext = p->u.pNext;
1038     sqlite3DbFree(db, p);
1039   }
1040 }
1041 
1042 /*
1043 ** Free all resources owned by the object indicated by argument pTask. All
1044 ** fields of *pTask are zeroed before returning.
1045 */
1046 static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
1047   sqlite3DbFree(db, pTask->pUnpacked);
1048 #if SQLITE_MAX_WORKER_THREADS>0
1049   /* pTask->list.aMemory can only be non-zero if it was handed memory
1050   ** from the main thread.  That only occurs SQLITE_MAX_WORKER_THREADS>0 */
1051   if( pTask->list.aMemory ){
1052     sqlite3_free(pTask->list.aMemory);
1053   }else
1054 #endif
1055   {
1056     assert( pTask->list.aMemory==0 );
1057     vdbeSorterRecordFree(0, pTask->list.pList);
1058   }
1059   if( pTask->file.pFd ){
1060     sqlite3OsCloseFree(pTask->file.pFd);
1061   }
1062   if( pTask->file2.pFd ){
1063     sqlite3OsCloseFree(pTask->file2.pFd);
1064   }
1065   memset(pTask, 0, sizeof(SortSubtask));
1066 }
1067 
#ifdef SQLITE_DEBUG_SORTER_THREADS
/*
** Tracing helpers, compiled only when SQLITE_DEBUG_SORTER_THREADS is
** defined.  Each one writes a single line to stderr made up of a
** timestamp obtained from sqlite3OsCurrentTimeInt64(), a tag and the
** caller-supplied event string.  Without the define they expand to
** nothing.
*/

/* Trace an event for subtask pTask, tagged with its index in aTask[]. */
static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
  i64 tmNow;
  int iTask = (pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &tmNow);
  fprintf(stderr, "%lld:%d %s\n", tmNow, iTask, zEvent);
}

/* Trace an event that has no subtask; uses the VFS returned by
** sqlite3_vfs_find(0) for the timestamp. */
static void vdbeSorterRewindDebug(const char *zEvent){
  i64 tmNow = 0;
  sqlite3_vfs *pVfs = sqlite3_vfs_find(0);
  if( ALWAYS(pVfs) ){
    sqlite3OsCurrentTimeInt64(pVfs, &tmNow);
  }
  fprintf(stderr, "%lld:X %s\n", tmNow, zEvent);
}

/* Trace an event for subtask pTask using the "bg" tag. */
static void vdbeSorterPopulateDebug(
  SortSubtask *pTask,
  const char *zEvent
){
  i64 tmNow;
  int iTask = (pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &tmNow);
  fprintf(stderr, "%lld:bg%d %s\n", tmNow, iTask, zEvent);
}

/* Trace an event using the "main" tag, but only if bBlocked is true. */
static void vdbeSorterBlockDebug(
  SortSubtask *pTask,
  int bBlocked,
  const char *zEvent
){
  i64 tmNow;
  if( !bBlocked ) return;
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &tmNow);
  fprintf(stderr, "%lld:main %s\n", tmNow, zEvent);
}
#else
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(y)
# define vdbeSorterPopulateDebug(x,y)
# define vdbeSorterBlockDebug(x,y,z)
#endif
1107 
1108 #if SQLITE_MAX_WORKER_THREADS>0
1109 /*
1110 ** Join thread pTask->thread.
1111 */
1112 static int vdbeSorterJoinThread(SortSubtask *pTask){
1113   int rc = SQLITE_OK;
1114   if( pTask->pThread ){
1115 #ifdef SQLITE_DEBUG_SORTER_THREADS
1116     int bDone = pTask->bDone;
1117 #endif
1118     void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR);
1119     vdbeSorterBlockDebug(pTask, !bDone, "enter");
1120     (void)sqlite3ThreadJoin(pTask->pThread, &pRet);
1121     vdbeSorterBlockDebug(pTask, !bDone, "exit");
1122     rc = SQLITE_PTR_TO_INT(pRet);
1123     assert( pTask->bDone==1 );
1124     pTask->bDone = 0;
1125     pTask->pThread = 0;
1126   }
1127   return rc;
1128 }
1129 
1130 /*
1131 ** Launch a background thread to run xTask(pIn).
1132 */
1133 static int vdbeSorterCreateThread(
1134   SortSubtask *pTask,             /* Thread will use this task object */
1135   void *(*xTask)(void*),          /* Routine to run in a separate thread */
1136   void *pIn                       /* Argument passed into xTask() */
1137 ){
1138   assert( pTask->pThread==0 && pTask->bDone==0 );
1139   return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
1140 }
1141 
1142 /*
1143 ** Join all outstanding threads launched by SorterWrite() to create
1144 ** level-0 PMAs.
1145 */
1146 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
1147   int rc = rcin;
1148   int i;
1149 
1150   /* This function is always called by the main user thread.
1151   **
1152   ** If this function is being called after SorterRewind() has been called,
1153   ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread
1154   ** is currently attempt to join one of the other threads. To avoid a race
1155   ** condition where this thread also attempts to join the same object, join
1156   ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */
1157   for(i=pSorter->nTask-1; i>=0; i--){
1158     SortSubtask *pTask = &pSorter->aTask[i];
1159     int rc2 = vdbeSorterJoinThread(pTask);
1160     if( rc==SQLITE_OK ) rc = rc2;
1161   }
1162   return rc;
1163 }
1164 #else
1165 # define vdbeSorterJoinAll(x,rcin) (rcin)
1166 # define vdbeSorterJoinThread(pTask) SQLITE_OK
1167 #endif
1168 
1169 /*
1170 ** Allocate a new MergeEngine object capable of handling up to
1171 ** nReader PmaReader inputs.
1172 **
1173 ** nReader is automatically rounded up to the next power of two.
1174 ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up.
1175 */
1176 static MergeEngine *vdbeMergeEngineNew(int nReader){
1177   int N = 2;                      /* Smallest power of two >= nReader */
1178   int nByte;                      /* Total bytes of space to allocate */
1179   MergeEngine *pNew;              /* Pointer to allocated object to return */
1180 
1181   assert( nReader<=SORTER_MAX_MERGE_COUNT );
1182 
1183   while( N<nReader ) N += N;
1184   nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
1185 
1186   pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte);
1187   if( pNew ){
1188     pNew->nTree = N;
1189     pNew->pTask = 0;
1190     pNew->aReadr = (PmaReader*)&pNew[1];
1191     pNew->aTree = (int*)&pNew->aReadr[N];
1192   }
1193   return pNew;
1194 }
1195 
1196 /*
1197 ** Free the MergeEngine object passed as the only argument.
1198 */
1199 static void vdbeMergeEngineFree(MergeEngine *pMerger){
1200   int i;
1201   if( pMerger ){
1202     for(i=0; i<pMerger->nTree; i++){
1203       vdbePmaReaderClear(&pMerger->aReadr[i]);
1204     }
1205   }
1206   sqlite3_free(pMerger);
1207 }
1208 
1209 /*
1210 ** Free all resources associated with the IncrMerger object indicated by
1211 ** the first argument.
1212 */
1213 static void vdbeIncrFree(IncrMerger *pIncr){
1214   if( pIncr ){
1215 #if SQLITE_MAX_WORKER_THREADS>0
1216     if( pIncr->bUseThread ){
1217       vdbeSorterJoinThread(pIncr->pTask);
1218       if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
1219       if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
1220     }
1221 #endif
1222     vdbeMergeEngineFree(pIncr->pMerger);
1223     sqlite3_free(pIncr);
1224   }
1225 }
1226 
1227 /*
1228 ** Reset a sorting cursor back to its original empty state.
1229 */
1230 void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
1231   int i;
1232   (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
1233   assert( pSorter->bUseThreads || pSorter->pReader==0 );
1234 #if SQLITE_MAX_WORKER_THREADS>0
1235   if( pSorter->pReader ){
1236     vdbePmaReaderClear(pSorter->pReader);
1237     sqlite3DbFree(db, pSorter->pReader);
1238     pSorter->pReader = 0;
1239   }
1240 #endif
1241   vdbeMergeEngineFree(pSorter->pMerger);
1242   pSorter->pMerger = 0;
1243   for(i=0; i<pSorter->nTask; i++){
1244     SortSubtask *pTask = &pSorter->aTask[i];
1245     vdbeSortSubtaskCleanup(db, pTask);
1246     pTask->pSorter = pSorter;
1247   }
1248   if( pSorter->list.aMemory==0 ){
1249     vdbeSorterRecordFree(0, pSorter->list.pList);
1250   }
1251   pSorter->list.pList = 0;
1252   pSorter->list.szPMA = 0;
1253   pSorter->bUsePMA = 0;
1254   pSorter->iMemory = 0;
1255   pSorter->mxKeysize = 0;
1256   sqlite3DbFree(db, pSorter->pUnpacked);
1257   pSorter->pUnpacked = 0;
1258 }
1259 
1260 /*
1261 ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
1262 */
1263 void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
1264   VdbeSorter *pSorter;
1265   assert( pCsr->eCurType==CURTYPE_SORTER );
1266   pSorter = pCsr->uc.pSorter;
1267   if( pSorter ){
1268     sqlite3VdbeSorterReset(db, pSorter);
1269     sqlite3_free(pSorter->list.aMemory);
1270     sqlite3DbFree(db, pSorter);
1271     pCsr->uc.pSorter = 0;
1272   }
1273 }
1274 
#if SQLITE_MAX_MMAP_SIZE>0
/*
** The second argument is a file-handle open on a temporary file. The file
** is guaranteed to be nByte bytes or smaller in size. This function
** attempts to extend the file to nByte bytes in size and to ensure that
** the VFS has memory mapped it.
**
** Nothing is attempted if nByte exceeds db->nMaxSorterMmap or if the
** file's I/O methods are older than version 3.  Whether or not the file
** does end up memory mapped of course depends on the specific VFS
** implementation.  The results of all the calls made here are ignored.
*/
static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
  void *pMap = 0;                 /* Mapping returned by sqlite3OsFetch() */
  int szChunk = 4*1024;           /* Value for SQLITE_FCNTL_CHUNK_SIZE */

  if( nByte>(i64)(db->nMaxSorterMmap) ) return;
  if( pFd->pMethods->iVersion<3 ) return;

  sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &szChunk);
  sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte);
  /* Fetch and immediately release the first nByte bytes of the file */
  sqlite3OsFetch(pFd, 0, (int)nByte, &pMap);
  sqlite3OsUnfetch(pFd, 0, pMap);
}
#else
# define vdbeSorterExtendFile(x,y,z)
#endif
1298 
1299 /*
1300 ** Allocate space for a file-handle and open a temporary file. If successful,
1301 ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK.
1302 ** Otherwise, set *ppFd to 0 and return an SQLite error code.
1303 */
1304 static int vdbeSorterOpenTempFile(
1305   sqlite3 *db,                    /* Database handle doing sort */
1306   i64 nExtend,                    /* Attempt to extend file to this size */
1307   sqlite3_file **ppFd
1308 ){
1309   int rc;
1310   if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS;
1311   rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd,
1312       SQLITE_OPEN_TEMP_JOURNAL |
1313       SQLITE_OPEN_READWRITE    | SQLITE_OPEN_CREATE |
1314       SQLITE_OPEN_EXCLUSIVE    | SQLITE_OPEN_DELETEONCLOSE, &rc
1315   );
1316   if( rc==SQLITE_OK ){
1317     i64 max = SQLITE_MAX_MMAP_SIZE;
1318     sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
1319     if( nExtend>0 ){
1320       vdbeSorterExtendFile(db, *ppFd, nExtend);
1321     }
1322   }
1323   return rc;
1324 }
1325 
1326 /*
1327 ** If it has not already been allocated, allocate the UnpackedRecord
1328 ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or
1329 ** if no allocation was required), or SQLITE_NOMEM otherwise.
1330 */
1331 static int vdbeSortAllocUnpacked(SortSubtask *pTask){
1332   if( pTask->pUnpacked==0 ){
1333     pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pTask->pSorter->pKeyInfo);
1334     if( pTask->pUnpacked==0 ) return SQLITE_NOMEM_BKPT;
1335     pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nKeyField;
1336     pTask->pUnpacked->errCode = 0;
1337   }
1338   return SQLITE_OK;
1339 }
1340 
1341 
1342 /*
1343 ** Merge the two sorted lists p1 and p2 into a single list.
1344 */
1345 static SorterRecord *vdbeSorterMerge(
1346   SortSubtask *pTask,             /* Calling thread context */
1347   SorterRecord *p1,               /* First list to merge */
1348   SorterRecord *p2                /* Second list to merge */
1349 ){
1350   SorterRecord *pFinal = 0;
1351   SorterRecord **pp = &pFinal;
1352   int bCached = 0;
1353 
1354   assert( p1!=0 && p2!=0 );
1355   for(;;){
1356     int res;
1357     res = pTask->xCompare(
1358         pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal
1359     );
1360 
1361     if( res<=0 ){
1362       *pp = p1;
1363       pp = &p1->u.pNext;
1364       p1 = p1->u.pNext;
1365       if( p1==0 ){
1366         *pp = p2;
1367         break;
1368       }
1369     }else{
1370       *pp = p2;
1371       pp = &p2->u.pNext;
1372       p2 = p2->u.pNext;
1373       bCached = 0;
1374       if( p2==0 ){
1375         *pp = p1;
1376         break;
1377       }
1378     }
1379   }
1380   return pFinal;
1381 }
1382 
1383 /*
1384 ** Return the SorterCompare function to compare values collected by the
1385 ** sorter object passed as the only argument.
1386 */
1387 static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){
1388   if( p->typeMask==SORTER_TYPE_INTEGER ){
1389     return vdbeSorterCompareInt;
1390   }else if( p->typeMask==SORTER_TYPE_TEXT ){
1391     return vdbeSorterCompareText;
1392   }
1393   return vdbeSorterCompare;
1394 }
1395 
/*
** Sort the linked list of records headed at pList->pList, using subtask
** pTask for the comparison context. Return SQLITE_OK if successful, or an
** SQLite error code (i.e. SQLITE_NOMEM) if an error occurs.
**
** The sort is a merge sort driven by the aSlot[] array.  Records are
** taken off the input list one at a time.  Each new single-record list is
** merged with the lists in aSlot[0], aSlot[1], ... for as long as those
** slots are occupied, emptying each slot it merges with, and the result
** is stored in the first empty slot.  Once the input is exhausted, all
** occupied slots are merged together, lowest index first.
**
** On return pList->pList is the head of the sorted list and
** pTask->xCompare has been set to the comparator selected by
** vdbeSorterGetCompare().
*/
static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
  int i;
  SorterRecord *p;
  int rc;
  SorterRecord *aSlot[64];

  /* The comparators unpack keys into pTask->pUnpacked */
  rc = vdbeSortAllocUnpacked(pTask);
  if( rc!=SQLITE_OK ) return rc;

  p = pList->pList;
  pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter);
  memset(aSlot, 0, sizeof(aSlot));

  while( p ){
    SorterRecord *pNext;
    if( pList->aMemory ){
      /* Records stored in the bulk buffer are chained by byte offset
      ** (u.iNext) rather than by pointer.  The record located at the very
      ** start of the buffer is treated as the last one in the list. */
      if( (u8*)p==pList->aMemory ){
        pNext = 0;
      }else{
        assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
        pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
      }
    }else{
      pNext = p->u.pNext;
    }

    /* Detach p as a one-record list, then merge it up through the
    ** occupied slots.  The loop relies on reaching an empty slot before
    ** running off the end of aSlot[]. */
    p->u.pNext = 0;
    for(i=0; aSlot[i]; i++){
      p = vdbeSorterMerge(pTask, p, aSlot[i]);
      aSlot[i] = 0;
    }
    aSlot[i] = p;
    p = pNext;
  }

  /* Combine whatever lists remain in the slots into the final result */
  p = 0;
  for(i=0; i<ArraySize(aSlot); i++){
    if( aSlot[i]==0 ) continue;
    p = p ? vdbeSorterMerge(pTask, p, aSlot[i]) : aSlot[i];
  }
  pList->pList = p;

  /* Errors hit while comparing are reported through pUnpacked->errCode */
  assert( pTask->pUnpacked->errCode==SQLITE_OK
       || pTask->pUnpacked->errCode==SQLITE_NOMEM
  );
  return pTask->pUnpacked->errCode;
}
1448 
1449 /*
1450 ** Initialize a PMA-writer object.
1451 */
1452 static void vdbePmaWriterInit(
1453   sqlite3_file *pFd,              /* File handle to write to */
1454   PmaWriter *p,                   /* Object to populate */
1455   int nBuf,                       /* Buffer size */
1456   i64 iStart                      /* Offset of pFd to begin writing at */
1457 ){
1458   memset(p, 0, sizeof(PmaWriter));
1459   p->aBuffer = (u8*)sqlite3Malloc(nBuf);
1460   if( !p->aBuffer ){
1461     p->eFWErr = SQLITE_NOMEM_BKPT;
1462   }else{
1463     p->iBufEnd = p->iBufStart = (iStart % nBuf);
1464     p->iWriteOff = iStart - p->iBufStart;
1465     p->nBuffer = nBuf;
1466     p->pFd = pFd;
1467   }
1468 }
1469 
1470 /*
1471 ** Write nData bytes of data to the PMA. Return SQLITE_OK
1472 ** if successful, or an SQLite error code if an error occurs.
1473 */
1474 static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){
1475   int nRem = nData;
1476   while( nRem>0 && p->eFWErr==0 ){
1477     int nCopy = nRem;
1478     if( nCopy>(p->nBuffer - p->iBufEnd) ){
1479       nCopy = p->nBuffer - p->iBufEnd;
1480     }
1481 
1482     memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy);
1483     p->iBufEnd += nCopy;
1484     if( p->iBufEnd==p->nBuffer ){
1485       p->eFWErr = sqlite3OsWrite(p->pFd,
1486           &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
1487           p->iWriteOff + p->iBufStart
1488       );
1489       p->iBufStart = p->iBufEnd = 0;
1490       p->iWriteOff += p->nBuffer;
1491     }
1492     assert( p->iBufEnd<p->nBuffer );
1493 
1494     nRem -= nCopy;
1495   }
1496 }
1497 
1498 /*
1499 ** Flush any buffered data to disk and clean up the PMA-writer object.
1500 ** The results of using the PMA-writer after this call are undefined.
1501 ** Return SQLITE_OK if flushing the buffered data succeeds or is not
1502 ** required. Otherwise, return an SQLite error code.
1503 **
1504 ** Before returning, set *piEof to the offset immediately following the
1505 ** last byte written to the file.
1506 */
1507 static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
1508   int rc;
1509   if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
1510     p->eFWErr = sqlite3OsWrite(p->pFd,
1511         &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
1512         p->iWriteOff + p->iBufStart
1513     );
1514   }
1515   *piEof = (p->iWriteOff + p->iBufEnd);
1516   sqlite3_free(p->aBuffer);
1517   rc = p->eFWErr;
1518   memset(p, 0, sizeof(PmaWriter));
1519   return rc;
1520 }
1521 
1522 /*
1523 ** Write value iVal encoded as a varint to the PMA. Return
1524 ** SQLITE_OK if successful, or an SQLite error code if an error occurs.
1525 */
1526 static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){
1527   int nByte;
1528   u8 aByte[10];
1529   nByte = sqlite3PutVarint(aByte, iVal);
1530   vdbePmaWriteBlob(p, aByte, nByte);
1531 }
1532 
/*
** Write the current contents of in-memory linked-list pList to a level-0
** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if
** successful, or an SQLite error code otherwise.
**
** The list is sorted first.  The PMA is appended at offset
** pTask->file.iEof; the temp file is opened here if it is not open yet.
** On success pTask->nPMA is incremented, pTask->file.iEof is advanced
** past the new PMA and pList->pList is left NULL.
**
** The format of a PMA is:
**
**     * A varint. This varint contains the total number of bytes of content
**       in the PMA (not including the varint itself).
**
**     * One or more records packed end-to-end in order of ascending keys.
**       Each record consists of a varint followed by a blob of data (the
**       key). The varint is the number of bytes in the blob of data.
*/
static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
  sqlite3 *db = pTask->pSorter->db;
  int rc = SQLITE_OK;             /* Return code */
  PmaWriter writer;               /* Object used to write to the file */

#ifdef SQLITE_DEBUG
  /* Set iSz to the expected size of file pTask->file after writing the PMA.
  ** This is used by an assert() statement at the end of this function.  */
  i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
#endif

  vdbeSorterWorkDebug(pTask, "enter");
  memset(&writer, 0, sizeof(PmaWriter));
  assert( pList->szPMA>0 );

  /* If the first temporary PMA file has not been opened, open it now. */
  if( pTask->file.pFd==0 ){
    rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
    assert( rc!=SQLITE_OK || pTask->file.pFd );
    assert( pTask->file.iEof==0 );
    assert( pTask->nPMA==0 );
  }

  /* Try to get the file to memory map.  NOTE(review): the extra 9 bytes
  ** presumably allow for the PMA's leading size varint -- confirm. */
  if( rc==SQLITE_OK ){
    vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
  }

  /* Sort the list */
  if( rc==SQLITE_OK ){
    rc = vdbeSorterSort(pTask, pList);
  }

  if( rc==SQLITE_OK ){
    SorterRecord *p;
    SorterRecord *pNext = 0;

    vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
                      pTask->file.iEof);
    pTask->nPMA++;
    /* PMA header: total size of the records that follow */
    vdbePmaWriteVarint(&writer, pList->szPMA);
    for(p=pList->pList; p; p=pNext){
      /* Save the successor first: p may be freed below */
      pNext = p->u.pNext;
      vdbePmaWriteVarint(&writer, p->nVal);
      vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
      /* Records are freed individually only when they do not live in the
      ** bulk buffer pList->aMemory. */
      if( pList->aMemory==0 ) sqlite3_free(p);
    }
    /* p is NULL once the loop ends, so this empties the list */
    pList->pList = p;
    /* Write errors accumulated by the writer are reported here, and
    ** pTask->file.iEof is moved to the end of the new PMA. */
    rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
  }

  vdbeSorterWorkDebug(pTask, "exit");
  assert( rc!=SQLITE_OK || pList->pList==0 );
  assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
  return rc;
}
1603 
1604 /*
1605 ** Advance the MergeEngine to its next entry.
1606 ** Set *pbEof to true if there is no next entry because
1607 ** the MergeEngine has reached the end of all its inputs.
     ** *pbEof is only written when vdbePmaReaderNext() reports SQLITE_OK.
     **
     ** The PmaReader whose index is stored in aTree[1] is advanced with
     ** vdbePmaReaderNext().  Then each aTree[] slot on the path from slot
     ** (nTree+iPrev)/2 down to slot 1 is recomputed by comparing the two
     ** PmaReaders that feed it.  A PmaReader whose pFd is 0 is never chosen
     ** ahead of a PmaReader that still has an open pFd.
1608 **
1609 ** Return SQLITE_OK if successful or an error code if an error occurs.
     ** If vdbePmaReaderNext() succeeds, the value returned is whatever is
     ** stored in pTask->pUnpacked->errCode once aTree[] has been updated.
1610 */
1611 static int vdbeMergeEngineStep(
1612   MergeEngine *pMerger,      /* The merge engine to advance to the next row */
1613   int *pbEof                 /* Set TRUE at EOF.  Set false for more content */
1614 ){
1615   int rc;
1616   int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
1617   SortSubtask *pTask = pMerger->pTask;
1618
1619   /* Advance the current PmaReader */
1620   rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
1621
1622   /* Update contents of aTree[] */
1623   if( rc==SQLITE_OK ){
1624     int i;                      /* Index of aTree[] to recalculate */
1625     PmaReader *pReadr1;         /* First PmaReader to compare */
1626     PmaReader *pReadr2;         /* Second PmaReader to compare */
1627     int bCached = 0;            /* Handed to xCompare() by address */
1628
1629     /* Find the first two PmaReaders to compare. The one that was just
1630     ** advanced (iPrev) and the one next to it in the array.  The masks
         ** select the even/odd pair of aReadr[] slots that contains iPrev.  */
1631     pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
1632     pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
1633
         /* Start at the aTree[] slot fed directly by that pair and halve the
         ** index on each pass until slot 1 has been rewritten. */
1634     for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
1635       /* Compare pReadr1 and pReadr2. Store the result in variable iRes.
           ** xCompare() is only invoked when both readers have a non-zero
           ** pFd; otherwise the reader with pFd==0 is ranked last. */
1636       int iRes;
1637       if( pReadr1->pFd==0 ){
1638         iRes = +1;
1639       }else if( pReadr2->pFd==0 ){
1640         iRes = -1;
1641       }else{
1642         iRes = pTask->xCompare(pTask, &bCached,
1643             pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey
1644         );
1645       }
1646
1647       /* If pReadr1 contained the smaller value, set aTree[i] to its index.
1648       ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
1649       ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
1650       ** pKey2 to point to the record belonging to pReadr2.
1651       **
1652       ** Alternatively, if pReadr2 contains the smaller of the two values,
1653       ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
1654       ** was actually called above, then pTask->pUnpacked now contains
1655       ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
1656       ** vdbeSorterCompare() from decoding pReadr2 again.
1657       **
1658       ** If the two values were equal, then the value from the oldest
1659       ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
1660       ** is sorted from oldest to newest, so pReadr1 contains older values
1661       ** than pReadr2 iff (pReadr1<pReadr2).
           **
           ** NOTE(review): "pKey2" and "vdbeSorterCompare()" do not appear in
           ** this function; presumably they correspond to the bCached flag
           ** and the pTask->xCompare() callback used here - confirm.  */
1662       if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
1663         pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
             /* The opponent at the next level is the winner recorded in the
             ** sibling slot (i^1) of the slot just written. */
1664         pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
1665         bCached = 0;
1666       }else{
1667         if( pReadr1->pFd ) bCached = 0;
1668         pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
1669         pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
1670       }
1671     }
         /* The merge is finished when the overall winner has pFd==0. */
1672     *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
1673   }
1674
1675   return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
1676 }
1677 
1678 #if SQLITE_MAX_WORKER_THREADS>0
1679 /*
1680 ** The main routine for background threads that write level-0 PMAs.
1681 */
1682 static void *vdbeSorterFlushThread(void *pCtx){
1683   SortSubtask *pTask = (SortSubtask*)pCtx;
1684   int rc;                         /* Return code */
1685   assert( pTask->bDone==0 );
1686   rc = vdbeSorterListToPMA(pTask, &pTask->list);
1687   pTask->bDone = 1;
1688   return SQLITE_INT_TO_PTR(rc);
1689 }
1690 #endif /* SQLITE_MAX_WORKER_THREADS>0 */
1691 
1692 /*
1693 ** Flush the current contents of VdbeSorter.list to a new PMA, possibly
1694 ** using a background thread.
1695 */
1696 static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
1697 #if SQLITE_MAX_WORKER_THREADS==0
1698   pSorter->bUsePMA = 1;
1699   return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
1700 #else
1701   int rc = SQLITE_OK;
1702   int i;
1703   SortSubtask *pTask = 0;    /* Thread context used to create new PMA */
1704   int nWorker = (pSorter->nTask-1);
1705 
1706   /* Set the flag to indicate that at least one PMA has been written.
1707   ** Or will be, anyhow.  */
1708   pSorter->bUsePMA = 1;
1709 
1710   /* Select a sub-task to sort and flush the current list of in-memory
1711   ** records to disk. If the sorter is running in multi-threaded mode,
1712   ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
1713   ** the background thread from a sub-tasks previous turn is still running,
1714   ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
1715   ** fall back to using the final sub-task. The first (pSorter->nTask-1)
1716   ** sub-tasks are prefered as they use background threads - the final
1717   ** sub-task uses the main thread. */
1718   for(i=0; i<nWorker; i++){
1719     int iTest = (pSorter->iPrev + i + 1) % nWorker;
1720     pTask = &pSorter->aTask[iTest];
1721     if( pTask->bDone ){
1722       rc = vdbeSorterJoinThread(pTask);
1723     }
1724     if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
1725   }
1726 
1727   if( rc==SQLITE_OK ){
1728     if( i==nWorker ){
1729       /* Use the foreground thread for this operation */
1730       rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
1731     }else{
1732       /* Launch a background thread for this operation */
1733       u8 *aMem;
1734       void *pCtx;
1735 
1736       assert( pTask!=0 );
1737       assert( pTask->pThread==0 && pTask->bDone==0 );
1738       assert( pTask->list.pList==0 );
1739       assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
1740 
1741       aMem = pTask->list.aMemory;
1742       pCtx = (void*)pTask;
1743       pSorter->iPrev = (u8)(pTask - pSorter->aTask);
1744       pTask->list = pSorter->list;
1745       pSorter->list.pList = 0;
1746       pSorter->list.szPMA = 0;
1747       if( aMem ){
1748         pSorter->list.aMemory = aMem;
1749         pSorter->nMemory = sqlite3MallocSize(aMem);
1750       }else if( pSorter->list.aMemory ){
1751         pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
1752         if( !pSorter->list.aMemory ) return SQLITE_NOMEM_BKPT;
1753       }
1754 
1755       rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
1756     }
1757   }
1758 
1759   return rc;
1760 #endif /* SQLITE_MAX_WORKER_THREADS!=0 */
1761 }
1762 
1763 /*
1764 ** Add a record to the sorter.
     **
     ** The pVal->n bytes at pVal->z are copied into a new SorterRecord that
     ** becomes the new head of pSorter->list.  Before that, the in-memory
     ** list may be flushed to a PMA by vdbeSorterFlushPMA().
     **
     ** Returns SQLITE_NOMEM_BKPT if memory for the record cannot be obtained.
     ** Otherwise returns the result of the flush, if one was performed, or
     ** SQLITE_OK.  Note that a failed flush does not stop the new record
     ** from being added; the flush result is simply returned at the end.
1765 */
1766 int sqlite3VdbeSorterWrite(
1767   const VdbeCursor *pCsr,         /* Sorter cursor */
1768   Mem *pVal                       /* Memory cell containing record */
1769 ){
1770   VdbeSorter *pSorter;
1771   int rc = SQLITE_OK;             /* Return Code */
1772   SorterRecord *pNew;             /* New list element */
1773   int bFlush;                     /* True to flush contents of memory to PMA */
1774   int nReq;                       /* Bytes of memory required */
1775   int nPMA;                       /* Bytes of PMA space required */
1776   int t;                          /* serial type of first record field */
1777
1778   assert( pCsr->eCurType==CURTYPE_SORTER );
1779   pSorter = pCsr->uc.pSorter;
       /* Read the varint stored at byte offset 1 of the record into t, then
       ** narrow pSorter->typeMask: values 1-6, 8 and 9 keep only the
       ** SORTER_TYPE_INTEGER bit, odd values above 10 keep only the
       ** SORTER_TYPE_TEXT bit, and anything else clears the mask.
       ** NOTE(review): presumably byte 0 holds a one-byte header-size
       ** varint, which is why the read starts at z[1] - confirm. */
1780   getVarint32NR((const u8*)&pVal->z[1], t);
1781   if( t>0 && t<10 && t!=7 ){
1782     pSorter->typeMask &= SORTER_TYPE_INTEGER;
1783   }else if( t>10 && (t & 0x01) ){
1784     pSorter->typeMask &= SORTER_TYPE_TEXT;
1785   }else{
1786     pSorter->typeMask = 0;
1787   }
1788
1789   assert( pSorter );
1790
1791   /* Figure out whether or not the current contents of memory should be
1792   ** flushed to a PMA before continuing. If so, do so.
1793   **
1794   ** If using the single large allocation mode (pSorter->list.aMemory!=0),
1795   ** then flush the contents of memory to a new PMA if (a) at least one value
1796   ** is already in memory and (b) the new value will not fit in memory.
1797   **
1798   ** Or, if using separate allocations for each record, flush the contents
1799   ** of memory to a PMA if either of the following are true:
1800   **
1801   **   * The total memory allocated for the in-memory list is greater
1802   **     than (page-size * cache-size), or
1803   **
1804   **   * The total memory allocated for the in-memory list is greater
1805   **     than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
       **
       ** No flush is ever attempted when pSorter->mxPmaSize is zero.
1806   */
1807   nReq = pVal->n + sizeof(SorterRecord);
1808   nPMA = pVal->n + sqlite3VarintLen(pVal->n);
1809   if( pSorter->mxPmaSize ){
1810     if( pSorter->list.aMemory ){
1811       bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
1812     }else{
1813       bFlush = (
1814           (pSorter->list.szPMA > pSorter->mxPmaSize)
1815        || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
1816       );
1817     }
1818     if( bFlush ){
           /* The size counters are reset whether or not the flush worked. */
1819       rc = vdbeSorterFlushPMA(pSorter);
1820       pSorter->list.szPMA = 0;
1821       pSorter->iMemory = 0;
1822       assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
1823     }
1824   }
1825
       /* Account for the PMA space the new record will need and remember the
       ** largest such size seen so far. */
1826   pSorter->list.szPMA += nPMA;
1827   if( nPMA>pSorter->mxKeysize ){
1828     pSorter->mxKeysize = nPMA;
1829   }
1830
1831   if( pSorter->list.aMemory ){
         /* Single large allocation: carve the record out of aMemory[]. */
1832     int nMin = pSorter->iMemory + nReq;
1833
1834     if( nMin>pSorter->nMemory ){
           /* Grow the buffer.  The new size starts at twice nMemory and is
           ** doubled until it reaches nMin, is then capped at mxPmaSize, but
           ** is never allowed to be smaller than nMin.  Because the buffer
           ** may move, the list head is saved as a byte offset and rebuilt
           ** from the new base address afterwards. */
1835       u8 *aNew;
1836       sqlite3_int64 nNew = 2 * (sqlite3_int64)pSorter->nMemory;
1837       int iListOff = -1;
1838       if( pSorter->list.pList ){
1839         iListOff = (u8*)pSorter->list.pList - pSorter->list.aMemory;
1840       }
1841       while( nNew < nMin ) nNew = nNew*2;
1842       if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
1843       if( nNew < nMin ) nNew = nMin;
1844       aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
1845       if( !aNew ) return SQLITE_NOMEM_BKPT;
1846       if( iListOff>=0 ){
1847         pSorter->list.pList = (SorterRecord*)&aNew[iListOff];
1848       }
1849       pSorter->list.aMemory = aNew;
1850       pSorter->nMemory = nNew;
1851     }
1852
         /* The record is placed at offset iMemory, which then advances by
         ** nReq rounded with ROUND8().  The link to the previous head is
         ** stored as a byte offset into aMemory[]; it is left unset when the
         ** list is currently empty. */
1853     pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
1854     pSorter->iMemory += ROUND8(nReq);
1855     if( pSorter->list.pList ){
1856       pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory);
1857     }
1858   }else{
         /* One allocation per record, linked by ordinary pointers. */
1859     pNew = (SorterRecord *)sqlite3Malloc(nReq);
1860     if( pNew==0 ){
1861       return SQLITE_NOMEM_BKPT;
1862     }
1863     pNew->u.pNext = pSorter->list.pList;
1864   }
1865
       /* Copy the record payload in and make pNew the head of the list. */
1866   memcpy(SRVAL(pNew), pVal->z, pVal->n);
1867   pNew->nVal = pVal->n;
1868   pSorter->list.pList = pNew;
1869
1870   return rc;
1871 }
1872 
1873 /*
1874 ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
1875 ** of the data stored in aFile[1] is the same as that used by regular PMAs,
1876 ** except that the number-of-bytes varint is omitted from the start.
1877 */
1878 static int vdbeIncrPopulate(IncrMerger *pIncr){
1879   int rc = SQLITE_OK;
1880   int rc2;
1881   i64 iStart = pIncr->iStartOff;
1882   SorterFile *pOut = &pIncr->aFile[1];
1883   SortSubtask *pTask = pIncr->pTask;
1884   MergeEngine *pMerger = pIncr->pMerger;
1885   PmaWriter writer;
1886   assert( pIncr->bEof==0 );
1887 
1888   vdbeSorterPopulateDebug(pTask, "enter");
1889 
1890   vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
1891   while( rc==SQLITE_OK ){
1892     int dummy;
1893     PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ];
1894     int nKey = pReader->nKey;
1895     i64 iEof = writer.iWriteOff + writer.iBufEnd;
1896 
1897     /* Check if the output file is full or if the input has been exhausted.
1898     ** In either case exit the loop. */
1899     if( pReader->pFd==0 ) break;
1900     if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
1901 
1902     /* Write the next key to the output. */
1903     vdbePmaWriteVarint(&writer, nKey);
1904     vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
1905     assert( pIncr->pMerger->pTask==pTask );
1906     rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy);
1907   }
1908 
1909   rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
1910   if( rc==SQLITE_OK ) rc = rc2;
1911   vdbeSorterPopulateDebug(pTask, "exit");
1912   return rc;
1913 }
1914 
1915 #if SQLITE_MAX_WORKER_THREADS>0
1916 /*
1917 ** The main routine for background threads that populate aFile[1] of
1918 ** multi-threaded IncrMerger objects.
1919 */
1920 static void *vdbeIncrPopulateThread(void *pCtx){
1921   IncrMerger *pIncr = (IncrMerger*)pCtx;
1922   void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
1923   pIncr->pTask->bDone = 1;
1924   return pRet;
1925 }
1926 
1927 /*
1928 ** Launch a background thread to populate aFile[1] of pIncr.
1929 */
1930 static int vdbeIncrBgPopulate(IncrMerger *pIncr){
1931   void *p = (void*)pIncr;
1932   assert( pIncr->bUseThread );
1933   return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
1934 }
1935 #endif
1936 
1937 /*
1938 ** This function is called when the PmaReader corresponding to pIncr has
1939 ** finished reading the contents of aFile[0]. Its purpose is to "refill"
1940 ** aFile[0] such that the PmaReader should start rereading it from the
1941 ** beginning.
1942 **
1943 ** For single-threaded objects, this is accomplished by literally reading
1944 ** keys from pIncr->pMerger and repopulating aFile[0].
1945 **
1946 ** For multi-threaded objects, all that is required is to wait until the
1947 ** background thread is finished (if it is not already) and then swap
1948 ** aFile[0] and aFile[1] in place. If the contents of pMerger have not
1949 ** been exhausted, this function also launches a new background thread
1950 ** to populate the new aFile[1].
1951 **
1952 ** SQLITE_OK is returned on success, or an SQLite error code otherwise.
1953 */
1954 static int vdbeIncrSwap(IncrMerger *pIncr){
1955   int rc = SQLITE_OK;
1956 
1957 #if SQLITE_MAX_WORKER_THREADS>0
1958   if( pIncr->bUseThread ){
1959     rc = vdbeSorterJoinThread(pIncr->pTask);
1960 
1961     if( rc==SQLITE_OK ){
1962       SorterFile f0 = pIncr->aFile[0];
1963       pIncr->aFile[0] = pIncr->aFile[1];
1964       pIncr->aFile[1] = f0;
1965     }
1966 
1967     if( rc==SQLITE_OK ){
1968       if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
1969         pIncr->bEof = 1;
1970       }else{
1971         rc = vdbeIncrBgPopulate(pIncr);
1972       }
1973     }
1974   }else
1975 #endif
1976   {
1977     rc = vdbeIncrPopulate(pIncr);
1978     pIncr->aFile[0] = pIncr->aFile[1];
1979     if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
1980       pIncr->bEof = 1;
1981     }
1982   }
1983 
1984   return rc;
1985 }
1986 
1987 /*
1988 ** Allocate and return a new IncrMerger object to read data from pMerger.
1989 **
1990 ** If an OOM condition is encountered, return NULL. In this case free the
1991 ** pMerger argument before returning.
1992 */
1993 static int vdbeIncrMergerNew(
1994   SortSubtask *pTask,     /* The thread that will be using the new IncrMerger */
1995   MergeEngine *pMerger,   /* The MergeEngine that the IncrMerger will control */
1996   IncrMerger **ppOut      /* Write the new IncrMerger here */
1997 ){
1998   int rc = SQLITE_OK;
1999   IncrMerger *pIncr = *ppOut = (IncrMerger*)
2000        (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
2001   if( pIncr ){
2002     pIncr->pMerger = pMerger;
2003     pIncr->pTask = pTask;
2004     pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
2005     pTask->file2.iEof += pIncr->mxSz;
2006   }else{
2007     vdbeMergeEngineFree(pMerger);
2008     rc = SQLITE_NOMEM_BKPT;
2009   }
2010   return rc;
2011 }
2012 
2013 #if SQLITE_MAX_WORKER_THREADS>0
2014 /*
2015 ** Set the "use-threads" flag on object pIncr.
2016 */
2017 static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
2018   pIncr->bUseThread = 1;
2019   pIncr->pTask->file2.iEof -= pIncr->mxSz;
2020 }
2021 #endif /* SQLITE_MAX_WORKER_THREADS>0 */
2022 
2023 
2024 
2025 /*
2026 ** Recompute pMerger->aTree[iOut] by comparing the next keys on the
2027 ** two PmaReaders that feed that entry.  Neither of the PmaReaders
2028 ** are advanced.  This routine merely does the comparison.
2029 */
2030 static void vdbeMergeEngineCompare(
2031   MergeEngine *pMerger,  /* Merge engine containing PmaReaders to compare */
2032   int iOut               /* Store the result in pMerger->aTree[iOut] */
2033 ){
2034   int i1;
2035   int i2;
2036   int iRes;
2037   PmaReader *p1;
2038   PmaReader *p2;
2039 
2040   assert( iOut<pMerger->nTree && iOut>0 );
2041 
2042   if( iOut>=(pMerger->nTree/2) ){
2043     i1 = (iOut - pMerger->nTree/2) * 2;
2044     i2 = i1 + 1;
2045   }else{
2046     i1 = pMerger->aTree[iOut*2];
2047     i2 = pMerger->aTree[iOut*2+1];
2048   }
2049 
2050   p1 = &pMerger->aReadr[i1];
2051   p2 = &pMerger->aReadr[i2];
2052 
2053   if( p1->pFd==0 ){
2054     iRes = i2;
2055   }else if( p2->pFd==0 ){
2056     iRes = i1;
2057   }else{
2058     SortSubtask *pTask = pMerger->pTask;
2059     int bCached = 0;
2060     int res;
2061     assert( pTask->pUnpacked!=0 );  /* from vdbeSortSubtaskMain() */
2062     res = pTask->xCompare(
2063         pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey
2064     );
2065     if( res<=0 ){
2066       iRes = i1;
2067     }else{
2068       iRes = i2;
2069     }
2070   }
2071 
2072   pMerger->aTree[iOut] = iRes;
2073 }
2074 
2075 /*
2076 ** Allowed values for the eMode parameter to vdbeMergeEngineInit(),
2077 ** vdbePmaReaderIncrInit() and vdbePmaReaderIncrMergeInit().
2078 **
     **   INCRINIT_NORMAL   Initialize the whole sub-tree in the calling
     **                     thread and leave the PmaReader on its first key.
     **
     **   INCRINIT_TASK     As INCRINIT_NORMAL, but run from a background
     **                     thread; aFile[1] is populated and the PmaReader
     **                     is not yet pointed at its first key.
     **
     **   INCRINIT_ROOT     The child trees were already initialized with
     **                     INCRINIT_TASK; vdbePmaReaderNext() is called on
     **                     each child PmaReader.
     **
2079 ** Only INCRINIT_NORMAL is valid in single-threaded builds (when
2080 ** SQLITE_MAX_WORKER_THREADS==0).  The other values are only used
2081 ** when there exists one or more separate worker threads.
2082 */
2083 #define INCRINIT_NORMAL 0
2084 #define INCRINIT_TASK   1
2085 #define INCRINIT_ROOT   2
2086
2087 /*
2088 ** Forward reference required because vdbeMergeEngineInit() calls
2089 ** vdbePmaReaderIncrInit(), which (via vdbePmaReaderIncrMergeInit()) calls
2090 ** vdbeMergeEngineInit() again when building a merge tree.
2091 */
2092 static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
2093 
2094 /*
2095 ** Initialize the MergeEngine object passed as the second argument. Once this
2096 ** function returns, the first key of merged data may be read from the
2097 ** MergeEngine object in the usual fashion.
2098 **
2099 ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
2100 ** objects attached to the PmaReader objects that the merger reads from have
2101 ** already been populated, but that they have not yet populated aFile[0] and
2102 ** set the PmaReader objects up to read from it. In this case all that is
2103 ** required is to call vdbePmaReaderNext() on each PmaReader to point it at
2104 ** its first key.
2105 **
2106 ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use
2107 ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data
2108 ** to pMerger.
2109 **
2110 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2111 */
2112 static int vdbeMergeEngineInit(
2113   SortSubtask *pTask,             /* Thread that will run pMerger */
2114   MergeEngine *pMerger,           /* MergeEngine to initialize */
2115   int eMode                       /* One of the INCRINIT_XXX constants */
2116 ){
2117   int rc = SQLITE_OK;             /* Return code */
2118   int i;                          /* For looping over PmaReader objects */
2119   int nTree;                      /* Number of subtrees to merge */
2120 
2121   /* Failure to allocate the merge would have been detected prior to
2122   ** invoking this routine */
2123   assert( pMerger!=0 );
2124 
2125   /* eMode is always INCRINIT_NORMAL in single-threaded mode */
2126   assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
2127 
2128   /* Verify that the MergeEngine is assigned to a single thread */
2129   assert( pMerger->pTask==0 );
2130   pMerger->pTask = pTask;
2131 
2132   nTree = pMerger->nTree;
2133   for(i=0; i<nTree; i++){
2134     if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
2135       /* PmaReaders should be normally initialized in order, as if they are
2136       ** reading from the same temp file this makes for more linear file IO.
2137       ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
2138       ** in use it will block the vdbePmaReaderNext() call while it uses
2139       ** the main thread to fill its buffer. So calling PmaReaderNext()
2140       ** on this PmaReader before any of the multi-threaded PmaReaders takes
2141       ** better advantage of multi-processor hardware. */
2142       rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
2143     }else{
2144       rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
2145     }
2146     if( rc!=SQLITE_OK ) return rc;
2147   }
2148 
2149   for(i=pMerger->nTree-1; i>0; i--){
2150     vdbeMergeEngineCompare(pMerger, i);
2151   }
2152   return pTask->pUnpacked->errCode;
2153 }
2154 
2155 /*
2156 ** The PmaReader passed as the first argument is guaranteed to be an
2157 ** incremental-reader (pReadr->pIncr!=0). This function serves to open
2158 ** and/or initialize the temp file related fields of the IncrMerge
2159 ** object at (pReadr->pIncr).
2160 **
2161 ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
2162 ** in the sub-tree headed by pReadr are also initialized. Data is then
2163 ** loaded into the buffers belonging to pReadr and it is set to point to
2164 ** the first key in its range.
2165 **
2166 ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
2167 ** to be a multi-threaded PmaReader and this function is being called in a
2168 ** background thread. In this case all PmaReaders in the sub-tree are
2169 ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
2170 ** pReadr is populated. However, pReadr itself is not set up to point
2171 ** to its first key. A call to vdbePmaReaderNext() is still required to do
2172 ** that.
2173 **
2174 ** The reason this function does not call vdbePmaReaderNext() immediately
2175 ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
2176 ** to block on thread (pTask->thread) before accessing aFile[1]. But, since
2177 ** this entire function is being run by thread (pTask->thread), that will
2178 ** lead to the current background thread attempting to join itself.
2179 **
2180 ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
2181 ** that pReadr->pIncr is a multi-threaded IncrMerge object, and that all
2182 ** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
2183 ** In this case vdbePmaReaderNext() is called on all child PmaReaders and
2184 ** the current PmaReader set to point to the first key in its range.
2185 **
     ** The steps are performed in order and each one is skipped once an
     ** earlier step has failed: initialize the MergeEngine, set up the temp
     ** file(s), populate aFile[1] (multi-threaded objects only), and finally
     ** call vdbePmaReaderNext() (all modes except INCRINIT_TASK).
     **
2186 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2187 */
2188 static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
2189   int rc = SQLITE_OK;
2190   IncrMerger *pIncr = pReadr->pIncr;
2191   SortSubtask *pTask = pIncr->pTask;
2192   sqlite3 *db = pTask->pSorter->db;
2193
2194   /* eMode is always INCRINIT_NORMAL in single-threaded mode */
2195   assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
2196
2197   rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
2198
2199   /* Set up the required files for pIncr. A multi-threaded IncrMerge object
2200   ** requires two temp files to itself, whereas a single-threaded object
2201   ** only requires a region of pTask->file2. */
2202   if( rc==SQLITE_OK ){
2203     int mxSz = pIncr->mxSz;
2204 #if SQLITE_MAX_WORKER_THREADS>0
2205     if( pIncr->bUseThread ){
2206       rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
2207       if( rc==SQLITE_OK ){
2208         rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
2209       }
2210     }else
2211 #endif
2212     /*if( !pIncr->bUseThread )*/{
           /* If file2 has not been opened yet, open it using the current
           ** value of file2.iEof as the size argument, then reset iEof to
           ** zero.  From here on iEof marks the start of the next unassigned
           ** region of file2. */
2213       if( pTask->file2.pFd==0 ){
2214         assert( pTask->file2.iEof>0 );
2215         rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
2216         pTask->file2.iEof = 0;
2217       }
           /* Assign the next mxSz bytes of file2 to this IncrMerger. */
2218       if( rc==SQLITE_OK ){
2219         pIncr->aFile[1].pFd = pTask->file2.pFd;
2220         pIncr->iStartOff = pTask->file2.iEof;
2221         pTask->file2.iEof += mxSz;
2222       }
2223     }
2224   }
2225
2226 #if SQLITE_MAX_WORKER_THREADS>0
2227   if( rc==SQLITE_OK && pIncr->bUseThread ){
2228     /* Use the current thread to populate aFile[1], even though this
2229     ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
2230     ** then this function is already running in background thread
2231     ** pIncr->pTask->thread.
2232     **
2233     ** If this is the INCRINIT_ROOT object, then it is running in the
2234     ** main VDBE thread. But that is Ok, as that thread cannot return
2235     ** control to the VDBE or proceed with anything useful until the
2236     ** first results are ready from this merger object anyway.
2237     */
2238     assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
2239     rc = vdbeIncrPopulate(pIncr);
2240   }
2241 #endif
2242
       /* Position pReadr on its first key, except in the INCRINIT_TASK case
       ** (see the header comment for why that call is deferred). */
2243   if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
2244     rc = vdbePmaReaderNext(pReadr);
2245   }
2246
2247   return rc;
2248 }
2249 
2250 #if SQLITE_MAX_WORKER_THREADS>0
2251 /*
2252 ** The main routine for vdbePmaReaderIncrMergeInit() operations run in
2253 ** background threads.
2254 */
2255 static void *vdbePmaReaderBgIncrInit(void *pCtx){
2256   PmaReader *pReader = (PmaReader*)pCtx;
2257   void *pRet = SQLITE_INT_TO_PTR(
2258                   vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
2259                );
2260   pReader->pIncr->pTask->bDone = 1;
2261   return pRet;
2262 }
2263 #endif
2264 
2265 /*
2266 ** If the PmaReader passed as the first argument is not an incremental-reader
2267 ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes
2268 ** the vdbePmaReaderIncrMergeInit() function with the parameters passed to
2269 ** this routine to initialize the incremental merge.
2270 **
2271 ** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1),
2272 ** then a background thread is launched to call vdbePmaReaderIncrMergeInit().
2273 ** Or, if the IncrMerger is single threaded, the same function is called
2274 ** using the current thread.
2275 */
2276 static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){
2277   IncrMerger *pIncr = pReadr->pIncr;   /* Incremental merger */
2278   int rc = SQLITE_OK;                  /* Return code */
2279   if( pIncr ){
2280 #if SQLITE_MAX_WORKER_THREADS>0
2281     assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK );
2282     if( pIncr->bUseThread ){
2283       void *pCtx = (void*)pReadr;
2284       rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx);
2285     }else
2286 #endif
2287     {
2288       rc = vdbePmaReaderIncrMergeInit(pReadr, eMode);
2289     }
2290   }
2291   return rc;
2292 }
2293 
2294 /*
2295 ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
2296 ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
2297 ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
2298 ** to NULL and return an SQLite error code.
2299 **
2300 ** When this function is called, *piOffset is set to the offset of the
2301 ** first PMA to read from pTask->file. Assuming no error occurs, it is
2302 ** set to the offset immediately following the last byte of the last
2303 ** PMA before returning. If an error does occur, then the final value of
2304 ** *piOffset is undefined.
2305 */
2306 static int vdbeMergeEngineLevel0(
2307   SortSubtask *pTask,             /* Sorter task to read from */
2308   int nPMA,                       /* Number of PMAs to read */
2309   i64 *piOffset,                  /* IN/OUT: Readr offset in pTask->file */
2310   MergeEngine **ppOut             /* OUT: New merge-engine */
2311 ){
2312   MergeEngine *pNew;              /* Merge engine to return */
2313   i64 iOff = *piOffset;
2314   int i;
2315   int rc = SQLITE_OK;
2316 
2317   *ppOut = pNew = vdbeMergeEngineNew(nPMA);
2318   if( pNew==0 ) rc = SQLITE_NOMEM_BKPT;
2319 
2320   for(i=0; i<nPMA && rc==SQLITE_OK; i++){
2321     i64 nDummy = 0;
2322     PmaReader *pReadr = &pNew->aReadr[i];
2323     rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
2324     iOff = pReadr->iEof;
2325   }
2326 
2327   if( rc!=SQLITE_OK ){
2328     vdbeMergeEngineFree(pNew);
2329     *ppOut = 0;
2330   }
2331   *piOffset = iOff;
2332   return rc;
2333 }
2334 
2335 /*
2336 ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
2337 ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
2338 **
2339 ** i.e.
2340 **
2341 **   nPMA<=16    -> TreeDepth() == 0
2342 **   nPMA<=256   -> TreeDepth() == 1
2343 **   nPMA<=65536 -> TreeDepth() == 2
2344 */
2345 static int vdbeSorterTreeDepth(int nPMA){
2346   int nDepth = 0;
2347   i64 nDiv = SORTER_MAX_MERGE_COUNT;
2348   while( nDiv < (i64)nPMA ){
2349     nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
2350     nDepth++;
2351   }
2352   return nDepth;
2353 }
2354 
2355 /*
2356 ** pRoot is the root of an incremental merge-tree with depth nDepth (according
2357 ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
2358 ** tree, counting from zero. This function adds pLeaf to the tree.
2359 **
2360 ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
2361 ** code is returned and pLeaf is freed.
2362 */
2363 static int vdbeSorterAddToTree(
2364   SortSubtask *pTask,             /* Task context */
2365   int nDepth,                     /* Depth of tree according to TreeDepth() */
2366   int iSeq,                       /* Sequence number of leaf within tree */
2367   MergeEngine *pRoot,             /* Root of tree */
2368   MergeEngine *pLeaf              /* Leaf to add to tree */
2369 ){
2370   int rc = SQLITE_OK;
2371   int nDiv = 1;
2372   int i;
2373   MergeEngine *p = pRoot;
2374   IncrMerger *pIncr;
2375 
2376   rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr);
2377 
2378   for(i=1; i<nDepth; i++){
2379     nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
2380   }
2381 
2382   for(i=1; i<nDepth && rc==SQLITE_OK; i++){
2383     int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
2384     PmaReader *pReadr = &p->aReadr[iIter];
2385 
2386     if( pReadr->pIncr==0 ){
2387       MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
2388       if( pNew==0 ){
2389         rc = SQLITE_NOMEM_BKPT;
2390       }else{
2391         rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr);
2392       }
2393     }
2394     if( rc==SQLITE_OK ){
2395       p = pReadr->pIncr->pMerger;
2396       nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
2397     }
2398   }
2399 
2400   if( rc==SQLITE_OK ){
2401     p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
2402   }else{
2403     vdbeIncrFree(pIncr);
2404   }
2405   return rc;
2406 }
2407 
2408 /*
2409 ** This function is called as part of a SorterRewind() operation on a sorter
2410 ** that has already written two or more level-0 PMAs to one or more temp
2411 ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that
2412 ** can be used to incrementally merge all PMAs on disk.
2413 **
2414 ** If successful, SQLITE_OK is returned and *ppOut set to point to the
2415 ** MergeEngine object at the root of the tree before returning. Or, if an
2416 ** error occurs, an SQLite error code is returned and the final value
2417 ** of *ppOut is undefined.
2418 */
2419 static int vdbeSorterMergeTreeBuild(
2420   VdbeSorter *pSorter,       /* The VDBE cursor that implements the sort */
2421   MergeEngine **ppOut        /* Write the MergeEngine here */
2422 ){
2423   MergeEngine *pMain = 0;
2424   int rc = SQLITE_OK;
2425   int iTask;
2426 
2427 #if SQLITE_MAX_WORKER_THREADS>0
2428   /* If the sorter uses more than one task, then create the top-level
2429   ** MergeEngine here. This MergeEngine will read data from exactly
2430   ** one PmaReader per sub-task.  */
2431   assert( pSorter->bUseThreads || pSorter->nTask==1 );
2432   if( pSorter->nTask>1 ){
2433     pMain = vdbeMergeEngineNew(pSorter->nTask);
2434     if( pMain==0 ) rc = SQLITE_NOMEM_BKPT;
2435   }
2436 #endif
2437 
2438   for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
2439     SortSubtask *pTask = &pSorter->aTask[iTask];
2440     assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 );
2441     if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){
2442       MergeEngine *pRoot = 0;     /* Root node of tree for this task */
2443       int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
2444       i64 iReadOff = 0;
2445 
2446       if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
2447         rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
2448       }else{
2449         int i;
2450         int iSeq = 0;
2451         pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
2452         if( pRoot==0 ) rc = SQLITE_NOMEM_BKPT;
2453         for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
2454           MergeEngine *pMerger = 0; /* New level-0 PMA merger */
2455           int nReader;              /* Number of level-0 PMAs to merge */
2456 
2457           nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
2458           rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
2459           if( rc==SQLITE_OK ){
2460             rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
2461           }
2462         }
2463       }
2464 
2465       if( rc==SQLITE_OK ){
2466 #if SQLITE_MAX_WORKER_THREADS>0
2467         if( pMain!=0 ){
2468           rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr);
2469         }else
2470 #endif
2471         {
2472           assert( pMain==0 );
2473           pMain = pRoot;
2474         }
2475       }else{
2476         vdbeMergeEngineFree(pRoot);
2477       }
2478     }
2479   }
2480 
2481   if( rc!=SQLITE_OK ){
2482     vdbeMergeEngineFree(pMain);
2483     pMain = 0;
2484   }
2485   *ppOut = pMain;
2486   return rc;
2487 }
2488 
2489 /*
2490 ** This function is called as part of an sqlite3VdbeSorterRewind() operation
2491 ** on a sorter that has written two or more PMAs to temporary files. It sets
2492 ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
2493 ** (for multi-threaded sorters) so that it can be used to iterate through
2494 ** all records stored in the sorter.
2495 **
2496 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2497 */
2498 static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
2499   int rc;                         /* Return code */
2500   SortSubtask *pTask0 = &pSorter->aTask[0];
2501   MergeEngine *pMain = 0;
2502 #if SQLITE_MAX_WORKER_THREADS
2503   sqlite3 *db = pTask0->pSorter->db;
2504   int i;
2505   SorterCompare xCompare = vdbeSorterGetCompare(pSorter);
2506   for(i=0; i<pSorter->nTask; i++){
2507     pSorter->aTask[i].xCompare = xCompare;
2508   }
2509 #endif
2510 
2511   rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
2512   if( rc==SQLITE_OK ){
2513 #if SQLITE_MAX_WORKER_THREADS
2514     assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
2515     if( pSorter->bUseThreads ){
2516       int iTask;
2517       PmaReader *pReadr = 0;
2518       SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
2519       rc = vdbeSortAllocUnpacked(pLast);
2520       if( rc==SQLITE_OK ){
2521         pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
2522         pSorter->pReader = pReadr;
2523         if( pReadr==0 ) rc = SQLITE_NOMEM_BKPT;
2524       }
2525       if( rc==SQLITE_OK ){
2526         rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr);
2527         if( rc==SQLITE_OK ){
2528           vdbeIncrMergerSetThreads(pReadr->pIncr);
2529           for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
2530             IncrMerger *pIncr;
2531             if( (pIncr = pMain->aReadr[iTask].pIncr) ){
2532               vdbeIncrMergerSetThreads(pIncr);
2533               assert( pIncr->pTask!=pLast );
2534             }
2535           }
2536           for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
2537             /* Check that:
2538             **
2539             **   a) The incremental merge object is configured to use the
2540             **      right task, and
2541             **   b) If it is using task (nTask-1), it is configured to run
2542             **      in single-threaded mode. This is important, as the
2543             **      root merge (INCRINIT_ROOT) will be using the same task
2544             **      object.
2545             */
2546             PmaReader *p = &pMain->aReadr[iTask];
2547             assert( p->pIncr==0 || (
2548                 (p->pIncr->pTask==&pSorter->aTask[iTask])             /* a */
2549              && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0)  /* b */
2550             ));
2551             rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
2552           }
2553         }
2554         pMain = 0;
2555       }
2556       if( rc==SQLITE_OK ){
2557         rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
2558       }
2559     }else
2560 #endif
2561     {
2562       rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL);
2563       pSorter->pMerger = pMain;
2564       pMain = 0;
2565     }
2566   }
2567 
2568   if( rc!=SQLITE_OK ){
2569     vdbeMergeEngineFree(pMain);
2570   }
2571   return rc;
2572 }
2573 
2574 
2575 /*
2576 ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
2577 ** this function is called to prepare for iterating through the records
2578 ** in sorted order.
2579 */
2580 int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
2581   VdbeSorter *pSorter;
2582   int rc = SQLITE_OK;             /* Return code */
2583 
2584   assert( pCsr->eCurType==CURTYPE_SORTER );
2585   pSorter = pCsr->uc.pSorter;
2586   assert( pSorter );
2587 
2588   /* If no data has been written to disk, then do not do so now. Instead,
2589   ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
2590   ** from the in-memory list.  */
2591   if( pSorter->bUsePMA==0 ){
2592     if( pSorter->list.pList ){
2593       *pbEof = 0;
2594       rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
2595     }else{
2596       *pbEof = 1;
2597     }
2598     return rc;
2599   }
2600 
2601   /* Write the current in-memory list to a PMA. When the VdbeSorterWrite()
2602   ** function flushes the contents of memory to disk, it immediately always
2603   ** creates a new list consisting of a single key immediately afterwards.
2604   ** So the list is never empty at this point.  */
2605   assert( pSorter->list.pList );
2606   rc = vdbeSorterFlushPMA(pSorter);
2607 
2608   /* Join all threads */
2609   rc = vdbeSorterJoinAll(pSorter, rc);
2610 
2611   vdbeSorterRewindDebug("rewind");
2612 
2613   /* Assuming no errors have occurred, set up a merger structure to
2614   ** incrementally read and merge all remaining PMAs.  */
2615   assert( pSorter->pReader==0 );
2616   if( rc==SQLITE_OK ){
2617     rc = vdbeSorterSetupMerge(pSorter);
2618     *pbEof = 0;
2619   }
2620 
2621   vdbeSorterRewindDebug("rewinddone");
2622   return rc;
2623 }
2624 
2625 /*
2626 ** Advance to the next element in the sorter.  Return value:
2627 **
2628 **    SQLITE_OK     success
2629 **    SQLITE_DONE   end of data
2630 **    otherwise     some kind of error.
2631 */
2632 int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr){
2633   VdbeSorter *pSorter;
2634   int rc;                         /* Return code */
2635 
2636   assert( pCsr->eCurType==CURTYPE_SORTER );
2637   pSorter = pCsr->uc.pSorter;
2638   assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );
2639   if( pSorter->bUsePMA ){
2640     assert( pSorter->pReader==0 || pSorter->pMerger==0 );
2641     assert( pSorter->bUseThreads==0 || pSorter->pReader );
2642     assert( pSorter->bUseThreads==1 || pSorter->pMerger );
2643 #if SQLITE_MAX_WORKER_THREADS>0
2644     if( pSorter->bUseThreads ){
2645       rc = vdbePmaReaderNext(pSorter->pReader);
2646       if( rc==SQLITE_OK && pSorter->pReader->pFd==0 ) rc = SQLITE_DONE;
2647     }else
2648 #endif
2649     /*if( !pSorter->bUseThreads )*/ {
2650       int res = 0;
2651       assert( pSorter->pMerger!=0 );
2652       assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) );
2653       rc = vdbeMergeEngineStep(pSorter->pMerger, &res);
2654       if( rc==SQLITE_OK && res ) rc = SQLITE_DONE;
2655     }
2656   }else{
2657     SorterRecord *pFree = pSorter->list.pList;
2658     pSorter->list.pList = pFree->u.pNext;
2659     pFree->u.pNext = 0;
2660     if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
2661     rc = pSorter->list.pList ? SQLITE_OK : SQLITE_DONE;
2662   }
2663   return rc;
2664 }
2665 
2666 /*
2667 ** Return a pointer to a buffer owned by the sorter that contains the
2668 ** current key.
2669 */
2670 static void *vdbeSorterRowkey(
2671   const VdbeSorter *pSorter,      /* Sorter object */
2672   int *pnKey                      /* OUT: Size of current key in bytes */
2673 ){
2674   void *pKey;
2675   if( pSorter->bUsePMA ){
2676     PmaReader *pReader;
2677 #if SQLITE_MAX_WORKER_THREADS>0
2678     if( pSorter->bUseThreads ){
2679       pReader = pSorter->pReader;
2680     }else
2681 #endif
2682     /*if( !pSorter->bUseThreads )*/{
2683       pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]];
2684     }
2685     *pnKey = pReader->nKey;
2686     pKey = pReader->aKey;
2687   }else{
2688     *pnKey = pSorter->list.pList->nVal;
2689     pKey = SRVAL(pSorter->list.pList);
2690   }
2691   return pKey;
2692 }
2693 
2694 /*
2695 ** Copy the current sorter key into the memory cell pOut.
2696 */
2697 int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
2698   VdbeSorter *pSorter;
2699   void *pKey; int nKey;           /* Sorter key to copy into pOut */
2700 
2701   assert( pCsr->eCurType==CURTYPE_SORTER );
2702   pSorter = pCsr->uc.pSorter;
2703   pKey = vdbeSorterRowkey(pSorter, &nKey);
2704   if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){
2705     return SQLITE_NOMEM_BKPT;
2706   }
2707   pOut->n = nKey;
2708   MemSetTypeFlag(pOut, MEM_Blob);
2709   memcpy(pOut->z, pKey, nKey);
2710 
2711   return SQLITE_OK;
2712 }
2713 
2714 /*
2715 ** Compare the key in memory cell pVal with the key that the sorter cursor
2716 ** passed as the first argument currently points to. For the purposes of
2717 ** the comparison, ignore the rowid field at the end of each record.
2718 **
2719 ** If the sorter cursor key contains any NULL values, consider it to be
2720 ** less than pVal. Even if pVal also contains NULL values.
2721 **
2722 ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM).
2723 ** Otherwise, set *pRes to a negative, zero or positive value if the
2724 ** key in pVal is smaller than, equal to or larger than the current sorter
2725 ** key.
2726 **
2727 ** This routine forms the core of the OP_SorterCompare opcode, which in
2728 ** turn is used to verify uniqueness when constructing a UNIQUE INDEX.
2729 */
2730 int sqlite3VdbeSorterCompare(
2731   const VdbeCursor *pCsr,         /* Sorter cursor */
2732   Mem *pVal,                      /* Value to compare to current sorter key */
2733   int nKeyCol,                    /* Compare this many columns */
2734   int *pRes                       /* OUT: Result of comparison */
2735 ){
2736   VdbeSorter *pSorter;
2737   UnpackedRecord *r2;
2738   KeyInfo *pKeyInfo;
2739   int i;
2740   void *pKey; int nKey;           /* Sorter key to compare pVal with */
2741 
2742   assert( pCsr->eCurType==CURTYPE_SORTER );
2743   pSorter = pCsr->uc.pSorter;
2744   r2 = pSorter->pUnpacked;
2745   pKeyInfo = pCsr->pKeyInfo;
2746   if( r2==0 ){
2747     r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo);
2748     if( r2==0 ) return SQLITE_NOMEM_BKPT;
2749     r2->nField = nKeyCol;
2750   }
2751   assert( r2->nField==nKeyCol );
2752 
2753   pKey = vdbeSorterRowkey(pSorter, &nKey);
2754   sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
2755   for(i=0; i<nKeyCol; i++){
2756     if( r2->aMem[i].flags & MEM_Null ){
2757       *pRes = -1;
2758       return SQLITE_OK;
2759     }
2760   }
2761 
2762   *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
2763   return SQLITE_OK;
2764 }
2765