#if defined(SQLITE_ENABLE_SESSION) && defined(SQLITE_ENABLE_PREUPDATE_HOOK)
#include "sqlite3session.h"
#include <assert.h>
#include <string.h>

#ifndef SQLITE_AMALGAMATION
# include "sqliteInt.h"
# include "vdbeInt.h"
#endif

/* Forward declarations for the internal types defined further down. */
typedef struct SessionTable SessionTable;
typedef struct SessionChange SessionChange;
typedef struct SessionBuffer SessionBuffer;
typedef struct SessionInput SessionInput;

/*
** Minimum chunk size used by streaming versions of functions. The value
** may be overridden at compile time by defining SESSIONS_STRM_CHUNK_SIZE.
** Builds with SQLITE_TEST defined default to 64 bytes instead of 1024.
*/
#ifndef SESSIONS_STRM_CHUNK_SIZE
# ifdef SQLITE_TEST
#   define SESSIONS_STRM_CHUNK_SIZE 64
# else
#   define SESSIONS_STRM_CHUNK_SIZE 1024
# endif
#endif

/* Chunk size in effect; initialized from SESSIONS_STRM_CHUNK_SIZE. */
static int sessions_strm_chunk_size = SESSIONS_STRM_CHUNK_SIZE;

/*
** Set of callbacks used by a session object to read the row currently
** being changed. pCtx is passed as the first argument to each callback.
*/
typedef struct SessionHook SessionHook;
struct SessionHook {
  void *pCtx;                              /* First argument for callbacks */
  int (*xOld)(void*,int,sqlite3_value**);  /* Fetch old.* value of column
                                           ** (2nd arg) into *(3rd arg).
                                           ** Returns an SQLite error code */
  int (*xNew)(void*,int,sqlite3_value**);  /* As xOld, for the new.* value */
  int (*xCount)(void*);                    /* Number of columns in the row;
                                           ** asserted equal to
                                           ** SessionTable.nCol */
  int (*xDepth)(void*);                    /* NOTE(review): not used in the
                                           ** visible code - presumably the
                                           ** trigger depth; confirm */
};

/*
** Session handle structure.
*/
struct sqlite3_session {
  sqlite3 *db;                    /* Database handle session is attached to */
  char *zDb;                      /* Name of database session is attached to */
  int bEnable;                    /* True if currently recording */
  int bIndirect;                  /* True if all changes are indirect */
  int bAutoAttach;                /* True to auto-attach tables */
  int rc;                         /* Non-zero if an error has occurred */
  void *pFilterCtx;               /* First argument to pass to xTableFilter */
  int (*xTableFilter)(void *pCtx, const char *zTab);  /* Table filter
                                  ** callback; called with pFilterCtx */
  i64 nMalloc;                    /* Number of bytes of data allocated */
  i64 nMaxChangesetSize;          /* NOTE(review): undocumented - presumably
                                  ** an upper bound on the changeset size;
                                  ** confirm against its users */
  sqlite3_value *pZeroBlob;       /* Value containing X'' */
  sqlite3_session *pNext;         /* Next session object on same db. */
  SessionTable *pTable;           /* List of attached tables */
  SessionHook hook;               /* APIs to grab new and old data with */
};

/*
** Instances of this structure are used to build strings or binary records.
*/
struct SessionBuffer {
  u8 *aBuf;                       /* Pointer to changeset buffer */
  int nBuf;                       /* Size of buffer aBuf */
  int nAlloc;                     /* Size of allocation containing aBuf */
};

/*
** An object of this type is used internally as an abstraction for
** input data. Input data may be supplied either as a single large buffer
** (e.g. sqlite3changeset_start()) or using a stream function (e.g.
** sqlite3changeset_start_strm()).
*/
struct SessionInput {
  int bNoDiscard;                 /* If true, do not discard in InputBuffer() */
  int iCurrent;                   /* Offset in aData[] of current change */
  int iNext;                      /* Offset in aData[] of next change */
  u8 *aData;                      /* Pointer to buffer containing changeset */
  int nData;                      /* Number of bytes in aData */

  SessionBuffer buf;              /* Current read buffer */
  int (*xInput)(void*, void*, int*);        /* Input stream call (or NULL) */
  void *pIn;                                /* First argument to xInput */
  int bEof;                       /* Set to true after xInput finished */
};

/*
** Structure for changeset iterators. The "in" member is the source of
** the changeset data (buffer or stream); the remaining members describe
** the iterator state and the change it currently points at.
*/
struct sqlite3_changeset_iter {
  SessionInput in;                /* Input buffer or stream */
  SessionBuffer tblhdr;           /* Buffer to hold apValue, zTab and abPK */
  int bPatchset;                  /* True if this is a patchset */
  int bInvert;                    /* True to invert changeset */
  int bSkipEmpty;                 /* Skip noop UPDATE changes */
  int rc;                         /* Iterator error code */
  sqlite3_stmt *pConflict;        /* Points to conflicting row, if any */
  char *zTab;                     /* Current table */
  int nCol;                       /* Number of columns in zTab */
  int op;                         /* Current operation */
  int bIndirect;                  /* True if current change was indirect */
  u8 *abPK;                       /* Primary key array */
  sqlite3_value **apValue;        /* old.* and new.* values */
};

/*
** Each session object maintains a set of the following structures, one
** for each table the session object is monitoring. The structures are
** stored in a linked list starting at sqlite3_session.pTable.
**
** The keys of the SessionTable.apChange[] hash table are all rows that have
** been modified in any way since the session object was attached to the
** table.
**
** The data associated with each hash-table entry is a structure containing
** a subset of the initial values that the modified row contained at the
** start of the session. Or no initial values if the row was inserted.
*/
struct SessionTable {
  SessionTable *pNext;            /* Next table in the session's list */
  char *zName;                    /* Local name of table */
  int nCol;                       /* Number of columns in table zName */
  int bStat1;                     /* True if this is sqlite_stat1 */
  const char **azCol;             /* Column names */
  u8 *abPK;                       /* Array of primary key flags */
  int nEntry;                     /* Total number of entries in hash table */
  int nChange;                    /* Size of apChange[] array */
  SessionChange **apChange;       /* Hash table buckets */
};

