xref: /sqlite-3.40.0/src/vdbesort.c (revision b248668b)
1a20fde64Sdan /*
2ac4f0039Sdrh ** 2011-07-09
3a20fde64Sdan **
4a20fde64Sdan ** The author disclaims copyright to this source code.  In place of
5a20fde64Sdan ** a legal notice, here is a blessing:
6a20fde64Sdan **
7a20fde64Sdan **    May you do good and not evil.
8a20fde64Sdan **    May you find forgiveness for yourself and forgive others.
9a20fde64Sdan **    May you share freely, never taking more than you give.
10a20fde64Sdan **
11a20fde64Sdan *************************************************************************
12a20fde64Sdan ** This file contains code for the VdbeSorter object, used in concert with
131a088a8eSdan ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
14ac4f0039Sdrh ** or by SELECT statements with ORDER BY clauses that cannot be satisfied
15ac4f0039Sdrh ** using indexes and without LIMIT clauses.
16ac4f0039Sdrh **
17ac4f0039Sdrh ** The VdbeSorter object implements a multi-threaded external merge sort
183de4df27Sdrh ** algorithm that is efficient even if the number of elements being sorted
19ac4f0039Sdrh ** exceeds the available memory.
20ac4f0039Sdrh **
21ac4f0039Sdrh ** Here is the (internal, non-API) interface between this module and the
22ac4f0039Sdrh ** rest of the SQLite system:
23ac4f0039Sdrh **
24ac4f0039Sdrh **    sqlite3VdbeSorterInit()       Create a new VdbeSorter object.
25ac4f0039Sdrh **
26ac4f0039Sdrh **    sqlite3VdbeSorterWrite()      Add a single new row to the VdbeSorter
27ac4f0039Sdrh **                                  object.  The row is a binary blob in the
28ac4f0039Sdrh **                                  OP_MakeRecord format that contains both
29ac4f0039Sdrh **                                  the ORDER BY key columns and result columns
30ac4f0039Sdrh **                                  in the case of a SELECT w/ ORDER BY, or
31ac4f0039Sdrh **                                  the complete record for an index entry
32ac4f0039Sdrh **                                  in the case of a CREATE INDEX.
33ac4f0039Sdrh **
34ac4f0039Sdrh **    sqlite3VdbeSorterRewind()     Sort all content previously added.
35ac4f0039Sdrh **                                  Position the read cursor on the
36ac4f0039Sdrh **                                  first sorted element.
37ac4f0039Sdrh **
38ac4f0039Sdrh **    sqlite3VdbeSorterNext()       Advance the read cursor to the next sorted
39ac4f0039Sdrh **                                  element.
40ac4f0039Sdrh **
41ac4f0039Sdrh **    sqlite3VdbeSorterRowkey()     Return the complete binary blob for the
42ac4f0039Sdrh **                                  row currently under the read cursor.
43ac4f0039Sdrh **
44ac4f0039Sdrh **    sqlite3VdbeSorterCompare()    Compare the binary blob for the row
45ac4f0039Sdrh **                                  currently under the read cursor against
46ac4f0039Sdrh **                                  another binary blob X and report if
47ac4f0039Sdrh **                                  X is strictly less than the read cursor.
48ac4f0039Sdrh **                                  Used to enforce uniqueness in a
49ac4f0039Sdrh **                                  CREATE UNIQUE INDEX statement.
50ac4f0039Sdrh **
51ac4f0039Sdrh **    sqlite3VdbeSorterClose()      Close the VdbeSorter object and reclaim
52ac4f0039Sdrh **                                  all resources.
53ac4f0039Sdrh **
54ac4f0039Sdrh **    sqlite3VdbeSorterReset()      Refurbish the VdbeSorter for reuse.  This
55ac4f0039Sdrh **                                  is like Close() followed by Init() only
56ac4f0039Sdrh **                                  much faster.
57ac4f0039Sdrh **
58ac4f0039Sdrh ** The interfaces above must be called in a particular order.  Write() can
59ac4f0039Sdrh ** only occur in between Init()/Reset() and Rewind().  Next(), Rowkey(), and
601a088a8eSdan ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
611a088a8eSdan **
621a088a8eSdan **   Init()
631a088a8eSdan **   for each record: Write()
641a088a8eSdan **   Rewind()
651a088a8eSdan **     Rowkey()/Compare()
661a088a8eSdan **   Next()
671a088a8eSdan **   Close()
68ac4f0039Sdrh **
69ac4f0039Sdrh ** Algorithm:
70ac4f0039Sdrh **
711a088a8eSdan ** Records passed to the sorter via calls to Write() are initially held
721a088a8eSdan ** unsorted in main memory. Assuming the amount of memory used never exceeds
731a088a8eSdan ** a threshold, when Rewind() is called the set of records is sorted using
741a088a8eSdan ** an in-memory merge sort. In this case, no temporary files are required
751a088a8eSdan ** and subsequent calls to Rowkey(), Next() and Compare() read records
761a088a8eSdan ** directly from main memory.
77ac4f0039Sdrh **
781a088a8eSdan ** If the amount of space used to store records in main memory exceeds the
791a088a8eSdan ** threshold, then the set of records currently in memory are sorted and
801a088a8eSdan ** written to a temporary file in "Packed Memory Array" (PMA) format.
811a088a8eSdan ** A PMA created at this point is known as a "level-0 PMA". Higher levels
821a088a8eSdan ** of PMAs may be created by merging existing PMAs together - for example
831a088a8eSdan ** merging two or more level-0 PMAs together creates a level-1 PMA.
84ac4f0039Sdrh **
851a088a8eSdan ** The threshold for the amount of main memory to use before flushing
861a088a8eSdan ** records to a PMA is roughly the same as the limit configured for the
871a088a8eSdan ** page-cache of the main database. Specifically, the threshold is set to
** the value returned by "PRAGMA main.page_size" multiplied by
891a088a8eSdan ** that returned by "PRAGMA main.cache_size", in bytes.
90ac4f0039Sdrh **
911a088a8eSdan ** If the sorter is running in single-threaded mode, then all PMAs generated
921a088a8eSdan ** are appended to a single temporary file. Or, if the sorter is running in
931a088a8eSdan ** multi-threaded mode then up to (N+1) temporary files may be opened, where
941a088a8eSdan ** N is the configured number of worker threads. In this case, instead of
951a088a8eSdan ** sorting the records and writing the PMA to a temporary file itself, the
961a088a8eSdan ** calling thread usually launches a worker thread to do so. Except, if
971a088a8eSdan ** there are already N worker threads running, the main thread does the work
981a088a8eSdan ** itself.
991a088a8eSdan **
1001a088a8eSdan ** The sorter is running in multi-threaded mode if (a) the library was built
1011a088a8eSdan ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
1021a088a8eSdan ** than zero, and (b) worker threads have been enabled at runtime by calling
1034d9f188fSdrh ** "PRAGMA threads=N" with some value of N greater than 0.
1041a088a8eSdan **
1051a088a8eSdan ** When Rewind() is called, any data remaining in memory is flushed to a
1061a088a8eSdan ** final PMA. So at this point the data is stored in some number of sorted
1073de4df27Sdrh ** PMAs within temporary files on disk.
1081a088a8eSdan **
1091a088a8eSdan ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
1101a088a8eSdan ** sorter is running in single-threaded mode, then these PMAs are merged
** incrementally as keys are retrieved from the sorter by the VDBE.  The
1125f4a4790Sdrh ** MergeEngine object, described in further detail below, performs this
1135f4a4790Sdrh ** merge.
1141a088a8eSdan **
1151a088a8eSdan ** Or, if running in multi-threaded mode, then a background thread is
1161a088a8eSdan ** launched to merge the existing PMAs. Once the background thread has
1171a088a8eSdan ** merged T bytes of data into a single sorted PMA, the main thread
1181a088a8eSdan ** begins reading keys from that PMA while the background thread proceeds
1191a088a8eSdan ** with merging the next T bytes of data. And so on.
1201a088a8eSdan **
1211a088a8eSdan ** Parameter T is set to half the value of the memory threshold used
1221a088a8eSdan ** by Write() above to determine when to create a new PMA.
1231a088a8eSdan **
1241a088a8eSdan ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when
1251a088a8eSdan ** Rewind() is called, then a hierarchy of incremental-merges is used.
1261a088a8eSdan ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on
1271a088a8eSdan ** disk are merged together. Then T bytes of data from the second set, and
1281a088a8eSdan ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
** PMAs at a time. This is done to improve locality.
1301a088a8eSdan **
1311a088a8eSdan ** If running in multi-threaded mode and there are more than
1321a088a8eSdan ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
1331a088a8eSdan ** than one background thread may be created. Specifically, there may be
1341a088a8eSdan ** one background thread for each temporary file on disk, and one background
1351a088a8eSdan ** thread to merge the output of each of the others to a single PMA for
1361a088a8eSdan ** the main thread to read from.
137a20fde64Sdan */
138a20fde64Sdan #include "sqliteInt.h"
139a20fde64Sdan #include "vdbeInt.h"
140a20fde64Sdan 
/*
** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
** messages to stderr that may be helpful in understanding the performance
** characteristics of the sorter in multi-threaded mode.
**
** The definition below is compiled out by "#if 0".  To enable the debugging
** output, change the condition to "#if 1" or define the symbol some other
** way (for example on the compiler command line).
*/
#if 0
# define SQLITE_DEBUG_SORTER_THREADS 1
#endif
149a20fde64Sdan 
/*
** Hard upper bound on the amount of record data that may be accumulated in
** memory before it is flushed to a level-0 PMA.  The limit exists to
** prevent various integer overflows.  The value is 2^29 bytes (512 MiB).
*/
#define SQLITE_MAX_PMASZ    (512*1024*1024)
1560a79238bSdan 
/*
** Private objects used by the sorter.  Each name below is a typedef for a
** structure defined further down in this file (listed alphabetically).
*/
typedef struct IncrMerger IncrMerger;       /* Read & merge multiple PMAs */
typedef struct MergeEngine MergeEngine;     /* Merge PMAs together */
typedef struct PmaReader PmaReader;         /* Incrementally read one PMA */
typedef struct PmaWriter PmaWriter;         /* Incrementally write one PMA */
typedef struct SorterFile SorterFile;       /* Temporary file object wrapper */
typedef struct SorterList SorterList;       /* In-memory list of records */
typedef struct SorterRecord SorterRecord;   /* A record being sorted */
typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
168a20fde64Sdan 
/*
** A container for a temp file handle and the current amount of data
** stored in the file.
**
** Instances are embedded in SortSubtask (fields "file" and "file2") and in
** IncrMerger (the two-element aFile[] array).
*/
struct SorterFile {
  sqlite3_file *pFd;              /* File handle */
  i64 iEof;                       /* Bytes of data stored in pFd */
};
177f8768418Sdan 
/*
** An in-memory list of objects to be sorted.
**
** If aMemory==0 then each object is allocated separately and the objects
** are connected using SorterRecord.u.pNext.  If aMemory!=0 then all objects
** are stored in the aMemory[] bulk memory, one right after the other, and
** are connected using SorterRecord.u.iNext.
**
** One instance lives in the VdbeSorter (its "list" field) and one in each
** SortSubtask (also named "list").  VdbeSorter.iMemory and
** VdbeSorter.nMemory track the used and allocated sizes of list.aMemory.
*/
struct SorterList {
  SorterRecord *pList;            /* Linked list of records */
  u8 *aMemory;                    /* If non-NULL, bulk memory to hold pList */
  int szPMA;                      /* Size of pList as PMA in bytes */
};
191d30ab3d9Sdan 
/*
** The MergeEngine object is used to combine two or more smaller PMAs into
** one big PMA using a merge operation.  Separate PMAs all need to be
** combined into one big PMA in order to be able to step through the sorted
** records in order.
**
** The aReadr[] array contains a PmaReader object for each of the PMAs being
** merged.  An aReadr[] object either points to a valid key or else is at EOF.
** ("EOF" means "End Of File".  When aReadr[] is at EOF there is no more data.)
** For the purposes of the paragraphs below, we assume that the array is
** actually N elements in size, where N is the smallest power of 2 greater
** than or equal to the number of PMAs being merged. The extra aReadr[]
** elements are treated as if they are empty (always at EOF).
**
** The aTree[] array is also N elements in size. The value of N is stored in
** the MergeEngine.nTree variable.
**
** The final (N/2) elements of aTree[] contain the results of comparing
** pairs of PMA keys together. Element i contains the result of
** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the
** aTree element is set to the index of it.
**
** For the purposes of this comparison, EOF is considered greater than any
** other key value. If the keys are equal (only possible with two EOF
** values), it doesn't matter which index is stored.
**
** The (N/4) elements of aTree[] that precede the final (N/2) described
** above contain the index of the smallest of each block of 4 PmaReaders.
** And so on, so that aTree[1] contains the index of the PmaReader that
** currently points to the smallest key value. aTree[0] is unused.
**
** Example:
**
**     aReadr[0] -> Banana
**     aReadr[1] -> Feijoa
**     aReadr[2] -> Elderberry
**     aReadr[3] -> Currant
**     aReadr[4] -> Grapefruit
**     aReadr[5] -> Apple
**     aReadr[6] -> Durian
**     aReadr[7] -> EOF
**
**     aTree[] = { X, 5,  0, 5,  0, 3, 5, 6 }
**
** The current element is "Apple" (the value of the key indicated by
** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will
** be advanced to the next key in its segment. Say the next key is
** "Eggplant":
**
**     aReadr[5] -> Eggplant
**
** The contents of aTree[] are updated first by comparing the new PmaReader
** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader
** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree.
** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader
** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian),
** so the value written into element 1 of the array is 0. As follows:
**
**     aTree[] = { X, 0,  0, 6,  0, 3, 5, 6 }
**
** In other words, each time we advance to the next sorter element, log2(N)
** key comparison operations are required, where N is the number of segments
** being merged (rounded up to the next power of 2).
*/
struct MergeEngine {
  int nTree;                 /* Used size of aTree/aReadr (power of 2) */
  SortSubtask *pTask;        /* Used by this thread only */
  int *aTree;                /* Current state of incremental merge */
  PmaReader *aReadr;         /* Array of PmaReaders to merge data from */
};
262a20fde64Sdan 
/*
** This object represents a single thread of control in a sort operation.
** Exactly VdbeSorter.nTask instances of this object are allocated
** as part of each VdbeSorter object. Instances are never allocated any
** other way. VdbeSorter.nTask is set to the number of worker threads allowed
** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).  Thus for
** single-threaded operation, there is exactly one instance of this object
** and for multi-threaded operation there are two or more instances.
**
** Essentially, this structure contains all those fields of the VdbeSorter
** structure for which each thread requires a separate instance. For example,
** each thread requires its own UnpackedRecord object to unpack records in
** as part of comparison operations.
**
** Before a background thread is launched, variable bDone is set to 0. Then,
** right before it exits, the thread itself sets bDone to 1. This is used for
** two purposes:
**
**   1. When flushing the contents of memory to a level-0 PMA on disk, to
**      attempt to select a SortSubtask for which there is not already an
**      active background thread (since doing so causes the main thread
**      to block until it finishes).
**
**   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
**      to sqlite3ThreadJoin() is likely to block. Cases that are likely to
**      block provoke debugging output.
**
** In both cases, the effects of the main thread seeing (bDone==0) even
** after the thread has finished are not dire. So we don't worry about
** memory barriers and such here.
**
** The SorterCompare typedef declared immediately below is the signature of
** the record comparison routine stored in SortSubtask.xCompare.  The two
** (const void*, int) pairs are presumably the two serialized records being
** compared and their sizes in bytes; the meaning of the int* argument is
** defined by the comparison routines themselves - TODO confirm.
*/
typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int);
struct SortSubtask {
  SQLiteThread *pThread;          /* Background thread, if any */
  int bDone;                      /* Set if thread is finished but not joined */
  VdbeSorter *pSorter;            /* Sorter that owns this sub-task */
  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
  SorterList list;                /* List for thread to write to a PMA */
  int nPMA;                       /* Number of PMAs currently in file */
  SorterCompare xCompare;         /* Compare function to use */
  SorterFile file;                /* Temp file for level-0 PMAs */
  SorterFile file2;               /* Space for other PMAs */
};
3061a088a8eSdan 
307a9d9111cSdan 
3081a088a8eSdan /*
309f8768418Sdan ** Main sorter structure. A single instance of this is allocated for each
310f8768418Sdan ** sorter cursor created by the VDBE.
3114be4c406Sdan **
3124be4c406Sdan ** mxKeysize:
3134be4c406Sdan **   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
3144be4c406Sdan **   this variable is updated so as to be set to the size on disk of the
3154be4c406Sdan **   largest record in the sorter.
316f8768418Sdan */
317f8768418Sdan struct VdbeSorter {
318a20fde64Sdan   int mnPmaSize;                  /* Minimum PMA size, in bytes */
319a20fde64Sdan   int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
3201a088a8eSdan   int mxKeysize;                  /* Largest serialized key seen so far */
3211a088a8eSdan   int pgsz;                       /* Main database page size */
322de823bedSdrh   PmaReader *pReader;             /* Readr data from here after Rewind() */
323f77ceba5Sdan   MergeEngine *pMerger;           /* Or here, if bUseThreads==0 */
3241a088a8eSdan   sqlite3 *db;                    /* Database connection */
3251a088a8eSdan   KeyInfo *pKeyInfo;              /* How to compare records */
326d30ab3d9Sdan   UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
327f77ceba5Sdan   SorterList list;                /* List of in-memory records */
328f77ceba5Sdan   int iMemory;                    /* Offset of free space in list.aMemory */
329f77ceba5Sdan   int nMemory;                    /* Size of list.aMemory allocation in bytes */
330f77ceba5Sdan   u8 bUsePMA;                     /* True if one or more PMAs created */
331f77ceba5Sdan   u8 bUseThreads;                 /* True to use background threads */
332f77ceba5Sdan   u8 iPrev;                       /* Previous thread used to flush PMA */
333f77ceba5Sdan   u8 nTask;                       /* Size of aTask[] array */
33457a14094Sdan   u8 typeMask;
335a634fb15Sdrh   SortSubtask aTask[1];           /* One or more subtasks */
336a20fde64Sdan };
337a20fde64Sdan 
/* Single-bit flags; presumably combined into VdbeSorter.typeMask (confirm). */
#define SORTER_TYPE_INTEGER (1<<0)
#define SORTER_TYPE_TEXT    (1<<1)
34057a14094Sdan 
/*
** An instance of the following object is used to read records out of a
** PMA, in sorted order.  The next key to be read is cached in nKey/aKey.
** aKey might point into aMap or into aBuffer.  If neither of those locations
** contains a contiguous representation of the key, then aAlloc is allocated
** and the key is copied into aAlloc and aKey is made to point to aAlloc.
**
** When aMap is non-NULL, blobs are returned directly from the mapping and
** aBuffer is not consulted.  The mapping is released through pFd (via
** sqlite3OsUnfetch()) when the reader is cleared.
**
** pFd==0 at EOF.
*/
struct PmaReader {
  i64 iReadOff;               /* Current read offset */
  i64 iEof;                   /* 1 byte past EOF for this PmaReader */
  int nAlloc;                 /* Bytes of space at aAlloc */
  int nKey;                   /* Number of bytes in key */
  sqlite3_file *pFd;          /* File handle we are reading from */
  u8 *aAlloc;                 /* Space for aKey if aBuffer and aMap won't work */
  u8 *aKey;                   /* Pointer to current key */
  u8 *aBuffer;                /* Current read buffer */
  int nBuffer;                /* Size of read buffer in bytes */
  u8 *aMap;                   /* Pointer to mapping of entire file */
  IncrMerger *pIncr;          /* Incremental merger */
};
3633b2c9b32Sdan 
/*
** Normally, a PmaReader object iterates through an existing PMA stored
** within a temp file. However, if the PmaReader.pIncr variable points to
** an object of the following type, it may be used to iterate/merge through
** multiple PMAs simultaneously.
**
** There are two types of IncrMerger object - single (bUseThread==0) and
** multi-threaded (bUseThread==1).
**
** A multi-threaded IncrMerger object uses two temporary files - aFile[0]
** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in
** size. When the IncrMerger is initialized, it reads enough data from
** pMerger to populate aFile[0]. It then sets variables within the
** corresponding PmaReader object to read from that file and kicks off
** a background thread to populate aFile[1] with the next mxSz bytes of
** sorted record data from pMerger.
**
** When the PmaReader reaches the end of aFile[0], it blocks until the
** background thread has finished populating aFile[1]. It then exchanges
** the contents of the aFile[0] and aFile[1] variables within this structure,
** sets the PmaReader fields to read from the new aFile[0] and kicks off
** another background thread to populate the new aFile[1]. And so on, until
** the contents of pMerger are exhausted.
**
** A single-threaded IncrMerger does not open any temporary files of its
** own. Instead, it has exclusive access to mxSz bytes of space beginning
** at offset iStartOff of file pTask->file2. And instead of using a
** background thread to prepare data for the PmaReader, with a single
** threaded IncrMerger the allocated part of pTask->file2 is "refilled" with
** keys from pMerger by the calling thread whenever the PmaReader runs out
** of data.
*/
struct IncrMerger {
  SortSubtask *pTask;             /* Task that owns this merger */
  MergeEngine *pMerger;           /* Merge engine thread reads data from */
  i64 iStartOff;                  /* Offset to start writing file at */
  int mxSz;                       /* Maximum bytes of data to store */
  int bEof;                       /* Set to true when merge is finished */
  int bUseThread;                 /* True to use a bg thread for this object */
  SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
};
4053b2c9b32Sdan 
/*
** An instance of this object is used for writing a PMA.
**
** The PMA is written one record at a time.  Each record is of an arbitrary
** size.  But I/O is more efficient if it occurs in page-sized blocks where
** each block is aligned on a page boundary.  This object caches writes to
** the PMA so that aligned, page-size blocks are written.
**
** Pending bytes are held in aBuffer (nBuffer bytes in size); iWriteOff is
** the offset within pFd that corresponds to the start of aBuffer.
*/
struct PmaWriter {
  int eFWErr;                     /* Non-zero if in an error state */
  u8 *aBuffer;                    /* Pointer to write buffer */
  int nBuffer;                    /* Size of write buffer in bytes */
  int iBufStart;                  /* First byte of buffer to write */
  int iBufEnd;                    /* Last byte of buffer to write */
  i64 iWriteOff;                  /* Offset of start of buffer in file */
  sqlite3_file *pFd;              /* File handle to write to */
};
423a20fde64Sdan 
/*
** This object is the header on a single record while that record is being
** held in memory and prior to being written out as part of a PMA.  The
** nVal bytes of record data are stored immediately after the header.
**
** How the linked list is connected depends on how memory is being managed
** by this module. If using a separate allocation for each in-memory record
** (VdbeSorter.list.aMemory==0), then the list is always connected using the
** SorterRecord.u.pNext pointers.
**
** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
** then while records are being accumulated the list is linked using the
** SorterRecord.u.iNext offset. This is because the aMemory[] array may
** be sqlite3Realloc()ed while records are being accumulated, which would
** invalidate any pointers into it. Once the VM has finished passing records
** to the sorter, or when the in-memory buffer is full, the list is sorted.
** As part of the sorting process, it is converted to use the
** SorterRecord.u.pNext pointers. See function vdbeSorterSort() for details.
*/
struct SorterRecord {
  int nVal;                       /* Size of the record in bytes */
  union {
    SorterRecord *pNext;          /* Pointer to next record in list */
    int iNext;                    /* Offset within aMemory of next record */
  } u;
  /* The data for the record immediately follows this header */
};
4505134d135Sdan 
/*
** SRVAL(p): address of the record payload that immediately follows the
** SorterRecord header p, returned as a void pointer.  Taking the address of
** element [1] performs no dereference, so this is exactly "header + 1".
*/
#define SRVAL(p) ((void*)&((SorterRecord*)(p))[1])
4576971952cSdan 
458a20fde64Sdan 
/*
** Largest fan-in of a single MergeEngine: no MergeEngine is ever asked to
** merge more than this many PMAs at once.
*/
#define SORTER_MAX_MERGE_COUNT 16
4617fe6270bSdan 
462d30ab3d9Sdan static int vdbeIncrSwap(IncrMerger*);
463d30ab3d9Sdan static void vdbeIncrFree(IncrMerger *);
464d30ab3d9Sdan 
465a20fde64Sdan /*
466ac65196eSdrh ** Free all memory belonging to the PmaReader object passed as the
467c6e73455Sdan ** argument. All structure fields are set to zero before returning.
468a20fde64Sdan */
vdbePmaReaderClear(PmaReader * pReadr)469de823bedSdrh static void vdbePmaReaderClear(PmaReader *pReadr){
470de823bedSdrh   sqlite3_free(pReadr->aAlloc);
471de823bedSdrh   sqlite3_free(pReadr->aBuffer);
472a4c8ca04Sdrh   if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
473de823bedSdrh   vdbeIncrFree(pReadr->pIncr);
474de823bedSdrh   memset(pReadr, 0, sizeof(PmaReader));
475a20fde64Sdan }
476a20fde64Sdan 
477a20fde64Sdan /*
478ac65196eSdrh ** Read the next nByte bytes of data from the PMA p.
4793b2c9b32Sdan ** If successful, set *ppOut to point to a buffer containing the data
4803b2c9b32Sdan ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
4813b2c9b32Sdan ** error code.
4823b2c9b32Sdan **
483ac65196eSdrh ** The buffer returned in *ppOut is only valid until the
4843b2c9b32Sdan ** next call to this function.
4853b2c9b32Sdan */
static int vdbePmaReadBlob(
  PmaReader *p,                   /* PmaReader from which to take the blob */
  int nByte,                      /* Bytes of data to read */
  u8 **ppOut                      /* OUT: Pointer to buffer containing data */
){
  int iOff;                       /* Read cursor position within p->aBuffer[] */
  int nLeft;                      /* Unread bytes in p->aBuffer[] from iOff on */
  int nDone;                      /* Bytes already assembled in p->aAlloc[] */

  /* Memory-mapped PMA: the blob can be returned in place. */
  if( p->aMap ){
    *ppOut = &p->aMap[p->iReadOff];
    p->iReadOff += nByte;
    return SQLITE_OK;
  }
  assert( p->aBuffer );

  /* A cursor sitting exactly on a buffer boundary means every byte in the
  ** buffer has been consumed. Refill it with the next p->nBuffer bytes of
  ** the file, or with whatever is left of the PMA if that is less. */
  iOff = p->iReadOff % p->nBuffer;
  if( iOff==0 ){
    i64 nToEof = p->iEof - p->iReadOff;
    int nFill = (nToEof > (i64)p->nBuffer) ? p->nBuffer : (int)nToEof;
    int rc;
    assert( nFill>0 );
    rc = sqlite3OsRead(p->pFd, p->aBuffer, nFill, p->iReadOff);
    assert( rc!=SQLITE_IOERR_SHORT_READ );
    if( rc!=SQLITE_OK ) return rc;
  }
  nLeft = p->nBuffer - iOff;

  /* Fast path: the whole blob is already buffered, so no copy is needed. */
  if( nByte<=nLeft ){
    *ppOut = &p->aBuffer[iOff];
    p->iReadOff += nByte;
    return SQLITE_OK;
  }

  /* Slow path: the blob straddles one or more buffer refills. Assemble it
  ** in p->aAlloc[], growing that scratch area (doubling, minimum 128 bytes)
  ** if it is too small. */
  if( p->nAlloc<nByte ){
    sqlite3_int64 nGrow = MAX(128, 2*(sqlite3_int64)p->nAlloc);
    u8 *aGrown;
    while( nGrow<nByte ) nGrow *= 2;
    aGrown = sqlite3Realloc(p->aAlloc, nGrow);
    if( aGrown==0 ) return SQLITE_NOMEM_BKPT;
    p->nAlloc = nGrow;
    p->aAlloc = aGrown;
  }

  /* Take everything still in the current buffer first... */
  memcpy(p->aAlloc, &p->aBuffer[iOff], nLeft);
  p->iReadOff += nLeft;

  /* ...then pull the remainder at most one buffer at a time. The cursor is
  ** now buffer-aligned, so each recursive call refills p->aBuffer[] and
  ** returns a pointer into it (never into p->aAlloc[]). */
  for(nDone=nLeft; nDone<nByte; ){
    int nChunk = nByte - nDone;
    u8 *aChunk;
    int rc;
    if( nChunk>p->nBuffer ) nChunk = p->nBuffer;
    rc = vdbePmaReadBlob(p, nChunk, &aChunk);
    if( rc!=SQLITE_OK ) return rc;
    assert( aChunk!=p->aAlloc );
    memcpy(&p->aAlloc[nDone], aChunk, nChunk);
    nDone += nChunk;
  }

  *ppOut = p->aAlloc;
  return SQLITE_OK;
}
5753b2c9b32Sdan 
5763b2c9b32Sdan /*
5773b2c9b32Sdan ** Read a varint from the stream of data accessed by p. Set *pnOut to
5783b2c9b32Sdan ** the value read.
5793b2c9b32Sdan */
vdbePmaReadVarint(PmaReader * p,u64 * pnOut)580a634fb15Sdrh static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
5813b2c9b32Sdan   int iBuf;
5823b2c9b32Sdan 
583face0872Sdan   if( p->aMap ){
584face0872Sdan     p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
585face0872Sdan   }else{
5863b2c9b32Sdan     iBuf = p->iReadOff % p->nBuffer;
5873b2c9b32Sdan     if( iBuf && (p->nBuffer-iBuf)>=9 ){
5883b2c9b32Sdan       p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
5893b2c9b32Sdan     }else{
59007f54792Sdrh       u8 aVarint[16], *a;
59107f54792Sdrh       int i = 0, rc;
59207f54792Sdrh       do{
593a634fb15Sdrh         rc = vdbePmaReadBlob(p, 1, &a);
5943b2c9b32Sdan         if( rc ) return rc;
59507f54792Sdrh         aVarint[(i++)&0xf] = a[0];
59607f54792Sdrh       }while( (a[0]&0x80)!=0 );
5973b2c9b32Sdan       sqlite3GetVarint(aVarint, pnOut);
5983b2c9b32Sdan     }
599face0872Sdan   }
6003b2c9b32Sdan 
6013b2c9b32Sdan   return SQLITE_OK;
6023b2c9b32Sdan }
6033b2c9b32Sdan 
6043b2c9b32Sdan /*
6051a088a8eSdan ** Attempt to memory map file pFile. If successful, set *pp to point to the
6061a088a8eSdan ** new mapping and return SQLITE_OK. If the mapping is not attempted
6071a088a8eSdan ** (because the file is too large or the VFS layer is configured not to use
6081a088a8eSdan ** mmap), return SQLITE_OK and set *pp to NULL.
6091a088a8eSdan **
6101a088a8eSdan ** Or, if an error occurs, return an SQLite error code. The final value of
6111a088a8eSdan ** *pp is undefined in this case.
612a20fde64Sdan */
vdbeSorterMapFile(SortSubtask * pTask,SorterFile * pFile,u8 ** pp)613d30ab3d9Sdan static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
614d30ab3d9Sdan   int rc = SQLITE_OK;
6151a088a8eSdan   if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
616ed7bcba7Sdan     sqlite3_file *pFd = pFile->pFd;
617ed7bcba7Sdan     if( pFd->pMethods->iVersion>=3 ){
618ed7bcba7Sdan       rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp);
619ac65196eSdrh       testcase( rc!=SQLITE_OK );
620d30ab3d9Sdan     }
621ed7bcba7Sdan   }
622d30ab3d9Sdan   return rc;
623a20fde64Sdan }
624a20fde64Sdan 
6251a088a8eSdan /*
6268a4865f1Sdrh ** Attach PmaReader pReadr to file pFile (if it is not already attached to
6278a4865f1Sdrh ** that file) and seek it to offset iOff within the file.  Return SQLITE_OK
6281a088a8eSdan ** if successful, or an SQLite error code if an error occurs.
6291a088a8eSdan */
vdbePmaReaderSeek(SortSubtask * pTask,PmaReader * pReadr,SorterFile * pFile,i64 iOff)6301a088a8eSdan static int vdbePmaReaderSeek(
6311a088a8eSdan   SortSubtask *pTask,             /* Task context */
632ac65196eSdrh   PmaReader *pReadr,              /* Reader whose cursor is to be moved */
6331a088a8eSdan   SorterFile *pFile,              /* Sorter file to read from */
6341a088a8eSdan   i64 iOff                        /* Offset in pFile */
6351a088a8eSdan ){
636d30ab3d9Sdan   int rc = SQLITE_OK;
637d30ab3d9Sdan 
638de823bedSdrh   assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
639d30ab3d9Sdan 
640c0fea3cfSdrh   if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ;
641de823bedSdrh   if( pReadr->aMap ){
642a4c8ca04Sdrh     sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
643de823bedSdrh     pReadr->aMap = 0;
644d30ab3d9Sdan   }
645de823bedSdrh   pReadr->iReadOff = iOff;
646de823bedSdrh   pReadr->iEof = pFile->iEof;
647a4c8ca04Sdrh   pReadr->pFd = pFile->pFd;
648d30ab3d9Sdan 
649de823bedSdrh   rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
650de823bedSdrh   if( rc==SQLITE_OK && pReadr->aMap==0 ){
6511a088a8eSdan     int pgsz = pTask->pSorter->pgsz;
652de823bedSdrh     int iBuf = pReadr->iReadOff % pgsz;
653de823bedSdrh     if( pReadr->aBuffer==0 ){
654de823bedSdrh       pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz);
655fad3039cSmistachkin       if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM_BKPT;
656de823bedSdrh       pReadr->nBuffer = pgsz;
657d30ab3d9Sdan     }
65822ace891Sdan     if( rc==SQLITE_OK && iBuf ){
6591a088a8eSdan       int nRead = pgsz - iBuf;
660de823bedSdrh       if( (pReadr->iReadOff + nRead) > pReadr->iEof ){
661de823bedSdrh         nRead = (int)(pReadr->iEof - pReadr->iReadOff);
6624be4c406Sdan       }
6634be4c406Sdan       rc = sqlite3OsRead(
664a4c8ca04Sdrh           pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
6654be4c406Sdan       );
666ac65196eSdrh       testcase( rc!=SQLITE_OK );
6674be4c406Sdan     }
6681e74e602Sdan   }
6691e74e602Sdan 
6701e74e602Sdan   return rc;
6711e74e602Sdan }
6721e74e602Sdan 
673a20fde64Sdan /*
674de823bedSdrh ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if
675a20fde64Sdan ** no error occurs, or an SQLite error code if one does.
676a20fde64Sdan */
vdbePmaReaderNext(PmaReader * pReadr)677de823bedSdrh static int vdbePmaReaderNext(PmaReader *pReadr){
678d30ab3d9Sdan   int rc = SQLITE_OK;             /* Return Code */
679a20fde64Sdan   u64 nRec = 0;                   /* Size of record in bytes */
680a20fde64Sdan 
681e18e90ebSdan 
682de823bedSdrh   if( pReadr->iReadOff>=pReadr->iEof ){
683de823bedSdrh     IncrMerger *pIncr = pReadr->pIncr;
684d30ab3d9Sdan     int bEof = 1;
6851a088a8eSdan     if( pIncr ){
6861a088a8eSdan       rc = vdbeIncrSwap(pIncr);
6871a088a8eSdan       if( rc==SQLITE_OK && pIncr->bEof==0 ){
6881a088a8eSdan         rc = vdbePmaReaderSeek(
689de823bedSdrh             pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff
6901a088a8eSdan         );
691d30ab3d9Sdan         bEof = 0;
692d30ab3d9Sdan       }
693a20fde64Sdan     }
694a20fde64Sdan 
695d30ab3d9Sdan     if( bEof ){
696d30ab3d9Sdan       /* This is an EOF condition */
697de823bedSdrh       vdbePmaReaderClear(pReadr);
698ac65196eSdrh       testcase( rc!=SQLITE_OK );
699d30ab3d9Sdan       return rc;
700d30ab3d9Sdan     }
701d30ab3d9Sdan   }
702d30ab3d9Sdan 
703d30ab3d9Sdan   if( rc==SQLITE_OK ){
704de823bedSdrh     rc = vdbePmaReadVarint(pReadr, &nRec);
705d30ab3d9Sdan   }
706a20fde64Sdan   if( rc==SQLITE_OK ){
707de823bedSdrh     pReadr->nKey = (int)nRec;
708de823bedSdrh     rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
709ac65196eSdrh     testcase( rc!=SQLITE_OK );
710a20fde64Sdan   }
711a20fde64Sdan 
712a20fde64Sdan   return rc;
713a20fde64Sdan }
714a20fde64Sdan 
715a20fde64Sdan /*
716de823bedSdrh ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
717a20fde64Sdan ** starting at offset iStart and ending at offset iEof-1. This function
718de823bedSdrh ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the
719a20fde64Sdan ** PMA is empty).
720d30ab3d9Sdan **
721d30ab3d9Sdan ** If the pnByte parameter is NULL, then it is assumed that the file
722d30ab3d9Sdan ** contains a single PMA, and that that PMA omits the initial length varint.
723a20fde64Sdan */
vdbePmaReaderInit(SortSubtask * pTask,SorterFile * pFile,i64 iStart,PmaReader * pReadr,i64 * pnByte)724a634fb15Sdrh static int vdbePmaReaderInit(
725d30ab3d9Sdan   SortSubtask *pTask,             /* Task context */
726d30ab3d9Sdan   SorterFile *pFile,              /* Sorter file to read from */
727c6e73455Sdan   i64 iStart,                     /* Start offset in pFile */
728de823bedSdrh   PmaReader *pReadr,              /* PmaReader to populate */
7291e74e602Sdan   i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
730a20fde64Sdan ){
7311a088a8eSdan   int rc;
7323b2c9b32Sdan 
733d30ab3d9Sdan   assert( pFile->iEof>iStart );
734de823bedSdrh   assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 );
735de823bedSdrh   assert( pReadr->aBuffer==0 );
736de823bedSdrh   assert( pReadr->aMap==0 );
7371e74e602Sdan 
738de823bedSdrh   rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart);
7393b2c9b32Sdan   if( rc==SQLITE_OK ){
740d1dd7507Sdrh     u64 nByte = 0;                 /* Size of PMA in bytes */
741de823bedSdrh     rc = vdbePmaReadVarint(pReadr, &nByte);
742de823bedSdrh     pReadr->iEof = pReadr->iReadOff + nByte;
7433b2c9b32Sdan     *pnByte += nByte;
7443b2c9b32Sdan   }
7453b2c9b32Sdan 
7461e74e602Sdan   if( rc==SQLITE_OK ){
747de823bedSdrh     rc = vdbePmaReaderNext(pReadr);
7481e74e602Sdan   }
7491e74e602Sdan   return rc;
750a20fde64Sdan }
751a20fde64Sdan 
7525134d135Sdan /*
7537004f3f6Sdan ** A version of vdbeSorterCompare() that assumes that it has already been
7547004f3f6Sdan ** determined that the first field of key1 is equal to the first field of
7557004f3f6Sdan ** key2.
7567004f3f6Sdan */
vdbeSorterCompareTail(SortSubtask * pTask,int * pbKey2Cached,const void * pKey1,int nKey1,const void * pKey2,int nKey2)7577004f3f6Sdan static int vdbeSorterCompareTail(
7587004f3f6Sdan   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
7597004f3f6Sdan   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
7607004f3f6Sdan   const void *pKey1, int nKey1,   /* Left side of comparison */
7617004f3f6Sdan   const void *pKey2, int nKey2    /* Right side of comparison */
7627004f3f6Sdan ){
7637004f3f6Sdan   UnpackedRecord *r2 = pTask->pUnpacked;
7647004f3f6Sdan   if( *pbKey2Cached==0 ){
7657004f3f6Sdan     sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
7667004f3f6Sdan     *pbKey2Cached = 1;
7677004f3f6Sdan   }
7687004f3f6Sdan   return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1);
7697004f3f6Sdan }
7705134d135Sdan 
7715134d135Sdan /*
7725134d135Sdan ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
773a634fb15Sdrh ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
774fad9f9a8Sdan ** used by the comparison. Return the result of the comparison.
7755134d135Sdan **
776a9d9111cSdan ** If IN/OUT parameter *pbKey2Cached is true when this function is called,
777a9d9111cSdan ** it is assumed that (pTask->pUnpacked) contains the unpacked version
778a9d9111cSdan ** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked
779a9d9111cSdan ** version of key2 and *pbKey2Cached set to true before returning.
7808b1ea14fSdan **
781a634fb15Sdrh ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
782fad9f9a8Sdan ** to SQLITE_NOMEM.
7835134d135Sdan */
vdbeSorterCompare(SortSubtask * pTask,int * pbKey2Cached,const void * pKey1,int nKey1,const void * pKey2,int nKey2)784fad9f9a8Sdan static int vdbeSorterCompare(
785a634fb15Sdrh   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
786a9d9111cSdan   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
787c041c16cSdrh   const void *pKey1, int nKey1,   /* Left side of comparison */
788fad9f9a8Sdan   const void *pKey2, int nKey2    /* Right side of comparison */
7895134d135Sdan ){
790a634fb15Sdrh   UnpackedRecord *r2 = pTask->pUnpacked;
791a9d9111cSdan   if( !*pbKey2Cached ){
7921a088a8eSdan     sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
793a9d9111cSdan     *pbKey2Cached = 1;
7948b1ea14fSdan   }
79575179dedSdrh   return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
796a20fde64Sdan }
797a20fde64Sdan 
798a20fde64Sdan /*
799a9d9111cSdan ** A specially optimized version of vdbeSorterCompare() that assumes that
800a9d9111cSdan ** the first field of each key is a TEXT value and that the collation
801a9d9111cSdan ** sequence to compare them with is BINARY.
802a9d9111cSdan */
vdbeSorterCompareText(SortSubtask * pTask,int * pbKey2Cached,const void * pKey1,int nKey1,const void * pKey2,int nKey2)803a9d9111cSdan static int vdbeSorterCompareText(
804a9d9111cSdan   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
805a9d9111cSdan   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
806a9d9111cSdan   const void *pKey1, int nKey1,   /* Left side of comparison */
807a9d9111cSdan   const void *pKey2, int nKey2    /* Right side of comparison */
808a9d9111cSdan ){
809a9d9111cSdan   const u8 * const p1 = (const u8 * const)pKey1;
810a9d9111cSdan   const u8 * const p2 = (const u8 * const)pKey2;
811a9d9111cSdan   const u8 * const v1 = &p1[ p1[0] ];   /* Pointer to value 1 */
812a9d9111cSdan   const u8 * const v2 = &p2[ p2[0] ];   /* Pointer to value 2 */
813a9d9111cSdan 
814a9d9111cSdan   int n1;
815a9d9111cSdan   int n2;
816a9d9111cSdan   int res;
817a9d9111cSdan 
81802a95eb9Sdrh   getVarint32NR(&p1[1], n1);
81902a95eb9Sdrh   getVarint32NR(&p2[1], n2);
820ae2ac854Sdrh   res = memcmp(v1, v2, (MIN(n1, n2) - 13)/2);
821a9d9111cSdan   if( res==0 ){
822a9d9111cSdan     res = n1 - n2;
823a9d9111cSdan   }
824a9d9111cSdan 
825a9d9111cSdan   if( res==0 ){
826a485ad19Sdrh     if( pTask->pSorter->pKeyInfo->nKeyField>1 ){
8277004f3f6Sdan       res = vdbeSorterCompareTail(
8287004f3f6Sdan           pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
8297004f3f6Sdan       );
830a9d9111cSdan     }
831a9d9111cSdan   }else{
8326e11892dSdan     assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
8336e11892dSdan     if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){
834a9d9111cSdan       res = res * -1;
835a9d9111cSdan     }
836a9d9111cSdan   }
837a9d9111cSdan 
838a9d9111cSdan   return res;
839a9d9111cSdan }
840a9d9111cSdan 
841a9d9111cSdan /*
842a9d9111cSdan ** A specially optimized version of vdbeSorterCompare() that assumes that
843a9d9111cSdan ** the first field of each key is an INTEGER value.
844a9d9111cSdan */
vdbeSorterCompareInt(SortSubtask * pTask,int * pbKey2Cached,const void * pKey1,int nKey1,const void * pKey2,int nKey2)845a9d9111cSdan static int vdbeSorterCompareInt(
846a9d9111cSdan   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
847a9d9111cSdan   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
848a9d9111cSdan   const void *pKey1, int nKey1,   /* Left side of comparison */
849a9d9111cSdan   const void *pKey2, int nKey2    /* Right side of comparison */
850a9d9111cSdan ){
851a9d9111cSdan   const u8 * const p1 = (const u8 * const)pKey1;
852a9d9111cSdan   const u8 * const p2 = (const u8 * const)pKey2;
853a9d9111cSdan   const int s1 = p1[1];                 /* Left hand serial type */
854a9d9111cSdan   const int s2 = p2[1];                 /* Right hand serial type */
855a9d9111cSdan   const u8 * const v1 = &p1[ p1[0] ];   /* Pointer to value 1 */
856a9d9111cSdan   const u8 * const v2 = &p2[ p2[0] ];   /* Pointer to value 2 */
857a9d9111cSdan   int res;                              /* Return value */
858a9d9111cSdan 
859a9d9111cSdan   assert( (s1>0 && s1<7) || s1==8 || s1==9 );
860a9d9111cSdan   assert( (s2>0 && s2<7) || s2==8 || s2==9 );
861a9d9111cSdan 
862a9d9111cSdan   if( s1==s2 ){
863a9d9111cSdan     /* The two values have the same sign. Compare using memcmp(). */
864caab5f42Sdrh     static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8, 0, 0, 0 };
865caab5f42Sdrh     const u8 n = aLen[s1];
866a9d9111cSdan     int i;
867a9d9111cSdan     res = 0;
868caab5f42Sdrh     for(i=0; i<n; i++){
869caab5f42Sdrh       if( (res = v1[i] - v2[i])!=0 ){
870caab5f42Sdrh         if( ((v1[0] ^ v2[0]) & 0x80)!=0 ){
871caab5f42Sdrh           res = v1[0] & 0x80 ? -1 : +1;
872caab5f42Sdrh         }
873caab5f42Sdrh         break;
874a9d9111cSdan       }
875a9d9111cSdan     }
876caab5f42Sdrh   }else if( s1>7 && s2>7 ){
877caab5f42Sdrh     res = s1 - s2;
878a9d9111cSdan   }else{
879a9d9111cSdan     if( s2>7 ){
880a9d9111cSdan       res = +1;
881a9d9111cSdan     }else if( s1>7 ){
882a9d9111cSdan       res = -1;
883a9d9111cSdan     }else{
884a9d9111cSdan       res = s1 - s2;
885a9d9111cSdan     }
8864a614e90Sdrh     assert( res!=0 );
887a9d9111cSdan 
888a9d9111cSdan     if( res>0 ){
889a9d9111cSdan       if( *v1 & 0x80 ) res = -1;
8904a614e90Sdrh     }else{
891a9d9111cSdan       if( *v2 & 0x80 ) res = +1;
892a9d9111cSdan     }
893a9d9111cSdan   }
894a9d9111cSdan 
895a9d9111cSdan   if( res==0 ){
896a485ad19Sdrh     if( pTask->pSorter->pKeyInfo->nKeyField>1 ){
8977004f3f6Sdan       res = vdbeSorterCompareTail(
8987004f3f6Sdan           pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
8997004f3f6Sdan       );
900a9d9111cSdan     }
9016e11892dSdan   }else if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){
9026e11892dSdan     assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
903a9d9111cSdan     res = res * -1;
904a9d9111cSdan   }
905a9d9111cSdan 
906a9d9111cSdan   return res;
907a20fde64Sdan }
908a20fde64Sdan 
909a20fde64Sdan /*
910a20fde64Sdan ** Initialize the temporary index cursor just opened as a sorter cursor.
91131a0bfdeSdan **
912a485ad19Sdrh ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nKeyField)
91331a0bfdeSdan ** to determine the number of fields that should be compared from the
91431a0bfdeSdan ** records being sorted. However, if the value passed as argument nField
91531a0bfdeSdan ** is non-zero and the sorter is able to guarantee a stable sort, nField
91631a0bfdeSdan ** is used instead. This is used when sorting records for a CREATE INDEX
91731a0bfdeSdan ** statement. In this case, keys are always delivered to the sorter in
91831a0bfdeSdan ** order of the primary key, which happens to be make up the final part
91931a0bfdeSdan ** of the records being sorted. So if the sort is stable, there is never
92031a0bfdeSdan ** any reason to compare PK fields and they can be ignored for a small
92131a0bfdeSdan ** performance boost.
92231a0bfdeSdan **
92331a0bfdeSdan ** The sorter can guarantee a stable sort when running in single-threaded
92431a0bfdeSdan ** mode, but not in multi-threaded mode.
92531a0bfdeSdan **
92631a0bfdeSdan ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
927a20fde64Sdan */
sqlite3VdbeSorterInit(sqlite3 * db,int nField,VdbeCursor * pCsr)928a634fb15Sdrh int sqlite3VdbeSorterInit(
929a634fb15Sdrh   sqlite3 *db,                    /* Database connection (for malloc()) */
930a634fb15Sdrh   int nField,                     /* Number of key fields in each record */
931a634fb15Sdrh   VdbeCursor *pCsr                /* Cursor that holds the new sorter */
932a634fb15Sdrh ){
9335134d135Sdan   int pgsz;                       /* Page size of main database */
934a634fb15Sdrh   int i;                          /* Used to iterate through aTask[] */
93534163c68Sdrh   VdbeSorter *pSorter;            /* The new sorter */
936f8768418Sdan   KeyInfo *pKeyInfo;              /* Copy of pCsr->pKeyInfo with db==0 */
937f8768418Sdan   int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
938b3f56fdbSdan   int sz;                         /* Size of pSorter in bytes */
9392f170015Sdan   int rc = SQLITE_OK;
940b0f935e4Sdrh #if SQLITE_MAX_WORKER_THREADS==0
9418f0dab37Sdrh # define nWorker 0
942b0f935e4Sdrh #else
943111544cbSdrh   int nWorker;
944111544cbSdrh #endif
945111544cbSdrh 
946111544cbSdrh   /* Initialize the upper limit on the number of worker threads */
947111544cbSdrh #if SQLITE_MAX_WORKER_THREADS>0
948111544cbSdrh   if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){
949111544cbSdrh     nWorker = 0;
950111544cbSdrh   }else{
951111544cbSdrh     nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS];
952111544cbSdrh   }
953028696c4Sdrh #endif
954028696c4Sdrh 
955028696c4Sdrh   /* Do not allow the total number of threads (main thread + all workers)
956028696c4Sdrh   ** to exceed the maximum merge count */
957028696c4Sdrh #if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
958028696c4Sdrh   if( nWorker>=SORTER_MAX_MERGE_COUNT ){
959028696c4Sdrh     nWorker = SORTER_MAX_MERGE_COUNT-1;
960028696c4Sdrh   }
961b0f935e4Sdrh #endif
9625134d135Sdan 
963*b248668bSdrh   assert( pCsr->pKeyInfo );
964*b248668bSdrh   assert( !pCsr->isEphemeral );
965c960dcbaSdrh   assert( pCsr->eCurType==CURTYPE_SORTER );
966a485ad19Sdrh   szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nKeyField-1)*sizeof(CollSeq*);
9678d8f5629Sdrh   sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);
968b3f56fdbSdan 
969b3f56fdbSdan   pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
970c960dcbaSdrh   pCsr->uc.pSorter = pSorter;
97134163c68Sdrh   if( pSorter==0 ){
972fad3039cSmistachkin     rc = SQLITE_NOMEM_BKPT;
9732f170015Sdan   }else{
974ebd2ecddSdan     Btree *pBt = db->aDb[0].pBt;
9751a088a8eSdan     pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
976f8768418Sdan     memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
977f8768418Sdan     pKeyInfo->db = 0;
97857a14094Sdan     if( nField && nWorker==0 ){
979a485ad19Sdrh       pKeyInfo->nKeyField = nField;
98057a14094Sdan     }
981ebd2ecddSdan     sqlite3BtreeEnter(pBt);
982ebd2ecddSdan     pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(pBt);
983ebd2ecddSdan     sqlite3BtreeLeave(pBt);
984a634fb15Sdrh     pSorter->nTask = nWorker + 1;
985cdabd7bdSmistachkin     pSorter->iPrev = (u8)(nWorker - 1);
986d30ab3d9Sdan     pSorter->bUseThreads = (pSorter->nTask>1);
9871a088a8eSdan     pSorter->db = db;
988a634fb15Sdrh     for(i=0; i<pSorter->nTask; i++){
989a634fb15Sdrh       SortSubtask *pTask = &pSorter->aTask[i];
990d30ab3d9Sdan       pTask->pSorter = pSorter;
9915134d135Sdan     }
9925134d135Sdan 
993ca892a72Sdrh     if( !sqlite3TempInMemory(db) ){
994fc26f7cfSdan       i64 mxCache;                /* Cache size in bytes*/
9953bd1791dSdrh       u32 szPma = sqlite3GlobalConfig.szPma;
9963bd1791dSdrh       pSorter->mnPmaSize = szPma * pgsz;
997fc26f7cfSdan 
99834163c68Sdrh       mxCache = db->aDb[0].pSchema->cache_size;
999fc26f7cfSdan       if( mxCache<0 ){
1000fc26f7cfSdan         /* A negative cache-size value C indicates that the cache is abs(C)
1001fc26f7cfSdan         ** KiB in size.  */
1002fc26f7cfSdan         mxCache = mxCache * -1024;
1003fc26f7cfSdan       }else{
1004fc26f7cfSdan         mxCache = mxCache * pgsz;
1005fc26f7cfSdan       }
1006fc26f7cfSdan       mxCache = MIN(mxCache, SQLITE_MAX_PMASZ);
1007fc26f7cfSdan       pSorter->mxPmaSize = MAX(pSorter->mnPmaSize, (int)mxCache);
1008face0872Sdan 
1009b2a0f75cSdrh       /* Avoid large memory allocations if the application has requested
1010b2a0f75cSdrh       ** SQLITE_CONFIG_SMALL_MALLOC. */
1011b2a0f75cSdrh       if( sqlite3GlobalConfig.bSmallMalloc==0 ){
1012face0872Sdan         assert( pSorter->iMemory==0 );
10136971952cSdan         pSorter->nMemory = pgsz;
101482a8a9f1Sdan         pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
1015fad3039cSmistachkin         if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM_BKPT;
10162f170015Sdan       }
1017ca892a72Sdrh     }
101857a14094Sdan 
1019a485ad19Sdrh     if( pKeyInfo->nAllField<13
1020a9d9111cSdan      && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl)
10216e11892dSdan      && (pKeyInfo->aSortFlags[0] & KEYINFO_ORDER_BIGNULL)==0
1022a9d9111cSdan     ){
102357a14094Sdan       pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
102457a14094Sdan     }
102534163c68Sdrh   }
10265134d135Sdan 
10272f170015Sdan   return rc;
10285134d135Sdan }
10298f0dab37Sdrh #undef nWorker   /* Defined at the top of this function */
10305134d135Sdan 
10315134d135Sdan /*
10325134d135Sdan ** Free the list of sorted records starting at pRecord.
10335134d135Sdan */
vdbeSorterRecordFree(sqlite3 * db,SorterRecord * pRecord)10345134d135Sdan static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
10355134d135Sdan   SorterRecord *p;
10365134d135Sdan   SorterRecord *pNext;
10375134d135Sdan   for(p=pRecord; p; p=pNext){
10386971952cSdan     pNext = p->u.pNext;
10395134d135Sdan     sqlite3DbFree(db, p);
10405134d135Sdan   }
1041a20fde64Sdan }
1042a20fde64Sdan 
1043a20fde64Sdan /*
1044a634fb15Sdrh ** Free all resources owned by the object indicated by argument pTask. All
1045a634fb15Sdrh ** fields of *pTask are zeroed before returning.
1046f8768418Sdan */
vdbeSortSubtaskCleanup(sqlite3 * db,SortSubtask * pTask)1047a634fb15Sdrh static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
1048a634fb15Sdrh   sqlite3DbFree(db, pTask->pUnpacked);
10495f4a4790Sdrh #if SQLITE_MAX_WORKER_THREADS>0
10505f4a4790Sdrh   /* pTask->list.aMemory can only be non-zero if it was handed memory
10515f4a4790Sdrh   ** from the main thread.  That only occurs SQLITE_MAX_WORKER_THREADS>0 */
10525f4a4790Sdrh   if( pTask->list.aMemory ){
105382a8a9f1Sdan     sqlite3_free(pTask->list.aMemory);
10545f4a4790Sdrh   }else
10555f4a4790Sdrh #endif
10565f4a4790Sdrh   {
10575f4a4790Sdrh     assert( pTask->list.aMemory==0 );
10585f4a4790Sdrh     vdbeSorterRecordFree(0, pTask->list.pList);
10592f170015Sdan   }
1060d30ab3d9Sdan   if( pTask->file.pFd ){
1061d30ab3d9Sdan     sqlite3OsCloseFree(pTask->file.pFd);
1062f8768418Sdan   }
10634be4c406Sdan   if( pTask->file2.pFd ){
10644be4c406Sdan     sqlite3OsCloseFree(pTask->file2.pFd);
10654be4c406Sdan   }
106696974bd3Sdan   memset(pTask, 0, sizeof(SortSubtask));
1067f8768418Sdan }
1068f8768418Sdan 
#ifdef SQLITE_DEBUG_SORTER_THREADS
/*
** Thread-activity tracing helpers, compiled only when
** SQLITE_DEBUG_SORTER_THREADS is defined. Each writes one line of the form
** "<time>:<who> <event>" to stderr, with <time> taken from
** sqlite3OsCurrentTimeInt64().
**
** The return code of sqlite3OsCurrentTimeInt64() is ignored, so every
** timestamp starts at 0 (as vdbeSorterRewindDebug() always did). A failed
** clock query then prints 0 instead of reading an uninitialized variable.
*/
static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
  i64 t = 0;
  int iTask = (int)(pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
  fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
}
static void vdbeSorterRewindDebug(const char *zEvent){
  i64 t = 0;
  sqlite3_vfs *pVfs = sqlite3_vfs_find(0);
  if( ALWAYS(pVfs) ) sqlite3OsCurrentTimeInt64(pVfs, &t);
  fprintf(stderr, "%lld:X %s\n", t, zEvent);
}
static void vdbeSorterPopulateDebug(
  SortSubtask *pTask,
  const char *zEvent
){
  i64 t = 0;
  int iTask = (int)(pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
  fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
}
static void vdbeSorterBlockDebug(
  SortSubtask *pTask,
  int bBlocked,
  const char *zEvent
){
  if( bBlocked ){
    i64 t = 0;
    sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
    fprintf(stderr, "%lld:main %s\n", t, zEvent);
  }
}
#else
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(y)
# define vdbeSorterPopulateDebug(x,y)
# define vdbeSorterBlockDebug(x,y,z)
#endif
110882a8a9f1Sdan 
#if SQLITE_MAX_WORKER_THREADS>0
/*
** If pTask has a background thread, wait for it to finish, reset the
** task's thread state and return the thread's result code. The result
** defaults to SQLITE_ERROR if the join does not supply one. Return
** SQLITE_OK if there is no thread.
*/
static int vdbeSorterJoinThread(SortSubtask *pTask){
  void *pRet;
#ifdef SQLITE_DEBUG_SORTER_THREADS
  int bDone;
#endif

  if( pTask->pThread==0 ) return SQLITE_OK;

#ifdef SQLITE_DEBUG_SORTER_THREADS
  bDone = pTask->bDone;
#endif
  pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR);
  vdbeSorterBlockDebug(pTask, !bDone, "enter");
  (void)sqlite3ThreadJoin(pTask->pThread, &pRet);
  vdbeSorterBlockDebug(pTask, !bDone, "exit");

  assert( pTask->bDone==1 );
  pTask->bDone = 0;
  pTask->pThread = 0;
  return SQLITE_PTR_TO_INT(pRet);
}

