1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Generic ring buffer 4 * 5 * Copyright (C) 2008 Steven Rostedt <[email protected]> 6 */ 7 #include <linux/trace_recursion.h> 8 #include <linux/trace_events.h> 9 #include <linux/ring_buffer.h> 10 #include <linux/trace_clock.h> 11 #include <linux/sched/clock.h> 12 #include <linux/cacheflush.h> 13 #include <linux/trace_seq.h> 14 #include <linux/spinlock.h> 15 #include <linux/irq_work.h> 16 #include <linux/security.h> 17 #include <linux/uaccess.h> 18 #include <linux/hardirq.h> 19 #include <linux/kthread.h> /* for self test */ 20 #include <linux/module.h> 21 #include <linux/percpu.h> 22 #include <linux/mutex.h> 23 #include <linux/delay.h> 24 #include <linux/slab.h> 25 #include <linux/init.h> 26 #include <linux/hash.h> 27 #include <linux/list.h> 28 #include <linux/cpu.h> 29 #include <linux/oom.h> 30 #include <linux/mm.h> 31 32 #include <asm/local64.h> 33 #include <asm/local.h> 34 #include <asm/setup.h> 35 36 #include "trace.h" 37 38 /* 39 * The "absolute" timestamp in the buffer is only 59 bits. 40 * If a clock has the 5 MSBs set, it needs to be saved and 41 * reinserted. 42 */ 43 #define TS_MSB (0xf8ULL << 56) 44 #define ABS_TS_MASK (~TS_MSB) 45 46 static void update_pages_handler(struct work_struct *work); 47 48 #define RING_BUFFER_META_MAGIC 0xBADFEED 49 50 struct ring_buffer_meta { 51 int magic; 52 int struct_sizes; 53 unsigned long total_size; 54 unsigned long buffers_offset; 55 }; 56 57 struct ring_buffer_cpu_meta { 58 unsigned long kaslr_addr; 59 unsigned long first_buffer; 60 unsigned long head_buffer; 61 unsigned long commit_buffer; 62 __u32 subbuf_size; 63 __u32 nr_subbufs; 64 int buffers[]; 65 }; 66 67 /* 68 * The ring buffer header is special. We must manually up keep it. 69 */ 70 int ring_buffer_print_entry_header(struct trace_seq *s) 71 { 72 trace_seq_puts(s, "# compressed entry header\n"); 73 trace_seq_puts(s, "\ttype_len : 5 bits\n"); 74 trace_seq_puts(s, "\ttime_delta : 27 bits\n"); 75 trace_seq_puts(s, "\tarray : 32 bits\n"); 76 trace_seq_putc(s, '\n'); 77 trace_seq_printf(s, "\tpadding : type == %d\n", 78 RINGBUF_TYPE_PADDING); 79 trace_seq_printf(s, "\ttime_extend : type == %d\n", 80 RINGBUF_TYPE_TIME_EXTEND); 81 trace_seq_printf(s, "\ttime_stamp : type == %d\n", 82 RINGBUF_TYPE_TIME_STAMP); 83 trace_seq_printf(s, "\tdata max type_len == %d\n", 84 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 85 86 return !trace_seq_has_overflowed(s); 87 } 88 89 /* 90 * The ring buffer is made up of a list of pages. A separate list of pages is 91 * allocated for each CPU. A writer may only write to a buffer that is 92 * associated with the CPU it is currently executing on. A reader may read 93 * from any per cpu buffer. 94 * 95 * The reader is special. For each per cpu buffer, the reader has its own 96 * reader page. When a reader has read the entire reader page, this reader 97 * page is swapped with another page in the ring buffer. 98 * 99 * Now, as long as the writer is off the reader page, the reader can do what 100 * ever it wants with that page. The writer will never write to that page 101 * again (as long as it is out of the ring buffer). 102 * 103 * Here's some silly ASCII art. 104 * 105 * +------+ 106 * |reader| RING BUFFER 107 * |page | 108 * +------+ +---+ +---+ +---+ 109 * | |-->| |-->| | 110 * +---+ +---+ +---+ 111 * ^ | 112 * | | 113 * +---------------+ 114 * 115 * 116 * +------+ 117 * |reader| RING BUFFER 118 * |page |------------------v 119 * +------+ +---+ +---+ +---+ 120 * | |-->| |-->| | 121 * +---+ +---+ +---+ 122 * ^ | 123 * | | 124 * +---------------+ 125 * 126 * 127 * +------+ 128 * |reader| RING BUFFER 129 * |page |------------------v 130 * +------+ +---+ +---+ +---+ 131 * ^ | |-->| |-->| | 132 * | +---+ +---+ +---+ 133 * | | 134 * | | 135 * +------------------------------+ 136 * 137 * 138 * +------+ 139 * |buffer| RING BUFFER 140 * |page |------------------v 141 * +------+ +---+ +---+ +---+ 142 * ^ | | | |-->| | 143 * | New +---+ +---+ +---+ 144 * | Reader------^ | 145 * | page | 146 * +------------------------------+ 147 * 148 * 149 * After we make this swap, the reader can hand this page off to the splice 150 * code and be done with it. It can even allocate a new page if it needs to 151 * and swap that into the ring buffer. 152 * 153 * We will be using cmpxchg soon to make all this lockless. 154 * 155 */ 156 157 /* Used for individual buffers (after the counter) */ 158 #define RB_BUFFER_OFF (1 << 20) 159 160 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 161 162 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 163 #define RB_ALIGNMENT 4U 164 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 165 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 166 167 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS 168 # define RB_FORCE_8BYTE_ALIGNMENT 0 169 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT 170 #else 171 # define RB_FORCE_8BYTE_ALIGNMENT 1 172 # define RB_ARCH_ALIGNMENT 8U 173 #endif 174 175 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT) 176 177 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 178 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 179 180 enum { 181 RB_LEN_TIME_EXTEND = 8, 182 RB_LEN_TIME_STAMP = 8, 183 }; 184 185 #define skip_time_extend(event) \ 186 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 187 188 #define extended_time(event) \ 189 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 190 191 static inline bool rb_null_event(struct ring_buffer_event *event) 192 { 193 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 194 } 195 196 static void rb_event_set_padding(struct ring_buffer_event *event) 197 { 198 /* padding has a NULL time_delta */ 199 event->type_len = RINGBUF_TYPE_PADDING; 200 event->time_delta = 0; 201 } 202 203 static unsigned 204 rb_event_data_length(struct ring_buffer_event *event) 205 { 206 unsigned length; 207 208 if (event->type_len) 209 length = event->type_len * RB_ALIGNMENT; 210 else 211 length = event->array[0]; 212 return length + RB_EVNT_HDR_SIZE; 213 } 214 215 /* 216 * Return the length of the given event. Will return 217 * the length of the time extend if the event is a 218 * time extend. 219 */ 220 static inline unsigned 221 rb_event_length(struct ring_buffer_event *event) 222 { 223 switch (event->type_len) { 224 case RINGBUF_TYPE_PADDING: 225 if (rb_null_event(event)) 226 /* undefined */ 227 return -1; 228 return event->array[0] + RB_EVNT_HDR_SIZE; 229 230 case RINGBUF_TYPE_TIME_EXTEND: 231 return RB_LEN_TIME_EXTEND; 232 233 case RINGBUF_TYPE_TIME_STAMP: 234 return RB_LEN_TIME_STAMP; 235 236 case RINGBUF_TYPE_DATA: 237 return rb_event_data_length(event); 238 default: 239 WARN_ON_ONCE(1); 240 } 241 /* not hit */ 242 return 0; 243 } 244 245 /* 246 * Return total length of time extend and data, 247 * or just the event length for all other events. 248 */ 249 static inline unsigned 250 rb_event_ts_length(struct ring_buffer_event *event) 251 { 252 unsigned len = 0; 253 254 if (extended_time(event)) { 255 /* time extends include the data event after it */ 256 len = RB_LEN_TIME_EXTEND; 257 event = skip_time_extend(event); 258 } 259 return len + rb_event_length(event); 260 } 261 262 /** 263 * ring_buffer_event_length - return the length of the event 264 * @event: the event to get the length of 265 * 266 * Returns the size of the data load of a data event. 267 * If the event is something other than a data event, it 268 * returns the size of the event itself. With the exception 269 * of a TIME EXTEND, where it still returns the size of the 270 * data load of the data event after it. 271 */ 272 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 273 { 274 unsigned length; 275 276 if (extended_time(event)) 277 event = skip_time_extend(event); 278 279 length = rb_event_length(event); 280 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 281 return length; 282 length -= RB_EVNT_HDR_SIZE; 283 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 284 length -= sizeof(event->array[0]); 285 return length; 286 } 287 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 288 289 /* inline for ring buffer fast paths */ 290 static __always_inline void * 291 rb_event_data(struct ring_buffer_event *event) 292 { 293 if (extended_time(event)) 294 event = skip_time_extend(event); 295 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 296 /* If length is in len field, then array[0] has the data */ 297 if (event->type_len) 298 return (void *)&event->array[0]; 299 /* Otherwise length is in array[0] and array[1] has the data */ 300 return (void *)&event->array[1]; 301 } 302 303 /** 304 * ring_buffer_event_data - return the data of the event 305 * @event: the event to get the data from 306 */ 307 void *ring_buffer_event_data(struct ring_buffer_event *event) 308 { 309 return rb_event_data(event); 310 } 311 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 312 313 #define for_each_buffer_cpu(buffer, cpu) \ 314 for_each_cpu(cpu, buffer->cpumask) 315 316 #define for_each_online_buffer_cpu(buffer, cpu) \ 317 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) 318 319 #define TS_SHIFT 27 320 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 321 #define TS_DELTA_TEST (~TS_MASK) 322 323 static u64 rb_event_time_stamp(struct ring_buffer_event *event) 324 { 325 u64 ts; 326 327 ts = event->array[0]; 328 ts <<= TS_SHIFT; 329 ts += event->time_delta; 330 331 return ts; 332 } 333 334 /* Flag when events were overwritten */ 335 #define RB_MISSED_EVENTS (1 << 31) 336 /* Missed count stored at end */ 337 #define RB_MISSED_STORED (1 << 30) 338 339 #define RB_MISSED_MASK (3 << 30) 340 341 struct buffer_data_page { 342 u64 time_stamp; /* page time stamp */ 343 local_t commit; /* write committed index */ 344 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */ 345 }; 346 347 struct buffer_data_read_page { 348 unsigned order; /* order of the page */ 349 struct buffer_data_page *data; /* actual data, stored in this page */ 350 }; 351 352 /* 353 * Note, the buffer_page list must be first. The buffer pages 354 * are allocated in cache lines, which means that each buffer 355 * page will be at the beginning of a cache line, and thus 356 * the least significant bits will be zero. We use this to 357 * add flags in the list struct pointers, to make the ring buffer 358 * lockless. 359 */ 360 struct buffer_page { 361 struct list_head list; /* list of buffer pages */ 362 local_t write; /* index for next write */ 363 unsigned read; /* index for next read */ 364 local_t entries; /* entries on this page */ 365 unsigned long real_end; /* real end of data */ 366 unsigned order; /* order of the page */ 367 u32 id:30; /* ID for external mapping */ 368 u32 range:1; /* Mapped via a range */ 369 struct buffer_data_page *page; /* Actual data page */ 370 }; 371 372 /* 373 * The buffer page counters, write and entries, must be reset 374 * atomically when crossing page boundaries. To synchronize this 375 * update, two counters are inserted into the number. One is 376 * the actual counter for the write position or count on the page. 377 * 378 * The other is a counter of updaters. Before an update happens 379 * the update partition of the counter is incremented. This will 380 * allow the updater to update the counter atomically. 381 * 382 * The counter is 20 bits, and the state data is 12. 383 */ 384 #define RB_WRITE_MASK 0xfffff 385 #define RB_WRITE_INTCNT (1 << 20) 386 387 static void rb_init_page(struct buffer_data_page *bpage) 388 { 389 local_set(&bpage->commit, 0); 390 } 391 392 static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage) 393 { 394 return local_read(&bpage->page->commit); 395 } 396 397 static void free_buffer_page(struct buffer_page *bpage) 398 { 399 /* Range pages are not to be freed */ 400 if (!bpage->range) 401 free_pages((unsigned long)bpage->page, bpage->order); 402 kfree(bpage); 403 } 404 405 /* 406 * We need to fit the time_stamp delta into 27 bits. 407 */ 408 static inline bool test_time_stamp(u64 delta) 409 { 410 return !!(delta & TS_DELTA_TEST); 411 } 412 413 struct rb_irq_work { 414 struct irq_work work; 415 wait_queue_head_t waiters; 416 wait_queue_head_t full_waiters; 417 atomic_t seq; 418 bool waiters_pending; 419 bool full_waiters_pending; 420 bool wakeup_full; 421 }; 422 423 /* 424 * Structure to hold event state and handle nested events. 425 */ 426 struct rb_event_info { 427 u64 ts; 428 u64 delta; 429 u64 before; 430 u64 after; 431 unsigned long length; 432 struct buffer_page *tail_page; 433 int add_timestamp; 434 }; 435 436 /* 437 * Used for the add_timestamp 438 * NONE 439 * EXTEND - wants a time extend 440 * ABSOLUTE - the buffer requests all events to have absolute time stamps 441 * FORCE - force a full time stamp. 442 */ 443 enum { 444 RB_ADD_STAMP_NONE = 0, 445 RB_ADD_STAMP_EXTEND = BIT(1), 446 RB_ADD_STAMP_ABSOLUTE = BIT(2), 447 RB_ADD_STAMP_FORCE = BIT(3) 448 }; 449 /* 450 * Used for which event context the event is in. 451 * TRANSITION = 0 452 * NMI = 1 453 * IRQ = 2 454 * SOFTIRQ = 3 455 * NORMAL = 4 456 * 457 * See trace_recursive_lock() comment below for more details. 458 */ 459 enum { 460 RB_CTX_TRANSITION, 461 RB_CTX_NMI, 462 RB_CTX_IRQ, 463 RB_CTX_SOFTIRQ, 464 RB_CTX_NORMAL, 465 RB_CTX_MAX 466 }; 467 468 struct rb_time_struct { 469 local64_t time; 470 }; 471 typedef struct rb_time_struct rb_time_t; 472 473 #define MAX_NEST 5 474 475 /* 476 * head_page == tail_page && head == tail then buffer is empty. 477 */ 478 struct ring_buffer_per_cpu { 479 int cpu; 480 atomic_t record_disabled; 481 atomic_t resize_disabled; 482 struct trace_buffer *buffer; 483 raw_spinlock_t reader_lock; /* serialize readers */ 484 arch_spinlock_t lock; 485 struct lock_class_key lock_key; 486 struct buffer_data_page *free_page; 487 unsigned long nr_pages; 488 unsigned int current_context; 489 struct list_head *pages; 490 /* pages generation counter, incremented when the list changes */ 491 unsigned long cnt; 492 struct buffer_page *head_page; /* read from head */ 493 struct buffer_page *tail_page; /* write to tail */ 494 struct buffer_page *commit_page; /* committed pages */ 495 struct buffer_page *reader_page; 496 unsigned long lost_events; 497 unsigned long last_overrun; 498 unsigned long nest; 499 local_t entries_bytes; 500 local_t entries; 501 local_t overrun; 502 local_t commit_overrun; 503 local_t dropped_events; 504 local_t committing; 505 local_t commits; 506 local_t pages_touched; 507 local_t pages_lost; 508 local_t pages_read; 509 long last_pages_touch; 510 size_t shortest_full; 511 unsigned long read; 512 unsigned long read_bytes; 513 rb_time_t write_stamp; 514 rb_time_t before_stamp; 515 u64 event_stamp[MAX_NEST]; 516 u64 read_stamp; 517 /* pages removed since last reset */ 518 unsigned long pages_removed; 519 520 unsigned int mapped; 521 unsigned int user_mapped; /* user space mapping */ 522 struct mutex mapping_lock; 523 unsigned long *subbuf_ids; /* ID to subbuf VA */ 524 struct trace_buffer_meta *meta_page; 525 struct ring_buffer_cpu_meta *ring_meta; 526 527 /* ring buffer pages to update, > 0 to add, < 0 to remove */ 528 long nr_pages_to_update; 529 struct list_head new_pages; /* new pages to add */ 530 struct work_struct update_pages_work; 531 struct completion update_done; 532 533 struct rb_irq_work irq_work; 534 }; 535 536 struct trace_buffer { 537 unsigned flags; 538 int cpus; 539 atomic_t record_disabled; 540 atomic_t resizing; 541 cpumask_var_t cpumask; 542 543 struct lock_class_key *reader_lock_key; 544 545 struct mutex mutex; 546 547 struct ring_buffer_per_cpu **buffers; 548 549 struct hlist_node node; 550 u64 (*clock)(void); 551 552 struct rb_irq_work irq_work; 553 bool time_stamp_abs; 554 555 unsigned long range_addr_start; 556 unsigned long range_addr_end; 557 558 struct ring_buffer_meta *meta; 559 560 unsigned long kaslr_addr; 561 562 unsigned int subbuf_size; 563 unsigned int subbuf_order; 564 unsigned int max_data_size; 565 }; 566 567 struct ring_buffer_iter { 568 struct ring_buffer_per_cpu *cpu_buffer; 569 unsigned long head; 570 unsigned long next_event; 571 struct buffer_page *head_page; 572 struct buffer_page *cache_reader_page; 573 unsigned long cache_read; 574 unsigned long cache_pages_removed; 575 u64 read_stamp; 576 u64 page_stamp; 577 struct ring_buffer_event *event; 578 size_t event_size; 579 int missed_events; 580 }; 581 582 int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s) 583 { 584 struct buffer_data_page field; 585 586 trace_seq_printf(s, "\tfield: u64 timestamp;\t" 587 "offset:0;\tsize:%u;\tsigned:%u;\n", 588 (unsigned int)sizeof(field.time_stamp), 589 (unsigned int)is_signed_type(u64)); 590 591 trace_seq_printf(s, "\tfield: local_t commit;\t" 592 "offset:%u;\tsize:%u;\tsigned:%u;\n", 593 (unsigned int)offsetof(typeof(field), commit), 594 (unsigned int)sizeof(field.commit), 595 (unsigned int)is_signed_type(long)); 596 597 trace_seq_printf(s, "\tfield: int overwrite;\t" 598 "offset:%u;\tsize:%u;\tsigned:%u;\n", 599 (unsigned int)offsetof(typeof(field), commit), 600 1, 601 (unsigned int)is_signed_type(long)); 602 603 trace_seq_printf(s, "\tfield: char data;\t" 604 "offset:%u;\tsize:%u;\tsigned:%u;\n", 605 (unsigned int)offsetof(typeof(field), data), 606 (unsigned int)buffer->subbuf_size, 607 (unsigned int)is_signed_type(char)); 608 609 return !trace_seq_has_overflowed(s); 610 } 611 612 static inline void rb_time_read(rb_time_t *t, u64 *ret) 613 { 614 *ret = local64_read(&t->time); 615 } 616 static void rb_time_set(rb_time_t *t, u64 val) 617 { 618 local64_set(&t->time, val); 619 } 620 621 /* 622 * Enable this to make sure that the event passed to 623 * ring_buffer_event_time_stamp() is not committed and also 624 * is on the buffer that it passed in. 625 */ 626 //#define RB_VERIFY_EVENT 627 #ifdef RB_VERIFY_EVENT 628 static struct list_head *rb_list_head(struct list_head *list); 629 static void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 630 void *event) 631 { 632 struct buffer_page *page = cpu_buffer->commit_page; 633 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page); 634 struct list_head *next; 635 long commit, write; 636 unsigned long addr = (unsigned long)event; 637 bool done = false; 638 int stop = 0; 639 640 /* Make sure the event exists and is not committed yet */ 641 do { 642 if (page == tail_page || WARN_ON_ONCE(stop++ > 100)) 643 done = true; 644 commit = local_read(&page->page->commit); 645 write = local_read(&page->write); 646 if (addr >= (unsigned long)&page->page->data[commit] && 647 addr < (unsigned long)&page->page->data[write]) 648 return; 649 650 next = rb_list_head(page->list.next); 651 page = list_entry(next, struct buffer_page, list); 652 } while (!done); 653 WARN_ON_ONCE(1); 654 } 655 #else 656 static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 657 void *event) 658 { 659 } 660 #endif 661 662 /* 663 * The absolute time stamp drops the 5 MSBs and some clocks may 664 * require them. The rb_fix_abs_ts() will take a previous full 665 * time stamp, and add the 5 MSB of that time stamp on to the 666 * saved absolute time stamp. Then they are compared in case of 667 * the unlikely event that the latest time stamp incremented 668 * the 5 MSB. 669 */ 670 static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts) 671 { 672 if (save_ts & TS_MSB) { 673 abs |= save_ts & TS_MSB; 674 /* Check for overflow */ 675 if (unlikely(abs < save_ts)) 676 abs += 1ULL << 59; 677 } 678 return abs; 679 } 680 681 static inline u64 rb_time_stamp(struct trace_buffer *buffer); 682 683 /** 684 * ring_buffer_event_time_stamp - return the event's current time stamp 685 * @buffer: The buffer that the event is on 686 * @event: the event to get the time stamp of 687 * 688 * Note, this must be called after @event is reserved, and before it is 689 * committed to the ring buffer. And must be called from the same 690 * context where the event was reserved (normal, softirq, irq, etc). 691 * 692 * Returns the time stamp associated with the current event. 693 * If the event has an extended time stamp, then that is used as 694 * the time stamp to return. 695 * In the highly unlikely case that the event was nested more than 696 * the max nesting, then the write_stamp of the buffer is returned, 697 * otherwise current time is returned, but that really neither of 698 * the last two cases should ever happen. 699 */ 700 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, 701 struct ring_buffer_event *event) 702 { 703 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; 704 unsigned int nest; 705 u64 ts; 706 707 /* If the event includes an absolute time, then just use that */ 708 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { 709 ts = rb_event_time_stamp(event); 710 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp); 711 } 712 713 nest = local_read(&cpu_buffer->committing); 714 verify_event(cpu_buffer, event); 715 if (WARN_ON_ONCE(!nest)) 716 goto fail; 717 718 /* Read the current saved nesting level time stamp */ 719 if (likely(--nest < MAX_NEST)) 720 return cpu_buffer->event_stamp[nest]; 721 722 /* Shouldn't happen, warn if it does */ 723 WARN_ONCE(1, "nest (%d) greater than max", nest); 724 725 fail: 726 rb_time_read(&cpu_buffer->write_stamp, &ts); 727 728 return ts; 729 } 730 731 /** 732 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer 733 * @buffer: The ring_buffer to get the number of pages from 734 * @cpu: The cpu of the ring_buffer to get the number of pages from 735 * 736 * Returns the number of pages that have content in the ring buffer. 737 */ 738 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 739 { 740 size_t read; 741 size_t lost; 742 size_t cnt; 743 744 read = local_read(&buffer->buffers[cpu]->pages_read); 745 lost = local_read(&buffer->buffers[cpu]->pages_lost); 746 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 747 748 if (WARN_ON_ONCE(cnt < lost)) 749 return 0; 750 751 cnt -= lost; 752 753 /* The reader can read an empty page, but not more than that */ 754 if (cnt < read) { 755 WARN_ON_ONCE(read > cnt + 1); 756 return 0; 757 } 758 759 return cnt - read; 760 } 761 762 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full) 763 { 764 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 765 size_t nr_pages; 766 size_t dirty; 767 768 nr_pages = cpu_buffer->nr_pages; 769 if (!nr_pages || !full) 770 return true; 771 772 /* 773 * Add one as dirty will never equal nr_pages, as the sub-buffer 774 * that the writer is on is not counted as dirty. 775 * This is needed if "buffer_percent" is set to 100. 776 */ 777 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1; 778 779 return (dirty * 100) >= (full * nr_pages); 780 } 781 782 /* 783 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 784 * 785 * Schedules a delayed work to wake up any task that is blocked on the 786 * ring buffer waiters queue. 787 */ 788 static void rb_wake_up_waiters(struct irq_work *work) 789 { 790 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 791 792 /* For waiters waiting for the first wake up */ 793 (void)atomic_fetch_inc_release(&rbwork->seq); 794 795 wake_up_all(&rbwork->waiters); 796 if (rbwork->full_waiters_pending || rbwork->wakeup_full) { 797 /* Only cpu_buffer sets the above flags */ 798 struct ring_buffer_per_cpu *cpu_buffer = 799 container_of(rbwork, struct ring_buffer_per_cpu, irq_work); 800 801 /* Called from interrupt context */ 802 raw_spin_lock(&cpu_buffer->reader_lock); 803 rbwork->wakeup_full = false; 804 rbwork->full_waiters_pending = false; 805 806 /* Waking up all waiters, they will reset the shortest full */ 807 cpu_buffer->shortest_full = 0; 808 raw_spin_unlock(&cpu_buffer->reader_lock); 809 810 wake_up_all(&rbwork->full_waiters); 811 } 812 } 813 814 /** 815 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer 816 * @buffer: The ring buffer to wake waiters on 817 * @cpu: The CPU buffer to wake waiters on 818 * 819 * In the case of a file that represents a ring buffer is closing, 820 * it is prudent to wake up any waiters that are on this. 821 */ 822 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu) 823 { 824 struct ring_buffer_per_cpu *cpu_buffer; 825 struct rb_irq_work *rbwork; 826 827 if (!buffer) 828 return; 829 830 if (cpu == RING_BUFFER_ALL_CPUS) { 831 832 /* Wake up individual ones too. One level recursion */ 833 for_each_buffer_cpu(buffer, cpu) 834 ring_buffer_wake_waiters(buffer, cpu); 835 836 rbwork = &buffer->irq_work; 837 } else { 838 if (WARN_ON_ONCE(!buffer->buffers)) 839 return; 840 if (WARN_ON_ONCE(cpu >= nr_cpu_ids)) 841 return; 842 843 cpu_buffer = buffer->buffers[cpu]; 844 /* The CPU buffer may not have been initialized yet */ 845 if (!cpu_buffer) 846 return; 847 rbwork = &cpu_buffer->irq_work; 848 } 849 850 /* This can be called in any context */ 851 irq_work_queue(&rbwork->work); 852 } 853 854 static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full) 855 { 856 struct ring_buffer_per_cpu *cpu_buffer; 857 bool ret = false; 858 859 /* Reads of all CPUs always waits for any data */ 860 if (cpu == RING_BUFFER_ALL_CPUS) 861 return !ring_buffer_empty(buffer); 862 863 cpu_buffer = buffer->buffers[cpu]; 864 865 if (!ring_buffer_empty_cpu(buffer, cpu)) { 866 unsigned long flags; 867 bool pagebusy; 868 869 if (!full) 870 return true; 871 872 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 873 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 874 ret = !pagebusy && full_hit(buffer, cpu, full); 875 876 if (!ret && (!cpu_buffer->shortest_full || 877 cpu_buffer->shortest_full > full)) { 878 cpu_buffer->shortest_full = full; 879 } 880 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 881 } 882 return ret; 883 } 884 885 static inline bool 886 rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer, 887 int cpu, int full, ring_buffer_cond_fn cond, void *data) 888 { 889 if (rb_watermark_hit(buffer, cpu, full)) 890 return true; 891 892 if (cond(data)) 893 return true; 894 895 /* 896 * The events can happen in critical sections where 897 * checking a work queue can cause deadlocks. 898 * After adding a task to the queue, this flag is set 899 * only to notify events to try to wake up the queue 900 * using irq_work. 901 * 902 * We don't clear it even if the buffer is no longer 903 * empty. The flag only causes the next event to run 904 * irq_work to do the work queue wake up. The worse 905 * that can happen if we race with !trace_empty() is that 906 * an event will cause an irq_work to try to wake up 907 * an empty queue. 908 * 909 * There's no reason to protect this flag either, as 910 * the work queue and irq_work logic will do the necessary 911 * synchronization for the wake ups. The only thing 912 * that is necessary is that the wake up happens after 913 * a task has been queued. It's OK for spurious wake ups. 914 */ 915 if (full) 916 rbwork->full_waiters_pending = true; 917 else 918 rbwork->waiters_pending = true; 919 920 return false; 921 } 922 923 struct rb_wait_data { 924 struct rb_irq_work *irq_work; 925 int seq; 926 }; 927 928 /* 929 * The default wait condition for ring_buffer_wait() is to just to exit the 930 * wait loop the first time it is woken up. 931 */ 932 static bool rb_wait_once(void *data) 933 { 934 struct rb_wait_data *rdata = data; 935 struct rb_irq_work *rbwork = rdata->irq_work; 936 937 return atomic_read_acquire(&rbwork->seq) != rdata->seq; 938 } 939 940 /** 941 * ring_buffer_wait - wait for input to the ring buffer 942 * @buffer: buffer to wait on 943 * @cpu: the cpu buffer to wait on 944 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 945 * @cond: condition function to break out of wait (NULL to run once) 946 * @data: the data to pass to @cond. 947 * 948 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 949 * as data is added to any of the @buffer's cpu buffers. Otherwise 950 * it will wait for data to be added to a specific cpu buffer. 951 */ 952 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full, 953 ring_buffer_cond_fn cond, void *data) 954 { 955 struct ring_buffer_per_cpu *cpu_buffer; 956 struct wait_queue_head *waitq; 957 struct rb_irq_work *rbwork; 958 struct rb_wait_data rdata; 959 int ret = 0; 960 961 /* 962 * Depending on what the caller is waiting for, either any 963 * data in any cpu buffer, or a specific buffer, put the 964 * caller on the appropriate wait queue. 965 */ 966 if (cpu == RING_BUFFER_ALL_CPUS) { 967 rbwork = &buffer->irq_work; 968 /* Full only makes sense on per cpu reads */ 969 full = 0; 970 } else { 971 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 972 return -ENODEV; 973 cpu_buffer = buffer->buffers[cpu]; 974 rbwork = &cpu_buffer->irq_work; 975 } 976 977 if (full) 978 waitq = &rbwork->full_waiters; 979 else 980 waitq = &rbwork->waiters; 981 982 /* Set up to exit loop as soon as it is woken */ 983 if (!cond) { 984 cond = rb_wait_once; 985 rdata.irq_work = rbwork; 986 rdata.seq = atomic_read_acquire(&rbwork->seq); 987 data = &rdata; 988 } 989 990 ret = wait_event_interruptible((*waitq), 991 rb_wait_cond(rbwork, buffer, cpu, full, cond, data)); 992 993 return ret; 994 } 995 996 /** 997 * ring_buffer_poll_wait - poll on buffer input 998 * @buffer: buffer to wait on 999 * @cpu: the cpu buffer to wait on 1000 * @filp: the file descriptor 1001 * @poll_table: The poll descriptor 1002 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 1003 * 1004 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 1005 * as data is added to any of the @buffer's cpu buffers. Otherwise 1006 * it will wait for data to be added to a specific cpu buffer. 1007 * 1008 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, 1009 * zero otherwise. 1010 */ 1011 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, 1012 struct file *filp, poll_table *poll_table, int full) 1013 { 1014 struct ring_buffer_per_cpu *cpu_buffer; 1015 struct rb_irq_work *rbwork; 1016 1017 if (cpu == RING_BUFFER_ALL_CPUS) { 1018 rbwork = &buffer->irq_work; 1019 full = 0; 1020 } else { 1021 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1022 return EPOLLERR; 1023 1024 cpu_buffer = buffer->buffers[cpu]; 1025 rbwork = &cpu_buffer->irq_work; 1026 } 1027 1028 if (full) { 1029 poll_wait(filp, &rbwork->full_waiters, poll_table); 1030 1031 if (rb_watermark_hit(buffer, cpu, full)) 1032 return EPOLLIN | EPOLLRDNORM; 1033 /* 1034 * Only allow full_waiters_pending update to be seen after 1035 * the shortest_full is set (in rb_watermark_hit). If the 1036 * writer sees the full_waiters_pending flag set, it will 1037 * compare the amount in the ring buffer to shortest_full. 1038 * If the amount in the ring buffer is greater than the 1039 * shortest_full percent, it will call the irq_work handler 1040 * to wake up this list. The irq_handler will reset shortest_full 1041 * back to zero. That's done under the reader_lock, but 1042 * the below smp_mb() makes sure that the update to 1043 * full_waiters_pending doesn't leak up into the above. 1044 */ 1045 smp_mb(); 1046 rbwork->full_waiters_pending = true; 1047 return 0; 1048 } 1049 1050 poll_wait(filp, &rbwork->waiters, poll_table); 1051 rbwork->waiters_pending = true; 1052 1053 /* 1054 * There's a tight race between setting the waiters_pending and 1055 * checking if the ring buffer is empty. Once the waiters_pending bit 1056 * is set, the next event will wake the task up, but we can get stuck 1057 * if there's only a single event in. 1058 * 1059 * FIXME: Ideally, we need a memory barrier on the writer side as well, 1060 * but adding a memory barrier to all events will cause too much of a 1061 * performance hit in the fast path. We only need a memory barrier when 1062 * the buffer goes from empty to having content. But as this race is 1063 * extremely small, and it's not a problem if another event comes in, we 1064 * will fix it later. 1065 */ 1066 smp_mb(); 1067 1068 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 1069 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) 1070 return EPOLLIN | EPOLLRDNORM; 1071 return 0; 1072 } 1073 1074 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 1075 #define RB_WARN_ON(b, cond) \ 1076 ({ \ 1077 int _____ret = unlikely(cond); \ 1078 if (_____ret) { \ 1079 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 1080 struct ring_buffer_per_cpu *__b = \ 1081 (void *)b; \ 1082 atomic_inc(&__b->buffer->record_disabled); \ 1083 } else \ 1084 atomic_inc(&b->record_disabled); \ 1085 WARN_ON(1); \ 1086 } \ 1087 _____ret; \ 1088 }) 1089 1090 /* Up this if you want to test the TIME_EXTENTS and normalization */ 1091 #define DEBUG_SHIFT 0 1092 1093 static inline u64 rb_time_stamp(struct trace_buffer *buffer) 1094 { 1095 u64 ts; 1096 1097 /* Skip retpolines :-( */ 1098 if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local)) 1099 ts = trace_clock_local(); 1100 else 1101 ts = buffer->clock(); 1102 1103 /* shift to debug/test normalization and TIME_EXTENTS */ 1104 return ts << DEBUG_SHIFT; 1105 } 1106 1107 u64 ring_buffer_time_stamp(struct trace_buffer *buffer) 1108 { 1109 u64 time; 1110 1111 preempt_disable_notrace(); 1112 time = rb_time_stamp(buffer); 1113 preempt_enable_notrace(); 1114 1115 return time; 1116 } 1117 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 1118 1119 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer, 1120 int cpu, u64 *ts) 1121 { 1122 /* Just stupid testing the normalize function and deltas */ 1123 *ts >>= DEBUG_SHIFT; 1124 } 1125 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 1126 1127 /* 1128 * Making the ring buffer lockless makes things tricky. 1129 * Although writes only happen on the CPU that they are on, 1130 * and they only need to worry about interrupts. Reads can 1131 * happen on any CPU. 1132 * 1133 * The reader page is always off the ring buffer, but when the 1134 * reader finishes with a page, it needs to swap its page with 1135 * a new one from the buffer. The reader needs to take from 1136 * the head (writes go to the tail). But if a writer is in overwrite 1137 * mode and wraps, it must push the head page forward. 1138 * 1139 * Here lies the problem. 1140 * 1141 * The reader must be careful to replace only the head page, and 1142 * not another one. As described at the top of the file in the 1143 * ASCII art, the reader sets its old page to point to the next 1144 * page after head. It then sets the page after head to point to 1145 * the old reader page. But if the writer moves the head page 1146 * during this operation, the reader could end up with the tail. 1147 * 1148 * We use cmpxchg to help prevent this race. We also do something 1149 * special with the page before head. We set the LSB to 1. 1150 * 1151 * When the writer must push the page forward, it will clear the 1152 * bit that points to the head page, move the head, and then set 1153 * the bit that points to the new head page. 1154 * 1155 * We also don't want an interrupt coming in and moving the head 1156 * page on another writer. Thus we use the second LSB to catch 1157 * that too. Thus: 1158 * 1159 * head->list->prev->next bit 1 bit 0 1160 * ------- ------- 1161 * Normal page 0 0 1162 * Points to head page 0 1 1163 * New head page 1 0 1164 * 1165 * Note we can not trust the prev pointer of the head page, because: 1166 * 1167 * +----+ +-----+ +-----+ 1168 * | |------>| T |---X--->| N | 1169 * | |<------| | | | 1170 * +----+ +-----+ +-----+ 1171 * ^ ^ | 1172 * | +-----+ | | 1173 * +----------| R |----------+ | 1174 * | |<-----------+ 1175 * +-----+ 1176 * 1177 * Key: ---X--> HEAD flag set in pointer 1178 * T Tail page 1179 * R Reader page 1180 * N Next page 1181 * 1182 * (see __rb_reserve_next() to see where this happens) 1183 * 1184 * What the above shows is that the reader just swapped out 1185 * the reader page with a page in the buffer, but before it 1186 * could make the new header point back to the new page added 1187 * it was preempted by a writer. The writer moved forward onto 1188 * the new page added by the reader and is about to move forward 1189 * again. 1190 * 1191 * You can see, it is legitimate for the previous pointer of 1192 * the head (or any page) not to point back to itself. But only 1193 * temporarily. 1194 */ 1195 1196 #define RB_PAGE_NORMAL 0UL 1197 #define RB_PAGE_HEAD 1UL 1198 #define RB_PAGE_UPDATE 2UL 1199 1200 1201 #define RB_FLAG_MASK 3UL 1202 1203 /* PAGE_MOVED is not part of the mask */ 1204 #define RB_PAGE_MOVED 4UL 1205 1206 /* 1207 * rb_list_head - remove any bit 1208 */ 1209 static struct list_head *rb_list_head(struct list_head *list) 1210 { 1211 unsigned long val = (unsigned long)list; 1212 1213 return (struct list_head *)(val & ~RB_FLAG_MASK); 1214 } 1215 1216 /* 1217 * rb_is_head_page - test if the given page is the head page 1218 * 1219 * Because the reader may move the head_page pointer, we can 1220 * not trust what the head page is (it may be pointing to 1221 * the reader page). But if the next page is a header page, 1222 * its flags will be non zero. 1223 */ 1224 static inline int 1225 rb_is_head_page(struct buffer_page *page, struct list_head *list) 1226 { 1227 unsigned long val; 1228 1229 val = (unsigned long)list->next; 1230 1231 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list) 1232 return RB_PAGE_MOVED; 1233 1234 return val & RB_FLAG_MASK; 1235 } 1236 1237 /* 1238 * rb_is_reader_page 1239 * 1240 * The unique thing about the reader page, is that, if the 1241 * writer is ever on it, the previous pointer never points 1242 * back to the reader page. 1243 */ 1244 static bool rb_is_reader_page(struct buffer_page *page) 1245 { 1246 struct list_head *list = page->list.prev; 1247 1248 return rb_list_head(list->next) != &page->list; 1249 } 1250 1251 /* 1252 * rb_set_list_to_head - set a list_head to be pointing to head. 1253 */ 1254 static void rb_set_list_to_head(struct list_head *list) 1255 { 1256 unsigned long *ptr; 1257 1258 ptr = (unsigned long *)&list->next; 1259 *ptr |= RB_PAGE_HEAD; 1260 *ptr &= ~RB_PAGE_UPDATE; 1261 } 1262 1263 /* 1264 * rb_head_page_activate - sets up head page 1265 */ 1266 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) 1267 { 1268 struct buffer_page *head; 1269 1270 head = cpu_buffer->head_page; 1271 if (!head) 1272 return; 1273 1274 /* 1275 * Set the previous list pointer to have the HEAD flag. 1276 */ 1277 rb_set_list_to_head(head->list.prev); 1278 1279 if (cpu_buffer->ring_meta) { 1280 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 1281 meta->head_buffer = (unsigned long)head->page; 1282 } 1283 } 1284 1285 static void rb_list_head_clear(struct list_head *list) 1286 { 1287 unsigned long *ptr = (unsigned long *)&list->next; 1288 1289 *ptr &= ~RB_FLAG_MASK; 1290 } 1291 1292 /* 1293 * rb_head_page_deactivate - clears head page ptr (for free list) 1294 */ 1295 static void 1296 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) 1297 { 1298 struct list_head *hd; 1299 1300 /* Go through the whole list and clear any pointers found. */ 1301 rb_list_head_clear(cpu_buffer->pages); 1302 1303 list_for_each(hd, cpu_buffer->pages) 1304 rb_list_head_clear(hd); 1305 } 1306 1307 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, 1308 struct buffer_page *head, 1309 struct buffer_page *prev, 1310 int old_flag, int new_flag) 1311 { 1312 struct list_head *list; 1313 unsigned long val = (unsigned long)&head->list; 1314 unsigned long ret; 1315 1316 list = &prev->list; 1317 1318 val &= ~RB_FLAG_MASK; 1319 1320 ret = cmpxchg((unsigned long *)&list->next, 1321 val | old_flag, val | new_flag); 1322 1323 /* check if the reader took the page */ 1324 if ((ret & ~RB_FLAG_MASK) != val) 1325 return RB_PAGE_MOVED; 1326 1327 return ret & RB_FLAG_MASK; 1328 } 1329 1330 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, 1331 struct buffer_page *head, 1332 struct buffer_page *prev, 1333 int old_flag) 1334 { 1335 return rb_head_page_set(cpu_buffer, head, prev, 1336 old_flag, RB_PAGE_UPDATE); 1337 } 1338 1339 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, 1340 struct buffer_page *head, 1341 struct buffer_page *prev, 1342 int old_flag) 1343 { 1344 return rb_head_page_set(cpu_buffer, head, prev, 1345 old_flag, RB_PAGE_HEAD); 1346 } 1347 1348 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, 1349 struct buffer_page *head, 1350 struct buffer_page *prev, 1351 int old_flag) 1352 { 1353 return rb_head_page_set(cpu_buffer, head, prev, 1354 old_flag, RB_PAGE_NORMAL); 1355 } 1356 1357 static inline void rb_inc_page(struct buffer_page **bpage) 1358 { 1359 struct list_head *p = rb_list_head((*bpage)->list.next); 1360 1361 *bpage = list_entry(p, struct buffer_page, list); 1362 } 1363 1364 static struct buffer_page * 1365 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) 1366 { 1367 struct buffer_page *head; 1368 struct buffer_page *page; 1369 struct list_head *list; 1370 int i; 1371 1372 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) 1373 return NULL; 1374 1375 /* sanity check */ 1376 list = cpu_buffer->pages; 1377 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) 1378 return NULL; 1379 1380 page = head = cpu_buffer->head_page; 1381 /* 1382 * It is possible that the writer moves the header behind 1383 * where we started, and we miss in one loop. 1384 * A second loop should grab the header, but we'll do 1385 * three loops just because I'm paranoid. 1386 */ 1387 for (i = 0; i < 3; i++) { 1388 do { 1389 if (rb_is_head_page(page, page->list.prev)) { 1390 cpu_buffer->head_page = page; 1391 return page; 1392 } 1393 rb_inc_page(&page); 1394 } while (page != head); 1395 } 1396 1397 RB_WARN_ON(cpu_buffer, 1); 1398 1399 return NULL; 1400 } 1401 1402 static bool rb_head_page_replace(struct buffer_page *old, 1403 struct buffer_page *new) 1404 { 1405 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 1406 unsigned long val; 1407 1408 val = *ptr & ~RB_FLAG_MASK; 1409 val |= RB_PAGE_HEAD; 1410 1411 return try_cmpxchg(ptr, &val, (unsigned long)&new->list); 1412 } 1413 1414 /* 1415 * rb_tail_page_update - move the tail page forward 1416 */ 1417 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 1418 struct buffer_page *tail_page, 1419 struct buffer_page *next_page) 1420 { 1421 unsigned long old_entries; 1422 unsigned long old_write; 1423 1424 /* 1425 * The tail page now needs to be moved forward. 1426 * 1427 * We need to reset the tail page, but without messing 1428 * with possible erasing of data brought in by interrupts 1429 * that have moved the tail page and are currently on it. 1430 * 1431 * We add a counter to the write field to denote this. 1432 */ 1433 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 1434 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 1435 1436 /* 1437 * Just make sure we have seen our old_write and synchronize 1438 * with any interrupts that come in. 1439 */ 1440 barrier(); 1441 1442 /* 1443 * If the tail page is still the same as what we think 1444 * it is, then it is up to us to update the tail 1445 * pointer. 1446 */ 1447 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) { 1448 /* Zero the write counter */ 1449 unsigned long val = old_write & ~RB_WRITE_MASK; 1450 unsigned long eval = old_entries & ~RB_WRITE_MASK; 1451 1452 /* 1453 * This will only succeed if an interrupt did 1454 * not come in and change it. In which case, we 1455 * do not want to modify it. 1456 * 1457 * We add (void) to let the compiler know that we do not care 1458 * about the return value of these functions. We use the 1459 * cmpxchg to only update if an interrupt did not already 1460 * do it for us. If the cmpxchg fails, we don't care. 1461 */ 1462 (void)local_cmpxchg(&next_page->write, old_write, val); 1463 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 1464 1465 /* 1466 * No need to worry about races with clearing out the commit. 1467 * it only can increment when a commit takes place. But that 1468 * only happens in the outer most nested commit. 1469 */ 1470 local_set(&next_page->page->commit, 0); 1471 1472 /* Either we update tail_page or an interrupt does */ 1473 if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page)) 1474 local_inc(&cpu_buffer->pages_touched); 1475 } 1476 } 1477 1478 static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 1479 struct buffer_page *bpage) 1480 { 1481 unsigned long val = (unsigned long)bpage; 1482 1483 RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK); 1484 } 1485 1486 static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer, 1487 struct list_head *list) 1488 { 1489 if (RB_WARN_ON(cpu_buffer, 1490 rb_list_head(rb_list_head(list->next)->prev) != list)) 1491 return false; 1492 1493 if (RB_WARN_ON(cpu_buffer, 1494 rb_list_head(rb_list_head(list->prev)->next) != list)) 1495 return false; 1496 1497 return true; 1498 } 1499 1500 /** 1501 * rb_check_pages - integrity check of buffer pages 1502 * @cpu_buffer: CPU buffer with pages to test 1503 * 1504 * As a safety measure we check to make sure the data pages have not 1505 * been corrupted. 1506 */ 1507 static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 1508 { 1509 struct list_head *head, *tmp; 1510 unsigned long buffer_cnt; 1511 unsigned long flags; 1512 int nr_loops = 0; 1513 1514 /* 1515 * Walk the linked list underpinning the ring buffer and validate all 1516 * its next and prev links. 1517 * 1518 * The check acquires the reader_lock to avoid concurrent processing 1519 * with code that could be modifying the list. However, the lock cannot 1520 * be held for the entire duration of the walk, as this would make the 1521 * time when interrupts are disabled non-deterministic, dependent on the 1522 * ring buffer size. Therefore, the code releases and re-acquires the 1523 * lock after checking each page. The ring_buffer_per_cpu.cnt variable 1524 * is then used to detect if the list was modified while the lock was 1525 * not held, in which case the check needs to be restarted. 1526 * 1527 * The code attempts to perform the check at most three times before 1528 * giving up. This is acceptable because this is only a self-validation 1529 * to detect problems early on. In practice, the list modification 1530 * operations are fairly spaced, and so this check typically succeeds at 1531 * most on the second try. 1532 */ 1533 again: 1534 if (++nr_loops > 3) 1535 return; 1536 1537 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1538 head = rb_list_head(cpu_buffer->pages); 1539 if (!rb_check_links(cpu_buffer, head)) 1540 goto out_locked; 1541 buffer_cnt = cpu_buffer->cnt; 1542 tmp = head; 1543 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1544 1545 while (true) { 1546 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1547 1548 if (buffer_cnt != cpu_buffer->cnt) { 1549 /* The list was updated, try again. */ 1550 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1551 goto again; 1552 } 1553 1554 tmp = rb_list_head(tmp->next); 1555 if (tmp == head) 1556 /* The iteration circled back, all is done. */ 1557 goto out_locked; 1558 1559 if (!rb_check_links(cpu_buffer, tmp)) 1560 goto out_locked; 1561 1562 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1563 } 1564 1565 out_locked: 1566 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1567 } 1568 1569 /* 1570 * Take an address, add the meta data size as well as the array of 1571 * array subbuffer indexes, then align it to a subbuffer size. 1572 * 1573 * This is used to help find the next per cpu subbuffer within a mapped range. 1574 */ 1575 static unsigned long 1576 rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs) 1577 { 1578 addr += sizeof(struct ring_buffer_cpu_meta) + 1579 sizeof(int) * nr_subbufs; 1580 return ALIGN(addr, subbuf_size); 1581 } 1582 1583 /* 1584 * Return the ring_buffer_meta for a given @cpu. 1585 */ 1586 static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu) 1587 { 1588 int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 1589 struct ring_buffer_cpu_meta *meta; 1590 struct ring_buffer_meta *bmeta; 1591 unsigned long ptr; 1592 int nr_subbufs; 1593 1594 bmeta = buffer->meta; 1595 if (!bmeta) 1596 return NULL; 1597 1598 ptr = (unsigned long)bmeta + bmeta->buffers_offset; 1599 meta = (struct ring_buffer_cpu_meta *)ptr; 1600 1601 /* When nr_pages passed in is zero, the first meta has already been initialized */ 1602 if (!nr_pages) { 1603 nr_subbufs = meta->nr_subbufs; 1604 } else { 1605 /* Include the reader page */ 1606 nr_subbufs = nr_pages + 1; 1607 } 1608 1609 /* 1610 * The first chunk may not be subbuffer aligned, where as 1611 * the rest of the chunks are. 1612 */ 1613 if (cpu) { 1614 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1615 ptr += subbuf_size * nr_subbufs; 1616 1617 /* We can use multiplication to find chunks greater than 1 */ 1618 if (cpu > 1) { 1619 unsigned long size; 1620 unsigned long p; 1621 1622 /* Save the beginning of this CPU chunk */ 1623 p = ptr; 1624 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1625 ptr += subbuf_size * nr_subbufs; 1626 1627 /* Now all chunks after this are the same size */ 1628 size = ptr - p; 1629 ptr += size * (cpu - 2); 1630 } 1631 } 1632 return (void *)ptr; 1633 } 1634 1635 /* Return the start of subbufs given the meta pointer */ 1636 static void *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta) 1637 { 1638 int subbuf_size = meta->subbuf_size; 1639 unsigned long ptr; 1640 1641 ptr = (unsigned long)meta; 1642 ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs); 1643 1644 return (void *)ptr; 1645 } 1646 1647 /* 1648 * Return a specific sub-buffer for a given @cpu defined by @idx. 1649 */ 1650 static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx) 1651 { 1652 struct ring_buffer_cpu_meta *meta; 1653 unsigned long ptr; 1654 int subbuf_size; 1655 1656 meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu); 1657 if (!meta) 1658 return NULL; 1659 1660 if (WARN_ON_ONCE(idx >= meta->nr_subbufs)) 1661 return NULL; 1662 1663 subbuf_size = meta->subbuf_size; 1664 1665 /* Map this buffer to the order that's in meta->buffers[] */ 1666 idx = meta->buffers[idx]; 1667 1668 ptr = (unsigned long)rb_subbufs_from_meta(meta); 1669 1670 ptr += subbuf_size * idx; 1671 if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end) 1672 return NULL; 1673 1674 return (void *)ptr; 1675 } 1676 1677 /* 1678 * See if the existing memory contains a valid meta section. 1679 * if so, use that, otherwise initialize it. 1680 */ 1681 static bool rb_meta_init(struct trace_buffer *buffer) 1682 { 1683 unsigned long ptr = buffer->range_addr_start; 1684 struct ring_buffer_meta *bmeta; 1685 unsigned long total_size; 1686 int struct_sizes; 1687 1688 bmeta = (struct ring_buffer_meta *)ptr; 1689 buffer->meta = bmeta; 1690 1691 total_size = buffer->range_addr_end - buffer->range_addr_start; 1692 1693 struct_sizes = sizeof(struct ring_buffer_cpu_meta); 1694 struct_sizes |= sizeof(*bmeta) << 16; 1695 1696 /* The first buffer will start word size after the meta page */ 1697 ptr += sizeof(*bmeta); 1698 ptr = ALIGN(ptr, sizeof(long)); 1699 1700 if (bmeta->magic != RING_BUFFER_META_MAGIC) { 1701 pr_info("Ring buffer boot meta mismatch of magic\n"); 1702 goto init; 1703 } 1704 1705 if (bmeta->struct_sizes != struct_sizes) { 1706 pr_info("Ring buffer boot meta mismatch of struct size\n"); 1707 goto init; 1708 } 1709 1710 if (bmeta->total_size != total_size) { 1711 pr_info("Ring buffer boot meta mismatch of total size\n"); 1712 goto init; 1713 } 1714 1715 if (bmeta->buffers_offset > bmeta->total_size) { 1716 pr_info("Ring buffer boot meta mismatch of offset outside of total size\n"); 1717 goto init; 1718 } 1719 1720 if (bmeta->buffers_offset != (void *)ptr - (void *)bmeta) { 1721 pr_info("Ring buffer boot meta mismatch of first buffer offset\n"); 1722 goto init; 1723 } 1724 1725 return true; 1726 1727 init: 1728 bmeta->magic = RING_BUFFER_META_MAGIC; 1729 bmeta->struct_sizes = struct_sizes; 1730 bmeta->total_size = total_size; 1731 bmeta->buffers_offset = (void *)ptr - (void *)bmeta; 1732 1733 return false; 1734 } 1735 1736 /* 1737 * See if the existing memory contains valid ring buffer data. 1738 * As the previous kernel must be the same as this kernel, all 1739 * the calculations (size of buffers and number of buffers) 1740 * must be the same. 1741 */ 1742 static bool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu, 1743 struct trace_buffer *buffer, int nr_pages, 1744 unsigned long *subbuf_mask) 1745 { 1746 int subbuf_size = PAGE_SIZE; 1747 struct buffer_data_page *subbuf; 1748 unsigned long buffers_start; 1749 unsigned long buffers_end; 1750 int i; 1751 1752 if (!subbuf_mask) 1753 return false; 1754 1755 buffers_start = meta->first_buffer; 1756 buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs); 1757 1758 /* Is the head and commit buffers within the range of buffers? */ 1759 if (meta->head_buffer < buffers_start || 1760 meta->head_buffer >= buffers_end) { 1761 pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu); 1762 return false; 1763 } 1764 1765 if (meta->commit_buffer < buffers_start || 1766 meta->commit_buffer >= buffers_end) { 1767 pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu); 1768 return false; 1769 } 1770 1771 subbuf = rb_subbufs_from_meta(meta); 1772 1773 bitmap_clear(subbuf_mask, 0, meta->nr_subbufs); 1774 1775 /* Is the meta buffers and the subbufs themselves have correct data? */ 1776 for (i = 0; i < meta->nr_subbufs; i++) { 1777 if (meta->buffers[i] < 0 || 1778 meta->buffers[i] >= meta->nr_subbufs) { 1779 pr_info("Ring buffer boot meta [%d] array out of range\n", cpu); 1780 return false; 1781 } 1782 1783 if ((unsigned)local_read(&subbuf->commit) > subbuf_size) { 1784 pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu); 1785 return false; 1786 } 1787 1788 if (test_bit(meta->buffers[i], subbuf_mask)) { 1789 pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu); 1790 return false; 1791 } 1792 1793 set_bit(meta->buffers[i], subbuf_mask); 1794 subbuf = (void *)subbuf + subbuf_size; 1795 } 1796 1797 return true; 1798 } 1799 1800 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf); 1801 1802 static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu, 1803 unsigned long long *timestamp, u64 *delta_ptr) 1804 { 1805 struct ring_buffer_event *event; 1806 u64 ts, delta; 1807 int events = 0; 1808 int e; 1809 1810 *delta_ptr = 0; 1811 *timestamp = 0; 1812 1813 ts = dpage->time_stamp; 1814 1815 for (e = 0; e < tail; e += rb_event_length(event)) { 1816 1817 event = (struct ring_buffer_event *)(dpage->data + e); 1818 1819 switch (event->type_len) { 1820 1821 case RINGBUF_TYPE_TIME_EXTEND: 1822 delta = rb_event_time_stamp(event); 1823 ts += delta; 1824 break; 1825 1826 case RINGBUF_TYPE_TIME_STAMP: 1827 delta = rb_event_time_stamp(event); 1828 delta = rb_fix_abs_ts(delta, ts); 1829 if (delta < ts) { 1830 *delta_ptr = delta; 1831 *timestamp = ts; 1832 return -1; 1833 } 1834 ts = delta; 1835 break; 1836 1837 case RINGBUF_TYPE_PADDING: 1838 if (event->time_delta == 1) 1839 break; 1840 fallthrough; 1841 case RINGBUF_TYPE_DATA: 1842 events++; 1843 ts += event->time_delta; 1844 break; 1845 1846 default: 1847 return -1; 1848 } 1849 } 1850 *timestamp = ts; 1851 return events; 1852 } 1853 1854 static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu) 1855 { 1856 unsigned long long ts; 1857 u64 delta; 1858 int tail; 1859 1860 tail = local_read(&dpage->commit); 1861 return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta); 1862 } 1863 1864 /* If the meta data has been validated, now validate the events */ 1865 static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer) 1866 { 1867 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 1868 struct buffer_page *head_page; 1869 unsigned long entry_bytes = 0; 1870 unsigned long entries = 0; 1871 int ret; 1872 int i; 1873 1874 if (!meta || !meta->head_buffer) 1875 return; 1876 1877 /* Do the reader page first */ 1878 ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu); 1879 if (ret < 0) { 1880 pr_info("Ring buffer reader page is invalid\n"); 1881 goto invalid; 1882 } 1883 entries += ret; 1884 entry_bytes += local_read(&cpu_buffer->reader_page->page->commit); 1885 local_set(&cpu_buffer->reader_page->entries, ret); 1886 1887 head_page = cpu_buffer->head_page; 1888 1889 /* If both the head and commit are on the reader_page then we are done. */ 1890 if (head_page == cpu_buffer->reader_page && 1891 head_page == cpu_buffer->commit_page) 1892 goto done; 1893 1894 /* Iterate until finding the commit page */ 1895 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) { 1896 1897 /* Reader page has already been done */ 1898 if (head_page == cpu_buffer->reader_page) 1899 continue; 1900 1901 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 1902 if (ret < 0) { 1903 pr_info("Ring buffer meta [%d] invalid buffer page\n", 1904 cpu_buffer->cpu); 1905 goto invalid; 1906 } 1907 1908 /* If the buffer has content, update pages_touched */ 1909 if (ret) 1910 local_inc(&cpu_buffer->pages_touched); 1911 1912 entries += ret; 1913 entry_bytes += local_read(&head_page->page->commit); 1914 local_set(&cpu_buffer->head_page->entries, ret); 1915 1916 if (head_page == cpu_buffer->commit_page) 1917 break; 1918 } 1919 1920 if (head_page != cpu_buffer->commit_page) { 1921 pr_info("Ring buffer meta [%d] commit page not found\n", 1922 cpu_buffer->cpu); 1923 goto invalid; 1924 } 1925 done: 1926 local_set(&cpu_buffer->entries, entries); 1927 local_set(&cpu_buffer->entries_bytes, entry_bytes); 1928 1929 pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu); 1930 return; 1931 1932 invalid: 1933 /* The content of the buffers are invalid, reset the meta data */ 1934 meta->head_buffer = 0; 1935 meta->commit_buffer = 0; 1936 1937 /* Reset the reader page */ 1938 local_set(&cpu_buffer->reader_page->entries, 0); 1939 local_set(&cpu_buffer->reader_page->page->commit, 0); 1940 1941 /* Reset all the subbuffers */ 1942 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) { 1943 local_set(&head_page->entries, 0); 1944 local_set(&head_page->page->commit, 0); 1945 } 1946 } 1947 1948 static void rb_meta_init_text_addr(struct ring_buffer_cpu_meta *meta) 1949 { 1950 #ifdef CONFIG_RANDOMIZE_BASE 1951 meta->kaslr_addr = kaslr_offset(); 1952 #else 1953 meta->kaslr_addr = 0; 1954 #endif 1955 } 1956 1957 static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages) 1958 { 1959 struct ring_buffer_cpu_meta *meta; 1960 struct ring_buffer_meta *bmeta; 1961 unsigned long *subbuf_mask; 1962 unsigned long delta; 1963 void *subbuf; 1964 bool valid = false; 1965 int cpu; 1966 int i; 1967 1968 /* Create a mask to test the subbuf array */ 1969 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL); 1970 /* If subbuf_mask fails to allocate, then rb_meta_valid() will return false */ 1971 1972 if (rb_meta_init(buffer)) 1973 valid = true; 1974 1975 bmeta = buffer->meta; 1976 1977 for (cpu = 0; cpu < nr_cpu_ids; cpu++) { 1978 void *next_meta; 1979 1980 meta = rb_range_meta(buffer, nr_pages, cpu); 1981 1982 if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { 1983 /* Make the mappings match the current address */ 1984 subbuf = rb_subbufs_from_meta(meta); 1985 delta = (unsigned long)subbuf - meta->first_buffer; 1986 meta->first_buffer += delta; 1987 meta->head_buffer += delta; 1988 meta->commit_buffer += delta; 1989 buffer->kaslr_addr = meta->kaslr_addr; 1990 continue; 1991 } 1992 1993 if (cpu < nr_cpu_ids - 1) 1994 next_meta = rb_range_meta(buffer, nr_pages, cpu + 1); 1995 else 1996 next_meta = (void *)buffer->range_addr_end; 1997 1998 memset(meta, 0, next_meta - (void *)meta); 1999 2000 meta->nr_subbufs = nr_pages + 1; 2001 meta->subbuf_size = PAGE_SIZE; 2002 2003 subbuf = rb_subbufs_from_meta(meta); 2004 2005 meta->first_buffer = (unsigned long)subbuf; 2006 rb_meta_init_text_addr(meta); 2007 2008 /* 2009 * The buffers[] array holds the order of the sub-buffers 2010 * that are after the meta data. The sub-buffers may 2011 * be swapped out when read and inserted into a different 2012 * location of the ring buffer. Although their addresses 2013 * remain the same, the buffers[] array contains the 2014 * index into the sub-buffers holding their actual order. 2015 */ 2016 for (i = 0; i < meta->nr_subbufs; i++) { 2017 meta->buffers[i] = i; 2018 rb_init_page(subbuf); 2019 subbuf += meta->subbuf_size; 2020 } 2021 } 2022 bitmap_free(subbuf_mask); 2023 } 2024 2025 static void *rbm_start(struct seq_file *m, loff_t *pos) 2026 { 2027 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2028 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2029 unsigned long val; 2030 2031 if (!meta) 2032 return NULL; 2033 2034 if (*pos > meta->nr_subbufs) 2035 return NULL; 2036 2037 val = *pos; 2038 val++; 2039 2040 return (void *)val; 2041 } 2042 2043 static void *rbm_next(struct seq_file *m, void *v, loff_t *pos) 2044 { 2045 (*pos)++; 2046 2047 return rbm_start(m, pos); 2048 } 2049 2050 static int rbm_show(struct seq_file *m, void *v) 2051 { 2052 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2053 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2054 unsigned long val = (unsigned long)v; 2055 2056 if (val == 1) { 2057 seq_printf(m, "head_buffer: %d\n", 2058 rb_meta_subbuf_idx(meta, (void *)meta->head_buffer)); 2059 seq_printf(m, "commit_buffer: %d\n", 2060 rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer)); 2061 seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size); 2062 seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs); 2063 return 0; 2064 } 2065 2066 val -= 2; 2067 seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]); 2068 2069 return 0; 2070 } 2071 2072 static void rbm_stop(struct seq_file *m, void *p) 2073 { 2074 } 2075 2076 static const struct seq_operations rb_meta_seq_ops = { 2077 .start = rbm_start, 2078 .next = rbm_next, 2079 .show = rbm_show, 2080 .stop = rbm_stop, 2081 }; 2082 2083 int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu) 2084 { 2085 struct seq_file *m; 2086 int ret; 2087 2088 ret = seq_open(file, &rb_meta_seq_ops); 2089 if (ret) 2090 return ret; 2091 2092 m = file->private_data; 2093 m->private = buffer->buffers[cpu]; 2094 2095 return 0; 2096 } 2097 2098 /* Map the buffer_pages to the previous head and commit pages */ 2099 static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer, 2100 struct buffer_page *bpage) 2101 { 2102 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2103 2104 if (meta->head_buffer == (unsigned long)bpage->page) 2105 cpu_buffer->head_page = bpage; 2106 2107 if (meta->commit_buffer == (unsigned long)bpage->page) { 2108 cpu_buffer->commit_page = bpage; 2109 cpu_buffer->tail_page = bpage; 2110 } 2111 } 2112 2113 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2114 long nr_pages, struct list_head *pages) 2115 { 2116 struct trace_buffer *buffer = cpu_buffer->buffer; 2117 struct ring_buffer_cpu_meta *meta = NULL; 2118 struct buffer_page *bpage, *tmp; 2119 bool user_thread = current->mm != NULL; 2120 gfp_t mflags; 2121 long i; 2122 2123 /* 2124 * Check if the available memory is there first. 2125 * Note, si_mem_available() only gives us a rough estimate of available 2126 * memory. It may not be accurate. But we don't care, we just want 2127 * to prevent doing any allocation when it is obvious that it is 2128 * not going to succeed. 2129 */ 2130 i = si_mem_available(); 2131 if (i < nr_pages) 2132 return -ENOMEM; 2133 2134 /* 2135 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 2136 * gracefully without invoking oom-killer and the system is not 2137 * destabilized. 2138 */ 2139 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 2140 2141 /* 2142 * If a user thread allocates too much, and si_mem_available() 2143 * reports there's enough memory, even though there is not. 2144 * Make sure the OOM killer kills this thread. This can happen 2145 * even with RETRY_MAYFAIL because another task may be doing 2146 * an allocation after this task has taken all memory. 2147 * This is the task the OOM killer needs to take out during this 2148 * loop, even if it was triggered by an allocation somewhere else. 2149 */ 2150 if (user_thread) 2151 set_current_oom_origin(); 2152 2153 if (buffer->range_addr_start) 2154 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2155 2156 for (i = 0; i < nr_pages; i++) { 2157 struct page *page; 2158 2159 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2160 mflags, cpu_to_node(cpu_buffer->cpu)); 2161 if (!bpage) 2162 goto free_pages; 2163 2164 rb_check_bpage(cpu_buffer, bpage); 2165 2166 /* 2167 * Append the pages as for mapped buffers we want to keep 2168 * the order 2169 */ 2170 list_add_tail(&bpage->list, pages); 2171 2172 if (meta) { 2173 /* A range was given. Use that for the buffer page */ 2174 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2175 if (!bpage->page) 2176 goto free_pages; 2177 /* If this is valid from a previous boot */ 2178 if (meta->head_buffer) 2179 rb_meta_buffer_update(cpu_buffer, bpage); 2180 bpage->range = 1; 2181 bpage->id = i + 1; 2182 } else { 2183 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), 2184 mflags | __GFP_COMP | __GFP_ZERO, 2185 cpu_buffer->buffer->subbuf_order); 2186 if (!page) 2187 goto free_pages; 2188 bpage->page = page_address(page); 2189 rb_init_page(bpage->page); 2190 } 2191 bpage->order = cpu_buffer->buffer->subbuf_order; 2192 2193 if (user_thread && fatal_signal_pending(current)) 2194 goto free_pages; 2195 } 2196 if (user_thread) 2197 clear_current_oom_origin(); 2198 2199 return 0; 2200 2201 free_pages: 2202 list_for_each_entry_safe(bpage, tmp, pages, list) { 2203 list_del_init(&bpage->list); 2204 free_buffer_page(bpage); 2205 } 2206 if (user_thread) 2207 clear_current_oom_origin(); 2208 2209 return -ENOMEM; 2210 } 2211 2212 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2213 unsigned long nr_pages) 2214 { 2215 LIST_HEAD(pages); 2216 2217 WARN_ON(!nr_pages); 2218 2219 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2220 return -ENOMEM; 2221 2222 /* 2223 * The ring buffer page list is a circular list that does not 2224 * start and end with a list head. All page list items point to 2225 * other pages. 2226 */ 2227 cpu_buffer->pages = pages.next; 2228 list_del(&pages); 2229 2230 cpu_buffer->nr_pages = nr_pages; 2231 2232 rb_check_pages(cpu_buffer); 2233 2234 return 0; 2235 } 2236 2237 static struct ring_buffer_per_cpu * 2238 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2239 { 2240 struct ring_buffer_per_cpu *cpu_buffer; 2241 struct ring_buffer_cpu_meta *meta; 2242 struct buffer_page *bpage; 2243 struct page *page; 2244 int ret; 2245 2246 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 2247 GFP_KERNEL, cpu_to_node(cpu)); 2248 if (!cpu_buffer) 2249 return NULL; 2250 2251 cpu_buffer->cpu = cpu; 2252 cpu_buffer->buffer = buffer; 2253 raw_spin_lock_init(&cpu_buffer->reader_lock); 2254 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2255 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2256 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2257 init_completion(&cpu_buffer->update_done); 2258 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2259 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2260 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2261 mutex_init(&cpu_buffer->mapping_lock); 2262 2263 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2264 GFP_KERNEL, cpu_to_node(cpu)); 2265 if (!bpage) 2266 goto fail_free_buffer; 2267 2268 rb_check_bpage(cpu_buffer, bpage); 2269 2270 cpu_buffer->reader_page = bpage; 2271 2272 if (buffer->range_addr_start) { 2273 /* 2274 * Range mapped buffers have the same restrictions as memory 2275 * mapped ones do. 2276 */ 2277 cpu_buffer->mapped = 1; 2278 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2279 bpage->page = rb_range_buffer(cpu_buffer, 0); 2280 if (!bpage->page) 2281 goto fail_free_reader; 2282 if (cpu_buffer->ring_meta->head_buffer) 2283 rb_meta_buffer_update(cpu_buffer, bpage); 2284 bpage->range = 1; 2285 } else { 2286 page = alloc_pages_node(cpu_to_node(cpu), 2287 GFP_KERNEL | __GFP_COMP | __GFP_ZERO, 2288 cpu_buffer->buffer->subbuf_order); 2289 if (!page) 2290 goto fail_free_reader; 2291 bpage->page = page_address(page); 2292 rb_init_page(bpage->page); 2293 } 2294 2295 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2296 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2297 2298 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2299 if (ret < 0) 2300 goto fail_free_reader; 2301 2302 rb_meta_validate_events(cpu_buffer); 2303 2304 /* If the boot meta was valid then this has already been updated */ 2305 meta = cpu_buffer->ring_meta; 2306 if (!meta || !meta->head_buffer || 2307 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2308 if (meta && meta->head_buffer && 2309 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2310 pr_warn("Ring buffer meta buffers not all mapped\n"); 2311 if (!cpu_buffer->head_page) 2312 pr_warn(" Missing head_page\n"); 2313 if (!cpu_buffer->commit_page) 2314 pr_warn(" Missing commit_page\n"); 2315 if (!cpu_buffer->tail_page) 2316 pr_warn(" Missing tail_page\n"); 2317 } 2318 2319 cpu_buffer->head_page 2320 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2321 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2322 2323 rb_head_page_activate(cpu_buffer); 2324 2325 if (cpu_buffer->ring_meta) 2326 meta->commit_buffer = meta->head_buffer; 2327 } else { 2328 /* The valid meta buffer still needs to activate the head page */ 2329 rb_head_page_activate(cpu_buffer); 2330 } 2331 2332 return cpu_buffer; 2333 2334 fail_free_reader: 2335 free_buffer_page(cpu_buffer->reader_page); 2336 2337 fail_free_buffer: 2338 kfree(cpu_buffer); 2339 return NULL; 2340 } 2341 2342 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2343 { 2344 struct list_head *head = cpu_buffer->pages; 2345 struct buffer_page *bpage, *tmp; 2346 2347 irq_work_sync(&cpu_buffer->irq_work.work); 2348 2349 free_buffer_page(cpu_buffer->reader_page); 2350 2351 if (head) { 2352 rb_head_page_deactivate(cpu_buffer); 2353 2354 list_for_each_entry_safe(bpage, tmp, head, list) { 2355 list_del_init(&bpage->list); 2356 free_buffer_page(bpage); 2357 } 2358 bpage = list_entry(head, struct buffer_page, list); 2359 free_buffer_page(bpage); 2360 } 2361 2362 free_page((unsigned long)cpu_buffer->free_page); 2363 2364 kfree(cpu_buffer); 2365 } 2366 2367 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2368 int order, unsigned long start, 2369 unsigned long end, 2370 struct lock_class_key *key) 2371 { 2372 struct trace_buffer *buffer; 2373 long nr_pages; 2374 int subbuf_size; 2375 int bsize; 2376 int cpu; 2377 int ret; 2378 2379 /* keep it in its own cache line */ 2380 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2381 GFP_KERNEL); 2382 if (!buffer) 2383 return NULL; 2384 2385 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2386 goto fail_free_buffer; 2387 2388 buffer->subbuf_order = order; 2389 subbuf_size = (PAGE_SIZE << order); 2390 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2391 2392 /* Max payload is buffer page size - header (8bytes) */ 2393 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2394 2395 buffer->flags = flags; 2396 buffer->clock = trace_clock_local; 2397 buffer->reader_lock_key = key; 2398 2399 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2400 init_waitqueue_head(&buffer->irq_work.waiters); 2401 2402 buffer->cpus = nr_cpu_ids; 2403 2404 bsize = sizeof(void *) * nr_cpu_ids; 2405 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2406 GFP_KERNEL); 2407 if (!buffer->buffers) 2408 goto fail_free_cpumask; 2409 2410 /* If start/end are specified, then that overrides size */ 2411 if (start && end) { 2412 unsigned long buffers_start; 2413 unsigned long ptr; 2414 int n; 2415 2416 /* Make sure that start is word aligned */ 2417 start = ALIGN(start, sizeof(long)); 2418 2419 /* Subtract the buffer meta data and word aligned */ 2420 buffers_start = start + sizeof(struct ring_buffer_cpu_meta); 2421 buffers_start = ALIGN(buffers_start, sizeof(long)); 2422 2423 size = end - buffers_start; 2424 size = size / nr_cpu_ids; 2425 2426 /* 2427 * The number of sub-buffers (nr_pages) is determined by the 2428 * total size allocated minus the meta data size. 2429 * Then that is divided by the number of per CPU buffers 2430 * needed, plus account for the integer array index that 2431 * will be appended to the meta data. 2432 */ 2433 nr_pages = (size - sizeof(struct ring_buffer_cpu_meta)) / 2434 (subbuf_size + sizeof(int)); 2435 /* Need at least two pages plus the reader page */ 2436 if (nr_pages < 3) 2437 goto fail_free_buffers; 2438 2439 again: 2440 /* Make sure that the size fits aligned */ 2441 for (n = 0, ptr = buffers_start; n < nr_cpu_ids; n++) { 2442 ptr += sizeof(struct ring_buffer_cpu_meta) + 2443 sizeof(int) * nr_pages; 2444 ptr = ALIGN(ptr, subbuf_size); 2445 ptr += subbuf_size * nr_pages; 2446 } 2447 if (ptr > end) { 2448 if (nr_pages <= 3) 2449 goto fail_free_buffers; 2450 nr_pages--; 2451 goto again; 2452 } 2453 2454 /* nr_pages should not count the reader page */ 2455 nr_pages--; 2456 buffer->range_addr_start = start; 2457 buffer->range_addr_end = end; 2458 2459 rb_range_meta_init(buffer, nr_pages); 2460 } else { 2461 2462 /* need at least two pages */ 2463 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2464 if (nr_pages < 2) 2465 nr_pages = 2; 2466 } 2467 2468 cpu = raw_smp_processor_id(); 2469 cpumask_set_cpu(cpu, buffer->cpumask); 2470 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2471 if (!buffer->buffers[cpu]) 2472 goto fail_free_buffers; 2473 2474 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2475 if (ret < 0) 2476 goto fail_free_buffers; 2477 2478 mutex_init(&buffer->mutex); 2479 2480 return buffer; 2481 2482 fail_free_buffers: 2483 for_each_buffer_cpu(buffer, cpu) { 2484 if (buffer->buffers[cpu]) 2485 rb_free_cpu_buffer(buffer->buffers[cpu]); 2486 } 2487 kfree(buffer->buffers); 2488 2489 fail_free_cpumask: 2490 free_cpumask_var(buffer->cpumask); 2491 2492 fail_free_buffer: 2493 kfree(buffer); 2494 return NULL; 2495 } 2496 2497 /** 2498 * __ring_buffer_alloc - allocate a new ring_buffer 2499 * @size: the size in bytes per cpu that is needed. 2500 * @flags: attributes to set for the ring buffer. 2501 * @key: ring buffer reader_lock_key. 2502 * 2503 * Currently the only flag that is available is the RB_FL_OVERWRITE 2504 * flag. This flag means that the buffer will overwrite old data 2505 * when the buffer wraps. If this flag is not set, the buffer will 2506 * drop data when the tail hits the head. 2507 */ 2508 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 2509 struct lock_class_key *key) 2510 { 2511 /* Default buffer page size - one system page */ 2512 return alloc_buffer(size, flags, 0, 0, 0,key); 2513 2514 } 2515 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 2516 2517 /** 2518 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory 2519 * @size: the size in bytes per cpu that is needed. 2520 * @flags: attributes to set for the ring buffer. 2521 * @order: sub-buffer order 2522 * @start: start of allocated range 2523 * @range_size: size of allocated range 2524 * @key: ring buffer reader_lock_key. 2525 * 2526 * Currently the only flag that is available is the RB_FL_OVERWRITE 2527 * flag. This flag means that the buffer will overwrite old data 2528 * when the buffer wraps. If this flag is not set, the buffer will 2529 * drop data when the tail hits the head. 2530 */ 2531 struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags, 2532 int order, unsigned long start, 2533 unsigned long range_size, 2534 struct lock_class_key *key) 2535 { 2536 return alloc_buffer(size, flags, order, start, start + range_size, key); 2537 } 2538 2539 /** 2540 * ring_buffer_last_boot_delta - return the delta offset from last boot 2541 * @buffer: The buffer to return the delta from 2542 * @text: Return text delta 2543 * @data: Return data delta 2544 * 2545 * Returns: The true if the delta is non zero 2546 */ 2547 bool ring_buffer_last_boot_delta(struct trace_buffer *buffer, unsigned long *kaslr_addr) 2548 { 2549 if (!buffer) 2550 return false; 2551 2552 if (!buffer->kaslr_addr) 2553 return false; 2554 2555 *kaslr_addr = buffer->kaslr_addr; 2556 2557 return true; 2558 } 2559 2560 /** 2561 * ring_buffer_free - free a ring buffer. 2562 * @buffer: the buffer to free. 2563 */ 2564 void 2565 ring_buffer_free(struct trace_buffer *buffer) 2566 { 2567 int cpu; 2568 2569 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2570 2571 irq_work_sync(&buffer->irq_work.work); 2572 2573 for_each_buffer_cpu(buffer, cpu) 2574 rb_free_cpu_buffer(buffer->buffers[cpu]); 2575 2576 kfree(buffer->buffers); 2577 free_cpumask_var(buffer->cpumask); 2578 2579 kfree(buffer); 2580 } 2581 EXPORT_SYMBOL_GPL(ring_buffer_free); 2582 2583 void ring_buffer_set_clock(struct trace_buffer *buffer, 2584 u64 (*clock)(void)) 2585 { 2586 buffer->clock = clock; 2587 } 2588 2589 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2590 { 2591 buffer->time_stamp_abs = abs; 2592 } 2593 2594 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2595 { 2596 return buffer->time_stamp_abs; 2597 } 2598 2599 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2600 { 2601 return local_read(&bpage->entries) & RB_WRITE_MASK; 2602 } 2603 2604 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2605 { 2606 return local_read(&bpage->write) & RB_WRITE_MASK; 2607 } 2608 2609 static bool 2610 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 2611 { 2612 struct list_head *tail_page, *to_remove, *next_page; 2613 struct buffer_page *to_remove_page, *tmp_iter_page; 2614 struct buffer_page *last_page, *first_page; 2615 unsigned long nr_removed; 2616 unsigned long head_bit; 2617 int page_entries; 2618 2619 head_bit = 0; 2620 2621 raw_spin_lock_irq(&cpu_buffer->reader_lock); 2622 atomic_inc(&cpu_buffer->record_disabled); 2623 /* 2624 * We don't race with the readers since we have acquired the reader 2625 * lock. We also don't race with writers after disabling recording. 2626 * This makes it easy to figure out the first and the last page to be 2627 * removed from the list. We unlink all the pages in between including 2628 * the first and last pages. This is done in a busy loop so that we 2629 * lose the least number of traces. 2630 * The pages are freed after we restart recording and unlock readers. 2631 */ 2632 tail_page = &cpu_buffer->tail_page->list; 2633 2634 /* 2635 * tail page might be on reader page, we remove the next page 2636 * from the ring buffer 2637 */ 2638 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 2639 tail_page = rb_list_head(tail_page->next); 2640 to_remove = tail_page; 2641 2642 /* start of pages to remove */ 2643 first_page = list_entry(rb_list_head(to_remove->next), 2644 struct buffer_page, list); 2645 2646 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 2647 to_remove = rb_list_head(to_remove)->next; 2648 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 2649 } 2650 /* Read iterators need to reset themselves when some pages removed */ 2651 cpu_buffer->pages_removed += nr_removed; 2652 2653 next_page = rb_list_head(to_remove)->next; 2654 2655 /* 2656 * Now we remove all pages between tail_page and next_page. 2657 * Make sure that we have head_bit value preserved for the 2658 * next page 2659 */ 2660 tail_page->next = (struct list_head *)((unsigned long)next_page | 2661 head_bit); 2662 next_page = rb_list_head(next_page); 2663 next_page->prev = tail_page; 2664 2665 /* make sure pages points to a valid page in the ring buffer */ 2666 cpu_buffer->pages = next_page; 2667 cpu_buffer->cnt++; 2668 2669 /* update head page */ 2670 if (head_bit) 2671 cpu_buffer->head_page = list_entry(next_page, 2672 struct buffer_page, list); 2673 2674 /* pages are removed, resume tracing and then free the pages */ 2675 atomic_dec(&cpu_buffer->record_disabled); 2676 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 2677 2678 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 2679 2680 /* last buffer page to remove */ 2681 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 2682 list); 2683 tmp_iter_page = first_page; 2684 2685 do { 2686 cond_resched(); 2687 2688 to_remove_page = tmp_iter_page; 2689 rb_inc_page(&tmp_iter_page); 2690 2691 /* update the counters */ 2692 page_entries = rb_page_entries(to_remove_page); 2693 if (page_entries) { 2694 /* 2695 * If something was added to this page, it was full 2696 * since it is not the tail page. So we deduct the 2697 * bytes consumed in ring buffer from here. 2698 * Increment overrun to account for the lost events. 2699 */ 2700 local_add(page_entries, &cpu_buffer->overrun); 2701 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 2702 local_inc(&cpu_buffer->pages_lost); 2703 } 2704 2705 /* 2706 * We have already removed references to this list item, just 2707 * free up the buffer_page and its page 2708 */ 2709 free_buffer_page(to_remove_page); 2710 nr_removed--; 2711 2712 } while (to_remove_page != last_page); 2713 2714 RB_WARN_ON(cpu_buffer, nr_removed); 2715 2716 return nr_removed == 0; 2717 } 2718 2719 static bool 2720 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2721 { 2722 struct list_head *pages = &cpu_buffer->new_pages; 2723 unsigned long flags; 2724 bool success; 2725 int retries; 2726 2727 /* Can be called at early boot up, where interrupts must not been enabled */ 2728 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2729 /* 2730 * We are holding the reader lock, so the reader page won't be swapped 2731 * in the ring buffer. Now we are racing with the writer trying to 2732 * move head page and the tail page. 2733 * We are going to adapt the reader page update process where: 2734 * 1. We first splice the start and end of list of new pages between 2735 * the head page and its previous page. 2736 * 2. We cmpxchg the prev_page->next to point from head page to the 2737 * start of new pages list. 2738 * 3. Finally, we update the head->prev to the end of new list. 2739 * 2740 * We will try this process 10 times, to make sure that we don't keep 2741 * spinning. 2742 */ 2743 retries = 10; 2744 success = false; 2745 while (retries--) { 2746 struct list_head *head_page, *prev_page; 2747 struct list_head *last_page, *first_page; 2748 struct list_head *head_page_with_bit; 2749 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2750 2751 if (!hpage) 2752 break; 2753 head_page = &hpage->list; 2754 prev_page = head_page->prev; 2755 2756 first_page = pages->next; 2757 last_page = pages->prev; 2758 2759 head_page_with_bit = (struct list_head *) 2760 ((unsigned long)head_page | RB_PAGE_HEAD); 2761 2762 last_page->next = head_page_with_bit; 2763 first_page->prev = prev_page; 2764 2765 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 2766 if (try_cmpxchg(&prev_page->next, 2767 &head_page_with_bit, first_page)) { 2768 /* 2769 * yay, we replaced the page pointer to our new list, 2770 * now, we just have to update to head page's prev 2771 * pointer to point to end of list 2772 */ 2773 head_page->prev = last_page; 2774 cpu_buffer->cnt++; 2775 success = true; 2776 break; 2777 } 2778 } 2779 2780 if (success) 2781 INIT_LIST_HEAD(pages); 2782 /* 2783 * If we weren't successful in adding in new pages, warn and stop 2784 * tracing 2785 */ 2786 RB_WARN_ON(cpu_buffer, !success); 2787 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2788 2789 /* free pages if they weren't inserted */ 2790 if (!success) { 2791 struct buffer_page *bpage, *tmp; 2792 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2793 list) { 2794 list_del_init(&bpage->list); 2795 free_buffer_page(bpage); 2796 } 2797 } 2798 return success; 2799 } 2800 2801 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2802 { 2803 bool success; 2804 2805 if (cpu_buffer->nr_pages_to_update > 0) 2806 success = rb_insert_pages(cpu_buffer); 2807 else 2808 success = rb_remove_pages(cpu_buffer, 2809 -cpu_buffer->nr_pages_to_update); 2810 2811 if (success) 2812 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2813 } 2814 2815 static void update_pages_handler(struct work_struct *work) 2816 { 2817 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2818 struct ring_buffer_per_cpu, update_pages_work); 2819 rb_update_pages(cpu_buffer); 2820 complete(&cpu_buffer->update_done); 2821 } 2822 2823 /** 2824 * ring_buffer_resize - resize the ring buffer 2825 * @buffer: the buffer to resize. 2826 * @size: the new size. 2827 * @cpu_id: the cpu buffer to resize 2828 * 2829 * Minimum size is 2 * buffer->subbuf_size. 2830 * 2831 * Returns 0 on success and < 0 on failure. 2832 */ 2833 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2834 int cpu_id) 2835 { 2836 struct ring_buffer_per_cpu *cpu_buffer; 2837 unsigned long nr_pages; 2838 int cpu, err; 2839 2840 /* 2841 * Always succeed at resizing a non-existent buffer: 2842 */ 2843 if (!buffer) 2844 return 0; 2845 2846 /* Make sure the requested buffer exists */ 2847 if (cpu_id != RING_BUFFER_ALL_CPUS && 2848 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2849 return 0; 2850 2851 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2852 2853 /* we need a minimum of two pages */ 2854 if (nr_pages < 2) 2855 nr_pages = 2; 2856 2857 /* prevent another thread from changing buffer sizes */ 2858 mutex_lock(&buffer->mutex); 2859 atomic_inc(&buffer->resizing); 2860 2861 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2862 /* 2863 * Don't succeed if resizing is disabled, as a reader might be 2864 * manipulating the ring buffer and is expecting a sane state while 2865 * this is true. 2866 */ 2867 for_each_buffer_cpu(buffer, cpu) { 2868 cpu_buffer = buffer->buffers[cpu]; 2869 if (atomic_read(&cpu_buffer->resize_disabled)) { 2870 err = -EBUSY; 2871 goto out_err_unlock; 2872 } 2873 } 2874 2875 /* calculate the pages to update */ 2876 for_each_buffer_cpu(buffer, cpu) { 2877 cpu_buffer = buffer->buffers[cpu]; 2878 2879 cpu_buffer->nr_pages_to_update = nr_pages - 2880 cpu_buffer->nr_pages; 2881 /* 2882 * nothing more to do for removing pages or no update 2883 */ 2884 if (cpu_buffer->nr_pages_to_update <= 0) 2885 continue; 2886 /* 2887 * to add pages, make sure all new pages can be 2888 * allocated without receiving ENOMEM 2889 */ 2890 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2891 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2892 &cpu_buffer->new_pages)) { 2893 /* not enough memory for new pages */ 2894 err = -ENOMEM; 2895 goto out_err; 2896 } 2897 2898 cond_resched(); 2899 } 2900 2901 cpus_read_lock(); 2902 /* 2903 * Fire off all the required work handlers 2904 * We can't schedule on offline CPUs, but it's not necessary 2905 * since we can change their buffer sizes without any race. 2906 */ 2907 for_each_buffer_cpu(buffer, cpu) { 2908 cpu_buffer = buffer->buffers[cpu]; 2909 if (!cpu_buffer->nr_pages_to_update) 2910 continue; 2911 2912 /* Can't run something on an offline CPU. */ 2913 if (!cpu_online(cpu)) { 2914 rb_update_pages(cpu_buffer); 2915 cpu_buffer->nr_pages_to_update = 0; 2916 } else { 2917 /* Run directly if possible. */ 2918 migrate_disable(); 2919 if (cpu != smp_processor_id()) { 2920 migrate_enable(); 2921 schedule_work_on(cpu, 2922 &cpu_buffer->update_pages_work); 2923 } else { 2924 update_pages_handler(&cpu_buffer->update_pages_work); 2925 migrate_enable(); 2926 } 2927 } 2928 } 2929 2930 /* wait for all the updates to complete */ 2931 for_each_buffer_cpu(buffer, cpu) { 2932 cpu_buffer = buffer->buffers[cpu]; 2933 if (!cpu_buffer->nr_pages_to_update) 2934 continue; 2935 2936 if (cpu_online(cpu)) 2937 wait_for_completion(&cpu_buffer->update_done); 2938 cpu_buffer->nr_pages_to_update = 0; 2939 } 2940 2941 cpus_read_unlock(); 2942 } else { 2943 cpu_buffer = buffer->buffers[cpu_id]; 2944 2945 if (nr_pages == cpu_buffer->nr_pages) 2946 goto out; 2947 2948 /* 2949 * Don't succeed if resizing is disabled, as a reader might be 2950 * manipulating the ring buffer and is expecting a sane state while 2951 * this is true. 2952 */ 2953 if (atomic_read(&cpu_buffer->resize_disabled)) { 2954 err = -EBUSY; 2955 goto out_err_unlock; 2956 } 2957 2958 cpu_buffer->nr_pages_to_update = nr_pages - 2959 cpu_buffer->nr_pages; 2960 2961 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2962 if (cpu_buffer->nr_pages_to_update > 0 && 2963 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2964 &cpu_buffer->new_pages)) { 2965 err = -ENOMEM; 2966 goto out_err; 2967 } 2968 2969 cpus_read_lock(); 2970 2971 /* Can't run something on an offline CPU. */ 2972 if (!cpu_online(cpu_id)) 2973 rb_update_pages(cpu_buffer); 2974 else { 2975 /* Run directly if possible. */ 2976 migrate_disable(); 2977 if (cpu_id == smp_processor_id()) { 2978 rb_update_pages(cpu_buffer); 2979 migrate_enable(); 2980 } else { 2981 migrate_enable(); 2982 schedule_work_on(cpu_id, 2983 &cpu_buffer->update_pages_work); 2984 wait_for_completion(&cpu_buffer->update_done); 2985 } 2986 } 2987 2988 cpu_buffer->nr_pages_to_update = 0; 2989 cpus_read_unlock(); 2990 } 2991 2992 out: 2993 /* 2994 * The ring buffer resize can happen with the ring buffer 2995 * enabled, so that the update disturbs the tracing as little 2996 * as possible. But if the buffer is disabled, we do not need 2997 * to worry about that, and we can take the time to verify 2998 * that the buffer is not corrupt. 2999 */ 3000 if (atomic_read(&buffer->record_disabled)) { 3001 atomic_inc(&buffer->record_disabled); 3002 /* 3003 * Even though the buffer was disabled, we must make sure 3004 * that it is truly disabled before calling rb_check_pages. 3005 * There could have been a race between checking 3006 * record_disable and incrementing it. 3007 */ 3008 synchronize_rcu(); 3009 for_each_buffer_cpu(buffer, cpu) { 3010 cpu_buffer = buffer->buffers[cpu]; 3011 rb_check_pages(cpu_buffer); 3012 } 3013 atomic_dec(&buffer->record_disabled); 3014 } 3015 3016 atomic_dec(&buffer->resizing); 3017 mutex_unlock(&buffer->mutex); 3018 return 0; 3019 3020 out_err: 3021 for_each_buffer_cpu(buffer, cpu) { 3022 struct buffer_page *bpage, *tmp; 3023 3024 cpu_buffer = buffer->buffers[cpu]; 3025 cpu_buffer->nr_pages_to_update = 0; 3026 3027 if (list_empty(&cpu_buffer->new_pages)) 3028 continue; 3029 3030 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 3031 list) { 3032 list_del_init(&bpage->list); 3033 free_buffer_page(bpage); 3034 } 3035 } 3036 out_err_unlock: 3037 atomic_dec(&buffer->resizing); 3038 mutex_unlock(&buffer->mutex); 3039 return err; 3040 } 3041 EXPORT_SYMBOL_GPL(ring_buffer_resize); 3042 3043 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 3044 { 3045 mutex_lock(&buffer->mutex); 3046 if (val) 3047 buffer->flags |= RB_FL_OVERWRITE; 3048 else 3049 buffer->flags &= ~RB_FL_OVERWRITE; 3050 mutex_unlock(&buffer->mutex); 3051 } 3052 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 3053 3054 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 3055 { 3056 return bpage->page->data + index; 3057 } 3058 3059 static __always_inline struct ring_buffer_event * 3060 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 3061 { 3062 return __rb_page_index(cpu_buffer->reader_page, 3063 cpu_buffer->reader_page->read); 3064 } 3065 3066 static struct ring_buffer_event * 3067 rb_iter_head_event(struct ring_buffer_iter *iter) 3068 { 3069 struct ring_buffer_event *event; 3070 struct buffer_page *iter_head_page = iter->head_page; 3071 unsigned long commit; 3072 unsigned length; 3073 3074 if (iter->head != iter->next_event) 3075 return iter->event; 3076 3077 /* 3078 * When the writer goes across pages, it issues a cmpxchg which 3079 * is a mb(), which will synchronize with the rmb here. 3080 * (see rb_tail_page_update() and __rb_reserve_next()) 3081 */ 3082 commit = rb_page_commit(iter_head_page); 3083 smp_rmb(); 3084 3085 /* An event needs to be at least 8 bytes in size */ 3086 if (iter->head > commit - 8) 3087 goto reset; 3088 3089 event = __rb_page_index(iter_head_page, iter->head); 3090 length = rb_event_length(event); 3091 3092 /* 3093 * READ_ONCE() doesn't work on functions and we don't want the 3094 * compiler doing any crazy optimizations with length. 3095 */ 3096 barrier(); 3097 3098 if ((iter->head + length) > commit || length > iter->event_size) 3099 /* Writer corrupted the read? */ 3100 goto reset; 3101 3102 memcpy(iter->event, event, length); 3103 /* 3104 * If the page stamp is still the same after this rmb() then the 3105 * event was safely copied without the writer entering the page. 3106 */ 3107 smp_rmb(); 3108 3109 /* Make sure the page didn't change since we read this */ 3110 if (iter->page_stamp != iter_head_page->page->time_stamp || 3111 commit > rb_page_commit(iter_head_page)) 3112 goto reset; 3113 3114 iter->next_event = iter->head + length; 3115 return iter->event; 3116 reset: 3117 /* Reset to the beginning */ 3118 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3119 iter->head = 0; 3120 iter->next_event = 0; 3121 iter->missed_events = 1; 3122 return NULL; 3123 } 3124 3125 /* Size is determined by what has been committed */ 3126 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 3127 { 3128 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 3129 } 3130 3131 static __always_inline unsigned 3132 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3133 { 3134 return rb_page_commit(cpu_buffer->commit_page); 3135 } 3136 3137 static __always_inline unsigned 3138 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3139 { 3140 unsigned long addr = (unsigned long)event; 3141 3142 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3143 3144 return addr - BUF_PAGE_HDR_SIZE; 3145 } 3146 3147 static void rb_inc_iter(struct ring_buffer_iter *iter) 3148 { 3149 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3150 3151 /* 3152 * The iterator could be on the reader page (it starts there). 3153 * But the head could have moved, since the reader was 3154 * found. Check for this case and assign the iterator 3155 * to the head page instead of next. 3156 */ 3157 if (iter->head_page == cpu_buffer->reader_page) 3158 iter->head_page = rb_set_head_page(cpu_buffer); 3159 else 3160 rb_inc_page(&iter->head_page); 3161 3162 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3163 iter->head = 0; 3164 iter->next_event = 0; 3165 } 3166 3167 /* Return the index into the sub-buffers for a given sub-buffer */ 3168 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf) 3169 { 3170 void *subbuf_array; 3171 3172 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3173 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3174 return (subbuf - subbuf_array) / meta->subbuf_size; 3175 } 3176 3177 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3178 struct buffer_page *next_page) 3179 { 3180 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3181 unsigned long old_head = (unsigned long)next_page->page; 3182 unsigned long new_head; 3183 3184 rb_inc_page(&next_page); 3185 new_head = (unsigned long)next_page->page; 3186 3187 /* 3188 * Only move it forward once, if something else came in and 3189 * moved it forward, then we don't want to touch it. 3190 */ 3191 (void)cmpxchg(&meta->head_buffer, old_head, new_head); 3192 } 3193 3194 static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer, 3195 struct buffer_page *reader) 3196 { 3197 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3198 void *old_reader = cpu_buffer->reader_page->page; 3199 void *new_reader = reader->page; 3200 int id; 3201 3202 id = reader->id; 3203 cpu_buffer->reader_page->id = id; 3204 reader->id = 0; 3205 3206 meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader); 3207 meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader); 3208 3209 /* The head pointer is the one after the reader */ 3210 rb_update_meta_head(cpu_buffer, reader); 3211 } 3212 3213 /* 3214 * rb_handle_head_page - writer hit the head page 3215 * 3216 * Returns: +1 to retry page 3217 * 0 to continue 3218 * -1 on error 3219 */ 3220 static int 3221 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 3222 struct buffer_page *tail_page, 3223 struct buffer_page *next_page) 3224 { 3225 struct buffer_page *new_head; 3226 int entries; 3227 int type; 3228 int ret; 3229 3230 entries = rb_page_entries(next_page); 3231 3232 /* 3233 * The hard part is here. We need to move the head 3234 * forward, and protect against both readers on 3235 * other CPUs and writers coming in via interrupts. 3236 */ 3237 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 3238 RB_PAGE_HEAD); 3239 3240 /* 3241 * type can be one of four: 3242 * NORMAL - an interrupt already moved it for us 3243 * HEAD - we are the first to get here. 3244 * UPDATE - we are the interrupt interrupting 3245 * a current move. 3246 * MOVED - a reader on another CPU moved the next 3247 * pointer to its reader page. Give up 3248 * and try again. 3249 */ 3250 3251 switch (type) { 3252 case RB_PAGE_HEAD: 3253 /* 3254 * We changed the head to UPDATE, thus 3255 * it is our responsibility to update 3256 * the counters. 3257 */ 3258 local_add(entries, &cpu_buffer->overrun); 3259 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 3260 local_inc(&cpu_buffer->pages_lost); 3261 3262 if (cpu_buffer->ring_meta) 3263 rb_update_meta_head(cpu_buffer, next_page); 3264 /* 3265 * The entries will be zeroed out when we move the 3266 * tail page. 3267 */ 3268 3269 /* still more to do */ 3270 break; 3271 3272 case RB_PAGE_UPDATE: 3273 /* 3274 * This is an interrupt that interrupt the 3275 * previous update. Still more to do. 3276 */ 3277 break; 3278 case RB_PAGE_NORMAL: 3279 /* 3280 * An interrupt came in before the update 3281 * and processed this for us. 3282 * Nothing left to do. 3283 */ 3284 return 1; 3285 case RB_PAGE_MOVED: 3286 /* 3287 * The reader is on another CPU and just did 3288 * a swap with our next_page. 3289 * Try again. 3290 */ 3291 return 1; 3292 default: 3293 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 3294 return -1; 3295 } 3296 3297 /* 3298 * Now that we are here, the old head pointer is 3299 * set to UPDATE. This will keep the reader from 3300 * swapping the head page with the reader page. 3301 * The reader (on another CPU) will spin till 3302 * we are finished. 3303 * 3304 * We just need to protect against interrupts 3305 * doing the job. We will set the next pointer 3306 * to HEAD. After that, we set the old pointer 3307 * to NORMAL, but only if it was HEAD before. 3308 * otherwise we are an interrupt, and only 3309 * want the outer most commit to reset it. 3310 */ 3311 new_head = next_page; 3312 rb_inc_page(&new_head); 3313 3314 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 3315 RB_PAGE_NORMAL); 3316 3317 /* 3318 * Valid returns are: 3319 * HEAD - an interrupt came in and already set it. 3320 * NORMAL - One of two things: 3321 * 1) We really set it. 3322 * 2) A bunch of interrupts came in and moved 3323 * the page forward again. 3324 */ 3325 switch (ret) { 3326 case RB_PAGE_HEAD: 3327 case RB_PAGE_NORMAL: 3328 /* OK */ 3329 break; 3330 default: 3331 RB_WARN_ON(cpu_buffer, 1); 3332 return -1; 3333 } 3334 3335 /* 3336 * It is possible that an interrupt came in, 3337 * set the head up, then more interrupts came in 3338 * and moved it again. When we get back here, 3339 * the page would have been set to NORMAL but we 3340 * just set it back to HEAD. 3341 * 3342 * How do you detect this? Well, if that happened 3343 * the tail page would have moved. 3344 */ 3345 if (ret == RB_PAGE_NORMAL) { 3346 struct buffer_page *buffer_tail_page; 3347 3348 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 3349 /* 3350 * If the tail had moved passed next, then we need 3351 * to reset the pointer. 3352 */ 3353 if (buffer_tail_page != tail_page && 3354 buffer_tail_page != next_page) 3355 rb_head_page_set_normal(cpu_buffer, new_head, 3356 next_page, 3357 RB_PAGE_HEAD); 3358 } 3359 3360 /* 3361 * If this was the outer most commit (the one that 3362 * changed the original pointer from HEAD to UPDATE), 3363 * then it is up to us to reset it to NORMAL. 3364 */ 3365 if (type == RB_PAGE_HEAD) { 3366 ret = rb_head_page_set_normal(cpu_buffer, next_page, 3367 tail_page, 3368 RB_PAGE_UPDATE); 3369 if (RB_WARN_ON(cpu_buffer, 3370 ret != RB_PAGE_UPDATE)) 3371 return -1; 3372 } 3373 3374 return 0; 3375 } 3376 3377 static inline void 3378 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 3379 unsigned long tail, struct rb_event_info *info) 3380 { 3381 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 3382 struct buffer_page *tail_page = info->tail_page; 3383 struct ring_buffer_event *event; 3384 unsigned long length = info->length; 3385 3386 /* 3387 * Only the event that crossed the page boundary 3388 * must fill the old tail_page with padding. 3389 */ 3390 if (tail >= bsize) { 3391 /* 3392 * If the page was filled, then we still need 3393 * to update the real_end. Reset it to zero 3394 * and the reader will ignore it. 3395 */ 3396 if (tail == bsize) 3397 tail_page->real_end = 0; 3398 3399 local_sub(length, &tail_page->write); 3400 return; 3401 } 3402 3403 event = __rb_page_index(tail_page, tail); 3404 3405 /* 3406 * Save the original length to the meta data. 3407 * This will be used by the reader to add lost event 3408 * counter. 3409 */ 3410 tail_page->real_end = tail; 3411 3412 /* 3413 * If this event is bigger than the minimum size, then 3414 * we need to be careful that we don't subtract the 3415 * write counter enough to allow another writer to slip 3416 * in on this page. 3417 * We put in a discarded commit instead, to make sure 3418 * that this space is not used again, and this space will 3419 * not be accounted into 'entries_bytes'. 3420 * 3421 * If we are less than the minimum size, we don't need to 3422 * worry about it. 3423 */ 3424 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 3425 /* No room for any events */ 3426 3427 /* Mark the rest of the page with padding */ 3428 rb_event_set_padding(event); 3429 3430 /* Make sure the padding is visible before the write update */ 3431 smp_wmb(); 3432 3433 /* Set the write back to the previous setting */ 3434 local_sub(length, &tail_page->write); 3435 return; 3436 } 3437 3438 /* Put in a discarded event */ 3439 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 3440 event->type_len = RINGBUF_TYPE_PADDING; 3441 /* time delta must be non zero */ 3442 event->time_delta = 1; 3443 3444 /* account for padding bytes */ 3445 local_add(bsize - tail, &cpu_buffer->entries_bytes); 3446 3447 /* Make sure the padding is visible before the tail_page->write update */ 3448 smp_wmb(); 3449 3450 /* Set write to end of buffer */ 3451 length = (tail + length) - bsize; 3452 local_sub(length, &tail_page->write); 3453 } 3454 3455 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 3456 3457 /* 3458 * This is the slow path, force gcc not to inline it. 3459 */ 3460 static noinline struct ring_buffer_event * 3461 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3462 unsigned long tail, struct rb_event_info *info) 3463 { 3464 struct buffer_page *tail_page = info->tail_page; 3465 struct buffer_page *commit_page = cpu_buffer->commit_page; 3466 struct trace_buffer *buffer = cpu_buffer->buffer; 3467 struct buffer_page *next_page; 3468 int ret; 3469 3470 next_page = tail_page; 3471 3472 rb_inc_page(&next_page); 3473 3474 /* 3475 * If for some reason, we had an interrupt storm that made 3476 * it all the way around the buffer, bail, and warn 3477 * about it. 3478 */ 3479 if (unlikely(next_page == commit_page)) { 3480 local_inc(&cpu_buffer->commit_overrun); 3481 goto out_reset; 3482 } 3483 3484 /* 3485 * This is where the fun begins! 3486 * 3487 * We are fighting against races between a reader that 3488 * could be on another CPU trying to swap its reader 3489 * page with the buffer head. 3490 * 3491 * We are also fighting against interrupts coming in and 3492 * moving the head or tail on us as well. 3493 * 3494 * If the next page is the head page then we have filled 3495 * the buffer, unless the commit page is still on the 3496 * reader page. 3497 */ 3498 if (rb_is_head_page(next_page, &tail_page->list)) { 3499 3500 /* 3501 * If the commit is not on the reader page, then 3502 * move the header page. 3503 */ 3504 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3505 /* 3506 * If we are not in overwrite mode, 3507 * this is easy, just stop here. 3508 */ 3509 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3510 local_inc(&cpu_buffer->dropped_events); 3511 goto out_reset; 3512 } 3513 3514 ret = rb_handle_head_page(cpu_buffer, 3515 tail_page, 3516 next_page); 3517 if (ret < 0) 3518 goto out_reset; 3519 if (ret) 3520 goto out_again; 3521 } else { 3522 /* 3523 * We need to be careful here too. The 3524 * commit page could still be on the reader 3525 * page. We could have a small buffer, and 3526 * have filled up the buffer with events 3527 * from interrupts and such, and wrapped. 3528 * 3529 * Note, if the tail page is also on the 3530 * reader_page, we let it move out. 3531 */ 3532 if (unlikely((cpu_buffer->commit_page != 3533 cpu_buffer->tail_page) && 3534 (cpu_buffer->commit_page == 3535 cpu_buffer->reader_page))) { 3536 local_inc(&cpu_buffer->commit_overrun); 3537 goto out_reset; 3538 } 3539 } 3540 } 3541 3542 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3543 3544 out_again: 3545 3546 rb_reset_tail(cpu_buffer, tail, info); 3547 3548 /* Commit what we have for now. */ 3549 rb_end_commit(cpu_buffer); 3550 /* rb_end_commit() decs committing */ 3551 local_inc(&cpu_buffer->committing); 3552 3553 /* fail and let the caller try again */ 3554 return ERR_PTR(-EAGAIN); 3555 3556 out_reset: 3557 /* reset write */ 3558 rb_reset_tail(cpu_buffer, tail, info); 3559 3560 return NULL; 3561 } 3562 3563 /* Slow path */ 3564 static struct ring_buffer_event * 3565 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3566 struct ring_buffer_event *event, u64 delta, bool abs) 3567 { 3568 if (abs) 3569 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3570 else 3571 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3572 3573 /* Not the first event on the page, or not delta? */ 3574 if (abs || rb_event_index(cpu_buffer, event)) { 3575 event->time_delta = delta & TS_MASK; 3576 event->array[0] = delta >> TS_SHIFT; 3577 } else { 3578 /* nope, just zero it */ 3579 event->time_delta = 0; 3580 event->array[0] = 0; 3581 } 3582 3583 return skip_time_extend(event); 3584 } 3585 3586 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 3587 static inline bool sched_clock_stable(void) 3588 { 3589 return true; 3590 } 3591 #endif 3592 3593 static void 3594 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3595 struct rb_event_info *info) 3596 { 3597 u64 write_stamp; 3598 3599 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3600 (unsigned long long)info->delta, 3601 (unsigned long long)info->ts, 3602 (unsigned long long)info->before, 3603 (unsigned long long)info->after, 3604 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3605 sched_clock_stable() ? "" : 3606 "If you just came from a suspend/resume,\n" 3607 "please switch to the trace global clock:\n" 3608 " echo global > /sys/kernel/tracing/trace_clock\n" 3609 "or add trace_clock=global to the kernel command line\n"); 3610 } 3611 3612 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3613 struct ring_buffer_event **event, 3614 struct rb_event_info *info, 3615 u64 *delta, 3616 unsigned int *length) 3617 { 3618 bool abs = info->add_timestamp & 3619 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3620 3621 if (unlikely(info->delta > (1ULL << 59))) { 3622 /* 3623 * Some timers can use more than 59 bits, and when a timestamp 3624 * is added to the buffer, it will lose those bits. 3625 */ 3626 if (abs && (info->ts & TS_MSB)) { 3627 info->delta &= ABS_TS_MASK; 3628 3629 /* did the clock go backwards */ 3630 } else if (info->before == info->after && info->before > info->ts) { 3631 /* not interrupted */ 3632 static int once; 3633 3634 /* 3635 * This is possible with a recalibrating of the TSC. 3636 * Do not produce a call stack, but just report it. 3637 */ 3638 if (!once) { 3639 once++; 3640 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 3641 info->before, info->ts); 3642 } 3643 } else 3644 rb_check_timestamp(cpu_buffer, info); 3645 if (!abs) 3646 info->delta = 0; 3647 } 3648 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 3649 *length -= RB_LEN_TIME_EXTEND; 3650 *delta = 0; 3651 } 3652 3653 /** 3654 * rb_update_event - update event type and data 3655 * @cpu_buffer: The per cpu buffer of the @event 3656 * @event: the event to update 3657 * @info: The info to update the @event with (contains length and delta) 3658 * 3659 * Update the type and data fields of the @event. The length 3660 * is the actual size that is written to the ring buffer, 3661 * and with this, we can determine what to place into the 3662 * data field. 3663 */ 3664 static void 3665 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 3666 struct ring_buffer_event *event, 3667 struct rb_event_info *info) 3668 { 3669 unsigned length = info->length; 3670 u64 delta = info->delta; 3671 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 3672 3673 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 3674 cpu_buffer->event_stamp[nest] = info->ts; 3675 3676 /* 3677 * If we need to add a timestamp, then we 3678 * add it to the start of the reserved space. 3679 */ 3680 if (unlikely(info->add_timestamp)) 3681 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 3682 3683 event->time_delta = delta; 3684 length -= RB_EVNT_HDR_SIZE; 3685 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 3686 event->type_len = 0; 3687 event->array[0] = length; 3688 } else 3689 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 3690 } 3691 3692 static unsigned rb_calculate_event_length(unsigned length) 3693 { 3694 struct ring_buffer_event event; /* Used only for sizeof array */ 3695 3696 /* zero length can cause confusions */ 3697 if (!length) 3698 length++; 3699 3700 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 3701 length += sizeof(event.array[0]); 3702 3703 length += RB_EVNT_HDR_SIZE; 3704 length = ALIGN(length, RB_ARCH_ALIGNMENT); 3705 3706 /* 3707 * In case the time delta is larger than the 27 bits for it 3708 * in the header, we need to add a timestamp. If another 3709 * event comes in when trying to discard this one to increase 3710 * the length, then the timestamp will be added in the allocated 3711 * space of this event. If length is bigger than the size needed 3712 * for the TIME_EXTEND, then padding has to be used. The events 3713 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 3714 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 3715 * As length is a multiple of 4, we only need to worry if it 3716 * is 12 (RB_LEN_TIME_EXTEND + 4). 3717 */ 3718 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 3719 length += RB_ALIGNMENT; 3720 3721 return length; 3722 } 3723 3724 static inline bool 3725 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 3726 struct ring_buffer_event *event) 3727 { 3728 unsigned long new_index, old_index; 3729 struct buffer_page *bpage; 3730 unsigned long addr; 3731 3732 new_index = rb_event_index(cpu_buffer, event); 3733 old_index = new_index + rb_event_ts_length(event); 3734 addr = (unsigned long)event; 3735 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3736 3737 bpage = READ_ONCE(cpu_buffer->tail_page); 3738 3739 /* 3740 * Make sure the tail_page is still the same and 3741 * the next write location is the end of this event 3742 */ 3743 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 3744 unsigned long write_mask = 3745 local_read(&bpage->write) & ~RB_WRITE_MASK; 3746 unsigned long event_length = rb_event_length(event); 3747 3748 /* 3749 * For the before_stamp to be different than the write_stamp 3750 * to make sure that the next event adds an absolute 3751 * value and does not rely on the saved write stamp, which 3752 * is now going to be bogus. 3753 * 3754 * By setting the before_stamp to zero, the next event 3755 * is not going to use the write_stamp and will instead 3756 * create an absolute timestamp. This means there's no 3757 * reason to update the wirte_stamp! 3758 */ 3759 rb_time_set(&cpu_buffer->before_stamp, 0); 3760 3761 /* 3762 * If an event were to come in now, it would see that the 3763 * write_stamp and the before_stamp are different, and assume 3764 * that this event just added itself before updating 3765 * the write stamp. The interrupting event will fix the 3766 * write stamp for us, and use an absolute timestamp. 3767 */ 3768 3769 /* 3770 * This is on the tail page. It is possible that 3771 * a write could come in and move the tail page 3772 * and write to the next page. That is fine 3773 * because we just shorten what is on this page. 3774 */ 3775 old_index += write_mask; 3776 new_index += write_mask; 3777 3778 /* caution: old_index gets updated on cmpxchg failure */ 3779 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 3780 /* update counters */ 3781 local_sub(event_length, &cpu_buffer->entries_bytes); 3782 return true; 3783 } 3784 } 3785 3786 /* could not discard */ 3787 return false; 3788 } 3789 3790 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 3791 { 3792 local_inc(&cpu_buffer->committing); 3793 local_inc(&cpu_buffer->commits); 3794 } 3795 3796 static __always_inline void 3797 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 3798 { 3799 unsigned long max_count; 3800 3801 /* 3802 * We only race with interrupts and NMIs on this CPU. 3803 * If we own the commit event, then we can commit 3804 * all others that interrupted us, since the interruptions 3805 * are in stack format (they finish before they come 3806 * back to us). This allows us to do a simple loop to 3807 * assign the commit to the tail. 3808 */ 3809 again: 3810 max_count = cpu_buffer->nr_pages * 100; 3811 3812 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 3813 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 3814 return; 3815 if (RB_WARN_ON(cpu_buffer, 3816 rb_is_reader_page(cpu_buffer->tail_page))) 3817 return; 3818 /* 3819 * No need for a memory barrier here, as the update 3820 * of the tail_page did it for this page. 3821 */ 3822 local_set(&cpu_buffer->commit_page->page->commit, 3823 rb_page_write(cpu_buffer->commit_page)); 3824 rb_inc_page(&cpu_buffer->commit_page); 3825 if (cpu_buffer->ring_meta) { 3826 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3827 meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page; 3828 } 3829 /* add barrier to keep gcc from optimizing too much */ 3830 barrier(); 3831 } 3832 while (rb_commit_index(cpu_buffer) != 3833 rb_page_write(cpu_buffer->commit_page)) { 3834 3835 /* Make sure the readers see the content of what is committed. */ 3836 smp_wmb(); 3837 local_set(&cpu_buffer->commit_page->page->commit, 3838 rb_page_write(cpu_buffer->commit_page)); 3839 RB_WARN_ON(cpu_buffer, 3840 local_read(&cpu_buffer->commit_page->page->commit) & 3841 ~RB_WRITE_MASK); 3842 barrier(); 3843 } 3844 3845 /* again, keep gcc from optimizing */ 3846 barrier(); 3847 3848 /* 3849 * If an interrupt came in just after the first while loop 3850 * and pushed the tail page forward, we will be left with 3851 * a dangling commit that will never go forward. 3852 */ 3853 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3854 goto again; 3855 } 3856 3857 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3858 { 3859 unsigned long commits; 3860 3861 if (RB_WARN_ON(cpu_buffer, 3862 !local_read(&cpu_buffer->committing))) 3863 return; 3864 3865 again: 3866 commits = local_read(&cpu_buffer->commits); 3867 /* synchronize with interrupts */ 3868 barrier(); 3869 if (local_read(&cpu_buffer->committing) == 1) 3870 rb_set_commit_to_write(cpu_buffer); 3871 3872 local_dec(&cpu_buffer->committing); 3873 3874 /* synchronize with interrupts */ 3875 barrier(); 3876 3877 /* 3878 * Need to account for interrupts coming in between the 3879 * updating of the commit page and the clearing of the 3880 * committing counter. 3881 */ 3882 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3883 !local_read(&cpu_buffer->committing)) { 3884 local_inc(&cpu_buffer->committing); 3885 goto again; 3886 } 3887 } 3888 3889 static inline void rb_event_discard(struct ring_buffer_event *event) 3890 { 3891 if (extended_time(event)) 3892 event = skip_time_extend(event); 3893 3894 /* array[0] holds the actual length for the discarded event */ 3895 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3896 event->type_len = RINGBUF_TYPE_PADDING; 3897 /* time delta must be non zero */ 3898 if (!event->time_delta) 3899 event->time_delta = 1; 3900 } 3901 3902 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3903 { 3904 local_inc(&cpu_buffer->entries); 3905 rb_end_commit(cpu_buffer); 3906 } 3907 3908 static __always_inline void 3909 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 3910 { 3911 if (buffer->irq_work.waiters_pending) { 3912 buffer->irq_work.waiters_pending = false; 3913 /* irq_work_queue() supplies it's own memory barriers */ 3914 irq_work_queue(&buffer->irq_work.work); 3915 } 3916 3917 if (cpu_buffer->irq_work.waiters_pending) { 3918 cpu_buffer->irq_work.waiters_pending = false; 3919 /* irq_work_queue() supplies it's own memory barriers */ 3920 irq_work_queue(&cpu_buffer->irq_work.work); 3921 } 3922 3923 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3924 return; 3925 3926 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3927 return; 3928 3929 if (!cpu_buffer->irq_work.full_waiters_pending) 3930 return; 3931 3932 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3933 3934 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3935 return; 3936 3937 cpu_buffer->irq_work.wakeup_full = true; 3938 cpu_buffer->irq_work.full_waiters_pending = false; 3939 /* irq_work_queue() supplies it's own memory barriers */ 3940 irq_work_queue(&cpu_buffer->irq_work.work); 3941 } 3942 3943 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3944 # define do_ring_buffer_record_recursion() \ 3945 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3946 #else 3947 # define do_ring_buffer_record_recursion() do { } while (0) 3948 #endif 3949 3950 /* 3951 * The lock and unlock are done within a preempt disable section. 3952 * The current_context per_cpu variable can only be modified 3953 * by the current task between lock and unlock. But it can 3954 * be modified more than once via an interrupt. To pass this 3955 * information from the lock to the unlock without having to 3956 * access the 'in_interrupt()' functions again (which do show 3957 * a bit of overhead in something as critical as function tracing, 3958 * we use a bitmask trick. 3959 * 3960 * bit 1 = NMI context 3961 * bit 2 = IRQ context 3962 * bit 3 = SoftIRQ context 3963 * bit 4 = normal context. 3964 * 3965 * This works because this is the order of contexts that can 3966 * preempt other contexts. A SoftIRQ never preempts an IRQ 3967 * context. 3968 * 3969 * When the context is determined, the corresponding bit is 3970 * checked and set (if it was set, then a recursion of that context 3971 * happened). 3972 * 3973 * On unlock, we need to clear this bit. To do so, just subtract 3974 * 1 from the current_context and AND it to itself. 3975 * 3976 * (binary) 3977 * 101 - 1 = 100 3978 * 101 & 100 = 100 (clearing bit zero) 3979 * 3980 * 1010 - 1 = 1001 3981 * 1010 & 1001 = 1000 (clearing bit 1) 3982 * 3983 * The least significant bit can be cleared this way, and it 3984 * just so happens that it is the same bit corresponding to 3985 * the current context. 3986 * 3987 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3988 * is set when a recursion is detected at the current context, and if 3989 * the TRANSITION bit is already set, it will fail the recursion. 3990 * This is needed because there's a lag between the changing of 3991 * interrupt context and updating the preempt count. In this case, 3992 * a false positive will be found. To handle this, one extra recursion 3993 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3994 * bit is already set, then it is considered a recursion and the function 3995 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3996 * 3997 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3998 * to be cleared. Even if it wasn't the context that set it. That is, 3999 * if an interrupt comes in while NORMAL bit is set and the ring buffer 4000 * is called before preempt_count() is updated, since the check will 4001 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 4002 * NMI then comes in, it will set the NMI bit, but when the NMI code 4003 * does the trace_recursive_unlock() it will clear the TRANSITION bit 4004 * and leave the NMI bit set. But this is fine, because the interrupt 4005 * code that set the TRANSITION bit will then clear the NMI bit when it 4006 * calls trace_recursive_unlock(). If another NMI comes in, it will 4007 * set the TRANSITION bit and continue. 4008 * 4009 * Note: The TRANSITION bit only handles a single transition between context. 4010 */ 4011 4012 static __always_inline bool 4013 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 4014 { 4015 unsigned int val = cpu_buffer->current_context; 4016 int bit = interrupt_context_level(); 4017 4018 bit = RB_CTX_NORMAL - bit; 4019 4020 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 4021 /* 4022 * It is possible that this was called by transitioning 4023 * between interrupt context, and preempt_count() has not 4024 * been updated yet. In this case, use the TRANSITION bit. 4025 */ 4026 bit = RB_CTX_TRANSITION; 4027 if (val & (1 << (bit + cpu_buffer->nest))) { 4028 do_ring_buffer_record_recursion(); 4029 return true; 4030 } 4031 } 4032 4033 val |= (1 << (bit + cpu_buffer->nest)); 4034 cpu_buffer->current_context = val; 4035 4036 return false; 4037 } 4038 4039 static __always_inline void 4040 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 4041 { 4042 cpu_buffer->current_context &= 4043 cpu_buffer->current_context - (1 << cpu_buffer->nest); 4044 } 4045 4046 /* The recursive locking above uses 5 bits */ 4047 #define NESTED_BITS 5 4048 4049 /** 4050 * ring_buffer_nest_start - Allow to trace while nested 4051 * @buffer: The ring buffer to modify 4052 * 4053 * The ring buffer has a safety mechanism to prevent recursion. 4054 * But there may be a case where a trace needs to be done while 4055 * tracing something else. In this case, calling this function 4056 * will allow this function to nest within a currently active 4057 * ring_buffer_lock_reserve(). 4058 * 4059 * Call this function before calling another ring_buffer_lock_reserve() and 4060 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 4061 */ 4062 void ring_buffer_nest_start(struct trace_buffer *buffer) 4063 { 4064 struct ring_buffer_per_cpu *cpu_buffer; 4065 int cpu; 4066 4067 /* Enabled by ring_buffer_nest_end() */ 4068 preempt_disable_notrace(); 4069 cpu = raw_smp_processor_id(); 4070 cpu_buffer = buffer->buffers[cpu]; 4071 /* This is the shift value for the above recursive locking */ 4072 cpu_buffer->nest += NESTED_BITS; 4073 } 4074 4075 /** 4076 * ring_buffer_nest_end - Allow to trace while nested 4077 * @buffer: The ring buffer to modify 4078 * 4079 * Must be called after ring_buffer_nest_start() and after the 4080 * ring_buffer_unlock_commit(). 4081 */ 4082 void ring_buffer_nest_end(struct trace_buffer *buffer) 4083 { 4084 struct ring_buffer_per_cpu *cpu_buffer; 4085 int cpu; 4086 4087 /* disabled by ring_buffer_nest_start() */ 4088 cpu = raw_smp_processor_id(); 4089 cpu_buffer = buffer->buffers[cpu]; 4090 /* This is the shift value for the above recursive locking */ 4091 cpu_buffer->nest -= NESTED_BITS; 4092 preempt_enable_notrace(); 4093 } 4094 4095 /** 4096 * ring_buffer_unlock_commit - commit a reserved 4097 * @buffer: The buffer to commit to 4098 * 4099 * This commits the data to the ring buffer, and releases any locks held. 4100 * 4101 * Must be paired with ring_buffer_lock_reserve. 4102 */ 4103 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 4104 { 4105 struct ring_buffer_per_cpu *cpu_buffer; 4106 int cpu = raw_smp_processor_id(); 4107 4108 cpu_buffer = buffer->buffers[cpu]; 4109 4110 rb_commit(cpu_buffer); 4111 4112 rb_wakeups(buffer, cpu_buffer); 4113 4114 trace_recursive_unlock(cpu_buffer); 4115 4116 preempt_enable_notrace(); 4117 4118 return 0; 4119 } 4120 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 4121 4122 /* Special value to validate all deltas on a page. */ 4123 #define CHECK_FULL_PAGE 1L 4124 4125 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4126 4127 static const char *show_irq_str(int bits) 4128 { 4129 const char *type[] = { 4130 ".", // 0 4131 "s", // 1 4132 "h", // 2 4133 "Hs", // 3 4134 "n", // 4 4135 "Ns", // 5 4136 "Nh", // 6 4137 "NHs", // 7 4138 }; 4139 4140 return type[bits]; 4141 } 4142 4143 /* Assume this is a trace event */ 4144 static const char *show_flags(struct ring_buffer_event *event) 4145 { 4146 struct trace_entry *entry; 4147 int bits = 0; 4148 4149 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4150 return "X"; 4151 4152 entry = ring_buffer_event_data(event); 4153 4154 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4155 bits |= 1; 4156 4157 if (entry->flags & TRACE_FLAG_HARDIRQ) 4158 bits |= 2; 4159 4160 if (entry->flags & TRACE_FLAG_NMI) 4161 bits |= 4; 4162 4163 return show_irq_str(bits); 4164 } 4165 4166 static const char *show_irq(struct ring_buffer_event *event) 4167 { 4168 struct trace_entry *entry; 4169 4170 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4171 return ""; 4172 4173 entry = ring_buffer_event_data(event); 4174 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4175 return "d"; 4176 return ""; 4177 } 4178 4179 static const char *show_interrupt_level(void) 4180 { 4181 unsigned long pc = preempt_count(); 4182 unsigned char level = 0; 4183 4184 if (pc & SOFTIRQ_OFFSET) 4185 level |= 1; 4186 4187 if (pc & HARDIRQ_MASK) 4188 level |= 2; 4189 4190 if (pc & NMI_MASK) 4191 level |= 4; 4192 4193 return show_irq_str(level); 4194 } 4195 4196 static void dump_buffer_page(struct buffer_data_page *bpage, 4197 struct rb_event_info *info, 4198 unsigned long tail) 4199 { 4200 struct ring_buffer_event *event; 4201 u64 ts, delta; 4202 int e; 4203 4204 ts = bpage->time_stamp; 4205 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4206 4207 for (e = 0; e < tail; e += rb_event_length(event)) { 4208 4209 event = (struct ring_buffer_event *)(bpage->data + e); 4210 4211 switch (event->type_len) { 4212 4213 case RINGBUF_TYPE_TIME_EXTEND: 4214 delta = rb_event_time_stamp(event); 4215 ts += delta; 4216 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4217 e, ts, delta); 4218 break; 4219 4220 case RINGBUF_TYPE_TIME_STAMP: 4221 delta = rb_event_time_stamp(event); 4222 ts = rb_fix_abs_ts(delta, ts); 4223 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4224 e, ts, delta); 4225 break; 4226 4227 case RINGBUF_TYPE_PADDING: 4228 ts += event->time_delta; 4229 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4230 e, ts, event->time_delta); 4231 break; 4232 4233 case RINGBUF_TYPE_DATA: 4234 ts += event->time_delta; 4235 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4236 e, ts, event->time_delta, 4237 show_flags(event), show_irq(event)); 4238 break; 4239 4240 default: 4241 break; 4242 } 4243 } 4244 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4245 } 4246 4247 static DEFINE_PER_CPU(atomic_t, checking); 4248 static atomic_t ts_dump; 4249 4250 #define buffer_warn_return(fmt, ...) \ 4251 do { \ 4252 /* If another report is happening, ignore this one */ \ 4253 if (atomic_inc_return(&ts_dump) != 1) { \ 4254 atomic_dec(&ts_dump); \ 4255 goto out; \ 4256 } \ 4257 atomic_inc(&cpu_buffer->record_disabled); \ 4258 pr_warn(fmt, ##__VA_ARGS__); \ 4259 dump_buffer_page(bpage, info, tail); \ 4260 atomic_dec(&ts_dump); \ 4261 /* There's some cases in boot up that this can happen */ \ 4262 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4263 /* Do not re-enable checking */ \ 4264 return; \ 4265 } while (0) 4266 4267 /* 4268 * Check if the current event time stamp matches the deltas on 4269 * the buffer page. 4270 */ 4271 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4272 struct rb_event_info *info, 4273 unsigned long tail) 4274 { 4275 struct buffer_data_page *bpage; 4276 u64 ts, delta; 4277 bool full = false; 4278 int ret; 4279 4280 bpage = info->tail_page->page; 4281 4282 if (tail == CHECK_FULL_PAGE) { 4283 full = true; 4284 tail = local_read(&bpage->commit); 4285 } else if (info->add_timestamp & 4286 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 4287 /* Ignore events with absolute time stamps */ 4288 return; 4289 } 4290 4291 /* 4292 * Do not check the first event (skip possible extends too). 4293 * Also do not check if previous events have not been committed. 4294 */ 4295 if (tail <= 8 || tail > local_read(&bpage->commit)) 4296 return; 4297 4298 /* 4299 * If this interrupted another event, 4300 */ 4301 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 4302 goto out; 4303 4304 ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta); 4305 if (ret < 0) { 4306 if (delta < ts) { 4307 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 4308 cpu_buffer->cpu, ts, delta); 4309 goto out; 4310 } 4311 } 4312 if ((full && ts > info->ts) || 4313 (!full && ts + info->delta != info->ts)) { 4314 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 4315 cpu_buffer->cpu, 4316 ts + info->delta, info->ts, info->delta, 4317 info->before, info->after, 4318 full ? " (full)" : "", show_interrupt_level()); 4319 } 4320 out: 4321 atomic_dec(this_cpu_ptr(&checking)); 4322 } 4323 #else 4324 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4325 struct rb_event_info *info, 4326 unsigned long tail) 4327 { 4328 } 4329 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 4330 4331 static struct ring_buffer_event * 4332 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 4333 struct rb_event_info *info) 4334 { 4335 struct ring_buffer_event *event; 4336 struct buffer_page *tail_page; 4337 unsigned long tail, write, w; 4338 4339 /* Don't let the compiler play games with cpu_buffer->tail_page */ 4340 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 4341 4342 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 4343 barrier(); 4344 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4345 rb_time_read(&cpu_buffer->write_stamp, &info->after); 4346 barrier(); 4347 info->ts = rb_time_stamp(cpu_buffer->buffer); 4348 4349 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 4350 info->delta = info->ts; 4351 } else { 4352 /* 4353 * If interrupting an event time update, we may need an 4354 * absolute timestamp. 4355 * Don't bother if this is the start of a new page (w == 0). 4356 */ 4357 if (!w) { 4358 /* Use the sub-buffer timestamp */ 4359 info->delta = 0; 4360 } else if (unlikely(info->before != info->after)) { 4361 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 4362 info->length += RB_LEN_TIME_EXTEND; 4363 } else { 4364 info->delta = info->ts - info->after; 4365 if (unlikely(test_time_stamp(info->delta))) { 4366 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 4367 info->length += RB_LEN_TIME_EXTEND; 4368 } 4369 } 4370 } 4371 4372 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 4373 4374 /*C*/ write = local_add_return(info->length, &tail_page->write); 4375 4376 /* set write to only the index of the write */ 4377 write &= RB_WRITE_MASK; 4378 4379 tail = write - info->length; 4380 4381 /* See if we shot pass the end of this buffer page */ 4382 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 4383 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 4384 return rb_move_tail(cpu_buffer, tail, info); 4385 } 4386 4387 if (likely(tail == w)) { 4388 /* Nothing interrupted us between A and C */ 4389 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 4390 /* 4391 * If something came in between C and D, the write stamp 4392 * may now not be in sync. But that's fine as the before_stamp 4393 * will be different and then next event will just be forced 4394 * to use an absolute timestamp. 4395 */ 4396 if (likely(!(info->add_timestamp & 4397 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4398 /* This did not interrupt any time update */ 4399 info->delta = info->ts - info->after; 4400 else 4401 /* Just use full timestamp for interrupting event */ 4402 info->delta = info->ts; 4403 check_buffer(cpu_buffer, info, tail); 4404 } else { 4405 u64 ts; 4406 /* SLOW PATH - Interrupted between A and C */ 4407 4408 /* Save the old before_stamp */ 4409 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4410 4411 /* 4412 * Read a new timestamp and update the before_stamp to make 4413 * the next event after this one force using an absolute 4414 * timestamp. This is in case an interrupt were to come in 4415 * between E and F. 4416 */ 4417 ts = rb_time_stamp(cpu_buffer->buffer); 4418 rb_time_set(&cpu_buffer->before_stamp, ts); 4419 4420 barrier(); 4421 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 4422 barrier(); 4423 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 4424 info->after == info->before && info->after < ts) { 4425 /* 4426 * Nothing came after this event between C and F, it is 4427 * safe to use info->after for the delta as it 4428 * matched info->before and is still valid. 4429 */ 4430 info->delta = ts - info->after; 4431 } else { 4432 /* 4433 * Interrupted between C and F: 4434 * Lost the previous events time stamp. Just set the 4435 * delta to zero, and this will be the same time as 4436 * the event this event interrupted. And the events that 4437 * came after this will still be correct (as they would 4438 * have built their delta on the previous event. 4439 */ 4440 info->delta = 0; 4441 } 4442 info->ts = ts; 4443 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 4444 } 4445 4446 /* 4447 * If this is the first commit on the page, then it has the same 4448 * timestamp as the page itself. 4449 */ 4450 if (unlikely(!tail && !(info->add_timestamp & 4451 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4452 info->delta = 0; 4453 4454 /* We reserved something on the buffer */ 4455 4456 event = __rb_page_index(tail_page, tail); 4457 rb_update_event(cpu_buffer, event, info); 4458 4459 local_inc(&tail_page->entries); 4460 4461 /* 4462 * If this is the first commit on the page, then update 4463 * its timestamp. 4464 */ 4465 if (unlikely(!tail)) 4466 tail_page->page->time_stamp = info->ts; 4467 4468 /* account for these added bytes */ 4469 local_add(info->length, &cpu_buffer->entries_bytes); 4470 4471 return event; 4472 } 4473 4474 static __always_inline struct ring_buffer_event * 4475 rb_reserve_next_event(struct trace_buffer *buffer, 4476 struct ring_buffer_per_cpu *cpu_buffer, 4477 unsigned long length) 4478 { 4479 struct ring_buffer_event *event; 4480 struct rb_event_info info; 4481 int nr_loops = 0; 4482 int add_ts_default; 4483 4484 /* 4485 * ring buffer does cmpxchg as well as atomic64 operations 4486 * (which some archs use locking for atomic64), make sure this 4487 * is safe in NMI context 4488 */ 4489 if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) || 4490 IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) && 4491 (unlikely(in_nmi()))) { 4492 return NULL; 4493 } 4494 4495 rb_start_commit(cpu_buffer); 4496 /* The commit page can not change after this */ 4497 4498 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4499 /* 4500 * Due to the ability to swap a cpu buffer from a buffer 4501 * it is possible it was swapped before we committed. 4502 * (committing stops a swap). We check for it here and 4503 * if it happened, we have to fail the write. 4504 */ 4505 barrier(); 4506 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 4507 local_dec(&cpu_buffer->committing); 4508 local_dec(&cpu_buffer->commits); 4509 return NULL; 4510 } 4511 #endif 4512 4513 info.length = rb_calculate_event_length(length); 4514 4515 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 4516 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 4517 info.length += RB_LEN_TIME_EXTEND; 4518 if (info.length > cpu_buffer->buffer->max_data_size) 4519 goto out_fail; 4520 } else { 4521 add_ts_default = RB_ADD_STAMP_NONE; 4522 } 4523 4524 again: 4525 info.add_timestamp = add_ts_default; 4526 info.delta = 0; 4527 4528 /* 4529 * We allow for interrupts to reenter here and do a trace. 4530 * If one does, it will cause this original code to loop 4531 * back here. Even with heavy interrupts happening, this 4532 * should only happen a few times in a row. If this happens 4533 * 1000 times in a row, there must be either an interrupt 4534 * storm or we have something buggy. 4535 * Bail! 4536 */ 4537 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 4538 goto out_fail; 4539 4540 event = __rb_reserve_next(cpu_buffer, &info); 4541 4542 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 4543 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 4544 info.length -= RB_LEN_TIME_EXTEND; 4545 goto again; 4546 } 4547 4548 if (likely(event)) 4549 return event; 4550 out_fail: 4551 rb_end_commit(cpu_buffer); 4552 return NULL; 4553 } 4554 4555 /** 4556 * ring_buffer_lock_reserve - reserve a part of the buffer 4557 * @buffer: the ring buffer to reserve from 4558 * @length: the length of the data to reserve (excluding event header) 4559 * 4560 * Returns a reserved event on the ring buffer to copy directly to. 4561 * The user of this interface will need to get the body to write into 4562 * and can use the ring_buffer_event_data() interface. 4563 * 4564 * The length is the length of the data needed, not the event length 4565 * which also includes the event header. 4566 * 4567 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4568 * If NULL is returned, then nothing has been allocated or locked. 4569 */ 4570 struct ring_buffer_event * 4571 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4572 { 4573 struct ring_buffer_per_cpu *cpu_buffer; 4574 struct ring_buffer_event *event; 4575 int cpu; 4576 4577 /* If we are tracing schedule, we don't want to recurse */ 4578 preempt_disable_notrace(); 4579 4580 if (unlikely(atomic_read(&buffer->record_disabled))) 4581 goto out; 4582 4583 cpu = raw_smp_processor_id(); 4584 4585 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4586 goto out; 4587 4588 cpu_buffer = buffer->buffers[cpu]; 4589 4590 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4591 goto out; 4592 4593 if (unlikely(length > buffer->max_data_size)) 4594 goto out; 4595 4596 if (unlikely(trace_recursive_lock(cpu_buffer))) 4597 goto out; 4598 4599 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4600 if (!event) 4601 goto out_unlock; 4602 4603 return event; 4604 4605 out_unlock: 4606 trace_recursive_unlock(cpu_buffer); 4607 out: 4608 preempt_enable_notrace(); 4609 return NULL; 4610 } 4611 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4612 4613 /* 4614 * Decrement the entries to the page that an event is on. 4615 * The event does not even need to exist, only the pointer 4616 * to the page it is on. This may only be called before the commit 4617 * takes place. 4618 */ 4619 static inline void 4620 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4621 struct ring_buffer_event *event) 4622 { 4623 unsigned long addr = (unsigned long)event; 4624 struct buffer_page *bpage = cpu_buffer->commit_page; 4625 struct buffer_page *start; 4626 4627 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4628 4629 /* Do the likely case first */ 4630 if (likely(bpage->page == (void *)addr)) { 4631 local_dec(&bpage->entries); 4632 return; 4633 } 4634 4635 /* 4636 * Because the commit page may be on the reader page we 4637 * start with the next page and check the end loop there. 4638 */ 4639 rb_inc_page(&bpage); 4640 start = bpage; 4641 do { 4642 if (bpage->page == (void *)addr) { 4643 local_dec(&bpage->entries); 4644 return; 4645 } 4646 rb_inc_page(&bpage); 4647 } while (bpage != start); 4648 4649 /* commit not part of this buffer?? */ 4650 RB_WARN_ON(cpu_buffer, 1); 4651 } 4652 4653 /** 4654 * ring_buffer_discard_commit - discard an event that has not been committed 4655 * @buffer: the ring buffer 4656 * @event: non committed event to discard 4657 * 4658 * Sometimes an event that is in the ring buffer needs to be ignored. 4659 * This function lets the user discard an event in the ring buffer 4660 * and then that event will not be read later. 4661 * 4662 * This function only works if it is called before the item has been 4663 * committed. It will try to free the event from the ring buffer 4664 * if another event has not been added behind it. 4665 * 4666 * If another event has been added behind it, it will set the event 4667 * up as discarded, and perform the commit. 4668 * 4669 * If this function is called, do not call ring_buffer_unlock_commit on 4670 * the event. 4671 */ 4672 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4673 struct ring_buffer_event *event) 4674 { 4675 struct ring_buffer_per_cpu *cpu_buffer; 4676 int cpu; 4677 4678 /* The event is discarded regardless */ 4679 rb_event_discard(event); 4680 4681 cpu = smp_processor_id(); 4682 cpu_buffer = buffer->buffers[cpu]; 4683 4684 /* 4685 * This must only be called if the event has not been 4686 * committed yet. Thus we can assume that preemption 4687 * is still disabled. 4688 */ 4689 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4690 4691 rb_decrement_entry(cpu_buffer, event); 4692 if (rb_try_to_discard(cpu_buffer, event)) 4693 goto out; 4694 4695 out: 4696 rb_end_commit(cpu_buffer); 4697 4698 trace_recursive_unlock(cpu_buffer); 4699 4700 preempt_enable_notrace(); 4701 4702 } 4703 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4704 4705 /** 4706 * ring_buffer_write - write data to the buffer without reserving 4707 * @buffer: The ring buffer to write to. 4708 * @length: The length of the data being written (excluding the event header) 4709 * @data: The data to write to the buffer. 4710 * 4711 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4712 * one function. If you already have the data to write to the buffer, it 4713 * may be easier to simply call this function. 4714 * 4715 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4716 * and not the length of the event which would hold the header. 4717 */ 4718 int ring_buffer_write(struct trace_buffer *buffer, 4719 unsigned long length, 4720 void *data) 4721 { 4722 struct ring_buffer_per_cpu *cpu_buffer; 4723 struct ring_buffer_event *event; 4724 void *body; 4725 int ret = -EBUSY; 4726 int cpu; 4727 4728 preempt_disable_notrace(); 4729 4730 if (atomic_read(&buffer->record_disabled)) 4731 goto out; 4732 4733 cpu = raw_smp_processor_id(); 4734 4735 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4736 goto out; 4737 4738 cpu_buffer = buffer->buffers[cpu]; 4739 4740 if (atomic_read(&cpu_buffer->record_disabled)) 4741 goto out; 4742 4743 if (length > buffer->max_data_size) 4744 goto out; 4745 4746 if (unlikely(trace_recursive_lock(cpu_buffer))) 4747 goto out; 4748 4749 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4750 if (!event) 4751 goto out_unlock; 4752 4753 body = rb_event_data(event); 4754 4755 memcpy(body, data, length); 4756 4757 rb_commit(cpu_buffer); 4758 4759 rb_wakeups(buffer, cpu_buffer); 4760 4761 ret = 0; 4762 4763 out_unlock: 4764 trace_recursive_unlock(cpu_buffer); 4765 4766 out: 4767 preempt_enable_notrace(); 4768 4769 return ret; 4770 } 4771 EXPORT_SYMBOL_GPL(ring_buffer_write); 4772 4773 /* 4774 * The total entries in the ring buffer is the running counter 4775 * of entries entered into the ring buffer, minus the sum of 4776 * the entries read from the ring buffer and the number of 4777 * entries that were overwritten. 4778 */ 4779 static inline unsigned long 4780 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4781 { 4782 return local_read(&cpu_buffer->entries) - 4783 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4784 } 4785 4786 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4787 { 4788 return !rb_num_of_entries(cpu_buffer); 4789 } 4790 4791 /** 4792 * ring_buffer_record_disable - stop all writes into the buffer 4793 * @buffer: The ring buffer to stop writes to. 4794 * 4795 * This prevents all writes to the buffer. Any attempt to write 4796 * to the buffer after this will fail and return NULL. 4797 * 4798 * The caller should call synchronize_rcu() after this. 4799 */ 4800 void ring_buffer_record_disable(struct trace_buffer *buffer) 4801 { 4802 atomic_inc(&buffer->record_disabled); 4803 } 4804 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4805 4806 /** 4807 * ring_buffer_record_enable - enable writes to the buffer 4808 * @buffer: The ring buffer to enable writes 4809 * 4810 * Note, multiple disables will need the same number of enables 4811 * to truly enable the writing (much like preempt_disable). 4812 */ 4813 void ring_buffer_record_enable(struct trace_buffer *buffer) 4814 { 4815 atomic_dec(&buffer->record_disabled); 4816 } 4817 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4818 4819 /** 4820 * ring_buffer_record_off - stop all writes into the buffer 4821 * @buffer: The ring buffer to stop writes to. 4822 * 4823 * This prevents all writes to the buffer. Any attempt to write 4824 * to the buffer after this will fail and return NULL. 4825 * 4826 * This is different than ring_buffer_record_disable() as 4827 * it works like an on/off switch, where as the disable() version 4828 * must be paired with a enable(). 4829 */ 4830 void ring_buffer_record_off(struct trace_buffer *buffer) 4831 { 4832 unsigned int rd; 4833 unsigned int new_rd; 4834 4835 rd = atomic_read(&buffer->record_disabled); 4836 do { 4837 new_rd = rd | RB_BUFFER_OFF; 4838 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4839 } 4840 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4841 4842 /** 4843 * ring_buffer_record_on - restart writes into the buffer 4844 * @buffer: The ring buffer to start writes to. 4845 * 4846 * This enables all writes to the buffer that was disabled by 4847 * ring_buffer_record_off(). 4848 * 4849 * This is different than ring_buffer_record_enable() as 4850 * it works like an on/off switch, where as the enable() version 4851 * must be paired with a disable(). 4852 */ 4853 void ring_buffer_record_on(struct trace_buffer *buffer) 4854 { 4855 unsigned int rd; 4856 unsigned int new_rd; 4857 4858 rd = atomic_read(&buffer->record_disabled); 4859 do { 4860 new_rd = rd & ~RB_BUFFER_OFF; 4861 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4862 } 4863 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4864 4865 /** 4866 * ring_buffer_record_is_on - return true if the ring buffer can write 4867 * @buffer: The ring buffer to see if write is enabled 4868 * 4869 * Returns true if the ring buffer is in a state that it accepts writes. 4870 */ 4871 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4872 { 4873 return !atomic_read(&buffer->record_disabled); 4874 } 4875 4876 /** 4877 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4878 * @buffer: The ring buffer to see if write is set enabled 4879 * 4880 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4881 * Note that this does NOT mean it is in a writable state. 4882 * 4883 * It may return true when the ring buffer has been disabled by 4884 * ring_buffer_record_disable(), as that is a temporary disabling of 4885 * the ring buffer. 4886 */ 4887 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4888 { 4889 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4890 } 4891 4892 /** 4893 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4894 * @buffer: The ring buffer to stop writes to. 4895 * @cpu: The CPU buffer to stop 4896 * 4897 * This prevents all writes to the buffer. Any attempt to write 4898 * to the buffer after this will fail and return NULL. 4899 * 4900 * The caller should call synchronize_rcu() after this. 4901 */ 4902 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4903 { 4904 struct ring_buffer_per_cpu *cpu_buffer; 4905 4906 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4907 return; 4908 4909 cpu_buffer = buffer->buffers[cpu]; 4910 atomic_inc(&cpu_buffer->record_disabled); 4911 } 4912 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4913 4914 /** 4915 * ring_buffer_record_enable_cpu - enable writes to the buffer 4916 * @buffer: The ring buffer to enable writes 4917 * @cpu: The CPU to enable. 4918 * 4919 * Note, multiple disables will need the same number of enables 4920 * to truly enable the writing (much like preempt_disable). 4921 */ 4922 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4923 { 4924 struct ring_buffer_per_cpu *cpu_buffer; 4925 4926 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4927 return; 4928 4929 cpu_buffer = buffer->buffers[cpu]; 4930 atomic_dec(&cpu_buffer->record_disabled); 4931 } 4932 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4933 4934 /** 4935 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4936 * @buffer: The ring buffer 4937 * @cpu: The per CPU buffer to read from. 4938 */ 4939 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4940 { 4941 unsigned long flags; 4942 struct ring_buffer_per_cpu *cpu_buffer; 4943 struct buffer_page *bpage; 4944 u64 ret = 0; 4945 4946 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4947 return 0; 4948 4949 cpu_buffer = buffer->buffers[cpu]; 4950 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4951 /* 4952 * if the tail is on reader_page, oldest time stamp is on the reader 4953 * page 4954 */ 4955 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4956 bpage = cpu_buffer->reader_page; 4957 else 4958 bpage = rb_set_head_page(cpu_buffer); 4959 if (bpage) 4960 ret = bpage->page->time_stamp; 4961 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4962 4963 return ret; 4964 } 4965 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4966 4967 /** 4968 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 4969 * @buffer: The ring buffer 4970 * @cpu: The per CPU buffer to read from. 4971 */ 4972 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4973 { 4974 struct ring_buffer_per_cpu *cpu_buffer; 4975 unsigned long ret; 4976 4977 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4978 return 0; 4979 4980 cpu_buffer = buffer->buffers[cpu]; 4981 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4982 4983 return ret; 4984 } 4985 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4986 4987 /** 4988 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4989 * @buffer: The ring buffer 4990 * @cpu: The per CPU buffer to get the entries from. 4991 */ 4992 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4993 { 4994 struct ring_buffer_per_cpu *cpu_buffer; 4995 4996 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4997 return 0; 4998 4999 cpu_buffer = buffer->buffers[cpu]; 5000 5001 return rb_num_of_entries(cpu_buffer); 5002 } 5003 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 5004 5005 /** 5006 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 5007 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 5008 * @buffer: The ring buffer 5009 * @cpu: The per CPU buffer to get the number of overruns from 5010 */ 5011 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 5012 { 5013 struct ring_buffer_per_cpu *cpu_buffer; 5014 unsigned long ret; 5015 5016 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5017 return 0; 5018 5019 cpu_buffer = buffer->buffers[cpu]; 5020 ret = local_read(&cpu_buffer->overrun); 5021 5022 return ret; 5023 } 5024 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 5025 5026 /** 5027 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 5028 * commits failing due to the buffer wrapping around while there are uncommitted 5029 * events, such as during an interrupt storm. 5030 * @buffer: The ring buffer 5031 * @cpu: The per CPU buffer to get the number of overruns from 5032 */ 5033 unsigned long 5034 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 5035 { 5036 struct ring_buffer_per_cpu *cpu_buffer; 5037 unsigned long ret; 5038 5039 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5040 return 0; 5041 5042 cpu_buffer = buffer->buffers[cpu]; 5043 ret = local_read(&cpu_buffer->commit_overrun); 5044 5045 return ret; 5046 } 5047 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 5048 5049 /** 5050 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 5051 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 5052 * @buffer: The ring buffer 5053 * @cpu: The per CPU buffer to get the number of overruns from 5054 */ 5055 unsigned long 5056 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 5057 { 5058 struct ring_buffer_per_cpu *cpu_buffer; 5059 unsigned long ret; 5060 5061 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5062 return 0; 5063 5064 cpu_buffer = buffer->buffers[cpu]; 5065 ret = local_read(&cpu_buffer->dropped_events); 5066 5067 return ret; 5068 } 5069 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 5070 5071 /** 5072 * ring_buffer_read_events_cpu - get the number of events successfully read 5073 * @buffer: The ring buffer 5074 * @cpu: The per CPU buffer to get the number of events read 5075 */ 5076 unsigned long 5077 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 5078 { 5079 struct ring_buffer_per_cpu *cpu_buffer; 5080 5081 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5082 return 0; 5083 5084 cpu_buffer = buffer->buffers[cpu]; 5085 return cpu_buffer->read; 5086 } 5087 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 5088 5089 /** 5090 * ring_buffer_entries - get the number of entries in a buffer 5091 * @buffer: The ring buffer 5092 * 5093 * Returns the total number of entries in the ring buffer 5094 * (all CPU entries) 5095 */ 5096 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 5097 { 5098 struct ring_buffer_per_cpu *cpu_buffer; 5099 unsigned long entries = 0; 5100 int cpu; 5101 5102 /* if you care about this being correct, lock the buffer */ 5103 for_each_buffer_cpu(buffer, cpu) { 5104 cpu_buffer = buffer->buffers[cpu]; 5105 entries += rb_num_of_entries(cpu_buffer); 5106 } 5107 5108 return entries; 5109 } 5110 EXPORT_SYMBOL_GPL(ring_buffer_entries); 5111 5112 /** 5113 * ring_buffer_overruns - get the number of overruns in buffer 5114 * @buffer: The ring buffer 5115 * 5116 * Returns the total number of overruns in the ring buffer 5117 * (all CPU entries) 5118 */ 5119 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 5120 { 5121 struct ring_buffer_per_cpu *cpu_buffer; 5122 unsigned long overruns = 0; 5123 int cpu; 5124 5125 /* if you care about this being correct, lock the buffer */ 5126 for_each_buffer_cpu(buffer, cpu) { 5127 cpu_buffer = buffer->buffers[cpu]; 5128 overruns += local_read(&cpu_buffer->overrun); 5129 } 5130 5131 return overruns; 5132 } 5133 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5134 5135 static void rb_iter_reset(struct ring_buffer_iter *iter) 5136 { 5137 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5138 5139 /* Iterator usage is expected to have record disabled */ 5140 iter->head_page = cpu_buffer->reader_page; 5141 iter->head = cpu_buffer->reader_page->read; 5142 iter->next_event = iter->head; 5143 5144 iter->cache_reader_page = iter->head_page; 5145 iter->cache_read = cpu_buffer->read; 5146 iter->cache_pages_removed = cpu_buffer->pages_removed; 5147 5148 if (iter->head) { 5149 iter->read_stamp = cpu_buffer->read_stamp; 5150 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5151 } else { 5152 iter->read_stamp = iter->head_page->page->time_stamp; 5153 iter->page_stamp = iter->read_stamp; 5154 } 5155 } 5156 5157 /** 5158 * ring_buffer_iter_reset - reset an iterator 5159 * @iter: The iterator to reset 5160 * 5161 * Resets the iterator, so that it will start from the beginning 5162 * again. 5163 */ 5164 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5165 { 5166 struct ring_buffer_per_cpu *cpu_buffer; 5167 unsigned long flags; 5168 5169 if (!iter) 5170 return; 5171 5172 cpu_buffer = iter->cpu_buffer; 5173 5174 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5175 rb_iter_reset(iter); 5176 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5177 } 5178 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5179 5180 /** 5181 * ring_buffer_iter_empty - check if an iterator has no more to read 5182 * @iter: The iterator to check 5183 */ 5184 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5185 { 5186 struct ring_buffer_per_cpu *cpu_buffer; 5187 struct buffer_page *reader; 5188 struct buffer_page *head_page; 5189 struct buffer_page *commit_page; 5190 struct buffer_page *curr_commit_page; 5191 unsigned commit; 5192 u64 curr_commit_ts; 5193 u64 commit_ts; 5194 5195 cpu_buffer = iter->cpu_buffer; 5196 reader = cpu_buffer->reader_page; 5197 head_page = cpu_buffer->head_page; 5198 commit_page = READ_ONCE(cpu_buffer->commit_page); 5199 commit_ts = commit_page->page->time_stamp; 5200 5201 /* 5202 * When the writer goes across pages, it issues a cmpxchg which 5203 * is a mb(), which will synchronize with the rmb here. 5204 * (see rb_tail_page_update()) 5205 */ 5206 smp_rmb(); 5207 commit = rb_page_commit(commit_page); 5208 /* We want to make sure that the commit page doesn't change */ 5209 smp_rmb(); 5210 5211 /* Make sure commit page didn't change */ 5212 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5213 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5214 5215 /* If the commit page changed, then there's more data */ 5216 if (curr_commit_page != commit_page || 5217 curr_commit_ts != commit_ts) 5218 return 0; 5219 5220 /* Still racy, as it may return a false positive, but that's OK */ 5221 return ((iter->head_page == commit_page && iter->head >= commit) || 5222 (iter->head_page == reader && commit_page == head_page && 5223 head_page->read == commit && 5224 iter->head == rb_page_size(cpu_buffer->reader_page))); 5225 } 5226 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5227 5228 static void 5229 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5230 struct ring_buffer_event *event) 5231 { 5232 u64 delta; 5233 5234 switch (event->type_len) { 5235 case RINGBUF_TYPE_PADDING: 5236 return; 5237 5238 case RINGBUF_TYPE_TIME_EXTEND: 5239 delta = rb_event_time_stamp(event); 5240 cpu_buffer->read_stamp += delta; 5241 return; 5242 5243 case RINGBUF_TYPE_TIME_STAMP: 5244 delta = rb_event_time_stamp(event); 5245 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5246 cpu_buffer->read_stamp = delta; 5247 return; 5248 5249 case RINGBUF_TYPE_DATA: 5250 cpu_buffer->read_stamp += event->time_delta; 5251 return; 5252 5253 default: 5254 RB_WARN_ON(cpu_buffer, 1); 5255 } 5256 } 5257 5258 static void 5259 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5260 struct ring_buffer_event *event) 5261 { 5262 u64 delta; 5263 5264 switch (event->type_len) { 5265 case RINGBUF_TYPE_PADDING: 5266 return; 5267 5268 case RINGBUF_TYPE_TIME_EXTEND: 5269 delta = rb_event_time_stamp(event); 5270 iter->read_stamp += delta; 5271 return; 5272 5273 case RINGBUF_TYPE_TIME_STAMP: 5274 delta = rb_event_time_stamp(event); 5275 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5276 iter->read_stamp = delta; 5277 return; 5278 5279 case RINGBUF_TYPE_DATA: 5280 iter->read_stamp += event->time_delta; 5281 return; 5282 5283 default: 5284 RB_WARN_ON(iter->cpu_buffer, 1); 5285 } 5286 } 5287 5288 static struct buffer_page * 5289 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5290 { 5291 struct buffer_page *reader = NULL; 5292 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5293 unsigned long overwrite; 5294 unsigned long flags; 5295 int nr_loops = 0; 5296 bool ret; 5297 5298 local_irq_save(flags); 5299 arch_spin_lock(&cpu_buffer->lock); 5300 5301 again: 5302 /* 5303 * This should normally only loop twice. But because the 5304 * start of the reader inserts an empty page, it causes 5305 * a case where we will loop three times. There should be no 5306 * reason to loop four times (that I know of). 5307 */ 5308 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 5309 reader = NULL; 5310 goto out; 5311 } 5312 5313 reader = cpu_buffer->reader_page; 5314 5315 /* If there's more to read, return this page */ 5316 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5317 goto out; 5318 5319 /* Never should we have an index greater than the size */ 5320 if (RB_WARN_ON(cpu_buffer, 5321 cpu_buffer->reader_page->read > rb_page_size(reader))) 5322 goto out; 5323 5324 /* check if we caught up to the tail */ 5325 reader = NULL; 5326 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5327 goto out; 5328 5329 /* Don't bother swapping if the ring buffer is empty */ 5330 if (rb_num_of_entries(cpu_buffer) == 0) 5331 goto out; 5332 5333 /* 5334 * Reset the reader page to size zero. 5335 */ 5336 local_set(&cpu_buffer->reader_page->write, 0); 5337 local_set(&cpu_buffer->reader_page->entries, 0); 5338 local_set(&cpu_buffer->reader_page->page->commit, 0); 5339 cpu_buffer->reader_page->real_end = 0; 5340 5341 spin: 5342 /* 5343 * Splice the empty reader page into the list around the head. 5344 */ 5345 reader = rb_set_head_page(cpu_buffer); 5346 if (!reader) 5347 goto out; 5348 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5349 cpu_buffer->reader_page->list.prev = reader->list.prev; 5350 5351 /* 5352 * cpu_buffer->pages just needs to point to the buffer, it 5353 * has no specific buffer page to point to. Lets move it out 5354 * of our way so we don't accidentally swap it. 5355 */ 5356 cpu_buffer->pages = reader->list.prev; 5357 5358 /* The reader page will be pointing to the new head */ 5359 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5360 5361 /* 5362 * We want to make sure we read the overruns after we set up our 5363 * pointers to the next object. The writer side does a 5364 * cmpxchg to cross pages which acts as the mb on the writer 5365 * side. Note, the reader will constantly fail the swap 5366 * while the writer is updating the pointers, so this 5367 * guarantees that the overwrite recorded here is the one we 5368 * want to compare with the last_overrun. 5369 */ 5370 smp_mb(); 5371 overwrite = local_read(&(cpu_buffer->overrun)); 5372 5373 /* 5374 * Here's the tricky part. 5375 * 5376 * We need to move the pointer past the header page. 5377 * But we can only do that if a writer is not currently 5378 * moving it. The page before the header page has the 5379 * flag bit '1' set if it is pointing to the page we want. 5380 * but if the writer is in the process of moving it 5381 * than it will be '2' or already moved '0'. 5382 */ 5383 5384 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5385 5386 /* 5387 * If we did not convert it, then we must try again. 5388 */ 5389 if (!ret) 5390 goto spin; 5391 5392 if (cpu_buffer->ring_meta) 5393 rb_update_meta_reader(cpu_buffer, reader); 5394 5395 /* 5396 * Yay! We succeeded in replacing the page. 5397 * 5398 * Now make the new head point back to the reader page. 5399 */ 5400 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5401 rb_inc_page(&cpu_buffer->head_page); 5402 5403 cpu_buffer->cnt++; 5404 local_inc(&cpu_buffer->pages_read); 5405 5406 /* Finally update the reader page to the new head */ 5407 cpu_buffer->reader_page = reader; 5408 cpu_buffer->reader_page->read = 0; 5409 5410 if (overwrite != cpu_buffer->last_overrun) { 5411 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5412 cpu_buffer->last_overrun = overwrite; 5413 } 5414 5415 goto again; 5416 5417 out: 5418 /* Update the read_stamp on the first event */ 5419 if (reader && reader->read == 0) 5420 cpu_buffer->read_stamp = reader->page->time_stamp; 5421 5422 arch_spin_unlock(&cpu_buffer->lock); 5423 local_irq_restore(flags); 5424 5425 /* 5426 * The writer has preempt disable, wait for it. But not forever 5427 * Although, 1 second is pretty much "forever" 5428 */ 5429 #define USECS_WAIT 1000000 5430 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5431 /* If the write is past the end of page, a writer is still updating it */ 5432 if (likely(!reader || rb_page_write(reader) <= bsize)) 5433 break; 5434 5435 udelay(1); 5436 5437 /* Get the latest version of the reader write value */ 5438 smp_rmb(); 5439 } 5440 5441 /* The writer is not moving forward? Something is wrong */ 5442 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5443 reader = NULL; 5444 5445 /* 5446 * Make sure we see any padding after the write update 5447 * (see rb_reset_tail()). 5448 * 5449 * In addition, a writer may be writing on the reader page 5450 * if the page has not been fully filled, so the read barrier 5451 * is also needed to make sure we see the content of what is 5452 * committed by the writer (see rb_set_commit_to_write()). 5453 */ 5454 smp_rmb(); 5455 5456 5457 return reader; 5458 } 5459 5460 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5461 { 5462 struct ring_buffer_event *event; 5463 struct buffer_page *reader; 5464 unsigned length; 5465 5466 reader = rb_get_reader_page(cpu_buffer); 5467 5468 /* This function should not be called when buffer is empty */ 5469 if (RB_WARN_ON(cpu_buffer, !reader)) 5470 return; 5471 5472 event = rb_reader_event(cpu_buffer); 5473 5474 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5475 cpu_buffer->read++; 5476 5477 rb_update_read_stamp(cpu_buffer, event); 5478 5479 length = rb_event_length(event); 5480 cpu_buffer->reader_page->read += length; 5481 cpu_buffer->read_bytes += length; 5482 } 5483 5484 static void rb_advance_iter(struct ring_buffer_iter *iter) 5485 { 5486 struct ring_buffer_per_cpu *cpu_buffer; 5487 5488 cpu_buffer = iter->cpu_buffer; 5489 5490 /* If head == next_event then we need to jump to the next event */ 5491 if (iter->head == iter->next_event) { 5492 /* If the event gets overwritten again, there's nothing to do */ 5493 if (rb_iter_head_event(iter) == NULL) 5494 return; 5495 } 5496 5497 iter->head = iter->next_event; 5498 5499 /* 5500 * Check if we are at the end of the buffer. 5501 */ 5502 if (iter->next_event >= rb_page_size(iter->head_page)) { 5503 /* discarded commits can make the page empty */ 5504 if (iter->head_page == cpu_buffer->commit_page) 5505 return; 5506 rb_inc_iter(iter); 5507 return; 5508 } 5509 5510 rb_update_iter_read_stamp(iter, iter->event); 5511 } 5512 5513 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 5514 { 5515 return cpu_buffer->lost_events; 5516 } 5517 5518 static struct ring_buffer_event * 5519 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 5520 unsigned long *lost_events) 5521 { 5522 struct ring_buffer_event *event; 5523 struct buffer_page *reader; 5524 int nr_loops = 0; 5525 5526 if (ts) 5527 *ts = 0; 5528 again: 5529 /* 5530 * We repeat when a time extend is encountered. 5531 * Since the time extend is always attached to a data event, 5532 * we should never loop more than once. 5533 * (We never hit the following condition more than twice). 5534 */ 5535 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 5536 return NULL; 5537 5538 reader = rb_get_reader_page(cpu_buffer); 5539 if (!reader) 5540 return NULL; 5541 5542 event = rb_reader_event(cpu_buffer); 5543 5544 switch (event->type_len) { 5545 case RINGBUF_TYPE_PADDING: 5546 if (rb_null_event(event)) 5547 RB_WARN_ON(cpu_buffer, 1); 5548 /* 5549 * Because the writer could be discarding every 5550 * event it creates (which would probably be bad) 5551 * if we were to go back to "again" then we may never 5552 * catch up, and will trigger the warn on, or lock 5553 * the box. Return the padding, and we will release 5554 * the current locks, and try again. 5555 */ 5556 return event; 5557 5558 case RINGBUF_TYPE_TIME_EXTEND: 5559 /* Internal data, OK to advance */ 5560 rb_advance_reader(cpu_buffer); 5561 goto again; 5562 5563 case RINGBUF_TYPE_TIME_STAMP: 5564 if (ts) { 5565 *ts = rb_event_time_stamp(event); 5566 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 5567 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5568 cpu_buffer->cpu, ts); 5569 } 5570 /* Internal data, OK to advance */ 5571 rb_advance_reader(cpu_buffer); 5572 goto again; 5573 5574 case RINGBUF_TYPE_DATA: 5575 if (ts && !(*ts)) { 5576 *ts = cpu_buffer->read_stamp + event->time_delta; 5577 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5578 cpu_buffer->cpu, ts); 5579 } 5580 if (lost_events) 5581 *lost_events = rb_lost_events(cpu_buffer); 5582 return event; 5583 5584 default: 5585 RB_WARN_ON(cpu_buffer, 1); 5586 } 5587 5588 return NULL; 5589 } 5590 EXPORT_SYMBOL_GPL(ring_buffer_peek); 5591 5592 static struct ring_buffer_event * 5593 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5594 { 5595 struct trace_buffer *buffer; 5596 struct ring_buffer_per_cpu *cpu_buffer; 5597 struct ring_buffer_event *event; 5598 int nr_loops = 0; 5599 5600 if (ts) 5601 *ts = 0; 5602 5603 cpu_buffer = iter->cpu_buffer; 5604 buffer = cpu_buffer->buffer; 5605 5606 /* 5607 * Check if someone performed a consuming read to the buffer 5608 * or removed some pages from the buffer. In these cases, 5609 * iterator was invalidated and we need to reset it. 5610 */ 5611 if (unlikely(iter->cache_read != cpu_buffer->read || 5612 iter->cache_reader_page != cpu_buffer->reader_page || 5613 iter->cache_pages_removed != cpu_buffer->pages_removed)) 5614 rb_iter_reset(iter); 5615 5616 again: 5617 if (ring_buffer_iter_empty(iter)) 5618 return NULL; 5619 5620 /* 5621 * As the writer can mess with what the iterator is trying 5622 * to read, just give up if we fail to get an event after 5623 * three tries. The iterator is not as reliable when reading 5624 * the ring buffer with an active write as the consumer is. 5625 * Do not warn if the three failures is reached. 5626 */ 5627 if (++nr_loops > 3) 5628 return NULL; 5629 5630 if (rb_per_cpu_empty(cpu_buffer)) 5631 return NULL; 5632 5633 if (iter->head >= rb_page_size(iter->head_page)) { 5634 rb_inc_iter(iter); 5635 goto again; 5636 } 5637 5638 event = rb_iter_head_event(iter); 5639 if (!event) 5640 goto again; 5641 5642 switch (event->type_len) { 5643 case RINGBUF_TYPE_PADDING: 5644 if (rb_null_event(event)) { 5645 rb_inc_iter(iter); 5646 goto again; 5647 } 5648 rb_advance_iter(iter); 5649 return event; 5650 5651 case RINGBUF_TYPE_TIME_EXTEND: 5652 /* Internal data, OK to advance */ 5653 rb_advance_iter(iter); 5654 goto again; 5655 5656 case RINGBUF_TYPE_TIME_STAMP: 5657 if (ts) { 5658 *ts = rb_event_time_stamp(event); 5659 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 5660 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5661 cpu_buffer->cpu, ts); 5662 } 5663 /* Internal data, OK to advance */ 5664 rb_advance_iter(iter); 5665 goto again; 5666 5667 case RINGBUF_TYPE_DATA: 5668 if (ts && !(*ts)) { 5669 *ts = iter->read_stamp + event->time_delta; 5670 ring_buffer_normalize_time_stamp(buffer, 5671 cpu_buffer->cpu, ts); 5672 } 5673 return event; 5674 5675 default: 5676 RB_WARN_ON(cpu_buffer, 1); 5677 } 5678 5679 return NULL; 5680 } 5681 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 5682 5683 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 5684 { 5685 if (likely(!in_nmi())) { 5686 raw_spin_lock(&cpu_buffer->reader_lock); 5687 return true; 5688 } 5689 5690 /* 5691 * If an NMI die dumps out the content of the ring buffer 5692 * trylock must be used to prevent a deadlock if the NMI 5693 * preempted a task that holds the ring buffer locks. If 5694 * we get the lock then all is fine, if not, then continue 5695 * to do the read, but this can corrupt the ring buffer, 5696 * so it must be permanently disabled from future writes. 5697 * Reading from NMI is a oneshot deal. 5698 */ 5699 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 5700 return true; 5701 5702 /* Continue without locking, but disable the ring buffer */ 5703 atomic_inc(&cpu_buffer->record_disabled); 5704 return false; 5705 } 5706 5707 static inline void 5708 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 5709 { 5710 if (likely(locked)) 5711 raw_spin_unlock(&cpu_buffer->reader_lock); 5712 } 5713 5714 /** 5715 * ring_buffer_peek - peek at the next event to be read 5716 * @buffer: The ring buffer to read 5717 * @cpu: The cpu to peak at 5718 * @ts: The timestamp counter of this event. 5719 * @lost_events: a variable to store if events were lost (may be NULL) 5720 * 5721 * This will return the event that will be read next, but does 5722 * not consume the data. 5723 */ 5724 struct ring_buffer_event * 5725 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 5726 unsigned long *lost_events) 5727 { 5728 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5729 struct ring_buffer_event *event; 5730 unsigned long flags; 5731 bool dolock; 5732 5733 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5734 return NULL; 5735 5736 again: 5737 local_irq_save(flags); 5738 dolock = rb_reader_lock(cpu_buffer); 5739 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5740 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5741 rb_advance_reader(cpu_buffer); 5742 rb_reader_unlock(cpu_buffer, dolock); 5743 local_irq_restore(flags); 5744 5745 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5746 goto again; 5747 5748 return event; 5749 } 5750 5751 /** ring_buffer_iter_dropped - report if there are dropped events 5752 * @iter: The ring buffer iterator 5753 * 5754 * Returns true if there was dropped events since the last peek. 5755 */ 5756 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 5757 { 5758 bool ret = iter->missed_events != 0; 5759 5760 iter->missed_events = 0; 5761 return ret; 5762 } 5763 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 5764 5765 /** 5766 * ring_buffer_iter_peek - peek at the next event to be read 5767 * @iter: The ring buffer iterator 5768 * @ts: The timestamp counter of this event. 5769 * 5770 * This will return the event that will be read next, but does 5771 * not increment the iterator. 5772 */ 5773 struct ring_buffer_event * 5774 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5775 { 5776 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5777 struct ring_buffer_event *event; 5778 unsigned long flags; 5779 5780 again: 5781 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5782 event = rb_iter_peek(iter, ts); 5783 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5784 5785 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5786 goto again; 5787 5788 return event; 5789 } 5790 5791 /** 5792 * ring_buffer_consume - return an event and consume it 5793 * @buffer: The ring buffer to get the next event from 5794 * @cpu: the cpu to read the buffer from 5795 * @ts: a variable to store the timestamp (may be NULL) 5796 * @lost_events: a variable to store if events were lost (may be NULL) 5797 * 5798 * Returns the next event in the ring buffer, and that event is consumed. 5799 * Meaning, that sequential reads will keep returning a different event, 5800 * and eventually empty the ring buffer if the producer is slower. 5801 */ 5802 struct ring_buffer_event * 5803 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5804 unsigned long *lost_events) 5805 { 5806 struct ring_buffer_per_cpu *cpu_buffer; 5807 struct ring_buffer_event *event = NULL; 5808 unsigned long flags; 5809 bool dolock; 5810 5811 again: 5812 /* might be called in atomic */ 5813 preempt_disable(); 5814 5815 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5816 goto out; 5817 5818 cpu_buffer = buffer->buffers[cpu]; 5819 local_irq_save(flags); 5820 dolock = rb_reader_lock(cpu_buffer); 5821 5822 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5823 if (event) { 5824 cpu_buffer->lost_events = 0; 5825 rb_advance_reader(cpu_buffer); 5826 } 5827 5828 rb_reader_unlock(cpu_buffer, dolock); 5829 local_irq_restore(flags); 5830 5831 out: 5832 preempt_enable(); 5833 5834 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5835 goto again; 5836 5837 return event; 5838 } 5839 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5840 5841 /** 5842 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5843 * @buffer: The ring buffer to read from 5844 * @cpu: The cpu buffer to iterate over 5845 * @flags: gfp flags to use for memory allocation 5846 * 5847 * This performs the initial preparations necessary to iterate 5848 * through the buffer. Memory is allocated, buffer resizing 5849 * is disabled, and the iterator pointer is returned to the caller. 5850 * 5851 * After a sequence of ring_buffer_read_prepare calls, the user is 5852 * expected to make at least one call to ring_buffer_read_prepare_sync. 5853 * Afterwards, ring_buffer_read_start is invoked to get things going 5854 * for real. 5855 * 5856 * This overall must be paired with ring_buffer_read_finish. 5857 */ 5858 struct ring_buffer_iter * 5859 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5860 { 5861 struct ring_buffer_per_cpu *cpu_buffer; 5862 struct ring_buffer_iter *iter; 5863 5864 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5865 return NULL; 5866 5867 iter = kzalloc(sizeof(*iter), flags); 5868 if (!iter) 5869 return NULL; 5870 5871 /* Holds the entire event: data and meta data */ 5872 iter->event_size = buffer->subbuf_size; 5873 iter->event = kmalloc(iter->event_size, flags); 5874 if (!iter->event) { 5875 kfree(iter); 5876 return NULL; 5877 } 5878 5879 cpu_buffer = buffer->buffers[cpu]; 5880 5881 iter->cpu_buffer = cpu_buffer; 5882 5883 atomic_inc(&cpu_buffer->resize_disabled); 5884 5885 return iter; 5886 } 5887 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5888 5889 /** 5890 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5891 * 5892 * All previously invoked ring_buffer_read_prepare calls to prepare 5893 * iterators will be synchronized. Afterwards, read_buffer_read_start 5894 * calls on those iterators are allowed. 5895 */ 5896 void 5897 ring_buffer_read_prepare_sync(void) 5898 { 5899 synchronize_rcu(); 5900 } 5901 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5902 5903 /** 5904 * ring_buffer_read_start - start a non consuming read of the buffer 5905 * @iter: The iterator returned by ring_buffer_read_prepare 5906 * 5907 * This finalizes the startup of an iteration through the buffer. 5908 * The iterator comes from a call to ring_buffer_read_prepare and 5909 * an intervening ring_buffer_read_prepare_sync must have been 5910 * performed. 5911 * 5912 * Must be paired with ring_buffer_read_finish. 5913 */ 5914 void 5915 ring_buffer_read_start(struct ring_buffer_iter *iter) 5916 { 5917 struct ring_buffer_per_cpu *cpu_buffer; 5918 unsigned long flags; 5919 5920 if (!iter) 5921 return; 5922 5923 cpu_buffer = iter->cpu_buffer; 5924 5925 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5926 arch_spin_lock(&cpu_buffer->lock); 5927 rb_iter_reset(iter); 5928 arch_spin_unlock(&cpu_buffer->lock); 5929 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5930 } 5931 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5932 5933 /** 5934 * ring_buffer_read_finish - finish reading the iterator of the buffer 5935 * @iter: The iterator retrieved by ring_buffer_start 5936 * 5937 * This re-enables resizing of the buffer, and frees the iterator. 5938 */ 5939 void 5940 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5941 { 5942 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5943 5944 /* Use this opportunity to check the integrity of the ring buffer. */ 5945 rb_check_pages(cpu_buffer); 5946 5947 atomic_dec(&cpu_buffer->resize_disabled); 5948 kfree(iter->event); 5949 kfree(iter); 5950 } 5951 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5952 5953 /** 5954 * ring_buffer_iter_advance - advance the iterator to the next location 5955 * @iter: The ring buffer iterator 5956 * 5957 * Move the location of the iterator such that the next read will 5958 * be the next location of the iterator. 5959 */ 5960 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5961 { 5962 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5963 unsigned long flags; 5964 5965 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5966 5967 rb_advance_iter(iter); 5968 5969 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5970 } 5971 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5972 5973 /** 5974 * ring_buffer_size - return the size of the ring buffer (in bytes) 5975 * @buffer: The ring buffer. 5976 * @cpu: The CPU to get ring buffer size from. 5977 */ 5978 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5979 { 5980 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5981 return 0; 5982 5983 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 5984 } 5985 EXPORT_SYMBOL_GPL(ring_buffer_size); 5986 5987 /** 5988 * ring_buffer_max_event_size - return the max data size of an event 5989 * @buffer: The ring buffer. 5990 * 5991 * Returns the maximum size an event can be. 5992 */ 5993 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 5994 { 5995 /* If abs timestamp is requested, events have a timestamp too */ 5996 if (ring_buffer_time_stamp_abs(buffer)) 5997 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 5998 return buffer->max_data_size; 5999 } 6000 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 6001 6002 static void rb_clear_buffer_page(struct buffer_page *page) 6003 { 6004 local_set(&page->write, 0); 6005 local_set(&page->entries, 0); 6006 rb_init_page(page->page); 6007 page->read = 0; 6008 } 6009 6010 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6011 { 6012 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6013 6014 if (!meta) 6015 return; 6016 6017 meta->reader.read = cpu_buffer->reader_page->read; 6018 meta->reader.id = cpu_buffer->reader_page->id; 6019 meta->reader.lost_events = cpu_buffer->lost_events; 6020 6021 meta->entries = local_read(&cpu_buffer->entries); 6022 meta->overrun = local_read(&cpu_buffer->overrun); 6023 meta->read = cpu_buffer->read; 6024 6025 /* Some archs do not have data cache coherency between kernel and user-space */ 6026 flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page)); 6027 } 6028 6029 static void 6030 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 6031 { 6032 struct buffer_page *page; 6033 6034 rb_head_page_deactivate(cpu_buffer); 6035 6036 cpu_buffer->head_page 6037 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6038 rb_clear_buffer_page(cpu_buffer->head_page); 6039 list_for_each_entry(page, cpu_buffer->pages, list) { 6040 rb_clear_buffer_page(page); 6041 } 6042 6043 cpu_buffer->tail_page = cpu_buffer->head_page; 6044 cpu_buffer->commit_page = cpu_buffer->head_page; 6045 6046 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 6047 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6048 rb_clear_buffer_page(cpu_buffer->reader_page); 6049 6050 local_set(&cpu_buffer->entries_bytes, 0); 6051 local_set(&cpu_buffer->overrun, 0); 6052 local_set(&cpu_buffer->commit_overrun, 0); 6053 local_set(&cpu_buffer->dropped_events, 0); 6054 local_set(&cpu_buffer->entries, 0); 6055 local_set(&cpu_buffer->committing, 0); 6056 local_set(&cpu_buffer->commits, 0); 6057 local_set(&cpu_buffer->pages_touched, 0); 6058 local_set(&cpu_buffer->pages_lost, 0); 6059 local_set(&cpu_buffer->pages_read, 0); 6060 cpu_buffer->last_pages_touch = 0; 6061 cpu_buffer->shortest_full = 0; 6062 cpu_buffer->read = 0; 6063 cpu_buffer->read_bytes = 0; 6064 6065 rb_time_set(&cpu_buffer->write_stamp, 0); 6066 rb_time_set(&cpu_buffer->before_stamp, 0); 6067 6068 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 6069 6070 cpu_buffer->lost_events = 0; 6071 cpu_buffer->last_overrun = 0; 6072 6073 rb_head_page_activate(cpu_buffer); 6074 cpu_buffer->pages_removed = 0; 6075 6076 if (cpu_buffer->mapped) { 6077 rb_update_meta_page(cpu_buffer); 6078 if (cpu_buffer->ring_meta) { 6079 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 6080 meta->commit_buffer = meta->head_buffer; 6081 } 6082 } 6083 } 6084 6085 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 6086 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6087 { 6088 unsigned long flags; 6089 6090 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6091 6092 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 6093 goto out; 6094 6095 arch_spin_lock(&cpu_buffer->lock); 6096 6097 rb_reset_cpu(cpu_buffer); 6098 6099 arch_spin_unlock(&cpu_buffer->lock); 6100 6101 out: 6102 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6103 } 6104 6105 /** 6106 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 6107 * @buffer: The ring buffer to reset a per cpu buffer of 6108 * @cpu: The CPU buffer to be reset 6109 */ 6110 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 6111 { 6112 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6113 struct ring_buffer_cpu_meta *meta; 6114 6115 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6116 return; 6117 6118 /* prevent another thread from changing buffer sizes */ 6119 mutex_lock(&buffer->mutex); 6120 6121 atomic_inc(&cpu_buffer->resize_disabled); 6122 atomic_inc(&cpu_buffer->record_disabled); 6123 6124 /* Make sure all commits have finished */ 6125 synchronize_rcu(); 6126 6127 reset_disabled_cpu_buffer(cpu_buffer); 6128 6129 atomic_dec(&cpu_buffer->record_disabled); 6130 atomic_dec(&cpu_buffer->resize_disabled); 6131 6132 /* Make sure persistent meta now uses this buffer's addresses */ 6133 meta = rb_range_meta(buffer, 0, cpu_buffer->cpu); 6134 if (meta) 6135 rb_meta_init_text_addr(meta); 6136 6137 mutex_unlock(&buffer->mutex); 6138 } 6139 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6140 6141 /* Flag to ensure proper resetting of atomic variables */ 6142 #define RESET_BIT (1 << 30) 6143 6144 /** 6145 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6146 * @buffer: The ring buffer to reset a per cpu buffer of 6147 */ 6148 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6149 { 6150 struct ring_buffer_per_cpu *cpu_buffer; 6151 struct ring_buffer_cpu_meta *meta; 6152 int cpu; 6153 6154 /* prevent another thread from changing buffer sizes */ 6155 mutex_lock(&buffer->mutex); 6156 6157 for_each_online_buffer_cpu(buffer, cpu) { 6158 cpu_buffer = buffer->buffers[cpu]; 6159 6160 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6161 atomic_inc(&cpu_buffer->record_disabled); 6162 } 6163 6164 /* Make sure all commits have finished */ 6165 synchronize_rcu(); 6166 6167 for_each_buffer_cpu(buffer, cpu) { 6168 cpu_buffer = buffer->buffers[cpu]; 6169 6170 /* 6171 * If a CPU came online during the synchronize_rcu(), then 6172 * ignore it. 6173 */ 6174 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6175 continue; 6176 6177 reset_disabled_cpu_buffer(cpu_buffer); 6178 6179 /* Make sure persistent meta now uses this buffer's addresses */ 6180 meta = rb_range_meta(buffer, 0, cpu_buffer->cpu); 6181 if (meta) 6182 rb_meta_init_text_addr(meta); 6183 6184 atomic_dec(&cpu_buffer->record_disabled); 6185 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6186 } 6187 6188 mutex_unlock(&buffer->mutex); 6189 } 6190 6191 /** 6192 * ring_buffer_reset - reset a ring buffer 6193 * @buffer: The ring buffer to reset all cpu buffers 6194 */ 6195 void ring_buffer_reset(struct trace_buffer *buffer) 6196 { 6197 struct ring_buffer_per_cpu *cpu_buffer; 6198 int cpu; 6199 6200 /* prevent another thread from changing buffer sizes */ 6201 mutex_lock(&buffer->mutex); 6202 6203 for_each_buffer_cpu(buffer, cpu) { 6204 cpu_buffer = buffer->buffers[cpu]; 6205 6206 atomic_inc(&cpu_buffer->resize_disabled); 6207 atomic_inc(&cpu_buffer->record_disabled); 6208 } 6209 6210 /* Make sure all commits have finished */ 6211 synchronize_rcu(); 6212 6213 for_each_buffer_cpu(buffer, cpu) { 6214 cpu_buffer = buffer->buffers[cpu]; 6215 6216 reset_disabled_cpu_buffer(cpu_buffer); 6217 6218 atomic_dec(&cpu_buffer->record_disabled); 6219 atomic_dec(&cpu_buffer->resize_disabled); 6220 } 6221 6222 mutex_unlock(&buffer->mutex); 6223 } 6224 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6225 6226 /** 6227 * ring_buffer_empty - is the ring buffer empty? 6228 * @buffer: The ring buffer to test 6229 */ 6230 bool ring_buffer_empty(struct trace_buffer *buffer) 6231 { 6232 struct ring_buffer_per_cpu *cpu_buffer; 6233 unsigned long flags; 6234 bool dolock; 6235 bool ret; 6236 int cpu; 6237 6238 /* yes this is racy, but if you don't like the race, lock the buffer */ 6239 for_each_buffer_cpu(buffer, cpu) { 6240 cpu_buffer = buffer->buffers[cpu]; 6241 local_irq_save(flags); 6242 dolock = rb_reader_lock(cpu_buffer); 6243 ret = rb_per_cpu_empty(cpu_buffer); 6244 rb_reader_unlock(cpu_buffer, dolock); 6245 local_irq_restore(flags); 6246 6247 if (!ret) 6248 return false; 6249 } 6250 6251 return true; 6252 } 6253 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6254 6255 /** 6256 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6257 * @buffer: The ring buffer 6258 * @cpu: The CPU buffer to test 6259 */ 6260 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6261 { 6262 struct ring_buffer_per_cpu *cpu_buffer; 6263 unsigned long flags; 6264 bool dolock; 6265 bool ret; 6266 6267 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6268 return true; 6269 6270 cpu_buffer = buffer->buffers[cpu]; 6271 local_irq_save(flags); 6272 dolock = rb_reader_lock(cpu_buffer); 6273 ret = rb_per_cpu_empty(cpu_buffer); 6274 rb_reader_unlock(cpu_buffer, dolock); 6275 local_irq_restore(flags); 6276 6277 return ret; 6278 } 6279 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6280 6281 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6282 /** 6283 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6284 * @buffer_a: One buffer to swap with 6285 * @buffer_b: The other buffer to swap with 6286 * @cpu: the CPU of the buffers to swap 6287 * 6288 * This function is useful for tracers that want to take a "snapshot" 6289 * of a CPU buffer and has another back up buffer lying around. 6290 * it is expected that the tracer handles the cpu buffer not being 6291 * used at the moment. 6292 */ 6293 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6294 struct trace_buffer *buffer_b, int cpu) 6295 { 6296 struct ring_buffer_per_cpu *cpu_buffer_a; 6297 struct ring_buffer_per_cpu *cpu_buffer_b; 6298 int ret = -EINVAL; 6299 6300 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6301 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6302 goto out; 6303 6304 cpu_buffer_a = buffer_a->buffers[cpu]; 6305 cpu_buffer_b = buffer_b->buffers[cpu]; 6306 6307 /* It's up to the callers to not try to swap mapped buffers */ 6308 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) { 6309 ret = -EBUSY; 6310 goto out; 6311 } 6312 6313 /* At least make sure the two buffers are somewhat the same */ 6314 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6315 goto out; 6316 6317 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6318 goto out; 6319 6320 ret = -EAGAIN; 6321 6322 if (atomic_read(&buffer_a->record_disabled)) 6323 goto out; 6324 6325 if (atomic_read(&buffer_b->record_disabled)) 6326 goto out; 6327 6328 if (atomic_read(&cpu_buffer_a->record_disabled)) 6329 goto out; 6330 6331 if (atomic_read(&cpu_buffer_b->record_disabled)) 6332 goto out; 6333 6334 /* 6335 * We can't do a synchronize_rcu here because this 6336 * function can be called in atomic context. 6337 * Normally this will be called from the same CPU as cpu. 6338 * If not it's up to the caller to protect this. 6339 */ 6340 atomic_inc(&cpu_buffer_a->record_disabled); 6341 atomic_inc(&cpu_buffer_b->record_disabled); 6342 6343 ret = -EBUSY; 6344 if (local_read(&cpu_buffer_a->committing)) 6345 goto out_dec; 6346 if (local_read(&cpu_buffer_b->committing)) 6347 goto out_dec; 6348 6349 /* 6350 * When resize is in progress, we cannot swap it because 6351 * it will mess the state of the cpu buffer. 6352 */ 6353 if (atomic_read(&buffer_a->resizing)) 6354 goto out_dec; 6355 if (atomic_read(&buffer_b->resizing)) 6356 goto out_dec; 6357 6358 buffer_a->buffers[cpu] = cpu_buffer_b; 6359 buffer_b->buffers[cpu] = cpu_buffer_a; 6360 6361 cpu_buffer_b->buffer = buffer_a; 6362 cpu_buffer_a->buffer = buffer_b; 6363 6364 ret = 0; 6365 6366 out_dec: 6367 atomic_dec(&cpu_buffer_a->record_disabled); 6368 atomic_dec(&cpu_buffer_b->record_disabled); 6369 out: 6370 return ret; 6371 } 6372 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6373 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6374 6375 /** 6376 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6377 * @buffer: the buffer to allocate for. 6378 * @cpu: the cpu buffer to allocate. 6379 * 6380 * This function is used in conjunction with ring_buffer_read_page. 6381 * When reading a full page from the ring buffer, these functions 6382 * can be used to speed up the process. The calling function should 6383 * allocate a few pages first with this function. Then when it 6384 * needs to get pages from the ring buffer, it passes the result 6385 * of this function into ring_buffer_read_page, which will swap 6386 * the page that was allocated, with the read page of the buffer. 6387 * 6388 * Returns: 6389 * The page allocated, or ERR_PTR 6390 */ 6391 struct buffer_data_read_page * 6392 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6393 { 6394 struct ring_buffer_per_cpu *cpu_buffer; 6395 struct buffer_data_read_page *bpage = NULL; 6396 unsigned long flags; 6397 struct page *page; 6398 6399 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6400 return ERR_PTR(-ENODEV); 6401 6402 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 6403 if (!bpage) 6404 return ERR_PTR(-ENOMEM); 6405 6406 bpage->order = buffer->subbuf_order; 6407 cpu_buffer = buffer->buffers[cpu]; 6408 local_irq_save(flags); 6409 arch_spin_lock(&cpu_buffer->lock); 6410 6411 if (cpu_buffer->free_page) { 6412 bpage->data = cpu_buffer->free_page; 6413 cpu_buffer->free_page = NULL; 6414 } 6415 6416 arch_spin_unlock(&cpu_buffer->lock); 6417 local_irq_restore(flags); 6418 6419 if (bpage->data) 6420 goto out; 6421 6422 page = alloc_pages_node(cpu_to_node(cpu), 6423 GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO, 6424 cpu_buffer->buffer->subbuf_order); 6425 if (!page) { 6426 kfree(bpage); 6427 return ERR_PTR(-ENOMEM); 6428 } 6429 6430 bpage->data = page_address(page); 6431 6432 out: 6433 rb_init_page(bpage->data); 6434 6435 return bpage; 6436 } 6437 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6438 6439 /** 6440 * ring_buffer_free_read_page - free an allocated read page 6441 * @buffer: the buffer the page was allocate for 6442 * @cpu: the cpu buffer the page came from 6443 * @data_page: the page to free 6444 * 6445 * Free a page allocated from ring_buffer_alloc_read_page. 6446 */ 6447 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6448 struct buffer_data_read_page *data_page) 6449 { 6450 struct ring_buffer_per_cpu *cpu_buffer; 6451 struct buffer_data_page *bpage = data_page->data; 6452 struct page *page = virt_to_page(bpage); 6453 unsigned long flags; 6454 6455 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6456 return; 6457 6458 cpu_buffer = buffer->buffers[cpu]; 6459 6460 /* 6461 * If the page is still in use someplace else, or order of the page 6462 * is different from the subbuffer order of the buffer - 6463 * we can't reuse it 6464 */ 6465 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6466 goto out; 6467 6468 local_irq_save(flags); 6469 arch_spin_lock(&cpu_buffer->lock); 6470 6471 if (!cpu_buffer->free_page) { 6472 cpu_buffer->free_page = bpage; 6473 bpage = NULL; 6474 } 6475 6476 arch_spin_unlock(&cpu_buffer->lock); 6477 local_irq_restore(flags); 6478 6479 out: 6480 free_pages((unsigned long)bpage, data_page->order); 6481 kfree(data_page); 6482 } 6483 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6484 6485 /** 6486 * ring_buffer_read_page - extract a page from the ring buffer 6487 * @buffer: buffer to extract from 6488 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6489 * @len: amount to extract 6490 * @cpu: the cpu of the buffer to extract 6491 * @full: should the extraction only happen when the page is full. 6492 * 6493 * This function will pull out a page from the ring buffer and consume it. 6494 * @data_page must be the address of the variable that was returned 6495 * from ring_buffer_alloc_read_page. This is because the page might be used 6496 * to swap with a page in the ring buffer. 6497 * 6498 * for example: 6499 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 6500 * if (IS_ERR(rpage)) 6501 * return PTR_ERR(rpage); 6502 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 6503 * if (ret >= 0) 6504 * process_page(ring_buffer_read_page_data(rpage), ret); 6505 * ring_buffer_free_read_page(buffer, cpu, rpage); 6506 * 6507 * When @full is set, the function will not return true unless 6508 * the writer is off the reader page. 6509 * 6510 * Note: it is up to the calling functions to handle sleeps and wakeups. 6511 * The ring buffer can be used anywhere in the kernel and can not 6512 * blindly call wake_up. The layer that uses the ring buffer must be 6513 * responsible for that. 6514 * 6515 * Returns: 6516 * >=0 if data has been transferred, returns the offset of consumed data. 6517 * <0 if no data has been transferred. 6518 */ 6519 int ring_buffer_read_page(struct trace_buffer *buffer, 6520 struct buffer_data_read_page *data_page, 6521 size_t len, int cpu, int full) 6522 { 6523 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6524 struct ring_buffer_event *event; 6525 struct buffer_data_page *bpage; 6526 struct buffer_page *reader; 6527 unsigned long missed_events; 6528 unsigned long flags; 6529 unsigned int commit; 6530 unsigned int read; 6531 u64 save_timestamp; 6532 int ret = -1; 6533 6534 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6535 goto out; 6536 6537 /* 6538 * If len is not big enough to hold the page header, then 6539 * we can not copy anything. 6540 */ 6541 if (len <= BUF_PAGE_HDR_SIZE) 6542 goto out; 6543 6544 len -= BUF_PAGE_HDR_SIZE; 6545 6546 if (!data_page || !data_page->data) 6547 goto out; 6548 if (data_page->order != buffer->subbuf_order) 6549 goto out; 6550 6551 bpage = data_page->data; 6552 if (!bpage) 6553 goto out; 6554 6555 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6556 6557 reader = rb_get_reader_page(cpu_buffer); 6558 if (!reader) 6559 goto out_unlock; 6560 6561 event = rb_reader_event(cpu_buffer); 6562 6563 read = reader->read; 6564 commit = rb_page_size(reader); 6565 6566 /* Check if any events were dropped */ 6567 missed_events = cpu_buffer->lost_events; 6568 6569 /* 6570 * If this page has been partially read or 6571 * if len is not big enough to read the rest of the page or 6572 * a writer is still on the page, then 6573 * we must copy the data from the page to the buffer. 6574 * Otherwise, we can simply swap the page with the one passed in. 6575 */ 6576 if (read || (len < (commit - read)) || 6577 cpu_buffer->reader_page == cpu_buffer->commit_page || 6578 cpu_buffer->mapped) { 6579 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 6580 unsigned int rpos = read; 6581 unsigned int pos = 0; 6582 unsigned int size; 6583 6584 /* 6585 * If a full page is expected, this can still be returned 6586 * if there's been a previous partial read and the 6587 * rest of the page can be read and the commit page is off 6588 * the reader page. 6589 */ 6590 if (full && 6591 (!read || (len < (commit - read)) || 6592 cpu_buffer->reader_page == cpu_buffer->commit_page)) 6593 goto out_unlock; 6594 6595 if (len > (commit - read)) 6596 len = (commit - read); 6597 6598 /* Always keep the time extend and data together */ 6599 size = rb_event_ts_length(event); 6600 6601 if (len < size) 6602 goto out_unlock; 6603 6604 /* save the current timestamp, since the user will need it */ 6605 save_timestamp = cpu_buffer->read_stamp; 6606 6607 /* Need to copy one event at a time */ 6608 do { 6609 /* We need the size of one event, because 6610 * rb_advance_reader only advances by one event, 6611 * whereas rb_event_ts_length may include the size of 6612 * one or two events. 6613 * We have already ensured there's enough space if this 6614 * is a time extend. */ 6615 size = rb_event_length(event); 6616 memcpy(bpage->data + pos, rpage->data + rpos, size); 6617 6618 len -= size; 6619 6620 rb_advance_reader(cpu_buffer); 6621 rpos = reader->read; 6622 pos += size; 6623 6624 if (rpos >= commit) 6625 break; 6626 6627 event = rb_reader_event(cpu_buffer); 6628 /* Always keep the time extend and data together */ 6629 size = rb_event_ts_length(event); 6630 } while (len >= size); 6631 6632 /* update bpage */ 6633 local_set(&bpage->commit, pos); 6634 bpage->time_stamp = save_timestamp; 6635 6636 /* we copied everything to the beginning */ 6637 read = 0; 6638 } else { 6639 /* update the entry counter */ 6640 cpu_buffer->read += rb_page_entries(reader); 6641 cpu_buffer->read_bytes += rb_page_size(reader); 6642 6643 /* swap the pages */ 6644 rb_init_page(bpage); 6645 bpage = reader->page; 6646 reader->page = data_page->data; 6647 local_set(&reader->write, 0); 6648 local_set(&reader->entries, 0); 6649 reader->read = 0; 6650 data_page->data = bpage; 6651 6652 /* 6653 * Use the real_end for the data size, 6654 * This gives us a chance to store the lost events 6655 * on the page. 6656 */ 6657 if (reader->real_end) 6658 local_set(&bpage->commit, reader->real_end); 6659 } 6660 ret = read; 6661 6662 cpu_buffer->lost_events = 0; 6663 6664 commit = local_read(&bpage->commit); 6665 /* 6666 * Set a flag in the commit field if we lost events 6667 */ 6668 if (missed_events) { 6669 /* If there is room at the end of the page to save the 6670 * missed events, then record it there. 6671 */ 6672 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6673 memcpy(&bpage->data[commit], &missed_events, 6674 sizeof(missed_events)); 6675 local_add(RB_MISSED_STORED, &bpage->commit); 6676 commit += sizeof(missed_events); 6677 } 6678 local_add(RB_MISSED_EVENTS, &bpage->commit); 6679 } 6680 6681 /* 6682 * This page may be off to user land. Zero it out here. 6683 */ 6684 if (commit < buffer->subbuf_size) 6685 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 6686 6687 out_unlock: 6688 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6689 6690 out: 6691 return ret; 6692 } 6693 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 6694 6695 /** 6696 * ring_buffer_read_page_data - get pointer to the data in the page. 6697 * @page: the page to get the data from 6698 * 6699 * Returns pointer to the actual data in this page. 6700 */ 6701 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 6702 { 6703 return page->data; 6704 } 6705 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 6706 6707 /** 6708 * ring_buffer_subbuf_size_get - get size of the sub buffer. 6709 * @buffer: the buffer to get the sub buffer size from 6710 * 6711 * Returns size of the sub buffer, in bytes. 6712 */ 6713 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer) 6714 { 6715 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6716 } 6717 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get); 6718 6719 /** 6720 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page. 6721 * @buffer: The ring_buffer to get the system sub page order from 6722 * 6723 * By default, one ring buffer sub page equals to one system page. This parameter 6724 * is configurable, per ring buffer. The size of the ring buffer sub page can be 6725 * extended, but must be an order of system page size. 6726 * 6727 * Returns the order of buffer sub page size, in system pages: 6728 * 0 means the sub buffer size is 1 system page and so forth. 6729 * In case of an error < 0 is returned. 6730 */ 6731 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer) 6732 { 6733 if (!buffer) 6734 return -EINVAL; 6735 6736 return buffer->subbuf_order; 6737 } 6738 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get); 6739 6740 /** 6741 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page. 6742 * @buffer: The ring_buffer to set the new page size. 6743 * @order: Order of the system pages in one sub buffer page 6744 * 6745 * By default, one ring buffer pages equals to one system page. This API can be 6746 * used to set new size of the ring buffer page. The size must be order of 6747 * system page size, that's why the input parameter @order is the order of 6748 * system pages that are allocated for one ring buffer page: 6749 * 0 - 1 system page 6750 * 1 - 2 system pages 6751 * 3 - 4 system pages 6752 * ... 6753 * 6754 * Returns 0 on success or < 0 in case of an error. 6755 */ 6756 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 6757 { 6758 struct ring_buffer_per_cpu *cpu_buffer; 6759 struct buffer_page *bpage, *tmp; 6760 int old_order, old_size; 6761 int nr_pages; 6762 int psize; 6763 int err; 6764 int cpu; 6765 6766 if (!buffer || order < 0) 6767 return -EINVAL; 6768 6769 if (buffer->subbuf_order == order) 6770 return 0; 6771 6772 psize = (1 << order) * PAGE_SIZE; 6773 if (psize <= BUF_PAGE_HDR_SIZE) 6774 return -EINVAL; 6775 6776 /* Size of a subbuf cannot be greater than the write counter */ 6777 if (psize > RB_WRITE_MASK + 1) 6778 return -EINVAL; 6779 6780 old_order = buffer->subbuf_order; 6781 old_size = buffer->subbuf_size; 6782 6783 /* prevent another thread from changing buffer sizes */ 6784 mutex_lock(&buffer->mutex); 6785 atomic_inc(&buffer->record_disabled); 6786 6787 /* Make sure all commits have finished */ 6788 synchronize_rcu(); 6789 6790 buffer->subbuf_order = order; 6791 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 6792 6793 /* Make sure all new buffers are allocated, before deleting the old ones */ 6794 for_each_buffer_cpu(buffer, cpu) { 6795 6796 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6797 continue; 6798 6799 cpu_buffer = buffer->buffers[cpu]; 6800 6801 if (cpu_buffer->mapped) { 6802 err = -EBUSY; 6803 goto error; 6804 } 6805 6806 /* Update the number of pages to match the new size */ 6807 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 6808 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 6809 6810 /* we need a minimum of two pages */ 6811 if (nr_pages < 2) 6812 nr_pages = 2; 6813 6814 cpu_buffer->nr_pages_to_update = nr_pages; 6815 6816 /* Include the reader page */ 6817 nr_pages++; 6818 6819 /* Allocate the new size buffer */ 6820 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6821 if (__rb_allocate_pages(cpu_buffer, nr_pages, 6822 &cpu_buffer->new_pages)) { 6823 /* not enough memory for new pages */ 6824 err = -ENOMEM; 6825 goto error; 6826 } 6827 } 6828 6829 for_each_buffer_cpu(buffer, cpu) { 6830 struct buffer_data_page *old_free_data_page; 6831 struct list_head old_pages; 6832 unsigned long flags; 6833 6834 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6835 continue; 6836 6837 cpu_buffer = buffer->buffers[cpu]; 6838 6839 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6840 6841 /* Clear the head bit to make the link list normal to read */ 6842 rb_head_page_deactivate(cpu_buffer); 6843 6844 /* 6845 * Collect buffers from the cpu_buffer pages list and the 6846 * reader_page on old_pages, so they can be freed later when not 6847 * under a spinlock. The pages list is a linked list with no 6848 * head, adding old_pages turns it into a regular list with 6849 * old_pages being the head. 6850 */ 6851 list_add(&old_pages, cpu_buffer->pages); 6852 list_add(&cpu_buffer->reader_page->list, &old_pages); 6853 6854 /* One page was allocated for the reader page */ 6855 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6856 struct buffer_page, list); 6857 list_del_init(&cpu_buffer->reader_page->list); 6858 6859 /* Install the new pages, remove the head from the list */ 6860 cpu_buffer->pages = cpu_buffer->new_pages.next; 6861 list_del_init(&cpu_buffer->new_pages); 6862 cpu_buffer->cnt++; 6863 6864 cpu_buffer->head_page 6865 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6866 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6867 6868 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6869 cpu_buffer->nr_pages_to_update = 0; 6870 6871 old_free_data_page = cpu_buffer->free_page; 6872 cpu_buffer->free_page = NULL; 6873 6874 rb_head_page_activate(cpu_buffer); 6875 6876 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6877 6878 /* Free old sub buffers */ 6879 list_for_each_entry_safe(bpage, tmp, &old_pages, list) { 6880 list_del_init(&bpage->list); 6881 free_buffer_page(bpage); 6882 } 6883 free_pages((unsigned long)old_free_data_page, old_order); 6884 6885 rb_check_pages(cpu_buffer); 6886 } 6887 6888 atomic_dec(&buffer->record_disabled); 6889 mutex_unlock(&buffer->mutex); 6890 6891 return 0; 6892 6893 error: 6894 buffer->subbuf_order = old_order; 6895 buffer->subbuf_size = old_size; 6896 6897 atomic_dec(&buffer->record_disabled); 6898 mutex_unlock(&buffer->mutex); 6899 6900 for_each_buffer_cpu(buffer, cpu) { 6901 cpu_buffer = buffer->buffers[cpu]; 6902 6903 if (!cpu_buffer->nr_pages_to_update) 6904 continue; 6905 6906 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6907 list_del_init(&bpage->list); 6908 free_buffer_page(bpage); 6909 } 6910 } 6911 6912 return err; 6913 } 6914 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6915 6916 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6917 { 6918 struct page *page; 6919 6920 if (cpu_buffer->meta_page) 6921 return 0; 6922 6923 page = alloc_page(GFP_USER | __GFP_ZERO); 6924 if (!page) 6925 return -ENOMEM; 6926 6927 cpu_buffer->meta_page = page_to_virt(page); 6928 6929 return 0; 6930 } 6931 6932 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6933 { 6934 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 6935 6936 free_page(addr); 6937 cpu_buffer->meta_page = NULL; 6938 } 6939 6940 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 6941 unsigned long *subbuf_ids) 6942 { 6943 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6944 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 6945 struct buffer_page *first_subbuf, *subbuf; 6946 int id = 0; 6947 6948 subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page; 6949 cpu_buffer->reader_page->id = id++; 6950 6951 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 6952 do { 6953 if (WARN_ON(id >= nr_subbufs)) 6954 break; 6955 6956 subbuf_ids[id] = (unsigned long)subbuf->page; 6957 subbuf->id = id; 6958 6959 rb_inc_page(&subbuf); 6960 id++; 6961 } while (subbuf != first_subbuf); 6962 6963 /* install subbuf ID to kern VA translation */ 6964 cpu_buffer->subbuf_ids = subbuf_ids; 6965 6966 meta->meta_struct_len = sizeof(*meta); 6967 meta->nr_subbufs = nr_subbufs; 6968 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6969 meta->meta_page_size = meta->subbuf_size; 6970 6971 rb_update_meta_page(cpu_buffer); 6972 } 6973 6974 static struct ring_buffer_per_cpu * 6975 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 6976 { 6977 struct ring_buffer_per_cpu *cpu_buffer; 6978 6979 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6980 return ERR_PTR(-EINVAL); 6981 6982 cpu_buffer = buffer->buffers[cpu]; 6983 6984 mutex_lock(&cpu_buffer->mapping_lock); 6985 6986 if (!cpu_buffer->user_mapped) { 6987 mutex_unlock(&cpu_buffer->mapping_lock); 6988 return ERR_PTR(-ENODEV); 6989 } 6990 6991 return cpu_buffer; 6992 } 6993 6994 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6995 { 6996 mutex_unlock(&cpu_buffer->mapping_lock); 6997 } 6998 6999 /* 7000 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 7001 * to be set-up or torn-down. 7002 */ 7003 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 7004 bool inc) 7005 { 7006 unsigned long flags; 7007 7008 lockdep_assert_held(&cpu_buffer->mapping_lock); 7009 7010 /* mapped is always greater or equal to user_mapped */ 7011 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7012 return -EINVAL; 7013 7014 if (inc && cpu_buffer->mapped == UINT_MAX) 7015 return -EBUSY; 7016 7017 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 7018 return -EINVAL; 7019 7020 mutex_lock(&cpu_buffer->buffer->mutex); 7021 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7022 7023 if (inc) { 7024 cpu_buffer->user_mapped++; 7025 cpu_buffer->mapped++; 7026 } else { 7027 cpu_buffer->user_mapped--; 7028 cpu_buffer->mapped--; 7029 } 7030 7031 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7032 mutex_unlock(&cpu_buffer->buffer->mutex); 7033 7034 return 0; 7035 } 7036 7037 /* 7038 * +--------------+ pgoff == 0 7039 * | meta page | 7040 * +--------------+ pgoff == 1 7041 * | subbuffer 0 | 7042 * | | 7043 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 7044 * | subbuffer 1 | 7045 * | | 7046 * ... 7047 */ 7048 #ifdef CONFIG_MMU 7049 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7050 struct vm_area_struct *vma) 7051 { 7052 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 7053 unsigned int subbuf_pages, subbuf_order; 7054 struct page **pages; 7055 int p = 0, s = 0; 7056 int err; 7057 7058 /* Refuse MP_PRIVATE or writable mappings */ 7059 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 7060 !(vma->vm_flags & VM_MAYSHARE)) 7061 return -EPERM; 7062 7063 subbuf_order = cpu_buffer->buffer->subbuf_order; 7064 subbuf_pages = 1 << subbuf_order; 7065 7066 if (subbuf_order && pgoff % subbuf_pages) 7067 return -EINVAL; 7068 7069 /* 7070 * Make sure the mapping cannot become writable later. Also tell the VM 7071 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 7072 */ 7073 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 7074 VM_MAYWRITE); 7075 7076 lockdep_assert_held(&cpu_buffer->mapping_lock); 7077 7078 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 7079 nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */ 7080 if (nr_pages <= pgoff) 7081 return -EINVAL; 7082 7083 nr_pages -= pgoff; 7084 7085 nr_vma_pages = vma_pages(vma); 7086 if (!nr_vma_pages || nr_vma_pages > nr_pages) 7087 return -EINVAL; 7088 7089 nr_pages = nr_vma_pages; 7090 7091 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 7092 if (!pages) 7093 return -ENOMEM; 7094 7095 if (!pgoff) { 7096 unsigned long meta_page_padding; 7097 7098 pages[p++] = virt_to_page(cpu_buffer->meta_page); 7099 7100 /* 7101 * Pad with the zero-page to align the meta-page with the 7102 * sub-buffers. 7103 */ 7104 meta_page_padding = subbuf_pages - 1; 7105 while (meta_page_padding-- && p < nr_pages) { 7106 unsigned long __maybe_unused zero_addr = 7107 vma->vm_start + (PAGE_SIZE * p); 7108 7109 pages[p++] = ZERO_PAGE(zero_addr); 7110 } 7111 } else { 7112 /* Skip the meta-page */ 7113 pgoff -= subbuf_pages; 7114 7115 s += pgoff / subbuf_pages; 7116 } 7117 7118 while (p < nr_pages) { 7119 struct page *page; 7120 int off = 0; 7121 7122 if (WARN_ON_ONCE(s >= nr_subbufs)) { 7123 err = -EINVAL; 7124 goto out; 7125 } 7126 7127 page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 7128 7129 for (; off < (1 << (subbuf_order)); off++, page++) { 7130 if (p >= nr_pages) 7131 break; 7132 7133 pages[p++] = page; 7134 } 7135 s++; 7136 } 7137 7138 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7139 7140 out: 7141 kfree(pages); 7142 7143 return err; 7144 } 7145 #else 7146 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7147 struct vm_area_struct *vma) 7148 { 7149 return -EOPNOTSUPP; 7150 } 7151 #endif 7152 7153 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7154 struct vm_area_struct *vma) 7155 { 7156 struct ring_buffer_per_cpu *cpu_buffer; 7157 unsigned long flags, *subbuf_ids; 7158 int err = 0; 7159 7160 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7161 return -EINVAL; 7162 7163 cpu_buffer = buffer->buffers[cpu]; 7164 7165 mutex_lock(&cpu_buffer->mapping_lock); 7166 7167 if (cpu_buffer->user_mapped) { 7168 err = __rb_map_vma(cpu_buffer, vma); 7169 if (!err) 7170 err = __rb_inc_dec_mapped(cpu_buffer, true); 7171 mutex_unlock(&cpu_buffer->mapping_lock); 7172 return err; 7173 } 7174 7175 /* prevent another thread from changing buffer/sub-buffer sizes */ 7176 mutex_lock(&buffer->mutex); 7177 7178 err = rb_alloc_meta_page(cpu_buffer); 7179 if (err) 7180 goto unlock; 7181 7182 /* subbuf_ids include the reader while nr_pages does not */ 7183 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7184 if (!subbuf_ids) { 7185 rb_free_meta_page(cpu_buffer); 7186 err = -ENOMEM; 7187 goto unlock; 7188 } 7189 7190 atomic_inc(&cpu_buffer->resize_disabled); 7191 7192 /* 7193 * Lock all readers to block any subbuf swap until the subbuf IDs are 7194 * assigned. 7195 */ 7196 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7197 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 7198 7199 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7200 7201 err = __rb_map_vma(cpu_buffer, vma); 7202 if (!err) { 7203 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7204 /* This is the first time it is mapped by user */ 7205 cpu_buffer->mapped++; 7206 cpu_buffer->user_mapped = 1; 7207 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7208 } else { 7209 kfree(cpu_buffer->subbuf_ids); 7210 cpu_buffer->subbuf_ids = NULL; 7211 rb_free_meta_page(cpu_buffer); 7212 atomic_dec(&cpu_buffer->resize_disabled); 7213 } 7214 7215 unlock: 7216 mutex_unlock(&buffer->mutex); 7217 mutex_unlock(&cpu_buffer->mapping_lock); 7218 7219 return err; 7220 } 7221 7222 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 7223 { 7224 struct ring_buffer_per_cpu *cpu_buffer; 7225 unsigned long flags; 7226 int err = 0; 7227 7228 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7229 return -EINVAL; 7230 7231 cpu_buffer = buffer->buffers[cpu]; 7232 7233 mutex_lock(&cpu_buffer->mapping_lock); 7234 7235 if (!cpu_buffer->user_mapped) { 7236 err = -ENODEV; 7237 goto out; 7238 } else if (cpu_buffer->user_mapped > 1) { 7239 __rb_inc_dec_mapped(cpu_buffer, false); 7240 goto out; 7241 } 7242 7243 mutex_lock(&buffer->mutex); 7244 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7245 7246 /* This is the last user space mapping */ 7247 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7248 cpu_buffer->mapped--; 7249 cpu_buffer->user_mapped = 0; 7250 7251 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7252 7253 kfree(cpu_buffer->subbuf_ids); 7254 cpu_buffer->subbuf_ids = NULL; 7255 rb_free_meta_page(cpu_buffer); 7256 atomic_dec(&cpu_buffer->resize_disabled); 7257 7258 mutex_unlock(&buffer->mutex); 7259 7260 out: 7261 mutex_unlock(&cpu_buffer->mapping_lock); 7262 7263 return err; 7264 } 7265 7266 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 7267 { 7268 struct ring_buffer_per_cpu *cpu_buffer; 7269 struct buffer_page *reader; 7270 unsigned long missed_events; 7271 unsigned long reader_size; 7272 unsigned long flags; 7273 7274 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 7275 if (IS_ERR(cpu_buffer)) 7276 return (int)PTR_ERR(cpu_buffer); 7277 7278 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7279 7280 consume: 7281 if (rb_per_cpu_empty(cpu_buffer)) 7282 goto out; 7283 7284 reader_size = rb_page_size(cpu_buffer->reader_page); 7285 7286 /* 7287 * There are data to be read on the current reader page, we can 7288 * return to the caller. But before that, we assume the latter will read 7289 * everything. Let's update the kernel reader accordingly. 7290 */ 7291 if (cpu_buffer->reader_page->read < reader_size) { 7292 while (cpu_buffer->reader_page->read < reader_size) 7293 rb_advance_reader(cpu_buffer); 7294 goto out; 7295 } 7296 7297 reader = rb_get_reader_page(cpu_buffer); 7298 if (WARN_ON(!reader)) 7299 goto out; 7300 7301 /* Check if any events were dropped */ 7302 missed_events = cpu_buffer->lost_events; 7303 7304 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 7305 if (missed_events) { 7306 struct buffer_data_page *bpage = reader->page; 7307 unsigned int commit; 7308 /* 7309 * Use the real_end for the data size, 7310 * This gives us a chance to store the lost events 7311 * on the page. 7312 */ 7313 if (reader->real_end) 7314 local_set(&bpage->commit, reader->real_end); 7315 /* 7316 * If there is room at the end of the page to save the 7317 * missed events, then record it there. 7318 */ 7319 commit = rb_page_size(reader); 7320 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7321 memcpy(&bpage->data[commit], &missed_events, 7322 sizeof(missed_events)); 7323 local_add(RB_MISSED_STORED, &bpage->commit); 7324 } 7325 local_add(RB_MISSED_EVENTS, &bpage->commit); 7326 } 7327 } else { 7328 /* 7329 * There really shouldn't be any missed events if the commit 7330 * is on the reader page. 7331 */ 7332 WARN_ON_ONCE(missed_events); 7333 } 7334 7335 cpu_buffer->lost_events = 0; 7336 7337 goto consume; 7338 7339 out: 7340 /* Some archs do not have data cache coherency between kernel and user-space */ 7341 flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page)); 7342 7343 rb_update_meta_page(cpu_buffer); 7344 7345 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7346 rb_put_mapped_buffer(cpu_buffer); 7347 7348 return 0; 7349 } 7350 7351 /* 7352 * We only allocate new buffers, never free them if the CPU goes down. 7353 * If we were to free the buffer, then the user would lose any trace that was in 7354 * the buffer. 7355 */ 7356 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7357 { 7358 struct trace_buffer *buffer; 7359 long nr_pages_same; 7360 int cpu_i; 7361 unsigned long nr_pages; 7362 7363 buffer = container_of(node, struct trace_buffer, node); 7364 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7365 return 0; 7366 7367 nr_pages = 0; 7368 nr_pages_same = 1; 7369 /* check if all cpu sizes are same */ 7370 for_each_buffer_cpu(buffer, cpu_i) { 7371 /* fill in the size from first enabled cpu */ 7372 if (nr_pages == 0) 7373 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7374 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7375 nr_pages_same = 0; 7376 break; 7377 } 7378 } 7379 /* allocate minimum pages, user can later expand it */ 7380 if (!nr_pages_same) 7381 nr_pages = 2; 7382 buffer->buffers[cpu] = 7383 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7384 if (!buffer->buffers[cpu]) { 7385 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7386 cpu); 7387 return -ENOMEM; 7388 } 7389 smp_wmb(); 7390 cpumask_set_cpu(cpu, buffer->cpumask); 7391 return 0; 7392 } 7393 7394 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7395 /* 7396 * This is a basic integrity check of the ring buffer. 7397 * Late in the boot cycle this test will run when configured in. 7398 * It will kick off a thread per CPU that will go into a loop 7399 * writing to the per cpu ring buffer various sizes of data. 7400 * Some of the data will be large items, some small. 7401 * 7402 * Another thread is created that goes into a spin, sending out 7403 * IPIs to the other CPUs to also write into the ring buffer. 7404 * this is to test the nesting ability of the buffer. 7405 * 7406 * Basic stats are recorded and reported. If something in the 7407 * ring buffer should happen that's not expected, a big warning 7408 * is displayed and all ring buffers are disabled. 7409 */ 7410 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7411 7412 struct rb_test_data { 7413 struct trace_buffer *buffer; 7414 unsigned long events; 7415 unsigned long bytes_written; 7416 unsigned long bytes_alloc; 7417 unsigned long bytes_dropped; 7418 unsigned long events_nested; 7419 unsigned long bytes_written_nested; 7420 unsigned long bytes_alloc_nested; 7421 unsigned long bytes_dropped_nested; 7422 int min_size_nested; 7423 int max_size_nested; 7424 int max_size; 7425 int min_size; 7426 int cpu; 7427 int cnt; 7428 }; 7429 7430 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7431 7432 /* 1 meg per cpu */ 7433 #define RB_TEST_BUFFER_SIZE 1048576 7434 7435 static char rb_string[] __initdata = 7436 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7437 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7438 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7439 7440 static bool rb_test_started __initdata; 7441 7442 struct rb_item { 7443 int size; 7444 char str[]; 7445 }; 7446 7447 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7448 { 7449 struct ring_buffer_event *event; 7450 struct rb_item *item; 7451 bool started; 7452 int event_len; 7453 int size; 7454 int len; 7455 int cnt; 7456 7457 /* Have nested writes different that what is written */ 7458 cnt = data->cnt + (nested ? 27 : 0); 7459 7460 /* Multiply cnt by ~e, to make some unique increment */ 7461 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7462 7463 len = size + sizeof(struct rb_item); 7464 7465 started = rb_test_started; 7466 /* read rb_test_started before checking buffer enabled */ 7467 smp_rmb(); 7468 7469 event = ring_buffer_lock_reserve(data->buffer, len); 7470 if (!event) { 7471 /* Ignore dropped events before test starts. */ 7472 if (started) { 7473 if (nested) 7474 data->bytes_dropped_nested += len; 7475 else 7476 data->bytes_dropped += len; 7477 } 7478 return len; 7479 } 7480 7481 event_len = ring_buffer_event_length(event); 7482 7483 if (RB_WARN_ON(data->buffer, event_len < len)) 7484 goto out; 7485 7486 item = ring_buffer_event_data(event); 7487 item->size = size; 7488 memcpy(item->str, rb_string, size); 7489 7490 if (nested) { 7491 data->bytes_alloc_nested += event_len; 7492 data->bytes_written_nested += len; 7493 data->events_nested++; 7494 if (!data->min_size_nested || len < data->min_size_nested) 7495 data->min_size_nested = len; 7496 if (len > data->max_size_nested) 7497 data->max_size_nested = len; 7498 } else { 7499 data->bytes_alloc += event_len; 7500 data->bytes_written += len; 7501 data->events++; 7502 if (!data->min_size || len < data->min_size) 7503 data->max_size = len; 7504 if (len > data->max_size) 7505 data->max_size = len; 7506 } 7507 7508 out: 7509 ring_buffer_unlock_commit(data->buffer); 7510 7511 return 0; 7512 } 7513 7514 static __init int rb_test(void *arg) 7515 { 7516 struct rb_test_data *data = arg; 7517 7518 while (!kthread_should_stop()) { 7519 rb_write_something(data, false); 7520 data->cnt++; 7521 7522 set_current_state(TASK_INTERRUPTIBLE); 7523 /* Now sleep between a min of 100-300us and a max of 1ms */ 7524 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 7525 } 7526 7527 return 0; 7528 } 7529 7530 static __init void rb_ipi(void *ignore) 7531 { 7532 struct rb_test_data *data; 7533 int cpu = smp_processor_id(); 7534 7535 data = &rb_data[cpu]; 7536 rb_write_something(data, true); 7537 } 7538 7539 static __init int rb_hammer_test(void *arg) 7540 { 7541 while (!kthread_should_stop()) { 7542 7543 /* Send an IPI to all cpus to write data! */ 7544 smp_call_function(rb_ipi, NULL, 1); 7545 /* No sleep, but for non preempt, let others run */ 7546 schedule(); 7547 } 7548 7549 return 0; 7550 } 7551 7552 static __init int test_ringbuffer(void) 7553 { 7554 struct task_struct *rb_hammer; 7555 struct trace_buffer *buffer; 7556 int cpu; 7557 int ret = 0; 7558 7559 if (security_locked_down(LOCKDOWN_TRACEFS)) { 7560 pr_warn("Lockdown is enabled, skipping ring buffer tests\n"); 7561 return 0; 7562 } 7563 7564 pr_info("Running ring buffer tests...\n"); 7565 7566 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 7567 if (WARN_ON(!buffer)) 7568 return 0; 7569 7570 /* Disable buffer so that threads can't write to it yet */ 7571 ring_buffer_record_off(buffer); 7572 7573 for_each_online_cpu(cpu) { 7574 rb_data[cpu].buffer = buffer; 7575 rb_data[cpu].cpu = cpu; 7576 rb_data[cpu].cnt = cpu; 7577 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu], 7578 cpu, "rbtester/%u"); 7579 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 7580 pr_cont("FAILED\n"); 7581 ret = PTR_ERR(rb_threads[cpu]); 7582 goto out_free; 7583 } 7584 } 7585 7586 /* Now create the rb hammer! */ 7587 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 7588 if (WARN_ON(IS_ERR(rb_hammer))) { 7589 pr_cont("FAILED\n"); 7590 ret = PTR_ERR(rb_hammer); 7591 goto out_free; 7592 } 7593 7594 ring_buffer_record_on(buffer); 7595 /* 7596 * Show buffer is enabled before setting rb_test_started. 7597 * Yes there's a small race window where events could be 7598 * dropped and the thread wont catch it. But when a ring 7599 * buffer gets enabled, there will always be some kind of 7600 * delay before other CPUs see it. Thus, we don't care about 7601 * those dropped events. We care about events dropped after 7602 * the threads see that the buffer is active. 7603 */ 7604 smp_wmb(); 7605 rb_test_started = true; 7606 7607 set_current_state(TASK_INTERRUPTIBLE); 7608 /* Just run for 10 seconds */; 7609 schedule_timeout(10 * HZ); 7610 7611 kthread_stop(rb_hammer); 7612 7613 out_free: 7614 for_each_online_cpu(cpu) { 7615 if (!rb_threads[cpu]) 7616 break; 7617 kthread_stop(rb_threads[cpu]); 7618 } 7619 if (ret) { 7620 ring_buffer_free(buffer); 7621 return ret; 7622 } 7623 7624 /* Report! */ 7625 pr_info("finished\n"); 7626 for_each_online_cpu(cpu) { 7627 struct ring_buffer_event *event; 7628 struct rb_test_data *data = &rb_data[cpu]; 7629 struct rb_item *item; 7630 unsigned long total_events; 7631 unsigned long total_dropped; 7632 unsigned long total_written; 7633 unsigned long total_alloc; 7634 unsigned long total_read = 0; 7635 unsigned long total_size = 0; 7636 unsigned long total_len = 0; 7637 unsigned long total_lost = 0; 7638 unsigned long lost; 7639 int big_event_size; 7640 int small_event_size; 7641 7642 ret = -1; 7643 7644 total_events = data->events + data->events_nested; 7645 total_written = data->bytes_written + data->bytes_written_nested; 7646 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 7647 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 7648 7649 big_event_size = data->max_size + data->max_size_nested; 7650 small_event_size = data->min_size + data->min_size_nested; 7651 7652 pr_info("CPU %d:\n", cpu); 7653 pr_info(" events: %ld\n", total_events); 7654 pr_info(" dropped bytes: %ld\n", total_dropped); 7655 pr_info(" alloced bytes: %ld\n", total_alloc); 7656 pr_info(" written bytes: %ld\n", total_written); 7657 pr_info(" biggest event: %d\n", big_event_size); 7658 pr_info(" smallest event: %d\n", small_event_size); 7659 7660 if (RB_WARN_ON(buffer, total_dropped)) 7661 break; 7662 7663 ret = 0; 7664 7665 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 7666 total_lost += lost; 7667 item = ring_buffer_event_data(event); 7668 total_len += ring_buffer_event_length(event); 7669 total_size += item->size + sizeof(struct rb_item); 7670 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 7671 pr_info("FAILED!\n"); 7672 pr_info("buffer had: %.*s\n", item->size, item->str); 7673 pr_info("expected: %.*s\n", item->size, rb_string); 7674 RB_WARN_ON(buffer, 1); 7675 ret = -1; 7676 break; 7677 } 7678 total_read++; 7679 } 7680 if (ret) 7681 break; 7682 7683 ret = -1; 7684 7685 pr_info(" read events: %ld\n", total_read); 7686 pr_info(" lost events: %ld\n", total_lost); 7687 pr_info(" total events: %ld\n", total_lost + total_read); 7688 pr_info(" recorded len bytes: %ld\n", total_len); 7689 pr_info(" recorded size bytes: %ld\n", total_size); 7690 if (total_lost) { 7691 pr_info(" With dropped events, record len and size may not match\n" 7692 " alloced and written from above\n"); 7693 } else { 7694 if (RB_WARN_ON(buffer, total_len != total_alloc || 7695 total_size != total_written)) 7696 break; 7697 } 7698 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 7699 break; 7700 7701 ret = 0; 7702 } 7703 if (!ret) 7704 pr_info("Ring buffer PASSED!\n"); 7705 7706 ring_buffer_free(buffer); 7707 return 0; 7708 } 7709 7710 late_initcall(test_ringbuffer); 7711 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 7712