/*
** RECORD FORMAT:
**
** The following record format is similar to (but not compatible with) that
** used in SQLite database files. This format is used as part of the
** change-set binary format, and so must be architecture independent.
**
** Unlike the SQLite database record format, each field is self-contained -
** there is no separation of header and data. Each field begins with a
** single byte describing its type, as follows:
**
**       0x00: Undefined value.
**       0x01: Integer value.
**       0x02: Real value.
**       0x03: Text value.
**       0x04: Blob value.
**       0x05: SQL NULL value.
**
** Note that the above match the definitions of SQLITE_INTEGER, SQLITE_TEXT
** and so on in sqlite3.h. For undefined and NULL values, the field consists
** only of the single type byte. For other types of values, the type byte
** is followed by:
**
**   Text values:
**     A varint containing the number of bytes in the value (encoded using
**     UTF-8).
Followed by a buffer containing the UTF-8 representation 157 ** of the text value. There is no nul terminator. 158 ** 159 ** Blob values: 160 ** A varint containing the number of bytes in the value, followed by 161 ** a buffer containing the value itself. 162 ** 163 ** Integer values: 164 ** An 8-byte big-endian integer value. 165 ** 166 ** Real values: 167 ** An 8-byte big-endian IEEE 754-2008 real value. 168 ** 169 ** Varint values are encoded in the same way as varints in the SQLite 170 ** record format. 171 ** 172 ** CHANGESET FORMAT: 173 ** 174 ** A changeset is a collection of DELETE, UPDATE and INSERT operations on 175 ** one or more tables. Operations on a single table are grouped together, 176 ** but may occur in any order (i.e. deletes, updates and inserts are all 177 ** mixed together). 178 ** 179 ** Each group of changes begins with a table header: 180 ** 181 ** 1 byte: Constant 0x54 (capital 'T') 182 ** Varint: Number of columns in the table. 183 ** nCol bytes: 0x01 for PK columns, 0x00 otherwise. 184 ** N bytes: Unqualified table name (encoded using UTF-8). Nul-terminated. 185 ** 186 ** Followed by one or more changes to the table. 187 ** 188 ** 1 byte: Either SQLITE_INSERT (0x12), UPDATE (0x17) or DELETE (0x09). 189 ** 1 byte: The "indirect-change" flag. 190 ** old.* record: (delete and update only) 191 ** new.* record: (insert and update only) 192 ** 193 ** The "old.*" and "new.*" records, if present, are N field records in the 194 ** format described above under "RECORD FORMAT", where N is the number of 195 ** columns in the table. The i'th field of each record is associated with 196 ** the i'th column of the table, counting from left to right in the order 197 ** in which columns were declared in the CREATE TABLE statement. 198 ** 199 ** The new.* record that is part of each INSERT change contains the values 200 ** that make up the new row. 
Similarly, the old.* record that is part of each 201 ** DELETE change contains the values that made up the row that was deleted 202 ** from the database. In the changeset format, the records that are part 203 ** of INSERT or DELETE changes never contain any undefined (type byte 0x00) 204 ** fields. 205 ** 206 ** Within the old.* record associated with an UPDATE change, all fields 207 ** associated with table columns that are not PRIMARY KEY columns and are 208 ** not modified by the UPDATE change are set to "undefined". Other fields 209 ** are set to the values that made up the row before the UPDATE that the 210 ** change records took place. Within the new.* record, fields associated 211 ** with table columns modified by the UPDATE change contain the new 212 ** values. Fields associated with table columns that are not modified 213 ** are set to "undefined". 214 ** 215 ** PATCHSET FORMAT: 216 ** 217 ** A patchset is also a collection of changes. It is similar to a changeset, 218 ** but leaves undefined those fields that are not useful if no conflict 219 ** resolution is required when applying the changeset. 220 ** 221 ** Each group of changes begins with a table header: 222 ** 223 ** 1 byte: Constant 0x50 (capital 'P') 224 ** Varint: Number of columns in the table. 225 ** nCol bytes: 0x01 for PK columns, 0x00 otherwise. 226 ** N bytes: Unqualified table name (encoded using UTF-8). Nul-terminated. 227 ** 228 ** Followed by one or more changes to the table. 229 ** 230 ** 1 byte: Either SQLITE_INSERT (0x12), UPDATE (0x17) or DELETE (0x09). 231 ** 1 byte: The "indirect-change" flag. 232 ** single record: (PK fields for DELETE, PK and modified fields for UPDATE, 233 ** full record for INSERT). 234 ** 235 ** As in the changeset format, each field of the single record that is part 236 ** of a patchset change is associated with the correspondingly positioned 237 ** table column, counting from left to right within the CREATE TABLE 238 ** statement. 
239 ** 240 ** For a DELETE change, all fields within the record except those associated 241 ** with PRIMARY KEY columns are omitted. The PRIMARY KEY fields contain the 242 ** values identifying the row to delete. 243 ** 244 ** For an UPDATE change, all fields except those associated with PRIMARY KEY 245 ** columns and columns that are modified by the UPDATE are set to "undefined". 246 ** PRIMARY KEY fields contain the values identifying the table row to update, 247 ** and fields associated with modified columns contain the new column values. 248 ** 249 ** The records associated with INSERT changes are in the same format as for 250 ** changesets. It is not possible for a record associated with an INSERT 251 ** change to contain a field set to "undefined". 252 ** 253 ** REBASE BLOB FORMAT: 254 ** 255 ** A rebase blob may be output by sqlite3changeset_apply_v2() and its 256 ** streaming equivalent for use with the sqlite3_rebaser APIs to rebase 257 ** existing changesets. A rebase blob contains one entry for each conflict 258 ** resolved using either the OMIT or REPLACE strategies within the apply_v2() 259 ** call. 260 ** 261 ** The format used for a rebase blob is very similar to that used for 262 ** changesets. All entries related to a single table are grouped together. 263 ** 264 ** Each group of entries begins with a table header in changeset format: 265 ** 266 ** 1 byte: Constant 0x54 (capital 'T') 267 ** Varint: Number of columns in the table. 268 ** nCol bytes: 0x01 for PK columns, 0x00 otherwise. 269 ** N bytes: Unqualified table name (encoded using UTF-8). Nul-terminated. 270 ** 271 ** Followed by one or more entries associated with the table. 272 ** 273 ** 1 byte: Either SQLITE_INSERT (0x12), DELETE (0x09). 274 ** 1 byte: Flag. 0x01 for REPLACE, 0x00 for OMIT. 275 ** record: (in the record format defined above). 
276 ** 277 ** In a rebase blob, the first field is set to SQLITE_INSERT if the change 278 ** that caused the conflict was an INSERT or UPDATE, or to SQLITE_DELETE if 279 ** it was a DELETE. The second field is set to 0x01 if the conflict 280 ** resolution strategy was REPLACE, or 0x00 if it was OMIT. 281 ** 282 ** If the change that caused the conflict was a DELETE, then the single 283 ** record is a copy of the old.* record from the original changeset. If it 284 ** was an INSERT, then the single record is a copy of the new.* record. If 285 ** the conflicting change was an UPDATE, then the single record is a copy 286 ** of the new.* record with the PK fields filled in based on the original 287 ** old.* record. 288 */ 289 290 /* 291 ** For each row modified during a session, there exists a single instance of 292 ** this structure stored in a SessionTable.aChange[] hash table. 293 */ 294 struct SessionChange { 295 u8 op; /* One of UPDATE, DELETE, INSERT */ 296 u8 bIndirect; /* True if this change is "indirect" */ 297 int nMaxSize; /* Max size of eventual changeset record */ 298 int nRecord; /* Number of bytes in buffer aRecord[] */ 299 u8 *aRecord; /* Buffer containing old.* record */ 300 SessionChange *pNext; /* For hash-table collisions */ 301 }; 302 303 /* 304 ** Write a varint with value iVal into the buffer at aBuf. Return the 305 ** number of bytes written. 306 */ 307 static int sessionVarintPut(u8 *aBuf, int iVal){ 308 return putVarint32(aBuf, iVal); 309 } 310 311 /* 312 ** Return the number of bytes required to store value iVal as a varint. 313 */ 314 static int sessionVarintLen(int iVal){ 315 return sqlite3VarintLen(iVal); 316 } 317 318 /* 319 ** Read a varint value from aBuf[] into *piVal. Return the number of 320 ** bytes read. 
321 */ 322 static int sessionVarintGet(u8 *aBuf, int *piVal){ 323 return getVarint32(aBuf, *piVal); 324 } 325 326 /* Load an unaligned and unsigned 32-bit integer */ 327 #define SESSION_UINT32(x) (((u32)(x)[0]<<24)|((x)[1]<<16)|((x)[2]<<8)|(x)[3]) 328 329 /* 330 ** Read a 64-bit big-endian integer value from buffer aRec[]. Return 331 ** the value read. 332 */ 333 static sqlite3_int64 sessionGetI64(u8 *aRec){ 334 u64 x = SESSION_UINT32(aRec); 335 u32 y = SESSION_UINT32(aRec+4); 336 x = (x<<32) + y; 337 return (sqlite3_int64)x; 338 } 339 340 /* 341 ** Write a 64-bit big-endian integer value to the buffer aBuf[]. 342 */ 343 static void sessionPutI64(u8 *aBuf, sqlite3_int64 i){ 344 aBuf[0] = (i>>56) & 0xFF; 345 aBuf[1] = (i>>48) & 0xFF; 346 aBuf[2] = (i>>40) & 0xFF; 347 aBuf[3] = (i>>32) & 0xFF; 348 aBuf[4] = (i>>24) & 0xFF; 349 aBuf[5] = (i>>16) & 0xFF; 350 aBuf[6] = (i>> 8) & 0xFF; 351 aBuf[7] = (i>> 0) & 0xFF; 352 } 353 354 /* 355 ** This function is used to serialize the contents of value pValue (see 356 ** comment titled "RECORD FORMAT" above). 357 ** 358 ** If it is non-NULL, the serialized form of the value is written to 359 ** buffer aBuf. *pnWrite is set to the number of bytes written before 360 ** returning. Or, if aBuf is NULL, the only thing this function does is 361 ** set *pnWrite. 362 ** 363 ** If no error occurs, SQLITE_OK is returned. Or, if an OOM error occurs 364 ** within a call to sqlite3_value_text() (may fail if the db is utf-16)) 365 ** SQLITE_NOMEM is returned. 366 */ 367 static int sessionSerializeValue( 368 u8 *aBuf, /* If non-NULL, write serialized value here */ 369 sqlite3_value *pValue, /* Value to serialize */ 370 sqlite3_int64 *pnWrite /* IN/OUT: Increment by bytes written */ 371 ){ 372 int nByte; /* Size of serialized value in bytes */ 373 374 if( pValue ){ 375 int eType; /* Value type (SQLITE_NULL, TEXT etc.) 
*/ 376 377 eType = sqlite3_value_type(pValue); 378 if( aBuf ) aBuf[0] = eType; 379 380 switch( eType ){ 381 case SQLITE_NULL: 382 nByte = 1; 383 break; 384 385 case SQLITE_INTEGER: 386 case SQLITE_FLOAT: 387 if( aBuf ){ 388 /* TODO: SQLite does something special to deal with mixed-endian 389 ** floating point values (e.g. ARM7). This code probably should 390 ** too. */ 391 u64 i; 392 if( eType==SQLITE_INTEGER ){ 393 i = (u64)sqlite3_value_int64(pValue); 394 }else{ 395 double r; 396 assert( sizeof(double)==8 && sizeof(u64)==8 ); 397 r = sqlite3_value_double(pValue); 398 memcpy(&i, &r, 8); 399 } 400 sessionPutI64(&aBuf[1], i); 401 } 402 nByte = 9; 403 break; 404 405 default: { 406 u8 *z; 407 int n; 408 int nVarint; 409 410 assert( eType==SQLITE_TEXT || eType==SQLITE_BLOB ); 411 if( eType==SQLITE_TEXT ){ 412 z = (u8 *)sqlite3_value_text(pValue); 413 }else{ 414 z = (u8 *)sqlite3_value_blob(pValue); 415 } 416 n = sqlite3_value_bytes(pValue); 417 if( z==0 && (eType!=SQLITE_BLOB || n>0) ) return SQLITE_NOMEM; 418 nVarint = sessionVarintLen(n); 419 420 if( aBuf ){ 421 sessionVarintPut(&aBuf[1], n); 422 if( n ) memcpy(&aBuf[nVarint + 1], z, n); 423 } 424 425 nByte = 1 + nVarint + n; 426 break; 427 } 428 } 429 }else{ 430 nByte = 1; 431 if( aBuf ) aBuf[0] = '\0'; 432 } 433 434 if( pnWrite ) *pnWrite += nByte; 435 return SQLITE_OK; 436 } 437 438 /* 439 ** Allocate and return a pointer to a buffer nByte bytes in size. If 440 ** pSession is not NULL, increase the sqlite3_session.nMalloc variable 441 ** by the number of bytes allocated. 442 */ 443 static void *sessionMalloc64(sqlite3_session *pSession, i64 nByte){ 444 void *pRet = sqlite3_malloc64(nByte); 445 if( pSession ) pSession->nMalloc += sqlite3_msize(pRet); 446 return pRet; 447 } 448 449 /* 450 ** Free buffer pFree, which must have been allocated by an earlier 451 ** call to sessionMalloc64(). If pSession is not NULL, decrease the 452 ** sqlite3_session.nMalloc counter by the number of bytes freed. 
453 */ 454 static void sessionFree(sqlite3_session *pSession, void *pFree){ 455 if( pSession ) pSession->nMalloc -= sqlite3_msize(pFree); 456 sqlite3_free(pFree); 457 } 458 459 /* 460 ** This macro is used to calculate hash key values for data structures. In 461 ** order to use this macro, the entire data structure must be represented 462 ** as a series of unsigned integers. In order to calculate a hash-key value 463 ** for a data structure represented as three such integers, the macro may 464 ** then be used as follows: 465 ** 466 ** int hash_key_value; 467 ** hash_key_value = HASH_APPEND(0, <value 1>); 468 ** hash_key_value = HASH_APPEND(hash_key_value, <value 2>); 469 ** hash_key_value = HASH_APPEND(hash_key_value, <value 3>); 470 ** 471 ** In practice, the data structures this macro is used for are the primary 472 ** key values of modified rows. 473 */ 474 #define HASH_APPEND(hash, add) ((hash) << 3) ^ (hash) ^ (unsigned int)(add) 475 476 /* 477 ** Append the hash of the 64-bit integer passed as the second argument to the 478 ** hash-key value passed as the first. Return the new hash-key value. 479 */ 480 static unsigned int sessionHashAppendI64(unsigned int h, i64 i){ 481 h = HASH_APPEND(h, i & 0xFFFFFFFF); 482 return HASH_APPEND(h, (i>>32)&0xFFFFFFFF); 483 } 484 485 /* 486 ** Append the hash of the blob passed via the second and third arguments to 487 ** the hash-key value passed as the first. Return the new hash-key value. 488 */ 489 static unsigned int sessionHashAppendBlob(unsigned int h, int n, const u8 *z){ 490 int i; 491 for(i=0; i<n; i++) h = HASH_APPEND(h, z[i]); 492 return h; 493 } 494 495 /* 496 ** Append the hash of the data type passed as the second argument to the 497 ** hash-key value passed as the first. Return the new hash-key value. 498 */ 499 static unsigned int sessionHashAppendType(unsigned int h, int eType){ 500 return HASH_APPEND(h, eType); 501 } 502 503 /* 504 ** This function may only be called from within a pre-update callback. 
505 ** It calculates a hash based on the primary key values of the old.* or 506 ** new.* row currently available and, assuming no error occurs, writes it to 507 ** *piHash before returning. If the primary key contains one or more NULL 508 ** values, *pbNullPK is set to true before returning. 509 ** 510 ** If an error occurs, an SQLite error code is returned and the final values 511 ** of *piHash asn *pbNullPK are undefined. Otherwise, SQLITE_OK is returned 512 ** and the output variables are set as described above. 513 */ 514 static int sessionPreupdateHash( 515 sqlite3_session *pSession, /* Session object that owns pTab */ 516 SessionTable *pTab, /* Session table handle */ 517 int bNew, /* True to hash the new.* PK */ 518 int *piHash, /* OUT: Hash value */ 519 int *pbNullPK /* OUT: True if there are NULL values in PK */ 520 ){ 521 unsigned int h = 0; /* Hash value to return */ 522 int i; /* Used to iterate through columns */ 523 524 assert( *pbNullPK==0 ); 525 assert( pTab->nCol==pSession->hook.xCount(pSession->hook.pCtx) ); 526 for(i=0; i<pTab->nCol; i++){ 527 if( pTab->abPK[i] ){ 528 int rc; 529 int eType; 530 sqlite3_value *pVal; 531 532 if( bNew ){ 533 rc = pSession->hook.xNew(pSession->hook.pCtx, i, &pVal); 534 }else{ 535 rc = pSession->hook.xOld(pSession->hook.pCtx, i, &pVal); 536 } 537 if( rc!=SQLITE_OK ) return rc; 538 539 eType = sqlite3_value_type(pVal); 540 h = sessionHashAppendType(h, eType); 541 if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ 542 i64 iVal; 543 if( eType==SQLITE_INTEGER ){ 544 iVal = sqlite3_value_int64(pVal); 545 }else{ 546 double rVal = sqlite3_value_double(pVal); 547 assert( sizeof(iVal)==8 && sizeof(rVal)==8 ); 548 memcpy(&iVal, &rVal, 8); 549 } 550 h = sessionHashAppendI64(h, iVal); 551 }else if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){ 552 const u8 *z; 553 int n; 554 if( eType==SQLITE_TEXT ){ 555 z = (const u8 *)sqlite3_value_text(pVal); 556 }else{ 557 z = (const u8 *)sqlite3_value_blob(pVal); 558 } 559 n = 
sqlite3_value_bytes(pVal); 560 if( !z && (eType!=SQLITE_BLOB || n>0) ) return SQLITE_NOMEM; 561 h = sessionHashAppendBlob(h, n, z); 562 }else{ 563 assert( eType==SQLITE_NULL ); 564 assert( pTab->bStat1==0 || i!=1 ); 565 *pbNullPK = 1; 566 } 567 } 568 } 569 570 *piHash = (h % pTab->nChange); 571 return SQLITE_OK; 572 } 573 574 /* 575 ** The buffer that the argument points to contains a serialized SQL value. 576 ** Return the number of bytes of space occupied by the value (including 577 ** the type byte). 578 */ 579 static int sessionSerialLen(u8 *a){ 580 int e = *a; 581 int n; 582 if( e==0 || e==0xFF ) return 1; 583 if( e==SQLITE_NULL ) return 1; 584 if( e==SQLITE_INTEGER || e==SQLITE_FLOAT ) return 9; 585 return sessionVarintGet(&a[1], &n) + 1 + n; 586 } 587 588 /* 589 ** Based on the primary key values stored in change aRecord, calculate a 590 ** hash key. Assume the has table has nBucket buckets. The hash keys 591 ** calculated by this function are compatible with those calculated by 592 ** sessionPreupdateHash(). 593 ** 594 ** The bPkOnly argument is non-zero if the record at aRecord[] is from 595 ** a patchset DELETE. In this case the non-PK fields are omitted entirely. 596 */ 597 static unsigned int sessionChangeHash( 598 SessionTable *pTab, /* Table handle */ 599 int bPkOnly, /* Record consists of PK fields only */ 600 u8 *aRecord, /* Change record */ 601 int nBucket /* Assume this many buckets in hash table */ 602 ){ 603 unsigned int h = 0; /* Value to return */ 604 int i; /* Used to iterate through columns */ 605 u8 *a = aRecord; /* Used to iterate through change record */ 606 607 for(i=0; i<pTab->nCol; i++){ 608 int eType = *a; 609 int isPK = pTab->abPK[i]; 610 if( bPkOnly && isPK==0 ) continue; 611 612 /* It is not possible for eType to be SQLITE_NULL here. The session 613 ** module does not record changes for rows with NULL values stored in 614 ** primary key columns. 
*/ 615 assert( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT 616 || eType==SQLITE_TEXT || eType==SQLITE_BLOB 617 || eType==SQLITE_NULL || eType==0 618 ); 619 assert( !isPK || (eType!=0 && eType!=SQLITE_NULL) ); 620 621 if( isPK ){ 622 a++; 623 h = sessionHashAppendType(h, eType); 624 if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ 625 h = sessionHashAppendI64(h, sessionGetI64(a)); 626 a += 8; 627 }else{ 628 int n; 629 a += sessionVarintGet(a, &n); 630 h = sessionHashAppendBlob(h, n, a); 631 a += n; 632 } 633 }else{ 634 a += sessionSerialLen(a); 635 } 636 } 637 return (h % nBucket); 638 } 639 640 /* 641 ** Arguments aLeft and aRight are pointers to change records for table pTab. 642 ** This function returns true if the two records apply to the same row (i.e. 643 ** have the same values stored in the primary key columns), or false 644 ** otherwise. 645 */ 646 static int sessionChangeEqual( 647 SessionTable *pTab, /* Table used for PK definition */ 648 int bLeftPkOnly, /* True if aLeft[] contains PK fields only */ 649 u8 *aLeft, /* Change record */ 650 int bRightPkOnly, /* True if aRight[] contains PK fields only */ 651 u8 *aRight /* Change record */ 652 ){ 653 u8 *a1 = aLeft; /* Cursor to iterate through aLeft */ 654 u8 *a2 = aRight; /* Cursor to iterate through aRight */ 655 int iCol; /* Used to iterate through table columns */ 656 657 for(iCol=0; iCol<pTab->nCol; iCol++){ 658 if( pTab->abPK[iCol] ){ 659 int n1 = sessionSerialLen(a1); 660 int n2 = sessionSerialLen(a2); 661 662 if( n1!=n2 || memcmp(a1, a2, n1) ){ 663 return 0; 664 } 665 a1 += n1; 666 a2 += n2; 667 }else{ 668 if( bLeftPkOnly==0 ) a1 += sessionSerialLen(a1); 669 if( bRightPkOnly==0 ) a2 += sessionSerialLen(a2); 670 } 671 } 672 673 return 1; 674 } 675 676 /* 677 ** Arguments aLeft and aRight both point to buffers containing change 678 ** records with nCol columns. This function "merges" the two records into 679 ** a single records which is written to the buffer at *paOut. 
*paOut is 680 ** then set to point to one byte after the last byte written before 681 ** returning. 682 ** 683 ** The merging of records is done as follows: For each column, if the 684 ** aRight record contains a value for the column, copy the value from 685 ** their. Otherwise, if aLeft contains a value, copy it. If neither 686 ** record contains a value for a given column, then neither does the 687 ** output record. 688 */ 689 static void sessionMergeRecord( 690 u8 **paOut, 691 int nCol, 692 u8 *aLeft, 693 u8 *aRight 694 ){ 695 u8 *a1 = aLeft; /* Cursor used to iterate through aLeft */ 696 u8 *a2 = aRight; /* Cursor used to iterate through aRight */ 697 u8 *aOut = *paOut; /* Output cursor */ 698 int iCol; /* Used to iterate from 0 to nCol */ 699 700 for(iCol=0; iCol<nCol; iCol++){ 701 int n1 = sessionSerialLen(a1); 702 int n2 = sessionSerialLen(a2); 703 if( *a2 ){ 704 memcpy(aOut, a2, n2); 705 aOut += n2; 706 }else{ 707 memcpy(aOut, a1, n1); 708 aOut += n1; 709 } 710 a1 += n1; 711 a2 += n2; 712 } 713 714 *paOut = aOut; 715 } 716 717 /* 718 ** This is a helper function used by sessionMergeUpdate(). 719 ** 720 ** When this function is called, both *paOne and *paTwo point to a value 721 ** within a change record. Before it returns, both have been advanced so 722 ** as to point to the next value in the record. 723 ** 724 ** If, when this function is called, *paTwo points to a valid value (i.e. 725 ** *paTwo[0] is not 0x00 - the "no value" placeholder), a copy of the *paTwo 726 ** pointer is returned and *pnVal is set to the number of bytes in the 727 ** serialized value. Otherwise, a copy of *paOne is returned and *pnVal 728 ** set to the number of bytes in the value at *paOne. If *paOne points 729 ** to the "no value" placeholder, *pnVal is set to 1. 
In other words: 730 ** 731 ** if( *paTwo is valid ) return *paTwo; 732 ** return *paOne; 733 ** 734 */ 735 static u8 *sessionMergeValue( 736 u8 **paOne, /* IN/OUT: Left-hand buffer pointer */ 737 u8 **paTwo, /* IN/OUT: Right-hand buffer pointer */ 738 int *pnVal /* OUT: Bytes in returned value */ 739 ){ 740 u8 *a1 = *paOne; 741 u8 *a2 = *paTwo; 742 u8 *pRet = 0; 743 int n1; 744 745 assert( a1 ); 746 if( a2 ){ 747 int n2 = sessionSerialLen(a2); 748 if( *a2 ){ 749 *pnVal = n2; 750 pRet = a2; 751 } 752 *paTwo = &a2[n2]; 753 } 754 755 n1 = sessionSerialLen(a1); 756 if( pRet==0 ){ 757 *pnVal = n1; 758 pRet = a1; 759 } 760 *paOne = &a1[n1]; 761 762 return pRet; 763 } 764 765 /* 766 ** This function is used by changeset_concat() to merge two UPDATE changes 767 ** on the same row. 768 */ 769 static int sessionMergeUpdate( 770 u8 **paOut, /* IN/OUT: Pointer to output buffer */ 771 SessionTable *pTab, /* Table change pertains to */ 772 int bPatchset, /* True if records are patchset records */ 773 u8 *aOldRecord1, /* old.* record for first change */ 774 u8 *aOldRecord2, /* old.* record for second change */ 775 u8 *aNewRecord1, /* new.* record for first change */ 776 u8 *aNewRecord2 /* new.* record for second change */ 777 ){ 778 u8 *aOld1 = aOldRecord1; 779 u8 *aOld2 = aOldRecord2; 780 u8 *aNew1 = aNewRecord1; 781 u8 *aNew2 = aNewRecord2; 782 783 u8 *aOut = *paOut; 784 int i; 785 786 if( bPatchset==0 ){ 787 int bRequired = 0; 788 789 assert( aOldRecord1 && aNewRecord1 ); 790 791 /* Write the old.* vector first. 
*/ 792 for(i=0; i<pTab->nCol; i++){ 793 int nOld; 794 u8 *aOld; 795 int nNew; 796 u8 *aNew; 797 798 aOld = sessionMergeValue(&aOld1, &aOld2, &nOld); 799 aNew = sessionMergeValue(&aNew1, &aNew2, &nNew); 800 if( pTab->abPK[i] || nOld!=nNew || memcmp(aOld, aNew, nNew) ){ 801 if( pTab->abPK[i]==0 ) bRequired = 1; 802 memcpy(aOut, aOld, nOld); 803 aOut += nOld; 804 }else{ 805 *(aOut++) = '\0'; 806 } 807 } 808 809 if( !bRequired ) return 0; 810 } 811 812 /* Write the new.* vector */ 813 aOld1 = aOldRecord1; 814 aOld2 = aOldRecord2; 815 aNew1 = aNewRecord1; 816 aNew2 = aNewRecord2; 817 for(i=0; i<pTab->nCol; i++){ 818 int nOld; 819 u8 *aOld; 820 int nNew; 821 u8 *aNew; 822 823 aOld = sessionMergeValue(&aOld1, &aOld2, &nOld); 824 aNew = sessionMergeValue(&aNew1, &aNew2, &nNew); 825 if( bPatchset==0 826 && (pTab->abPK[i] || (nOld==nNew && 0==memcmp(aOld, aNew, nNew))) 827 ){ 828 *(aOut++) = '\0'; 829 }else{ 830 memcpy(aOut, aNew, nNew); 831 aOut += nNew; 832 } 833 } 834 835 *paOut = aOut; 836 return 1; 837 } 838 839 /* 840 ** This function is only called from within a pre-update-hook callback. 841 ** It determines if the current pre-update-hook change affects the same row 842 ** as the change stored in argument pChange. If so, it returns true. Otherwise 843 ** if the pre-update-hook does not affect the same row as pChange, it returns 844 ** false. 
845 */ 846 static int sessionPreupdateEqual( 847 sqlite3_session *pSession, /* Session object that owns SessionTable */ 848 SessionTable *pTab, /* Table associated with change */ 849 SessionChange *pChange, /* Change to compare to */ 850 int op /* Current pre-update operation */ 851 ){ 852 int iCol; /* Used to iterate through columns */ 853 u8 *a = pChange->aRecord; /* Cursor used to scan change record */ 854 855 assert( op==SQLITE_INSERT || op==SQLITE_UPDATE || op==SQLITE_DELETE ); 856 for(iCol=0; iCol<pTab->nCol; iCol++){ 857 if( !pTab->abPK[iCol] ){ 858 a += sessionSerialLen(a); 859 }else{ 860 sqlite3_value *pVal; /* Value returned by preupdate_new/old */ 861 int rc; /* Error code from preupdate_new/old */ 862 int eType = *a++; /* Type of value from change record */ 863 864 /* The following calls to preupdate_new() and preupdate_old() can not 865 ** fail. This is because they cache their return values, and by the 866 ** time control flows to here they have already been called once from 867 ** within sessionPreupdateHash(). The first two asserts below verify 868 ** this (that the method has already been called). 
*/ 869 if( op==SQLITE_INSERT ){ 870 /* assert( db->pPreUpdate->pNewUnpacked || db->pPreUpdate->aNew ); */ 871 rc = pSession->hook.xNew(pSession->hook.pCtx, iCol, &pVal); 872 }else{ 873 /* assert( db->pPreUpdate->pUnpacked ); */ 874 rc = pSession->hook.xOld(pSession->hook.pCtx, iCol, &pVal); 875 } 876 assert( rc==SQLITE_OK ); 877 if( sqlite3_value_type(pVal)!=eType ) return 0; 878 879 /* A SessionChange object never has a NULL value in a PK column */ 880 assert( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT 881 || eType==SQLITE_BLOB || eType==SQLITE_TEXT 882 ); 883 884 if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ 885 i64 iVal = sessionGetI64(a); 886 a += 8; 887 if( eType==SQLITE_INTEGER ){ 888 if( sqlite3_value_int64(pVal)!=iVal ) return 0; 889 }else{ 890 double rVal; 891 assert( sizeof(iVal)==8 && sizeof(rVal)==8 ); 892 memcpy(&rVal, &iVal, 8); 893 if( sqlite3_value_double(pVal)!=rVal ) return 0; 894 } 895 }else{ 896 int n; 897 const u8 *z; 898 a += sessionVarintGet(a, &n); 899 if( sqlite3_value_bytes(pVal)!=n ) return 0; 900 if( eType==SQLITE_TEXT ){ 901 z = sqlite3_value_text(pVal); 902 }else{ 903 z = sqlite3_value_blob(pVal); 904 } 905 if( n>0 && memcmp(a, z, n) ) return 0; 906 a += n; 907 } 908 } 909 } 910 911 return 1; 912 } 913 914 /* 915 ** If required, grow the hash table used to store changes on table pTab 916 ** (part of the session pSession). If a fatal OOM error occurs, set the 917 ** session object to failed and return SQLITE_ERROR. Otherwise, return 918 ** SQLITE_OK. 919 ** 920 ** It is possible that a non-fatal OOM error occurs in this function. In 921 ** that case the hash-table does not grow, but SQLITE_OK is returned anyway. 922 ** Growing the hash table in this case is a performance optimization only, 923 ** it is not required for correct operation. 924 */ 925 static int sessionGrowHash( 926 sqlite3_session *pSession, /* For memory accounting. 
May be NULL */ 927 int bPatchset, 928 SessionTable *pTab 929 ){ 930 if( pTab->nChange==0 || pTab->nEntry>=(pTab->nChange/2) ){ 931 int i; 932 SessionChange **apNew; 933 sqlite3_int64 nNew = 2*(sqlite3_int64)(pTab->nChange ? pTab->nChange : 128); 934 935 apNew = (SessionChange**)sessionMalloc64( 936 pSession, sizeof(SessionChange*) * nNew 937 ); 938 if( apNew==0 ){ 939 if( pTab->nChange==0 ){ 940 return SQLITE_ERROR; 941 } 942 return SQLITE_OK; 943 } 944 memset(apNew, 0, sizeof(SessionChange *) * nNew); 945 946 for(i=0; i<pTab->nChange; i++){ 947 SessionChange *p; 948 SessionChange *pNext; 949 for(p=pTab->apChange[i]; p; p=pNext){ 950 int bPkOnly = (p->op==SQLITE_DELETE && bPatchset); 951 int iHash = sessionChangeHash(pTab, bPkOnly, p->aRecord, nNew); 952 pNext = p->pNext; 953 p->pNext = apNew[iHash]; 954 apNew[iHash] = p; 955 } 956 } 957 958 sessionFree(pSession, pTab->apChange); 959 pTab->nChange = nNew; 960 pTab->apChange = apNew; 961 } 962 963 return SQLITE_OK; 964 } 965 966 /* 967 ** This function queries the database for the names of the columns of table 968 ** zThis, in schema zDb. 969 ** 970 ** Otherwise, if they are not NULL, variable *pnCol is set to the number 971 ** of columns in the database table and variable *pzTab is set to point to a 972 ** nul-terminated copy of the table name. *pazCol (if not NULL) is set to 973 ** point to an array of pointers to column names. And *pabPK (again, if not 974 ** NULL) is set to point to an array of booleans - true if the corresponding 975 ** column is part of the primary key. 
976 ** 977 ** For example, if the table is declared as: 978 ** 979 ** CREATE TABLE tbl1(w, x, y, z, PRIMARY KEY(w, z)); 980 ** 981 ** Then the four output variables are populated as follows: 982 ** 983 ** *pnCol = 4 984 ** *pzTab = "tbl1" 985 ** *pazCol = {"w", "x", "y", "z"} 986 ** *pabPK = {1, 0, 0, 1} 987 ** 988 ** All returned buffers are part of the same single allocation, which must 989 ** be freed using sqlite3_free() by the caller 990 */ 991 static int sessionTableInfo( 992 sqlite3_session *pSession, /* For memory accounting. May be NULL */ 993 sqlite3 *db, /* Database connection */ 994 const char *zDb, /* Name of attached database (e.g. "main") */ 995 const char *zThis, /* Table name */ 996 int *pnCol, /* OUT: number of columns */ 997 const char **pzTab, /* OUT: Copy of zThis */ 998 const char ***pazCol, /* OUT: Array of column names for table */ 999 u8 **pabPK /* OUT: Array of booleans - true for PK col */ 1000 ){ 1001 char *zPragma; 1002 sqlite3_stmt *pStmt; 1003 int rc; 1004 sqlite3_int64 nByte; 1005 int nDbCol = 0; 1006 int nThis; 1007 int i; 1008 u8 *pAlloc = 0; 1009 char **azCol = 0; 1010 u8 *abPK = 0; 1011 1012 assert( pazCol && pabPK ); 1013 1014 nThis = sqlite3Strlen30(zThis); 1015 if( nThis==12 && 0==sqlite3_stricmp("sqlite_stat1", zThis) ){ 1016 rc = sqlite3_table_column_metadata(db, zDb, zThis, 0, 0, 0, 0, 0, 0); 1017 if( rc==SQLITE_OK ){ 1018 /* For sqlite_stat1, pretend that (tbl,idx) is the PRIMARY KEY. 
*/ 1019 zPragma = sqlite3_mprintf( 1020 "SELECT 0, 'tbl', '', 0, '', 1 UNION ALL " 1021 "SELECT 1, 'idx', '', 0, '', 2 UNION ALL " 1022 "SELECT 2, 'stat', '', 0, '', 0" 1023 ); 1024 }else if( rc==SQLITE_ERROR ){ 1025 zPragma = sqlite3_mprintf(""); 1026 }else{ 1027 return rc; 1028 } 1029 }else{ 1030 zPragma = sqlite3_mprintf("PRAGMA '%q'.table_info('%q')", zDb, zThis); 1031 } 1032 if( !zPragma ) return SQLITE_NOMEM; 1033 1034 rc = sqlite3_prepare_v2(db, zPragma, -1, &pStmt, 0); 1035 sqlite3_free(zPragma); 1036 if( rc!=SQLITE_OK ) return rc; 1037 1038 nByte = nThis + 1; 1039 while( SQLITE_ROW==sqlite3_step(pStmt) ){ 1040 nByte += sqlite3_column_bytes(pStmt, 1); 1041 nDbCol++; 1042 } 1043 rc = sqlite3_reset(pStmt); 1044 1045 if( rc==SQLITE_OK ){ 1046 nByte += nDbCol * (sizeof(const char *) + sizeof(u8) + 1); 1047 pAlloc = sessionMalloc64(pSession, nByte); 1048 if( pAlloc==0 ){ 1049 rc = SQLITE_NOMEM; 1050 } 1051 } 1052 if( rc==SQLITE_OK ){ 1053 azCol = (char **)pAlloc; 1054 pAlloc = (u8 *)&azCol[nDbCol]; 1055 abPK = (u8 *)pAlloc; 1056 pAlloc = &abPK[nDbCol]; 1057 if( pzTab ){ 1058 memcpy(pAlloc, zThis, nThis+1); 1059 *pzTab = (char *)pAlloc; 1060 pAlloc += nThis+1; 1061 } 1062 1063 i = 0; 1064 while( SQLITE_ROW==sqlite3_step(pStmt) ){ 1065 int nName = sqlite3_column_bytes(pStmt, 1); 1066 const unsigned char *zName = sqlite3_column_text(pStmt, 1); 1067 if( zName==0 ) break; 1068 memcpy(pAlloc, zName, nName+1); 1069 azCol[i] = (char *)pAlloc; 1070 pAlloc += nName+1; 1071 abPK[i] = sqlite3_column_int(pStmt, 5); 1072 i++; 1073 } 1074 rc = sqlite3_reset(pStmt); 1075 1076 } 1077 1078 /* If successful, populate the output variables. Otherwise, zero them and 1079 ** free any allocation made. An error code will be returned in this case. 
1080 */ 1081 if( rc==SQLITE_OK ){ 1082 *pazCol = (const char **)azCol; 1083 *pabPK = abPK; 1084 *pnCol = nDbCol; 1085 }else{ 1086 *pazCol = 0; 1087 *pabPK = 0; 1088 *pnCol = 0; 1089 if( pzTab ) *pzTab = 0; 1090 sessionFree(pSession, azCol); 1091 } 1092 sqlite3_finalize(pStmt); 1093 return rc; 1094 } 1095 1096 /* 1097 ** This function is only called from within a pre-update handler for a 1098 ** write to table pTab, part of session pSession. If this is the first 1099 ** write to this table, initalize the SessionTable.nCol, azCol[] and 1100 ** abPK[] arrays accordingly. 1101 ** 1102 ** If an error occurs, an error code is stored in sqlite3_session.rc and 1103 ** non-zero returned. Or, if no error occurs but the table has no primary 1104 ** key, sqlite3_session.rc is left set to SQLITE_OK and non-zero returned to 1105 ** indicate that updates on this table should be ignored. SessionTable.abPK 1106 ** is set to NULL in this case. 1107 */ 1108 static int sessionInitTable(sqlite3_session *pSession, SessionTable *pTab){ 1109 if( pTab->nCol==0 ){ 1110 u8 *abPK; 1111 assert( pTab->azCol==0 || pTab->abPK==0 ); 1112 pSession->rc = sessionTableInfo(pSession, pSession->db, pSession->zDb, 1113 pTab->zName, &pTab->nCol, 0, &pTab->azCol, &abPK 1114 ); 1115 if( pSession->rc==SQLITE_OK ){ 1116 int i; 1117 for(i=0; i<pTab->nCol; i++){ 1118 if( abPK[i] ){ 1119 pTab->abPK = abPK; 1120 break; 1121 } 1122 } 1123 if( 0==sqlite3_stricmp("sqlite_stat1", pTab->zName) ){ 1124 pTab->bStat1 = 1; 1125 } 1126 1127 pSession->nMaxChangesetSize += ( 1128 1 + sessionVarintLen(pTab->nCol) + pTab->nCol + strlen(pTab->zName) + 1 1129 ); 1130 } 1131 } 1132 return (pSession->rc || pTab->abPK==0); 1133 } 1134 1135 /* 1136 ** Versions of the four methods in object SessionHook for use with the 1137 ** sqlite_stat1 table. The purpose of this is to substitute a zero-length 1138 ** blob each time a NULL value is read from the "idx" column of the 1139 ** sqlite_stat1 table. 
1140 */ 1141 typedef struct SessionStat1Ctx SessionStat1Ctx; 1142 struct SessionStat1Ctx { 1143 SessionHook hook; 1144 sqlite3_session *pSession; 1145 }; 1146 static int sessionStat1Old(void *pCtx, int iCol, sqlite3_value **ppVal){ 1147 SessionStat1Ctx *p = (SessionStat1Ctx*)pCtx; 1148 sqlite3_value *pVal = 0; 1149 int rc = p->hook.xOld(p->hook.pCtx, iCol, &pVal); 1150 if( rc==SQLITE_OK && iCol==1 && sqlite3_value_type(pVal)==SQLITE_NULL ){ 1151 pVal = p->pSession->pZeroBlob; 1152 } 1153 *ppVal = pVal; 1154 return rc; 1155 } 1156 static int sessionStat1New(void *pCtx, int iCol, sqlite3_value **ppVal){ 1157 SessionStat1Ctx *p = (SessionStat1Ctx*)pCtx; 1158 sqlite3_value *pVal = 0; 1159 int rc = p->hook.xNew(p->hook.pCtx, iCol, &pVal); 1160 if( rc==SQLITE_OK && iCol==1 && sqlite3_value_type(pVal)==SQLITE_NULL ){ 1161 pVal = p->pSession->pZeroBlob; 1162 } 1163 *ppVal = pVal; 1164 return rc; 1165 } 1166 static int sessionStat1Count(void *pCtx){ 1167 SessionStat1Ctx *p = (SessionStat1Ctx*)pCtx; 1168 return p->hook.xCount(p->hook.pCtx); 1169 } 1170 static int sessionStat1Depth(void *pCtx){ 1171 SessionStat1Ctx *p = (SessionStat1Ctx*)pCtx; 1172 return p->hook.xDepth(p->hook.pCtx); 1173 } 1174 1175 static int sessionUpdateMaxSize( 1176 int op, 1177 sqlite3_session *pSession, /* Session object pTab is attached to */ 1178 SessionTable *pTab, /* Table that change applies to */ 1179 SessionChange *pC /* Update pC->nMaxSize */ 1180 ){ 1181 i64 nNew = 2; 1182 if( pC->op==SQLITE_INSERT ){ 1183 if( op!=SQLITE_DELETE ){ 1184 int ii; 1185 for(ii=0; ii<pTab->nCol; ii++){ 1186 sqlite3_value *p = 0; 1187 pSession->hook.xNew(pSession->hook.pCtx, ii, &p); 1188 sessionSerializeValue(0, p, &nNew); 1189 } 1190 } 1191 }else if( op==SQLITE_DELETE ){ 1192 nNew += pC->nRecord; 1193 if( sqlite3_preupdate_blobwrite(pSession->db)>=0 ){ 1194 nNew += pC->nRecord; 1195 } 1196 }else{ 1197 int ii; 1198 u8 *pCsr = pC->aRecord; 1199 for(ii=0; ii<pTab->nCol; ii++){ 1200 int bChanged = 1; 1201 int nOld = 
0; 1202 int eType; 1203 sqlite3_value *p = 0; 1204 pSession->hook.xNew(pSession->hook.pCtx, ii, &p); 1205 if( p==0 ){ 1206 return SQLITE_NOMEM; 1207 } 1208 1209 eType = *pCsr++; 1210 switch( eType ){ 1211 case SQLITE_NULL: 1212 bChanged = sqlite3_value_type(p)!=SQLITE_NULL; 1213 break; 1214 1215 case SQLITE_FLOAT: 1216 case SQLITE_INTEGER: { 1217 if( eType==sqlite3_value_type(p) ){ 1218 sqlite3_int64 iVal = sessionGetI64(pCsr); 1219 if( eType==SQLITE_INTEGER ){ 1220 bChanged = (iVal!=sqlite3_value_int64(p)); 1221 }else{ 1222 double dVal; 1223 memcpy(&dVal, &iVal, 8); 1224 bChanged = (dVal!=sqlite3_value_double(p)); 1225 } 1226 } 1227 nOld = 8; 1228 pCsr += 8; 1229 break; 1230 } 1231 1232 default: { 1233 int nByte; 1234 nOld = sessionVarintGet(pCsr, &nByte); 1235 pCsr += nOld; 1236 nOld += nByte; 1237 assert( eType==SQLITE_TEXT || eType==SQLITE_BLOB ); 1238 if( eType==sqlite3_value_type(p) 1239 && nByte==sqlite3_value_bytes(p) 1240 && (nByte==0 || 0==memcmp(pCsr, sqlite3_value_blob(p), nByte)) 1241 ){ 1242 bChanged = 0; 1243 } 1244 pCsr += nByte; 1245 break; 1246 } 1247 } 1248 1249 if( bChanged && pTab->abPK[ii] ){ 1250 nNew = pC->nRecord + 2; 1251 break; 1252 } 1253 1254 if( bChanged ){ 1255 nNew += 1 + nOld; 1256 sessionSerializeValue(0, p, &nNew); 1257 }else if( pTab->abPK[ii] ){ 1258 nNew += 2 + nOld; 1259 }else{ 1260 nNew += 2; 1261 } 1262 } 1263 } 1264 1265 if( nNew>pC->nMaxSize ){ 1266 int nIncr = nNew - pC->nMaxSize; 1267 pC->nMaxSize = nNew; 1268 pSession->nMaxChangesetSize += nIncr; 1269 } 1270 return SQLITE_OK; 1271 } 1272 1273 /* 1274 ** This function is only called from with a pre-update-hook reporting a 1275 ** change on table pTab (attached to session pSession). The type of change 1276 ** (UPDATE, INSERT, DELETE) is specified by the first argument. 1277 ** 1278 ** Unless one is already present or an error occurs, an entry is added 1279 ** to the changed-rows hash table associated with table pTab. 
1280 */ 1281 static void sessionPreupdateOneChange( 1282 int op, /* One of SQLITE_UPDATE, INSERT, DELETE */ 1283 sqlite3_session *pSession, /* Session object pTab is attached to */ 1284 SessionTable *pTab /* Table that change applies to */ 1285 ){ 1286 int iHash; 1287 int bNull = 0; 1288 int rc = SQLITE_OK; 1289 SessionStat1Ctx stat1 = {{0,0,0,0,0},0}; 1290 1291 if( pSession->rc ) return; 1292 1293 /* Load table details if required */ 1294 if( sessionInitTable(pSession, pTab) ) return; 1295 1296 /* Check the number of columns in this xPreUpdate call matches the 1297 ** number of columns in the table. */ 1298 if( pTab->nCol!=pSession->hook.xCount(pSession->hook.pCtx) ){ 1299 pSession->rc = SQLITE_SCHEMA; 1300 return; 1301 } 1302 1303 /* Grow the hash table if required */ 1304 if( sessionGrowHash(pSession, 0, pTab) ){ 1305 pSession->rc = SQLITE_NOMEM; 1306 return; 1307 } 1308 1309 if( pTab->bStat1 ){ 1310 stat1.hook = pSession->hook; 1311 stat1.pSession = pSession; 1312 pSession->hook.pCtx = (void*)&stat1; 1313 pSession->hook.xNew = sessionStat1New; 1314 pSession->hook.xOld = sessionStat1Old; 1315 pSession->hook.xCount = sessionStat1Count; 1316 pSession->hook.xDepth = sessionStat1Depth; 1317 if( pSession->pZeroBlob==0 ){ 1318 sqlite3_value *p = sqlite3ValueNew(0); 1319 if( p==0 ){ 1320 rc = SQLITE_NOMEM; 1321 goto error_out; 1322 } 1323 sqlite3ValueSetStr(p, 0, "", 0, SQLITE_STATIC); 1324 pSession->pZeroBlob = p; 1325 } 1326 } 1327 1328 /* Calculate the hash-key for this change. If the primary key of the row 1329 ** includes a NULL value, exit early. Such changes are ignored by the 1330 ** session module. */ 1331 rc = sessionPreupdateHash(pSession, pTab, op==SQLITE_INSERT, &iHash, &bNull); 1332 if( rc!=SQLITE_OK ) goto error_out; 1333 1334 if( bNull==0 ){ 1335 /* Search the hash table for an existing record for this row. 
*/ 1336 SessionChange *pC; 1337 for(pC=pTab->apChange[iHash]; pC; pC=pC->pNext){ 1338 if( sessionPreupdateEqual(pSession, pTab, pC, op) ) break; 1339 } 1340 1341 if( pC==0 ){ 1342 /* Create a new change object containing all the old values (if 1343 ** this is an SQLITE_UPDATE or SQLITE_DELETE), or just the PK 1344 ** values (if this is an INSERT). */ 1345 sqlite3_int64 nByte; /* Number of bytes to allocate */ 1346 int i; /* Used to iterate through columns */ 1347 1348 assert( rc==SQLITE_OK ); 1349 pTab->nEntry++; 1350 1351 /* Figure out how large an allocation is required */ 1352 nByte = sizeof(SessionChange); 1353 for(i=0; i<pTab->nCol; i++){ 1354 sqlite3_value *p = 0; 1355 if( op!=SQLITE_INSERT ){ 1356 TESTONLY(int trc = ) pSession->hook.xOld(pSession->hook.pCtx, i, &p); 1357 assert( trc==SQLITE_OK ); 1358 }else if( pTab->abPK[i] ){ 1359 TESTONLY(int trc = ) pSession->hook.xNew(pSession->hook.pCtx, i, &p); 1360 assert( trc==SQLITE_OK ); 1361 } 1362 1363 /* This may fail if SQLite value p contains a utf-16 string that must 1364 ** be converted to utf-8 and an OOM error occurs while doing so. */ 1365 rc = sessionSerializeValue(0, p, &nByte); 1366 if( rc!=SQLITE_OK ) goto error_out; 1367 } 1368 1369 /* Allocate the change object */ 1370 pC = (SessionChange *)sessionMalloc64(pSession, nByte); 1371 if( !pC ){ 1372 rc = SQLITE_NOMEM; 1373 goto error_out; 1374 }else{ 1375 memset(pC, 0, sizeof(SessionChange)); 1376 pC->aRecord = (u8 *)&pC[1]; 1377 } 1378 1379 /* Populate the change object. None of the preupdate_old(), 1380 ** preupdate_new() or SerializeValue() calls below may fail as all 1381 ** required values and encodings have already been cached in memory. 1382 ** It is not possible for an OOM to occur in this block. 
*/ 1383 nByte = 0; 1384 for(i=0; i<pTab->nCol; i++){ 1385 sqlite3_value *p = 0; 1386 if( op!=SQLITE_INSERT ){ 1387 pSession->hook.xOld(pSession->hook.pCtx, i, &p); 1388 }else if( pTab->abPK[i] ){ 1389 pSession->hook.xNew(pSession->hook.pCtx, i, &p); 1390 } 1391 sessionSerializeValue(&pC->aRecord[nByte], p, &nByte); 1392 } 1393 1394 /* Add the change to the hash-table */ 1395 if( pSession->bIndirect || pSession->hook.xDepth(pSession->hook.pCtx) ){ 1396 pC->bIndirect = 1; 1397 } 1398 pC->nRecord = nByte; 1399 pC->op = op; 1400 pC->pNext = pTab->apChange[iHash]; 1401 pTab->apChange[iHash] = pC; 1402 1403 }else if( pC->bIndirect ){ 1404 /* If the existing change is considered "indirect", but this current 1405 ** change is "direct", mark the change object as direct. */ 1406 if( pSession->hook.xDepth(pSession->hook.pCtx)==0 1407 && pSession->bIndirect==0 1408 ){ 1409 pC->bIndirect = 0; 1410 } 1411 } 1412 1413 assert( rc==SQLITE_OK ); 1414 rc = sessionUpdateMaxSize(op, pSession, pTab, pC); 1415 } 1416 1417 1418 /* If an error has occurred, mark the session object as failed. */ 1419 error_out: 1420 if( pTab->bStat1 ){ 1421 pSession->hook = stat1.hook; 1422 } 1423 if( rc!=SQLITE_OK ){ 1424 pSession->rc = rc; 1425 } 1426 } 1427 1428 static int sessionFindTable( 1429 sqlite3_session *pSession, 1430 const char *zName, 1431 SessionTable **ppTab 1432 ){ 1433 int rc = SQLITE_OK; 1434 int nName = sqlite3Strlen30(zName); 1435 SessionTable *pRet; 1436 1437 /* Search for an existing table */ 1438 for(pRet=pSession->pTable; pRet; pRet=pRet->pNext){ 1439 if( 0==sqlite3_strnicmp(pRet->zName, zName, nName+1) ) break; 1440 } 1441 1442 if( pRet==0 && pSession->bAutoAttach ){ 1443 /* If there is a table-filter configured, invoke it. If it returns 0, 1444 ** do not automatically add the new table. 
*/ 1445 if( pSession->xTableFilter==0 1446 || pSession->xTableFilter(pSession->pFilterCtx, zName) 1447 ){ 1448 rc = sqlite3session_attach(pSession, zName); 1449 if( rc==SQLITE_OK ){ 1450 for(pRet=pSession->pTable; pRet->pNext; pRet=pRet->pNext); 1451 assert( 0==sqlite3_strnicmp(pRet->zName, zName, nName+1) ); 1452 } 1453 } 1454 } 1455 1456 assert( rc==SQLITE_OK || pRet==0 ); 1457 *ppTab = pRet; 1458 return rc; 1459 } 1460 1461 /* 1462 ** The 'pre-update' hook registered by this module with SQLite databases. 1463 */ 1464 static void xPreUpdate( 1465 void *pCtx, /* Copy of third arg to preupdate_hook() */ 1466 sqlite3 *db, /* Database handle */ 1467 int op, /* SQLITE_UPDATE, DELETE or INSERT */ 1468 char const *zDb, /* Database name */ 1469 char const *zName, /* Table name */ 1470 sqlite3_int64 iKey1, /* Rowid of row about to be deleted/updated */ 1471 sqlite3_int64 iKey2 /* New rowid value (for a rowid UPDATE) */ 1472 ){ 1473 sqlite3_session *pSession; 1474 int nDb = sqlite3Strlen30(zDb); 1475 1476 assert( sqlite3_mutex_held(db->mutex) ); 1477 1478 for(pSession=(sqlite3_session *)pCtx; pSession; pSession=pSession->pNext){ 1479 SessionTable *pTab; 1480 1481 /* If this session is attached to a different database ("main", "temp" 1482 ** etc.), or if it is not currently enabled, there is nothing to do. Skip 1483 ** to the next session object attached to this database. */ 1484 if( pSession->bEnable==0 ) continue; 1485 if( pSession->rc ) continue; 1486 if( sqlite3_strnicmp(zDb, pSession->zDb, nDb+1) ) continue; 1487 1488 pSession->rc = sessionFindTable(pSession, zName, &pTab); 1489 if( pTab ){ 1490 assert( pSession->rc==SQLITE_OK ); 1491 sessionPreupdateOneChange(op, pSession, pTab); 1492 if( op==SQLITE_UPDATE ){ 1493 sessionPreupdateOneChange(SQLITE_INSERT, pSession, pTab); 1494 } 1495 } 1496 } 1497 } 1498 1499 /* 1500 ** The pre-update hook implementations. 
1501 */ 1502 static int sessionPreupdateOld(void *pCtx, int iVal, sqlite3_value **ppVal){ 1503 return sqlite3_preupdate_old((sqlite3*)pCtx, iVal, ppVal); 1504 } 1505 static int sessionPreupdateNew(void *pCtx, int iVal, sqlite3_value **ppVal){ 1506 return sqlite3_preupdate_new((sqlite3*)pCtx, iVal, ppVal); 1507 } 1508 static int sessionPreupdateCount(void *pCtx){ 1509 return sqlite3_preupdate_count((sqlite3*)pCtx); 1510 } 1511 static int sessionPreupdateDepth(void *pCtx){ 1512 return sqlite3_preupdate_depth((sqlite3*)pCtx); 1513 } 1514 1515 /* 1516 ** Install the pre-update hooks on the session object passed as the only 1517 ** argument. 1518 */ 1519 static void sessionPreupdateHooks( 1520 sqlite3_session *pSession 1521 ){ 1522 pSession->hook.pCtx = (void*)pSession->db; 1523 pSession->hook.xOld = sessionPreupdateOld; 1524 pSession->hook.xNew = sessionPreupdateNew; 1525 pSession->hook.xCount = sessionPreupdateCount; 1526 pSession->hook.xDepth = sessionPreupdateDepth; 1527 } 1528 1529 typedef struct SessionDiffCtx SessionDiffCtx; 1530 struct SessionDiffCtx { 1531 sqlite3_stmt *pStmt; 1532 int nOldOff; 1533 }; 1534 1535 /* 1536 ** The diff hook implementations. 1537 */ 1538 static int sessionDiffOld(void *pCtx, int iVal, sqlite3_value **ppVal){ 1539 SessionDiffCtx *p = (SessionDiffCtx*)pCtx; 1540 *ppVal = sqlite3_column_value(p->pStmt, iVal+p->nOldOff); 1541 return SQLITE_OK; 1542 } 1543 static int sessionDiffNew(void *pCtx, int iVal, sqlite3_value **ppVal){ 1544 SessionDiffCtx *p = (SessionDiffCtx*)pCtx; 1545 *ppVal = sqlite3_column_value(p->pStmt, iVal); 1546 return SQLITE_OK; 1547 } 1548 static int sessionDiffCount(void *pCtx){ 1549 SessionDiffCtx *p = (SessionDiffCtx*)pCtx; 1550 return p->nOldOff ? p->nOldOff : sqlite3_column_count(p->pStmt); 1551 } 1552 static int sessionDiffDepth(void *pCtx){ 1553 return 0; 1554 } 1555 1556 /* 1557 ** Install the diff hooks on the session object passed as the only 1558 ** argument. 
1559 */ 1560 static void sessionDiffHooks( 1561 sqlite3_session *pSession, 1562 SessionDiffCtx *pDiffCtx 1563 ){ 1564 pSession->hook.pCtx = (void*)pDiffCtx; 1565 pSession->hook.xOld = sessionDiffOld; 1566 pSession->hook.xNew = sessionDiffNew; 1567 pSession->hook.xCount = sessionDiffCount; 1568 pSession->hook.xDepth = sessionDiffDepth; 1569 } 1570 1571 static char *sessionExprComparePK( 1572 int nCol, 1573 const char *zDb1, const char *zDb2, 1574 const char *zTab, 1575 const char **azCol, u8 *abPK 1576 ){ 1577 int i; 1578 const char *zSep = ""; 1579 char *zRet = 0; 1580 1581 for(i=0; i<nCol; i++){ 1582 if( abPK[i] ){ 1583 zRet = sqlite3_mprintf("%z%s\"%w\".\"%w\".\"%w\"=\"%w\".\"%w\".\"%w\"", 1584 zRet, zSep, zDb1, zTab, azCol[i], zDb2, zTab, azCol[i] 1585 ); 1586 zSep = " AND "; 1587 if( zRet==0 ) break; 1588 } 1589 } 1590 1591 return zRet; 1592 } 1593 1594 static char *sessionExprCompareOther( 1595 int nCol, 1596 const char *zDb1, const char *zDb2, 1597 const char *zTab, 1598 const char **azCol, u8 *abPK 1599 ){ 1600 int i; 1601 const char *zSep = ""; 1602 char *zRet = 0; 1603 int bHave = 0; 1604 1605 for(i=0; i<nCol; i++){ 1606 if( abPK[i]==0 ){ 1607 bHave = 1; 1608 zRet = sqlite3_mprintf( 1609 "%z%s\"%w\".\"%w\".\"%w\" IS NOT \"%w\".\"%w\".\"%w\"", 1610 zRet, zSep, zDb1, zTab, azCol[i], zDb2, zTab, azCol[i] 1611 ); 1612 zSep = " OR "; 1613 if( zRet==0 ) break; 1614 } 1615 } 1616 1617 if( bHave==0 ){ 1618 assert( zRet==0 ); 1619 zRet = sqlite3_mprintf("0"); 1620 } 1621 1622 return zRet; 1623 } 1624 1625 static char *sessionSelectFindNew( 1626 int nCol, 1627 const char *zDb1, /* Pick rows in this db only */ 1628 const char *zDb2, /* But not in this one */ 1629 const char *zTbl, /* Table name */ 1630 const char *zExpr 1631 ){ 1632 char *zRet = sqlite3_mprintf( 1633 "SELECT * FROM \"%w\".\"%w\" WHERE NOT EXISTS (" 1634 " SELECT 1 FROM \"%w\".\"%w\" WHERE %s" 1635 ")", 1636 zDb1, zTbl, zDb2, zTbl, zExpr 1637 ); 1638 return zRet; 1639 } 1640 1641 static int 
sessionDiffFindNew( 1642 int op, 1643 sqlite3_session *pSession, 1644 SessionTable *pTab, 1645 const char *zDb1, 1646 const char *zDb2, 1647 char *zExpr 1648 ){ 1649 int rc = SQLITE_OK; 1650 char *zStmt = sessionSelectFindNew(pTab->nCol, zDb1, zDb2, pTab->zName,zExpr); 1651 1652 if( zStmt==0 ){ 1653 rc = SQLITE_NOMEM; 1654 }else{ 1655 sqlite3_stmt *pStmt; 1656 rc = sqlite3_prepare(pSession->db, zStmt, -1, &pStmt, 0); 1657 if( rc==SQLITE_OK ){ 1658 SessionDiffCtx *pDiffCtx = (SessionDiffCtx*)pSession->hook.pCtx; 1659 pDiffCtx->pStmt = pStmt; 1660 pDiffCtx->nOldOff = 0; 1661 while( SQLITE_ROW==sqlite3_step(pStmt) ){ 1662 sessionPreupdateOneChange(op, pSession, pTab); 1663 } 1664 rc = sqlite3_finalize(pStmt); 1665 } 1666 sqlite3_free(zStmt); 1667 } 1668 1669 return rc; 1670 } 1671 1672 static int sessionDiffFindModified( 1673 sqlite3_session *pSession, 1674 SessionTable *pTab, 1675 const char *zFrom, 1676 const char *zExpr 1677 ){ 1678 int rc = SQLITE_OK; 1679 1680 char *zExpr2 = sessionExprCompareOther(pTab->nCol, 1681 pSession->zDb, zFrom, pTab->zName, pTab->azCol, pTab->abPK 1682 ); 1683 if( zExpr2==0 ){ 1684 rc = SQLITE_NOMEM; 1685 }else{ 1686 char *zStmt = sqlite3_mprintf( 1687 "SELECT * FROM \"%w\".\"%w\", \"%w\".\"%w\" WHERE %s AND (%z)", 1688 pSession->zDb, pTab->zName, zFrom, pTab->zName, zExpr, zExpr2 1689 ); 1690 if( zStmt==0 ){ 1691 rc = SQLITE_NOMEM; 1692 }else{ 1693 sqlite3_stmt *pStmt; 1694 rc = sqlite3_prepare(pSession->db, zStmt, -1, &pStmt, 0); 1695 1696 if( rc==SQLITE_OK ){ 1697 SessionDiffCtx *pDiffCtx = (SessionDiffCtx*)pSession->hook.pCtx; 1698 pDiffCtx->pStmt = pStmt; 1699 pDiffCtx->nOldOff = pTab->nCol; 1700 while( SQLITE_ROW==sqlite3_step(pStmt) ){ 1701 sessionPreupdateOneChange(SQLITE_UPDATE, pSession, pTab); 1702 } 1703 rc = sqlite3_finalize(pStmt); 1704 } 1705 sqlite3_free(zStmt); 1706 } 1707 } 1708 1709 return rc; 1710 } 1711 1712 int sqlite3session_diff( 1713 sqlite3_session *pSession, 1714 const char *zFrom, 1715 const char *zTbl, 1716 
char **pzErrMsg 1717 ){ 1718 const char *zDb = pSession->zDb; 1719 int rc = pSession->rc; 1720 SessionDiffCtx d; 1721 1722 memset(&d, 0, sizeof(d)); 1723 sessionDiffHooks(pSession, &d); 1724 1725 sqlite3_mutex_enter(sqlite3_db_mutex(pSession->db)); 1726 if( pzErrMsg ) *pzErrMsg = 0; 1727 if( rc==SQLITE_OK ){ 1728 char *zExpr = 0; 1729 sqlite3 *db = pSession->db; 1730 SessionTable *pTo; /* Table zTbl */ 1731 1732 /* Locate and if necessary initialize the target table object */ 1733 rc = sessionFindTable(pSession, zTbl, &pTo); 1734 if( pTo==0 ) goto diff_out; 1735 if( sessionInitTable(pSession, pTo) ){ 1736 rc = pSession->rc; 1737 goto diff_out; 1738 } 1739 1740 /* Check the table schemas match */ 1741 if( rc==SQLITE_OK ){ 1742 int bHasPk = 0; 1743 int bMismatch = 0; 1744 int nCol; /* Columns in zFrom.zTbl */ 1745 u8 *abPK; 1746 const char **azCol = 0; 1747 rc = sessionTableInfo(0, db, zFrom, zTbl, &nCol, 0, &azCol, &abPK); 1748 if( rc==SQLITE_OK ){ 1749 if( pTo->nCol!=nCol ){ 1750 bMismatch = 1; 1751 }else{ 1752 int i; 1753 for(i=0; i<nCol; i++){ 1754 if( pTo->abPK[i]!=abPK[i] ) bMismatch = 1; 1755 if( sqlite3_stricmp(azCol[i], pTo->azCol[i]) ) bMismatch = 1; 1756 if( abPK[i] ) bHasPk = 1; 1757 } 1758 } 1759 } 1760 sqlite3_free((char*)azCol); 1761 if( bMismatch ){ 1762 if( pzErrMsg ){ 1763 *pzErrMsg = sqlite3_mprintf("table schemas do not match"); 1764 } 1765 rc = SQLITE_SCHEMA; 1766 } 1767 if( bHasPk==0 ){ 1768 /* Ignore tables with no primary keys */ 1769 goto diff_out; 1770 } 1771 } 1772 1773 if( rc==SQLITE_OK ){ 1774 zExpr = sessionExprComparePK(pTo->nCol, 1775 zDb, zFrom, pTo->zName, pTo->azCol, pTo->abPK 1776 ); 1777 } 1778 1779 /* Find new rows */ 1780 if( rc==SQLITE_OK ){ 1781 rc = sessionDiffFindNew(SQLITE_INSERT, pSession, pTo, zDb, zFrom, zExpr); 1782 } 1783 1784 /* Find old rows */ 1785 if( rc==SQLITE_OK ){ 1786 rc = sessionDiffFindNew(SQLITE_DELETE, pSession, pTo, zFrom, zDb, zExpr); 1787 } 1788 1789 /* Find modified rows */ 1790 if( rc==SQLITE_OK ){ 
1791 rc = sessionDiffFindModified(pSession, pTo, zFrom, zExpr); 1792 } 1793 1794 sqlite3_free(zExpr); 1795 } 1796 1797 diff_out: 1798 sessionPreupdateHooks(pSession); 1799 sqlite3_mutex_leave(sqlite3_db_mutex(pSession->db)); 1800 return rc; 1801 } 1802 1803 /* 1804 ** Create a session object. This session object will record changes to 1805 ** database zDb attached to connection db. 1806 */ 1807 int sqlite3session_create( 1808 sqlite3 *db, /* Database handle */ 1809 const char *zDb, /* Name of db (e.g. "main") */ 1810 sqlite3_session **ppSession /* OUT: New session object */ 1811 ){ 1812 sqlite3_session *pNew; /* Newly allocated session object */ 1813 sqlite3_session *pOld; /* Session object already attached to db */ 1814 int nDb = sqlite3Strlen30(zDb); /* Length of zDb in bytes */ 1815 1816 /* Zero the output value in case an error occurs. */ 1817 *ppSession = 0; 1818 1819 /* Allocate and populate the new session object. */ 1820 pNew = (sqlite3_session *)sqlite3_malloc64(sizeof(sqlite3_session) + nDb + 1); 1821 if( !pNew ) return SQLITE_NOMEM; 1822 memset(pNew, 0, sizeof(sqlite3_session)); 1823 pNew->db = db; 1824 pNew->zDb = (char *)&pNew[1]; 1825 pNew->bEnable = 1; 1826 memcpy(pNew->zDb, zDb, nDb+1); 1827 sessionPreupdateHooks(pNew); 1828 1829 /* Add the new session object to the linked list of session objects 1830 ** attached to database handle $db. Do this under the cover of the db 1831 ** handle mutex. */ 1832 sqlite3_mutex_enter(sqlite3_db_mutex(db)); 1833 pOld = (sqlite3_session*)sqlite3_preupdate_hook(db, xPreUpdate, (void*)pNew); 1834 pNew->pNext = pOld; 1835 sqlite3_mutex_leave(sqlite3_db_mutex(db)); 1836 1837 *ppSession = pNew; 1838 return SQLITE_OK; 1839 } 1840 1841 /* 1842 ** Free the list of table objects passed as the first argument. The contents 1843 ** of the changed-rows hash tables are also deleted. 
1844 */ 1845 static void sessionDeleteTable(sqlite3_session *pSession, SessionTable *pList){ 1846 SessionTable *pNext; 1847 SessionTable *pTab; 1848 1849 for(pTab=pList; pTab; pTab=pNext){ 1850 int i; 1851 pNext = pTab->pNext; 1852 for(i=0; i<pTab->nChange; i++){ 1853 SessionChange *p; 1854 SessionChange *pNextChange; 1855 for(p=pTab->apChange[i]; p; p=pNextChange){ 1856 pNextChange = p->pNext; 1857 sessionFree(pSession, p); 1858 } 1859 } 1860 sessionFree(pSession, (char*)pTab->azCol); /* cast works around VC++ bug */ 1861 sessionFree(pSession, pTab->apChange); 1862 sessionFree(pSession, pTab); 1863 } 1864 } 1865 1866 /* 1867 ** Delete a session object previously allocated using sqlite3session_create(). 1868 */ 1869 void sqlite3session_delete(sqlite3_session *pSession){ 1870 sqlite3 *db = pSession->db; 1871 sqlite3_session *pHead; 1872 sqlite3_session **pp; 1873 1874 /* Unlink the session from the linked list of sessions attached to the 1875 ** database handle. Hold the db mutex while doing so. */ 1876 sqlite3_mutex_enter(sqlite3_db_mutex(db)); 1877 pHead = (sqlite3_session*)sqlite3_preupdate_hook(db, 0, 0); 1878 for(pp=&pHead; ALWAYS((*pp)!=0); pp=&((*pp)->pNext)){ 1879 if( (*pp)==pSession ){ 1880 *pp = (*pp)->pNext; 1881 if( pHead ) sqlite3_preupdate_hook(db, xPreUpdate, (void*)pHead); 1882 break; 1883 } 1884 } 1885 sqlite3_mutex_leave(sqlite3_db_mutex(db)); 1886 sqlite3ValueFree(pSession->pZeroBlob); 1887 1888 /* Delete all attached table objects. And the contents of their 1889 ** associated hash-tables. */ 1890 sessionDeleteTable(pSession, pSession->pTable); 1891 1892 /* Assert that all allocations have been freed and then free the 1893 ** session object itself. */ 1894 assert( pSession->nMalloc==0 ); 1895 sqlite3_free(pSession); 1896 } 1897 1898 /* 1899 ** Set a table filter on a Session Object. 
1900 */ 1901 void sqlite3session_table_filter( 1902 sqlite3_session *pSession, 1903 int(*xFilter)(void*, const char*), 1904 void *pCtx /* First argument passed to xFilter */ 1905 ){ 1906 pSession->bAutoAttach = 1; 1907 pSession->pFilterCtx = pCtx; 1908 pSession->xTableFilter = xFilter; 1909 } 1910 1911 /* 1912 ** Attach a table to a session. All subsequent changes made to the table 1913 ** while the session object is enabled will be recorded. 1914 ** 1915 ** Only tables that have a PRIMARY KEY defined may be attached. It does 1916 ** not matter if the PRIMARY KEY is an "INTEGER PRIMARY KEY" (rowid alias) 1917 ** or not. 1918 */ 1919 int sqlite3session_attach( 1920 sqlite3_session *pSession, /* Session object */ 1921 const char *zName /* Table name */ 1922 ){ 1923 int rc = SQLITE_OK; 1924 sqlite3_mutex_enter(sqlite3_db_mutex(pSession->db)); 1925 1926 if( !zName ){ 1927 pSession->bAutoAttach = 1; 1928 }else{ 1929 SessionTable *pTab; /* New table object (if required) */ 1930 int nName; /* Number of bytes in string zName */ 1931 1932 /* First search for an existing entry. If one is found, this call is 1933 ** a no-op. Return early. */ 1934 nName = sqlite3Strlen30(zName); 1935 for(pTab=pSession->pTable; pTab; pTab=pTab->pNext){ 1936 if( 0==sqlite3_strnicmp(pTab->zName, zName, nName+1) ) break; 1937 } 1938 1939 if( !pTab ){ 1940 /* Allocate new SessionTable object. */ 1941 int nByte = sizeof(SessionTable) + nName + 1; 1942 pTab = (SessionTable*)sessionMalloc64(pSession, nByte); 1943 if( !pTab ){ 1944 rc = SQLITE_NOMEM; 1945 }else{ 1946 /* Populate the new SessionTable object and link it into the list. 1947 ** The new object must be linked onto the end of the list, not 1948 ** simply added to the start of it in order to ensure that tables 1949 ** appear in the correct order when a changeset or patchset is 1950 ** eventually generated. 
*/ 1951 SessionTable **ppTab; 1952 memset(pTab, 0, sizeof(SessionTable)); 1953 pTab->zName = (char *)&pTab[1]; 1954 memcpy(pTab->zName, zName, nName+1); 1955 for(ppTab=&pSession->pTable; *ppTab; ppTab=&(*ppTab)->pNext); 1956 *ppTab = pTab; 1957 } 1958 } 1959 } 1960 1961 sqlite3_mutex_leave(sqlite3_db_mutex(pSession->db)); 1962 return rc; 1963 } 1964 1965 /* 1966 ** Ensure that there is room in the buffer to append nByte bytes of data. 1967 ** If not, use sqlite3_realloc() to grow the buffer so that there is. 1968 ** 1969 ** If successful, return zero. Otherwise, if an OOM condition is encountered, 1970 ** set *pRc to SQLITE_NOMEM and return non-zero. 1971 */ 1972 static int sessionBufferGrow(SessionBuffer *p, size_t nByte, int *pRc){ 1973 if( *pRc==SQLITE_OK && (size_t)(p->nAlloc-p->nBuf)<nByte ){ 1974 u8 *aNew; 1975 i64 nNew = p->nAlloc ? p->nAlloc : 128; 1976 do { 1977 nNew = nNew*2; 1978 }while( (size_t)(nNew-p->nBuf)<nByte ); 1979 1980 aNew = (u8 *)sqlite3_realloc64(p->aBuf, nNew); 1981 if( 0==aNew ){ 1982 *pRc = SQLITE_NOMEM; 1983 }else{ 1984 p->aBuf = aNew; 1985 p->nAlloc = nNew; 1986 } 1987 } 1988 return (*pRc!=SQLITE_OK); 1989 } 1990 1991 /* 1992 ** Append the value passed as the second argument to the buffer passed 1993 ** as the first. 1994 ** 1995 ** This function is a no-op if *pRc is non-zero when it is called. 1996 ** Otherwise, if an error occurs, *pRc is set to an SQLite error code 1997 ** before returning. 1998 */ 1999 static void sessionAppendValue(SessionBuffer *p, sqlite3_value *pVal, int *pRc){ 2000 int rc = *pRc; 2001 if( rc==SQLITE_OK ){ 2002 sqlite3_int64 nByte = 0; 2003 rc = sessionSerializeValue(0, pVal, &nByte); 2004 sessionBufferGrow(p, nByte, &rc); 2005 if( rc==SQLITE_OK ){ 2006 rc = sessionSerializeValue(&p->aBuf[p->nBuf], pVal, 0); 2007 p->nBuf += nByte; 2008 }else{ 2009 *pRc = rc; 2010 } 2011 } 2012 } 2013 2014 /* 2015 ** This function is a no-op if *pRc is other than SQLITE_OK when it is 2016 ** called. 
Otherwise, append a single byte to the buffer. 2017 ** 2018 ** If an OOM condition is encountered, set *pRc to SQLITE_NOMEM before 2019 ** returning. 2020 */ 2021 static void sessionAppendByte(SessionBuffer *p, u8 v, int *pRc){ 2022 if( 0==sessionBufferGrow(p, 1, pRc) ){ 2023 p->aBuf[p->nBuf++] = v; 2024 } 2025 } 2026 2027 /* 2028 ** This function is a no-op if *pRc is other than SQLITE_OK when it is 2029 ** called. Otherwise, append a single varint to the buffer. 2030 ** 2031 ** If an OOM condition is encountered, set *pRc to SQLITE_NOMEM before 2032 ** returning. 2033 */ 2034 static void sessionAppendVarint(SessionBuffer *p, int v, int *pRc){ 2035 if( 0==sessionBufferGrow(p, 9, pRc) ){ 2036 p->nBuf += sessionVarintPut(&p->aBuf[p->nBuf], v); 2037 } 2038 } 2039 2040 /* 2041 ** This function is a no-op if *pRc is other than SQLITE_OK when it is 2042 ** called. Otherwise, append a blob of data to the buffer. 2043 ** 2044 ** If an OOM condition is encountered, set *pRc to SQLITE_NOMEM before 2045 ** returning. 2046 */ 2047 static void sessionAppendBlob( 2048 SessionBuffer *p, 2049 const u8 *aBlob, 2050 int nBlob, 2051 int *pRc 2052 ){ 2053 if( nBlob>0 && 0==sessionBufferGrow(p, nBlob, pRc) ){ 2054 memcpy(&p->aBuf[p->nBuf], aBlob, nBlob); 2055 p->nBuf += nBlob; 2056 } 2057 } 2058 2059 /* 2060 ** This function is a no-op if *pRc is other than SQLITE_OK when it is 2061 ** called. Otherwise, append a string to the buffer. All bytes in the string 2062 ** up to (but not including) the nul-terminator are written to the buffer. 2063 ** 2064 ** If an OOM condition is encountered, set *pRc to SQLITE_NOMEM before 2065 ** returning. 
2066 */ 2067 static void sessionAppendStr( 2068 SessionBuffer *p, 2069 const char *zStr, 2070 int *pRc 2071 ){ 2072 int nStr = sqlite3Strlen30(zStr); 2073 if( 0==sessionBufferGrow(p, nStr, pRc) ){ 2074 memcpy(&p->aBuf[p->nBuf], zStr, nStr); 2075 p->nBuf += nStr; 2076 } 2077 } 2078 2079 /* 2080 ** This function is a no-op if *pRc is other than SQLITE_OK when it is 2081 ** called. Otherwise, append the string representation of integer iVal 2082 ** to the buffer. No nul-terminator is written. 2083 ** 2084 ** If an OOM condition is encountered, set *pRc to SQLITE_NOMEM before 2085 ** returning. 2086 */ 2087 static void sessionAppendInteger( 2088 SessionBuffer *p, /* Buffer to append to */ 2089 int iVal, /* Value to write the string rep. of */ 2090 int *pRc /* IN/OUT: Error code */ 2091 ){ 2092 char aBuf[24]; 2093 sqlite3_snprintf(sizeof(aBuf)-1, aBuf, "%d", iVal); 2094 sessionAppendStr(p, aBuf, pRc); 2095 } 2096 2097 /* 2098 ** This function is a no-op if *pRc is other than SQLITE_OK when it is 2099 ** called. Otherwise, append the string zStr enclosed in quotes (") and 2100 ** with any embedded quote characters escaped to the buffer. No 2101 ** nul-terminator byte is written. 2102 ** 2103 ** If an OOM condition is encountered, set *pRc to SQLITE_NOMEM before 2104 ** returning. 2105 */ 2106 static void sessionAppendIdent( 2107 SessionBuffer *p, /* Buffer to a append to */ 2108 const char *zStr, /* String to quote, escape and append */ 2109 int *pRc /* IN/OUT: Error code */ 2110 ){ 2111 int nStr = sqlite3Strlen30(zStr)*2 + 2 + 1; 2112 if( 0==sessionBufferGrow(p, nStr, pRc) ){ 2113 char *zOut = (char *)&p->aBuf[p->nBuf]; 2114 const char *zIn = zStr; 2115 *zOut++ = '"'; 2116 while( *zIn ){ 2117 if( *zIn=='"' ) *zOut++ = '"'; 2118 *zOut++ = *(zIn++); 2119 } 2120 *zOut++ = '"'; 2121 p->nBuf = (int)((u8 *)zOut - p->aBuf); 2122 } 2123 } 2124 2125 /* 2126 ** This function is a no-op if *pRc is other than SQLITE_OK when it is 2127 ** called. 
** Otherwise, it appends the serialized version of the value stored
** in column iCol of the row that SQL statement pStmt currently points
** to, to the buffer.
*/
static void sessionAppendCol(
  SessionBuffer *p,               /* Buffer to append to */
  sqlite3_stmt *pStmt,            /* Handle pointing to row containing value */
  int iCol,                       /* Column to read value from */
  int *pRc                        /* IN/OUT: Error code */
){
  int eType;                      /* Datatype of the column value */

  if( *pRc!=SQLITE_OK ) return;

  /* Every serialized value starts with a single type byte. */
  eType = sqlite3_column_type(pStmt, iCol);
  sessionAppendByte(p, (u8)eType, pRc);

  switch( eType ){
    case SQLITE_INTEGER:
    case SQLITE_FLOAT: {
      /* Numeric values are stored as 8 bytes written by sessionPutI64().
      ** A double is stored using its raw 8 byte representation. */
      sqlite3_int64 iVal;
      u8 aVal[8];
      if( eType==SQLITE_INTEGER ){
        iVal = sqlite3_column_int64(pStmt, iCol);
      }else{
        double rVal = sqlite3_column_double(pStmt, iCol);
        memcpy(&iVal, &rVal, 8);
      }
      sessionPutI64(aVal, iVal);
      sessionAppendBlob(p, aVal, 8, pRc);
      break;
    }

    case SQLITE_BLOB:
    case SQLITE_TEXT: {
      /* Text and blob values are stored as a varint size followed by
      ** the data itself. */
      u8 *zData;
      int nData;
      if( eType==SQLITE_BLOB ){
        zData = (u8 *)sqlite3_column_blob(pStmt, iCol);
      }else{
        zData = (u8 *)sqlite3_column_text(pStmt, iCol);
      }
      nData = sqlite3_column_bytes(pStmt, iCol);
      if( zData || (eType==SQLITE_BLOB && nData==0) ){
        sessionAppendVarint(p, nData, pRc);
        sessionAppendBlob(p, zData, nData, pRc);
      }else{
        /* A NULL pointer for anything other than a zero-length blob is
        ** reported as an OOM error. */
        *pRc = SQLITE_NOMEM;
      }
      break;
    }

    default:
      /* Nothing follows the type byte for any other type. */
      break;
  }
}