/*
** Start a background thread for pTask that runs xTask(pIn). pTask must
** not already own a thread.
*/
static int vdbeSorterCreateThread(
  SortSubtask *pTask,             /* Thread will use this task object */
  void *(*xTask)(void*),          /* Routine to run in a separate thread */
  void *pIn                       /* Argument passed into xTask() */
){
  assert( pTask->pThread==0 && pTask->bDone==0 );
  return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
}

/*
** Join every outstanding subtask thread of pSorter. Return rcin if it is
** not SQLITE_OK; otherwise return the first error reported by a join,
** taken in the order the threads are joined.
**
** This is always called from the main user thread. After SorterRewind(),
** the thread of the last task (aTask[nTask-1]) may itself be joining one
** of the other threads. To avoid two threads joining the same object, the
** tasks are joined from the last one down to the first.
*/
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
  int rc = rcin;
  int iTask = pSorter->nTask;

  while( iTask-- > 0 ){
    int rc2 = vdbeSorterJoinThread(&pSorter->aTask[iTask]);
    if( rc==SQLITE_OK ) rc = rc2;
  }
  return rc;
}
#else
# define vdbeSorterJoinAll(x,rcin) (rcin)
# define vdbeSorterJoinThread(pTask) SQLITE_OK
#endif
1169f8768418Sdan 
1170f8768418Sdan /*
1171ac65196eSdrh ** Allocate a new MergeEngine object capable of handling up to
1172ac65196eSdrh ** nReader PmaReader inputs.
1173ac65196eSdrh **
1174ac65196eSdrh ** nReader is automatically rounded up to the next power of two.
1175ac65196eSdrh ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up.
1176f8768418Sdan */
vdbeMergeEngineNew(int nReader)1177de823bedSdrh static MergeEngine *vdbeMergeEngineNew(int nReader){
1178de823bedSdrh   int N = 2;                      /* Smallest power of two >= nReader */
1179f8768418Sdan   int nByte;                      /* Total bytes of space to allocate */
1180a634fb15Sdrh   MergeEngine *pNew;              /* Pointer to allocated object to return */
1181f8768418Sdan 
1182de823bedSdrh   assert( nReader<=SORTER_MAX_MERGE_COUNT );
1183d30ab3d9Sdan 
1184de823bedSdrh   while( N<nReader ) N += N;
1185a634fb15Sdrh   nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
1186f8768418Sdan 
1187190d6959Sdrh   pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte);
1188f8768418Sdan   if( pNew ){
1189f8768418Sdan     pNew->nTree = N;
1190ac65196eSdrh     pNew->pTask = 0;
1191de823bedSdrh     pNew->aReadr = (PmaReader*)&pNew[1];
1192de823bedSdrh     pNew->aTree = (int*)&pNew->aReadr[N];
1193f8768418Sdan   }
1194f8768418Sdan   return pNew;
1195f8768418Sdan }
1196f8768418Sdan 
1197f8768418Sdan /*
1198a634fb15Sdrh ** Free the MergeEngine object passed as the only argument.
11995c2b3142Sdrh */
vdbeMergeEngineFree(MergeEngine * pMerger)1200a634fb15Sdrh static void vdbeMergeEngineFree(MergeEngine *pMerger){
12015c2b3142Sdrh   int i;
12025c2b3142Sdrh   if( pMerger ){
12035c2b3142Sdrh     for(i=0; i<pMerger->nTree; i++){
1204de823bedSdrh       vdbePmaReaderClear(&pMerger->aReadr[i]);
12055c2b3142Sdrh     }
12065c2b3142Sdrh   }
1207f8768418Sdan   sqlite3_free(pMerger);
1208f8768418Sdan }
12095c2b3142Sdrh 
12105c2b3142Sdrh /*
12111a088a8eSdan ** Free all resources associated with the IncrMerger object indicated by
12121a088a8eSdan ** the first argument.
12131a088a8eSdan */
vdbeIncrFree(IncrMerger * pIncr)12141a088a8eSdan static void vdbeIncrFree(IncrMerger *pIncr){
12151a088a8eSdan   if( pIncr ){
12161a088a8eSdan #if SQLITE_MAX_WORKER_THREADS>0
12171a088a8eSdan     if( pIncr->bUseThread ){
12181a088a8eSdan       vdbeSorterJoinThread(pIncr->pTask);
12191a088a8eSdan       if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
12201a088a8eSdan       if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
12211a088a8eSdan     }
12221a088a8eSdan #endif
12231a088a8eSdan     vdbeMergeEngineFree(pIncr->pMerger);
12241a088a8eSdan     sqlite3_free(pIncr);
12251a088a8eSdan   }
12261a088a8eSdan }
12271a088a8eSdan 
12281a088a8eSdan /*
122965ea12cbSdrh ** Reset a sorting cursor back to its original empty state.
1230a20fde64Sdan */
sqlite3VdbeSorterReset(sqlite3 * db,VdbeSorter * pSorter)123165ea12cbSdrh void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
1232a20fde64Sdan   int i;
1233a634fb15Sdrh   (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
12346cc37593Sdrh   assert( pSorter->bUseThreads || pSorter->pReader==0 );
12356cc37593Sdrh #if SQLITE_MAX_WORKER_THREADS>0
1236d30ab3d9Sdan   if( pSorter->pReader ){
1237d30ab3d9Sdan     vdbePmaReaderClear(pSorter->pReader);
1238d30ab3d9Sdan     sqlite3DbFree(db, pSorter->pReader);
1239d30ab3d9Sdan     pSorter->pReader = 0;
1240a20fde64Sdan   }
12416cc37593Sdrh #endif
1242f77ceba5Sdan   vdbeMergeEngineFree(pSorter->pMerger);
1243f77ceba5Sdan   pSorter->pMerger = 0;
1244a634fb15Sdrh   for(i=0; i<pSorter->nTask; i++){
1245a634fb15Sdrh     SortSubtask *pTask = &pSorter->aTask[i];
1246a634fb15Sdrh     vdbeSortSubtaskCleanup(db, pTask);
124796974bd3Sdan     pTask->pSorter = pSorter;
1248a20fde64Sdan   }
124982a8a9f1Sdan   if( pSorter->list.aMemory==0 ){
125082a8a9f1Sdan     vdbeSorterRecordFree(0, pSorter->list.pList);
1251c6e73455Sdan   }
125282a8a9f1Sdan   pSorter->list.pList = 0;
125382a8a9f1Sdan   pSorter->list.szPMA = 0;
12545c2b3142Sdrh   pSorter->bUsePMA = 0;
1255face0872Sdan   pSorter->iMemory = 0;
12564be4c406Sdan   pSorter->mxKeysize = 0;
1257d30ab3d9Sdan   sqlite3DbFree(db, pSorter->pUnpacked);
1258d30ab3d9Sdan   pSorter->pUnpacked = 0;
125965ea12cbSdrh }
126065ea12cbSdrh 
126165ea12cbSdrh /*
126265ea12cbSdrh ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
126365ea12cbSdrh */
sqlite3VdbeSorterClose(sqlite3 * db,VdbeCursor * pCsr)126465ea12cbSdrh void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
1265c960dcbaSdrh   VdbeSorter *pSorter;
1266c960dcbaSdrh   assert( pCsr->eCurType==CURTYPE_SORTER );
1267c960dcbaSdrh   pSorter = pCsr->uc.pSorter;
126865ea12cbSdrh   if( pSorter ){
126965ea12cbSdrh     sqlite3VdbeSorterReset(db, pSorter);
127082a8a9f1Sdan     sqlite3_free(pSorter->list.aMemory);
1271a20fde64Sdan     sqlite3DbFree(db, pSorter);
1272c960dcbaSdrh     pCsr->uc.pSorter = 0;
1273a20fde64Sdan   }
1274a20fde64Sdan }
1275a20fde64Sdan 
#if SQLITE_MAX_MMAP_SIZE>0
/*
** pFd is an open temporary file whose size is at most nByte bytes. Try to
** grow it to nByte bytes and have the VFS memory-map it.
**
** This is strictly best effort, and no error is ever reported. Nothing is
** done if nByte exceeds db->nMaxSorterMmap, or if the file's
** sqlite3_io_methods version is older than 3. Whether the file really
** ends up mapped is up to the VFS implementation.
*/
static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
  void *pMap = 0;                 /* Mapping returned by sqlite3OsFetch() */
  int szChunk = 4*1024;           /* Chunk size hint passed to the VFS */

  if( nByte>(i64)(db->nMaxSorterMmap) ) return;
  if( pFd->pMethods->iVersion<3 ) return;

  sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &szChunk);
  sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte);

  /* Request a mapping of the whole file and release it straight away;
  ** only the side effect on the VFS is wanted here. */
  sqlite3OsFetch(pFd, 0, (int)nByte, &pMap);
  if( pMap ) sqlite3OsUnfetch(pFd, 0, pMap);
}
#else
/* Memory mapping is compiled out: extending the file is a no-op */
# define vdbeSorterExtendFile(db,pFd,nByte)
#endif
1299a9f43d73Sdan 
1300a20fde64Sdan /*
1301c6e73455Sdan ** Allocate space for a file-handle and open a temporary file. If successful,
1302a4c8ca04Sdrh ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK.
1303a4c8ca04Sdrh ** Otherwise, set *ppFd to 0 and return an SQLite error code.
1304c6e73455Sdan */
vdbeSorterOpenTempFile(sqlite3 * db,i64 nExtend,sqlite3_file ** ppFd)1305a9f43d73Sdan static int vdbeSorterOpenTempFile(
1306a9f43d73Sdan   sqlite3 *db,                    /* Database handle doing sort */
1307a9f43d73Sdan   i64 nExtend,                    /* Attempt to extend file to this size */
1308a4c8ca04Sdrh   sqlite3_file **ppFd
1309a9f43d73Sdan ){
1310face0872Sdan   int rc;
13112b3f1409Sdrh   if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS;
1312a4c8ca04Sdrh   rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd,
13139d71142cSdan       SQLITE_OPEN_TEMP_JOURNAL |
1314c6e73455Sdan       SQLITE_OPEN_READWRITE    | SQLITE_OPEN_CREATE |
1315face0872Sdan       SQLITE_OPEN_EXCLUSIVE    | SQLITE_OPEN_DELETEONCLOSE, &rc
1316c6e73455Sdan   );
1317face0872Sdan   if( rc==SQLITE_OK ){
1318face0872Sdan     i64 max = SQLITE_MAX_MMAP_SIZE;
1319a4c8ca04Sdrh     sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
1320a9f43d73Sdan     if( nExtend>0 ){
1321a4c8ca04Sdrh       vdbeSorterExtendFile(db, *ppFd, nExtend);
1322c6e73455Sdan     }
1323face0872Sdan   }
1324face0872Sdan   return rc;
1325c6e73455Sdan }
1326c6e73455Sdan 
13271a088a8eSdan /*
13281a088a8eSdan ** If it has not already been allocated, allocate the UnpackedRecord
13291a088a8eSdan ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or
13301a088a8eSdan ** if no allocation was required), or SQLITE_NOMEM otherwise.
13311a088a8eSdan */
vdbeSortAllocUnpacked(SortSubtask * pTask)133282a8a9f1Sdan static int vdbeSortAllocUnpacked(SortSubtask *pTask){
133382a8a9f1Sdan   if( pTask->pUnpacked==0 ){
1334a582b016Sdrh     pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pTask->pSorter->pKeyInfo);
1335a582b016Sdrh     if( pTask->pUnpacked==0 ) return SQLITE_NOMEM_BKPT;
1336a485ad19Sdrh     pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nKeyField;
133782a8a9f1Sdan     pTask->pUnpacked->errCode = 0;
133882a8a9f1Sdan   }
133982a8a9f1Sdan   return SQLITE_OK;
134082a8a9f1Sdan }
134182a8a9f1Sdan 
1342c6e73455Sdan 
13435134d135Sdan /*
134459ebc991Sdrh ** Merge the two sorted lists p1 and p2 into a single list.
13455134d135Sdan */
vdbeSorterMerge(SortSubtask * pTask,SorterRecord * p1,SorterRecord * p2)1346b982bfeaSdrh static SorterRecord *vdbeSorterMerge(
1347a634fb15Sdrh   SortSubtask *pTask,             /* Calling thread context */
13485134d135Sdan   SorterRecord *p1,               /* First list to merge */
1349b982bfeaSdrh   SorterRecord *p2                /* Second list to merge */
13505134d135Sdan ){
13515134d135Sdan   SorterRecord *pFinal = 0;
13525134d135Sdan   SorterRecord **pp = &pFinal;
1353a9d9111cSdan   int bCached = 0;
13545134d135Sdan 
1355b982bfeaSdrh   assert( p1!=0 && p2!=0 );
1356b982bfeaSdrh   for(;;){
13575134d135Sdan     int res;
1358a9d9111cSdan     res = pTask->xCompare(
1359a9d9111cSdan         pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal
1360a9d9111cSdan     );
1361a9d9111cSdan 
13625134d135Sdan     if( res<=0 ){
13635134d135Sdan       *pp = p1;
13646971952cSdan       pp = &p1->u.pNext;
13656971952cSdan       p1 = p1->u.pNext;
1366b982bfeaSdrh       if( p1==0 ){
1367b982bfeaSdrh         *pp = p2;
1368b982bfeaSdrh         break;
1369b982bfeaSdrh       }
13705134d135Sdan     }else{
13715134d135Sdan       *pp = p2;
13726971952cSdan       pp = &p2->u.pNext;
13736971952cSdan       p2 = p2->u.pNext;
1374a9d9111cSdan       bCached = 0;
1375b982bfeaSdrh       if( p2==0 ){
1376b982bfeaSdrh         *pp = p1;
1377b982bfeaSdrh         break;
13785134d135Sdan       }
13795134d135Sdan     }
1380b982bfeaSdrh   }
1381b982bfeaSdrh   return pFinal;
13825134d135Sdan }
13831e74e602Sdan 
1384c6e73455Sdan /*
1385a9d9111cSdan ** Return the SorterCompare function to compare values collected by the
1386a9d9111cSdan ** sorter object passed as the only argument.
1387a9d9111cSdan */
vdbeSorterGetCompare(VdbeSorter * p)1388a9d9111cSdan static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){
1389a9d9111cSdan   if( p->typeMask==SORTER_TYPE_INTEGER ){
1390a9d9111cSdan     return vdbeSorterCompareInt;
1391a9d9111cSdan   }else if( p->typeMask==SORTER_TYPE_TEXT ){
1392a9d9111cSdan     return vdbeSorterCompareText;
1393a9d9111cSdan   }
1394a9d9111cSdan   return vdbeSorterCompare;
1395a9d9111cSdan }
1396a9d9111cSdan 
1397a9d9111cSdan /*
1398a634fb15Sdrh ** Sort the linked list of records headed at pTask->pList. Return
1399f8768418Sdan ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if
1400f8768418Sdan ** an error occurs.
14015134d135Sdan */
vdbeSorterSort(SortSubtask * pTask,SorterList * pList)140282a8a9f1Sdan static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
14035134d135Sdan   int i;
14045134d135Sdan   SorterRecord *p;
140582a8a9f1Sdan   int rc;
140638587159Sdrh   SorterRecord *aSlot[64];
140782a8a9f1Sdan 
140882a8a9f1Sdan   rc = vdbeSortAllocUnpacked(pTask);
140982a8a9f1Sdan   if( rc!=SQLITE_OK ) return rc;
14105134d135Sdan 
141157a14094Sdan   p = pList->pList;
1412a9d9111cSdan   pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter);
141338587159Sdrh   memset(aSlot, 0, sizeof(aSlot));
14145134d135Sdan 
14155134d135Sdan   while( p ){
14166971952cSdan     SorterRecord *pNext;
141782a8a9f1Sdan     if( pList->aMemory ){
141882a8a9f1Sdan       if( (u8*)p==pList->aMemory ){
14196971952cSdan         pNext = 0;
14206971952cSdan       }else{
142182a8a9f1Sdan         assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
142282a8a9f1Sdan         pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
14236971952cSdan       }
14246971952cSdan     }else{
14256971952cSdan       pNext = p->u.pNext;
14266971952cSdan     }
14272f170015Sdan 
14286971952cSdan     p->u.pNext = 0;
142959ebc991Sdrh     for(i=0; aSlot[i]; i++){
1430b982bfeaSdrh       p = vdbeSorterMerge(pTask, p, aSlot[i]);
14315134d135Sdan       aSlot[i] = 0;
14325134d135Sdan     }
14335134d135Sdan     aSlot[i] = p;
14345134d135Sdan     p = pNext;
14355134d135Sdan   }
14365134d135Sdan 
14375134d135Sdan   p = 0;
143838587159Sdrh   for(i=0; i<ArraySize(aSlot); i++){
1439b982bfeaSdrh     if( aSlot[i]==0 ) continue;
1440b982bfeaSdrh     p = p ? vdbeSorterMerge(pTask, p, aSlot[i]) : aSlot[i];
14415134d135Sdan   }
144282a8a9f1Sdan   pList->pList = p;
14435134d135Sdan 
1444d94d4ee7Sdan   assert( pTask->pUnpacked->errCode==SQLITE_OK
1445d94d4ee7Sdan        || pTask->pUnpacked->errCode==SQLITE_NOMEM
1446d94d4ee7Sdan   );
1447d94d4ee7Sdan   return pTask->pUnpacked->errCode;
14485134d135Sdan }
14495134d135Sdan 
14503b2c9b32Sdan /*
1451a634fb15Sdrh ** Initialize a PMA-writer object.
14523b2c9b32Sdan */
vdbePmaWriterInit(sqlite3_file * pFd,PmaWriter * p,int nBuf,i64 iStart)1453a634fb15Sdrh static void vdbePmaWriterInit(
1454a4c8ca04Sdrh   sqlite3_file *pFd,              /* File handle to write to */
1455a634fb15Sdrh   PmaWriter *p,                   /* Object to populate */
1456f8768418Sdan   int nBuf,                       /* Buffer size */
1457a4c8ca04Sdrh   i64 iStart                      /* Offset of pFd to begin writing at */
14583b2c9b32Sdan ){
1459a634fb15Sdrh   memset(p, 0, sizeof(PmaWriter));
1460f8768418Sdan   p->aBuffer = (u8*)sqlite3Malloc(nBuf);
146107f54792Sdrh   if( !p->aBuffer ){
1462fad3039cSmistachkin     p->eFWErr = SQLITE_NOMEM_BKPT;
146307f54792Sdrh   }else{
14643b2c9b32Sdan     p->iBufEnd = p->iBufStart = (iStart % nBuf);
14653b2c9b32Sdan     p->iWriteOff = iStart - p->iBufStart;
14663b2c9b32Sdan     p->nBuffer = nBuf;
1467a4c8ca04Sdrh     p->pFd = pFd;
146807f54792Sdrh   }
14693b2c9b32Sdan }
14703b2c9b32Sdan 
14713b2c9b32Sdan /*
1472a634fb15Sdrh ** Write nData bytes of data to the PMA. Return SQLITE_OK
14733b2c9b32Sdan ** if successful, or an SQLite error code if an error occurs.
14743b2c9b32Sdan */
vdbePmaWriteBlob(PmaWriter * p,u8 * pData,int nData)1475a634fb15Sdrh static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){
14763b2c9b32Sdan   int nRem = nData;
147707f54792Sdrh   while( nRem>0 && p->eFWErr==0 ){
14783b2c9b32Sdan     int nCopy = nRem;
14793b2c9b32Sdan     if( nCopy>(p->nBuffer - p->iBufEnd) ){
14803b2c9b32Sdan       nCopy = p->nBuffer - p->iBufEnd;
14813b2c9b32Sdan     }
14823b2c9b32Sdan 
14833b2c9b32Sdan     memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy);
14843b2c9b32Sdan     p->iBufEnd += nCopy;
14853b2c9b32Sdan     if( p->iBufEnd==p->nBuffer ){
1486a4c8ca04Sdrh       p->eFWErr = sqlite3OsWrite(p->pFd,
14873b2c9b32Sdan           &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
14883b2c9b32Sdan           p->iWriteOff + p->iBufStart
14893b2c9b32Sdan       );
14903b2c9b32Sdan       p->iBufStart = p->iBufEnd = 0;
14913b2c9b32Sdan       p->iWriteOff += p->nBuffer;
14923b2c9b32Sdan     }
14933b2c9b32Sdan     assert( p->iBufEnd<p->nBuffer );
14943b2c9b32Sdan 
14953b2c9b32Sdan     nRem -= nCopy;
14963b2c9b32Sdan   }
14973b2c9b32Sdan }
14983b2c9b32Sdan 
14993b2c9b32Sdan /*
1500a634fb15Sdrh ** Flush any buffered data to disk and clean up the PMA-writer object.
1501a634fb15Sdrh ** The results of using the PMA-writer after this call are undefined.
15023b2c9b32Sdan ** Return SQLITE_OK if flushing the buffered data succeeds or is not
15033b2c9b32Sdan ** required. Otherwise, return an SQLite error code.
15043b2c9b32Sdan **
15053b2c9b32Sdan ** Before returning, set *piEof to the offset immediately following the
15063b2c9b32Sdan ** last byte written to the file.
15073b2c9b32Sdan */
vdbePmaWriterFinish(PmaWriter * p,i64 * piEof)1508a634fb15Sdrh static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
150907f54792Sdrh   int rc;
151007f54792Sdrh   if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
1511a4c8ca04Sdrh     p->eFWErr = sqlite3OsWrite(p->pFd,
15123b2c9b32Sdan         &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
15133b2c9b32Sdan         p->iWriteOff + p->iBufStart
15143b2c9b32Sdan     );
15153b2c9b32Sdan   }
15163b2c9b32Sdan   *piEof = (p->iWriteOff + p->iBufEnd);
1517f8768418Sdan   sqlite3_free(p->aBuffer);
151807f54792Sdrh   rc = p->eFWErr;
1519a634fb15Sdrh   memset(p, 0, sizeof(PmaWriter));
15203b2c9b32Sdan   return rc;
15213b2c9b32Sdan }
15223b2c9b32Sdan 
15233b2c9b32Sdan /*
1524a634fb15Sdrh ** Write value iVal encoded as a varint to the PMA. Return
15253b2c9b32Sdan ** SQLITE_OK if successful, or an SQLite error code if an error occurs.
15263b2c9b32Sdan */
vdbePmaWriteVarint(PmaWriter * p,u64 iVal)1527a634fb15Sdrh static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){
15283b2c9b32Sdan   int nByte;
15293b2c9b32Sdan   u8 aByte[10];
15303b2c9b32Sdan   nByte = sqlite3PutVarint(aByte, iVal);
1531a634fb15Sdrh   vdbePmaWriteBlob(p, aByte, nByte);
15323b2c9b32Sdan }
15335134d135Sdan 
15345134d135Sdan /*
153582a8a9f1Sdan ** Write the current contents of in-memory linked-list pList to a level-0
153682a8a9f1Sdan ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if
153782a8a9f1Sdan ** successful, or an SQLite error code otherwise.
1538e6f7bc63Sdan **
1539e6f7bc63Sdan ** The format of a PMA is:
1540e6f7bc63Sdan **
1541e6f7bc63Sdan **     * A varint. This varint contains the total number of bytes of content
1542e6f7bc63Sdan **       in the PMA (not including the varint itself).
1543e6f7bc63Sdan **
1544e6f7bc63Sdan **     * One or more records packed end-to-end in order of ascending keys.
1545e6f7bc63Sdan **       Each record consists of a varint followed by a blob of data (the
1546e6f7bc63Sdan **       key). The varint is the number of bytes in the blob of data.
1547c6e73455Sdan */
vdbeSorterListToPMA(SortSubtask * pTask,SorterList * pList)154882a8a9f1Sdan static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
15491a088a8eSdan   sqlite3 *db = pTask->pSorter->db;
1550c6e73455Sdan   int rc = SQLITE_OK;             /* Return code */
1551a634fb15Sdrh   PmaWriter writer;               /* Object used to write to the file */
15523b2c9b32Sdan 
155382a8a9f1Sdan #ifdef SQLITE_DEBUG
155482a8a9f1Sdan   /* Set iSz to the expected size of file pTask->file after writing the PMA.
155582a8a9f1Sdan   ** This is used by an assert() statement at the end of this function.  */
155682a8a9f1Sdan   i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
155782a8a9f1Sdan #endif
1558c6e73455Sdan 
155982a8a9f1Sdan   vdbeSorterWorkDebug(pTask, "enter");
1560a634fb15Sdrh   memset(&writer, 0, sizeof(PmaWriter));
156182a8a9f1Sdan   assert( pList->szPMA>0 );
1562c6e73455Sdan 
1563c6e73455Sdan   /* If the first temporary PMA file has not been opened, open it now. */
1564d30ab3d9Sdan   if( pTask->file.pFd==0 ){
1565a9f43d73Sdan     rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
1566d30ab3d9Sdan     assert( rc!=SQLITE_OK || pTask->file.pFd );
1567d30ab3d9Sdan     assert( pTask->file.iEof==0 );
1568a634fb15Sdrh     assert( pTask->nPMA==0 );
1569c6e73455Sdan   }
1570c6e73455Sdan 
1571face0872Sdan   /* Try to get the file to memory map */
1572face0872Sdan   if( rc==SQLITE_OK ){
15731a088a8eSdan     vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
1574face0872Sdan   }
1575face0872Sdan 
157682a8a9f1Sdan   /* Sort the list */
157782a8a9f1Sdan   if( rc==SQLITE_OK ){
157882a8a9f1Sdan     rc = vdbeSorterSort(pTask, pList);
1579c6e73455Sdan   }
1580c6e73455Sdan 
1581c6e73455Sdan   if( rc==SQLITE_OK ){
15823b2c9b32Sdan     SorterRecord *p;
15833b2c9b32Sdan     SorterRecord *pNext = 0;
15843b2c9b32Sdan 
15851a088a8eSdan     vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
1586d30ab3d9Sdan                       pTask->file.iEof);
1587a634fb15Sdrh     pTask->nPMA++;
158882a8a9f1Sdan     vdbePmaWriteVarint(&writer, pList->szPMA);
158982a8a9f1Sdan     for(p=pList->pList; p; p=pNext){
15906971952cSdan       pNext = p->u.pNext;
1591a634fb15Sdrh       vdbePmaWriteVarint(&writer, p->nVal);
1592a634fb15Sdrh       vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
159382a8a9f1Sdan       if( pList->aMemory==0 ) sqlite3_free(p);
1594c6e73455Sdan     }
159582a8a9f1Sdan     pList->pList = p;
1596d30ab3d9Sdan     rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
1597f8768418Sdan   }
1598f8768418Sdan 
159982a8a9f1Sdan   vdbeSorterWorkDebug(pTask, "exit");
160082a8a9f1Sdan   assert( rc!=SQLITE_OK || pList->pList==0 );
160182a8a9f1Sdan   assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
1602f8768418Sdan   return rc;
1603f8768418Sdan }
1604f8768418Sdan 
1605f8768418Sdan /*
1606bde27aaaSdrh ** Advance the MergeEngine to its next entry.
1607bde27aaaSdrh ** Set *pbEof to true there is no next entry because
1608ac65196eSdrh ** the MergeEngine has reached the end of all its inputs.
1609f8768418Sdan **
1610f8768418Sdan ** Return SQLITE_OK if successful or an error code if an error occurs.
1611f8768418Sdan */
vdbeMergeEngineStep(MergeEngine * pMerger,int * pbEof)1612ac65196eSdrh static int vdbeMergeEngineStep(
1613ac65196eSdrh   MergeEngine *pMerger,      /* The merge engine to advance to the next row */
1614ac65196eSdrh   int *pbEof                 /* Set TRUE at EOF.  Set false for more content */
1615f8768418Sdan ){
1616f8768418Sdan   int rc;
1617de823bedSdrh   int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
1618bde27aaaSdrh   SortSubtask *pTask = pMerger->pTask;
1619ac65196eSdrh 
1620de823bedSdrh   /* Advance the current PmaReader */
1621de823bedSdrh   rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
1622f8768418Sdan 
1623f8768418Sdan   /* Update contents of aTree[] */
1624ff9fce4dSdan   if( rc==SQLITE_OK ){
1625ff9fce4dSdan     int i;                      /* Index of aTree[] to recalculate */
1626de823bedSdrh     PmaReader *pReadr1;         /* First PmaReader to compare */
1627de823bedSdrh     PmaReader *pReadr2;         /* Second PmaReader to compare */
1628a9d9111cSdan     int bCached = 0;
1629ff9fce4dSdan 
1630de823bedSdrh     /* Find the first two PmaReaders to compare. The one that was just
1631ff9fce4dSdan     ** advanced (iPrev) and the one next to it in the array.  */
1632de823bedSdrh     pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
1633de823bedSdrh     pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
1634ff9fce4dSdan 
1635ff9fce4dSdan     for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
1636de823bedSdrh       /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
1637ff9fce4dSdan       int iRes;
1638a4c8ca04Sdrh       if( pReadr1->pFd==0 ){
1639ff9fce4dSdan         iRes = +1;
1640a4c8ca04Sdrh       }else if( pReadr2->pFd==0 ){
1641ff9fce4dSdan         iRes = -1;
1642ff9fce4dSdan       }else{
1643a9d9111cSdan         iRes = pTask->xCompare(pTask, &bCached,
1644a9d9111cSdan             pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey
1645ff9fce4dSdan         );
1646f8768418Sdan       }
1647f8768418Sdan 
1648de823bedSdrh       /* If pReadr1 contained the smaller value, set aTree[i] to its index.
1649de823bedSdrh       ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
1650de823bedSdrh       ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
1651de823bedSdrh       ** pKey2 to point to the record belonging to pReadr2.
1652ff9fce4dSdan       **
1653de823bedSdrh       ** Alternatively, if pReadr2 contains the smaller of the two values,
1654de823bedSdrh       ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
1655a634fb15Sdrh       ** was actually called above, then pTask->pUnpacked now contains
1656de823bedSdrh       ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
1657de823bedSdrh       ** vdbeSorterCompare() from decoding pReadr2 again.
1658ff9fce4dSdan       **
1659ff9fce4dSdan       ** If the two values were equal, then the value from the oldest
1660de823bedSdrh       ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
1661de823bedSdrh       ** is sorted from oldest to newest, so pReadr1 contains older values
1662de823bedSdrh       ** than pReadr2 iff (pReadr1<pReadr2).  */
1663de823bedSdrh       if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
1664de823bedSdrh         pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
1665de823bedSdrh         pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
1666a9d9111cSdan         bCached = 0;
1667ff9fce4dSdan       }else{
166829f1a19cSdan         if( pReadr1->pFd ) bCached = 0;
1669de823bedSdrh         pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
1670de823bedSdrh         pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
1671ff9fce4dSdan       }
1672ff9fce4dSdan     }
1673a4c8ca04Sdrh     *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
1674ff9fce4dSdan   }
1675ff9fce4dSdan 
1676d94d4ee7Sdan   return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
1677f8768418Sdan }
1678f8768418Sdan 
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Entry point for background threads that write level-0 PMAs.
**
** pCtx is the SortSubtask whose in-memory list is written out. pTask->bDone
** is set once the work is finished, and the SQLite result code is returned
** encoded as a pointer (decoded by the joining thread).
*/
static void *vdbeSorterFlushThread(void *pCtx){
  SortSubtask *pSub = (SortSubtask*)pCtx;
  int rc;                         /* Return code */

  assert( pSub->bDone==0 );
  rc = vdbeSorterListToPMA(pSub, &pSub->list);
  pSub->bDone = 1;                /* Tell the main thread the work is done */
  return SQLITE_INT_TO_PTR(rc);
}
#endif /* SQLITE_MAX_WORKER_THREADS>0 */
1692f8768418Sdan 
1693f8768418Sdan /*
169482a8a9f1Sdan ** Flush the current contents of VdbeSorter.list to a new PMA, possibly
1695f8768418Sdan ** using a background thread.
1696f8768418Sdan */
vdbeSorterFlushPMA(VdbeSorter * pSorter)169782a8a9f1Sdan static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
169882a8a9f1Sdan #if SQLITE_MAX_WORKER_THREADS==0
169982a8a9f1Sdan   pSorter->bUsePMA = 1;
170082a8a9f1Sdan   return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
170182a8a9f1Sdan #else
1702f8768418Sdan   int rc = SQLITE_OK;
1703f8768418Sdan   int i;
1704a634fb15Sdrh   SortSubtask *pTask = 0;    /* Thread context used to create new PMA */
1705a634fb15Sdrh   int nWorker = (pSorter->nTask-1);
1706f8768418Sdan 
170782a8a9f1Sdan   /* Set the flag to indicate that at least one PMA has been written.
170882a8a9f1Sdan   ** Or will be, anyhow.  */
1709f8768418Sdan   pSorter->bUsePMA = 1;
171082a8a9f1Sdan 
171182a8a9f1Sdan   /* Select a sub-task to sort and flush the current list of in-memory
171282a8a9f1Sdan   ** records to disk. If the sorter is running in multi-threaded mode,
171382a8a9f1Sdan   ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
171482a8a9f1Sdan   ** the background thread from a sub-tasks previous turn is still running,
171582a8a9f1Sdan   ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
171682a8a9f1Sdan   ** fall back to using the final sub-task. The first (pSorter->nTask-1)
171782a8a9f1Sdan   ** sub-tasks are prefered as they use background threads - the final
171882a8a9f1Sdan   ** sub-task uses the main thread. */
1719578e1ca8Sdan   for(i=0; i<nWorker; i++){
1720578e1ca8Sdan     int iTest = (pSorter->iPrev + i + 1) % nWorker;
1721a634fb15Sdrh     pTask = &pSorter->aTask[iTest];
17221a088a8eSdan     if( pTask->bDone ){
17231a088a8eSdan       rc = vdbeSorterJoinThread(pTask);
1724f8768418Sdan     }
1725d94d4ee7Sdan     if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
1726f8768418Sdan   }
1727f8768418Sdan 
1728f8768418Sdan   if( rc==SQLITE_OK ){
172982a8a9f1Sdan     if( i==nWorker ){
1730f8768418Sdan       /* Use the foreground thread for this operation */
173182a8a9f1Sdan       rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
173282a8a9f1Sdan     }else{
173382a8a9f1Sdan       /* Launch a background thread for this operation */
173455f66b34Sdrh       u8 *aMem;
173555f66b34Sdrh       void *pCtx;
173682a8a9f1Sdan 
173755f66b34Sdrh       assert( pTask!=0 );
17381a088a8eSdan       assert( pTask->pThread==0 && pTask->bDone==0 );
173982a8a9f1Sdan       assert( pTask->list.pList==0 );
174082a8a9f1Sdan       assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
174182a8a9f1Sdan 
174255f66b34Sdrh       aMem = pTask->list.aMemory;
174355f66b34Sdrh       pCtx = (void*)pTask;
17440f8f2671Sdrh       pSorter->iPrev = (u8)(pTask - pSorter->aTask);
174582a8a9f1Sdan       pTask->list = pSorter->list;
174682a8a9f1Sdan       pSorter->list.pList = 0;
174782a8a9f1Sdan       pSorter->list.szPMA = 0;
174882a8a9f1Sdan       if( aMem ){
174982a8a9f1Sdan         pSorter->list.aMemory = aMem;
175082a8a9f1Sdan         pSorter->nMemory = sqlite3MallocSize(aMem);
17510d51def2Sdan       }else if( pSorter->list.aMemory ){
175282a8a9f1Sdan         pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
1753fad3039cSmistachkin         if( !pSorter->list.aMemory ) return SQLITE_NOMEM_BKPT;
1754dd95d30fSdan       }
175582a8a9f1Sdan 
17561a088a8eSdan       rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
1757f8768418Sdan     }
1758c6e73455Sdan   }
1759c6e73455Sdan 
1760c6e73455Sdan   return rc;
1761958d261bSdrh #endif /* SQLITE_MAX_WORKER_THREADS!=0 */
1762c6e73455Sdan }
1763c6e73455Sdan 
1764c6e73455Sdan /*
17655134d135Sdan ** Add a record to the sorter.
1766a20fde64Sdan */
sqlite3VdbeSorterWrite(const VdbeCursor * pCsr,Mem * pVal)17675134d135Sdan int sqlite3VdbeSorterWrite(
1768c041c16cSdrh   const VdbeCursor *pCsr,         /* Sorter cursor */
17695134d135Sdan   Mem *pVal                       /* Memory cell containing record */
17705134d135Sdan ){
1771c960dcbaSdrh   VdbeSorter *pSorter;
17727733a4dbSdan   int rc = SQLITE_OK;             /* Return Code */
17737733a4dbSdan   SorterRecord *pNew;             /* New list element */
17746971952cSdan   int bFlush;                     /* True to flush contents of memory to PMA */
17756971952cSdan   int nReq;                       /* Bytes of memory required */
17766971952cSdan   int nPMA;                       /* Bytes of PMA space required */
177757a14094Sdan   int t;                          /* serial type of first record field */
177857a14094Sdan 
1779c960dcbaSdrh   assert( pCsr->eCurType==CURTYPE_SORTER );
1780c960dcbaSdrh   pSorter = pCsr->uc.pSorter;
178102a95eb9Sdrh   getVarint32NR((const u8*)&pVal->z[1], t);
178257a14094Sdan   if( t>0 && t<10 && t!=7 ){
178357a14094Sdan     pSorter->typeMask &= SORTER_TYPE_INTEGER;
1784a9d9111cSdan   }else if( t>10 && (t & 0x01) ){
178557a14094Sdan     pSorter->typeMask &= SORTER_TYPE_TEXT;
178657a14094Sdan   }else{
178757a14094Sdan     pSorter->typeMask = 0;
178857a14094Sdan   }
17896971952cSdan 
17905134d135Sdan   assert( pSorter );
17912a5d9908Sdrh 
17926971952cSdan   /* Figure out whether or not the current contents of memory should be
17936971952cSdan   ** flushed to a PMA before continuing. If so, do so.
17946971952cSdan   **
17956971952cSdan   ** If using the single large allocation mode (pSorter->aMemory!=0), then
17966971952cSdan   ** flush the contents of memory to a new PMA if (a) at least one value is
17976971952cSdan   ** already in memory and (b) the new value will not fit in memory.
17986971952cSdan   **
17996971952cSdan   ** Or, if using separate allocations for each record, flush the contents
18006971952cSdan   ** of memory to a PMA if either of the following are true:
18015134d135Sdan   **
18025134d135Sdan   **   * The total memory allocated for the in-memory list is greater
18035134d135Sdan   **     than (page-size * cache-size), or
18045134d135Sdan   **
18055134d135Sdan   **   * The total memory allocated for the in-memory list is greater
18065134d135Sdan   **     than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
18075134d135Sdan   */
18086971952cSdan   nReq = pVal->n + sizeof(SorterRecord);
18096971952cSdan   nPMA = pVal->n + sqlite3VarintLen(pVal->n);
1810e7c84cc7Sdan   if( pSorter->mxPmaSize ){
181182a8a9f1Sdan     if( pSorter->list.aMemory ){
18126971952cSdan       bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
18136971952cSdan     }else{
18146971952cSdan       bFlush = (
181582a8a9f1Sdan           (pSorter->list.szPMA > pSorter->mxPmaSize)
181682a8a9f1Sdan        || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
18176971952cSdan       );
1818a20fde64Sdan     }
18196971952cSdan     if( bFlush ){
182082a8a9f1Sdan       rc = vdbeSorterFlushPMA(pSorter);
182182a8a9f1Sdan       pSorter->list.szPMA = 0;
1822face0872Sdan       pSorter->iMemory = 0;
182382a8a9f1Sdan       assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
1824face0872Sdan     }
1825e7c84cc7Sdan   }
18266971952cSdan 
182782a8a9f1Sdan   pSorter->list.szPMA += nPMA;
18284be4c406Sdan   if( nPMA>pSorter->mxKeysize ){
18294be4c406Sdan     pSorter->mxKeysize = nPMA;
18304be4c406Sdan   }
18316971952cSdan 
183282a8a9f1Sdan   if( pSorter->list.aMemory ){
18336971952cSdan     int nMin = pSorter->iMemory + nReq;
18346971952cSdan 
18356971952cSdan     if( nMin>pSorter->nMemory ){
18366971952cSdan       u8 *aNew;
18370aa3231fSdrh       sqlite3_int64 nNew = 2 * (sqlite3_int64)pSorter->nMemory;
18382eb2ca83Sdan       int iListOff = -1;
18392eb2ca83Sdan       if( pSorter->list.pList ){
18402eb2ca83Sdan         iListOff = (u8*)pSorter->list.pList - pSorter->list.aMemory;
18412eb2ca83Sdan       }
18426971952cSdan       while( nNew < nMin ) nNew = nNew*2;
18436971952cSdan       if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
18446971952cSdan       if( nNew < nMin ) nNew = nMin;
184582a8a9f1Sdan       aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
1846fad3039cSmistachkin       if( !aNew ) return SQLITE_NOMEM_BKPT;
18472eb2ca83Sdan       if( iListOff>=0 ){
184898a4d5a7Sdan         pSorter->list.pList = (SorterRecord*)&aNew[iListOff];
18492eb2ca83Sdan       }
185082a8a9f1Sdan       pSorter->list.aMemory = aNew;
18516971952cSdan       pSorter->nMemory = nNew;
1852a20fde64Sdan     }
18535134d135Sdan 
185482a8a9f1Sdan     pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
18556971952cSdan     pSorter->iMemory += ROUND8(nReq);
18562aac8c7bSdrh     if( pSorter->list.pList ){
1857958d261bSdrh       pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory);
18582aac8c7bSdrh     }
18596971952cSdan   }else{
1860e7c84cc7Sdan     pNew = (SorterRecord *)sqlite3Malloc(nReq);
18616971952cSdan     if( pNew==0 ){
1862fad3039cSmistachkin       return SQLITE_NOMEM_BKPT;
18636971952cSdan     }
186482a8a9f1Sdan     pNew->u.pNext = pSorter->list.pList;
18656971952cSdan   }
18666971952cSdan 
18676971952cSdan   memcpy(SRVAL(pNew), pVal->z, pVal->n);
18686971952cSdan   pNew->nVal = pVal->n;
186982a8a9f1Sdan   pSorter->list.pList = pNew;
1870a20fde64Sdan 
1871a20fde64Sdan   return rc;
1872a20fde64Sdan }
1873a20fde64Sdan 
1874a20fde64Sdan /*
1875d30ab3d9Sdan ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
1876d30ab3d9Sdan ** of the data stored in aFile[1] is the same as that used by regular PMAs,
1877d30ab3d9Sdan ** except that the number-of-bytes varint is omitted from the start.
1878d30ab3d9Sdan */
vdbeIncrPopulate(IncrMerger * pIncr)1879d30ab3d9Sdan static int vdbeIncrPopulate(IncrMerger *pIncr){
1880d30ab3d9Sdan   int rc = SQLITE_OK;
1881d30ab3d9Sdan   int rc2;
18824be4c406Sdan   i64 iStart = pIncr->iStartOff;
1883d30ab3d9Sdan   SorterFile *pOut = &pIncr->aFile[1];
18841a088a8eSdan   SortSubtask *pTask = pIncr->pTask;
1885d30ab3d9Sdan   MergeEngine *pMerger = pIncr->pMerger;
1886d30ab3d9Sdan   PmaWriter writer;
1887d30ab3d9Sdan   assert( pIncr->bEof==0 );
1888d30ab3d9Sdan 
18891a088a8eSdan   vdbeSorterPopulateDebug(pTask, "enter");
18904be4c406Sdan 
18911a088a8eSdan   vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
1892d30ab3d9Sdan   while( rc==SQLITE_OK ){
1893d30ab3d9Sdan     int dummy;
1894de823bedSdrh     PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ];
1895d30ab3d9Sdan     int nKey = pReader->nKey;
1896d30ab3d9Sdan     i64 iEof = writer.iWriteOff + writer.iBufEnd;
1897d30ab3d9Sdan 
1898d30ab3d9Sdan     /* Check if the output file is full or if the input has been exhausted.
1899d30ab3d9Sdan     ** In either case exit the loop. */
1900a4c8ca04Sdrh     if( pReader->pFd==0 ) break;
19014be4c406Sdan     if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
1902d30ab3d9Sdan 
1903d30ab3d9Sdan     /* Write the next key to the output. */
1904d30ab3d9Sdan     vdbePmaWriteVarint(&writer, nKey);
1905d30ab3d9Sdan     vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
1906bde27aaaSdrh     assert( pIncr->pMerger->pTask==pTask );
1907bde27aaaSdrh     rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy);
1908d30ab3d9Sdan   }
1909d30ab3d9Sdan 
1910d30ab3d9Sdan   rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
1911d30ab3d9Sdan   if( rc==SQLITE_OK ) rc = rc2;
19121a088a8eSdan   vdbeSorterPopulateDebug(pTask, "exit");
1913d30ab3d9Sdan   return rc;
1914d30ab3d9Sdan }
1915d30ab3d9Sdan 
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Background-thread entry point. Runs vdbeIncrPopulate() on the IncrMerger
** passed as pCtx, marks the owning subtask as done, and returns the
** resulting error code converted with SQLITE_INT_TO_PTR().
*/
static void *vdbeIncrPopulateThread(void *pCtx){
  IncrMerger *pIncr = (IncrMerger*)pCtx;
  int rc = vdbeIncrPopulate(pIncr);
  pIncr->pTask->bDone = 1;
  return SQLITE_INT_TO_PTR(rc);
}

