// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <[email protected]>
 */
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/local64.h>
#include <asm/local.h>

#include "trace.h"

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

struct ring_buffer_meta {
	unsigned long	text_addr;
	unsigned long	data_addr;
	unsigned long	first_buffer;
	unsigned long	head_buffer;
	unsigned long	commit_buffer;
	__u32		subbuf_size;
	__u32		nr_subbufs;
	int		buffers[];
};

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}
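
/*
 * Illustrative sketch (not part of the ring buffer code): how the compressed
 * header printed above is consumed. For a data event, a non-zero type_len
 * encodes the payload length in 4-byte units, while type_len == 0 means the
 * length lives in array[0]. The helper name below is hypothetical.
 *
 *	static unsigned int example_data_length(struct ring_buffer_event *ev)
 *	{
 *		if (ev->type_len)		// 1..28: length = type_len * 4
 *			return ev->type_len * RB_ALIGNMENT;
 *		return ev->array[0];		// 0: length stored in array[0]
 *	}
 *
 * So a 12-byte payload can be described entirely by type_len == 3, which
 * leaves array[0] free to hold the start of the payload itself.
 */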
/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)		\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

#define RB_MISSED_MASK		(3 << 30)
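
/*
 * Illustrative sketch (not part of the ring buffer code): a TIME_EXTEND or
 * TIME_STAMP event spreads its value across the 27-bit time_delta field and
 * the 32-bit array[0] word, which is why the absolute timestamp described at
 * the top of this file is limited to 27 + 32 = 59 bits. Reassembly is what
 * rb_event_time_stamp() does:
 *
 *	ts = ((u64)event->array[0] << TS_SHIFT) + event->time_delta;
 *
 * e.g. array[0] = 0x2 and time_delta = 0x3 yields ts = (2 << 27) + 3.
 */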
struct buffer_data_page {
	u64		time_stamp;	/* page time stamp */
	local_t		commit;		/* write committed index */
	unsigned char	data[] RB_ALIGN_DATA;	/* data of buffer page */
};

struct buffer_data_read_page {
	unsigned		order;	/* order of the page */
	struct buffer_data_page	*data;	/* actual data, stored in this page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		write;		/* index for next write */
	unsigned	read;		/* index for next read */
	local_t		entries;	/* entries on this page */
	unsigned long	real_end;	/* real end of data */
	unsigned	order;		/* order of the page */
	u32		id:30;		/* ID for external mapping */
	u32		range:1;	/* Mapped via a range */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static void free_buffer_page(struct buffer_page *bpage)
{
	/* Range pages are not to be freed */
	if (!bpage->range)
		free_pages((unsigned long)bpage->page, bpage->order);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}

struct rb_irq_work {
	struct irq_work		work;
	wait_queue_head_t	waiters;
	wait_queue_head_t	full_waiters;
	atomic_t		seq;
	bool			waiters_pending;
	bool			full_waiters_pending;
	bool			wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE	= 0,
	RB_ADD_STAMP_EXTEND	= BIT(1),
	RB_ADD_STAMP_ABSOLUTE	= BIT(2),
	RB_ADD_STAMP_FORCE	= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

struct rb_time_struct {
	local64_t	time;
};
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
	/* pages removed since last reset */
	unsigned long			pages_removed;

	unsigned int			mapped;
	unsigned int			user_mapped;	/* user space mapping */
	struct mutex			mapping_lock;
	unsigned long			*subbuf_ids;	/* ID to subbuf VA */
	struct trace_buffer_meta	*meta_page;
	struct ring_buffer_meta		*ring_meta;

	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resizing;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;

	unsigned long			range_addr_start;
	unsigned long			range_addr_end;

	long				last_text_delta;
	long				last_data_delta;

	unsigned int			subbuf_size;
	unsigned int			subbuf_order;
	unsigned int			max_data_size;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	size_t				event_size;
	int				missed_events;
};

int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)buffer->subbuf_size,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
}
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
				void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}
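
/*
 * Illustrative example (not part of the ring buffer code): why the 5 MSBs
 * need to be reinserted. With TS_MSB = 0xf8ULL << 56, only bits 0-58 of a
 * clock survive in an absolute time stamp. Given a saved full time stamp
 * save_ts = (0x3ULL << 59) | 0x1000 and an absolute stamp abs = 0x2000,
 * rb_fix_abs_ts() restores the upper bits:
 *
 *	abs |= save_ts & TS_MSB;	// abs = (0x3ULL << 59) | 0x2000
 *
 * If the restored value still compares below save_ts, the 59-bit portion
 * wrapped since save_ts was taken, so one extra 1ULL << 59 is added.
 */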

static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, the write_stamp of the buffer is returned;
 * otherwise the current time is returned. Neither of those last
 * two cases should really ever happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
	rb_time_read(&cpu_buffer->write_stamp, &ts);

	return ts;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	/*
	 * Add one as dirty will never equal nr_pages, as the sub-buffer
	 * that the writer is on is not counted as dirty.
	 * This is needed if "buffer_percent" is set to 100.
	 */
	dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

	return (dirty * 100) >= (full * nr_pages);
}
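
/*
 * Illustrative example (not part of the ring buffer code, numbers assumed):
 * with nr_pages = 8 and a watermark of full = 75, full_hit() asks whether
 * (dirty * 100) >= (75 * 8) = 600, i.e. whether at least 6 of the 8
 * sub-buffers carry data (the "+ 1" above counts the sub-buffer the writer
 * is currently on). A waiter using that watermark is only woken once the
 * buffer is roughly three quarters full.
 */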

/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	/* For waiters waiting for the first wake up */
	(void)atomic_fetch_inc_release(&rbwork->seq);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		/* Only cpu_buffer sets the above flags */
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

		/* Called from interrupt context */
		raw_spin_lock(&cpu_buffer->reader_lock);
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;

		/* Waking up all waiters, they will reset the shortest full */
		cpu_buffer->shortest_full = 0;
		raw_spin_unlock(&cpu_buffer->reader_lock);

		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * When a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on this.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* This can be called in any context */
	irq_work_queue(&rbwork->work);
}

static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	bool ret = false;

	/* Reads of all CPUs always wait for any data */
	if (cpu == RING_BUFFER_ALL_CPUS)
		return !ring_buffer_empty(buffer);

	cpu_buffer = buffer->buffers[cpu];

	if (!ring_buffer_empty_cpu(buffer, cpu)) {
		unsigned long flags;
		bool pagebusy;

		if (!full)
			return true;

		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
		ret = !pagebusy && full_hit(buffer, cpu, full);

		if (!ret && (!cpu_buffer->shortest_full ||
			     cpu_buffer->shortest_full > full)) {
			cpu_buffer->shortest_full = full;
		}
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}
	return ret;
}

static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full))
		return true;

	if (cond(data))
		return true;

	/*
	 * The events can happen in critical sections where
	 * checking a work queue can cause deadlocks.
	 * After adding a task to the queue, this flag is set
	 * only to notify events to try to wake up the queue
	 * using irq_work.
	 *
	 * We don't clear it even if the buffer is no longer
	 * empty. The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worst
	 * that can happen if we race with !trace_empty() is that
	 * an event will cause an irq_work to try to wake up
	 * an empty queue.
	 *
	 * There's no reason to protect this flag either, as
	 * the work queue and irq_work logic will do the necessary
	 * synchronization for the wake ups. The only thing
	 * that is necessary is that the wake up happens after
	 * a task has been queued. It's OK for spurious wake ups.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}

struct rb_wait_data {
	struct rb_irq_work	*irq_work;
	int			seq;
};

/*
 * The default wait condition for ring_buffer_wait() is just to exit the
 * wait loop the first time it is woken up.
 */
static bool rb_wait_once(void *data)
{
	struct rb_wait_data *rdata = data;
	struct rb_irq_work *rbwork = rdata->irq_work;

	return atomic_read_acquire(&rbwork->seq) != rdata->seq;
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
		     ring_buffer_cond_fn cond, void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct wait_queue_head *waitq;
	struct rb_irq_work *rbwork;
	struct rb_wait_data rdata;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full)
		waitq = &rbwork->full_waiters;
	else
		waitq = &rbwork->waiters;

	/* Set up to exit loop as soon as it is woken */
	if (!cond) {
		cond = rb_wait_once;
		rdata.irq_work = rbwork;
		rdata.seq = atomic_read_acquire(&rbwork->seq);
		data = &rdata;
	}

	ret = wait_event_interruptible((*waitq),
				rb_wait_cond(rbwork, buffer, cpu, full, cond, data));

	return ret;
}
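
/*
 * Illustrative sketch (not part of the ring buffer code): a caller-supplied
 * condition for ring_buffer_wait(). The names below are hypothetical; only
 * the ring_buffer_cond_fn signature and ring_buffer_wait() itself come from
 * this code.
 *
 *	struct my_wait_state { atomic_t stop; };
 *
 *	static bool my_should_stop(void *data)
 *	{
 *		struct my_wait_state *s = data;
 *
 *		return atomic_read(&s->stop) != 0;
 *	}
 *
 *	// Wait for cpu 0 to be at least 50% full, or until the flag is set:
 *	// ring_buffer_wait(buffer, 0, 50, my_should_stop, &state);
 *
 * Passing a NULL @cond instead makes the wait return after the first wake
 * up, via rb_wait_once() above.
 */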

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return EPOLLERR;

		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &rbwork->full_waiters, poll_table);

		if (rb_watermark_hit(buffer, cpu, full))
			return EPOLLIN | EPOLLRDNORM;
		/*
		 * Only allow full_waiters_pending update to be seen after
		 * the shortest_full is set (in rb_watermark_hit). If the
		 * writer sees the full_waiters_pending flag set, it will
		 * compare the amount in the ring buffer to shortest_full.
		 * If the amount in the ring buffer is greater than the
		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_handler will reset shortest_full
		 * back to zero. That's done under the reader_lock, but
		 * the below smp_mb() makes sure that the update to
		 * full_waiters_pending doesn't leak up into the above.
		 */
		smp_mb();
		rbwork->full_waiters_pending = true;
		return 0;
	}

	poll_wait(filp, &rbwork->waiters, poll_table);
	rbwork->waiters_pending = true;

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content. But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that they are on, and they only need
 * to worry about interrupts. Reads can happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 *  head->list->prev->next	bit 1	       bit 0
 *				-------	       -------
 *  Normal page			0	       0
 *  Points to head page		0	       1
 *  New head page		1	       0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}
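
/*
 * Illustrative example (not part of the ring buffer code, address assumed):
 * because each buffer_page is cache-line aligned, the two low bits of a
 * ->next pointer are free to carry the flags above. If the page after the
 * reader's old page lives at 0xffff888012345680, then:
 *
 *	next = 0xffff888012345680 | RB_PAGE_HEAD;	// 0x...81, "points to head"
 *	rb_list_head((struct list_head *)next);		// masks back to 0x...80
 *
 * rb_is_head_page() reads those same two bits, and RB_PAGE_MOVED (bit 2) is
 * only ever used as a return value, never stored in a pointer.
 */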

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);

	if (cpu_buffer->ring_meta) {
		struct ring_buffer_meta *meta = cpu_buffer->ring_meta;
		meta->head_buffer = (unsigned long)head->page;
	}
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
				 struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * It can only increment when a commit takes place. But that
		 * only happens in the outermost nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Either we update tail_page or an interrupt does */
		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
			local_inc(&cpu_buffer->pages_touched);
	}
}

static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			   struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}
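
/*
 * Illustrative example (not part of the ring buffer code, numbers assumed):
 * how the split write field behaves in rb_tail_page_update(). The low 20
 * bits (RB_WRITE_MASK) hold the write index, the upper bits count
 * concurrent updaters. Suppose next_page->write currently reads 0x100
 * (index 256, no updaters). Then:
 *
 *	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
 *	// old_write == 0x100100: updater count 1, index still 0x100
 *
 *	val = old_write & ~RB_WRITE_MASK;	// 0x100000: keep the updater
 *						// count, zero the index
 *
 * The later local_cmpxchg() only succeeds if no interrupt bumped the field
 * again in between, which is the "counter of updaters" scheme described
 * where RB_WRITE_MASK is defined.
 */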

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 *
 * Callers of this function need to guarantee that the list of pages doesn't get
 * modified during the check. In particular, if it's possible that the function
 * is invoked with concurrent readers which can swap in a new reader page then
 * the caller should take cpu_buffer->reader_lock.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = rb_list_head(cpu_buffer->pages);
	struct list_head *tmp;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->next)->prev) != head))
		return;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->prev)->next) != head))
		return;

	for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
			return;

		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
			return;
	}
}

/*
 * Take an address, add the meta data size as well as the array of
 * subbuffer indexes, then align it to a subbuffer size.
 *
 * This is used to help find the next per cpu subbuffer within a mapped range.
 */
static unsigned long
rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs)
{
	addr += sizeof(struct ring_buffer_meta) +
		sizeof(int) * nr_subbufs;
	return ALIGN(addr, subbuf_size);
}

/*
 * Return the ring_buffer_meta for a given @cpu.
 */
static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu)
{
	int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
	unsigned long ptr = buffer->range_addr_start;
	struct ring_buffer_meta *meta;
	int nr_subbufs;

	if (!ptr)
		return NULL;

	/* When nr_pages passed in is zero, the first meta has already been initialized */
	if (!nr_pages) {
		meta = (struct ring_buffer_meta *)ptr;
		nr_subbufs = meta->nr_subbufs;
	} else {
		meta = NULL;
		/* Include the reader page */
		nr_subbufs = nr_pages + 1;
	}

	/*
	 * The first chunk may not be subbuffer aligned, whereas
	 * the rest of the chunks are.
	 */
	if (cpu) {
		ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
		ptr += subbuf_size * nr_subbufs;

		/* We can use multiplication to find chunks greater than 1 */
		if (cpu > 1) {
			unsigned long size;
			unsigned long p;

			/* Save the beginning of this CPU chunk */
			p = ptr;
			ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
			ptr += subbuf_size * nr_subbufs;

			/* Now all chunks after this are the same size */
			size = ptr - p;
			ptr += size * (cpu - 2);
		}
	}
	return (void *)ptr;
}

/* Return the start of subbufs given the meta pointer */
static void *rb_subbufs_from_meta(struct ring_buffer_meta *meta)
{
	int subbuf_size = meta->subbuf_size;
	unsigned long ptr;

	ptr = (unsigned long)meta;
	ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs);

	return (void *)ptr;
}
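
/*
 * Illustrative sketch (not part of the ring buffer code, numbers assumed):
 * with a 4K sub-buffer size and nr_subbufs = 9 (8 pages plus the reader
 * page), one per-CPU chunk in a boot-mapped range roughly looks like:
 *
 *	+--------------------+  <- chunk start
 *	| ring_buffer_meta   |
 *	| int buffers[9]     |
 *	+--------------------+  <- ALIGN(..., 4096), see rb_range_align_subbuf()
 *	| sub-buffer 0       |
 *	|        ...         |
 *	| sub-buffer 8       |
 *	+--------------------+  <- next CPU's chunk starts here
 *
 * rb_range_meta() walks chunk by chunk: CPU 0 may start unaligned, CPU 1 is
 * found by skipping CPU 0's chunk, and every chunk after that has the same
 * size, so it can be reached with a single multiplication.
 */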

/*
 * Return a specific sub-buffer for a given @cpu defined by @idx.
 */
static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx)
{
	struct ring_buffer_meta *meta;
	unsigned long ptr;
	int subbuf_size;

	meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu);
	if (!meta)
		return NULL;

	if (WARN_ON_ONCE(idx >= meta->nr_subbufs))
		return NULL;

	subbuf_size = meta->subbuf_size;

	/* Map this buffer to the order that's in meta->buffers[] */
	idx = meta->buffers[idx];

	ptr = (unsigned long)rb_subbufs_from_meta(meta);

	ptr += subbuf_size * idx;
	if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end)
		return NULL;

	return (void *)ptr;
}

/*
 * See if the existing memory contains valid ring buffer data.
 * As the previous kernel must be the same as this kernel, all
 * the calculations (size of buffers and number of buffers)
 * must be the same.
 */
static bool rb_meta_valid(struct ring_buffer_meta *meta, int cpu,
			  struct trace_buffer *buffer, int nr_pages)
{
	int subbuf_size = PAGE_SIZE;
	struct buffer_data_page *subbuf;
	unsigned long buffers_start;
	unsigned long buffers_end;
	int i;

	/* The subbuffer's size and number of subbuffers must match */
	if (meta->subbuf_size != subbuf_size ||
	    meta->nr_subbufs != nr_pages + 1) {
		pr_info("Ring buffer boot meta [%d] mismatch of subbuf_size/nr_pages\n", cpu);
		return false;
	}

	buffers_start = meta->first_buffer;
	buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs);

	/* Are the head and commit buffers within the range of buffers? */
	if (meta->head_buffer < buffers_start ||
	    meta->head_buffer >= buffers_end) {
		pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu);
		return false;
	}

	if (meta->commit_buffer < buffers_start ||
	    meta->commit_buffer >= buffers_end) {
		pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu);
		return false;
	}

	subbuf = rb_subbufs_from_meta(meta);

	/* Do the meta buffers and the subbufs themselves have correct data? */
	for (i = 0; i < meta->nr_subbufs; i++) {
		if (meta->buffers[i] < 0 ||
		    meta->buffers[i] >= meta->nr_subbufs) {
			pr_info("Ring buffer boot meta [%d] array out of range\n", cpu);
			return false;
		}

		if ((unsigned)local_read(&subbuf->commit) > subbuf_size) {
			pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu);
			return false;
		}

		subbuf = (void *)subbuf + subbuf_size;
	}

	return true;
}

static int rb_meta_subbuf_idx(struct ring_buffer_meta *meta, void *subbuf);

static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu,
			       unsigned long long *timestamp, u64 *delta_ptr)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int events = 0;
	int e;

	*delta_ptr = 0;
	*timestamp = 0;

	ts = dpage->time_stamp;

	for (e = 0; e < tail; e += rb_event_length(event)) {

		event = (struct ring_buffer_event *)(dpage->data + e);

		switch (event->type_len) {

		case RINGBUF_TYPE_TIME_EXTEND:
			delta = rb_event_time_stamp(event);
			ts += delta;
			break;

		case RINGBUF_TYPE_TIME_STAMP:
			delta = rb_event_time_stamp(event);
			delta = rb_fix_abs_ts(delta, ts);
			if (delta < ts) {
				*delta_ptr = delta;
				*timestamp = ts;
				return -1;
			}
			ts = delta;
			break;

		case RINGBUF_TYPE_PADDING:
			if (event->time_delta == 1)
				break;
			fallthrough;
		case RINGBUF_TYPE_DATA:
			events++;
			ts += event->time_delta;
			break;

		default:
			return -1;
		}
	}
	*timestamp = ts;
	return events;
}

static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu)
{
	unsigned long long ts;
	u64 delta;
	int tail;

	tail = local_read(&dpage->commit);
	return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta);
}

/* If the meta data has been validated, now validate the events */
static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_meta *meta = cpu_buffer->ring_meta;
	struct buffer_page *head_page;
	unsigned long entry_bytes = 0;
	unsigned long entries = 0;
	int ret;
	int i;

	if (!meta || !meta->head_buffer)
		return;

	/* Do the reader page first */
	ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu);
	if (ret < 0) {
		pr_info("Ring buffer reader page is invalid\n");
		goto invalid;
	}
	entries += ret;
	entry_bytes += local_read(&cpu_buffer->reader_page->page->commit);
	local_set(&cpu_buffer->reader_page->entries, ret);

	head_page = cpu_buffer->head_page;

	/* If both the head and commit are on the reader_page then we are done. */
	if (head_page == cpu_buffer->reader_page &&
	    head_page == cpu_buffer->commit_page)
		goto done;

	/* Iterate until finding the commit page */
	for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) {

		/* Reader page has already been done */
		if (head_page == cpu_buffer->reader_page)
			continue;

		ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu);
		if (ret < 0) {
			pr_info("Ring buffer meta [%d] invalid buffer page\n",
				cpu_buffer->cpu);
			goto invalid;
		}
		entries += ret;
		entry_bytes += local_read(&head_page->page->commit);
		local_set(&cpu_buffer->head_page->entries, ret);

		if (head_page == cpu_buffer->commit_page)
			break;
	}

	if (head_page != cpu_buffer->commit_page) {
		pr_info("Ring buffer meta [%d] commit page not found\n",
			cpu_buffer->cpu);
		goto invalid;
	}
 done:
	local_set(&cpu_buffer->entries, entries);
	local_set(&cpu_buffer->entries_bytes, entry_bytes);

	pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu);
	return;

 invalid:
	/* The content of the buffers are invalid, reset the meta data */
	meta->head_buffer = 0;
	meta->commit_buffer = 0;

	/* Reset the reader page */
	local_set(&cpu_buffer->reader_page->entries, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);

	/* Reset all the subbuffers */
	for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) {
		local_set(&head_page->entries, 0);
		local_set(&head_page->page->commit, 0);
	}
}

/* Used to calculate data delta */
static char rb_data_ptr[] = "";

#define THIS_TEXT_PTR		((unsigned long)rb_meta_init_text_addr)
#define THIS_DATA_PTR		((unsigned long)rb_data_ptr)

static void rb_meta_init_text_addr(struct ring_buffer_meta *meta)
{
	meta->text_addr = THIS_TEXT_PTR;
	meta->data_addr = THIS_DATA_PTR;
}

static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages)
{
	struct ring_buffer_meta *meta;
	unsigned long delta;
	void *subbuf;
	int cpu;
	int i;

	for (cpu = 0; cpu < nr_cpu_ids; cpu++) {
		void *next_meta;

		meta = rb_range_meta(buffer, nr_pages, cpu);

		if (rb_meta_valid(meta, cpu, buffer, nr_pages)) {
			/* Make the mappings match the current address */
			subbuf = rb_subbufs_from_meta(meta);
			delta = (unsigned long)subbuf - meta->first_buffer;
			meta->first_buffer += delta;
			meta->head_buffer += delta;
			meta->commit_buffer += delta;
			buffer->last_text_delta = THIS_TEXT_PTR - meta->text_addr;
			buffer->last_data_delta = THIS_DATA_PTR - meta->data_addr;
			continue;
		}

		if (cpu < nr_cpu_ids - 1)
			next_meta = rb_range_meta(buffer, nr_pages, cpu + 1);
		else
			next_meta = (void *)buffer->range_addr_end;

		memset(meta, 0, next_meta - (void *)meta);

		meta->nr_subbufs = nr_pages + 1;
		meta->subbuf_size = PAGE_SIZE;

		subbuf = rb_subbufs_from_meta(meta);

		meta->first_buffer = (unsigned long)subbuf;
		rb_meta_init_text_addr(meta);

		/*
		 * The buffers[] array holds the order of the sub-buffers
		 * that are after the meta data. The sub-buffers may
		 * be swapped out when read and inserted into a different
		 * location of the ring buffer. Although their addresses
		 * remain the same, the buffers[] array contains the
		 * index into the sub-buffers holding their actual order.
		 */
		for (i = 0; i < meta->nr_subbufs; i++) {
			meta->buffers[i] = i;
			rb_init_page(subbuf);
			subbuf += meta->subbuf_size;
		}
	}
}

static void *rbm_start(struct seq_file *m, loff_t *pos)
{
	struct ring_buffer_per_cpu *cpu_buffer = m->private;
	struct ring_buffer_meta *meta = cpu_buffer->ring_meta;
	unsigned long val;

	if (!meta)
		return NULL;

	if (*pos > meta->nr_subbufs)
		return NULL;

	val = *pos;
	val++;

	return (void *)val;
}

static void *rbm_next(struct seq_file *m, void *v, loff_t *pos)
{
	(*pos)++;

	return rbm_start(m, pos);
}

static int rbm_show(struct seq_file *m, void *v)
{
	struct ring_buffer_per_cpu *cpu_buffer = m->private;
	struct ring_buffer_meta *meta = cpu_buffer->ring_meta;
	unsigned long val = (unsigned long)v;

	if (val == 1) {
		seq_printf(m, "head_buffer: %d\n",
			   rb_meta_subbuf_idx(meta, (void *)meta->head_buffer));
		seq_printf(m, "commit_buffer: %d\n",
			   rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer));
		seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size);
		seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs);
		return 0;
	}

	val -= 2;
	seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]);

	return 0;
}

static void rbm_stop(struct seq_file *m, void *p)
{
}

static const struct seq_operations rb_meta_seq_ops = {
	.start	= rbm_start,
	.next	= rbm_next,
	.show	= rbm_show,
	.stop	= rbm_stop,
};

int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu)
{
	struct seq_file *m;
	int ret;

	ret = seq_open(file, &rb_meta_seq_ops);
	if (ret)
		return ret;

	m = file->private_data;
	m->private = buffer->buffers[cpu];

	return 0;
}

/* Map the buffer_pages to the previous head and commit pages */
static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer,
				  struct buffer_page *bpage)
{
	struct ring_buffer_meta *meta = cpu_buffer->ring_meta;

	if (meta->head_buffer == (unsigned long)bpage->page)
		cpu_buffer->head_page = bpage;

	if (meta->commit_buffer == (unsigned long)bpage->page) {
		cpu_buffer->commit_page = bpage;
		cpu_buffer->tail_page = bpage;
	}
}

static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			       long nr_pages, struct list_head *pages)
{
	struct trace_buffer *buffer = cpu_buffer->buffer;
	struct ring_buffer_meta *meta = NULL;
	struct buffer_page *bpage, *tmp;
	bool user_thread = current->mm != NULL;
	gfp_t mflags;
	long i;

	/*
	 * Check if the available memory is there first.
	 * Note, si_mem_available() only gives us a rough estimate of available
	 * memory. It may not be accurate. But we don't care, we just want
	 * to prevent doing any allocation when it is obvious that it is
	 * not going to succeed.
	 */
	i = si_mem_available();
	if (i < nr_pages)
		return -ENOMEM;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
1998 */ 1999 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 2000 2001 /* 2002 * If a user thread allocates too much, and si_mem_available() 2003 * reports there's enough memory, even though there is not. 2004 * Make sure the OOM killer kills this thread. This can happen 2005 * even with RETRY_MAYFAIL because another task may be doing 2006 * an allocation after this task has taken all memory. 2007 * This is the task the OOM killer needs to take out during this 2008 * loop, even if it was triggered by an allocation somewhere else. 2009 */ 2010 if (user_thread) 2011 set_current_oom_origin(); 2012 2013 if (buffer->range_addr_start) 2014 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2015 2016 for (i = 0; i < nr_pages; i++) { 2017 struct page *page; 2018 2019 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2020 mflags, cpu_to_node(cpu_buffer->cpu)); 2021 if (!bpage) 2022 goto free_pages; 2023 2024 rb_check_bpage(cpu_buffer, bpage); 2025 2026 /* 2027 * Append the pages as for mapped buffers we want to keep 2028 * the order 2029 */ 2030 list_add_tail(&bpage->list, pages); 2031 2032 if (meta) { 2033 /* A range was given. Use that for the buffer page */ 2034 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2035 if (!bpage->page) 2036 goto free_pages; 2037 /* If this is valid from a previous boot */ 2038 if (meta->head_buffer) 2039 rb_meta_buffer_update(cpu_buffer, bpage); 2040 bpage->range = 1; 2041 bpage->id = i + 1; 2042 } else { 2043 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), 2044 mflags | __GFP_COMP | __GFP_ZERO, 2045 cpu_buffer->buffer->subbuf_order); 2046 if (!page) 2047 goto free_pages; 2048 bpage->page = page_address(page); 2049 rb_init_page(bpage->page); 2050 } 2051 bpage->order = cpu_buffer->buffer->subbuf_order; 2052 2053 if (user_thread && fatal_signal_pending(current)) 2054 goto free_pages; 2055 } 2056 if (user_thread) 2057 clear_current_oom_origin(); 2058 2059 return 0; 2060 2061 free_pages: 2062 list_for_each_entry_safe(bpage, tmp, pages, list) { 2063 list_del_init(&bpage->list); 2064 free_buffer_page(bpage); 2065 } 2066 if (user_thread) 2067 clear_current_oom_origin(); 2068 2069 return -ENOMEM; 2070 } 2071 2072 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2073 unsigned long nr_pages) 2074 { 2075 LIST_HEAD(pages); 2076 2077 WARN_ON(!nr_pages); 2078 2079 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2080 return -ENOMEM; 2081 2082 /* 2083 * The ring buffer page list is a circular list that does not 2084 * start and end with a list head. All page list items point to 2085 * other pages. 
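 *
 * As a small illustration (page names A, B, C are invented): after
 * __rb_allocate_pages() has filled the temporary list head, it looks
 * like
 *
 *     pages -> A -> B -> C -> pages
 *
 * The assignment and list_del() below take A (pages.next) as
 * cpu_buffer->pages and unlink the temporary head, leaving the ring
 * A -> B -> C -> A with no list head in it.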
2086 */ 2087 cpu_buffer->pages = pages.next; 2088 list_del(&pages); 2089 2090 cpu_buffer->nr_pages = nr_pages; 2091 2092 rb_check_pages(cpu_buffer); 2093 2094 return 0; 2095 } 2096 2097 static struct ring_buffer_per_cpu * 2098 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2099 { 2100 struct ring_buffer_per_cpu *cpu_buffer; 2101 struct ring_buffer_meta *meta; 2102 struct buffer_page *bpage; 2103 struct page *page; 2104 int ret; 2105 2106 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 2107 GFP_KERNEL, cpu_to_node(cpu)); 2108 if (!cpu_buffer) 2109 return NULL; 2110 2111 cpu_buffer->cpu = cpu; 2112 cpu_buffer->buffer = buffer; 2113 raw_spin_lock_init(&cpu_buffer->reader_lock); 2114 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2115 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2116 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2117 init_completion(&cpu_buffer->update_done); 2118 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2119 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2120 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2121 mutex_init(&cpu_buffer->mapping_lock); 2122 2123 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2124 GFP_KERNEL, cpu_to_node(cpu)); 2125 if (!bpage) 2126 goto fail_free_buffer; 2127 2128 rb_check_bpage(cpu_buffer, bpage); 2129 2130 cpu_buffer->reader_page = bpage; 2131 2132 if (buffer->range_addr_start) { 2133 /* 2134 * Range mapped buffers have the same restrictions as memory 2135 * mapped ones do. 2136 */ 2137 cpu_buffer->mapped = 1; 2138 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2139 bpage->page = rb_range_buffer(cpu_buffer, 0); 2140 if (!bpage->page) 2141 goto fail_free_reader; 2142 if (cpu_buffer->ring_meta->head_buffer) 2143 rb_meta_buffer_update(cpu_buffer, bpage); 2144 bpage->range = 1; 2145 } else { 2146 page = alloc_pages_node(cpu_to_node(cpu), 2147 GFP_KERNEL | __GFP_COMP | __GFP_ZERO, 2148 cpu_buffer->buffer->subbuf_order); 2149 if (!page) 2150 goto fail_free_reader; 2151 bpage->page = page_address(page); 2152 rb_init_page(bpage->page); 2153 } 2154 2155 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2156 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2157 2158 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2159 if (ret < 0) 2160 goto fail_free_reader; 2161 2162 rb_meta_validate_events(cpu_buffer); 2163 2164 /* If the boot meta was valid then this has already been updated */ 2165 meta = cpu_buffer->ring_meta; 2166 if (!meta || !meta->head_buffer || 2167 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2168 if (meta && meta->head_buffer && 2169 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2170 pr_warn("Ring buffer meta buffers not all mapped\n"); 2171 if (!cpu_buffer->head_page) 2172 pr_warn(" Missing head_page\n"); 2173 if (!cpu_buffer->commit_page) 2174 pr_warn(" Missing commit_page\n"); 2175 if (!cpu_buffer->tail_page) 2176 pr_warn(" Missing tail_page\n"); 2177 } 2178 2179 cpu_buffer->head_page 2180 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2181 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2182 2183 rb_head_page_activate(cpu_buffer); 2184 2185 if (cpu_buffer->ring_meta) 2186 meta->commit_buffer = meta->head_buffer; 2187 } else { 2188 /* The valid meta buffer still needs to activate the head page */ 2189 rb_head_page_activate(cpu_buffer); 2190 } 2191 2192 return 
cpu_buffer; 2193 2194 fail_free_reader: 2195 free_buffer_page(cpu_buffer->reader_page); 2196 2197 fail_free_buffer: 2198 kfree(cpu_buffer); 2199 return NULL; 2200 } 2201 2202 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2203 { 2204 struct list_head *head = cpu_buffer->pages; 2205 struct buffer_page *bpage, *tmp; 2206 2207 irq_work_sync(&cpu_buffer->irq_work.work); 2208 2209 free_buffer_page(cpu_buffer->reader_page); 2210 2211 if (head) { 2212 rb_head_page_deactivate(cpu_buffer); 2213 2214 list_for_each_entry_safe(bpage, tmp, head, list) { 2215 list_del_init(&bpage->list); 2216 free_buffer_page(bpage); 2217 } 2218 bpage = list_entry(head, struct buffer_page, list); 2219 free_buffer_page(bpage); 2220 } 2221 2222 free_page((unsigned long)cpu_buffer->free_page); 2223 2224 kfree(cpu_buffer); 2225 } 2226 2227 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2228 int order, unsigned long start, 2229 unsigned long end, 2230 struct lock_class_key *key) 2231 { 2232 struct trace_buffer *buffer; 2233 long nr_pages; 2234 int subbuf_size; 2235 int bsize; 2236 int cpu; 2237 int ret; 2238 2239 /* keep it in its own cache line */ 2240 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2241 GFP_KERNEL); 2242 if (!buffer) 2243 return NULL; 2244 2245 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2246 goto fail_free_buffer; 2247 2248 buffer->subbuf_order = order; 2249 subbuf_size = (PAGE_SIZE << order); 2250 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2251 2252 /* Max payload is buffer page size - header (8bytes) */ 2253 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2254 2255 buffer->flags = flags; 2256 buffer->clock = trace_clock_local; 2257 buffer->reader_lock_key = key; 2258 2259 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2260 init_waitqueue_head(&buffer->irq_work.waiters); 2261 2262 buffer->cpus = nr_cpu_ids; 2263 2264 bsize = sizeof(void *) * nr_cpu_ids; 2265 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2266 GFP_KERNEL); 2267 if (!buffer->buffers) 2268 goto fail_free_cpumask; 2269 2270 /* If start/end are specified, then that overrides size */ 2271 if (start && end) { 2272 unsigned long ptr; 2273 int n; 2274 2275 size = end - start; 2276 size = size / nr_cpu_ids; 2277 2278 /* 2279 * The number of sub-buffers (nr_pages) is determined by the 2280 * total size allocated minus the meta data size. 2281 * Then that is divided by the number of per CPU buffers 2282 * needed, plus account for the integer array index that 2283 * will be appended to the meta data. 
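 *
 * A rough worked example (numbers are illustrative only): with a
 * 1 MB range per CPU, 4 KB sub-buffers and a 4-byte index entry per
 * sub-buffer, this works out to roughly
 *
 *     (1 MB - sizeof(struct ring_buffer_meta)) / (4096 + 4) ~= 255
 *
 * sub-buffers per CPU, before the alignment loop below trims the
 * count down so everything fits in the range.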
2284 */ 2285 nr_pages = (size - sizeof(struct ring_buffer_meta)) / 2286 (subbuf_size + sizeof(int)); 2287 /* Need at least two pages plus the reader page */ 2288 if (nr_pages < 3) 2289 goto fail_free_buffers; 2290 2291 again: 2292 /* Make sure that the size fits aligned */ 2293 for (n = 0, ptr = start; n < nr_cpu_ids; n++) { 2294 ptr += sizeof(struct ring_buffer_meta) + 2295 sizeof(int) * nr_pages; 2296 ptr = ALIGN(ptr, subbuf_size); 2297 ptr += subbuf_size * nr_pages; 2298 } 2299 if (ptr > end) { 2300 if (nr_pages <= 3) 2301 goto fail_free_buffers; 2302 nr_pages--; 2303 goto again; 2304 } 2305 2306 /* nr_pages should not count the reader page */ 2307 nr_pages--; 2308 buffer->range_addr_start = start; 2309 buffer->range_addr_end = end; 2310 2311 rb_range_meta_init(buffer, nr_pages); 2312 } else { 2313 2314 /* need at least two pages */ 2315 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2316 if (nr_pages < 2) 2317 nr_pages = 2; 2318 } 2319 2320 cpu = raw_smp_processor_id(); 2321 cpumask_set_cpu(cpu, buffer->cpumask); 2322 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2323 if (!buffer->buffers[cpu]) 2324 goto fail_free_buffers; 2325 2326 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2327 if (ret < 0) 2328 goto fail_free_buffers; 2329 2330 mutex_init(&buffer->mutex); 2331 2332 return buffer; 2333 2334 fail_free_buffers: 2335 for_each_buffer_cpu(buffer, cpu) { 2336 if (buffer->buffers[cpu]) 2337 rb_free_cpu_buffer(buffer->buffers[cpu]); 2338 } 2339 kfree(buffer->buffers); 2340 2341 fail_free_cpumask: 2342 free_cpumask_var(buffer->cpumask); 2343 2344 fail_free_buffer: 2345 kfree(buffer); 2346 return NULL; 2347 } 2348 2349 /** 2350 * __ring_buffer_alloc - allocate a new ring_buffer 2351 * @size: the size in bytes per cpu that is needed. 2352 * @flags: attributes to set for the ring buffer. 2353 * @key: ring buffer reader_lock_key. 2354 * 2355 * Currently the only flag that is available is the RB_FL_OVERWRITE 2356 * flag. This flag means that the buffer will overwrite old data 2357 * when the buffer wraps. If this flag is not set, the buffer will 2358 * drop data when the tail hits the head. 2359 */ 2360 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 2361 struct lock_class_key *key) 2362 { 2363 /* Default buffer page size - one system page */ 2364 return alloc_buffer(size, flags, 0, 0, 0,key); 2365 2366 } 2367 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 2368 2369 /** 2370 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory 2371 * @size: the size in bytes per cpu that is needed. 2372 * @flags: attributes to set for the ring buffer. 2373 * @start: start of allocated range 2374 * @range_size: size of allocated range 2375 * @order: sub-buffer order 2376 * @key: ring buffer reader_lock_key. 2377 * 2378 * Currently the only flag that is available is the RB_FL_OVERWRITE 2379 * flag. This flag means that the buffer will overwrite old data 2380 * when the buffer wraps. If this flag is not set, the buffer will 2381 * drop data when the tail hits the head. 
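 *
 * An illustrative call (the reserved-memory region and the lock key
 * are made up for the example):
 *
 *	static struct lock_class_key example_key;
 *
 *	buf = __ring_buffer_alloc_range(0, RB_FL_OVERWRITE, 0,
 *					reserved_start, reserved_size,
 *					&example_key);
 *
 * When @start and @range_size are given, the per-CPU size is derived
 * from the range rather than from @size.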
2382 */ 2383 struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags, 2384 int order, unsigned long start, 2385 unsigned long range_size, 2386 struct lock_class_key *key) 2387 { 2388 return alloc_buffer(size, flags, order, start, start + range_size, key); 2389 } 2390 2391 /** 2392 * ring_buffer_last_boot_delta - return the delta offset from last boot 2393 * @buffer: The buffer to return the delta from 2394 * @text: Return text delta 2395 * @data: Return data delta 2396 * 2397 * Returns: The true if the delta is non zero 2398 */ 2399 bool ring_buffer_last_boot_delta(struct trace_buffer *buffer, long *text, 2400 long *data) 2401 { 2402 if (!buffer) 2403 return false; 2404 2405 if (!buffer->last_text_delta) 2406 return false; 2407 2408 *text = buffer->last_text_delta; 2409 *data = buffer->last_data_delta; 2410 2411 return true; 2412 } 2413 2414 /** 2415 * ring_buffer_free - free a ring buffer. 2416 * @buffer: the buffer to free. 2417 */ 2418 void 2419 ring_buffer_free(struct trace_buffer *buffer) 2420 { 2421 int cpu; 2422 2423 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2424 2425 irq_work_sync(&buffer->irq_work.work); 2426 2427 for_each_buffer_cpu(buffer, cpu) 2428 rb_free_cpu_buffer(buffer->buffers[cpu]); 2429 2430 kfree(buffer->buffers); 2431 free_cpumask_var(buffer->cpumask); 2432 2433 kfree(buffer); 2434 } 2435 EXPORT_SYMBOL_GPL(ring_buffer_free); 2436 2437 void ring_buffer_set_clock(struct trace_buffer *buffer, 2438 u64 (*clock)(void)) 2439 { 2440 buffer->clock = clock; 2441 } 2442 2443 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2444 { 2445 buffer->time_stamp_abs = abs; 2446 } 2447 2448 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2449 { 2450 return buffer->time_stamp_abs; 2451 } 2452 2453 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2454 { 2455 return local_read(&bpage->entries) & RB_WRITE_MASK; 2456 } 2457 2458 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2459 { 2460 return local_read(&bpage->write) & RB_WRITE_MASK; 2461 } 2462 2463 static bool 2464 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 2465 { 2466 struct list_head *tail_page, *to_remove, *next_page; 2467 struct buffer_page *to_remove_page, *tmp_iter_page; 2468 struct buffer_page *last_page, *first_page; 2469 unsigned long nr_removed; 2470 unsigned long head_bit; 2471 int page_entries; 2472 2473 head_bit = 0; 2474 2475 raw_spin_lock_irq(&cpu_buffer->reader_lock); 2476 atomic_inc(&cpu_buffer->record_disabled); 2477 /* 2478 * We don't race with the readers since we have acquired the reader 2479 * lock. We also don't race with writers after disabling recording. 2480 * This makes it easy to figure out the first and the last page to be 2481 * removed from the list. We unlink all the pages in between including 2482 * the first and last pages. This is done in a busy loop so that we 2483 * lose the least number of traces. 2484 * The pages are freed after we restart recording and unlock readers. 
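 *
 * As a small illustration (page names invented): with the ring
 * A -> B -> C -> D, the tail on A and nr_pages == 2, pages B and C
 * are unlinked so the ring becomes A -> D, and B and C are freed
 * once recording is re-enabled and the reader lock is dropped.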
2485 */ 2486 tail_page = &cpu_buffer->tail_page->list; 2487 2488 /* 2489 * tail page might be on reader page, we remove the next page 2490 * from the ring buffer 2491 */ 2492 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 2493 tail_page = rb_list_head(tail_page->next); 2494 to_remove = tail_page; 2495 2496 /* start of pages to remove */ 2497 first_page = list_entry(rb_list_head(to_remove->next), 2498 struct buffer_page, list); 2499 2500 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 2501 to_remove = rb_list_head(to_remove)->next; 2502 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 2503 } 2504 /* Read iterators need to reset themselves when some pages removed */ 2505 cpu_buffer->pages_removed += nr_removed; 2506 2507 next_page = rb_list_head(to_remove)->next; 2508 2509 /* 2510 * Now we remove all pages between tail_page and next_page. 2511 * Make sure that we have head_bit value preserved for the 2512 * next page 2513 */ 2514 tail_page->next = (struct list_head *)((unsigned long)next_page | 2515 head_bit); 2516 next_page = rb_list_head(next_page); 2517 next_page->prev = tail_page; 2518 2519 /* make sure pages points to a valid page in the ring buffer */ 2520 cpu_buffer->pages = next_page; 2521 2522 /* update head page */ 2523 if (head_bit) 2524 cpu_buffer->head_page = list_entry(next_page, 2525 struct buffer_page, list); 2526 2527 /* pages are removed, resume tracing and then free the pages */ 2528 atomic_dec(&cpu_buffer->record_disabled); 2529 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 2530 2531 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 2532 2533 /* last buffer page to remove */ 2534 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 2535 list); 2536 tmp_iter_page = first_page; 2537 2538 do { 2539 cond_resched(); 2540 2541 to_remove_page = tmp_iter_page; 2542 rb_inc_page(&tmp_iter_page); 2543 2544 /* update the counters */ 2545 page_entries = rb_page_entries(to_remove_page); 2546 if (page_entries) { 2547 /* 2548 * If something was added to this page, it was full 2549 * since it is not the tail page. So we deduct the 2550 * bytes consumed in ring buffer from here. 2551 * Increment overrun to account for the lost events. 2552 */ 2553 local_add(page_entries, &cpu_buffer->overrun); 2554 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 2555 local_inc(&cpu_buffer->pages_lost); 2556 } 2557 2558 /* 2559 * We have already removed references to this list item, just 2560 * free up the buffer_page and its page 2561 */ 2562 free_buffer_page(to_remove_page); 2563 nr_removed--; 2564 2565 } while (to_remove_page != last_page); 2566 2567 RB_WARN_ON(cpu_buffer, nr_removed); 2568 2569 return nr_removed == 0; 2570 } 2571 2572 static bool 2573 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2574 { 2575 struct list_head *pages = &cpu_buffer->new_pages; 2576 unsigned long flags; 2577 bool success; 2578 int retries; 2579 2580 /* Can be called at early boot up, where interrupts must not been enabled */ 2581 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2582 /* 2583 * We are holding the reader lock, so the reader page won't be swapped 2584 * in the ring buffer. Now we are racing with the writer trying to 2585 * move head page and the tail page. 2586 * We are going to adapt the reader page update process where: 2587 * 1. We first splice the start and end of list of new pages between 2588 * the head page and its previous page. 2589 * 2. 
We cmpxchg the prev_page->next to point from head page to the 2590 * start of new pages list. 2591 * 3. Finally, we update the head->prev to the end of new list. 2592 * 2593 * We will try this process 10 times, to make sure that we don't keep 2594 * spinning. 2595 */ 2596 retries = 10; 2597 success = false; 2598 while (retries--) { 2599 struct list_head *head_page, *prev_page; 2600 struct list_head *last_page, *first_page; 2601 struct list_head *head_page_with_bit; 2602 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2603 2604 if (!hpage) 2605 break; 2606 head_page = &hpage->list; 2607 prev_page = head_page->prev; 2608 2609 first_page = pages->next; 2610 last_page = pages->prev; 2611 2612 head_page_with_bit = (struct list_head *) 2613 ((unsigned long)head_page | RB_PAGE_HEAD); 2614 2615 last_page->next = head_page_with_bit; 2616 first_page->prev = prev_page; 2617 2618 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 2619 if (try_cmpxchg(&prev_page->next, 2620 &head_page_with_bit, first_page)) { 2621 /* 2622 * yay, we replaced the page pointer to our new list, 2623 * now, we just have to update to head page's prev 2624 * pointer to point to end of list 2625 */ 2626 head_page->prev = last_page; 2627 success = true; 2628 break; 2629 } 2630 } 2631 2632 if (success) 2633 INIT_LIST_HEAD(pages); 2634 /* 2635 * If we weren't successful in adding in new pages, warn and stop 2636 * tracing 2637 */ 2638 RB_WARN_ON(cpu_buffer, !success); 2639 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2640 2641 /* free pages if they weren't inserted */ 2642 if (!success) { 2643 struct buffer_page *bpage, *tmp; 2644 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2645 list) { 2646 list_del_init(&bpage->list); 2647 free_buffer_page(bpage); 2648 } 2649 } 2650 return success; 2651 } 2652 2653 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2654 { 2655 bool success; 2656 2657 if (cpu_buffer->nr_pages_to_update > 0) 2658 success = rb_insert_pages(cpu_buffer); 2659 else 2660 success = rb_remove_pages(cpu_buffer, 2661 -cpu_buffer->nr_pages_to_update); 2662 2663 if (success) 2664 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2665 } 2666 2667 static void update_pages_handler(struct work_struct *work) 2668 { 2669 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2670 struct ring_buffer_per_cpu, update_pages_work); 2671 rb_update_pages(cpu_buffer); 2672 complete(&cpu_buffer->update_done); 2673 } 2674 2675 /** 2676 * ring_buffer_resize - resize the ring buffer 2677 * @buffer: the buffer to resize. 2678 * @size: the new size. 2679 * @cpu_id: the cpu buffer to resize 2680 * 2681 * Minimum size is 2 * buffer->subbuf_size. 2682 * 2683 * Returns 0 on success and < 0 on failure. 
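 *
 * An illustrative call (the size is an example only):
 *
 *	ret = ring_buffer_resize(buffer, SZ_1M, RING_BUFFER_ALL_CPUS);
 *
 * asks for roughly 1 MB worth of sub-buffers on every CPU; the size
 * is rounded up to whole sub-buffers, with a minimum of two.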
2684 */ 2685 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2686 int cpu_id) 2687 { 2688 struct ring_buffer_per_cpu *cpu_buffer; 2689 unsigned long nr_pages; 2690 int cpu, err; 2691 2692 /* 2693 * Always succeed at resizing a non-existent buffer: 2694 */ 2695 if (!buffer) 2696 return 0; 2697 2698 /* Make sure the requested buffer exists */ 2699 if (cpu_id != RING_BUFFER_ALL_CPUS && 2700 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2701 return 0; 2702 2703 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2704 2705 /* we need a minimum of two pages */ 2706 if (nr_pages < 2) 2707 nr_pages = 2; 2708 2709 /* prevent another thread from changing buffer sizes */ 2710 mutex_lock(&buffer->mutex); 2711 atomic_inc(&buffer->resizing); 2712 2713 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2714 /* 2715 * Don't succeed if resizing is disabled, as a reader might be 2716 * manipulating the ring buffer and is expecting a sane state while 2717 * this is true. 2718 */ 2719 for_each_buffer_cpu(buffer, cpu) { 2720 cpu_buffer = buffer->buffers[cpu]; 2721 if (atomic_read(&cpu_buffer->resize_disabled)) { 2722 err = -EBUSY; 2723 goto out_err_unlock; 2724 } 2725 } 2726 2727 /* calculate the pages to update */ 2728 for_each_buffer_cpu(buffer, cpu) { 2729 cpu_buffer = buffer->buffers[cpu]; 2730 2731 cpu_buffer->nr_pages_to_update = nr_pages - 2732 cpu_buffer->nr_pages; 2733 /* 2734 * nothing more to do for removing pages or no update 2735 */ 2736 if (cpu_buffer->nr_pages_to_update <= 0) 2737 continue; 2738 /* 2739 * to add pages, make sure all new pages can be 2740 * allocated without receiving ENOMEM 2741 */ 2742 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2743 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2744 &cpu_buffer->new_pages)) { 2745 /* not enough memory for new pages */ 2746 err = -ENOMEM; 2747 goto out_err; 2748 } 2749 2750 cond_resched(); 2751 } 2752 2753 cpus_read_lock(); 2754 /* 2755 * Fire off all the required work handlers 2756 * We can't schedule on offline CPUs, but it's not necessary 2757 * since we can change their buffer sizes without any race. 2758 */ 2759 for_each_buffer_cpu(buffer, cpu) { 2760 cpu_buffer = buffer->buffers[cpu]; 2761 if (!cpu_buffer->nr_pages_to_update) 2762 continue; 2763 2764 /* Can't run something on an offline CPU. */ 2765 if (!cpu_online(cpu)) { 2766 rb_update_pages(cpu_buffer); 2767 cpu_buffer->nr_pages_to_update = 0; 2768 } else { 2769 /* Run directly if possible. */ 2770 migrate_disable(); 2771 if (cpu != smp_processor_id()) { 2772 migrate_enable(); 2773 schedule_work_on(cpu, 2774 &cpu_buffer->update_pages_work); 2775 } else { 2776 update_pages_handler(&cpu_buffer->update_pages_work); 2777 migrate_enable(); 2778 } 2779 } 2780 } 2781 2782 /* wait for all the updates to complete */ 2783 for_each_buffer_cpu(buffer, cpu) { 2784 cpu_buffer = buffer->buffers[cpu]; 2785 if (!cpu_buffer->nr_pages_to_update) 2786 continue; 2787 2788 if (cpu_online(cpu)) 2789 wait_for_completion(&cpu_buffer->update_done); 2790 cpu_buffer->nr_pages_to_update = 0; 2791 } 2792 2793 cpus_read_unlock(); 2794 } else { 2795 cpu_buffer = buffer->buffers[cpu_id]; 2796 2797 if (nr_pages == cpu_buffer->nr_pages) 2798 goto out; 2799 2800 /* 2801 * Don't succeed if resizing is disabled, as a reader might be 2802 * manipulating the ring buffer and is expecting a sane state while 2803 * this is true. 
2804 */ 2805 if (atomic_read(&cpu_buffer->resize_disabled)) { 2806 err = -EBUSY; 2807 goto out_err_unlock; 2808 } 2809 2810 cpu_buffer->nr_pages_to_update = nr_pages - 2811 cpu_buffer->nr_pages; 2812 2813 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2814 if (cpu_buffer->nr_pages_to_update > 0 && 2815 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2816 &cpu_buffer->new_pages)) { 2817 err = -ENOMEM; 2818 goto out_err; 2819 } 2820 2821 cpus_read_lock(); 2822 2823 /* Can't run something on an offline CPU. */ 2824 if (!cpu_online(cpu_id)) 2825 rb_update_pages(cpu_buffer); 2826 else { 2827 /* Run directly if possible. */ 2828 migrate_disable(); 2829 if (cpu_id == smp_processor_id()) { 2830 rb_update_pages(cpu_buffer); 2831 migrate_enable(); 2832 } else { 2833 migrate_enable(); 2834 schedule_work_on(cpu_id, 2835 &cpu_buffer->update_pages_work); 2836 wait_for_completion(&cpu_buffer->update_done); 2837 } 2838 } 2839 2840 cpu_buffer->nr_pages_to_update = 0; 2841 cpus_read_unlock(); 2842 } 2843 2844 out: 2845 /* 2846 * The ring buffer resize can happen with the ring buffer 2847 * enabled, so that the update disturbs the tracing as little 2848 * as possible. But if the buffer is disabled, we do not need 2849 * to worry about that, and we can take the time to verify 2850 * that the buffer is not corrupt. 2851 */ 2852 if (atomic_read(&buffer->record_disabled)) { 2853 atomic_inc(&buffer->record_disabled); 2854 /* 2855 * Even though the buffer was disabled, we must make sure 2856 * that it is truly disabled before calling rb_check_pages. 2857 * There could have been a race between checking 2858 * record_disable and incrementing it. 2859 */ 2860 synchronize_rcu(); 2861 for_each_buffer_cpu(buffer, cpu) { 2862 unsigned long flags; 2863 2864 cpu_buffer = buffer->buffers[cpu]; 2865 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2866 rb_check_pages(cpu_buffer); 2867 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2868 } 2869 atomic_dec(&buffer->record_disabled); 2870 } 2871 2872 atomic_dec(&buffer->resizing); 2873 mutex_unlock(&buffer->mutex); 2874 return 0; 2875 2876 out_err: 2877 for_each_buffer_cpu(buffer, cpu) { 2878 struct buffer_page *bpage, *tmp; 2879 2880 cpu_buffer = buffer->buffers[cpu]; 2881 cpu_buffer->nr_pages_to_update = 0; 2882 2883 if (list_empty(&cpu_buffer->new_pages)) 2884 continue; 2885 2886 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2887 list) { 2888 list_del_init(&bpage->list); 2889 free_buffer_page(bpage); 2890 } 2891 } 2892 out_err_unlock: 2893 atomic_dec(&buffer->resizing); 2894 mutex_unlock(&buffer->mutex); 2895 return err; 2896 } 2897 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2898 2899 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2900 { 2901 mutex_lock(&buffer->mutex); 2902 if (val) 2903 buffer->flags |= RB_FL_OVERWRITE; 2904 else 2905 buffer->flags &= ~RB_FL_OVERWRITE; 2906 mutex_unlock(&buffer->mutex); 2907 } 2908 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2909 2910 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2911 { 2912 return bpage->page->data + index; 2913 } 2914 2915 static __always_inline struct ring_buffer_event * 2916 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2917 { 2918 return __rb_page_index(cpu_buffer->reader_page, 2919 cpu_buffer->reader_page->read); 2920 } 2921 2922 static struct ring_buffer_event * 2923 rb_iter_head_event(struct ring_buffer_iter *iter) 2924 { 2925 struct ring_buffer_event *event; 2926 struct buffer_page 
*iter_head_page = iter->head_page; 2927 unsigned long commit; 2928 unsigned length; 2929 2930 if (iter->head != iter->next_event) 2931 return iter->event; 2932 2933 /* 2934 * When the writer goes across pages, it issues a cmpxchg which 2935 * is a mb(), which will synchronize with the rmb here. 2936 * (see rb_tail_page_update() and __rb_reserve_next()) 2937 */ 2938 commit = rb_page_commit(iter_head_page); 2939 smp_rmb(); 2940 2941 /* An event needs to be at least 8 bytes in size */ 2942 if (iter->head > commit - 8) 2943 goto reset; 2944 2945 event = __rb_page_index(iter_head_page, iter->head); 2946 length = rb_event_length(event); 2947 2948 /* 2949 * READ_ONCE() doesn't work on functions and we don't want the 2950 * compiler doing any crazy optimizations with length. 2951 */ 2952 barrier(); 2953 2954 if ((iter->head + length) > commit || length > iter->event_size) 2955 /* Writer corrupted the read? */ 2956 goto reset; 2957 2958 memcpy(iter->event, event, length); 2959 /* 2960 * If the page stamp is still the same after this rmb() then the 2961 * event was safely copied without the writer entering the page. 2962 */ 2963 smp_rmb(); 2964 2965 /* Make sure the page didn't change since we read this */ 2966 if (iter->page_stamp != iter_head_page->page->time_stamp || 2967 commit > rb_page_commit(iter_head_page)) 2968 goto reset; 2969 2970 iter->next_event = iter->head + length; 2971 return iter->event; 2972 reset: 2973 /* Reset to the beginning */ 2974 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2975 iter->head = 0; 2976 iter->next_event = 0; 2977 iter->missed_events = 1; 2978 return NULL; 2979 } 2980 2981 /* Size is determined by what has been committed */ 2982 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 2983 { 2984 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 2985 } 2986 2987 static __always_inline unsigned 2988 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 2989 { 2990 return rb_page_commit(cpu_buffer->commit_page); 2991 } 2992 2993 static __always_inline unsigned 2994 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 2995 { 2996 unsigned long addr = (unsigned long)event; 2997 2998 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 2999 3000 return addr - BUF_PAGE_HDR_SIZE; 3001 } 3002 3003 static void rb_inc_iter(struct ring_buffer_iter *iter) 3004 { 3005 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3006 3007 /* 3008 * The iterator could be on the reader page (it starts there). 3009 * But the head could have moved, since the reader was 3010 * found. Check for this case and assign the iterator 3011 * to the head page instead of next. 
3012 */ 3013 if (iter->head_page == cpu_buffer->reader_page) 3014 iter->head_page = rb_set_head_page(cpu_buffer); 3015 else 3016 rb_inc_page(&iter->head_page); 3017 3018 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3019 iter->head = 0; 3020 iter->next_event = 0; 3021 } 3022 3023 /* Return the index into the sub-buffers for a given sub-buffer */ 3024 static int rb_meta_subbuf_idx(struct ring_buffer_meta *meta, void *subbuf) 3025 { 3026 void *subbuf_array; 3027 3028 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3029 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3030 return (subbuf - subbuf_array) / meta->subbuf_size; 3031 } 3032 3033 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3034 struct buffer_page *next_page) 3035 { 3036 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 3037 unsigned long old_head = (unsigned long)next_page->page; 3038 unsigned long new_head; 3039 3040 rb_inc_page(&next_page); 3041 new_head = (unsigned long)next_page->page; 3042 3043 /* 3044 * Only move it forward once, if something else came in and 3045 * moved it forward, then we don't want to touch it. 3046 */ 3047 (void)cmpxchg(&meta->head_buffer, old_head, new_head); 3048 } 3049 3050 static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer, 3051 struct buffer_page *reader) 3052 { 3053 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 3054 void *old_reader = cpu_buffer->reader_page->page; 3055 void *new_reader = reader->page; 3056 int id; 3057 3058 id = reader->id; 3059 cpu_buffer->reader_page->id = id; 3060 reader->id = 0; 3061 3062 meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader); 3063 meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader); 3064 3065 /* The head pointer is the one after the reader */ 3066 rb_update_meta_head(cpu_buffer, reader); 3067 } 3068 3069 /* 3070 * rb_handle_head_page - writer hit the head page 3071 * 3072 * Returns: +1 to retry page 3073 * 0 to continue 3074 * -1 on error 3075 */ 3076 static int 3077 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 3078 struct buffer_page *tail_page, 3079 struct buffer_page *next_page) 3080 { 3081 struct buffer_page *new_head; 3082 int entries; 3083 int type; 3084 int ret; 3085 3086 entries = rb_page_entries(next_page); 3087 3088 /* 3089 * The hard part is here. We need to move the head 3090 * forward, and protect against both readers on 3091 * other CPUs and writers coming in via interrupts. 3092 */ 3093 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 3094 RB_PAGE_HEAD); 3095 3096 /* 3097 * type can be one of four: 3098 * NORMAL - an interrupt already moved it for us 3099 * HEAD - we are the first to get here. 3100 * UPDATE - we are the interrupt interrupting 3101 * a current move. 3102 * MOVED - a reader on another CPU moved the next 3103 * pointer to its reader page. Give up 3104 * and try again. 3105 */ 3106 3107 switch (type) { 3108 case RB_PAGE_HEAD: 3109 /* 3110 * We changed the head to UPDATE, thus 3111 * it is our responsibility to update 3112 * the counters. 3113 */ 3114 local_add(entries, &cpu_buffer->overrun); 3115 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 3116 local_inc(&cpu_buffer->pages_lost); 3117 3118 if (cpu_buffer->ring_meta) 3119 rb_update_meta_head(cpu_buffer, next_page); 3120 /* 3121 * The entries will be zeroed out when we move the 3122 * tail page. 
3123 */ 3124 3125 /* still more to do */ 3126 break; 3127 3128 case RB_PAGE_UPDATE: 3129 /* 3130 * This is an interrupt that interrupted the 3131 * previous update. Still more to do. 3132 */ 3133 break; 3134 case RB_PAGE_NORMAL: 3135 /* 3136 * An interrupt came in before the update 3137 * and processed this for us. 3138 * Nothing left to do. 3139 */ 3140 return 1; 3141 case RB_PAGE_MOVED: 3142 /* 3143 * The reader is on another CPU and just did 3144 * a swap with our next_page. 3145 * Try again. 3146 */ 3147 return 1; 3148 default: 3149 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 3150 return -1; 3151 } 3152 3153 /* 3154 * Now that we are here, the old head pointer is 3155 * set to UPDATE. This will keep the reader from 3156 * swapping the head page with the reader page. 3157 * The reader (on another CPU) will spin till 3158 * we are finished. 3159 * 3160 * We just need to protect against interrupts 3161 * doing the job. We will set the next pointer 3162 * to HEAD. After that, we set the old pointer 3163 * to NORMAL, but only if it was HEAD before. 3164 * Otherwise we are an interrupt, and only 3165 * want the outer most commit to reset it. 3166 */ 3167 new_head = next_page; 3168 rb_inc_page(&new_head); 3169 3170 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 3171 RB_PAGE_NORMAL); 3172 3173 /* 3174 * Valid returns are: 3175 * HEAD - an interrupt came in and already set it. 3176 * NORMAL - One of two things: 3177 * 1) We really set it. 3178 * 2) A bunch of interrupts came in and moved 3179 * the page forward again. 3180 */ 3181 switch (ret) { 3182 case RB_PAGE_HEAD: 3183 case RB_PAGE_NORMAL: 3184 /* OK */ 3185 break; 3186 default: 3187 RB_WARN_ON(cpu_buffer, 1); 3188 return -1; 3189 } 3190 3191 /* 3192 * It is possible that an interrupt came in, 3193 * set the head up, then more interrupts came in 3194 * and moved it again. When we get back here, 3195 * the page would have been set to NORMAL but we 3196 * just set it back to HEAD. 3197 * 3198 * How do you detect this? Well, if that happened 3199 * the tail page would have moved. 3200 */ 3201 if (ret == RB_PAGE_NORMAL) { 3202 struct buffer_page *buffer_tail_page; 3203 3204 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 3205 /* 3206 * If the tail had moved past next, then we need 3207 * to reset the pointer. 3208 */ 3209 if (buffer_tail_page != tail_page && 3210 buffer_tail_page != next_page) 3211 rb_head_page_set_normal(cpu_buffer, new_head, 3212 next_page, 3213 RB_PAGE_HEAD); 3214 } 3215 3216 /* 3217 * If this was the outer most commit (the one that 3218 * changed the original pointer from HEAD to UPDATE), 3219 * then it is up to us to reset it to NORMAL. 3220 */ 3221 if (type == RB_PAGE_HEAD) { 3222 ret = rb_head_page_set_normal(cpu_buffer, next_page, 3223 tail_page, 3224 RB_PAGE_UPDATE); 3225 if (RB_WARN_ON(cpu_buffer, 3226 ret != RB_PAGE_UPDATE)) 3227 return -1; 3228 } 3229 3230 return 0; 3231 } 3232 3233 static inline void 3234 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 3235 unsigned long tail, struct rb_event_info *info) 3236 { 3237 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 3238 struct buffer_page *tail_page = info->tail_page; 3239 struct ring_buffer_event *event; 3240 unsigned long length = info->length; 3241 3242 /* 3243 * Only the event that crossed the page boundary 3244 * must fill the old tail_page with padding. 3245 */ 3246 if (tail >= bsize) { 3247 /* 3248 * If the page was filled, then we still need 3249 * to update the real_end.
Reset it to zero 3250 * and the reader will ignore it. 3251 */ 3252 if (tail == bsize) 3253 tail_page->real_end = 0; 3254 3255 local_sub(length, &tail_page->write); 3256 return; 3257 } 3258 3259 event = __rb_page_index(tail_page, tail); 3260 3261 /* 3262 * Save the original length to the meta data. 3263 * This will be used by the reader to add lost event 3264 * counter. 3265 */ 3266 tail_page->real_end = tail; 3267 3268 /* 3269 * If this event is bigger than the minimum size, then 3270 * we need to be careful that we don't subtract the 3271 * write counter enough to allow another writer to slip 3272 * in on this page. 3273 * We put in a discarded commit instead, to make sure 3274 * that this space is not used again, and this space will 3275 * not be accounted into 'entries_bytes'. 3276 * 3277 * If we are less than the minimum size, we don't need to 3278 * worry about it. 3279 */ 3280 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 3281 /* No room for any events */ 3282 3283 /* Mark the rest of the page with padding */ 3284 rb_event_set_padding(event); 3285 3286 /* Make sure the padding is visible before the write update */ 3287 smp_wmb(); 3288 3289 /* Set the write back to the previous setting */ 3290 local_sub(length, &tail_page->write); 3291 return; 3292 } 3293 3294 /* Put in a discarded event */ 3295 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 3296 event->type_len = RINGBUF_TYPE_PADDING; 3297 /* time delta must be non zero */ 3298 event->time_delta = 1; 3299 3300 /* account for padding bytes */ 3301 local_add(bsize - tail, &cpu_buffer->entries_bytes); 3302 3303 /* Make sure the padding is visible before the tail_page->write update */ 3304 smp_wmb(); 3305 3306 /* Set write to end of buffer */ 3307 length = (tail + length) - bsize; 3308 local_sub(length, &tail_page->write); 3309 } 3310 3311 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 3312 3313 /* 3314 * This is the slow path, force gcc not to inline it. 3315 */ 3316 static noinline struct ring_buffer_event * 3317 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3318 unsigned long tail, struct rb_event_info *info) 3319 { 3320 struct buffer_page *tail_page = info->tail_page; 3321 struct buffer_page *commit_page = cpu_buffer->commit_page; 3322 struct trace_buffer *buffer = cpu_buffer->buffer; 3323 struct buffer_page *next_page; 3324 int ret; 3325 3326 next_page = tail_page; 3327 3328 rb_inc_page(&next_page); 3329 3330 /* 3331 * If for some reason, we had an interrupt storm that made 3332 * it all the way around the buffer, bail, and warn 3333 * about it. 3334 */ 3335 if (unlikely(next_page == commit_page)) { 3336 local_inc(&cpu_buffer->commit_overrun); 3337 goto out_reset; 3338 } 3339 3340 /* 3341 * This is where the fun begins! 3342 * 3343 * We are fighting against races between a reader that 3344 * could be on another CPU trying to swap its reader 3345 * page with the buffer head. 3346 * 3347 * We are also fighting against interrupts coming in and 3348 * moving the head or tail on us as well. 3349 * 3350 * If the next page is the head page then we have filled 3351 * the buffer, unless the commit page is still on the 3352 * reader page. 3353 */ 3354 if (rb_is_head_page(next_page, &tail_page->list)) { 3355 3356 /* 3357 * If the commit is not on the reader page, then 3358 * move the header page. 3359 */ 3360 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3361 /* 3362 * If we are not in overwrite mode, 3363 * this is easy, just stop here. 
3364 */ 3365 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3366 local_inc(&cpu_buffer->dropped_events); 3367 goto out_reset; 3368 } 3369 3370 ret = rb_handle_head_page(cpu_buffer, 3371 tail_page, 3372 next_page); 3373 if (ret < 0) 3374 goto out_reset; 3375 if (ret) 3376 goto out_again; 3377 } else { 3378 /* 3379 * We need to be careful here too. The 3380 * commit page could still be on the reader 3381 * page. We could have a small buffer, and 3382 * have filled up the buffer with events 3383 * from interrupts and such, and wrapped. 3384 * 3385 * Note, if the tail page is also on the 3386 * reader_page, we let it move out. 3387 */ 3388 if (unlikely((cpu_buffer->commit_page != 3389 cpu_buffer->tail_page) && 3390 (cpu_buffer->commit_page == 3391 cpu_buffer->reader_page))) { 3392 local_inc(&cpu_buffer->commit_overrun); 3393 goto out_reset; 3394 } 3395 } 3396 } 3397 3398 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3399 3400 out_again: 3401 3402 rb_reset_tail(cpu_buffer, tail, info); 3403 3404 /* Commit what we have for now. */ 3405 rb_end_commit(cpu_buffer); 3406 /* rb_end_commit() decs committing */ 3407 local_inc(&cpu_buffer->committing); 3408 3409 /* fail and let the caller try again */ 3410 return ERR_PTR(-EAGAIN); 3411 3412 out_reset: 3413 /* reset write */ 3414 rb_reset_tail(cpu_buffer, tail, info); 3415 3416 return NULL; 3417 } 3418 3419 /* Slow path */ 3420 static struct ring_buffer_event * 3421 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3422 struct ring_buffer_event *event, u64 delta, bool abs) 3423 { 3424 if (abs) 3425 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3426 else 3427 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3428 3429 /* Not the first event on the page, or not delta? */ 3430 if (abs || rb_event_index(cpu_buffer, event)) { 3431 event->time_delta = delta & TS_MASK; 3432 event->array[0] = delta >> TS_SHIFT; 3433 } else { 3434 /* nope, just zero it */ 3435 event->time_delta = 0; 3436 event->array[0] = 0; 3437 } 3438 3439 return skip_time_extend(event); 3440 } 3441 3442 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 3443 static inline bool sched_clock_stable(void) 3444 { 3445 return true; 3446 } 3447 #endif 3448 3449 static void 3450 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3451 struct rb_event_info *info) 3452 { 3453 u64 write_stamp; 3454 3455 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3456 (unsigned long long)info->delta, 3457 (unsigned long long)info->ts, 3458 (unsigned long long)info->before, 3459 (unsigned long long)info->after, 3460 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3461 sched_clock_stable() ? "" : 3462 "If you just came from a suspend/resume,\n" 3463 "please switch to the trace global clock:\n" 3464 " echo global > /sys/kernel/tracing/trace_clock\n" 3465 "or add trace_clock=global to the kernel command line\n"); 3466 } 3467 3468 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3469 struct ring_buffer_event **event, 3470 struct rb_event_info *info, 3471 u64 *delta, 3472 unsigned int *length) 3473 { 3474 bool abs = info->add_timestamp & 3475 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3476 3477 if (unlikely(info->delta > (1ULL << 59))) { 3478 /* 3479 * Some timers can use more than 59 bits, and when a timestamp 3480 * is added to the buffer, it will lose those bits. 
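 *
 * Illustrative only: TS_MSB covers the top five bits (0xf8ULL << 56),
 * so a clock value such as 0x0800000000000000 already has one of
 * them set. The absolute timestamp written below keeps only the low
 * 59 bits; the dropped MSBs are re-inserted on the read side (see
 * rb_fix_abs_ts()).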
3481 */ 3482 if (abs && (info->ts & TS_MSB)) { 3483 info->delta &= ABS_TS_MASK; 3484 3485 /* did the clock go backwards */ 3486 } else if (info->before == info->after && info->before > info->ts) { 3487 /* not interrupted */ 3488 static int once; 3489 3490 /* 3491 * This is possible with a recalibrating of the TSC. 3492 * Do not produce a call stack, but just report it. 3493 */ 3494 if (!once) { 3495 once++; 3496 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 3497 info->before, info->ts); 3498 } 3499 } else 3500 rb_check_timestamp(cpu_buffer, info); 3501 if (!abs) 3502 info->delta = 0; 3503 } 3504 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 3505 *length -= RB_LEN_TIME_EXTEND; 3506 *delta = 0; 3507 } 3508 3509 /** 3510 * rb_update_event - update event type and data 3511 * @cpu_buffer: The per cpu buffer of the @event 3512 * @event: the event to update 3513 * @info: The info to update the @event with (contains length and delta) 3514 * 3515 * Update the type and data fields of the @event. The length 3516 * is the actual size that is written to the ring buffer, 3517 * and with this, we can determine what to place into the 3518 * data field. 3519 */ 3520 static void 3521 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 3522 struct ring_buffer_event *event, 3523 struct rb_event_info *info) 3524 { 3525 unsigned length = info->length; 3526 u64 delta = info->delta; 3527 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 3528 3529 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 3530 cpu_buffer->event_stamp[nest] = info->ts; 3531 3532 /* 3533 * If we need to add a timestamp, then we 3534 * add it to the start of the reserved space. 3535 */ 3536 if (unlikely(info->add_timestamp)) 3537 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 3538 3539 event->time_delta = delta; 3540 length -= RB_EVNT_HDR_SIZE; 3541 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 3542 event->type_len = 0; 3543 event->array[0] = length; 3544 } else 3545 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 3546 } 3547 3548 static unsigned rb_calculate_event_length(unsigned length) 3549 { 3550 struct ring_buffer_event event; /* Used only for sizeof array */ 3551 3552 /* zero length can cause confusions */ 3553 if (!length) 3554 length++; 3555 3556 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 3557 length += sizeof(event.array[0]); 3558 3559 length += RB_EVNT_HDR_SIZE; 3560 length = ALIGN(length, RB_ARCH_ALIGNMENT); 3561 3562 /* 3563 * In case the time delta is larger than the 27 bits for it 3564 * in the header, we need to add a timestamp. If another 3565 * event comes in when trying to discard this one to increase 3566 * the length, then the timestamp will be added in the allocated 3567 * space of this event. If length is bigger than the size needed 3568 * for the TIME_EXTEND, then padding has to be used. The events 3569 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 3570 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 3571 * As length is a multiple of 4, we only need to worry if it 3572 * is 12 (RB_LEN_TIME_EXTEND + 4). 
3573 */ 3574 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 3575 length += RB_ALIGNMENT; 3576 3577 return length; 3578 } 3579 3580 static inline bool 3581 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 3582 struct ring_buffer_event *event) 3583 { 3584 unsigned long new_index, old_index; 3585 struct buffer_page *bpage; 3586 unsigned long addr; 3587 3588 new_index = rb_event_index(cpu_buffer, event); 3589 old_index = new_index + rb_event_ts_length(event); 3590 addr = (unsigned long)event; 3591 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3592 3593 bpage = READ_ONCE(cpu_buffer->tail_page); 3594 3595 /* 3596 * Make sure the tail_page is still the same and 3597 * the next write location is the end of this event 3598 */ 3599 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 3600 unsigned long write_mask = 3601 local_read(&bpage->write) & ~RB_WRITE_MASK; 3602 unsigned long event_length = rb_event_length(event); 3603 3604 /* 3605 * Make the before_stamp different from the write_stamp 3606 * so that the next event adds an absolute 3607 * value and does not rely on the saved write stamp, which 3608 * is now going to be bogus. 3609 * 3610 * By setting the before_stamp to zero, the next event 3611 * is not going to use the write_stamp and will instead 3612 * create an absolute timestamp. This means there's no 3613 * reason to update the write_stamp! 3614 */ 3615 rb_time_set(&cpu_buffer->before_stamp, 0); 3616 3617 /* 3618 * If an event were to come in now, it would see that the 3619 * write_stamp and the before_stamp are different, and assume 3620 * that this event just added itself before updating 3621 * the write stamp. The interrupting event will fix the 3622 * write stamp for us, and use an absolute timestamp. 3623 */ 3624 3625 /* 3626 * This is on the tail page. It is possible that 3627 * a write could come in and move the tail page 3628 * and write to the next page. That is fine 3629 * because we just shorten what is on this page. 3630 */ 3631 old_index += write_mask; 3632 new_index += write_mask; 3633 3634 /* caution: old_index gets updated on cmpxchg failure */ 3635 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 3636 /* update counters */ 3637 local_sub(event_length, &cpu_buffer->entries_bytes); 3638 return true; 3639 } 3640 } 3641 3642 /* could not discard */ 3643 return false; 3644 } 3645 3646 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 3647 { 3648 local_inc(&cpu_buffer->committing); 3649 local_inc(&cpu_buffer->commits); 3650 } 3651 3652 static __always_inline void 3653 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 3654 { 3655 unsigned long max_count; 3656 3657 /* 3658 * We only race with interrupts and NMIs on this CPU. 3659 * If we own the commit event, then we can commit 3660 * all others that interrupted us, since the interruptions 3661 * are in stack format (they finish before they come 3662 * back to us). This allows us to do a simple loop to 3663 * assign the commit to the tail. 3664 */ 3665 again: 3666 max_count = cpu_buffer->nr_pages * 100; 3667 3668 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 3669 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 3670 return; 3671 if (RB_WARN_ON(cpu_buffer, 3672 rb_is_reader_page(cpu_buffer->tail_page))) 3673 return; 3674 /* 3675 * No need for a memory barrier here, as the update 3676 * of the tail_page did it for this page.
3677 */ 3678 local_set(&cpu_buffer->commit_page->page->commit, 3679 rb_page_write(cpu_buffer->commit_page)); 3680 rb_inc_page(&cpu_buffer->commit_page); 3681 if (cpu_buffer->ring_meta) { 3682 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 3683 meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page; 3684 } 3685 /* add barrier to keep gcc from optimizing too much */ 3686 barrier(); 3687 } 3688 while (rb_commit_index(cpu_buffer) != 3689 rb_page_write(cpu_buffer->commit_page)) { 3690 3691 /* Make sure the readers see the content of what is committed. */ 3692 smp_wmb(); 3693 local_set(&cpu_buffer->commit_page->page->commit, 3694 rb_page_write(cpu_buffer->commit_page)); 3695 RB_WARN_ON(cpu_buffer, 3696 local_read(&cpu_buffer->commit_page->page->commit) & 3697 ~RB_WRITE_MASK); 3698 barrier(); 3699 } 3700 3701 /* again, keep gcc from optimizing */ 3702 barrier(); 3703 3704 /* 3705 * If an interrupt came in just after the first while loop 3706 * and pushed the tail page forward, we will be left with 3707 * a dangling commit that will never go forward. 3708 */ 3709 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3710 goto again; 3711 } 3712 3713 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3714 { 3715 unsigned long commits; 3716 3717 if (RB_WARN_ON(cpu_buffer, 3718 !local_read(&cpu_buffer->committing))) 3719 return; 3720 3721 again: 3722 commits = local_read(&cpu_buffer->commits); 3723 /* synchronize with interrupts */ 3724 barrier(); 3725 if (local_read(&cpu_buffer->committing) == 1) 3726 rb_set_commit_to_write(cpu_buffer); 3727 3728 local_dec(&cpu_buffer->committing); 3729 3730 /* synchronize with interrupts */ 3731 barrier(); 3732 3733 /* 3734 * Need to account for interrupts coming in between the 3735 * updating of the commit page and the clearing of the 3736 * committing counter. 
3737 */ 3738 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3739 !local_read(&cpu_buffer->committing)) { 3740 local_inc(&cpu_buffer->committing); 3741 goto again; 3742 } 3743 } 3744 3745 static inline void rb_event_discard(struct ring_buffer_event *event) 3746 { 3747 if (extended_time(event)) 3748 event = skip_time_extend(event); 3749 3750 /* array[0] holds the actual length for the discarded event */ 3751 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3752 event->type_len = RINGBUF_TYPE_PADDING; 3753 /* time delta must be non zero */ 3754 if (!event->time_delta) 3755 event->time_delta = 1; 3756 } 3757 3758 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3759 { 3760 local_inc(&cpu_buffer->entries); 3761 rb_end_commit(cpu_buffer); 3762 } 3763 3764 static __always_inline void 3765 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 3766 { 3767 if (buffer->irq_work.waiters_pending) { 3768 buffer->irq_work.waiters_pending = false; 3769 /* irq_work_queue() supplies its own memory barriers */ 3770 irq_work_queue(&buffer->irq_work.work); 3771 } 3772 3773 if (cpu_buffer->irq_work.waiters_pending) { 3774 cpu_buffer->irq_work.waiters_pending = false; 3775 /* irq_work_queue() supplies its own memory barriers */ 3776 irq_work_queue(&cpu_buffer->irq_work.work); 3777 } 3778 3779 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3780 return; 3781 3782 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3783 return; 3784 3785 if (!cpu_buffer->irq_work.full_waiters_pending) 3786 return; 3787 3788 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3789 3790 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3791 return; 3792 3793 cpu_buffer->irq_work.wakeup_full = true; 3794 cpu_buffer->irq_work.full_waiters_pending = false; 3795 /* irq_work_queue() supplies its own memory barriers */ 3796 irq_work_queue(&cpu_buffer->irq_work.work); 3797 } 3798 3799 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3800 # define do_ring_buffer_record_recursion() \ 3801 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3802 #else 3803 # define do_ring_buffer_record_recursion() do { } while (0) 3804 #endif 3805 3806 /* 3807 * The lock and unlock are done within a preempt disable section. 3808 * The current_context per_cpu variable can only be modified 3809 * by the current task between lock and unlock. But it can 3810 * be modified more than once via an interrupt. To pass this 3811 * information from the lock to the unlock without having to 3812 * access the 'in_interrupt()' functions again (which do show 3813 * a bit of overhead in something as critical as function tracing), 3814 * we use a bitmask trick. 3815 * 3816 * bit 1 = NMI context 3817 * bit 2 = IRQ context 3818 * bit 3 = SoftIRQ context 3819 * bit 4 = normal context. 3820 * 3821 * This works because this is the order of contexts that can 3822 * preempt other contexts. A SoftIRQ never preempts an IRQ 3823 * context. 3824 * 3825 * When the context is determined, the corresponding bit is 3826 * checked and set (if it was set, then a recursion of that context 3827 * happened). 3828 * 3829 * On unlock, we need to clear this bit. To do so, just subtract 3830 * 1 from the current_context and AND it to itself.
3831 * 3832 * (binary) 3833 * 101 - 1 = 100 3834 * 101 & 100 = 100 (clearing bit zero) 3835 * 3836 * 1010 - 1 = 1001 3837 * 1010 & 1001 = 1000 (clearing bit 1) 3838 * 3839 * The least significant bit can be cleared this way, and it 3840 * just so happens that it is the same bit corresponding to 3841 * the current context. 3842 * 3843 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3844 * is set when a recursion is detected at the current context, and if 3845 * the TRANSITION bit is already set, it will fail the recursion. 3846 * This is needed because there's a lag between the changing of 3847 * interrupt context and updating the preempt count. In this case, 3848 * a false positive will be found. To handle this, one extra recursion 3849 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3850 * bit is already set, then it is considered a recursion and the function 3851 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3852 * 3853 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3854 * to be cleared. Even if it wasn't the context that set it. That is, 3855 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3856 * is called before preempt_count() is updated, since the check will 3857 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3858 * NMI then comes in, it will set the NMI bit, but when the NMI code 3859 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3860 * and leave the NMI bit set. But this is fine, because the interrupt 3861 * code that set the TRANSITION bit will then clear the NMI bit when it 3862 * calls trace_recursive_unlock(). If another NMI comes in, it will 3863 * set the TRANSITION bit and continue. 3864 * 3865 * Note: The TRANSITION bit only handles a single transition between context. 3866 */ 3867 3868 static __always_inline bool 3869 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3870 { 3871 unsigned int val = cpu_buffer->current_context; 3872 int bit = interrupt_context_level(); 3873 3874 bit = RB_CTX_NORMAL - bit; 3875 3876 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3877 /* 3878 * It is possible that this was called by transitioning 3879 * between interrupt context, and preempt_count() has not 3880 * been updated yet. In this case, use the TRANSITION bit. 3881 */ 3882 bit = RB_CTX_TRANSITION; 3883 if (val & (1 << (bit + cpu_buffer->nest))) { 3884 do_ring_buffer_record_recursion(); 3885 return true; 3886 } 3887 } 3888 3889 val |= (1 << (bit + cpu_buffer->nest)); 3890 cpu_buffer->current_context = val; 3891 3892 return false; 3893 } 3894 3895 static __always_inline void 3896 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3897 { 3898 cpu_buffer->current_context &= 3899 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3900 } 3901 3902 /* The recursive locking above uses 5 bits */ 3903 #define NESTED_BITS 5 3904 3905 /** 3906 * ring_buffer_nest_start - Allow to trace while nested 3907 * @buffer: The ring buffer to modify 3908 * 3909 * The ring buffer has a safety mechanism to prevent recursion. 3910 * But there may be a case where a trace needs to be done while 3911 * tracing something else. In this case, calling this function 3912 * will allow this function to nest within a currently active 3913 * ring_buffer_lock_reserve(). 
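 *
 * A sketch of the expected nesting (illustrative only):
 *
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, length);
 *	if (event) {
 *		... fill in the nested event ...
 *		ring_buffer_unlock_commit(buffer);
 *	}
 *	ring_buffer_nest_end(buffer);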
3914 * 3915 * Call this function before calling another ring_buffer_lock_reserve() and 3916 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 3917 */ 3918 void ring_buffer_nest_start(struct trace_buffer *buffer) 3919 { 3920 struct ring_buffer_per_cpu *cpu_buffer; 3921 int cpu; 3922 3923 /* Enabled by ring_buffer_nest_end() */ 3924 preempt_disable_notrace(); 3925 cpu = raw_smp_processor_id(); 3926 cpu_buffer = buffer->buffers[cpu]; 3927 /* This is the shift value for the above recursive locking */ 3928 cpu_buffer->nest += NESTED_BITS; 3929 } 3930 3931 /** 3932 * ring_buffer_nest_end - Allow to trace while nested 3933 * @buffer: The ring buffer to modify 3934 * 3935 * Must be called after ring_buffer_nest_start() and after the 3936 * ring_buffer_unlock_commit(). 3937 */ 3938 void ring_buffer_nest_end(struct trace_buffer *buffer) 3939 { 3940 struct ring_buffer_per_cpu *cpu_buffer; 3941 int cpu; 3942 3943 /* disabled by ring_buffer_nest_start() */ 3944 cpu = raw_smp_processor_id(); 3945 cpu_buffer = buffer->buffers[cpu]; 3946 /* This is the shift value for the above recursive locking */ 3947 cpu_buffer->nest -= NESTED_BITS; 3948 preempt_enable_notrace(); 3949 } 3950 3951 /** 3952 * ring_buffer_unlock_commit - commit a reserved 3953 * @buffer: The buffer to commit to 3954 * 3955 * This commits the data to the ring buffer, and releases any locks held. 3956 * 3957 * Must be paired with ring_buffer_lock_reserve. 3958 */ 3959 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 3960 { 3961 struct ring_buffer_per_cpu *cpu_buffer; 3962 int cpu = raw_smp_processor_id(); 3963 3964 cpu_buffer = buffer->buffers[cpu]; 3965 3966 rb_commit(cpu_buffer); 3967 3968 rb_wakeups(buffer, cpu_buffer); 3969 3970 trace_recursive_unlock(cpu_buffer); 3971 3972 preempt_enable_notrace(); 3973 3974 return 0; 3975 } 3976 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 3977 3978 /* Special value to validate all deltas on a page. 
*/ 3979 #define CHECK_FULL_PAGE 1L 3980 3981 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 3982 3983 static const char *show_irq_str(int bits) 3984 { 3985 const char *type[] = { 3986 ".", // 0 3987 "s", // 1 3988 "h", // 2 3989 "Hs", // 3 3990 "n", // 4 3991 "Ns", // 5 3992 "Nh", // 6 3993 "NHs", // 7 3994 }; 3995 3996 return type[bits]; 3997 } 3998 3999 /* Assume this is an trace event */ 4000 static const char *show_flags(struct ring_buffer_event *event) 4001 { 4002 struct trace_entry *entry; 4003 int bits = 0; 4004 4005 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4006 return "X"; 4007 4008 entry = ring_buffer_event_data(event); 4009 4010 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4011 bits |= 1; 4012 4013 if (entry->flags & TRACE_FLAG_HARDIRQ) 4014 bits |= 2; 4015 4016 if (entry->flags & TRACE_FLAG_NMI) 4017 bits |= 4; 4018 4019 return show_irq_str(bits); 4020 } 4021 4022 static const char *show_irq(struct ring_buffer_event *event) 4023 { 4024 struct trace_entry *entry; 4025 4026 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4027 return ""; 4028 4029 entry = ring_buffer_event_data(event); 4030 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4031 return "d"; 4032 return ""; 4033 } 4034 4035 static const char *show_interrupt_level(void) 4036 { 4037 unsigned long pc = preempt_count(); 4038 unsigned char level = 0; 4039 4040 if (pc & SOFTIRQ_OFFSET) 4041 level |= 1; 4042 4043 if (pc & HARDIRQ_MASK) 4044 level |= 2; 4045 4046 if (pc & NMI_MASK) 4047 level |= 4; 4048 4049 return show_irq_str(level); 4050 } 4051 4052 static void dump_buffer_page(struct buffer_data_page *bpage, 4053 struct rb_event_info *info, 4054 unsigned long tail) 4055 { 4056 struct ring_buffer_event *event; 4057 u64 ts, delta; 4058 int e; 4059 4060 ts = bpage->time_stamp; 4061 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4062 4063 for (e = 0; e < tail; e += rb_event_length(event)) { 4064 4065 event = (struct ring_buffer_event *)(bpage->data + e); 4066 4067 switch (event->type_len) { 4068 4069 case RINGBUF_TYPE_TIME_EXTEND: 4070 delta = rb_event_time_stamp(event); 4071 ts += delta; 4072 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4073 e, ts, delta); 4074 break; 4075 4076 case RINGBUF_TYPE_TIME_STAMP: 4077 delta = rb_event_time_stamp(event); 4078 ts = rb_fix_abs_ts(delta, ts); 4079 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4080 e, ts, delta); 4081 break; 4082 4083 case RINGBUF_TYPE_PADDING: 4084 ts += event->time_delta; 4085 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4086 e, ts, event->time_delta); 4087 break; 4088 4089 case RINGBUF_TYPE_DATA: 4090 ts += event->time_delta; 4091 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4092 e, ts, event->time_delta, 4093 show_flags(event), show_irq(event)); 4094 break; 4095 4096 default: 4097 break; 4098 } 4099 } 4100 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4101 } 4102 4103 static DEFINE_PER_CPU(atomic_t, checking); 4104 static atomic_t ts_dump; 4105 4106 #define buffer_warn_return(fmt, ...) 
\ 4107 do { \ 4108 /* If another report is happening, ignore this one */ \ 4109 if (atomic_inc_return(&ts_dump) != 1) { \ 4110 atomic_dec(&ts_dump); \ 4111 goto out; \ 4112 } \ 4113 atomic_inc(&cpu_buffer->record_disabled); \ 4114 pr_warn(fmt, ##__VA_ARGS__); \ 4115 dump_buffer_page(bpage, info, tail); \ 4116 atomic_dec(&ts_dump); \ 4117 /* There's some cases in boot up that this can happen */ \ 4118 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4119 /* Do not re-enable checking */ \ 4120 return; \ 4121 } while (0) 4122 4123 /* 4124 * Check if the current event time stamp matches the deltas on 4125 * the buffer page. 4126 */ 4127 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4128 struct rb_event_info *info, 4129 unsigned long tail) 4130 { 4131 struct buffer_data_page *bpage; 4132 u64 ts, delta; 4133 bool full = false; 4134 int ret; 4135 4136 bpage = info->tail_page->page; 4137 4138 if (tail == CHECK_FULL_PAGE) { 4139 full = true; 4140 tail = local_read(&bpage->commit); 4141 } else if (info->add_timestamp & 4142 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 4143 /* Ignore events with absolute time stamps */ 4144 return; 4145 } 4146 4147 /* 4148 * Do not check the first event (skip possible extends too). 4149 * Also do not check if previous events have not been committed. 4150 */ 4151 if (tail <= 8 || tail > local_read(&bpage->commit)) 4152 return; 4153 4154 /* 4155 * If this interrupted another event, 4156 */ 4157 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 4158 goto out; 4159 4160 ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta); 4161 if (ret < 0) { 4162 if (delta < ts) { 4163 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 4164 cpu_buffer->cpu, ts, delta); 4165 goto out; 4166 } 4167 } 4168 if ((full && ts > info->ts) || 4169 (!full && ts + info->delta != info->ts)) { 4170 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 4171 cpu_buffer->cpu, 4172 ts + info->delta, info->ts, info->delta, 4173 info->before, info->after, 4174 full ? " (full)" : "", show_interrupt_level()); 4175 } 4176 out: 4177 atomic_dec(this_cpu_ptr(&checking)); 4178 } 4179 #else 4180 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4181 struct rb_event_info *info, 4182 unsigned long tail) 4183 { 4184 } 4185 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 4186 4187 static struct ring_buffer_event * 4188 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 4189 struct rb_event_info *info) 4190 { 4191 struct ring_buffer_event *event; 4192 struct buffer_page *tail_page; 4193 unsigned long tail, write, w; 4194 4195 /* Don't let the compiler play games with cpu_buffer->tail_page */ 4196 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 4197 4198 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 4199 barrier(); 4200 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4201 rb_time_read(&cpu_buffer->write_stamp, &info->after); 4202 barrier(); 4203 info->ts = rb_time_stamp(cpu_buffer->buffer); 4204 4205 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 4206 info->delta = info->ts; 4207 } else { 4208 /* 4209 * If interrupting an event time update, we may need an 4210 * absolute timestamp. 4211 * Don't bother if this is the start of a new page (w == 0). 
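 *
 * Roughly, the branches below behave as follows (illustrative summary
 * only):
 *
 *	!w			first event on the sub-buffer, use the
 *				sub-buffer timestamp (delta of 0)
 *	before != after		interrupted a stamp update, force an
 *				absolute/extended timestamp
 *	otherwise		delta = ts - after, extended if the
 *				delta does not fit in the event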
4212 */ 4213 if (!w) { 4214 /* Use the sub-buffer timestamp */ 4215 info->delta = 0; 4216 } else if (unlikely(info->before != info->after)) { 4217 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 4218 info->length += RB_LEN_TIME_EXTEND; 4219 } else { 4220 info->delta = info->ts - info->after; 4221 if (unlikely(test_time_stamp(info->delta))) { 4222 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 4223 info->length += RB_LEN_TIME_EXTEND; 4224 } 4225 } 4226 } 4227 4228 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 4229 4230 /*C*/ write = local_add_return(info->length, &tail_page->write); 4231 4232 /* set write to only the index of the write */ 4233 write &= RB_WRITE_MASK; 4234 4235 tail = write - info->length; 4236 4237 /* See if we shot pass the end of this buffer page */ 4238 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 4239 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 4240 return rb_move_tail(cpu_buffer, tail, info); 4241 } 4242 4243 if (likely(tail == w)) { 4244 /* Nothing interrupted us between A and C */ 4245 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 4246 /* 4247 * If something came in between C and D, the write stamp 4248 * may now not be in sync. But that's fine as the before_stamp 4249 * will be different and then next event will just be forced 4250 * to use an absolute timestamp. 4251 */ 4252 if (likely(!(info->add_timestamp & 4253 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4254 /* This did not interrupt any time update */ 4255 info->delta = info->ts - info->after; 4256 else 4257 /* Just use full timestamp for interrupting event */ 4258 info->delta = info->ts; 4259 check_buffer(cpu_buffer, info, tail); 4260 } else { 4261 u64 ts; 4262 /* SLOW PATH - Interrupted between A and C */ 4263 4264 /* Save the old before_stamp */ 4265 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4266 4267 /* 4268 * Read a new timestamp and update the before_stamp to make 4269 * the next event after this one force using an absolute 4270 * timestamp. This is in case an interrupt were to come in 4271 * between E and F. 4272 */ 4273 ts = rb_time_stamp(cpu_buffer->buffer); 4274 rb_time_set(&cpu_buffer->before_stamp, ts); 4275 4276 barrier(); 4277 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 4278 barrier(); 4279 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 4280 info->after == info->before && info->after < ts) { 4281 /* 4282 * Nothing came after this event between C and F, it is 4283 * safe to use info->after for the delta as it 4284 * matched info->before and is still valid. 4285 */ 4286 info->delta = ts - info->after; 4287 } else { 4288 /* 4289 * Interrupted between C and F: 4290 * Lost the previous events time stamp. Just set the 4291 * delta to zero, and this will be the same time as 4292 * the event this event interrupted. And the events that 4293 * came after this will still be correct (as they would 4294 * have built their delta on the previous event. 4295 */ 4296 info->delta = 0; 4297 } 4298 info->ts = ts; 4299 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 4300 } 4301 4302 /* 4303 * If this is the first commit on the page, then it has the same 4304 * timestamp as the page itself. 
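 *
 * In other words (illustrative): the first event on a sub-buffer is
 * normally written with a delta of 0, so a reader reconstructs its
 * time as the sub-buffer time_stamp plus 0, i.e. the page timestamp
 * itself (unless an absolute timestamp was forced).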
4305 */ 4306 if (unlikely(!tail && !(info->add_timestamp & 4307 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4308 info->delta = 0; 4309 4310 /* We reserved something on the buffer */ 4311 4312 event = __rb_page_index(tail_page, tail); 4313 rb_update_event(cpu_buffer, event, info); 4314 4315 local_inc(&tail_page->entries); 4316 4317 /* 4318 * If this is the first commit on the page, then update 4319 * its timestamp. 4320 */ 4321 if (unlikely(!tail)) 4322 tail_page->page->time_stamp = info->ts; 4323 4324 /* account for these added bytes */ 4325 local_add(info->length, &cpu_buffer->entries_bytes); 4326 4327 return event; 4328 } 4329 4330 static __always_inline struct ring_buffer_event * 4331 rb_reserve_next_event(struct trace_buffer *buffer, 4332 struct ring_buffer_per_cpu *cpu_buffer, 4333 unsigned long length) 4334 { 4335 struct ring_buffer_event *event; 4336 struct rb_event_info info; 4337 int nr_loops = 0; 4338 int add_ts_default; 4339 4340 /* ring buffer does cmpxchg, make sure it is safe in NMI context */ 4341 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) && 4342 (unlikely(in_nmi()))) { 4343 return NULL; 4344 } 4345 4346 rb_start_commit(cpu_buffer); 4347 /* The commit page can not change after this */ 4348 4349 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4350 /* 4351 * Due to the ability to swap a cpu buffer from a buffer 4352 * it is possible it was swapped before we committed. 4353 * (committing stops a swap). We check for it here and 4354 * if it happened, we have to fail the write. 4355 */ 4356 barrier(); 4357 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 4358 local_dec(&cpu_buffer->committing); 4359 local_dec(&cpu_buffer->commits); 4360 return NULL; 4361 } 4362 #endif 4363 4364 info.length = rb_calculate_event_length(length); 4365 4366 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 4367 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 4368 info.length += RB_LEN_TIME_EXTEND; 4369 if (info.length > cpu_buffer->buffer->max_data_size) 4370 goto out_fail; 4371 } else { 4372 add_ts_default = RB_ADD_STAMP_NONE; 4373 } 4374 4375 again: 4376 info.add_timestamp = add_ts_default; 4377 info.delta = 0; 4378 4379 /* 4380 * We allow for interrupts to reenter here and do a trace. 4381 * If one does, it will cause this original code to loop 4382 * back here. Even with heavy interrupts happening, this 4383 * should only happen a few times in a row. If this happens 4384 * 1000 times in a row, there must be either an interrupt 4385 * storm or we have something buggy. 4386 * Bail! 4387 */ 4388 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 4389 goto out_fail; 4390 4391 event = __rb_reserve_next(cpu_buffer, &info); 4392 4393 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 4394 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 4395 info.length -= RB_LEN_TIME_EXTEND; 4396 goto again; 4397 } 4398 4399 if (likely(event)) 4400 return event; 4401 out_fail: 4402 rb_end_commit(cpu_buffer); 4403 return NULL; 4404 } 4405 4406 /** 4407 * ring_buffer_lock_reserve - reserve a part of the buffer 4408 * @buffer: the ring buffer to reserve from 4409 * @length: the length of the data to reserve (excluding event header) 4410 * 4411 * Returns a reserved event on the ring buffer to copy directly to. 4412 * The user of this interface will need to get the body to write into 4413 * and can use the ring_buffer_event_data() interface. 4414 * 4415 * The length is the length of the data needed, not the event length 4416 * which also includes the event header. 
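 *
 * A rough usage sketch (struct my_entry and its field are
 * hypothetical; only the ring buffer calls are real):
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(struct my_entry));
 *	if (event) {
 *		struct my_entry *entry = ring_buffer_event_data(event);
 *
 *		entry->value = 42;
 *		ring_buffer_unlock_commit(buffer);
 *	}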
4417 * 4418 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4419 * If NULL is returned, then nothing has been allocated or locked. 4420 */ 4421 struct ring_buffer_event * 4422 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4423 { 4424 struct ring_buffer_per_cpu *cpu_buffer; 4425 struct ring_buffer_event *event; 4426 int cpu; 4427 4428 /* If we are tracing schedule, we don't want to recurse */ 4429 preempt_disable_notrace(); 4430 4431 if (unlikely(atomic_read(&buffer->record_disabled))) 4432 goto out; 4433 4434 cpu = raw_smp_processor_id(); 4435 4436 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4437 goto out; 4438 4439 cpu_buffer = buffer->buffers[cpu]; 4440 4441 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4442 goto out; 4443 4444 if (unlikely(length > buffer->max_data_size)) 4445 goto out; 4446 4447 if (unlikely(trace_recursive_lock(cpu_buffer))) 4448 goto out; 4449 4450 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4451 if (!event) 4452 goto out_unlock; 4453 4454 return event; 4455 4456 out_unlock: 4457 trace_recursive_unlock(cpu_buffer); 4458 out: 4459 preempt_enable_notrace(); 4460 return NULL; 4461 } 4462 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4463 4464 /* 4465 * Decrement the entries to the page that an event is on. 4466 * The event does not even need to exist, only the pointer 4467 * to the page it is on. This may only be called before the commit 4468 * takes place. 4469 */ 4470 static inline void 4471 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4472 struct ring_buffer_event *event) 4473 { 4474 unsigned long addr = (unsigned long)event; 4475 struct buffer_page *bpage = cpu_buffer->commit_page; 4476 struct buffer_page *start; 4477 4478 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4479 4480 /* Do the likely case first */ 4481 if (likely(bpage->page == (void *)addr)) { 4482 local_dec(&bpage->entries); 4483 return; 4484 } 4485 4486 /* 4487 * Because the commit page may be on the reader page we 4488 * start with the next page and check the end loop there. 4489 */ 4490 rb_inc_page(&bpage); 4491 start = bpage; 4492 do { 4493 if (bpage->page == (void *)addr) { 4494 local_dec(&bpage->entries); 4495 return; 4496 } 4497 rb_inc_page(&bpage); 4498 } while (bpage != start); 4499 4500 /* commit not part of this buffer?? */ 4501 RB_WARN_ON(cpu_buffer, 1); 4502 } 4503 4504 /** 4505 * ring_buffer_discard_commit - discard an event that has not been committed 4506 * @buffer: the ring buffer 4507 * @event: non committed event to discard 4508 * 4509 * Sometimes an event that is in the ring buffer needs to be ignored. 4510 * This function lets the user discard an event in the ring buffer 4511 * and then that event will not be read later. 4512 * 4513 * This function only works if it is called before the item has been 4514 * committed. It will try to free the event from the ring buffer 4515 * if another event has not been added behind it. 4516 * 4517 * If another event has been added behind it, it will set the event 4518 * up as discarded, and perform the commit. 4519 * 4520 * If this function is called, do not call ring_buffer_unlock_commit on 4521 * the event. 
4522 */ 4523 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4524 struct ring_buffer_event *event) 4525 { 4526 struct ring_buffer_per_cpu *cpu_buffer; 4527 int cpu; 4528 4529 /* The event is discarded regardless */ 4530 rb_event_discard(event); 4531 4532 cpu = smp_processor_id(); 4533 cpu_buffer = buffer->buffers[cpu]; 4534 4535 /* 4536 * This must only be called if the event has not been 4537 * committed yet. Thus we can assume that preemption 4538 * is still disabled. 4539 */ 4540 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4541 4542 rb_decrement_entry(cpu_buffer, event); 4543 if (rb_try_to_discard(cpu_buffer, event)) 4544 goto out; 4545 4546 out: 4547 rb_end_commit(cpu_buffer); 4548 4549 trace_recursive_unlock(cpu_buffer); 4550 4551 preempt_enable_notrace(); 4552 4553 } 4554 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4555 4556 /** 4557 * ring_buffer_write - write data to the buffer without reserving 4558 * @buffer: The ring buffer to write to. 4559 * @length: The length of the data being written (excluding the event header) 4560 * @data: The data to write to the buffer. 4561 * 4562 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4563 * one function. If you already have the data to write to the buffer, it 4564 * may be easier to simply call this function. 4565 * 4566 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4567 * and not the length of the event which would hold the header. 4568 */ 4569 int ring_buffer_write(struct trace_buffer *buffer, 4570 unsigned long length, 4571 void *data) 4572 { 4573 struct ring_buffer_per_cpu *cpu_buffer; 4574 struct ring_buffer_event *event; 4575 void *body; 4576 int ret = -EBUSY; 4577 int cpu; 4578 4579 preempt_disable_notrace(); 4580 4581 if (atomic_read(&buffer->record_disabled)) 4582 goto out; 4583 4584 cpu = raw_smp_processor_id(); 4585 4586 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4587 goto out; 4588 4589 cpu_buffer = buffer->buffers[cpu]; 4590 4591 if (atomic_read(&cpu_buffer->record_disabled)) 4592 goto out; 4593 4594 if (length > buffer->max_data_size) 4595 goto out; 4596 4597 if (unlikely(trace_recursive_lock(cpu_buffer))) 4598 goto out; 4599 4600 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4601 if (!event) 4602 goto out_unlock; 4603 4604 body = rb_event_data(event); 4605 4606 memcpy(body, data, length); 4607 4608 rb_commit(cpu_buffer); 4609 4610 rb_wakeups(buffer, cpu_buffer); 4611 4612 ret = 0; 4613 4614 out_unlock: 4615 trace_recursive_unlock(cpu_buffer); 4616 4617 out: 4618 preempt_enable_notrace(); 4619 4620 return ret; 4621 } 4622 EXPORT_SYMBOL_GPL(ring_buffer_write); 4623 4624 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4625 { 4626 struct buffer_page *reader = cpu_buffer->reader_page; 4627 struct buffer_page *head = rb_set_head_page(cpu_buffer); 4628 struct buffer_page *commit = cpu_buffer->commit_page; 4629 4630 /* In case of error, head will be NULL */ 4631 if (unlikely(!head)) 4632 return true; 4633 4634 /* Reader should exhaust content in reader page */ 4635 if (reader->read != rb_page_size(reader)) 4636 return false; 4637 4638 /* 4639 * If writers are committing on the reader page, knowing all 4640 * committed content has been read, the ring buffer is empty. 4641 */ 4642 if (commit == reader) 4643 return true; 4644 4645 /* 4646 * If writers are committing on a page other than reader page 4647 * and head page, there should always be content to read. 
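 *
 * To summarize the checks in this function (illustrative overview,
 * assuming the reader page has already been exhausted):
 *
 *	commit page == reader page		-> empty
 *	commit page != reader and != head	-> not empty
 *	commit page == head page		-> empty only if nothing
 *						   was committed on it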
4648  */
4649 	if (commit != head)
4650 		return false;
4651 
4652 	/*
4653 	 * Writers are committing on the head page, we just need
4654 	 * to care about whether there is committed data, and the reader will
4655 	 * swap the reader page with the head page when it is to read data.
4656 	 */
4657 	return rb_page_commit(commit) == 0;
4658 }
4659 
4660 /**
4661  * ring_buffer_record_disable - stop all writes into the buffer
4662  * @buffer: The ring buffer to stop writes to.
4663  *
4664  * This prevents all writes to the buffer. Any attempt to write
4665  * to the buffer after this will fail and return NULL.
4666  *
4667  * The caller should call synchronize_rcu() after this.
4668  */
4669 void ring_buffer_record_disable(struct trace_buffer *buffer)
4670 {
4671 	atomic_inc(&buffer->record_disabled);
4672 }
4673 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
4674 
4675 /**
4676  * ring_buffer_record_enable - enable writes to the buffer
4677  * @buffer: The ring buffer to enable writes
4678  *
4679  * Note, multiple disables will need the same number of enables
4680  * to truly enable the writing (much like preempt_disable).
4681  */
4682 void ring_buffer_record_enable(struct trace_buffer *buffer)
4683 {
4684 	atomic_dec(&buffer->record_disabled);
4685 }
4686 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
4687 
4688 /**
4689  * ring_buffer_record_off - stop all writes into the buffer
4690  * @buffer: The ring buffer to stop writes to.
4691  *
4692  * This prevents all writes to the buffer. Any attempt to write
4693  * to the buffer after this will fail and return NULL.
4694  *
4695  * This is different than ring_buffer_record_disable() as
4696  * it works like an on/off switch, whereas the disable() version
4697  * must be paired with an enable().
4698  */
4699 void ring_buffer_record_off(struct trace_buffer *buffer)
4700 {
4701 	unsigned int rd;
4702 	unsigned int new_rd;
4703 
4704 	rd = atomic_read(&buffer->record_disabled);
4705 	do {
4706 		new_rd = rd | RB_BUFFER_OFF;
4707 	} while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4708 }
4709 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
4710 
4711 /**
4712  * ring_buffer_record_on - restart writes into the buffer
4713  * @buffer: The ring buffer to start writes to.
4714  *
4715  * This enables all writes to the buffer that were disabled by
4716  * ring_buffer_record_off().
4717  *
4718  * This is different than ring_buffer_record_enable() as
4719  * it works like an on/off switch, whereas the enable() version
4720  * must be paired with a disable().
4721  */
4722 void ring_buffer_record_on(struct trace_buffer *buffer)
4723 {
4724 	unsigned int rd;
4725 	unsigned int new_rd;
4726 
4727 	rd = atomic_read(&buffer->record_disabled);
4728 	do {
4729 		new_rd = rd & ~RB_BUFFER_OFF;
4730 	} while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4731 }
4732 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
4733 
4734 /**
4735  * ring_buffer_record_is_on - return true if the ring buffer can write
4736  * @buffer: The ring buffer to see if write is enabled
4737  *
4738  * Returns true if the ring buffer is in a state that it accepts writes.
4739  */
4740 bool ring_buffer_record_is_on(struct trace_buffer *buffer)
4741 {
4742 	return !atomic_read(&buffer->record_disabled);
4743 }
4744 
4745 /**
4746  * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
4747  * @buffer: The ring buffer to see if write is set enabled
4748  *
4749  * Returns true if the ring buffer is set writable by ring_buffer_record_on().
4750  * Note that this does NOT mean it is in a writable state.
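 *
 * For example (illustrative): after a call to
 * ring_buffer_record_disable(), record_disabled is non zero, so
 * ring_buffer_record_is_on() returns false, while
 * ring_buffer_record_is_set_on() still returns true because the
 * RB_BUFFER_OFF bit is not set.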
4751 * 4752 * It may return true when the ring buffer has been disabled by 4753 * ring_buffer_record_disable(), as that is a temporary disabling of 4754 * the ring buffer. 4755 */ 4756 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4757 { 4758 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4759 } 4760 4761 /** 4762 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4763 * @buffer: The ring buffer to stop writes to. 4764 * @cpu: The CPU buffer to stop 4765 * 4766 * This prevents all writes to the buffer. Any attempt to write 4767 * to the buffer after this will fail and return NULL. 4768 * 4769 * The caller should call synchronize_rcu() after this. 4770 */ 4771 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4772 { 4773 struct ring_buffer_per_cpu *cpu_buffer; 4774 4775 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4776 return; 4777 4778 cpu_buffer = buffer->buffers[cpu]; 4779 atomic_inc(&cpu_buffer->record_disabled); 4780 } 4781 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4782 4783 /** 4784 * ring_buffer_record_enable_cpu - enable writes to the buffer 4785 * @buffer: The ring buffer to enable writes 4786 * @cpu: The CPU to enable. 4787 * 4788 * Note, multiple disables will need the same number of enables 4789 * to truly enable the writing (much like preempt_disable). 4790 */ 4791 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4792 { 4793 struct ring_buffer_per_cpu *cpu_buffer; 4794 4795 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4796 return; 4797 4798 cpu_buffer = buffer->buffers[cpu]; 4799 atomic_dec(&cpu_buffer->record_disabled); 4800 } 4801 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4802 4803 /* 4804 * The total entries in the ring buffer is the running counter 4805 * of entries entered into the ring buffer, minus the sum of 4806 * the entries read from the ring buffer and the number of 4807 * entries that were overwritten. 4808 */ 4809 static inline unsigned long 4810 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4811 { 4812 return local_read(&cpu_buffer->entries) - 4813 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4814 } 4815 4816 /** 4817 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4818 * @buffer: The ring buffer 4819 * @cpu: The per CPU buffer to read from. 4820 */ 4821 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4822 { 4823 unsigned long flags; 4824 struct ring_buffer_per_cpu *cpu_buffer; 4825 struct buffer_page *bpage; 4826 u64 ret = 0; 4827 4828 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4829 return 0; 4830 4831 cpu_buffer = buffer->buffers[cpu]; 4832 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4833 /* 4834 * if the tail is on reader_page, oldest time stamp is on the reader 4835 * page 4836 */ 4837 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4838 bpage = cpu_buffer->reader_page; 4839 else 4840 bpage = rb_set_head_page(cpu_buffer); 4841 if (bpage) 4842 ret = bpage->page->time_stamp; 4843 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4844 4845 return ret; 4846 } 4847 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4848 4849 /** 4850 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 4851 * @buffer: The ring buffer 4852 * @cpu: The per CPU buffer to read from. 
4853 */ 4854 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4855 { 4856 struct ring_buffer_per_cpu *cpu_buffer; 4857 unsigned long ret; 4858 4859 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4860 return 0; 4861 4862 cpu_buffer = buffer->buffers[cpu]; 4863 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4864 4865 return ret; 4866 } 4867 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4868 4869 /** 4870 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4871 * @buffer: The ring buffer 4872 * @cpu: The per CPU buffer to get the entries from. 4873 */ 4874 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4875 { 4876 struct ring_buffer_per_cpu *cpu_buffer; 4877 4878 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4879 return 0; 4880 4881 cpu_buffer = buffer->buffers[cpu]; 4882 4883 return rb_num_of_entries(cpu_buffer); 4884 } 4885 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4886 4887 /** 4888 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4889 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 4890 * @buffer: The ring buffer 4891 * @cpu: The per CPU buffer to get the number of overruns from 4892 */ 4893 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4894 { 4895 struct ring_buffer_per_cpu *cpu_buffer; 4896 unsigned long ret; 4897 4898 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4899 return 0; 4900 4901 cpu_buffer = buffer->buffers[cpu]; 4902 ret = local_read(&cpu_buffer->overrun); 4903 4904 return ret; 4905 } 4906 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4907 4908 /** 4909 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4910 * commits failing due to the buffer wrapping around while there are uncommitted 4911 * events, such as during an interrupt storm. 4912 * @buffer: The ring buffer 4913 * @cpu: The per CPU buffer to get the number of overruns from 4914 */ 4915 unsigned long 4916 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4917 { 4918 struct ring_buffer_per_cpu *cpu_buffer; 4919 unsigned long ret; 4920 4921 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4922 return 0; 4923 4924 cpu_buffer = buffer->buffers[cpu]; 4925 ret = local_read(&cpu_buffer->commit_overrun); 4926 4927 return ret; 4928 } 4929 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4930 4931 /** 4932 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4933 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
4934 * @buffer: The ring buffer 4935 * @cpu: The per CPU buffer to get the number of overruns from 4936 */ 4937 unsigned long 4938 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4939 { 4940 struct ring_buffer_per_cpu *cpu_buffer; 4941 unsigned long ret; 4942 4943 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4944 return 0; 4945 4946 cpu_buffer = buffer->buffers[cpu]; 4947 ret = local_read(&cpu_buffer->dropped_events); 4948 4949 return ret; 4950 } 4951 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 4952 4953 /** 4954 * ring_buffer_read_events_cpu - get the number of events successfully read 4955 * @buffer: The ring buffer 4956 * @cpu: The per CPU buffer to get the number of events read 4957 */ 4958 unsigned long 4959 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 4960 { 4961 struct ring_buffer_per_cpu *cpu_buffer; 4962 4963 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4964 return 0; 4965 4966 cpu_buffer = buffer->buffers[cpu]; 4967 return cpu_buffer->read; 4968 } 4969 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 4970 4971 /** 4972 * ring_buffer_entries - get the number of entries in a buffer 4973 * @buffer: The ring buffer 4974 * 4975 * Returns the total number of entries in the ring buffer 4976 * (all CPU entries) 4977 */ 4978 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 4979 { 4980 struct ring_buffer_per_cpu *cpu_buffer; 4981 unsigned long entries = 0; 4982 int cpu; 4983 4984 /* if you care about this being correct, lock the buffer */ 4985 for_each_buffer_cpu(buffer, cpu) { 4986 cpu_buffer = buffer->buffers[cpu]; 4987 entries += rb_num_of_entries(cpu_buffer); 4988 } 4989 4990 return entries; 4991 } 4992 EXPORT_SYMBOL_GPL(ring_buffer_entries); 4993 4994 /** 4995 * ring_buffer_overruns - get the number of overruns in buffer 4996 * @buffer: The ring buffer 4997 * 4998 * Returns the total number of overruns in the ring buffer 4999 * (all CPU entries) 5000 */ 5001 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 5002 { 5003 struct ring_buffer_per_cpu *cpu_buffer; 5004 unsigned long overruns = 0; 5005 int cpu; 5006 5007 /* if you care about this being correct, lock the buffer */ 5008 for_each_buffer_cpu(buffer, cpu) { 5009 cpu_buffer = buffer->buffers[cpu]; 5010 overruns += local_read(&cpu_buffer->overrun); 5011 } 5012 5013 return overruns; 5014 } 5015 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5016 5017 static void rb_iter_reset(struct ring_buffer_iter *iter) 5018 { 5019 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5020 5021 /* Iterator usage is expected to have record disabled */ 5022 iter->head_page = cpu_buffer->reader_page; 5023 iter->head = cpu_buffer->reader_page->read; 5024 iter->next_event = iter->head; 5025 5026 iter->cache_reader_page = iter->head_page; 5027 iter->cache_read = cpu_buffer->read; 5028 iter->cache_pages_removed = cpu_buffer->pages_removed; 5029 5030 if (iter->head) { 5031 iter->read_stamp = cpu_buffer->read_stamp; 5032 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5033 } else { 5034 iter->read_stamp = iter->head_page->page->time_stamp; 5035 iter->page_stamp = iter->read_stamp; 5036 } 5037 } 5038 5039 /** 5040 * ring_buffer_iter_reset - reset an iterator 5041 * @iter: The iterator to reset 5042 * 5043 * Resets the iterator, so that it will start from the beginning 5044 * again. 
5045 */ 5046 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5047 { 5048 struct ring_buffer_per_cpu *cpu_buffer; 5049 unsigned long flags; 5050 5051 if (!iter) 5052 return; 5053 5054 cpu_buffer = iter->cpu_buffer; 5055 5056 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5057 rb_iter_reset(iter); 5058 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5059 } 5060 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5061 5062 /** 5063 * ring_buffer_iter_empty - check if an iterator has no more to read 5064 * @iter: The iterator to check 5065 */ 5066 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5067 { 5068 struct ring_buffer_per_cpu *cpu_buffer; 5069 struct buffer_page *reader; 5070 struct buffer_page *head_page; 5071 struct buffer_page *commit_page; 5072 struct buffer_page *curr_commit_page; 5073 unsigned commit; 5074 u64 curr_commit_ts; 5075 u64 commit_ts; 5076 5077 cpu_buffer = iter->cpu_buffer; 5078 reader = cpu_buffer->reader_page; 5079 head_page = cpu_buffer->head_page; 5080 commit_page = READ_ONCE(cpu_buffer->commit_page); 5081 commit_ts = commit_page->page->time_stamp; 5082 5083 /* 5084 * When the writer goes across pages, it issues a cmpxchg which 5085 * is a mb(), which will synchronize with the rmb here. 5086 * (see rb_tail_page_update()) 5087 */ 5088 smp_rmb(); 5089 commit = rb_page_commit(commit_page); 5090 /* We want to make sure that the commit page doesn't change */ 5091 smp_rmb(); 5092 5093 /* Make sure commit page didn't change */ 5094 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5095 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5096 5097 /* If the commit page changed, then there's more data */ 5098 if (curr_commit_page != commit_page || 5099 curr_commit_ts != commit_ts) 5100 return 0; 5101 5102 /* Still racy, as it may return a false positive, but that's OK */ 5103 return ((iter->head_page == commit_page && iter->head >= commit) || 5104 (iter->head_page == reader && commit_page == head_page && 5105 head_page->read == commit && 5106 iter->head == rb_page_size(cpu_buffer->reader_page))); 5107 } 5108 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5109 5110 static void 5111 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5112 struct ring_buffer_event *event) 5113 { 5114 u64 delta; 5115 5116 switch (event->type_len) { 5117 case RINGBUF_TYPE_PADDING: 5118 return; 5119 5120 case RINGBUF_TYPE_TIME_EXTEND: 5121 delta = rb_event_time_stamp(event); 5122 cpu_buffer->read_stamp += delta; 5123 return; 5124 5125 case RINGBUF_TYPE_TIME_STAMP: 5126 delta = rb_event_time_stamp(event); 5127 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5128 cpu_buffer->read_stamp = delta; 5129 return; 5130 5131 case RINGBUF_TYPE_DATA: 5132 cpu_buffer->read_stamp += event->time_delta; 5133 return; 5134 5135 default: 5136 RB_WARN_ON(cpu_buffer, 1); 5137 } 5138 } 5139 5140 static void 5141 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5142 struct ring_buffer_event *event) 5143 { 5144 u64 delta; 5145 5146 switch (event->type_len) { 5147 case RINGBUF_TYPE_PADDING: 5148 return; 5149 5150 case RINGBUF_TYPE_TIME_EXTEND: 5151 delta = rb_event_time_stamp(event); 5152 iter->read_stamp += delta; 5153 return; 5154 5155 case RINGBUF_TYPE_TIME_STAMP: 5156 delta = rb_event_time_stamp(event); 5157 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5158 iter->read_stamp = delta; 5159 return; 5160 5161 case RINGBUF_TYPE_DATA: 5162 iter->read_stamp += event->time_delta; 5163 return; 5164 5165 default: 5166 RB_WARN_ON(iter->cpu_buffer, 1); 5167 
} 5168 } 5169 5170 static struct buffer_page * 5171 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5172 { 5173 struct buffer_page *reader = NULL; 5174 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5175 unsigned long overwrite; 5176 unsigned long flags; 5177 int nr_loops = 0; 5178 bool ret; 5179 5180 local_irq_save(flags); 5181 arch_spin_lock(&cpu_buffer->lock); 5182 5183 again: 5184 /* 5185 * This should normally only loop twice. But because the 5186 * start of the reader inserts an empty page, it causes 5187 * a case where we will loop three times. There should be no 5188 * reason to loop four times (that I know of). 5189 */ 5190 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 5191 reader = NULL; 5192 goto out; 5193 } 5194 5195 reader = cpu_buffer->reader_page; 5196 5197 /* If there's more to read, return this page */ 5198 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5199 goto out; 5200 5201 /* Never should we have an index greater than the size */ 5202 if (RB_WARN_ON(cpu_buffer, 5203 cpu_buffer->reader_page->read > rb_page_size(reader))) 5204 goto out; 5205 5206 /* check if we caught up to the tail */ 5207 reader = NULL; 5208 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5209 goto out; 5210 5211 /* Don't bother swapping if the ring buffer is empty */ 5212 if (rb_num_of_entries(cpu_buffer) == 0) 5213 goto out; 5214 5215 /* 5216 * Reset the reader page to size zero. 5217 */ 5218 local_set(&cpu_buffer->reader_page->write, 0); 5219 local_set(&cpu_buffer->reader_page->entries, 0); 5220 local_set(&cpu_buffer->reader_page->page->commit, 0); 5221 cpu_buffer->reader_page->real_end = 0; 5222 5223 spin: 5224 /* 5225 * Splice the empty reader page into the list around the head. 5226 */ 5227 reader = rb_set_head_page(cpu_buffer); 5228 if (!reader) 5229 goto out; 5230 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5231 cpu_buffer->reader_page->list.prev = reader->list.prev; 5232 5233 /* 5234 * cpu_buffer->pages just needs to point to the buffer, it 5235 * has no specific buffer page to point to. Lets move it out 5236 * of our way so we don't accidentally swap it. 5237 */ 5238 cpu_buffer->pages = reader->list.prev; 5239 5240 /* The reader page will be pointing to the new head */ 5241 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5242 5243 /* 5244 * We want to make sure we read the overruns after we set up our 5245 * pointers to the next object. The writer side does a 5246 * cmpxchg to cross pages which acts as the mb on the writer 5247 * side. Note, the reader will constantly fail the swap 5248 * while the writer is updating the pointers, so this 5249 * guarantees that the overwrite recorded here is the one we 5250 * want to compare with the last_overrun. 5251 */ 5252 smp_mb(); 5253 overwrite = local_read(&(cpu_buffer->overrun)); 5254 5255 /* 5256 * Here's the tricky part. 5257 * 5258 * We need to move the pointer past the header page. 5259 * But we can only do that if a writer is not currently 5260 * moving it. The page before the header page has the 5261 * flag bit '1' set if it is pointing to the page we want. 5262 * but if the writer is in the process of moving it 5263 * than it will be '2' or already moved '0'. 5264 */ 5265 5266 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5267 5268 /* 5269 * If we did not convert it, then we must try again. 5270 */ 5271 if (!ret) 5272 goto spin; 5273 5274 if (cpu_buffer->ring_meta) 5275 rb_update_meta_reader(cpu_buffer, reader); 5276 5277 /* 5278 * Yay! 
We succeeded in replacing the page. 5279 * 5280 * Now make the new head point back to the reader page. 5281 */ 5282 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5283 rb_inc_page(&cpu_buffer->head_page); 5284 5285 local_inc(&cpu_buffer->pages_read); 5286 5287 /* Finally update the reader page to the new head */ 5288 cpu_buffer->reader_page = reader; 5289 cpu_buffer->reader_page->read = 0; 5290 5291 if (overwrite != cpu_buffer->last_overrun) { 5292 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5293 cpu_buffer->last_overrun = overwrite; 5294 } 5295 5296 goto again; 5297 5298 out: 5299 /* Update the read_stamp on the first event */ 5300 if (reader && reader->read == 0) 5301 cpu_buffer->read_stamp = reader->page->time_stamp; 5302 5303 arch_spin_unlock(&cpu_buffer->lock); 5304 local_irq_restore(flags); 5305 5306 /* 5307 * The writer has preempt disable, wait for it. But not forever 5308 * Although, 1 second is pretty much "forever" 5309 */ 5310 #define USECS_WAIT 1000000 5311 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5312 /* If the write is past the end of page, a writer is still updating it */ 5313 if (likely(!reader || rb_page_write(reader) <= bsize)) 5314 break; 5315 5316 udelay(1); 5317 5318 /* Get the latest version of the reader write value */ 5319 smp_rmb(); 5320 } 5321 5322 /* The writer is not moving forward? Something is wrong */ 5323 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5324 reader = NULL; 5325 5326 /* 5327 * Make sure we see any padding after the write update 5328 * (see rb_reset_tail()). 5329 * 5330 * In addition, a writer may be writing on the reader page 5331 * if the page has not been fully filled, so the read barrier 5332 * is also needed to make sure we see the content of what is 5333 * committed by the writer (see rb_set_commit_to_write()). 5334 */ 5335 smp_rmb(); 5336 5337 5338 return reader; 5339 } 5340 5341 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5342 { 5343 struct ring_buffer_event *event; 5344 struct buffer_page *reader; 5345 unsigned length; 5346 5347 reader = rb_get_reader_page(cpu_buffer); 5348 5349 /* This function should not be called when buffer is empty */ 5350 if (RB_WARN_ON(cpu_buffer, !reader)) 5351 return; 5352 5353 event = rb_reader_event(cpu_buffer); 5354 5355 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5356 cpu_buffer->read++; 5357 5358 rb_update_read_stamp(cpu_buffer, event); 5359 5360 length = rb_event_length(event); 5361 cpu_buffer->reader_page->read += length; 5362 cpu_buffer->read_bytes += length; 5363 } 5364 5365 static void rb_advance_iter(struct ring_buffer_iter *iter) 5366 { 5367 struct ring_buffer_per_cpu *cpu_buffer; 5368 5369 cpu_buffer = iter->cpu_buffer; 5370 5371 /* If head == next_event then we need to jump to the next event */ 5372 if (iter->head == iter->next_event) { 5373 /* If the event gets overwritten again, there's nothing to do */ 5374 if (rb_iter_head_event(iter) == NULL) 5375 return; 5376 } 5377 5378 iter->head = iter->next_event; 5379 5380 /* 5381 * Check if we are at the end of the buffer. 
5382 */ 5383 if (iter->next_event >= rb_page_size(iter->head_page)) { 5384 /* discarded commits can make the page empty */ 5385 if (iter->head_page == cpu_buffer->commit_page) 5386 return; 5387 rb_inc_iter(iter); 5388 return; 5389 } 5390 5391 rb_update_iter_read_stamp(iter, iter->event); 5392 } 5393 5394 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 5395 { 5396 return cpu_buffer->lost_events; 5397 } 5398 5399 static struct ring_buffer_event * 5400 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 5401 unsigned long *lost_events) 5402 { 5403 struct ring_buffer_event *event; 5404 struct buffer_page *reader; 5405 int nr_loops = 0; 5406 5407 if (ts) 5408 *ts = 0; 5409 again: 5410 /* 5411 * We repeat when a time extend is encountered. 5412 * Since the time extend is always attached to a data event, 5413 * we should never loop more than once. 5414 * (We never hit the following condition more than twice). 5415 */ 5416 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 5417 return NULL; 5418 5419 reader = rb_get_reader_page(cpu_buffer); 5420 if (!reader) 5421 return NULL; 5422 5423 event = rb_reader_event(cpu_buffer); 5424 5425 switch (event->type_len) { 5426 case RINGBUF_TYPE_PADDING: 5427 if (rb_null_event(event)) 5428 RB_WARN_ON(cpu_buffer, 1); 5429 /* 5430 * Because the writer could be discarding every 5431 * event it creates (which would probably be bad) 5432 * if we were to go back to "again" then we may never 5433 * catch up, and will trigger the warn on, or lock 5434 * the box. Return the padding, and we will release 5435 * the current locks, and try again. 5436 */ 5437 return event; 5438 5439 case RINGBUF_TYPE_TIME_EXTEND: 5440 /* Internal data, OK to advance */ 5441 rb_advance_reader(cpu_buffer); 5442 goto again; 5443 5444 case RINGBUF_TYPE_TIME_STAMP: 5445 if (ts) { 5446 *ts = rb_event_time_stamp(event); 5447 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 5448 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5449 cpu_buffer->cpu, ts); 5450 } 5451 /* Internal data, OK to advance */ 5452 rb_advance_reader(cpu_buffer); 5453 goto again; 5454 5455 case RINGBUF_TYPE_DATA: 5456 if (ts && !(*ts)) { 5457 *ts = cpu_buffer->read_stamp + event->time_delta; 5458 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5459 cpu_buffer->cpu, ts); 5460 } 5461 if (lost_events) 5462 *lost_events = rb_lost_events(cpu_buffer); 5463 return event; 5464 5465 default: 5466 RB_WARN_ON(cpu_buffer, 1); 5467 } 5468 5469 return NULL; 5470 } 5471 EXPORT_SYMBOL_GPL(ring_buffer_peek); 5472 5473 static struct ring_buffer_event * 5474 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5475 { 5476 struct trace_buffer *buffer; 5477 struct ring_buffer_per_cpu *cpu_buffer; 5478 struct ring_buffer_event *event; 5479 int nr_loops = 0; 5480 5481 if (ts) 5482 *ts = 0; 5483 5484 cpu_buffer = iter->cpu_buffer; 5485 buffer = cpu_buffer->buffer; 5486 5487 /* 5488 * Check if someone performed a consuming read to the buffer 5489 * or removed some pages from the buffer. In these cases, 5490 * iterator was invalidated and we need to reset it. 5491 */ 5492 if (unlikely(iter->cache_read != cpu_buffer->read || 5493 iter->cache_reader_page != cpu_buffer->reader_page || 5494 iter->cache_pages_removed != cpu_buffer->pages_removed)) 5495 rb_iter_reset(iter); 5496 5497 again: 5498 if (ring_buffer_iter_empty(iter)) 5499 return NULL; 5500 5501 /* 5502 * As the writer can mess with what the iterator is trying 5503 * to read, just give up if we fail to get an event after 5504 * three tries. 
The iterator is not as reliable when reading
5505  * the ring buffer with an active write as the consumer is.
5506  * Do not warn when the limit of three failures is reached.
5507  */
5508 	if (++nr_loops > 3)
5509 		return NULL;
5510 
5511 	if (rb_per_cpu_empty(cpu_buffer))
5512 		return NULL;
5513 
5514 	if (iter->head >= rb_page_size(iter->head_page)) {
5515 		rb_inc_iter(iter);
5516 		goto again;
5517 	}
5518 
5519 	event = rb_iter_head_event(iter);
5520 	if (!event)
5521 		goto again;
5522 
5523 	switch (event->type_len) {
5524 	case RINGBUF_TYPE_PADDING:
5525 		if (rb_null_event(event)) {
5526 			rb_inc_iter(iter);
5527 			goto again;
5528 		}
5529 		rb_advance_iter(iter);
5530 		return event;
5531 
5532 	case RINGBUF_TYPE_TIME_EXTEND:
5533 		/* Internal data, OK to advance */
5534 		rb_advance_iter(iter);
5535 		goto again;
5536 
5537 	case RINGBUF_TYPE_TIME_STAMP:
5538 		if (ts) {
5539 			*ts = rb_event_time_stamp(event);
5540 			*ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp);
5541 			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
5542 							 cpu_buffer->cpu, ts);
5543 		}
5544 		/* Internal data, OK to advance */
5545 		rb_advance_iter(iter);
5546 		goto again;
5547 
5548 	case RINGBUF_TYPE_DATA:
5549 		if (ts && !(*ts)) {
5550 			*ts = iter->read_stamp + event->time_delta;
5551 			ring_buffer_normalize_time_stamp(buffer,
5552 							 cpu_buffer->cpu, ts);
5553 		}
5554 		return event;
5555 
5556 	default:
5557 		RB_WARN_ON(cpu_buffer, 1);
5558 	}
5559 
5560 	return NULL;
5561 }
5562 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
5563 
5564 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
5565 {
5566 	if (likely(!in_nmi())) {
5567 		raw_spin_lock(&cpu_buffer->reader_lock);
5568 		return true;
5569 	}
5570 
5571 	/*
5572 	 * If an NMI die dump is dumping out the content of the ring buffer,
5573 	 * trylock must be used to prevent a deadlock if the NMI
5574 	 * preempted a task that holds the ring buffer locks. If
5575 	 * we get the lock then all is fine, if not, then continue
5576 	 * to do the read, but this can corrupt the ring buffer,
5577 	 * so it must be permanently disabled from future writes.
5578 	 * Reading from NMI is a one-shot deal.
5579 	 */
5580 	if (raw_spin_trylock(&cpu_buffer->reader_lock))
5581 		return true;
5582 
5583 	/* Continue without locking, but disable the ring buffer */
5584 	atomic_inc(&cpu_buffer->record_disabled);
5585 	return false;
5586 }
5587 
5588 static inline void
5589 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
5590 {
5591 	if (likely(locked))
5592 		raw_spin_unlock(&cpu_buffer->reader_lock);
5593 }
5594 
5595 /**
5596  * ring_buffer_peek - peek at the next event to be read
5597  * @buffer: The ring buffer to read
5598  * @cpu: The cpu to peek at
5599  * @ts: The timestamp counter of this event.
5600  * @lost_events: a variable to store if events were lost (may be NULL)
5601  *
5602  * This will return the event that will be read next, but does
5603  * not consume the data.
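 *
 * A rough usage sketch (inspect() is hypothetical):
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, NULL);
 *	if (event)
 *		inspect(ring_buffer_event_data(event));
 *
 * A following ring_buffer_consume() would normally return this same
 * event.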
5604 */ 5605 struct ring_buffer_event * 5606 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 5607 unsigned long *lost_events) 5608 { 5609 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5610 struct ring_buffer_event *event; 5611 unsigned long flags; 5612 bool dolock; 5613 5614 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5615 return NULL; 5616 5617 again: 5618 local_irq_save(flags); 5619 dolock = rb_reader_lock(cpu_buffer); 5620 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5621 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5622 rb_advance_reader(cpu_buffer); 5623 rb_reader_unlock(cpu_buffer, dolock); 5624 local_irq_restore(flags); 5625 5626 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5627 goto again; 5628 5629 return event; 5630 } 5631 5632 /** ring_buffer_iter_dropped - report if there are dropped events 5633 * @iter: The ring buffer iterator 5634 * 5635 * Returns true if there was dropped events since the last peek. 5636 */ 5637 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 5638 { 5639 bool ret = iter->missed_events != 0; 5640 5641 iter->missed_events = 0; 5642 return ret; 5643 } 5644 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 5645 5646 /** 5647 * ring_buffer_iter_peek - peek at the next event to be read 5648 * @iter: The ring buffer iterator 5649 * @ts: The timestamp counter of this event. 5650 * 5651 * This will return the event that will be read next, but does 5652 * not increment the iterator. 5653 */ 5654 struct ring_buffer_event * 5655 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5656 { 5657 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5658 struct ring_buffer_event *event; 5659 unsigned long flags; 5660 5661 again: 5662 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5663 event = rb_iter_peek(iter, ts); 5664 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5665 5666 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5667 goto again; 5668 5669 return event; 5670 } 5671 5672 /** 5673 * ring_buffer_consume - return an event and consume it 5674 * @buffer: The ring buffer to get the next event from 5675 * @cpu: the cpu to read the buffer from 5676 * @ts: a variable to store the timestamp (may be NULL) 5677 * @lost_events: a variable to store if events were lost (may be NULL) 5678 * 5679 * Returns the next event in the ring buffer, and that event is consumed. 5680 * Meaning, that sequential reads will keep returning a different event, 5681 * and eventually empty the ring buffer if the producer is slower. 
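 *
 * A rough drain-loop sketch (process() is hypothetical):
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		process(ring_buffer_event_data(event), ts, lost);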
5682 */ 5683 struct ring_buffer_event * 5684 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5685 unsigned long *lost_events) 5686 { 5687 struct ring_buffer_per_cpu *cpu_buffer; 5688 struct ring_buffer_event *event = NULL; 5689 unsigned long flags; 5690 bool dolock; 5691 5692 again: 5693 /* might be called in atomic */ 5694 preempt_disable(); 5695 5696 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5697 goto out; 5698 5699 cpu_buffer = buffer->buffers[cpu]; 5700 local_irq_save(flags); 5701 dolock = rb_reader_lock(cpu_buffer); 5702 5703 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5704 if (event) { 5705 cpu_buffer->lost_events = 0; 5706 rb_advance_reader(cpu_buffer); 5707 } 5708 5709 rb_reader_unlock(cpu_buffer, dolock); 5710 local_irq_restore(flags); 5711 5712 out: 5713 preempt_enable(); 5714 5715 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5716 goto again; 5717 5718 return event; 5719 } 5720 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5721 5722 /** 5723 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5724 * @buffer: The ring buffer to read from 5725 * @cpu: The cpu buffer to iterate over 5726 * @flags: gfp flags to use for memory allocation 5727 * 5728 * This performs the initial preparations necessary to iterate 5729 * through the buffer. Memory is allocated, buffer resizing 5730 * is disabled, and the iterator pointer is returned to the caller. 5731 * 5732 * After a sequence of ring_buffer_read_prepare calls, the user is 5733 * expected to make at least one call to ring_buffer_read_prepare_sync. 5734 * Afterwards, ring_buffer_read_start is invoked to get things going 5735 * for real. 5736 * 5737 * This overall must be paired with ring_buffer_read_finish. 5738 */ 5739 struct ring_buffer_iter * 5740 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5741 { 5742 struct ring_buffer_per_cpu *cpu_buffer; 5743 struct ring_buffer_iter *iter; 5744 5745 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5746 return NULL; 5747 5748 iter = kzalloc(sizeof(*iter), flags); 5749 if (!iter) 5750 return NULL; 5751 5752 /* Holds the entire event: data and meta data */ 5753 iter->event_size = buffer->subbuf_size; 5754 iter->event = kmalloc(iter->event_size, flags); 5755 if (!iter->event) { 5756 kfree(iter); 5757 return NULL; 5758 } 5759 5760 cpu_buffer = buffer->buffers[cpu]; 5761 5762 iter->cpu_buffer = cpu_buffer; 5763 5764 atomic_inc(&cpu_buffer->resize_disabled); 5765 5766 return iter; 5767 } 5768 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5769 5770 /** 5771 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5772 * 5773 * All previously invoked ring_buffer_read_prepare calls to prepare 5774 * iterators will be synchronized. Afterwards, read_buffer_read_start 5775 * calls on those iterators are allowed. 5776 */ 5777 void 5778 ring_buffer_read_prepare_sync(void) 5779 { 5780 synchronize_rcu(); 5781 } 5782 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5783 5784 /** 5785 * ring_buffer_read_start - start a non consuming read of the buffer 5786 * @iter: The iterator returned by ring_buffer_read_prepare 5787 * 5788 * This finalizes the startup of an iteration through the buffer. 5789 * The iterator comes from a call to ring_buffer_read_prepare and 5790 * an intervening ring_buffer_read_prepare_sync must have been 5791 * performed. 5792 * 5793 * Must be paired with ring_buffer_read_finish. 
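 *
 * The overall sequence for a non consuming read is roughly
 * (illustrative sketch):
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		... process ring_buffer_event_data(event) ...
 *		ring_buffer_iter_advance(iter);
 *	}
 *	ring_buffer_read_finish(iter);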
5794 */ 5795 void 5796 ring_buffer_read_start(struct ring_buffer_iter *iter) 5797 { 5798 struct ring_buffer_per_cpu *cpu_buffer; 5799 unsigned long flags; 5800 5801 if (!iter) 5802 return; 5803 5804 cpu_buffer = iter->cpu_buffer; 5805 5806 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5807 arch_spin_lock(&cpu_buffer->lock); 5808 rb_iter_reset(iter); 5809 arch_spin_unlock(&cpu_buffer->lock); 5810 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5811 } 5812 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5813 5814 /** 5815 * ring_buffer_read_finish - finish reading the iterator of the buffer 5816 * @iter: The iterator retrieved by ring_buffer_start 5817 * 5818 * This re-enables resizing of the buffer, and frees the iterator. 5819 */ 5820 void 5821 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5822 { 5823 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5824 unsigned long flags; 5825 5826 /* Use this opportunity to check the integrity of the ring buffer. */ 5827 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5828 rb_check_pages(cpu_buffer); 5829 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5830 5831 atomic_dec(&cpu_buffer->resize_disabled); 5832 kfree(iter->event); 5833 kfree(iter); 5834 } 5835 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5836 5837 /** 5838 * ring_buffer_iter_advance - advance the iterator to the next location 5839 * @iter: The ring buffer iterator 5840 * 5841 * Move the location of the iterator such that the next read will 5842 * be the next location of the iterator. 5843 */ 5844 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5845 { 5846 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5847 unsigned long flags; 5848 5849 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5850 5851 rb_advance_iter(iter); 5852 5853 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5854 } 5855 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5856 5857 /** 5858 * ring_buffer_size - return the size of the ring buffer (in bytes) 5859 * @buffer: The ring buffer. 5860 * @cpu: The CPU to get ring buffer size from. 5861 */ 5862 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5863 { 5864 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5865 return 0; 5866 5867 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 5868 } 5869 EXPORT_SYMBOL_GPL(ring_buffer_size); 5870 5871 /** 5872 * ring_buffer_max_event_size - return the max data size of an event 5873 * @buffer: The ring buffer. 5874 * 5875 * Returns the maximum size an event can be. 
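 *
 * A caller can use this as an upper bound before trying to reserve
 * space, for example (illustrative only):
 *
 *	if (length > ring_buffer_max_event_size(buffer))
 *		return -EINVAL;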
5876 */ 5877 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 5878 { 5879 /* If abs timestamp is requested, events have a timestamp too */ 5880 if (ring_buffer_time_stamp_abs(buffer)) 5881 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 5882 return buffer->max_data_size; 5883 } 5884 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 5885 5886 static void rb_clear_buffer_page(struct buffer_page *page) 5887 { 5888 local_set(&page->write, 0); 5889 local_set(&page->entries, 0); 5890 rb_init_page(page->page); 5891 page->read = 0; 5892 } 5893 5894 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 5895 { 5896 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 5897 5898 if (!meta) 5899 return; 5900 5901 meta->reader.read = cpu_buffer->reader_page->read; 5902 meta->reader.id = cpu_buffer->reader_page->id; 5903 meta->reader.lost_events = cpu_buffer->lost_events; 5904 5905 meta->entries = local_read(&cpu_buffer->entries); 5906 meta->overrun = local_read(&cpu_buffer->overrun); 5907 meta->read = cpu_buffer->read; 5908 5909 /* Some archs do not have data cache coherency between kernel and user-space */ 5910 flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page)); 5911 } 5912 5913 static void 5914 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5915 { 5916 struct buffer_page *page; 5917 5918 rb_head_page_deactivate(cpu_buffer); 5919 5920 cpu_buffer->head_page 5921 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5922 rb_clear_buffer_page(cpu_buffer->head_page); 5923 list_for_each_entry(page, cpu_buffer->pages, list) { 5924 rb_clear_buffer_page(page); 5925 } 5926 5927 cpu_buffer->tail_page = cpu_buffer->head_page; 5928 cpu_buffer->commit_page = cpu_buffer->head_page; 5929 5930 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5931 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5932 rb_clear_buffer_page(cpu_buffer->reader_page); 5933 5934 local_set(&cpu_buffer->entries_bytes, 0); 5935 local_set(&cpu_buffer->overrun, 0); 5936 local_set(&cpu_buffer->commit_overrun, 0); 5937 local_set(&cpu_buffer->dropped_events, 0); 5938 local_set(&cpu_buffer->entries, 0); 5939 local_set(&cpu_buffer->committing, 0); 5940 local_set(&cpu_buffer->commits, 0); 5941 local_set(&cpu_buffer->pages_touched, 0); 5942 local_set(&cpu_buffer->pages_lost, 0); 5943 local_set(&cpu_buffer->pages_read, 0); 5944 cpu_buffer->last_pages_touch = 0; 5945 cpu_buffer->shortest_full = 0; 5946 cpu_buffer->read = 0; 5947 cpu_buffer->read_bytes = 0; 5948 5949 rb_time_set(&cpu_buffer->write_stamp, 0); 5950 rb_time_set(&cpu_buffer->before_stamp, 0); 5951 5952 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 5953 5954 cpu_buffer->lost_events = 0; 5955 cpu_buffer->last_overrun = 0; 5956 5957 rb_head_page_activate(cpu_buffer); 5958 cpu_buffer->pages_removed = 0; 5959 5960 if (cpu_buffer->mapped) { 5961 rb_update_meta_page(cpu_buffer); 5962 if (cpu_buffer->ring_meta) { 5963 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 5964 meta->commit_buffer = meta->head_buffer; 5965 } 5966 } 5967 } 5968 5969 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 5970 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 5971 { 5972 unsigned long flags; 5973 5974 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5975 5976 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 5977 goto out; 5978 5979 arch_spin_lock(&cpu_buffer->lock); 5980 5981 rb_reset_cpu(cpu_buffer); 5982 5983 arch_spin_unlock(&cpu_buffer->lock); 5984 5985 out: 5986 
raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5987 } 5988 5989 /** 5990 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 5991 * @buffer: The ring buffer to reset a per cpu buffer of 5992 * @cpu: The CPU buffer to be reset 5993 */ 5994 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 5995 { 5996 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5997 struct ring_buffer_meta *meta; 5998 5999 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6000 return; 6001 6002 /* prevent another thread from changing buffer sizes */ 6003 mutex_lock(&buffer->mutex); 6004 6005 atomic_inc(&cpu_buffer->resize_disabled); 6006 atomic_inc(&cpu_buffer->record_disabled); 6007 6008 /* Make sure all commits have finished */ 6009 synchronize_rcu(); 6010 6011 reset_disabled_cpu_buffer(cpu_buffer); 6012 6013 atomic_dec(&cpu_buffer->record_disabled); 6014 atomic_dec(&cpu_buffer->resize_disabled); 6015 6016 /* Make sure persistent meta now uses this buffer's addresses */ 6017 meta = rb_range_meta(buffer, 0, cpu_buffer->cpu); 6018 if (meta) 6019 rb_meta_init_text_addr(meta); 6020 6021 mutex_unlock(&buffer->mutex); 6022 } 6023 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6024 6025 /* Flag to ensure proper resetting of atomic variables */ 6026 #define RESET_BIT (1 << 30) 6027 6028 /** 6029 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6030 * @buffer: The ring buffer to reset a per cpu buffer of 6031 */ 6032 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6033 { 6034 struct ring_buffer_per_cpu *cpu_buffer; 6035 struct ring_buffer_meta *meta; 6036 int cpu; 6037 6038 /* prevent another thread from changing buffer sizes */ 6039 mutex_lock(&buffer->mutex); 6040 6041 for_each_online_buffer_cpu(buffer, cpu) { 6042 cpu_buffer = buffer->buffers[cpu]; 6043 6044 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6045 atomic_inc(&cpu_buffer->record_disabled); 6046 } 6047 6048 /* Make sure all commits have finished */ 6049 synchronize_rcu(); 6050 6051 for_each_buffer_cpu(buffer, cpu) { 6052 cpu_buffer = buffer->buffers[cpu]; 6053 6054 /* 6055 * If a CPU came online during the synchronize_rcu(), then 6056 * ignore it. 
6057 */ 6058 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6059 continue; 6060 6061 reset_disabled_cpu_buffer(cpu_buffer); 6062 6063 /* Make sure persistent meta now uses this buffer's addresses */ 6064 meta = rb_range_meta(buffer, 0, cpu_buffer->cpu); 6065 if (meta) 6066 rb_meta_init_text_addr(meta); 6067 6068 atomic_dec(&cpu_buffer->record_disabled); 6069 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6070 } 6071 6072 mutex_unlock(&buffer->mutex); 6073 } 6074 6075 /** 6076 * ring_buffer_reset - reset a ring buffer 6077 * @buffer: The ring buffer to reset all cpu buffers 6078 */ 6079 void ring_buffer_reset(struct trace_buffer *buffer) 6080 { 6081 struct ring_buffer_per_cpu *cpu_buffer; 6082 int cpu; 6083 6084 /* prevent another thread from changing buffer sizes */ 6085 mutex_lock(&buffer->mutex); 6086 6087 for_each_buffer_cpu(buffer, cpu) { 6088 cpu_buffer = buffer->buffers[cpu]; 6089 6090 atomic_inc(&cpu_buffer->resize_disabled); 6091 atomic_inc(&cpu_buffer->record_disabled); 6092 } 6093 6094 /* Make sure all commits have finished */ 6095 synchronize_rcu(); 6096 6097 for_each_buffer_cpu(buffer, cpu) { 6098 cpu_buffer = buffer->buffers[cpu]; 6099 6100 reset_disabled_cpu_buffer(cpu_buffer); 6101 6102 atomic_dec(&cpu_buffer->record_disabled); 6103 atomic_dec(&cpu_buffer->resize_disabled); 6104 } 6105 6106 mutex_unlock(&buffer->mutex); 6107 } 6108 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6109 6110 /** 6111 * ring_buffer_empty - is the ring buffer empty? 6112 * @buffer: The ring buffer to test 6113 */ 6114 bool ring_buffer_empty(struct trace_buffer *buffer) 6115 { 6116 struct ring_buffer_per_cpu *cpu_buffer; 6117 unsigned long flags; 6118 bool dolock; 6119 bool ret; 6120 int cpu; 6121 6122 /* yes this is racy, but if you don't like the race, lock the buffer */ 6123 for_each_buffer_cpu(buffer, cpu) { 6124 cpu_buffer = buffer->buffers[cpu]; 6125 local_irq_save(flags); 6126 dolock = rb_reader_lock(cpu_buffer); 6127 ret = rb_per_cpu_empty(cpu_buffer); 6128 rb_reader_unlock(cpu_buffer, dolock); 6129 local_irq_restore(flags); 6130 6131 if (!ret) 6132 return false; 6133 } 6134 6135 return true; 6136 } 6137 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6138 6139 /** 6140 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6141 * @buffer: The ring buffer 6142 * @cpu: The CPU buffer to test 6143 */ 6144 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6145 { 6146 struct ring_buffer_per_cpu *cpu_buffer; 6147 unsigned long flags; 6148 bool dolock; 6149 bool ret; 6150 6151 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6152 return true; 6153 6154 cpu_buffer = buffer->buffers[cpu]; 6155 local_irq_save(flags); 6156 dolock = rb_reader_lock(cpu_buffer); 6157 ret = rb_per_cpu_empty(cpu_buffer); 6158 rb_reader_unlock(cpu_buffer, dolock); 6159 local_irq_restore(flags); 6160 6161 return ret; 6162 } 6163 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6164 6165 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6166 /** 6167 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6168 * @buffer_a: One buffer to swap with 6169 * @buffer_b: The other buffer to swap with 6170 * @cpu: the CPU of the buffers to swap 6171 * 6172 * This function is useful for tracers that want to take a "snapshot" 6173 * of a CPU buffer and has another back up buffer lying around. 6174 * it is expected that the tracer handles the cpu buffer not being 6175 * used at the moment. 
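 *
 * A snapshot style swap could be sketched as below; "main" and "snap"
 * are hypothetical trace_buffer pointers owned by the tracer, which
 * must also keep the cpu buffer from being used as described above:
 *
 *	for_each_online_cpu(cpu) {
 *		ret = ring_buffer_swap_cpu(main, snap, cpu);
 *		if (ret < 0)
 *			break;
 *	}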
6176 */ 6177 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6178 struct trace_buffer *buffer_b, int cpu) 6179 { 6180 struct ring_buffer_per_cpu *cpu_buffer_a; 6181 struct ring_buffer_per_cpu *cpu_buffer_b; 6182 int ret = -EINVAL; 6183 6184 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6185 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6186 goto out; 6187 6188 cpu_buffer_a = buffer_a->buffers[cpu]; 6189 cpu_buffer_b = buffer_b->buffers[cpu]; 6190 6191 /* It's up to the callers to not try to swap mapped buffers */ 6192 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) { 6193 ret = -EBUSY; 6194 goto out; 6195 } 6196 6197 /* At least make sure the two buffers are somewhat the same */ 6198 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6199 goto out; 6200 6201 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6202 goto out; 6203 6204 ret = -EAGAIN; 6205 6206 if (atomic_read(&buffer_a->record_disabled)) 6207 goto out; 6208 6209 if (atomic_read(&buffer_b->record_disabled)) 6210 goto out; 6211 6212 if (atomic_read(&cpu_buffer_a->record_disabled)) 6213 goto out; 6214 6215 if (atomic_read(&cpu_buffer_b->record_disabled)) 6216 goto out; 6217 6218 /* 6219 * We can't do a synchronize_rcu here because this 6220 * function can be called in atomic context. 6221 * Normally this will be called from the same CPU as cpu. 6222 * If not it's up to the caller to protect this. 6223 */ 6224 atomic_inc(&cpu_buffer_a->record_disabled); 6225 atomic_inc(&cpu_buffer_b->record_disabled); 6226 6227 ret = -EBUSY; 6228 if (local_read(&cpu_buffer_a->committing)) 6229 goto out_dec; 6230 if (local_read(&cpu_buffer_b->committing)) 6231 goto out_dec; 6232 6233 /* 6234 * When resize is in progress, we cannot swap it because 6235 * it will mess the state of the cpu buffer. 6236 */ 6237 if (atomic_read(&buffer_a->resizing)) 6238 goto out_dec; 6239 if (atomic_read(&buffer_b->resizing)) 6240 goto out_dec; 6241 6242 buffer_a->buffers[cpu] = cpu_buffer_b; 6243 buffer_b->buffers[cpu] = cpu_buffer_a; 6244 6245 cpu_buffer_b->buffer = buffer_a; 6246 cpu_buffer_a->buffer = buffer_b; 6247 6248 ret = 0; 6249 6250 out_dec: 6251 atomic_dec(&cpu_buffer_a->record_disabled); 6252 atomic_dec(&cpu_buffer_b->record_disabled); 6253 out: 6254 return ret; 6255 } 6256 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6257 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6258 6259 /** 6260 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6261 * @buffer: the buffer to allocate for. 6262 * @cpu: the cpu buffer to allocate. 6263 * 6264 * This function is used in conjunction with ring_buffer_read_page. 6265 * When reading a full page from the ring buffer, these functions 6266 * can be used to speed up the process. The calling function should 6267 * allocate a few pages first with this function. Then when it 6268 * needs to get pages from the ring buffer, it passes the result 6269 * of this function into ring_buffer_read_page, which will swap 6270 * the page that was allocated, with the read page of the buffer. 
6271 * 6272 * Returns: 6273 * The page allocated, or ERR_PTR 6274 */ 6275 struct buffer_data_read_page * 6276 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6277 { 6278 struct ring_buffer_per_cpu *cpu_buffer; 6279 struct buffer_data_read_page *bpage = NULL; 6280 unsigned long flags; 6281 struct page *page; 6282 6283 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6284 return ERR_PTR(-ENODEV); 6285 6286 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 6287 if (!bpage) 6288 return ERR_PTR(-ENOMEM); 6289 6290 bpage->order = buffer->subbuf_order; 6291 cpu_buffer = buffer->buffers[cpu]; 6292 local_irq_save(flags); 6293 arch_spin_lock(&cpu_buffer->lock); 6294 6295 if (cpu_buffer->free_page) { 6296 bpage->data = cpu_buffer->free_page; 6297 cpu_buffer->free_page = NULL; 6298 } 6299 6300 arch_spin_unlock(&cpu_buffer->lock); 6301 local_irq_restore(flags); 6302 6303 if (bpage->data) 6304 goto out; 6305 6306 page = alloc_pages_node(cpu_to_node(cpu), 6307 GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO, 6308 cpu_buffer->buffer->subbuf_order); 6309 if (!page) { 6310 kfree(bpage); 6311 return ERR_PTR(-ENOMEM); 6312 } 6313 6314 bpage->data = page_address(page); 6315 6316 out: 6317 rb_init_page(bpage->data); 6318 6319 return bpage; 6320 } 6321 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6322 6323 /** 6324 * ring_buffer_free_read_page - free an allocated read page 6325 * @buffer: the buffer the page was allocate for 6326 * @cpu: the cpu buffer the page came from 6327 * @data_page: the page to free 6328 * 6329 * Free a page allocated from ring_buffer_alloc_read_page. 6330 */ 6331 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6332 struct buffer_data_read_page *data_page) 6333 { 6334 struct ring_buffer_per_cpu *cpu_buffer; 6335 struct buffer_data_page *bpage = data_page->data; 6336 struct page *page = virt_to_page(bpage); 6337 unsigned long flags; 6338 6339 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6340 return; 6341 6342 cpu_buffer = buffer->buffers[cpu]; 6343 6344 /* 6345 * If the page is still in use someplace else, or order of the page 6346 * is different from the subbuffer order of the buffer - 6347 * we can't reuse it 6348 */ 6349 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6350 goto out; 6351 6352 local_irq_save(flags); 6353 arch_spin_lock(&cpu_buffer->lock); 6354 6355 if (!cpu_buffer->free_page) { 6356 cpu_buffer->free_page = bpage; 6357 bpage = NULL; 6358 } 6359 6360 arch_spin_unlock(&cpu_buffer->lock); 6361 local_irq_restore(flags); 6362 6363 out: 6364 free_pages((unsigned long)bpage, data_page->order); 6365 kfree(data_page); 6366 } 6367 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6368 6369 /** 6370 * ring_buffer_read_page - extract a page from the ring buffer 6371 * @buffer: buffer to extract from 6372 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6373 * @len: amount to extract 6374 * @cpu: the cpu of the buffer to extract 6375 * @full: should the extraction only happen when the page is full. 6376 * 6377 * This function will pull out a page from the ring buffer and consume it. 6378 * @data_page must be the address of the variable that was returned 6379 * from ring_buffer_alloc_read_page. This is because the page might be used 6380 * to swap with a page in the ring buffer. 
6381 * 6382 * for example: 6383 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 6384 * if (IS_ERR(rpage)) 6385 * return PTR_ERR(rpage); 6386 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 6387 * if (ret >= 0) 6388 * process_page(ring_buffer_read_page_data(rpage), ret); 6389 * ring_buffer_free_read_page(buffer, cpu, rpage); 6390 * 6391 * When @full is set, the function will not return true unless 6392 * the writer is off the reader page. 6393 * 6394 * Note: it is up to the calling functions to handle sleeps and wakeups. 6395 * The ring buffer can be used anywhere in the kernel and can not 6396 * blindly call wake_up. The layer that uses the ring buffer must be 6397 * responsible for that. 6398 * 6399 * Returns: 6400 * >=0 if data has been transferred, returns the offset of consumed data. 6401 * <0 if no data has been transferred. 6402 */ 6403 int ring_buffer_read_page(struct trace_buffer *buffer, 6404 struct buffer_data_read_page *data_page, 6405 size_t len, int cpu, int full) 6406 { 6407 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6408 struct ring_buffer_event *event; 6409 struct buffer_data_page *bpage; 6410 struct buffer_page *reader; 6411 unsigned long missed_events; 6412 unsigned long flags; 6413 unsigned int commit; 6414 unsigned int read; 6415 u64 save_timestamp; 6416 int ret = -1; 6417 6418 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6419 goto out; 6420 6421 /* 6422 * If len is not big enough to hold the page header, then 6423 * we can not copy anything. 6424 */ 6425 if (len <= BUF_PAGE_HDR_SIZE) 6426 goto out; 6427 6428 len -= BUF_PAGE_HDR_SIZE; 6429 6430 if (!data_page || !data_page->data) 6431 goto out; 6432 if (data_page->order != buffer->subbuf_order) 6433 goto out; 6434 6435 bpage = data_page->data; 6436 if (!bpage) 6437 goto out; 6438 6439 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6440 6441 reader = rb_get_reader_page(cpu_buffer); 6442 if (!reader) 6443 goto out_unlock; 6444 6445 event = rb_reader_event(cpu_buffer); 6446 6447 read = reader->read; 6448 commit = rb_page_size(reader); 6449 6450 /* Check if any events were dropped */ 6451 missed_events = cpu_buffer->lost_events; 6452 6453 /* 6454 * If this page has been partially read or 6455 * if len is not big enough to read the rest of the page or 6456 * a writer is still on the page, then 6457 * we must copy the data from the page to the buffer. 6458 * Otherwise, we can simply swap the page with the one passed in. 6459 */ 6460 if (read || (len < (commit - read)) || 6461 cpu_buffer->reader_page == cpu_buffer->commit_page || 6462 cpu_buffer->mapped) { 6463 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 6464 unsigned int rpos = read; 6465 unsigned int pos = 0; 6466 unsigned int size; 6467 6468 /* 6469 * If a full page is expected, this can still be returned 6470 * if there's been a previous partial read and the 6471 * rest of the page can be read and the commit page is off 6472 * the reader page. 
6473 */ 6474 if (full && 6475 (!read || (len < (commit - read)) || 6476 cpu_buffer->reader_page == cpu_buffer->commit_page)) 6477 goto out_unlock; 6478 6479 if (len > (commit - read)) 6480 len = (commit - read); 6481 6482 /* Always keep the time extend and data together */ 6483 size = rb_event_ts_length(event); 6484 6485 if (len < size) 6486 goto out_unlock; 6487 6488 /* save the current timestamp, since the user will need it */ 6489 save_timestamp = cpu_buffer->read_stamp; 6490 6491 /* Need to copy one event at a time */ 6492 do { 6493 /* We need the size of one event, because 6494 * rb_advance_reader only advances by one event, 6495 * whereas rb_event_ts_length may include the size of 6496 * one or two events. 6497 * We have already ensured there's enough space if this 6498 * is a time extend. */ 6499 size = rb_event_length(event); 6500 memcpy(bpage->data + pos, rpage->data + rpos, size); 6501 6502 len -= size; 6503 6504 rb_advance_reader(cpu_buffer); 6505 rpos = reader->read; 6506 pos += size; 6507 6508 if (rpos >= commit) 6509 break; 6510 6511 event = rb_reader_event(cpu_buffer); 6512 /* Always keep the time extend and data together */ 6513 size = rb_event_ts_length(event); 6514 } while (len >= size); 6515 6516 /* update bpage */ 6517 local_set(&bpage->commit, pos); 6518 bpage->time_stamp = save_timestamp; 6519 6520 /* we copied everything to the beginning */ 6521 read = 0; 6522 } else { 6523 /* update the entry counter */ 6524 cpu_buffer->read += rb_page_entries(reader); 6525 cpu_buffer->read_bytes += rb_page_size(reader); 6526 6527 /* swap the pages */ 6528 rb_init_page(bpage); 6529 bpage = reader->page; 6530 reader->page = data_page->data; 6531 local_set(&reader->write, 0); 6532 local_set(&reader->entries, 0); 6533 reader->read = 0; 6534 data_page->data = bpage; 6535 6536 /* 6537 * Use the real_end for the data size, 6538 * This gives us a chance to store the lost events 6539 * on the page. 6540 */ 6541 if (reader->real_end) 6542 local_set(&bpage->commit, reader->real_end); 6543 } 6544 ret = read; 6545 6546 cpu_buffer->lost_events = 0; 6547 6548 commit = local_read(&bpage->commit); 6549 /* 6550 * Set a flag in the commit field if we lost events 6551 */ 6552 if (missed_events) { 6553 /* If there is room at the end of the page to save the 6554 * missed events, then record it there. 6555 */ 6556 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6557 memcpy(&bpage->data[commit], &missed_events, 6558 sizeof(missed_events)); 6559 local_add(RB_MISSED_STORED, &bpage->commit); 6560 commit += sizeof(missed_events); 6561 } 6562 local_add(RB_MISSED_EVENTS, &bpage->commit); 6563 } 6564 6565 /* 6566 * This page may be off to user land. Zero it out here. 6567 */ 6568 if (commit < buffer->subbuf_size) 6569 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 6570 6571 out_unlock: 6572 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6573 6574 out: 6575 return ret; 6576 } 6577 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 6578 6579 /** 6580 * ring_buffer_read_page_data - get pointer to the data in the page. 6581 * @page: the page to get the data from 6582 * 6583 * Returns pointer to the actual data in this page. 6584 */ 6585 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 6586 { 6587 return page->data; 6588 } 6589 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 6590 6591 /** 6592 * ring_buffer_subbuf_size_get - get size of the sub buffer. 6593 * @buffer: the buffer to get the sub buffer size from 6594 * 6595 * Returns size of the sub buffer, in bytes. 
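 *
 * Note that this includes the sub buffer header: with the default sub
 * buffer order of 0 on a 4K page system the returned value is 4096,
 * i.e. (1 << subbuf_order) * PAGE_SIZE, not just the data area.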
6596 */ 6597 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer) 6598 { 6599 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6600 } 6601 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get); 6602 6603 /** 6604 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page. 6605 * @buffer: The ring_buffer to get the system sub page order from 6606 * 6607 * By default, one ring buffer sub page equals to one system page. This parameter 6608 * is configurable, per ring buffer. The size of the ring buffer sub page can be 6609 * extended, but must be an order of system page size. 6610 * 6611 * Returns the order of buffer sub page size, in system pages: 6612 * 0 means the sub buffer size is 1 system page and so forth. 6613 * In case of an error < 0 is returned. 6614 */ 6615 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer) 6616 { 6617 if (!buffer) 6618 return -EINVAL; 6619 6620 return buffer->subbuf_order; 6621 } 6622 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get); 6623 6624 /** 6625 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page. 6626 * @buffer: The ring_buffer to set the new page size. 6627 * @order: Order of the system pages in one sub buffer page 6628 * 6629 * By default, one ring buffer pages equals to one system page. This API can be 6630 * used to set new size of the ring buffer page. The size must be order of 6631 * system page size, that's why the input parameter @order is the order of 6632 * system pages that are allocated for one ring buffer page: 6633 * 0 - 1 system page 6634 * 1 - 2 system pages 6635 * 3 - 4 system pages 6636 * ... 6637 * 6638 * Returns 0 on success or < 0 in case of an error. 6639 */ 6640 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 6641 { 6642 struct ring_buffer_per_cpu *cpu_buffer; 6643 struct buffer_page *bpage, *tmp; 6644 int old_order, old_size; 6645 int nr_pages; 6646 int psize; 6647 int err; 6648 int cpu; 6649 6650 if (!buffer || order < 0) 6651 return -EINVAL; 6652 6653 if (buffer->subbuf_order == order) 6654 return 0; 6655 6656 psize = (1 << order) * PAGE_SIZE; 6657 if (psize <= BUF_PAGE_HDR_SIZE) 6658 return -EINVAL; 6659 6660 /* Size of a subbuf cannot be greater than the write counter */ 6661 if (psize > RB_WRITE_MASK + 1) 6662 return -EINVAL; 6663 6664 old_order = buffer->subbuf_order; 6665 old_size = buffer->subbuf_size; 6666 6667 /* prevent another thread from changing buffer sizes */ 6668 mutex_lock(&buffer->mutex); 6669 atomic_inc(&buffer->record_disabled); 6670 6671 /* Make sure all commits have finished */ 6672 synchronize_rcu(); 6673 6674 buffer->subbuf_order = order; 6675 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 6676 6677 /* Make sure all new buffers are allocated, before deleting the old ones */ 6678 for_each_buffer_cpu(buffer, cpu) { 6679 6680 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6681 continue; 6682 6683 cpu_buffer = buffer->buffers[cpu]; 6684 6685 if (cpu_buffer->mapped) { 6686 err = -EBUSY; 6687 goto error; 6688 } 6689 6690 /* Update the number of pages to match the new size */ 6691 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 6692 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 6693 6694 /* we need a minimum of two pages */ 6695 if (nr_pages < 2) 6696 nr_pages = 2; 6697 6698 cpu_buffer->nr_pages_to_update = nr_pages; 6699 6700 /* Include the reader page */ 6701 nr_pages++; 6702 6703 /* Allocate the new size buffer */ 6704 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6705 if (__rb_allocate_pages(cpu_buffer, nr_pages, 6706 
&cpu_buffer->new_pages)) { 6707 /* not enough memory for new pages */ 6708 err = -ENOMEM; 6709 goto error; 6710 } 6711 } 6712 6713 for_each_buffer_cpu(buffer, cpu) { 6714 6715 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6716 continue; 6717 6718 cpu_buffer = buffer->buffers[cpu]; 6719 6720 /* Clear the head bit to make the link list normal to read */ 6721 rb_head_page_deactivate(cpu_buffer); 6722 6723 /* Now walk the list and free all the old sub buffers */ 6724 list_for_each_entry_safe(bpage, tmp, cpu_buffer->pages, list) { 6725 list_del_init(&bpage->list); 6726 free_buffer_page(bpage); 6727 } 6728 /* The above loop stopped an the last page needing to be freed */ 6729 bpage = list_entry(cpu_buffer->pages, struct buffer_page, list); 6730 free_buffer_page(bpage); 6731 6732 /* Free the current reader page */ 6733 free_buffer_page(cpu_buffer->reader_page); 6734 6735 /* One page was allocated for the reader page */ 6736 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6737 struct buffer_page, list); 6738 list_del_init(&cpu_buffer->reader_page->list); 6739 6740 /* The cpu_buffer pages are a link list with no head */ 6741 cpu_buffer->pages = cpu_buffer->new_pages.next; 6742 cpu_buffer->new_pages.next->prev = cpu_buffer->new_pages.prev; 6743 cpu_buffer->new_pages.prev->next = cpu_buffer->new_pages.next; 6744 6745 /* Clear the new_pages list */ 6746 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6747 6748 cpu_buffer->head_page 6749 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6750 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6751 6752 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6753 cpu_buffer->nr_pages_to_update = 0; 6754 6755 free_pages((unsigned long)cpu_buffer->free_page, old_order); 6756 cpu_buffer->free_page = NULL; 6757 6758 rb_head_page_activate(cpu_buffer); 6759 6760 rb_check_pages(cpu_buffer); 6761 } 6762 6763 atomic_dec(&buffer->record_disabled); 6764 mutex_unlock(&buffer->mutex); 6765 6766 return 0; 6767 6768 error: 6769 buffer->subbuf_order = old_order; 6770 buffer->subbuf_size = old_size; 6771 6772 atomic_dec(&buffer->record_disabled); 6773 mutex_unlock(&buffer->mutex); 6774 6775 for_each_buffer_cpu(buffer, cpu) { 6776 cpu_buffer = buffer->buffers[cpu]; 6777 6778 if (!cpu_buffer->nr_pages_to_update) 6779 continue; 6780 6781 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6782 list_del_init(&bpage->list); 6783 free_buffer_page(bpage); 6784 } 6785 } 6786 6787 return err; 6788 } 6789 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6790 6791 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6792 { 6793 struct page *page; 6794 6795 if (cpu_buffer->meta_page) 6796 return 0; 6797 6798 page = alloc_page(GFP_USER | __GFP_ZERO); 6799 if (!page) 6800 return -ENOMEM; 6801 6802 cpu_buffer->meta_page = page_to_virt(page); 6803 6804 return 0; 6805 } 6806 6807 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6808 { 6809 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 6810 6811 free_page(addr); 6812 cpu_buffer->meta_page = NULL; 6813 } 6814 6815 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 6816 unsigned long *subbuf_ids) 6817 { 6818 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6819 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 6820 struct buffer_page *first_subbuf, *subbuf; 6821 int id = 0; 6822 6823 subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page; 6824 cpu_buffer->reader_page->id = id++; 6825 6826 
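	/* Walk the sub-buffer ring from the head page and assign the remaining IDs */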
first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 6827 do { 6828 if (WARN_ON(id >= nr_subbufs)) 6829 break; 6830 6831 subbuf_ids[id] = (unsigned long)subbuf->page; 6832 subbuf->id = id; 6833 6834 rb_inc_page(&subbuf); 6835 id++; 6836 } while (subbuf != first_subbuf); 6837 6838 /* install subbuf ID to kern VA translation */ 6839 cpu_buffer->subbuf_ids = subbuf_ids; 6840 6841 meta->meta_page_size = PAGE_SIZE; 6842 meta->meta_struct_len = sizeof(*meta); 6843 meta->nr_subbufs = nr_subbufs; 6844 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6845 6846 rb_update_meta_page(cpu_buffer); 6847 } 6848 6849 static struct ring_buffer_per_cpu * 6850 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 6851 { 6852 struct ring_buffer_per_cpu *cpu_buffer; 6853 6854 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6855 return ERR_PTR(-EINVAL); 6856 6857 cpu_buffer = buffer->buffers[cpu]; 6858 6859 mutex_lock(&cpu_buffer->mapping_lock); 6860 6861 if (!cpu_buffer->user_mapped) { 6862 mutex_unlock(&cpu_buffer->mapping_lock); 6863 return ERR_PTR(-ENODEV); 6864 } 6865 6866 return cpu_buffer; 6867 } 6868 6869 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6870 { 6871 mutex_unlock(&cpu_buffer->mapping_lock); 6872 } 6873 6874 /* 6875 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 6876 * to be set-up or torn-down. 6877 */ 6878 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 6879 bool inc) 6880 { 6881 unsigned long flags; 6882 6883 lockdep_assert_held(&cpu_buffer->mapping_lock); 6884 6885 /* mapped is always greater or equal to user_mapped */ 6886 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 6887 return -EINVAL; 6888 6889 if (inc && cpu_buffer->mapped == UINT_MAX) 6890 return -EBUSY; 6891 6892 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 6893 return -EINVAL; 6894 6895 mutex_lock(&cpu_buffer->buffer->mutex); 6896 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6897 6898 if (inc) { 6899 cpu_buffer->user_mapped++; 6900 cpu_buffer->mapped++; 6901 } else { 6902 cpu_buffer->user_mapped--; 6903 cpu_buffer->mapped--; 6904 } 6905 6906 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6907 mutex_unlock(&cpu_buffer->buffer->mutex); 6908 6909 return 0; 6910 } 6911 6912 /* 6913 * +--------------+ pgoff == 0 6914 * | meta page | 6915 * +--------------+ pgoff == 1 6916 * | subbuffer 0 | 6917 * | | 6918 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 6919 * | subbuffer 1 | 6920 * | | 6921 * ... 6922 */ 6923 #ifdef CONFIG_MMU 6924 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 6925 struct vm_area_struct *vma) 6926 { 6927 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 6928 unsigned int subbuf_pages, subbuf_order; 6929 struct page **pages; 6930 int p = 0, s = 0; 6931 int err; 6932 6933 /* Refuse MP_PRIVATE or writable mappings */ 6934 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 6935 !(vma->vm_flags & VM_MAYSHARE)) 6936 return -EPERM; 6937 6938 /* 6939 * Make sure the mapping cannot become writable later. Also tell the VM 6940 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 
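 * VM_DONTDUMP additionally keeps the mapping out of core dumps.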
6941 */ 6942 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 6943 VM_MAYWRITE); 6944 6945 lockdep_assert_held(&cpu_buffer->mapping_lock); 6946 6947 subbuf_order = cpu_buffer->buffer->subbuf_order; 6948 subbuf_pages = 1 << subbuf_order; 6949 6950 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 6951 nr_pages = ((nr_subbufs) << subbuf_order) - pgoff + 1; /* + meta-page */ 6952 6953 nr_vma_pages = vma_pages(vma); 6954 if (!nr_vma_pages || nr_vma_pages > nr_pages) 6955 return -EINVAL; 6956 6957 nr_pages = nr_vma_pages; 6958 6959 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 6960 if (!pages) 6961 return -ENOMEM; 6962 6963 if (!pgoff) { 6964 pages[p++] = virt_to_page(cpu_buffer->meta_page); 6965 6966 /* 6967 * TODO: Align sub-buffers on their size, once 6968 * vm_insert_pages() supports the zero-page. 6969 */ 6970 } else { 6971 /* Skip the meta-page */ 6972 pgoff--; 6973 6974 if (pgoff % subbuf_pages) { 6975 err = -EINVAL; 6976 goto out; 6977 } 6978 6979 s += pgoff / subbuf_pages; 6980 } 6981 6982 while (p < nr_pages) { 6983 struct page *page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 6984 int off = 0; 6985 6986 if (WARN_ON_ONCE(s >= nr_subbufs)) { 6987 err = -EINVAL; 6988 goto out; 6989 } 6990 6991 for (; off < (1 << (subbuf_order)); off++, page++) { 6992 if (p >= nr_pages) 6993 break; 6994 6995 pages[p++] = page; 6996 } 6997 s++; 6998 } 6999 7000 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7001 7002 out: 7003 kfree(pages); 7004 7005 return err; 7006 } 7007 #else 7008 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7009 struct vm_area_struct *vma) 7010 { 7011 return -EOPNOTSUPP; 7012 } 7013 #endif 7014 7015 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7016 struct vm_area_struct *vma) 7017 { 7018 struct ring_buffer_per_cpu *cpu_buffer; 7019 unsigned long flags, *subbuf_ids; 7020 int err = 0; 7021 7022 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7023 return -EINVAL; 7024 7025 cpu_buffer = buffer->buffers[cpu]; 7026 7027 mutex_lock(&cpu_buffer->mapping_lock); 7028 7029 if (cpu_buffer->user_mapped) { 7030 err = __rb_map_vma(cpu_buffer, vma); 7031 if (!err) 7032 err = __rb_inc_dec_mapped(cpu_buffer, true); 7033 mutex_unlock(&cpu_buffer->mapping_lock); 7034 return err; 7035 } 7036 7037 /* prevent another thread from changing buffer/sub-buffer sizes */ 7038 mutex_lock(&buffer->mutex); 7039 7040 err = rb_alloc_meta_page(cpu_buffer); 7041 if (err) 7042 goto unlock; 7043 7044 /* subbuf_ids include the reader while nr_pages does not */ 7045 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7046 if (!subbuf_ids) { 7047 rb_free_meta_page(cpu_buffer); 7048 err = -ENOMEM; 7049 goto unlock; 7050 } 7051 7052 atomic_inc(&cpu_buffer->resize_disabled); 7053 7054 /* 7055 * Lock all readers to block any subbuf swap until the subbuf IDs are 7056 * assigned. 
7057 */ 7058 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7059 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 7060 7061 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7062 7063 err = __rb_map_vma(cpu_buffer, vma); 7064 if (!err) { 7065 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7066 /* This is the first time it is mapped by user */ 7067 cpu_buffer->mapped++; 7068 cpu_buffer->user_mapped = 1; 7069 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7070 } else { 7071 kfree(cpu_buffer->subbuf_ids); 7072 cpu_buffer->subbuf_ids = NULL; 7073 rb_free_meta_page(cpu_buffer); 7074 } 7075 7076 unlock: 7077 mutex_unlock(&buffer->mutex); 7078 mutex_unlock(&cpu_buffer->mapping_lock); 7079 7080 return err; 7081 } 7082 7083 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 7084 { 7085 struct ring_buffer_per_cpu *cpu_buffer; 7086 unsigned long flags; 7087 int err = 0; 7088 7089 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7090 return -EINVAL; 7091 7092 cpu_buffer = buffer->buffers[cpu]; 7093 7094 mutex_lock(&cpu_buffer->mapping_lock); 7095 7096 if (!cpu_buffer->user_mapped) { 7097 err = -ENODEV; 7098 goto out; 7099 } else if (cpu_buffer->user_mapped > 1) { 7100 __rb_inc_dec_mapped(cpu_buffer, false); 7101 goto out; 7102 } 7103 7104 mutex_lock(&buffer->mutex); 7105 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7106 7107 /* This is the last user space mapping */ 7108 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7109 cpu_buffer->mapped--; 7110 cpu_buffer->user_mapped = 0; 7111 7112 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7113 7114 kfree(cpu_buffer->subbuf_ids); 7115 cpu_buffer->subbuf_ids = NULL; 7116 rb_free_meta_page(cpu_buffer); 7117 atomic_dec(&cpu_buffer->resize_disabled); 7118 7119 mutex_unlock(&buffer->mutex); 7120 7121 out: 7122 mutex_unlock(&cpu_buffer->mapping_lock); 7123 7124 return err; 7125 } 7126 7127 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 7128 { 7129 struct ring_buffer_per_cpu *cpu_buffer; 7130 struct buffer_page *reader; 7131 unsigned long missed_events; 7132 unsigned long reader_size; 7133 unsigned long flags; 7134 7135 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 7136 if (IS_ERR(cpu_buffer)) 7137 return (int)PTR_ERR(cpu_buffer); 7138 7139 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7140 7141 consume: 7142 if (rb_per_cpu_empty(cpu_buffer)) 7143 goto out; 7144 7145 reader_size = rb_page_size(cpu_buffer->reader_page); 7146 7147 /* 7148 * There are data to be read on the current reader page, we can 7149 * return to the caller. But before that, we assume the latter will read 7150 * everything. Let's update the kernel reader accordingly. 7151 */ 7152 if (cpu_buffer->reader_page->read < reader_size) { 7153 while (cpu_buffer->reader_page->read < reader_size) 7154 rb_advance_reader(cpu_buffer); 7155 goto out; 7156 } 7157 7158 reader = rb_get_reader_page(cpu_buffer); 7159 if (WARN_ON(!reader)) 7160 goto out; 7161 7162 /* Check if any events were dropped */ 7163 missed_events = cpu_buffer->lost_events; 7164 7165 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 7166 if (missed_events) { 7167 struct buffer_data_page *bpage = reader->page; 7168 unsigned int commit; 7169 /* 7170 * Use the real_end for the data size, 7171 * This gives us a chance to store the lost events 7172 * on the page. 
7173 */ 7174 if (reader->real_end) 7175 local_set(&bpage->commit, reader->real_end); 7176 /* 7177 * If there is room at the end of the page to save the 7178 * missed events, then record it there. 7179 */ 7180 commit = rb_page_size(reader); 7181 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7182 memcpy(&bpage->data[commit], &missed_events, 7183 sizeof(missed_events)); 7184 local_add(RB_MISSED_STORED, &bpage->commit); 7185 } 7186 local_add(RB_MISSED_EVENTS, &bpage->commit); 7187 } 7188 } else { 7189 /* 7190 * There really shouldn't be any missed events if the commit 7191 * is on the reader page. 7192 */ 7193 WARN_ON_ONCE(missed_events); 7194 } 7195 7196 cpu_buffer->lost_events = 0; 7197 7198 goto consume; 7199 7200 out: 7201 /* Some archs do not have data cache coherency between kernel and user-space */ 7202 flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page)); 7203 7204 rb_update_meta_page(cpu_buffer); 7205 7206 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7207 rb_put_mapped_buffer(cpu_buffer); 7208 7209 return 0; 7210 } 7211 7212 /* 7213 * We only allocate new buffers, never free them if the CPU goes down. 7214 * If we were to free the buffer, then the user would lose any trace that was in 7215 * the buffer. 7216 */ 7217 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7218 { 7219 struct trace_buffer *buffer; 7220 long nr_pages_same; 7221 int cpu_i; 7222 unsigned long nr_pages; 7223 7224 buffer = container_of(node, struct trace_buffer, node); 7225 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7226 return 0; 7227 7228 nr_pages = 0; 7229 nr_pages_same = 1; 7230 /* check if all cpu sizes are same */ 7231 for_each_buffer_cpu(buffer, cpu_i) { 7232 /* fill in the size from first enabled cpu */ 7233 if (nr_pages == 0) 7234 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7235 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7236 nr_pages_same = 0; 7237 break; 7238 } 7239 } 7240 /* allocate minimum pages, user can later expand it */ 7241 if (!nr_pages_same) 7242 nr_pages = 2; 7243 buffer->buffers[cpu] = 7244 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7245 if (!buffer->buffers[cpu]) { 7246 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7247 cpu); 7248 return -ENOMEM; 7249 } 7250 smp_wmb(); 7251 cpumask_set_cpu(cpu, buffer->cpumask); 7252 return 0; 7253 } 7254 7255 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7256 /* 7257 * This is a basic integrity check of the ring buffer. 7258 * Late in the boot cycle this test will run when configured in. 7259 * It will kick off a thread per CPU that will go into a loop 7260 * writing to the per cpu ring buffer various sizes of data. 7261 * Some of the data will be large items, some small. 7262 * 7263 * Another thread is created that goes into a spin, sending out 7264 * IPIs to the other CPUs to also write into the ring buffer. 7265 * this is to test the nesting ability of the buffer. 7266 * 7267 * Basic stats are recorded and reported. If something in the 7268 * ring buffer should happen that's not expected, a big warning 7269 * is displayed and all ring buffers are disabled. 
7270 */ 7271 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7272 7273 struct rb_test_data { 7274 struct trace_buffer *buffer; 7275 unsigned long events; 7276 unsigned long bytes_written; 7277 unsigned long bytes_alloc; 7278 unsigned long bytes_dropped; 7279 unsigned long events_nested; 7280 unsigned long bytes_written_nested; 7281 unsigned long bytes_alloc_nested; 7282 unsigned long bytes_dropped_nested; 7283 int min_size_nested; 7284 int max_size_nested; 7285 int max_size; 7286 int min_size; 7287 int cpu; 7288 int cnt; 7289 }; 7290 7291 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7292 7293 /* 1 meg per cpu */ 7294 #define RB_TEST_BUFFER_SIZE 1048576 7295 7296 static char rb_string[] __initdata = 7297 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7298 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7299 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7300 7301 static bool rb_test_started __initdata; 7302 7303 struct rb_item { 7304 int size; 7305 char str[]; 7306 }; 7307 7308 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7309 { 7310 struct ring_buffer_event *event; 7311 struct rb_item *item; 7312 bool started; 7313 int event_len; 7314 int size; 7315 int len; 7316 int cnt; 7317 7318 /* Have nested writes different that what is written */ 7319 cnt = data->cnt + (nested ? 27 : 0); 7320 7321 /* Multiply cnt by ~e, to make some unique increment */ 7322 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7323 7324 len = size + sizeof(struct rb_item); 7325 7326 started = rb_test_started; 7327 /* read rb_test_started before checking buffer enabled */ 7328 smp_rmb(); 7329 7330 event = ring_buffer_lock_reserve(data->buffer, len); 7331 if (!event) { 7332 /* Ignore dropped events before test starts. 
*/
7333 if (started) {
7334 if (nested)
7335 data->bytes_dropped_nested += len;
7336 else
7337 data->bytes_dropped += len;
7338 }
7339 return len;
7340 }
7341
7342 event_len = ring_buffer_event_length(event);
7343
7344 if (RB_WARN_ON(data->buffer, event_len < len))
7345 goto out;
7346
7347 item = ring_buffer_event_data(event);
7348 item->size = size;
7349 memcpy(item->str, rb_string, size);
7350
7351 if (nested) {
7352 data->bytes_alloc_nested += event_len;
7353 data->bytes_written_nested += len;
7354 data->events_nested++;
7355 if (!data->min_size_nested || len < data->min_size_nested)
7356 data->min_size_nested = len;
7357 if (len > data->max_size_nested)
7358 data->max_size_nested = len;
7359 } else {
7360 data->bytes_alloc += event_len;
7361 data->bytes_written += len;
7362 data->events++;
7363 if (!data->min_size || len < data->min_size)
7364 data->min_size = len;
7365 if (len > data->max_size)
7366 data->max_size = len;
7367 }
7368
7369 out:
7370 ring_buffer_unlock_commit(data->buffer);
7371
7372 return 0;
7373 }
7374
7375 static __init int rb_test(void *arg)
7376 {
7377 struct rb_test_data *data = arg;
7378
7379 while (!kthread_should_stop()) {
7380 rb_write_something(data, false);
7381 data->cnt++;
7382
7383 set_current_state(TASK_INTERRUPTIBLE);
7384 /* Now sleep between a min of 100-300us and a max of 1ms */
7385 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
7386 }
7387
7388 return 0;
7389 }
7390
7391 static __init void rb_ipi(void *ignore)
7392 {
7393 struct rb_test_data *data;
7394 int cpu = smp_processor_id();
7395
7396 data = &rb_data[cpu];
7397 rb_write_something(data, true);
7398 }
7399
7400 static __init int rb_hammer_test(void *arg)
7401 {
7402 while (!kthread_should_stop()) {
7403
7404 /* Send an IPI to all cpus to write data! */
7405 smp_call_function(rb_ipi, NULL, 1);
7406 /* No sleep, but for non-preempt, let others run */
7407 schedule();
7408 }
7409
7410 return 0;
7411 }
7412
7413 static __init int test_ringbuffer(void)
7414 {
7415 struct task_struct *rb_hammer;
7416 struct trace_buffer *buffer;
7417 int cpu;
7418 int ret = 0;
7419
7420 if (security_locked_down(LOCKDOWN_TRACEFS)) {
7421 pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
7422 return 0;
7423 }
7424
7425 pr_info("Running ring buffer tests...\n");
7426
7427 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
7428 if (WARN_ON(!buffer))
7429 return 0;
7430
7431 /* Disable buffer so that threads can't write to it yet */
7432 ring_buffer_record_off(buffer);
7433
7434 for_each_online_cpu(cpu) {
7435 rb_data[cpu].buffer = buffer;
7436 rb_data[cpu].cpu = cpu;
7437 rb_data[cpu].cnt = cpu;
7438 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
7439 cpu, "rbtester/%u");
7440 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
7441 pr_cont("FAILED\n");
7442 ret = PTR_ERR(rb_threads[cpu]);
7443 goto out_free;
7444 }
7445 }
7446
7447 /* Now create the rb hammer! */
7448 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
7449 if (WARN_ON(IS_ERR(rb_hammer))) {
7450 pr_cont("FAILED\n");
7451 ret = PTR_ERR(rb_hammer);
7452 goto out_free;
7453 }
7454
7455 ring_buffer_record_on(buffer);
7456 /*
7457 * Show buffer is enabled before setting rb_test_started.
7458 * Yes there's a small race window where events could be
7459 * dropped and the thread won't catch it. But when a ring
7460 * buffer gets enabled, there will always be some kind of
7461 * delay before other CPUs see it. Thus, we don't care about
7462 * those dropped events.
We care about events dropped after 7463 * the threads see that the buffer is active. 7464 */ 7465 smp_wmb(); 7466 rb_test_started = true; 7467 7468 set_current_state(TASK_INTERRUPTIBLE); 7469 /* Just run for 10 seconds */; 7470 schedule_timeout(10 * HZ); 7471 7472 kthread_stop(rb_hammer); 7473 7474 out_free: 7475 for_each_online_cpu(cpu) { 7476 if (!rb_threads[cpu]) 7477 break; 7478 kthread_stop(rb_threads[cpu]); 7479 } 7480 if (ret) { 7481 ring_buffer_free(buffer); 7482 return ret; 7483 } 7484 7485 /* Report! */ 7486 pr_info("finished\n"); 7487 for_each_online_cpu(cpu) { 7488 struct ring_buffer_event *event; 7489 struct rb_test_data *data = &rb_data[cpu]; 7490 struct rb_item *item; 7491 unsigned long total_events; 7492 unsigned long total_dropped; 7493 unsigned long total_written; 7494 unsigned long total_alloc; 7495 unsigned long total_read = 0; 7496 unsigned long total_size = 0; 7497 unsigned long total_len = 0; 7498 unsigned long total_lost = 0; 7499 unsigned long lost; 7500 int big_event_size; 7501 int small_event_size; 7502 7503 ret = -1; 7504 7505 total_events = data->events + data->events_nested; 7506 total_written = data->bytes_written + data->bytes_written_nested; 7507 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 7508 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 7509 7510 big_event_size = data->max_size + data->max_size_nested; 7511 small_event_size = data->min_size + data->min_size_nested; 7512 7513 pr_info("CPU %d:\n", cpu); 7514 pr_info(" events: %ld\n", total_events); 7515 pr_info(" dropped bytes: %ld\n", total_dropped); 7516 pr_info(" alloced bytes: %ld\n", total_alloc); 7517 pr_info(" written bytes: %ld\n", total_written); 7518 pr_info(" biggest event: %d\n", big_event_size); 7519 pr_info(" smallest event: %d\n", small_event_size); 7520 7521 if (RB_WARN_ON(buffer, total_dropped)) 7522 break; 7523 7524 ret = 0; 7525 7526 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 7527 total_lost += lost; 7528 item = ring_buffer_event_data(event); 7529 total_len += ring_buffer_event_length(event); 7530 total_size += item->size + sizeof(struct rb_item); 7531 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 7532 pr_info("FAILED!\n"); 7533 pr_info("buffer had: %.*s\n", item->size, item->str); 7534 pr_info("expected: %.*s\n", item->size, rb_string); 7535 RB_WARN_ON(buffer, 1); 7536 ret = -1; 7537 break; 7538 } 7539 total_read++; 7540 } 7541 if (ret) 7542 break; 7543 7544 ret = -1; 7545 7546 pr_info(" read events: %ld\n", total_read); 7547 pr_info(" lost events: %ld\n", total_lost); 7548 pr_info(" total events: %ld\n", total_lost + total_read); 7549 pr_info(" recorded len bytes: %ld\n", total_len); 7550 pr_info(" recorded size bytes: %ld\n", total_size); 7551 if (total_lost) { 7552 pr_info(" With dropped events, record len and size may not match\n" 7553 " alloced and written from above\n"); 7554 } else { 7555 if (RB_WARN_ON(buffer, total_len != total_alloc || 7556 total_size != total_written)) 7557 break; 7558 } 7559 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 7560 break; 7561 7562 ret = 0; 7563 } 7564 if (!ret) 7565 pr_info("Ring buffer PASSED!\n"); 7566 7567 ring_buffer_free(buffer); 7568 return 0; 7569 } 7570 7571 late_initcall(test_ringbuffer); 7572 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 7573