/*
**
** This function appends an update change to the buffer (see the comments 
** under "CHANGESET FORMAT" at the top of the file). An update change 
** consists of:
**
**   1 byte:  SQLITE_UPDATE (0x17)
**   1 byte:  "indirect" flag of the change
**   n bytes: old.* record (see RECORD FORMAT)
**   m bytes: new.* record (see RECORD FORMAT)
**
** The SessionChange object passed as argument p contains the
** values that were stored in the row when the session began (the old.*
** values).
The statement handle passed as the second argument points 2184 ** at the current version of the row (the new.* values). 2185 ** 2186 ** If all of the old.* values are equal to their corresponding new.* value 2187 ** (i.e. nothing has changed), then no data at all is appended to the buffer. 2188 ** 2189 ** Otherwise, the old.* record contains all primary key values and the 2190 ** original values of any fields that have been modified. The new.* record 2191 ** contains the new values of only those fields that have been modified. 2192 */ 2193 static int sessionAppendUpdate( 2194 SessionBuffer *pBuf, /* Buffer to append to */ 2195 int bPatchset, /* True for "patchset", 0 for "changeset" */ 2196 sqlite3_stmt *pStmt, /* Statement handle pointing at new row */ 2197 SessionChange *p, /* Object containing old values */ 2198 u8 *abPK /* Boolean array - true for PK columns */ 2199 ){ 2200 int rc = SQLITE_OK; 2201 SessionBuffer buf2 = {0,0,0}; /* Buffer to accumulate new.* record in */ 2202 int bNoop = 1; /* Set to zero if any values are modified */ 2203 int nRewind = pBuf->nBuf; /* Set to zero if any values are modified */ 2204 int i; /* Used to iterate through columns */ 2205 u8 *pCsr = p->aRecord; /* Used to iterate through old.* values */ 2206 2207 sessionAppendByte(pBuf, SQLITE_UPDATE, &rc); 2208 sessionAppendByte(pBuf, p->bIndirect, &rc); 2209 for(i=0; i<sqlite3_column_count(pStmt); i++){ 2210 int bChanged = 0; 2211 int nAdvance; 2212 int eType = *pCsr; 2213 switch( eType ){ 2214 case SQLITE_NULL: 2215 nAdvance = 1; 2216 if( sqlite3_column_type(pStmt, i)!=SQLITE_NULL ){ 2217 bChanged = 1; 2218 } 2219 break; 2220 2221 case SQLITE_FLOAT: 2222 case SQLITE_INTEGER: { 2223 nAdvance = 9; 2224 if( eType==sqlite3_column_type(pStmt, i) ){ 2225 sqlite3_int64 iVal = sessionGetI64(&pCsr[1]); 2226 if( eType==SQLITE_INTEGER ){ 2227 if( iVal==sqlite3_column_int64(pStmt, i) ) break; 2228 }else{ 2229 double dVal; 2230 memcpy(&dVal, &iVal, 8); 2231 if( dVal==sqlite3_column_double(pStmt, 
i) ) break; 2232 } 2233 } 2234 bChanged = 1; 2235 break; 2236 } 2237 2238 default: { 2239 int n; 2240 int nHdr = 1 + sessionVarintGet(&pCsr[1], &n); 2241 assert( eType==SQLITE_TEXT || eType==SQLITE_BLOB ); 2242 nAdvance = nHdr + n; 2243 if( eType==sqlite3_column_type(pStmt, i) 2244 && n==sqlite3_column_bytes(pStmt, i) 2245 && (n==0 || 0==memcmp(&pCsr[nHdr], sqlite3_column_blob(pStmt, i), n)) 2246 ){ 2247 break; 2248 } 2249 bChanged = 1; 2250 } 2251 } 2252 2253 /* If at least one field has been modified, this is not a no-op. */ 2254 if( bChanged ) bNoop = 0; 2255 2256 /* Add a field to the old.* record. This is omitted if this modules is 2257 ** currently generating a patchset. */ 2258 if( bPatchset==0 ){ 2259 if( bChanged || abPK[i] ){ 2260 sessionAppendBlob(pBuf, pCsr, nAdvance, &rc); 2261 }else{ 2262 sessionAppendByte(pBuf, 0, &rc); 2263 } 2264 } 2265 2266 /* Add a field to the new.* record. Or the only record if currently 2267 ** generating a patchset. */ 2268 if( bChanged || (bPatchset && abPK[i]) ){ 2269 sessionAppendCol(&buf2, pStmt, i, &rc); 2270 }else{ 2271 sessionAppendByte(&buf2, 0, &rc); 2272 } 2273 2274 pCsr += nAdvance; 2275 } 2276 2277 if( bNoop ){ 2278 pBuf->nBuf = nRewind; 2279 }else{ 2280 sessionAppendBlob(pBuf, buf2.aBuf, buf2.nBuf, &rc); 2281 } 2282 sqlite3_free(buf2.aBuf); 2283 2284 return rc; 2285 } 2286 2287 /* 2288 ** Append a DELETE change to the buffer passed as the first argument. Use 2289 ** the changeset format if argument bPatchset is zero, or the patchset 2290 ** format otherwise. 
2291 */ 2292 static int sessionAppendDelete( 2293 SessionBuffer *pBuf, /* Buffer to append to */ 2294 int bPatchset, /* True for "patchset", 0 for "changeset" */ 2295 SessionChange *p, /* Object containing old values */ 2296 int nCol, /* Number of columns in table */ 2297 u8 *abPK /* Boolean array - true for PK columns */ 2298 ){ 2299 int rc = SQLITE_OK; 2300 2301 sessionAppendByte(pBuf, SQLITE_DELETE, &rc); 2302 sessionAppendByte(pBuf, p->bIndirect, &rc); 2303 2304 if( bPatchset==0 ){ 2305 sessionAppendBlob(pBuf, p->aRecord, p->nRecord, &rc); 2306 }else{ 2307 int i; 2308 u8 *a = p->aRecord; 2309 for(i=0; i<nCol; i++){ 2310 u8 *pStart = a; 2311 int eType = *a++; 2312 2313 switch( eType ){ 2314 case 0: 2315 case SQLITE_NULL: 2316 assert( abPK[i]==0 ); 2317 break; 2318 2319 case SQLITE_FLOAT: 2320 case SQLITE_INTEGER: 2321 a += 8; 2322 break; 2323 2324 default: { 2325 int n; 2326 a += sessionVarintGet(a, &n); 2327 a += n; 2328 break; 2329 } 2330 } 2331 if( abPK[i] ){ 2332 sessionAppendBlob(pBuf, pStart, (int)(a-pStart), &rc); 2333 } 2334 } 2335 assert( (a - p->aRecord)==p->nRecord ); 2336 } 2337 2338 return rc; 2339 } 2340 2341 /* 2342 ** Formulate and prepare a SELECT statement to retrieve a row from table 2343 ** zTab in database zDb based on its primary key. i.e. 2344 ** 2345 ** SELECT * FROM zDb.zTab WHERE pk1 = ? AND pk2 = ? AND ... 
2346 */ 2347 static int sessionSelectStmt( 2348 sqlite3 *db, /* Database handle */ 2349 const char *zDb, /* Database name */ 2350 const char *zTab, /* Table name */ 2351 int nCol, /* Number of columns in table */ 2352 const char **azCol, /* Names of table columns */ 2353 u8 *abPK, /* PRIMARY KEY array */ 2354 sqlite3_stmt **ppStmt /* OUT: Prepared SELECT statement */ 2355 ){ 2356 int rc = SQLITE_OK; 2357 char *zSql = 0; 2358 int nSql = -1; 2359 2360 if( 0==sqlite3_stricmp("sqlite_stat1", zTab) ){ 2361 zSql = sqlite3_mprintf( 2362 "SELECT tbl, ?2, stat FROM %Q.sqlite_stat1 WHERE tbl IS ?1 AND " 2363 "idx IS (CASE WHEN ?2=X'' THEN NULL ELSE ?2 END)", zDb 2364 ); 2365 if( zSql==0 ) rc = SQLITE_NOMEM; 2366 }else{ 2367 int i; 2368 const char *zSep = ""; 2369 SessionBuffer buf = {0, 0, 0}; 2370 2371 sessionAppendStr(&buf, "SELECT * FROM ", &rc); 2372 sessionAppendIdent(&buf, zDb, &rc); 2373 sessionAppendStr(&buf, ".", &rc); 2374 sessionAppendIdent(&buf, zTab, &rc); 2375 sessionAppendStr(&buf, " WHERE ", &rc); 2376 for(i=0; i<nCol; i++){ 2377 if( abPK[i] ){ 2378 sessionAppendStr(&buf, zSep, &rc); 2379 sessionAppendIdent(&buf, azCol[i], &rc); 2380 sessionAppendStr(&buf, " IS ?", &rc); 2381 sessionAppendInteger(&buf, i+1, &rc); 2382 zSep = " AND "; 2383 } 2384 } 2385 zSql = (char*)buf.aBuf; 2386 nSql = buf.nBuf; 2387 } 2388 2389 if( rc==SQLITE_OK ){ 2390 rc = sqlite3_prepare_v2(db, zSql, nSql, ppStmt, 0); 2391 } 2392 sqlite3_free(zSql); 2393 return rc; 2394 } 2395 2396 /* 2397 ** Bind the PRIMARY KEY values from the change passed in argument pChange 2398 ** to the SELECT statement passed as the first argument. The SELECT statement 2399 ** is as prepared by function sessionSelectStmt(). 2400 ** 2401 ** Return SQLITE_OK if all PK values are successfully bound, or an SQLite 2402 ** error code (e.g. SQLITE_NOMEM) otherwise. 
2403 */ 2404 static int sessionSelectBind( 2405 sqlite3_stmt *pSelect, /* SELECT from sessionSelectStmt() */ 2406 int nCol, /* Number of columns in table */ 2407 u8 *abPK, /* PRIMARY KEY array */ 2408 SessionChange *pChange /* Change structure */ 2409 ){ 2410 int i; 2411 int rc = SQLITE_OK; 2412 u8 *a = pChange->aRecord; 2413 2414 for(i=0; i<nCol && rc==SQLITE_OK; i++){ 2415 int eType = *a++; 2416 2417 switch( eType ){ 2418 case 0: 2419 case SQLITE_NULL: 2420 assert( abPK[i]==0 ); 2421 break; 2422 2423 case SQLITE_INTEGER: { 2424 if( abPK[i] ){ 2425 i64 iVal = sessionGetI64(a); 2426 rc = sqlite3_bind_int64(pSelect, i+1, iVal); 2427 } 2428 a += 8; 2429 break; 2430 } 2431 2432 case SQLITE_FLOAT: { 2433 if( abPK[i] ){ 2434 double rVal; 2435 i64 iVal = sessionGetI64(a); 2436 memcpy(&rVal, &iVal, 8); 2437 rc = sqlite3_bind_double(pSelect, i+1, rVal); 2438 } 2439 a += 8; 2440 break; 2441 } 2442 2443 case SQLITE_TEXT: { 2444 int n; 2445 a += sessionVarintGet(a, &n); 2446 if( abPK[i] ){ 2447 rc = sqlite3_bind_text(pSelect, i+1, (char *)a, n, SQLITE_TRANSIENT); 2448 } 2449 a += n; 2450 break; 2451 } 2452 2453 default: { 2454 int n; 2455 assert( eType==SQLITE_BLOB ); 2456 a += sessionVarintGet(a, &n); 2457 if( abPK[i] ){ 2458 rc = sqlite3_bind_blob(pSelect, i+1, a, n, SQLITE_TRANSIENT); 2459 } 2460 a += n; 2461 break; 2462 } 2463 } 2464 } 2465 2466 return rc; 2467 } 2468 2469 /* 2470 ** This function is a no-op if *pRc is set to other than SQLITE_OK when it 2471 ** is called. Otherwise, append a serialized table header (part of the binary 2472 ** changeset format) to buffer *pBuf. If an error occurs, set *pRc to an 2473 ** SQLite error code before returning. 
2474 */ 2475 static void sessionAppendTableHdr( 2476 SessionBuffer *pBuf, /* Append header to this buffer */ 2477 int bPatchset, /* Use the patchset format if true */ 2478 SessionTable *pTab, /* Table object to append header for */ 2479 int *pRc /* IN/OUT: Error code */ 2480 ){ 2481 /* Write a table header */ 2482 sessionAppendByte(pBuf, (bPatchset ? 'P' : 'T'), pRc); 2483 sessionAppendVarint(pBuf, pTab->nCol, pRc); 2484 sessionAppendBlob(pBuf, pTab->abPK, pTab->nCol, pRc); 2485 sessionAppendBlob(pBuf, (u8 *)pTab->zName, (int)strlen(pTab->zName)+1, pRc); 2486 } 2487 2488 /* 2489 ** Generate either a changeset (if argument bPatchset is zero) or a patchset 2490 ** (if it is non-zero) based on the current contents of the session object 2491 ** passed as the first argument. 2492 ** 2493 ** If no error occurs, SQLITE_OK is returned and the new changeset/patchset 2494 ** stored in output variables *pnChangeset and *ppChangeset. Or, if an error 2495 ** occurs, an SQLite error code is returned and both output variables set 2496 ** to 0. 2497 */ 2498 static int sessionGenerateChangeset( 2499 sqlite3_session *pSession, /* Session object */ 2500 int bPatchset, /* True for patchset, false for changeset */ 2501 int (*xOutput)(void *pOut, const void *pData, int nData), 2502 void *pOut, /* First argument for xOutput */ 2503 int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */ 2504 void **ppChangeset /* OUT: Buffer containing changeset */ 2505 ){ 2506 sqlite3 *db = pSession->db; /* Source database handle */ 2507 SessionTable *pTab; /* Used to iterate through attached tables */ 2508 SessionBuffer buf = {0,0,0}; /* Buffer in which to accumlate changeset */ 2509 int rc; /* Return code */ 2510 2511 assert( xOutput==0 || (pnChangeset==0 && ppChangeset==0 ) ); 2512 2513 /* Zero the output variables in case an error occurs. If this session 2514 ** object is already in the error state (sqlite3_session.rc != SQLITE_OK), 2515 ** this call will be a no-op. 
*/ 2516 if( xOutput==0 ){ 2517 *pnChangeset = 0; 2518 *ppChangeset = 0; 2519 } 2520 2521 if( pSession->rc ) return pSession->rc; 2522 rc = sqlite3_exec(pSession->db, "SAVEPOINT changeset", 0, 0, 0); 2523 if( rc!=SQLITE_OK ) return rc; 2524 2525 sqlite3_mutex_enter(sqlite3_db_mutex(db)); 2526 2527 for(pTab=pSession->pTable; rc==SQLITE_OK && pTab; pTab=pTab->pNext){ 2528 if( pTab->nEntry ){ 2529 const char *zName = pTab->zName; 2530 int nCol; /* Number of columns in table */ 2531 u8 *abPK; /* Primary key array */ 2532 const char **azCol = 0; /* Table columns */ 2533 int i; /* Used to iterate through hash buckets */ 2534 sqlite3_stmt *pSel = 0; /* SELECT statement to query table pTab */ 2535 int nRewind = buf.nBuf; /* Initial size of write buffer */ 2536 int nNoop; /* Size of buffer after writing tbl header */ 2537 2538 /* Check the table schema is still Ok. */ 2539 rc = sessionTableInfo(0, db, pSession->zDb, zName, &nCol, 0,&azCol,&abPK); 2540 if( !rc && (pTab->nCol!=nCol || memcmp(abPK, pTab->abPK, nCol)) ){ 2541 rc = SQLITE_SCHEMA; 2542 } 2543 2544 /* Write a table header */ 2545 sessionAppendTableHdr(&buf, bPatchset, pTab, &rc); 2546 2547 /* Build and compile a statement to execute: */ 2548 if( rc==SQLITE_OK ){ 2549 rc = sessionSelectStmt( 2550 db, pSession->zDb, zName, nCol, azCol, abPK, &pSel); 2551 } 2552 2553 nNoop = buf.nBuf; 2554 for(i=0; i<pTab->nChange && rc==SQLITE_OK; i++){ 2555 SessionChange *p; /* Used to iterate through changes */ 2556 2557 for(p=pTab->apChange[i]; rc==SQLITE_OK && p; p=p->pNext){ 2558 rc = sessionSelectBind(pSel, nCol, abPK, p); 2559 if( rc!=SQLITE_OK ) continue; 2560 if( sqlite3_step(pSel)==SQLITE_ROW ){ 2561 if( p->op==SQLITE_INSERT ){ 2562 int iCol; 2563 sessionAppendByte(&buf, SQLITE_INSERT, &rc); 2564 sessionAppendByte(&buf, p->bIndirect, &rc); 2565 for(iCol=0; iCol<nCol; iCol++){ 2566 sessionAppendCol(&buf, pSel, iCol, &rc); 2567 } 2568 }else{ 2569 rc = sessionAppendUpdate(&buf, bPatchset, pSel, p, abPK); 2570 } 2571 }else if( 
p->op!=SQLITE_INSERT ){ 2572 rc = sessionAppendDelete(&buf, bPatchset, p, nCol, abPK); 2573 } 2574 if( rc==SQLITE_OK ){ 2575 rc = sqlite3_reset(pSel); 2576 } 2577 2578 /* If the buffer is now larger than sessions_strm_chunk_size, pass 2579 ** its contents to the xOutput() callback. */ 2580 if( xOutput 2581 && rc==SQLITE_OK 2582 && buf.nBuf>nNoop 2583 && buf.nBuf>sessions_strm_chunk_size 2584 ){ 2585 rc = xOutput(pOut, (void*)buf.aBuf, buf.nBuf); 2586 nNoop = -1; 2587 buf.nBuf = 0; 2588 } 2589 2590 } 2591 } 2592 2593 sqlite3_finalize(pSel); 2594 if( buf.nBuf==nNoop ){ 2595 buf.nBuf = nRewind; 2596 } 2597 sqlite3_free((char*)azCol); /* cast works around VC++ bug */ 2598 } 2599 } 2600 2601 if( rc==SQLITE_OK ){ 2602 if( xOutput==0 ){ 2603 *pnChangeset = buf.nBuf; 2604 *ppChangeset = buf.aBuf; 2605 buf.aBuf = 0; 2606 }else if( buf.nBuf>0 ){ 2607 rc = xOutput(pOut, (void*)buf.aBuf, buf.nBuf); 2608 } 2609 } 2610 2611 sqlite3_free(buf.aBuf); 2612 sqlite3_exec(db, "RELEASE changeset", 0, 0, 0); 2613 sqlite3_mutex_leave(sqlite3_db_mutex(db)); 2614 return rc; 2615 } 2616 2617 /* 2618 ** Obtain a changeset object containing all changes recorded by the 2619 ** session object passed as the first argument. 2620 ** 2621 ** It is the responsibility of the caller to eventually free the buffer 2622 ** using sqlite3_free(). 2623 */ 2624 int sqlite3session_changeset( 2625 sqlite3_session *pSession, /* Session object */ 2626 int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */ 2627 void **ppChangeset /* OUT: Buffer containing changeset */ 2628 ){ 2629 int rc = sessionGenerateChangeset(pSession, 0, 0, 0, pnChangeset,ppChangeset); 2630 assert( rc || pnChangeset==0 || *pnChangeset<=pSession->nMaxChangesetSize ); 2631 return rc; 2632 } 2633 2634 /* 2635 ** Streaming version of sqlite3session_changeset(). 
2636 */ 2637 int sqlite3session_changeset_strm( 2638 sqlite3_session *pSession, 2639 int (*xOutput)(void *pOut, const void *pData, int nData), 2640 void *pOut 2641 ){ 2642 return sessionGenerateChangeset(pSession, 0, xOutput, pOut, 0, 0); 2643 } 2644 2645 /* 2646 ** Streaming version of sqlite3session_patchset(). 2647 */ 2648 int sqlite3session_patchset_strm( 2649 sqlite3_session *pSession, 2650 int (*xOutput)(void *pOut, const void *pData, int nData), 2651 void *pOut 2652 ){ 2653 return sessionGenerateChangeset(pSession, 1, xOutput, pOut, 0, 0); 2654 } 2655 2656 /* 2657 ** Obtain a patchset object containing all changes recorded by the 2658 ** session object passed as the first argument. 2659 ** 2660 ** It is the responsibility of the caller to eventually free the buffer 2661 ** using sqlite3_free(). 2662 */ 2663 int sqlite3session_patchset( 2664 sqlite3_session *pSession, /* Session object */ 2665 int *pnPatchset, /* OUT: Size of buffer at *ppChangeset */ 2666 void **ppPatchset /* OUT: Buffer containing changeset */ 2667 ){ 2668 return sessionGenerateChangeset(pSession, 1, 0, 0, pnPatchset, ppPatchset); 2669 } 2670 2671 /* 2672 ** Enable or disable the session object passed as the first argument. 2673 */ 2674 int sqlite3session_enable(sqlite3_session *pSession, int bEnable){ 2675 int ret; 2676 sqlite3_mutex_enter(sqlite3_db_mutex(pSession->db)); 2677 if( bEnable>=0 ){ 2678 pSession->bEnable = bEnable; 2679 } 2680 ret = pSession->bEnable; 2681 sqlite3_mutex_leave(sqlite3_db_mutex(pSession->db)); 2682 return ret; 2683 } 2684 2685 /* 2686 ** Enable or disable the session object passed as the first argument. 
2687 */ 2688 int sqlite3session_indirect(sqlite3_session *pSession, int bIndirect){ 2689 int ret; 2690 sqlite3_mutex_enter(sqlite3_db_mutex(pSession->db)); 2691 if( bIndirect>=0 ){ 2692 pSession->bIndirect = bIndirect; 2693 } 2694 ret = pSession->bIndirect; 2695 sqlite3_mutex_leave(sqlite3_db_mutex(pSession->db)); 2696 return ret; 2697 } 2698 2699 /* 2700 ** Return true if there have been no changes to monitored tables recorded 2701 ** by the session object passed as the only argument. 2702 */ 2703 int sqlite3session_isempty(sqlite3_session *pSession){ 2704 int ret = 0; 2705 SessionTable *pTab; 2706 2707 sqlite3_mutex_enter(sqlite3_db_mutex(pSession->db)); 2708 for(pTab=pSession->pTable; pTab && ret==0; pTab=pTab->pNext){ 2709 ret = (pTab->nEntry>0); 2710 } 2711 sqlite3_mutex_leave(sqlite3_db_mutex(pSession->db)); 2712 2713 return (ret==0); 2714 } 2715 2716 /* 2717 ** Return the amount of heap memory in use. 2718 */ 2719 sqlite3_int64 sqlite3session_memory_used(sqlite3_session *pSession){ 2720 return pSession->nMalloc; 2721 } 2722 2723 /* 2724 ** Return the maximum size of sqlite3session_changeset() output. 2725 */ 2726 sqlite3_int64 sqlite3session_changeset_size(sqlite3_session *pSession){ 2727 return pSession->nMaxChangesetSize; 2728 } 2729 2730 /* 2731 ** Do the work for either sqlite3changeset_start() or start_strm(). 
2732 */ 2733 static int sessionChangesetStart( 2734 sqlite3_changeset_iter **pp, /* OUT: Changeset iterator handle */ 2735 int (*xInput)(void *pIn, void *pData, int *pnData), 2736 void *pIn, 2737 int nChangeset, /* Size of buffer pChangeset in bytes */ 2738 void *pChangeset, /* Pointer to buffer containing changeset */ 2739 int bInvert, /* True to invert changeset */ 2740 int bSkipEmpty /* True to skip empty UPDATE changes */ 2741 ){ 2742 sqlite3_changeset_iter *pRet; /* Iterator to return */ 2743 int nByte; /* Number of bytes to allocate for iterator */ 2744 2745 assert( xInput==0 || (pChangeset==0 && nChangeset==0) ); 2746 2747 /* Zero the output variable in case an error occurs. */ 2748 *pp = 0; 2749 2750 /* Allocate and initialize the iterator structure. */ 2751 nByte = sizeof(sqlite3_changeset_iter); 2752 pRet = (sqlite3_changeset_iter *)sqlite3_malloc(nByte); 2753 if( !pRet ) return SQLITE_NOMEM; 2754 memset(pRet, 0, sizeof(sqlite3_changeset_iter)); 2755 pRet->in.aData = (u8 *)pChangeset; 2756 pRet->in.nData = nChangeset; 2757 pRet->in.xInput = xInput; 2758 pRet->in.pIn = pIn; 2759 pRet->in.bEof = (xInput ? 0 : 1); 2760 pRet->bInvert = bInvert; 2761 pRet->bSkipEmpty = bSkipEmpty; 2762 2763 /* Populate the output variable and return success. */ 2764 *pp = pRet; 2765 return SQLITE_OK; 2766 } 2767 2768 /* 2769 ** Create an iterator used to iterate through the contents of a changeset. 
2770 */ 2771 int sqlite3changeset_start( 2772 sqlite3_changeset_iter **pp, /* OUT: Changeset iterator handle */ 2773 int nChangeset, /* Size of buffer pChangeset in bytes */ 2774 void *pChangeset /* Pointer to buffer containing changeset */ 2775 ){ 2776 return sessionChangesetStart(pp, 0, 0, nChangeset, pChangeset, 0, 0); 2777 } 2778 int sqlite3changeset_start_v2( 2779 sqlite3_changeset_iter **pp, /* OUT: Changeset iterator handle */ 2780 int nChangeset, /* Size of buffer pChangeset in bytes */ 2781 void *pChangeset, /* Pointer to buffer containing changeset */ 2782 int flags 2783 ){ 2784 int bInvert = !!(flags & SQLITE_CHANGESETSTART_INVERT); 2785 return sessionChangesetStart(pp, 0, 0, nChangeset, pChangeset, bInvert, 0); 2786 } 2787 2788 /* 2789 ** Streaming version of sqlite3changeset_start(). 2790 */ 2791 int sqlite3changeset_start_strm( 2792 sqlite3_changeset_iter **pp, /* OUT: Changeset iterator handle */ 2793 int (*xInput)(void *pIn, void *pData, int *pnData), 2794 void *pIn 2795 ){ 2796 return sessionChangesetStart(pp, xInput, pIn, 0, 0, 0, 0); 2797 } 2798 int sqlite3changeset_start_v2_strm( 2799 sqlite3_changeset_iter **pp, /* OUT: Changeset iterator handle */ 2800 int (*xInput)(void *pIn, void *pData, int *pnData), 2801 void *pIn, 2802 int flags 2803 ){ 2804 int bInvert = !!(flags & SQLITE_CHANGESETSTART_INVERT); 2805 return sessionChangesetStart(pp, xInput, pIn, 0, 0, bInvert, 0); 2806 } 2807 2808 /* 2809 ** If the SessionInput object passed as the only argument is a streaming 2810 ** object and the buffer is full, discard some data to free up space. 
2811 */ 2812 static void sessionDiscardData(SessionInput *pIn){ 2813 if( pIn->xInput && pIn->iNext>=sessions_strm_chunk_size ){ 2814 int nMove = pIn->buf.nBuf - pIn->iNext; 2815 assert( nMove>=0 ); 2816 if( nMove>0 ){ 2817 memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove); 2818 } 2819 pIn->buf.nBuf -= pIn->iNext; 2820 pIn->iNext = 0; 2821 pIn->nData = pIn->buf.nBuf; 2822 } 2823 } 2824 2825 /* 2826 ** Ensure that there are at least nByte bytes available in the buffer. Or, 2827 ** if there are not nByte bytes remaining in the input, that all available 2828 ** data is in the buffer. 2829 ** 2830 ** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise. 2831 */ 2832 static int sessionInputBuffer(SessionInput *pIn, int nByte){ 2833 int rc = SQLITE_OK; 2834 if( pIn->xInput ){ 2835 while( !pIn->bEof && (pIn->iNext+nByte)>=pIn->nData && rc==SQLITE_OK ){ 2836 int nNew = sessions_strm_chunk_size; 2837 2838 if( pIn->bNoDiscard==0 ) sessionDiscardData(pIn); 2839 if( SQLITE_OK==sessionBufferGrow(&pIn->buf, nNew, &rc) ){ 2840 rc = pIn->xInput(pIn->pIn, &pIn->buf.aBuf[pIn->buf.nBuf], &nNew); 2841 if( nNew==0 ){ 2842 pIn->bEof = 1; 2843 }else{ 2844 pIn->buf.nBuf += nNew; 2845 } 2846 } 2847 2848 pIn->aData = pIn->buf.aBuf; 2849 pIn->nData = pIn->buf.nBuf; 2850 } 2851 } 2852 return rc; 2853 } 2854 2855 /* 2856 ** When this function is called, *ppRec points to the start of a record 2857 ** that contains nCol values. This function advances the pointer *ppRec 2858 ** until it points to the byte immediately following that record. 
2859 */ 2860 static void sessionSkipRecord( 2861 u8 **ppRec, /* IN/OUT: Record pointer */ 2862 int nCol /* Number of values in record */ 2863 ){ 2864 u8 *aRec = *ppRec; 2865 int i; 2866 for(i=0; i<nCol; i++){ 2867 int eType = *aRec++; 2868 if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){ 2869 int nByte; 2870 aRec += sessionVarintGet((u8*)aRec, &nByte); 2871 aRec += nByte; 2872 }else if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ 2873 aRec += 8; 2874 } 2875 } 2876 2877 *ppRec = aRec; 2878 } 2879 2880 /* 2881 ** This function sets the value of the sqlite3_value object passed as the 2882 ** first argument to a copy of the string or blob held in the aData[] 2883 ** buffer. SQLITE_OK is returned if successful, or SQLITE_NOMEM if an OOM 2884 ** error occurs. 2885 */ 2886 static int sessionValueSetStr( 2887 sqlite3_value *pVal, /* Set the value of this object */ 2888 u8 *aData, /* Buffer containing string or blob data */ 2889 int nData, /* Size of buffer aData[] in bytes */ 2890 u8 enc /* String encoding (0 for blobs) */ 2891 ){ 2892 /* In theory this code could just pass SQLITE_TRANSIENT as the final 2893 ** argument to sqlite3ValueSetStr() and have the copy created 2894 ** automatically. But doing so makes it difficult to detect any OOM 2895 ** error. Hence the code to create the copy externally. */ 2896 u8 *aCopy = sqlite3_malloc64((sqlite3_int64)nData+1); 2897 if( aCopy==0 ) return SQLITE_NOMEM; 2898 memcpy(aCopy, aData, nData); 2899 sqlite3ValueSetStr(pVal, nData, (char*)aCopy, enc, sqlite3_free); 2900 return SQLITE_OK; 2901 } 2902 2903 /* 2904 ** Deserialize a single record from a buffer in memory. See "RECORD FORMAT" 2905 ** for details. 2906 ** 2907 ** When this function is called, *paChange points to the start of the record 2908 ** to deserialize. Assuming no error occurs, *paChange is set to point to 2909 ** one byte after the end of the same record before this function returns. 2910 ** If the argument abPK is NULL, then the record contains nCol values. 
Or, 2911 ** if abPK is other than NULL, then the record contains only the PK fields 2912 ** (in other words, it is a patchset DELETE record). 2913 ** 2914 ** If successful, each element of the apOut[] array (allocated by the caller) 2915 ** is set to point to an sqlite3_value object containing the value read 2916 ** from the corresponding position in the record. If that value is not 2917 ** included in the record (i.e. because the record is part of an UPDATE change 2918 ** and the field was not modified), the corresponding element of apOut[] is 2919 ** set to NULL. 2920 ** 2921 ** It is the responsibility of the caller to free all sqlite_value structures 2922 ** using sqlite3_free(). 2923 ** 2924 ** If an error occurs, an SQLite error code (e.g. SQLITE_NOMEM) is returned. 2925 ** The apOut[] array may have been partially populated in this case. 2926 */ 2927 static int sessionReadRecord( 2928 SessionInput *pIn, /* Input data */ 2929 int nCol, /* Number of values in record */ 2930 u8 *abPK, /* Array of primary key flags, or NULL */ 2931 sqlite3_value **apOut, /* Write values to this array */ 2932 int *pbEmpty 2933 ){ 2934 int i; /* Used to iterate through columns */ 2935 int rc = SQLITE_OK; 2936 2937 assert( pbEmpty==0 || *pbEmpty==0 ); 2938 if( pbEmpty ) *pbEmpty = 1; 2939 for(i=0; i<nCol && rc==SQLITE_OK; i++){ 2940 int eType = 0; /* Type of value (SQLITE_NULL, TEXT etc.) 
*/ 2941 if( abPK && abPK[i]==0 ) continue; 2942 rc = sessionInputBuffer(pIn, 9); 2943 if( rc==SQLITE_OK ){ 2944 if( pIn->iNext>=pIn->nData ){ 2945 rc = SQLITE_CORRUPT_BKPT; 2946 }else{ 2947 eType = pIn->aData[pIn->iNext++]; 2948 assert( apOut[i]==0 ); 2949 if( eType ){ 2950 if( pbEmpty ) *pbEmpty = 0; 2951 apOut[i] = sqlite3ValueNew(0); 2952 if( !apOut[i] ) rc = SQLITE_NOMEM; 2953 } 2954 } 2955 } 2956 2957 if( rc==SQLITE_OK ){ 2958 u8 *aVal = &pIn->aData[pIn->iNext]; 2959 if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){ 2960 int nByte; 2961 pIn->iNext += sessionVarintGet(aVal, &nByte); 2962 rc = sessionInputBuffer(pIn, nByte); 2963 if( rc==SQLITE_OK ){ 2964 if( nByte<0 || nByte>pIn->nData-pIn->iNext ){ 2965 rc = SQLITE_CORRUPT_BKPT; 2966 }else{ 2967 u8 enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0); 2968 rc = sessionValueSetStr(apOut[i],&pIn->aData[pIn->iNext],nByte,enc); 2969 pIn->iNext += nByte; 2970 } 2971 } 2972 } 2973 if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ 2974 sqlite3_int64 v = sessionGetI64(aVal); 2975 if( eType==SQLITE_INTEGER ){ 2976 sqlite3VdbeMemSetInt64(apOut[i], v); 2977 }else{ 2978 double d; 2979 memcpy(&d, &v, 8); 2980 sqlite3VdbeMemSetDouble(apOut[i], d); 2981 } 2982 pIn->iNext += 8; 2983 } 2984 } 2985 } 2986 2987 return rc; 2988 } 2989 2990 /* 2991 ** The input pointer currently points to the second byte of a table-header. 2992 ** Specifically, to the following: 2993 ** 2994 ** + number of columns in table (varint) 2995 ** + array of PK flags (1 byte per column), 2996 ** + table name (nul terminated). 2997 ** 2998 ** This function ensures that all of the above is present in the input 2999 ** buffer (i.e. that it can be accessed without any calls to xInput()). 3000 ** If successful, SQLITE_OK is returned. Otherwise, an SQLite error code. 3001 ** The input pointer is not moved. 
3002 */ 3003 static int sessionChangesetBufferTblhdr(SessionInput *pIn, int *pnByte){ 3004 int rc = SQLITE_OK; 3005 int nCol = 0; 3006 int nRead = 0; 3007 3008 rc = sessionInputBuffer(pIn, 9); 3009 if( rc==SQLITE_OK ){ 3010 nRead += sessionVarintGet(&pIn->aData[pIn->iNext + nRead], &nCol); 3011 /* The hard upper limit for the number of columns in an SQLite 3012 ** database table is, according to sqliteLimit.h, 32676. So 3013 ** consider any table-header that purports to have more than 65536 3014 ** columns to be corrupt. This is convenient because otherwise, 3015 ** if the (nCol>65536) condition below were omitted, a sufficiently 3016 ** large value for nCol may cause nRead to wrap around and become 3017 ** negative. Leading to a crash. */ 3018 if( nCol<0 || nCol>65536 ){ 3019 rc = SQLITE_CORRUPT_BKPT; 3020 }else{ 3021 rc = sessionInputBuffer(pIn, nRead+nCol+100); 3022 nRead += nCol; 3023 } 3024 } 3025 3026 while( rc==SQLITE_OK ){ 3027 while( (pIn->iNext + nRead)<pIn->nData && pIn->aData[pIn->iNext + nRead] ){ 3028 nRead++; 3029 } 3030 if( (pIn->iNext + nRead)<pIn->nData ) break; 3031 rc = sessionInputBuffer(pIn, nRead + 100); 3032 } 3033 *pnByte = nRead+1; 3034 return rc; 3035 } 3036 3037 /* 3038 ** The input pointer currently points to the first byte of the first field 3039 ** of a record consisting of nCol columns. This function ensures the entire 3040 ** record is buffered. It does not move the input pointer. 3041 ** 3042 ** If successful, SQLITE_OK is returned and *pnByte is set to the size of 3043 ** the record in bytes. Otherwise, an SQLite error code is returned. The 3044 ** final value of *pnByte is undefined in this case. 
3045 */ 3046 static int sessionChangesetBufferRecord( 3047 SessionInput *pIn, /* Input data */ 3048 int nCol, /* Number of columns in record */ 3049 int *pnByte /* OUT: Size of record in bytes */ 3050 ){ 3051 int rc = SQLITE_OK; 3052 int nByte = 0; 3053 int i; 3054 for(i=0; rc==SQLITE_OK && i<nCol; i++){ 3055 int eType; 3056 rc = sessionInputBuffer(pIn, nByte + 10); 3057 if( rc==SQLITE_OK ){ 3058 eType = pIn->aData[pIn->iNext + nByte++]; 3059 if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){ 3060 int n; 3061 nByte += sessionVarintGet(&pIn->aData[pIn->iNext+nByte], &n); 3062 nByte += n; 3063 rc = sessionInputBuffer(pIn, nByte); 3064 }else if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ 3065 nByte += 8; 3066 } 3067 } 3068 } 3069 *pnByte = nByte; 3070 return rc; 3071 } 3072 3073 /* 3074 ** The input pointer currently points to the second byte of a table-header. 3075 ** Specifically, to the following: 3076 ** 3077 ** + number of columns in table (varint) 3078 ** + array of PK flags (1 byte per column), 3079 ** + table name (nul terminated). 3080 ** 3081 ** This function decodes the table-header and populates the p->nCol, 3082 ** p->zTab and p->abPK[] variables accordingly. The p->apValue[] array is 3083 ** also allocated or resized according to the new value of p->nCol. The 3084 ** input pointer is left pointing to the byte following the table header. 3085 ** 3086 ** If successful, SQLITE_OK is returned. Otherwise, an SQLite error code 3087 ** is returned and the final values of the various fields enumerated above 3088 ** are undefined. 
3089 */ 3090 static int sessionChangesetReadTblhdr(sqlite3_changeset_iter *p){ 3091 int rc; 3092 int nCopy; 3093 assert( p->rc==SQLITE_OK ); 3094 3095 rc = sessionChangesetBufferTblhdr(&p->in, &nCopy); 3096 if( rc==SQLITE_OK ){ 3097 int nByte; 3098 int nVarint; 3099 nVarint = sessionVarintGet(&p->in.aData[p->in.iNext], &p->nCol); 3100 if( p->nCol>0 ){ 3101 nCopy -= nVarint; 3102 p->in.iNext += nVarint; 3103 nByte = p->nCol * sizeof(sqlite3_value*) * 2 + nCopy; 3104 p->tblhdr.nBuf = 0; 3105 sessionBufferGrow(&p->tblhdr, nByte, &rc); 3106 }else{ 3107 rc = SQLITE_CORRUPT_BKPT; 3108 } 3109 } 3110 3111 if( rc==SQLITE_OK ){ 3112 size_t iPK = sizeof(sqlite3_value*)*p->nCol*2; 3113 memset(p->tblhdr.aBuf, 0, iPK); 3114 memcpy(&p->tblhdr.aBuf[iPK], &p->in.aData[p->in.iNext], nCopy); 3115 p->in.iNext += nCopy; 3116 } 3117 3118 p->apValue = (sqlite3_value**)p->tblhdr.aBuf; 3119 if( p->apValue==0 ){ 3120 p->abPK = 0; 3121 p->zTab = 0; 3122 }else{ 3123 p->abPK = (u8*)&p->apValue[p->nCol*2]; 3124 p->zTab = p->abPK ? (char*)&p->abPK[p->nCol] : 0; 3125 } 3126 return (p->rc = rc); 3127 } 3128 3129 /* 3130 ** Advance the changeset iterator to the next change. The differences between 3131 ** this function and sessionChangesetNext() are that 3132 ** 3133 ** * If pbEmpty is not NULL and the change is a no-op UPDATE (an UPDATE 3134 ** that modifies no columns), this function sets (*pbEmpty) to 1. 3135 ** 3136 ** * If the iterator is configured to skip no-op UPDATEs, 3137 ** sessionChangesetNext() does that. This function does not. 
3138 */ 3139 static int sessionChangesetNextOne( 3140 sqlite3_changeset_iter *p, /* Changeset iterator */ 3141 u8 **paRec, /* If non-NULL, store record pointer here */ 3142 int *pnRec, /* If non-NULL, store size of record here */ 3143 int *pbNew, /* If non-NULL, true if new table */ 3144 int *pbEmpty 3145 ){ 3146 int i; 3147 u8 op; 3148 3149 assert( (paRec==0 && pnRec==0) || (paRec && pnRec) ); 3150 assert( pbEmpty==0 || *pbEmpty==0 ); 3151 3152 /* If the iterator is in the error-state, return immediately. */ 3153 if( p->rc!=SQLITE_OK ) return p->rc; 3154 3155 /* Free the current contents of p->apValue[], if any. */ 3156 if( p->apValue ){ 3157 for(i=0; i<p->nCol*2; i++){ 3158 sqlite3ValueFree(p->apValue[i]); 3159 } 3160 memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2); 3161 } 3162 3163 /* Make sure the buffer contains at least 10 bytes of input data, or all 3164 ** remaining data if there are less than 10 bytes available. This is 3165 ** sufficient either for the 'T' or 'P' byte and the varint that follows 3166 ** it, or for the two single byte values otherwise. */ 3167 p->rc = sessionInputBuffer(&p->in, 2); 3168 if( p->rc!=SQLITE_OK ) return p->rc; 3169 3170 /* If the iterator is already at the end of the changeset, return DONE. */ 3171 if( p->in.iNext>=p->in.nData ){ 3172 return SQLITE_DONE; 3173 } 3174 3175 sessionDiscardData(&p->in); 3176 p->in.iCurrent = p->in.iNext; 3177 3178 op = p->in.aData[p->in.iNext++]; 3179 while( op=='T' || op=='P' ){ 3180 if( pbNew ) *pbNew = 1; 3181 p->bPatchset = (op=='P'); 3182 if( sessionChangesetReadTblhdr(p) ) return p->rc; 3183 if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc; 3184 p->in.iCurrent = p->in.iNext; 3185 if( p->in.iNext>=p->in.nData ) return SQLITE_DONE; 3186 op = p->in.aData[p->in.iNext++]; 3187 } 3188 3189 if( p->zTab==0 || (p->bPatchset && p->bInvert) ){ 3190 /* The first record in the changeset is not a table header. Must be a 3191 ** corrupt changeset. 
*/ 3192 assert( p->in.iNext==1 || p->zTab ); 3193 return (p->rc = SQLITE_CORRUPT_BKPT); 3194 } 3195 3196 p->op = op; 3197 p->bIndirect = p->in.aData[p->in.iNext++]; 3198 if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){ 3199 return (p->rc = SQLITE_CORRUPT_BKPT); 3200 } 3201 3202 if( paRec ){ 3203 int nVal; /* Number of values to buffer */ 3204 if( p->bPatchset==0 && op==SQLITE_UPDATE ){ 3205 nVal = p->nCol * 2; 3206 }else if( p->bPatchset && op==SQLITE_DELETE ){ 3207 nVal = 0; 3208 for(i=0; i<p->nCol; i++) if( p->abPK[i] ) nVal++; 3209 }else{ 3210 nVal = p->nCol; 3211 } 3212 p->rc = sessionChangesetBufferRecord(&p->in, nVal, pnRec); 3213 if( p->rc!=SQLITE_OK ) return p->rc; 3214 *paRec = &p->in.aData[p->in.iNext]; 3215 p->in.iNext += *pnRec; 3216 }else{ 3217 sqlite3_value **apOld = (p->bInvert ? &p->apValue[p->nCol] : p->apValue); 3218 sqlite3_value **apNew = (p->bInvert ? p->apValue : &p->apValue[p->nCol]); 3219 3220 /* If this is an UPDATE or DELETE, read the old.* record. */ 3221 if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){ 3222 u8 *abPK = p->bPatchset ? p->abPK : 0; 3223 p->rc = sessionReadRecord(&p->in, p->nCol, abPK, apOld, 0); 3224 if( p->rc!=SQLITE_OK ) return p->rc; 3225 } 3226 3227 /* If this is an INSERT or UPDATE, read the new.* record. */ 3228 if( p->op!=SQLITE_DELETE ){ 3229 p->rc = sessionReadRecord(&p->in, p->nCol, 0, apNew, pbEmpty); 3230 if( p->rc!=SQLITE_OK ) return p->rc; 3231 } 3232 3233 if( (p->bPatchset || p->bInvert) && p->op==SQLITE_UPDATE ){ 3234 /* If this is an UPDATE that is part of a patchset, then all PK and 3235 ** modified fields are present in the new.* record. The old.* record 3236 ** is currently completely empty. This block shifts the PK fields from 3237 ** new.* to old.*, to accommodate the code that reads these arrays. 
*/ 3238 for(i=0; i<p->nCol; i++){ 3239 assert( p->bPatchset==0 || p->apValue[i]==0 ); 3240 if( p->abPK[i] ){ 3241 assert( p->apValue[i]==0 ); 3242 p->apValue[i] = p->apValue[i+p->nCol]; 3243 if( p->apValue[i]==0 ) return (p->rc = SQLITE_CORRUPT_BKPT); 3244 p->apValue[i+p->nCol] = 0; 3245 } 3246 } 3247 }else if( p->bInvert ){ 3248 if( p->op==SQLITE_INSERT ) p->op = SQLITE_DELETE; 3249 else if( p->op==SQLITE_DELETE ) p->op = SQLITE_INSERT; 3250 } 3251 } 3252 3253 return SQLITE_ROW; 3254 } 3255 3256 /* 3257 ** Advance the changeset iterator to the next change. 3258 ** 3259 ** If both paRec and pnRec are NULL, then this function works like the public 3260 ** API sqlite3changeset_next(). If SQLITE_ROW is returned, then the 3261 ** sqlite3changeset_new() and old() APIs may be used to query for values. 3262 ** 3263 ** Otherwise, if paRec and pnRec are not NULL, then a pointer to the change 3264 ** record is written to *paRec before returning and the number of bytes in 3265 ** the record to *pnRec. 3266 ** 3267 ** Either way, this function returns SQLITE_ROW if the iterator is 3268 ** successfully advanced to the next change in the changeset, an SQLite 3269 ** error code if an error occurs, or SQLITE_DONE if there are no further 3270 ** changes in the changeset. 3271 */ 3272 static int sessionChangesetNext( 3273 sqlite3_changeset_iter *p, /* Changeset iterator */ 3274 u8 **paRec, /* If non-NULL, store record pointer here */ 3275 int *pnRec, /* If non-NULL, store size of record here */ 3276 int *pbNew /* If non-NULL, true if new table */ 3277 ){ 3278 int bEmpty; 3279 int rc; 3280 do { 3281 bEmpty = 0; 3282 rc = sessionChangesetNextOne(p, paRec, pnRec, pbNew, &bEmpty); 3283 }while( rc==SQLITE_ROW && p->bSkipEmpty && bEmpty); 3284 return rc; 3285 } 3286 3287 /* 3288 ** Advance an iterator created by sqlite3changeset_start() to the next 3289 ** change in the changeset. This function may return SQLITE_ROW, SQLITE_DONE 3290 ** or SQLITE_CORRUPT. 
3291 ** 3292 ** This function may not be called on iterators passed to a conflict handler 3293 ** callback by changeset_apply(). 3294 */ 3295 int sqlite3changeset_next(sqlite3_changeset_iter *p){ 3296 return sessionChangesetNext(p, 0, 0, 0); 3297 } 3298 3299 /* 3300 ** The following function extracts information on the current change 3301 ** from a changeset iterator. It may only be called after changeset_next() 3302 ** has returned SQLITE_ROW. 3303 */ 3304 int sqlite3changeset_op( 3305 sqlite3_changeset_iter *pIter, /* Iterator handle */ 3306 const char **pzTab, /* OUT: Pointer to table name */ 3307 int *pnCol, /* OUT: Number of columns in table */ 3308 int *pOp, /* OUT: SQLITE_INSERT, DELETE or UPDATE */ 3309 int *pbIndirect /* OUT: True if change is indirect */ 3310 ){ 3311 *pOp = pIter->op; 3312 *pnCol = pIter->nCol; 3313 *pzTab = pIter->zTab; 3314 if( pbIndirect ) *pbIndirect = pIter->bIndirect; 3315 return SQLITE_OK; 3316 } 3317 3318 /* 3319 ** Return information regarding the PRIMARY KEY and number of columns in 3320 ** the database table affected by the change that pIter currently points 3321 ** to. This function may only be called after changeset_next() returns 3322 ** SQLITE_ROW. 3323 */ 3324 int sqlite3changeset_pk( 3325 sqlite3_changeset_iter *pIter, /* Iterator object */ 3326 unsigned char **pabPK, /* OUT: Array of boolean - true for PK cols */ 3327 int *pnCol /* OUT: Number of entries in output array */ 3328 ){ 3329 *pabPK = pIter->abPK; 3330 if( pnCol ) *pnCol = pIter->nCol; 3331 return SQLITE_OK; 3332 } 3333 3334 /* 3335 ** This function may only be called while the iterator is pointing to an 3336 ** SQLITE_UPDATE or SQLITE_DELETE change (see sqlite3changeset_op()). 3337 ** Otherwise, SQLITE_MISUSE is returned. 3338 ** 3339 ** It sets *ppValue to point to an sqlite3_value structure containing the 3340 ** iVal'th value in the old.* record. 
Or, if that particular value is not 3341 ** included in the record (because the change is an UPDATE and the field 3342 ** was not modified and is not a PK column), set *ppValue to NULL. 3343 ** 3344 ** If value iVal is out-of-range, SQLITE_RANGE is returned and *ppValue is 3345 ** not modified. Otherwise, SQLITE_OK. 3346 */ 3347 int sqlite3changeset_old( 3348 sqlite3_changeset_iter *pIter, /* Changeset iterator */ 3349 int iVal, /* Index of old.* value to retrieve */ 3350 sqlite3_value **ppValue /* OUT: Old value (or NULL pointer) */ 3351 ){ 3352 if( pIter->op!=SQLITE_UPDATE && pIter->op!=SQLITE_DELETE ){ 3353 return SQLITE_MISUSE; 3354 } 3355 if( iVal<0 || iVal>=pIter->nCol ){ 3356 return SQLITE_RANGE; 3357 } 3358 *ppValue = pIter->apValue[iVal]; 3359 return SQLITE_OK; 3360 } 3361 3362 /* 3363 ** This function may only be called while the iterator is pointing to an 3364 ** SQLITE_UPDATE or SQLITE_INSERT change (see sqlite3changeset_op()). 3365 ** Otherwise, SQLITE_MISUSE is returned. 3366 ** 3367 ** It sets *ppValue to point to an sqlite3_value structure containing the 3368 ** iVal'th value in the new.* record. Or, if that particular value is not 3369 ** included in the record (because the change is an UPDATE and the field 3370 ** was not modified), set *ppValue to NULL. 3371 ** 3372 ** If value iVal is out-of-range, SQLITE_RANGE is returned and *ppValue is 3373 ** not modified. Otherwise, SQLITE_OK. 3374 */ 3375 int sqlite3changeset_new( 3376 sqlite3_changeset_iter *pIter, /* Changeset iterator */ 3377 int iVal, /* Index of new.* value to retrieve */ 3378 sqlite3_value **ppValue /* OUT: New value (or NULL pointer) */ 3379 ){ 3380 if( pIter->op!=SQLITE_UPDATE && pIter->op!=SQLITE_INSERT ){ 3381 return SQLITE_MISUSE; 3382 } 3383 if( iVal<0 || iVal>=pIter->nCol ){ 3384 return SQLITE_RANGE; 3385 } 3386 *ppValue = pIter->apValue[pIter->nCol+iVal]; 3387 return SQLITE_OK; 3388 } 3389 3390 /* 3391 ** The following two macros are used internally. 
They are similar to the 3392 ** sqlite3changeset_new() and sqlite3changeset_old() functions, except that 3393 ** they omit all error checking and return a pointer to the requested value. 3394 */ 3395 #define sessionChangesetNew(pIter, iVal) (pIter)->apValue[(pIter)->nCol+(iVal)] 3396 #define sessionChangesetOld(pIter, iVal) (pIter)->apValue[(iVal)] 3397 3398 /* 3399 ** This function may only be called with a changeset iterator that has been 3400 ** passed to an SQLITE_CHANGESET_DATA or SQLITE_CHANGESET_CONFLICT 3401 ** conflict-handler function. Otherwise, SQLITE_MISUSE is returned. 3402 ** 3403 ** If successful, *ppValue is set to point to an sqlite3_value structure 3404 ** containing the iVal'th value of the conflicting record. 3405 ** 3406 ** If value iVal is out-of-range or some other error occurs, an SQLite error 3407 ** code is returned. Otherwise, SQLITE_OK. 3408 */ 3409 int sqlite3changeset_conflict( 3410 sqlite3_changeset_iter *pIter, /* Changeset iterator */ 3411 int iVal, /* Index of conflict record value to fetch */ 3412 sqlite3_value **ppValue /* OUT: Value from conflicting row */ 3413 ){ 3414 if( !pIter->pConflict ){ 3415 return SQLITE_MISUSE; 3416 } 3417 if( iVal<0 || iVal>=pIter->nCol ){ 3418 return SQLITE_RANGE; 3419 } 3420 *ppValue = sqlite3_column_value(pIter->pConflict, iVal); 3421 return SQLITE_OK; 3422 } 3423 3424 /* 3425 ** This function may only be called with an iterator passed to an 3426 ** SQLITE_CHANGESET_FOREIGN_KEY conflict handler callback. In this case 3427 ** it sets the output variable to the total number of known foreign key 3428 ** violations in the destination database and returns SQLITE_OK. 3429 ** 3430 ** In all other cases this function returns SQLITE_MISUSE. 
3431 */ 3432 int sqlite3changeset_fk_conflicts( 3433 sqlite3_changeset_iter *pIter, /* Changeset iterator */ 3434 int *pnOut /* OUT: Number of FK violations */ 3435 ){ 3436 if( pIter->pConflict || pIter->apValue ){ 3437 return SQLITE_MISUSE; 3438 } 3439 *pnOut = pIter->nCol; 3440 return SQLITE_OK; 3441 } 3442 3443 3444 /* 3445 ** Finalize an iterator allocated with sqlite3changeset_start(). 3446 ** 3447 ** This function may not be called on iterators passed to a conflict handler 3448 ** callback by changeset_apply(). 3449 */ 3450 int sqlite3changeset_finalize(sqlite3_changeset_iter *p){ 3451 int rc = SQLITE_OK; 3452 if( p ){ 3453 int i; /* Used to iterate through p->apValue[] */ 3454 rc = p->rc; 3455 if( p->apValue ){ 3456 for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]); 3457 } 3458 sqlite3_free(p->tblhdr.aBuf); 3459 sqlite3_free(p->in.buf.aBuf); 3460 sqlite3_free(p); 3461 } 3462 return rc; 3463 } 3464 3465 static int sessionChangesetInvert( 3466 SessionInput *pInput, /* Input changeset */ 3467 int (*xOutput)(void *pOut, const void *pData, int nData), 3468 void *pOut, 3469 int *pnInverted, /* OUT: Number of bytes in output changeset */ 3470 void **ppInverted /* OUT: Inverse of pChangeset */ 3471 ){ 3472 int rc = SQLITE_OK; /* Return value */ 3473 SessionBuffer sOut; /* Output buffer */ 3474 int nCol = 0; /* Number of cols in current table */ 3475 u8 *abPK = 0; /* PK array for current table */ 3476 sqlite3_value **apVal = 0; /* Space for values for UPDATE inversion */ 3477 SessionBuffer sPK = {0, 0, 0}; /* PK array for current table */ 3478 3479 /* Initialize the output buffer */ 3480 memset(&sOut, 0, sizeof(SessionBuffer)); 3481 3482 /* Zero the output variables in case an error occurs. */ 3483 if( ppInverted ){ 3484 *ppInverted = 0; 3485 *pnInverted = 0; 3486 } 3487 3488 while( 1 ){ 3489 u8 eType; 3490 3491 /* Test for EOF. 
*/ 3492 if( (rc = sessionInputBuffer(pInput, 2)) ) goto finished_invert; 3493 if( pInput->iNext>=pInput->nData ) break; 3494 eType = pInput->aData[pInput->iNext]; 3495 3496 switch( eType ){ 3497 case 'T': { 3498 /* A 'table' record consists of: 3499 ** 3500 ** * A constant 'T' character, 3501 ** * Number of columns in said table (a varint), 3502 ** * An array of nCol bytes (sPK), 3503 ** * A nul-terminated table name. 3504 */ 3505 int nByte; 3506 int nVar; 3507 pInput->iNext++; 3508 if( (rc = sessionChangesetBufferTblhdr(pInput, &nByte)) ){ 3509 goto finished_invert; 3510 } 3511 nVar = sessionVarintGet(&pInput->aData[pInput->iNext], &nCol); 3512 sPK.nBuf = 0; 3513 sessionAppendBlob(&sPK, &pInput->aData[pInput->iNext+nVar], nCol, &rc); 3514 sessionAppendByte(&sOut, eType, &rc); 3515 sessionAppendBlob(&sOut, &pInput->aData[pInput->iNext], nByte, &rc); 3516 if( rc ) goto finished_invert; 3517 3518 pInput->iNext += nByte; 3519 sqlite3_free(apVal); 3520 apVal = 0; 3521 abPK = sPK.aBuf; 3522 break; 3523 } 3524 3525 case SQLITE_INSERT: 3526 case SQLITE_DELETE: { 3527 int nByte; 3528 int bIndirect = pInput->aData[pInput->iNext+1]; 3529 int eType2 = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE); 3530 pInput->iNext += 2; 3531 assert( rc==SQLITE_OK ); 3532 rc = sessionChangesetBufferRecord(pInput, nCol, &nByte); 3533 sessionAppendByte(&sOut, eType2, &rc); 3534 sessionAppendByte(&sOut, bIndirect, &rc); 3535 sessionAppendBlob(&sOut, &pInput->aData[pInput->iNext], nByte, &rc); 3536 pInput->iNext += nByte; 3537 if( rc ) goto finished_invert; 3538 break; 3539 } 3540 3541 case SQLITE_UPDATE: { 3542 int iCol; 3543 3544 if( 0==apVal ){ 3545 apVal = (sqlite3_value **)sqlite3_malloc64(sizeof(apVal[0])*nCol*2); 3546 if( 0==apVal ){ 3547 rc = SQLITE_NOMEM; 3548 goto finished_invert; 3549 } 3550 memset(apVal, 0, sizeof(apVal[0])*nCol*2); 3551 } 3552 3553 /* Write the header for the new UPDATE change. Same as the original. 
*/ 3554 sessionAppendByte(&sOut, eType, &rc); 3555 sessionAppendByte(&sOut, pInput->aData[pInput->iNext+1], &rc); 3556 3557 /* Read the old.* and new.* records for the update change. */ 3558 pInput->iNext += 2; 3559 rc = sessionReadRecord(pInput, nCol, 0, &apVal[0], 0); 3560 if( rc==SQLITE_OK ){ 3561 rc = sessionReadRecord(pInput, nCol, 0, &apVal[nCol], 0); 3562 } 3563 3564 /* Write the new old.* record. Consists of the PK columns from the 3565 ** original old.* record, and the other values from the original 3566 ** new.* record. */ 3567 for(iCol=0; iCol<nCol; iCol++){ 3568 sqlite3_value *pVal = apVal[iCol + (abPK[iCol] ? 0 : nCol)]; 3569 sessionAppendValue(&sOut, pVal, &rc); 3570 } 3571 3572 /* Write the new new.* record. Consists of a copy of all values 3573 ** from the original old.* record, except for the PK columns, which 3574 ** are set to "undefined". */ 3575 for(iCol=0; iCol<nCol; iCol++){ 3576 sqlite3_value *pVal = (abPK[iCol] ? 0 : apVal[iCol]); 3577 sessionAppendValue(&sOut, pVal, &rc); 3578 } 3579 3580 for(iCol=0; iCol<nCol*2; iCol++){ 3581 sqlite3ValueFree(apVal[iCol]); 3582 } 3583 memset(apVal, 0, sizeof(apVal[0])*nCol*2); 3584 if( rc!=SQLITE_OK ){ 3585 goto finished_invert; 3586 } 3587 3588 break; 3589 } 3590 3591 default: 3592 rc = SQLITE_CORRUPT_BKPT; 3593 goto finished_invert; 3594 } 3595 3596 assert( rc==SQLITE_OK ); 3597 if( xOutput && sOut.nBuf>=sessions_strm_chunk_size ){ 3598 rc = xOutput(pOut, sOut.aBuf, sOut.nBuf); 3599 sOut.nBuf = 0; 3600 if( rc!=SQLITE_OK ) goto finished_invert; 3601 } 3602 } 3603 3604 assert( rc==SQLITE_OK ); 3605 if( pnInverted ){ 3606 *pnInverted = sOut.nBuf; 3607 *ppInverted = sOut.aBuf; 3608 sOut.aBuf = 0; 3609 }else if( sOut.nBuf>0 ){ 3610 rc = xOutput(pOut, sOut.aBuf, sOut.nBuf); 3611 } 3612 3613 finished_invert: 3614 sqlite3_free(sOut.aBuf); 3615 sqlite3_free(apVal); 3616 sqlite3_free(sPK.aBuf); 3617 return rc; 3618 } 3619 3620 3621 /* 3622 ** Invert a changeset object. 
3623 */ 3624 int sqlite3changeset_invert( 3625 int nChangeset, /* Number of bytes in input */ 3626 const void *pChangeset, /* Input changeset */ 3627 int *pnInverted, /* OUT: Number of bytes in output changeset */ 3628 void **ppInverted /* OUT: Inverse of pChangeset */ 3629 ){ 3630 SessionInput sInput; 3631 3632 /* Set up the input stream */ 3633 memset(&sInput, 0, sizeof(SessionInput)); 3634 sInput.nData = nChangeset; 3635 sInput.aData = (u8*)pChangeset; 3636 3637 return sessionChangesetInvert(&sInput, 0, 0, pnInverted, ppInverted); 3638 } 3639 3640 /* 3641 ** Streaming version of sqlite3changeset_invert(). 3642 */ 3643 int sqlite3changeset_invert_strm( 3644 int (*xInput)(void *pIn, void *pData, int *pnData), 3645 void *pIn, 3646 int (*xOutput)(void *pOut, const void *pData, int nData), 3647 void *pOut 3648 ){ 3649 SessionInput sInput; 3650 int rc; 3651 3652 /* Set up the input stream */ 3653 memset(&sInput, 0, sizeof(SessionInput)); 3654 sInput.xInput = xInput; 3655 sInput.pIn = pIn; 3656 3657 rc = sessionChangesetInvert(&sInput, xOutput, pOut, 0, 0); 3658 sqlite3_free(sInput.buf.aBuf); 3659 return rc; 3660 } 3661 3662 3663 typedef struct SessionUpdate SessionUpdate; 3664 struct SessionUpdate { 3665 sqlite3_stmt *pStmt; 3666 u32 *aMask; 3667 SessionUpdate *pNext; 3668 }; 3669 3670 typedef struct SessionApplyCtx SessionApplyCtx; 3671 struct SessionApplyCtx { 3672 sqlite3 *db; 3673 sqlite3_stmt *pDelete; /* DELETE statement */ 3674 sqlite3_stmt *pInsert; /* INSERT statement */ 3675 sqlite3_stmt *pSelect; /* SELECT statement */ 3676 int nCol; /* Size of azCol[] and abPK[] arrays */ 3677 const char **azCol; /* Array of column names */ 3678 u8 *abPK; /* Boolean array - true if column is in PK */ 3679 u32 *aUpdateMask; /* Used by sessionUpdateFind */ 3680 SessionUpdate *pUp; 3681 int bStat1; /* True if table is sqlite_stat1 */ 3682 int bDeferConstraints; /* True to defer constraints */ 3683 int bInvertConstraints; /* Invert when iterating constraints buffer */ 3684 
SessionBuffer constraints; /* Deferred constraints are stored here */ 3685 SessionBuffer rebase; /* Rebase information (if any) here */ 3686 u8 bRebaseStarted; /* If table header is already in rebase */ 3687 u8 bRebase; /* True to collect rebase information */ 3688 }; 3689 3690 /* Number of prepared UPDATE statements to cache. */ 3691 #define SESSION_UPDATE_CACHE_SZ 12 3692 3693 /* 3694 ** Find a prepared UPDATE statement suitable for the UPDATE step currently 3695 ** being visited by the iterator. The UPDATE is of the form: 3696 ** 3697 ** UPDATE tbl SET col = ?, col2 = ? WHERE pk1 IS ? AND pk2 IS ? 3698 */ 3699 static int sessionUpdateFind( 3700 sqlite3_changeset_iter *pIter, 3701 SessionApplyCtx *p, 3702 int bPatchset, 3703 sqlite3_stmt **ppStmt 3704 ){ 3705 int rc = SQLITE_OK; 3706 SessionUpdate *pUp = 0; 3707 int nCol = pIter->nCol; 3708 int nU32 = (pIter->nCol+33)/32; 3709 int ii; 3710 3711 if( p->aUpdateMask==0 ){ 3712 p->aUpdateMask = sqlite3_malloc(nU32*sizeof(u32)); 3713 if( p->aUpdateMask==0 ){ 3714 rc = SQLITE_NOMEM; 3715 } 3716 } 3717 3718 if( rc==SQLITE_OK ){ 3719 memset(p->aUpdateMask, 0, nU32*sizeof(u32)); 3720 rc = SQLITE_CORRUPT; 3721 for(ii=0; ii<pIter->nCol; ii++){ 3722 if( sessionChangesetNew(pIter, ii) ){ 3723 p->aUpdateMask[ii/32] |= (1<<(ii%32)); 3724 rc = SQLITE_OK; 3725 } 3726 } 3727 } 3728 3729 if( rc==SQLITE_OK ){ 3730 if( bPatchset ) p->aUpdateMask[nCol/32] |= (1<<(nCol%32)); 3731 3732 if( p->pUp ){ 3733 int nUp = 0; 3734 SessionUpdate **pp = &p->pUp; 3735 while( 1 ){ 3736 nUp++; 3737 if( 0==memcmp(p->aUpdateMask, (*pp)->aMask, nU32*sizeof(u32)) ){ 3738 pUp = *pp; 3739 *pp = pUp->pNext; 3740 pUp->pNext = p->pUp; 3741 p->pUp = pUp; 3742 break; 3743 } 3744 3745 if( (*pp)->pNext ){ 3746 pp = &(*pp)->pNext; 3747 }else{ 3748 if( nUp>=SESSION_UPDATE_CACHE_SZ ){ 3749 sqlite3_finalize((*pp)->pStmt); 3750 sqlite3_free(*pp); 3751 *pp = 0; 3752 } 3753 break; 3754 } 3755 } 3756 } 3757 3758 if( pUp==0 ){ 3759 int nByte = sizeof(SessionUpdate) * 
nU32*sizeof(u32); 3760 int bStat1 = (sqlite3_stricmp(pIter->zTab, "sqlite_stat1")==0); 3761 pUp = (SessionUpdate*)sqlite3_malloc(nByte); 3762 if( pUp==0 ){ 3763 rc = SQLITE_NOMEM; 3764 }else{ 3765 const char *zSep = ""; 3766 SessionBuffer buf; 3767 3768 memset(&buf, 0, sizeof(buf)); 3769 pUp->aMask = (u32*)&pUp[1]; 3770 memcpy(pUp->aMask, p->aUpdateMask, nU32*sizeof(u32)); 3771 3772 sessionAppendStr(&buf, "UPDATE main.", &rc); 3773 sessionAppendIdent(&buf, pIter->zTab, &rc); 3774 sessionAppendStr(&buf, " SET ", &rc); 3775 3776 /* Create the assignments part of the UPDATE */ 3777 for(ii=0; ii<pIter->nCol; ii++){ 3778 if( p->abPK[ii]==0 && sessionChangesetNew(pIter, ii) ){ 3779 sessionAppendStr(&buf, zSep, &rc); 3780 sessionAppendIdent(&buf, p->azCol[ii], &rc); 3781 sessionAppendStr(&buf, " = ?", &rc); 3782 sessionAppendInteger(&buf, ii*2+1, &rc); 3783 zSep = ", "; 3784 } 3785 } 3786 3787 /* Create the WHERE clause part of the UPDATE */ 3788 zSep = ""; 3789 sessionAppendStr(&buf, " WHERE ", &rc); 3790 for(ii=0; ii<pIter->nCol; ii++){ 3791 if( p->abPK[ii] || (bPatchset==0 && sessionChangesetOld(pIter, ii)) ){ 3792 sessionAppendStr(&buf, zSep, &rc); 3793 if( bStat1 && ii==1 ){ 3794 assert( sqlite3_stricmp(p->azCol[ii], "idx")==0 ); 3795 sessionAppendStr(&buf, 3796 "idx IS CASE " 3797 "WHEN length(?4)=0 AND typeof(?4)='blob' THEN NULL " 3798 "ELSE ?4 END ", &rc 3799 ); 3800 }else{ 3801 sessionAppendIdent(&buf, p->azCol[ii], &rc); 3802 sessionAppendStr(&buf, " IS ?", &rc); 3803 sessionAppendInteger(&buf, ii*2+2, &rc); 3804 } 3805 zSep = " AND "; 3806 } 3807 } 3808 3809 if( rc==SQLITE_OK ){ 3810 char *zSql = (char*)buf.aBuf; 3811 rc = sqlite3_prepare_v2(p->db, zSql, buf.nBuf, &pUp->pStmt, 0); 3812 } 3813 3814 if( rc!=SQLITE_OK ){ 3815 sqlite3_free(pUp); 3816 pUp = 0; 3817 }else{ 3818 pUp->pNext = p->pUp; 3819 p->pUp = pUp; 3820 } 3821 sqlite3_free(buf.aBuf); 3822 } 3823 } 3824 } 3825 3826 assert( (rc==SQLITE_OK)==(pUp!=0) ); 3827 if( pUp ){ 3828 *ppStmt = pUp->pStmt; 3829 
}else{ 3830 *ppStmt = 0; 3831 } 3832 return rc; 3833 } 3834 3835 /* 3836 ** Free all cached UPDATE statements. 3837 */ 3838 static void sessionUpdateFree(SessionApplyCtx *p){ 3839 SessionUpdate *pUp; 3840 SessionUpdate *pNext; 3841 for(pUp=p->pUp; pUp; pUp=pNext){ 3842 pNext = pUp->pNext; 3843 sqlite3_finalize(pUp->pStmt); 3844 sqlite3_free(pUp); 3845 } 3846 p->pUp = 0; 3847 sqlite3_free(p->aUpdateMask); 3848 p->aUpdateMask = 0; 3849 } 3850 3851 /* 3852 ** Formulate a statement to DELETE a row from database db. Assuming a table 3853 ** structure like this: 3854 ** 3855 ** CREATE TABLE x(a, b, c, d, PRIMARY KEY(a, c)); 3856 ** 3857 ** The DELETE statement looks like this: 3858 ** 3859 ** DELETE FROM x WHERE a = :1 AND c = :3 AND (:5 OR b IS :2 AND d IS :4) 3860 ** 3861 ** Variable :5 (nCol+1) is a boolean. It should be set to 0 if we require 3862 ** matching b and d values, or 1 otherwise. The second case comes up if the 3863 ** conflict handler is invoked with NOTFOUND and returns CHANGESET_REPLACE. 3864 ** 3865 ** If successful, SQLITE_OK is returned and SessionApplyCtx.pDelete is left 3866 ** pointing to the prepared version of the SQL statement. 
3867 */ 3868 static int sessionDeleteRow( 3869 sqlite3 *db, /* Database handle */ 3870 const char *zTab, /* Table name */ 3871 SessionApplyCtx *p /* Session changeset-apply context */ 3872 ){ 3873 int i; 3874 const char *zSep = ""; 3875 int rc = SQLITE_OK; 3876 SessionBuffer buf = {0, 0, 0}; 3877 int nPk = 0; 3878 3879 sessionAppendStr(&buf, "DELETE FROM main.", &rc); 3880 sessionAppendIdent(&buf, zTab, &rc); 3881 sessionAppendStr(&buf, " WHERE ", &rc); 3882 3883 for(i=0; i<p->nCol; i++){ 3884 if( p->abPK[i] ){ 3885 nPk++; 3886 sessionAppendStr(&buf, zSep, &rc); 3887 sessionAppendIdent(&buf, p->azCol[i], &rc); 3888 sessionAppendStr(&buf, " = ?", &rc); 3889 sessionAppendInteger(&buf, i+1, &rc); 3890 zSep = " AND "; 3891 } 3892 } 3893 3894 if( nPk<p->nCol ){ 3895 sessionAppendStr(&buf, " AND (?", &rc); 3896 sessionAppendInteger(&buf, p->nCol+1, &rc); 3897 sessionAppendStr(&buf, " OR ", &rc); 3898 3899 zSep = ""; 3900 for(i=0; i<p->nCol; i++){ 3901 if( !p->abPK[i] ){ 3902 sessionAppendStr(&buf, zSep, &rc); 3903 sessionAppendIdent(&buf, p->azCol[i], &rc); 3904 sessionAppendStr(&buf, " IS ?", &rc); 3905 sessionAppendInteger(&buf, i+1, &rc); 3906 zSep = "AND "; 3907 } 3908 } 3909 sessionAppendStr(&buf, ")", &rc); 3910 } 3911 3912 if( rc==SQLITE_OK ){ 3913 rc = sqlite3_prepare_v2(db, (char *)buf.aBuf, buf.nBuf, &p->pDelete, 0); 3914 } 3915 sqlite3_free(buf.aBuf); 3916 3917 return rc; 3918 } 3919 3920 /* 3921 ** Formulate and prepare an SQL statement to query table zTab by primary 3922 ** key. Assuming the following table structure: 3923 ** 3924 ** CREATE TABLE x(a, b, c, d, PRIMARY KEY(a, c)); 3925 ** 3926 ** The SELECT statement looks like this: 3927 ** 3928 ** SELECT * FROM x WHERE a = ?1 AND c = ?3 3929 ** 3930 ** If successful, SQLITE_OK is returned and SessionApplyCtx.pSelect is left 3931 ** pointing to the prepared version of the SQL statement. 
3932 */ 3933 static int sessionSelectRow( 3934 sqlite3 *db, /* Database handle */ 3935 const char *zTab, /* Table name */ 3936 SessionApplyCtx *p /* Session changeset-apply context */ 3937 ){ 3938 return sessionSelectStmt( 3939 db, "main", zTab, p->nCol, p->azCol, p->abPK, &p->pSelect); 3940 } 3941 3942 /* 3943 ** Formulate and prepare an INSERT statement to add a record to table zTab. 3944 ** For example: 3945 ** 3946 ** INSERT INTO main."zTab" VALUES(?1, ?2, ?3 ...); 3947 ** 3948 ** If successful, SQLITE_OK is returned and SessionApplyCtx.pInsert is left 3949 ** pointing to the prepared version of the SQL statement. 3950 */ 3951 static int sessionInsertRow( 3952 sqlite3 *db, /* Database handle */ 3953 const char *zTab, /* Table name */ 3954 SessionApplyCtx *p /* Session changeset-apply context */ 3955 ){ 3956 int rc = SQLITE_OK; 3957 int i; 3958 SessionBuffer buf = {0, 0, 0}; 3959 3960 sessionAppendStr(&buf, "INSERT INTO main.", &rc); 3961 sessionAppendIdent(&buf, zTab, &rc); 3962 sessionAppendStr(&buf, "(", &rc); 3963 for(i=0; i<p->nCol; i++){ 3964 if( i!=0 ) sessionAppendStr(&buf, ", ", &rc); 3965 sessionAppendIdent(&buf, p->azCol[i], &rc); 3966 } 3967 3968 sessionAppendStr(&buf, ") VALUES(?", &rc); 3969 for(i=1; i<p->nCol; i++){ 3970 sessionAppendStr(&buf, ", ?", &rc); 3971 } 3972 sessionAppendStr(&buf, ")", &rc); 3973 3974 if( rc==SQLITE_OK ){ 3975 rc = sqlite3_prepare_v2(db, (char *)buf.aBuf, buf.nBuf, &p->pInsert, 0); 3976 } 3977 sqlite3_free(buf.aBuf); 3978 return rc; 3979 } 3980 3981 static int sessionPrepare(sqlite3 *db, sqlite3_stmt **pp, const char *zSql){ 3982 return sqlite3_prepare_v2(db, zSql, -1, pp, 0); 3983 } 3984 3985 /* 3986 ** Prepare statements for applying changes to the sqlite_stat1 table. 3987 ** These are similar to those created by sessionSelectRow(), 3988 ** sessionInsertRow(), sessionUpdateRow() and sessionDeleteRow() for 3989 ** other tables. 
3990 */ 3991 static int sessionStat1Sql(sqlite3 *db, SessionApplyCtx *p){ 3992 int rc = sessionSelectRow(db, "sqlite_stat1", p); 3993 if( rc==SQLITE_OK ){ 3994 rc = sessionPrepare(db, &p->pInsert, 3995 "INSERT INTO main.sqlite_stat1 VALUES(?1, " 3996 "CASE WHEN length(?2)=0 AND typeof(?2)='blob' THEN NULL ELSE ?2 END, " 3997 "?3)" 3998 ); 3999 } 4000 if( rc==SQLITE_OK ){ 4001 rc = sessionPrepare(db, &p->pDelete, 4002 "DELETE FROM main.sqlite_stat1 WHERE tbl=?1 AND idx IS " 4003 "CASE WHEN length(?2)=0 AND typeof(?2)='blob' THEN NULL ELSE ?2 END " 4004 "AND (?4 OR stat IS ?3)" 4005 ); 4006 } 4007 return rc; 4008 } 4009 4010 /* 4011 ** A wrapper around sqlite3_bind_value() that detects an extra problem. 4012 ** See comments in the body of this function for details. 4013 */ 4014 static int sessionBindValue( 4015 sqlite3_stmt *pStmt, /* Statement to bind value to */ 4016 int i, /* Parameter number to bind to */ 4017 sqlite3_value *pVal /* Value to bind */ 4018 ){ 4019 int eType = sqlite3_value_type(pVal); 4020 /* COVERAGE: The (pVal->z==0) branch is never true using current versions 4021 ** of SQLite. If a malloc fails in an sqlite3_value_xxx() function, either 4022 ** the (pVal->z) variable remains as it was or the type of the value is 4023 ** set to SQLITE_NULL. */ 4024 if( (eType==SQLITE_TEXT || eType==SQLITE_BLOB) && pVal->z==0 ){ 4025 /* This condition occurs when an earlier OOM in a call to 4026 ** sqlite3_value_text() or sqlite3_value_blob() (perhaps from within 4027 ** a conflict-handler) has zeroed the pVal->z pointer. Return NOMEM. */ 4028 return SQLITE_NOMEM; 4029 } 4030 return sqlite3_bind_value(pStmt, i, pVal); 4031 } 4032 4033 /* 4034 ** Iterator pIter must point to an SQLITE_INSERT entry. This function 4035 ** transfers new.* values from the current iterator entry to statement 4036 ** pStmt. The table being inserted into has nCol columns. 4037 ** 4038 ** New.* value $i from the iterator is bound to variable ($i+1) of 4039 ** statement pStmt. 
If parameter abPK is NULL, all values from 0 to (nCol-1) 4040 ** are transfered to the statement. Otherwise, if abPK is not NULL, it points 4041 ** to an array nCol elements in size. In this case only those values for 4042 ** which abPK[$i] is true are read from the iterator and bound to the 4043 ** statement. 4044 ** 4045 ** An SQLite error code is returned if an error occurs. Otherwise, SQLITE_OK. 4046 */ 4047 static int sessionBindRow( 4048 sqlite3_changeset_iter *pIter, /* Iterator to read values from */ 4049 int(*xValue)(sqlite3_changeset_iter *, int, sqlite3_value **), 4050 int nCol, /* Number of columns */ 4051 u8 *abPK, /* If not NULL, bind only if true */ 4052 sqlite3_stmt *pStmt /* Bind values to this statement */ 4053 ){ 4054 int i; 4055 int rc = SQLITE_OK; 4056 4057 /* Neither sqlite3changeset_old or sqlite3changeset_new can fail if the 4058 ** argument iterator points to a suitable entry. Make sure that xValue 4059 ** is one of these to guarantee that it is safe to ignore the return 4060 ** in the code below. */ 4061 assert( xValue==sqlite3changeset_old || xValue==sqlite3changeset_new ); 4062 4063 for(i=0; rc==SQLITE_OK && i<nCol; i++){ 4064 if( !abPK || abPK[i] ){ 4065 sqlite3_value *pVal; 4066 (void)xValue(pIter, i, &pVal); 4067 if( pVal==0 ){ 4068 /* The value in the changeset was "undefined". This indicates a 4069 ** corrupt changeset blob. */ 4070 rc = SQLITE_CORRUPT_BKPT; 4071 }else{ 4072 rc = sessionBindValue(pStmt, i+1, pVal); 4073 } 4074 } 4075 } 4076 return rc; 4077 } 4078 4079 /* 4080 ** SQL statement pSelect is as generated by the sessionSelectRow() function. 4081 ** This function binds the primary key values from the change that changeset 4082 ** iterator pIter points to to the SELECT and attempts to seek to the table 4083 ** entry. If a row is found, the SELECT statement left pointing at the row 4084 ** and SQLITE_ROW is returned. 
Otherwise, if no row is found and no error 4085 ** has occured, the statement is reset and SQLITE_OK is returned. If an 4086 ** error occurs, the statement is reset and an SQLite error code is returned. 4087 ** 4088 ** If this function returns SQLITE_ROW, the caller must eventually reset() 4089 ** statement pSelect. If any other value is returned, the statement does 4090 ** not require a reset(). 4091 ** 4092 ** If the iterator currently points to an INSERT record, bind values from the 4093 ** new.* record to the SELECT statement. Or, if it points to a DELETE or 4094 ** UPDATE, bind values from the old.* record. 4095 */ 4096 static int sessionSeekToRow( 4097 sqlite3 *db, /* Database handle */ 4098 sqlite3_changeset_iter *pIter, /* Changeset iterator */ 4099 u8 *abPK, /* Primary key flags array */ 4100 sqlite3_stmt *pSelect /* SELECT statement from sessionSelectRow() */ 4101 ){ 4102 int rc; /* Return code */ 4103 int nCol; /* Number of columns in table */ 4104 int op; /* Changset operation (SQLITE_UPDATE etc.) */ 4105 const char *zDummy; /* Unused */ 4106 4107 sqlite3changeset_op(pIter, &zDummy, &nCol, &op, 0); 4108 rc = sessionBindRow(pIter, 4109 op==SQLITE_INSERT ? sqlite3changeset_new : sqlite3changeset_old, 4110 nCol, abPK, pSelect 4111 ); 4112 4113 if( rc==SQLITE_OK ){ 4114 rc = sqlite3_step(pSelect); 4115 if( rc!=SQLITE_ROW ) rc = sqlite3_reset(pSelect); 4116 } 4117 4118 return rc; 4119 } 4120 4121 /* 4122 ** This function is called from within sqlite3changeset_apply_v2() when 4123 ** a conflict is encountered and resolved using conflict resolution 4124 ** mode eType (either SQLITE_CHANGESET_OMIT or SQLITE_CHANGESET_REPLACE).. 4125 ** It adds a conflict resolution record to the buffer in 4126 ** SessionApplyCtx.rebase, which will eventually be returned to the caller 4127 ** of apply_v2() as the "rebase" buffer. 4128 ** 4129 ** Return SQLITE_OK if successful, or an SQLite error code otherwise. 
4130 */ 4131 static int sessionRebaseAdd( 4132 SessionApplyCtx *p, /* Apply context */ 4133 int eType, /* Conflict resolution (OMIT or REPLACE) */ 4134 sqlite3_changeset_iter *pIter /* Iterator pointing at current change */ 4135 ){ 4136 int rc = SQLITE_OK; 4137 if( p->bRebase ){ 4138 int i; 4139 int eOp = pIter->op; 4140 if( p->bRebaseStarted==0 ){ 4141 /* Append a table-header to the rebase buffer */ 4142 const char *zTab = pIter->zTab; 4143 sessionAppendByte(&p->rebase, 'T', &rc); 4144 sessionAppendVarint(&p->rebase, p->nCol, &rc); 4145 sessionAppendBlob(&p->rebase, p->abPK, p->nCol, &rc); 4146 sessionAppendBlob(&p->rebase, (u8*)zTab, (int)strlen(zTab)+1, &rc); 4147 p->bRebaseStarted = 1; 4148 } 4149 4150 assert( eType==SQLITE_CHANGESET_REPLACE||eType==SQLITE_CHANGESET_OMIT ); 4151 assert( eOp==SQLITE_DELETE || eOp==SQLITE_INSERT || eOp==SQLITE_UPDATE ); 4152 4153 sessionAppendByte(&p->rebase, 4154 (eOp==SQLITE_DELETE ? SQLITE_DELETE : SQLITE_INSERT), &rc 4155 ); 4156 sessionAppendByte(&p->rebase, (eType==SQLITE_CHANGESET_REPLACE), &rc); 4157 for(i=0; i<p->nCol; i++){ 4158 sqlite3_value *pVal = 0; 4159 if( eOp==SQLITE_DELETE || (eOp==SQLITE_UPDATE && p->abPK[i]) ){ 4160 sqlite3changeset_old(pIter, i, &pVal); 4161 }else{ 4162 sqlite3changeset_new(pIter, i, &pVal); 4163 } 4164 sessionAppendValue(&p->rebase, pVal, &rc); 4165 } 4166 } 4167 return rc; 4168 } 4169 4170 /* 4171 ** Invoke the conflict handler for the change that the changeset iterator 4172 ** currently points to. 4173 ** 4174 ** Argument eType must be either CHANGESET_DATA or CHANGESET_CONFLICT. 
4175 ** If argument pbReplace is NULL, then the type of conflict handler invoked 4176 ** depends solely on eType, as follows: 4177 ** 4178 ** eType value Value passed to xConflict 4179 ** ------------------------------------------------- 4180 ** CHANGESET_DATA CHANGESET_NOTFOUND 4181 ** CHANGESET_CONFLICT CHANGESET_CONSTRAINT 4182 ** 4183 ** Or, if pbReplace is not NULL, then an attempt is made to find an existing 4184 ** record with the same primary key as the record about to be deleted, updated 4185 ** or inserted. If such a record can be found, it is available to the conflict 4186 ** handler as the "conflicting" record. In this case the type of conflict 4187 ** handler invoked is as follows: 4188 ** 4189 ** eType value PK Record found? Value passed to xConflict 4190 ** ---------------------------------------------------------------- 4191 ** CHANGESET_DATA Yes CHANGESET_DATA 4192 ** CHANGESET_DATA No CHANGESET_NOTFOUND 4193 ** CHANGESET_CONFLICT Yes CHANGESET_CONFLICT 4194 ** CHANGESET_CONFLICT No CHANGESET_CONSTRAINT 4195 ** 4196 ** If pbReplace is not NULL, and a record with a matching PK is found, and 4197 ** the conflict handler function returns SQLITE_CHANGESET_REPLACE, *pbReplace 4198 ** is set to non-zero before returning SQLITE_OK. 4199 ** 4200 ** If the conflict handler returns SQLITE_CHANGESET_ABORT, SQLITE_ABORT is 4201 ** returned. Or, if the conflict handler returns an invalid value, 4202 ** SQLITE_MISUSE. If the conflict handler returns SQLITE_CHANGESET_OMIT, 4203 ** this function returns SQLITE_OK. 
4204 */ 4205 static int sessionConflictHandler( 4206 int eType, /* Either CHANGESET_DATA or CONFLICT */ 4207 SessionApplyCtx *p, /* changeset_apply() context */ 4208 sqlite3_changeset_iter *pIter, /* Changeset iterator */ 4209 int(*xConflict)(void *, int, sqlite3_changeset_iter*), 4210 void *pCtx, /* First argument for conflict handler */ 4211 int *pbReplace /* OUT: Set to true if PK row is found */ 4212 ){ 4213 int res = 0; /* Value returned by conflict handler */ 4214 int rc; 4215 int nCol; 4216 int op; 4217 const char *zDummy; 4218 4219 sqlite3changeset_op(pIter, &zDummy, &nCol, &op, 0); 4220 4221 assert( eType==SQLITE_CHANGESET_CONFLICT || eType==SQLITE_CHANGESET_DATA ); 4222 assert( SQLITE_CHANGESET_CONFLICT+1==SQLITE_CHANGESET_CONSTRAINT ); 4223 assert( SQLITE_CHANGESET_DATA+1==SQLITE_CHANGESET_NOTFOUND ); 4224 4225 /* Bind the new.* PRIMARY KEY values to the SELECT statement. */ 4226 if( pbReplace ){ 4227 rc = sessionSeekToRow(p->db, pIter, p->abPK, p->pSelect); 4228 }else{ 4229 rc = SQLITE_OK; 4230 } 4231 4232 if( rc==SQLITE_ROW ){ 4233 /* There exists another row with the new.* primary key. */ 4234 pIter->pConflict = p->pSelect; 4235 res = xConflict(pCtx, eType, pIter); 4236 pIter->pConflict = 0; 4237 rc = sqlite3_reset(p->pSelect); 4238 }else if( rc==SQLITE_OK ){ 4239 if( p->bDeferConstraints && eType==SQLITE_CHANGESET_CONFLICT ){ 4240 /* Instead of invoking the conflict handler, append the change blob 4241 ** to the SessionApplyCtx.constraints buffer. */ 4242 u8 *aBlob = &pIter->in.aData[pIter->in.iCurrent]; 4243 int nBlob = pIter->in.iNext - pIter->in.iCurrent; 4244 sessionAppendBlob(&p->constraints, aBlob, nBlob, &rc); 4245 return SQLITE_OK; 4246 }else{ 4247 /* No other row with the new.* primary key. 
*/ 4248 res = xConflict(pCtx, eType+1, pIter); 4249 if( res==SQLITE_CHANGESET_REPLACE ) rc = SQLITE_MISUSE; 4250 } 4251 } 4252 4253 if( rc==SQLITE_OK ){ 4254 switch( res ){ 4255 case SQLITE_CHANGESET_REPLACE: 4256 assert( pbReplace ); 4257 *pbReplace = 1; 4258 break; 4259 4260 case SQLITE_CHANGESET_OMIT: 4261 break; 4262 4263 case SQLITE_CHANGESET_ABORT: 4264 rc = SQLITE_ABORT; 4265 break; 4266 4267 default: 4268 rc = SQLITE_MISUSE; 4269 break; 4270 } 4271 if( rc==SQLITE_OK ){ 4272 rc = sessionRebaseAdd(p, res, pIter); 4273 } 4274 } 4275 4276 return rc; 4277 } 4278 4279 /* 4280 ** Attempt to apply the change that the iterator passed as the first argument 4281 ** currently points to to the database. If a conflict is encountered, invoke 4282 ** the conflict handler callback. 4283 ** 4284 ** If argument pbRetry is NULL, then ignore any CHANGESET_DATA conflict. If 4285 ** one is encountered, update or delete the row with the matching primary key 4286 ** instead. Or, if pbRetry is not NULL and a CHANGESET_DATA conflict occurs, 4287 ** invoke the conflict handler. If it returns CHANGESET_REPLACE, set *pbRetry 4288 ** to true before returning. In this case the caller will invoke this function 4289 ** again, this time with pbRetry set to NULL. 4290 ** 4291 ** If argument pbReplace is NULL and a CHANGESET_CONFLICT conflict is 4292 ** encountered invoke the conflict handler with CHANGESET_CONSTRAINT instead. 4293 ** Or, if pbReplace is not NULL, invoke it with CHANGESET_CONFLICT. If such 4294 ** an invocation returns SQLITE_CHANGESET_REPLACE, set *pbReplace to true 4295 ** before retrying. In this case the caller attempts to remove the conflicting 4296 ** row before invoking this function again, this time with pbReplace set 4297 ** to NULL. 4298 ** 4299 ** If any conflict handler returns SQLITE_CHANGESET_ABORT, this function 4300 ** returns SQLITE_ABORT. Otherwise, if no error occurs, SQLITE_OK is 4301 ** returned. 
4302 */ 4303 static int sessionApplyOneOp( 4304 sqlite3_changeset_iter *pIter, /* Changeset iterator */ 4305 SessionApplyCtx *p, /* changeset_apply() context */ 4306 int(*xConflict)(void *, int, sqlite3_changeset_iter *), 4307 void *pCtx, /* First argument for the conflict handler */ 4308 int *pbReplace, /* OUT: True to remove PK row and retry */ 4309 int *pbRetry /* OUT: True to retry. */ 4310 ){ 4311 const char *zDummy; 4312 int op; 4313 int nCol; 4314 int rc = SQLITE_OK; 4315 4316 assert( p->pDelete && p->pInsert && p->pSelect ); 4317 assert( p->azCol && p->abPK ); 4318 assert( !pbReplace || *pbReplace==0 ); 4319 4320 sqlite3changeset_op(pIter, &zDummy, &nCol, &op, 0); 4321 4322 if( op==SQLITE_DELETE ){ 4323 4324 /* Bind values to the DELETE statement. If conflict handling is required, 4325 ** bind values for all columns and set bound variable (nCol+1) to true. 4326 ** Or, if conflict handling is not required, bind just the PK column 4327 ** values and, if it exists, set (nCol+1) to false. Conflict handling 4328 ** is not required if: 4329 ** 4330 ** * this is a patchset, or 4331 ** * (pbRetry==0), or 4332 ** * all columns of the table are PK columns (in this case there is 4333 ** no (nCol+1) variable to bind to). 4334 */ 4335 u8 *abPK = (pIter->bPatchset ? 
p->abPK : 0); 4336 rc = sessionBindRow(pIter, sqlite3changeset_old, nCol, abPK, p->pDelete); 4337 if( rc==SQLITE_OK && sqlite3_bind_parameter_count(p->pDelete)>nCol ){ 4338 rc = sqlite3_bind_int(p->pDelete, nCol+1, (pbRetry==0 || abPK)); 4339 } 4340 if( rc!=SQLITE_OK ) return rc; 4341 4342 sqlite3_step(p->pDelete); 4343 rc = sqlite3_reset(p->pDelete); 4344 if( rc==SQLITE_OK && sqlite3_changes(p->db)==0 ){ 4345 rc = sessionConflictHandler( 4346 SQLITE_CHANGESET_DATA, p, pIter, xConflict, pCtx, pbRetry 4347 ); 4348 }else if( (rc&0xff)==SQLITE_CONSTRAINT ){ 4349 rc = sessionConflictHandler( 4350 SQLITE_CHANGESET_CONFLICT, p, pIter, xConflict, pCtx, 0 4351 ); 4352 } 4353 4354 }else if( op==SQLITE_UPDATE ){ 4355 int i; 4356 sqlite3_stmt *pUp = 0; 4357 int bPatchset = (pbRetry==0 || pIter->bPatchset); 4358 4359 rc = sessionUpdateFind(pIter, p, bPatchset, &pUp); 4360 4361 /* Bind values to the UPDATE statement. */ 4362 for(i=0; rc==SQLITE_OK && i<nCol; i++){ 4363 sqlite3_value *pOld = sessionChangesetOld(pIter, i); 4364 sqlite3_value *pNew = sessionChangesetNew(pIter, i); 4365 if( p->abPK[i] || (bPatchset==0 && pOld) ){ 4366 rc = sessionBindValue(pUp, i*2+2, pOld); 4367 } 4368 if( rc==SQLITE_OK && pNew ){ 4369 rc = sessionBindValue(pUp, i*2+1, pNew); 4370 } 4371 } 4372 if( rc!=SQLITE_OK ) return rc; 4373 4374 /* Attempt the UPDATE. In the case of a NOTFOUND or DATA conflict, 4375 ** the result will be SQLITE_OK with 0 rows modified. */ 4376 sqlite3_step(pUp); 4377 rc = sqlite3_reset(pUp); 4378 4379 if( rc==SQLITE_OK && sqlite3_changes(p->db)==0 ){ 4380 /* A NOTFOUND or DATA error. Search the table to see if it contains 4381 ** a row with a matching primary key. If so, this is a DATA conflict. 4382 ** Otherwise, if there is no primary key match, it is a NOTFOUND. */ 4383 4384 rc = sessionConflictHandler( 4385 SQLITE_CHANGESET_DATA, p, pIter, xConflict, pCtx, pbRetry 4386 ); 4387 4388 }else if( (rc&0xff)==SQLITE_CONSTRAINT ){ 4389 /* This is always a CONSTRAINT conflict. 
*/ 4390 rc = sessionConflictHandler( 4391 SQLITE_CHANGESET_CONFLICT, p, pIter, xConflict, pCtx, 0 4392 ); 4393 } 4394 4395 }else{ 4396 assert( op==SQLITE_INSERT ); 4397 if( p->bStat1 ){ 4398 /* Check if there is a conflicting row. For sqlite_stat1, this needs 4399 ** to be done using a SELECT, as there is no PRIMARY KEY in the 4400 ** database schema to throw an exception if a duplicate is inserted. */ 4401 rc = sessionSeekToRow(p->db, pIter, p->abPK, p->pSelect); 4402 if( rc==SQLITE_ROW ){ 4403 rc = SQLITE_CONSTRAINT; 4404 sqlite3_reset(p->pSelect); 4405 } 4406 } 4407 4408 if( rc==SQLITE_OK ){ 4409 rc = sessionBindRow(pIter, sqlite3changeset_new, nCol, 0, p->pInsert); 4410 if( rc!=SQLITE_OK ) return rc; 4411 4412 sqlite3_step(p->pInsert); 4413 rc = sqlite3_reset(p->pInsert); 4414 } 4415 4416 if( (rc&0xff)==SQLITE_CONSTRAINT ){ 4417 rc = sessionConflictHandler( 4418 SQLITE_CHANGESET_CONFLICT, p, pIter, xConflict, pCtx, pbReplace 4419 ); 4420 } 4421 } 4422 4423 return rc; 4424 } 4425 4426 /* 4427 ** Attempt to apply the change that the iterator passed as the first argument 4428 ** currently points to to the database. If a conflict is encountered, invoke 4429 ** the conflict handler callback. 4430 ** 4431 ** The difference between this function and sessionApplyOne() is that this 4432 ** function handles the case where the conflict-handler is invoked and 4433 ** returns SQLITE_CHANGESET_REPLACE - indicating that the change should be 4434 ** retried in some manner. 
4435 */ 4436 static int sessionApplyOneWithRetry( 4437 sqlite3 *db, /* Apply change to "main" db of this handle */ 4438 sqlite3_changeset_iter *pIter, /* Changeset iterator to read change from */ 4439 SessionApplyCtx *pApply, /* Apply context */ 4440 int(*xConflict)(void*, int, sqlite3_changeset_iter*), 4441 void *pCtx /* First argument passed to xConflict */ 4442 ){ 4443 int bReplace = 0; 4444 int bRetry = 0; 4445 int rc; 4446 4447 rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, &bReplace, &bRetry); 4448 if( rc==SQLITE_OK ){ 4449 /* If the bRetry flag is set, the change has not been applied due to an 4450 ** SQLITE_CHANGESET_DATA problem (i.e. this is an UPDATE or DELETE and 4451 ** a row with the correct PK is present in the db, but one or more other 4452 ** fields do not contain the expected values) and the conflict handler 4453 ** returned SQLITE_CHANGESET_REPLACE. In this case retry the operation, 4454 ** but pass NULL as the final argument so that sessionApplyOneOp() ignores 4455 ** the SQLITE_CHANGESET_DATA problem. */ 4456 if( bRetry ){ 4457 assert( pIter->op==SQLITE_UPDATE || pIter->op==SQLITE_DELETE ); 4458 rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0); 4459 } 4460 4461 /* If the bReplace flag is set, the change is an INSERT that has not 4462 ** been performed because the database already contains a row with the 4463 ** specified primary key and the conflict handler returned 4464 ** SQLITE_CHANGESET_REPLACE. In this case remove the conflicting row 4465 ** before reattempting the INSERT. 
*/ 4466 else if( bReplace ){ 4467 assert( pIter->op==SQLITE_INSERT ); 4468 rc = sqlite3_exec(db, "SAVEPOINT replace_op", 0, 0, 0); 4469 if( rc==SQLITE_OK ){ 4470 rc = sessionBindRow(pIter, 4471 sqlite3changeset_new, pApply->nCol, pApply->abPK, pApply->pDelete); 4472 sqlite3_bind_int(pApply->pDelete, pApply->nCol+1, 1); 4473 } 4474 if( rc==SQLITE_OK ){ 4475 sqlite3_step(pApply->pDelete); 4476 rc = sqlite3_reset(pApply->pDelete); 4477 } 4478 if( rc==SQLITE_OK ){ 4479 rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0); 4480 } 4481 if( rc==SQLITE_OK ){ 4482 rc = sqlite3_exec(db, "RELEASE replace_op", 0, 0, 0); 4483 } 4484 } 4485 } 4486 4487 return rc; 4488 } 4489 4490 /* 4491 ** Retry the changes accumulated in the pApply->constraints buffer. 4492 */ 4493 static int sessionRetryConstraints( 4494 sqlite3 *db, 4495 int bPatchset, 4496 const char *zTab, 4497 SessionApplyCtx *pApply, 4498 int(*xConflict)(void*, int, sqlite3_changeset_iter*), 4499 void *pCtx /* First argument passed to xConflict */ 4500 ){ 4501 int rc = SQLITE_OK; 4502 4503 while( pApply->constraints.nBuf ){ 4504 sqlite3_changeset_iter *pIter2 = 0; 4505 SessionBuffer cons = pApply->constraints; 4506 memset(&pApply->constraints, 0, sizeof(SessionBuffer)); 4507 4508 rc = sessionChangesetStart( 4509 &pIter2, 0, 0, cons.nBuf, cons.aBuf, pApply->bInvertConstraints, 1 4510 ); 4511 if( rc==SQLITE_OK ){ 4512 size_t nByte = 2*pApply->nCol*sizeof(sqlite3_value*); 4513 int rc2; 4514 pIter2->bPatchset = bPatchset; 4515 pIter2->zTab = (char*)zTab; 4516 pIter2->nCol = pApply->nCol; 4517 pIter2->abPK = pApply->abPK; 4518 sessionBufferGrow(&pIter2->tblhdr, nByte, &rc); 4519 pIter2->apValue = (sqlite3_value**)pIter2->tblhdr.aBuf; 4520 if( rc==SQLITE_OK ) memset(pIter2->apValue, 0, nByte); 4521 4522 while( rc==SQLITE_OK && SQLITE_ROW==sqlite3changeset_next(pIter2) ){ 4523 rc = sessionApplyOneWithRetry(db, pIter2, pApply, xConflict, pCtx); 4524 } 4525 4526 rc2 = sqlite3changeset_finalize(pIter2); 4527 if( 
rc==SQLITE_OK ) rc = rc2; 4528 } 4529 assert( pApply->bDeferConstraints || pApply->constraints.nBuf==0 ); 4530 4531 sqlite3_free(cons.aBuf); 4532 if( rc!=SQLITE_OK ) break; 4533 if( pApply->constraints.nBuf>=cons.nBuf ){ 4534 /* No progress was made on the last round. */ 4535 pApply->bDeferConstraints = 0; 4536 } 4537 } 4538 4539 return rc; 4540 } 4541 4542 /* 4543 ** Argument pIter is a changeset iterator that has been initialized, but 4544 ** not yet passed to sqlite3changeset_next(). This function applies the 4545 ** changeset to the main database attached to handle "db". The supplied 4546 ** conflict handler callback is invoked to resolve any conflicts encountered 4547 ** while applying the change. 4548 */ 4549 static int sessionChangesetApply( 4550 sqlite3 *db, /* Apply change to "main" db of this handle */ 4551 sqlite3_changeset_iter *pIter, /* Changeset to apply */ 4552 int(*xFilter)( 4553 void *pCtx, /* Copy of sixth arg to _apply() */ 4554 const char *zTab /* Table name */ 4555 ), 4556 int(*xConflict)( 4557 void *pCtx, /* Copy of fifth arg to _apply() */ 4558 int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ 4559 sqlite3_changeset_iter *p /* Handle describing change and conflict */ 4560 ), 4561 void *pCtx, /* First argument passed to xConflict */ 4562 void **ppRebase, int *pnRebase, /* OUT: Rebase information */ 4563 int flags /* SESSION_APPLY_XXX flags */ 4564 ){ 4565 int schemaMismatch = 0; 4566 int rc = SQLITE_OK; /* Return code */ 4567 const char *zTab = 0; /* Name of current table */ 4568 int nTab = 0; /* Result of sqlite3Strlen30(zTab) */ 4569 SessionApplyCtx sApply; /* changeset_apply() context object */ 4570 int bPatchset; 4571 4572 assert( xConflict!=0 ); 4573 4574 pIter->in.bNoDiscard = 1; 4575 memset(&sApply, 0, sizeof(sApply)); 4576 sApply.bRebase = (ppRebase && pnRebase); 4577 sApply.bInvertConstraints = !!(flags & SQLITE_CHANGESETAPPLY_INVERT); 4578 sqlite3_mutex_enter(sqlite3_db_mutex(db)); 4579 if( (flags & 
SQLITE_CHANGESETAPPLY_NOSAVEPOINT)==0 ){ 4580 rc = sqlite3_exec(db, "SAVEPOINT changeset_apply", 0, 0, 0); 4581 } 4582 if( rc==SQLITE_OK ){ 4583 rc = sqlite3_exec(db, "PRAGMA defer_foreign_keys = 1", 0, 0, 0); 4584 } 4585 while( rc==SQLITE_OK && SQLITE_ROW==sqlite3changeset_next(pIter) ){ 4586 int nCol; 4587 int op; 4588 const char *zNew; 4589 4590 sqlite3changeset_op(pIter, &zNew, &nCol, &op, 0); 4591 4592 if( zTab==0 || sqlite3_strnicmp(zNew, zTab, nTab+1) ){ 4593 u8 *abPK; 4594 4595 rc = sessionRetryConstraints( 4596 db, pIter->bPatchset, zTab, &sApply, xConflict, pCtx 4597 ); 4598 if( rc!=SQLITE_OK ) break; 4599 4600 sessionUpdateFree(&sApply); 4601 sqlite3_free((char*)sApply.azCol); /* cast works around VC++ bug */ 4602 sqlite3_finalize(sApply.pDelete); 4603 sqlite3_finalize(sApply.pInsert); 4604 sqlite3_finalize(sApply.pSelect); 4605 sApply.db = db; 4606 sApply.pDelete = 0; 4607 sApply.pInsert = 0; 4608 sApply.pSelect = 0; 4609 sApply.nCol = 0; 4610 sApply.azCol = 0; 4611 sApply.abPK = 0; 4612 sApply.bStat1 = 0; 4613 sApply.bDeferConstraints = 1; 4614 sApply.bRebaseStarted = 0; 4615 memset(&sApply.constraints, 0, sizeof(SessionBuffer)); 4616 4617 /* If an xFilter() callback was specified, invoke it now. If the 4618 ** xFilter callback returns zero, skip this table. If it returns 4619 ** non-zero, proceed. 
*/ 4620 schemaMismatch = (xFilter && (0==xFilter(pCtx, zNew))); 4621 if( schemaMismatch ){ 4622 zTab = sqlite3_mprintf("%s", zNew); 4623 if( zTab==0 ){ 4624 rc = SQLITE_NOMEM; 4625 break; 4626 } 4627 nTab = (int)strlen(zTab); 4628 sApply.azCol = (const char **)zTab; 4629 }else{ 4630 int nMinCol = 0; 4631 int i; 4632 4633 sqlite3changeset_pk(pIter, &abPK, 0); 4634 rc = sessionTableInfo(0, 4635 db, "main", zNew, &sApply.nCol, &zTab, &sApply.azCol, &sApply.abPK 4636 ); 4637 if( rc!=SQLITE_OK ) break; 4638 for(i=0; i<sApply.nCol; i++){ 4639 if( sApply.abPK[i] ) nMinCol = i+1; 4640 } 4641 4642 if( sApply.nCol==0 ){ 4643 schemaMismatch = 1; 4644 sqlite3_log(SQLITE_SCHEMA, 4645 "sqlite3changeset_apply(): no such table: %s", zTab 4646 ); 4647 } 4648 else if( sApply.nCol<nCol ){ 4649 schemaMismatch = 1; 4650 sqlite3_log(SQLITE_SCHEMA, 4651 "sqlite3changeset_apply(): table %s has %d columns, " 4652 "expected %d or more", 4653 zTab, sApply.nCol, nCol 4654 ); 4655 } 4656 else if( nCol<nMinCol || memcmp(sApply.abPK, abPK, nCol)!=0 ){ 4657 schemaMismatch = 1; 4658 sqlite3_log(SQLITE_SCHEMA, "sqlite3changeset_apply(): " 4659 "primary key mismatch for table %s", zTab 4660 ); 4661 } 4662 else{ 4663 sApply.nCol = nCol; 4664 if( 0==sqlite3_stricmp(zTab, "sqlite_stat1") ){ 4665 if( (rc = sessionStat1Sql(db, &sApply) ) ){ 4666 break; 4667 } 4668 sApply.bStat1 = 1; 4669 }else{ 4670 if( (rc = sessionSelectRow(db, zTab, &sApply)) 4671 || (rc = sessionDeleteRow(db, zTab, &sApply)) 4672 || (rc = sessionInsertRow(db, zTab, &sApply)) 4673 ){ 4674 break; 4675 } 4676 sApply.bStat1 = 0; 4677 } 4678 } 4679 nTab = sqlite3Strlen30(zTab); 4680 } 4681 } 4682 4683 /* If there is a schema mismatch on the current table, proceed to the 4684 ** next change. A log message has already been issued. 
*/ 4685 if( schemaMismatch ) continue; 4686 4687 rc = sessionApplyOneWithRetry(db, pIter, &sApply, xConflict, pCtx); 4688 } 4689 4690 bPatchset = pIter->bPatchset; 4691 if( rc==SQLITE_OK ){ 4692 rc = sqlite3changeset_finalize(pIter); 4693 }else{ 4694 sqlite3changeset_finalize(pIter); 4695 } 4696 4697 if( rc==SQLITE_OK ){ 4698 rc = sessionRetryConstraints(db, bPatchset, zTab, &sApply, xConflict, pCtx); 4699 } 4700 4701 if( rc==SQLITE_OK ){ 4702 int nFk, notUsed; 4703 sqlite3_db_status(db, SQLITE_DBSTATUS_DEFERRED_FKS, &nFk, ¬Used, 0); 4704 if( nFk!=0 ){ 4705 int res = SQLITE_CHANGESET_ABORT; 4706 sqlite3_changeset_iter sIter; 4707 memset(&sIter, 0, sizeof(sIter)); 4708 sIter.nCol = nFk; 4709 res = xConflict(pCtx, SQLITE_CHANGESET_FOREIGN_KEY, &sIter); 4710 if( res!=SQLITE_CHANGESET_OMIT ){ 4711 rc = SQLITE_CONSTRAINT; 4712 } 4713 } 4714 } 4715 sqlite3_exec(db, "PRAGMA defer_foreign_keys = 0", 0, 0, 0); 4716 4717 if( (flags & SQLITE_CHANGESETAPPLY_NOSAVEPOINT)==0 ){ 4718 if( rc==SQLITE_OK ){ 4719 rc = sqlite3_exec(db, "RELEASE changeset_apply", 0, 0, 0); 4720 }else{ 4721 sqlite3_exec(db, "ROLLBACK TO changeset_apply", 0, 0, 0); 4722 sqlite3_exec(db, "RELEASE changeset_apply", 0, 0, 0); 4723 } 4724 } 4725 4726 assert( sApply.bRebase || sApply.rebase.nBuf==0 ); 4727 if( rc==SQLITE_OK && bPatchset==0 && sApply.bRebase ){ 4728 *ppRebase = (void*)sApply.rebase.aBuf; 4729 *pnRebase = sApply.rebase.nBuf; 4730 sApply.rebase.aBuf = 0; 4731 } 4732 sessionUpdateFree(&sApply); 4733 sqlite3_finalize(sApply.pInsert); 4734 sqlite3_finalize(sApply.pDelete); 4735 sqlite3_finalize(sApply.pSelect); 4736 sqlite3_free((char*)sApply.azCol); /* cast works around VC++ bug */ 4737 sqlite3_free((char*)sApply.constraints.aBuf); 4738 sqlite3_free((char*)sApply.rebase.aBuf); 4739 sqlite3_mutex_leave(sqlite3_db_mutex(db)); 4740 return rc; 4741 } 4742 4743 /* 4744 ** Apply the changeset passed via pChangeset/nChangeset to the main 4745 ** database attached to handle "db". 
4746 */ 4747 int sqlite3changeset_apply_v2( 4748 sqlite3 *db, /* Apply change to "main" db of this handle */ 4749 int nChangeset, /* Size of changeset in bytes */ 4750 void *pChangeset, /* Changeset blob */ 4751 int(*xFilter)( 4752 void *pCtx, /* Copy of sixth arg to _apply() */ 4753 const char *zTab /* Table name */ 4754 ), 4755 int(*xConflict)( 4756 void *pCtx, /* Copy of sixth arg to _apply() */ 4757 int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ 4758 sqlite3_changeset_iter *p /* Handle describing change and conflict */ 4759 ), 4760 void *pCtx, /* First argument passed to xConflict */ 4761 void **ppRebase, int *pnRebase, 4762 int flags 4763 ){ 4764 sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */ 4765 int bInv = !!(flags & SQLITE_CHANGESETAPPLY_INVERT); 4766 int rc = sessionChangesetStart(&pIter, 0, 0, nChangeset, pChangeset, bInv, 1); 4767 if( rc==SQLITE_OK ){ 4768 rc = sessionChangesetApply( 4769 db, pIter, xFilter, xConflict, pCtx, ppRebase, pnRebase, flags 4770 ); 4771 } 4772 return rc; 4773 } 4774 4775 /* 4776 ** Apply the changeset passed via pChangeset/nChangeset to the main database 4777 ** attached to handle "db". Invoke the supplied conflict handler callback 4778 ** to resolve any conflicts encountered while applying the change. 
4779 */ 4780 int sqlite3changeset_apply( 4781 sqlite3 *db, /* Apply change to "main" db of this handle */ 4782 int nChangeset, /* Size of changeset in bytes */ 4783 void *pChangeset, /* Changeset blob */ 4784 int(*xFilter)( 4785 void *pCtx, /* Copy of sixth arg to _apply() */ 4786 const char *zTab /* Table name */ 4787 ), 4788 int(*xConflict)( 4789 void *pCtx, /* Copy of fifth arg to _apply() */ 4790 int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ 4791 sqlite3_changeset_iter *p /* Handle describing change and conflict */ 4792 ), 4793 void *pCtx /* First argument passed to xConflict */ 4794 ){ 4795 return sqlite3changeset_apply_v2( 4796 db, nChangeset, pChangeset, xFilter, xConflict, pCtx, 0, 0, 0 4797 ); 4798 } 4799 4800 /* 4801 ** Apply the changeset passed via xInput/pIn to the main database 4802 ** attached to handle "db". Invoke the supplied conflict handler callback 4803 ** to resolve any conflicts encountered while applying the change. 4804 */ 4805 int sqlite3changeset_apply_v2_strm( 4806 sqlite3 *db, /* Apply change to "main" db of this handle */ 4807 int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */ 4808 void *pIn, /* First arg for xInput */ 4809 int(*xFilter)( 4810 void *pCtx, /* Copy of sixth arg to _apply() */ 4811 const char *zTab /* Table name */ 4812 ), 4813 int(*xConflict)( 4814 void *pCtx, /* Copy of sixth arg to _apply() */ 4815 int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ 4816 sqlite3_changeset_iter *p /* Handle describing change and conflict */ 4817 ), 4818 void *pCtx, /* First argument passed to xConflict */ 4819 void **ppRebase, int *pnRebase, 4820 int flags 4821 ){ 4822 sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */ 4823 int bInverse = !!(flags & SQLITE_CHANGESETAPPLY_INVERT); 4824 int rc = sessionChangesetStart(&pIter, xInput, pIn, 0, 0, bInverse, 1); 4825 if( rc==SQLITE_OK ){ 4826 rc = sessionChangesetApply( 4827 db, pIter, xFilter, xConflict, pCtx, ppRebase, pnRebase, 
flags 4828 ); 4829 } 4830 return rc; 4831 } 4832 int sqlite3changeset_apply_strm( 4833 sqlite3 *db, /* Apply change to "main" db of this handle */ 4834 int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */ 4835 void *pIn, /* First arg for xInput */ 4836 int(*xFilter)( 4837 void *pCtx, /* Copy of sixth arg to _apply() */ 4838 const char *zTab /* Table name */ 4839 ), 4840 int(*xConflict)( 4841 void *pCtx, /* Copy of sixth arg to _apply() */ 4842 int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ 4843 sqlite3_changeset_iter *p /* Handle describing change and conflict */ 4844 ), 4845 void *pCtx /* First argument passed to xConflict */ 4846 ){ 4847 return sqlite3changeset_apply_v2_strm( 4848 db, xInput, pIn, xFilter, xConflict, pCtx, 0, 0, 0 4849 ); 4850 } 4851 4852 /* 4853 ** sqlite3_changegroup handle. 4854 */ 4855 struct sqlite3_changegroup { 4856 int rc; /* Error code */ 4857 int bPatch; /* True to accumulate patchsets */ 4858 SessionTable *pList; /* List of tables in current patch */ 4859 }; 4860 4861 /* 4862 ** This function is called to merge two changes to the same row together as 4863 ** part of an sqlite3changeset_concat() operation. A new change object is 4864 ** allocated and a pointer to it stored in *ppNew. 
4865 */ 4866 static int sessionChangeMerge( 4867 SessionTable *pTab, /* Table structure */ 4868 int bRebase, /* True for a rebase hash-table */ 4869 int bPatchset, /* True for patchsets */ 4870 SessionChange *pExist, /* Existing change */ 4871 int op2, /* Second change operation */ 4872 int bIndirect, /* True if second change is indirect */ 4873 u8 *aRec, /* Second change record */ 4874 int nRec, /* Number of bytes in aRec */ 4875 SessionChange **ppNew /* OUT: Merged change */ 4876 ){ 4877 SessionChange *pNew = 0; 4878 int rc = SQLITE_OK; 4879 4880 if( !pExist ){ 4881 pNew = (SessionChange *)sqlite3_malloc64(sizeof(SessionChange) + nRec); 4882 if( !pNew ){ 4883 return SQLITE_NOMEM; 4884 } 4885 memset(pNew, 0, sizeof(SessionChange)); 4886 pNew->op = op2; 4887 pNew->bIndirect = bIndirect; 4888 pNew->aRecord = (u8*)&pNew[1]; 4889 if( bIndirect==0 || bRebase==0 ){ 4890 pNew->nRecord = nRec; 4891 memcpy(pNew->aRecord, aRec, nRec); 4892 }else{ 4893 int i; 4894 u8 *pIn = aRec; 4895 u8 *pOut = pNew->aRecord; 4896 for(i=0; i<pTab->nCol; i++){ 4897 int nIn = sessionSerialLen(pIn); 4898 if( *pIn==0 ){ 4899 *pOut++ = 0; 4900 }else if( pTab->abPK[i]==0 ){ 4901 *pOut++ = 0xFF; 4902 }else{ 4903 memcpy(pOut, pIn, nIn); 4904 pOut += nIn; 4905 } 4906 pIn += nIn; 4907 } 4908 pNew->nRecord = pOut - pNew->aRecord; 4909 } 4910 }else if( bRebase ){ 4911 if( pExist->op==SQLITE_DELETE && pExist->bIndirect ){ 4912 *ppNew = pExist; 4913 }else{ 4914 sqlite3_int64 nByte = nRec + pExist->nRecord + sizeof(SessionChange); 4915 pNew = (SessionChange*)sqlite3_malloc64(nByte); 4916 if( pNew==0 ){ 4917 rc = SQLITE_NOMEM; 4918 }else{ 4919 int i; 4920 u8 *a1 = pExist->aRecord; 4921 u8 *a2 = aRec; 4922 u8 *pOut; 4923 4924 memset(pNew, 0, nByte); 4925 pNew->bIndirect = bIndirect || pExist->bIndirect; 4926 pNew->op = op2; 4927 pOut = pNew->aRecord = (u8*)&pNew[1]; 4928 4929 for(i=0; i<pTab->nCol; i++){ 4930 int n1 = sessionSerialLen(a1); 4931 int n2 = sessionSerialLen(a2); 4932 if( *a1==0xFF || 
(pTab->abPK[i]==0 && bIndirect) ){ 4933 *pOut++ = 0xFF; 4934 }else if( *a2==0 ){ 4935 memcpy(pOut, a1, n1); 4936 pOut += n1; 4937 }else{ 4938 memcpy(pOut, a2, n2); 4939 pOut += n2; 4940 } 4941 a1 += n1; 4942 a2 += n2; 4943 } 4944 pNew->nRecord = pOut - pNew->aRecord; 4945 } 4946 sqlite3_free(pExist); 4947 } 4948 }else{ 4949 int op1 = pExist->op; 4950 4951 /* 4952 ** op1=INSERT, op2=INSERT -> Unsupported. Discard op2. 4953 ** op1=INSERT, op2=UPDATE -> INSERT. 4954 ** op1=INSERT, op2=DELETE -> (none) 4955 ** 4956 ** op1=UPDATE, op2=INSERT -> Unsupported. Discard op2. 4957 ** op1=UPDATE, op2=UPDATE -> UPDATE. 4958 ** op1=UPDATE, op2=DELETE -> DELETE. 4959 ** 4960 ** op1=DELETE, op2=INSERT -> UPDATE. 4961 ** op1=DELETE, op2=UPDATE -> Unsupported. Discard op2. 4962 ** op1=DELETE, op2=DELETE -> Unsupported. Discard op2. 4963 */ 4964 if( (op1==SQLITE_INSERT && op2==SQLITE_INSERT) 4965 || (op1==SQLITE_UPDATE && op2==SQLITE_INSERT) 4966 || (op1==SQLITE_DELETE && op2==SQLITE_UPDATE) 4967 || (op1==SQLITE_DELETE && op2==SQLITE_DELETE) 4968 ){ 4969 pNew = pExist; 4970 }else if( op1==SQLITE_INSERT && op2==SQLITE_DELETE ){ 4971 sqlite3_free(pExist); 4972 assert( pNew==0 ); 4973 }else{ 4974 u8 *aExist = pExist->aRecord; 4975 sqlite3_int64 nByte; 4976 u8 *aCsr; 4977 4978 /* Allocate a new SessionChange object. Ensure that the aRecord[] 4979 ** buffer of the new object is large enough to hold any record that 4980 ** may be generated by combining the input records. 
*/ 4981 nByte = sizeof(SessionChange) + pExist->nRecord + nRec; 4982 pNew = (SessionChange *)sqlite3_malloc64(nByte); 4983 if( !pNew ){ 4984 sqlite3_free(pExist); 4985 return SQLITE_NOMEM; 4986 } 4987 memset(pNew, 0, sizeof(SessionChange)); 4988 pNew->bIndirect = (bIndirect && pExist->bIndirect); 4989 aCsr = pNew->aRecord = (u8 *)&pNew[1]; 4990 4991 if( op1==SQLITE_INSERT ){ /* INSERT + UPDATE */ 4992 u8 *a1 = aRec; 4993 assert( op2==SQLITE_UPDATE ); 4994 pNew->op = SQLITE_INSERT; 4995 if( bPatchset==0 ) sessionSkipRecord(&a1, pTab->nCol); 4996 sessionMergeRecord(&aCsr, pTab->nCol, aExist, a1); 4997 }else if( op1==SQLITE_DELETE ){ /* DELETE + INSERT */ 4998 assert( op2==SQLITE_INSERT ); 4999 pNew->op = SQLITE_UPDATE; 5000 if( bPatchset ){ 5001 memcpy(aCsr, aRec, nRec); 5002 aCsr += nRec; 5003 }else{ 5004 if( 0==sessionMergeUpdate(&aCsr, pTab, bPatchset, aExist, 0,aRec,0) ){ 5005 sqlite3_free(pNew); 5006 pNew = 0; 5007 } 5008 } 5009 }else if( op2==SQLITE_UPDATE ){ /* UPDATE + UPDATE */ 5010 u8 *a1 = aExist; 5011 u8 *a2 = aRec; 5012 assert( op1==SQLITE_UPDATE ); 5013 if( bPatchset==0 ){ 5014 sessionSkipRecord(&a1, pTab->nCol); 5015 sessionSkipRecord(&a2, pTab->nCol); 5016 } 5017 pNew->op = SQLITE_UPDATE; 5018 if( 0==sessionMergeUpdate(&aCsr, pTab, bPatchset, aRec, aExist,a1,a2) ){ 5019 sqlite3_free(pNew); 5020 pNew = 0; 5021 } 5022 }else{ /* UPDATE + DELETE */ 5023 assert( op1==SQLITE_UPDATE && op2==SQLITE_DELETE ); 5024 pNew->op = SQLITE_DELETE; 5025 if( bPatchset ){ 5026 memcpy(aCsr, aRec, nRec); 5027 aCsr += nRec; 5028 }else{ 5029 sessionMergeRecord(&aCsr, pTab->nCol, aRec, aExist); 5030 } 5031 } 5032 5033 if( pNew ){ 5034 pNew->nRecord = (int)(aCsr - pNew->aRecord); 5035 } 5036 sqlite3_free(pExist); 5037 } 5038 } 5039 5040 *ppNew = pNew; 5041 return rc; 5042 } 5043 5044 /* 5045 ** Add all changes in the changeset traversed by the iterator passed as 5046 ** the first argument to the changegroup hash tables. 
5047 */ 5048 static int sessionChangesetToHash( 5049 sqlite3_changeset_iter *pIter, /* Iterator to read from */ 5050 sqlite3_changegroup *pGrp, /* Changegroup object to add changeset to */ 5051 int bRebase /* True if hash table is for rebasing */ 5052 ){ 5053 u8 *aRec; 5054 int nRec; 5055 int rc = SQLITE_OK; 5056 SessionTable *pTab = 0; 5057 5058 while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec, 0) ){ 5059 const char *zNew; 5060 int nCol; 5061 int op; 5062 int iHash; 5063 int bIndirect; 5064 SessionChange *pChange; 5065 SessionChange *pExist = 0; 5066 SessionChange **pp; 5067 5068 if( pGrp->pList==0 ){ 5069 pGrp->bPatch = pIter->bPatchset; 5070 }else if( pIter->bPatchset!=pGrp->bPatch ){ 5071 rc = SQLITE_ERROR; 5072 break; 5073 } 5074 5075 sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect); 5076 if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){ 5077 /* Search the list for a matching table */ 5078 int nNew = (int)strlen(zNew); 5079 u8 *abPK; 5080 5081 sqlite3changeset_pk(pIter, &abPK, 0); 5082 for(pTab = pGrp->pList; pTab; pTab=pTab->pNext){ 5083 if( 0==sqlite3_strnicmp(pTab->zName, zNew, nNew+1) ) break; 5084 } 5085 if( !pTab ){ 5086 SessionTable **ppTab; 5087 5088 pTab = sqlite3_malloc64(sizeof(SessionTable) + nCol + nNew+1); 5089 if( !pTab ){ 5090 rc = SQLITE_NOMEM; 5091 break; 5092 } 5093 memset(pTab, 0, sizeof(SessionTable)); 5094 pTab->nCol = nCol; 5095 pTab->abPK = (u8*)&pTab[1]; 5096 memcpy(pTab->abPK, abPK, nCol); 5097 pTab->zName = (char*)&pTab->abPK[nCol]; 5098 memcpy(pTab->zName, zNew, nNew+1); 5099 5100 /* The new object must be linked on to the end of the list, not 5101 ** simply added to the start of it. This is to ensure that the 5102 ** tables within the output of sqlite3changegroup_output() are in 5103 ** the right order. 
*/ 5104 for(ppTab=&pGrp->pList; *ppTab; ppTab=&(*ppTab)->pNext); 5105 *ppTab = pTab; 5106 }else if( pTab->nCol!=nCol || memcmp(pTab->abPK, abPK, nCol) ){ 5107 rc = SQLITE_SCHEMA; 5108 break; 5109 } 5110 } 5111 5112 if( sessionGrowHash(0, pIter->bPatchset, pTab) ){ 5113 rc = SQLITE_NOMEM; 5114 break; 5115 } 5116 iHash = sessionChangeHash( 5117 pTab, (pIter->bPatchset && op==SQLITE_DELETE), aRec, pTab->nChange 5118 ); 5119 5120 /* Search for existing entry. If found, remove it from the hash table. 5121 ** Code below may link it back in. 5122 */ 5123 for(pp=&pTab->apChange[iHash]; *pp; pp=&(*pp)->pNext){ 5124 int bPkOnly1 = 0; 5125 int bPkOnly2 = 0; 5126 if( pIter->bPatchset ){ 5127 bPkOnly1 = (*pp)->op==SQLITE_DELETE; 5128 bPkOnly2 = op==SQLITE_DELETE; 5129 } 5130 if( sessionChangeEqual(pTab, bPkOnly1, (*pp)->aRecord, bPkOnly2, aRec) ){ 5131 pExist = *pp; 5132 *pp = (*pp)->pNext; 5133 pTab->nEntry--; 5134 break; 5135 } 5136 } 5137 5138 rc = sessionChangeMerge(pTab, bRebase, 5139 pIter->bPatchset, pExist, op, bIndirect, aRec, nRec, &pChange 5140 ); 5141 if( rc ) break; 5142 if( pChange ){ 5143 pChange->pNext = pTab->apChange[iHash]; 5144 pTab->apChange[iHash] = pChange; 5145 pTab->nEntry++; 5146 } 5147 } 5148 5149 if( rc==SQLITE_OK ) rc = pIter->rc; 5150 return rc; 5151 } 5152 5153 /* 5154 ** Serialize a changeset (or patchset) based on all changesets (or patchsets) 5155 ** added to the changegroup object passed as the first argument. 5156 ** 5157 ** If xOutput is not NULL, then the changeset/patchset is returned to the 5158 ** user via one or more calls to xOutput, as with the other streaming 5159 ** interfaces. 5160 ** 5161 ** Or, if xOutput is NULL, then (*ppOut) is populated with a pointer to a 5162 ** buffer containing the output changeset before this function returns. In 5163 ** this case (*pnOut) is set to the size of the output buffer in bytes. 
It 5164 ** is the responsibility of the caller to free the output buffer using 5165 ** sqlite3_free() when it is no longer required. 5166 ** 5167 ** If successful, SQLITE_OK is returned. Or, if an error occurs, an SQLite 5168 ** error code. If an error occurs and xOutput is NULL, (*ppOut) and (*pnOut) 5169 ** are both set to 0 before returning. 5170 */ 5171 static int sessionChangegroupOutput( 5172 sqlite3_changegroup *pGrp, 5173 int (*xOutput)(void *pOut, const void *pData, int nData), 5174 void *pOut, 5175 int *pnOut, 5176 void **ppOut 5177 ){ 5178 int rc = SQLITE_OK; 5179 SessionBuffer buf = {0, 0, 0}; 5180 SessionTable *pTab; 5181 assert( xOutput==0 || (ppOut==0 && pnOut==0) ); 5182 5183 /* Create the serialized output changeset based on the contents of the 5184 ** hash tables attached to the SessionTable objects in list p->pList. 5185 */ 5186 for(pTab=pGrp->pList; rc==SQLITE_OK && pTab; pTab=pTab->pNext){ 5187 int i; 5188 if( pTab->nEntry==0 ) continue; 5189 5190 sessionAppendTableHdr(&buf, pGrp->bPatch, pTab, &rc); 5191 for(i=0; i<pTab->nChange; i++){ 5192 SessionChange *p; 5193 for(p=pTab->apChange[i]; p; p=p->pNext){ 5194 sessionAppendByte(&buf, p->op, &rc); 5195 sessionAppendByte(&buf, p->bIndirect, &rc); 5196 sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc); 5197 if( rc==SQLITE_OK && xOutput && buf.nBuf>=sessions_strm_chunk_size ){ 5198 rc = xOutput(pOut, buf.aBuf, buf.nBuf); 5199 buf.nBuf = 0; 5200 } 5201 } 5202 } 5203 } 5204 5205 if( rc==SQLITE_OK ){ 5206 if( xOutput ){ 5207 if( buf.nBuf>0 ) rc = xOutput(pOut, buf.aBuf, buf.nBuf); 5208 }else{ 5209 *ppOut = buf.aBuf; 5210 *pnOut = buf.nBuf; 5211 buf.aBuf = 0; 5212 } 5213 } 5214 sqlite3_free(buf.aBuf); 5215 5216 return rc; 5217 } 5218 5219 /* 5220 ** Allocate a new, empty, sqlite3_changegroup. 
5221 */ 5222 int sqlite3changegroup_new(sqlite3_changegroup **pp){ 5223 int rc = SQLITE_OK; /* Return code */ 5224 sqlite3_changegroup *p; /* New object */ 5225 p = (sqlite3_changegroup*)sqlite3_malloc(sizeof(sqlite3_changegroup)); 5226 if( p==0 ){ 5227 rc = SQLITE_NOMEM; 5228 }else{ 5229 memset(p, 0, sizeof(sqlite3_changegroup)); 5230 } 5231 *pp = p; 5232 return rc; 5233 } 5234 5235 /* 5236 ** Add the changeset currently stored in buffer pData, size nData bytes, 5237 ** to changeset-group p. 5238 */ 5239 int sqlite3changegroup_add(sqlite3_changegroup *pGrp, int nData, void *pData){ 5240 sqlite3_changeset_iter *pIter; /* Iterator opened on pData/nData */ 5241 int rc; /* Return code */ 5242 5243 rc = sqlite3changeset_start(&pIter, nData, pData); 5244 if( rc==SQLITE_OK ){ 5245 rc = sessionChangesetToHash(pIter, pGrp, 0); 5246 } 5247 sqlite3changeset_finalize(pIter); 5248 return rc; 5249 } 5250 5251 /* 5252 ** Obtain a buffer containing a changeset representing the concatenation 5253 ** of all changesets added to the group so far. 5254 */ 5255 int sqlite3changegroup_output( 5256 sqlite3_changegroup *pGrp, 5257 int *pnData, 5258 void **ppData 5259 ){ 5260 return sessionChangegroupOutput(pGrp, 0, 0, pnData, ppData); 5261 } 5262 5263 /* 5264 ** Streaming versions of changegroup_add(). 5265 */ 5266 int sqlite3changegroup_add_strm( 5267 sqlite3_changegroup *pGrp, 5268 int (*xInput)(void *pIn, void *pData, int *pnData), 5269 void *pIn 5270 ){ 5271 sqlite3_changeset_iter *pIter; /* Iterator opened on pData/nData */ 5272 int rc; /* Return code */ 5273 5274 rc = sqlite3changeset_start_strm(&pIter, xInput, pIn); 5275 if( rc==SQLITE_OK ){ 5276 rc = sessionChangesetToHash(pIter, pGrp, 0); 5277 } 5278 sqlite3changeset_finalize(pIter); 5279 return rc; 5280 } 5281 5282 /* 5283 ** Streaming versions of changegroup_output(). 
5284 */ 5285 int sqlite3changegroup_output_strm( 5286 sqlite3_changegroup *pGrp, 5287 int (*xOutput)(void *pOut, const void *pData, int nData), 5288 void *pOut 5289 ){ 5290 return sessionChangegroupOutput(pGrp, xOutput, pOut, 0, 0); 5291 } 5292 5293 /* 5294 ** Delete a changegroup object. 5295 */ 5296 void sqlite3changegroup_delete(sqlite3_changegroup *pGrp){ 5297 if( pGrp ){ 5298 sessionDeleteTable(0, pGrp->pList); 5299 sqlite3_free(pGrp); 5300 } 5301 } 5302 5303 /* 5304 ** Combine two changesets together. 5305 */ 5306 int sqlite3changeset_concat( 5307 int nLeft, /* Number of bytes in lhs input */ 5308 void *pLeft, /* Lhs input changeset */ 5309 int nRight /* Number of bytes in rhs input */, 5310 void *pRight, /* Rhs input changeset */ 5311 int *pnOut, /* OUT: Number of bytes in output changeset */ 5312 void **ppOut /* OUT: changeset (left <concat> right) */ 5313 ){ 5314 sqlite3_changegroup *pGrp; 5315 int rc; 5316 5317 rc = sqlite3changegroup_new(&pGrp); 5318 if( rc==SQLITE_OK ){ 5319 rc = sqlite3changegroup_add(pGrp, nLeft, pLeft); 5320 } 5321 if( rc==SQLITE_OK ){ 5322 rc = sqlite3changegroup_add(pGrp, nRight, pRight); 5323 } 5324 if( rc==SQLITE_OK ){ 5325 rc = sqlite3changegroup_output(pGrp, pnOut, ppOut); 5326 } 5327 sqlite3changegroup_delete(pGrp); 5328 5329 return rc; 5330 } 5331 5332 /* 5333 ** Streaming version of sqlite3changeset_concat(). 
5334 */ 5335 int sqlite3changeset_concat_strm( 5336 int (*xInputA)(void *pIn, void *pData, int *pnData), 5337 void *pInA, 5338 int (*xInputB)(void *pIn, void *pData, int *pnData), 5339 void *pInB, 5340 int (*xOutput)(void *pOut, const void *pData, int nData), 5341 void *pOut 5342 ){ 5343 sqlite3_changegroup *pGrp; 5344 int rc; 5345 5346 rc = sqlite3changegroup_new(&pGrp); 5347 if( rc==SQLITE_OK ){ 5348 rc = sqlite3changegroup_add_strm(pGrp, xInputA, pInA); 5349 } 5350 if( rc==SQLITE_OK ){ 5351 rc = sqlite3changegroup_add_strm(pGrp, xInputB, pInB); 5352 } 5353 if( rc==SQLITE_OK ){ 5354 rc = sqlite3changegroup_output_strm(pGrp, xOutput, pOut); 5355 } 5356 sqlite3changegroup_delete(pGrp); 5357 5358 return rc; 5359 } 5360 5361 /* 5362 ** Changeset rebaser handle. 5363 */ 5364 struct sqlite3_rebaser { 5365 sqlite3_changegroup grp; /* Hash table */ 5366 }; 5367 5368 /* 5369 ** Buffers a1 and a2 must both contain a sessions module record nCol 5370 ** fields in size. This function appends an nCol sessions module 5371 ** record to buffer pBuf that is a copy of a1, except that for 5372 ** each field that is undefined in a1[], swap in the field from a2[]. 
5373 */ 5374 static void sessionAppendRecordMerge( 5375 SessionBuffer *pBuf, /* Buffer to append to */ 5376 int nCol, /* Number of columns in each record */ 5377 u8 *a1, int n1, /* Record 1 */ 5378 u8 *a2, int n2, /* Record 2 */ 5379 int *pRc /* IN/OUT: error code */ 5380 ){ 5381 sessionBufferGrow(pBuf, n1+n2, pRc); 5382 if( *pRc==SQLITE_OK ){ 5383 int i; 5384 u8 *pOut = &pBuf->aBuf[pBuf->nBuf]; 5385 for(i=0; i<nCol; i++){ 5386 int nn1 = sessionSerialLen(a1); 5387 int nn2 = sessionSerialLen(a2); 5388 if( *a1==0 || *a1==0xFF ){ 5389 memcpy(pOut, a2, nn2); 5390 pOut += nn2; 5391 }else{ 5392 memcpy(pOut, a1, nn1); 5393 pOut += nn1; 5394 } 5395 a1 += nn1; 5396 a2 += nn2; 5397 } 5398 5399 pBuf->nBuf = pOut-pBuf->aBuf; 5400 assert( pBuf->nBuf<=pBuf->nAlloc ); 5401 } 5402 } 5403 5404 /* 5405 ** This function is called when rebasing a local UPDATE change against one 5406 ** or more remote UPDATE changes. The aRec/nRec buffer contains the current 5407 ** old.* and new.* records for the change. The rebase buffer (a single 5408 ** record) is in aChange/nChange. The rebased change is appended to buffer 5409 ** pBuf. 5410 ** 5411 ** Rebasing the UPDATE involves: 5412 ** 5413 ** * Removing any changes to fields for which the corresponding field 5414 ** in the rebase buffer is set to "replaced" (type 0xFF). If this 5415 ** means the UPDATE change updates no fields, nothing is appended 5416 ** to the output buffer. 5417 ** 5418 ** * For each field modified by the local change for which the 5419 ** corresponding field in the rebase buffer is not "undefined" (0x00) 5420 ** or "replaced" (0xFF), the old.* value is replaced by the value 5421 ** in the rebase buffer. 
5422 */ 5423 static void sessionAppendPartialUpdate( 5424 SessionBuffer *pBuf, /* Append record here */ 5425 sqlite3_changeset_iter *pIter, /* Iterator pointed at local change */ 5426 u8 *aRec, int nRec, /* Local change */ 5427 u8 *aChange, int nChange, /* Record to rebase against */ 5428 int *pRc /* IN/OUT: Return Code */ 5429 ){ 5430 sessionBufferGrow(pBuf, 2+nRec+nChange, pRc); 5431 if( *pRc==SQLITE_OK ){ 5432 int bData = 0; 5433 u8 *pOut = &pBuf->aBuf[pBuf->nBuf]; 5434 int i; 5435 u8 *a1 = aRec; 5436 u8 *a2 = aChange; 5437 5438 *pOut++ = SQLITE_UPDATE; 5439 *pOut++ = pIter->bIndirect; 5440 for(i=0; i<pIter->nCol; i++){ 5441 int n1 = sessionSerialLen(a1); 5442 int n2 = sessionSerialLen(a2); 5443 if( pIter->abPK[i] || a2[0]==0 ){ 5444 if( !pIter->abPK[i] && a1[0] ) bData = 1; 5445 memcpy(pOut, a1, n1); 5446 pOut += n1; 5447 }else if( a2[0]!=0xFF ){ 5448 bData = 1; 5449 memcpy(pOut, a2, n2); 5450 pOut += n2; 5451 }else{ 5452 *pOut++ = '\0'; 5453 } 5454 a1 += n1; 5455 a2 += n2; 5456 } 5457 if( bData ){ 5458 a2 = aChange; 5459 for(i=0; i<pIter->nCol; i++){ 5460 int n1 = sessionSerialLen(a1); 5461 int n2 = sessionSerialLen(a2); 5462 if( pIter->abPK[i] || a2[0]!=0xFF ){ 5463 memcpy(pOut, a1, n1); 5464 pOut += n1; 5465 }else{ 5466 *pOut++ = '\0'; 5467 } 5468 a1 += n1; 5469 a2 += n2; 5470 } 5471 pBuf->nBuf = (pOut - pBuf->aBuf); 5472 } 5473 } 5474 } 5475 5476 /* 5477 ** pIter is configured to iterate through a changeset. This function rebases 5478 ** that changeset according to the current configuration of the rebaser 5479 ** object passed as the first argument. If no error occurs and argument xOutput 5480 ** is not NULL, then the changeset is returned to the caller by invoking 5481 ** xOutput zero or more times and SQLITE_OK returned. Or, if xOutput is NULL, 5482 ** then (*ppOut) is set to point to a buffer containing the rebased changeset 5483 ** before this function returns. In this case (*pnOut) is set to the size of 5484 ** the buffer in bytes. 
It is the responsibility of the caller to eventually 5485 ** free the (*ppOut) buffer using sqlite3_free(). 5486 ** 5487 ** If an error occurs, an SQLite error code is returned. If ppOut and 5488 ** pnOut are not NULL, then the two output parameters are set to 0 before 5489 ** returning. 5490 */ 5491 static int sessionRebase( 5492 sqlite3_rebaser *p, /* Rebaser hash table */ 5493 sqlite3_changeset_iter *pIter, /* Input data */ 5494 int (*xOutput)(void *pOut, const void *pData, int nData), 5495 void *pOut, /* Context for xOutput callback */ 5496 int *pnOut, /* OUT: Number of bytes in output changeset */ 5497 void **ppOut /* OUT: Inverse of pChangeset */ 5498 ){ 5499 int rc = SQLITE_OK; 5500 u8 *aRec = 0; 5501 int nRec = 0; 5502 int bNew = 0; 5503 SessionTable *pTab = 0; 5504 SessionBuffer sOut = {0,0,0}; 5505 5506 while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec, &bNew) ){ 5507 SessionChange *pChange = 0; 5508 int bDone = 0; 5509 5510 if( bNew ){ 5511 const char *zTab = pIter->zTab; 5512 for(pTab=p->grp.pList; pTab; pTab=pTab->pNext){ 5513 if( 0==sqlite3_stricmp(pTab->zName, zTab) ) break; 5514 } 5515 bNew = 0; 5516 5517 /* A patchset may not be rebased */ 5518 if( pIter->bPatchset ){ 5519 rc = SQLITE_ERROR; 5520 } 5521 5522 /* Append a table header to the output for this new table */ 5523 sessionAppendByte(&sOut, pIter->bPatchset ? 
'P' : 'T', &rc); 5524 sessionAppendVarint(&sOut, pIter->nCol, &rc); 5525 sessionAppendBlob(&sOut, pIter->abPK, pIter->nCol, &rc); 5526 sessionAppendBlob(&sOut,(u8*)pIter->zTab,(int)strlen(pIter->zTab)+1,&rc); 5527 } 5528 5529 if( pTab && rc==SQLITE_OK ){ 5530 int iHash = sessionChangeHash(pTab, 0, aRec, pTab->nChange); 5531 5532 for(pChange=pTab->apChange[iHash]; pChange; pChange=pChange->pNext){ 5533 if( sessionChangeEqual(pTab, 0, aRec, 0, pChange->aRecord) ){ 5534 break; 5535 } 5536 } 5537 } 5538 5539 if( pChange ){ 5540 assert( pChange->op==SQLITE_DELETE || pChange->op==SQLITE_INSERT ); 5541 switch( pIter->op ){ 5542 case SQLITE_INSERT: 5543 if( pChange->op==SQLITE_INSERT ){ 5544 bDone = 1; 5545 if( pChange->bIndirect==0 ){ 5546 sessionAppendByte(&sOut, SQLITE_UPDATE, &rc); 5547 sessionAppendByte(&sOut, pIter->bIndirect, &rc); 5548 sessionAppendBlob(&sOut, pChange->aRecord, pChange->nRecord, &rc); 5549 sessionAppendBlob(&sOut, aRec, nRec, &rc); 5550 } 5551 } 5552 break; 5553 5554 case SQLITE_UPDATE: 5555 bDone = 1; 5556 if( pChange->op==SQLITE_DELETE ){ 5557 if( pChange->bIndirect==0 ){ 5558 u8 *pCsr = aRec; 5559 sessionSkipRecord(&pCsr, pIter->nCol); 5560 sessionAppendByte(&sOut, SQLITE_INSERT, &rc); 5561 sessionAppendByte(&sOut, pIter->bIndirect, &rc); 5562 sessionAppendRecordMerge(&sOut, pIter->nCol, 5563 pCsr, nRec-(pCsr-aRec), 5564 pChange->aRecord, pChange->nRecord, &rc 5565 ); 5566 } 5567 }else{ 5568 sessionAppendPartialUpdate(&sOut, pIter, 5569 aRec, nRec, pChange->aRecord, pChange->nRecord, &rc 5570 ); 5571 } 5572 break; 5573 5574 default: 5575 assert( pIter->op==SQLITE_DELETE ); 5576 bDone = 1; 5577 if( pChange->op==SQLITE_INSERT ){ 5578 sessionAppendByte(&sOut, SQLITE_DELETE, &rc); 5579 sessionAppendByte(&sOut, pIter->bIndirect, &rc); 5580 sessionAppendRecordMerge(&sOut, pIter->nCol, 5581 pChange->aRecord, pChange->nRecord, aRec, nRec, &rc 5582 ); 5583 } 5584 break; 5585 } 5586 } 5587 5588 if( bDone==0 ){ 5589 sessionAppendByte(&sOut, pIter->op, 
&rc); 5590 sessionAppendByte(&sOut, pIter->bIndirect, &rc); 5591 sessionAppendBlob(&sOut, aRec, nRec, &rc); 5592 } 5593 if( rc==SQLITE_OK && xOutput && sOut.nBuf>sessions_strm_chunk_size ){ 5594 rc = xOutput(pOut, sOut.aBuf, sOut.nBuf); 5595 sOut.nBuf = 0; 5596 } 5597 if( rc ) break; 5598 } 5599 5600 if( rc!=SQLITE_OK ){ 5601 sqlite3_free(sOut.aBuf); 5602 memset(&sOut, 0, sizeof(sOut)); 5603 } 5604 5605 if( rc==SQLITE_OK ){ 5606 if( xOutput ){ 5607 if( sOut.nBuf>0 ){ 5608 rc = xOutput(pOut, sOut.aBuf, sOut.nBuf); 5609 } 5610 }else{ 5611 *ppOut = (void*)sOut.aBuf; 5612 *pnOut = sOut.nBuf; 5613 sOut.aBuf = 0; 5614 } 5615 } 5616 sqlite3_free(sOut.aBuf); 5617 return rc; 5618 } 5619 5620 /* 5621 ** Create a new rebaser object. 5622 */ 5623 int sqlite3rebaser_create(sqlite3_rebaser **ppNew){ 5624 int rc = SQLITE_OK; 5625 sqlite3_rebaser *pNew; 5626 5627 pNew = sqlite3_malloc(sizeof(sqlite3_rebaser)); 5628 if( pNew==0 ){ 5629 rc = SQLITE_NOMEM; 5630 }else{ 5631 memset(pNew, 0, sizeof(sqlite3_rebaser)); 5632 } 5633 *ppNew = pNew; 5634 return rc; 5635 } 5636 5637 /* 5638 ** Call this one or more times to configure a rebaser. 
5639 */ 5640 int sqlite3rebaser_configure( 5641 sqlite3_rebaser *p, 5642 int nRebase, const void *pRebase 5643 ){ 5644 sqlite3_changeset_iter *pIter = 0; /* Iterator opened on pData/nData */ 5645 int rc; /* Return code */ 5646 rc = sqlite3changeset_start(&pIter, nRebase, (void*)pRebase); 5647 if( rc==SQLITE_OK ){ 5648 rc = sessionChangesetToHash(pIter, &p->grp, 1); 5649 } 5650 sqlite3changeset_finalize(pIter); 5651 return rc; 5652 } 5653 5654 /* 5655 ** Rebase a changeset according to current rebaser configuration 5656 */ 5657 int sqlite3rebaser_rebase( 5658 sqlite3_rebaser *p, 5659 int nIn, const void *pIn, 5660 int *pnOut, void **ppOut 5661 ){ 5662 sqlite3_changeset_iter *pIter = 0; /* Iterator to skip through input */ 5663 int rc = sqlite3changeset_start(&pIter, nIn, (void*)pIn); 5664 5665 if( rc==SQLITE_OK ){ 5666 rc = sessionRebase(p, pIter, 0, 0, pnOut, ppOut); 5667 sqlite3changeset_finalize(pIter); 5668 } 5669 5670 return rc; 5671 } 5672 5673 /* 5674 ** Rebase a changeset according to current rebaser configuration 5675 */ 5676 int sqlite3rebaser_rebase_strm( 5677 sqlite3_rebaser *p, 5678 int (*xInput)(void *pIn, void *pData, int *pnData), 5679 void *pIn, 5680 int (*xOutput)(void *pOut, const void *pData, int nData), 5681 void *pOut 5682 ){ 5683 sqlite3_changeset_iter *pIter = 0; /* Iterator to skip through input */ 5684 int rc = sqlite3changeset_start_strm(&pIter, xInput, pIn); 5685 5686 if( rc==SQLITE_OK ){ 5687 rc = sessionRebase(p, pIter, xOutput, pOut, 0, 0); 5688 sqlite3changeset_finalize(pIter); 5689 } 5690 5691 return rc; 5692 } 5693 5694 /* 5695 ** Destroy a rebaser object 5696 */ 5697 void sqlite3rebaser_delete(sqlite3_rebaser *p){ 5698 if( p ){ 5699 sessionDeleteTable(0, p->grp.pList); 5700 sqlite3_free(p); 5701 } 5702 } 5703 5704 /* 5705 ** Global configuration 5706 */ 5707 int sqlite3session_config(int op, void *pArg){ 5708 int rc = SQLITE_OK; 5709 switch( op ){ 5710 case SQLITE_SESSION_CONFIG_STRMSIZE: { 5711 int *pInt = (int*)pArg; 5712 if( 
*pInt>0 ){ 5713 sessions_strm_chunk_size = *pInt; 5714 } 5715 *pInt = sessions_strm_chunk_size; 5716 break; 5717 } 5718 default: 5719 rc = SQLITE_MISUSE; 5720 break; 5721 } 5722 return rc; 5723 } 5724 5725 #endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */ 5726