/*
** Start a thread on pIncr->pTask that fills pIncr->aFile[1] by running
** vdbeIncrPopulateThread(). pIncr must be a multi-threaded IncrMerger.
*/
static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  assert( pIncr->bUseThread );
  return vdbeSorterCreateThread(
      pIncr->pTask, vdbeIncrPopulateThread, (void*)pIncr
  );
}
#endif /* SQLITE_MAX_WORKER_THREADS>0 */
1937d30ab3d9Sdan 
19381a088a8eSdan /*
19391a088a8eSdan ** This function is called when the PmaReader corresponding to pIncr has
19401a088a8eSdan ** finished reading the contents of aFile[0]. Its purpose is to "refill"
1941de823bedSdrh ** aFile[0] such that the PmaReader should start rereading it from the
19421a088a8eSdan ** beginning.
19431a088a8eSdan **
19441a088a8eSdan ** For single-threaded objects, this is accomplished by literally reading
19451a088a8eSdan ** keys from pIncr->pMerger and repopulating aFile[0].
19461a088a8eSdan **
19471a088a8eSdan ** For multi-threaded objects, all that is required is to wait until the
19481a088a8eSdan ** background thread is finished (if it is not already) and then swap
19491a088a8eSdan ** aFile[0] and aFile[1] in place. If the contents of pMerger have not
19501a088a8eSdan ** been exhausted, this function also launches a new background thread
19511a088a8eSdan ** to populate the new aFile[1].
19521a088a8eSdan **
19531a088a8eSdan ** SQLITE_OK is returned on success, or an SQLite error code otherwise.
19541a088a8eSdan */
vdbeIncrSwap(IncrMerger * pIncr)1955d30ab3d9Sdan static int vdbeIncrSwap(IncrMerger *pIncr){
1956d30ab3d9Sdan   int rc = SQLITE_OK;
1957d30ab3d9Sdan 
19584be4c406Sdan #if SQLITE_MAX_WORKER_THREADS>0
195982a8a9f1Sdan   if( pIncr->bUseThread ){
19601a088a8eSdan     rc = vdbeSorterJoinThread(pIncr->pTask);
1961d30ab3d9Sdan 
1962d30ab3d9Sdan     if( rc==SQLITE_OK ){
1963d30ab3d9Sdan       SorterFile f0 = pIncr->aFile[0];
1964d30ab3d9Sdan       pIncr->aFile[0] = pIncr->aFile[1];
1965d30ab3d9Sdan       pIncr->aFile[1] = f0;
19664be4c406Sdan     }
1967d30ab3d9Sdan 
19684be4c406Sdan     if( rc==SQLITE_OK ){
19694be4c406Sdan       if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
1970d30ab3d9Sdan         pIncr->bEof = 1;
1971d30ab3d9Sdan       }else{
1972d30ab3d9Sdan         rc = vdbeIncrBgPopulate(pIncr);
1973d30ab3d9Sdan       }
1974d30ab3d9Sdan     }
197582a8a9f1Sdan   }else
197682a8a9f1Sdan #endif
197782a8a9f1Sdan   {
19784be4c406Sdan     rc = vdbeIncrPopulate(pIncr);
19794be4c406Sdan     pIncr->aFile[0] = pIncr->aFile[1];
19804be4c406Sdan     if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
19814be4c406Sdan       pIncr->bEof = 1;
19824be4c406Sdan     }
1983a20fde64Sdan   }
1984a20fde64Sdan 
1985a20fde64Sdan   return rc;
1986a20fde64Sdan }
1987a20fde64Sdan 
1988a20fde64Sdan /*
19891a088a8eSdan ** Allocate and return a new IncrMerger object to read data from pMerger.
19907f0a24b7Sdan **
19917f0a24b7Sdan ** If an OOM condition is encountered, return NULL. In this case free the
19927f0a24b7Sdan ** pMerger argument before returning.
1993a20fde64Sdan */
vdbeIncrMergerNew(SortSubtask * pTask,MergeEngine * pMerger,IncrMerger ** ppOut)1994bde27aaaSdrh static int vdbeIncrMergerNew(
1995a4c8ca04Sdrh   SortSubtask *pTask,     /* The thread that will be using the new IncrMerger */
1996a4c8ca04Sdrh   MergeEngine *pMerger,   /* The MergeEngine that the IncrMerger will control */
1997a4c8ca04Sdrh   IncrMerger **ppOut      /* Write the new IncrMerger here */
1998a20fde64Sdan ){
19997f0a24b7Sdan   int rc = SQLITE_OK;
2000190d6959Sdrh   IncrMerger *pIncr = *ppOut = (IncrMerger*)
2001190d6959Sdrh        (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
20024be4c406Sdan   if( pIncr ){
20034be4c406Sdan     pIncr->pMerger = pMerger;
20044be4c406Sdan     pIncr->pTask = pTask;
20054be4c406Sdan     pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
20064be4c406Sdan     pTask->file2.iEof += pIncr->mxSz;
20077f0a24b7Sdan   }else{
20087f0a24b7Sdan     vdbeMergeEngineFree(pMerger);
2009fad3039cSmistachkin     rc = SQLITE_NOMEM_BKPT;
20104be4c406Sdan   }
201133d28ab4Sdrh   assert( *ppOut!=0 || rc!=SQLITE_OK );
20127f0a24b7Sdan   return rc;
20134be4c406Sdan }
20144be4c406Sdan 
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Mark pIncr as multi-threaded. A threaded IncrMerger opens its own pair
** of temp files instead of using a region of pTask->file2, so the mxSz
** bytes that were added to pTask->file2.iEof for it are subtracted again.
*/
static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
  SortSubtask *pTask = pIncr->pTask;
  pTask->file2.iEof -= pIncr->mxSz;
  pIncr->bUseThread = 1;
}
#endif /* SQLITE_MAX_WORKER_THREADS>0 */
20244be4c406Sdan 
20258a4865f1Sdrh 
20268a4865f1Sdrh 
20278a4865f1Sdrh /*
20288a4865f1Sdrh ** Recompute pMerger->aTree[iOut] by comparing the next keys on the
20298a4865f1Sdrh ** two PmaReaders that feed that entry.  Neither of the PmaReaders
20308a4865f1Sdrh ** are advanced.  This routine merely does the comparison.
20318a4865f1Sdrh */
vdbeMergeEngineCompare(MergeEngine * pMerger,int iOut)20328a4865f1Sdrh static void vdbeMergeEngineCompare(
20338a4865f1Sdrh   MergeEngine *pMerger,  /* Merge engine containing PmaReaders to compare */
20348a4865f1Sdrh   int iOut               /* Store the result in pMerger->aTree[iOut] */
20358a4865f1Sdrh ){
20368a4865f1Sdrh   int i1;
20378a4865f1Sdrh   int i2;
20388a4865f1Sdrh   int iRes;
20398a4865f1Sdrh   PmaReader *p1;
20408a4865f1Sdrh   PmaReader *p2;
20418a4865f1Sdrh 
20428a4865f1Sdrh   assert( iOut<pMerger->nTree && iOut>0 );
20438a4865f1Sdrh 
20448a4865f1Sdrh   if( iOut>=(pMerger->nTree/2) ){
20458a4865f1Sdrh     i1 = (iOut - pMerger->nTree/2) * 2;
20468a4865f1Sdrh     i2 = i1 + 1;
20478a4865f1Sdrh   }else{
20488a4865f1Sdrh     i1 = pMerger->aTree[iOut*2];
20498a4865f1Sdrh     i2 = pMerger->aTree[iOut*2+1];
20508a4865f1Sdrh   }
20518a4865f1Sdrh 
20528a4865f1Sdrh   p1 = &pMerger->aReadr[i1];
20538a4865f1Sdrh   p2 = &pMerger->aReadr[i2];
20548a4865f1Sdrh 
20558a4865f1Sdrh   if( p1->pFd==0 ){
20568a4865f1Sdrh     iRes = i2;
20578a4865f1Sdrh   }else if( p2->pFd==0 ){
20588a4865f1Sdrh     iRes = i1;
20598a4865f1Sdrh   }else{
2060a9d9111cSdan     SortSubtask *pTask = pMerger->pTask;
2061a9d9111cSdan     int bCached = 0;
20628a4865f1Sdrh     int res;
2063a9d9111cSdan     assert( pTask->pUnpacked!=0 );  /* from vdbeSortSubtaskMain() */
2064a9d9111cSdan     res = pTask->xCompare(
2065a9d9111cSdan         pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey
20668a4865f1Sdrh     );
20678a4865f1Sdrh     if( res<=0 ){
20688a4865f1Sdrh       iRes = i1;
20698a4865f1Sdrh     }else{
20708a4865f1Sdrh       iRes = i2;
20718a4865f1Sdrh     }
20728a4865f1Sdrh   }
20738a4865f1Sdrh 
20748a4865f1Sdrh   pMerger->aTree[iOut] = iRes;
20758a4865f1Sdrh }
20768a4865f1Sdrh 
/*
** Modes accepted by the eMode parameter of vdbeMergeEngineInit() and
** vdbePmaReaderIncrMergeInit().
**
** Single-threaded builds (SQLITE_MAX_WORKER_THREADS==0) only ever use
** INCRINIT_NORMAL. INCRINIT_TASK and INCRINIT_ROOT are used only when
** there are one or more separate worker threads.
*/
#define INCRINIT_NORMAL 0   /* Initialize sub-tree and point at first key */
#define INCRINIT_TASK   1   /* In a worker thread: fill aFile[1] only */
#define INCRINIT_ROOT   2   /* Children already initialized via TASK mode */
20888a4865f1Sdrh 
208936b948f8Sdan /*
209036b948f8Sdan ** Forward reference required as the vdbeIncrMergeInit() and
209136b948f8Sdan ** vdbePmaReaderIncrInit() routines are called mutually recursively when
209236b948f8Sdan ** building a merge tree.
20938a4865f1Sdrh */
209436b948f8Sdan static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
2095f77ceba5Sdan 
20967f0a24b7Sdan /*
2097a4c8ca04Sdrh ** Initialize the MergeEngine object passed as the second argument. Once this
2098a4c8ca04Sdrh ** function returns, the first key of merged data may be read from the
2099a4c8ca04Sdrh ** MergeEngine object in the usual fashion.
21007f0a24b7Sdan **
2101a9f43d73Sdan ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
21027f0a24b7Sdan ** objects attached to the PmaReader objects that the merger reads from have
21037f0a24b7Sdan ** already been populated, but that they have not yet populated aFile[0] and
21047f0a24b7Sdan ** set the PmaReader objects up to read from it. In this case all that is
2105de823bedSdrh ** required is to call vdbePmaReaderNext() on each PmaReader to point it at
21067f0a24b7Sdan ** its first key.
21077f0a24b7Sdan **
2108a9f43d73Sdan ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use
21098a4865f1Sdrh ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data
2110db30fc4fSdan ** to pMerger.
21117f0a24b7Sdan **
21127f0a24b7Sdan ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
21137f0a24b7Sdan */
vdbeMergeEngineInit(SortSubtask * pTask,MergeEngine * pMerger,int eMode)2114d906514dSdrh static int vdbeMergeEngineInit(
2115a4c8ca04Sdrh   SortSubtask *pTask,             /* Thread that will run pMerger */
2116a4c8ca04Sdrh   MergeEngine *pMerger,           /* MergeEngine to initialize */
2117a9f43d73Sdan   int eMode                       /* One of the INCRINIT_XXX constants */
2118f77ceba5Sdan ){
21197f0a24b7Sdan   int rc = SQLITE_OK;             /* Return code */
2120de823bedSdrh   int i;                          /* For looping over PmaReader objects */
2121f396ecadSdrh   int nTree;                      /* Number of subtrees to merge */
2122f396ecadSdrh 
2123f396ecadSdrh   /* Failure to allocate the merge would have been detected prior to
2124f396ecadSdrh   ** invoking this routine */
2125f396ecadSdrh   assert( pMerger!=0 );
21264be4c406Sdan 
2127c690461eSdrh   /* eMode is always INCRINIT_NORMAL in single-threaded mode */
2128c690461eSdrh   assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
2129c690461eSdrh 
2130a4c8ca04Sdrh   /* Verify that the MergeEngine is assigned to a single thread */
21310f8f2671Sdrh   assert( pMerger->pTask==0 );
2132a4c8ca04Sdrh   pMerger->pTask = pTask;
2133a4c8ca04Sdrh 
2134f396ecadSdrh   nTree = pMerger->nTree;
21358a4865f1Sdrh   for(i=0; i<nTree; i++){
2136c690461eSdrh     if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
2137de823bedSdrh       /* PmaReaders should be normally initialized in order, as if they are
2138e18e90ebSdan       ** reading from the same temp file this makes for more linear file IO.
2139de823bedSdrh       ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
2140e18e90ebSdan       ** in use it will block the vdbePmaReaderNext() call while it uses
2141e18e90ebSdan       ** the main thread to fill its buffer. So calling PmaReaderNext()
2142de823bedSdrh       ** on this PmaReader before any of the multi-threaded PmaReaders takes
2143e18e90ebSdan       ** better advantage of multi-processor hardware. */
2144de823bedSdrh       rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
2145be3018c1Sdan     }else{
214636b948f8Sdan       rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
2147be3018c1Sdan     }
21488a4865f1Sdrh     if( rc!=SQLITE_OK ) return rc;
21494be4c406Sdan   }
21504be4c406Sdan 
21518a4865f1Sdrh   for(i=pMerger->nTree-1; i>0; i--){
21528a4865f1Sdrh     vdbeMergeEngineCompare(pMerger, i);
2153f77ceba5Sdan   }
21548a4865f1Sdrh   return pTask->pUnpacked->errCode;
2155f77ceba5Sdan }
2156f77ceba5Sdan 
2157a9f43d73Sdan /*
215836b948f8Sdan ** The PmaReader passed as the first argument is guaranteed to be an
215936b948f8Sdan ** incremental-reader (pReadr->pIncr!=0). This function serves to open
216036b948f8Sdan ** and/or initialize the temp file related fields of the IncrMerge
2161de823bedSdrh ** object at (pReadr->pIncr).
2162a9f43d73Sdan **
2163a4c8ca04Sdrh ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
216436b948f8Sdan ** in the sub-tree headed by pReadr are also initialized. Data is then
216536b948f8Sdan ** loaded into the buffers belonging to pReadr and it is set to point to
216636b948f8Sdan ** the first key in its range.
2167a9f43d73Sdan **
2168a4c8ca04Sdrh ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
2169de823bedSdrh ** to be a multi-threaded PmaReader and this function is being called in a
2170de823bedSdrh ** background thread. In this case all PmaReaders in the sub-tree are
2171a9f43d73Sdan ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
2172a4c8ca04Sdrh ** pReadr is populated. However, pReadr itself is not set up to point
2173a9f43d73Sdan ** to its first key. A call to vdbePmaReaderNext() is still required to do
2174a9f43d73Sdan ** that.
2175a9f43d73Sdan **
2176a9f43d73Sdan ** The reason this function does not call vdbePmaReaderNext() immediately
2177a4c8ca04Sdrh ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
2178a9f43d73Sdan ** to block on thread (pTask->thread) before accessing aFile[1]. But, since
2179a9f43d73Sdan ** this entire function is being run by thread (pTask->thread), that will
2180a9f43d73Sdan ** lead to the current background thread attempting to join itself.
2181a9f43d73Sdan **
2182a9f43d73Sdan ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
2183de823bedSdrh ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all
2184a9f43d73Sdan ** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
2185de823bedSdrh ** In this case vdbePmaReaderNext() is called on all child PmaReaders and
2186de823bedSdrh ** the current PmaReader set to point to the first key in its range.
2187a9f43d73Sdan **
2188a9f43d73Sdan ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2189a9f43d73Sdan */
vdbePmaReaderIncrMergeInit(PmaReader * pReadr,int eMode)21908a4865f1Sdrh static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
2191f77ceba5Sdan   int rc = SQLITE_OK;
2192de823bedSdrh   IncrMerger *pIncr = pReadr->pIncr;
219336b948f8Sdan   SortSubtask *pTask = pIncr->pTask;
219436b948f8Sdan   sqlite3 *db = pTask->pSorter->db;
2195c690461eSdrh 
2196c690461eSdrh   /* eMode is always INCRINIT_NORMAL in single-threaded mode */
2197c690461eSdrh   assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
2198c690461eSdrh 
2199d906514dSdrh   rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
2200f77ceba5Sdan 
2201a9f43d73Sdan   /* Set up the required files for pIncr. A multi-theaded IncrMerge object
2202a9f43d73Sdan   ** requires two temp files to itself, whereas a single-threaded object
2203a9f43d73Sdan   ** only requires a region of pTask->file2. */
22044be4c406Sdan   if( rc==SQLITE_OK ){
2205a9f43d73Sdan     int mxSz = pIncr->mxSz;
2206b0f935e4Sdrh #if SQLITE_MAX_WORKER_THREADS>0
2207b0f935e4Sdrh     if( pIncr->bUseThread ){
2208b0f935e4Sdrh       rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
2209b0f935e4Sdrh       if( rc==SQLITE_OK ){
2210b0f935e4Sdrh         rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
2211b0f935e4Sdrh       }
2212b0f935e4Sdrh     }else
2213b0f935e4Sdrh #endif
2214b0f935e4Sdrh     /*if( !pIncr->bUseThread )*/{
22154be4c406Sdan       if( pTask->file2.pFd==0 ){
22164be4c406Sdan         assert( pTask->file2.iEof>0 );
2217a9f43d73Sdan         rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
22184be4c406Sdan         pTask->file2.iEof = 0;
22194be4c406Sdan       }
22204be4c406Sdan       if( rc==SQLITE_OK ){
22214be4c406Sdan         pIncr->aFile[1].pFd = pTask->file2.pFd;
22224be4c406Sdan         pIncr->iStartOff = pTask->file2.iEof;
2223a9f43d73Sdan         pTask->file2.iEof += mxSz;
22244be4c406Sdan       }
22254be4c406Sdan     }
22264be4c406Sdan   }
22274be4c406Sdan 
2228b0f935e4Sdrh #if SQLITE_MAX_WORKER_THREADS>0
22294be4c406Sdan   if( rc==SQLITE_OK && pIncr->bUseThread ){
2230a9f43d73Sdan     /* Use the current thread to populate aFile[1], even though this
223136b948f8Sdan     ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
223236b948f8Sdan     ** then this function is already running in background thread
223336b948f8Sdan     ** pIncr->pTask->thread.
223436b948f8Sdan     **
223536b948f8Sdan     ** If this is the INCRINIT_ROOT object, then it is running in the
223636b948f8Sdan     ** main VDBE thread. But that is Ok, as that thread cannot return
223736b948f8Sdan     ** control to the VDBE or proceed with anything useful until the
223836b948f8Sdan     ** first results are ready from this merger object anyway.
223936b948f8Sdan     */
2240a9f43d73Sdan     assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
2241be3018c1Sdan     rc = vdbeIncrPopulate(pIncr);
22424be4c406Sdan   }
2243b0f935e4Sdrh #endif
22444be4c406Sdan 
224536b948f8Sdan   if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
2246de823bedSdrh     rc = vdbePmaReaderNext(pReadr);
22474be4c406Sdan   }
224836b948f8Sdan 
22494be4c406Sdan   return rc;
22504be4c406Sdan }
22514be4c406Sdan 
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Background-thread entry point used for vdbePmaReaderIncrMergeInit()
** operations. Runs that routine in INCRINIT_TASK mode on the PmaReader
** passed as pCtx, marks the owning subtask as done, and returns the error
** code converted with SQLITE_INT_TO_PTR().
*/
static void *vdbePmaReaderBgIncrInit(void *pCtx){
  PmaReader *pReader = (PmaReader*)pCtx;
  int rc = vdbePmaReaderIncrMergeInit(pReader, INCRINIT_TASK);
  pReader->pIncr->pTask->bDone = 1;
  return SQLITE_INT_TO_PTR(rc);
}
#endif /* SQLITE_MAX_WORKER_THREADS>0 */
2266be3018c1Sdan 
22677f0a24b7Sdan /*
226836b948f8Sdan ** If the PmaReader passed as the first argument is not an incremental-reader
226936b948f8Sdan ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes
227036b948f8Sdan ** the vdbePmaReaderIncrMergeInit() function with the parameters passed to
227136b948f8Sdan ** this routine to initialize the incremental merge.
22727f0a24b7Sdan **
227336b948f8Sdan ** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1),
227436b948f8Sdan ** then a background thread is launched to call vdbePmaReaderIncrMergeInit().
227536b948f8Sdan ** Or, if the IncrMerger is single threaded, the same function is called
227636b948f8Sdan ** using the current thread.
22777f0a24b7Sdan */
vdbePmaReaderIncrInit(PmaReader * pReadr,int eMode)227836b948f8Sdan static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){
227936b948f8Sdan   IncrMerger *pIncr = pReadr->pIncr;   /* Incremental merger */
228036b948f8Sdan   int rc = SQLITE_OK;                  /* Return code */
228136b948f8Sdan   if( pIncr ){
228236b948f8Sdan #if SQLITE_MAX_WORKER_THREADS>0
228336b948f8Sdan     assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK );
228436b948f8Sdan     if( pIncr->bUseThread ){
2285de823bedSdrh       void *pCtx = (void*)pReadr;
228636b948f8Sdan       rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx);
228736b948f8Sdan     }else
228892a20ddeSdan #endif
228936b948f8Sdan     {
229036b948f8Sdan       rc = vdbePmaReaderIncrMergeInit(pReadr, eMode);
229136b948f8Sdan     }
229236b948f8Sdan   }
229336b948f8Sdan   return rc;
229436b948f8Sdan }
2295be3018c1Sdan 
22964be4c406Sdan /*
22974be4c406Sdan ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
22984be4c406Sdan ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
22994be4c406Sdan ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
23004be4c406Sdan ** to NULL and return an SQLite error code.
23014be4c406Sdan **
23024be4c406Sdan ** When this function is called, *piOffset is set to the offset of the
23034be4c406Sdan ** first PMA to read from pTask->file. Assuming no error occurs, it is
23044be4c406Sdan ** set to the offset immediately following the last byte of the last
23054be4c406Sdan ** PMA before returning. If an error does occur, then the final value of
23064be4c406Sdan ** *piOffset is undefined.
23074be4c406Sdan */
vdbeMergeEngineLevel0(SortSubtask * pTask,int nPMA,i64 * piOffset,MergeEngine ** ppOut)23084be4c406Sdan static int vdbeMergeEngineLevel0(
23094be4c406Sdan   SortSubtask *pTask,             /* Sorter task to read from */
23104be4c406Sdan   int nPMA,                       /* Number of PMAs to read */
2311de823bedSdrh   i64 *piOffset,                  /* IN/OUT: Readr offset in pTask->file */
23124be4c406Sdan   MergeEngine **ppOut             /* OUT: New merge-engine */
23134be4c406Sdan ){
23144be4c406Sdan   MergeEngine *pNew;              /* Merge engine to return */
23154be4c406Sdan   i64 iOff = *piOffset;
23164be4c406Sdan   int i;
23174be4c406Sdan   int rc = SQLITE_OK;
23184be4c406Sdan 
23194be4c406Sdan   *ppOut = pNew = vdbeMergeEngineNew(nPMA);
2320fad3039cSmistachkin   if( pNew==0 ) rc = SQLITE_NOMEM_BKPT;
23214be4c406Sdan 
23224be4c406Sdan   for(i=0; i<nPMA && rc==SQLITE_OK; i++){
2323b1f4efd2Sdrh     i64 nDummy = 0;
2324de823bedSdrh     PmaReader *pReadr = &pNew->aReadr[i];
2325de823bedSdrh     rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
2326de823bedSdrh     iOff = pReadr->iEof;
23274be4c406Sdan   }
23284be4c406Sdan 
23294be4c406Sdan   if( rc!=SQLITE_OK ){
23304be4c406Sdan     vdbeMergeEngineFree(pNew);
23314be4c406Sdan     *ppOut = 0;
23324be4c406Sdan   }
23334be4c406Sdan   *piOffset = iOff;
23344be4c406Sdan   return rc;
23354be4c406Sdan }
23364be4c406Sdan 
23377f0a24b7Sdan /*
23387f0a24b7Sdan ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
23397f0a24b7Sdan ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
23407f0a24b7Sdan **
23417f0a24b7Sdan ** i.e.
23427f0a24b7Sdan **
23437f0a24b7Sdan **   nPMA<=16    -> TreeDepth() == 0
23447f0a24b7Sdan **   nPMA<=256   -> TreeDepth() == 1
23457f0a24b7Sdan **   nPMA<=65536 -> TreeDepth() == 2
23467f0a24b7Sdan */
vdbeSorterTreeDepth(int nPMA)23477f0a24b7Sdan static int vdbeSorterTreeDepth(int nPMA){
23487f0a24b7Sdan   int nDepth = 0;
23497f0a24b7Sdan   i64 nDiv = SORTER_MAX_MERGE_COUNT;
23507f0a24b7Sdan   while( nDiv < (i64)nPMA ){
23517f0a24b7Sdan     nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
23527f0a24b7Sdan     nDepth++;
23537f0a24b7Sdan   }
23547f0a24b7Sdan   return nDepth;
23554be4c406Sdan }
23564be4c406Sdan 
23577f0a24b7Sdan /*
23587f0a24b7Sdan ** pRoot is the root of an incremental merge-tree with depth nDepth (according
23597f0a24b7Sdan ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
23607f0a24b7Sdan ** tree, counting from zero. This function adds pLeaf to the tree.
23617f0a24b7Sdan **
23627f0a24b7Sdan ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
23637f0a24b7Sdan ** code is returned and pLeaf is freed.
23647f0a24b7Sdan */
vdbeSorterAddToTree(SortSubtask * pTask,int nDepth,int iSeq,MergeEngine * pRoot,MergeEngine * pLeaf)23657f0a24b7Sdan static int vdbeSorterAddToTree(
23667f0a24b7Sdan   SortSubtask *pTask,             /* Task context */
23677f0a24b7Sdan   int nDepth,                     /* Depth of tree according to TreeDepth() */
23687f0a24b7Sdan   int iSeq,                       /* Sequence number of leaf within tree */
23697f0a24b7Sdan   MergeEngine *pRoot,             /* Root of tree */
23707f0a24b7Sdan   MergeEngine *pLeaf              /* Leaf to add to tree */
23717f0a24b7Sdan ){
23727f0a24b7Sdan   int rc = SQLITE_OK;
23737f0a24b7Sdan   int nDiv = 1;
23747f0a24b7Sdan   int i;
23757f0a24b7Sdan   MergeEngine *p = pRoot;
23767f0a24b7Sdan   IncrMerger *pIncr;
23777f0a24b7Sdan 
2378bde27aaaSdrh   rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr);
23797f0a24b7Sdan 
23807f0a24b7Sdan   for(i=1; i<nDepth; i++){
23817f0a24b7Sdan     nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
23827f0a24b7Sdan   }
23837f0a24b7Sdan 
23847f0a24b7Sdan   for(i=1; i<nDepth && rc==SQLITE_OK; i++){
23857f0a24b7Sdan     int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
2386de823bedSdrh     PmaReader *pReadr = &p->aReadr[iIter];
23877f0a24b7Sdan 
2388de823bedSdrh     if( pReadr->pIncr==0 ){
23897f0a24b7Sdan       MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
23907f0a24b7Sdan       if( pNew==0 ){
2391fad3039cSmistachkin         rc = SQLITE_NOMEM_BKPT;
23927f0a24b7Sdan       }else{
2393bde27aaaSdrh         rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr);
23947f0a24b7Sdan       }
23957f0a24b7Sdan     }
2396d94d4ee7Sdan     if( rc==SQLITE_OK ){
2397de823bedSdrh       p = pReadr->pIncr->pMerger;
23987f0a24b7Sdan       nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
23994be4c406Sdan     }
2400d94d4ee7Sdan   }
24014be4c406Sdan 
24024be4c406Sdan   if( rc==SQLITE_OK ){
2403de823bedSdrh     p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
24047f0a24b7Sdan   }else{
24057f0a24b7Sdan     vdbeIncrFree(pIncr);
24064be4c406Sdan   }
24074be4c406Sdan   return rc;
24084be4c406Sdan }
2409d30ab3d9Sdan 
2410d30ab3d9Sdan /*
24117f0a24b7Sdan ** This function is called as part of a SorterRewind() operation on a sorter
24127f0a24b7Sdan ** that has already written two or more level-0 PMAs to one or more temp
24137f0a24b7Sdan ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that
24147f0a24b7Sdan ** can be used to incrementally merge all PMAs on disk.
24157f0a24b7Sdan **
24167f0a24b7Sdan ** If successful, SQLITE_OK is returned and *ppOut set to point to the
24177f0a24b7Sdan ** MergeEngine object at the root of the tree before returning. Or, if an
24187f0a24b7Sdan ** error occurs, an SQLite error code is returned and the final value
24197f0a24b7Sdan ** of *ppOut is undefined.
2420d30ab3d9Sdan */
vdbeSorterMergeTreeBuild(VdbeSorter * pSorter,MergeEngine ** ppOut)2421ac65196eSdrh static int vdbeSorterMergeTreeBuild(
2422ac65196eSdrh   VdbeSorter *pSorter,       /* The VDBE cursor that implements the sort */
2423ac65196eSdrh   MergeEngine **ppOut        /* Write the MergeEngine here */
2424ac65196eSdrh ){
24254be4c406Sdan   MergeEngine *pMain = 0;
2426d30ab3d9Sdan   int rc = SQLITE_OK;
24274be4c406Sdan   int iTask;
2428d30ab3d9Sdan 
2429b0f935e4Sdrh #if SQLITE_MAX_WORKER_THREADS>0
24307f0a24b7Sdan   /* If the sorter uses more than one task, then create the top-level
24317f0a24b7Sdan   ** MergeEngine here. This MergeEngine will read data from exactly
24327f0a24b7Sdan   ** one PmaReader per sub-task.  */
24337f0a24b7Sdan   assert( pSorter->bUseThreads || pSorter->nTask==1 );
24344be4c406Sdan   if( pSorter->nTask>1 ){
24354be4c406Sdan     pMain = vdbeMergeEngineNew(pSorter->nTask);
2436fad3039cSmistachkin     if( pMain==0 ) rc = SQLITE_NOMEM_BKPT;
2437d30ab3d9Sdan   }
2438b0f935e4Sdrh #endif
24394be4c406Sdan 
24405f4a4790Sdrh   for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
24414be4c406Sdan     SortSubtask *pTask = &pSorter->aTask[iTask];
2442c690461eSdrh     assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 );
2443c690461eSdrh     if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){
24447f0a24b7Sdan       MergeEngine *pRoot = 0;     /* Root node of tree for this task */
24457f0a24b7Sdan       int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
24467f0a24b7Sdan       i64 iReadOff = 0;
24474be4c406Sdan 
24487f0a24b7Sdan       if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
24497f0a24b7Sdan         rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
24504be4c406Sdan       }else{
24514be4c406Sdan         int i;
24527f0a24b7Sdan         int iSeq = 0;
24537f0a24b7Sdan         pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
2454fad3039cSmistachkin         if( pRoot==0 ) rc = SQLITE_NOMEM_BKPT;
24557f0a24b7Sdan         for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
24567f0a24b7Sdan           MergeEngine *pMerger = 0; /* New level-0 PMA merger */
24577f0a24b7Sdan           int nReader;              /* Number of level-0 PMAs to merge */
24587f0a24b7Sdan 
24597f0a24b7Sdan           nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
24607f0a24b7Sdan           rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
24617f0a24b7Sdan           if( rc==SQLITE_OK ){
24627f0a24b7Sdan             rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
24634be4c406Sdan           }
2464d30ab3d9Sdan         }
2465d30ab3d9Sdan       }
2466d30ab3d9Sdan 
2467d30ab3d9Sdan       if( rc==SQLITE_OK ){
24685f4a4790Sdrh #if SQLITE_MAX_WORKER_THREADS>0
24695f4a4790Sdrh         if( pMain!=0 ){
2470bde27aaaSdrh           rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr);
24715f4a4790Sdrh         }else
24725f4a4790Sdrh #endif
24735f4a4790Sdrh         {
24745f4a4790Sdrh           assert( pMain==0 );
24755f4a4790Sdrh           pMain = pRoot;
24764be4c406Sdan         }
24777f0a24b7Sdan       }else{
247822ace891Sdan         vdbeMergeEngineFree(pRoot);
247922ace891Sdan       }
24804be4c406Sdan     }
24817f0a24b7Sdan   }
24824be4c406Sdan 
24837f0a24b7Sdan   if( rc!=SQLITE_OK ){
24847f0a24b7Sdan     vdbeMergeEngineFree(pMain);
24857f0a24b7Sdan     pMain = 0;
24867f0a24b7Sdan   }
24877f0a24b7Sdan   *ppOut = pMain;
24887f0a24b7Sdan   return rc;
24897f0a24b7Sdan }
24907f0a24b7Sdan 
24917f0a24b7Sdan /*
2492db30fc4fSdan ** This function is called as part of an sqlite3VdbeSorterRewind() operation
2493db30fc4fSdan ** on a sorter that has written two or more PMAs to temporary files. It sets
2494db30fc4fSdan ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
2495db30fc4fSdan ** (for multi-threaded sorters) so that it can be used to iterate through
2496db30fc4fSdan ** all records stored in the sorter.
2497db30fc4fSdan **
2498db30fc4fSdan ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
24997f0a24b7Sdan */
vdbeSorterSetupMerge(VdbeSorter * pSorter)2500db30fc4fSdan static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
25017f0a24b7Sdan   int rc;                         /* Return code */
25027f0a24b7Sdan   SortSubtask *pTask0 = &pSorter->aTask[0];
25037f0a24b7Sdan   MergeEngine *pMain = 0;
2504958d261bSdrh #if SQLITE_MAX_WORKER_THREADS
25057f0a24b7Sdan   sqlite3 *db = pTask0->pSorter->db;
250657a14094Sdan   int i;
2507a9d9111cSdan   SorterCompare xCompare = vdbeSorterGetCompare(pSorter);
250857a14094Sdan   for(i=0; i<pSorter->nTask; i++){
250957a14094Sdan     pSorter->aTask[i].xCompare = xCompare;
251057a14094Sdan   }
2511958d261bSdrh #endif
25127f0a24b7Sdan 
25137f0a24b7Sdan   rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
25144be4c406Sdan   if( rc==SQLITE_OK ){
2515f77ceba5Sdan #if SQLITE_MAX_WORKER_THREADS
2516d94d4ee7Sdan     assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
2517f77ceba5Sdan     if( pSorter->bUseThreads ){
2518958d261bSdrh       int iTask;
25197bdc9749Smistachkin       PmaReader *pReadr = 0;
25204be4c406Sdan       SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
25214be4c406Sdan       rc = vdbeSortAllocUnpacked(pLast);
25224be4c406Sdan       if( rc==SQLITE_OK ){
2523de823bedSdrh         pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
2524de823bedSdrh         pSorter->pReader = pReadr;
2525fad3039cSmistachkin         if( pReadr==0 ) rc = SQLITE_NOMEM_BKPT;
2526f77ceba5Sdan       }
2527f77ceba5Sdan       if( rc==SQLITE_OK ){
2528bde27aaaSdrh         rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr);
25297f0a24b7Sdan         if( rc==SQLITE_OK ){
2530bde27aaaSdrh           vdbeIncrMergerSetThreads(pReadr->pIncr);
25314be4c406Sdan           for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
25324be4c406Sdan             IncrMerger *pIncr;
2533de823bedSdrh             if( (pIncr = pMain->aReadr[iTask].pIncr) ){
2534bde27aaaSdrh               vdbeIncrMergerSetThreads(pIncr);
25354be4c406Sdan               assert( pIncr->pTask!=pLast );
2536d30ab3d9Sdan             }
2537d30ab3d9Sdan           }
2538be3018c1Sdan           for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
253936b948f8Sdan             /* Check that:
254036b948f8Sdan             **
254136b948f8Sdan             **   a) The incremental merge object is configured to use the
254236b948f8Sdan             **      right task, and
254336b948f8Sdan             **   b) If it is using task (nTask-1), it is configured to run
254436b948f8Sdan             **      in single-threaded mode. This is important, as the
254536b948f8Sdan             **      root merge (INCRINIT_ROOT) will be using the same task
254636b948f8Sdan             **      object.
254736b948f8Sdan             */
2548de823bedSdrh             PmaReader *p = &pMain->aReadr[iTask];
254936b948f8Sdan             assert( p->pIncr==0 || (
255036b948f8Sdan                 (p->pIncr->pTask==&pSorter->aTask[iTask])             /* a */
255136b948f8Sdan              && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0)  /* b */
255236b948f8Sdan             ));
255336b948f8Sdan             rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
25544be4c406Sdan           }
25554be4c406Sdan         }
25567f0a24b7Sdan         pMain = 0;
2557be3018c1Sdan       }
2558be3018c1Sdan       if( rc==SQLITE_OK ){
25598a4865f1Sdrh         rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
2560be3018c1Sdan       }
2561f77ceba5Sdan     }else
2562f77ceba5Sdan #endif
2563f77ceba5Sdan     {
2564d906514dSdrh       rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL);
25657f0a24b7Sdan       pSorter->pMerger = pMain;
256622ace891Sdan       pMain = 0;
2567f77ceba5Sdan     }
2568f77ceba5Sdan   }
2569d30ab3d9Sdan 
257022ace891Sdan   if( rc!=SQLITE_OK ){
257122ace891Sdan     vdbeMergeEngineFree(pMain);
257222ace891Sdan   }
2573d30ab3d9Sdan   return rc;
2574d30ab3d9Sdan }
2575d30ab3d9Sdan 
2576d30ab3d9Sdan 
2577d30ab3d9Sdan /*
2578a634fb15Sdrh ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
2579a634fb15Sdrh ** this function is called to prepare for iterating through the records
2580a634fb15Sdrh ** in sorted order.
2581a20fde64Sdan */
sqlite3VdbeSorterRewind(const VdbeCursor * pCsr,int * pbEof)2582958d261bSdrh int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
2583c960dcbaSdrh   VdbeSorter *pSorter;
2584a20fde64Sdan   int rc = SQLITE_OK;             /* Return code */
2585a20fde64Sdan 
2586c960dcbaSdrh   assert( pCsr->eCurType==CURTYPE_SORTER );
2587c960dcbaSdrh   pSorter = pCsr->uc.pSorter;
2588a20fde64Sdan   assert( pSorter );
2589c6e73455Sdan 
25909fed558dSdan   /* If no data has been written to disk, then do not do so now. Instead,
25919fed558dSdan   ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
25929fed558dSdan   ** from the in-memory list.  */
2593f8768418Sdan   if( pSorter->bUsePMA==0 ){
259482a8a9f1Sdan     if( pSorter->list.pList ){
2595f8768418Sdan       *pbEof = 0;
259682a8a9f1Sdan       rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
2597c6e73455Sdan     }else{
2598f8768418Sdan       *pbEof = 1;
2599c6e73455Sdan     }
2600f8768418Sdan     return rc;
26015134d135Sdan   }
2602a20fde64Sdan 
2603d94d4ee7Sdan   /* Write the current in-memory list to a PMA. When the VdbeSorterWrite()
2604d94d4ee7Sdan   ** function flushes the contents of memory to disk, it immediately always
2605d94d4ee7Sdan   ** creates a new list consisting of a single key immediately afterwards.
2606d94d4ee7Sdan   ** So the list is never empty at this point.  */
2607d94d4ee7Sdan   assert( pSorter->list.pList );
260882a8a9f1Sdan   rc = vdbeSorterFlushPMA(pSorter);
2609a20fde64Sdan 
2610f8768418Sdan   /* Join all threads */
2611f8768418Sdan   rc = vdbeSorterJoinAll(pSorter, rc);
2612f8768418Sdan 
2613958d261bSdrh   vdbeSorterRewindDebug("rewind");
2614f8768418Sdan 
2615d30ab3d9Sdan   /* Assuming no errors have occurred, set up a merger structure to
2616d30ab3d9Sdan   ** incrementally read and merge all remaining PMAs.  */
2617d30ab3d9Sdan   assert( pSorter->pReader==0 );
2618f8768418Sdan   if( rc==SQLITE_OK ){
2619db30fc4fSdan     rc = vdbeSorterSetupMerge(pSorter);
2620d30ab3d9Sdan     *pbEof = 0;
2621a20fde64Sdan   }
2622f8768418Sdan 
2623958d261bSdrh   vdbeSorterRewindDebug("rewinddone");
2624a20fde64Sdan   return rc;
2625a20fde64Sdan }
2626a20fde64Sdan 
2627a20fde64Sdan /*
26282ab792e4Sdrh ** Advance to the next element in the sorter.  Return value:
26292ab792e4Sdrh **
26302ab792e4Sdrh **    SQLITE_OK     success
26312ab792e4Sdrh **    SQLITE_DONE   end of data
26322ab792e4Sdrh **    otherwise     some kind of error.
2633a20fde64Sdan */
sqlite3VdbeSorterNext(sqlite3 * db,const VdbeCursor * pCsr)26342ab792e4Sdrh int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr){
2635c960dcbaSdrh   VdbeSorter *pSorter;
2636a20fde64Sdan   int rc;                         /* Return code */
2637a20fde64Sdan 
2638c960dcbaSdrh   assert( pCsr->eCurType==CURTYPE_SORTER );
2639c960dcbaSdrh   pSorter = pCsr->uc.pSorter;
2640f77ceba5Sdan   assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );
2641f77ceba5Sdan   if( pSorter->bUsePMA ){
2642f77ceba5Sdan     assert( pSorter->pReader==0 || pSorter->pMerger==0 );
2643f77ceba5Sdan     assert( pSorter->bUseThreads==0 || pSorter->pReader );
2644f77ceba5Sdan     assert( pSorter->bUseThreads==1 || pSorter->pMerger );
2645b0f935e4Sdrh #if SQLITE_MAX_WORKER_THREADS>0
2646f77ceba5Sdan     if( pSorter->bUseThreads ){
2647d30ab3d9Sdan       rc = vdbePmaReaderNext(pSorter->pReader);
26482ab792e4Sdrh       if( rc==SQLITE_OK && pSorter->pReader->pFd==0 ) rc = SQLITE_DONE;
2649b0f935e4Sdrh     }else
2650b0f935e4Sdrh #endif
2651b0f935e4Sdrh     /*if( !pSorter->bUseThreads )*/ {
26522ab792e4Sdrh       int res = 0;
26538d9da63dSdrh       assert( pSorter->pMerger!=0 );
2654bde27aaaSdrh       assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) );
26552ab792e4Sdrh       rc = vdbeMergeEngineStep(pSorter->pMerger, &res);
26562ab792e4Sdrh       if( rc==SQLITE_OK && res ) rc = SQLITE_DONE;
2657344510e6Sdan     }
2658a20fde64Sdan   }else{
265982a8a9f1Sdan     SorterRecord *pFree = pSorter->list.pList;
266082a8a9f1Sdan     pSorter->list.pList = pFree->u.pNext;
26616971952cSdan     pFree->u.pNext = 0;
266282a8a9f1Sdan     if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
26632ab792e4Sdrh     rc = pSorter->list.pList ? SQLITE_OK : SQLITE_DONE;
2664a20fde64Sdan   }
2665a20fde64Sdan   return rc;
2666a20fde64Sdan }
2667a20fde64Sdan 
2668a20fde64Sdan /*
2669a20fde64Sdan ** Return a pointer to a buffer owned by the sorter that contains the
2670a20fde64Sdan ** current key.
2671a20fde64Sdan */
vdbeSorterRowkey(const VdbeSorter * pSorter,int * pnKey)2672a20fde64Sdan static void *vdbeSorterRowkey(
2673a20fde64Sdan   const VdbeSorter *pSorter,      /* Sorter object */
2674a20fde64Sdan   int *pnKey                      /* OUT: Size of current key in bytes */
2675a20fde64Sdan ){
2676a20fde64Sdan   void *pKey;
2677f77ceba5Sdan   if( pSorter->bUsePMA ){
2678b0f935e4Sdrh     PmaReader *pReader;
2679b0f935e4Sdrh #if SQLITE_MAX_WORKER_THREADS>0
2680b0f935e4Sdrh     if( pSorter->bUseThreads ){
2681b0f935e4Sdrh       pReader = pSorter->pReader;
2682b0f935e4Sdrh     }else
2683b0f935e4Sdrh #endif
2684b0f935e4Sdrh     /*if( !pSorter->bUseThreads )*/{
2685de823bedSdrh       pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]];
2686b0f935e4Sdrh     }
2687f77ceba5Sdan     *pnKey = pReader->nKey;
2688f77ceba5Sdan     pKey = pReader->aKey;
2689a20fde64Sdan   }else{
269082a8a9f1Sdan     *pnKey = pSorter->list.pList->nVal;
269182a8a9f1Sdan     pKey = SRVAL(pSorter->list.pList);
2692a20fde64Sdan   }
2693a20fde64Sdan   return pKey;
2694a20fde64Sdan }
2695a20fde64Sdan 
2696a20fde64Sdan /*
2697a20fde64Sdan ** Copy the current sorter key into the memory cell pOut.
2698a20fde64Sdan */
sqlite3VdbeSorterRowkey(const VdbeCursor * pCsr,Mem * pOut)2699a20fde64Sdan int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
2700c960dcbaSdrh   VdbeSorter *pSorter;
2701a20fde64Sdan   void *pKey; int nKey;           /* Sorter key to copy into pOut */
2702a20fde64Sdan 
2703c960dcbaSdrh   assert( pCsr->eCurType==CURTYPE_SORTER );
2704c960dcbaSdrh   pSorter = pCsr->uc.pSorter;
2705a20fde64Sdan   pKey = vdbeSorterRowkey(pSorter, &nKey);
2706322f2852Sdrh   if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){
2707fad3039cSmistachkin     return SQLITE_NOMEM_BKPT;
2708a20fde64Sdan   }
2709a20fde64Sdan   pOut->n = nKey;
2710a20fde64Sdan   MemSetTypeFlag(pOut, MEM_Blob);
2711a20fde64Sdan   memcpy(pOut->z, pKey, nKey);
2712a20fde64Sdan 
2713a20fde64Sdan   return SQLITE_OK;
2714a20fde64Sdan }
2715a20fde64Sdan 
2716a20fde64Sdan /*
2717a20fde64Sdan ** Compare the key in memory cell pVal with the key that the sorter cursor
2718a20fde64Sdan ** passed as the first argument currently points to. For the purposes of
2719a20fde64Sdan ** the comparison, ignore the rowid field at the end of each record.
2720a20fde64Sdan **
2721fad9f9a8Sdan ** If the sorter cursor key contains any NULL values, consider it to be
2722ac4f0039Sdrh ** less than pVal. Even if pVal also contains NULL values.
2723fad9f9a8Sdan **
2724a20fde64Sdan ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM).
2725a20fde64Sdan ** Otherwise, set *pRes to a negative, zero or positive value if the
2726a20fde64Sdan ** key in pVal is smaller than, equal to or larger than the current sorter
2727a20fde64Sdan ** key.
2728ac4f0039Sdrh **
2729ac4f0039Sdrh ** This routine forms the core of the OP_SorterCompare opcode, which in
2730ac4f0039Sdrh ** turn is used to verify uniqueness when constructing a UNIQUE INDEX.
2731a20fde64Sdan */
sqlite3VdbeSorterCompare(const VdbeCursor * pCsr,Mem * pVal,int nKeyCol,int * pRes)2732a20fde64Sdan int sqlite3VdbeSorterCompare(
2733a20fde64Sdan   const VdbeCursor *pCsr,         /* Sorter cursor */
2734a20fde64Sdan   Mem *pVal,                      /* Value to compare to current sorter key */
2735bd1c881aSdrh   int nKeyCol,                    /* Compare this many columns */
2736a20fde64Sdan   int *pRes                       /* OUT: Result of comparison */
2737a20fde64Sdan ){
2738c960dcbaSdrh   VdbeSorter *pSorter;
2739c960dcbaSdrh   UnpackedRecord *r2;
2740c960dcbaSdrh   KeyInfo *pKeyInfo;
2741fad9f9a8Sdan   int i;
2742a20fde64Sdan   void *pKey; int nKey;           /* Sorter key to compare pVal with */
2743a20fde64Sdan 
2744c960dcbaSdrh   assert( pCsr->eCurType==CURTYPE_SORTER );
2745c960dcbaSdrh   pSorter = pCsr->uc.pSorter;
2746c960dcbaSdrh   r2 = pSorter->pUnpacked;
2747c960dcbaSdrh   pKeyInfo = pCsr->pKeyInfo;
2748d30ab3d9Sdan   if( r2==0 ){
2749a582b016Sdrh     r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo);
2750fad3039cSmistachkin     if( r2==0 ) return SQLITE_NOMEM_BKPT;
2751bd1c881aSdrh     r2->nField = nKeyCol;
2752d30ab3d9Sdan   }
2753bd1c881aSdrh   assert( r2->nField==nKeyCol );
2754fad9f9a8Sdan 
2755a20fde64Sdan   pKey = vdbeSorterRowkey(pSorter, &nKey);
2756fad9f9a8Sdan   sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
2757bd1c881aSdrh   for(i=0; i<nKeyCol; i++){
2758fad9f9a8Sdan     if( r2->aMem[i].flags & MEM_Null ){
2759fad9f9a8Sdan       *pRes = -1;
2760fad9f9a8Sdan       return SQLITE_OK;
2761fad9f9a8Sdan     }
2762fad9f9a8Sdan   }
2763fad9f9a8Sdan 
276475179dedSdrh   *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
2765a20fde64Sdan   return SQLITE_OK;
2766a20fde64Sdan }
2767