1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Generic ring buffer 4 * 5 * Copyright (C) 2008 Steven Rostedt <[email protected]> 6 */ 7 #include <linux/trace_recursion.h> 8 #include <linux/trace_events.h> 9 #include <linux/ring_buffer.h> 10 #include <linux/trace_clock.h> 11 #include <linux/sched/clock.h> 12 #include <linux/trace_seq.h> 13 #include <linux/spinlock.h> 14 #include <linux/irq_work.h> 15 #include <linux/security.h> 16 #include <linux/uaccess.h> 17 #include <linux/hardirq.h> 18 #include <linux/kthread.h> /* for self test */ 19 #include <linux/module.h> 20 #include <linux/percpu.h> 21 #include <linux/mutex.h> 22 #include <linux/delay.h> 23 #include <linux/slab.h> 24 #include <linux/init.h> 25 #include <linux/hash.h> 26 #include <linux/list.h> 27 #include <linux/cpu.h> 28 #include <linux/oom.h> 29 30 #include <asm/local64.h> 31 #include <asm/local.h> 32 33 /* 34 * The "absolute" timestamp in the buffer is only 59 bits. 35 * If a clock has the 5 MSBs set, it needs to be saved and 36 * reinserted. 37 */ 38 #define TS_MSB (0xf8ULL << 56) 39 #define ABS_TS_MASK (~TS_MSB) 40 41 static void update_pages_handler(struct work_struct *work); 42 43 /* 44 * The ring buffer header is special. We must manually up keep it. 45 */ 46 int ring_buffer_print_entry_header(struct trace_seq *s) 47 { 48 trace_seq_puts(s, "# compressed entry header\n"); 49 trace_seq_puts(s, "\ttype_len : 5 bits\n"); 50 trace_seq_puts(s, "\ttime_delta : 27 bits\n"); 51 trace_seq_puts(s, "\tarray : 32 bits\n"); 52 trace_seq_putc(s, '\n'); 53 trace_seq_printf(s, "\tpadding : type == %d\n", 54 RINGBUF_TYPE_PADDING); 55 trace_seq_printf(s, "\ttime_extend : type == %d\n", 56 RINGBUF_TYPE_TIME_EXTEND); 57 trace_seq_printf(s, "\ttime_stamp : type == %d\n", 58 RINGBUF_TYPE_TIME_STAMP); 59 trace_seq_printf(s, "\tdata max type_len == %d\n", 60 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 61 62 return !trace_seq_has_overflowed(s); 63 } 64 65 /* 66 * The ring buffer is made up of a list of pages. A separate list of pages is 67 * allocated for each CPU. A writer may only write to a buffer that is 68 * associated with the CPU it is currently executing on. A reader may read 69 * from any per cpu buffer. 70 * 71 * The reader is special. For each per cpu buffer, the reader has its own 72 * reader page. When a reader has read the entire reader page, this reader 73 * page is swapped with another page in the ring buffer. 74 * 75 * Now, as long as the writer is off the reader page, the reader can do what 76 * ever it wants with that page. The writer will never write to that page 77 * again (as long as it is out of the ring buffer). 78 * 79 * Here's some silly ASCII art. 
80 * 81 * +------+ 82 * |reader| RING BUFFER 83 * |page | 84 * +------+ +---+ +---+ +---+ 85 * | |-->| |-->| | 86 * +---+ +---+ +---+ 87 * ^ | 88 * | | 89 * +---------------+ 90 * 91 * 92 * +------+ 93 * |reader| RING BUFFER 94 * |page |------------------v 95 * +------+ +---+ +---+ +---+ 96 * | |-->| |-->| | 97 * +---+ +---+ +---+ 98 * ^ | 99 * | | 100 * +---------------+ 101 * 102 * 103 * +------+ 104 * |reader| RING BUFFER 105 * |page |------------------v 106 * +------+ +---+ +---+ +---+ 107 * ^ | |-->| |-->| | 108 * | +---+ +---+ +---+ 109 * | | 110 * | | 111 * +------------------------------+ 112 * 113 * 114 * +------+ 115 * |buffer| RING BUFFER 116 * |page |------------------v 117 * +------+ +---+ +---+ +---+ 118 * ^ | | | |-->| | 119 * | New +---+ +---+ +---+ 120 * | Reader------^ | 121 * | page | 122 * +------------------------------+ 123 * 124 * 125 * After we make this swap, the reader can hand this page off to the splice 126 * code and be done with it. It can even allocate a new page if it needs to 127 * and swap that into the ring buffer. 128 * 129 * We will be using cmpxchg soon to make all this lockless. 130 * 131 */ 132 133 /* Used for individual buffers (after the counter) */ 134 #define RB_BUFFER_OFF (1 << 20) 135 136 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 137 138 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 139 #define RB_ALIGNMENT 4U 140 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 141 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 142 143 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS 144 # define RB_FORCE_8BYTE_ALIGNMENT 0 145 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT 146 #else 147 # define RB_FORCE_8BYTE_ALIGNMENT 1 148 # define RB_ARCH_ALIGNMENT 8U 149 #endif 150 151 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT) 152 153 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 154 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 155 156 enum { 157 RB_LEN_TIME_EXTEND = 8, 158 RB_LEN_TIME_STAMP = 8, 159 }; 160 161 #define skip_time_extend(event) \ 162 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 163 164 #define extended_time(event) \ 165 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 166 167 static inline bool rb_null_event(struct ring_buffer_event *event) 168 { 169 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 170 } 171 172 static void rb_event_set_padding(struct ring_buffer_event *event) 173 { 174 /* padding has a NULL time_delta */ 175 event->type_len = RINGBUF_TYPE_PADDING; 176 event->time_delta = 0; 177 } 178 179 static unsigned 180 rb_event_data_length(struct ring_buffer_event *event) 181 { 182 unsigned length; 183 184 if (event->type_len) 185 length = event->type_len * RB_ALIGNMENT; 186 else 187 length = event->array[0]; 188 return length + RB_EVNT_HDR_SIZE; 189 } 190 191 /* 192 * Return the length of the given event. Will return 193 * the length of the time extend if the event is a 194 * time extend. 
195 */ 196 static inline unsigned 197 rb_event_length(struct ring_buffer_event *event) 198 { 199 switch (event->type_len) { 200 case RINGBUF_TYPE_PADDING: 201 if (rb_null_event(event)) 202 /* undefined */ 203 return -1; 204 return event->array[0] + RB_EVNT_HDR_SIZE; 205 206 case RINGBUF_TYPE_TIME_EXTEND: 207 return RB_LEN_TIME_EXTEND; 208 209 case RINGBUF_TYPE_TIME_STAMP: 210 return RB_LEN_TIME_STAMP; 211 212 case RINGBUF_TYPE_DATA: 213 return rb_event_data_length(event); 214 default: 215 WARN_ON_ONCE(1); 216 } 217 /* not hit */ 218 return 0; 219 } 220 221 /* 222 * Return total length of time extend and data, 223 * or just the event length for all other events. 224 */ 225 static inline unsigned 226 rb_event_ts_length(struct ring_buffer_event *event) 227 { 228 unsigned len = 0; 229 230 if (extended_time(event)) { 231 /* time extends include the data event after it */ 232 len = RB_LEN_TIME_EXTEND; 233 event = skip_time_extend(event); 234 } 235 return len + rb_event_length(event); 236 } 237 238 /** 239 * ring_buffer_event_length - return the length of the event 240 * @event: the event to get the length of 241 * 242 * Returns the size of the data load of a data event. 243 * If the event is something other than a data event, it 244 * returns the size of the event itself. With the exception 245 * of a TIME EXTEND, where it still returns the size of the 246 * data load of the data event after it. 247 */ 248 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 249 { 250 unsigned length; 251 252 if (extended_time(event)) 253 event = skip_time_extend(event); 254 255 length = rb_event_length(event); 256 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 257 return length; 258 length -= RB_EVNT_HDR_SIZE; 259 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 260 length -= sizeof(event->array[0]); 261 return length; 262 } 263 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 264 265 /* inline for ring buffer fast paths */ 266 static __always_inline void * 267 rb_event_data(struct ring_buffer_event *event) 268 { 269 if (extended_time(event)) 270 event = skip_time_extend(event); 271 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 272 /* If length is in len field, then array[0] has the data */ 273 if (event->type_len) 274 return (void *)&event->array[0]; 275 /* Otherwise length is in array[0] and array[1] has the data */ 276 return (void *)&event->array[1]; 277 } 278 279 /** 280 * ring_buffer_event_data - return the data of the event 281 * @event: the event to get the data from 282 */ 283 void *ring_buffer_event_data(struct ring_buffer_event *event) 284 { 285 return rb_event_data(event); 286 } 287 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 288 289 #define for_each_buffer_cpu(buffer, cpu) \ 290 for_each_cpu(cpu, buffer->cpumask) 291 292 #define for_each_online_buffer_cpu(buffer, cpu) \ 293 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) 294 295 #define TS_SHIFT 27 296 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 297 #define TS_DELTA_TEST (~TS_MASK) 298 299 static u64 rb_event_time_stamp(struct ring_buffer_event *event) 300 { 301 u64 ts; 302 303 ts = event->array[0]; 304 ts <<= TS_SHIFT; 305 ts += event->time_delta; 306 307 return ts; 308 } 309 310 /* Flag when events were overwritten */ 311 #define RB_MISSED_EVENTS (1 << 31) 312 /* Missed count stored at end */ 313 #define RB_MISSED_STORED (1 << 30) 314 315 struct buffer_data_page { 316 u64 time_stamp; /* page time stamp */ 317 local_t commit; /* write committed index */ 318 unsigned char data[] RB_ALIGN_DATA; /* data 
of buffer page */ 319 }; 320 321 struct buffer_data_read_page { 322 unsigned order; /* order of the page */ 323 struct buffer_data_page *data; /* actual data, stored in this page */ 324 }; 325 326 /* 327 * Note, the buffer_page list must be first. The buffer pages 328 * are allocated in cache lines, which means that each buffer 329 * page will be at the beginning of a cache line, and thus 330 * the least significant bits will be zero. We use this to 331 * add flags in the list struct pointers, to make the ring buffer 332 * lockless. 333 */ 334 struct buffer_page { 335 struct list_head list; /* list of buffer pages */ 336 local_t write; /* index for next write */ 337 unsigned read; /* index for next read */ 338 local_t entries; /* entries on this page */ 339 unsigned long real_end; /* real end of data */ 340 unsigned order; /* order of the page */ 341 struct buffer_data_page *page; /* Actual data page */ 342 }; 343 344 /* 345 * The buffer page counters, write and entries, must be reset 346 * atomically when crossing page boundaries. To synchronize this 347 * update, two counters are inserted into the number. One is 348 * the actual counter for the write position or count on the page. 349 * 350 * The other is a counter of updaters. Before an update happens 351 * the update partition of the counter is incremented. This will 352 * allow the updater to update the counter atomically. 353 * 354 * The counter is 20 bits, and the state data is 12. 355 */ 356 #define RB_WRITE_MASK 0xfffff 357 #define RB_WRITE_INTCNT (1 << 20) 358 359 static void rb_init_page(struct buffer_data_page *bpage) 360 { 361 local_set(&bpage->commit, 0); 362 } 363 364 static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage) 365 { 366 return local_read(&bpage->page->commit); 367 } 368 369 static void free_buffer_page(struct buffer_page *bpage) 370 { 371 free_pages((unsigned long)bpage->page, bpage->order); 372 kfree(bpage); 373 } 374 375 /* 376 * We need to fit the time_stamp delta into 27 bits. 377 */ 378 static inline bool test_time_stamp(u64 delta) 379 { 380 return !!(delta & TS_DELTA_TEST); 381 } 382 383 struct rb_irq_work { 384 struct irq_work work; 385 wait_queue_head_t waiters; 386 wait_queue_head_t full_waiters; 387 long wait_index; 388 bool waiters_pending; 389 bool full_waiters_pending; 390 bool wakeup_full; 391 }; 392 393 /* 394 * Structure to hold event state and handle nested events. 395 */ 396 struct rb_event_info { 397 u64 ts; 398 u64 delta; 399 u64 before; 400 u64 after; 401 unsigned long length; 402 struct buffer_page *tail_page; 403 int add_timestamp; 404 }; 405 406 /* 407 * Used for the add_timestamp 408 * NONE 409 * EXTEND - wants a time extend 410 * ABSOLUTE - the buffer requests all events to have absolute time stamps 411 * FORCE - force a full time stamp. 412 */ 413 enum { 414 RB_ADD_STAMP_NONE = 0, 415 RB_ADD_STAMP_EXTEND = BIT(1), 416 RB_ADD_STAMP_ABSOLUTE = BIT(2), 417 RB_ADD_STAMP_FORCE = BIT(3) 418 }; 419 /* 420 * Used for which event context the event is in. 421 * TRANSITION = 0 422 * NMI = 1 423 * IRQ = 2 424 * SOFTIRQ = 3 425 * NORMAL = 4 426 * 427 * See trace_recursive_lock() comment below for more details. 428 */ 429 enum { 430 RB_CTX_TRANSITION, 431 RB_CTX_NMI, 432 RB_CTX_IRQ, 433 RB_CTX_SOFTIRQ, 434 RB_CTX_NORMAL, 435 RB_CTX_MAX 436 }; 437 438 struct rb_time_struct { 439 local64_t time; 440 }; 441 typedef struct rb_time_struct rb_time_t; 442 443 #define MAX_NEST 5 444 445 /* 446 * head_page == tail_page && head == tail then buffer is empty. 
447 */ 448 struct ring_buffer_per_cpu { 449 int cpu; 450 atomic_t record_disabled; 451 atomic_t resize_disabled; 452 struct trace_buffer *buffer; 453 raw_spinlock_t reader_lock; /* serialize readers */ 454 arch_spinlock_t lock; 455 struct lock_class_key lock_key; 456 struct buffer_data_page *free_page; 457 unsigned long nr_pages; 458 unsigned int current_context; 459 struct list_head *pages; 460 struct buffer_page *head_page; /* read from head */ 461 struct buffer_page *tail_page; /* write to tail */ 462 struct buffer_page *commit_page; /* committed pages */ 463 struct buffer_page *reader_page; 464 unsigned long lost_events; 465 unsigned long last_overrun; 466 unsigned long nest; 467 local_t entries_bytes; 468 local_t entries; 469 local_t overrun; 470 local_t commit_overrun; 471 local_t dropped_events; 472 local_t committing; 473 local_t commits; 474 local_t pages_touched; 475 local_t pages_lost; 476 local_t pages_read; 477 long last_pages_touch; 478 size_t shortest_full; 479 unsigned long read; 480 unsigned long read_bytes; 481 rb_time_t write_stamp; 482 rb_time_t before_stamp; 483 u64 event_stamp[MAX_NEST]; 484 u64 read_stamp; 485 /* pages removed since last reset */ 486 unsigned long pages_removed; 487 /* ring buffer pages to update, > 0 to add, < 0 to remove */ 488 long nr_pages_to_update; 489 struct list_head new_pages; /* new pages to add */ 490 struct work_struct update_pages_work; 491 struct completion update_done; 492 493 struct rb_irq_work irq_work; 494 }; 495 496 struct trace_buffer { 497 unsigned flags; 498 int cpus; 499 atomic_t record_disabled; 500 atomic_t resizing; 501 cpumask_var_t cpumask; 502 503 struct lock_class_key *reader_lock_key; 504 505 struct mutex mutex; 506 507 struct ring_buffer_per_cpu **buffers; 508 509 struct hlist_node node; 510 u64 (*clock)(void); 511 512 struct rb_irq_work irq_work; 513 bool time_stamp_abs; 514 515 unsigned int subbuf_size; 516 unsigned int subbuf_order; 517 unsigned int max_data_size; 518 }; 519 520 struct ring_buffer_iter { 521 struct ring_buffer_per_cpu *cpu_buffer; 522 unsigned long head; 523 unsigned long next_event; 524 struct buffer_page *head_page; 525 struct buffer_page *cache_reader_page; 526 unsigned long cache_read; 527 unsigned long cache_pages_removed; 528 u64 read_stamp; 529 u64 page_stamp; 530 struct ring_buffer_event *event; 531 size_t event_size; 532 int missed_events; 533 }; 534 535 int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s) 536 { 537 struct buffer_data_page field; 538 539 trace_seq_printf(s, "\tfield: u64 timestamp;\t" 540 "offset:0;\tsize:%u;\tsigned:%u;\n", 541 (unsigned int)sizeof(field.time_stamp), 542 (unsigned int)is_signed_type(u64)); 543 544 trace_seq_printf(s, "\tfield: local_t commit;\t" 545 "offset:%u;\tsize:%u;\tsigned:%u;\n", 546 (unsigned int)offsetof(typeof(field), commit), 547 (unsigned int)sizeof(field.commit), 548 (unsigned int)is_signed_type(long)); 549 550 trace_seq_printf(s, "\tfield: int overwrite;\t" 551 "offset:%u;\tsize:%u;\tsigned:%u;\n", 552 (unsigned int)offsetof(typeof(field), commit), 553 1, 554 (unsigned int)is_signed_type(long)); 555 556 trace_seq_printf(s, "\tfield: char data;\t" 557 "offset:%u;\tsize:%u;\tsigned:%u;\n", 558 (unsigned int)offsetof(typeof(field), data), 559 (unsigned int)buffer->subbuf_size, 560 (unsigned int)is_signed_type(char)); 561 562 return !trace_seq_has_overflowed(s); 563 } 564 565 static inline void rb_time_read(rb_time_t *t, u64 *ret) 566 { 567 *ret = local64_read(&t->time); 568 } 569 static void rb_time_set(rb_time_t 
*t, u64 val) 570 { 571 local64_set(&t->time, val); 572 } 573 574 /* 575 * Enable this to make sure that the event passed to 576 * ring_buffer_event_time_stamp() is not committed and also 577 * is on the buffer that it passed in. 578 */ 579 //#define RB_VERIFY_EVENT 580 #ifdef RB_VERIFY_EVENT 581 static struct list_head *rb_list_head(struct list_head *list); 582 static void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 583 void *event) 584 { 585 struct buffer_page *page = cpu_buffer->commit_page; 586 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page); 587 struct list_head *next; 588 long commit, write; 589 unsigned long addr = (unsigned long)event; 590 bool done = false; 591 int stop = 0; 592 593 /* Make sure the event exists and is not committed yet */ 594 do { 595 if (page == tail_page || WARN_ON_ONCE(stop++ > 100)) 596 done = true; 597 commit = local_read(&page->page->commit); 598 write = local_read(&page->write); 599 if (addr >= (unsigned long)&page->page->data[commit] && 600 addr < (unsigned long)&page->page->data[write]) 601 return; 602 603 next = rb_list_head(page->list.next); 604 page = list_entry(next, struct buffer_page, list); 605 } while (!done); 606 WARN_ON_ONCE(1); 607 } 608 #else 609 static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 610 void *event) 611 { 612 } 613 #endif 614 615 /* 616 * The absolute time stamp drops the 5 MSBs and some clocks may 617 * require them. The rb_fix_abs_ts() will take a previous full 618 * time stamp, and add the 5 MSB of that time stamp on to the 619 * saved absolute time stamp. Then they are compared in case of 620 * the unlikely event that the latest time stamp incremented 621 * the 5 MSB. 622 */ 623 static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts) 624 { 625 if (save_ts & TS_MSB) { 626 abs |= save_ts & TS_MSB; 627 /* Check for overflow */ 628 if (unlikely(abs < save_ts)) 629 abs += 1ULL << 59; 630 } 631 return abs; 632 } 633 634 static inline u64 rb_time_stamp(struct trace_buffer *buffer); 635 636 /** 637 * ring_buffer_event_time_stamp - return the event's current time stamp 638 * @buffer: The buffer that the event is on 639 * @event: the event to get the time stamp of 640 * 641 * Note, this must be called after @event is reserved, and before it is 642 * committed to the ring buffer. And must be called from the same 643 * context where the event was reserved (normal, softirq, irq, etc). 644 * 645 * Returns the time stamp associated with the current event. 646 * If the event has an extended time stamp, then that is used as 647 * the time stamp to return. 648 * In the highly unlikely case that the event was nested more than 649 * the max nesting, then the write_stamp of the buffer is returned, 650 * otherwise current time is returned, but that really neither of 651 * the last two cases should ever happen. 
652 */ 653 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, 654 struct ring_buffer_event *event) 655 { 656 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; 657 unsigned int nest; 658 u64 ts; 659 660 /* If the event includes an absolute time, then just use that */ 661 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { 662 ts = rb_event_time_stamp(event); 663 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp); 664 } 665 666 nest = local_read(&cpu_buffer->committing); 667 verify_event(cpu_buffer, event); 668 if (WARN_ON_ONCE(!nest)) 669 goto fail; 670 671 /* Read the current saved nesting level time stamp */ 672 if (likely(--nest < MAX_NEST)) 673 return cpu_buffer->event_stamp[nest]; 674 675 /* Shouldn't happen, warn if it does */ 676 WARN_ONCE(1, "nest (%d) greater than max", nest); 677 678 fail: 679 rb_time_read(&cpu_buffer->write_stamp, &ts); 680 681 return ts; 682 } 683 684 /** 685 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer 686 * @buffer: The ring_buffer to get the number of pages from 687 * @cpu: The cpu of the ring_buffer to get the number of pages from 688 * 689 * Returns the number of pages used by a per_cpu buffer of the ring buffer. 690 */ 691 size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu) 692 { 693 return buffer->buffers[cpu]->nr_pages; 694 } 695 696 /** 697 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer 698 * @buffer: The ring_buffer to get the number of pages from 699 * @cpu: The cpu of the ring_buffer to get the number of pages from 700 * 701 * Returns the number of pages that have content in the ring buffer. 702 */ 703 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 704 { 705 size_t read; 706 size_t lost; 707 size_t cnt; 708 709 read = local_read(&buffer->buffers[cpu]->pages_read); 710 lost = local_read(&buffer->buffers[cpu]->pages_lost); 711 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 712 713 if (WARN_ON_ONCE(cnt < lost)) 714 return 0; 715 716 cnt -= lost; 717 718 /* The reader can read an empty page, but not more than that */ 719 if (cnt < read) { 720 WARN_ON_ONCE(read > cnt + 1); 721 return 0; 722 } 723 724 return cnt - read; 725 } 726 727 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full) 728 { 729 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 730 size_t nr_pages; 731 size_t dirty; 732 733 nr_pages = cpu_buffer->nr_pages; 734 if (!nr_pages || !full) 735 return true; 736 737 dirty = ring_buffer_nr_dirty_pages(buffer, cpu); 738 739 return (dirty * 100) > (full * nr_pages); 740 } 741 742 /* 743 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 744 * 745 * Schedules a delayed work to wake up any task that is blocked on the 746 * ring buffer waiters queue. 747 */ 748 static void rb_wake_up_waiters(struct irq_work *work) 749 { 750 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 751 752 wake_up_all(&rbwork->waiters); 753 if (rbwork->full_waiters_pending || rbwork->wakeup_full) { 754 rbwork->wakeup_full = false; 755 rbwork->full_waiters_pending = false; 756 wake_up_all(&rbwork->full_waiters); 757 } 758 } 759 760 /** 761 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer 762 * @buffer: The ring buffer to wake waiters on 763 * @cpu: The CPU buffer to wake waiters on 764 * 765 * In the case of a file that represents a ring buffer is closing, 766 * it is prudent to wake up any waiters that are on this. 
767 */ 768 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu) 769 { 770 struct ring_buffer_per_cpu *cpu_buffer; 771 struct rb_irq_work *rbwork; 772 773 if (!buffer) 774 return; 775 776 if (cpu == RING_BUFFER_ALL_CPUS) { 777 778 /* Wake up individual ones too. One level recursion */ 779 for_each_buffer_cpu(buffer, cpu) 780 ring_buffer_wake_waiters(buffer, cpu); 781 782 rbwork = &buffer->irq_work; 783 } else { 784 if (WARN_ON_ONCE(!buffer->buffers)) 785 return; 786 if (WARN_ON_ONCE(cpu >= nr_cpu_ids)) 787 return; 788 789 cpu_buffer = buffer->buffers[cpu]; 790 /* The CPU buffer may not have been initialized yet */ 791 if (!cpu_buffer) 792 return; 793 rbwork = &cpu_buffer->irq_work; 794 } 795 796 rbwork->wait_index++; 797 /* make sure the waiters see the new index */ 798 smp_wmb(); 799 800 rb_wake_up_waiters(&rbwork->work); 801 } 802 803 /** 804 * ring_buffer_wait - wait for input to the ring buffer 805 * @buffer: buffer to wait on 806 * @cpu: the cpu buffer to wait on 807 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 808 * 809 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 810 * as data is added to any of the @buffer's cpu buffers. Otherwise 811 * it will wait for data to be added to a specific cpu buffer. 812 */ 813 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full) 814 { 815 struct ring_buffer_per_cpu *cpu_buffer; 816 DEFINE_WAIT(wait); 817 struct rb_irq_work *work; 818 long wait_index; 819 int ret = 0; 820 821 /* 822 * Depending on what the caller is waiting for, either any 823 * data in any cpu buffer, or a specific buffer, put the 824 * caller on the appropriate wait queue. 825 */ 826 if (cpu == RING_BUFFER_ALL_CPUS) { 827 work = &buffer->irq_work; 828 /* Full only makes sense on per cpu reads */ 829 full = 0; 830 } else { 831 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 832 return -ENODEV; 833 cpu_buffer = buffer->buffers[cpu]; 834 work = &cpu_buffer->irq_work; 835 } 836 837 wait_index = READ_ONCE(work->wait_index); 838 839 while (true) { 840 if (full) 841 prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE); 842 else 843 prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE); 844 845 /* 846 * The events can happen in critical sections where 847 * checking a work queue can cause deadlocks. 848 * After adding a task to the queue, this flag is set 849 * only to notify events to try to wake up the queue 850 * using irq_work. 851 * 852 * We don't clear it even if the buffer is no longer 853 * empty. The flag only causes the next event to run 854 * irq_work to do the work queue wake up. The worse 855 * that can happen if we race with !trace_empty() is that 856 * an event will cause an irq_work to try to wake up 857 * an empty queue. 858 * 859 * There's no reason to protect this flag either, as 860 * the work queue and irq_work logic will do the necessary 861 * synchronization for the wake ups. The only thing 862 * that is necessary is that the wake up happens after 863 * a task has been queued. It's OK for spurious wake ups. 
864 */ 865 if (full) 866 work->full_waiters_pending = true; 867 else 868 work->waiters_pending = true; 869 870 if (signal_pending(current)) { 871 ret = -EINTR; 872 break; 873 } 874 875 if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) 876 break; 877 878 if (cpu != RING_BUFFER_ALL_CPUS && 879 !ring_buffer_empty_cpu(buffer, cpu)) { 880 unsigned long flags; 881 bool pagebusy; 882 bool done; 883 884 if (!full) 885 break; 886 887 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 888 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 889 done = !pagebusy && full_hit(buffer, cpu, full); 890 891 if (!cpu_buffer->shortest_full || 892 cpu_buffer->shortest_full > full) 893 cpu_buffer->shortest_full = full; 894 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 895 if (done) 896 break; 897 } 898 899 schedule(); 900 901 /* Make sure to see the new wait index */ 902 smp_rmb(); 903 if (wait_index != work->wait_index) 904 break; 905 } 906 907 if (full) 908 finish_wait(&work->full_waiters, &wait); 909 else 910 finish_wait(&work->waiters, &wait); 911 912 return ret; 913 } 914 915 /** 916 * ring_buffer_poll_wait - poll on buffer input 917 * @buffer: buffer to wait on 918 * @cpu: the cpu buffer to wait on 919 * @filp: the file descriptor 920 * @poll_table: The poll descriptor 921 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 922 * 923 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 924 * as data is added to any of the @buffer's cpu buffers. Otherwise 925 * it will wait for data to be added to a specific cpu buffer. 926 * 927 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, 928 * zero otherwise. 929 */ 930 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, 931 struct file *filp, poll_table *poll_table, int full) 932 { 933 struct ring_buffer_per_cpu *cpu_buffer; 934 struct rb_irq_work *work; 935 936 if (cpu == RING_BUFFER_ALL_CPUS) { 937 work = &buffer->irq_work; 938 full = 0; 939 } else { 940 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 941 return -EINVAL; 942 943 cpu_buffer = buffer->buffers[cpu]; 944 work = &cpu_buffer->irq_work; 945 } 946 947 if (full) { 948 poll_wait(filp, &work->full_waiters, poll_table); 949 work->full_waiters_pending = true; 950 if (!cpu_buffer->shortest_full || 951 cpu_buffer->shortest_full > full) 952 cpu_buffer->shortest_full = full; 953 } else { 954 poll_wait(filp, &work->waiters, poll_table); 955 work->waiters_pending = true; 956 } 957 958 /* 959 * There's a tight race between setting the waiters_pending and 960 * checking if the ring buffer is empty. Once the waiters_pending bit 961 * is set, the next event will wake the task up, but we can get stuck 962 * if there's only a single event in. 963 * 964 * FIXME: Ideally, we need a memory barrier on the writer side as well, 965 * but adding a memory barrier to all events will cause too much of a 966 * performance hit in the fast path. We only need a memory barrier when 967 * the buffer goes from empty to having content. But as this race is 968 * extremely small, and it's not a problem if another event comes in, we 969 * will fix it later. 970 */ 971 smp_mb(); 972 973 if (full) 974 return full_hit(buffer, cpu, full) ? 
EPOLLIN | EPOLLRDNORM : 0; 975 976 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 977 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) 978 return EPOLLIN | EPOLLRDNORM; 979 return 0; 980 } 981 982 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 983 #define RB_WARN_ON(b, cond) \ 984 ({ \ 985 int _____ret = unlikely(cond); \ 986 if (_____ret) { \ 987 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 988 struct ring_buffer_per_cpu *__b = \ 989 (void *)b; \ 990 atomic_inc(&__b->buffer->record_disabled); \ 991 } else \ 992 atomic_inc(&b->record_disabled); \ 993 WARN_ON(1); \ 994 } \ 995 _____ret; \ 996 }) 997 998 /* Up this if you want to test the TIME_EXTENTS and normalization */ 999 #define DEBUG_SHIFT 0 1000 1001 static inline u64 rb_time_stamp(struct trace_buffer *buffer) 1002 { 1003 u64 ts; 1004 1005 /* Skip retpolines :-( */ 1006 if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local)) 1007 ts = trace_clock_local(); 1008 else 1009 ts = buffer->clock(); 1010 1011 /* shift to debug/test normalization and TIME_EXTENTS */ 1012 return ts << DEBUG_SHIFT; 1013 } 1014 1015 u64 ring_buffer_time_stamp(struct trace_buffer *buffer) 1016 { 1017 u64 time; 1018 1019 preempt_disable_notrace(); 1020 time = rb_time_stamp(buffer); 1021 preempt_enable_notrace(); 1022 1023 return time; 1024 } 1025 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 1026 1027 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer, 1028 int cpu, u64 *ts) 1029 { 1030 /* Just stupid testing the normalize function and deltas */ 1031 *ts >>= DEBUG_SHIFT; 1032 } 1033 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 1034 1035 /* 1036 * Making the ring buffer lockless makes things tricky. 1037 * Although writes only happen on the CPU that they are on, 1038 * and they only need to worry about interrupts. Reads can 1039 * happen on any CPU. 1040 * 1041 * The reader page is always off the ring buffer, but when the 1042 * reader finishes with a page, it needs to swap its page with 1043 * a new one from the buffer. The reader needs to take from 1044 * the head (writes go to the tail). But if a writer is in overwrite 1045 * mode and wraps, it must push the head page forward. 1046 * 1047 * Here lies the problem. 1048 * 1049 * The reader must be careful to replace only the head page, and 1050 * not another one. As described at the top of the file in the 1051 * ASCII art, the reader sets its old page to point to the next 1052 * page after head. It then sets the page after head to point to 1053 * the old reader page. But if the writer moves the head page 1054 * during this operation, the reader could end up with the tail. 1055 * 1056 * We use cmpxchg to help prevent this race. We also do something 1057 * special with the page before head. We set the LSB to 1. 1058 * 1059 * When the writer must push the page forward, it will clear the 1060 * bit that points to the head page, move the head, and then set 1061 * the bit that points to the new head page. 1062 * 1063 * We also don't want an interrupt coming in and moving the head 1064 * page on another writer. Thus we use the second LSB to catch 1065 * that too. 
Thus: 1066 * 1067 * head->list->prev->next bit 1 bit 0 1068 * ------- ------- 1069 * Normal page 0 0 1070 * Points to head page 0 1 1071 * New head page 1 0 1072 * 1073 * Note we can not trust the prev pointer of the head page, because: 1074 * 1075 * +----+ +-----+ +-----+ 1076 * | |------>| T |---X--->| N | 1077 * | |<------| | | | 1078 * +----+ +-----+ +-----+ 1079 * ^ ^ | 1080 * | +-----+ | | 1081 * +----------| R |----------+ | 1082 * | |<-----------+ 1083 * +-----+ 1084 * 1085 * Key: ---X--> HEAD flag set in pointer 1086 * T Tail page 1087 * R Reader page 1088 * N Next page 1089 * 1090 * (see __rb_reserve_next() to see where this happens) 1091 * 1092 * What the above shows is that the reader just swapped out 1093 * the reader page with a page in the buffer, but before it 1094 * could make the new header point back to the new page added 1095 * it was preempted by a writer. The writer moved forward onto 1096 * the new page added by the reader and is about to move forward 1097 * again. 1098 * 1099 * You can see, it is legitimate for the previous pointer of 1100 * the head (or any page) not to point back to itself. But only 1101 * temporarily. 1102 */ 1103 1104 #define RB_PAGE_NORMAL 0UL 1105 #define RB_PAGE_HEAD 1UL 1106 #define RB_PAGE_UPDATE 2UL 1107 1108 1109 #define RB_FLAG_MASK 3UL 1110 1111 /* PAGE_MOVED is not part of the mask */ 1112 #define RB_PAGE_MOVED 4UL 1113 1114 /* 1115 * rb_list_head - remove any bit 1116 */ 1117 static struct list_head *rb_list_head(struct list_head *list) 1118 { 1119 unsigned long val = (unsigned long)list; 1120 1121 return (struct list_head *)(val & ~RB_FLAG_MASK); 1122 } 1123 1124 /* 1125 * rb_is_head_page - test if the given page is the head page 1126 * 1127 * Because the reader may move the head_page pointer, we can 1128 * not trust what the head page is (it may be pointing to 1129 * the reader page). But if the next page is a header page, 1130 * its flags will be non zero. 1131 */ 1132 static inline int 1133 rb_is_head_page(struct buffer_page *page, struct list_head *list) 1134 { 1135 unsigned long val; 1136 1137 val = (unsigned long)list->next; 1138 1139 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list) 1140 return RB_PAGE_MOVED; 1141 1142 return val & RB_FLAG_MASK; 1143 } 1144 1145 /* 1146 * rb_is_reader_page 1147 * 1148 * The unique thing about the reader page, is that, if the 1149 * writer is ever on it, the previous pointer never points 1150 * back to the reader page. 1151 */ 1152 static bool rb_is_reader_page(struct buffer_page *page) 1153 { 1154 struct list_head *list = page->list.prev; 1155 1156 return rb_list_head(list->next) != &page->list; 1157 } 1158 1159 /* 1160 * rb_set_list_to_head - set a list_head to be pointing to head. 1161 */ 1162 static void rb_set_list_to_head(struct list_head *list) 1163 { 1164 unsigned long *ptr; 1165 1166 ptr = (unsigned long *)&list->next; 1167 *ptr |= RB_PAGE_HEAD; 1168 *ptr &= ~RB_PAGE_UPDATE; 1169 } 1170 1171 /* 1172 * rb_head_page_activate - sets up head page 1173 */ 1174 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) 1175 { 1176 struct buffer_page *head; 1177 1178 head = cpu_buffer->head_page; 1179 if (!head) 1180 return; 1181 1182 /* 1183 * Set the previous list pointer to have the HEAD flag. 
1184 */ 1185 rb_set_list_to_head(head->list.prev); 1186 } 1187 1188 static void rb_list_head_clear(struct list_head *list) 1189 { 1190 unsigned long *ptr = (unsigned long *)&list->next; 1191 1192 *ptr &= ~RB_FLAG_MASK; 1193 } 1194 1195 /* 1196 * rb_head_page_deactivate - clears head page ptr (for free list) 1197 */ 1198 static void 1199 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) 1200 { 1201 struct list_head *hd; 1202 1203 /* Go through the whole list and clear any pointers found. */ 1204 rb_list_head_clear(cpu_buffer->pages); 1205 1206 list_for_each(hd, cpu_buffer->pages) 1207 rb_list_head_clear(hd); 1208 } 1209 1210 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, 1211 struct buffer_page *head, 1212 struct buffer_page *prev, 1213 int old_flag, int new_flag) 1214 { 1215 struct list_head *list; 1216 unsigned long val = (unsigned long)&head->list; 1217 unsigned long ret; 1218 1219 list = &prev->list; 1220 1221 val &= ~RB_FLAG_MASK; 1222 1223 ret = cmpxchg((unsigned long *)&list->next, 1224 val | old_flag, val | new_flag); 1225 1226 /* check if the reader took the page */ 1227 if ((ret & ~RB_FLAG_MASK) != val) 1228 return RB_PAGE_MOVED; 1229 1230 return ret & RB_FLAG_MASK; 1231 } 1232 1233 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, 1234 struct buffer_page *head, 1235 struct buffer_page *prev, 1236 int old_flag) 1237 { 1238 return rb_head_page_set(cpu_buffer, head, prev, 1239 old_flag, RB_PAGE_UPDATE); 1240 } 1241 1242 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, 1243 struct buffer_page *head, 1244 struct buffer_page *prev, 1245 int old_flag) 1246 { 1247 return rb_head_page_set(cpu_buffer, head, prev, 1248 old_flag, RB_PAGE_HEAD); 1249 } 1250 1251 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, 1252 struct buffer_page *head, 1253 struct buffer_page *prev, 1254 int old_flag) 1255 { 1256 return rb_head_page_set(cpu_buffer, head, prev, 1257 old_flag, RB_PAGE_NORMAL); 1258 } 1259 1260 static inline void rb_inc_page(struct buffer_page **bpage) 1261 { 1262 struct list_head *p = rb_list_head((*bpage)->list.next); 1263 1264 *bpage = list_entry(p, struct buffer_page, list); 1265 } 1266 1267 static struct buffer_page * 1268 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) 1269 { 1270 struct buffer_page *head; 1271 struct buffer_page *page; 1272 struct list_head *list; 1273 int i; 1274 1275 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) 1276 return NULL; 1277 1278 /* sanity check */ 1279 list = cpu_buffer->pages; 1280 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) 1281 return NULL; 1282 1283 page = head = cpu_buffer->head_page; 1284 /* 1285 * It is possible that the writer moves the header behind 1286 * where we started, and we miss in one loop. 1287 * A second loop should grab the header, but we'll do 1288 * three loops just because I'm paranoid. 
1289 */ 1290 for (i = 0; i < 3; i++) { 1291 do { 1292 if (rb_is_head_page(page, page->list.prev)) { 1293 cpu_buffer->head_page = page; 1294 return page; 1295 } 1296 rb_inc_page(&page); 1297 } while (page != head); 1298 } 1299 1300 RB_WARN_ON(cpu_buffer, 1); 1301 1302 return NULL; 1303 } 1304 1305 static bool rb_head_page_replace(struct buffer_page *old, 1306 struct buffer_page *new) 1307 { 1308 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 1309 unsigned long val; 1310 1311 val = *ptr & ~RB_FLAG_MASK; 1312 val |= RB_PAGE_HEAD; 1313 1314 return try_cmpxchg(ptr, &val, (unsigned long)&new->list); 1315 } 1316 1317 /* 1318 * rb_tail_page_update - move the tail page forward 1319 */ 1320 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 1321 struct buffer_page *tail_page, 1322 struct buffer_page *next_page) 1323 { 1324 unsigned long old_entries; 1325 unsigned long old_write; 1326 1327 /* 1328 * The tail page now needs to be moved forward. 1329 * 1330 * We need to reset the tail page, but without messing 1331 * with possible erasing of data brought in by interrupts 1332 * that have moved the tail page and are currently on it. 1333 * 1334 * We add a counter to the write field to denote this. 1335 */ 1336 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 1337 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 1338 1339 local_inc(&cpu_buffer->pages_touched); 1340 /* 1341 * Just make sure we have seen our old_write and synchronize 1342 * with any interrupts that come in. 1343 */ 1344 barrier(); 1345 1346 /* 1347 * If the tail page is still the same as what we think 1348 * it is, then it is up to us to update the tail 1349 * pointer. 1350 */ 1351 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) { 1352 /* Zero the write counter */ 1353 unsigned long val = old_write & ~RB_WRITE_MASK; 1354 unsigned long eval = old_entries & ~RB_WRITE_MASK; 1355 1356 /* 1357 * This will only succeed if an interrupt did 1358 * not come in and change it. In which case, we 1359 * do not want to modify it. 1360 * 1361 * We add (void) to let the compiler know that we do not care 1362 * about the return value of these functions. We use the 1363 * cmpxchg to only update if an interrupt did not already 1364 * do it for us. If the cmpxchg fails, we don't care. 1365 */ 1366 (void)local_cmpxchg(&next_page->write, old_write, val); 1367 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 1368 1369 /* 1370 * No need to worry about races with clearing out the commit. 1371 * it only can increment when a commit takes place. But that 1372 * only happens in the outer most nested commit. 1373 */ 1374 local_set(&next_page->page->commit, 0); 1375 1376 /* Again, either we update tail_page or an interrupt does */ 1377 (void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page); 1378 } 1379 } 1380 1381 static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 1382 struct buffer_page *bpage) 1383 { 1384 unsigned long val = (unsigned long)bpage; 1385 1386 RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK); 1387 } 1388 1389 /** 1390 * rb_check_pages - integrity check of buffer pages 1391 * @cpu_buffer: CPU buffer with pages to test 1392 * 1393 * As a safety measure we check to make sure the data pages have not 1394 * been corrupted. 
1395 */ 1396 static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 1397 { 1398 struct list_head *head = rb_list_head(cpu_buffer->pages); 1399 struct list_head *tmp; 1400 1401 if (RB_WARN_ON(cpu_buffer, 1402 rb_list_head(rb_list_head(head->next)->prev) != head)) 1403 return; 1404 1405 if (RB_WARN_ON(cpu_buffer, 1406 rb_list_head(rb_list_head(head->prev)->next) != head)) 1407 return; 1408 1409 for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) { 1410 if (RB_WARN_ON(cpu_buffer, 1411 rb_list_head(rb_list_head(tmp->next)->prev) != tmp)) 1412 return; 1413 1414 if (RB_WARN_ON(cpu_buffer, 1415 rb_list_head(rb_list_head(tmp->prev)->next) != tmp)) 1416 return; 1417 } 1418 } 1419 1420 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 1421 long nr_pages, struct list_head *pages) 1422 { 1423 struct buffer_page *bpage, *tmp; 1424 bool user_thread = current->mm != NULL; 1425 gfp_t mflags; 1426 long i; 1427 1428 /* 1429 * Check if the available memory is there first. 1430 * Note, si_mem_available() only gives us a rough estimate of available 1431 * memory. It may not be accurate. But we don't care, we just want 1432 * to prevent doing any allocation when it is obvious that it is 1433 * not going to succeed. 1434 */ 1435 i = si_mem_available(); 1436 if (i < nr_pages) 1437 return -ENOMEM; 1438 1439 /* 1440 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 1441 * gracefully without invoking oom-killer and the system is not 1442 * destabilized. 1443 */ 1444 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 1445 1446 /* 1447 * If a user thread allocates too much, and si_mem_available() 1448 * reports there's enough memory, even though there is not. 1449 * Make sure the OOM killer kills this thread. This can happen 1450 * even with RETRY_MAYFAIL because another task may be doing 1451 * an allocation after this task has taken all memory. 1452 * This is the task the OOM killer needs to take out during this 1453 * loop, even if it was triggered by an allocation somewhere else. 1454 */ 1455 if (user_thread) 1456 set_current_oom_origin(); 1457 for (i = 0; i < nr_pages; i++) { 1458 struct page *page; 1459 1460 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1461 mflags, cpu_to_node(cpu_buffer->cpu)); 1462 if (!bpage) 1463 goto free_pages; 1464 1465 rb_check_bpage(cpu_buffer, bpage); 1466 1467 list_add(&bpage->list, pages); 1468 1469 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags, 1470 cpu_buffer->buffer->subbuf_order); 1471 if (!page) 1472 goto free_pages; 1473 bpage->page = page_address(page); 1474 bpage->order = cpu_buffer->buffer->subbuf_order; 1475 rb_init_page(bpage->page); 1476 1477 if (user_thread && fatal_signal_pending(current)) 1478 goto free_pages; 1479 } 1480 if (user_thread) 1481 clear_current_oom_origin(); 1482 1483 return 0; 1484 1485 free_pages: 1486 list_for_each_entry_safe(bpage, tmp, pages, list) { 1487 list_del_init(&bpage->list); 1488 free_buffer_page(bpage); 1489 } 1490 if (user_thread) 1491 clear_current_oom_origin(); 1492 1493 return -ENOMEM; 1494 } 1495 1496 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 1497 unsigned long nr_pages) 1498 { 1499 LIST_HEAD(pages); 1500 1501 WARN_ON(!nr_pages); 1502 1503 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 1504 return -ENOMEM; 1505 1506 /* 1507 * The ring buffer page list is a circular list that does not 1508 * start and end with a list head. All page list items point to 1509 * other pages. 
1510 */ 1511 cpu_buffer->pages = pages.next; 1512 list_del(&pages); 1513 1514 cpu_buffer->nr_pages = nr_pages; 1515 1516 rb_check_pages(cpu_buffer); 1517 1518 return 0; 1519 } 1520 1521 static struct ring_buffer_per_cpu * 1522 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 1523 { 1524 struct ring_buffer_per_cpu *cpu_buffer; 1525 struct buffer_page *bpage; 1526 struct page *page; 1527 int ret; 1528 1529 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 1530 GFP_KERNEL, cpu_to_node(cpu)); 1531 if (!cpu_buffer) 1532 return NULL; 1533 1534 cpu_buffer->cpu = cpu; 1535 cpu_buffer->buffer = buffer; 1536 raw_spin_lock_init(&cpu_buffer->reader_lock); 1537 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 1538 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 1539 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 1540 init_completion(&cpu_buffer->update_done); 1541 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 1542 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 1543 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 1544 1545 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1546 GFP_KERNEL, cpu_to_node(cpu)); 1547 if (!bpage) 1548 goto fail_free_buffer; 1549 1550 rb_check_bpage(cpu_buffer, bpage); 1551 1552 cpu_buffer->reader_page = bpage; 1553 1554 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, cpu_buffer->buffer->subbuf_order); 1555 if (!page) 1556 goto fail_free_reader; 1557 bpage->page = page_address(page); 1558 rb_init_page(bpage->page); 1559 1560 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 1561 INIT_LIST_HEAD(&cpu_buffer->new_pages); 1562 1563 ret = rb_allocate_pages(cpu_buffer, nr_pages); 1564 if (ret < 0) 1565 goto fail_free_reader; 1566 1567 cpu_buffer->head_page 1568 = list_entry(cpu_buffer->pages, struct buffer_page, list); 1569 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 1570 1571 rb_head_page_activate(cpu_buffer); 1572 1573 return cpu_buffer; 1574 1575 fail_free_reader: 1576 free_buffer_page(cpu_buffer->reader_page); 1577 1578 fail_free_buffer: 1579 kfree(cpu_buffer); 1580 return NULL; 1581 } 1582 1583 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 1584 { 1585 struct list_head *head = cpu_buffer->pages; 1586 struct buffer_page *bpage, *tmp; 1587 1588 irq_work_sync(&cpu_buffer->irq_work.work); 1589 1590 free_buffer_page(cpu_buffer->reader_page); 1591 1592 if (head) { 1593 rb_head_page_deactivate(cpu_buffer); 1594 1595 list_for_each_entry_safe(bpage, tmp, head, list) { 1596 list_del_init(&bpage->list); 1597 free_buffer_page(bpage); 1598 } 1599 bpage = list_entry(head, struct buffer_page, list); 1600 free_buffer_page(bpage); 1601 } 1602 1603 free_page((unsigned long)cpu_buffer->free_page); 1604 1605 kfree(cpu_buffer); 1606 } 1607 1608 /** 1609 * __ring_buffer_alloc - allocate a new ring_buffer 1610 * @size: the size in bytes per cpu that is needed. 1611 * @flags: attributes to set for the ring buffer. 1612 * @key: ring buffer reader_lock_key. 1613 * 1614 * Currently the only flag that is available is the RB_FL_OVERWRITE 1615 * flag. This flag means that the buffer will overwrite old data 1616 * when the buffer wraps. If this flag is not set, the buffer will 1617 * drop data when the tail hits the head. 
1618 */ 1619 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 1620 struct lock_class_key *key) 1621 { 1622 struct trace_buffer *buffer; 1623 long nr_pages; 1624 int bsize; 1625 int cpu; 1626 int ret; 1627 1628 /* keep it in its own cache line */ 1629 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 1630 GFP_KERNEL); 1631 if (!buffer) 1632 return NULL; 1633 1634 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 1635 goto fail_free_buffer; 1636 1637 /* Default buffer page size - one system page */ 1638 buffer->subbuf_order = 0; 1639 buffer->subbuf_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE; 1640 1641 /* Max payload is buffer page size - header (8bytes) */ 1642 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 1643 1644 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 1645 buffer->flags = flags; 1646 buffer->clock = trace_clock_local; 1647 buffer->reader_lock_key = key; 1648 1649 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 1650 init_waitqueue_head(&buffer->irq_work.waiters); 1651 1652 /* need at least two pages */ 1653 if (nr_pages < 2) 1654 nr_pages = 2; 1655 1656 buffer->cpus = nr_cpu_ids; 1657 1658 bsize = sizeof(void *) * nr_cpu_ids; 1659 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 1660 GFP_KERNEL); 1661 if (!buffer->buffers) 1662 goto fail_free_cpumask; 1663 1664 cpu = raw_smp_processor_id(); 1665 cpumask_set_cpu(cpu, buffer->cpumask); 1666 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 1667 if (!buffer->buffers[cpu]) 1668 goto fail_free_buffers; 1669 1670 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1671 if (ret < 0) 1672 goto fail_free_buffers; 1673 1674 mutex_init(&buffer->mutex); 1675 1676 return buffer; 1677 1678 fail_free_buffers: 1679 for_each_buffer_cpu(buffer, cpu) { 1680 if (buffer->buffers[cpu]) 1681 rb_free_cpu_buffer(buffer->buffers[cpu]); 1682 } 1683 kfree(buffer->buffers); 1684 1685 fail_free_cpumask: 1686 free_cpumask_var(buffer->cpumask); 1687 1688 fail_free_buffer: 1689 kfree(buffer); 1690 return NULL; 1691 } 1692 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 1693 1694 /** 1695 * ring_buffer_free - free a ring buffer. 1696 * @buffer: the buffer to free. 
1697 */ 1698 void 1699 ring_buffer_free(struct trace_buffer *buffer) 1700 { 1701 int cpu; 1702 1703 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1704 1705 irq_work_sync(&buffer->irq_work.work); 1706 1707 for_each_buffer_cpu(buffer, cpu) 1708 rb_free_cpu_buffer(buffer->buffers[cpu]); 1709 1710 kfree(buffer->buffers); 1711 free_cpumask_var(buffer->cpumask); 1712 1713 kfree(buffer); 1714 } 1715 EXPORT_SYMBOL_GPL(ring_buffer_free); 1716 1717 void ring_buffer_set_clock(struct trace_buffer *buffer, 1718 u64 (*clock)(void)) 1719 { 1720 buffer->clock = clock; 1721 } 1722 1723 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 1724 { 1725 buffer->time_stamp_abs = abs; 1726 } 1727 1728 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 1729 { 1730 return buffer->time_stamp_abs; 1731 } 1732 1733 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 1734 1735 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 1736 { 1737 return local_read(&bpage->entries) & RB_WRITE_MASK; 1738 } 1739 1740 static inline unsigned long rb_page_write(struct buffer_page *bpage) 1741 { 1742 return local_read(&bpage->write) & RB_WRITE_MASK; 1743 } 1744 1745 static bool 1746 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 1747 { 1748 struct list_head *tail_page, *to_remove, *next_page; 1749 struct buffer_page *to_remove_page, *tmp_iter_page; 1750 struct buffer_page *last_page, *first_page; 1751 unsigned long nr_removed; 1752 unsigned long head_bit; 1753 int page_entries; 1754 1755 head_bit = 0; 1756 1757 raw_spin_lock_irq(&cpu_buffer->reader_lock); 1758 atomic_inc(&cpu_buffer->record_disabled); 1759 /* 1760 * We don't race with the readers since we have acquired the reader 1761 * lock. We also don't race with writers after disabling recording. 1762 * This makes it easy to figure out the first and the last page to be 1763 * removed from the list. We unlink all the pages in between including 1764 * the first and last pages. This is done in a busy loop so that we 1765 * lose the least number of traces. 1766 * The pages are freed after we restart recording and unlock readers. 1767 */ 1768 tail_page = &cpu_buffer->tail_page->list; 1769 1770 /* 1771 * tail page might be on reader page, we remove the next page 1772 * from the ring buffer 1773 */ 1774 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 1775 tail_page = rb_list_head(tail_page->next); 1776 to_remove = tail_page; 1777 1778 /* start of pages to remove */ 1779 first_page = list_entry(rb_list_head(to_remove->next), 1780 struct buffer_page, list); 1781 1782 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 1783 to_remove = rb_list_head(to_remove)->next; 1784 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 1785 } 1786 /* Read iterators need to reset themselves when some pages removed */ 1787 cpu_buffer->pages_removed += nr_removed; 1788 1789 next_page = rb_list_head(to_remove)->next; 1790 1791 /* 1792 * Now we remove all pages between tail_page and next_page. 
1793 * Make sure that we have head_bit value preserved for the 1794 * next page 1795 */ 1796 tail_page->next = (struct list_head *)((unsigned long)next_page | 1797 head_bit); 1798 next_page = rb_list_head(next_page); 1799 next_page->prev = tail_page; 1800 1801 /* make sure pages points to a valid page in the ring buffer */ 1802 cpu_buffer->pages = next_page; 1803 1804 /* update head page */ 1805 if (head_bit) 1806 cpu_buffer->head_page = list_entry(next_page, 1807 struct buffer_page, list); 1808 1809 /* pages are removed, resume tracing and then free the pages */ 1810 atomic_dec(&cpu_buffer->record_disabled); 1811 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1812 1813 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 1814 1815 /* last buffer page to remove */ 1816 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 1817 list); 1818 tmp_iter_page = first_page; 1819 1820 do { 1821 cond_resched(); 1822 1823 to_remove_page = tmp_iter_page; 1824 rb_inc_page(&tmp_iter_page); 1825 1826 /* update the counters */ 1827 page_entries = rb_page_entries(to_remove_page); 1828 if (page_entries) { 1829 /* 1830 * If something was added to this page, it was full 1831 * since it is not the tail page. So we deduct the 1832 * bytes consumed in ring buffer from here. 1833 * Increment overrun to account for the lost events. 1834 */ 1835 local_add(page_entries, &cpu_buffer->overrun); 1836 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 1837 local_inc(&cpu_buffer->pages_lost); 1838 } 1839 1840 /* 1841 * We have already removed references to this list item, just 1842 * free up the buffer_page and its page 1843 */ 1844 free_buffer_page(to_remove_page); 1845 nr_removed--; 1846 1847 } while (to_remove_page != last_page); 1848 1849 RB_WARN_ON(cpu_buffer, nr_removed); 1850 1851 return nr_removed == 0; 1852 } 1853 1854 static bool 1855 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 1856 { 1857 struct list_head *pages = &cpu_buffer->new_pages; 1858 unsigned long flags; 1859 bool success; 1860 int retries; 1861 1862 /* Can be called at early boot up, where interrupts must not been enabled */ 1863 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1864 /* 1865 * We are holding the reader lock, so the reader page won't be swapped 1866 * in the ring buffer. Now we are racing with the writer trying to 1867 * move head page and the tail page. 1868 * We are going to adapt the reader page update process where: 1869 * 1. We first splice the start and end of list of new pages between 1870 * the head page and its previous page. 1871 * 2. We cmpxchg the prev_page->next to point from head page to the 1872 * start of new pages list. 1873 * 3. Finally, we update the head->prev to the end of new list. 1874 * 1875 * We will try this process 10 times, to make sure that we don't keep 1876 * spinning. 
1877 */ 1878 retries = 10; 1879 success = false; 1880 while (retries--) { 1881 struct list_head *head_page, *prev_page; 1882 struct list_head *last_page, *first_page; 1883 struct list_head *head_page_with_bit; 1884 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 1885 1886 if (!hpage) 1887 break; 1888 head_page = &hpage->list; 1889 prev_page = head_page->prev; 1890 1891 first_page = pages->next; 1892 last_page = pages->prev; 1893 1894 head_page_with_bit = (struct list_head *) 1895 ((unsigned long)head_page | RB_PAGE_HEAD); 1896 1897 last_page->next = head_page_with_bit; 1898 first_page->prev = prev_page; 1899 1900 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 1901 if (try_cmpxchg(&prev_page->next, 1902 &head_page_with_bit, first_page)) { 1903 /* 1904 * yay, we replaced the page pointer to our new list, 1905 * now, we just have to update to head page's prev 1906 * pointer to point to end of list 1907 */ 1908 head_page->prev = last_page; 1909 success = true; 1910 break; 1911 } 1912 } 1913 1914 if (success) 1915 INIT_LIST_HEAD(pages); 1916 /* 1917 * If we weren't successful in adding in new pages, warn and stop 1918 * tracing 1919 */ 1920 RB_WARN_ON(cpu_buffer, !success); 1921 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1922 1923 /* free pages if they weren't inserted */ 1924 if (!success) { 1925 struct buffer_page *bpage, *tmp; 1926 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 1927 list) { 1928 list_del_init(&bpage->list); 1929 free_buffer_page(bpage); 1930 } 1931 } 1932 return success; 1933 } 1934 1935 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 1936 { 1937 bool success; 1938 1939 if (cpu_buffer->nr_pages_to_update > 0) 1940 success = rb_insert_pages(cpu_buffer); 1941 else 1942 success = rb_remove_pages(cpu_buffer, 1943 -cpu_buffer->nr_pages_to_update); 1944 1945 if (success) 1946 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 1947 } 1948 1949 static void update_pages_handler(struct work_struct *work) 1950 { 1951 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 1952 struct ring_buffer_per_cpu, update_pages_work); 1953 rb_update_pages(cpu_buffer); 1954 complete(&cpu_buffer->update_done); 1955 } 1956 1957 /** 1958 * ring_buffer_resize - resize the ring buffer 1959 * @buffer: the buffer to resize. 1960 * @size: the new size. 1961 * @cpu_id: the cpu buffer to resize 1962 * 1963 * Minimum size is 2 * buffer->subbuf_size. 1964 * 1965 * Returns 0 on success and < 0 on failure. 1966 */ 1967 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 1968 int cpu_id) 1969 { 1970 struct ring_buffer_per_cpu *cpu_buffer; 1971 unsigned long nr_pages; 1972 int cpu, err; 1973 1974 /* 1975 * Always succeed at resizing a non-existent buffer: 1976 */ 1977 if (!buffer) 1978 return 0; 1979 1980 /* Make sure the requested buffer exists */ 1981 if (cpu_id != RING_BUFFER_ALL_CPUS && 1982 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 1983 return 0; 1984 1985 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 1986 1987 /* we need a minimum of two pages */ 1988 if (nr_pages < 2) 1989 nr_pages = 2; 1990 1991 /* prevent another thread from changing buffer sizes */ 1992 mutex_lock(&buffer->mutex); 1993 atomic_inc(&buffer->resizing); 1994 1995 if (cpu_id == RING_BUFFER_ALL_CPUS) { 1996 /* 1997 * Don't succeed if resizing is disabled, as a reader might be 1998 * manipulating the ring buffer and is expecting a sane state while 1999 * this is true. 
2000 */ 2001 for_each_buffer_cpu(buffer, cpu) { 2002 cpu_buffer = buffer->buffers[cpu]; 2003 if (atomic_read(&cpu_buffer->resize_disabled)) { 2004 err = -EBUSY; 2005 goto out_err_unlock; 2006 } 2007 } 2008 2009 /* calculate the pages to update */ 2010 for_each_buffer_cpu(buffer, cpu) { 2011 cpu_buffer = buffer->buffers[cpu]; 2012 2013 cpu_buffer->nr_pages_to_update = nr_pages - 2014 cpu_buffer->nr_pages; 2015 /* 2016 * nothing more to do for removing pages or no update 2017 */ 2018 if (cpu_buffer->nr_pages_to_update <= 0) 2019 continue; 2020 /* 2021 * to add pages, make sure all new pages can be 2022 * allocated without receiving ENOMEM 2023 */ 2024 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2025 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2026 &cpu_buffer->new_pages)) { 2027 /* not enough memory for new pages */ 2028 err = -ENOMEM; 2029 goto out_err; 2030 } 2031 2032 cond_resched(); 2033 } 2034 2035 cpus_read_lock(); 2036 /* 2037 * Fire off all the required work handlers 2038 * We can't schedule on offline CPUs, but it's not necessary 2039 * since we can change their buffer sizes without any race. 2040 */ 2041 for_each_buffer_cpu(buffer, cpu) { 2042 cpu_buffer = buffer->buffers[cpu]; 2043 if (!cpu_buffer->nr_pages_to_update) 2044 continue; 2045 2046 /* Can't run something on an offline CPU. */ 2047 if (!cpu_online(cpu)) { 2048 rb_update_pages(cpu_buffer); 2049 cpu_buffer->nr_pages_to_update = 0; 2050 } else { 2051 /* Run directly if possible. */ 2052 migrate_disable(); 2053 if (cpu != smp_processor_id()) { 2054 migrate_enable(); 2055 schedule_work_on(cpu, 2056 &cpu_buffer->update_pages_work); 2057 } else { 2058 update_pages_handler(&cpu_buffer->update_pages_work); 2059 migrate_enable(); 2060 } 2061 } 2062 } 2063 2064 /* wait for all the updates to complete */ 2065 for_each_buffer_cpu(buffer, cpu) { 2066 cpu_buffer = buffer->buffers[cpu]; 2067 if (!cpu_buffer->nr_pages_to_update) 2068 continue; 2069 2070 if (cpu_online(cpu)) 2071 wait_for_completion(&cpu_buffer->update_done); 2072 cpu_buffer->nr_pages_to_update = 0; 2073 } 2074 2075 cpus_read_unlock(); 2076 } else { 2077 cpu_buffer = buffer->buffers[cpu_id]; 2078 2079 if (nr_pages == cpu_buffer->nr_pages) 2080 goto out; 2081 2082 /* 2083 * Don't succeed if resizing is disabled, as a reader might be 2084 * manipulating the ring buffer and is expecting a sane state while 2085 * this is true. 2086 */ 2087 if (atomic_read(&cpu_buffer->resize_disabled)) { 2088 err = -EBUSY; 2089 goto out_err_unlock; 2090 } 2091 2092 cpu_buffer->nr_pages_to_update = nr_pages - 2093 cpu_buffer->nr_pages; 2094 2095 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2096 if (cpu_buffer->nr_pages_to_update > 0 && 2097 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2098 &cpu_buffer->new_pages)) { 2099 err = -ENOMEM; 2100 goto out_err; 2101 } 2102 2103 cpus_read_lock(); 2104 2105 /* Can't run something on an offline CPU. */ 2106 if (!cpu_online(cpu_id)) 2107 rb_update_pages(cpu_buffer); 2108 else { 2109 /* Run directly if possible. 
*/ 2110 migrate_disable(); 2111 if (cpu_id == smp_processor_id()) { 2112 rb_update_pages(cpu_buffer); 2113 migrate_enable(); 2114 } else { 2115 migrate_enable(); 2116 schedule_work_on(cpu_id, 2117 &cpu_buffer->update_pages_work); 2118 wait_for_completion(&cpu_buffer->update_done); 2119 } 2120 } 2121 2122 cpu_buffer->nr_pages_to_update = 0; 2123 cpus_read_unlock(); 2124 } 2125 2126 out: 2127 /* 2128 * The ring buffer resize can happen with the ring buffer 2129 * enabled, so that the update disturbs the tracing as little 2130 * as possible. But if the buffer is disabled, we do not need 2131 * to worry about that, and we can take the time to verify 2132 * that the buffer is not corrupt. 2133 */ 2134 if (atomic_read(&buffer->record_disabled)) { 2135 atomic_inc(&buffer->record_disabled); 2136 /* 2137 * Even though the buffer was disabled, we must make sure 2138 * that it is truly disabled before calling rb_check_pages. 2139 * There could have been a race between checking 2140 * record_disable and incrementing it. 2141 */ 2142 synchronize_rcu(); 2143 for_each_buffer_cpu(buffer, cpu) { 2144 cpu_buffer = buffer->buffers[cpu]; 2145 rb_check_pages(cpu_buffer); 2146 } 2147 atomic_dec(&buffer->record_disabled); 2148 } 2149 2150 atomic_dec(&buffer->resizing); 2151 mutex_unlock(&buffer->mutex); 2152 return 0; 2153 2154 out_err: 2155 for_each_buffer_cpu(buffer, cpu) { 2156 struct buffer_page *bpage, *tmp; 2157 2158 cpu_buffer = buffer->buffers[cpu]; 2159 cpu_buffer->nr_pages_to_update = 0; 2160 2161 if (list_empty(&cpu_buffer->new_pages)) 2162 continue; 2163 2164 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2165 list) { 2166 list_del_init(&bpage->list); 2167 free_buffer_page(bpage); 2168 } 2169 } 2170 out_err_unlock: 2171 atomic_dec(&buffer->resizing); 2172 mutex_unlock(&buffer->mutex); 2173 return err; 2174 } 2175 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2176 2177 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2178 { 2179 mutex_lock(&buffer->mutex); 2180 if (val) 2181 buffer->flags |= RB_FL_OVERWRITE; 2182 else 2183 buffer->flags &= ~RB_FL_OVERWRITE; 2184 mutex_unlock(&buffer->mutex); 2185 } 2186 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2187 2188 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2189 { 2190 return bpage->page->data + index; 2191 } 2192 2193 static __always_inline struct ring_buffer_event * 2194 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2195 { 2196 return __rb_page_index(cpu_buffer->reader_page, 2197 cpu_buffer->reader_page->read); 2198 } 2199 2200 static struct ring_buffer_event * 2201 rb_iter_head_event(struct ring_buffer_iter *iter) 2202 { 2203 struct ring_buffer_event *event; 2204 struct buffer_page *iter_head_page = iter->head_page; 2205 unsigned long commit; 2206 unsigned length; 2207 2208 if (iter->head != iter->next_event) 2209 return iter->event; 2210 2211 /* 2212 * When the writer goes across pages, it issues a cmpxchg which 2213 * is a mb(), which will synchronize with the rmb here. 2214 * (see rb_tail_page_update() and __rb_reserve_next()) 2215 */ 2216 commit = rb_page_commit(iter_head_page); 2217 smp_rmb(); 2218 2219 /* An event needs to be at least 8 bytes in size */ 2220 if (iter->head > commit - 8) 2221 goto reset; 2222 2223 event = __rb_page_index(iter_head_page, iter->head); 2224 length = rb_event_length(event); 2225 2226 /* 2227 * READ_ONCE() doesn't work on functions and we don't want the 2228 * compiler doing any crazy optimizations with length. 
2229 */ 2230 barrier(); 2231 2232 if ((iter->head + length) > commit || length > iter->event_size) 2233 /* Writer corrupted the read? */ 2234 goto reset; 2235 2236 memcpy(iter->event, event, length); 2237 /* 2238 * If the page stamp is still the same after this rmb() then the 2239 * event was safely copied without the writer entering the page. 2240 */ 2241 smp_rmb(); 2242 2243 /* Make sure the page didn't change since we read this */ 2244 if (iter->page_stamp != iter_head_page->page->time_stamp || 2245 commit > rb_page_commit(iter_head_page)) 2246 goto reset; 2247 2248 iter->next_event = iter->head + length; 2249 return iter->event; 2250 reset: 2251 /* Reset to the beginning */ 2252 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2253 iter->head = 0; 2254 iter->next_event = 0; 2255 iter->missed_events = 1; 2256 return NULL; 2257 } 2258 2259 /* Size is determined by what has been committed */ 2260 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 2261 { 2262 return rb_page_commit(bpage); 2263 } 2264 2265 static __always_inline unsigned 2266 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 2267 { 2268 return rb_page_commit(cpu_buffer->commit_page); 2269 } 2270 2271 static __always_inline unsigned 2272 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 2273 { 2274 unsigned long addr = (unsigned long)event; 2275 2276 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 2277 2278 return addr - BUF_PAGE_HDR_SIZE; 2279 } 2280 2281 static void rb_inc_iter(struct ring_buffer_iter *iter) 2282 { 2283 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2284 2285 /* 2286 * The iterator could be on the reader page (it starts there). 2287 * But the head could have moved, since the reader was 2288 * found. Check for this case and assign the iterator 2289 * to the head page instead of next. 2290 */ 2291 if (iter->head_page == cpu_buffer->reader_page) 2292 iter->head_page = rb_set_head_page(cpu_buffer); 2293 else 2294 rb_inc_page(&iter->head_page); 2295 2296 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2297 iter->head = 0; 2298 iter->next_event = 0; 2299 } 2300 2301 /* 2302 * rb_handle_head_page - writer hit the head page 2303 * 2304 * Returns: +1 to retry page 2305 * 0 to continue 2306 * -1 on error 2307 */ 2308 static int 2309 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 2310 struct buffer_page *tail_page, 2311 struct buffer_page *next_page) 2312 { 2313 struct buffer_page *new_head; 2314 int entries; 2315 int type; 2316 int ret; 2317 2318 entries = rb_page_entries(next_page); 2319 2320 /* 2321 * The hard part is here. We need to move the head 2322 * forward, and protect against both readers on 2323 * other CPUs and writers coming in via interrupts. 2324 */ 2325 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 2326 RB_PAGE_HEAD); 2327 2328 /* 2329 * type can be one of four: 2330 * NORMAL - an interrupt already moved it for us 2331 * HEAD - we are the first to get here. 2332 * UPDATE - we are the interrupt interrupting 2333 * a current move. 2334 * MOVED - a reader on another CPU moved the next 2335 * pointer to its reader page. Give up 2336 * and try again. 2337 */ 2338 2339 switch (type) { 2340 case RB_PAGE_HEAD: 2341 /* 2342 * We changed the head to UPDATE, thus 2343 * it is our responsibility to update 2344 * the counters. 
2345 */ 2346 local_add(entries, &cpu_buffer->overrun); 2347 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 2348 local_inc(&cpu_buffer->pages_lost); 2349 2350 /* 2351 * The entries will be zeroed out when we move the 2352 * tail page. 2353 */ 2354 2355 /* still more to do */ 2356 break; 2357 2358 case RB_PAGE_UPDATE: 2359 /* 2360 * This is an interrupt that interrupted the 2361 * previous update. Still more to do. 2362 */ 2363 break; 2364 case RB_PAGE_NORMAL: 2365 /* 2366 * An interrupt came in before the update 2367 * and processed this for us. 2368 * Nothing left to do. 2369 */ 2370 return 1; 2371 case RB_PAGE_MOVED: 2372 /* 2373 * The reader is on another CPU and just did 2374 * a swap with our next_page. 2375 * Try again. 2376 */ 2377 return 1; 2378 default: 2379 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 2380 return -1; 2381 } 2382 2383 /* 2384 * Now that we are here, the old head pointer is 2385 * set to UPDATE. This will keep the reader from 2386 * swapping the head page with the reader page. 2387 * The reader (on another CPU) will spin till 2388 * we are finished. 2389 * 2390 * We just need to protect against interrupts 2391 * doing the job. We will set the next pointer 2392 * to HEAD. After that, we set the old pointer 2393 * to NORMAL, but only if it was HEAD before. 2394 * Otherwise we are an interrupt, and only 2395 * want the outermost commit to reset it. 2396 */ 2397 new_head = next_page; 2398 rb_inc_page(&new_head); 2399 2400 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 2401 RB_PAGE_NORMAL); 2402 2403 /* 2404 * Valid returns are: 2405 * HEAD - an interrupt came in and already set it. 2406 * NORMAL - One of two things: 2407 * 1) We really set it. 2408 * 2) A bunch of interrupts came in and moved 2409 * the page forward again. 2410 */ 2411 switch (ret) { 2412 case RB_PAGE_HEAD: 2413 case RB_PAGE_NORMAL: 2414 /* OK */ 2415 break; 2416 default: 2417 RB_WARN_ON(cpu_buffer, 1); 2418 return -1; 2419 } 2420 2421 /* 2422 * It is possible that an interrupt came in, 2423 * set the head up, then more interrupts came in 2424 * and moved it again. When we get back here, 2425 * the page would have been set to NORMAL but we 2426 * just set it back to HEAD. 2427 * 2428 * How do you detect this? Well, if that happened 2429 * the tail page would have moved. 2430 */ 2431 if (ret == RB_PAGE_NORMAL) { 2432 struct buffer_page *buffer_tail_page; 2433 2434 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 2435 /* 2436 * If the tail had moved past next, then we need 2437 * to reset the pointer. 2438 */ 2439 if (buffer_tail_page != tail_page && 2440 buffer_tail_page != next_page) 2441 rb_head_page_set_normal(cpu_buffer, new_head, 2442 next_page, 2443 RB_PAGE_HEAD); 2444 } 2445 2446 /* 2447 * If this was the outermost commit (the one that 2448 * changed the original pointer from HEAD to UPDATE), 2449 * then it is up to us to reset it to NORMAL.
2450 */ 2451 if (type == RB_PAGE_HEAD) { 2452 ret = rb_head_page_set_normal(cpu_buffer, next_page, 2453 tail_page, 2454 RB_PAGE_UPDATE); 2455 if (RB_WARN_ON(cpu_buffer, 2456 ret != RB_PAGE_UPDATE)) 2457 return -1; 2458 } 2459 2460 return 0; 2461 } 2462 2463 static inline void 2464 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 2465 unsigned long tail, struct rb_event_info *info) 2466 { 2467 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 2468 struct buffer_page *tail_page = info->tail_page; 2469 struct ring_buffer_event *event; 2470 unsigned long length = info->length; 2471 2472 /* 2473 * Only the event that crossed the page boundary 2474 * must fill the old tail_page with padding. 2475 */ 2476 if (tail >= bsize) { 2477 /* 2478 * If the page was filled, then we still need 2479 * to update the real_end. Reset it to zero 2480 * and the reader will ignore it. 2481 */ 2482 if (tail == bsize) 2483 tail_page->real_end = 0; 2484 2485 local_sub(length, &tail_page->write); 2486 return; 2487 } 2488 2489 event = __rb_page_index(tail_page, tail); 2490 2491 /* 2492 * Save the original length to the meta data. 2493 * This will be used by the reader to add lost event 2494 * counter. 2495 */ 2496 tail_page->real_end = tail; 2497 2498 /* 2499 * If this event is bigger than the minimum size, then 2500 * we need to be careful that we don't subtract the 2501 * write counter enough to allow another writer to slip 2502 * in on this page. 2503 * We put in a discarded commit instead, to make sure 2504 * that this space is not used again, and this space will 2505 * not be accounted into 'entries_bytes'. 2506 * 2507 * If we are less than the minimum size, we don't need to 2508 * worry about it. 2509 */ 2510 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 2511 /* No room for any events */ 2512 2513 /* Mark the rest of the page with padding */ 2514 rb_event_set_padding(event); 2515 2516 /* Make sure the padding is visible before the write update */ 2517 smp_wmb(); 2518 2519 /* Set the write back to the previous setting */ 2520 local_sub(length, &tail_page->write); 2521 return; 2522 } 2523 2524 /* Put in a discarded event */ 2525 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 2526 event->type_len = RINGBUF_TYPE_PADDING; 2527 /* time delta must be non zero */ 2528 event->time_delta = 1; 2529 2530 /* account for padding bytes */ 2531 local_add(bsize - tail, &cpu_buffer->entries_bytes); 2532 2533 /* Make sure the padding is visible before the tail_page->write update */ 2534 smp_wmb(); 2535 2536 /* Set write to end of buffer */ 2537 length = (tail + length) - bsize; 2538 local_sub(length, &tail_page->write); 2539 } 2540 2541 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 2542 2543 /* 2544 * This is the slow path, force gcc not to inline it. 2545 */ 2546 static noinline struct ring_buffer_event * 2547 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 2548 unsigned long tail, struct rb_event_info *info) 2549 { 2550 struct buffer_page *tail_page = info->tail_page; 2551 struct buffer_page *commit_page = cpu_buffer->commit_page; 2552 struct trace_buffer *buffer = cpu_buffer->buffer; 2553 struct buffer_page *next_page; 2554 int ret; 2555 2556 next_page = tail_page; 2557 2558 rb_inc_page(&next_page); 2559 2560 /* 2561 * If for some reason, we had an interrupt storm that made 2562 * it all the way around the buffer, bail, and warn 2563 * about it. 
2564 */ 2565 if (unlikely(next_page == commit_page)) { 2566 local_inc(&cpu_buffer->commit_overrun); 2567 goto out_reset; 2568 } 2569 2570 /* 2571 * This is where the fun begins! 2572 * 2573 * We are fighting against races between a reader that 2574 * could be on another CPU trying to swap its reader 2575 * page with the buffer head. 2576 * 2577 * We are also fighting against interrupts coming in and 2578 * moving the head or tail on us as well. 2579 * 2580 * If the next page is the head page then we have filled 2581 * the buffer, unless the commit page is still on the 2582 * reader page. 2583 */ 2584 if (rb_is_head_page(next_page, &tail_page->list)) { 2585 2586 /* 2587 * If the commit is not on the reader page, then 2588 * move the header page. 2589 */ 2590 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 2591 /* 2592 * If we are not in overwrite mode, 2593 * this is easy, just stop here. 2594 */ 2595 if (!(buffer->flags & RB_FL_OVERWRITE)) { 2596 local_inc(&cpu_buffer->dropped_events); 2597 goto out_reset; 2598 } 2599 2600 ret = rb_handle_head_page(cpu_buffer, 2601 tail_page, 2602 next_page); 2603 if (ret < 0) 2604 goto out_reset; 2605 if (ret) 2606 goto out_again; 2607 } else { 2608 /* 2609 * We need to be careful here too. The 2610 * commit page could still be on the reader 2611 * page. We could have a small buffer, and 2612 * have filled up the buffer with events 2613 * from interrupts and such, and wrapped. 2614 * 2615 * Note, if the tail page is also on the 2616 * reader_page, we let it move out. 2617 */ 2618 if (unlikely((cpu_buffer->commit_page != 2619 cpu_buffer->tail_page) && 2620 (cpu_buffer->commit_page == 2621 cpu_buffer->reader_page))) { 2622 local_inc(&cpu_buffer->commit_overrun); 2623 goto out_reset; 2624 } 2625 } 2626 } 2627 2628 rb_tail_page_update(cpu_buffer, tail_page, next_page); 2629 2630 out_again: 2631 2632 rb_reset_tail(cpu_buffer, tail, info); 2633 2634 /* Commit what we have for now. */ 2635 rb_end_commit(cpu_buffer); 2636 /* rb_end_commit() decs committing */ 2637 local_inc(&cpu_buffer->committing); 2638 2639 /* fail and let the caller try again */ 2640 return ERR_PTR(-EAGAIN); 2641 2642 out_reset: 2643 /* reset write */ 2644 rb_reset_tail(cpu_buffer, tail, info); 2645 2646 return NULL; 2647 } 2648 2649 /* Slow path */ 2650 static struct ring_buffer_event * 2651 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2652 struct ring_buffer_event *event, u64 delta, bool abs) 2653 { 2654 if (abs) 2655 event->type_len = RINGBUF_TYPE_TIME_STAMP; 2656 else 2657 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2658 2659 /* Not the first event on the page, or not delta? */ 2660 if (abs || rb_event_index(cpu_buffer, event)) { 2661 event->time_delta = delta & TS_MASK; 2662 event->array[0] = delta >> TS_SHIFT; 2663 } else { 2664 /* nope, just zero it */ 2665 event->time_delta = 0; 2666 event->array[0] = 0; 2667 } 2668 2669 return skip_time_extend(event); 2670 } 2671 2672 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2673 static inline bool sched_clock_stable(void) 2674 { 2675 return true; 2676 } 2677 #endif 2678 2679 static void 2680 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2681 struct rb_event_info *info) 2682 { 2683 u64 write_stamp; 2684 2685 WARN_ONCE(1, "Delta way too big! 
%llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 2686 (unsigned long long)info->delta, 2687 (unsigned long long)info->ts, 2688 (unsigned long long)info->before, 2689 (unsigned long long)info->after, 2690 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 2691 sched_clock_stable() ? "" : 2692 "If you just came from a suspend/resume,\n" 2693 "please switch to the trace global clock:\n" 2694 " echo global > /sys/kernel/tracing/trace_clock\n" 2695 "or add trace_clock=global to the kernel command line\n"); 2696 } 2697 2698 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2699 struct ring_buffer_event **event, 2700 struct rb_event_info *info, 2701 u64 *delta, 2702 unsigned int *length) 2703 { 2704 bool abs = info->add_timestamp & 2705 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 2706 2707 if (unlikely(info->delta > (1ULL << 59))) { 2708 /* 2709 * Some timers can use more than 59 bits, and when a timestamp 2710 * is added to the buffer, it will lose those bits. 2711 */ 2712 if (abs && (info->ts & TS_MSB)) { 2713 info->delta &= ABS_TS_MASK; 2714 2715 /* did the clock go backwards */ 2716 } else if (info->before == info->after && info->before > info->ts) { 2717 /* not interrupted */ 2718 static int once; 2719 2720 /* 2721 * This is possible with a recalibrating of the TSC. 2722 * Do not produce a call stack, but just report it. 2723 */ 2724 if (!once) { 2725 once++; 2726 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 2727 info->before, info->ts); 2728 } 2729 } else 2730 rb_check_timestamp(cpu_buffer, info); 2731 if (!abs) 2732 info->delta = 0; 2733 } 2734 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 2735 *length -= RB_LEN_TIME_EXTEND; 2736 *delta = 0; 2737 } 2738 2739 /** 2740 * rb_update_event - update event type and data 2741 * @cpu_buffer: The per cpu buffer of the @event 2742 * @event: the event to update 2743 * @info: The info to update the @event with (contains length and delta) 2744 * 2745 * Update the type and data fields of the @event. The length 2746 * is the actual size that is written to the ring buffer, 2747 * and with this, we can determine what to place into the 2748 * data field. 2749 */ 2750 static void 2751 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2752 struct ring_buffer_event *event, 2753 struct rb_event_info *info) 2754 { 2755 unsigned length = info->length; 2756 u64 delta = info->delta; 2757 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 2758 2759 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 2760 cpu_buffer->event_stamp[nest] = info->ts; 2761 2762 /* 2763 * If we need to add a timestamp, then we 2764 * add it to the start of the reserved space. 
2765 */ 2766 if (unlikely(info->add_timestamp)) 2767 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 2768 2769 event->time_delta = delta; 2770 length -= RB_EVNT_HDR_SIZE; 2771 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 2772 event->type_len = 0; 2773 event->array[0] = length; 2774 } else 2775 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2776 } 2777 2778 static unsigned rb_calculate_event_length(unsigned length) 2779 { 2780 struct ring_buffer_event event; /* Used only for sizeof array */ 2781 2782 /* zero length can cause confusion */ 2783 if (!length) 2784 length++; 2785 2786 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 2787 length += sizeof(event.array[0]); 2788 2789 length += RB_EVNT_HDR_SIZE; 2790 length = ALIGN(length, RB_ARCH_ALIGNMENT); 2791 2792 /* 2793 * In case the time delta is larger than the 27 bits for it 2794 * in the header, we need to add a timestamp. If another 2795 * event comes in when trying to discard this one to increase 2796 * the length, then the timestamp will be added in the allocated 2797 * space of this event. If length is bigger than the size needed 2798 * for the TIME_EXTEND, then padding has to be used. The event's 2799 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 2800 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 2801 * As length is a multiple of 4, we only need to worry if it 2802 * is 12 (RB_LEN_TIME_EXTEND + 4). 2803 */ 2804 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 2805 length += RB_ALIGNMENT; 2806 2807 return length; 2808 } 2809 2810 static inline bool 2811 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 2812 struct ring_buffer_event *event) 2813 { 2814 unsigned long new_index, old_index; 2815 struct buffer_page *bpage; 2816 unsigned long addr; 2817 2818 new_index = rb_event_index(cpu_buffer, event); 2819 old_index = new_index + rb_event_ts_length(event); 2820 addr = (unsigned long)event; 2821 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 2822 2823 bpage = READ_ONCE(cpu_buffer->tail_page); 2824 2825 /* 2826 * Make sure the tail_page is still the same and 2827 * the next write location is the end of this event 2828 */ 2829 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 2830 unsigned long write_mask = 2831 local_read(&bpage->write) & ~RB_WRITE_MASK; 2832 unsigned long event_length = rb_event_length(event); 2833 2834 /* 2835 * The before_stamp needs to be different than the write_stamp 2836 * to make sure that the next event adds an absolute 2837 * value and does not rely on the saved write stamp, which 2838 * is now going to be bogus. 2839 * 2840 * By setting the before_stamp to zero, the next event 2841 * is not going to use the write_stamp and will instead 2842 * create an absolute timestamp. This means there's no 2843 * reason to update the write_stamp! 2844 */ 2845 rb_time_set(&cpu_buffer->before_stamp, 0); 2846 2847 /* 2848 * If an event were to come in now, it would see that the 2849 * write_stamp and the before_stamp are different, and assume 2850 * that this event just added itself before updating 2851 * the write stamp. The interrupting event will fix the 2852 * write stamp for us, and use an absolute timestamp. 2853 */ 2854 2855 /* 2856 * This is on the tail page. It is possible that 2857 * a write could come in and move the tail page 2858 * and write to the next page. That is fine 2859 * because we just shorten what is on this page.
2860 */ 2861 old_index += write_mask; 2862 new_index += write_mask; 2863 2864 /* caution: old_index gets updated on cmpxchg failure */ 2865 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 2866 /* update counters */ 2867 local_sub(event_length, &cpu_buffer->entries_bytes); 2868 return true; 2869 } 2870 } 2871 2872 /* could not discard */ 2873 return false; 2874 } 2875 2876 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2877 { 2878 local_inc(&cpu_buffer->committing); 2879 local_inc(&cpu_buffer->commits); 2880 } 2881 2882 static __always_inline void 2883 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 2884 { 2885 unsigned long max_count; 2886 2887 /* 2888 * We only race with interrupts and NMIs on this CPU. 2889 * If we own the commit event, then we can commit 2890 * all others that interrupted us, since the interruptions 2891 * are in stack format (they finish before they come 2892 * back to us). This allows us to do a simple loop to 2893 * assign the commit to the tail. 2894 */ 2895 again: 2896 max_count = cpu_buffer->nr_pages * 100; 2897 2898 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 2899 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 2900 return; 2901 if (RB_WARN_ON(cpu_buffer, 2902 rb_is_reader_page(cpu_buffer->tail_page))) 2903 return; 2904 /* 2905 * No need for a memory barrier here, as the update 2906 * of the tail_page did it for this page. 2907 */ 2908 local_set(&cpu_buffer->commit_page->page->commit, 2909 rb_page_write(cpu_buffer->commit_page)); 2910 rb_inc_page(&cpu_buffer->commit_page); 2911 /* add barrier to keep gcc from optimizing too much */ 2912 barrier(); 2913 } 2914 while (rb_commit_index(cpu_buffer) != 2915 rb_page_write(cpu_buffer->commit_page)) { 2916 2917 /* Make sure the readers see the content of what is committed. */ 2918 smp_wmb(); 2919 local_set(&cpu_buffer->commit_page->page->commit, 2920 rb_page_write(cpu_buffer->commit_page)); 2921 RB_WARN_ON(cpu_buffer, 2922 local_read(&cpu_buffer->commit_page->page->commit) & 2923 ~RB_WRITE_MASK); 2924 barrier(); 2925 } 2926 2927 /* again, keep gcc from optimizing */ 2928 barrier(); 2929 2930 /* 2931 * If an interrupt came in just after the first while loop 2932 * and pushed the tail page forward, we will be left with 2933 * a dangling commit that will never go forward. 2934 */ 2935 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 2936 goto again; 2937 } 2938 2939 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 2940 { 2941 unsigned long commits; 2942 2943 if (RB_WARN_ON(cpu_buffer, 2944 !local_read(&cpu_buffer->committing))) 2945 return; 2946 2947 again: 2948 commits = local_read(&cpu_buffer->commits); 2949 /* synchronize with interrupts */ 2950 barrier(); 2951 if (local_read(&cpu_buffer->committing) == 1) 2952 rb_set_commit_to_write(cpu_buffer); 2953 2954 local_dec(&cpu_buffer->committing); 2955 2956 /* synchronize with interrupts */ 2957 barrier(); 2958 2959 /* 2960 * Need to account for interrupts coming in between the 2961 * updating of the commit page and the clearing of the 2962 * committing counter. 
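 *
 * A rough timeline of the race this handles (illustrative only):
 *
 *	committing == 1, commits == N
 *	we update the commit page (rb_set_commit_to_write() above)
 *	  -> an interrupt reserves and commits an event: commits == N + 1,
 *	     but it sees committing > 1 and leaves the commit page alone
 *	we decrement committing to 0
 *	  -> the check below sees commits != N with committing now 0, so we
 *	     re-enter the committing state and loop to move the commit page
 *	     forward for the interrupt's event.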
2963 */ 2964 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 2965 !local_read(&cpu_buffer->committing)) { 2966 local_inc(&cpu_buffer->committing); 2967 goto again; 2968 } 2969 } 2970 2971 static inline void rb_event_discard(struct ring_buffer_event *event) 2972 { 2973 if (extended_time(event)) 2974 event = skip_time_extend(event); 2975 2976 /* array[0] holds the actual length for the discarded event */ 2977 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2978 event->type_len = RINGBUF_TYPE_PADDING; 2979 /* time delta must be non zero */ 2980 if (!event->time_delta) 2981 event->time_delta = 1; 2982 } 2983 2984 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 2985 { 2986 local_inc(&cpu_buffer->entries); 2987 rb_end_commit(cpu_buffer); 2988 } 2989 2990 static __always_inline void 2991 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 2992 { 2993 if (buffer->irq_work.waiters_pending) { 2994 buffer->irq_work.waiters_pending = false; 2995 /* irq_work_queue() supplies its own memory barriers */ 2996 irq_work_queue(&buffer->irq_work.work); 2997 } 2998 2999 if (cpu_buffer->irq_work.waiters_pending) { 3000 cpu_buffer->irq_work.waiters_pending = false; 3001 /* irq_work_queue() supplies its own memory barriers */ 3002 irq_work_queue(&cpu_buffer->irq_work.work); 3003 } 3004 3005 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3006 return; 3007 3008 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3009 return; 3010 3011 if (!cpu_buffer->irq_work.full_waiters_pending) 3012 return; 3013 3014 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3015 3016 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3017 return; 3018 3019 cpu_buffer->irq_work.wakeup_full = true; 3020 cpu_buffer->irq_work.full_waiters_pending = false; 3021 /* irq_work_queue() supplies its own memory barriers */ 3022 irq_work_queue(&cpu_buffer->irq_work.work); 3023 } 3024 3025 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3026 # define do_ring_buffer_record_recursion() \ 3027 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3028 #else 3029 # define do_ring_buffer_record_recursion() do { } while (0) 3030 #endif 3031 3032 /* 3033 * The lock and unlock are done within a preempt disable section. 3034 * The current_context per_cpu variable can only be modified 3035 * by the current task between lock and unlock. But it can 3036 * be modified more than once via an interrupt. To pass this 3037 * information from the lock to the unlock without having to 3038 * access the 'in_interrupt()' functions again (which do show 3039 * a bit of overhead in something as critical as function tracing), 3040 * we use a bitmask trick. 3041 * 3042 * bit 1 = NMI context 3043 * bit 2 = IRQ context 3044 * bit 3 = SoftIRQ context 3045 * bit 4 = normal context. 3046 * 3047 * This works because this is the order of contexts that can 3048 * preempt other contexts. A SoftIRQ never preempts an IRQ 3049 * context. 3050 * 3051 * When the context is determined, the corresponding bit is 3052 * checked and set (if it was set, then a recursion of that context 3053 * happened). 3054 * 3055 * On unlock, we need to clear this bit. To do so, just subtract 3056 * 1 from the current_context and AND it to itself.
3057 * 3058 * (binary) 3059 * 101 - 1 = 100 3060 * 101 & 100 = 100 (clearing bit zero) 3061 * 3062 * 1010 - 1 = 1001 3063 * 1010 & 1001 = 1000 (clearing bit 1) 3064 * 3065 * The least significant bit can be cleared this way, and it 3066 * just so happens that it is the same bit corresponding to 3067 * the current context. 3068 * 3069 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3070 * is set when a recursion is detected at the current context, and if 3071 * the TRANSITION bit is already set, it will fail the recursion. 3072 * This is needed because there's a lag between the changing of 3073 * interrupt context and updating the preempt count. In this case, 3074 * a false positive will be found. To handle this, one extra recursion 3075 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3076 * bit is already set, then it is considered a recursion and the function 3077 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3078 * 3079 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3080 * to be cleared. Even if it wasn't the context that set it. That is, 3081 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3082 * is called before preempt_count() is updated, since the check will 3083 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3084 * NMI then comes in, it will set the NMI bit, but when the NMI code 3085 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3086 * and leave the NMI bit set. But this is fine, because the interrupt 3087 * code that set the TRANSITION bit will then clear the NMI bit when it 3088 * calls trace_recursive_unlock(). If another NMI comes in, it will 3089 * set the TRANSITION bit and continue. 3090 * 3091 * Note: The TRANSITION bit only handles a single transition between context. 3092 */ 3093 3094 static __always_inline bool 3095 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3096 { 3097 unsigned int val = cpu_buffer->current_context; 3098 int bit = interrupt_context_level(); 3099 3100 bit = RB_CTX_NORMAL - bit; 3101 3102 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3103 /* 3104 * It is possible that this was called by transitioning 3105 * between interrupt context, and preempt_count() has not 3106 * been updated yet. In this case, use the TRANSITION bit. 3107 */ 3108 bit = RB_CTX_TRANSITION; 3109 if (val & (1 << (bit + cpu_buffer->nest))) { 3110 do_ring_buffer_record_recursion(); 3111 return true; 3112 } 3113 } 3114 3115 val |= (1 << (bit + cpu_buffer->nest)); 3116 cpu_buffer->current_context = val; 3117 3118 return false; 3119 } 3120 3121 static __always_inline void 3122 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3123 { 3124 cpu_buffer->current_context &= 3125 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3126 } 3127 3128 /* The recursive locking above uses 5 bits */ 3129 #define NESTED_BITS 5 3130 3131 /** 3132 * ring_buffer_nest_start - Allow to trace while nested 3133 * @buffer: The ring buffer to modify 3134 * 3135 * The ring buffer has a safety mechanism to prevent recursion. 3136 * But there may be a case where a trace needs to be done while 3137 * tracing something else. In this case, calling this function 3138 * will allow this function to nest within a currently active 3139 * ring_buffer_lock_reserve(). 
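 *
 * A hedged sketch of the intended call order (illustrative only, not a
 * snippet taken from elsewhere in this file):
 *
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, length);
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), data, length);
 *		ring_buffer_unlock_commit(buffer);
 *	}
 *	ring_buffer_nest_end(buffer);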
3140 * 3141 * Call this function before calling another ring_buffer_lock_reserve() and 3142 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 3143 */ 3144 void ring_buffer_nest_start(struct trace_buffer *buffer) 3145 { 3146 struct ring_buffer_per_cpu *cpu_buffer; 3147 int cpu; 3148 3149 /* Enabled by ring_buffer_nest_end() */ 3150 preempt_disable_notrace(); 3151 cpu = raw_smp_processor_id(); 3152 cpu_buffer = buffer->buffers[cpu]; 3153 /* This is the shift value for the above recursive locking */ 3154 cpu_buffer->nest += NESTED_BITS; 3155 } 3156 3157 /** 3158 * ring_buffer_nest_end - Allow to trace while nested 3159 * @buffer: The ring buffer to modify 3160 * 3161 * Must be called after ring_buffer_nest_start() and after the 3162 * ring_buffer_unlock_commit(). 3163 */ 3164 void ring_buffer_nest_end(struct trace_buffer *buffer) 3165 { 3166 struct ring_buffer_per_cpu *cpu_buffer; 3167 int cpu; 3168 3169 /* disabled by ring_buffer_nest_start() */ 3170 cpu = raw_smp_processor_id(); 3171 cpu_buffer = buffer->buffers[cpu]; 3172 /* This is the shift value for the above recursive locking */ 3173 cpu_buffer->nest -= NESTED_BITS; 3174 preempt_enable_notrace(); 3175 } 3176 3177 /** 3178 * ring_buffer_unlock_commit - commit a reserved 3179 * @buffer: The buffer to commit to 3180 * 3181 * This commits the data to the ring buffer, and releases any locks held. 3182 * 3183 * Must be paired with ring_buffer_lock_reserve. 3184 */ 3185 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 3186 { 3187 struct ring_buffer_per_cpu *cpu_buffer; 3188 int cpu = raw_smp_processor_id(); 3189 3190 cpu_buffer = buffer->buffers[cpu]; 3191 3192 rb_commit(cpu_buffer); 3193 3194 rb_wakeups(buffer, cpu_buffer); 3195 3196 trace_recursive_unlock(cpu_buffer); 3197 3198 preempt_enable_notrace(); 3199 3200 return 0; 3201 } 3202 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 3203 3204 /* Special value to validate all deltas on a page. 
*/ 3205 #define CHECK_FULL_PAGE 1L 3206 3207 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 3208 3209 static const char *show_irq_str(int bits) 3210 { 3211 const char *type[] = { 3212 ".", // 0 3213 "s", // 1 3214 "h", // 2 3215 "Hs", // 3 3216 "n", // 4 3217 "Ns", // 5 3218 "Nh", // 6 3219 "NHs", // 7 3220 }; 3221 3222 return type[bits]; 3223 } 3224 3225 /* Assume this is an trace event */ 3226 static const char *show_flags(struct ring_buffer_event *event) 3227 { 3228 struct trace_entry *entry; 3229 int bits = 0; 3230 3231 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 3232 return "X"; 3233 3234 entry = ring_buffer_event_data(event); 3235 3236 if (entry->flags & TRACE_FLAG_SOFTIRQ) 3237 bits |= 1; 3238 3239 if (entry->flags & TRACE_FLAG_HARDIRQ) 3240 bits |= 2; 3241 3242 if (entry->flags & TRACE_FLAG_NMI) 3243 bits |= 4; 3244 3245 return show_irq_str(bits); 3246 } 3247 3248 static const char *show_irq(struct ring_buffer_event *event) 3249 { 3250 struct trace_entry *entry; 3251 3252 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 3253 return ""; 3254 3255 entry = ring_buffer_event_data(event); 3256 if (entry->flags & TRACE_FLAG_IRQS_OFF) 3257 return "d"; 3258 return ""; 3259 } 3260 3261 static const char *show_interrupt_level(void) 3262 { 3263 unsigned long pc = preempt_count(); 3264 unsigned char level = 0; 3265 3266 if (pc & SOFTIRQ_OFFSET) 3267 level |= 1; 3268 3269 if (pc & HARDIRQ_MASK) 3270 level |= 2; 3271 3272 if (pc & NMI_MASK) 3273 level |= 4; 3274 3275 return show_irq_str(level); 3276 } 3277 3278 static void dump_buffer_page(struct buffer_data_page *bpage, 3279 struct rb_event_info *info, 3280 unsigned long tail) 3281 { 3282 struct ring_buffer_event *event; 3283 u64 ts, delta; 3284 int e; 3285 3286 ts = bpage->time_stamp; 3287 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 3288 3289 for (e = 0; e < tail; e += rb_event_length(event)) { 3290 3291 event = (struct ring_buffer_event *)(bpage->data + e); 3292 3293 switch (event->type_len) { 3294 3295 case RINGBUF_TYPE_TIME_EXTEND: 3296 delta = rb_event_time_stamp(event); 3297 ts += delta; 3298 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 3299 e, ts, delta); 3300 break; 3301 3302 case RINGBUF_TYPE_TIME_STAMP: 3303 delta = rb_event_time_stamp(event); 3304 ts = rb_fix_abs_ts(delta, ts); 3305 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 3306 e, ts, delta); 3307 break; 3308 3309 case RINGBUF_TYPE_PADDING: 3310 ts += event->time_delta; 3311 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 3312 e, ts, event->time_delta); 3313 break; 3314 3315 case RINGBUF_TYPE_DATA: 3316 ts += event->time_delta; 3317 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 3318 e, ts, event->time_delta, 3319 show_flags(event), show_irq(event)); 3320 break; 3321 3322 default: 3323 break; 3324 } 3325 } 3326 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 3327 } 3328 3329 static DEFINE_PER_CPU(atomic_t, checking); 3330 static atomic_t ts_dump; 3331 3332 #define buffer_warn_return(fmt, ...) 
\ 3333 do { \ 3334 /* If another report is happening, ignore this one */ \ 3335 if (atomic_inc_return(&ts_dump) != 1) { \ 3336 atomic_dec(&ts_dump); \ 3337 goto out; \ 3338 } \ 3339 atomic_inc(&cpu_buffer->record_disabled); \ 3340 pr_warn(fmt, ##__VA_ARGS__); \ 3341 dump_buffer_page(bpage, info, tail); \ 3342 atomic_dec(&ts_dump); \ 3343 /* There's some cases in boot up that this can happen */ \ 3344 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 3345 /* Do not re-enable checking */ \ 3346 return; \ 3347 } while (0) 3348 3349 /* 3350 * Check if the current event time stamp matches the deltas on 3351 * the buffer page. 3352 */ 3353 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3354 struct rb_event_info *info, 3355 unsigned long tail) 3356 { 3357 struct ring_buffer_event *event; 3358 struct buffer_data_page *bpage; 3359 u64 ts, delta; 3360 bool full = false; 3361 int e; 3362 3363 bpage = info->tail_page->page; 3364 3365 if (tail == CHECK_FULL_PAGE) { 3366 full = true; 3367 tail = local_read(&bpage->commit); 3368 } else if (info->add_timestamp & 3369 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 3370 /* Ignore events with absolute time stamps */ 3371 return; 3372 } 3373 3374 /* 3375 * Do not check the first event (skip possible extends too). 3376 * Also do not check if previous events have not been committed. 3377 */ 3378 if (tail <= 8 || tail > local_read(&bpage->commit)) 3379 return; 3380 3381 /* 3382 * If this interrupted another event, 3383 */ 3384 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 3385 goto out; 3386 3387 ts = bpage->time_stamp; 3388 3389 for (e = 0; e < tail; e += rb_event_length(event)) { 3390 3391 event = (struct ring_buffer_event *)(bpage->data + e); 3392 3393 switch (event->type_len) { 3394 3395 case RINGBUF_TYPE_TIME_EXTEND: 3396 delta = rb_event_time_stamp(event); 3397 ts += delta; 3398 break; 3399 3400 case RINGBUF_TYPE_TIME_STAMP: 3401 delta = rb_event_time_stamp(event); 3402 delta = rb_fix_abs_ts(delta, ts); 3403 if (delta < ts) { 3404 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 3405 cpu_buffer->cpu, ts, delta); 3406 } 3407 ts = delta; 3408 break; 3409 3410 case RINGBUF_TYPE_PADDING: 3411 if (event->time_delta == 1) 3412 break; 3413 fallthrough; 3414 case RINGBUF_TYPE_DATA: 3415 ts += event->time_delta; 3416 break; 3417 3418 default: 3419 RB_WARN_ON(cpu_buffer, 1); 3420 } 3421 } 3422 if ((full && ts > info->ts) || 3423 (!full && ts + info->delta != info->ts)) { 3424 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 3425 cpu_buffer->cpu, 3426 ts + info->delta, info->ts, info->delta, 3427 info->before, info->after, 3428 full ? 
" (full)" : "", show_interrupt_level()); 3429 } 3430 out: 3431 atomic_dec(this_cpu_ptr(&checking)); 3432 } 3433 #else 3434 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3435 struct rb_event_info *info, 3436 unsigned long tail) 3437 { 3438 } 3439 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 3440 3441 static struct ring_buffer_event * 3442 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 3443 struct rb_event_info *info) 3444 { 3445 struct ring_buffer_event *event; 3446 struct buffer_page *tail_page; 3447 unsigned long tail, write, w; 3448 3449 /* Don't let the compiler play games with cpu_buffer->tail_page */ 3450 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 3451 3452 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 3453 barrier(); 3454 rb_time_read(&cpu_buffer->before_stamp, &info->before); 3455 rb_time_read(&cpu_buffer->write_stamp, &info->after); 3456 barrier(); 3457 info->ts = rb_time_stamp(cpu_buffer->buffer); 3458 3459 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 3460 info->delta = info->ts; 3461 } else { 3462 /* 3463 * If interrupting an event time update, we may need an 3464 * absolute timestamp. 3465 * Don't bother if this is the start of a new page (w == 0). 3466 */ 3467 if (!w) { 3468 /* Use the sub-buffer timestamp */ 3469 info->delta = 0; 3470 } else if (unlikely(info->before != info->after)) { 3471 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 3472 info->length += RB_LEN_TIME_EXTEND; 3473 } else { 3474 info->delta = info->ts - info->after; 3475 if (unlikely(test_time_stamp(info->delta))) { 3476 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 3477 info->length += RB_LEN_TIME_EXTEND; 3478 } 3479 } 3480 } 3481 3482 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 3483 3484 /*C*/ write = local_add_return(info->length, &tail_page->write); 3485 3486 /* set write to only the index of the write */ 3487 write &= RB_WRITE_MASK; 3488 3489 tail = write - info->length; 3490 3491 /* See if we shot pass the end of this buffer page */ 3492 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 3493 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 3494 return rb_move_tail(cpu_buffer, tail, info); 3495 } 3496 3497 if (likely(tail == w)) { 3498 /* Nothing interrupted us between A and C */ 3499 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 3500 /* 3501 * If something came in between C and D, the write stamp 3502 * may now not be in sync. But that's fine as the before_stamp 3503 * will be different and then next event will just be forced 3504 * to use an absolute timestamp. 3505 */ 3506 if (likely(!(info->add_timestamp & 3507 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3508 /* This did not interrupt any time update */ 3509 info->delta = info->ts - info->after; 3510 else 3511 /* Just use full timestamp for interrupting event */ 3512 info->delta = info->ts; 3513 check_buffer(cpu_buffer, info, tail); 3514 } else { 3515 u64 ts; 3516 /* SLOW PATH - Interrupted between A and C */ 3517 3518 /* Save the old before_stamp */ 3519 rb_time_read(&cpu_buffer->before_stamp, &info->before); 3520 3521 /* 3522 * Read a new timestamp and update the before_stamp to make 3523 * the next event after this one force using an absolute 3524 * timestamp. This is in case an interrupt were to come in 3525 * between E and F. 
3526 */ 3527 ts = rb_time_stamp(cpu_buffer->buffer); 3528 rb_time_set(&cpu_buffer->before_stamp, ts); 3529 3530 barrier(); 3531 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 3532 barrier(); 3533 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 3534 info->after == info->before && info->after < ts) { 3535 /* 3536 * Nothing came after this event between C and F, it is 3537 * safe to use info->after for the delta as it 3538 * matched info->before and is still valid. 3539 */ 3540 info->delta = ts - info->after; 3541 } else { 3542 /* 3543 * Interrupted between C and F: 3544 * Lost the previous events time stamp. Just set the 3545 * delta to zero, and this will be the same time as 3546 * the event this event interrupted. And the events that 3547 * came after this will still be correct (as they would 3548 * have built their delta on the previous event. 3549 */ 3550 info->delta = 0; 3551 } 3552 info->ts = ts; 3553 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 3554 } 3555 3556 /* 3557 * If this is the first commit on the page, then it has the same 3558 * timestamp as the page itself. 3559 */ 3560 if (unlikely(!tail && !(info->add_timestamp & 3561 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3562 info->delta = 0; 3563 3564 /* We reserved something on the buffer */ 3565 3566 event = __rb_page_index(tail_page, tail); 3567 rb_update_event(cpu_buffer, event, info); 3568 3569 local_inc(&tail_page->entries); 3570 3571 /* 3572 * If this is the first commit on the page, then update 3573 * its timestamp. 3574 */ 3575 if (unlikely(!tail)) 3576 tail_page->page->time_stamp = info->ts; 3577 3578 /* account for these added bytes */ 3579 local_add(info->length, &cpu_buffer->entries_bytes); 3580 3581 return event; 3582 } 3583 3584 static __always_inline struct ring_buffer_event * 3585 rb_reserve_next_event(struct trace_buffer *buffer, 3586 struct ring_buffer_per_cpu *cpu_buffer, 3587 unsigned long length) 3588 { 3589 struct ring_buffer_event *event; 3590 struct rb_event_info info; 3591 int nr_loops = 0; 3592 int add_ts_default; 3593 3594 /* ring buffer does cmpxchg, make sure it is safe in NMI context */ 3595 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) && 3596 (unlikely(in_nmi()))) { 3597 return NULL; 3598 } 3599 3600 rb_start_commit(cpu_buffer); 3601 /* The commit page can not change after this */ 3602 3603 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3604 /* 3605 * Due to the ability to swap a cpu buffer from a buffer 3606 * it is possible it was swapped before we committed. 3607 * (committing stops a swap). We check for it here and 3608 * if it happened, we have to fail the write. 3609 */ 3610 barrier(); 3611 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 3612 local_dec(&cpu_buffer->committing); 3613 local_dec(&cpu_buffer->commits); 3614 return NULL; 3615 } 3616 #endif 3617 3618 info.length = rb_calculate_event_length(length); 3619 3620 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 3621 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 3622 info.length += RB_LEN_TIME_EXTEND; 3623 if (info.length > cpu_buffer->buffer->max_data_size) 3624 goto out_fail; 3625 } else { 3626 add_ts_default = RB_ADD_STAMP_NONE; 3627 } 3628 3629 again: 3630 info.add_timestamp = add_ts_default; 3631 info.delta = 0; 3632 3633 /* 3634 * We allow for interrupts to reenter here and do a trace. 3635 * If one does, it will cause this original code to loop 3636 * back here. Even with heavy interrupts happening, this 3637 * should only happen a few times in a row. 
If this happens 3638 * 1000 times in a row, there must be either an interrupt 3639 * storm or we have something buggy. 3640 * Bail! 3641 */ 3642 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 3643 goto out_fail; 3644 3645 event = __rb_reserve_next(cpu_buffer, &info); 3646 3647 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 3648 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 3649 info.length -= RB_LEN_TIME_EXTEND; 3650 goto again; 3651 } 3652 3653 if (likely(event)) 3654 return event; 3655 out_fail: 3656 rb_end_commit(cpu_buffer); 3657 return NULL; 3658 } 3659 3660 /** 3661 * ring_buffer_lock_reserve - reserve a part of the buffer 3662 * @buffer: the ring buffer to reserve from 3663 * @length: the length of the data to reserve (excluding event header) 3664 * 3665 * Returns a reserved event on the ring buffer to copy directly to. 3666 * The user of this interface will need to get the body to write into 3667 * and can use the ring_buffer_event_data() interface. 3668 * 3669 * The length is the length of the data needed, not the event length 3670 * which also includes the event header. 3671 * 3672 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 3673 * If NULL is returned, then nothing has been allocated or locked. 3674 */ 3675 struct ring_buffer_event * 3676 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 3677 { 3678 struct ring_buffer_per_cpu *cpu_buffer; 3679 struct ring_buffer_event *event; 3680 int cpu; 3681 3682 /* If we are tracing schedule, we don't want to recurse */ 3683 preempt_disable_notrace(); 3684 3685 if (unlikely(atomic_read(&buffer->record_disabled))) 3686 goto out; 3687 3688 cpu = raw_smp_processor_id(); 3689 3690 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 3691 goto out; 3692 3693 cpu_buffer = buffer->buffers[cpu]; 3694 3695 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 3696 goto out; 3697 3698 if (unlikely(length > buffer->max_data_size)) 3699 goto out; 3700 3701 if (unlikely(trace_recursive_lock(cpu_buffer))) 3702 goto out; 3703 3704 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3705 if (!event) 3706 goto out_unlock; 3707 3708 return event; 3709 3710 out_unlock: 3711 trace_recursive_unlock(cpu_buffer); 3712 out: 3713 preempt_enable_notrace(); 3714 return NULL; 3715 } 3716 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3717 3718 /* 3719 * Decrement the entries to the page that an event is on. 3720 * The event does not even need to exist, only the pointer 3721 * to the page it is on. This may only be called before the commit 3722 * takes place. 3723 */ 3724 static inline void 3725 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3726 struct ring_buffer_event *event) 3727 { 3728 unsigned long addr = (unsigned long)event; 3729 struct buffer_page *bpage = cpu_buffer->commit_page; 3730 struct buffer_page *start; 3731 3732 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3733 3734 /* Do the likely case first */ 3735 if (likely(bpage->page == (void *)addr)) { 3736 local_dec(&bpage->entries); 3737 return; 3738 } 3739 3740 /* 3741 * Because the commit page may be on the reader page we 3742 * start with the next page and check the end loop there. 3743 */ 3744 rb_inc_page(&bpage); 3745 start = bpage; 3746 do { 3747 if (bpage->page == (void *)addr) { 3748 local_dec(&bpage->entries); 3749 return; 3750 } 3751 rb_inc_page(&bpage); 3752 } while (bpage != start); 3753 3754 /* commit not part of this buffer?? 
*/ 3755 RB_WARN_ON(cpu_buffer, 1); 3756 } 3757 3758 /** 3759 * ring_buffer_discard_commit - discard an event that has not been committed 3760 * @buffer: the ring buffer 3761 * @event: non committed event to discard 3762 * 3763 * Sometimes an event that is in the ring buffer needs to be ignored. 3764 * This function lets the user discard an event in the ring buffer 3765 * and then that event will not be read later. 3766 * 3767 * This function only works if it is called before the item has been 3768 * committed. It will try to free the event from the ring buffer 3769 * if another event has not been added behind it. 3770 * 3771 * If another event has been added behind it, it will set the event 3772 * up as discarded, and perform the commit. 3773 * 3774 * If this function is called, do not call ring_buffer_unlock_commit on 3775 * the event. 3776 */ 3777 void ring_buffer_discard_commit(struct trace_buffer *buffer, 3778 struct ring_buffer_event *event) 3779 { 3780 struct ring_buffer_per_cpu *cpu_buffer; 3781 int cpu; 3782 3783 /* The event is discarded regardless */ 3784 rb_event_discard(event); 3785 3786 cpu = smp_processor_id(); 3787 cpu_buffer = buffer->buffers[cpu]; 3788 3789 /* 3790 * This must only be called if the event has not been 3791 * committed yet. Thus we can assume that preemption 3792 * is still disabled. 3793 */ 3794 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3795 3796 rb_decrement_entry(cpu_buffer, event); 3797 if (rb_try_to_discard(cpu_buffer, event)) 3798 goto out; 3799 3800 out: 3801 rb_end_commit(cpu_buffer); 3802 3803 trace_recursive_unlock(cpu_buffer); 3804 3805 preempt_enable_notrace(); 3806 3807 } 3808 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3809 3810 /** 3811 * ring_buffer_write - write data to the buffer without reserving 3812 * @buffer: The ring buffer to write to. 3813 * @length: The length of the data being written (excluding the event header) 3814 * @data: The data to write to the buffer. 3815 * 3816 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3817 * one function. If you already have the data to write to the buffer, it 3818 * may be easier to simply call this function. 3819 * 3820 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3821 * and not the length of the event which would hold the header. 
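 *
 * A minimal usage sketch (illustrative only; "struct my_event" is a made-up
 * example type, not something defined in this file):
 *
 *	struct my_event ev = { .value = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(ev), &ev))
 *		pr_debug("nothing written: recording disabled or no space\n");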
3822 */ 3823 int ring_buffer_write(struct trace_buffer *buffer, 3824 unsigned long length, 3825 void *data) 3826 { 3827 struct ring_buffer_per_cpu *cpu_buffer; 3828 struct ring_buffer_event *event; 3829 void *body; 3830 int ret = -EBUSY; 3831 int cpu; 3832 3833 preempt_disable_notrace(); 3834 3835 if (atomic_read(&buffer->record_disabled)) 3836 goto out; 3837 3838 cpu = raw_smp_processor_id(); 3839 3840 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3841 goto out; 3842 3843 cpu_buffer = buffer->buffers[cpu]; 3844 3845 if (atomic_read(&cpu_buffer->record_disabled)) 3846 goto out; 3847 3848 if (length > buffer->max_data_size) 3849 goto out; 3850 3851 if (unlikely(trace_recursive_lock(cpu_buffer))) 3852 goto out; 3853 3854 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3855 if (!event) 3856 goto out_unlock; 3857 3858 body = rb_event_data(event); 3859 3860 memcpy(body, data, length); 3861 3862 rb_commit(cpu_buffer); 3863 3864 rb_wakeups(buffer, cpu_buffer); 3865 3866 ret = 0; 3867 3868 out_unlock: 3869 trace_recursive_unlock(cpu_buffer); 3870 3871 out: 3872 preempt_enable_notrace(); 3873 3874 return ret; 3875 } 3876 EXPORT_SYMBOL_GPL(ring_buffer_write); 3877 3878 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3879 { 3880 struct buffer_page *reader = cpu_buffer->reader_page; 3881 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3882 struct buffer_page *commit = cpu_buffer->commit_page; 3883 3884 /* In case of error, head will be NULL */ 3885 if (unlikely(!head)) 3886 return true; 3887 3888 /* Reader should exhaust content in reader page */ 3889 if (reader->read != rb_page_commit(reader)) 3890 return false; 3891 3892 /* 3893 * If writers are committing on the reader page, knowing all 3894 * committed content has been read, the ring buffer is empty. 3895 */ 3896 if (commit == reader) 3897 return true; 3898 3899 /* 3900 * If writers are committing on a page other than reader page 3901 * and head page, there should always be content to read. 3902 */ 3903 if (commit != head) 3904 return false; 3905 3906 /* 3907 * Writers are committing on the head page, we just need 3908 * to care about there're committed data, and the reader will 3909 * swap reader page with head page when it is to read data. 3910 */ 3911 return rb_page_commit(commit) == 0; 3912 } 3913 3914 /** 3915 * ring_buffer_record_disable - stop all writes into the buffer 3916 * @buffer: The ring buffer to stop writes to. 3917 * 3918 * This prevents all writes to the buffer. Any attempt to write 3919 * to the buffer after this will fail and return NULL. 3920 * 3921 * The caller should call synchronize_rcu() after this. 3922 */ 3923 void ring_buffer_record_disable(struct trace_buffer *buffer) 3924 { 3925 atomic_inc(&buffer->record_disabled); 3926 } 3927 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 3928 3929 /** 3930 * ring_buffer_record_enable - enable writes to the buffer 3931 * @buffer: The ring buffer to enable writes 3932 * 3933 * Note, multiple disables will need the same number of enables 3934 * to truly enable the writing (much like preempt_disable). 3935 */ 3936 void ring_buffer_record_enable(struct trace_buffer *buffer) 3937 { 3938 atomic_dec(&buffer->record_disabled); 3939 } 3940 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 3941 3942 /** 3943 * ring_buffer_record_off - stop all writes into the buffer 3944 * @buffer: The ring buffer to stop writes to. 3945 * 3946 * This prevents all writes to the buffer. Any attempt to write 3947 * to the buffer after this will fail and return NULL. 
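 *
 * (A note based on the implementation below: this sets the RB_BUFFER_OFF
 * bit in record_disabled instead of incrementing the counter, so it does
 * not nest with the counting record_disable()/record_enable() calls.)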
3948 * 3949 * This is different than ring_buffer_record_disable() as 3950 * it works like an on/off switch, where as the disable() version 3951 * must be paired with a enable(). 3952 */ 3953 void ring_buffer_record_off(struct trace_buffer *buffer) 3954 { 3955 unsigned int rd; 3956 unsigned int new_rd; 3957 3958 rd = atomic_read(&buffer->record_disabled); 3959 do { 3960 new_rd = rd | RB_BUFFER_OFF; 3961 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 3962 } 3963 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 3964 3965 /** 3966 * ring_buffer_record_on - restart writes into the buffer 3967 * @buffer: The ring buffer to start writes to. 3968 * 3969 * This enables all writes to the buffer that was disabled by 3970 * ring_buffer_record_off(). 3971 * 3972 * This is different than ring_buffer_record_enable() as 3973 * it works like an on/off switch, where as the enable() version 3974 * must be paired with a disable(). 3975 */ 3976 void ring_buffer_record_on(struct trace_buffer *buffer) 3977 { 3978 unsigned int rd; 3979 unsigned int new_rd; 3980 3981 rd = atomic_read(&buffer->record_disabled); 3982 do { 3983 new_rd = rd & ~RB_BUFFER_OFF; 3984 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 3985 } 3986 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 3987 3988 /** 3989 * ring_buffer_record_is_on - return true if the ring buffer can write 3990 * @buffer: The ring buffer to see if write is enabled 3991 * 3992 * Returns true if the ring buffer is in a state that it accepts writes. 3993 */ 3994 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 3995 { 3996 return !atomic_read(&buffer->record_disabled); 3997 } 3998 3999 /** 4000 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4001 * @buffer: The ring buffer to see if write is set enabled 4002 * 4003 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4004 * Note that this does NOT mean it is in a writable state. 4005 * 4006 * It may return true when the ring buffer has been disabled by 4007 * ring_buffer_record_disable(), as that is a temporary disabling of 4008 * the ring buffer. 4009 */ 4010 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4011 { 4012 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4013 } 4014 4015 /** 4016 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4017 * @buffer: The ring buffer to stop writes to. 4018 * @cpu: The CPU buffer to stop 4019 * 4020 * This prevents all writes to the buffer. Any attempt to write 4021 * to the buffer after this will fail and return NULL. 4022 * 4023 * The caller should call synchronize_rcu() after this. 4024 */ 4025 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4026 { 4027 struct ring_buffer_per_cpu *cpu_buffer; 4028 4029 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4030 return; 4031 4032 cpu_buffer = buffer->buffers[cpu]; 4033 atomic_inc(&cpu_buffer->record_disabled); 4034 } 4035 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4036 4037 /** 4038 * ring_buffer_record_enable_cpu - enable writes to the buffer 4039 * @buffer: The ring buffer to enable writes 4040 * @cpu: The CPU to enable. 4041 * 4042 * Note, multiple disables will need the same number of enables 4043 * to truly enable the writing (much like preempt_disable). 
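 *
 * For example, with two nested disables on the same CPU, writes resume only
 * after the second enable (sketch; "buffer" and "cpu" are assumed valid):
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	ring_buffer_record_enable_cpu(buffer, cpu);
 *		(writes to this CPU buffer still fail here)
 *	ring_buffer_record_enable_cpu(buffer, cpu);
 *		(writes are accepted again)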
4044 */ 4045 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4046 { 4047 struct ring_buffer_per_cpu *cpu_buffer; 4048 4049 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4050 return; 4051 4052 cpu_buffer = buffer->buffers[cpu]; 4053 atomic_dec(&cpu_buffer->record_disabled); 4054 } 4055 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4056 4057 /* 4058 * The total entries in the ring buffer is the running counter 4059 * of entries entered into the ring buffer, minus the sum of 4060 * the entries read from the ring buffer and the number of 4061 * entries that were overwritten. 4062 */ 4063 static inline unsigned long 4064 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4065 { 4066 return local_read(&cpu_buffer->entries) - 4067 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4068 } 4069 4070 /** 4071 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4072 * @buffer: The ring buffer 4073 * @cpu: The per CPU buffer to read from. 4074 */ 4075 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4076 { 4077 unsigned long flags; 4078 struct ring_buffer_per_cpu *cpu_buffer; 4079 struct buffer_page *bpage; 4080 u64 ret = 0; 4081 4082 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4083 return 0; 4084 4085 cpu_buffer = buffer->buffers[cpu]; 4086 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4087 /* 4088 * if the tail is on reader_page, oldest time stamp is on the reader 4089 * page 4090 */ 4091 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4092 bpage = cpu_buffer->reader_page; 4093 else 4094 bpage = rb_set_head_page(cpu_buffer); 4095 if (bpage) 4096 ret = bpage->page->time_stamp; 4097 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4098 4099 return ret; 4100 } 4101 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4102 4103 /** 4104 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 4105 * @buffer: The ring buffer 4106 * @cpu: The per CPU buffer to read from. 4107 */ 4108 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4109 { 4110 struct ring_buffer_per_cpu *cpu_buffer; 4111 unsigned long ret; 4112 4113 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4114 return 0; 4115 4116 cpu_buffer = buffer->buffers[cpu]; 4117 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4118 4119 return ret; 4120 } 4121 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4122 4123 /** 4124 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4125 * @buffer: The ring buffer 4126 * @cpu: The per CPU buffer to get the entries from. 4127 */ 4128 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4129 { 4130 struct ring_buffer_per_cpu *cpu_buffer; 4131 4132 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4133 return 0; 4134 4135 cpu_buffer = buffer->buffers[cpu]; 4136 4137 return rb_num_of_entries(cpu_buffer); 4138 } 4139 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4140 4141 /** 4142 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4143 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 
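 *
 * Together with ring_buffer_entries_cpu() and ring_buffer_read_events_cpu(),
 * this can be used to reconstruct the running count of events written to a
 * CPU buffer, for example (sketch; "buffer" and "cpu" are assumed valid):
 *
 *	written = ring_buffer_entries_cpu(buffer, cpu) +
 *		  ring_buffer_read_events_cpu(buffer, cpu) +
 *		  ring_buffer_overrun_cpu(buffer, cpu);
 *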
4144 * @buffer: The ring buffer 4145 * @cpu: The per CPU buffer to get the number of overruns from 4146 */ 4147 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4148 { 4149 struct ring_buffer_per_cpu *cpu_buffer; 4150 unsigned long ret; 4151 4152 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4153 return 0; 4154 4155 cpu_buffer = buffer->buffers[cpu]; 4156 ret = local_read(&cpu_buffer->overrun); 4157 4158 return ret; 4159 } 4160 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4161 4162 /** 4163 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4164 * commits failing due to the buffer wrapping around while there are uncommitted 4165 * events, such as during an interrupt storm. 4166 * @buffer: The ring buffer 4167 * @cpu: The per CPU buffer to get the number of overruns from 4168 */ 4169 unsigned long 4170 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4171 { 4172 struct ring_buffer_per_cpu *cpu_buffer; 4173 unsigned long ret; 4174 4175 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4176 return 0; 4177 4178 cpu_buffer = buffer->buffers[cpu]; 4179 ret = local_read(&cpu_buffer->commit_overrun); 4180 4181 return ret; 4182 } 4183 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4184 4185 /** 4186 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4187 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 4188 * @buffer: The ring buffer 4189 * @cpu: The per CPU buffer to get the number of overruns from 4190 */ 4191 unsigned long 4192 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4193 { 4194 struct ring_buffer_per_cpu *cpu_buffer; 4195 unsigned long ret; 4196 4197 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4198 return 0; 4199 4200 cpu_buffer = buffer->buffers[cpu]; 4201 ret = local_read(&cpu_buffer->dropped_events); 4202 4203 return ret; 4204 } 4205 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 4206 4207 /** 4208 * ring_buffer_read_events_cpu - get the number of events successfully read 4209 * @buffer: The ring buffer 4210 * @cpu: The per CPU buffer to get the number of events read 4211 */ 4212 unsigned long 4213 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 4214 { 4215 struct ring_buffer_per_cpu *cpu_buffer; 4216 4217 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4218 return 0; 4219 4220 cpu_buffer = buffer->buffers[cpu]; 4221 return cpu_buffer->read; 4222 } 4223 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 4224 4225 /** 4226 * ring_buffer_entries - get the number of entries in a buffer 4227 * @buffer: The ring buffer 4228 * 4229 * Returns the total number of entries in the ring buffer 4230 * (all CPU entries) 4231 */ 4232 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 4233 { 4234 struct ring_buffer_per_cpu *cpu_buffer; 4235 unsigned long entries = 0; 4236 int cpu; 4237 4238 /* if you care about this being correct, lock the buffer */ 4239 for_each_buffer_cpu(buffer, cpu) { 4240 cpu_buffer = buffer->buffers[cpu]; 4241 entries += rb_num_of_entries(cpu_buffer); 4242 } 4243 4244 return entries; 4245 } 4246 EXPORT_SYMBOL_GPL(ring_buffer_entries); 4247 4248 /** 4249 * ring_buffer_overruns - get the number of overruns in buffer 4250 * @buffer: The ring buffer 4251 * 4252 * Returns the total number of overruns in the ring buffer 4253 * (all CPU entries) 4254 */ 4255 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 4256 { 4257 struct ring_buffer_per_cpu *cpu_buffer; 4258 unsigned long overruns = 0; 4259 int cpu; 4260 4261 /* 
if you care about this being correct, lock the buffer */ 4262 for_each_buffer_cpu(buffer, cpu) { 4263 cpu_buffer = buffer->buffers[cpu]; 4264 overruns += local_read(&cpu_buffer->overrun); 4265 } 4266 4267 return overruns; 4268 } 4269 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 4270 4271 static void rb_iter_reset(struct ring_buffer_iter *iter) 4272 { 4273 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4274 4275 /* Iterator usage is expected to have record disabled */ 4276 iter->head_page = cpu_buffer->reader_page; 4277 iter->head = cpu_buffer->reader_page->read; 4278 iter->next_event = iter->head; 4279 4280 iter->cache_reader_page = iter->head_page; 4281 iter->cache_read = cpu_buffer->read; 4282 iter->cache_pages_removed = cpu_buffer->pages_removed; 4283 4284 if (iter->head) { 4285 iter->read_stamp = cpu_buffer->read_stamp; 4286 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 4287 } else { 4288 iter->read_stamp = iter->head_page->page->time_stamp; 4289 iter->page_stamp = iter->read_stamp; 4290 } 4291 } 4292 4293 /** 4294 * ring_buffer_iter_reset - reset an iterator 4295 * @iter: The iterator to reset 4296 * 4297 * Resets the iterator, so that it will start from the beginning 4298 * again. 4299 */ 4300 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 4301 { 4302 struct ring_buffer_per_cpu *cpu_buffer; 4303 unsigned long flags; 4304 4305 if (!iter) 4306 return; 4307 4308 cpu_buffer = iter->cpu_buffer; 4309 4310 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4311 rb_iter_reset(iter); 4312 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4313 } 4314 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 4315 4316 /** 4317 * ring_buffer_iter_empty - check if an iterator has no more to read 4318 * @iter: The iterator to check 4319 */ 4320 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 4321 { 4322 struct ring_buffer_per_cpu *cpu_buffer; 4323 struct buffer_page *reader; 4324 struct buffer_page *head_page; 4325 struct buffer_page *commit_page; 4326 struct buffer_page *curr_commit_page; 4327 unsigned commit; 4328 u64 curr_commit_ts; 4329 u64 commit_ts; 4330 4331 cpu_buffer = iter->cpu_buffer; 4332 reader = cpu_buffer->reader_page; 4333 head_page = cpu_buffer->head_page; 4334 commit_page = cpu_buffer->commit_page; 4335 commit_ts = commit_page->page->time_stamp; 4336 4337 /* 4338 * When the writer goes across pages, it issues a cmpxchg which 4339 * is a mb(), which will synchronize with the rmb here. 
4340 * (see rb_tail_page_update()) 4341 */ 4342 smp_rmb(); 4343 commit = rb_page_commit(commit_page); 4344 /* We want to make sure that the commit page doesn't change */ 4345 smp_rmb(); 4346 4347 /* Make sure commit page didn't change */ 4348 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 4349 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 4350 4351 /* If the commit page changed, then there's more data */ 4352 if (curr_commit_page != commit_page || 4353 curr_commit_ts != commit_ts) 4354 return 0; 4355 4356 /* Still racy, as it may return a false positive, but that's OK */ 4357 return ((iter->head_page == commit_page && iter->head >= commit) || 4358 (iter->head_page == reader && commit_page == head_page && 4359 head_page->read == commit && 4360 iter->head == rb_page_commit(cpu_buffer->reader_page))); 4361 } 4362 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 4363 4364 static void 4365 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 4366 struct ring_buffer_event *event) 4367 { 4368 u64 delta; 4369 4370 switch (event->type_len) { 4371 case RINGBUF_TYPE_PADDING: 4372 return; 4373 4374 case RINGBUF_TYPE_TIME_EXTEND: 4375 delta = rb_event_time_stamp(event); 4376 cpu_buffer->read_stamp += delta; 4377 return; 4378 4379 case RINGBUF_TYPE_TIME_STAMP: 4380 delta = rb_event_time_stamp(event); 4381 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 4382 cpu_buffer->read_stamp = delta; 4383 return; 4384 4385 case RINGBUF_TYPE_DATA: 4386 cpu_buffer->read_stamp += event->time_delta; 4387 return; 4388 4389 default: 4390 RB_WARN_ON(cpu_buffer, 1); 4391 } 4392 } 4393 4394 static void 4395 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 4396 struct ring_buffer_event *event) 4397 { 4398 u64 delta; 4399 4400 switch (event->type_len) { 4401 case RINGBUF_TYPE_PADDING: 4402 return; 4403 4404 case RINGBUF_TYPE_TIME_EXTEND: 4405 delta = rb_event_time_stamp(event); 4406 iter->read_stamp += delta; 4407 return; 4408 4409 case RINGBUF_TYPE_TIME_STAMP: 4410 delta = rb_event_time_stamp(event); 4411 delta = rb_fix_abs_ts(delta, iter->read_stamp); 4412 iter->read_stamp = delta; 4413 return; 4414 4415 case RINGBUF_TYPE_DATA: 4416 iter->read_stamp += event->time_delta; 4417 return; 4418 4419 default: 4420 RB_WARN_ON(iter->cpu_buffer, 1); 4421 } 4422 } 4423 4424 static struct buffer_page * 4425 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 4426 { 4427 struct buffer_page *reader = NULL; 4428 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 4429 unsigned long overwrite; 4430 unsigned long flags; 4431 int nr_loops = 0; 4432 bool ret; 4433 4434 local_irq_save(flags); 4435 arch_spin_lock(&cpu_buffer->lock); 4436 4437 again: 4438 /* 4439 * This should normally only loop twice. But because the 4440 * start of the reader inserts an empty page, it causes 4441 * a case where we will loop three times. There should be no 4442 * reason to loop four times (that I know of). 
4443 */ 4444 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 4445 reader = NULL; 4446 goto out; 4447 } 4448 4449 reader = cpu_buffer->reader_page; 4450 4451 /* If there's more to read, return this page */ 4452 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 4453 goto out; 4454 4455 /* Never should we have an index greater than the size */ 4456 if (RB_WARN_ON(cpu_buffer, 4457 cpu_buffer->reader_page->read > rb_page_size(reader))) 4458 goto out; 4459 4460 /* check if we caught up to the tail */ 4461 reader = NULL; 4462 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 4463 goto out; 4464 4465 /* Don't bother swapping if the ring buffer is empty */ 4466 if (rb_num_of_entries(cpu_buffer) == 0) 4467 goto out; 4468 4469 /* 4470 * Reset the reader page to size zero. 4471 */ 4472 local_set(&cpu_buffer->reader_page->write, 0); 4473 local_set(&cpu_buffer->reader_page->entries, 0); 4474 local_set(&cpu_buffer->reader_page->page->commit, 0); 4475 cpu_buffer->reader_page->real_end = 0; 4476 4477 spin: 4478 /* 4479 * Splice the empty reader page into the list around the head. 4480 */ 4481 reader = rb_set_head_page(cpu_buffer); 4482 if (!reader) 4483 goto out; 4484 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 4485 cpu_buffer->reader_page->list.prev = reader->list.prev; 4486 4487 /* 4488 * cpu_buffer->pages just needs to point to the buffer, it 4489 * has no specific buffer page to point to. Lets move it out 4490 * of our way so we don't accidentally swap it. 4491 */ 4492 cpu_buffer->pages = reader->list.prev; 4493 4494 /* The reader page will be pointing to the new head */ 4495 rb_set_list_to_head(&cpu_buffer->reader_page->list); 4496 4497 /* 4498 * We want to make sure we read the overruns after we set up our 4499 * pointers to the next object. The writer side does a 4500 * cmpxchg to cross pages which acts as the mb on the writer 4501 * side. Note, the reader will constantly fail the swap 4502 * while the writer is updating the pointers, so this 4503 * guarantees that the overwrite recorded here is the one we 4504 * want to compare with the last_overrun. 4505 */ 4506 smp_mb(); 4507 overwrite = local_read(&(cpu_buffer->overrun)); 4508 4509 /* 4510 * Here's the tricky part. 4511 * 4512 * We need to move the pointer past the header page. 4513 * But we can only do that if a writer is not currently 4514 * moving it. The page before the header page has the 4515 * flag bit '1' set if it is pointing to the page we want. 4516 * but if the writer is in the process of moving it 4517 * than it will be '2' or already moved '0'. 4518 */ 4519 4520 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 4521 4522 /* 4523 * If we did not convert it, then we must try again. 4524 */ 4525 if (!ret) 4526 goto spin; 4527 4528 /* 4529 * Yay! We succeeded in replacing the page. 4530 * 4531 * Now make the new head point back to the reader page. 
4532 */ 4533 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 4534 rb_inc_page(&cpu_buffer->head_page); 4535 4536 local_inc(&cpu_buffer->pages_read); 4537 4538 /* Finally update the reader page to the new head */ 4539 cpu_buffer->reader_page = reader; 4540 cpu_buffer->reader_page->read = 0; 4541 4542 if (overwrite != cpu_buffer->last_overrun) { 4543 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 4544 cpu_buffer->last_overrun = overwrite; 4545 } 4546 4547 goto again; 4548 4549 out: 4550 /* Update the read_stamp on the first event */ 4551 if (reader && reader->read == 0) 4552 cpu_buffer->read_stamp = reader->page->time_stamp; 4553 4554 arch_spin_unlock(&cpu_buffer->lock); 4555 local_irq_restore(flags); 4556 4557 /* 4558 * The writer has preempt disable, wait for it. But not forever 4559 * Although, 1 second is pretty much "forever" 4560 */ 4561 #define USECS_WAIT 1000000 4562 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 4563 /* If the write is past the end of page, a writer is still updating it */ 4564 if (likely(!reader || rb_page_write(reader) <= bsize)) 4565 break; 4566 4567 udelay(1); 4568 4569 /* Get the latest version of the reader write value */ 4570 smp_rmb(); 4571 } 4572 4573 /* The writer is not moving forward? Something is wrong */ 4574 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 4575 reader = NULL; 4576 4577 /* 4578 * Make sure we see any padding after the write update 4579 * (see rb_reset_tail()). 4580 * 4581 * In addition, a writer may be writing on the reader page 4582 * if the page has not been fully filled, so the read barrier 4583 * is also needed to make sure we see the content of what is 4584 * committed by the writer (see rb_set_commit_to_write()). 4585 */ 4586 smp_rmb(); 4587 4588 4589 return reader; 4590 } 4591 4592 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 4593 { 4594 struct ring_buffer_event *event; 4595 struct buffer_page *reader; 4596 unsigned length; 4597 4598 reader = rb_get_reader_page(cpu_buffer); 4599 4600 /* This function should not be called when buffer is empty */ 4601 if (RB_WARN_ON(cpu_buffer, !reader)) 4602 return; 4603 4604 event = rb_reader_event(cpu_buffer); 4605 4606 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 4607 cpu_buffer->read++; 4608 4609 rb_update_read_stamp(cpu_buffer, event); 4610 4611 length = rb_event_length(event); 4612 cpu_buffer->reader_page->read += length; 4613 cpu_buffer->read_bytes += length; 4614 } 4615 4616 static void rb_advance_iter(struct ring_buffer_iter *iter) 4617 { 4618 struct ring_buffer_per_cpu *cpu_buffer; 4619 4620 cpu_buffer = iter->cpu_buffer; 4621 4622 /* If head == next_event then we need to jump to the next event */ 4623 if (iter->head == iter->next_event) { 4624 /* If the event gets overwritten again, there's nothing to do */ 4625 if (rb_iter_head_event(iter) == NULL) 4626 return; 4627 } 4628 4629 iter->head = iter->next_event; 4630 4631 /* 4632 * Check if we are at the end of the buffer. 
4633 */ 4634 if (iter->next_event >= rb_page_size(iter->head_page)) { 4635 /* discarded commits can make the page empty */ 4636 if (iter->head_page == cpu_buffer->commit_page) 4637 return; 4638 rb_inc_iter(iter); 4639 return; 4640 } 4641 4642 rb_update_iter_read_stamp(iter, iter->event); 4643 } 4644 4645 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 4646 { 4647 return cpu_buffer->lost_events; 4648 } 4649 4650 static struct ring_buffer_event * 4651 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 4652 unsigned long *lost_events) 4653 { 4654 struct ring_buffer_event *event; 4655 struct buffer_page *reader; 4656 int nr_loops = 0; 4657 4658 if (ts) 4659 *ts = 0; 4660 again: 4661 /* 4662 * We repeat when a time extend is encountered. 4663 * Since the time extend is always attached to a data event, 4664 * we should never loop more than once. 4665 * (We never hit the following condition more than twice). 4666 */ 4667 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 4668 return NULL; 4669 4670 reader = rb_get_reader_page(cpu_buffer); 4671 if (!reader) 4672 return NULL; 4673 4674 event = rb_reader_event(cpu_buffer); 4675 4676 switch (event->type_len) { 4677 case RINGBUF_TYPE_PADDING: 4678 if (rb_null_event(event)) 4679 RB_WARN_ON(cpu_buffer, 1); 4680 /* 4681 * Because the writer could be discarding every 4682 * event it creates (which would probably be bad) 4683 * if we were to go back to "again" then we may never 4684 * catch up, and will trigger the warn on, or lock 4685 * the box. Return the padding, and we will release 4686 * the current locks, and try again. 4687 */ 4688 return event; 4689 4690 case RINGBUF_TYPE_TIME_EXTEND: 4691 /* Internal data, OK to advance */ 4692 rb_advance_reader(cpu_buffer); 4693 goto again; 4694 4695 case RINGBUF_TYPE_TIME_STAMP: 4696 if (ts) { 4697 *ts = rb_event_time_stamp(event); 4698 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 4699 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4700 cpu_buffer->cpu, ts); 4701 } 4702 /* Internal data, OK to advance */ 4703 rb_advance_reader(cpu_buffer); 4704 goto again; 4705 4706 case RINGBUF_TYPE_DATA: 4707 if (ts && !(*ts)) { 4708 *ts = cpu_buffer->read_stamp + event->time_delta; 4709 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4710 cpu_buffer->cpu, ts); 4711 } 4712 if (lost_events) 4713 *lost_events = rb_lost_events(cpu_buffer); 4714 return event; 4715 4716 default: 4717 RB_WARN_ON(cpu_buffer, 1); 4718 } 4719 4720 return NULL; 4721 } 4722 EXPORT_SYMBOL_GPL(ring_buffer_peek); 4723 4724 static struct ring_buffer_event * 4725 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4726 { 4727 struct trace_buffer *buffer; 4728 struct ring_buffer_per_cpu *cpu_buffer; 4729 struct ring_buffer_event *event; 4730 int nr_loops = 0; 4731 4732 if (ts) 4733 *ts = 0; 4734 4735 cpu_buffer = iter->cpu_buffer; 4736 buffer = cpu_buffer->buffer; 4737 4738 /* 4739 * Check if someone performed a consuming read to the buffer 4740 * or removed some pages from the buffer. In these cases, 4741 * iterator was invalidated and we need to reset it. 4742 */ 4743 if (unlikely(iter->cache_read != cpu_buffer->read || 4744 iter->cache_reader_page != cpu_buffer->reader_page || 4745 iter->cache_pages_removed != cpu_buffer->pages_removed)) 4746 rb_iter_reset(iter); 4747 4748 again: 4749 if (ring_buffer_iter_empty(iter)) 4750 return NULL; 4751 4752 /* 4753 * As the writer can mess with what the iterator is trying 4754 * to read, just give up if we fail to get an event after 4755 * three tries. 
The iterator is not as reliable when reading 4756 * the ring buffer with an active write as the consumer is. 4757 * Do not warn if the three failures is reached. 4758 */ 4759 if (++nr_loops > 3) 4760 return NULL; 4761 4762 if (rb_per_cpu_empty(cpu_buffer)) 4763 return NULL; 4764 4765 if (iter->head >= rb_page_size(iter->head_page)) { 4766 rb_inc_iter(iter); 4767 goto again; 4768 } 4769 4770 event = rb_iter_head_event(iter); 4771 if (!event) 4772 goto again; 4773 4774 switch (event->type_len) { 4775 case RINGBUF_TYPE_PADDING: 4776 if (rb_null_event(event)) { 4777 rb_inc_iter(iter); 4778 goto again; 4779 } 4780 rb_advance_iter(iter); 4781 return event; 4782 4783 case RINGBUF_TYPE_TIME_EXTEND: 4784 /* Internal data, OK to advance */ 4785 rb_advance_iter(iter); 4786 goto again; 4787 4788 case RINGBUF_TYPE_TIME_STAMP: 4789 if (ts) { 4790 *ts = rb_event_time_stamp(event); 4791 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 4792 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4793 cpu_buffer->cpu, ts); 4794 } 4795 /* Internal data, OK to advance */ 4796 rb_advance_iter(iter); 4797 goto again; 4798 4799 case RINGBUF_TYPE_DATA: 4800 if (ts && !(*ts)) { 4801 *ts = iter->read_stamp + event->time_delta; 4802 ring_buffer_normalize_time_stamp(buffer, 4803 cpu_buffer->cpu, ts); 4804 } 4805 return event; 4806 4807 default: 4808 RB_WARN_ON(cpu_buffer, 1); 4809 } 4810 4811 return NULL; 4812 } 4813 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4814 4815 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4816 { 4817 if (likely(!in_nmi())) { 4818 raw_spin_lock(&cpu_buffer->reader_lock); 4819 return true; 4820 } 4821 4822 /* 4823 * If an NMI die dumps out the content of the ring buffer 4824 * trylock must be used to prevent a deadlock if the NMI 4825 * preempted a task that holds the ring buffer locks. If 4826 * we get the lock then all is fine, if not, then continue 4827 * to do the read, but this can corrupt the ring buffer, 4828 * so it must be permanently disabled from future writes. 4829 * Reading from NMI is a oneshot deal. 4830 */ 4831 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4832 return true; 4833 4834 /* Continue without locking, but disable the ring buffer */ 4835 atomic_inc(&cpu_buffer->record_disabled); 4836 return false; 4837 } 4838 4839 static inline void 4840 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4841 { 4842 if (likely(locked)) 4843 raw_spin_unlock(&cpu_buffer->reader_lock); 4844 } 4845 4846 /** 4847 * ring_buffer_peek - peek at the next event to be read 4848 * @buffer: The ring buffer to read 4849 * @cpu: The cpu to peak at 4850 * @ts: The timestamp counter of this event. 4851 * @lost_events: a variable to store if events were lost (may be NULL) 4852 * 4853 * This will return the event that will be read next, but does 4854 * not consume the data. 
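 *
 * For example, to look at the next event without consuming it
 * (sketch; "buffer" and "cpu" are assumed valid):
 *
 *	u64 ts;
 *	struct ring_buffer_event *event;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, NULL);
 *	if (event)
 *		do_something(ring_buffer_event_data(event), ts);
 *
 * A NULL return means there is currently nothing to read on that CPU;
 * do_something() stands in for whatever the caller wants to do.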
4855 */
4856 struct ring_buffer_event *
4857 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
4858 		 unsigned long *lost_events)
4859 {
4860 	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4861 	struct ring_buffer_event *event;
4862 	unsigned long flags;
4863 	bool dolock;
4864 
4865 	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4866 		return NULL;
4867 
4868  again:
4869 	local_irq_save(flags);
4870 	dolock = rb_reader_lock(cpu_buffer);
4871 	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4872 	if (event && event->type_len == RINGBUF_TYPE_PADDING)
4873 		rb_advance_reader(cpu_buffer);
4874 	rb_reader_unlock(cpu_buffer, dolock);
4875 	local_irq_restore(flags);
4876 
4877 	if (event && event->type_len == RINGBUF_TYPE_PADDING)
4878 		goto again;
4879 
4880 	return event;
4881 }
4882 
4883 /** ring_buffer_iter_dropped - report if there are dropped events
4884  * @iter: The ring buffer iterator
4885  *
4886  * Returns true if there were dropped events since the last peek.
4887  */
4888 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
4889 {
4890 	bool ret = iter->missed_events != 0;
4891 
4892 	iter->missed_events = 0;
4893 	return ret;
4894 }
4895 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);
4896 
4897 /**
4898  * ring_buffer_iter_peek - peek at the next event to be read
4899  * @iter: The ring buffer iterator
4900  * @ts: The timestamp counter of this event.
4901  *
4902  * This will return the event that will be read next, but does
4903  * not increment the iterator.
4904  */
4905 struct ring_buffer_event *
4906 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4907 {
4908 	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4909 	struct ring_buffer_event *event;
4910 	unsigned long flags;
4911 
4912  again:
4913 	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4914 	event = rb_iter_peek(iter, ts);
4915 	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4916 
4917 	if (event && event->type_len == RINGBUF_TYPE_PADDING)
4918 		goto again;
4919 
4920 	return event;
4921 }
4922 
4923 /**
4924  * ring_buffer_consume - return an event and consume it
4925  * @buffer: The ring buffer to get the next event from
4926  * @cpu: the cpu to read the buffer from
4927  * @ts: a variable to store the timestamp (may be NULL)
4928  * @lost_events: a variable to store if events were lost (may be NULL)
4929  *
4930  * Returns the next event in the ring buffer, and that event is consumed.
4931  * Meaning that sequential reads will keep returning a different event,
4932  * and eventually empty the ring buffer if the producer is slower.
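 *
 * For example, draining everything currently readable on one CPU
 * (sketch; "buffer" and "cpu" are assumed valid):
 *
 *	u64 ts;
 *	unsigned long lost;
 *	struct ring_buffer_event *event;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)) != NULL)
 *		do_something(ring_buffer_event_data(event), ts, lost);
 *
 * where do_something() stands in for the caller's own handling of the data.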
4933 */ 4934 struct ring_buffer_event * 4935 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 4936 unsigned long *lost_events) 4937 { 4938 struct ring_buffer_per_cpu *cpu_buffer; 4939 struct ring_buffer_event *event = NULL; 4940 unsigned long flags; 4941 bool dolock; 4942 4943 again: 4944 /* might be called in atomic */ 4945 preempt_disable(); 4946 4947 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4948 goto out; 4949 4950 cpu_buffer = buffer->buffers[cpu]; 4951 local_irq_save(flags); 4952 dolock = rb_reader_lock(cpu_buffer); 4953 4954 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4955 if (event) { 4956 cpu_buffer->lost_events = 0; 4957 rb_advance_reader(cpu_buffer); 4958 } 4959 4960 rb_reader_unlock(cpu_buffer, dolock); 4961 local_irq_restore(flags); 4962 4963 out: 4964 preempt_enable(); 4965 4966 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4967 goto again; 4968 4969 return event; 4970 } 4971 EXPORT_SYMBOL_GPL(ring_buffer_consume); 4972 4973 /** 4974 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 4975 * @buffer: The ring buffer to read from 4976 * @cpu: The cpu buffer to iterate over 4977 * @flags: gfp flags to use for memory allocation 4978 * 4979 * This performs the initial preparations necessary to iterate 4980 * through the buffer. Memory is allocated, buffer recording 4981 * is disabled, and the iterator pointer is returned to the caller. 4982 * 4983 * Disabling buffer recording prevents the reading from being 4984 * corrupted. This is not a consuming read, so a producer is not 4985 * expected. 4986 * 4987 * After a sequence of ring_buffer_read_prepare calls, the user is 4988 * expected to make at least one call to ring_buffer_read_prepare_sync. 4989 * Afterwards, ring_buffer_read_start is invoked to get things going 4990 * for real. 4991 * 4992 * This overall must be paired with ring_buffer_read_finish. 4993 */ 4994 struct ring_buffer_iter * 4995 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 4996 { 4997 struct ring_buffer_per_cpu *cpu_buffer; 4998 struct ring_buffer_iter *iter; 4999 5000 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5001 return NULL; 5002 5003 iter = kzalloc(sizeof(*iter), flags); 5004 if (!iter) 5005 return NULL; 5006 5007 /* Holds the entire event: data and meta data */ 5008 iter->event_size = buffer->subbuf_size; 5009 iter->event = kmalloc(iter->event_size, flags); 5010 if (!iter->event) { 5011 kfree(iter); 5012 return NULL; 5013 } 5014 5015 cpu_buffer = buffer->buffers[cpu]; 5016 5017 iter->cpu_buffer = cpu_buffer; 5018 5019 atomic_inc(&cpu_buffer->resize_disabled); 5020 5021 return iter; 5022 } 5023 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5024 5025 /** 5026 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5027 * 5028 * All previously invoked ring_buffer_read_prepare calls to prepare 5029 * iterators will be synchronized. Afterwards, read_buffer_read_start 5030 * calls on those iterators are allowed. 5031 */ 5032 void 5033 ring_buffer_read_prepare_sync(void) 5034 { 5035 synchronize_rcu(); 5036 } 5037 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5038 5039 /** 5040 * ring_buffer_read_start - start a non consuming read of the buffer 5041 * @iter: The iterator returned by ring_buffer_read_prepare 5042 * 5043 * This finalizes the startup of an iteration through the buffer. 5044 * The iterator comes from a call to ring_buffer_read_prepare and 5045 * an intervening ring_buffer_read_prepare_sync must have been 5046 * performed. 
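 *
 * The whole non consuming read sequence looks roughly like this (sketch;
 * "buffer" and "cpu" are assumed valid, error handling omitted):
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts)) != NULL)
 *		ring_buffer_iter_advance(iter);
 *	ring_buffer_read_finish(iter);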
5047 * 5048 * Must be paired with ring_buffer_read_finish. 5049 */ 5050 void 5051 ring_buffer_read_start(struct ring_buffer_iter *iter) 5052 { 5053 struct ring_buffer_per_cpu *cpu_buffer; 5054 unsigned long flags; 5055 5056 if (!iter) 5057 return; 5058 5059 cpu_buffer = iter->cpu_buffer; 5060 5061 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5062 arch_spin_lock(&cpu_buffer->lock); 5063 rb_iter_reset(iter); 5064 arch_spin_unlock(&cpu_buffer->lock); 5065 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5066 } 5067 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5068 5069 /** 5070 * ring_buffer_read_finish - finish reading the iterator of the buffer 5071 * @iter: The iterator retrieved by ring_buffer_start 5072 * 5073 * This re-enables the recording to the buffer, and frees the 5074 * iterator. 5075 */ 5076 void 5077 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5078 { 5079 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5080 unsigned long flags; 5081 5082 /* 5083 * Ring buffer is disabled from recording, here's a good place 5084 * to check the integrity of the ring buffer. 5085 * Must prevent readers from trying to read, as the check 5086 * clears the HEAD page and readers require it. 5087 */ 5088 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5089 rb_check_pages(cpu_buffer); 5090 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5091 5092 atomic_dec(&cpu_buffer->resize_disabled); 5093 kfree(iter->event); 5094 kfree(iter); 5095 } 5096 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5097 5098 /** 5099 * ring_buffer_iter_advance - advance the iterator to the next location 5100 * @iter: The ring buffer iterator 5101 * 5102 * Move the location of the iterator such that the next read will 5103 * be the next location of the iterator. 5104 */ 5105 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5106 { 5107 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5108 unsigned long flags; 5109 5110 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5111 5112 rb_advance_iter(iter); 5113 5114 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5115 } 5116 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5117 5118 /** 5119 * ring_buffer_size - return the size of the ring buffer (in bytes) 5120 * @buffer: The ring buffer. 5121 * @cpu: The CPU to get ring buffer size from. 5122 */ 5123 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5124 { 5125 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5126 return 0; 5127 5128 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 5129 } 5130 EXPORT_SYMBOL_GPL(ring_buffer_size); 5131 5132 /** 5133 * ring_buffer_max_event_size - return the max data size of an event 5134 * @buffer: The ring buffer. 5135 * 5136 * Returns the maximum size an event can be. 
5137 */ 5138 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 5139 { 5140 /* If abs timestamp is requested, events have a timestamp too */ 5141 if (ring_buffer_time_stamp_abs(buffer)) 5142 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 5143 return buffer->max_data_size; 5144 } 5145 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 5146 5147 static void rb_clear_buffer_page(struct buffer_page *page) 5148 { 5149 local_set(&page->write, 0); 5150 local_set(&page->entries, 0); 5151 rb_init_page(page->page); 5152 page->read = 0; 5153 } 5154 5155 static void 5156 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5157 { 5158 struct buffer_page *page; 5159 5160 rb_head_page_deactivate(cpu_buffer); 5161 5162 cpu_buffer->head_page 5163 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5164 rb_clear_buffer_page(cpu_buffer->head_page); 5165 list_for_each_entry(page, cpu_buffer->pages, list) { 5166 rb_clear_buffer_page(page); 5167 } 5168 5169 cpu_buffer->tail_page = cpu_buffer->head_page; 5170 cpu_buffer->commit_page = cpu_buffer->head_page; 5171 5172 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5173 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5174 rb_clear_buffer_page(cpu_buffer->reader_page); 5175 5176 local_set(&cpu_buffer->entries_bytes, 0); 5177 local_set(&cpu_buffer->overrun, 0); 5178 local_set(&cpu_buffer->commit_overrun, 0); 5179 local_set(&cpu_buffer->dropped_events, 0); 5180 local_set(&cpu_buffer->entries, 0); 5181 local_set(&cpu_buffer->committing, 0); 5182 local_set(&cpu_buffer->commits, 0); 5183 local_set(&cpu_buffer->pages_touched, 0); 5184 local_set(&cpu_buffer->pages_lost, 0); 5185 local_set(&cpu_buffer->pages_read, 0); 5186 cpu_buffer->last_pages_touch = 0; 5187 cpu_buffer->shortest_full = 0; 5188 cpu_buffer->read = 0; 5189 cpu_buffer->read_bytes = 0; 5190 5191 rb_time_set(&cpu_buffer->write_stamp, 0); 5192 rb_time_set(&cpu_buffer->before_stamp, 0); 5193 5194 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 5195 5196 cpu_buffer->lost_events = 0; 5197 cpu_buffer->last_overrun = 0; 5198 5199 rb_head_page_activate(cpu_buffer); 5200 cpu_buffer->pages_removed = 0; 5201 } 5202 5203 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 5204 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 5205 { 5206 unsigned long flags; 5207 5208 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5209 5210 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 5211 goto out; 5212 5213 arch_spin_lock(&cpu_buffer->lock); 5214 5215 rb_reset_cpu(cpu_buffer); 5216 5217 arch_spin_unlock(&cpu_buffer->lock); 5218 5219 out: 5220 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5221 } 5222 5223 /** 5224 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 5225 * @buffer: The ring buffer to reset a per cpu buffer of 5226 * @cpu: The CPU buffer to be reset 5227 */ 5228 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 5229 { 5230 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5231 5232 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5233 return; 5234 5235 /* prevent another thread from changing buffer sizes */ 5236 mutex_lock(&buffer->mutex); 5237 5238 atomic_inc(&cpu_buffer->resize_disabled); 5239 atomic_inc(&cpu_buffer->record_disabled); 5240 5241 /* Make sure all commits have finished */ 5242 synchronize_rcu(); 5243 5244 reset_disabled_cpu_buffer(cpu_buffer); 5245 5246 atomic_dec(&cpu_buffer->record_disabled); 5247 atomic_dec(&cpu_buffer->resize_disabled); 5248 
5249 mutex_unlock(&buffer->mutex); 5250 } 5251 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 5252 5253 /* Flag to ensure proper resetting of atomic variables */ 5254 #define RESET_BIT (1 << 30) 5255 5256 /** 5257 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 5258 * @buffer: The ring buffer to reset a per cpu buffer of 5259 */ 5260 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 5261 { 5262 struct ring_buffer_per_cpu *cpu_buffer; 5263 int cpu; 5264 5265 /* prevent another thread from changing buffer sizes */ 5266 mutex_lock(&buffer->mutex); 5267 5268 for_each_online_buffer_cpu(buffer, cpu) { 5269 cpu_buffer = buffer->buffers[cpu]; 5270 5271 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 5272 atomic_inc(&cpu_buffer->record_disabled); 5273 } 5274 5275 /* Make sure all commits have finished */ 5276 synchronize_rcu(); 5277 5278 for_each_buffer_cpu(buffer, cpu) { 5279 cpu_buffer = buffer->buffers[cpu]; 5280 5281 /* 5282 * If a CPU came online during the synchronize_rcu(), then 5283 * ignore it. 5284 */ 5285 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 5286 continue; 5287 5288 reset_disabled_cpu_buffer(cpu_buffer); 5289 5290 atomic_dec(&cpu_buffer->record_disabled); 5291 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 5292 } 5293 5294 mutex_unlock(&buffer->mutex); 5295 } 5296 5297 /** 5298 * ring_buffer_reset - reset a ring buffer 5299 * @buffer: The ring buffer to reset all cpu buffers 5300 */ 5301 void ring_buffer_reset(struct trace_buffer *buffer) 5302 { 5303 struct ring_buffer_per_cpu *cpu_buffer; 5304 int cpu; 5305 5306 /* prevent another thread from changing buffer sizes */ 5307 mutex_lock(&buffer->mutex); 5308 5309 for_each_buffer_cpu(buffer, cpu) { 5310 cpu_buffer = buffer->buffers[cpu]; 5311 5312 atomic_inc(&cpu_buffer->resize_disabled); 5313 atomic_inc(&cpu_buffer->record_disabled); 5314 } 5315 5316 /* Make sure all commits have finished */ 5317 synchronize_rcu(); 5318 5319 for_each_buffer_cpu(buffer, cpu) { 5320 cpu_buffer = buffer->buffers[cpu]; 5321 5322 reset_disabled_cpu_buffer(cpu_buffer); 5323 5324 atomic_dec(&cpu_buffer->record_disabled); 5325 atomic_dec(&cpu_buffer->resize_disabled); 5326 } 5327 5328 mutex_unlock(&buffer->mutex); 5329 } 5330 EXPORT_SYMBOL_GPL(ring_buffer_reset); 5331 5332 /** 5333 * ring_buffer_empty - is the ring buffer empty? 5334 * @buffer: The ring buffer to test 5335 */ 5336 bool ring_buffer_empty(struct trace_buffer *buffer) 5337 { 5338 struct ring_buffer_per_cpu *cpu_buffer; 5339 unsigned long flags; 5340 bool dolock; 5341 bool ret; 5342 int cpu; 5343 5344 /* yes this is racy, but if you don't like the race, lock the buffer */ 5345 for_each_buffer_cpu(buffer, cpu) { 5346 cpu_buffer = buffer->buffers[cpu]; 5347 local_irq_save(flags); 5348 dolock = rb_reader_lock(cpu_buffer); 5349 ret = rb_per_cpu_empty(cpu_buffer); 5350 rb_reader_unlock(cpu_buffer, dolock); 5351 local_irq_restore(flags); 5352 5353 if (!ret) 5354 return false; 5355 } 5356 5357 return true; 5358 } 5359 EXPORT_SYMBOL_GPL(ring_buffer_empty); 5360 5361 /** 5362 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 
5363 * @buffer: The ring buffer 5364 * @cpu: The CPU buffer to test 5365 */ 5366 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 5367 { 5368 struct ring_buffer_per_cpu *cpu_buffer; 5369 unsigned long flags; 5370 bool dolock; 5371 bool ret; 5372 5373 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5374 return true; 5375 5376 cpu_buffer = buffer->buffers[cpu]; 5377 local_irq_save(flags); 5378 dolock = rb_reader_lock(cpu_buffer); 5379 ret = rb_per_cpu_empty(cpu_buffer); 5380 rb_reader_unlock(cpu_buffer, dolock); 5381 local_irq_restore(flags); 5382 5383 return ret; 5384 } 5385 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 5386 5387 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 5388 /** 5389 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 5390 * @buffer_a: One buffer to swap with 5391 * @buffer_b: The other buffer to swap with 5392 * @cpu: the CPU of the buffers to swap 5393 * 5394 * This function is useful for tracers that want to take a "snapshot" 5395 * of a CPU buffer and has another back up buffer lying around. 5396 * it is expected that the tracer handles the cpu buffer not being 5397 * used at the moment. 5398 */ 5399 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 5400 struct trace_buffer *buffer_b, int cpu) 5401 { 5402 struct ring_buffer_per_cpu *cpu_buffer_a; 5403 struct ring_buffer_per_cpu *cpu_buffer_b; 5404 int ret = -EINVAL; 5405 5406 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 5407 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 5408 goto out; 5409 5410 cpu_buffer_a = buffer_a->buffers[cpu]; 5411 cpu_buffer_b = buffer_b->buffers[cpu]; 5412 5413 /* At least make sure the two buffers are somewhat the same */ 5414 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 5415 goto out; 5416 5417 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 5418 goto out; 5419 5420 ret = -EAGAIN; 5421 5422 if (atomic_read(&buffer_a->record_disabled)) 5423 goto out; 5424 5425 if (atomic_read(&buffer_b->record_disabled)) 5426 goto out; 5427 5428 if (atomic_read(&cpu_buffer_a->record_disabled)) 5429 goto out; 5430 5431 if (atomic_read(&cpu_buffer_b->record_disabled)) 5432 goto out; 5433 5434 /* 5435 * We can't do a synchronize_rcu here because this 5436 * function can be called in atomic context. 5437 * Normally this will be called from the same CPU as cpu. 5438 * If not it's up to the caller to protect this. 5439 */ 5440 atomic_inc(&cpu_buffer_a->record_disabled); 5441 atomic_inc(&cpu_buffer_b->record_disabled); 5442 5443 ret = -EBUSY; 5444 if (local_read(&cpu_buffer_a->committing)) 5445 goto out_dec; 5446 if (local_read(&cpu_buffer_b->committing)) 5447 goto out_dec; 5448 5449 /* 5450 * When resize is in progress, we cannot swap it because 5451 * it will mess the state of the cpu buffer. 5452 */ 5453 if (atomic_read(&buffer_a->resizing)) 5454 goto out_dec; 5455 if (atomic_read(&buffer_b->resizing)) 5456 goto out_dec; 5457 5458 buffer_a->buffers[cpu] = cpu_buffer_b; 5459 buffer_b->buffers[cpu] = cpu_buffer_a; 5460 5461 cpu_buffer_b->buffer = buffer_a; 5462 cpu_buffer_a->buffer = buffer_b; 5463 5464 ret = 0; 5465 5466 out_dec: 5467 atomic_dec(&cpu_buffer_a->record_disabled); 5468 atomic_dec(&cpu_buffer_b->record_disabled); 5469 out: 5470 return ret; 5471 } 5472 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 5473 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 5474 5475 /** 5476 * ring_buffer_alloc_read_page - allocate a page to read from buffer 5477 * @buffer: the buffer to allocate for. 5478 * @cpu: the cpu buffer to allocate. 
5479 * 5480 * This function is used in conjunction with ring_buffer_read_page. 5481 * When reading a full page from the ring buffer, these functions 5482 * can be used to speed up the process. The calling function should 5483 * allocate a few pages first with this function. Then when it 5484 * needs to get pages from the ring buffer, it passes the result 5485 * of this function into ring_buffer_read_page, which will swap 5486 * the page that was allocated, with the read page of the buffer. 5487 * 5488 * Returns: 5489 * The page allocated, or ERR_PTR 5490 */ 5491 struct buffer_data_read_page * 5492 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5493 { 5494 struct ring_buffer_per_cpu *cpu_buffer; 5495 struct buffer_data_read_page *bpage = NULL; 5496 unsigned long flags; 5497 struct page *page; 5498 5499 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5500 return ERR_PTR(-ENODEV); 5501 5502 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 5503 if (!bpage) 5504 return ERR_PTR(-ENOMEM); 5505 5506 bpage->order = buffer->subbuf_order; 5507 cpu_buffer = buffer->buffers[cpu]; 5508 local_irq_save(flags); 5509 arch_spin_lock(&cpu_buffer->lock); 5510 5511 if (cpu_buffer->free_page) { 5512 bpage->data = cpu_buffer->free_page; 5513 cpu_buffer->free_page = NULL; 5514 } 5515 5516 arch_spin_unlock(&cpu_buffer->lock); 5517 local_irq_restore(flags); 5518 5519 if (bpage->data) 5520 goto out; 5521 5522 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_NORETRY, 5523 cpu_buffer->buffer->subbuf_order); 5524 if (!page) { 5525 kfree(bpage); 5526 return ERR_PTR(-ENOMEM); 5527 } 5528 5529 bpage->data = page_address(page); 5530 5531 out: 5532 rb_init_page(bpage->data); 5533 5534 return bpage; 5535 } 5536 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 5537 5538 /** 5539 * ring_buffer_free_read_page - free an allocated read page 5540 * @buffer: the buffer the page was allocate for 5541 * @cpu: the cpu buffer the page came from 5542 * @data_page: the page to free 5543 * 5544 * Free a page allocated from ring_buffer_alloc_read_page. 5545 */ 5546 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 5547 struct buffer_data_read_page *data_page) 5548 { 5549 struct ring_buffer_per_cpu *cpu_buffer; 5550 struct buffer_data_page *bpage = data_page->data; 5551 struct page *page = virt_to_page(bpage); 5552 unsigned long flags; 5553 5554 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 5555 return; 5556 5557 cpu_buffer = buffer->buffers[cpu]; 5558 5559 /* 5560 * If the page is still in use someplace else, or order of the page 5561 * is different from the subbuffer order of the buffer - 5562 * we can't reuse it 5563 */ 5564 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 5565 goto out; 5566 5567 local_irq_save(flags); 5568 arch_spin_lock(&cpu_buffer->lock); 5569 5570 if (!cpu_buffer->free_page) { 5571 cpu_buffer->free_page = bpage; 5572 bpage = NULL; 5573 } 5574 5575 arch_spin_unlock(&cpu_buffer->lock); 5576 local_irq_restore(flags); 5577 5578 out: 5579 free_pages((unsigned long)bpage, data_page->order); 5580 kfree(data_page); 5581 } 5582 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5583 5584 /** 5585 * ring_buffer_read_page - extract a page from the ring buffer 5586 * @buffer: buffer to extract from 5587 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 5588 * @len: amount to extract 5589 * @cpu: the cpu of the buffer to extract 5590 * @full: should the extraction only happen when the page is full. 
5591 * 5592 * This function will pull out a page from the ring buffer and consume it. 5593 * @data_page must be the address of the variable that was returned 5594 * from ring_buffer_alloc_read_page. This is because the page might be used 5595 * to swap with a page in the ring buffer. 5596 * 5597 * for example: 5598 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5599 * if (IS_ERR(rpage)) 5600 * return PTR_ERR(rpage); 5601 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 5602 * if (ret >= 0) 5603 * process_page(ring_buffer_read_page_data(rpage), ret); 5604 * ring_buffer_free_read_page(buffer, cpu, rpage); 5605 * 5606 * When @full is set, the function will not return true unless 5607 * the writer is off the reader page. 5608 * 5609 * Note: it is up to the calling functions to handle sleeps and wakeups. 5610 * The ring buffer can be used anywhere in the kernel and can not 5611 * blindly call wake_up. The layer that uses the ring buffer must be 5612 * responsible for that. 5613 * 5614 * Returns: 5615 * >=0 if data has been transferred, returns the offset of consumed data. 5616 * <0 if no data has been transferred. 5617 */ 5618 int ring_buffer_read_page(struct trace_buffer *buffer, 5619 struct buffer_data_read_page *data_page, 5620 size_t len, int cpu, int full) 5621 { 5622 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5623 struct ring_buffer_event *event; 5624 struct buffer_data_page *bpage; 5625 struct buffer_page *reader; 5626 unsigned long missed_events; 5627 unsigned long flags; 5628 unsigned int commit; 5629 unsigned int read; 5630 u64 save_timestamp; 5631 int ret = -1; 5632 5633 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5634 goto out; 5635 5636 /* 5637 * If len is not big enough to hold the page header, then 5638 * we can not copy anything. 5639 */ 5640 if (len <= BUF_PAGE_HDR_SIZE) 5641 goto out; 5642 5643 len -= BUF_PAGE_HDR_SIZE; 5644 5645 if (!data_page || !data_page->data) 5646 goto out; 5647 if (data_page->order != buffer->subbuf_order) 5648 goto out; 5649 5650 bpage = data_page->data; 5651 if (!bpage) 5652 goto out; 5653 5654 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5655 5656 reader = rb_get_reader_page(cpu_buffer); 5657 if (!reader) 5658 goto out_unlock; 5659 5660 event = rb_reader_event(cpu_buffer); 5661 5662 read = reader->read; 5663 commit = rb_page_commit(reader); 5664 5665 /* Check if any events were dropped */ 5666 missed_events = cpu_buffer->lost_events; 5667 5668 /* 5669 * If this page has been partially read or 5670 * if len is not big enough to read the rest of the page or 5671 * a writer is still on the page, then 5672 * we must copy the data from the page to the buffer. 5673 * Otherwise, we can simply swap the page with the one passed in. 5674 */ 5675 if (read || (len < (commit - read)) || 5676 cpu_buffer->reader_page == cpu_buffer->commit_page) { 5677 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 5678 unsigned int rpos = read; 5679 unsigned int pos = 0; 5680 unsigned int size; 5681 5682 /* 5683 * If a full page is expected, this can still be returned 5684 * if there's been a previous partial read and the 5685 * rest of the page can be read and the commit page is off 5686 * the reader page. 
5687 */ 5688 if (full && 5689 (!read || (len < (commit - read)) || 5690 cpu_buffer->reader_page == cpu_buffer->commit_page)) 5691 goto out_unlock; 5692 5693 if (len > (commit - read)) 5694 len = (commit - read); 5695 5696 /* Always keep the time extend and data together */ 5697 size = rb_event_ts_length(event); 5698 5699 if (len < size) 5700 goto out_unlock; 5701 5702 /* save the current timestamp, since the user will need it */ 5703 save_timestamp = cpu_buffer->read_stamp; 5704 5705 /* Need to copy one event at a time */ 5706 do { 5707 /* We need the size of one event, because 5708 * rb_advance_reader only advances by one event, 5709 * whereas rb_event_ts_length may include the size of 5710 * one or two events. 5711 * We have already ensured there's enough space if this 5712 * is a time extend. */ 5713 size = rb_event_length(event); 5714 memcpy(bpage->data + pos, rpage->data + rpos, size); 5715 5716 len -= size; 5717 5718 rb_advance_reader(cpu_buffer); 5719 rpos = reader->read; 5720 pos += size; 5721 5722 if (rpos >= commit) 5723 break; 5724 5725 event = rb_reader_event(cpu_buffer); 5726 /* Always keep the time extend and data together */ 5727 size = rb_event_ts_length(event); 5728 } while (len >= size); 5729 5730 /* update bpage */ 5731 local_set(&bpage->commit, pos); 5732 bpage->time_stamp = save_timestamp; 5733 5734 /* we copied everything to the beginning */ 5735 read = 0; 5736 } else { 5737 /* update the entry counter */ 5738 cpu_buffer->read += rb_page_entries(reader); 5739 cpu_buffer->read_bytes += rb_page_commit(reader); 5740 5741 /* swap the pages */ 5742 rb_init_page(bpage); 5743 bpage = reader->page; 5744 reader->page = data_page->data; 5745 local_set(&reader->write, 0); 5746 local_set(&reader->entries, 0); 5747 reader->read = 0; 5748 data_page->data = bpage; 5749 5750 /* 5751 * Use the real_end for the data size, 5752 * This gives us a chance to store the lost events 5753 * on the page. 5754 */ 5755 if (reader->real_end) 5756 local_set(&bpage->commit, reader->real_end); 5757 } 5758 ret = read; 5759 5760 cpu_buffer->lost_events = 0; 5761 5762 commit = local_read(&bpage->commit); 5763 /* 5764 * Set a flag in the commit field if we lost events 5765 */ 5766 if (missed_events) { 5767 /* If there is room at the end of the page to save the 5768 * missed events, then record it there. 5769 */ 5770 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 5771 memcpy(&bpage->data[commit], &missed_events, 5772 sizeof(missed_events)); 5773 local_add(RB_MISSED_STORED, &bpage->commit); 5774 commit += sizeof(missed_events); 5775 } 5776 local_add(RB_MISSED_EVENTS, &bpage->commit); 5777 } 5778 5779 /* 5780 * This page may be off to user land. Zero it out here. 5781 */ 5782 if (commit < buffer->subbuf_size) 5783 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 5784 5785 out_unlock: 5786 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5787 5788 out: 5789 return ret; 5790 } 5791 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 5792 5793 /** 5794 * ring_buffer_read_page_data - get pointer to the data in the page. 5795 * @page: the page to get the data from 5796 * 5797 * Returns pointer to the actual data in this page. 5798 */ 5799 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 5800 { 5801 return page->data; 5802 } 5803 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 5804 5805 /** 5806 * ring_buffer_subbuf_size_get - get size of the sub buffer. 

/**
 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page.
 * @buffer: The ring_buffer to set the new page size.
 * @order: Order of the system pages in one sub buffer page
 *
 * By default, one ring buffer page equals one system page. This API can be
 * used to set a new size of the ring buffer page. The size must be an order
 * of the system page size, that's why the input parameter @order is the order
 * of system pages that are allocated for one ring buffer page:
 *  0 - 1 system page
 *  1 - 2 system pages
 *  2 - 4 system pages
 *  ...
 *
 * Returns 0 on success or < 0 in case of an error.
 */
int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage, *tmp;
	int old_order, old_size;
	int nr_pages;
	int psize;
	int err;
	int cpu;

	if (!buffer || order < 0)
		return -EINVAL;

	if (buffer->subbuf_order == order)
		return 0;

	psize = (1 << order) * PAGE_SIZE;
	if (psize <= BUF_PAGE_HDR_SIZE)
		return -EINVAL;

	old_order = buffer->subbuf_order;
	old_size = buffer->subbuf_size;

	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);
	atomic_inc(&buffer->record_disabled);

	/* Make sure all commits have finished */
	synchronize_rcu();

	buffer->subbuf_order = order;
	buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE;

	/* Make sure all new buffers are allocated, before deleting the old ones */
	for_each_buffer_cpu(buffer, cpu) {

		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			continue;

		cpu_buffer = buffer->buffers[cpu];

		/* Update the number of pages to match the new size */
		nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
		nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);

		/* we need a minimum of two pages */
		if (nr_pages < 2)
			nr_pages = 2;

		cpu_buffer->nr_pages_to_update = nr_pages;

		/* Include the reader page */
		nr_pages++;

		/* Allocate the new size buffer */
		INIT_LIST_HEAD(&cpu_buffer->new_pages);
		if (__rb_allocate_pages(cpu_buffer, nr_pages,
					&cpu_buffer->new_pages)) {
			/* not enough memory for new pages */
			err = -ENOMEM;
			goto error;
		}
	}

	for_each_buffer_cpu(buffer, cpu) {

		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			continue;

		cpu_buffer = buffer->buffers[cpu];

		/* Clear the head bit to make the linked list normal to read */
		rb_head_page_deactivate(cpu_buffer);

		/* Now walk the list and free all the old sub buffers */
		list_for_each_entry_safe(bpage, tmp, cpu_buffer->pages, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		/* The above loop stopped on the last page needing to be freed */
		bpage = list_entry(cpu_buffer->pages, struct buffer_page, list);
		free_buffer_page(bpage);

		/* Free the current reader page */
		free_buffer_page(cpu_buffer->reader_page);

		/* One page was allocated for the reader page */
		cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next,
						     struct buffer_page, list);
		list_del_init(&cpu_buffer->reader_page->list);

		/* The cpu_buffer pages are a linked list with no head */
		cpu_buffer->pages = cpu_buffer->new_pages.next;
		cpu_buffer->new_pages.next->prev = cpu_buffer->new_pages.prev;
		cpu_buffer->new_pages.prev->next = cpu_buffer->new_pages.next;

		/* Clear the new_pages list */
		INIT_LIST_HEAD(&cpu_buffer->new_pages);

		cpu_buffer->head_page
			= list_entry(cpu_buffer->pages, struct buffer_page, list);
		cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

		cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update;
		cpu_buffer->nr_pages_to_update = 0;

		free_pages((unsigned long)cpu_buffer->free_page, old_order);
		cpu_buffer->free_page = NULL;

		rb_head_page_activate(cpu_buffer);

		rb_check_pages(cpu_buffer);
	}

	atomic_dec(&buffer->record_disabled);
	mutex_unlock(&buffer->mutex);

	return 0;

error:
	buffer->subbuf_order = old_order;
	buffer->subbuf_size = old_size;

	atomic_dec(&buffer->record_disabled);
	mutex_unlock(&buffer->mutex);

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];

		if (!cpu_buffer->nr_pages_to_update)
			continue;

		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
	}

	return err;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);
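
/*
 * A sketch of how a caller might pick an @order for a requested sub-buffer
 * size in bytes and apply it. This is illustrative only: set_subbuf_bytes()
 * is a hypothetical helper, and rounding up to the next power-of-two number
 * of pages is an assumed policy, not something this file mandates.
 *
 *	static int set_subbuf_bytes(struct trace_buffer *buffer, size_t bytes)
 *	{
 *		int order = 0;
 *
 *		// Find the smallest order whose page span covers @bytes.
 *		while (((size_t)PAGE_SIZE << order) < bytes)
 *			order++;
 *
 *		return ring_buffer_subbuf_order_set(buffer, order);
 *	}
 */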
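
/*
 * trace_rb_cpu_prepare() below is a CPU-hotplug "prepare" callback and is not
 * called directly from this file. Roughly, the tracing core registers it as a
 * multi-instance hotplug state, and each ring buffer is then added as an
 * instance, so the callback runs for every registered buffer whenever a CPU
 * comes online. The sketch shows the general shape of that wiring; the exact
 * call sites live outside this file and the error handling is elided.
 *
 *	// One-time registration of the hotplug state (tracing core).
 *	ret = cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE,
 *				      "trace/RB:prepare",
 *				      trace_rb_cpu_prepare, NULL);
 *
 *	// Per buffer: attach the buffer's hlist node as an instance.
 *	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
 */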
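
/*
 * Each test event payload is an rb_item: a length followed by that many bytes
 * copied from rb_string into the flexible array. The consumer at the end of
 * the test recomputes the same layout to validate what it reads back. A
 * minimal sketch of how one payload is sized and filled (mirroring what
 * rb_write_something() below does; "size" is whatever length the writer
 * happens to pick):
 *
 *	int len = size + sizeof(struct rb_item);	// bytes to reserve
 *	struct rb_item *item = ring_buffer_event_data(event);
 *
 *	item->size = size;				// record the length
 *	memcpy(item->str, rb_string, size);		// then the bytes
 */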
/*
 * We only allocate new buffers, never free them if the CPU goes down.
 * If we were to free the buffer, then the user would lose any trace that was in
 * the buffer.
 */
int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
{
	struct trace_buffer *buffer;
	long nr_pages_same;
	int cpu_i;
	unsigned long nr_pages;

	buffer = container_of(node, struct trace_buffer, node);
	if (cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	nr_pages = 0;
	nr_pages_same = 1;
	/* check if all cpu sizes are same */
	for_each_buffer_cpu(buffer, cpu_i) {
		/* fill in the size from first enabled cpu */
		if (nr_pages == 0)
			nr_pages = buffer->buffers[cpu_i]->nr_pages;
		if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
			nr_pages_same = 0;
			break;
		}
	}
	/* allocate minimum pages, user can later expand it */
	if (!nr_pages_same)
		nr_pages = 2;
	buffer->buffers[cpu] =
		rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu]) {
		WARN(1, "failed to allocate ring buffer on CPU %u\n",
		     cpu);
		return -ENOMEM;
	}
	smp_wmb();
	cpumask_set_cpu(cpu, buffer->cpumask);
	return 0;
}

#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
/*
 * This is a basic integrity check of the ring buffer.
 * Late in the boot cycle this test will run when configured in.
 * It will kick off a thread per CPU that will go into a loop
 * writing to the per cpu ring buffer various sizes of data.
 * Some of the data will be large items, some small.
 *
 * Another thread is created that goes into a spin, sending out
 * IPIs to the other CPUs to also write into the ring buffer.
 * This is to test the nesting ability of the buffer.
 *
 * Basic stats are recorded and reported. If something in the
 * ring buffer should happen that's not expected, a big warning
 * is displayed and all ring buffers are disabled.
 */
static struct task_struct *rb_threads[NR_CPUS] __initdata;

struct rb_test_data {
	struct trace_buffer	*buffer;
	unsigned long		events;
	unsigned long		bytes_written;
	unsigned long		bytes_alloc;
	unsigned long		bytes_dropped;
	unsigned long		events_nested;
	unsigned long		bytes_written_nested;
	unsigned long		bytes_alloc_nested;
	unsigned long		bytes_dropped_nested;
	int			min_size_nested;
	int			max_size_nested;
	int			max_size;
	int			min_size;
	int			cpu;
	int			cnt;
};

static struct rb_test_data rb_data[NR_CPUS] __initdata;

/* 1 meg per cpu */
#define RB_TEST_BUFFER_SIZE	1048576

static char rb_string[] __initdata =
	"abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
	"?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
	"!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";

static bool rb_test_started __initdata;

struct rb_item {
	int size;
	char str[];
};
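
/*
 * rb_write_something() above follows the writer contract this file expects:
 * every successful ring_buffer_lock_reserve() must be paired with a
 * ring_buffer_unlock_commit(), even if the payload ends up unused (hence the
 * commit after the "out:" label). The bare pattern, stripped of the test
 * bookkeeping, looks roughly like this; struct my_payload and the -EBUSY
 * return are stand-ins chosen for the sketch, not part of the API.
 *
 *	struct ring_buffer_event *event;
 *	void *body;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(struct my_payload));
 *	if (!event)
 *		return -EBUSY;	// buffer disabled or no room to reserve
 *
 *	body = ring_buffer_event_data(event);
 *	memcpy(body, &payload, sizeof(struct my_payload));
 *
 *	ring_buffer_unlock_commit(buffer);
 */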
static __init int rb_write_something(struct rb_test_data *data, bool nested)
{
	struct ring_buffer_event *event;
	struct rb_item *item;
	bool started;
	int event_len;
	int size;
	int len;
	int cnt;

	/* Have nested writes different than what is written */
	cnt = data->cnt + (nested ? 27 : 0);

	/* Multiply cnt by ~e, to make some unique increment */
	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);

	len = size + sizeof(struct rb_item);

	started = rb_test_started;
	/* read rb_test_started before checking buffer enabled */
	smp_rmb();

	event = ring_buffer_lock_reserve(data->buffer, len);
	if (!event) {
		/* Ignore dropped events before test starts. */
		if (started) {
			if (nested)
				data->bytes_dropped_nested += len;
			else
				data->bytes_dropped += len;
		}
		return len;
	}

	event_len = ring_buffer_event_length(event);

	if (RB_WARN_ON(data->buffer, event_len < len))
		goto out;

	item = ring_buffer_event_data(event);
	item->size = size;
	memcpy(item->str, rb_string, size);

	if (nested) {
		data->bytes_alloc_nested += event_len;
		data->bytes_written_nested += len;
		data->events_nested++;
		if (!data->min_size_nested || len < data->min_size_nested)
			data->min_size_nested = len;
		if (len > data->max_size_nested)
			data->max_size_nested = len;
	} else {
		data->bytes_alloc += event_len;
		data->bytes_written += len;
		data->events++;
		if (!data->min_size || len < data->min_size)
			data->min_size = len;
		if (len > data->max_size)
			data->max_size = len;
	}

 out:
	ring_buffer_unlock_commit(data->buffer);

	return 0;
}

static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Now sleep between a min of 100-300us and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {

		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but for non preempt, let others run */
		schedule();
	}

	return 0;
}
static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct trace_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable buffer so that threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
						     cpu, "rbtester/%u");
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}
	}

	/* Now create the rb hammer! */
	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
	if (WARN_ON(IS_ERR(rb_hammer))) {
		pr_cont("FAILED\n");
		ret = PTR_ERR(rb_hammer);
		goto out_free;
	}

	ring_buffer_record_on(buffer);
	/*
	 * Show buffer is enabled before setting rb_test_started.
	 * Yes there's a small race window where events could be
	 * dropped and the thread won't catch it. But when a ring
	 * buffer gets enabled, there will always be some kind of
	 * delay before other CPUs see it. Thus, we don't care about
	 * those dropped events. We care about events dropped after
	 * the threads see that the buffer is active.
	 */
	smp_wmb();
	rb_test_started = true;

	set_current_state(TASK_INTERRUPTIBLE);
	/* Just run for 10 seconds */
	schedule_timeout(10 * HZ);

	kthread_stop(rb_hammer);

 out_free:
	for_each_online_cpu(cpu) {
		if (!rb_threads[cpu])
			break;
		kthread_stop(rb_threads[cpu]);
	}
	if (ret) {
		ring_buffer_free(buffer);
		return ret;
	}

	/* Report! */
	pr_info("finished\n");
	for_each_online_cpu(cpu) {
		struct ring_buffer_event *event;
		struct rb_test_data *data = &rb_data[cpu];
		struct rb_item *item;
		unsigned long total_events;
		unsigned long total_dropped;
		unsigned long total_written;
		unsigned long total_alloc;
		unsigned long total_read = 0;
		unsigned long total_size = 0;
		unsigned long total_len = 0;
		unsigned long total_lost = 0;
		unsigned long lost;
		int big_event_size;
		int small_event_size;

		ret = -1;

		total_events = data->events + data->events_nested;
		total_written = data->bytes_written + data->bytes_written_nested;
		total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
		total_dropped = data->bytes_dropped + data->bytes_dropped_nested;

		big_event_size = data->max_size + data->max_size_nested;
		small_event_size = data->min_size + data->min_size_nested;

		pr_info("CPU %d:\n", cpu);
		pr_info(" events: %ld\n", total_events);
		pr_info(" dropped bytes: %ld\n", total_dropped);
		pr_info(" alloced bytes: %ld\n", total_alloc);
		pr_info(" written bytes: %ld\n", total_written);
		pr_info(" biggest event: %d\n", big_event_size);
		pr_info(" smallest event: %d\n", small_event_size);

		if (RB_WARN_ON(buffer, total_dropped))
			break;

		ret = 0;

		while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
			total_lost += lost;
			item = ring_buffer_event_data(event);
			total_len += ring_buffer_event_length(event);
			total_size += item->size + sizeof(struct rb_item);
			if (memcmp(&item->str[0], rb_string, item->size) != 0) {
				pr_info("FAILED!\n");
				pr_info("buffer had: %.*s\n", item->size, item->str);
				pr_info("expected: %.*s\n", item->size, rb_string);
				RB_WARN_ON(buffer, 1);
				ret = -1;
				break;
			}
			total_read++;
		}
		if (ret)
			break;

		ret = -1;

		pr_info(" read events: %ld\n", total_read);
		pr_info(" lost events: %ld\n", total_lost);
		pr_info(" total events: %ld\n", total_lost + total_read);
		pr_info(" recorded len bytes: %ld\n", total_len);
		pr_info(" recorded size bytes: %ld\n", total_size);
		if (total_lost) {
			pr_info(" With dropped events, record len and size may not match\n"
				" alloced and written from above\n");
		} else {
			if (RB_WARN_ON(buffer, total_len != total_alloc ||
				       total_size != total_written))
				break;
		}
		if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
			break;

		ret = 0;
	}
	if (!ret)
		pr_info("Ring buffer PASSED!\n");

	ring_buffer_free(buffer);
	return 0;
}

late_initcall(test_ringbuffer);
#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */