// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <[email protected]>
 */
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>

#include <asm/local.h>

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len    :    5 bits\n");
	trace_seq_puts(s, "\ttime_delta  :   27 bits\n");
	trace_seq_puts(s, "\tarray       :   32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding     : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len  == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |    New     +---+   +---+   +---+
 *      |   Reader------^               |
 *      |    page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)		\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}

#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)BUF_PAGE_SIZE,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	long				wait_index;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE		= 0,
	RB_ADD_STAMP_EXTEND		= BIT(1),
	RB_ADD_STAMP_ABSOLUTE		= BIT(2),
	RB_ADD_STAMP_FORCE		= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

#if BITS_PER_LONG == 32
#define RB_TIME_32
#endif

/* To test on 64 bit machines */
//#define RB_TIME_32

#ifdef RB_TIME_32

struct rb_time_struct {
	local_t		cnt;
	local_t		top;
	local_t		bottom;
	local_t		msb;
};
#else
#include <asm/local64.h>
struct rb_time_struct {
	local64_t	time;
};
#endif
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	int				missed_events;
};

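/*
 * Note (illustrative, not from the original source): MAX_NEST above bounds
 * the commit nesting depth for which a per event time stamp is saved in
 * ring_buffer_per_cpu::event_stamp[]. Deeper nesting falls back to the
 * write stamp (see ring_buffer_event_time_stamp() below).
 */
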
#ifdef RB_TIME_32

/*
 * On 32 bit machines, local64_t is very expensive. As the ring
 * buffer doesn't need all the features of a true 64 bit atomic,
 * on 32 bit, it uses these functions (64 still uses local64_t).
 *
 * For the ring buffer, the following 64 bit operations are required
 * for the time stamp:
 *
 *  - Reads may fail if it interrupted a modification of the time stamp.
 *      It will succeed if it did not interrupt another write even if
 *      the read itself is interrupted by a write.
 *      It returns whether it was successful or not.
 *
 *  - Writes always succeed and will overwrite other writes and writes
 *      that were done by events interrupting the current write.
 *
 *  - A write followed by a read of the same time stamp will always succeed,
 *      but may not contain the same value.
 *
 *  - A cmpxchg will fail if it interrupted another write or cmpxchg.
 *      Other than that, it acts like a normal cmpxchg.
 *
 * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
 *  (bottom being the least significant 30 bits of the 60 bit time stamp).
 *
 * The two most significant bits of each half holds a 2 bit counter (0-3).
 * Each update will increment this counter by one.
 * When reading the top and bottom, if the two counter bits match then the
 *  top and bottom together make a valid 60 bit number.
 */
#define RB_TIME_SHIFT	30
#define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1)
#define RB_TIME_MSB_SHIFT	 60

static inline int rb_time_cnt(unsigned long val)
{
	return (val >> RB_TIME_SHIFT) & 3;
}

static inline u64 rb_time_val(unsigned long top, unsigned long bottom)
{
	u64 val;

	val = top & RB_TIME_VAL_MASK;
	val <<= RB_TIME_SHIFT;
	val |= bottom & RB_TIME_VAL_MASK;

	return val;
}

static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt)
{
	unsigned long top, bottom, msb;
	unsigned long c;

	/*
	 * If the read is interrupted by a write, then the cnt will
	 * be different. Loop until both top and bottom have been read
	 * without interruption.
	 */
	do {
		c = local_read(&t->cnt);
		top = local_read(&t->top);
		bottom = local_read(&t->bottom);
		msb = local_read(&t->msb);
	} while (c != local_read(&t->cnt));

	*cnt = rb_time_cnt(top);

	/* If top and bottom counts don't match, this interrupted a write */
	if (*cnt != rb_time_cnt(bottom))
		return false;

	/* The shift to msb will lose its cnt bits */
	*ret = rb_time_val(top, bottom) | ((u64)msb << RB_TIME_MSB_SHIFT);
	return true;
}

static bool rb_time_read(rb_time_t *t, u64 *ret)
{
	unsigned long cnt;

	return __rb_time_read(t, ret, &cnt);
}

static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt)
{
	return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT);
}

static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom,
				 unsigned long *msb)
{
	*top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK);
	*bottom = (unsigned long)(val & RB_TIME_VAL_MASK);
	*msb = (unsigned long)(val >> RB_TIME_MSB_SHIFT);
}

static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt)
{
	val = rb_time_val_cnt(val, cnt);
	local_set(t, val);
}

static void rb_time_set(rb_time_t *t, u64 val)
{
	unsigned long cnt, top, bottom, msb;

	rb_time_split(val, &top, &bottom, &msb);

	/* Writes always succeed with a valid number even if it gets interrupted. */
	do {
		cnt = local_inc_return(&t->cnt);
		rb_time_val_set(&t->top, top, cnt);
		rb_time_val_set(&t->bottom, bottom, cnt);
		rb_time_val_set(&t->msb, val >> RB_TIME_MSB_SHIFT, cnt);
	} while (cnt != local_read(&t->cnt));
}

static inline bool
rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
{
	unsigned long ret;

	ret = local_cmpxchg(l, expect, set);
	return ret == expect;
}

static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
{
	unsigned long cnt, top, bottom, msb;
	unsigned long cnt2, top2, bottom2, msb2;
	u64 val;

	/* The cmpxchg always fails if it interrupted an update */
	if (!__rb_time_read(t, &val, &cnt2))
		return false;

	if (val != expect)
		return false;

	cnt = local_read(&t->cnt);
	if ((cnt & 3) != cnt2)
		return false;

	cnt2 = cnt + 1;

	rb_time_split(val, &top, &bottom, &msb);
	top = rb_time_val_cnt(top, cnt);
	bottom = rb_time_val_cnt(bottom, cnt);

	rb_time_split(set, &top2, &bottom2, &msb2);
	top2 = rb_time_val_cnt(top2, cnt2);
	bottom2 = rb_time_val_cnt(bottom2, cnt2);

	if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2))
		return false;
	if (!rb_time_read_cmpxchg(&t->msb, msb, msb2))
		return false;
	if (!rb_time_read_cmpxchg(&t->top, top, top2))
		return false;
	if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2))
		return false;
	return true;
}

#else /* 64 bits */

/* local64_t always succeeds */

static inline bool rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
	return true;
}
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
{
	u64 val;
	val = local64_cmpxchg(&t->time, expect, set);
	return val == expect;
}
#endif

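/*
 * Illustrative sketch only (not part of the original file): a hypothetical
 * helper showing the rb_time_t contract documented above. rb_time_set()
 * always succeeds, while rb_time_read() may fail on 32 bit if it
 * interrupted a write, in which case the caller picks a fallback value.
 */
static inline u64 rb_time_read_or(rb_time_t *t, u64 fallback)
{
	u64 val;

	/* On 32 bit this can fail if it raced with rb_time_set() */
	if (!rb_time_read(t, &val))
		return fallback;
	return val;
}
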
/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
				void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}

static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned;
 * otherwise the current time is returned. Neither of the last two
 * cases should ever happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
	/* Can only fail on 32 bit */
	if (!rb_time_read(&cpu_buffer->write_stamp, &ts))
		/* Screw it, just read the current time */
		ts = rb_time_stamp(cpu_buffer->buffer);

	return ts;
}

/**
 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
 */
size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
{
	return buffer->buffers[cpu]->nr_pages;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	dirty = ring_buffer_nr_dirty_pages(buffer, cpu);

	return (dirty * 100) > (full * nr_pages);
}

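/*
 * Worked example (illustrative, not from the original source): with
 * nr_pages = 128 and full = 25, full_hit() returns true once more than
 * 32 pages (25% of 128) hold unread data, since the check is
 * dirty * 100 > full * nr_pages.
 */
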
/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;
		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on (or RING_BUFFER_ALL_CPUS)
 *
 * When a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on it.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	rbwork->wait_index++;
	/* make sure the waiters see the new index */
	smp_wmb();

	rb_wake_up_waiters(&rbwork->work);
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	DEFINE_WAIT(wait);
	struct rb_irq_work *work;
	long wait_index;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	wait_index = READ_ONCE(work->wait_index);

	while (true) {
		if (full)
			prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
		else
			prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);

		/*
		 * The events can happen in critical sections where
		 * checking a work queue can cause deadlocks.
		 * After adding a task to the queue, this flag is set
		 * only to notify events to try to wake up the queue
		 * using irq_work.
		 *
		 * We don't clear it even if the buffer is no longer
		 * empty. The flag only causes the next event to run
		 * irq_work to do the work queue wake up. The worst
		 * that can happen if we race with !trace_empty() is that
		 * an event will cause an irq_work to try to wake up
		 * an empty queue.
		 *
		 * There's no reason to protect this flag either, as
		 * the work queue and irq_work logic will do the necessary
		 * synchronization for the wake ups. The only thing
		 * that is necessary is that the wake up happens after
		 * a task has been queued. Spurious wake ups are OK.
		 */
		if (full)
			work->full_waiters_pending = true;
		else
			work->waiters_pending = true;

		if (signal_pending(current)) {
			ret = -EINTR;
			break;
		}

		if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
			break;

		if (cpu != RING_BUFFER_ALL_CPUS &&
		    !ring_buffer_empty_cpu(buffer, cpu)) {
			unsigned long flags;
			bool pagebusy;
			bool done;

			if (!full)
				break;

			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
			pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
			done = !pagebusy && full_hit(buffer, cpu, full);

			if (!cpu_buffer->shortest_full ||
			    cpu_buffer->shortest_full > full)
				cpu_buffer->shortest_full = full;
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			if (done)
				break;
		}

		schedule();

		/* Make sure to see the new wait index */
		smp_rmb();
		if (wait_index != work->wait_index)
			break;
	}

	if (full)
		finish_wait(&work->full_waiters, &wait);
	else
		finish_wait(&work->waiters, &wait);

	return ret;
}

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *work;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -EINVAL;

		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &work->full_waiters, poll_table);
		work->full_waiters_pending = true;
	} else {
		poll_wait(filp, &work->waiters, poll_table);
		work->waiters_pending = true;
	}

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content. But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if (full)
		return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0;

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that they are on, and they only need
 * to worry about interrupts. Reads can happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next	bit 1	bit 0
 *				-------	-------
 * Normal page			  0	  0
 * Points to head page		  0	  1
 * New head page		  1	  0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
				 struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;
	unsigned long ret;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	ret = cmpxchg(ptr, val, (unsigned long)&new->list);

	return ret == val;
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	local_inc(&cpu_buffer->pages_touched);
	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Again, either we update tail_page or an interrupt does */
		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
	}
}

static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			   struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = rb_list_head(cpu_buffer->pages);
	struct list_head *tmp;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->next)->prev) != head))
		return;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->prev)->next) != head))
		return;

	for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
			return;

		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
			return;
	}
}

static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
		long nr_pages, struct list_head *pages)
{
	struct buffer_page *bpage, *tmp;
	bool user_thread = current->mm != NULL;
	gfp_t mflags;
	long i;

	/*
	 * Check if the available memory is there first.
	 * Note, si_mem_available() only gives us a rough estimate of available
	 * memory. It may not be accurate. But we don't care, we just want
	 * to prevent doing any allocation when it is obvious that it is
	 * not going to succeed.
	 */
	i = si_mem_available();
	if (i < nr_pages)
		return -ENOMEM;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;

	/*
	 * If a user thread allocates too much, and si_mem_available()
	 * reports there's enough memory even though there is not,
	 * make sure the OOM killer kills this thread. This can happen
	 * even with RETRY_MAYFAIL because another task may be doing
	 * an allocation after this task has taken all memory.
	 * This is the task the OOM killer needs to take out during this
	 * loop, even if it was triggered by an allocation somewhere else.
	 */
	if (user_thread)
		set_current_oom_origin();
	for (i = 0; i < nr_pages; i++) {
		struct page *page;

		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				    mflags, cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;

		rb_check_bpage(cpu_buffer, bpage);

		list_add(&bpage->list, pages);

		page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags, 0);
		if (!page)
			goto free_pages;
		bpage->page = page_address(page);
		rb_init_page(bpage->page);

		if (user_thread && fatal_signal_pending(current))
			goto free_pages;
	}
	if (user_thread)
		clear_current_oom_origin();

	return 0;

free_pages:
	list_for_each_entry_safe(bpage, tmp, pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (user_thread)
		clear_current_oom_origin();

	return -ENOMEM;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned long nr_pages)
{
	LIST_HEAD(pages);

	WARN_ON(!nr_pages);

	if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
		return -ENOMEM;

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	cpu_buffer->nr_pages = nr_pages;

	rb_check_pages(cpu_buffer);

	return 0;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
	init_completion(&cpu_buffer->update_done);
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;
	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);

	ret = rb_allocate_pages(cpu_buffer, nr_pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	free_buffer_page(cpu_buffer->reader_page);

	if (head) {
		rb_head_page_deactivate(cpu_buffer);

		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	kfree(cpu_buffer);
}

/**
 * __ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 * @key: ring buffer reader_lock_key.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					 struct lock_class_key *key)
{
	struct trace_buffer *buffer;
	long nr_pages;
	int bsize;
	int cpu;
	int ret;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&buffer->irq_work.waiters);

	/* need at least two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	cpu = raw_smp_processor_id();
	cpumask_set_cpu(cpu, buffer->cpumask);
	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu])
		goto fail_free_buffers;

	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
	if (ret < 0)
		goto fail_free_buffers;

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

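/*
 * Illustrative usage sketch (hypothetical caller, not part of this file):
 *
 *	struct trace_buffer *buf;
 *
 *	buf = ring_buffer_alloc(4 * PAGE_SIZE, RB_FL_OVERWRITE);
 *	if (!buf)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(buf);
 *
 * ring_buffer_alloc() is the wrapper in <linux/ring_buffer.h> that supplies
 * the lock_class_key argument to __ring_buffer_alloc().
 */
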
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct trace_buffer *buffer)
{
	int cpu;

	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct trace_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
{
	buffer->time_stamp_abs = abs;
}

bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
{
	return buffer->time_stamp_abs;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static bool
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}

	next_page = rb_list_head(to_remove)->next;

	/*
	 * Now we remove all pages between tail_page and next_page.
1960 * Make sure that we have head_bit value preserved for the 1961 * next page 1962 */ 1963 tail_page->next = (struct list_head *)((unsigned long)next_page | 1964 head_bit); 1965 next_page = rb_list_head(next_page); 1966 next_page->prev = tail_page; 1967 1968 /* make sure pages points to a valid page in the ring buffer */ 1969 cpu_buffer->pages = next_page; 1970 1971 /* update head page */ 1972 if (head_bit) 1973 cpu_buffer->head_page = list_entry(next_page, 1974 struct buffer_page, list); 1975 1976 /* 1977 * change read pointer to make sure any read iterators reset 1978 * themselves 1979 */ 1980 cpu_buffer->read = 0; 1981 1982 /* pages are removed, resume tracing and then free the pages */ 1983 atomic_dec(&cpu_buffer->record_disabled); 1984 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1985 1986 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 1987 1988 /* last buffer page to remove */ 1989 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 1990 list); 1991 tmp_iter_page = first_page; 1992 1993 do { 1994 cond_resched(); 1995 1996 to_remove_page = tmp_iter_page; 1997 rb_inc_page(&tmp_iter_page); 1998 1999 /* update the counters */ 2000 page_entries = rb_page_entries(to_remove_page); 2001 if (page_entries) { 2002 /* 2003 * If something was added to this page, it was full 2004 * since it is not the tail page. So we deduct the 2005 * bytes consumed in ring buffer from here. 2006 * Increment overrun to account for the lost events. 2007 */ 2008 local_add(page_entries, &cpu_buffer->overrun); 2009 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 2010 local_inc(&cpu_buffer->pages_lost); 2011 } 2012 2013 /* 2014 * We have already removed references to this list item, just 2015 * free up the buffer_page and its page 2016 */ 2017 free_buffer_page(to_remove_page); 2018 nr_removed--; 2019 2020 } while (to_remove_page != last_page); 2021 2022 RB_WARN_ON(cpu_buffer, nr_removed); 2023 2024 return nr_removed == 0; 2025 } 2026 2027 static bool 2028 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2029 { 2030 struct list_head *pages = &cpu_buffer->new_pages; 2031 unsigned long flags; 2032 bool success; 2033 int retries; 2034 2035 /* Can be called at early boot up, where interrupts must not been enabled */ 2036 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2037 /* 2038 * We are holding the reader lock, so the reader page won't be swapped 2039 * in the ring buffer. Now we are racing with the writer trying to 2040 * move head page and the tail page. 2041 * We are going to adapt the reader page update process where: 2042 * 1. We first splice the start and end of list of new pages between 2043 * the head page and its previous page. 2044 * 2. We cmpxchg the prev_page->next to point from head page to the 2045 * start of new pages list. 2046 * 3. Finally, we update the head->prev to the end of new list. 2047 * 2048 * We will try this process 10 times, to make sure that we don't keep 2049 * spinning. 
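 *
 * For example, splicing new pages N1..Nk in front of the head page H
 * (sketch; the RB_PAGE_HEAD flag lives in the pointer to H):
 *
 *	before:  prev -> H(|HEAD)
 *	after:   prev -> N1 -> ... -> Nk -> H(|HEAD)
 *
 * The cmpxchg on prev->next is what keeps this safe against a writer
 * moving the head page underneath us: if the head moved, the cmpxchg
 * fails and we retry with the new head.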
2050 */ 2051 retries = 10; 2052 success = false; 2053 while (retries--) { 2054 struct list_head *head_page, *prev_page, *r; 2055 struct list_head *last_page, *first_page; 2056 struct list_head *head_page_with_bit; 2057 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2058 2059 if (!hpage) 2060 break; 2061 head_page = &hpage->list; 2062 prev_page = head_page->prev; 2063 2064 first_page = pages->next; 2065 last_page = pages->prev; 2066 2067 head_page_with_bit = (struct list_head *) 2068 ((unsigned long)head_page | RB_PAGE_HEAD); 2069 2070 last_page->next = head_page_with_bit; 2071 first_page->prev = prev_page; 2072 2073 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page); 2074 2075 if (r == head_page_with_bit) { 2076 /* 2077 * yay, we replaced the page pointer to our new list, 2078 * now, we just have to update to head page's prev 2079 * pointer to point to end of list 2080 */ 2081 head_page->prev = last_page; 2082 success = true; 2083 break; 2084 } 2085 } 2086 2087 if (success) 2088 INIT_LIST_HEAD(pages); 2089 /* 2090 * If we weren't successful in adding in new pages, warn and stop 2091 * tracing 2092 */ 2093 RB_WARN_ON(cpu_buffer, !success); 2094 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2095 2096 /* free pages if they weren't inserted */ 2097 if (!success) { 2098 struct buffer_page *bpage, *tmp; 2099 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2100 list) { 2101 list_del_init(&bpage->list); 2102 free_buffer_page(bpage); 2103 } 2104 } 2105 return success; 2106 } 2107 2108 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2109 { 2110 bool success; 2111 2112 if (cpu_buffer->nr_pages_to_update > 0) 2113 success = rb_insert_pages(cpu_buffer); 2114 else 2115 success = rb_remove_pages(cpu_buffer, 2116 -cpu_buffer->nr_pages_to_update); 2117 2118 if (success) 2119 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2120 } 2121 2122 static void update_pages_handler(struct work_struct *work) 2123 { 2124 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2125 struct ring_buffer_per_cpu, update_pages_work); 2126 rb_update_pages(cpu_buffer); 2127 complete(&cpu_buffer->update_done); 2128 } 2129 2130 /** 2131 * ring_buffer_resize - resize the ring buffer 2132 * @buffer: the buffer to resize. 2133 * @size: the new size. 2134 * @cpu_id: the cpu buffer to resize 2135 * 2136 * Minimum size is 2 * BUF_PAGE_SIZE. 2137 * 2138 * Returns 0 on success and < 0 on failure. 2139 */ 2140 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2141 int cpu_id) 2142 { 2143 struct ring_buffer_per_cpu *cpu_buffer; 2144 unsigned long nr_pages; 2145 int cpu, err; 2146 2147 /* 2148 * Always succeed at resizing a non-existent buffer: 2149 */ 2150 if (!buffer) 2151 return 0; 2152 2153 /* Make sure the requested buffer exists */ 2154 if (cpu_id != RING_BUFFER_ALL_CPUS && 2155 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2156 return 0; 2157 2158 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 2159 2160 /* we need a minimum of two pages */ 2161 if (nr_pages < 2) 2162 nr_pages = 2; 2163 2164 /* prevent another thread from changing buffer sizes */ 2165 mutex_lock(&buffer->mutex); 2166 2167 2168 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2169 /* 2170 * Don't succeed if resizing is disabled, as a reader might be 2171 * manipulating the ring buffer and is expecting a sane state while 2172 * this is true. 
2173 */ 2174 for_each_buffer_cpu(buffer, cpu) { 2175 cpu_buffer = buffer->buffers[cpu]; 2176 if (atomic_read(&cpu_buffer->resize_disabled)) { 2177 err = -EBUSY; 2178 goto out_err_unlock; 2179 } 2180 } 2181 2182 /* calculate the pages to update */ 2183 for_each_buffer_cpu(buffer, cpu) { 2184 cpu_buffer = buffer->buffers[cpu]; 2185 2186 cpu_buffer->nr_pages_to_update = nr_pages - 2187 cpu_buffer->nr_pages; 2188 /* 2189 * nothing more to do for removing pages or no update 2190 */ 2191 if (cpu_buffer->nr_pages_to_update <= 0) 2192 continue; 2193 /* 2194 * to add pages, make sure all new pages can be 2195 * allocated without receiving ENOMEM 2196 */ 2197 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2198 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2199 &cpu_buffer->new_pages)) { 2200 /* not enough memory for new pages */ 2201 err = -ENOMEM; 2202 goto out_err; 2203 } 2204 } 2205 2206 cpus_read_lock(); 2207 /* 2208 * Fire off all the required work handlers 2209 * We can't schedule on offline CPUs, but it's not necessary 2210 * since we can change their buffer sizes without any race. 2211 */ 2212 for_each_buffer_cpu(buffer, cpu) { 2213 cpu_buffer = buffer->buffers[cpu]; 2214 if (!cpu_buffer->nr_pages_to_update) 2215 continue; 2216 2217 /* Can't run something on an offline CPU. */ 2218 if (!cpu_online(cpu)) { 2219 rb_update_pages(cpu_buffer); 2220 cpu_buffer->nr_pages_to_update = 0; 2221 } else { 2222 /* Run directly if possible. */ 2223 migrate_disable(); 2224 if (cpu != smp_processor_id()) { 2225 migrate_enable(); 2226 schedule_work_on(cpu, 2227 &cpu_buffer->update_pages_work); 2228 } else { 2229 update_pages_handler(&cpu_buffer->update_pages_work); 2230 migrate_enable(); 2231 } 2232 } 2233 } 2234 2235 /* wait for all the updates to complete */ 2236 for_each_buffer_cpu(buffer, cpu) { 2237 cpu_buffer = buffer->buffers[cpu]; 2238 if (!cpu_buffer->nr_pages_to_update) 2239 continue; 2240 2241 if (cpu_online(cpu)) 2242 wait_for_completion(&cpu_buffer->update_done); 2243 cpu_buffer->nr_pages_to_update = 0; 2244 } 2245 2246 cpus_read_unlock(); 2247 } else { 2248 cpu_buffer = buffer->buffers[cpu_id]; 2249 2250 if (nr_pages == cpu_buffer->nr_pages) 2251 goto out; 2252 2253 /* 2254 * Don't succeed if resizing is disabled, as a reader might be 2255 * manipulating the ring buffer and is expecting a sane state while 2256 * this is true. 2257 */ 2258 if (atomic_read(&cpu_buffer->resize_disabled)) { 2259 err = -EBUSY; 2260 goto out_err_unlock; 2261 } 2262 2263 cpu_buffer->nr_pages_to_update = nr_pages - 2264 cpu_buffer->nr_pages; 2265 2266 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2267 if (cpu_buffer->nr_pages_to_update > 0 && 2268 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2269 &cpu_buffer->new_pages)) { 2270 err = -ENOMEM; 2271 goto out_err; 2272 } 2273 2274 cpus_read_lock(); 2275 2276 /* Can't run something on an offline CPU. */ 2277 if (!cpu_online(cpu_id)) 2278 rb_update_pages(cpu_buffer); 2279 else { 2280 /* Run directly if possible. */ 2281 migrate_disable(); 2282 if (cpu_id == smp_processor_id()) { 2283 rb_update_pages(cpu_buffer); 2284 migrate_enable(); 2285 } else { 2286 migrate_enable(); 2287 schedule_work_on(cpu_id, 2288 &cpu_buffer->update_pages_work); 2289 wait_for_completion(&cpu_buffer->update_done); 2290 } 2291 } 2292 2293 cpu_buffer->nr_pages_to_update = 0; 2294 cpus_read_unlock(); 2295 } 2296 2297 out: 2298 /* 2299 * The ring buffer resize can happen with the ring buffer 2300 * enabled, so that the update disturbs the tracing as little 2301 * as possible. 
But if the buffer is disabled, we do not need 2302 * to worry about that, and we can take the time to verify 2303 * that the buffer is not corrupt. 2304 */ 2305 if (atomic_read(&buffer->record_disabled)) { 2306 atomic_inc(&buffer->record_disabled); 2307 /* 2308 * Even though the buffer was disabled, we must make sure 2309 * that it is truly disabled before calling rb_check_pages. 2310 * There could have been a race between checking 2311 * record_disable and incrementing it. 2312 */ 2313 synchronize_rcu(); 2314 for_each_buffer_cpu(buffer, cpu) { 2315 cpu_buffer = buffer->buffers[cpu]; 2316 rb_check_pages(cpu_buffer); 2317 } 2318 atomic_dec(&buffer->record_disabled); 2319 } 2320 2321 mutex_unlock(&buffer->mutex); 2322 return 0; 2323 2324 out_err: 2325 for_each_buffer_cpu(buffer, cpu) { 2326 struct buffer_page *bpage, *tmp; 2327 2328 cpu_buffer = buffer->buffers[cpu]; 2329 cpu_buffer->nr_pages_to_update = 0; 2330 2331 if (list_empty(&cpu_buffer->new_pages)) 2332 continue; 2333 2334 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2335 list) { 2336 list_del_init(&bpage->list); 2337 free_buffer_page(bpage); 2338 } 2339 } 2340 out_err_unlock: 2341 mutex_unlock(&buffer->mutex); 2342 return err; 2343 } 2344 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2345 2346 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2347 { 2348 mutex_lock(&buffer->mutex); 2349 if (val) 2350 buffer->flags |= RB_FL_OVERWRITE; 2351 else 2352 buffer->flags &= ~RB_FL_OVERWRITE; 2353 mutex_unlock(&buffer->mutex); 2354 } 2355 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2356 2357 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2358 { 2359 return bpage->page->data + index; 2360 } 2361 2362 static __always_inline struct ring_buffer_event * 2363 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2364 { 2365 return __rb_page_index(cpu_buffer->reader_page, 2366 cpu_buffer->reader_page->read); 2367 } 2368 2369 static __always_inline unsigned rb_page_commit(struct buffer_page *bpage) 2370 { 2371 return local_read(&bpage->page->commit); 2372 } 2373 2374 static struct ring_buffer_event * 2375 rb_iter_head_event(struct ring_buffer_iter *iter) 2376 { 2377 struct ring_buffer_event *event; 2378 struct buffer_page *iter_head_page = iter->head_page; 2379 unsigned long commit; 2380 unsigned length; 2381 2382 if (iter->head != iter->next_event) 2383 return iter->event; 2384 2385 /* 2386 * When the writer goes across pages, it issues a cmpxchg which 2387 * is a mb(), which will synchronize with the rmb here. 2388 * (see rb_tail_page_update() and __rb_reserve_next()) 2389 */ 2390 commit = rb_page_commit(iter_head_page); 2391 smp_rmb(); 2392 event = __rb_page_index(iter_head_page, iter->head); 2393 length = rb_event_length(event); 2394 2395 /* 2396 * READ_ONCE() doesn't work on functions and we don't want the 2397 * compiler doing any crazy optimizations with length. 2398 */ 2399 barrier(); 2400 2401 if ((iter->head + length) > commit || length > BUF_MAX_DATA_SIZE) 2402 /* Writer corrupted the read? */ 2403 goto reset; 2404 2405 memcpy(iter->event, event, length); 2406 /* 2407 * If the page stamp is still the same after this rmb() then the 2408 * event was safely copied without the writer entering the page. 
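 *
 * The overall ordering is roughly (sketch):
 *
 *	commit = rb_page_commit(page);
 *	smp_rmb();
 *	copy the event at iter->head;
 *	smp_rmb();
 *	recheck page->time_stamp and rb_page_commit(page);
 *
 * If the page time stamp changed, or the commit index went backwards
 * (the page was recycled by the writer), the copy is thrown away and
 * the iterator is reset below.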
2409 */ 2410 smp_rmb(); 2411 2412 /* Make sure the page didn't change since we read this */ 2413 if (iter->page_stamp != iter_head_page->page->time_stamp || 2414 commit > rb_page_commit(iter_head_page)) 2415 goto reset; 2416 2417 iter->next_event = iter->head + length; 2418 return iter->event; 2419 reset: 2420 /* Reset to the beginning */ 2421 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2422 iter->head = 0; 2423 iter->next_event = 0; 2424 iter->missed_events = 1; 2425 return NULL; 2426 } 2427 2428 /* Size is determined by what has been committed */ 2429 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 2430 { 2431 return rb_page_commit(bpage); 2432 } 2433 2434 static __always_inline unsigned 2435 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 2436 { 2437 return rb_page_commit(cpu_buffer->commit_page); 2438 } 2439 2440 static __always_inline unsigned 2441 rb_event_index(struct ring_buffer_event *event) 2442 { 2443 unsigned long addr = (unsigned long)event; 2444 2445 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; 2446 } 2447 2448 static void rb_inc_iter(struct ring_buffer_iter *iter) 2449 { 2450 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2451 2452 /* 2453 * The iterator could be on the reader page (it starts there). 2454 * But the head could have moved, since the reader was 2455 * found. Check for this case and assign the iterator 2456 * to the head page instead of next. 2457 */ 2458 if (iter->head_page == cpu_buffer->reader_page) 2459 iter->head_page = rb_set_head_page(cpu_buffer); 2460 else 2461 rb_inc_page(&iter->head_page); 2462 2463 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2464 iter->head = 0; 2465 iter->next_event = 0; 2466 } 2467 2468 /* 2469 * rb_handle_head_page - writer hit the head page 2470 * 2471 * Returns: +1 to retry page 2472 * 0 to continue 2473 * -1 on error 2474 */ 2475 static int 2476 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 2477 struct buffer_page *tail_page, 2478 struct buffer_page *next_page) 2479 { 2480 struct buffer_page *new_head; 2481 int entries; 2482 int type; 2483 int ret; 2484 2485 entries = rb_page_entries(next_page); 2486 2487 /* 2488 * The hard part is here. We need to move the head 2489 * forward, and protect against both readers on 2490 * other CPUs and writers coming in via interrupts. 2491 */ 2492 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 2493 RB_PAGE_HEAD); 2494 2495 /* 2496 * type can be one of four: 2497 * NORMAL - an interrupt already moved it for us 2498 * HEAD - we are the first to get here. 2499 * UPDATE - we are the interrupt interrupting 2500 * a current move. 2501 * MOVED - a reader on another CPU moved the next 2502 * pointer to its reader page. Give up 2503 * and try again. 2504 */ 2505 2506 switch (type) { 2507 case RB_PAGE_HEAD: 2508 /* 2509 * We changed the head to UPDATE, thus 2510 * it is our responsibility to update 2511 * the counters. 2512 */ 2513 local_add(entries, &cpu_buffer->overrun); 2514 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 2515 local_inc(&cpu_buffer->pages_lost); 2516 2517 /* 2518 * The entries will be zeroed out when we move the 2519 * tail page. 2520 */ 2521 2522 /* still more to do */ 2523 break; 2524 2525 case RB_PAGE_UPDATE: 2526 /* 2527 * This is an interrupt that interrupt the 2528 * previous update. Still more to do. 2529 */ 2530 break; 2531 case RB_PAGE_NORMAL: 2532 /* 2533 * An interrupt came in before the update 2534 * and processed this for us. 
2535 * Nothing left to do. 2536 */ 2537 return 1; 2538 case RB_PAGE_MOVED: 2539 /* 2540 * The reader is on another CPU and just did 2541 * a swap with our next_page. 2542 * Try again. 2543 */ 2544 return 1; 2545 default: 2546 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 2547 return -1; 2548 } 2549 2550 /* 2551 * Now that we are here, the old head pointer is 2552 * set to UPDATE. This will keep the reader from 2553 * swapping the head page with the reader page. 2554 * The reader (on another CPU) will spin till 2555 * we are finished. 2556 * 2557 * We just need to protect against interrupts 2558 * doing the job. We will set the next pointer 2559 * to HEAD. After that, we set the old pointer 2560 * to NORMAL, but only if it was HEAD before. 2561 * otherwise we are an interrupt, and only 2562 * want the outer most commit to reset it. 2563 */ 2564 new_head = next_page; 2565 rb_inc_page(&new_head); 2566 2567 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 2568 RB_PAGE_NORMAL); 2569 2570 /* 2571 * Valid returns are: 2572 * HEAD - an interrupt came in and already set it. 2573 * NORMAL - One of two things: 2574 * 1) We really set it. 2575 * 2) A bunch of interrupts came in and moved 2576 * the page forward again. 2577 */ 2578 switch (ret) { 2579 case RB_PAGE_HEAD: 2580 case RB_PAGE_NORMAL: 2581 /* OK */ 2582 break; 2583 default: 2584 RB_WARN_ON(cpu_buffer, 1); 2585 return -1; 2586 } 2587 2588 /* 2589 * It is possible that an interrupt came in, 2590 * set the head up, then more interrupts came in 2591 * and moved it again. When we get back here, 2592 * the page would have been set to NORMAL but we 2593 * just set it back to HEAD. 2594 * 2595 * How do you detect this? Well, if that happened 2596 * the tail page would have moved. 2597 */ 2598 if (ret == RB_PAGE_NORMAL) { 2599 struct buffer_page *buffer_tail_page; 2600 2601 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 2602 /* 2603 * If the tail had moved passed next, then we need 2604 * to reset the pointer. 2605 */ 2606 if (buffer_tail_page != tail_page && 2607 buffer_tail_page != next_page) 2608 rb_head_page_set_normal(cpu_buffer, new_head, 2609 next_page, 2610 RB_PAGE_HEAD); 2611 } 2612 2613 /* 2614 * If this was the outer most commit (the one that 2615 * changed the original pointer from HEAD to UPDATE), 2616 * then it is up to us to reset it to NORMAL. 2617 */ 2618 if (type == RB_PAGE_HEAD) { 2619 ret = rb_head_page_set_normal(cpu_buffer, next_page, 2620 tail_page, 2621 RB_PAGE_UPDATE); 2622 if (RB_WARN_ON(cpu_buffer, 2623 ret != RB_PAGE_UPDATE)) 2624 return -1; 2625 } 2626 2627 return 0; 2628 } 2629 2630 static inline void 2631 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 2632 unsigned long tail, struct rb_event_info *info) 2633 { 2634 struct buffer_page *tail_page = info->tail_page; 2635 struct ring_buffer_event *event; 2636 unsigned long length = info->length; 2637 2638 /* 2639 * Only the event that crossed the page boundary 2640 * must fill the old tail_page with padding. 2641 */ 2642 if (tail >= BUF_PAGE_SIZE) { 2643 /* 2644 * If the page was filled, then we still need 2645 * to update the real_end. Reset it to zero 2646 * and the reader will ignore it. 2647 */ 2648 if (tail == BUF_PAGE_SIZE) 2649 tail_page->real_end = 0; 2650 2651 local_sub(length, &tail_page->write); 2652 return; 2653 } 2654 2655 event = __rb_page_index(tail_page, tail); 2656 2657 /* account for padding bytes */ 2658 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes); 2659 2660 /* 2661 * Save the original length to the meta data. 
2662 * This will be used by the reader to add lost event 2663 * counter. 2664 */ 2665 tail_page->real_end = tail; 2666 2667 /* 2668 * If this event is bigger than the minimum size, then 2669 * we need to be careful that we don't subtract the 2670 * write counter enough to allow another writer to slip 2671 * in on this page. 2672 * We put in a discarded commit instead, to make sure 2673 * that this space is not used again. 2674 * 2675 * If we are less than the minimum size, we don't need to 2676 * worry about it. 2677 */ 2678 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { 2679 /* No room for any events */ 2680 2681 /* Mark the rest of the page with padding */ 2682 rb_event_set_padding(event); 2683 2684 /* Make sure the padding is visible before the write update */ 2685 smp_wmb(); 2686 2687 /* Set the write back to the previous setting */ 2688 local_sub(length, &tail_page->write); 2689 return; 2690 } 2691 2692 /* Put in a discarded event */ 2693 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; 2694 event->type_len = RINGBUF_TYPE_PADDING; 2695 /* time delta must be non zero */ 2696 event->time_delta = 1; 2697 2698 /* Make sure the padding is visible before the tail_page->write update */ 2699 smp_wmb(); 2700 2701 /* Set write to end of buffer */ 2702 length = (tail + length) - BUF_PAGE_SIZE; 2703 local_sub(length, &tail_page->write); 2704 } 2705 2706 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 2707 2708 /* 2709 * This is the slow path, force gcc not to inline it. 2710 */ 2711 static noinline struct ring_buffer_event * 2712 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 2713 unsigned long tail, struct rb_event_info *info) 2714 { 2715 struct buffer_page *tail_page = info->tail_page; 2716 struct buffer_page *commit_page = cpu_buffer->commit_page; 2717 struct trace_buffer *buffer = cpu_buffer->buffer; 2718 struct buffer_page *next_page; 2719 int ret; 2720 2721 next_page = tail_page; 2722 2723 rb_inc_page(&next_page); 2724 2725 /* 2726 * If for some reason, we had an interrupt storm that made 2727 * it all the way around the buffer, bail, and warn 2728 * about it. 2729 */ 2730 if (unlikely(next_page == commit_page)) { 2731 local_inc(&cpu_buffer->commit_overrun); 2732 goto out_reset; 2733 } 2734 2735 /* 2736 * This is where the fun begins! 2737 * 2738 * We are fighting against races between a reader that 2739 * could be on another CPU trying to swap its reader 2740 * page with the buffer head. 2741 * 2742 * We are also fighting against interrupts coming in and 2743 * moving the head or tail on us as well. 2744 * 2745 * If the next page is the head page then we have filled 2746 * the buffer, unless the commit page is still on the 2747 * reader page. 2748 */ 2749 if (rb_is_head_page(next_page, &tail_page->list)) { 2750 2751 /* 2752 * If the commit is not on the reader page, then 2753 * move the header page. 2754 */ 2755 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 2756 /* 2757 * If we are not in overwrite mode, 2758 * this is easy, just stop here. 2759 */ 2760 if (!(buffer->flags & RB_FL_OVERWRITE)) { 2761 local_inc(&cpu_buffer->dropped_events); 2762 goto out_reset; 2763 } 2764 2765 ret = rb_handle_head_page(cpu_buffer, 2766 tail_page, 2767 next_page); 2768 if (ret < 0) 2769 goto out_reset; 2770 if (ret) 2771 goto out_again; 2772 } else { 2773 /* 2774 * We need to be careful here too. The 2775 * commit page could still be on the reader 2776 * page. 
We could have a small buffer, and 2777 * have filled up the buffer with events 2778 * from interrupts and such, and wrapped. 2779 * 2780 * Note, if the tail page is also on the 2781 * reader_page, we let it move out. 2782 */ 2783 if (unlikely((cpu_buffer->commit_page != 2784 cpu_buffer->tail_page) && 2785 (cpu_buffer->commit_page == 2786 cpu_buffer->reader_page))) { 2787 local_inc(&cpu_buffer->commit_overrun); 2788 goto out_reset; 2789 } 2790 } 2791 } 2792 2793 rb_tail_page_update(cpu_buffer, tail_page, next_page); 2794 2795 out_again: 2796 2797 rb_reset_tail(cpu_buffer, tail, info); 2798 2799 /* Commit what we have for now. */ 2800 rb_end_commit(cpu_buffer); 2801 /* rb_end_commit() decs committing */ 2802 local_inc(&cpu_buffer->committing); 2803 2804 /* fail and let the caller try again */ 2805 return ERR_PTR(-EAGAIN); 2806 2807 out_reset: 2808 /* reset write */ 2809 rb_reset_tail(cpu_buffer, tail, info); 2810 2811 return NULL; 2812 } 2813 2814 /* Slow path */ 2815 static struct ring_buffer_event * 2816 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs) 2817 { 2818 if (abs) 2819 event->type_len = RINGBUF_TYPE_TIME_STAMP; 2820 else 2821 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2822 2823 /* Not the first event on the page, or not delta? */ 2824 if (abs || rb_event_index(event)) { 2825 event->time_delta = delta & TS_MASK; 2826 event->array[0] = delta >> TS_SHIFT; 2827 } else { 2828 /* nope, just zero it */ 2829 event->time_delta = 0; 2830 event->array[0] = 0; 2831 } 2832 2833 return skip_time_extend(event); 2834 } 2835 2836 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2837 static inline bool sched_clock_stable(void) 2838 { 2839 return true; 2840 } 2841 #endif 2842 2843 static void 2844 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2845 struct rb_event_info *info) 2846 { 2847 u64 write_stamp; 2848 2849 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 2850 (unsigned long long)info->delta, 2851 (unsigned long long)info->ts, 2852 (unsigned long long)info->before, 2853 (unsigned long long)info->after, 2854 (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0), 2855 sched_clock_stable() ? "" : 2856 "If you just came from a suspend/resume,\n" 2857 "please switch to the trace global clock:\n" 2858 " echo global > /sys/kernel/tracing/trace_clock\n" 2859 "or add trace_clock=global to the kernel command line\n"); 2860 } 2861 2862 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2863 struct ring_buffer_event **event, 2864 struct rb_event_info *info, 2865 u64 *delta, 2866 unsigned int *length) 2867 { 2868 bool abs = info->add_timestamp & 2869 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 2870 2871 if (unlikely(info->delta > (1ULL << 59))) { 2872 /* 2873 * Some timers can use more than 59 bits, and when a timestamp 2874 * is added to the buffer, it will lose those bits. 2875 */ 2876 if (abs && (info->ts & TS_MSB)) { 2877 info->delta &= ABS_TS_MASK; 2878 2879 /* did the clock go backwards */ 2880 } else if (info->before == info->after && info->before > info->ts) { 2881 /* not interrupted */ 2882 static int once; 2883 2884 /* 2885 * This is possible with a recalibrating of the TSC. 2886 * Do not produce a call stack, but just report it. 
2887 */ 2888 if (!once) { 2889 once++; 2890 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 2891 info->before, info->ts); 2892 } 2893 } else 2894 rb_check_timestamp(cpu_buffer, info); 2895 if (!abs) 2896 info->delta = 0; 2897 } 2898 *event = rb_add_time_stamp(*event, info->delta, abs); 2899 *length -= RB_LEN_TIME_EXTEND; 2900 *delta = 0; 2901 } 2902 2903 /** 2904 * rb_update_event - update event type and data 2905 * @cpu_buffer: The per cpu buffer of the @event 2906 * @event: the event to update 2907 * @info: The info to update the @event with (contains length and delta) 2908 * 2909 * Update the type and data fields of the @event. The length 2910 * is the actual size that is written to the ring buffer, 2911 * and with this, we can determine what to place into the 2912 * data field. 2913 */ 2914 static void 2915 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2916 struct ring_buffer_event *event, 2917 struct rb_event_info *info) 2918 { 2919 unsigned length = info->length; 2920 u64 delta = info->delta; 2921 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 2922 2923 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 2924 cpu_buffer->event_stamp[nest] = info->ts; 2925 2926 /* 2927 * If we need to add a timestamp, then we 2928 * add it to the start of the reserved space. 2929 */ 2930 if (unlikely(info->add_timestamp)) 2931 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 2932 2933 event->time_delta = delta; 2934 length -= RB_EVNT_HDR_SIZE; 2935 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 2936 event->type_len = 0; 2937 event->array[0] = length; 2938 } else 2939 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2940 } 2941 2942 static unsigned rb_calculate_event_length(unsigned length) 2943 { 2944 struct ring_buffer_event event; /* Used only for sizeof array */ 2945 2946 /* zero length can cause confusions */ 2947 if (!length) 2948 length++; 2949 2950 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 2951 length += sizeof(event.array[0]); 2952 2953 length += RB_EVNT_HDR_SIZE; 2954 length = ALIGN(length, RB_ARCH_ALIGNMENT); 2955 2956 /* 2957 * In case the time delta is larger than the 27 bits for it 2958 * in the header, we need to add a timestamp. If another 2959 * event comes in when trying to discard this one to increase 2960 * the length, then the timestamp will be added in the allocated 2961 * space of this event. If length is bigger than the size needed 2962 * for the TIME_EXTEND, then padding has to be used. The events 2963 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 2964 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 2965 * As length is a multiple of 4, we only need to worry if it 2966 * is 12 (RB_LEN_TIME_EXTEND + 4). 
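 *
 * Worked example (assuming RB_FORCE_8BYTE_ALIGNMENT is 0, so the header
 * is 4 bytes and the alignment is 4): a request for 5 to 8 bytes of data
 * becomes 9 to 12 after adding the header, which aligns up to 12.  That
 * is large enough for a TIME_EXTEND (8 bytes), but the 4 bytes left over
 * are below the 8 byte minimum event size, so the length is bumped to 16
 * below.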
2967 */ 2968 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 2969 length += RB_ALIGNMENT; 2970 2971 return length; 2972 } 2973 2974 static u64 rb_time_delta(struct ring_buffer_event *event) 2975 { 2976 switch (event->type_len) { 2977 case RINGBUF_TYPE_PADDING: 2978 return 0; 2979 2980 case RINGBUF_TYPE_TIME_EXTEND: 2981 return rb_event_time_stamp(event); 2982 2983 case RINGBUF_TYPE_TIME_STAMP: 2984 return 0; 2985 2986 case RINGBUF_TYPE_DATA: 2987 return event->time_delta; 2988 default: 2989 return 0; 2990 } 2991 } 2992 2993 static inline bool 2994 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 2995 struct ring_buffer_event *event) 2996 { 2997 unsigned long new_index, old_index; 2998 struct buffer_page *bpage; 2999 unsigned long index; 3000 unsigned long addr; 3001 u64 write_stamp; 3002 u64 delta; 3003 3004 new_index = rb_event_index(event); 3005 old_index = new_index + rb_event_ts_length(event); 3006 addr = (unsigned long)event; 3007 addr &= PAGE_MASK; 3008 3009 bpage = READ_ONCE(cpu_buffer->tail_page); 3010 3011 delta = rb_time_delta(event); 3012 3013 if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp)) 3014 return false; 3015 3016 /* Make sure the write stamp is read before testing the location */ 3017 barrier(); 3018 3019 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 3020 unsigned long write_mask = 3021 local_read(&bpage->write) & ~RB_WRITE_MASK; 3022 unsigned long event_length = rb_event_length(event); 3023 3024 /* Something came in, can't discard */ 3025 if (!rb_time_cmpxchg(&cpu_buffer->write_stamp, 3026 write_stamp, write_stamp - delta)) 3027 return false; 3028 3029 /* 3030 * It's possible that the event time delta is zero 3031 * (has the same time stamp as the previous event) 3032 * in which case write_stamp and before_stamp could 3033 * be the same. In such a case, force before_stamp 3034 * to be different than write_stamp. It doesn't 3035 * matter what it is, as long as its different. 3036 */ 3037 if (!delta) 3038 rb_time_set(&cpu_buffer->before_stamp, 0); 3039 3040 /* 3041 * If an event were to come in now, it would see that the 3042 * write_stamp and the before_stamp are different, and assume 3043 * that this event just added itself before updating 3044 * the write stamp. The interrupting event will fix the 3045 * write stamp for us, and use the before stamp as its delta. 3046 */ 3047 3048 /* 3049 * This is on the tail page. It is possible that 3050 * a write could come in and move the tail page 3051 * and write to the next page. That is fine 3052 * because we just shorten what is on this page. 3053 */ 3054 old_index += write_mask; 3055 new_index += write_mask; 3056 index = local_cmpxchg(&bpage->write, old_index, new_index); 3057 if (index == old_index) { 3058 /* update counters */ 3059 local_sub(event_length, &cpu_buffer->entries_bytes); 3060 return true; 3061 } 3062 } 3063 3064 /* could not discard */ 3065 return false; 3066 } 3067 3068 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 3069 { 3070 local_inc(&cpu_buffer->committing); 3071 local_inc(&cpu_buffer->commits); 3072 } 3073 3074 static __always_inline void 3075 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 3076 { 3077 unsigned long max_count; 3078 3079 /* 3080 * We only race with interrupts and NMIs on this CPU. 3081 * If we own the commit event, then we can commit 3082 * all others that interrupted us, since the interruptions 3083 * are in stack format (they finish before they come 3084 * back to us). 
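 *
 * For example:
 *
 *	normal context:	reserve A
 *	  irq:			reserve B, commit B
 *	    nmi:			reserve C, commit C
 *	normal context:	commit A  (owns the commit, moves it past B and C)
 *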
This allows us to do a simple loop to 3085 * assign the commit to the tail. 3086 */ 3087 again: 3088 max_count = cpu_buffer->nr_pages * 100; 3089 3090 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 3091 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 3092 return; 3093 if (RB_WARN_ON(cpu_buffer, 3094 rb_is_reader_page(cpu_buffer->tail_page))) 3095 return; 3096 local_set(&cpu_buffer->commit_page->page->commit, 3097 rb_page_write(cpu_buffer->commit_page)); 3098 rb_inc_page(&cpu_buffer->commit_page); 3099 /* add barrier to keep gcc from optimizing too much */ 3100 barrier(); 3101 } 3102 while (rb_commit_index(cpu_buffer) != 3103 rb_page_write(cpu_buffer->commit_page)) { 3104 3105 local_set(&cpu_buffer->commit_page->page->commit, 3106 rb_page_write(cpu_buffer->commit_page)); 3107 RB_WARN_ON(cpu_buffer, 3108 local_read(&cpu_buffer->commit_page->page->commit) & 3109 ~RB_WRITE_MASK); 3110 barrier(); 3111 } 3112 3113 /* again, keep gcc from optimizing */ 3114 barrier(); 3115 3116 /* 3117 * If an interrupt came in just after the first while loop 3118 * and pushed the tail page forward, we will be left with 3119 * a dangling commit that will never go forward. 3120 */ 3121 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3122 goto again; 3123 } 3124 3125 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3126 { 3127 unsigned long commits; 3128 3129 if (RB_WARN_ON(cpu_buffer, 3130 !local_read(&cpu_buffer->committing))) 3131 return; 3132 3133 again: 3134 commits = local_read(&cpu_buffer->commits); 3135 /* synchronize with interrupts */ 3136 barrier(); 3137 if (local_read(&cpu_buffer->committing) == 1) 3138 rb_set_commit_to_write(cpu_buffer); 3139 3140 local_dec(&cpu_buffer->committing); 3141 3142 /* synchronize with interrupts */ 3143 barrier(); 3144 3145 /* 3146 * Need to account for interrupts coming in between the 3147 * updating of the commit page and the clearing of the 3148 * committing counter. 
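 *
 * For example (task T interrupted by irq I; the check below closes
 * this race):
 *
 *	T: commits = local_read(&commits)
 *	T: committing == 1  ->  rb_set_commit_to_write()
 *	I: rb_start_commit()	(committing -> 2, commits++)
 *	I: reserve and commit an event
 *	I: rb_end_commit()	(committing != 1, leaves the commit to T)
 *	T: committing--		(now 0)
 *	T: commits changed and committing == 0  ->  goto again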
3149 */ 3150 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3151 !local_read(&cpu_buffer->committing)) { 3152 local_inc(&cpu_buffer->committing); 3153 goto again; 3154 } 3155 } 3156 3157 static inline void rb_event_discard(struct ring_buffer_event *event) 3158 { 3159 if (extended_time(event)) 3160 event = skip_time_extend(event); 3161 3162 /* array[0] holds the actual length for the discarded event */ 3163 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3164 event->type_len = RINGBUF_TYPE_PADDING; 3165 /* time delta must be non zero */ 3166 if (!event->time_delta) 3167 event->time_delta = 1; 3168 } 3169 3170 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3171 { 3172 local_inc(&cpu_buffer->entries); 3173 rb_end_commit(cpu_buffer); 3174 } 3175 3176 static __always_inline void 3177 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 3178 { 3179 if (buffer->irq_work.waiters_pending) { 3180 buffer->irq_work.waiters_pending = false; 3181 /* irq_work_queue() supplies it's own memory barriers */ 3182 irq_work_queue(&buffer->irq_work.work); 3183 } 3184 3185 if (cpu_buffer->irq_work.waiters_pending) { 3186 cpu_buffer->irq_work.waiters_pending = false; 3187 /* irq_work_queue() supplies it's own memory barriers */ 3188 irq_work_queue(&cpu_buffer->irq_work.work); 3189 } 3190 3191 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3192 return; 3193 3194 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3195 return; 3196 3197 if (!cpu_buffer->irq_work.full_waiters_pending) 3198 return; 3199 3200 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3201 3202 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3203 return; 3204 3205 cpu_buffer->irq_work.wakeup_full = true; 3206 cpu_buffer->irq_work.full_waiters_pending = false; 3207 /* irq_work_queue() supplies it's own memory barriers */ 3208 irq_work_queue(&cpu_buffer->irq_work.work); 3209 } 3210 3211 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3212 # define do_ring_buffer_record_recursion() \ 3213 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3214 #else 3215 # define do_ring_buffer_record_recursion() do { } while (0) 3216 #endif 3217 3218 /* 3219 * The lock and unlock are done within a preempt disable section. 3220 * The current_context per_cpu variable can only be modified 3221 * by the current task between lock and unlock. But it can 3222 * be modified more than once via an interrupt. To pass this 3223 * information from the lock to the unlock without having to 3224 * access the 'in_interrupt()' functions again (which do show 3225 * a bit of overhead in something as critical as function tracing, 3226 * we use a bitmask trick. 3227 * 3228 * bit 1 = NMI context 3229 * bit 2 = IRQ context 3230 * bit 3 = SoftIRQ context 3231 * bit 4 = normal context. 3232 * 3233 * This works because this is the order of contexts that can 3234 * preempt other contexts. A SoftIRQ never preempts an IRQ 3235 * context. 3236 * 3237 * When the context is determined, the corresponding bit is 3238 * checked and set (if it was set, then a recursion of that context 3239 * happened). 3240 * 3241 * On unlock, we need to clear this bit. To do so, just subtract 3242 * 1 from the current_context and AND it to itself. 
3243 * 3244 * (binary) 3245 * 101 - 1 = 100 3246 * 101 & 100 = 100 (clearing bit zero) 3247 * 3248 * 1010 - 1 = 1001 3249 * 1010 & 1001 = 1000 (clearing bit 1) 3250 * 3251 * The least significant bit can be cleared this way, and it 3252 * just so happens that it is the same bit corresponding to 3253 * the current context. 3254 * 3255 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3256 * is set when a recursion is detected at the current context, and if 3257 * the TRANSITION bit is already set, it will fail the recursion. 3258 * This is needed because there's a lag between the changing of 3259 * interrupt context and updating the preempt count. In this case, 3260 * a false positive will be found. To handle this, one extra recursion 3261 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3262 * bit is already set, then it is considered a recursion and the function 3263 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3264 * 3265 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3266 * to be cleared. Even if it wasn't the context that set it. That is, 3267 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3268 * is called before preempt_count() is updated, since the check will 3269 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3270 * NMI then comes in, it will set the NMI bit, but when the NMI code 3271 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3272 * and leave the NMI bit set. But this is fine, because the interrupt 3273 * code that set the TRANSITION bit will then clear the NMI bit when it 3274 * calls trace_recursive_unlock(). If another NMI comes in, it will 3275 * set the TRANSITION bit and continue. 3276 * 3277 * Note: The TRANSITION bit only handles a single transition between context. 3278 */ 3279 3280 static __always_inline bool 3281 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3282 { 3283 unsigned int val = cpu_buffer->current_context; 3284 int bit = interrupt_context_level(); 3285 3286 bit = RB_CTX_NORMAL - bit; 3287 3288 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3289 /* 3290 * It is possible that this was called by transitioning 3291 * between interrupt context, and preempt_count() has not 3292 * been updated yet. In this case, use the TRANSITION bit. 3293 */ 3294 bit = RB_CTX_TRANSITION; 3295 if (val & (1 << (bit + cpu_buffer->nest))) { 3296 do_ring_buffer_record_recursion(); 3297 return true; 3298 } 3299 } 3300 3301 val |= (1 << (bit + cpu_buffer->nest)); 3302 cpu_buffer->current_context = val; 3303 3304 return false; 3305 } 3306 3307 static __always_inline void 3308 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3309 { 3310 cpu_buffer->current_context &= 3311 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3312 } 3313 3314 /* The recursive locking above uses 5 bits */ 3315 #define NESTED_BITS 5 3316 3317 /** 3318 * ring_buffer_nest_start - Allow to trace while nested 3319 * @buffer: The ring buffer to modify 3320 * 3321 * The ring buffer has a safety mechanism to prevent recursion. 3322 * But there may be a case where a trace needs to be done while 3323 * tracing something else. In this case, calling this function 3324 * will allow this function to nest within a currently active 3325 * ring_buffer_lock_reserve(). 
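 *
 * For example, from inside an outer reservation (sketch; "len" and
 * "data" are the caller's):
 *
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, len);
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), data, len);
 *		ring_buffer_unlock_commit(buffer);
 *	}
 *	ring_buffer_nest_end(buffer);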
3326 * 3327 * Call this function before calling another ring_buffer_lock_reserve() and 3328 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 3329 */ 3330 void ring_buffer_nest_start(struct trace_buffer *buffer) 3331 { 3332 struct ring_buffer_per_cpu *cpu_buffer; 3333 int cpu; 3334 3335 /* Enabled by ring_buffer_nest_end() */ 3336 preempt_disable_notrace(); 3337 cpu = raw_smp_processor_id(); 3338 cpu_buffer = buffer->buffers[cpu]; 3339 /* This is the shift value for the above recursive locking */ 3340 cpu_buffer->nest += NESTED_BITS; 3341 } 3342 3343 /** 3344 * ring_buffer_nest_end - Allow to trace while nested 3345 * @buffer: The ring buffer to modify 3346 * 3347 * Must be called after ring_buffer_nest_start() and after the 3348 * ring_buffer_unlock_commit(). 3349 */ 3350 void ring_buffer_nest_end(struct trace_buffer *buffer) 3351 { 3352 struct ring_buffer_per_cpu *cpu_buffer; 3353 int cpu; 3354 3355 /* disabled by ring_buffer_nest_start() */ 3356 cpu = raw_smp_processor_id(); 3357 cpu_buffer = buffer->buffers[cpu]; 3358 /* This is the shift value for the above recursive locking */ 3359 cpu_buffer->nest -= NESTED_BITS; 3360 preempt_enable_notrace(); 3361 } 3362 3363 /** 3364 * ring_buffer_unlock_commit - commit a reserved 3365 * @buffer: The buffer to commit to 3366 * @event: The event pointer to commit. 3367 * 3368 * This commits the data to the ring buffer, and releases any locks held. 3369 * 3370 * Must be paired with ring_buffer_lock_reserve. 3371 */ 3372 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 3373 { 3374 struct ring_buffer_per_cpu *cpu_buffer; 3375 int cpu = raw_smp_processor_id(); 3376 3377 cpu_buffer = buffer->buffers[cpu]; 3378 3379 rb_commit(cpu_buffer); 3380 3381 rb_wakeups(buffer, cpu_buffer); 3382 3383 trace_recursive_unlock(cpu_buffer); 3384 3385 preempt_enable_notrace(); 3386 3387 return 0; 3388 } 3389 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 3390 3391 /* Special value to validate all deltas on a page. */ 3392 #define CHECK_FULL_PAGE 1L 3393 3394 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 3395 static void dump_buffer_page(struct buffer_data_page *bpage, 3396 struct rb_event_info *info, 3397 unsigned long tail) 3398 { 3399 struct ring_buffer_event *event; 3400 u64 ts, delta; 3401 int e; 3402 3403 ts = bpage->time_stamp; 3404 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 3405 3406 for (e = 0; e < tail; e += rb_event_length(event)) { 3407 3408 event = (struct ring_buffer_event *)(bpage->data + e); 3409 3410 switch (event->type_len) { 3411 3412 case RINGBUF_TYPE_TIME_EXTEND: 3413 delta = rb_event_time_stamp(event); 3414 ts += delta; 3415 pr_warn(" [%lld] delta:%lld TIME EXTEND\n", ts, delta); 3416 break; 3417 3418 case RINGBUF_TYPE_TIME_STAMP: 3419 delta = rb_event_time_stamp(event); 3420 ts = rb_fix_abs_ts(delta, ts); 3421 pr_warn(" [%lld] absolute:%lld TIME STAMP\n", ts, delta); 3422 break; 3423 3424 case RINGBUF_TYPE_PADDING: 3425 ts += event->time_delta; 3426 pr_warn(" [%lld] delta:%d PADDING\n", ts, event->time_delta); 3427 break; 3428 3429 case RINGBUF_TYPE_DATA: 3430 ts += event->time_delta; 3431 pr_warn(" [%lld] delta:%d\n", ts, event->time_delta); 3432 break; 3433 3434 default: 3435 break; 3436 } 3437 } 3438 } 3439 3440 static DEFINE_PER_CPU(atomic_t, checking); 3441 static atomic_t ts_dump; 3442 3443 /* 3444 * Check if the current event time stamp matches the deltas on 3445 * the buffer page. 
3446 */ 3447 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3448 struct rb_event_info *info, 3449 unsigned long tail) 3450 { 3451 struct ring_buffer_event *event; 3452 struct buffer_data_page *bpage; 3453 u64 ts, delta; 3454 bool full = false; 3455 int e; 3456 3457 bpage = info->tail_page->page; 3458 3459 if (tail == CHECK_FULL_PAGE) { 3460 full = true; 3461 tail = local_read(&bpage->commit); 3462 } else if (info->add_timestamp & 3463 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 3464 /* Ignore events with absolute time stamps */ 3465 return; 3466 } 3467 3468 /* 3469 * Do not check the first event (skip possible extends too). 3470 * Also do not check if previous events have not been committed. 3471 */ 3472 if (tail <= 8 || tail > local_read(&bpage->commit)) 3473 return; 3474 3475 /* 3476 * If this interrupted another event, 3477 */ 3478 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 3479 goto out; 3480 3481 ts = bpage->time_stamp; 3482 3483 for (e = 0; e < tail; e += rb_event_length(event)) { 3484 3485 event = (struct ring_buffer_event *)(bpage->data + e); 3486 3487 switch (event->type_len) { 3488 3489 case RINGBUF_TYPE_TIME_EXTEND: 3490 delta = rb_event_time_stamp(event); 3491 ts += delta; 3492 break; 3493 3494 case RINGBUF_TYPE_TIME_STAMP: 3495 delta = rb_event_time_stamp(event); 3496 ts = rb_fix_abs_ts(delta, ts); 3497 break; 3498 3499 case RINGBUF_TYPE_PADDING: 3500 if (event->time_delta == 1) 3501 break; 3502 fallthrough; 3503 case RINGBUF_TYPE_DATA: 3504 ts += event->time_delta; 3505 break; 3506 3507 default: 3508 RB_WARN_ON(cpu_buffer, 1); 3509 } 3510 } 3511 if ((full && ts > info->ts) || 3512 (!full && ts + info->delta != info->ts)) { 3513 /* If another report is happening, ignore this one */ 3514 if (atomic_inc_return(&ts_dump) != 1) { 3515 atomic_dec(&ts_dump); 3516 goto out; 3517 } 3518 atomic_inc(&cpu_buffer->record_disabled); 3519 /* There's some cases in boot up that this can happen */ 3520 WARN_ON_ONCE(system_state != SYSTEM_BOOTING); 3521 pr_warn("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s\n", 3522 cpu_buffer->cpu, 3523 ts + info->delta, info->ts, info->delta, 3524 info->before, info->after, 3525 full ? 
" (full)" : ""); 3526 dump_buffer_page(bpage, info, tail); 3527 atomic_dec(&ts_dump); 3528 /* Do not re-enable checking */ 3529 return; 3530 } 3531 out: 3532 atomic_dec(this_cpu_ptr(&checking)); 3533 } 3534 #else 3535 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3536 struct rb_event_info *info, 3537 unsigned long tail) 3538 { 3539 } 3540 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 3541 3542 static struct ring_buffer_event * 3543 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 3544 struct rb_event_info *info) 3545 { 3546 struct ring_buffer_event *event; 3547 struct buffer_page *tail_page; 3548 unsigned long tail, write, w; 3549 bool a_ok; 3550 bool b_ok; 3551 3552 /* Don't let the compiler play games with cpu_buffer->tail_page */ 3553 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 3554 3555 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 3556 barrier(); 3557 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before); 3558 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3559 barrier(); 3560 info->ts = rb_time_stamp(cpu_buffer->buffer); 3561 3562 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 3563 info->delta = info->ts; 3564 } else { 3565 /* 3566 * If interrupting an event time update, we may need an 3567 * absolute timestamp. 3568 * Don't bother if this is the start of a new page (w == 0). 3569 */ 3570 if (unlikely(!a_ok || !b_ok || (info->before != info->after && w))) { 3571 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 3572 info->length += RB_LEN_TIME_EXTEND; 3573 } else { 3574 info->delta = info->ts - info->after; 3575 if (unlikely(test_time_stamp(info->delta))) { 3576 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 3577 info->length += RB_LEN_TIME_EXTEND; 3578 } 3579 } 3580 } 3581 3582 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 3583 3584 /*C*/ write = local_add_return(info->length, &tail_page->write); 3585 3586 /* set write to only the index of the write */ 3587 write &= RB_WRITE_MASK; 3588 3589 tail = write - info->length; 3590 3591 /* See if we shot pass the end of this buffer page */ 3592 if (unlikely(write > BUF_PAGE_SIZE)) { 3593 /* before and after may now different, fix it up*/ 3594 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before); 3595 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3596 if (a_ok && b_ok && info->before != info->after) 3597 (void)rb_time_cmpxchg(&cpu_buffer->before_stamp, 3598 info->before, info->after); 3599 if (a_ok && b_ok) 3600 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 3601 return rb_move_tail(cpu_buffer, tail, info); 3602 } 3603 3604 if (likely(tail == w)) { 3605 u64 save_before; 3606 bool s_ok; 3607 3608 /* Nothing interrupted us between A and C */ 3609 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 3610 barrier(); 3611 /*E*/ s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before); 3612 RB_WARN_ON(cpu_buffer, !s_ok); 3613 if (likely(!(info->add_timestamp & 3614 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3615 /* This did not interrupt any time update */ 3616 info->delta = info->ts - info->after; 3617 else 3618 /* Just use full timestamp for interrupting event */ 3619 info->delta = info->ts; 3620 barrier(); 3621 check_buffer(cpu_buffer, info, tail); 3622 if (unlikely(info->ts != save_before)) { 3623 /* SLOW PATH - Interrupted between C and E */ 3624 3625 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3626 RB_WARN_ON(cpu_buffer, !a_ok); 3627 3628 /* Write stamp must only go 
forward */ 3629 if (save_before > info->after) { 3630 /* 3631 * We do not care about the result, only that 3632 * it gets updated atomically. 3633 */ 3634 (void)rb_time_cmpxchg(&cpu_buffer->write_stamp, 3635 info->after, save_before); 3636 } 3637 } 3638 } else { 3639 u64 ts; 3640 /* SLOW PATH - Interrupted between A and C */ 3641 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3642 /* Was interrupted before here, write_stamp must be valid */ 3643 RB_WARN_ON(cpu_buffer, !a_ok); 3644 ts = rb_time_stamp(cpu_buffer->buffer); 3645 barrier(); 3646 /*E*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 3647 info->after < ts && 3648 rb_time_cmpxchg(&cpu_buffer->write_stamp, 3649 info->after, ts)) { 3650 /* Nothing came after this event between C and E */ 3651 info->delta = ts - info->after; 3652 } else { 3653 /* 3654 * Interrupted between C and E: 3655 * Lost the previous events time stamp. Just set the 3656 * delta to zero, and this will be the same time as 3657 * the event this event interrupted. And the events that 3658 * came after this will still be correct (as they would 3659 * have built their delta on the previous event. 3660 */ 3661 info->delta = 0; 3662 } 3663 info->ts = ts; 3664 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 3665 } 3666 3667 /* 3668 * If this is the first commit on the page, then it has the same 3669 * timestamp as the page itself. 3670 */ 3671 if (unlikely(!tail && !(info->add_timestamp & 3672 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3673 info->delta = 0; 3674 3675 /* We reserved something on the buffer */ 3676 3677 event = __rb_page_index(tail_page, tail); 3678 rb_update_event(cpu_buffer, event, info); 3679 3680 local_inc(&tail_page->entries); 3681 3682 /* 3683 * If this is the first commit on the page, then update 3684 * its timestamp. 3685 */ 3686 if (unlikely(!tail)) 3687 tail_page->page->time_stamp = info->ts; 3688 3689 /* account for these added bytes */ 3690 local_add(info->length, &cpu_buffer->entries_bytes); 3691 3692 return event; 3693 } 3694 3695 static __always_inline struct ring_buffer_event * 3696 rb_reserve_next_event(struct trace_buffer *buffer, 3697 struct ring_buffer_per_cpu *cpu_buffer, 3698 unsigned long length) 3699 { 3700 struct ring_buffer_event *event; 3701 struct rb_event_info info; 3702 int nr_loops = 0; 3703 int add_ts_default; 3704 3705 rb_start_commit(cpu_buffer); 3706 /* The commit page can not change after this */ 3707 3708 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3709 /* 3710 * Due to the ability to swap a cpu buffer from a buffer 3711 * it is possible it was swapped before we committed. 3712 * (committing stops a swap). We check for it here and 3713 * if it happened, we have to fail the write. 3714 */ 3715 barrier(); 3716 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 3717 local_dec(&cpu_buffer->committing); 3718 local_dec(&cpu_buffer->commits); 3719 return NULL; 3720 } 3721 #endif 3722 3723 info.length = rb_calculate_event_length(length); 3724 3725 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 3726 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 3727 info.length += RB_LEN_TIME_EXTEND; 3728 } else { 3729 add_ts_default = RB_ADD_STAMP_NONE; 3730 } 3731 3732 again: 3733 info.add_timestamp = add_ts_default; 3734 info.delta = 0; 3735 3736 /* 3737 * We allow for interrupts to reenter here and do a trace. 3738 * If one does, it will cause this original code to loop 3739 * back here. Even with heavy interrupts happening, this 3740 * should only happen a few times in a row. 
If this happens 3741 * 1000 times in a row, there must be either an interrupt 3742 * storm or we have something buggy. 3743 * Bail! 3744 */ 3745 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 3746 goto out_fail; 3747 3748 event = __rb_reserve_next(cpu_buffer, &info); 3749 3750 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 3751 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 3752 info.length -= RB_LEN_TIME_EXTEND; 3753 goto again; 3754 } 3755 3756 if (likely(event)) 3757 return event; 3758 out_fail: 3759 rb_end_commit(cpu_buffer); 3760 return NULL; 3761 } 3762 3763 /** 3764 * ring_buffer_lock_reserve - reserve a part of the buffer 3765 * @buffer: the ring buffer to reserve from 3766 * @length: the length of the data to reserve (excluding event header) 3767 * 3768 * Returns a reserved event on the ring buffer to copy directly to. 3769 * The user of this interface will need to get the body to write into 3770 * and can use the ring_buffer_event_data() interface. 3771 * 3772 * The length is the length of the data needed, not the event length 3773 * which also includes the event header. 3774 * 3775 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 3776 * If NULL is returned, then nothing has been allocated or locked. 3777 */ 3778 struct ring_buffer_event * 3779 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 3780 { 3781 struct ring_buffer_per_cpu *cpu_buffer; 3782 struct ring_buffer_event *event; 3783 int cpu; 3784 3785 /* If we are tracing schedule, we don't want to recurse */ 3786 preempt_disable_notrace(); 3787 3788 if (unlikely(atomic_read(&buffer->record_disabled))) 3789 goto out; 3790 3791 cpu = raw_smp_processor_id(); 3792 3793 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 3794 goto out; 3795 3796 cpu_buffer = buffer->buffers[cpu]; 3797 3798 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 3799 goto out; 3800 3801 if (unlikely(length > BUF_MAX_DATA_SIZE)) 3802 goto out; 3803 3804 if (unlikely(trace_recursive_lock(cpu_buffer))) 3805 goto out; 3806 3807 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3808 if (!event) 3809 goto out_unlock; 3810 3811 return event; 3812 3813 out_unlock: 3814 trace_recursive_unlock(cpu_buffer); 3815 out: 3816 preempt_enable_notrace(); 3817 return NULL; 3818 } 3819 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3820 3821 /* 3822 * Decrement the entries to the page that an event is on. 3823 * The event does not even need to exist, only the pointer 3824 * to the page it is on. This may only be called before the commit 3825 * takes place. 3826 */ 3827 static inline void 3828 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3829 struct ring_buffer_event *event) 3830 { 3831 unsigned long addr = (unsigned long)event; 3832 struct buffer_page *bpage = cpu_buffer->commit_page; 3833 struct buffer_page *start; 3834 3835 addr &= PAGE_MASK; 3836 3837 /* Do the likely case first */ 3838 if (likely(bpage->page == (void *)addr)) { 3839 local_dec(&bpage->entries); 3840 return; 3841 } 3842 3843 /* 3844 * Because the commit page may be on the reader page we 3845 * start with the next page and check the end loop there. 3846 */ 3847 rb_inc_page(&bpage); 3848 start = bpage; 3849 do { 3850 if (bpage->page == (void *)addr) { 3851 local_dec(&bpage->entries); 3852 return; 3853 } 3854 rb_inc_page(&bpage); 3855 } while (bpage != start); 3856 3857 /* commit not part of this buffer?? 
*/ 3858 RB_WARN_ON(cpu_buffer, 1); 3859 } 3860 3861 /** 3862 * ring_buffer_discard_commit - discard an event that has not been committed 3863 * @buffer: the ring buffer 3864 * @event: non committed event to discard 3865 * 3866 * Sometimes an event that is in the ring buffer needs to be ignored. 3867 * This function lets the user discard an event in the ring buffer 3868 * and then that event will not be read later. 3869 * 3870 * This function only works if it is called before the item has been 3871 * committed. It will try to free the event from the ring buffer 3872 * if another event has not been added behind it. 3873 * 3874 * If another event has been added behind it, it will set the event 3875 * up as discarded, and perform the commit. 3876 * 3877 * If this function is called, do not call ring_buffer_unlock_commit on 3878 * the event. 3879 */ 3880 void ring_buffer_discard_commit(struct trace_buffer *buffer, 3881 struct ring_buffer_event *event) 3882 { 3883 struct ring_buffer_per_cpu *cpu_buffer; 3884 int cpu; 3885 3886 /* The event is discarded regardless */ 3887 rb_event_discard(event); 3888 3889 cpu = smp_processor_id(); 3890 cpu_buffer = buffer->buffers[cpu]; 3891 3892 /* 3893 * This must only be called if the event has not been 3894 * committed yet. Thus we can assume that preemption 3895 * is still disabled. 3896 */ 3897 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3898 3899 rb_decrement_entry(cpu_buffer, event); 3900 if (rb_try_to_discard(cpu_buffer, event)) 3901 goto out; 3902 3903 out: 3904 rb_end_commit(cpu_buffer); 3905 3906 trace_recursive_unlock(cpu_buffer); 3907 3908 preempt_enable_notrace(); 3909 3910 } 3911 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3912 3913 /** 3914 * ring_buffer_write - write data to the buffer without reserving 3915 * @buffer: The ring buffer to write to. 3916 * @length: The length of the data being written (excluding the event header) 3917 * @data: The data to write to the buffer. 3918 * 3919 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3920 * one function. If you already have the data to write to the buffer, it 3921 * may be easier to simply call this function. 3922 * 3923 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3924 * and not the length of the event which would hold the header. 
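 *
 * A minimal usage sketch (the on-stack payload struct is illustrative and
 * not part of this API); a non-zero return means the write was dropped,
 * e.g. recording is disabled or the data is larger than BUF_MAX_DATA_SIZE:
 *
 *	struct { int id; u64 stamp; } rec = { 1, 0 };
 *
 *	if (ring_buffer_write(buffer, sizeof(rec), &rec))
 *		pr_debug("ring buffer write dropped\n");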
3925 */ 3926 int ring_buffer_write(struct trace_buffer *buffer, 3927 unsigned long length, 3928 void *data) 3929 { 3930 struct ring_buffer_per_cpu *cpu_buffer; 3931 struct ring_buffer_event *event; 3932 void *body; 3933 int ret = -EBUSY; 3934 int cpu; 3935 3936 preempt_disable_notrace(); 3937 3938 if (atomic_read(&buffer->record_disabled)) 3939 goto out; 3940 3941 cpu = raw_smp_processor_id(); 3942 3943 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3944 goto out; 3945 3946 cpu_buffer = buffer->buffers[cpu]; 3947 3948 if (atomic_read(&cpu_buffer->record_disabled)) 3949 goto out; 3950 3951 if (length > BUF_MAX_DATA_SIZE) 3952 goto out; 3953 3954 if (unlikely(trace_recursive_lock(cpu_buffer))) 3955 goto out; 3956 3957 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3958 if (!event) 3959 goto out_unlock; 3960 3961 body = rb_event_data(event); 3962 3963 memcpy(body, data, length); 3964 3965 rb_commit(cpu_buffer); 3966 3967 rb_wakeups(buffer, cpu_buffer); 3968 3969 ret = 0; 3970 3971 out_unlock: 3972 trace_recursive_unlock(cpu_buffer); 3973 3974 out: 3975 preempt_enable_notrace(); 3976 3977 return ret; 3978 } 3979 EXPORT_SYMBOL_GPL(ring_buffer_write); 3980 3981 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3982 { 3983 struct buffer_page *reader = cpu_buffer->reader_page; 3984 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3985 struct buffer_page *commit = cpu_buffer->commit_page; 3986 3987 /* In case of error, head will be NULL */ 3988 if (unlikely(!head)) 3989 return true; 3990 3991 /* Reader should exhaust content in reader page */ 3992 if (reader->read != rb_page_commit(reader)) 3993 return false; 3994 3995 /* 3996 * If writers are committing on the reader page, knowing all 3997 * committed content has been read, the ring buffer is empty. 3998 */ 3999 if (commit == reader) 4000 return true; 4001 4002 /* 4003 * If writers are committing on a page other than reader page 4004 * and head page, there should always be content to read. 4005 */ 4006 if (commit != head) 4007 return false; 4008 4009 /* 4010 * Writers are committing on the head page, we just need 4011 * to care about there're committed data, and the reader will 4012 * swap reader page with head page when it is to read data. 4013 */ 4014 return rb_page_commit(commit) == 0; 4015 } 4016 4017 /** 4018 * ring_buffer_record_disable - stop all writes into the buffer 4019 * @buffer: The ring buffer to stop writes to. 4020 * 4021 * This prevents all writes to the buffer. Any attempt to write 4022 * to the buffer after this will fail and return NULL. 4023 * 4024 * The caller should call synchronize_rcu() after this. 4025 */ 4026 void ring_buffer_record_disable(struct trace_buffer *buffer) 4027 { 4028 atomic_inc(&buffer->record_disabled); 4029 } 4030 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4031 4032 /** 4033 * ring_buffer_record_enable - enable writes to the buffer 4034 * @buffer: The ring buffer to enable writes 4035 * 4036 * Note, multiple disables will need the same number of enables 4037 * to truly enable the writing (much like preempt_disable). 4038 */ 4039 void ring_buffer_record_enable(struct trace_buffer *buffer) 4040 { 4041 atomic_dec(&buffer->record_disabled); 4042 } 4043 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4044 4045 /** 4046 * ring_buffer_record_off - stop all writes into the buffer 4047 * @buffer: The ring buffer to stop writes to. 4048 * 4049 * This prevents all writes to the buffer. Any attempt to write 4050 * to the buffer after this will fail and return NULL. 
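 *
 * A sketch of the intended pairing (dump_or_inspect() stands in for
 * whatever the caller wants to do while writes are shut off):
 *
 *	ring_buffer_record_off(buffer);
 *	dump_or_inspect(buffer);
 *	ring_buffer_record_on(buffer);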
4051 *
4052 * This is different than ring_buffer_record_disable() as
4053 * it works like an on/off switch, whereas the disable() version
4054 * must be paired with an enable().
4055 */
4056 void ring_buffer_record_off(struct trace_buffer *buffer)
4057 {
4058 unsigned int rd;
4059 unsigned int new_rd;
4060
4061 rd = atomic_read(&buffer->record_disabled);
4062 do {
4063 new_rd = rd | RB_BUFFER_OFF;
4064 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4065 }
4066 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
4067
4068 /**
4069 * ring_buffer_record_on - restart writes into the buffer
4070 * @buffer: The ring buffer to start writes to.
4071 *
4072 * This enables all writes to the buffer that was disabled by
4073 * ring_buffer_record_off().
4074 *
4075 * This is different than ring_buffer_record_enable() as
4076 * it works like an on/off switch, whereas the enable() version
4077 * must be paired with a disable().
4078 */
4079 void ring_buffer_record_on(struct trace_buffer *buffer)
4080 {
4081 unsigned int rd;
4082 unsigned int new_rd;
4083
4084 rd = atomic_read(&buffer->record_disabled);
4085 do {
4086 new_rd = rd & ~RB_BUFFER_OFF;
4087 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4088 }
4089 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
4090
4091 /**
4092 * ring_buffer_record_is_on - return true if the ring buffer can write
4093 * @buffer: The ring buffer to see if write is enabled
4094 *
4095 * Returns true if the ring buffer is in a state that it accepts writes.
4096 */
4097 bool ring_buffer_record_is_on(struct trace_buffer *buffer)
4098 {
4099 return !atomic_read(&buffer->record_disabled);
4100 }
4101
4102 /**
4103 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
4104 * @buffer: The ring buffer to see if write is set enabled
4105 *
4106 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
4107 * Note that this does NOT mean it is in a writable state.
4108 *
4109 * It may return true when the ring buffer has been disabled by
4110 * ring_buffer_record_disable(), as that is a temporary disabling of
4111 * the ring buffer.
4112 */
4113 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
4114 {
4115 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
4116 }
4117
4118 /**
4119 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
4120 * @buffer: The ring buffer to stop writes to.
4121 * @cpu: The CPU buffer to stop
4122 *
4123 * This prevents all writes to the buffer. Any attempt to write
4124 * to the buffer after this will fail and return NULL.
4125 *
4126 * The caller should call synchronize_rcu() after this.
4127 */
4128 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu)
4129 {
4130 struct ring_buffer_per_cpu *cpu_buffer;
4131
4132 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4133 return;
4134
4135 cpu_buffer = buffer->buffers[cpu];
4136 atomic_inc(&cpu_buffer->record_disabled);
4137 }
4138 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
4139
4140 /**
4141 * ring_buffer_record_enable_cpu - enable writes to the buffer
4142 * @buffer: The ring buffer to enable writes
4143 * @cpu: The CPU to enable.
4144 *
4145 * Note, multiple disables will need the same number of enables
4146 * to truly enable the writing (much like preempt_disable).
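 *
 * For example (a sketch; after two disables it takes two enables before
 * writes to this CPU resume):
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	ring_buffer_record_enable_cpu(buffer, cpu);
 *	ring_buffer_record_enable_cpu(buffer, cpu);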
4147 */ 4148 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4149 { 4150 struct ring_buffer_per_cpu *cpu_buffer; 4151 4152 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4153 return; 4154 4155 cpu_buffer = buffer->buffers[cpu]; 4156 atomic_dec(&cpu_buffer->record_disabled); 4157 } 4158 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4159 4160 /* 4161 * The total entries in the ring buffer is the running counter 4162 * of entries entered into the ring buffer, minus the sum of 4163 * the entries read from the ring buffer and the number of 4164 * entries that were overwritten. 4165 */ 4166 static inline unsigned long 4167 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4168 { 4169 return local_read(&cpu_buffer->entries) - 4170 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4171 } 4172 4173 /** 4174 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4175 * @buffer: The ring buffer 4176 * @cpu: The per CPU buffer to read from. 4177 */ 4178 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4179 { 4180 unsigned long flags; 4181 struct ring_buffer_per_cpu *cpu_buffer; 4182 struct buffer_page *bpage; 4183 u64 ret = 0; 4184 4185 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4186 return 0; 4187 4188 cpu_buffer = buffer->buffers[cpu]; 4189 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4190 /* 4191 * if the tail is on reader_page, oldest time stamp is on the reader 4192 * page 4193 */ 4194 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4195 bpage = cpu_buffer->reader_page; 4196 else 4197 bpage = rb_set_head_page(cpu_buffer); 4198 if (bpage) 4199 ret = bpage->page->time_stamp; 4200 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4201 4202 return ret; 4203 } 4204 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4205 4206 /** 4207 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer 4208 * @buffer: The ring buffer 4209 * @cpu: The per CPU buffer to read from. 4210 */ 4211 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4212 { 4213 struct ring_buffer_per_cpu *cpu_buffer; 4214 unsigned long ret; 4215 4216 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4217 return 0; 4218 4219 cpu_buffer = buffer->buffers[cpu]; 4220 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4221 4222 return ret; 4223 } 4224 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4225 4226 /** 4227 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4228 * @buffer: The ring buffer 4229 * @cpu: The per CPU buffer to get the entries from. 4230 */ 4231 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4232 { 4233 struct ring_buffer_per_cpu *cpu_buffer; 4234 4235 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4236 return 0; 4237 4238 cpu_buffer = buffer->buffers[cpu]; 4239 4240 return rb_num_of_entries(cpu_buffer); 4241 } 4242 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4243 4244 /** 4245 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4246 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 
4247 * @buffer: The ring buffer 4248 * @cpu: The per CPU buffer to get the number of overruns from 4249 */ 4250 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4251 { 4252 struct ring_buffer_per_cpu *cpu_buffer; 4253 unsigned long ret; 4254 4255 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4256 return 0; 4257 4258 cpu_buffer = buffer->buffers[cpu]; 4259 ret = local_read(&cpu_buffer->overrun); 4260 4261 return ret; 4262 } 4263 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4264 4265 /** 4266 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4267 * commits failing due to the buffer wrapping around while there are uncommitted 4268 * events, such as during an interrupt storm. 4269 * @buffer: The ring buffer 4270 * @cpu: The per CPU buffer to get the number of overruns from 4271 */ 4272 unsigned long 4273 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4274 { 4275 struct ring_buffer_per_cpu *cpu_buffer; 4276 unsigned long ret; 4277 4278 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4279 return 0; 4280 4281 cpu_buffer = buffer->buffers[cpu]; 4282 ret = local_read(&cpu_buffer->commit_overrun); 4283 4284 return ret; 4285 } 4286 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4287 4288 /** 4289 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4290 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 4291 * @buffer: The ring buffer 4292 * @cpu: The per CPU buffer to get the number of overruns from 4293 */ 4294 unsigned long 4295 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4296 { 4297 struct ring_buffer_per_cpu *cpu_buffer; 4298 unsigned long ret; 4299 4300 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4301 return 0; 4302 4303 cpu_buffer = buffer->buffers[cpu]; 4304 ret = local_read(&cpu_buffer->dropped_events); 4305 4306 return ret; 4307 } 4308 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 4309 4310 /** 4311 * ring_buffer_read_events_cpu - get the number of events successfully read 4312 * @buffer: The ring buffer 4313 * @cpu: The per CPU buffer to get the number of events read 4314 */ 4315 unsigned long 4316 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 4317 { 4318 struct ring_buffer_per_cpu *cpu_buffer; 4319 4320 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4321 return 0; 4322 4323 cpu_buffer = buffer->buffers[cpu]; 4324 return cpu_buffer->read; 4325 } 4326 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 4327 4328 /** 4329 * ring_buffer_entries - get the number of entries in a buffer 4330 * @buffer: The ring buffer 4331 * 4332 * Returns the total number of entries in the ring buffer 4333 * (all CPU entries) 4334 */ 4335 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 4336 { 4337 struct ring_buffer_per_cpu *cpu_buffer; 4338 unsigned long entries = 0; 4339 int cpu; 4340 4341 /* if you care about this being correct, lock the buffer */ 4342 for_each_buffer_cpu(buffer, cpu) { 4343 cpu_buffer = buffer->buffers[cpu]; 4344 entries += rb_num_of_entries(cpu_buffer); 4345 } 4346 4347 return entries; 4348 } 4349 EXPORT_SYMBOL_GPL(ring_buffer_entries); 4350 4351 /** 4352 * ring_buffer_overruns - get the number of overruns in buffer 4353 * @buffer: The ring buffer 4354 * 4355 * Returns the total number of overruns in the ring buffer 4356 * (all CPU entries) 4357 */ 4358 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 4359 { 4360 struct ring_buffer_per_cpu *cpu_buffer; 4361 unsigned long overruns = 0; 4362 int cpu; 4363 4364 /* 
if you care about this being correct, lock the buffer */ 4365 for_each_buffer_cpu(buffer, cpu) { 4366 cpu_buffer = buffer->buffers[cpu]; 4367 overruns += local_read(&cpu_buffer->overrun); 4368 } 4369 4370 return overruns; 4371 } 4372 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 4373 4374 static void rb_iter_reset(struct ring_buffer_iter *iter) 4375 { 4376 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4377 4378 /* Iterator usage is expected to have record disabled */ 4379 iter->head_page = cpu_buffer->reader_page; 4380 iter->head = cpu_buffer->reader_page->read; 4381 iter->next_event = iter->head; 4382 4383 iter->cache_reader_page = iter->head_page; 4384 iter->cache_read = cpu_buffer->read; 4385 4386 if (iter->head) { 4387 iter->read_stamp = cpu_buffer->read_stamp; 4388 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 4389 } else { 4390 iter->read_stamp = iter->head_page->page->time_stamp; 4391 iter->page_stamp = iter->read_stamp; 4392 } 4393 } 4394 4395 /** 4396 * ring_buffer_iter_reset - reset an iterator 4397 * @iter: The iterator to reset 4398 * 4399 * Resets the iterator, so that it will start from the beginning 4400 * again. 4401 */ 4402 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 4403 { 4404 struct ring_buffer_per_cpu *cpu_buffer; 4405 unsigned long flags; 4406 4407 if (!iter) 4408 return; 4409 4410 cpu_buffer = iter->cpu_buffer; 4411 4412 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4413 rb_iter_reset(iter); 4414 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4415 } 4416 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 4417 4418 /** 4419 * ring_buffer_iter_empty - check if an iterator has no more to read 4420 * @iter: The iterator to check 4421 */ 4422 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 4423 { 4424 struct ring_buffer_per_cpu *cpu_buffer; 4425 struct buffer_page *reader; 4426 struct buffer_page *head_page; 4427 struct buffer_page *commit_page; 4428 struct buffer_page *curr_commit_page; 4429 unsigned commit; 4430 u64 curr_commit_ts; 4431 u64 commit_ts; 4432 4433 cpu_buffer = iter->cpu_buffer; 4434 reader = cpu_buffer->reader_page; 4435 head_page = cpu_buffer->head_page; 4436 commit_page = cpu_buffer->commit_page; 4437 commit_ts = commit_page->page->time_stamp; 4438 4439 /* 4440 * When the writer goes across pages, it issues a cmpxchg which 4441 * is a mb(), which will synchronize with the rmb here. 
4442 * (see rb_tail_page_update()) 4443 */ 4444 smp_rmb(); 4445 commit = rb_page_commit(commit_page); 4446 /* We want to make sure that the commit page doesn't change */ 4447 smp_rmb(); 4448 4449 /* Make sure commit page didn't change */ 4450 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 4451 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 4452 4453 /* If the commit page changed, then there's more data */ 4454 if (curr_commit_page != commit_page || 4455 curr_commit_ts != commit_ts) 4456 return 0; 4457 4458 /* Still racy, as it may return a false positive, but that's OK */ 4459 return ((iter->head_page == commit_page && iter->head >= commit) || 4460 (iter->head_page == reader && commit_page == head_page && 4461 head_page->read == commit && 4462 iter->head == rb_page_commit(cpu_buffer->reader_page))); 4463 } 4464 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 4465 4466 static void 4467 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 4468 struct ring_buffer_event *event) 4469 { 4470 u64 delta; 4471 4472 switch (event->type_len) { 4473 case RINGBUF_TYPE_PADDING: 4474 return; 4475 4476 case RINGBUF_TYPE_TIME_EXTEND: 4477 delta = rb_event_time_stamp(event); 4478 cpu_buffer->read_stamp += delta; 4479 return; 4480 4481 case RINGBUF_TYPE_TIME_STAMP: 4482 delta = rb_event_time_stamp(event); 4483 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 4484 cpu_buffer->read_stamp = delta; 4485 return; 4486 4487 case RINGBUF_TYPE_DATA: 4488 cpu_buffer->read_stamp += event->time_delta; 4489 return; 4490 4491 default: 4492 RB_WARN_ON(cpu_buffer, 1); 4493 } 4494 } 4495 4496 static void 4497 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 4498 struct ring_buffer_event *event) 4499 { 4500 u64 delta; 4501 4502 switch (event->type_len) { 4503 case RINGBUF_TYPE_PADDING: 4504 return; 4505 4506 case RINGBUF_TYPE_TIME_EXTEND: 4507 delta = rb_event_time_stamp(event); 4508 iter->read_stamp += delta; 4509 return; 4510 4511 case RINGBUF_TYPE_TIME_STAMP: 4512 delta = rb_event_time_stamp(event); 4513 delta = rb_fix_abs_ts(delta, iter->read_stamp); 4514 iter->read_stamp = delta; 4515 return; 4516 4517 case RINGBUF_TYPE_DATA: 4518 iter->read_stamp += event->time_delta; 4519 return; 4520 4521 default: 4522 RB_WARN_ON(iter->cpu_buffer, 1); 4523 } 4524 } 4525 4526 static struct buffer_page * 4527 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 4528 { 4529 struct buffer_page *reader = NULL; 4530 unsigned long overwrite; 4531 unsigned long flags; 4532 int nr_loops = 0; 4533 bool ret; 4534 4535 local_irq_save(flags); 4536 arch_spin_lock(&cpu_buffer->lock); 4537 4538 again: 4539 /* 4540 * This should normally only loop twice. But because the 4541 * start of the reader inserts an empty page, it causes 4542 * a case where we will loop three times. There should be no 4543 * reason to loop four times (that I know of). 
4544 */
4545 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
4546 reader = NULL;
4547 goto out;
4548 }
4549
4550 reader = cpu_buffer->reader_page;
4551
4552 /* If there's more to read, return this page */
4553 if (cpu_buffer->reader_page->read < rb_page_size(reader))
4554 goto out;
4555
4556 /* Never should we have an index greater than the size */
4557 if (RB_WARN_ON(cpu_buffer,
4558 cpu_buffer->reader_page->read > rb_page_size(reader)))
4559 goto out;
4560
4561 /* check if we caught up to the tail */
4562 reader = NULL;
4563 if (cpu_buffer->commit_page == cpu_buffer->reader_page)
4564 goto out;
4565
4566 /* Don't bother swapping if the ring buffer is empty */
4567 if (rb_num_of_entries(cpu_buffer) == 0)
4568 goto out;
4569
4570 /*
4571 * Reset the reader page to size zero.
4572 */
4573 local_set(&cpu_buffer->reader_page->write, 0);
4574 local_set(&cpu_buffer->reader_page->entries, 0);
4575 local_set(&cpu_buffer->reader_page->page->commit, 0);
4576 cpu_buffer->reader_page->real_end = 0;
4577
4578 spin:
4579 /*
4580 * Splice the empty reader page into the list around the head.
4581 */
4582 reader = rb_set_head_page(cpu_buffer);
4583 if (!reader)
4584 goto out;
4585 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
4586 cpu_buffer->reader_page->list.prev = reader->list.prev;
4587
4588 /*
4589 * cpu_buffer->pages just needs to point to the buffer, it
4590 * has no specific buffer page to point to. Let's move it out
4591 * of our way so we don't accidentally swap it.
4592 */
4593 cpu_buffer->pages = reader->list.prev;
4594
4595 /* The reader page will be pointing to the new head */
4596 rb_set_list_to_head(&cpu_buffer->reader_page->list);
4597
4598 /*
4599 * We want to make sure we read the overruns after we set up our
4600 * pointers to the next object. The writer side does a
4601 * cmpxchg to cross pages which acts as the mb on the writer
4602 * side. Note, the reader will constantly fail the swap
4603 * while the writer is updating the pointers, so this
4604 * guarantees that the overwrite recorded here is the one we
4605 * want to compare with the last_overrun.
4606 */
4607 smp_mb();
4608 overwrite = local_read(&(cpu_buffer->overrun));
4609
4610 /*
4611 * Here's the tricky part.
4612 *
4613 * We need to move the pointer past the header page.
4614 * But we can only do that if a writer is not currently
4615 * moving it. The page before the header page has the
4616 * flag bit '1' set if it is pointing to the page we want,
4617 * but if the writer is in the process of moving it
4618 * then it will be '2' or already moved '0'.
4619 */
4620
4621 ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
4622
4623 /*
4624 * If we did not convert it, then we must try again.
4625 */
4626 if (!ret)
4627 goto spin;
4628
4629 /*
4630 * Yay! We succeeded in replacing the page.
4631 *
4632 * Now make the new head point back to the reader page.
4633 */ 4634 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 4635 rb_inc_page(&cpu_buffer->head_page); 4636 4637 local_inc(&cpu_buffer->pages_read); 4638 4639 /* Finally update the reader page to the new head */ 4640 cpu_buffer->reader_page = reader; 4641 cpu_buffer->reader_page->read = 0; 4642 4643 if (overwrite != cpu_buffer->last_overrun) { 4644 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 4645 cpu_buffer->last_overrun = overwrite; 4646 } 4647 4648 goto again; 4649 4650 out: 4651 /* Update the read_stamp on the first event */ 4652 if (reader && reader->read == 0) 4653 cpu_buffer->read_stamp = reader->page->time_stamp; 4654 4655 arch_spin_unlock(&cpu_buffer->lock); 4656 local_irq_restore(flags); 4657 4658 /* 4659 * The writer has preempt disable, wait for it. But not forever 4660 * Although, 1 second is pretty much "forever" 4661 */ 4662 #define USECS_WAIT 1000000 4663 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 4664 /* If the write is past the end of page, a writer is still updating it */ 4665 if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE)) 4666 break; 4667 4668 udelay(1); 4669 4670 /* Get the latest version of the reader write value */ 4671 smp_rmb(); 4672 } 4673 4674 /* The writer is not moving forward? Something is wrong */ 4675 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 4676 reader = NULL; 4677 4678 /* 4679 * Make sure we see any padding after the write update 4680 * (see rb_reset_tail()) 4681 */ 4682 smp_rmb(); 4683 4684 4685 return reader; 4686 } 4687 4688 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 4689 { 4690 struct ring_buffer_event *event; 4691 struct buffer_page *reader; 4692 unsigned length; 4693 4694 reader = rb_get_reader_page(cpu_buffer); 4695 4696 /* This function should not be called when buffer is empty */ 4697 if (RB_WARN_ON(cpu_buffer, !reader)) 4698 return; 4699 4700 event = rb_reader_event(cpu_buffer); 4701 4702 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 4703 cpu_buffer->read++; 4704 4705 rb_update_read_stamp(cpu_buffer, event); 4706 4707 length = rb_event_length(event); 4708 cpu_buffer->reader_page->read += length; 4709 } 4710 4711 static void rb_advance_iter(struct ring_buffer_iter *iter) 4712 { 4713 struct ring_buffer_per_cpu *cpu_buffer; 4714 4715 cpu_buffer = iter->cpu_buffer; 4716 4717 /* If head == next_event then we need to jump to the next event */ 4718 if (iter->head == iter->next_event) { 4719 /* If the event gets overwritten again, there's nothing to do */ 4720 if (rb_iter_head_event(iter) == NULL) 4721 return; 4722 } 4723 4724 iter->head = iter->next_event; 4725 4726 /* 4727 * Check if we are at the end of the buffer. 4728 */ 4729 if (iter->next_event >= rb_page_size(iter->head_page)) { 4730 /* discarded commits can make the page empty */ 4731 if (iter->head_page == cpu_buffer->commit_page) 4732 return; 4733 rb_inc_iter(iter); 4734 return; 4735 } 4736 4737 rb_update_iter_read_stamp(iter, iter->event); 4738 } 4739 4740 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 4741 { 4742 return cpu_buffer->lost_events; 4743 } 4744 4745 static struct ring_buffer_event * 4746 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 4747 unsigned long *lost_events) 4748 { 4749 struct ring_buffer_event *event; 4750 struct buffer_page *reader; 4751 int nr_loops = 0; 4752 4753 if (ts) 4754 *ts = 0; 4755 again: 4756 /* 4757 * We repeat when a time extend is encountered. 
4758 * Since the time extend is always attached to a data event, 4759 * we should never loop more than once. 4760 * (We never hit the following condition more than twice). 4761 */ 4762 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 4763 return NULL; 4764 4765 reader = rb_get_reader_page(cpu_buffer); 4766 if (!reader) 4767 return NULL; 4768 4769 event = rb_reader_event(cpu_buffer); 4770 4771 switch (event->type_len) { 4772 case RINGBUF_TYPE_PADDING: 4773 if (rb_null_event(event)) 4774 RB_WARN_ON(cpu_buffer, 1); 4775 /* 4776 * Because the writer could be discarding every 4777 * event it creates (which would probably be bad) 4778 * if we were to go back to "again" then we may never 4779 * catch up, and will trigger the warn on, or lock 4780 * the box. Return the padding, and we will release 4781 * the current locks, and try again. 4782 */ 4783 return event; 4784 4785 case RINGBUF_TYPE_TIME_EXTEND: 4786 /* Internal data, OK to advance */ 4787 rb_advance_reader(cpu_buffer); 4788 goto again; 4789 4790 case RINGBUF_TYPE_TIME_STAMP: 4791 if (ts) { 4792 *ts = rb_event_time_stamp(event); 4793 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 4794 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4795 cpu_buffer->cpu, ts); 4796 } 4797 /* Internal data, OK to advance */ 4798 rb_advance_reader(cpu_buffer); 4799 goto again; 4800 4801 case RINGBUF_TYPE_DATA: 4802 if (ts && !(*ts)) { 4803 *ts = cpu_buffer->read_stamp + event->time_delta; 4804 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4805 cpu_buffer->cpu, ts); 4806 } 4807 if (lost_events) 4808 *lost_events = rb_lost_events(cpu_buffer); 4809 return event; 4810 4811 default: 4812 RB_WARN_ON(cpu_buffer, 1); 4813 } 4814 4815 return NULL; 4816 } 4817 EXPORT_SYMBOL_GPL(ring_buffer_peek); 4818 4819 static struct ring_buffer_event * 4820 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4821 { 4822 struct trace_buffer *buffer; 4823 struct ring_buffer_per_cpu *cpu_buffer; 4824 struct ring_buffer_event *event; 4825 int nr_loops = 0; 4826 4827 if (ts) 4828 *ts = 0; 4829 4830 cpu_buffer = iter->cpu_buffer; 4831 buffer = cpu_buffer->buffer; 4832 4833 /* 4834 * Check if someone performed a consuming read to 4835 * the buffer. A consuming read invalidates the iterator 4836 * and we need to reset the iterator in this case. 4837 */ 4838 if (unlikely(iter->cache_read != cpu_buffer->read || 4839 iter->cache_reader_page != cpu_buffer->reader_page)) 4840 rb_iter_reset(iter); 4841 4842 again: 4843 if (ring_buffer_iter_empty(iter)) 4844 return NULL; 4845 4846 /* 4847 * As the writer can mess with what the iterator is trying 4848 * to read, just give up if we fail to get an event after 4849 * three tries. The iterator is not as reliable when reading 4850 * the ring buffer with an active write as the consumer is. 4851 * Do not warn if the three failures is reached. 
4852 */ 4853 if (++nr_loops > 3) 4854 return NULL; 4855 4856 if (rb_per_cpu_empty(cpu_buffer)) 4857 return NULL; 4858 4859 if (iter->head >= rb_page_size(iter->head_page)) { 4860 rb_inc_iter(iter); 4861 goto again; 4862 } 4863 4864 event = rb_iter_head_event(iter); 4865 if (!event) 4866 goto again; 4867 4868 switch (event->type_len) { 4869 case RINGBUF_TYPE_PADDING: 4870 if (rb_null_event(event)) { 4871 rb_inc_iter(iter); 4872 goto again; 4873 } 4874 rb_advance_iter(iter); 4875 return event; 4876 4877 case RINGBUF_TYPE_TIME_EXTEND: 4878 /* Internal data, OK to advance */ 4879 rb_advance_iter(iter); 4880 goto again; 4881 4882 case RINGBUF_TYPE_TIME_STAMP: 4883 if (ts) { 4884 *ts = rb_event_time_stamp(event); 4885 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 4886 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4887 cpu_buffer->cpu, ts); 4888 } 4889 /* Internal data, OK to advance */ 4890 rb_advance_iter(iter); 4891 goto again; 4892 4893 case RINGBUF_TYPE_DATA: 4894 if (ts && !(*ts)) { 4895 *ts = iter->read_stamp + event->time_delta; 4896 ring_buffer_normalize_time_stamp(buffer, 4897 cpu_buffer->cpu, ts); 4898 } 4899 return event; 4900 4901 default: 4902 RB_WARN_ON(cpu_buffer, 1); 4903 } 4904 4905 return NULL; 4906 } 4907 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4908 4909 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4910 { 4911 if (likely(!in_nmi())) { 4912 raw_spin_lock(&cpu_buffer->reader_lock); 4913 return true; 4914 } 4915 4916 /* 4917 * If an NMI die dumps out the content of the ring buffer 4918 * trylock must be used to prevent a deadlock if the NMI 4919 * preempted a task that holds the ring buffer locks. If 4920 * we get the lock then all is fine, if not, then continue 4921 * to do the read, but this can corrupt the ring buffer, 4922 * so it must be permanently disabled from future writes. 4923 * Reading from NMI is a oneshot deal. 4924 */ 4925 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4926 return true; 4927 4928 /* Continue without locking, but disable the ring buffer */ 4929 atomic_inc(&cpu_buffer->record_disabled); 4930 return false; 4931 } 4932 4933 static inline void 4934 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4935 { 4936 if (likely(locked)) 4937 raw_spin_unlock(&cpu_buffer->reader_lock); 4938 } 4939 4940 /** 4941 * ring_buffer_peek - peek at the next event to be read 4942 * @buffer: The ring buffer to read 4943 * @cpu: The cpu to peak at 4944 * @ts: The timestamp counter of this event. 4945 * @lost_events: a variable to store if events were lost (may be NULL) 4946 * 4947 * This will return the event that will be read next, but does 4948 * not consume the data. 
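 *
 * A quick sketch of a non-consuming check (the pr_info() is only for
 * illustration):
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, NULL);
 *	if (event)
 *		pr_info("next event is %u bytes\n",
 *			ring_buffer_event_length(event));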
4949 */ 4950 struct ring_buffer_event * 4951 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 4952 unsigned long *lost_events) 4953 { 4954 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4955 struct ring_buffer_event *event; 4956 unsigned long flags; 4957 bool dolock; 4958 4959 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4960 return NULL; 4961 4962 again: 4963 local_irq_save(flags); 4964 dolock = rb_reader_lock(cpu_buffer); 4965 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4966 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4967 rb_advance_reader(cpu_buffer); 4968 rb_reader_unlock(cpu_buffer, dolock); 4969 local_irq_restore(flags); 4970 4971 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4972 goto again; 4973 4974 return event; 4975 } 4976 4977 /** ring_buffer_iter_dropped - report if there are dropped events 4978 * @iter: The ring buffer iterator 4979 * 4980 * Returns true if there was dropped events since the last peek. 4981 */ 4982 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 4983 { 4984 bool ret = iter->missed_events != 0; 4985 4986 iter->missed_events = 0; 4987 return ret; 4988 } 4989 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 4990 4991 /** 4992 * ring_buffer_iter_peek - peek at the next event to be read 4993 * @iter: The ring buffer iterator 4994 * @ts: The timestamp counter of this event. 4995 * 4996 * This will return the event that will be read next, but does 4997 * not increment the iterator. 4998 */ 4999 struct ring_buffer_event * 5000 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5001 { 5002 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5003 struct ring_buffer_event *event; 5004 unsigned long flags; 5005 5006 again: 5007 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5008 event = rb_iter_peek(iter, ts); 5009 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5010 5011 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5012 goto again; 5013 5014 return event; 5015 } 5016 5017 /** 5018 * ring_buffer_consume - return an event and consume it 5019 * @buffer: The ring buffer to get the next event from 5020 * @cpu: the cpu to read the buffer from 5021 * @ts: a variable to store the timestamp (may be NULL) 5022 * @lost_events: a variable to store if events were lost (may be NULL) 5023 * 5024 * Returns the next event in the ring buffer, and that event is consumed. 5025 * Meaning, that sequential reads will keep returning a different event, 5026 * and eventually empty the ring buffer if the producer is slower. 
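 *
 * A typical drain loop looks roughly like this (process_event() is a
 * hypothetical consumer callback, not part of this API):
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		process_event(ring_buffer_event_data(event), ts, lost);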
5027 */ 5028 struct ring_buffer_event * 5029 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5030 unsigned long *lost_events) 5031 { 5032 struct ring_buffer_per_cpu *cpu_buffer; 5033 struct ring_buffer_event *event = NULL; 5034 unsigned long flags; 5035 bool dolock; 5036 5037 again: 5038 /* might be called in atomic */ 5039 preempt_disable(); 5040 5041 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5042 goto out; 5043 5044 cpu_buffer = buffer->buffers[cpu]; 5045 local_irq_save(flags); 5046 dolock = rb_reader_lock(cpu_buffer); 5047 5048 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5049 if (event) { 5050 cpu_buffer->lost_events = 0; 5051 rb_advance_reader(cpu_buffer); 5052 } 5053 5054 rb_reader_unlock(cpu_buffer, dolock); 5055 local_irq_restore(flags); 5056 5057 out: 5058 preempt_enable(); 5059 5060 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5061 goto again; 5062 5063 return event; 5064 } 5065 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5066 5067 /** 5068 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5069 * @buffer: The ring buffer to read from 5070 * @cpu: The cpu buffer to iterate over 5071 * @flags: gfp flags to use for memory allocation 5072 * 5073 * This performs the initial preparations necessary to iterate 5074 * through the buffer. Memory is allocated, buffer recording 5075 * is disabled, and the iterator pointer is returned to the caller. 5076 * 5077 * Disabling buffer recording prevents the reading from being 5078 * corrupted. This is not a consuming read, so a producer is not 5079 * expected. 5080 * 5081 * After a sequence of ring_buffer_read_prepare calls, the user is 5082 * expected to make at least one call to ring_buffer_read_prepare_sync. 5083 * Afterwards, ring_buffer_read_start is invoked to get things going 5084 * for real. 5085 * 5086 * This overall must be paired with ring_buffer_read_finish. 5087 */ 5088 struct ring_buffer_iter * 5089 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5090 { 5091 struct ring_buffer_per_cpu *cpu_buffer; 5092 struct ring_buffer_iter *iter; 5093 5094 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5095 return NULL; 5096 5097 iter = kzalloc(sizeof(*iter), flags); 5098 if (!iter) 5099 return NULL; 5100 5101 iter->event = kmalloc(BUF_MAX_DATA_SIZE, flags); 5102 if (!iter->event) { 5103 kfree(iter); 5104 return NULL; 5105 } 5106 5107 cpu_buffer = buffer->buffers[cpu]; 5108 5109 iter->cpu_buffer = cpu_buffer; 5110 5111 atomic_inc(&cpu_buffer->resize_disabled); 5112 5113 return iter; 5114 } 5115 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5116 5117 /** 5118 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5119 * 5120 * All previously invoked ring_buffer_read_prepare calls to prepare 5121 * iterators will be synchronized. Afterwards, read_buffer_read_start 5122 * calls on those iterators are allowed. 5123 */ 5124 void 5125 ring_buffer_read_prepare_sync(void) 5126 { 5127 synchronize_rcu(); 5128 } 5129 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5130 5131 /** 5132 * ring_buffer_read_start - start a non consuming read of the buffer 5133 * @iter: The iterator returned by ring_buffer_read_prepare 5134 * 5135 * This finalizes the startup of an iteration through the buffer. 5136 * The iterator comes from a call to ring_buffer_read_prepare and 5137 * an intervening ring_buffer_read_prepare_sync must have been 5138 * performed. 5139 * 5140 * Must be paired with ring_buffer_read_finish. 
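 *
 * The whole non-consuming read sequence, sketched with error handling
 * elided:
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts)))
 *		ring_buffer_iter_advance(iter);
 *	ring_buffer_read_finish(iter);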
5141 */ 5142 void 5143 ring_buffer_read_start(struct ring_buffer_iter *iter) 5144 { 5145 struct ring_buffer_per_cpu *cpu_buffer; 5146 unsigned long flags; 5147 5148 if (!iter) 5149 return; 5150 5151 cpu_buffer = iter->cpu_buffer; 5152 5153 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5154 arch_spin_lock(&cpu_buffer->lock); 5155 rb_iter_reset(iter); 5156 arch_spin_unlock(&cpu_buffer->lock); 5157 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5158 } 5159 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5160 5161 /** 5162 * ring_buffer_read_finish - finish reading the iterator of the buffer 5163 * @iter: The iterator retrieved by ring_buffer_start 5164 * 5165 * This re-enables the recording to the buffer, and frees the 5166 * iterator. 5167 */ 5168 void 5169 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5170 { 5171 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5172 unsigned long flags; 5173 5174 /* 5175 * Ring buffer is disabled from recording, here's a good place 5176 * to check the integrity of the ring buffer. 5177 * Must prevent readers from trying to read, as the check 5178 * clears the HEAD page and readers require it. 5179 */ 5180 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5181 rb_check_pages(cpu_buffer); 5182 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5183 5184 atomic_dec(&cpu_buffer->resize_disabled); 5185 kfree(iter->event); 5186 kfree(iter); 5187 } 5188 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5189 5190 /** 5191 * ring_buffer_iter_advance - advance the iterator to the next location 5192 * @iter: The ring buffer iterator 5193 * 5194 * Move the location of the iterator such that the next read will 5195 * be the next location of the iterator. 5196 */ 5197 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5198 { 5199 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5200 unsigned long flags; 5201 5202 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5203 5204 rb_advance_iter(iter); 5205 5206 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5207 } 5208 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5209 5210 /** 5211 * ring_buffer_size - return the size of the ring buffer (in bytes) 5212 * @buffer: The ring buffer. 5213 * @cpu: The CPU to get ring buffer size from. 5214 */ 5215 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5216 { 5217 /* 5218 * Earlier, this method returned 5219 * BUF_PAGE_SIZE * buffer->nr_pages 5220 * Since the nr_pages field is now removed, we have converted this to 5221 * return the per cpu buffer value. 
5222 */ 5223 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5224 return 0; 5225 5226 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages; 5227 } 5228 EXPORT_SYMBOL_GPL(ring_buffer_size); 5229 5230 static void 5231 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5232 { 5233 rb_head_page_deactivate(cpu_buffer); 5234 5235 cpu_buffer->head_page 5236 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5237 local_set(&cpu_buffer->head_page->write, 0); 5238 local_set(&cpu_buffer->head_page->entries, 0); 5239 local_set(&cpu_buffer->head_page->page->commit, 0); 5240 5241 cpu_buffer->head_page->read = 0; 5242 5243 cpu_buffer->tail_page = cpu_buffer->head_page; 5244 cpu_buffer->commit_page = cpu_buffer->head_page; 5245 5246 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5247 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5248 local_set(&cpu_buffer->reader_page->write, 0); 5249 local_set(&cpu_buffer->reader_page->entries, 0); 5250 local_set(&cpu_buffer->reader_page->page->commit, 0); 5251 cpu_buffer->reader_page->read = 0; 5252 5253 local_set(&cpu_buffer->entries_bytes, 0); 5254 local_set(&cpu_buffer->overrun, 0); 5255 local_set(&cpu_buffer->commit_overrun, 0); 5256 local_set(&cpu_buffer->dropped_events, 0); 5257 local_set(&cpu_buffer->entries, 0); 5258 local_set(&cpu_buffer->committing, 0); 5259 local_set(&cpu_buffer->commits, 0); 5260 local_set(&cpu_buffer->pages_touched, 0); 5261 local_set(&cpu_buffer->pages_lost, 0); 5262 local_set(&cpu_buffer->pages_read, 0); 5263 cpu_buffer->last_pages_touch = 0; 5264 cpu_buffer->shortest_full = 0; 5265 cpu_buffer->read = 0; 5266 cpu_buffer->read_bytes = 0; 5267 5268 rb_time_set(&cpu_buffer->write_stamp, 0); 5269 rb_time_set(&cpu_buffer->before_stamp, 0); 5270 5271 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 5272 5273 cpu_buffer->lost_events = 0; 5274 cpu_buffer->last_overrun = 0; 5275 5276 rb_head_page_activate(cpu_buffer); 5277 } 5278 5279 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 5280 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 5281 { 5282 unsigned long flags; 5283 5284 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5285 5286 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 5287 goto out; 5288 5289 arch_spin_lock(&cpu_buffer->lock); 5290 5291 rb_reset_cpu(cpu_buffer); 5292 5293 arch_spin_unlock(&cpu_buffer->lock); 5294 5295 out: 5296 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5297 } 5298 5299 /** 5300 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 5301 * @buffer: The ring buffer to reset a per cpu buffer of 5302 * @cpu: The CPU buffer to be reset 5303 */ 5304 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 5305 { 5306 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5307 5308 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5309 return; 5310 5311 /* prevent another thread from changing buffer sizes */ 5312 mutex_lock(&buffer->mutex); 5313 5314 atomic_inc(&cpu_buffer->resize_disabled); 5315 atomic_inc(&cpu_buffer->record_disabled); 5316 5317 /* Make sure all commits have finished */ 5318 synchronize_rcu(); 5319 5320 reset_disabled_cpu_buffer(cpu_buffer); 5321 5322 atomic_dec(&cpu_buffer->record_disabled); 5323 atomic_dec(&cpu_buffer->resize_disabled); 5324 5325 mutex_unlock(&buffer->mutex); 5326 } 5327 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 5328 5329 /** 5330 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 5331 * @buffer: The ring buffer to reset a per cpu buffer 
of
5332 *
5333 */
5334 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer)
5335 {
5336 struct ring_buffer_per_cpu *cpu_buffer;
5337 int cpu;
5338
5339 /* prevent another thread from changing buffer sizes */
5340 mutex_lock(&buffer->mutex);
5341
5342 for_each_online_buffer_cpu(buffer, cpu) {
5343 cpu_buffer = buffer->buffers[cpu];
5344
5345 atomic_inc(&cpu_buffer->resize_disabled);
5346 atomic_inc(&cpu_buffer->record_disabled);
5347 }
5348
5349 /* Make sure all commits have finished */
5350 synchronize_rcu();
5351
5352 for_each_online_buffer_cpu(buffer, cpu) {
5353 cpu_buffer = buffer->buffers[cpu];
5354
5355 reset_disabled_cpu_buffer(cpu_buffer);
5356
5357 atomic_dec(&cpu_buffer->record_disabled);
5358 atomic_dec(&cpu_buffer->resize_disabled);
5359 }
5360
5361 mutex_unlock(&buffer->mutex);
5362 }
5363
5364 /**
5365 * ring_buffer_reset - reset a ring buffer
5366 * @buffer: The ring buffer to reset all cpu buffers
5367 */
5368 void ring_buffer_reset(struct trace_buffer *buffer)
5369 {
5370 struct ring_buffer_per_cpu *cpu_buffer;
5371 int cpu;
5372
5373 /* prevent another thread from changing buffer sizes */
5374 mutex_lock(&buffer->mutex);
5375
5376 for_each_buffer_cpu(buffer, cpu) {
5377 cpu_buffer = buffer->buffers[cpu];
5378
5379 atomic_inc(&cpu_buffer->resize_disabled);
5380 atomic_inc(&cpu_buffer->record_disabled);
5381 }
5382
5383 /* Make sure all commits have finished */
5384 synchronize_rcu();
5385
5386 for_each_buffer_cpu(buffer, cpu) {
5387 cpu_buffer = buffer->buffers[cpu];
5388
5389 reset_disabled_cpu_buffer(cpu_buffer);
5390
5391 atomic_dec(&cpu_buffer->record_disabled);
5392 atomic_dec(&cpu_buffer->resize_disabled);
5393 }
5394
5395 mutex_unlock(&buffer->mutex);
5396 }
5397 EXPORT_SYMBOL_GPL(ring_buffer_reset);
5398
5399 /**
5400 * ring_buffer_empty - is the ring buffer empty?
5401 * @buffer: The ring buffer to test
5402 */
5403 bool ring_buffer_empty(struct trace_buffer *buffer)
5404 {
5405 struct ring_buffer_per_cpu *cpu_buffer;
5406 unsigned long flags;
5407 bool dolock;
5408 bool ret;
5409 int cpu;
5410
5411 /* yes this is racy, but if you don't like the race, lock the buffer */
5412 for_each_buffer_cpu(buffer, cpu) {
5413 cpu_buffer = buffer->buffers[cpu];
5414 local_irq_save(flags);
5415 dolock = rb_reader_lock(cpu_buffer);
5416 ret = rb_per_cpu_empty(cpu_buffer);
5417 rb_reader_unlock(cpu_buffer, dolock);
5418 local_irq_restore(flags);
5419
5420 if (!ret)
5421 return false;
5422 }
5423
5424 return true;
5425 }
5426 EXPORT_SYMBOL_GPL(ring_buffer_empty);
5427
5428 /**
5429 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
5430 * @buffer: The ring buffer 5431 * @cpu: The CPU buffer to test 5432 */ 5433 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 5434 { 5435 struct ring_buffer_per_cpu *cpu_buffer; 5436 unsigned long flags; 5437 bool dolock; 5438 bool ret; 5439 5440 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5441 return true; 5442 5443 cpu_buffer = buffer->buffers[cpu]; 5444 local_irq_save(flags); 5445 dolock = rb_reader_lock(cpu_buffer); 5446 ret = rb_per_cpu_empty(cpu_buffer); 5447 rb_reader_unlock(cpu_buffer, dolock); 5448 local_irq_restore(flags); 5449 5450 return ret; 5451 } 5452 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 5453 5454 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 5455 /** 5456 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 5457 * @buffer_a: One buffer to swap with 5458 * @buffer_b: The other buffer to swap with 5459 * @cpu: the CPU of the buffers to swap 5460 * 5461 * This function is useful for tracers that want to take a "snapshot" 5462 * of a CPU buffer and has another back up buffer lying around. 5463 * it is expected that the tracer handles the cpu buffer not being 5464 * used at the moment. 5465 */ 5466 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 5467 struct trace_buffer *buffer_b, int cpu) 5468 { 5469 struct ring_buffer_per_cpu *cpu_buffer_a; 5470 struct ring_buffer_per_cpu *cpu_buffer_b; 5471 int ret = -EINVAL; 5472 5473 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 5474 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 5475 goto out; 5476 5477 cpu_buffer_a = buffer_a->buffers[cpu]; 5478 cpu_buffer_b = buffer_b->buffers[cpu]; 5479 5480 /* At least make sure the two buffers are somewhat the same */ 5481 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 5482 goto out; 5483 5484 ret = -EAGAIN; 5485 5486 if (atomic_read(&buffer_a->record_disabled)) 5487 goto out; 5488 5489 if (atomic_read(&buffer_b->record_disabled)) 5490 goto out; 5491 5492 if (atomic_read(&cpu_buffer_a->record_disabled)) 5493 goto out; 5494 5495 if (atomic_read(&cpu_buffer_b->record_disabled)) 5496 goto out; 5497 5498 /* 5499 * We can't do a synchronize_rcu here because this 5500 * function can be called in atomic context. 5501 * Normally this will be called from the same CPU as cpu. 5502 * If not it's up to the caller to protect this. 5503 */ 5504 atomic_inc(&cpu_buffer_a->record_disabled); 5505 atomic_inc(&cpu_buffer_b->record_disabled); 5506 5507 ret = -EBUSY; 5508 if (local_read(&cpu_buffer_a->committing)) 5509 goto out_dec; 5510 if (local_read(&cpu_buffer_b->committing)) 5511 goto out_dec; 5512 5513 buffer_a->buffers[cpu] = cpu_buffer_b; 5514 buffer_b->buffers[cpu] = cpu_buffer_a; 5515 5516 cpu_buffer_b->buffer = buffer_a; 5517 cpu_buffer_a->buffer = buffer_b; 5518 5519 ret = 0; 5520 5521 out_dec: 5522 atomic_dec(&cpu_buffer_a->record_disabled); 5523 atomic_dec(&cpu_buffer_b->record_disabled); 5524 out: 5525 return ret; 5526 } 5527 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 5528 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 5529 5530 /** 5531 * ring_buffer_alloc_read_page - allocate a page to read from buffer 5532 * @buffer: the buffer to allocate for. 5533 * @cpu: the cpu buffer to allocate. 5534 * 5535 * This function is used in conjunction with ring_buffer_read_page. 5536 * When reading a full page from the ring buffer, these functions 5537 * can be used to speed up the process. The calling function should 5538 * allocate a few pages first with this function. 
Then when it 5539 * needs to get pages from the ring buffer, it passes the result 5540 * of this function into ring_buffer_read_page, which will swap 5541 * the page that was allocated, with the read page of the buffer. 5542 * 5543 * Returns: 5544 * The page allocated, or ERR_PTR 5545 */ 5546 void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5547 { 5548 struct ring_buffer_per_cpu *cpu_buffer; 5549 struct buffer_data_page *bpage = NULL; 5550 unsigned long flags; 5551 struct page *page; 5552 5553 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5554 return ERR_PTR(-ENODEV); 5555 5556 cpu_buffer = buffer->buffers[cpu]; 5557 local_irq_save(flags); 5558 arch_spin_lock(&cpu_buffer->lock); 5559 5560 if (cpu_buffer->free_page) { 5561 bpage = cpu_buffer->free_page; 5562 cpu_buffer->free_page = NULL; 5563 } 5564 5565 arch_spin_unlock(&cpu_buffer->lock); 5566 local_irq_restore(flags); 5567 5568 if (bpage) 5569 goto out; 5570 5571 page = alloc_pages_node(cpu_to_node(cpu), 5572 GFP_KERNEL | __GFP_NORETRY, 0); 5573 if (!page) 5574 return ERR_PTR(-ENOMEM); 5575 5576 bpage = page_address(page); 5577 5578 out: 5579 rb_init_page(bpage); 5580 5581 return bpage; 5582 } 5583 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 5584 5585 /** 5586 * ring_buffer_free_read_page - free an allocated read page 5587 * @buffer: the buffer the page was allocate for 5588 * @cpu: the cpu buffer the page came from 5589 * @data: the page to free 5590 * 5591 * Free a page allocated from ring_buffer_alloc_read_page. 5592 */ 5593 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data) 5594 { 5595 struct ring_buffer_per_cpu *cpu_buffer; 5596 struct buffer_data_page *bpage = data; 5597 struct page *page = virt_to_page(bpage); 5598 unsigned long flags; 5599 5600 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 5601 return; 5602 5603 cpu_buffer = buffer->buffers[cpu]; 5604 5605 /* If the page is still in use someplace else, we can't reuse it */ 5606 if (page_ref_count(page) > 1) 5607 goto out; 5608 5609 local_irq_save(flags); 5610 arch_spin_lock(&cpu_buffer->lock); 5611 5612 if (!cpu_buffer->free_page) { 5613 cpu_buffer->free_page = bpage; 5614 bpage = NULL; 5615 } 5616 5617 arch_spin_unlock(&cpu_buffer->lock); 5618 local_irq_restore(flags); 5619 5620 out: 5621 free_page((unsigned long)bpage); 5622 } 5623 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5624 5625 /** 5626 * ring_buffer_read_page - extract a page from the ring buffer 5627 * @buffer: buffer to extract from 5628 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 5629 * @len: amount to extract 5630 * @cpu: the cpu of the buffer to extract 5631 * @full: should the extraction only happen when the page is full. 5632 * 5633 * This function will pull out a page from the ring buffer and consume it. 5634 * @data_page must be the address of the variable that was returned 5635 * from ring_buffer_alloc_read_page. This is because the page might be used 5636 * to swap with a page in the ring buffer. 5637 * 5638 * for example: 5639 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5640 * if (IS_ERR(rpage)) 5641 * return PTR_ERR(rpage); 5642 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 5643 * if (ret >= 0) 5644 * process_page(rpage, ret); 5645 * 5646 * When @full is set, the function will not return true unless 5647 * the writer is off the reader page. 5648 * 5649 * Note: it is up to the calling functions to handle sleeps and wakeups. 
5650 * The ring buffer can be used anywhere in the kernel and can not 5651 * blindly call wake_up. The layer that uses the ring buffer must be 5652 * responsible for that. 5653 * 5654 * Returns: 5655 * >=0 if data has been transferred, returns the offset of consumed data. 5656 * <0 if no data has been transferred. 5657 */ 5658 int ring_buffer_read_page(struct trace_buffer *buffer, 5659 void **data_page, size_t len, int cpu, int full) 5660 { 5661 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5662 struct ring_buffer_event *event; 5663 struct buffer_data_page *bpage; 5664 struct buffer_page *reader; 5665 unsigned long missed_events; 5666 unsigned long flags; 5667 unsigned int commit; 5668 unsigned int read; 5669 u64 save_timestamp; 5670 int ret = -1; 5671 5672 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5673 goto out; 5674 5675 /* 5676 * If len is not big enough to hold the page header, then 5677 * we can not copy anything. 5678 */ 5679 if (len <= BUF_PAGE_HDR_SIZE) 5680 goto out; 5681 5682 len -= BUF_PAGE_HDR_SIZE; 5683 5684 if (!data_page) 5685 goto out; 5686 5687 bpage = *data_page; 5688 if (!bpage) 5689 goto out; 5690 5691 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5692 5693 reader = rb_get_reader_page(cpu_buffer); 5694 if (!reader) 5695 goto out_unlock; 5696 5697 event = rb_reader_event(cpu_buffer); 5698 5699 read = reader->read; 5700 commit = rb_page_commit(reader); 5701 5702 /* Check if any events were dropped */ 5703 missed_events = cpu_buffer->lost_events; 5704 5705 /* 5706 * If this page has been partially read or 5707 * if len is not big enough to read the rest of the page or 5708 * a writer is still on the page, then 5709 * we must copy the data from the page to the buffer. 5710 * Otherwise, we can simply swap the page with the one passed in. 5711 */ 5712 if (read || (len < (commit - read)) || 5713 cpu_buffer->reader_page == cpu_buffer->commit_page) { 5714 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 5715 unsigned int rpos = read; 5716 unsigned int pos = 0; 5717 unsigned int size; 5718 5719 /* 5720 * If a full page is expected, this can still be returned 5721 * if there's been a previous partial read and the 5722 * rest of the page can be read and the commit page is off 5723 * the reader page. 5724 */ 5725 if (full && 5726 (!read || (len < (commit - read)) || 5727 cpu_buffer->reader_page == cpu_buffer->commit_page)) 5728 goto out_unlock; 5729 5730 if (len > (commit - read)) 5731 len = (commit - read); 5732 5733 /* Always keep the time extend and data together */ 5734 size = rb_event_ts_length(event); 5735 5736 if (len < size) 5737 goto out_unlock; 5738 5739 /* save the current timestamp, since the user will need it */ 5740 save_timestamp = cpu_buffer->read_stamp; 5741 5742 /* Need to copy one event at a time */ 5743 do { 5744 /* We need the size of one event, because 5745 * rb_advance_reader only advances by one event, 5746 * whereas rb_event_ts_length may include the size of 5747 * one or two events. 5748 * We have already ensured there's enough space if this 5749 * is a time extend. 
			size = rb_event_length(event);
			memcpy(bpage->data + pos, rpage->data + rpos, size);

			len -= size;

			rb_advance_reader(cpu_buffer);
			rpos = reader->read;
			pos += size;

			if (rpos >= commit)
				break;

			event = rb_reader_event(cpu_buffer);
			/* Always keep the time extend and data together */
			size = rb_event_ts_length(event);
		} while (len >= size);

		/* update bpage */
		local_set(&bpage->commit, pos);
		bpage->time_stamp = save_timestamp;

		/* we copied everything to the beginning */
		read = 0;
	} else {
		/* update the entry counter */
		cpu_buffer->read += rb_page_entries(reader);
		cpu_buffer->read_bytes += BUF_PAGE_SIZE;

		/* swap the pages */
		rb_init_page(bpage);
		bpage = reader->page;
		reader->page = *data_page;
		local_set(&reader->write, 0);
		local_set(&reader->entries, 0);
		reader->read = 0;
		*data_page = bpage;

		/*
		 * Use the real_end for the data size.
		 * This gives us a chance to store the lost events
		 * on the page.
		 */
		if (reader->real_end)
			local_set(&bpage->commit, reader->real_end);
	}
	ret = read;

	cpu_buffer->lost_events = 0;

	commit = local_read(&bpage->commit);
	/*
	 * Set a flag in the commit field if we lost events
	 */
	if (missed_events) {
		/* If there is room at the end of the page to save the
		 * missed events, then record it there.
		 */
		if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
			memcpy(&bpage->data[commit], &missed_events,
			       sizeof(missed_events));
			local_add(RB_MISSED_STORED, &bpage->commit);
			commit += sizeof(missed_events);
		}
		local_add(RB_MISSED_EVENTS, &bpage->commit);
	}

	/*
	 * This page may be off to user land. Zero it out here.
	 */
	if (commit < BUF_PAGE_SIZE)
		memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);

 out_unlock:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);
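/*
 * Minimal sketch of how a consumer of the page filled in above might decode
 * the commit word.  It relies only on RB_MISSED_EVENTS and RB_MISSED_STORED
 * being flag bits OR'd into bpage->commit (as done above); the helper name
 * my_data_len() is hypothetical and is shown with this file's private
 * struct buffer_data_page purely for clarity.
 *
 *	static size_t my_data_len(struct buffer_data_page *bpage, bool *missed)
 *	{
 *		unsigned long commit = local_read(&bpage->commit);
 *
 *		*missed = commit & RB_MISSED_EVENTS;
 *		commit &= ~(RB_MISSED_EVENTS | RB_MISSED_STORED);
 *		return commit;
 *	}
 *
 * bpage->data[0..my_data_len()) holds the event records; if RB_MISSED_STORED
 * was set, the dropped-event count (an unsigned long) was appended right
 * after that data.
 */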
/*
 * We only allocate new buffers, never free them if the CPU goes down.
 * If we were to free the buffer, then the user would lose any trace that was in
 * the buffer.
 */
int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
{
	struct trace_buffer *buffer;
	long nr_pages_same;
	int cpu_i;
	unsigned long nr_pages;

	buffer = container_of(node, struct trace_buffer, node);
	if (cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	nr_pages = 0;
	nr_pages_same = 1;
	/* check if all cpu sizes are same */
	for_each_buffer_cpu(buffer, cpu_i) {
		/* fill in the size from first enabled cpu */
		if (nr_pages == 0)
			nr_pages = buffer->buffers[cpu_i]->nr_pages;
		if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
			nr_pages_same = 0;
			break;
		}
	}
	/* allocate minimum pages, user can later expand it */
	if (!nr_pages_same)
		nr_pages = 2;
	buffer->buffers[cpu] =
		rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu]) {
		WARN(1, "failed to allocate ring buffer on CPU %u\n",
		     cpu);
		return -ENOMEM;
	}
	smp_wmb();
	cpumask_set_cpu(cpu, buffer->cpumask);
	return 0;
}

#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
/*
 * This is a basic integrity check of the ring buffer.
 * Late in the boot cycle this test will run when configured in.
 * It will kick off a thread per CPU that will go into a loop
 * writing to the per cpu ring buffer various sizes of data.
 * Some of the data will be large items, some small.
 *
 * Another thread is created that goes into a spin, sending out
 * IPIs to the other CPUs to also write into the ring buffer.
 * This is to test the nesting ability of the buffer.
 *
 * Basic stats are recorded and reported. If something in the
 * ring buffer should happen that's not expected, a big warning
 * is displayed and all ring buffers are disabled.
 */
static struct task_struct *rb_threads[NR_CPUS] __initdata;

struct rb_test_data {
	struct trace_buffer *buffer;
	unsigned long events;
	unsigned long bytes_written;
	unsigned long bytes_alloc;
	unsigned long bytes_dropped;
	unsigned long events_nested;
	unsigned long bytes_written_nested;
	unsigned long bytes_alloc_nested;
	unsigned long bytes_dropped_nested;
	int min_size_nested;
	int max_size_nested;
	int max_size;
	int min_size;
	int cpu;
	int cnt;
};

static struct rb_test_data rb_data[NR_CPUS] __initdata;

/* 1 meg per cpu */
#define RB_TEST_BUFFER_SIZE	1048576

static char rb_string[] __initdata =
	"abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
	"?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
	"!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";

static bool rb_test_started __initdata;

struct rb_item {
	int size;
	char str[];
};

static __init int rb_write_something(struct rb_test_data *data, bool nested)
{
	struct ring_buffer_event *event;
	struct rb_item *item;
	bool started;
	int event_len;
	int size;
	int len;
	int cnt;

	/* Have nested writes different from what is written */
	cnt = data->cnt + (nested ? 27 : 0);

	/* Multiply cnt by ~e, to make some unique increment */
	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);

	len = size + sizeof(struct rb_item);

	started = rb_test_started;
	/* read rb_test_started before checking buffer enabled */
	smp_rmb();

	event = ring_buffer_lock_reserve(data->buffer, len);
	if (!event) {
		/* Ignore dropped events before test starts. */
		if (started) {
			if (nested)
				data->bytes_dropped_nested += len;
			else
				data->bytes_dropped += len;
		}
		return len;
	}

	event_len = ring_buffer_event_length(event);

	if (RB_WARN_ON(data->buffer, event_len < len))
		goto out;

	item = ring_buffer_event_data(event);
	item->size = size;
	memcpy(item->str, rb_string, size);

	if (nested) {
		data->bytes_alloc_nested += event_len;
		data->bytes_written_nested += len;
		data->events_nested++;
		if (!data->min_size_nested || len < data->min_size_nested)
			data->min_size_nested = len;
		if (len > data->max_size_nested)
			data->max_size_nested = len;
	} else {
		data->bytes_alloc += event_len;
		data->bytes_written += len;
		data->events++;
		if (!data->min_size || len < data->min_size)
			data->min_size = len;
		if (len > data->max_size)
			data->max_size = len;
	}

 out:
	ring_buffer_unlock_commit(data->buffer);

	return 0;
}

static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Now sleep between a min of 100-300us and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {

		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but for non preempt, let others run */
		schedule();
	}

	return 0;
}

static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct trace_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable buffer so that threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
						     cpu, "rbtester/%u");
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}
	}

	/* Now create the rb hammer! */
	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
	if (WARN_ON(IS_ERR(rb_hammer))) {
		pr_cont("FAILED\n");
		ret = PTR_ERR(rb_hammer);
		goto out_free;
	}

	ring_buffer_record_on(buffer);
	/*
	 * Show buffer is enabled before setting rb_test_started.
	 * Yes, there's a small race window where events could be
	 * dropped and the thread won't catch it. But when a ring
	 * buffer gets enabled, there will always be some kind of
	 * delay before other CPUs see it. Thus, we don't care about
	 * those dropped events.
	 * We care about events dropped after the threads see that
	 * the buffer is active.
	 */
	smp_wmb();
	rb_test_started = true;

	set_current_state(TASK_INTERRUPTIBLE);
	/* Just run for 10 seconds */
	schedule_timeout(10 * HZ);

	kthread_stop(rb_hammer);

 out_free:
	for_each_online_cpu(cpu) {
		if (!rb_threads[cpu])
			break;
		kthread_stop(rb_threads[cpu]);
	}
	if (ret) {
		ring_buffer_free(buffer);
		return ret;
	}

	/* Report! */
	pr_info("finished\n");
	for_each_online_cpu(cpu) {
		struct ring_buffer_event *event;
		struct rb_test_data *data = &rb_data[cpu];
		struct rb_item *item;
		unsigned long total_events;
		unsigned long total_dropped;
		unsigned long total_written;
		unsigned long total_alloc;
		unsigned long total_read = 0;
		unsigned long total_size = 0;
		unsigned long total_len = 0;
		unsigned long total_lost = 0;
		unsigned long lost;
		int big_event_size;
		int small_event_size;

		ret = -1;

		total_events = data->events + data->events_nested;
		total_written = data->bytes_written + data->bytes_written_nested;
		total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
		total_dropped = data->bytes_dropped + data->bytes_dropped_nested;

		big_event_size = data->max_size + data->max_size_nested;
		small_event_size = data->min_size + data->min_size_nested;

		pr_info("CPU %d:\n", cpu);
		pr_info(" events: %ld\n", total_events);
		pr_info(" dropped bytes: %ld\n", total_dropped);
		pr_info(" alloced bytes: %ld\n", total_alloc);
		pr_info(" written bytes: %ld\n", total_written);
		pr_info(" biggest event: %d\n", big_event_size);
		pr_info(" smallest event: %d\n", small_event_size);

		if (RB_WARN_ON(buffer, total_dropped))
			break;

		ret = 0;

		while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
			total_lost += lost;
			item = ring_buffer_event_data(event);
			total_len += ring_buffer_event_length(event);
			total_size += item->size + sizeof(struct rb_item);
			if (memcmp(&item->str[0], rb_string, item->size) != 0) {
				pr_info("FAILED!\n");
				pr_info("buffer had: %.*s\n", item->size, item->str);
				pr_info("expected:   %.*s\n", item->size, rb_string);
				RB_WARN_ON(buffer, 1);
				ret = -1;
				break;
			}
			total_read++;
		}
		if (ret)
			break;

		ret = -1;

		pr_info(" read events: %ld\n", total_read);
		pr_info(" lost events: %ld\n", total_lost);
		pr_info(" total events: %ld\n", total_lost + total_read);
		pr_info(" recorded len bytes: %ld\n", total_len);
		pr_info(" recorded size bytes: %ld\n", total_size);
		if (total_lost) {
			pr_info(" With dropped events, record len and size may not match\n"
				" alloced and written from above\n");
		} else {
			if (RB_WARN_ON(buffer, total_len != total_alloc ||
				       total_size != total_written))
				break;
		}
		if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
			break;

		ret = 0;
	}
	if (!ret)
		pr_info("Ring buffer PASSED!\n");

	ring_buffer_free(buffer);
	return 0;
}

late_initcall(test_ringbuffer);
#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */
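/*
 * Note on the ordering in test_ringbuffer() above: the smp_wmb() issued
 * before setting rb_test_started pairs with the smp_rmb() the writer
 * threads issue in rb_write_something() after sampling it.  This is the
 * usual publish/consume pairing, shown here as a generic sketch with
 * made-up names ("payload" and "published"):
 *
 *	// publisher			// consumer
 *	WRITE_ONCE(payload, value);	seen = READ_ONCE(published);
 *	smp_wmb();			smp_rmb();
 *	WRITE_ONCE(published, true);	if (seen)
 *						use(payload);	// sees value
 *
 * Here the "payload" is the buffer's record-on state and "published" is
 * rb_test_started, so a thread that observed rb_test_started == true can
 * treat a failed reserve as a genuine drop rather than a startup artifact.
 */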