xref: /redis-3.2.3/src/hyperloglog.c (revision fcd2155b)
1 /* hyperloglog.c - Redis HyperLogLog probabilistic cardinality approximation.
2  * This file implements the algorithm and the exported Redis commands.
3  *
4  * Copyright (c) 2014, Salvatore Sanfilippo <antirez at gmail dot com>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  *   * Redistributions of source code must retain the above copyright notice,
11  *     this list of conditions and the following disclaimer.
12  *   * Redistributions in binary form must reproduce the above copyright
13  *     notice, this list of conditions and the following disclaimer in the
14  *     documentation and/or other materials provided with the distribution.
15  *   * Neither the name of Redis nor the names of its contributors may be used
16  *     to endorse or promote products derived from this software without
17  *     specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 #include "redis.h"
33 
34 #include <stdint.h>
35 #include <math.h>
36 
37 /* The Redis HyperLogLog implementation is based on the following ideas:
38  *
39  * * The use of a 64 bit hash function as proposed in [1], in order to don't
40  *   limited to cardinalities up to 10^9, at the cost of just 1 additional
41  *   bit per register.
42  * * The use of 16384 6-bit registers for a great level of accuracy, using
43  *   a total of 12k per key.
44  * * The use of the Redis string data type. No new type is introduced.
45  * * No attempt is made to compress the data structure as in [1]. Also the
46  *   algorithm used is the original HyperLogLog Algorithm as in [2], with
47  *   the only difference that a 64 bit hash function is used, so no correction
48  *   is performed for values near 2^32 as in [1].
49  *
50  * [1] Heule, Nunkesser, Hall: HyperLogLog in Practice: Algorithmic
51  *     Engineering of a State of The Art Cardinality Estimation Algorithm.
52  *
53  * [2] P. Flajolet, Éric Fusy, O. Gandouet, and F. Meunier. Hyperloglog: The
54  *     analysis of a near-optimal cardinality estimation algorithm.
55  *
56  * Redis uses two representations:
57  *
58  * 1) A "dense" representation where every entry is represented by
59  *    a 6-bit integer.
60  * 2) A "sparse" representation using run length compression suitable
61  *    for representing HyperLogLogs with many registers set to 0 in
62  *    a memory efficient way.
63  *
64  *
65  * HLL header
66  * ===
67  *
68  * Both the dense and sparse representation have a 16 byte header as follows:
69  *
70  * +------+---+-----+----------+
71  * | HYLL | E | N/U | Cardin.  |
72  * +------+---+-----+----------+
73  *
74  * The first 4 bytes are a magic string set to the bytes "HYLL".
75  * "E" is one byte encoding, currently set to HLL_DENSE or
76  * HLL_SPARSE. N/U are three not used bytes.
77  *
78  * The "Cardin." field is a 64 bit integer stored in little endian format
79  * with the latest cardinality computed that can be reused if the data
80  * structure was not modified since the last computation (this is useful
81  * because there are high probabilities that HLLADD operations don't
82  * modify the actual data structure and hence the approximated cardinality).
83  *
84  * When the most significant bit in the most significant byte of the cached
85  * cardinality is set, it means that the data structure was modified and
86  * we can't reuse the cached value that must be recomputed.
87  *
88  * Dense representation
89  * ===
90  *
91  * The dense representation used by Redis is the following:
92  *
93  * +--------+--------+--------+------//      //--+
94  * |11000000|22221111|33333322|55444444 ....     |
95  * +--------+--------+--------+------//      //--+
96  *
97  * The 6 bits counters are encoded one after the other starting from the
98  * LSB to the MSB, and using the next bytes as needed.
99  *
100  * Sparse representation
101  * ===
102  *
103  * The sparse representation encodes registers using a run length
104  * encoding composed of three opcodes, two using one byte, and one using
105  * of two bytes. The opcodes are called ZERO, XZERO and VAL.
106  *
107  * ZERO opcode is represented as 00xxxxxx. The 6-bit integer represented
108  * by the six bits 'xxxxxx', plus 1, means that there are N registers set
109  * to 0. This opcode can represent from 1 to 64 contiguous registers set
110  * to the value of 0.
111  *
112  * XZERO opcode is represented by two bytes 01xxxxxx yyyyyyyy. The 14-bit
113  * integer represented by the bits 'xxxxxx' as most significant bits and
114  * 'yyyyyyyy' as least significant bits, plus 1, means that there are N
115  * registers set to 0. This opcode can represent from 0 to 16384 contiguous
116  * registers set to the value of 0.
117  *
118  * VAL opcode is represented as 1vvvvvxx. It contains a 5-bit integer
119  * representing the value of a register, and a 2-bit integer representing
120  * the number of contiguous registers set to that value 'vvvvv'.
121  * To obtain the value and run length, the integers vvvvv and xx must be
122  * incremented by one. This opcode can represent values from 1 to 32,
123  * repeated from 1 to 4 times.
124  *
125  * The sparse representation can't represent registers with a value greater
126  * than 32, however it is very unlikely that we find such a register in an
127  * HLL with a cardinality where the sparse representation is still more
128  * memory efficient than the dense representation. When this happens the
129  * HLL is converted to the dense representation.
130  *
131  * The sparse representation is purely positional. For example a sparse
132  * representation of an empty HLL is just: XZERO:16384.
133  *
134  * An HLL having only 3 non-zero registers at position 1000, 1020, 1021
135  * respectively set to 2, 3, 3, is represented by the following three
136  * opcodes:
137  *
138  * XZERO:1000 (Registers 0-999 are set to 0)
139  * VAL:2,1    (1 register set to value 2, that is register 1000)
140  * ZERO:19    (Registers 1001-1019 set to 0)
141  * VAL:3,2    (2 registers set to value 3, that is registers 1020,1021)
142  * XZERO:15362 (Registers 1022-16383 set to 0)
143  *
144  * In the example the sparse representation used just 7 bytes instead
145  * of 12k in order to represent the HLL registers. In general for low
146  * cardinality there is a big win in terms of space efficiency, traded
147  * with CPU time since the sparse representation is slower to access:
148  *
149  * The following table shows average cardinality vs bytes used, 100
150  * samples per cardinality (when the set was not representable because
151  * of registers with too big value, the dense representation size was used
152  * as a sample).
153  *
154  * 100 267
155  * 200 485
156  * 300 678
157  * 400 859
158  * 500 1033
159  * 600 1205
160  * 700 1375
161  * 800 1544
162  * 900 1713
163  * 1000 1882
164  * 2000 3480
165  * 3000 4879
166  * 4000 6089
167  * 5000 7138
168  * 6000 8042
169  * 7000 8823
170  * 8000 9500
171  * 9000 10088
172  * 10000 10591
173  *
174  * The dense representation uses 12288 bytes, so there is a big win up to
175  * a cardinality of ~2000-3000. For bigger cardinalities the constant times
176  * involved in updating the sparse representation is not justified by the
177  * memory savings. The exact maximum length of the sparse representation
178  * when this implementation switches to the dense representation is
179  * configured via the define server.hll_sparse_max_bytes.
180  */
181 
182 struct hllhdr {
183     char magic[4];      /* "HYLL" */
184     uint8_t encoding;   /* HLL_DENSE or HLL_SPARSE. */
185     uint8_t notused[3]; /* Reserved for future use, must be zero. */
186     uint8_t card[8];    /* Cached cardinality, little endian. */
187     uint8_t registers[]; /* Data bytes. */
188 };
189 
190 /* The cached cardinality MSB is used to signal validity of the cached value. */
191 #define HLL_INVALIDATE_CACHE(hdr) (hdr)->card[0] |= (1<<7)
192 #define HLL_VALID_CACHE(hdr) (((hdr)->card[0] & (1<<7)) == 0)
193 
194 #define HLL_P 14 /* The greater is P, the smaller the error. */
195 #define HLL_REGISTERS (1<<HLL_P) /* With P=14, 16384 registers. */
196 #define HLL_P_MASK (HLL_REGISTERS-1) /* Mask to index register. */
197 #define HLL_BITS 6 /* Enough to count up to 63 leading zeroes. */
198 #define HLL_REGISTER_MAX ((1<<HLL_BITS)-1)
199 #define HLL_HDR_SIZE sizeof(struct hllhdr)
200 #define HLL_DENSE_SIZE (HLL_HDR_SIZE+((HLL_REGISTERS*HLL_BITS+7)/8))
201 #define HLL_DENSE 0 /* Dense encoding */
202 #define HLL_SPARSE 1 /* Sparse encoding */
203 #define HLL_MAX_ENCODING 1
204 
205 static char *invalid_hll_err = "-INVALIDOBJ Corrupted HLL object detected\r\n";
206 
207 /* =========================== Low level bit macros ========================= */
208 
209 /* Macros to access the dense representation.
210  *
211  * We need to get and set 6 bit counters in an array of 8 bit bytes.
212  * We use macros to make sure the code is inlined since speed is critical
213  * especially in order to compute the approximated cardinality in
214  * HLLCOUNT where we need to access all the registers at once.
215  * For the same reason we also want to avoid conditionals in this code path.
216  *
217  * +--------+--------+--------+------//
218  * |11000000|22221111|33333322|55444444
219  * +--------+--------+--------+------//
220  *
221  * Note: in the above representation the most significant bit (MSB)
222  * of every byte is on the left. We start using bits from the LSB to MSB,
223  * and so forth passing to the next byte.
224  *
225  * Example, we want to access to counter at pos = 1 ("111111" in the
226  * illustration above).
227  *
228  * The index of the first byte b0 containing our data is:
229  *
230  *  b0 = 6 * pos / 8 = 0
231  *
232  *   +--------+
233  *   |11000000|  <- Our byte at b0
234  *   +--------+
235  *
236  * The position of the first bit (counting from the LSB = 0) in the byte
237  * is given by:
238  *
239  *  fb = 6 * pos % 8 -> 6
240  *
241  * Right shift b0 of 'fb' bits.
242  *
243  *   +--------+
244  *   |11000000|  <- Initial value of b0
245  *   |00000011|  <- After right shift of 6 pos.
246  *   +--------+
247  *
248  * Left shift b1 of bits 8-fb bits (2 bits)
249  *
250  *   +--------+
251  *   |22221111|  <- Initial value of b1
252  *   |22111100|  <- After left shift of 2 bits.
253  *   +--------+
254  *
255  * OR the two bits, and finally AND with 111111 (63 in decimal) to
256  * clean the higher order bits we are not interested in:
257  *
258  *   +--------+
259  *   |00000011|  <- b0 right shifted
260  *   |22111100|  <- b1 left shifted
261  *   |22111111|  <- b0 OR b1
262  *   |  111111|  <- (b0 OR b1) AND 63, our value.
263  *   +--------+
264  *
265  * We can try with a different example, like pos = 0. In this case
266  * the 6-bit counter is actually contained in a single byte.
267  *
268  *  b0 = 6 * pos / 8 = 0
269  *
270  *   +--------+
271  *   |11000000|  <- Our byte at b0
272  *   +--------+
273  *
274  *  fb = 6 * pos % 8 = 0
275  *
276  *  So we right shift of 0 bits (no shift in practice) and
277  *  left shift the next byte of 8 bits, even if we don't use it,
278  *  but this has the effect of clearing the bits so the result
279  *  will not be affacted after the OR.
280  *
281  * -------------------------------------------------------------------------
282  *
283  * Setting the register is a bit more complex, let's assume that 'val'
284  * is the value we want to set, already in the right range.
285  *
286  * We need two steps, in one we need to clear the bits, and in the other
287  * we need to bitwise-OR the new bits.
288  *
289  * Let's try with 'pos' = 1, so our first byte at 'b' is 0,
290  *
291  * "fb" is 6 in this case.
292  *
293  *   +--------+
294  *   |11000000|  <- Our byte at b0
295  *   +--------+
296  *
297  * To create a AND-mask to clear the bits about this position, we just
298  * initialize the mask with the value 63, left shift it of "fs" bits,
299  * and finally invert the result.
300  *
301  *   +--------+
302  *   |00111111|  <- "mask" starts at 63
303  *   |11000000|  <- "mask" after left shift of "ls" bits.
304  *   |00111111|  <- "mask" after invert.
305  *   +--------+
306  *
307  * Now we can bitwise-AND the byte at "b" with the mask, and bitwise-OR
308  * it with "val" left-shifted of "ls" bits to set the new bits.
309  *
310  * Now let's focus on the next byte b1:
311  *
312  *   +--------+
313  *   |22221111|  <- Initial value of b1
314  *   +--------+
315  *
316  * To build the AND mask we start again with the 63 value, right shift
317  * it by 8-fb bits, and invert it.
318  *
319  *   +--------+
320  *   |00111111|  <- "mask" set at 2&6-1
321  *   |00001111|  <- "mask" after the right shift by 8-fb = 2 bits
322  *   |11110000|  <- "mask" after bitwise not.
323  *   +--------+
324  *
325  * Now we can mask it with b+1 to clear the old bits, and bitwise-OR
326  * with "val" left-shifted by "rs" bits to set the new value.
327  */
328 
329 /* Note: if we access the last counter, we will also access the b+1 byte
330  * that is out of the array, but sds strings always have an implicit null
331  * term, so the byte exists, and we can skip the conditional (or the need
332  * to allocate 1 byte more explicitly). */
333 
334 /* Store the value of the register at position 'regnum' into variable 'target'.
335  * 'p' is an array of unsigned bytes. */
336 #define HLL_DENSE_GET_REGISTER(target,p,regnum) do { \
337     uint8_t *_p = (uint8_t*) p; \
338     unsigned long _byte = regnum*HLL_BITS/8; \
339     unsigned long _fb = regnum*HLL_BITS&7; \
340     unsigned long _fb8 = 8 - _fb; \
341     unsigned long b0 = _p[_byte]; \
342     unsigned long b1 = _p[_byte+1]; \
343     target = ((b0 >> _fb) | (b1 << _fb8)) & HLL_REGISTER_MAX; \
344 } while(0)
345 
346 /* Set the value of the register at position 'regnum' to 'val'.
347  * 'p' is an array of unsigned bytes. */
348 #define HLL_DENSE_SET_REGISTER(p,regnum,val) do { \
349     uint8_t *_p = (uint8_t*) p; \
350     unsigned long _byte = regnum*HLL_BITS/8; \
351     unsigned long _fb = regnum*HLL_BITS&7; \
352     unsigned long _fb8 = 8 - _fb; \
353     unsigned long _v = val; \
354     _p[_byte] &= ~(HLL_REGISTER_MAX << _fb); \
355     _p[_byte] |= _v << _fb; \
356     _p[_byte+1] &= ~(HLL_REGISTER_MAX >> _fb8); \
357     _p[_byte+1] |= _v >> _fb8; \
358 } while(0)
359 
360 /* Macros to access the sparse representation.
361  * The macros parameter is expected to be an uint8_t pointer. */
362 #define HLL_SPARSE_XZERO_BIT 0x40 /* 01xxxxxx */
363 #define HLL_SPARSE_VAL_BIT 0x80 /* 1vvvvvxx */
364 #define HLL_SPARSE_IS_ZERO(p) (((*(p)) & 0xc0) == 0) /* 00xxxxxx */
365 #define HLL_SPARSE_IS_XZERO(p) (((*(p)) & 0xc0) == HLL_SPARSE_XZERO_BIT)
366 #define HLL_SPARSE_IS_VAL(p) ((*(p)) & HLL_SPARSE_VAL_BIT)
367 #define HLL_SPARSE_ZERO_LEN(p) (((*(p)) & 0x3f)+1)
368 #define HLL_SPARSE_XZERO_LEN(p) (((((*(p)) & 0x3f) << 8) | (*((p)+1)))+1)
369 #define HLL_SPARSE_VAL_VALUE(p) ((((*(p)) >> 2) & 0x1f)+1)
370 #define HLL_SPARSE_VAL_LEN(p) (((*(p)) & 0x3)+1)
371 #define HLL_SPARSE_VAL_MAX_VALUE 32
372 #define HLL_SPARSE_VAL_MAX_LEN 4
373 #define HLL_SPARSE_ZERO_MAX_LEN 64
374 #define HLL_SPARSE_XZERO_MAX_LEN 16384
375 #define HLL_SPARSE_VAL_SET(p,val,len) do { \
376     *(p) = (((val)-1)<<2|((len)-1))|HLL_SPARSE_VAL_BIT; \
377 } while(0)
378 #define HLL_SPARSE_ZERO_SET(p,len) do { \
379     *(p) = (len)-1; \
380 } while(0)
381 #define HLL_SPARSE_XZERO_SET(p,len) do { \
382     int _l = (len)-1; \
383     *(p) = (_l>>8) | HLL_SPARSE_XZERO_BIT; \
384     *((p)+1) = (_l&0xff); \
385 } while(0)
386 
387 /* ========================= HyperLogLog algorithm  ========================= */
388 
389 /* Our hash function is MurmurHash2, 64 bit version.
390  * It was modified for Redis in order to provide the same result in
391  * big and little endian archs (endian neutral). */
392 uint64_t MurmurHash64A (const void * key, int len, unsigned int seed) {
393     const uint64_t m = 0xc6a4a7935bd1e995;
394     const int r = 47;
395     uint64_t h = seed ^ (len * m);
396     const uint8_t *data = (const uint8_t *)key;
397     const uint8_t *end = data + (len-(len&7));
398 
399     while(data != end) {
400         uint64_t k;
401 
402 #if (BYTE_ORDER == LITTLE_ENDIAN)
403         k = *((uint64_t*)data);
404 #else
405         k = (uint64_t) data[0];
406         k |= (uint64_t) data[1] << 8;
407         k |= (uint64_t) data[2] << 16;
408         k |= (uint64_t) data[3] << 24;
409         k |= (uint64_t) data[4] << 32;
410         k |= (uint64_t) data[5] << 40;
411         k |= (uint64_t) data[6] << 48;
412         k |= (uint64_t) data[7] << 56;
413 #endif
414 
415         k *= m;
416         k ^= k >> r;
417         k *= m;
418         h ^= k;
419         h *= m;
420         data += 8;
421     }
422 
423     switch(len & 7) {
424     case 7: h ^= (uint64_t)data[6] << 48;
425     case 6: h ^= (uint64_t)data[5] << 40;
426     case 5: h ^= (uint64_t)data[4] << 32;
427     case 4: h ^= (uint64_t)data[3] << 24;
428     case 3: h ^= (uint64_t)data[2] << 16;
429     case 2: h ^= (uint64_t)data[1] << 8;
430     case 1: h ^= (uint64_t)data[0];
431             h *= m;
432     };
433 
434     h ^= h >> r;
435     h *= m;
436     h ^= h >> r;
437     return h;
438 }
439 
440 /* Given a string element to add to the HyperLogLog, returns the length
441  * of the pattern 000..1 of the element hash. As a side effect 'regp' is
442  * set to the register index this element hashes to. */
443 int hllPatLen(unsigned char *ele, size_t elesize, long *regp) {
444     uint64_t hash, bit, index;
445     int count;
446 
447     /* Count the number of zeroes starting from bit HLL_REGISTERS
448      * (that is a power of two corresponding to the first bit we don't use
449      * as index). The max run can be 64-P+1 bits.
450      *
451      * Note that the final "1" ending the sequence of zeroes must be
452      * included in the count, so if we find "001" the count is 3, and
453      * the smallest count possible is no zeroes at all, just a 1 bit
454      * at the first position, that is a count of 1.
455      *
456      * This may sound like inefficient, but actually in the average case
457      * there are high probabilities to find a 1 after a few iterations. */
458     hash = MurmurHash64A(ele,elesize,0xadc83b19ULL);
459     index = hash & HLL_P_MASK; /* Register index. */
460     hash |= ((uint64_t)1<<63); /* Make sure the loop terminates. */
461     bit = HLL_REGISTERS; /* First bit not used to address the register. */
462     count = 1; /* Initialized to 1 since we count the "00000...1" pattern. */
463     while((hash & bit) == 0) {
464         count++;
465         bit <<= 1;
466     }
467     *regp = (int) index;
468     return count;
469 }
470 
471 /* ================== Dense representation implementation  ================== */
472 
473 /* "Add" the element in the dense hyperloglog data structure.
474  * Actually nothing is added, but the max 0 pattern counter of the subset
475  * the element belongs to is incremented if needed.
476  *
477  * 'registers' is expected to have room for HLL_REGISTERS plus an
478  * additional byte on the right. This requirement is met by sds strings
479  * automatically since they are implicitly null terminated.
480  *
481  * The function always succeed, however if as a result of the operation
482  * the approximated cardinality changed, 1 is returned. Otherwise 0
483  * is returned. */
484 int hllDenseAdd(uint8_t *registers, unsigned char *ele, size_t elesize) {
485     uint8_t oldcount, count;
486     long index;
487 
488     /* Update the register if this element produced a longer run of zeroes. */
489     count = hllPatLen(ele,elesize,&index);
490     HLL_DENSE_GET_REGISTER(oldcount,registers,index);
491     if (count > oldcount) {
492         HLL_DENSE_SET_REGISTER(registers,index,count);
493         return 1;
494     } else {
495         return 0;
496     }
497 }
498 
499 /* Compute SUM(2^-reg) in the dense representation.
500  * PE is an array with a pre-computer table of values 2^-reg indexed by reg.
501  * As a side effect the integer pointed by 'ezp' is set to the number
502  * of zero registers. */
503 double hllDenseSum(uint8_t *registers, double *PE, int *ezp) {
504     double E = 0;
505     int j, ez = 0;
506 
507     /* Redis default is to use 16384 registers 6 bits each. The code works
508      * with other values by modifying the defines, but for our target value
509      * we take a faster path with unrolled loops. */
510     if (HLL_REGISTERS == 16384 && HLL_BITS == 6) {
511         uint8_t *r = registers;
512         unsigned long r0, r1, r2, r3, r4, r5, r6, r7, r8, r9,
513                       r10, r11, r12, r13, r14, r15;
514         for (j = 0; j < 1024; j++) {
515             /* Handle 16 registers per iteration. */
516             r0 = r[0] & 63; if (r0 == 0) ez++;
517             r1 = (r[0] >> 6 | r[1] << 2) & 63; if (r1 == 0) ez++;
518             r2 = (r[1] >> 4 | r[2] << 4) & 63; if (r2 == 0) ez++;
519             r3 = (r[2] >> 2) & 63; if (r3 == 0) ez++;
520             r4 = r[3] & 63; if (r4 == 0) ez++;
521             r5 = (r[3] >> 6 | r[4] << 2) & 63; if (r5 == 0) ez++;
522             r6 = (r[4] >> 4 | r[5] << 4) & 63; if (r6 == 0) ez++;
523             r7 = (r[5] >> 2) & 63; if (r7 == 0) ez++;
524             r8 = r[6] & 63; if (r8 == 0) ez++;
525             r9 = (r[6] >> 6 | r[7] << 2) & 63; if (r9 == 0) ez++;
526             r10 = (r[7] >> 4 | r[8] << 4) & 63; if (r10 == 0) ez++;
527             r11 = (r[8] >> 2) & 63; if (r11 == 0) ez++;
528             r12 = r[9] & 63; if (r12 == 0) ez++;
529             r13 = (r[9] >> 6 | r[10] << 2) & 63; if (r13 == 0) ez++;
530             r14 = (r[10] >> 4 | r[11] << 4) & 63; if (r14 == 0) ez++;
531             r15 = (r[11] >> 2) & 63; if (r15 == 0) ez++;
532 
533             /* Additional parens will allow the compiler to optimize the
534              * code more with a loss of precision that is not very relevant
535              * here (floating point math is not commutative!). */
536             E += (PE[r0] + PE[r1]) + (PE[r2] + PE[r3]) + (PE[r4] + PE[r5]) +
537                  (PE[r6] + PE[r7]) + (PE[r8] + PE[r9]) + (PE[r10] + PE[r11]) +
538                  (PE[r12] + PE[r13]) + (PE[r14] + PE[r15]);
539             r += 12;
540         }
541     } else {
542         for (j = 0; j < HLL_REGISTERS; j++) {
543             unsigned long reg;
544 
545             HLL_DENSE_GET_REGISTER(reg,registers,j);
546             if (reg == 0) {
547                 ez++;
548                 E += 1; /* 2^(-reg[j]) is 1 when m is 0. */
549             } else {
550                 E += PE[reg]; /* Precomputed 2^(-reg[j]). */
551             }
552         }
553     }
554     *ezp = ez;
555     return E;
556 }
557 
558 /* ================== Sparse representation implementation  ================= */
559 
560 /* Convert the HLL with sparse representation given as input in its dense
561  * representation. Both representations are represented by SDS strings, and
562  * the input representation is freed as a side effect.
563  *
564  * The function returns REDIS_OK if the sparse representation was valid,
565  * otherwise REDIS_ERR is returned if the representation was corrupted. */
566 int hllSparseToDense(robj *o) {
567     sds sparse = o->ptr, dense;
568     struct hllhdr *hdr, *oldhdr = (struct hllhdr*)sparse;
569     int idx = 0, runlen, regval;
570     uint8_t *p = (uint8_t*)sparse, *end = p+sdslen(sparse);
571 
572     /* If the representation is already the right one return ASAP. */
573     hdr = (struct hllhdr*) sparse;
574     if (hdr->encoding == HLL_DENSE) return REDIS_OK;
575 
576     /* Create a string of the right size filled with zero bytes.
577      * Note that the cached cardinality is set to 0 as a side effect
578      * that is exactly the cardinality of an empty HLL. */
579     dense = sdsnewlen(NULL,HLL_DENSE_SIZE);
580     hdr = (struct hllhdr*) dense;
581     *hdr = *oldhdr; /* This will copy the magic and cached cardinality. */
582     hdr->encoding = HLL_DENSE;
583 
584     /* Now read the sparse representation and set non-zero registers
585      * accordingly. */
586     p += HLL_HDR_SIZE;
587     while(p < end) {
588         if (HLL_SPARSE_IS_ZERO(p)) {
589             runlen = HLL_SPARSE_ZERO_LEN(p);
590             idx += runlen;
591             p++;
592         } else if (HLL_SPARSE_IS_XZERO(p)) {
593             runlen = HLL_SPARSE_XZERO_LEN(p);
594             idx += runlen;
595             p += 2;
596         } else {
597             runlen = HLL_SPARSE_VAL_LEN(p);
598             regval = HLL_SPARSE_VAL_VALUE(p);
599             while(runlen--) {
600                 HLL_DENSE_SET_REGISTER(hdr->registers,idx,regval);
601                 idx++;
602             }
603             p++;
604         }
605     }
606 
607     /* If the sparse representation was valid, we expect to find idx
608      * set to HLL_REGISTERS. */
609     if (idx != HLL_REGISTERS) {
610         sdsfree(dense);
611         return REDIS_ERR;
612     }
613 
614     /* Free the old representation and set the new one. */
615     sdsfree(o->ptr);
616     o->ptr = dense;
617     return REDIS_OK;
618 }
619 
620 /* "Add" the element in the sparse hyperloglog data structure.
621  * Actually nothing is added, but the max 0 pattern counter of the subset
622  * the element belongs to is incremented if needed.
623  *
624  * The object 'o' is the String object holding the HLL. The function requires
625  * a reference to the object in order to be able to enlarge the string if
626  * needed.
627  *
628  * On success, the function returns 1 if the cardinality changed, or 0
629  * if the register for this element was not updated.
630  * On error (if the representation is invalid) -1 is returned.
631  *
632  * As a side effect the function may promote the HLL representation from
633  * sparse to dense: this happens when a register requires to be set to a value
634  * not representable with the sparse representation, or when the resulting
635  * size would be greater than server.hll_sparse_max_bytes. */
636 int hllSparseAdd(robj *o, unsigned char *ele, size_t elesize) {
637     struct hllhdr *hdr;
638     uint8_t oldcount, count, *sparse, *end, *p, *prev, *next;
639     long index, first, span;
640     long is_zero = 0, is_xzero = 0, is_val = 0, runlen = 0;
641 
642     /* Update the register if this element produced a longer run of zeroes. */
643     count = hllPatLen(ele,elesize,&index);
644 
645     /* If the count is too big to be representable by the sparse representation
646      * switch to dense representation. */
647     if (count > HLL_SPARSE_VAL_MAX_VALUE) goto promote;
648 
649     /* When updating a sparse representation, sometimes we may need to
650      * enlarge the buffer for up to 3 bytes in the worst case (XZERO split
651      * into XZERO-VAL-XZERO). Make sure there is enough space right now
652      * so that the pointers we take during the execution of the function
653      * will be valid all the time. */
654     o->ptr = sdsMakeRoomFor(o->ptr,3);
655 
656     /* Step 1: we need to locate the opcode we need to modify to check
657      * if a value update is actually needed. */
658     sparse = p = ((uint8_t*)o->ptr) + HLL_HDR_SIZE;
659     end = p + sdslen(o->ptr) - HLL_HDR_SIZE;
660 
661     first = 0;
662     prev = NULL; /* Points to previos opcode at the end of the loop. */
663     next = NULL; /* Points to the next opcode at the end of the loop. */
664     span = 0;
665     while(p < end) {
666         long oplen;
667 
668         /* Set span to the number of registers covered by this opcode.
669          *
670          * This is the most performance critical loop of the sparse
671          * representation. Sorting the conditionals from the most to the
672          * least frequent opcode in many-bytes sparse HLLs is faster. */
673         oplen = 1;
674         if (HLL_SPARSE_IS_ZERO(p)) {
675             span = HLL_SPARSE_ZERO_LEN(p);
676         } else if (HLL_SPARSE_IS_VAL(p)) {
677             span = HLL_SPARSE_VAL_LEN(p);
678         } else { /* XZERO. */
679             span = HLL_SPARSE_XZERO_LEN(p);
680             oplen = 2;
681         }
682         /* Break if this opcode covers the register as 'index'. */
683         if (index <= first+span-1) break;
684         prev = p;
685         p += oplen;
686         first += span;
687     }
688     if (span == 0) return -1; /* Invalid format. */
689 
690     next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1;
691     if (next >= end) next = NULL;
692 
693     /* Cache current opcode type to avoid using the macro again and
694      * again for something that will not change.
695      * Also cache the run-length of the opcode. */
696     if (HLL_SPARSE_IS_ZERO(p)) {
697         is_zero = 1;
698         runlen = HLL_SPARSE_ZERO_LEN(p);
699     } else if (HLL_SPARSE_IS_XZERO(p)) {
700         is_xzero = 1;
701         runlen = HLL_SPARSE_XZERO_LEN(p);
702     } else {
703         is_val = 1;
704         runlen = HLL_SPARSE_VAL_LEN(p);
705     }
706 
707     /* Step 2: After the loop:
708      *
709      * 'first' stores to the index of the first register covered
710      *  by the current opcode, which is pointed by 'p'.
711      *
712      * 'next' ad 'prev' store respectively the next and previous opcode,
713      *  or NULL if the opcode at 'p' is respectively the last or first.
714      *
715      * 'span' is set to the number of registers covered by the current
716      *  opcode.
717      *
718      * There are different cases in order to update the data structure
719      * in place without generating it from scratch:
720      *
721      * A) If it is a VAL opcode already set to a value >= our 'count'
722      *    no update is needed, regardless of the VAL run-length field.
723      *    In this case PFADD returns 0 since no changes are performed.
724      *
725      * B) If it is a VAL opcode with len = 1 (representing only our
726      *    register) and the value is less than 'count', we just update it
727      *    since this is a trivial case. */
728     if (is_val) {
729         oldcount = HLL_SPARSE_VAL_VALUE(p);
730         /* Case A. */
731         if (oldcount >= count) return 0;
732 
733         /* Case B. */
734         if (runlen == 1) {
735             HLL_SPARSE_VAL_SET(p,count,1);
736             goto updated;
737         }
738     }
739 
740     /* C) Another trivial to handle case is a ZERO opcode with a len of 1.
741      * We can just replace it with a VAL opcode with our value and len of 1. */
742     if (is_zero && runlen == 1) {
743         HLL_SPARSE_VAL_SET(p,count,1);
744         goto updated;
745     }
746 
747     /* D) General case.
748      *
749      * The other cases are more complex: our register requires to be updated
750      * and is either currently represented by a VAL opcode with len > 1,
751      * by a ZERO opcode with len > 1, or by an XZERO opcode.
752      *
753      * In those cases the original opcode must be split into muliple
754      * opcodes. The worst case is an XZERO split in the middle resuling into
755      * XZERO - VAL - XZERO, so the resulting sequence max length is
756      * 5 bytes.
757      *
758      * We perform the split writing the new sequence into the 'new' buffer
759      * with 'newlen' as length. Later the new sequence is inserted in place
760      * of the old one, possibly moving what is on the right a few bytes
761      * if the new sequence is longer than the older one. */
762     uint8_t seq[5], *n = seq;
763     int last = first+span-1; /* Last register covered by the sequence. */
764     int len;
765 
766     if (is_zero || is_xzero) {
767         /* Handle splitting of ZERO / XZERO. */
768         if (index != first) {
769             len = index-first;
770             if (len > HLL_SPARSE_ZERO_MAX_LEN) {
771                 HLL_SPARSE_XZERO_SET(n,len);
772                 n += 2;
773             } else {
774                 HLL_SPARSE_ZERO_SET(n,len);
775                 n++;
776             }
777         }
778         HLL_SPARSE_VAL_SET(n,count,1);
779         n++;
780         if (index != last) {
781             len = last-index;
782             if (len > HLL_SPARSE_ZERO_MAX_LEN) {
783                 HLL_SPARSE_XZERO_SET(n,len);
784                 n += 2;
785             } else {
786                 HLL_SPARSE_ZERO_SET(n,len);
787                 n++;
788             }
789         }
790     } else {
791         /* Handle splitting of VAL. */
792         int curval = HLL_SPARSE_VAL_VALUE(p);
793 
794         if (index != first) {
795             len = index-first;
796             HLL_SPARSE_VAL_SET(n,curval,len);
797             n++;
798         }
799         HLL_SPARSE_VAL_SET(n,count,1);
800         n++;
801         if (index != last) {
802             len = last-index;
803             HLL_SPARSE_VAL_SET(n,curval,len);
804             n++;
805         }
806     }
807 
808     /* Step 3: substitute the new sequence with the old one.
809      *
810      * Note that we already allocated space on the sds string
811      * calling sdsMakeRoomFor(). */
812      int seqlen = n-seq;
813      int oldlen = is_xzero ? 2 : 1;
814      int deltalen = seqlen-oldlen;
815 
816      if (deltalen > 0 &&
817          sdslen(o->ptr)+deltalen > server.hll_sparse_max_bytes) goto promote;
818      if (deltalen && next) memmove(next+deltalen,next,end-next);
819      sdsIncrLen(o->ptr,deltalen);
820      memcpy(p,seq,seqlen);
821      end += deltalen;
822 
823 updated:
824     /* Step 4: Merge adjacent values if possible.
825      *
826      * The representation was updated, however the resulting representation
827      * may not be optimal: adjacent VAL opcodes can sometimes be merged into
828      * a single one. */
829     p = prev ? prev : sparse;
830     int scanlen = 5; /* Scan up to 5 upcodes starting from prev. */
831     while (p < end && scanlen--) {
832         if (HLL_SPARSE_IS_XZERO(p)) {
833             p += 2;
834             continue;
835         } else if (HLL_SPARSE_IS_ZERO(p)) {
836             p++;
837             continue;
838         }
839         /* We need two adjacent VAL opcodes to try a merge, having
840          * the same value, and a len that fits the VAL opcode max len. */
841         if (p+1 < end && HLL_SPARSE_IS_VAL(p+1)) {
842             int v1 = HLL_SPARSE_VAL_VALUE(p);
843             int v2 = HLL_SPARSE_VAL_VALUE(p+1);
844             if (v1 == v2) {
845                 int len = HLL_SPARSE_VAL_LEN(p)+HLL_SPARSE_VAL_LEN(p+1);
846                 if (len <= HLL_SPARSE_VAL_MAX_LEN) {
847                     HLL_SPARSE_VAL_SET(p+1,v1,len);
848                     memmove(p,p+1,end-p);
849                     sdsIncrLen(o->ptr,-1);
850                     end--;
851                     /* After a merge we reiterate without incrementing 'p'
852                      * in order to try to merge the just merged value with
853                      * a value on its right. */
854                     continue;
855                 }
856             }
857         }
858         p++;
859     }
860 
861     /* Invalidate the cached cardinality. */
862     hdr = o->ptr;
863     HLL_INVALIDATE_CACHE(hdr);
864     return 1;
865 
866 promote: /* Promote to dense representation. */
867     if (hllSparseToDense(o) == REDIS_ERR) return -1; /* Corrupted HLL. */
868     hdr = o->ptr;
869 
870     /* We need to call hllDenseAdd() to perform the operation after the
871      * conversion. However the result must be 1, since if we need to
872      * convert from sparse to dense a register requires to be updated.
873      *
874      * Note that this in turn means that PFADD will make sure the command
875      * is propagated to slaves / AOF, so if there is a sparse -> dense
876      * convertion, it will be performed in all the slaves as well. */
877     int dense_retval = hllDenseAdd(hdr->registers, ele, elesize);
878     redisAssert(dense_retval == 1);
879     return dense_retval;
880 }
881 
882 /* Compute SUM(2^-reg) in the sparse representation.
883  * PE is an array with a pre-computer table of values 2^-reg indexed by reg.
884  * As a side effect the integer pointed by 'ezp' is set to the number
885  * of zero registers. */
886 double hllSparseSum(uint8_t *sparse, int sparselen, double *PE, int *ezp, int *invalid) {
887     double E = 0;
888     int ez = 0, idx = 0, runlen, regval;
889     uint8_t *end = sparse+sparselen, *p = sparse;
890 
891     while(p < end) {
892         if (HLL_SPARSE_IS_ZERO(p)) {
893             runlen = HLL_SPARSE_ZERO_LEN(p);
894             idx += runlen;
895             ez += runlen;
896             E += 1*runlen; /* 2^(-reg[j]) is 1 when m is 0. */
897             p++;
898         } else if (HLL_SPARSE_IS_XZERO(p)) {
899             runlen = HLL_SPARSE_XZERO_LEN(p);
900             idx += runlen;
901             ez += runlen;
902             E += 1*runlen; /* 2^(-reg[j]) is 1 when m is 0. */
903             p += 2;
904         } else {
905             runlen = HLL_SPARSE_VAL_LEN(p);
906             regval = HLL_SPARSE_VAL_VALUE(p);
907             idx += runlen;
908             E += PE[regval]*runlen;
909             p++;
910         }
911     }
912     if (idx != HLL_REGISTERS && invalid) *invalid = 1;
913     *ezp = ez;
914     return E;
915 }
916 
917 /* ========================= HyperLogLog Count ==============================
918  * This is the core of the algorithm where the approximated count is computed.
919  * The function uses the lower level hllDenseSum() and hllSparseSum() functions
920  * as helpers to compute the SUM(2^-reg) part of the computation, which is
921  * representation-specific, while all the rest is common. */
922 
923 /* Return the approximated cardinality of the set based on the armonic
924  * mean of the registers values. 'hdr' points to the start of the SDS
925  * representing the String object holding the HLL representation.
926  *
927  * If the sparse representation of the HLL object is not valid, the integer
928  * pointed by 'invalid' is set to non-zero, otherwise it is left untouched. */
929 uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
930     double m = HLL_REGISTERS;
931     double E, alpha = 0.7213/(1+1.079/m);
932     int j, ez; /* Number of registers equal to 0. */
933 
934     /* We precompute 2^(-reg[j]) in a small table in order to
935      * speedup the computation of SUM(2^-register[0..i]). */
936     static int initialized = 0;
937     static double PE[64];
938     if (!initialized) {
939         PE[0] = 1; /* 2^(-reg[j]) is 1 when m is 0. */
940         for (j = 1; j < 64; j++) {
941             /* 2^(-reg[j]) is the same as 1/2^reg[j]. */
942             PE[j] = 1.0/(1ULL << j);
943         }
944         initialized = 1;
945     }
946 
947     /* Compute SUM(2^-register[0..i]). */
948     if (hdr->encoding == HLL_DENSE) {
949         E = hllDenseSum(hdr->registers,PE,&ez);
950     } else {
951         E = hllSparseSum(hdr->registers,
952                          sdslen((sds)hdr)-HLL_HDR_SIZE,PE,&ez,invalid);
953     }
954 
955     /* Muliply the inverse of E for alpha_m * m^2 to have the raw estimate. */
956     E = (1/E)*alpha*m*m;
957 
958     /* Use the LINEARCOUNTING algorithm for small cardinalities.
959      * For larger values but up to 72000 HyperLogLog raw approximation is
960      * used since linear counting error starts to increase. However HyperLogLog
961      * shows a strong bias in the range 2.5*16384 - 72000, so we try to
962      * compensate for it. */
963     if (E < m*2.5 && ez != 0) {
964         E = m*log(m/ez); /* LINEARCOUNTING() */
965     } else if (m == 16384 && E < 72000) {
966         /* We did polynomial regression of the bias for this range, this
967          * way we can compute the bias for a given cardinality and correct
968          * according to it. Only apply the correction for P=14 that's what
969          * we use and the value the correction was verified with. */
970         double bias = 5.9119*1.0e-18*(E*E*E*E)
971                       -1.4253*1.0e-12*(E*E*E)+
972                       1.2940*1.0e-7*(E*E)
973                       -5.2921*1.0e-3*E+
974                       83.3216;
975         E -= E*(bias/100);
976     }
977     /* We don't apply the correction for E > 1/30 of 2^32 since we use
978      * a 64 bit function and 6 bit counters. To apply the correction for
979      * 1/30 of 2^64 is not needed since it would require a huge set
980      * to approach such a value. */
981     return (uint64_t) E;
982 }
983 
984 /* Call hllDenseAdd() or hllSparseAdd() according to the HLL encoding. */
985 int hllAdd(robj *o, unsigned char *ele, size_t elesize) {
986     struct hllhdr *hdr = o->ptr;
987     switch(hdr->encoding) {
988     case HLL_DENSE: return hllDenseAdd(hdr->registers,ele,elesize);
989     case HLL_SPARSE: return hllSparseAdd(o,ele,elesize);
990     default: return -1; /* Invalid representation. */
991     }
992 }
993 
994 /* Merge by computing MAX(registers[i],hll[i]) the HyperLogLog 'hll'
995  * with an array of uint8_t HLL_REGISTERS registers pointed by 'max'.
996  *
997  * The hll object must be already validated via isHLLObjectOrReply()
998  * or in some other way.
999  *
1000  * If the HyperLogLog is sparse and is found to be invalid, REDIS_ERR
1001  * is returned, otherwise the function always succeeds. */
1002 int hllMerge(uint8_t *max, robj *hll) {
1003     struct hllhdr *hdr = hll->ptr;
1004     int i;
1005 
1006     if (hdr->encoding == HLL_DENSE) {
1007         uint8_t val;
1008 
1009         for (i = 0; i < HLL_REGISTERS; i++) {
1010             HLL_DENSE_GET_REGISTER(val,hdr->registers,i);
1011             if (val > max[i]) max[i] = val;
1012         }
1013     } else {
1014         uint8_t *p = hll->ptr, *end = p + sdslen(hll->ptr);
1015         long runlen, regval;
1016 
1017         p += HLL_HDR_SIZE;
1018         i = 0;
1019         while(p < end) {
1020             if (HLL_SPARSE_IS_ZERO(p)) {
1021                 runlen = HLL_SPARSE_ZERO_LEN(p);
1022                 i += runlen;
1023                 p++;
1024             } else if (HLL_SPARSE_IS_XZERO(p)) {
1025                 runlen = HLL_SPARSE_XZERO_LEN(p);
1026                 i += runlen;
1027                 p += 2;
1028             } else {
1029                 runlen = HLL_SPARSE_VAL_LEN(p);
1030                 regval = HLL_SPARSE_VAL_VALUE(p);
1031                 while(runlen--) {
1032                     if (regval > max[i]) max[i] = regval;
1033                     i++;
1034                 }
1035                 p++;
1036             }
1037         }
1038         if (i != HLL_REGISTERS) return REDIS_ERR;
1039     }
1040     return REDIS_OK;
1041 }
1042 
1043 /* ========================== HyperLogLog commands ========================== */
1044 
1045 /* Create an HLL object. We always create the HLL using sparse encoding.
1046  * This will be upgraded to the dense representation as needed. */
1047 robj *createHLLObject(void) {
1048     robj *o;
1049     struct hllhdr *hdr;
1050     sds s;
1051     uint8_t *p;
1052     int sparselen = HLL_HDR_SIZE +
1053                     (((HLL_REGISTERS+(HLL_SPARSE_XZERO_MAX_LEN-1)) /
1054                      HLL_SPARSE_XZERO_MAX_LEN)*2);
1055     int aux;
1056 
1057     /* Populate the sparse representation with as many XZERO opcodes as
1058      * needed to represent all the registers. */
1059     aux = HLL_REGISTERS;
1060     s = sdsnewlen(NULL,sparselen);
1061     p = (uint8_t*)s + HLL_HDR_SIZE;
1062     while(aux) {
1063         int xzero = HLL_SPARSE_XZERO_MAX_LEN;
1064         if (xzero > aux) xzero = aux;
1065         HLL_SPARSE_XZERO_SET(p,xzero);
1066         p += 2;
1067         aux -= xzero;
1068     }
1069     redisAssert((p-(uint8_t*)s) == sparselen);
1070 
1071     /* Create the actual object. */
1072     o = createObject(REDIS_STRING,s);
1073     hdr = o->ptr;
1074     memcpy(hdr->magic,"HYLL",4);
1075     hdr->encoding = HLL_SPARSE;
1076     return o;
1077 }
1078 
1079 /* Check if the object is a String with a valid HLL representation.
1080  * Return REDIS_OK if this is true, otherwise reply to the client
1081  * with an error and return REDIS_ERR. */
1082 int isHLLObjectOrReply(redisClient *c, robj *o) {
1083     struct hllhdr *hdr;
1084 
1085     /* Key exists, check type */
1086     if (checkType(c,o,REDIS_STRING))
1087         return REDIS_ERR; /* Error already sent. */
1088 
1089     if (stringObjectLen(o) < sizeof(*hdr)) goto invalid;
1090     hdr = o->ptr;
1091 
1092     /* Magic should be "HYLL". */
1093     if (hdr->magic[0] != 'H' || hdr->magic[1] != 'Y' ||
1094         hdr->magic[2] != 'L' || hdr->magic[3] != 'L') goto invalid;
1095 
1096     if (hdr->encoding > HLL_MAX_ENCODING) goto invalid;
1097 
1098     /* Dense representation string length should match exactly. */
1099     if (hdr->encoding == HLL_DENSE &&
1100         stringObjectLen(o) != HLL_DENSE_SIZE) goto invalid;
1101 
1102     /* All tests passed. */
1103     return REDIS_OK;
1104 
1105 invalid:
1106     addReplySds(c,
1107         sdsnew("-WRONGTYPE Key is not a valid "
1108                "HyperLogLog string value.\r\n"));
1109     return REDIS_ERR;
1110 }
1111 
1112 /* PFADD var ele ele ele ... ele => :0 or :1 */
1113 void pfaddCommand(redisClient *c) {
1114     robj *o = lookupKeyWrite(c->db,c->argv[1]);
1115     struct hllhdr *hdr;
1116     int updated = 0, j;
1117 
1118     if (o == NULL) {
1119         /* Create the key with a string value of the exact length to
1120          * hold our HLL data structure. sdsnewlen() when NULL is passed
1121          * is guaranteed to return bytes initialized to zero. */
1122         o = createHLLObject();
1123         dbAdd(c->db,c->argv[1],o);
1124         updated++;
1125     } else {
1126         if (isHLLObjectOrReply(c,o) != REDIS_OK) return;
1127         o = dbUnshareStringValue(c->db,c->argv[1],o);
1128     }
1129     /* Perform the low level ADD operation for every element. */
1130     for (j = 2; j < c->argc; j++) {
1131         int retval = hllAdd(o, (unsigned char*)c->argv[j]->ptr,
1132                                sdslen(c->argv[j]->ptr));
1133         switch(retval) {
1134         case 1:
1135             updated++;
1136             break;
1137         case -1:
1138             addReplySds(c,sdsnew(invalid_hll_err));
1139             return;
1140         }
1141     }
1142     hdr = o->ptr;
1143     if (updated) {
1144         signalModifiedKey(c->db,c->argv[1]);
1145         notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
1146         server.dirty++;
1147         HLL_INVALIDATE_CACHE(hdr);
1148     }
1149     addReply(c, updated ? shared.cone : shared.czero);
1150 }
1151 
1152 /* PFCOUNT var -> approximated cardinality of set. */
1153 void pfcountCommand(redisClient *c) {
1154     robj *o = lookupKeyRead(c->db,c->argv[1]);
1155     struct hllhdr *hdr;
1156     uint64_t card;
1157 
1158     if (o == NULL) {
1159         /* No key? Cardinality is zero since no element was added, otherwise
1160          * we would have a key as HLLADD creates it as a side effect. */
1161         addReply(c,shared.czero);
1162     } else {
1163         if (isHLLObjectOrReply(c,o) != REDIS_OK) return;
1164         o = dbUnshareStringValue(c->db,c->argv[1],o);
1165 
1166         /* Check if the cached cardinality is valid. */
1167         hdr = o->ptr;
1168         if (HLL_VALID_CACHE(hdr)) {
1169             /* Just return the cached value. */
1170             card = (uint64_t)hdr->card[0];
1171             card |= (uint64_t)hdr->card[1] << 8;
1172             card |= (uint64_t)hdr->card[2] << 16;
1173             card |= (uint64_t)hdr->card[3] << 24;
1174             card |= (uint64_t)hdr->card[4] << 32;
1175             card |= (uint64_t)hdr->card[5] << 40;
1176             card |= (uint64_t)hdr->card[6] << 48;
1177             card |= (uint64_t)hdr->card[7] << 56;
1178         } else {
1179             int invalid = 0;
1180             /* Recompute it and update the cached value. */
1181             card = hllCount(hdr,&invalid);
1182             if (invalid) {
1183                 addReplySds(c,sdsnew(invalid_hll_err));
1184                 return;
1185             }
1186             hdr->card[0] = card & 0xff;
1187             hdr->card[1] = (card >> 8) & 0xff;
1188             hdr->card[2] = (card >> 16) & 0xff;
1189             hdr->card[3] = (card >> 24) & 0xff;
1190             hdr->card[4] = (card >> 32) & 0xff;
1191             hdr->card[5] = (card >> 40) & 0xff;
1192             hdr->card[6] = (card >> 48) & 0xff;
1193             hdr->card[7] = (card >> 56) & 0xff;
1194             /* This is not considered a read-only command even if the
1195              * data structure is not modified, since the cached value
1196              * may be modified and given that the HLL is a Redis string
1197              * we need to propagate the change. */
1198             signalModifiedKey(c->db,c->argv[1]);
1199             server.dirty++;
1200         }
1201         addReplyLongLong(c,card);
1202     }
1203 }
1204 
1205 /* PFMERGE dest src1 src2 src3 ... srcN => OK */
1206 void pfmergeCommand(redisClient *c) {
1207     uint8_t max[HLL_REGISTERS];
1208     struct hllhdr *hdr;
1209     int j;
1210 
1211     /* Compute an HLL with M[i] = MAX(M[i]_j).
1212      * We we the maximum into the max array of registers. We'll write
1213      * it to the target variable later. */
1214     memset(max,0,sizeof(max));
1215     for (j = 1; j < c->argc; j++) {
1216         /* Check type and size. */
1217         robj *o = lookupKeyRead(c->db,c->argv[j]);
1218         if (o == NULL) continue; /* Assume empty HLL for non existing var. */
1219         if (isHLLObjectOrReply(c,o) != REDIS_OK) return;
1220 
1221         /* Merge with this HLL with our 'max' HHL by setting max[i]
1222          * to MAX(max[i],hll[i]). */
1223         if (hllMerge(max,o) == REDIS_ERR) {
1224             addReplySds(c,sdsnew(invalid_hll_err));
1225             return;
1226         }
1227     }
1228 
1229     /* Create / unshare the destination key's value if needed. */
1230     robj *o = lookupKeyWrite(c->db,c->argv[1]);
1231     if (o == NULL) {
1232         /* Create the key with a string value of the exact length to
1233          * hold our HLL data structure. sdsnewlen() when NULL is passed
1234          * is guaranteed to return bytes initialized to zero. */
1235         o = createHLLObject();
1236         dbAdd(c->db,c->argv[1],o);
1237     } else {
1238         /* If key exists we are sure it's of the right type/size
1239          * since we checked when merging the different HLLs, so we
1240          * don't check again. */
1241         o = dbUnshareStringValue(c->db,c->argv[1],o);
1242     }
1243 
1244     /* Only support dense objects as destination. */
1245     if (hllSparseToDense(o) == REDIS_ERR) {
1246         addReplySds(c,sdsnew(invalid_hll_err));
1247         return;
1248     }
1249 
1250     /* Write the resulting HLL to the destination HLL registers and
1251      * invalidate the cached value. */
1252     hdr = o->ptr;
1253     for (j = 0; j < HLL_REGISTERS; j++) {
1254         HLL_DENSE_SET_REGISTER(hdr->registers,j,max[j]);
1255     }
1256     HLL_INVALIDATE_CACHE(hdr);
1257 
1258     signalModifiedKey(c->db,c->argv[1]);
1259     /* We generate an PFADD event for PFMERGE for semantical simplicity
1260      * since in theory this is a mass-add of elements. */
1261     notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
1262     server.dirty++;
1263     addReply(c,shared.ok);
1264 }
1265 
1266 /* ========================== Testing / Debugging  ========================== */
1267 
1268 /* PFSELFTEST
1269  * This command performs a self-test of the HLL registers implementation.
1270  * Something that is not easy to test from within the outside. */
1271 #define HLL_TEST_CYCLES 1000
1272 void pfselftestCommand(redisClient *c) {
1273     int j, i;
1274     sds bitcounters = sdsnewlen(NULL,HLL_DENSE_SIZE);
1275     struct hllhdr *hdr = (struct hllhdr*) bitcounters, *hdr2;
1276     robj *o = NULL;
1277     uint8_t bytecounters[HLL_REGISTERS];
1278 
1279     /* Test 1: access registers.
1280      * The test is conceived to test that the different counters of our data
1281      * structure are accessible and that setting their values both result in
1282      * the correct value to be retained and not affect adjacent values. */
1283     for (j = 0; j < HLL_TEST_CYCLES; j++) {
1284         /* Set the HLL counters and an array of unsigned byes of the
1285          * same size to the same set of random values. */
1286         for (i = 0; i < HLL_REGISTERS; i++) {
1287             unsigned int r = rand() & HLL_REGISTER_MAX;
1288 
1289             bytecounters[i] = r;
1290             HLL_DENSE_SET_REGISTER(hdr->registers,i,r);
1291         }
1292         /* Check that we are able to retrieve the same values. */
1293         for (i = 0; i < HLL_REGISTERS; i++) {
1294             unsigned int val;
1295 
1296             HLL_DENSE_GET_REGISTER(val,hdr->registers,i);
1297             if (val != bytecounters[i]) {
1298                 addReplyErrorFormat(c,
1299                     "TESTFAILED Register %d should be %d but is %d",
1300                     i, (int) bytecounters[i], (int) val);
1301                 goto cleanup;
1302             }
1303         }
1304     }
1305 
1306     /* Test 2: approximation error.
1307      * The test adds unique elements and check that the estimated value
1308      * is always reasonable bounds.
1309      *
1310      * We check that the error is smaller than 4 times than the expected
1311      * standard error, to make it very unlikely for the test to fail because
1312      * of a "bad" run.
1313      *
1314      * The test is performed with both dense and sparse HLLs at the same
1315      * time also verifying that the computed cardinality is the same. */
1316     memset(hdr->registers,0,HLL_DENSE_SIZE-HLL_HDR_SIZE);
1317     o = createHLLObject();
1318     double relerr = 1.04/sqrt(HLL_REGISTERS);
1319     int64_t checkpoint = 1;
1320     uint64_t seed = (uint64_t)rand() | (uint64_t)rand() << 32;
1321     uint64_t ele;
1322     for (j = 1; j <= 10000000; j++) {
1323         ele = j ^ seed;
1324         hllDenseAdd(hdr->registers,(unsigned char*)&ele,sizeof(ele));
1325         hllAdd(o,(unsigned char*)&ele,sizeof(ele));
1326 
1327         /* Make sure that for small cardinalities we use sparse
1328          * encoding. */
1329         if (j == checkpoint && j < server.hll_sparse_max_bytes/2) {
1330             hdr2 = o->ptr;
1331             if (hdr2->encoding != HLL_SPARSE) {
1332                 addReplyError(c, "TESTFAILED sparse encoding not used");
1333                 goto cleanup;
1334             }
1335         }
1336 
1337         /* Check that dense and sparse representations agree. */
1338         if (j == checkpoint && hllCount(hdr,NULL) != hllCount(o->ptr,NULL)) {
1339                 addReplyError(c, "TESTFAILED dense/sparse disagree");
1340                 goto cleanup;
1341         }
1342 
1343         /* Check error. */
1344         if (j == checkpoint) {
1345             int64_t abserr = checkpoint - (int64_t)hllCount(hdr,NULL);
1346             if (abserr < 0) abserr = -abserr;
1347             if (abserr > (uint64_t)(relerr*4*checkpoint)) {
1348                 addReplyErrorFormat(c,
1349                     "TESTFAILED Too big error. card:%llu abserr:%llu",
1350                     (unsigned long long) checkpoint,
1351                     (unsigned long long) abserr);
1352                 goto cleanup;
1353             }
1354             checkpoint *= 10;
1355         }
1356     }
1357 
1358     /* Success! */
1359     addReply(c,shared.ok);
1360 
1361 cleanup:
1362     sdsfree(bitcounters);
1363     if (o) decrRefCount(o);
1364 }
1365 
1366 /* PFDEBUG <subcommand> <key> ... args ...
1367  * Different debugging related operations about the HLL implementation. */
1368 void pfdebugCommand(redisClient *c) {
1369     char *cmd = c->argv[1]->ptr;
1370     struct hllhdr *hdr;
1371     robj *o;
1372     int j;
1373 
1374     o = lookupKeyRead(c->db,c->argv[2]);
1375     if (o == NULL) {
1376         addReplyError(c,"The specified key does not exist");
1377         return;
1378     }
1379     if (isHLLObjectOrReply(c,o) != REDIS_OK) return;
1380     o = dbUnshareStringValue(c->db,c->argv[2],o);
1381     hdr = o->ptr;
1382 
1383     /* PFDEBUG GETREG <key> */
1384     if (!strcasecmp(cmd,"getreg")) {
1385         if (c->argc != 3) goto arityerr;
1386 
1387         if (hdr->encoding == HLL_SPARSE) {
1388             if (hllSparseToDense(o) == REDIS_ERR) {
1389                 addReplySds(c,sdsnew(invalid_hll_err));
1390                 return;
1391             }
1392             server.dirty++; /* Force propagation on encoding change. */
1393         }
1394 
1395         hdr = o->ptr;
1396         addReplyMultiBulkLen(c,HLL_REGISTERS);
1397         for (j = 0; j < HLL_REGISTERS; j++) {
1398             uint8_t val;
1399 
1400             HLL_DENSE_GET_REGISTER(val,hdr->registers,j);
1401             addReplyLongLong(c,val);
1402         }
1403     }
1404     /* PFDEBUG DECODE <key> */
1405     else if (!strcasecmp(cmd,"decode")) {
1406         if (c->argc != 3) goto arityerr;
1407 
1408         uint8_t *p = o->ptr, *end = p+sdslen(o->ptr);
1409         sds decoded = sdsempty();
1410 
1411         if (hdr->encoding != HLL_SPARSE) {
1412             addReplyError(c,"HLL encoding is not sparse");
1413             return;
1414         }
1415 
1416         p += HLL_HDR_SIZE;
1417         while(p < end) {
1418             int runlen, regval;
1419 
1420             if (HLL_SPARSE_IS_ZERO(p)) {
1421                 runlen = HLL_SPARSE_ZERO_LEN(p);
1422                 p++;
1423                 decoded = sdscatprintf(decoded,"z:%d ",runlen);
1424             } else if (HLL_SPARSE_IS_XZERO(p)) {
1425                 runlen = HLL_SPARSE_XZERO_LEN(p);
1426                 p += 2;
1427                 decoded = sdscatprintf(decoded,"Z:%d ",runlen);
1428             } else {
1429                 runlen = HLL_SPARSE_VAL_LEN(p);
1430                 regval = HLL_SPARSE_VAL_VALUE(p);
1431                 p++;
1432                 decoded = sdscatprintf(decoded,"v:%d,%d ",regval,runlen);
1433             }
1434         }
1435         decoded = sdstrim(decoded," ");
1436         addReplyBulkCBuffer(c,decoded,sdslen(decoded));
1437         sdsfree(decoded);
1438     }
1439     /* PFDEBUG ENCODING <key> */
1440     else if (!strcasecmp(cmd,"encoding")) {
1441         char *encodingstr[2] = {"dense","sparse"};
1442         if (c->argc != 3) goto arityerr;
1443 
1444         addReplyStatus(c,encodingstr[hdr->encoding]);
1445     }
1446     /* PFDEBUG TODENSE <key> */
1447     else if (!strcasecmp(cmd,"todense")) {
1448         int conv = 0;
1449         if (c->argc != 3) goto arityerr;
1450 
1451         if (hdr->encoding == HLL_SPARSE) {
1452             if (hllSparseToDense(o) == REDIS_ERR) {
1453                 addReplySds(c,sdsnew(invalid_hll_err));
1454                 return;
1455             }
1456             conv = 1;
1457             server.dirty++; /* Force propagation on encoding change. */
1458         }
1459         addReply(c,conv ? shared.cone : shared.czero);
1460     } else {
1461         addReplyErrorFormat(c,"Unknown PFDEBUG subcommand '%s'", cmd);
1462     }
1463     return;
1464 
1465 arityerr:
1466     addReplyErrorFormat(c,
1467         "Wrong number of arguments for the '%s' subcommand",cmd);
1468 }
1469 
1470