// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <[email protected]>
 */
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/local64.h>
#include <asm/local.h>
#include <asm/setup.h>

#include "trace.h"

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

#define RING_BUFFER_META_MAGIC	0xBADFEED

struct ring_buffer_meta {
	int		magic;
	int		struct_size;
	unsigned long	kaslr_addr;
	unsigned long	first_buffer;
	unsigned long	head_buffer;
	unsigned long	commit_buffer;
	__u32		subbuf_size;
	__u32		nr_subbufs;
	int		buffers[];
};

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |    New     +---+   +---+   +---+
 *      |   Reader------^               |
 *      |    page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

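/*
 * Illustrative sketch (not part of the original source): how the compressed
 * header described in ring_buffer_print_entry_header() above encodes a data
 * event's length. Assuming a 12 byte payload:
 *
 *	type_len = 12 / RB_ALIGNMENT = 3	(fits in the 5 bit field)
 *	total    = 3 * RB_ALIGNMENT + RB_EVNT_HDR_SIZE = 12 + 4 = 16 bytes
 *
 * A payload larger than RB_MAX_SMALL_DATA sets type_len to 0 and stores the
 * payload length in array[0] instead, which rb_event_data_length() reads.
 */
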
/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)		\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

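/*
 * Illustrative sketch (not part of the original source): a time extend or
 * absolute time stamp event splits its value across the header fields.
 * Assuming a value of 0x12345678:
 *
 *	time_delta = 0x12345678 & TS_MASK	(low 27 bits)
 *	array[0]   = 0x12345678 >> TS_SHIFT	(remaining high bits)
 *
 * rb_event_time_stamp() above reverses this: (array[0] << 27) + time_delta.
 */
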
/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

#define RB_MISSED_MASK		(3 << 30)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
};

struct buffer_data_read_page {
	unsigned		order;	/* order of the page */
	struct buffer_data_page	*data;	/* actual data, stored in this page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	unsigned	 order;		/* order of the page */
	u32		 id:30;		/* ID for external mapping */
	u32		 range:1;	/* Mapped via a range */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

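/*
 * Illustrative sketch (not part of the original source): the write field is
 * treated as two packed counters. With the masks above:
 *
 *	write index   = local_read(&bpage->write) & RB_WRITE_MASK
 *	updater count = local_read(&bpage->write) >> 20
 *
 * Adding RB_WRITE_INTCNT bumps only the updater count and leaves the low
 * 20 bit write index untouched, which is how rb_tail_page_update() later
 * notices that an interrupt has also updated the page.
 */
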
static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static void free_buffer_page(struct buffer_page *bpage)
{
	/* Range pages are not to be freed */
	if (!bpage->range)
		free_pages((unsigned long)bpage->page, bpage->order);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	atomic_t			seq;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE	= 0,
	RB_ADD_STAMP_EXTEND	= BIT(1),
	RB_ADD_STAMP_ABSOLUTE	= BIT(2),
	RB_ADD_STAMP_FORCE	= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI        = 1
 *  IRQ        = 2
 *  SOFTIRQ    = 3
 *  NORMAL     = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

struct rb_time_struct {
	local64_t	time;
};
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	/* pages generation counter, incremented when the list changes */
	unsigned long			cnt;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
	/* pages removed since last reset */
	unsigned long			pages_removed;

	unsigned int			mapped;
	unsigned int			user_mapped;	/* user space mapping */
	struct mutex			mapping_lock;
	unsigned long			*subbuf_ids;	/* ID to subbuf VA */
	struct trace_buffer_meta	*meta_page;
	struct ring_buffer_meta		*ring_meta;

	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages;	/* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resizing;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;

	unsigned long			range_addr_start;
	unsigned long			range_addr_end;

	unsigned long			kaslr_addr;

	unsigned int			subbuf_size;
	unsigned int			subbuf_order;
	unsigned int			max_data_size;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	size_t				event_size;
	int				missed_events;
};

int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)buffer->subbuf_size,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
}
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
				void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}

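/*
 * Illustrative sketch (not part of the original source) of the fixup above,
 * assuming a previous full time stamp of 0x0800000010000000:
 *
 *	saved 59 bit abs ts : 0x0000000020000000
 *	restored            : 0x0800000020000000	(5 MSBs copied back)
 *
 * If the restored value ends up below save_ts, the clock has crossed a
 * 59 bit boundary since save_ts was taken, so 1ULL << 59 is added.
 */
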
static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned,
 * otherwise the current time is returned, but neither of the last
 * two cases should really ever happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
	rb_time_read(&cpu_buffer->write_stamp, &ts);

	return ts;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	/*
	 * Add one as dirty will never equal nr_pages, as the sub-buffer
	 * that the writer is on is not counted as dirty.
	 * This is needed if "buffer_percent" is set to 100.
	 */
	dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

	return (dirty * 100) >= (full * nr_pages);
}

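/*
 * Illustrative sketch (not part of the original source) of the watermark
 * math above, assuming nr_pages = 10 and a waiter asking for full = 50:
 *
 *	4 dirty pages -> (4 + 1) * 100 = 500 >= 50 * 10 = 500 -> watermark hit
 *	3 dirty pages -> (3 + 1) * 100 = 400 <  500            -> keep waiting
 *
 * The "+ 1" accounts for the sub-buffer the writer is currently on, which
 * ring_buffer_nr_dirty_pages() never reports as dirty.
 */
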
/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	/* For waiters waiting for the first wake up */
	(void)atomic_fetch_inc_release(&rbwork->seq);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		/* Only cpu_buffer sets the above flags */
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

		/* Called from interrupt context */
		raw_spin_lock(&cpu_buffer->reader_lock);
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;

		/* Waking up all waiters, they will reset the shortest full */
		cpu_buffer->shortest_full = 0;
		raw_spin_unlock(&cpu_buffer->reader_lock);

		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * When a file that represents a ring buffer is closing, it is prudent
 * to wake up any waiters that are on it.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* This can be called in any context */
	irq_work_queue(&rbwork->work);
}

static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	bool ret = false;

	/* Reads of all CPUs always wait for any data */
	if (cpu == RING_BUFFER_ALL_CPUS)
		return !ring_buffer_empty(buffer);

	cpu_buffer = buffer->buffers[cpu];

	if (!ring_buffer_empty_cpu(buffer, cpu)) {
		unsigned long flags;
		bool pagebusy;

		if (!full)
			return true;

		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
		ret = !pagebusy && full_hit(buffer, cpu, full);

		if (!ret && (!cpu_buffer->shortest_full ||
			     cpu_buffer->shortest_full > full)) {
			cpu_buffer->shortest_full = full;
		}
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}
	return ret;
}

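/*
 * Descriptive note (not part of the original source): shortest_full ends up
 * holding the smallest "full" percentage among waiters whose watermark has
 * not been hit yet, so a writer only needs to compare the buffer against
 * that one value to know whether any full waiter should be woken.
 * rb_wake_up_waiters() resets it to zero once everyone has been woken.
 */
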
static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full))
		return true;

	if (cond(data))
		return true;

	/*
	 * The events can happen in critical sections where
	 * checking a work queue can cause deadlocks.
	 * After adding a task to the queue, this flag is set
	 * only to notify events to try to wake up the queue
	 * using irq_work.
	 *
	 * We don't clear it even if the buffer is no longer
	 * empty. The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worst
	 * that can happen if we race with !trace_empty() is that
	 * an event will cause an irq_work to try to wake up
	 * an empty queue.
	 *
	 * There's no reason to protect this flag either, as
	 * the work queue and irq_work logic will do the necessary
	 * synchronization for the wake ups. The only thing
	 * that is necessary is that the wake up happens after
	 * a task has been queued. It's OK for spurious wake ups.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}

struct rb_wait_data {
	struct rb_irq_work		*irq_work;
	int				seq;
};

/*
 * The default wait condition for ring_buffer_wait() is to just exit the
 * wait loop the first time it is woken up.
 */
static bool rb_wait_once(void *data)
{
	struct rb_wait_data *rdata = data;
	struct rb_irq_work *rbwork = rdata->irq_work;

	return atomic_read_acquire(&rbwork->seq) != rdata->seq;
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
		     ring_buffer_cond_fn cond, void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct wait_queue_head *waitq;
	struct rb_irq_work *rbwork;
	struct rb_wait_data rdata;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full)
		waitq = &rbwork->full_waiters;
	else
		waitq = &rbwork->waiters;

	/* Set up to exit loop as soon as it is woken */
	if (!cond) {
		cond = rb_wait_once;
		rdata.irq_work = rbwork;
		rdata.seq = atomic_read_acquire(&rbwork->seq);
		data = &rdata;
	}

	ret = wait_event_interruptible((*waitq),
				rb_wait_cond(rbwork, buffer, cpu, full, cond, data));

	return ret;
}

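/*
 * Illustrative caller sketch (not part of the original source): a
 * hypothetical reader that blocks until cpu 0 is at least 25% full or its
 * own stop flag is set could look roughly like this:
 *
 *	static bool my_stop_requested(void *data)
 *	{
 *		return READ_ONCE(*(bool *)data);
 *	}
 *
 *	ret = ring_buffer_wait(buffer, 0, 25, my_stop_requested, &stop);
 *	if (ret)
 *		return ret;	// interrupted by a signal
 *
 * Passing cond == NULL instead makes the wait return on the first wake up,
 * via rb_wait_once() above.
 */
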
/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return EPOLLERR;

		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &rbwork->full_waiters, poll_table);

		if (rb_watermark_hit(buffer, cpu, full))
			return EPOLLIN | EPOLLRDNORM;
		/*
		 * Only allow full_waiters_pending update to be seen after
		 * the shortest_full is set (in rb_watermark_hit). If the
		 * writer sees the full_waiters_pending flag set, it will
		 * compare the amount in the ring buffer to shortest_full.
		 * If the amount in the ring buffer is greater than the
		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_handler will reset shortest_full
		 * back to zero. That's done under the reader_lock, but
		 * the below smp_mb() makes sure that the update to
		 * full_waiters_pending doesn't leak up into the above.
		 */
		smp_mb();
		rbwork->full_waiters_pending = true;
		return 0;
	}

	poll_wait(filp, &rbwork->waiters, poll_table);
	rbwork->waiters_pending = true;

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content. But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that they are on, and they only
 * need to worry about interrupts. Reads, however, can happen on
 * any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 *  head->list->prev->next        bit 1          bit 0
 *  -------                -------
 *  Normal page                     0              0
 *  Points to head page             0              1
 *  New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                            ^ |
 *   |          +-----+           | |
 *   +----------|  R  |-----------+ |
 *              |     |<------------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

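/*
 * Illustrative sketch (not part of the original source): because buffer
 * pages are cache line aligned, a ->next pointer such as 0xffff888012345680
 * has its two low bits free. Tagging it as the pointer to the head page
 * yields 0xffff888012345681 (bit 0 = RB_PAGE_HEAD), and rb_list_head()
 * masks with ~RB_FLAG_MASK to recover the real 0xffff888012345680 address.
 */
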
/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);

	if (cpu_buffer->ring_meta) {
		struct ring_buffer_meta *meta = cpu_buffer->ring_meta;
		meta->head_buffer = (unsigned long)head->page;
	}
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
				 struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * It can only increment when a commit takes place. But that
		 * only happens in the outermost nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Either we update tail_page or an interrupt does */
		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
			local_inc(&cpu_buffer->pages_touched);
	}
}

static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			   struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
			   struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->next)->prev) != list))
		return false;

	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->prev)->next) != list))
		return false;

	return true;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head, *tmp;
	unsigned long buffer_cnt;
	unsigned long flags;
	int nr_loops = 0;

	/*
	 * Walk the linked list underpinning the ring buffer and validate all
	 * its next and prev links.
	 *
	 * The check acquires the reader_lock to avoid concurrent processing
	 * with code that could be modifying the list. However, the lock cannot
	 * be held for the entire duration of the walk, as this would make the
	 * time when interrupts are disabled non-deterministic, dependent on the
	 * ring buffer size. Therefore, the code releases and re-acquires the
	 * lock after checking each page. The ring_buffer_per_cpu.cnt variable
	 * is then used to detect if the list was modified while the lock was
	 * not held, in which case the check needs to be restarted.
	 *
	 * The code attempts to perform the check at most three times before
	 * giving up. This is acceptable because this is only a self-validation
	 * to detect problems early on. In practice, the list modification
	 * operations are fairly spaced, and so this check typically succeeds at
	 * most on the second try.
	 */
again:
	if (++nr_loops > 3)
		return;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	head = rb_list_head(cpu_buffer->pages);
	if (!rb_check_links(cpu_buffer, head))
		goto out_locked;
	buffer_cnt = cpu_buffer->cnt;
	tmp = head;
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	while (true) {
		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

		if (buffer_cnt != cpu_buffer->cnt) {
			/* The list was updated, try again. */
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			goto again;
		}

		tmp = rb_list_head(tmp->next);
		if (tmp == head)
			/* The iteration circled back, all is done. */
			goto out_locked;

		if (!rb_check_links(cpu_buffer, tmp))
			goto out_locked;

		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}

out_locked:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}

/*
 * Take an address, add the meta data size as well as the array of
 * subbuffer indexes, then align it to a subbuffer size.
 *
 * This is used to help find the next per cpu subbuffer within a mapped range.
 */
static unsigned long
rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs)
{
	addr += sizeof(struct ring_buffer_meta) +
		sizeof(int) * nr_subbufs;
	return ALIGN(addr, subbuf_size);
}

/*
 * Return the ring_buffer_meta for a given @cpu.
 */
static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu)
{
	int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
	unsigned long ptr = buffer->range_addr_start;
	struct ring_buffer_meta *meta;
	int nr_subbufs;

	if (!ptr)
		return NULL;

	/* When nr_pages passed in is zero, the first meta has already been initialized */
	if (!nr_pages) {
		meta = (struct ring_buffer_meta *)ptr;
		nr_subbufs = meta->nr_subbufs;
	} else {
		meta = NULL;
		/* Include the reader page */
		nr_subbufs = nr_pages + 1;
	}

	/*
	 * The first chunk may not be subbuffer aligned, whereas
	 * the rest of the chunks are.
	 */
	if (cpu) {
		ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
		ptr += subbuf_size * nr_subbufs;

		/* We can use multiplication to find chunks greater than 1 */
		if (cpu > 1) {
			unsigned long size;
			unsigned long p;

			/* Save the beginning of this CPU chunk */
			p = ptr;
			ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
			ptr += subbuf_size * nr_subbufs;

			/* Now all chunks after this are the same size */
			size = ptr - p;
			ptr += size * (cpu - 2);
		}
	}
	return (void *)ptr;
}

/* Return the start of subbufs given the meta pointer */
static void *rb_subbufs_from_meta(struct ring_buffer_meta *meta)
{
	int subbuf_size = meta->subbuf_size;
	unsigned long ptr;

	ptr = (unsigned long)meta;
	ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs);

	return (void *)ptr;
}

/*
 * Return a specific sub-buffer for a given @cpu defined by @idx.
 */
static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx)
{
	struct ring_buffer_meta *meta;
	unsigned long ptr;
	int subbuf_size;

	meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu);
	if (!meta)
		return NULL;

	if (WARN_ON_ONCE(idx >= meta->nr_subbufs))
		return NULL;

	subbuf_size = meta->subbuf_size;

	/* Map this buffer to the order that's in meta->buffers[] */
	idx = meta->buffers[idx];

	ptr = (unsigned long)rb_subbufs_from_meta(meta);

	ptr += subbuf_size * idx;
	if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end)
		return NULL;

	return (void *)ptr;
}

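/*
 * Illustrative sketch (not part of the original source) of how a mapped
 * range is carved up by the helpers above, assuming 2 CPUs, nr_pages = 2
 * (so 3 sub-buffers per CPU including the reader page) and page sized
 * sub-buffers:
 *
 *	range_addr_start: CPU 0 meta (struct + 3 ints), padding up to the
 *	                  next sub-buffer boundary, then CPU 0 subbufs 0-2
 *	next boundary:    CPU 1 meta, padding, then CPU 1 subbufs 0-2
 *
 * rb_range_meta() walks this layout arithmetically; only the first chunk
 * can start unaligned, which is why cpu == 1 is handled separately from
 * cpu > 1.
 */
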
/*
 * See if the existing memory contains valid ring buffer data.
 * As the previous kernel must be the same as this kernel, all
 * the calculations (size of buffers and number of buffers)
 * must be the same.
 */
static bool rb_meta_valid(struct ring_buffer_meta *meta, int cpu,
			  struct trace_buffer *buffer, int nr_pages,
			  unsigned long *subbuf_mask)
{
	int subbuf_size = PAGE_SIZE;
	struct buffer_data_page *subbuf;
	unsigned long buffers_start;
	unsigned long buffers_end;
	int i;

	if (!subbuf_mask)
		return false;

	/* Check the meta magic and meta struct size */
	if (meta->magic != RING_BUFFER_META_MAGIC ||
	    meta->struct_size != sizeof(*meta)) {
		pr_info("Ring buffer boot meta[%d] mismatch of magic or struct size\n", cpu);
		return false;
	}

	/* The subbuffer's size and number of subbuffers must match */
	if (meta->subbuf_size != subbuf_size ||
	    meta->nr_subbufs != nr_pages + 1) {
		pr_info("Ring buffer boot meta [%d] mismatch of subbuf_size/nr_pages\n", cpu);
		return false;
	}

	buffers_start = meta->first_buffer;
	buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs);

	/* Are the head and commit buffers within the range of buffers? */
	if (meta->head_buffer < buffers_start ||
	    meta->head_buffer >= buffers_end) {
		pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu);
		return false;
	}

	if (meta->commit_buffer < buffers_start ||
	    meta->commit_buffer >= buffers_end) {
		pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu);
		return false;
	}

	subbuf = rb_subbufs_from_meta(meta);

	bitmap_clear(subbuf_mask, 0, meta->nr_subbufs);

	/* Do the meta buffers and the subbufs themselves have correct data? */
	for (i = 0; i < meta->nr_subbufs; i++) {
		if (meta->buffers[i] < 0 ||
		    meta->buffers[i] >= meta->nr_subbufs) {
			pr_info("Ring buffer boot meta [%d] array out of range\n", cpu);
			return false;
		}

		if ((unsigned)local_read(&subbuf->commit) > subbuf_size) {
			pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu);
			return false;
		}

		if (test_bit(meta->buffers[i], subbuf_mask)) {
			pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu);
			return false;
		}

		set_bit(meta->buffers[i], subbuf_mask);
		subbuf = (void *)subbuf + subbuf_size;
	}

	return true;
}

static int rb_meta_subbuf_idx(struct ring_buffer_meta *meta, void *subbuf);

static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu,
			       unsigned long long *timestamp, u64 *delta_ptr)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int events = 0;
	int e;

	*delta_ptr = 0;
	*timestamp = 0;

	ts = dpage->time_stamp;

	for (e = 0; e < tail; e += rb_event_length(event)) {

		event = (struct ring_buffer_event *)(dpage->data + e);

		switch (event->type_len) {

		case RINGBUF_TYPE_TIME_EXTEND:
			delta = rb_event_time_stamp(event);
			ts += delta;
			break;

		case RINGBUF_TYPE_TIME_STAMP:
			delta = rb_event_time_stamp(event);
			delta = rb_fix_abs_ts(delta, ts);
			if (delta < ts) {
				*delta_ptr = delta;
				*timestamp = ts;
				return -1;
			}
			ts = delta;
			break;

		case RINGBUF_TYPE_PADDING:
			if (event->time_delta == 1)
				break;
			fallthrough;
		case RINGBUF_TYPE_DATA:
			events++;
			ts += event->time_delta;
			break;

		default:
			return -1;
		}
	}
	*timestamp = ts;
	return events;
}
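
/*
 * Descriptive note (not part of the original source): rb_read_data_buffer()
 * replays a sub-buffer the same way a reader would. Starting from the page
 * time_stamp, it walks events up to the commit index ("tail"), accumulating
 * deltas for data and non-empty padding events and resynchronizing on
 * TIME_EXTEND and TIME_STAMP events. A timestamp that moves backwards, or
 * an unknown type_len, marks the page as invalid (-1); otherwise the number
 * of data events found is returned.
 */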

static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu)
{
	unsigned long long ts;
	u64 delta;
	int tail;

	tail = local_read(&dpage->commit);
	return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta);
}

/* If the meta data has been validated, now validate the events */
static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_meta *meta = cpu_buffer->ring_meta;
	struct buffer_page *head_page;
	unsigned long entry_bytes = 0;
	unsigned long entries = 0;
	int ret;
	int i;

	if (!meta || !meta->head_buffer)
		return;

	/* Do the reader page first */
	ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu);
	if (ret < 0) {
		pr_info("Ring buffer reader page is invalid\n");
		goto invalid;
	}
	entries += ret;
	entry_bytes += local_read(&cpu_buffer->reader_page->page->commit);
	local_set(&cpu_buffer->reader_page->entries, ret);

	head_page = cpu_buffer->head_page;

	/* If both the head and commit are on the reader_page then we are done. */
	if (head_page == cpu_buffer->reader_page &&
	    head_page == cpu_buffer->commit_page)
		goto done;

	/* Iterate until finding the commit page */
	for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) {

		/* Reader page has already been done */
		if (head_page == cpu_buffer->reader_page)
			continue;

		ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu);
		if (ret < 0) {
			pr_info("Ring buffer meta [%d] invalid buffer page\n",
				cpu_buffer->cpu);
			goto invalid;
		}

		/* If the buffer has content, update pages_touched */
		if (ret)
			local_inc(&cpu_buffer->pages_touched);

		entries += ret;
		entry_bytes += local_read(&head_page->page->commit);
		local_set(&head_page->entries, ret);

		if (head_page == cpu_buffer->commit_page)
			break;
	}

	if (head_page != cpu_buffer->commit_page) {
		pr_info("Ring buffer meta [%d] commit page not found\n",
			cpu_buffer->cpu);
		goto invalid;
	}
 done:
	local_set(&cpu_buffer->entries, entries);
	local_set(&cpu_buffer->entries_bytes, entry_bytes);

	pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu);
	return;

 invalid:
	/* The content of the buffers are invalid, reset the meta data */
	meta->head_buffer = 0;
	meta->commit_buffer = 0;

	/* Reset the reader page */
	local_set(&cpu_buffer->reader_page->entries, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);

	/* Reset all the subbuffers */
	for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) {
		local_set(&head_page->entries, 0);
		local_set(&head_page->page->commit, 0);
	}
}

static void rb_meta_init_text_addr(struct ring_buffer_meta *meta)
{
#ifdef CONFIG_RANDOMIZE_BASE
	meta->kaslr_addr = kaslr_offset();
#else
	meta->kaslr_addr = 0;
#endif
}

static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages)
{
	struct ring_buffer_meta *meta;
	unsigned long *subbuf_mask;
	unsigned long delta;
	void *subbuf;
	int cpu;
	int i;

	/* Create a mask to test the subbuf array */
	subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL);
rb_meta_valid() will return false */ 1914 1915 for (cpu = 0; cpu < nr_cpu_ids; cpu++) { 1916 void *next_meta; 1917 1918 meta = rb_range_meta(buffer, nr_pages, cpu); 1919 1920 if (rb_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { 1921 /* Make the mappings match the current address */ 1922 subbuf = rb_subbufs_from_meta(meta); 1923 delta = (unsigned long)subbuf - meta->first_buffer; 1924 meta->first_buffer += delta; 1925 meta->head_buffer += delta; 1926 meta->commit_buffer += delta; 1927 buffer->kaslr_addr = meta->kaslr_addr; 1928 continue; 1929 } 1930 1931 if (cpu < nr_cpu_ids - 1) 1932 next_meta = rb_range_meta(buffer, nr_pages, cpu + 1); 1933 else 1934 next_meta = (void *)buffer->range_addr_end; 1935 1936 memset(meta, 0, next_meta - (void *)meta); 1937 1938 meta->magic = RING_BUFFER_META_MAGIC; 1939 meta->struct_size = sizeof(*meta); 1940 1941 meta->nr_subbufs = nr_pages + 1; 1942 meta->subbuf_size = PAGE_SIZE; 1943 1944 subbuf = rb_subbufs_from_meta(meta); 1945 1946 meta->first_buffer = (unsigned long)subbuf; 1947 rb_meta_init_text_addr(meta); 1948 1949 /* 1950 * The buffers[] array holds the order of the sub-buffers 1951 * that are after the meta data. The sub-buffers may 1952 * be swapped out when read and inserted into a different 1953 * location of the ring buffer. Although their addresses 1954 * remain the same, the buffers[] array contains the 1955 * index into the sub-buffers holding their actual order. 1956 */ 1957 for (i = 0; i < meta->nr_subbufs; i++) { 1958 meta->buffers[i] = i; 1959 rb_init_page(subbuf); 1960 subbuf += meta->subbuf_size; 1961 } 1962 } 1963 bitmap_free(subbuf_mask); 1964 } 1965 1966 static void *rbm_start(struct seq_file *m, loff_t *pos) 1967 { 1968 struct ring_buffer_per_cpu *cpu_buffer = m->private; 1969 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 1970 unsigned long val; 1971 1972 if (!meta) 1973 return NULL; 1974 1975 if (*pos > meta->nr_subbufs) 1976 return NULL; 1977 1978 val = *pos; 1979 val++; 1980 1981 return (void *)val; 1982 } 1983 1984 static void *rbm_next(struct seq_file *m, void *v, loff_t *pos) 1985 { 1986 (*pos)++; 1987 1988 return rbm_start(m, pos); 1989 } 1990 1991 static int rbm_show(struct seq_file *m, void *v) 1992 { 1993 struct ring_buffer_per_cpu *cpu_buffer = m->private; 1994 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 1995 unsigned long val = (unsigned long)v; 1996 1997 if (val == 1) { 1998 seq_printf(m, "head_buffer: %d\n", 1999 rb_meta_subbuf_idx(meta, (void *)meta->head_buffer)); 2000 seq_printf(m, "commit_buffer: %d\n", 2001 rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer)); 2002 seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size); 2003 seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs); 2004 return 0; 2005 } 2006 2007 val -= 2; 2008 seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]); 2009 2010 return 0; 2011 } 2012 2013 static void rbm_stop(struct seq_file *m, void *p) 2014 { 2015 } 2016 2017 static const struct seq_operations rb_meta_seq_ops = { 2018 .start = rbm_start, 2019 .next = rbm_next, 2020 .show = rbm_show, 2021 .stop = rbm_stop, 2022 }; 2023 2024 int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu) 2025 { 2026 struct seq_file *m; 2027 int ret; 2028 2029 ret = seq_open(file, &rb_meta_seq_ops); 2030 if (ret) 2031 return ret; 2032 2033 m = file->private_data; 2034 m->private = buffer->buffers[cpu]; 2035 2036 return 0; 2037 } 2038 2039 /* Map the buffer_pages to the previous head and commit pages */ 2040 static void rb_meta_buffer_update(struct 
ring_buffer_per_cpu *cpu_buffer, 2041 struct buffer_page *bpage) 2042 { 2043 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 2044 2045 if (meta->head_buffer == (unsigned long)bpage->page) 2046 cpu_buffer->head_page = bpage; 2047 2048 if (meta->commit_buffer == (unsigned long)bpage->page) { 2049 cpu_buffer->commit_page = bpage; 2050 cpu_buffer->tail_page = bpage; 2051 } 2052 } 2053 2054 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2055 long nr_pages, struct list_head *pages) 2056 { 2057 struct trace_buffer *buffer = cpu_buffer->buffer; 2058 struct ring_buffer_meta *meta = NULL; 2059 struct buffer_page *bpage, *tmp; 2060 bool user_thread = current->mm != NULL; 2061 gfp_t mflags; 2062 long i; 2063 2064 /* 2065 * Check if the available memory is there first. 2066 * Note, si_mem_available() only gives us a rough estimate of available 2067 * memory. It may not be accurate. But we don't care, we just want 2068 * to prevent doing any allocation when it is obvious that it is 2069 * not going to succeed. 2070 */ 2071 i = si_mem_available(); 2072 if (i < nr_pages) 2073 return -ENOMEM; 2074 2075 /* 2076 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 2077 * gracefully without invoking oom-killer and the system is not 2078 * destabilized. 2079 */ 2080 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 2081 2082 /* 2083 * If a user thread allocates too much, and si_mem_available() 2084 * reports there's enough memory, even though there is not. 2085 * Make sure the OOM killer kills this thread. This can happen 2086 * even with RETRY_MAYFAIL because another task may be doing 2087 * an allocation after this task has taken all memory. 2088 * This is the task the OOM killer needs to take out during this 2089 * loop, even if it was triggered by an allocation somewhere else. 2090 */ 2091 if (user_thread) 2092 set_current_oom_origin(); 2093 2094 if (buffer->range_addr_start) 2095 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2096 2097 for (i = 0; i < nr_pages; i++) { 2098 struct page *page; 2099 2100 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2101 mflags, cpu_to_node(cpu_buffer->cpu)); 2102 if (!bpage) 2103 goto free_pages; 2104 2105 rb_check_bpage(cpu_buffer, bpage); 2106 2107 /* 2108 * Append the pages as for mapped buffers we want to keep 2109 * the order 2110 */ 2111 list_add_tail(&bpage->list, pages); 2112 2113 if (meta) { 2114 /* A range was given. 
Use that for the buffer page */ 2115 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2116 if (!bpage->page) 2117 goto free_pages; 2118 /* If this is valid from a previous boot */ 2119 if (meta->head_buffer) 2120 rb_meta_buffer_update(cpu_buffer, bpage); 2121 bpage->range = 1; 2122 bpage->id = i + 1; 2123 } else { 2124 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), 2125 mflags | __GFP_COMP | __GFP_ZERO, 2126 cpu_buffer->buffer->subbuf_order); 2127 if (!page) 2128 goto free_pages; 2129 bpage->page = page_address(page); 2130 rb_init_page(bpage->page); 2131 } 2132 bpage->order = cpu_buffer->buffer->subbuf_order; 2133 2134 if (user_thread && fatal_signal_pending(current)) 2135 goto free_pages; 2136 } 2137 if (user_thread) 2138 clear_current_oom_origin(); 2139 2140 return 0; 2141 2142 free_pages: 2143 list_for_each_entry_safe(bpage, tmp, pages, list) { 2144 list_del_init(&bpage->list); 2145 free_buffer_page(bpage); 2146 } 2147 if (user_thread) 2148 clear_current_oom_origin(); 2149 2150 return -ENOMEM; 2151 } 2152 2153 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2154 unsigned long nr_pages) 2155 { 2156 LIST_HEAD(pages); 2157 2158 WARN_ON(!nr_pages); 2159 2160 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2161 return -ENOMEM; 2162 2163 /* 2164 * The ring buffer page list is a circular list that does not 2165 * start and end with a list head. All page list items point to 2166 * other pages. 2167 */ 2168 cpu_buffer->pages = pages.next; 2169 list_del(&pages); 2170 2171 cpu_buffer->nr_pages = nr_pages; 2172 2173 rb_check_pages(cpu_buffer); 2174 2175 return 0; 2176 } 2177 2178 static struct ring_buffer_per_cpu * 2179 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2180 { 2181 struct ring_buffer_per_cpu *cpu_buffer; 2182 struct ring_buffer_meta *meta; 2183 struct buffer_page *bpage; 2184 struct page *page; 2185 int ret; 2186 2187 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 2188 GFP_KERNEL, cpu_to_node(cpu)); 2189 if (!cpu_buffer) 2190 return NULL; 2191 2192 cpu_buffer->cpu = cpu; 2193 cpu_buffer->buffer = buffer; 2194 raw_spin_lock_init(&cpu_buffer->reader_lock); 2195 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2196 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2197 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2198 init_completion(&cpu_buffer->update_done); 2199 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2200 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2201 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2202 mutex_init(&cpu_buffer->mapping_lock); 2203 2204 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2205 GFP_KERNEL, cpu_to_node(cpu)); 2206 if (!bpage) 2207 goto fail_free_buffer; 2208 2209 rb_check_bpage(cpu_buffer, bpage); 2210 2211 cpu_buffer->reader_page = bpage; 2212 2213 if (buffer->range_addr_start) { 2214 /* 2215 * Range mapped buffers have the same restrictions as memory 2216 * mapped ones do. 
2217 */ 2218 cpu_buffer->mapped = 1; 2219 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2220 bpage->page = rb_range_buffer(cpu_buffer, 0); 2221 if (!bpage->page) 2222 goto fail_free_reader; 2223 if (cpu_buffer->ring_meta->head_buffer) 2224 rb_meta_buffer_update(cpu_buffer, bpage); 2225 bpage->range = 1; 2226 } else { 2227 page = alloc_pages_node(cpu_to_node(cpu), 2228 GFP_KERNEL | __GFP_COMP | __GFP_ZERO, 2229 cpu_buffer->buffer->subbuf_order); 2230 if (!page) 2231 goto fail_free_reader; 2232 bpage->page = page_address(page); 2233 rb_init_page(bpage->page); 2234 } 2235 2236 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2237 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2238 2239 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2240 if (ret < 0) 2241 goto fail_free_reader; 2242 2243 rb_meta_validate_events(cpu_buffer); 2244 2245 /* If the boot meta was valid then this has already been updated */ 2246 meta = cpu_buffer->ring_meta; 2247 if (!meta || !meta->head_buffer || 2248 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2249 if (meta && meta->head_buffer && 2250 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2251 pr_warn("Ring buffer meta buffers not all mapped\n"); 2252 if (!cpu_buffer->head_page) 2253 pr_warn(" Missing head_page\n"); 2254 if (!cpu_buffer->commit_page) 2255 pr_warn(" Missing commit_page\n"); 2256 if (!cpu_buffer->tail_page) 2257 pr_warn(" Missing tail_page\n"); 2258 } 2259 2260 cpu_buffer->head_page 2261 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2262 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2263 2264 rb_head_page_activate(cpu_buffer); 2265 2266 if (cpu_buffer->ring_meta) 2267 meta->commit_buffer = meta->head_buffer; 2268 } else { 2269 /* The valid meta buffer still needs to activate the head page */ 2270 rb_head_page_activate(cpu_buffer); 2271 } 2272 2273 return cpu_buffer; 2274 2275 fail_free_reader: 2276 free_buffer_page(cpu_buffer->reader_page); 2277 2278 fail_free_buffer: 2279 kfree(cpu_buffer); 2280 return NULL; 2281 } 2282 2283 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2284 { 2285 struct list_head *head = cpu_buffer->pages; 2286 struct buffer_page *bpage, *tmp; 2287 2288 irq_work_sync(&cpu_buffer->irq_work.work); 2289 2290 free_buffer_page(cpu_buffer->reader_page); 2291 2292 if (head) { 2293 rb_head_page_deactivate(cpu_buffer); 2294 2295 list_for_each_entry_safe(bpage, tmp, head, list) { 2296 list_del_init(&bpage->list); 2297 free_buffer_page(bpage); 2298 } 2299 bpage = list_entry(head, struct buffer_page, list); 2300 free_buffer_page(bpage); 2301 } 2302 2303 free_page((unsigned long)cpu_buffer->free_page); 2304 2305 kfree(cpu_buffer); 2306 } 2307 2308 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2309 int order, unsigned long start, 2310 unsigned long end, 2311 struct lock_class_key *key) 2312 { 2313 struct trace_buffer *buffer; 2314 long nr_pages; 2315 int subbuf_size; 2316 int bsize; 2317 int cpu; 2318 int ret; 2319 2320 /* keep it in its own cache line */ 2321 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2322 GFP_KERNEL); 2323 if (!buffer) 2324 return NULL; 2325 2326 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2327 goto fail_free_buffer; 2328 2329 buffer->subbuf_order = order; 2330 subbuf_size = (PAGE_SIZE << order); 2331 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2332 2333 /* Max payload is buffer page size - header (8bytes) */ 2334 
buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2335 2336 buffer->flags = flags; 2337 buffer->clock = trace_clock_local; 2338 buffer->reader_lock_key = key; 2339 2340 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2341 init_waitqueue_head(&buffer->irq_work.waiters); 2342 2343 buffer->cpus = nr_cpu_ids; 2344 2345 bsize = sizeof(void *) * nr_cpu_ids; 2346 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2347 GFP_KERNEL); 2348 if (!buffer->buffers) 2349 goto fail_free_cpumask; 2350 2351 /* If start/end are specified, then that overrides size */ 2352 if (start && end) { 2353 unsigned long ptr; 2354 int n; 2355 2356 size = end - start; 2357 size = size / nr_cpu_ids; 2358 2359 /* 2360 * The number of sub-buffers (nr_pages) is determined by the 2361 * total size allocated minus the meta data size. 2362 * Then that is divided by the number of per CPU buffers 2363 * needed, plus account for the integer array index that 2364 * will be appended to the meta data. 2365 */ 2366 nr_pages = (size - sizeof(struct ring_buffer_meta)) / 2367 (subbuf_size + sizeof(int)); 2368 /* Need at least two pages plus the reader page */ 2369 if (nr_pages < 3) 2370 goto fail_free_buffers; 2371 2372 again: 2373 /* Make sure that the size fits aligned */ 2374 for (n = 0, ptr = start; n < nr_cpu_ids; n++) { 2375 ptr += sizeof(struct ring_buffer_meta) + 2376 sizeof(int) * nr_pages; 2377 ptr = ALIGN(ptr, subbuf_size); 2378 ptr += subbuf_size * nr_pages; 2379 } 2380 if (ptr > end) { 2381 if (nr_pages <= 3) 2382 goto fail_free_buffers; 2383 nr_pages--; 2384 goto again; 2385 } 2386 2387 /* nr_pages should not count the reader page */ 2388 nr_pages--; 2389 buffer->range_addr_start = start; 2390 buffer->range_addr_end = end; 2391 2392 rb_range_meta_init(buffer, nr_pages); 2393 } else { 2394 2395 /* need at least two pages */ 2396 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2397 if (nr_pages < 2) 2398 nr_pages = 2; 2399 } 2400 2401 cpu = raw_smp_processor_id(); 2402 cpumask_set_cpu(cpu, buffer->cpumask); 2403 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2404 if (!buffer->buffers[cpu]) 2405 goto fail_free_buffers; 2406 2407 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2408 if (ret < 0) 2409 goto fail_free_buffers; 2410 2411 mutex_init(&buffer->mutex); 2412 2413 return buffer; 2414 2415 fail_free_buffers: 2416 for_each_buffer_cpu(buffer, cpu) { 2417 if (buffer->buffers[cpu]) 2418 rb_free_cpu_buffer(buffer->buffers[cpu]); 2419 } 2420 kfree(buffer->buffers); 2421 2422 fail_free_cpumask: 2423 free_cpumask_var(buffer->cpumask); 2424 2425 fail_free_buffer: 2426 kfree(buffer); 2427 return NULL; 2428 } 2429 2430 /** 2431 * __ring_buffer_alloc - allocate a new ring_buffer 2432 * @size: the size in bytes per cpu that is needed. 2433 * @flags: attributes to set for the ring buffer. 2434 * @key: ring buffer reader_lock_key. 2435 * 2436 * Currently the only flag that is available is the RB_FL_OVERWRITE 2437 * flag. This flag means that the buffer will overwrite old data 2438 * when the buffer wraps. If this flag is not set, the buffer will 2439 * drop data when the tail hits the head. 
 */
struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					 struct lock_class_key *key)
{
	/* Default buffer page size - one system page */
	return alloc_buffer(size, flags, 0, 0, 0, key);
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

/**
 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 * @order: sub-buffer order
 * @start: start of allocated range
 * @range_size: size of allocated range
 * @key: ring buffer reader_lock_key.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags,
					       int order, unsigned long start,
					       unsigned long range_size,
					       struct lock_class_key *key)
{
	return alloc_buffer(size, flags, order, start, start + range_size, key);
}

/**
 * ring_buffer_last_boot_delta - return the KASLR address saved from the last boot
 * @buffer: The buffer to return the KASLR address from
 * @kaslr_addr: Where to store the saved KASLR address
 *
 * Returns: true if @buffer holds a non-zero KASLR address from the last
 * boot (which is then stored in @kaslr_addr), false otherwise.
 */
bool ring_buffer_last_boot_delta(struct trace_buffer *buffer, unsigned long *kaslr_addr)
{
	if (!buffer)
		return false;

	if (!buffer->kaslr_addr)
		return false;

	*kaslr_addr = buffer->kaslr_addr;

	return true;
}

/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
2496 */ 2497 void 2498 ring_buffer_free(struct trace_buffer *buffer) 2499 { 2500 int cpu; 2501 2502 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2503 2504 irq_work_sync(&buffer->irq_work.work); 2505 2506 for_each_buffer_cpu(buffer, cpu) 2507 rb_free_cpu_buffer(buffer->buffers[cpu]); 2508 2509 kfree(buffer->buffers); 2510 free_cpumask_var(buffer->cpumask); 2511 2512 kfree(buffer); 2513 } 2514 EXPORT_SYMBOL_GPL(ring_buffer_free); 2515 2516 void ring_buffer_set_clock(struct trace_buffer *buffer, 2517 u64 (*clock)(void)) 2518 { 2519 buffer->clock = clock; 2520 } 2521 2522 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2523 { 2524 buffer->time_stamp_abs = abs; 2525 } 2526 2527 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2528 { 2529 return buffer->time_stamp_abs; 2530 } 2531 2532 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2533 { 2534 return local_read(&bpage->entries) & RB_WRITE_MASK; 2535 } 2536 2537 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2538 { 2539 return local_read(&bpage->write) & RB_WRITE_MASK; 2540 } 2541 2542 static bool 2543 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 2544 { 2545 struct list_head *tail_page, *to_remove, *next_page; 2546 struct buffer_page *to_remove_page, *tmp_iter_page; 2547 struct buffer_page *last_page, *first_page; 2548 unsigned long nr_removed; 2549 unsigned long head_bit; 2550 int page_entries; 2551 2552 head_bit = 0; 2553 2554 raw_spin_lock_irq(&cpu_buffer->reader_lock); 2555 atomic_inc(&cpu_buffer->record_disabled); 2556 /* 2557 * We don't race with the readers since we have acquired the reader 2558 * lock. We also don't race with writers after disabling recording. 2559 * This makes it easy to figure out the first and the last page to be 2560 * removed from the list. We unlink all the pages in between including 2561 * the first and last pages. This is done in a busy loop so that we 2562 * lose the least number of traces. 2563 * The pages are freed after we restart recording and unlock readers. 2564 */ 2565 tail_page = &cpu_buffer->tail_page->list; 2566 2567 /* 2568 * tail page might be on reader page, we remove the next page 2569 * from the ring buffer 2570 */ 2571 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 2572 tail_page = rb_list_head(tail_page->next); 2573 to_remove = tail_page; 2574 2575 /* start of pages to remove */ 2576 first_page = list_entry(rb_list_head(to_remove->next), 2577 struct buffer_page, list); 2578 2579 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 2580 to_remove = rb_list_head(to_remove)->next; 2581 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 2582 } 2583 /* Read iterators need to reset themselves when some pages removed */ 2584 cpu_buffer->pages_removed += nr_removed; 2585 2586 next_page = rb_list_head(to_remove)->next; 2587 2588 /* 2589 * Now we remove all pages between tail_page and next_page. 
	 * Make sure that we have the head_bit value preserved for the
	 * next page.
	 */
	tail_page->next = (struct list_head *)((unsigned long)next_page |
					       head_bit);
	next_page = rb_list_head(next_page);
	next_page->prev = tail_page;

	/* make sure pages points to a valid page in the ring buffer */
	cpu_buffer->pages = next_page;
	cpu_buffer->cnt++;

	/* update head page */
	if (head_bit)
		cpu_buffer->head_page = list_entry(next_page,
						   struct buffer_page, list);

	/* pages are removed, resume tracing and then free the pages */
	atomic_dec(&cpu_buffer->record_disabled);
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);

	RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));

	/* last buffer page to remove */
	last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
			       list);
	tmp_iter_page = first_page;

	do {
		cond_resched();

		to_remove_page = tmp_iter_page;
		rb_inc_page(&tmp_iter_page);

		/* update the counters */
		page_entries = rb_page_entries(to_remove_page);
		if (page_entries) {
			/*
			 * If something was added to this page, it was full
			 * since it is not the tail page. So we deduct the
			 * bytes consumed in the ring buffer from here.
			 * Increment overrun to account for the lost events.
			 */
			local_add(page_entries, &cpu_buffer->overrun);
			local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
			local_inc(&cpu_buffer->pages_lost);
		}

		/*
		 * We have already removed references to this list item, just
		 * free up the buffer_page and its page.
		 */
		free_buffer_page(to_remove_page);
		nr_removed--;

	} while (to_remove_page != last_page);

	RB_WARN_ON(cpu_buffer, nr_removed);

	return nr_removed == 0;
}

static bool
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *pages = &cpu_buffer->new_pages;
	unsigned long flags;
	bool success;
	int retries;

	/* Can be called at early boot up, where interrupts must not be enabled */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	/*
	 * We are holding the reader lock, so the reader page won't be swapped
	 * in the ring buffer. Now we are racing with the writer trying to
	 * move the head page and the tail page.
	 * We are going to adapt the reader page update process where:
	 * 1. We first splice the start and end of the list of new pages between
	 *    the head page and its previous page.
	 * 2. We cmpxchg the prev_page->next to point from the head page to the
	 *    start of the new pages list.
	 * 3. Finally, we update head->prev to the end of the new list.
	 *
	 * We will try this process 10 times, to make sure that we don't keep
	 * spinning.
2675 */ 2676 retries = 10; 2677 success = false; 2678 while (retries--) { 2679 struct list_head *head_page, *prev_page; 2680 struct list_head *last_page, *first_page; 2681 struct list_head *head_page_with_bit; 2682 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2683 2684 if (!hpage) 2685 break; 2686 head_page = &hpage->list; 2687 prev_page = head_page->prev; 2688 2689 first_page = pages->next; 2690 last_page = pages->prev; 2691 2692 head_page_with_bit = (struct list_head *) 2693 ((unsigned long)head_page | RB_PAGE_HEAD); 2694 2695 last_page->next = head_page_with_bit; 2696 first_page->prev = prev_page; 2697 2698 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 2699 if (try_cmpxchg(&prev_page->next, 2700 &head_page_with_bit, first_page)) { 2701 /* 2702 * yay, we replaced the page pointer to our new list, 2703 * now, we just have to update to head page's prev 2704 * pointer to point to end of list 2705 */ 2706 head_page->prev = last_page; 2707 cpu_buffer->cnt++; 2708 success = true; 2709 break; 2710 } 2711 } 2712 2713 if (success) 2714 INIT_LIST_HEAD(pages); 2715 /* 2716 * If we weren't successful in adding in new pages, warn and stop 2717 * tracing 2718 */ 2719 RB_WARN_ON(cpu_buffer, !success); 2720 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2721 2722 /* free pages if they weren't inserted */ 2723 if (!success) { 2724 struct buffer_page *bpage, *tmp; 2725 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2726 list) { 2727 list_del_init(&bpage->list); 2728 free_buffer_page(bpage); 2729 } 2730 } 2731 return success; 2732 } 2733 2734 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2735 { 2736 bool success; 2737 2738 if (cpu_buffer->nr_pages_to_update > 0) 2739 success = rb_insert_pages(cpu_buffer); 2740 else 2741 success = rb_remove_pages(cpu_buffer, 2742 -cpu_buffer->nr_pages_to_update); 2743 2744 if (success) 2745 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2746 } 2747 2748 static void update_pages_handler(struct work_struct *work) 2749 { 2750 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2751 struct ring_buffer_per_cpu, update_pages_work); 2752 rb_update_pages(cpu_buffer); 2753 complete(&cpu_buffer->update_done); 2754 } 2755 2756 /** 2757 * ring_buffer_resize - resize the ring buffer 2758 * @buffer: the buffer to resize. 2759 * @size: the new size. 2760 * @cpu_id: the cpu buffer to resize 2761 * 2762 * Minimum size is 2 * buffer->subbuf_size. 2763 * 2764 * Returns 0 on success and < 0 on failure. 2765 */ 2766 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2767 int cpu_id) 2768 { 2769 struct ring_buffer_per_cpu *cpu_buffer; 2770 unsigned long nr_pages; 2771 int cpu, err; 2772 2773 /* 2774 * Always succeed at resizing a non-existent buffer: 2775 */ 2776 if (!buffer) 2777 return 0; 2778 2779 /* Make sure the requested buffer exists */ 2780 if (cpu_id != RING_BUFFER_ALL_CPUS && 2781 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2782 return 0; 2783 2784 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2785 2786 /* we need a minimum of two pages */ 2787 if (nr_pages < 2) 2788 nr_pages = 2; 2789 2790 /* prevent another thread from changing buffer sizes */ 2791 mutex_lock(&buffer->mutex); 2792 atomic_inc(&buffer->resizing); 2793 2794 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2795 /* 2796 * Don't succeed if resizing is disabled, as a reader might be 2797 * manipulating the ring buffer and is expecting a sane state while 2798 * this is true. 
2799 */ 2800 for_each_buffer_cpu(buffer, cpu) { 2801 cpu_buffer = buffer->buffers[cpu]; 2802 if (atomic_read(&cpu_buffer->resize_disabled)) { 2803 err = -EBUSY; 2804 goto out_err_unlock; 2805 } 2806 } 2807 2808 /* calculate the pages to update */ 2809 for_each_buffer_cpu(buffer, cpu) { 2810 cpu_buffer = buffer->buffers[cpu]; 2811 2812 cpu_buffer->nr_pages_to_update = nr_pages - 2813 cpu_buffer->nr_pages; 2814 /* 2815 * nothing more to do for removing pages or no update 2816 */ 2817 if (cpu_buffer->nr_pages_to_update <= 0) 2818 continue; 2819 /* 2820 * to add pages, make sure all new pages can be 2821 * allocated without receiving ENOMEM 2822 */ 2823 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2824 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2825 &cpu_buffer->new_pages)) { 2826 /* not enough memory for new pages */ 2827 err = -ENOMEM; 2828 goto out_err; 2829 } 2830 2831 cond_resched(); 2832 } 2833 2834 cpus_read_lock(); 2835 /* 2836 * Fire off all the required work handlers 2837 * We can't schedule on offline CPUs, but it's not necessary 2838 * since we can change their buffer sizes without any race. 2839 */ 2840 for_each_buffer_cpu(buffer, cpu) { 2841 cpu_buffer = buffer->buffers[cpu]; 2842 if (!cpu_buffer->nr_pages_to_update) 2843 continue; 2844 2845 /* Can't run something on an offline CPU. */ 2846 if (!cpu_online(cpu)) { 2847 rb_update_pages(cpu_buffer); 2848 cpu_buffer->nr_pages_to_update = 0; 2849 } else { 2850 /* Run directly if possible. */ 2851 migrate_disable(); 2852 if (cpu != smp_processor_id()) { 2853 migrate_enable(); 2854 schedule_work_on(cpu, 2855 &cpu_buffer->update_pages_work); 2856 } else { 2857 update_pages_handler(&cpu_buffer->update_pages_work); 2858 migrate_enable(); 2859 } 2860 } 2861 } 2862 2863 /* wait for all the updates to complete */ 2864 for_each_buffer_cpu(buffer, cpu) { 2865 cpu_buffer = buffer->buffers[cpu]; 2866 if (!cpu_buffer->nr_pages_to_update) 2867 continue; 2868 2869 if (cpu_online(cpu)) 2870 wait_for_completion(&cpu_buffer->update_done); 2871 cpu_buffer->nr_pages_to_update = 0; 2872 } 2873 2874 cpus_read_unlock(); 2875 } else { 2876 cpu_buffer = buffer->buffers[cpu_id]; 2877 2878 if (nr_pages == cpu_buffer->nr_pages) 2879 goto out; 2880 2881 /* 2882 * Don't succeed if resizing is disabled, as a reader might be 2883 * manipulating the ring buffer and is expecting a sane state while 2884 * this is true. 2885 */ 2886 if (atomic_read(&cpu_buffer->resize_disabled)) { 2887 err = -EBUSY; 2888 goto out_err_unlock; 2889 } 2890 2891 cpu_buffer->nr_pages_to_update = nr_pages - 2892 cpu_buffer->nr_pages; 2893 2894 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2895 if (cpu_buffer->nr_pages_to_update > 0 && 2896 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2897 &cpu_buffer->new_pages)) { 2898 err = -ENOMEM; 2899 goto out_err; 2900 } 2901 2902 cpus_read_lock(); 2903 2904 /* Can't run something on an offline CPU. */ 2905 if (!cpu_online(cpu_id)) 2906 rb_update_pages(cpu_buffer); 2907 else { 2908 /* Run directly if possible. 
*/ 2909 migrate_disable(); 2910 if (cpu_id == smp_processor_id()) { 2911 rb_update_pages(cpu_buffer); 2912 migrate_enable(); 2913 } else { 2914 migrate_enable(); 2915 schedule_work_on(cpu_id, 2916 &cpu_buffer->update_pages_work); 2917 wait_for_completion(&cpu_buffer->update_done); 2918 } 2919 } 2920 2921 cpu_buffer->nr_pages_to_update = 0; 2922 cpus_read_unlock(); 2923 } 2924 2925 out: 2926 /* 2927 * The ring buffer resize can happen with the ring buffer 2928 * enabled, so that the update disturbs the tracing as little 2929 * as possible. But if the buffer is disabled, we do not need 2930 * to worry about that, and we can take the time to verify 2931 * that the buffer is not corrupt. 2932 */ 2933 if (atomic_read(&buffer->record_disabled)) { 2934 atomic_inc(&buffer->record_disabled); 2935 /* 2936 * Even though the buffer was disabled, we must make sure 2937 * that it is truly disabled before calling rb_check_pages. 2938 * There could have been a race between checking 2939 * record_disable and incrementing it. 2940 */ 2941 synchronize_rcu(); 2942 for_each_buffer_cpu(buffer, cpu) { 2943 cpu_buffer = buffer->buffers[cpu]; 2944 rb_check_pages(cpu_buffer); 2945 } 2946 atomic_dec(&buffer->record_disabled); 2947 } 2948 2949 atomic_dec(&buffer->resizing); 2950 mutex_unlock(&buffer->mutex); 2951 return 0; 2952 2953 out_err: 2954 for_each_buffer_cpu(buffer, cpu) { 2955 struct buffer_page *bpage, *tmp; 2956 2957 cpu_buffer = buffer->buffers[cpu]; 2958 cpu_buffer->nr_pages_to_update = 0; 2959 2960 if (list_empty(&cpu_buffer->new_pages)) 2961 continue; 2962 2963 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2964 list) { 2965 list_del_init(&bpage->list); 2966 free_buffer_page(bpage); 2967 } 2968 } 2969 out_err_unlock: 2970 atomic_dec(&buffer->resizing); 2971 mutex_unlock(&buffer->mutex); 2972 return err; 2973 } 2974 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2975 2976 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2977 { 2978 mutex_lock(&buffer->mutex); 2979 if (val) 2980 buffer->flags |= RB_FL_OVERWRITE; 2981 else 2982 buffer->flags &= ~RB_FL_OVERWRITE; 2983 mutex_unlock(&buffer->mutex); 2984 } 2985 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2986 2987 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2988 { 2989 return bpage->page->data + index; 2990 } 2991 2992 static __always_inline struct ring_buffer_event * 2993 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2994 { 2995 return __rb_page_index(cpu_buffer->reader_page, 2996 cpu_buffer->reader_page->read); 2997 } 2998 2999 static struct ring_buffer_event * 3000 rb_iter_head_event(struct ring_buffer_iter *iter) 3001 { 3002 struct ring_buffer_event *event; 3003 struct buffer_page *iter_head_page = iter->head_page; 3004 unsigned long commit; 3005 unsigned length; 3006 3007 if (iter->head != iter->next_event) 3008 return iter->event; 3009 3010 /* 3011 * When the writer goes across pages, it issues a cmpxchg which 3012 * is a mb(), which will synchronize with the rmb here. 3013 * (see rb_tail_page_update() and __rb_reserve_next()) 3014 */ 3015 commit = rb_page_commit(iter_head_page); 3016 smp_rmb(); 3017 3018 /* An event needs to be at least 8 bytes in size */ 3019 if (iter->head > commit - 8) 3020 goto reset; 3021 3022 event = __rb_page_index(iter_head_page, iter->head); 3023 length = rb_event_length(event); 3024 3025 /* 3026 * READ_ONCE() doesn't work on functions and we don't want the 3027 * compiler doing any crazy optimizations with length. 
3028 */ 3029 barrier(); 3030 3031 if ((iter->head + length) > commit || length > iter->event_size) 3032 /* Writer corrupted the read? */ 3033 goto reset; 3034 3035 memcpy(iter->event, event, length); 3036 /* 3037 * If the page stamp is still the same after this rmb() then the 3038 * event was safely copied without the writer entering the page. 3039 */ 3040 smp_rmb(); 3041 3042 /* Make sure the page didn't change since we read this */ 3043 if (iter->page_stamp != iter_head_page->page->time_stamp || 3044 commit > rb_page_commit(iter_head_page)) 3045 goto reset; 3046 3047 iter->next_event = iter->head + length; 3048 return iter->event; 3049 reset: 3050 /* Reset to the beginning */ 3051 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3052 iter->head = 0; 3053 iter->next_event = 0; 3054 iter->missed_events = 1; 3055 return NULL; 3056 } 3057 3058 /* Size is determined by what has been committed */ 3059 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 3060 { 3061 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 3062 } 3063 3064 static __always_inline unsigned 3065 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3066 { 3067 return rb_page_commit(cpu_buffer->commit_page); 3068 } 3069 3070 static __always_inline unsigned 3071 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3072 { 3073 unsigned long addr = (unsigned long)event; 3074 3075 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3076 3077 return addr - BUF_PAGE_HDR_SIZE; 3078 } 3079 3080 static void rb_inc_iter(struct ring_buffer_iter *iter) 3081 { 3082 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3083 3084 /* 3085 * The iterator could be on the reader page (it starts there). 3086 * But the head could have moved, since the reader was 3087 * found. Check for this case and assign the iterator 3088 * to the head page instead of next. 3089 */ 3090 if (iter->head_page == cpu_buffer->reader_page) 3091 iter->head_page = rb_set_head_page(cpu_buffer); 3092 else 3093 rb_inc_page(&iter->head_page); 3094 3095 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3096 iter->head = 0; 3097 iter->next_event = 0; 3098 } 3099 3100 /* Return the index into the sub-buffers for a given sub-buffer */ 3101 static int rb_meta_subbuf_idx(struct ring_buffer_meta *meta, void *subbuf) 3102 { 3103 void *subbuf_array; 3104 3105 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3106 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3107 return (subbuf - subbuf_array) / meta->subbuf_size; 3108 } 3109 3110 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3111 struct buffer_page *next_page) 3112 { 3113 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 3114 unsigned long old_head = (unsigned long)next_page->page; 3115 unsigned long new_head; 3116 3117 rb_inc_page(&next_page); 3118 new_head = (unsigned long)next_page->page; 3119 3120 /* 3121 * Only move it forward once, if something else came in and 3122 * moved it forward, then we don't want to touch it. 
3123 */ 3124 (void)cmpxchg(&meta->head_buffer, old_head, new_head); 3125 } 3126 3127 static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer, 3128 struct buffer_page *reader) 3129 { 3130 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 3131 void *old_reader = cpu_buffer->reader_page->page; 3132 void *new_reader = reader->page; 3133 int id; 3134 3135 id = reader->id; 3136 cpu_buffer->reader_page->id = id; 3137 reader->id = 0; 3138 3139 meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader); 3140 meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader); 3141 3142 /* The head pointer is the one after the reader */ 3143 rb_update_meta_head(cpu_buffer, reader); 3144 } 3145 3146 /* 3147 * rb_handle_head_page - writer hit the head page 3148 * 3149 * Returns: +1 to retry page 3150 * 0 to continue 3151 * -1 on error 3152 */ 3153 static int 3154 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 3155 struct buffer_page *tail_page, 3156 struct buffer_page *next_page) 3157 { 3158 struct buffer_page *new_head; 3159 int entries; 3160 int type; 3161 int ret; 3162 3163 entries = rb_page_entries(next_page); 3164 3165 /* 3166 * The hard part is here. We need to move the head 3167 * forward, and protect against both readers on 3168 * other CPUs and writers coming in via interrupts. 3169 */ 3170 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 3171 RB_PAGE_HEAD); 3172 3173 /* 3174 * type can be one of four: 3175 * NORMAL - an interrupt already moved it for us 3176 * HEAD - we are the first to get here. 3177 * UPDATE - we are the interrupt interrupting 3178 * a current move. 3179 * MOVED - a reader on another CPU moved the next 3180 * pointer to its reader page. Give up 3181 * and try again. 3182 */ 3183 3184 switch (type) { 3185 case RB_PAGE_HEAD: 3186 /* 3187 * We changed the head to UPDATE, thus 3188 * it is our responsibility to update 3189 * the counters. 3190 */ 3191 local_add(entries, &cpu_buffer->overrun); 3192 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 3193 local_inc(&cpu_buffer->pages_lost); 3194 3195 if (cpu_buffer->ring_meta) 3196 rb_update_meta_head(cpu_buffer, next_page); 3197 /* 3198 * The entries will be zeroed out when we move the 3199 * tail page. 3200 */ 3201 3202 /* still more to do */ 3203 break; 3204 3205 case RB_PAGE_UPDATE: 3206 /* 3207 * This is an interrupt that interrupt the 3208 * previous update. Still more to do. 3209 */ 3210 break; 3211 case RB_PAGE_NORMAL: 3212 /* 3213 * An interrupt came in before the update 3214 * and processed this for us. 3215 * Nothing left to do. 3216 */ 3217 return 1; 3218 case RB_PAGE_MOVED: 3219 /* 3220 * The reader is on another CPU and just did 3221 * a swap with our next_page. 3222 * Try again. 3223 */ 3224 return 1; 3225 default: 3226 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 3227 return -1; 3228 } 3229 3230 /* 3231 * Now that we are here, the old head pointer is 3232 * set to UPDATE. This will keep the reader from 3233 * swapping the head page with the reader page. 3234 * The reader (on another CPU) will spin till 3235 * we are finished. 3236 * 3237 * We just need to protect against interrupts 3238 * doing the job. We will set the next pointer 3239 * to HEAD. After that, we set the old pointer 3240 * to NORMAL, but only if it was HEAD before. 3241 * otherwise we are an interrupt, and only 3242 * want the outer most commit to reset it. 
3243 */ 3244 new_head = next_page; 3245 rb_inc_page(&new_head); 3246 3247 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 3248 RB_PAGE_NORMAL); 3249 3250 /* 3251 * Valid returns are: 3252 * HEAD - an interrupt came in and already set it. 3253 * NORMAL - One of two things: 3254 * 1) We really set it. 3255 * 2) A bunch of interrupts came in and moved 3256 * the page forward again. 3257 */ 3258 switch (ret) { 3259 case RB_PAGE_HEAD: 3260 case RB_PAGE_NORMAL: 3261 /* OK */ 3262 break; 3263 default: 3264 RB_WARN_ON(cpu_buffer, 1); 3265 return -1; 3266 } 3267 3268 /* 3269 * It is possible that an interrupt came in, 3270 * set the head up, then more interrupts came in 3271 * and moved it again. When we get back here, 3272 * the page would have been set to NORMAL but we 3273 * just set it back to HEAD. 3274 * 3275 * How do you detect this? Well, if that happened 3276 * the tail page would have moved. 3277 */ 3278 if (ret == RB_PAGE_NORMAL) { 3279 struct buffer_page *buffer_tail_page; 3280 3281 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 3282 /* 3283 * If the tail had moved passed next, then we need 3284 * to reset the pointer. 3285 */ 3286 if (buffer_tail_page != tail_page && 3287 buffer_tail_page != next_page) 3288 rb_head_page_set_normal(cpu_buffer, new_head, 3289 next_page, 3290 RB_PAGE_HEAD); 3291 } 3292 3293 /* 3294 * If this was the outer most commit (the one that 3295 * changed the original pointer from HEAD to UPDATE), 3296 * then it is up to us to reset it to NORMAL. 3297 */ 3298 if (type == RB_PAGE_HEAD) { 3299 ret = rb_head_page_set_normal(cpu_buffer, next_page, 3300 tail_page, 3301 RB_PAGE_UPDATE); 3302 if (RB_WARN_ON(cpu_buffer, 3303 ret != RB_PAGE_UPDATE)) 3304 return -1; 3305 } 3306 3307 return 0; 3308 } 3309 3310 static inline void 3311 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 3312 unsigned long tail, struct rb_event_info *info) 3313 { 3314 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 3315 struct buffer_page *tail_page = info->tail_page; 3316 struct ring_buffer_event *event; 3317 unsigned long length = info->length; 3318 3319 /* 3320 * Only the event that crossed the page boundary 3321 * must fill the old tail_page with padding. 3322 */ 3323 if (tail >= bsize) { 3324 /* 3325 * If the page was filled, then we still need 3326 * to update the real_end. Reset it to zero 3327 * and the reader will ignore it. 3328 */ 3329 if (tail == bsize) 3330 tail_page->real_end = 0; 3331 3332 local_sub(length, &tail_page->write); 3333 return; 3334 } 3335 3336 event = __rb_page_index(tail_page, tail); 3337 3338 /* 3339 * Save the original length to the meta data. 3340 * This will be used by the reader to add lost event 3341 * counter. 3342 */ 3343 tail_page->real_end = tail; 3344 3345 /* 3346 * If this event is bigger than the minimum size, then 3347 * we need to be careful that we don't subtract the 3348 * write counter enough to allow another writer to slip 3349 * in on this page. 3350 * We put in a discarded commit instead, to make sure 3351 * that this space is not used again, and this space will 3352 * not be accounted into 'entries_bytes'. 3353 * 3354 * If we are less than the minimum size, we don't need to 3355 * worry about it. 
3356 */ 3357 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 3358 /* No room for any events */ 3359 3360 /* Mark the rest of the page with padding */ 3361 rb_event_set_padding(event); 3362 3363 /* Make sure the padding is visible before the write update */ 3364 smp_wmb(); 3365 3366 /* Set the write back to the previous setting */ 3367 local_sub(length, &tail_page->write); 3368 return; 3369 } 3370 3371 /* Put in a discarded event */ 3372 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 3373 event->type_len = RINGBUF_TYPE_PADDING; 3374 /* time delta must be non zero */ 3375 event->time_delta = 1; 3376 3377 /* account for padding bytes */ 3378 local_add(bsize - tail, &cpu_buffer->entries_bytes); 3379 3380 /* Make sure the padding is visible before the tail_page->write update */ 3381 smp_wmb(); 3382 3383 /* Set write to end of buffer */ 3384 length = (tail + length) - bsize; 3385 local_sub(length, &tail_page->write); 3386 } 3387 3388 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 3389 3390 /* 3391 * This is the slow path, force gcc not to inline it. 3392 */ 3393 static noinline struct ring_buffer_event * 3394 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3395 unsigned long tail, struct rb_event_info *info) 3396 { 3397 struct buffer_page *tail_page = info->tail_page; 3398 struct buffer_page *commit_page = cpu_buffer->commit_page; 3399 struct trace_buffer *buffer = cpu_buffer->buffer; 3400 struct buffer_page *next_page; 3401 int ret; 3402 3403 next_page = tail_page; 3404 3405 rb_inc_page(&next_page); 3406 3407 /* 3408 * If for some reason, we had an interrupt storm that made 3409 * it all the way around the buffer, bail, and warn 3410 * about it. 3411 */ 3412 if (unlikely(next_page == commit_page)) { 3413 local_inc(&cpu_buffer->commit_overrun); 3414 goto out_reset; 3415 } 3416 3417 /* 3418 * This is where the fun begins! 3419 * 3420 * We are fighting against races between a reader that 3421 * could be on another CPU trying to swap its reader 3422 * page with the buffer head. 3423 * 3424 * We are also fighting against interrupts coming in and 3425 * moving the head or tail on us as well. 3426 * 3427 * If the next page is the head page then we have filled 3428 * the buffer, unless the commit page is still on the 3429 * reader page. 3430 */ 3431 if (rb_is_head_page(next_page, &tail_page->list)) { 3432 3433 /* 3434 * If the commit is not on the reader page, then 3435 * move the header page. 3436 */ 3437 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3438 /* 3439 * If we are not in overwrite mode, 3440 * this is easy, just stop here. 3441 */ 3442 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3443 local_inc(&cpu_buffer->dropped_events); 3444 goto out_reset; 3445 } 3446 3447 ret = rb_handle_head_page(cpu_buffer, 3448 tail_page, 3449 next_page); 3450 if (ret < 0) 3451 goto out_reset; 3452 if (ret) 3453 goto out_again; 3454 } else { 3455 /* 3456 * We need to be careful here too. The 3457 * commit page could still be on the reader 3458 * page. We could have a small buffer, and 3459 * have filled up the buffer with events 3460 * from interrupts and such, and wrapped. 3461 * 3462 * Note, if the tail page is also on the 3463 * reader_page, we let it move out. 
3464 */ 3465 if (unlikely((cpu_buffer->commit_page != 3466 cpu_buffer->tail_page) && 3467 (cpu_buffer->commit_page == 3468 cpu_buffer->reader_page))) { 3469 local_inc(&cpu_buffer->commit_overrun); 3470 goto out_reset; 3471 } 3472 } 3473 } 3474 3475 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3476 3477 out_again: 3478 3479 rb_reset_tail(cpu_buffer, tail, info); 3480 3481 /* Commit what we have for now. */ 3482 rb_end_commit(cpu_buffer); 3483 /* rb_end_commit() decs committing */ 3484 local_inc(&cpu_buffer->committing); 3485 3486 /* fail and let the caller try again */ 3487 return ERR_PTR(-EAGAIN); 3488 3489 out_reset: 3490 /* reset write */ 3491 rb_reset_tail(cpu_buffer, tail, info); 3492 3493 return NULL; 3494 } 3495 3496 /* Slow path */ 3497 static struct ring_buffer_event * 3498 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3499 struct ring_buffer_event *event, u64 delta, bool abs) 3500 { 3501 if (abs) 3502 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3503 else 3504 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3505 3506 /* Not the first event on the page, or not delta? */ 3507 if (abs || rb_event_index(cpu_buffer, event)) { 3508 event->time_delta = delta & TS_MASK; 3509 event->array[0] = delta >> TS_SHIFT; 3510 } else { 3511 /* nope, just zero it */ 3512 event->time_delta = 0; 3513 event->array[0] = 0; 3514 } 3515 3516 return skip_time_extend(event); 3517 } 3518 3519 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 3520 static inline bool sched_clock_stable(void) 3521 { 3522 return true; 3523 } 3524 #endif 3525 3526 static void 3527 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3528 struct rb_event_info *info) 3529 { 3530 u64 write_stamp; 3531 3532 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3533 (unsigned long long)info->delta, 3534 (unsigned long long)info->ts, 3535 (unsigned long long)info->before, 3536 (unsigned long long)info->after, 3537 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3538 sched_clock_stable() ? "" : 3539 "If you just came from a suspend/resume,\n" 3540 "please switch to the trace global clock:\n" 3541 " echo global > /sys/kernel/tracing/trace_clock\n" 3542 "or add trace_clock=global to the kernel command line\n"); 3543 } 3544 3545 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3546 struct ring_buffer_event **event, 3547 struct rb_event_info *info, 3548 u64 *delta, 3549 unsigned int *length) 3550 { 3551 bool abs = info->add_timestamp & 3552 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3553 3554 if (unlikely(info->delta > (1ULL << 59))) { 3555 /* 3556 * Some timers can use more than 59 bits, and when a timestamp 3557 * is added to the buffer, it will lose those bits. 3558 */ 3559 if (abs && (info->ts & TS_MSB)) { 3560 info->delta &= ABS_TS_MASK; 3561 3562 /* did the clock go backwards */ 3563 } else if (info->before == info->after && info->before > info->ts) { 3564 /* not interrupted */ 3565 static int once; 3566 3567 /* 3568 * This is possible with a recalibrating of the TSC. 3569 * Do not produce a call stack, but just report it. 
3570 */ 3571 if (!once) { 3572 once++; 3573 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 3574 info->before, info->ts); 3575 } 3576 } else 3577 rb_check_timestamp(cpu_buffer, info); 3578 if (!abs) 3579 info->delta = 0; 3580 } 3581 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 3582 *length -= RB_LEN_TIME_EXTEND; 3583 *delta = 0; 3584 } 3585 3586 /** 3587 * rb_update_event - update event type and data 3588 * @cpu_buffer: The per cpu buffer of the @event 3589 * @event: the event to update 3590 * @info: The info to update the @event with (contains length and delta) 3591 * 3592 * Update the type and data fields of the @event. The length 3593 * is the actual size that is written to the ring buffer, 3594 * and with this, we can determine what to place into the 3595 * data field. 3596 */ 3597 static void 3598 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 3599 struct ring_buffer_event *event, 3600 struct rb_event_info *info) 3601 { 3602 unsigned length = info->length; 3603 u64 delta = info->delta; 3604 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 3605 3606 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 3607 cpu_buffer->event_stamp[nest] = info->ts; 3608 3609 /* 3610 * If we need to add a timestamp, then we 3611 * add it to the start of the reserved space. 3612 */ 3613 if (unlikely(info->add_timestamp)) 3614 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 3615 3616 event->time_delta = delta; 3617 length -= RB_EVNT_HDR_SIZE; 3618 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 3619 event->type_len = 0; 3620 event->array[0] = length; 3621 } else 3622 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 3623 } 3624 3625 static unsigned rb_calculate_event_length(unsigned length) 3626 { 3627 struct ring_buffer_event event; /* Used only for sizeof array */ 3628 3629 /* zero length can cause confusions */ 3630 if (!length) 3631 length++; 3632 3633 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 3634 length += sizeof(event.array[0]); 3635 3636 length += RB_EVNT_HDR_SIZE; 3637 length = ALIGN(length, RB_ARCH_ALIGNMENT); 3638 3639 /* 3640 * In case the time delta is larger than the 27 bits for it 3641 * in the header, we need to add a timestamp. If another 3642 * event comes in when trying to discard this one to increase 3643 * the length, then the timestamp will be added in the allocated 3644 * space of this event. If length is bigger than the size needed 3645 * for the TIME_EXTEND, then padding has to be used. The events 3646 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 3647 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 3648 * As length is a multiple of 4, we only need to worry if it 3649 * is 12 (RB_LEN_TIME_EXTEND + 4). 
 */
	if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
		length += RB_ALIGNMENT;

	return length;
}

static inline bool
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
		  struct ring_buffer_event *event)
{
	unsigned long new_index, old_index;
	struct buffer_page *bpage;
	unsigned long addr;

	new_index = rb_event_index(cpu_buffer, event);
	old_index = new_index + rb_event_ts_length(event);
	addr = (unsigned long)event;
	addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);

	bpage = READ_ONCE(cpu_buffer->tail_page);

	/*
	 * Make sure the tail_page is still the same and
	 * the next write location is the end of this event
	 */
	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
		unsigned long write_mask =
			local_read(&bpage->write) & ~RB_WRITE_MASK;
		unsigned long event_length = rb_event_length(event);

		/*
		 * Make the before_stamp different from the write_stamp
		 * so that the next event adds an absolute
		 * value and does not rely on the saved write stamp, which
		 * is now going to be bogus.
		 *
		 * By setting the before_stamp to zero, the next event
		 * is not going to use the write_stamp and will instead
		 * create an absolute timestamp. This means there's no
		 * reason to update the write_stamp!
		 */
		rb_time_set(&cpu_buffer->before_stamp, 0);

		/*
		 * If an event were to come in now, it would see that the
		 * write_stamp and the before_stamp are different, and assume
		 * that this event just added itself before updating
		 * the write stamp. The interrupting event will fix the
		 * write stamp for us, and use an absolute timestamp.
		 */

		/*
		 * This is on the tail page. It is possible that
		 * a write could come in and move the tail page
		 * and write to the next page. That is fine
		 * because we just shorten what is on this page.
		 */
		old_index += write_mask;
		new_index += write_mask;

		/* caution: old_index gets updated on cmpxchg failure */
		if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) {
			/* update counters */
			local_sub(event_length, &cpu_buffer->entries_bytes);
			return true;
		}
	}

	/* could not discard */
	return false;
}

static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->committing);
	local_inc(&cpu_buffer->commits);
}

static __always_inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long max_count;

	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	max_count = cpu_buffer->nr_pages * 100;

	while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		/*
		 * No need for a memory barrier here, as the update
		 * of the tail_page did it for this page.
3754 */ 3755 local_set(&cpu_buffer->commit_page->page->commit, 3756 rb_page_write(cpu_buffer->commit_page)); 3757 rb_inc_page(&cpu_buffer->commit_page); 3758 if (cpu_buffer->ring_meta) { 3759 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 3760 meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page; 3761 } 3762 /* add barrier to keep gcc from optimizing too much */ 3763 barrier(); 3764 } 3765 while (rb_commit_index(cpu_buffer) != 3766 rb_page_write(cpu_buffer->commit_page)) { 3767 3768 /* Make sure the readers see the content of what is committed. */ 3769 smp_wmb(); 3770 local_set(&cpu_buffer->commit_page->page->commit, 3771 rb_page_write(cpu_buffer->commit_page)); 3772 RB_WARN_ON(cpu_buffer, 3773 local_read(&cpu_buffer->commit_page->page->commit) & 3774 ~RB_WRITE_MASK); 3775 barrier(); 3776 } 3777 3778 /* again, keep gcc from optimizing */ 3779 barrier(); 3780 3781 /* 3782 * If an interrupt came in just after the first while loop 3783 * and pushed the tail page forward, we will be left with 3784 * a dangling commit that will never go forward. 3785 */ 3786 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3787 goto again; 3788 } 3789 3790 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3791 { 3792 unsigned long commits; 3793 3794 if (RB_WARN_ON(cpu_buffer, 3795 !local_read(&cpu_buffer->committing))) 3796 return; 3797 3798 again: 3799 commits = local_read(&cpu_buffer->commits); 3800 /* synchronize with interrupts */ 3801 barrier(); 3802 if (local_read(&cpu_buffer->committing) == 1) 3803 rb_set_commit_to_write(cpu_buffer); 3804 3805 local_dec(&cpu_buffer->committing); 3806 3807 /* synchronize with interrupts */ 3808 barrier(); 3809 3810 /* 3811 * Need to account for interrupts coming in between the 3812 * updating of the commit page and the clearing of the 3813 * committing counter. 
3814 */
3815 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
3816 !local_read(&cpu_buffer->committing)) {
3817 local_inc(&cpu_buffer->committing);
3818 goto again;
3819 }
3820 }
3821
3822 static inline void rb_event_discard(struct ring_buffer_event *event)
3823 {
3824 if (extended_time(event))
3825 event = skip_time_extend(event);
3826
3827 /* array[0] holds the actual length for the discarded event */
3828 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
3829 event->type_len = RINGBUF_TYPE_PADDING;
3830 /* time delta must be non zero */
3831 if (!event->time_delta)
3832 event->time_delta = 1;
3833 }
3834
3835 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
3836 {
3837 local_inc(&cpu_buffer->entries);
3838 rb_end_commit(cpu_buffer);
3839 }
3840
3841 static __always_inline void
3842 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
3843 {
3844 if (buffer->irq_work.waiters_pending) {
3845 buffer->irq_work.waiters_pending = false;
3846 /* irq_work_queue() supplies its own memory barriers */
3847 irq_work_queue(&buffer->irq_work.work);
3848 }
3849
3850 if (cpu_buffer->irq_work.waiters_pending) {
3851 cpu_buffer->irq_work.waiters_pending = false;
3852 /* irq_work_queue() supplies its own memory barriers */
3853 irq_work_queue(&cpu_buffer->irq_work.work);
3854 }
3855
3856 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
3857 return;
3858
3859 if (cpu_buffer->reader_page == cpu_buffer->commit_page)
3860 return;
3861
3862 if (!cpu_buffer->irq_work.full_waiters_pending)
3863 return;
3864
3865 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
3866
3867 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
3868 return;
3869
3870 cpu_buffer->irq_work.wakeup_full = true;
3871 cpu_buffer->irq_work.full_waiters_pending = false;
3872 /* irq_work_queue() supplies its own memory barriers */
3873 irq_work_queue(&cpu_buffer->irq_work.work);
3874 }
3875
3876 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
3877 # define do_ring_buffer_record_recursion() \
3878 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
3879 #else
3880 # define do_ring_buffer_record_recursion() do { } while (0)
3881 #endif
3882
3883 /*
3884 * The lock and unlock are done within a preempt disable section.
3885 * The current_context per_cpu variable can only be modified
3886 * by the current task between lock and unlock. But it can
3887 * be modified more than once via an interrupt. To pass this
3888 * information from the lock to the unlock without having to
3889 * access the 'in_interrupt()' functions again (which do show
3890 * a bit of overhead in something as critical as function tracing),
3891 * we use a bitmask trick.
3892 *
3893 * bit 1 = NMI context
3894 * bit 2 = IRQ context
3895 * bit 3 = SoftIRQ context
3896 * bit 4 = normal context.
3897 *
3898 * This works because this is the order of contexts that can
3899 * preempt other contexts. A SoftIRQ never preempts an IRQ
3900 * context.
3901 *
3902 * When the context is determined, the corresponding bit is
3903 * checked and set (if it was set, then a recursion of that context
3904 * happened).
3905 *
3906 * On unlock, we need to clear this bit. To do so, just subtract
3907 * 1 from the current_context and AND it to itself.
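 *
 * In code this is the classic "clear the lowest set bit" idiom. A sketch
 * of what trace_recursive_unlock() below does (taking the 'nest' shift as
 * zero for simplicity):
 *
 *	val &= val - 1;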
3908 * 3909 * (binary) 3910 * 101 - 1 = 100 3911 * 101 & 100 = 100 (clearing bit zero) 3912 * 3913 * 1010 - 1 = 1001 3914 * 1010 & 1001 = 1000 (clearing bit 1) 3915 * 3916 * The least significant bit can be cleared this way, and it 3917 * just so happens that it is the same bit corresponding to 3918 * the current context. 3919 * 3920 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3921 * is set when a recursion is detected at the current context, and if 3922 * the TRANSITION bit is already set, it will fail the recursion. 3923 * This is needed because there's a lag between the changing of 3924 * interrupt context and updating the preempt count. In this case, 3925 * a false positive will be found. To handle this, one extra recursion 3926 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3927 * bit is already set, then it is considered a recursion and the function 3928 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3929 * 3930 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3931 * to be cleared. Even if it wasn't the context that set it. That is, 3932 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3933 * is called before preempt_count() is updated, since the check will 3934 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3935 * NMI then comes in, it will set the NMI bit, but when the NMI code 3936 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3937 * and leave the NMI bit set. But this is fine, because the interrupt 3938 * code that set the TRANSITION bit will then clear the NMI bit when it 3939 * calls trace_recursive_unlock(). If another NMI comes in, it will 3940 * set the TRANSITION bit and continue. 3941 * 3942 * Note: The TRANSITION bit only handles a single transition between context. 3943 */ 3944 3945 static __always_inline bool 3946 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3947 { 3948 unsigned int val = cpu_buffer->current_context; 3949 int bit = interrupt_context_level(); 3950 3951 bit = RB_CTX_NORMAL - bit; 3952 3953 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3954 /* 3955 * It is possible that this was called by transitioning 3956 * between interrupt context, and preempt_count() has not 3957 * been updated yet. In this case, use the TRANSITION bit. 3958 */ 3959 bit = RB_CTX_TRANSITION; 3960 if (val & (1 << (bit + cpu_buffer->nest))) { 3961 do_ring_buffer_record_recursion(); 3962 return true; 3963 } 3964 } 3965 3966 val |= (1 << (bit + cpu_buffer->nest)); 3967 cpu_buffer->current_context = val; 3968 3969 return false; 3970 } 3971 3972 static __always_inline void 3973 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3974 { 3975 cpu_buffer->current_context &= 3976 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3977 } 3978 3979 /* The recursive locking above uses 5 bits */ 3980 #define NESTED_BITS 5 3981 3982 /** 3983 * ring_buffer_nest_start - Allow to trace while nested 3984 * @buffer: The ring buffer to modify 3985 * 3986 * The ring buffer has a safety mechanism to prevent recursion. 3987 * But there may be a case where a trace needs to be done while 3988 * tracing something else. In this case, calling this function 3989 * will allow this function to nest within a currently active 3990 * ring_buffer_lock_reserve(). 
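 *
 * An illustrative call sequence (a sketch only, not taken from an
 * in-tree user; 'buffer' is whatever trace_buffer the caller owns):
 *
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, length);
 *	... fill and commit with ring_buffer_unlock_commit(buffer) ...
 *	ring_buffer_nest_end(buffer);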
3991 *
3992 * Call this function before calling another ring_buffer_lock_reserve() and
3993 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
3994 */
3995 void ring_buffer_nest_start(struct trace_buffer *buffer)
3996 {
3997 struct ring_buffer_per_cpu *cpu_buffer;
3998 int cpu;
3999
4000 /* Enabled by ring_buffer_nest_end() */
4001 preempt_disable_notrace();
4002 cpu = raw_smp_processor_id();
4003 cpu_buffer = buffer->buffers[cpu];
4004 /* This is the shift value for the above recursive locking */
4005 cpu_buffer->nest += NESTED_BITS;
4006 }
4007
4008 /**
4009 * ring_buffer_nest_end - Allow to trace while nested
4010 * @buffer: The ring buffer to modify
4011 *
4012 * Must be called after ring_buffer_nest_start() and after the
4013 * ring_buffer_unlock_commit().
4014 */
4015 void ring_buffer_nest_end(struct trace_buffer *buffer)
4016 {
4017 struct ring_buffer_per_cpu *cpu_buffer;
4018 int cpu;
4019
4020 /* disabled by ring_buffer_nest_start() */
4021 cpu = raw_smp_processor_id();
4022 cpu_buffer = buffer->buffers[cpu];
4023 /* This is the shift value for the above recursive locking */
4024 cpu_buffer->nest -= NESTED_BITS;
4025 preempt_enable_notrace();
4026 }
4027
4028 /**
4029 * ring_buffer_unlock_commit - commit a reserved event
4030 * @buffer: The buffer to commit to
4031 *
4032 * This commits the data to the ring buffer, and releases any locks held.
4033 *
4034 * Must be paired with ring_buffer_lock_reserve.
4035 */
4036 int ring_buffer_unlock_commit(struct trace_buffer *buffer)
4037 {
4038 struct ring_buffer_per_cpu *cpu_buffer;
4039 int cpu = raw_smp_processor_id();
4040
4041 cpu_buffer = buffer->buffers[cpu];
4042
4043 rb_commit(cpu_buffer);
4044
4045 rb_wakeups(buffer, cpu_buffer);
4046
4047 trace_recursive_unlock(cpu_buffer);
4048
4049 preempt_enable_notrace();
4050
4051 return 0;
4052 }
4053 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
4054
4055 /* Special value to validate all deltas on a page.
*/ 4056 #define CHECK_FULL_PAGE 1L 4057 4058 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4059 4060 static const char *show_irq_str(int bits) 4061 { 4062 const char *type[] = { 4063 ".", // 0 4064 "s", // 1 4065 "h", // 2 4066 "Hs", // 3 4067 "n", // 4 4068 "Ns", // 5 4069 "Nh", // 6 4070 "NHs", // 7 4071 }; 4072 4073 return type[bits]; 4074 } 4075 4076 /* Assume this is a trace event */ 4077 static const char *show_flags(struct ring_buffer_event *event) 4078 { 4079 struct trace_entry *entry; 4080 int bits = 0; 4081 4082 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4083 return "X"; 4084 4085 entry = ring_buffer_event_data(event); 4086 4087 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4088 bits |= 1; 4089 4090 if (entry->flags & TRACE_FLAG_HARDIRQ) 4091 bits |= 2; 4092 4093 if (entry->flags & TRACE_FLAG_NMI) 4094 bits |= 4; 4095 4096 return show_irq_str(bits); 4097 } 4098 4099 static const char *show_irq(struct ring_buffer_event *event) 4100 { 4101 struct trace_entry *entry; 4102 4103 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4104 return ""; 4105 4106 entry = ring_buffer_event_data(event); 4107 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4108 return "d"; 4109 return ""; 4110 } 4111 4112 static const char *show_interrupt_level(void) 4113 { 4114 unsigned long pc = preempt_count(); 4115 unsigned char level = 0; 4116 4117 if (pc & SOFTIRQ_OFFSET) 4118 level |= 1; 4119 4120 if (pc & HARDIRQ_MASK) 4121 level |= 2; 4122 4123 if (pc & NMI_MASK) 4124 level |= 4; 4125 4126 return show_irq_str(level); 4127 } 4128 4129 static void dump_buffer_page(struct buffer_data_page *bpage, 4130 struct rb_event_info *info, 4131 unsigned long tail) 4132 { 4133 struct ring_buffer_event *event; 4134 u64 ts, delta; 4135 int e; 4136 4137 ts = bpage->time_stamp; 4138 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4139 4140 for (e = 0; e < tail; e += rb_event_length(event)) { 4141 4142 event = (struct ring_buffer_event *)(bpage->data + e); 4143 4144 switch (event->type_len) { 4145 4146 case RINGBUF_TYPE_TIME_EXTEND: 4147 delta = rb_event_time_stamp(event); 4148 ts += delta; 4149 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4150 e, ts, delta); 4151 break; 4152 4153 case RINGBUF_TYPE_TIME_STAMP: 4154 delta = rb_event_time_stamp(event); 4155 ts = rb_fix_abs_ts(delta, ts); 4156 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4157 e, ts, delta); 4158 break; 4159 4160 case RINGBUF_TYPE_PADDING: 4161 ts += event->time_delta; 4162 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4163 e, ts, event->time_delta); 4164 break; 4165 4166 case RINGBUF_TYPE_DATA: 4167 ts += event->time_delta; 4168 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4169 e, ts, event->time_delta, 4170 show_flags(event), show_irq(event)); 4171 break; 4172 4173 default: 4174 break; 4175 } 4176 } 4177 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4178 } 4179 4180 static DEFINE_PER_CPU(atomic_t, checking); 4181 static atomic_t ts_dump; 4182 4183 #define buffer_warn_return(fmt, ...) 
\ 4184 do { \ 4185 /* If another report is happening, ignore this one */ \ 4186 if (atomic_inc_return(&ts_dump) != 1) { \ 4187 atomic_dec(&ts_dump); \ 4188 goto out; \ 4189 } \ 4190 atomic_inc(&cpu_buffer->record_disabled); \ 4191 pr_warn(fmt, ##__VA_ARGS__); \ 4192 dump_buffer_page(bpage, info, tail); \ 4193 atomic_dec(&ts_dump); \ 4194 /* There's some cases in boot up that this can happen */ \ 4195 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4196 /* Do not re-enable checking */ \ 4197 return; \ 4198 } while (0) 4199 4200 /* 4201 * Check if the current event time stamp matches the deltas on 4202 * the buffer page. 4203 */ 4204 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4205 struct rb_event_info *info, 4206 unsigned long tail) 4207 { 4208 struct buffer_data_page *bpage; 4209 u64 ts, delta; 4210 bool full = false; 4211 int ret; 4212 4213 bpage = info->tail_page->page; 4214 4215 if (tail == CHECK_FULL_PAGE) { 4216 full = true; 4217 tail = local_read(&bpage->commit); 4218 } else if (info->add_timestamp & 4219 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 4220 /* Ignore events with absolute time stamps */ 4221 return; 4222 } 4223 4224 /* 4225 * Do not check the first event (skip possible extends too). 4226 * Also do not check if previous events have not been committed. 4227 */ 4228 if (tail <= 8 || tail > local_read(&bpage->commit)) 4229 return; 4230 4231 /* 4232 * If this interrupted another event, 4233 */ 4234 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 4235 goto out; 4236 4237 ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta); 4238 if (ret < 0) { 4239 if (delta < ts) { 4240 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 4241 cpu_buffer->cpu, ts, delta); 4242 goto out; 4243 } 4244 } 4245 if ((full && ts > info->ts) || 4246 (!full && ts + info->delta != info->ts)) { 4247 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 4248 cpu_buffer->cpu, 4249 ts + info->delta, info->ts, info->delta, 4250 info->before, info->after, 4251 full ? " (full)" : "", show_interrupt_level()); 4252 } 4253 out: 4254 atomic_dec(this_cpu_ptr(&checking)); 4255 } 4256 #else 4257 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4258 struct rb_event_info *info, 4259 unsigned long tail) 4260 { 4261 } 4262 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 4263 4264 static struct ring_buffer_event * 4265 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 4266 struct rb_event_info *info) 4267 { 4268 struct ring_buffer_event *event; 4269 struct buffer_page *tail_page; 4270 unsigned long tail, write, w; 4271 4272 /* Don't let the compiler play games with cpu_buffer->tail_page */ 4273 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 4274 4275 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 4276 barrier(); 4277 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4278 rb_time_read(&cpu_buffer->write_stamp, &info->after); 4279 barrier(); 4280 info->ts = rb_time_stamp(cpu_buffer->buffer); 4281 4282 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 4283 info->delta = info->ts; 4284 } else { 4285 /* 4286 * If interrupting an event time update, we may need an 4287 * absolute timestamp. 4288 * Don't bother if this is the start of a new page (w == 0). 
4289 */
4290 if (!w) {
4291 /* Use the sub-buffer timestamp */
4292 info->delta = 0;
4293 } else if (unlikely(info->before != info->after)) {
4294 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
4295 info->length += RB_LEN_TIME_EXTEND;
4296 } else {
4297 info->delta = info->ts - info->after;
4298 if (unlikely(test_time_stamp(info->delta))) {
4299 info->add_timestamp |= RB_ADD_STAMP_EXTEND;
4300 info->length += RB_LEN_TIME_EXTEND;
4301 }
4302 }
4303 }
4304
4305 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts);
4306
4307 /*C*/ write = local_add_return(info->length, &tail_page->write);
4308
4309 /* set write to only the index of the write */
4310 write &= RB_WRITE_MASK;
4311
4312 tail = write - info->length;
4313
4314 /* See if we shot past the end of this buffer page */
4315 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) {
4316 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
4317 return rb_move_tail(cpu_buffer, tail, info);
4318 }
4319
4320 if (likely(tail == w)) {
4321 /* Nothing interrupted us between A and C */
4322 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts);
4323 /*
4324 * If something came in between C and D, the write stamp
4325 * may now not be in sync. But that's fine as the before_stamp
4326 * will be different and the next event will just be forced
4327 * to use an absolute timestamp.
4328 */
4329 if (likely(!(info->add_timestamp &
4330 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
4331 /* This did not interrupt any time update */
4332 info->delta = info->ts - info->after;
4333 else
4334 /* Just use full timestamp for interrupting event */
4335 info->delta = info->ts;
4336 check_buffer(cpu_buffer, info, tail);
4337 } else {
4338 u64 ts;
4339 /* SLOW PATH - Interrupted between A and C */
4340
4341 /* Save the old before_stamp */
4342 rb_time_read(&cpu_buffer->before_stamp, &info->before);
4343
4344 /*
4345 * Read a new timestamp and update the before_stamp to make
4346 * the next event after this one force using an absolute
4347 * timestamp. This is in case an interrupt were to come in
4348 * between E and F.
4349 */
4350 ts = rb_time_stamp(cpu_buffer->buffer);
4351 rb_time_set(&cpu_buffer->before_stamp, ts);
4352
4353 barrier();
4354 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after);
4355 barrier();
4356 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
4357 info->after == info->before && info->after < ts) {
4358 /*
4359 * Nothing came after this event between C and F, it is
4360 * safe to use info->after for the delta as it
4361 * matched info->before and is still valid.
4362 */
4363 info->delta = ts - info->after;
4364 } else {
4365 /*
4366 * Interrupted between C and F:
4367 * Lost the previous event's time stamp. Just set the
4368 * delta to zero, and this will be the same time as
4369 * the event this event interrupted. And the events that
4370 * came after this will still be correct (as they would
4371 * have built their delta on the previous event).
4372 */
4373 info->delta = 0;
4374 }
4375 info->ts = ts;
4376 info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
4377 }
4378
4379 /*
4380 * If this is the first commit on the page, then it has the same
4381 * timestamp as the page itself.
4382 */ 4383 if (unlikely(!tail && !(info->add_timestamp & 4384 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4385 info->delta = 0; 4386 4387 /* We reserved something on the buffer */ 4388 4389 event = __rb_page_index(tail_page, tail); 4390 rb_update_event(cpu_buffer, event, info); 4391 4392 local_inc(&tail_page->entries); 4393 4394 /* 4395 * If this is the first commit on the page, then update 4396 * its timestamp. 4397 */ 4398 if (unlikely(!tail)) 4399 tail_page->page->time_stamp = info->ts; 4400 4401 /* account for these added bytes */ 4402 local_add(info->length, &cpu_buffer->entries_bytes); 4403 4404 return event; 4405 } 4406 4407 static __always_inline struct ring_buffer_event * 4408 rb_reserve_next_event(struct trace_buffer *buffer, 4409 struct ring_buffer_per_cpu *cpu_buffer, 4410 unsigned long length) 4411 { 4412 struct ring_buffer_event *event; 4413 struct rb_event_info info; 4414 int nr_loops = 0; 4415 int add_ts_default; 4416 4417 /* 4418 * ring buffer does cmpxchg as well as atomic64 operations 4419 * (which some archs use locking for atomic64), make sure this 4420 * is safe in NMI context 4421 */ 4422 if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) || 4423 IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) && 4424 (unlikely(in_nmi()))) { 4425 return NULL; 4426 } 4427 4428 rb_start_commit(cpu_buffer); 4429 /* The commit page can not change after this */ 4430 4431 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4432 /* 4433 * Due to the ability to swap a cpu buffer from a buffer 4434 * it is possible it was swapped before we committed. 4435 * (committing stops a swap). We check for it here and 4436 * if it happened, we have to fail the write. 4437 */ 4438 barrier(); 4439 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 4440 local_dec(&cpu_buffer->committing); 4441 local_dec(&cpu_buffer->commits); 4442 return NULL; 4443 } 4444 #endif 4445 4446 info.length = rb_calculate_event_length(length); 4447 4448 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 4449 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 4450 info.length += RB_LEN_TIME_EXTEND; 4451 if (info.length > cpu_buffer->buffer->max_data_size) 4452 goto out_fail; 4453 } else { 4454 add_ts_default = RB_ADD_STAMP_NONE; 4455 } 4456 4457 again: 4458 info.add_timestamp = add_ts_default; 4459 info.delta = 0; 4460 4461 /* 4462 * We allow for interrupts to reenter here and do a trace. 4463 * If one does, it will cause this original code to loop 4464 * back here. Even with heavy interrupts happening, this 4465 * should only happen a few times in a row. If this happens 4466 * 1000 times in a row, there must be either an interrupt 4467 * storm or we have something buggy. 4468 * Bail! 4469 */ 4470 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 4471 goto out_fail; 4472 4473 event = __rb_reserve_next(cpu_buffer, &info); 4474 4475 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 4476 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 4477 info.length -= RB_LEN_TIME_EXTEND; 4478 goto again; 4479 } 4480 4481 if (likely(event)) 4482 return event; 4483 out_fail: 4484 rb_end_commit(cpu_buffer); 4485 return NULL; 4486 } 4487 4488 /** 4489 * ring_buffer_lock_reserve - reserve a part of the buffer 4490 * @buffer: the ring buffer to reserve from 4491 * @length: the length of the data to reserve (excluding event header) 4492 * 4493 * Returns a reserved event on the ring buffer to copy directly to. 4494 * The user of this interface will need to get the body to write into 4495 * and can use the ring_buffer_event_data() interface. 
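 *
 * A minimal usage sketch (the int payload here is purely illustrative):
 *
 *	struct ring_buffer_event *event;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(int));
 *	if (event) {
 *		*(int *)ring_buffer_event_data(event) = 42;
 *		ring_buffer_unlock_commit(buffer);
 *	}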
4496 * 4497 * The length is the length of the data needed, not the event length 4498 * which also includes the event header. 4499 * 4500 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4501 * If NULL is returned, then nothing has been allocated or locked. 4502 */ 4503 struct ring_buffer_event * 4504 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4505 { 4506 struct ring_buffer_per_cpu *cpu_buffer; 4507 struct ring_buffer_event *event; 4508 int cpu; 4509 4510 /* If we are tracing schedule, we don't want to recurse */ 4511 preempt_disable_notrace(); 4512 4513 if (unlikely(atomic_read(&buffer->record_disabled))) 4514 goto out; 4515 4516 cpu = raw_smp_processor_id(); 4517 4518 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4519 goto out; 4520 4521 cpu_buffer = buffer->buffers[cpu]; 4522 4523 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4524 goto out; 4525 4526 if (unlikely(length > buffer->max_data_size)) 4527 goto out; 4528 4529 if (unlikely(trace_recursive_lock(cpu_buffer))) 4530 goto out; 4531 4532 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4533 if (!event) 4534 goto out_unlock; 4535 4536 return event; 4537 4538 out_unlock: 4539 trace_recursive_unlock(cpu_buffer); 4540 out: 4541 preempt_enable_notrace(); 4542 return NULL; 4543 } 4544 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4545 4546 /* 4547 * Decrement the entries to the page that an event is on. 4548 * The event does not even need to exist, only the pointer 4549 * to the page it is on. This may only be called before the commit 4550 * takes place. 4551 */ 4552 static inline void 4553 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4554 struct ring_buffer_event *event) 4555 { 4556 unsigned long addr = (unsigned long)event; 4557 struct buffer_page *bpage = cpu_buffer->commit_page; 4558 struct buffer_page *start; 4559 4560 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4561 4562 /* Do the likely case first */ 4563 if (likely(bpage->page == (void *)addr)) { 4564 local_dec(&bpage->entries); 4565 return; 4566 } 4567 4568 /* 4569 * Because the commit page may be on the reader page we 4570 * start with the next page and check the end loop there. 4571 */ 4572 rb_inc_page(&bpage); 4573 start = bpage; 4574 do { 4575 if (bpage->page == (void *)addr) { 4576 local_dec(&bpage->entries); 4577 return; 4578 } 4579 rb_inc_page(&bpage); 4580 } while (bpage != start); 4581 4582 /* commit not part of this buffer?? */ 4583 RB_WARN_ON(cpu_buffer, 1); 4584 } 4585 4586 /** 4587 * ring_buffer_discard_commit - discard an event that has not been committed 4588 * @buffer: the ring buffer 4589 * @event: non committed event to discard 4590 * 4591 * Sometimes an event that is in the ring buffer needs to be ignored. 4592 * This function lets the user discard an event in the ring buffer 4593 * and then that event will not be read later. 4594 * 4595 * This function only works if it is called before the item has been 4596 * committed. It will try to free the event from the ring buffer 4597 * if another event has not been added behind it. 4598 * 4599 * If another event has been added behind it, it will set the event 4600 * up as discarded, and perform the commit. 4601 * 4602 * If this function is called, do not call ring_buffer_unlock_commit on 4603 * the event. 
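 *
 * A sketch of the intended pattern (the decide_to_drop() helper is
 * hypothetical, standing in for whatever test the caller applies):
 *
 *	event = ring_buffer_lock_reserve(buffer, length);
 *	if (event) {
 *		if (decide_to_drop(event))
 *			ring_buffer_discard_commit(buffer, event);
 *		else
 *			ring_buffer_unlock_commit(buffer);
 *	}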
4604 */ 4605 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4606 struct ring_buffer_event *event) 4607 { 4608 struct ring_buffer_per_cpu *cpu_buffer; 4609 int cpu; 4610 4611 /* The event is discarded regardless */ 4612 rb_event_discard(event); 4613 4614 cpu = smp_processor_id(); 4615 cpu_buffer = buffer->buffers[cpu]; 4616 4617 /* 4618 * This must only be called if the event has not been 4619 * committed yet. Thus we can assume that preemption 4620 * is still disabled. 4621 */ 4622 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4623 4624 rb_decrement_entry(cpu_buffer, event); 4625 if (rb_try_to_discard(cpu_buffer, event)) 4626 goto out; 4627 4628 out: 4629 rb_end_commit(cpu_buffer); 4630 4631 trace_recursive_unlock(cpu_buffer); 4632 4633 preempt_enable_notrace(); 4634 4635 } 4636 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4637 4638 /** 4639 * ring_buffer_write - write data to the buffer without reserving 4640 * @buffer: The ring buffer to write to. 4641 * @length: The length of the data being written (excluding the event header) 4642 * @data: The data to write to the buffer. 4643 * 4644 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4645 * one function. If you already have the data to write to the buffer, it 4646 * may be easier to simply call this function. 4647 * 4648 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4649 * and not the length of the event which would hold the header. 4650 */ 4651 int ring_buffer_write(struct trace_buffer *buffer, 4652 unsigned long length, 4653 void *data) 4654 { 4655 struct ring_buffer_per_cpu *cpu_buffer; 4656 struct ring_buffer_event *event; 4657 void *body; 4658 int ret = -EBUSY; 4659 int cpu; 4660 4661 preempt_disable_notrace(); 4662 4663 if (atomic_read(&buffer->record_disabled)) 4664 goto out; 4665 4666 cpu = raw_smp_processor_id(); 4667 4668 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4669 goto out; 4670 4671 cpu_buffer = buffer->buffers[cpu]; 4672 4673 if (atomic_read(&cpu_buffer->record_disabled)) 4674 goto out; 4675 4676 if (length > buffer->max_data_size) 4677 goto out; 4678 4679 if (unlikely(trace_recursive_lock(cpu_buffer))) 4680 goto out; 4681 4682 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4683 if (!event) 4684 goto out_unlock; 4685 4686 body = rb_event_data(event); 4687 4688 memcpy(body, data, length); 4689 4690 rb_commit(cpu_buffer); 4691 4692 rb_wakeups(buffer, cpu_buffer); 4693 4694 ret = 0; 4695 4696 out_unlock: 4697 trace_recursive_unlock(cpu_buffer); 4698 4699 out: 4700 preempt_enable_notrace(); 4701 4702 return ret; 4703 } 4704 EXPORT_SYMBOL_GPL(ring_buffer_write); 4705 4706 /* 4707 * The total entries in the ring buffer is the running counter 4708 * of entries entered into the ring buffer, minus the sum of 4709 * the entries read from the ring buffer and the number of 4710 * entries that were overwritten. 4711 */ 4712 static inline unsigned long 4713 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4714 { 4715 return local_read(&cpu_buffer->entries) - 4716 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4717 } 4718 4719 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4720 { 4721 return !rb_num_of_entries(cpu_buffer); 4722 } 4723 4724 /** 4725 * ring_buffer_record_disable - stop all writes into the buffer 4726 * @buffer: The ring buffer to stop writes to. 4727 * 4728 * This prevents all writes to the buffer. Any attempt to write 4729 * to the buffer after this will fail and return NULL. 
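 *
 * A typical quiescing sequence might look like this (illustrative only):
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_rcu();
 *	... safely read or reset the buffer ...
 *	ring_buffer_record_enable(buffer);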
4730 * 4731 * The caller should call synchronize_rcu() after this. 4732 */ 4733 void ring_buffer_record_disable(struct trace_buffer *buffer) 4734 { 4735 atomic_inc(&buffer->record_disabled); 4736 } 4737 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4738 4739 /** 4740 * ring_buffer_record_enable - enable writes to the buffer 4741 * @buffer: The ring buffer to enable writes 4742 * 4743 * Note, multiple disables will need the same number of enables 4744 * to truly enable the writing (much like preempt_disable). 4745 */ 4746 void ring_buffer_record_enable(struct trace_buffer *buffer) 4747 { 4748 atomic_dec(&buffer->record_disabled); 4749 } 4750 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4751 4752 /** 4753 * ring_buffer_record_off - stop all writes into the buffer 4754 * @buffer: The ring buffer to stop writes to. 4755 * 4756 * This prevents all writes to the buffer. Any attempt to write 4757 * to the buffer after this will fail and return NULL. 4758 * 4759 * This is different than ring_buffer_record_disable() as 4760 * it works like an on/off switch, where as the disable() version 4761 * must be paired with a enable(). 4762 */ 4763 void ring_buffer_record_off(struct trace_buffer *buffer) 4764 { 4765 unsigned int rd; 4766 unsigned int new_rd; 4767 4768 rd = atomic_read(&buffer->record_disabled); 4769 do { 4770 new_rd = rd | RB_BUFFER_OFF; 4771 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4772 } 4773 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4774 4775 /** 4776 * ring_buffer_record_on - restart writes into the buffer 4777 * @buffer: The ring buffer to start writes to. 4778 * 4779 * This enables all writes to the buffer that was disabled by 4780 * ring_buffer_record_off(). 4781 * 4782 * This is different than ring_buffer_record_enable() as 4783 * it works like an on/off switch, where as the enable() version 4784 * must be paired with a disable(). 4785 */ 4786 void ring_buffer_record_on(struct trace_buffer *buffer) 4787 { 4788 unsigned int rd; 4789 unsigned int new_rd; 4790 4791 rd = atomic_read(&buffer->record_disabled); 4792 do { 4793 new_rd = rd & ~RB_BUFFER_OFF; 4794 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4795 } 4796 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4797 4798 /** 4799 * ring_buffer_record_is_on - return true if the ring buffer can write 4800 * @buffer: The ring buffer to see if write is enabled 4801 * 4802 * Returns true if the ring buffer is in a state that it accepts writes. 4803 */ 4804 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4805 { 4806 return !atomic_read(&buffer->record_disabled); 4807 } 4808 4809 /** 4810 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4811 * @buffer: The ring buffer to see if write is set enabled 4812 * 4813 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4814 * Note that this does NOT mean it is in a writable state. 4815 * 4816 * It may return true when the ring buffer has been disabled by 4817 * ring_buffer_record_disable(), as that is a temporary disabling of 4818 * the ring buffer. 4819 */ 4820 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4821 { 4822 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4823 } 4824 4825 /** 4826 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4827 * @buffer: The ring buffer to stop writes to. 4828 * @cpu: The CPU buffer to stop 4829 * 4830 * This prevents all writes to the buffer. 
Any attempt to write 4831 * to the buffer after this will fail and return NULL. 4832 * 4833 * The caller should call synchronize_rcu() after this. 4834 */ 4835 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4836 { 4837 struct ring_buffer_per_cpu *cpu_buffer; 4838 4839 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4840 return; 4841 4842 cpu_buffer = buffer->buffers[cpu]; 4843 atomic_inc(&cpu_buffer->record_disabled); 4844 } 4845 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4846 4847 /** 4848 * ring_buffer_record_enable_cpu - enable writes to the buffer 4849 * @buffer: The ring buffer to enable writes 4850 * @cpu: The CPU to enable. 4851 * 4852 * Note, multiple disables will need the same number of enables 4853 * to truly enable the writing (much like preempt_disable). 4854 */ 4855 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4856 { 4857 struct ring_buffer_per_cpu *cpu_buffer; 4858 4859 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4860 return; 4861 4862 cpu_buffer = buffer->buffers[cpu]; 4863 atomic_dec(&cpu_buffer->record_disabled); 4864 } 4865 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4866 4867 /** 4868 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4869 * @buffer: The ring buffer 4870 * @cpu: The per CPU buffer to read from. 4871 */ 4872 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4873 { 4874 unsigned long flags; 4875 struct ring_buffer_per_cpu *cpu_buffer; 4876 struct buffer_page *bpage; 4877 u64 ret = 0; 4878 4879 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4880 return 0; 4881 4882 cpu_buffer = buffer->buffers[cpu]; 4883 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4884 /* 4885 * if the tail is on reader_page, oldest time stamp is on the reader 4886 * page 4887 */ 4888 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4889 bpage = cpu_buffer->reader_page; 4890 else 4891 bpage = rb_set_head_page(cpu_buffer); 4892 if (bpage) 4893 ret = bpage->page->time_stamp; 4894 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4895 4896 return ret; 4897 } 4898 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4899 4900 /** 4901 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 4902 * @buffer: The ring buffer 4903 * @cpu: The per CPU buffer to read from. 4904 */ 4905 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4906 { 4907 struct ring_buffer_per_cpu *cpu_buffer; 4908 unsigned long ret; 4909 4910 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4911 return 0; 4912 4913 cpu_buffer = buffer->buffers[cpu]; 4914 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4915 4916 return ret; 4917 } 4918 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4919 4920 /** 4921 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4922 * @buffer: The ring buffer 4923 * @cpu: The per CPU buffer to get the entries from. 4924 */ 4925 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4926 { 4927 struct ring_buffer_per_cpu *cpu_buffer; 4928 4929 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4930 return 0; 4931 4932 cpu_buffer = buffer->buffers[cpu]; 4933 4934 return rb_num_of_entries(cpu_buffer); 4935 } 4936 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4937 4938 /** 4939 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4940 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 
4941 * @buffer: The ring buffer 4942 * @cpu: The per CPU buffer to get the number of overruns from 4943 */ 4944 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4945 { 4946 struct ring_buffer_per_cpu *cpu_buffer; 4947 unsigned long ret; 4948 4949 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4950 return 0; 4951 4952 cpu_buffer = buffer->buffers[cpu]; 4953 ret = local_read(&cpu_buffer->overrun); 4954 4955 return ret; 4956 } 4957 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4958 4959 /** 4960 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4961 * commits failing due to the buffer wrapping around while there are uncommitted 4962 * events, such as during an interrupt storm. 4963 * @buffer: The ring buffer 4964 * @cpu: The per CPU buffer to get the number of overruns from 4965 */ 4966 unsigned long 4967 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4968 { 4969 struct ring_buffer_per_cpu *cpu_buffer; 4970 unsigned long ret; 4971 4972 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4973 return 0; 4974 4975 cpu_buffer = buffer->buffers[cpu]; 4976 ret = local_read(&cpu_buffer->commit_overrun); 4977 4978 return ret; 4979 } 4980 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4981 4982 /** 4983 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4984 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 4985 * @buffer: The ring buffer 4986 * @cpu: The per CPU buffer to get the number of overruns from 4987 */ 4988 unsigned long 4989 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4990 { 4991 struct ring_buffer_per_cpu *cpu_buffer; 4992 unsigned long ret; 4993 4994 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4995 return 0; 4996 4997 cpu_buffer = buffer->buffers[cpu]; 4998 ret = local_read(&cpu_buffer->dropped_events); 4999 5000 return ret; 5001 } 5002 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 5003 5004 /** 5005 * ring_buffer_read_events_cpu - get the number of events successfully read 5006 * @buffer: The ring buffer 5007 * @cpu: The per CPU buffer to get the number of events read 5008 */ 5009 unsigned long 5010 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 5011 { 5012 struct ring_buffer_per_cpu *cpu_buffer; 5013 5014 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5015 return 0; 5016 5017 cpu_buffer = buffer->buffers[cpu]; 5018 return cpu_buffer->read; 5019 } 5020 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 5021 5022 /** 5023 * ring_buffer_entries - get the number of entries in a buffer 5024 * @buffer: The ring buffer 5025 * 5026 * Returns the total number of entries in the ring buffer 5027 * (all CPU entries) 5028 */ 5029 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 5030 { 5031 struct ring_buffer_per_cpu *cpu_buffer; 5032 unsigned long entries = 0; 5033 int cpu; 5034 5035 /* if you care about this being correct, lock the buffer */ 5036 for_each_buffer_cpu(buffer, cpu) { 5037 cpu_buffer = buffer->buffers[cpu]; 5038 entries += rb_num_of_entries(cpu_buffer); 5039 } 5040 5041 return entries; 5042 } 5043 EXPORT_SYMBOL_GPL(ring_buffer_entries); 5044 5045 /** 5046 * ring_buffer_overruns - get the number of overruns in buffer 5047 * @buffer: The ring buffer 5048 * 5049 * Returns the total number of overruns in the ring buffer 5050 * (all CPU entries) 5051 */ 5052 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 5053 { 5054 struct ring_buffer_per_cpu *cpu_buffer; 5055 unsigned long overruns = 0; 5056 int cpu; 5057 5058 /* 
if you care about this being correct, lock the buffer */ 5059 for_each_buffer_cpu(buffer, cpu) { 5060 cpu_buffer = buffer->buffers[cpu]; 5061 overruns += local_read(&cpu_buffer->overrun); 5062 } 5063 5064 return overruns; 5065 } 5066 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5067 5068 static void rb_iter_reset(struct ring_buffer_iter *iter) 5069 { 5070 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5071 5072 /* Iterator usage is expected to have record disabled */ 5073 iter->head_page = cpu_buffer->reader_page; 5074 iter->head = cpu_buffer->reader_page->read; 5075 iter->next_event = iter->head; 5076 5077 iter->cache_reader_page = iter->head_page; 5078 iter->cache_read = cpu_buffer->read; 5079 iter->cache_pages_removed = cpu_buffer->pages_removed; 5080 5081 if (iter->head) { 5082 iter->read_stamp = cpu_buffer->read_stamp; 5083 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5084 } else { 5085 iter->read_stamp = iter->head_page->page->time_stamp; 5086 iter->page_stamp = iter->read_stamp; 5087 } 5088 } 5089 5090 /** 5091 * ring_buffer_iter_reset - reset an iterator 5092 * @iter: The iterator to reset 5093 * 5094 * Resets the iterator, so that it will start from the beginning 5095 * again. 5096 */ 5097 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5098 { 5099 struct ring_buffer_per_cpu *cpu_buffer; 5100 unsigned long flags; 5101 5102 if (!iter) 5103 return; 5104 5105 cpu_buffer = iter->cpu_buffer; 5106 5107 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5108 rb_iter_reset(iter); 5109 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5110 } 5111 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5112 5113 /** 5114 * ring_buffer_iter_empty - check if an iterator has no more to read 5115 * @iter: The iterator to check 5116 */ 5117 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5118 { 5119 struct ring_buffer_per_cpu *cpu_buffer; 5120 struct buffer_page *reader; 5121 struct buffer_page *head_page; 5122 struct buffer_page *commit_page; 5123 struct buffer_page *curr_commit_page; 5124 unsigned commit; 5125 u64 curr_commit_ts; 5126 u64 commit_ts; 5127 5128 cpu_buffer = iter->cpu_buffer; 5129 reader = cpu_buffer->reader_page; 5130 head_page = cpu_buffer->head_page; 5131 commit_page = READ_ONCE(cpu_buffer->commit_page); 5132 commit_ts = commit_page->page->time_stamp; 5133 5134 /* 5135 * When the writer goes across pages, it issues a cmpxchg which 5136 * is a mb(), which will synchronize with the rmb here. 
5137 * (see rb_tail_page_update()) 5138 */ 5139 smp_rmb(); 5140 commit = rb_page_commit(commit_page); 5141 /* We want to make sure that the commit page doesn't change */ 5142 smp_rmb(); 5143 5144 /* Make sure commit page didn't change */ 5145 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5146 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5147 5148 /* If the commit page changed, then there's more data */ 5149 if (curr_commit_page != commit_page || 5150 curr_commit_ts != commit_ts) 5151 return 0; 5152 5153 /* Still racy, as it may return a false positive, but that's OK */ 5154 return ((iter->head_page == commit_page && iter->head >= commit) || 5155 (iter->head_page == reader && commit_page == head_page && 5156 head_page->read == commit && 5157 iter->head == rb_page_size(cpu_buffer->reader_page))); 5158 } 5159 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5160 5161 static void 5162 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5163 struct ring_buffer_event *event) 5164 { 5165 u64 delta; 5166 5167 switch (event->type_len) { 5168 case RINGBUF_TYPE_PADDING: 5169 return; 5170 5171 case RINGBUF_TYPE_TIME_EXTEND: 5172 delta = rb_event_time_stamp(event); 5173 cpu_buffer->read_stamp += delta; 5174 return; 5175 5176 case RINGBUF_TYPE_TIME_STAMP: 5177 delta = rb_event_time_stamp(event); 5178 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5179 cpu_buffer->read_stamp = delta; 5180 return; 5181 5182 case RINGBUF_TYPE_DATA: 5183 cpu_buffer->read_stamp += event->time_delta; 5184 return; 5185 5186 default: 5187 RB_WARN_ON(cpu_buffer, 1); 5188 } 5189 } 5190 5191 static void 5192 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5193 struct ring_buffer_event *event) 5194 { 5195 u64 delta; 5196 5197 switch (event->type_len) { 5198 case RINGBUF_TYPE_PADDING: 5199 return; 5200 5201 case RINGBUF_TYPE_TIME_EXTEND: 5202 delta = rb_event_time_stamp(event); 5203 iter->read_stamp += delta; 5204 return; 5205 5206 case RINGBUF_TYPE_TIME_STAMP: 5207 delta = rb_event_time_stamp(event); 5208 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5209 iter->read_stamp = delta; 5210 return; 5211 5212 case RINGBUF_TYPE_DATA: 5213 iter->read_stamp += event->time_delta; 5214 return; 5215 5216 default: 5217 RB_WARN_ON(iter->cpu_buffer, 1); 5218 } 5219 } 5220 5221 static struct buffer_page * 5222 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5223 { 5224 struct buffer_page *reader = NULL; 5225 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5226 unsigned long overwrite; 5227 unsigned long flags; 5228 int nr_loops = 0; 5229 bool ret; 5230 5231 local_irq_save(flags); 5232 arch_spin_lock(&cpu_buffer->lock); 5233 5234 again: 5235 /* 5236 * This should normally only loop twice. But because the 5237 * start of the reader inserts an empty page, it causes 5238 * a case where we will loop three times. There should be no 5239 * reason to loop four times (that I know of). 
5240 */ 5241 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 5242 reader = NULL; 5243 goto out; 5244 } 5245 5246 reader = cpu_buffer->reader_page; 5247 5248 /* If there's more to read, return this page */ 5249 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5250 goto out; 5251 5252 /* Never should we have an index greater than the size */ 5253 if (RB_WARN_ON(cpu_buffer, 5254 cpu_buffer->reader_page->read > rb_page_size(reader))) 5255 goto out; 5256 5257 /* check if we caught up to the tail */ 5258 reader = NULL; 5259 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5260 goto out; 5261 5262 /* Don't bother swapping if the ring buffer is empty */ 5263 if (rb_num_of_entries(cpu_buffer) == 0) 5264 goto out; 5265 5266 /* 5267 * Reset the reader page to size zero. 5268 */ 5269 local_set(&cpu_buffer->reader_page->write, 0); 5270 local_set(&cpu_buffer->reader_page->entries, 0); 5271 local_set(&cpu_buffer->reader_page->page->commit, 0); 5272 cpu_buffer->reader_page->real_end = 0; 5273 5274 spin: 5275 /* 5276 * Splice the empty reader page into the list around the head. 5277 */ 5278 reader = rb_set_head_page(cpu_buffer); 5279 if (!reader) 5280 goto out; 5281 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5282 cpu_buffer->reader_page->list.prev = reader->list.prev; 5283 5284 /* 5285 * cpu_buffer->pages just needs to point to the buffer, it 5286 * has no specific buffer page to point to. Lets move it out 5287 * of our way so we don't accidentally swap it. 5288 */ 5289 cpu_buffer->pages = reader->list.prev; 5290 5291 /* The reader page will be pointing to the new head */ 5292 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5293 5294 /* 5295 * We want to make sure we read the overruns after we set up our 5296 * pointers to the next object. The writer side does a 5297 * cmpxchg to cross pages which acts as the mb on the writer 5298 * side. Note, the reader will constantly fail the swap 5299 * while the writer is updating the pointers, so this 5300 * guarantees that the overwrite recorded here is the one we 5301 * want to compare with the last_overrun. 5302 */ 5303 smp_mb(); 5304 overwrite = local_read(&(cpu_buffer->overrun)); 5305 5306 /* 5307 * Here's the tricky part. 5308 * 5309 * We need to move the pointer past the header page. 5310 * But we can only do that if a writer is not currently 5311 * moving it. The page before the header page has the 5312 * flag bit '1' set if it is pointing to the page we want. 5313 * but if the writer is in the process of moving it 5314 * than it will be '2' or already moved '0'. 5315 */ 5316 5317 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5318 5319 /* 5320 * If we did not convert it, then we must try again. 5321 */ 5322 if (!ret) 5323 goto spin; 5324 5325 if (cpu_buffer->ring_meta) 5326 rb_update_meta_reader(cpu_buffer, reader); 5327 5328 /* 5329 * Yay! We succeeded in replacing the page. 5330 * 5331 * Now make the new head point back to the reader page. 
5332 */ 5333 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5334 rb_inc_page(&cpu_buffer->head_page); 5335 5336 cpu_buffer->cnt++; 5337 local_inc(&cpu_buffer->pages_read); 5338 5339 /* Finally update the reader page to the new head */ 5340 cpu_buffer->reader_page = reader; 5341 cpu_buffer->reader_page->read = 0; 5342 5343 if (overwrite != cpu_buffer->last_overrun) { 5344 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5345 cpu_buffer->last_overrun = overwrite; 5346 } 5347 5348 goto again; 5349 5350 out: 5351 /* Update the read_stamp on the first event */ 5352 if (reader && reader->read == 0) 5353 cpu_buffer->read_stamp = reader->page->time_stamp; 5354 5355 arch_spin_unlock(&cpu_buffer->lock); 5356 local_irq_restore(flags); 5357 5358 /* 5359 * The writer has preempt disable, wait for it. But not forever 5360 * Although, 1 second is pretty much "forever" 5361 */ 5362 #define USECS_WAIT 1000000 5363 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5364 /* If the write is past the end of page, a writer is still updating it */ 5365 if (likely(!reader || rb_page_write(reader) <= bsize)) 5366 break; 5367 5368 udelay(1); 5369 5370 /* Get the latest version of the reader write value */ 5371 smp_rmb(); 5372 } 5373 5374 /* The writer is not moving forward? Something is wrong */ 5375 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5376 reader = NULL; 5377 5378 /* 5379 * Make sure we see any padding after the write update 5380 * (see rb_reset_tail()). 5381 * 5382 * In addition, a writer may be writing on the reader page 5383 * if the page has not been fully filled, so the read barrier 5384 * is also needed to make sure we see the content of what is 5385 * committed by the writer (see rb_set_commit_to_write()). 5386 */ 5387 smp_rmb(); 5388 5389 5390 return reader; 5391 } 5392 5393 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5394 { 5395 struct ring_buffer_event *event; 5396 struct buffer_page *reader; 5397 unsigned length; 5398 5399 reader = rb_get_reader_page(cpu_buffer); 5400 5401 /* This function should not be called when buffer is empty */ 5402 if (RB_WARN_ON(cpu_buffer, !reader)) 5403 return; 5404 5405 event = rb_reader_event(cpu_buffer); 5406 5407 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5408 cpu_buffer->read++; 5409 5410 rb_update_read_stamp(cpu_buffer, event); 5411 5412 length = rb_event_length(event); 5413 cpu_buffer->reader_page->read += length; 5414 cpu_buffer->read_bytes += length; 5415 } 5416 5417 static void rb_advance_iter(struct ring_buffer_iter *iter) 5418 { 5419 struct ring_buffer_per_cpu *cpu_buffer; 5420 5421 cpu_buffer = iter->cpu_buffer; 5422 5423 /* If head == next_event then we need to jump to the next event */ 5424 if (iter->head == iter->next_event) { 5425 /* If the event gets overwritten again, there's nothing to do */ 5426 if (rb_iter_head_event(iter) == NULL) 5427 return; 5428 } 5429 5430 iter->head = iter->next_event; 5431 5432 /* 5433 * Check if we are at the end of the buffer. 
5434 */ 5435 if (iter->next_event >= rb_page_size(iter->head_page)) { 5436 /* discarded commits can make the page empty */ 5437 if (iter->head_page == cpu_buffer->commit_page) 5438 return; 5439 rb_inc_iter(iter); 5440 return; 5441 } 5442 5443 rb_update_iter_read_stamp(iter, iter->event); 5444 } 5445 5446 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 5447 { 5448 return cpu_buffer->lost_events; 5449 } 5450 5451 static struct ring_buffer_event * 5452 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 5453 unsigned long *lost_events) 5454 { 5455 struct ring_buffer_event *event; 5456 struct buffer_page *reader; 5457 int nr_loops = 0; 5458 5459 if (ts) 5460 *ts = 0; 5461 again: 5462 /* 5463 * We repeat when a time extend is encountered. 5464 * Since the time extend is always attached to a data event, 5465 * we should never loop more than once. 5466 * (We never hit the following condition more than twice). 5467 */ 5468 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 5469 return NULL; 5470 5471 reader = rb_get_reader_page(cpu_buffer); 5472 if (!reader) 5473 return NULL; 5474 5475 event = rb_reader_event(cpu_buffer); 5476 5477 switch (event->type_len) { 5478 case RINGBUF_TYPE_PADDING: 5479 if (rb_null_event(event)) 5480 RB_WARN_ON(cpu_buffer, 1); 5481 /* 5482 * Because the writer could be discarding every 5483 * event it creates (which would probably be bad) 5484 * if we were to go back to "again" then we may never 5485 * catch up, and will trigger the warn on, or lock 5486 * the box. Return the padding, and we will release 5487 * the current locks, and try again. 5488 */ 5489 return event; 5490 5491 case RINGBUF_TYPE_TIME_EXTEND: 5492 /* Internal data, OK to advance */ 5493 rb_advance_reader(cpu_buffer); 5494 goto again; 5495 5496 case RINGBUF_TYPE_TIME_STAMP: 5497 if (ts) { 5498 *ts = rb_event_time_stamp(event); 5499 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 5500 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5501 cpu_buffer->cpu, ts); 5502 } 5503 /* Internal data, OK to advance */ 5504 rb_advance_reader(cpu_buffer); 5505 goto again; 5506 5507 case RINGBUF_TYPE_DATA: 5508 if (ts && !(*ts)) { 5509 *ts = cpu_buffer->read_stamp + event->time_delta; 5510 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5511 cpu_buffer->cpu, ts); 5512 } 5513 if (lost_events) 5514 *lost_events = rb_lost_events(cpu_buffer); 5515 return event; 5516 5517 default: 5518 RB_WARN_ON(cpu_buffer, 1); 5519 } 5520 5521 return NULL; 5522 } 5523 EXPORT_SYMBOL_GPL(ring_buffer_peek); 5524 5525 static struct ring_buffer_event * 5526 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5527 { 5528 struct trace_buffer *buffer; 5529 struct ring_buffer_per_cpu *cpu_buffer; 5530 struct ring_buffer_event *event; 5531 int nr_loops = 0; 5532 5533 if (ts) 5534 *ts = 0; 5535 5536 cpu_buffer = iter->cpu_buffer; 5537 buffer = cpu_buffer->buffer; 5538 5539 /* 5540 * Check if someone performed a consuming read to the buffer 5541 * or removed some pages from the buffer. In these cases, 5542 * iterator was invalidated and we need to reset it. 5543 */ 5544 if (unlikely(iter->cache_read != cpu_buffer->read || 5545 iter->cache_reader_page != cpu_buffer->reader_page || 5546 iter->cache_pages_removed != cpu_buffer->pages_removed)) 5547 rb_iter_reset(iter); 5548 5549 again: 5550 if (ring_buffer_iter_empty(iter)) 5551 return NULL; 5552 5553 /* 5554 * As the writer can mess with what the iterator is trying 5555 * to read, just give up if we fail to get an event after 5556 * three tries. 
The iterator is not as reliable when reading
5557 * the ring buffer with an active write as the consumer is.
5558 * Do not warn if three failures are reached.
5559 */
5560 if (++nr_loops > 3)
5561 return NULL;
5562
5563 if (rb_per_cpu_empty(cpu_buffer))
5564 return NULL;
5565
5566 if (iter->head >= rb_page_size(iter->head_page)) {
5567 rb_inc_iter(iter);
5568 goto again;
5569 }
5570
5571 event = rb_iter_head_event(iter);
5572 if (!event)
5573 goto again;
5574
5575 switch (event->type_len) {
5576 case RINGBUF_TYPE_PADDING:
5577 if (rb_null_event(event)) {
5578 rb_inc_iter(iter);
5579 goto again;
5580 }
5581 rb_advance_iter(iter);
5582 return event;
5583
5584 case RINGBUF_TYPE_TIME_EXTEND:
5585 /* Internal data, OK to advance */
5586 rb_advance_iter(iter);
5587 goto again;
5588
5589 case RINGBUF_TYPE_TIME_STAMP:
5590 if (ts) {
5591 *ts = rb_event_time_stamp(event);
5592 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp);
5593 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
5594 cpu_buffer->cpu, ts);
5595 }
5596 /* Internal data, OK to advance */
5597 rb_advance_iter(iter);
5598 goto again;
5599
5600 case RINGBUF_TYPE_DATA:
5601 if (ts && !(*ts)) {
5602 *ts = iter->read_stamp + event->time_delta;
5603 ring_buffer_normalize_time_stamp(buffer,
5604 cpu_buffer->cpu, ts);
5605 }
5606 return event;
5607
5608 default:
5609 RB_WARN_ON(cpu_buffer, 1);
5610 }
5611
5612 return NULL;
5613 }
5614 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
5615
5616 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
5617 {
5618 if (likely(!in_nmi())) {
5619 raw_spin_lock(&cpu_buffer->reader_lock);
5620 return true;
5621 }
5622
5623 /*
5624 * If an NMI die dumps out the content of the ring buffer,
5625 * trylock must be used to prevent a deadlock if the NMI
5626 * preempted a task that holds the ring buffer locks. If
5627 * we get the lock then all is fine, if not, then continue
5628 * to do the read, but this can corrupt the ring buffer,
5629 * so it must be permanently disabled from future writes.
5630 * Reading from NMI is a oneshot deal.
5631 */
5632 if (raw_spin_trylock(&cpu_buffer->reader_lock))
5633 return true;
5634
5635 /* Continue without locking, but disable the ring buffer */
5636 atomic_inc(&cpu_buffer->record_disabled);
5637 return false;
5638 }
5639
5640 static inline void
5641 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
5642 {
5643 if (likely(locked))
5644 raw_spin_unlock(&cpu_buffer->reader_lock);
5645 }
5646
5647 /**
5648 * ring_buffer_peek - peek at the next event to be read
5649 * @buffer: The ring buffer to read
5650 * @cpu: The cpu to peek at
5651 * @ts: The timestamp counter of this event.
5652 * @lost_events: a variable to store if events were lost (may be NULL)
5653 *
5654 * This will return the event that will be read next, but does
5655 * not consume the data.
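 *
 * For example (a sketch), to check for a pending event without consuming it:
 *
 *	u64 ts;
 *
 *	if (ring_buffer_peek(buffer, cpu, &ts, NULL))
 *		... an event is available and ts holds its timestamp ...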
5656 */
5657 struct ring_buffer_event *
5658 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
5659 unsigned long *lost_events)
5660 {
5661 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5662 struct ring_buffer_event *event;
5663 unsigned long flags;
5664 bool dolock;
5665
5666 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5667 return NULL;
5668
5669 again:
5670 local_irq_save(flags);
5671 dolock = rb_reader_lock(cpu_buffer);
5672 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
5673 if (event && event->type_len == RINGBUF_TYPE_PADDING)
5674 rb_advance_reader(cpu_buffer);
5675 rb_reader_unlock(cpu_buffer, dolock);
5676 local_irq_restore(flags);
5677
5678 if (event && event->type_len == RINGBUF_TYPE_PADDING)
5679 goto again;
5680
5681 return event;
5682 }
5683
5684 /**
 * ring_buffer_iter_dropped - report if there are dropped events
5685 * @iter: The ring buffer iterator
5686 *
5687 * Returns true if there were dropped events since the last peek.
5688 */
5689 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
5690 {
5691 bool ret = iter->missed_events != 0;
5692
5693 iter->missed_events = 0;
5694 return ret;
5695 }
5696 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);
5697
5698 /**
5699 * ring_buffer_iter_peek - peek at the next event to be read
5700 * @iter: The ring buffer iterator
5701 * @ts: The timestamp counter of this event.
5702 *
5703 * This will return the event that will be read next, but does
5704 * not increment the iterator.
5705 */
5706 struct ring_buffer_event *
5707 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
5708 {
5709 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5710 struct ring_buffer_event *event;
5711 unsigned long flags;
5712
5713 again:
5714 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5715 event = rb_iter_peek(iter, ts);
5716 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5717
5718 if (event && event->type_len == RINGBUF_TYPE_PADDING)
5719 goto again;
5720
5721 return event;
5722 }
5723
5724 /**
5725 * ring_buffer_consume - return an event and consume it
5726 * @buffer: The ring buffer to get the next event from
5727 * @cpu: the cpu to read the buffer from
5728 * @ts: a variable to store the timestamp (may be NULL)
5729 * @lost_events: a variable to store if events were lost (may be NULL)
5730 *
5731 * Returns the next event in the ring buffer, and that event is consumed.
5732 * That is, sequential reads will keep returning a different event, and
5733 * eventually empty the ring buffer if the producer is slower.
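 *
 * For example, a consuming drain loop might look like this (a sketch,
 * assuming @buffer is valid and @cpu is online; process_event() is a
 * placeholder for whatever the caller does with the payload):
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, NULL)))
 *		process_event(ring_buffer_event_data(event), ts);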
5734 */
5735 struct ring_buffer_event *
5736 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts,
5737 unsigned long *lost_events)
5738 {
5739 struct ring_buffer_per_cpu *cpu_buffer;
5740 struct ring_buffer_event *event = NULL;
5741 unsigned long flags;
5742 bool dolock;
5743
5744 again:
5745 /* might be called in atomic */
5746 preempt_disable();
5747
5748 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5749 goto out;
5750
5751 cpu_buffer = buffer->buffers[cpu];
5752 local_irq_save(flags);
5753 dolock = rb_reader_lock(cpu_buffer);
5754
5755 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
5756 if (event) {
5757 cpu_buffer->lost_events = 0;
5758 rb_advance_reader(cpu_buffer);
5759 }
5760
5761 rb_reader_unlock(cpu_buffer, dolock);
5762 local_irq_restore(flags);
5763
5764 out:
5765 preempt_enable();
5766
5767 if (event && event->type_len == RINGBUF_TYPE_PADDING)
5768 goto again;
5769
5770 return event;
5771 }
5772 EXPORT_SYMBOL_GPL(ring_buffer_consume);
5773
5774 /**
5775 * ring_buffer_read_prepare - Prepare for a non-consuming read of the buffer
5776 * @buffer: The ring buffer to read from
5777 * @cpu: The cpu buffer to iterate over
5778 * @flags: gfp flags to use for memory allocation
5779 *
5780 * This performs the initial preparations necessary to iterate
5781 * through the buffer. Memory is allocated, buffer resizing
5782 * is disabled, and the iterator pointer is returned to the caller.
5783 *
5784 * After a sequence of ring_buffer_read_prepare calls, the user is
5785 * expected to make at least one call to ring_buffer_read_prepare_sync.
5786 * Afterwards, ring_buffer_read_start is invoked to get things going
5787 * for real.
5788 *
5789 * Overall, this must be paired with ring_buffer_read_finish.
5790 */
5791 struct ring_buffer_iter *
5792 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
5793 {
5794 struct ring_buffer_per_cpu *cpu_buffer;
5795 struct ring_buffer_iter *iter;
5796
5797 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5798 return NULL;
5799
5800 iter = kzalloc(sizeof(*iter), flags);
5801 if (!iter)
5802 return NULL;
5803
5804 /* Holds the entire event: data and meta data */
5805 iter->event_size = buffer->subbuf_size;
5806 iter->event = kmalloc(iter->event_size, flags);
5807 if (!iter->event) {
5808 kfree(iter);
5809 return NULL;
5810 }
5811
5812 cpu_buffer = buffer->buffers[cpu];
5813
5814 iter->cpu_buffer = cpu_buffer;
5815
5816 atomic_inc(&cpu_buffer->resize_disabled);
5817
5818 return iter;
5819 }
5820 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
5821
5822 /**
5823 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
5824 *
5825 * All previously invoked ring_buffer_read_prepare calls to prepare
5826 * iterators will be synchronized. Afterwards, ring_buffer_read_start
5827 * calls on those iterators are allowed.
5828 */
5829 void
5830 ring_buffer_read_prepare_sync(void)
5831 {
5832 synchronize_rcu();
5833 }
5834 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
5835
5836 /**
5837 * ring_buffer_read_start - start a non-consuming read of the buffer
5838 * @iter: The iterator returned by ring_buffer_read_prepare
5839 *
5840 * This finalizes the startup of an iteration through the buffer.
5841 * The iterator comes from a call to ring_buffer_read_prepare and
5842 * an intervening ring_buffer_read_prepare_sync must have been
5843 * performed.
5844 *
5845 * Must be paired with ring_buffer_read_finish.
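 *
 * The whole non-consuming read sequence is roughly (a sketch; iter,
 * event and ts are the caller's, error handling omitted, and
 * process_event() is a placeholder for the caller's handler):
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		process_event(ring_buffer_event_data(event), ts);
 *		ring_buffer_iter_advance(iter);
 *	}
 *	ring_buffer_read_finish(iter);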
5846 */
5847 void
5848 ring_buffer_read_start(struct ring_buffer_iter *iter)
5849 {
5850 struct ring_buffer_per_cpu *cpu_buffer;
5851 unsigned long flags;
5852
5853 if (!iter)
5854 return;
5855
5856 cpu_buffer = iter->cpu_buffer;
5857
5858 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5859 arch_spin_lock(&cpu_buffer->lock);
5860 rb_iter_reset(iter);
5861 arch_spin_unlock(&cpu_buffer->lock);
5862 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5863 }
5864 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
5865
5866 /**
5867 * ring_buffer_read_finish - finish reading the iterator of the buffer
5868 * @iter: The iterator retrieved by ring_buffer_read_prepare
5869 *
5870 * This re-enables resizing of the buffer, and frees the iterator.
5871 */
5872 void
5873 ring_buffer_read_finish(struct ring_buffer_iter *iter)
5874 {
5875 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5876
5877 /* Use this opportunity to check the integrity of the ring buffer. */
5878 rb_check_pages(cpu_buffer);
5879
5880 atomic_dec(&cpu_buffer->resize_disabled);
5881 kfree(iter->event);
5882 kfree(iter);
5883 }
5884 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
5885
5886 /**
5887 * ring_buffer_iter_advance - advance the iterator to the next location
5888 * @iter: The ring buffer iterator
5889 *
5890 * Move the iterator forward so that the next read will return the
5891 * event at the next location in the buffer.
5892 */
5893 void ring_buffer_iter_advance(struct ring_buffer_iter *iter)
5894 {
5895 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5896 unsigned long flags;
5897
5898 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5899
5900 rb_advance_iter(iter);
5901
5902 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5903 }
5904 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance);
5905
5906 /**
5907 * ring_buffer_size - return the size of the ring buffer (in bytes)
5908 * @buffer: The ring buffer.
5909 * @cpu: The CPU to get ring buffer size from.
5910 */
5911 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu)
5912 {
5913 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5914 return 0;
5915
5916 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages;
5917 }
5918 EXPORT_SYMBOL_GPL(ring_buffer_size);
5919
5920 /**
5921 * ring_buffer_max_event_size - return the max data size of an event
5922 * @buffer: The ring buffer.
5923 *
5924 * Returns the maximum size an event can be.
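 *
 * For example, a caller can use it to reject an oversized write up
 * front (a sketch; len is the caller's requested payload size):
 *
 *	if (len > ring_buffer_max_event_size(buffer))
 *		return -E2BIG;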
5925 */ 5926 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 5927 { 5928 /* If abs timestamp is requested, events have a timestamp too */ 5929 if (ring_buffer_time_stamp_abs(buffer)) 5930 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 5931 return buffer->max_data_size; 5932 } 5933 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 5934 5935 static void rb_clear_buffer_page(struct buffer_page *page) 5936 { 5937 local_set(&page->write, 0); 5938 local_set(&page->entries, 0); 5939 rb_init_page(page->page); 5940 page->read = 0; 5941 } 5942 5943 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 5944 { 5945 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 5946 5947 if (!meta) 5948 return; 5949 5950 meta->reader.read = cpu_buffer->reader_page->read; 5951 meta->reader.id = cpu_buffer->reader_page->id; 5952 meta->reader.lost_events = cpu_buffer->lost_events; 5953 5954 meta->entries = local_read(&cpu_buffer->entries); 5955 meta->overrun = local_read(&cpu_buffer->overrun); 5956 meta->read = cpu_buffer->read; 5957 5958 /* Some archs do not have data cache coherency between kernel and user-space */ 5959 flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page)); 5960 } 5961 5962 static void 5963 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5964 { 5965 struct buffer_page *page; 5966 5967 rb_head_page_deactivate(cpu_buffer); 5968 5969 cpu_buffer->head_page 5970 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5971 rb_clear_buffer_page(cpu_buffer->head_page); 5972 list_for_each_entry(page, cpu_buffer->pages, list) { 5973 rb_clear_buffer_page(page); 5974 } 5975 5976 cpu_buffer->tail_page = cpu_buffer->head_page; 5977 cpu_buffer->commit_page = cpu_buffer->head_page; 5978 5979 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5980 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5981 rb_clear_buffer_page(cpu_buffer->reader_page); 5982 5983 local_set(&cpu_buffer->entries_bytes, 0); 5984 local_set(&cpu_buffer->overrun, 0); 5985 local_set(&cpu_buffer->commit_overrun, 0); 5986 local_set(&cpu_buffer->dropped_events, 0); 5987 local_set(&cpu_buffer->entries, 0); 5988 local_set(&cpu_buffer->committing, 0); 5989 local_set(&cpu_buffer->commits, 0); 5990 local_set(&cpu_buffer->pages_touched, 0); 5991 local_set(&cpu_buffer->pages_lost, 0); 5992 local_set(&cpu_buffer->pages_read, 0); 5993 cpu_buffer->last_pages_touch = 0; 5994 cpu_buffer->shortest_full = 0; 5995 cpu_buffer->read = 0; 5996 cpu_buffer->read_bytes = 0; 5997 5998 rb_time_set(&cpu_buffer->write_stamp, 0); 5999 rb_time_set(&cpu_buffer->before_stamp, 0); 6000 6001 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 6002 6003 cpu_buffer->lost_events = 0; 6004 cpu_buffer->last_overrun = 0; 6005 6006 rb_head_page_activate(cpu_buffer); 6007 cpu_buffer->pages_removed = 0; 6008 6009 if (cpu_buffer->mapped) { 6010 rb_update_meta_page(cpu_buffer); 6011 if (cpu_buffer->ring_meta) { 6012 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 6013 meta->commit_buffer = meta->head_buffer; 6014 } 6015 } 6016 } 6017 6018 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 6019 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6020 { 6021 unsigned long flags; 6022 6023 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6024 6025 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 6026 goto out; 6027 6028 arch_spin_lock(&cpu_buffer->lock); 6029 6030 rb_reset_cpu(cpu_buffer); 6031 6032 arch_spin_unlock(&cpu_buffer->lock); 6033 6034 out: 6035 
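	/* Always drop the reader lock, even when the reset was skipped above. */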
raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6036 } 6037 6038 /** 6039 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 6040 * @buffer: The ring buffer to reset a per cpu buffer of 6041 * @cpu: The CPU buffer to be reset 6042 */ 6043 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 6044 { 6045 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6046 struct ring_buffer_meta *meta; 6047 6048 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6049 return; 6050 6051 /* prevent another thread from changing buffer sizes */ 6052 mutex_lock(&buffer->mutex); 6053 6054 atomic_inc(&cpu_buffer->resize_disabled); 6055 atomic_inc(&cpu_buffer->record_disabled); 6056 6057 /* Make sure all commits have finished */ 6058 synchronize_rcu(); 6059 6060 reset_disabled_cpu_buffer(cpu_buffer); 6061 6062 atomic_dec(&cpu_buffer->record_disabled); 6063 atomic_dec(&cpu_buffer->resize_disabled); 6064 6065 /* Make sure persistent meta now uses this buffer's addresses */ 6066 meta = rb_range_meta(buffer, 0, cpu_buffer->cpu); 6067 if (meta) 6068 rb_meta_init_text_addr(meta); 6069 6070 mutex_unlock(&buffer->mutex); 6071 } 6072 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6073 6074 /* Flag to ensure proper resetting of atomic variables */ 6075 #define RESET_BIT (1 << 30) 6076 6077 /** 6078 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6079 * @buffer: The ring buffer to reset a per cpu buffer of 6080 */ 6081 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6082 { 6083 struct ring_buffer_per_cpu *cpu_buffer; 6084 struct ring_buffer_meta *meta; 6085 int cpu; 6086 6087 /* prevent another thread from changing buffer sizes */ 6088 mutex_lock(&buffer->mutex); 6089 6090 for_each_online_buffer_cpu(buffer, cpu) { 6091 cpu_buffer = buffer->buffers[cpu]; 6092 6093 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6094 atomic_inc(&cpu_buffer->record_disabled); 6095 } 6096 6097 /* Make sure all commits have finished */ 6098 synchronize_rcu(); 6099 6100 for_each_buffer_cpu(buffer, cpu) { 6101 cpu_buffer = buffer->buffers[cpu]; 6102 6103 /* 6104 * If a CPU came online during the synchronize_rcu(), then 6105 * ignore it. 
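		 * Such a CPU buffer will not have RESET_BIT set in
		 * resize_disabled, since it was not included in the loop above.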
6106 */ 6107 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6108 continue; 6109 6110 reset_disabled_cpu_buffer(cpu_buffer); 6111 6112 /* Make sure persistent meta now uses this buffer's addresses */ 6113 meta = rb_range_meta(buffer, 0, cpu_buffer->cpu); 6114 if (meta) 6115 rb_meta_init_text_addr(meta); 6116 6117 atomic_dec(&cpu_buffer->record_disabled); 6118 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6119 } 6120 6121 mutex_unlock(&buffer->mutex); 6122 } 6123 6124 /** 6125 * ring_buffer_reset - reset a ring buffer 6126 * @buffer: The ring buffer to reset all cpu buffers 6127 */ 6128 void ring_buffer_reset(struct trace_buffer *buffer) 6129 { 6130 struct ring_buffer_per_cpu *cpu_buffer; 6131 int cpu; 6132 6133 /* prevent another thread from changing buffer sizes */ 6134 mutex_lock(&buffer->mutex); 6135 6136 for_each_buffer_cpu(buffer, cpu) { 6137 cpu_buffer = buffer->buffers[cpu]; 6138 6139 atomic_inc(&cpu_buffer->resize_disabled); 6140 atomic_inc(&cpu_buffer->record_disabled); 6141 } 6142 6143 /* Make sure all commits have finished */ 6144 synchronize_rcu(); 6145 6146 for_each_buffer_cpu(buffer, cpu) { 6147 cpu_buffer = buffer->buffers[cpu]; 6148 6149 reset_disabled_cpu_buffer(cpu_buffer); 6150 6151 atomic_dec(&cpu_buffer->record_disabled); 6152 atomic_dec(&cpu_buffer->resize_disabled); 6153 } 6154 6155 mutex_unlock(&buffer->mutex); 6156 } 6157 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6158 6159 /** 6160 * ring_buffer_empty - is the ring buffer empty? 6161 * @buffer: The ring buffer to test 6162 */ 6163 bool ring_buffer_empty(struct trace_buffer *buffer) 6164 { 6165 struct ring_buffer_per_cpu *cpu_buffer; 6166 unsigned long flags; 6167 bool dolock; 6168 bool ret; 6169 int cpu; 6170 6171 /* yes this is racy, but if you don't like the race, lock the buffer */ 6172 for_each_buffer_cpu(buffer, cpu) { 6173 cpu_buffer = buffer->buffers[cpu]; 6174 local_irq_save(flags); 6175 dolock = rb_reader_lock(cpu_buffer); 6176 ret = rb_per_cpu_empty(cpu_buffer); 6177 rb_reader_unlock(cpu_buffer, dolock); 6178 local_irq_restore(flags); 6179 6180 if (!ret) 6181 return false; 6182 } 6183 6184 return true; 6185 } 6186 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6187 6188 /** 6189 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6190 * @buffer: The ring buffer 6191 * @cpu: The CPU buffer to test 6192 */ 6193 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6194 { 6195 struct ring_buffer_per_cpu *cpu_buffer; 6196 unsigned long flags; 6197 bool dolock; 6198 bool ret; 6199 6200 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6201 return true; 6202 6203 cpu_buffer = buffer->buffers[cpu]; 6204 local_irq_save(flags); 6205 dolock = rb_reader_lock(cpu_buffer); 6206 ret = rb_per_cpu_empty(cpu_buffer); 6207 rb_reader_unlock(cpu_buffer, dolock); 6208 local_irq_restore(flags); 6209 6210 return ret; 6211 } 6212 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6213 6214 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6215 /** 6216 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6217 * @buffer_a: One buffer to swap with 6218 * @buffer_b: The other buffer to swap with 6219 * @cpu: the CPU of the buffers to swap 6220 * 6221 * This function is useful for tracers that want to take a "snapshot" 6222 * of a CPU buffer and has another back up buffer lying around. 6223 * it is expected that the tracer handles the cpu buffer not being 6224 * used at the moment. 
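 *
 * A snapshot-style swap might look like this (a sketch: main_buffer,
 * snapshot_buffer and cpu are the caller's, and the return value must be
 * checked since the swap can fail, e.g. with -EAGAIN or -EBUSY):
 *
 *	ret = ring_buffer_swap_cpu(main_buffer, snapshot_buffer, cpu);
 *	if (ret)
 *		return ret;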
6225 */ 6226 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6227 struct trace_buffer *buffer_b, int cpu) 6228 { 6229 struct ring_buffer_per_cpu *cpu_buffer_a; 6230 struct ring_buffer_per_cpu *cpu_buffer_b; 6231 int ret = -EINVAL; 6232 6233 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6234 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6235 goto out; 6236 6237 cpu_buffer_a = buffer_a->buffers[cpu]; 6238 cpu_buffer_b = buffer_b->buffers[cpu]; 6239 6240 /* It's up to the callers to not try to swap mapped buffers */ 6241 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) { 6242 ret = -EBUSY; 6243 goto out; 6244 } 6245 6246 /* At least make sure the two buffers are somewhat the same */ 6247 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6248 goto out; 6249 6250 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6251 goto out; 6252 6253 ret = -EAGAIN; 6254 6255 if (atomic_read(&buffer_a->record_disabled)) 6256 goto out; 6257 6258 if (atomic_read(&buffer_b->record_disabled)) 6259 goto out; 6260 6261 if (atomic_read(&cpu_buffer_a->record_disabled)) 6262 goto out; 6263 6264 if (atomic_read(&cpu_buffer_b->record_disabled)) 6265 goto out; 6266 6267 /* 6268 * We can't do a synchronize_rcu here because this 6269 * function can be called in atomic context. 6270 * Normally this will be called from the same CPU as cpu. 6271 * If not it's up to the caller to protect this. 6272 */ 6273 atomic_inc(&cpu_buffer_a->record_disabled); 6274 atomic_inc(&cpu_buffer_b->record_disabled); 6275 6276 ret = -EBUSY; 6277 if (local_read(&cpu_buffer_a->committing)) 6278 goto out_dec; 6279 if (local_read(&cpu_buffer_b->committing)) 6280 goto out_dec; 6281 6282 /* 6283 * When resize is in progress, we cannot swap it because 6284 * it will mess the state of the cpu buffer. 6285 */ 6286 if (atomic_read(&buffer_a->resizing)) 6287 goto out_dec; 6288 if (atomic_read(&buffer_b->resizing)) 6289 goto out_dec; 6290 6291 buffer_a->buffers[cpu] = cpu_buffer_b; 6292 buffer_b->buffers[cpu] = cpu_buffer_a; 6293 6294 cpu_buffer_b->buffer = buffer_a; 6295 cpu_buffer_a->buffer = buffer_b; 6296 6297 ret = 0; 6298 6299 out_dec: 6300 atomic_dec(&cpu_buffer_a->record_disabled); 6301 atomic_dec(&cpu_buffer_b->record_disabled); 6302 out: 6303 return ret; 6304 } 6305 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6306 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6307 6308 /** 6309 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6310 * @buffer: the buffer to allocate for. 6311 * @cpu: the cpu buffer to allocate. 6312 * 6313 * This function is used in conjunction with ring_buffer_read_page. 6314 * When reading a full page from the ring buffer, these functions 6315 * can be used to speed up the process. The calling function should 6316 * allocate a few pages first with this function. Then when it 6317 * needs to get pages from the ring buffer, it passes the result 6318 * of this function into ring_buffer_read_page, which will swap 6319 * the page that was allocated, with the read page of the buffer. 
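 * The page returned here must eventually be handed back with
 * ring_buffer_free_read_page() once the caller is done with it.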
6320 * 6321 * Returns: 6322 * The page allocated, or ERR_PTR 6323 */ 6324 struct buffer_data_read_page * 6325 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6326 { 6327 struct ring_buffer_per_cpu *cpu_buffer; 6328 struct buffer_data_read_page *bpage = NULL; 6329 unsigned long flags; 6330 struct page *page; 6331 6332 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6333 return ERR_PTR(-ENODEV); 6334 6335 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 6336 if (!bpage) 6337 return ERR_PTR(-ENOMEM); 6338 6339 bpage->order = buffer->subbuf_order; 6340 cpu_buffer = buffer->buffers[cpu]; 6341 local_irq_save(flags); 6342 arch_spin_lock(&cpu_buffer->lock); 6343 6344 if (cpu_buffer->free_page) { 6345 bpage->data = cpu_buffer->free_page; 6346 cpu_buffer->free_page = NULL; 6347 } 6348 6349 arch_spin_unlock(&cpu_buffer->lock); 6350 local_irq_restore(flags); 6351 6352 if (bpage->data) 6353 goto out; 6354 6355 page = alloc_pages_node(cpu_to_node(cpu), 6356 GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO, 6357 cpu_buffer->buffer->subbuf_order); 6358 if (!page) { 6359 kfree(bpage); 6360 return ERR_PTR(-ENOMEM); 6361 } 6362 6363 bpage->data = page_address(page); 6364 6365 out: 6366 rb_init_page(bpage->data); 6367 6368 return bpage; 6369 } 6370 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6371 6372 /** 6373 * ring_buffer_free_read_page - free an allocated read page 6374 * @buffer: the buffer the page was allocate for 6375 * @cpu: the cpu buffer the page came from 6376 * @data_page: the page to free 6377 * 6378 * Free a page allocated from ring_buffer_alloc_read_page. 6379 */ 6380 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6381 struct buffer_data_read_page *data_page) 6382 { 6383 struct ring_buffer_per_cpu *cpu_buffer; 6384 struct buffer_data_page *bpage = data_page->data; 6385 struct page *page = virt_to_page(bpage); 6386 unsigned long flags; 6387 6388 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6389 return; 6390 6391 cpu_buffer = buffer->buffers[cpu]; 6392 6393 /* 6394 * If the page is still in use someplace else, or order of the page 6395 * is different from the subbuffer order of the buffer - 6396 * we can't reuse it 6397 */ 6398 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6399 goto out; 6400 6401 local_irq_save(flags); 6402 arch_spin_lock(&cpu_buffer->lock); 6403 6404 if (!cpu_buffer->free_page) { 6405 cpu_buffer->free_page = bpage; 6406 bpage = NULL; 6407 } 6408 6409 arch_spin_unlock(&cpu_buffer->lock); 6410 local_irq_restore(flags); 6411 6412 out: 6413 free_pages((unsigned long)bpage, data_page->order); 6414 kfree(data_page); 6415 } 6416 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6417 6418 /** 6419 * ring_buffer_read_page - extract a page from the ring buffer 6420 * @buffer: buffer to extract from 6421 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6422 * @len: amount to extract 6423 * @cpu: the cpu of the buffer to extract 6424 * @full: should the extraction only happen when the page is full. 6425 * 6426 * This function will pull out a page from the ring buffer and consume it. 6427 * @data_page must be the address of the variable that was returned 6428 * from ring_buffer_alloc_read_page. This is because the page might be used 6429 * to swap with a page in the ring buffer. 
6430 * 6431 * for example: 6432 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 6433 * if (IS_ERR(rpage)) 6434 * return PTR_ERR(rpage); 6435 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 6436 * if (ret >= 0) 6437 * process_page(ring_buffer_read_page_data(rpage), ret); 6438 * ring_buffer_free_read_page(buffer, cpu, rpage); 6439 * 6440 * When @full is set, the function will not return true unless 6441 * the writer is off the reader page. 6442 * 6443 * Note: it is up to the calling functions to handle sleeps and wakeups. 6444 * The ring buffer can be used anywhere in the kernel and can not 6445 * blindly call wake_up. The layer that uses the ring buffer must be 6446 * responsible for that. 6447 * 6448 * Returns: 6449 * >=0 if data has been transferred, returns the offset of consumed data. 6450 * <0 if no data has been transferred. 6451 */ 6452 int ring_buffer_read_page(struct trace_buffer *buffer, 6453 struct buffer_data_read_page *data_page, 6454 size_t len, int cpu, int full) 6455 { 6456 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6457 struct ring_buffer_event *event; 6458 struct buffer_data_page *bpage; 6459 struct buffer_page *reader; 6460 unsigned long missed_events; 6461 unsigned long flags; 6462 unsigned int commit; 6463 unsigned int read; 6464 u64 save_timestamp; 6465 int ret = -1; 6466 6467 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6468 goto out; 6469 6470 /* 6471 * If len is not big enough to hold the page header, then 6472 * we can not copy anything. 6473 */ 6474 if (len <= BUF_PAGE_HDR_SIZE) 6475 goto out; 6476 6477 len -= BUF_PAGE_HDR_SIZE; 6478 6479 if (!data_page || !data_page->data) 6480 goto out; 6481 if (data_page->order != buffer->subbuf_order) 6482 goto out; 6483 6484 bpage = data_page->data; 6485 if (!bpage) 6486 goto out; 6487 6488 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6489 6490 reader = rb_get_reader_page(cpu_buffer); 6491 if (!reader) 6492 goto out_unlock; 6493 6494 event = rb_reader_event(cpu_buffer); 6495 6496 read = reader->read; 6497 commit = rb_page_size(reader); 6498 6499 /* Check if any events were dropped */ 6500 missed_events = cpu_buffer->lost_events; 6501 6502 /* 6503 * If this page has been partially read or 6504 * if len is not big enough to read the rest of the page or 6505 * a writer is still on the page, then 6506 * we must copy the data from the page to the buffer. 6507 * Otherwise, we can simply swap the page with the one passed in. 6508 */ 6509 if (read || (len < (commit - read)) || 6510 cpu_buffer->reader_page == cpu_buffer->commit_page || 6511 cpu_buffer->mapped) { 6512 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 6513 unsigned int rpos = read; 6514 unsigned int pos = 0; 6515 unsigned int size; 6516 6517 /* 6518 * If a full page is expected, this can still be returned 6519 * if there's been a previous partial read and the 6520 * rest of the page can be read and the commit page is off 6521 * the reader page. 
6522 */ 6523 if (full && 6524 (!read || (len < (commit - read)) || 6525 cpu_buffer->reader_page == cpu_buffer->commit_page)) 6526 goto out_unlock; 6527 6528 if (len > (commit - read)) 6529 len = (commit - read); 6530 6531 /* Always keep the time extend and data together */ 6532 size = rb_event_ts_length(event); 6533 6534 if (len < size) 6535 goto out_unlock; 6536 6537 /* save the current timestamp, since the user will need it */ 6538 save_timestamp = cpu_buffer->read_stamp; 6539 6540 /* Need to copy one event at a time */ 6541 do { 6542 /* We need the size of one event, because 6543 * rb_advance_reader only advances by one event, 6544 * whereas rb_event_ts_length may include the size of 6545 * one or two events. 6546 * We have already ensured there's enough space if this 6547 * is a time extend. */ 6548 size = rb_event_length(event); 6549 memcpy(bpage->data + pos, rpage->data + rpos, size); 6550 6551 len -= size; 6552 6553 rb_advance_reader(cpu_buffer); 6554 rpos = reader->read; 6555 pos += size; 6556 6557 if (rpos >= commit) 6558 break; 6559 6560 event = rb_reader_event(cpu_buffer); 6561 /* Always keep the time extend and data together */ 6562 size = rb_event_ts_length(event); 6563 } while (len >= size); 6564 6565 /* update bpage */ 6566 local_set(&bpage->commit, pos); 6567 bpage->time_stamp = save_timestamp; 6568 6569 /* we copied everything to the beginning */ 6570 read = 0; 6571 } else { 6572 /* update the entry counter */ 6573 cpu_buffer->read += rb_page_entries(reader); 6574 cpu_buffer->read_bytes += rb_page_size(reader); 6575 6576 /* swap the pages */ 6577 rb_init_page(bpage); 6578 bpage = reader->page; 6579 reader->page = data_page->data; 6580 local_set(&reader->write, 0); 6581 local_set(&reader->entries, 0); 6582 reader->read = 0; 6583 data_page->data = bpage; 6584 6585 /* 6586 * Use the real_end for the data size, 6587 * This gives us a chance to store the lost events 6588 * on the page. 6589 */ 6590 if (reader->real_end) 6591 local_set(&bpage->commit, reader->real_end); 6592 } 6593 ret = read; 6594 6595 cpu_buffer->lost_events = 0; 6596 6597 commit = local_read(&bpage->commit); 6598 /* 6599 * Set a flag in the commit field if we lost events 6600 */ 6601 if (missed_events) { 6602 /* If there is room at the end of the page to save the 6603 * missed events, then record it there. 6604 */ 6605 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6606 memcpy(&bpage->data[commit], &missed_events, 6607 sizeof(missed_events)); 6608 local_add(RB_MISSED_STORED, &bpage->commit); 6609 commit += sizeof(missed_events); 6610 } 6611 local_add(RB_MISSED_EVENTS, &bpage->commit); 6612 } 6613 6614 /* 6615 * This page may be off to user land. Zero it out here. 6616 */ 6617 if (commit < buffer->subbuf_size) 6618 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 6619 6620 out_unlock: 6621 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6622 6623 out: 6624 return ret; 6625 } 6626 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 6627 6628 /** 6629 * ring_buffer_read_page_data - get pointer to the data in the page. 6630 * @page: the page to get the data from 6631 * 6632 * Returns pointer to the actual data in this page. 6633 */ 6634 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 6635 { 6636 return page->data; 6637 } 6638 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 6639 6640 /** 6641 * ring_buffer_subbuf_size_get - get size of the sub buffer. 6642 * @buffer: the buffer to get the sub buffer size from 6643 * 6644 * Returns size of the sub buffer, in bytes. 
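 *
 * For example, with the default order-0 sub buffer this returns
 * PAGE_SIZE: the data area plus the BUF_PAGE_HDR_SIZE header.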
6645 */
6646 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer)
6647 {
6648 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
6649 }
6650 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);
6651
6652 /**
6653 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page.
6654 * @buffer: The ring_buffer to get the system sub page order from
6655 *
6656 * By default, one ring buffer sub page equals one system page. This parameter
6657 * is configurable, per ring buffer. The size of the ring buffer sub page can be
6658 * extended, but must be a power-of-two multiple of the system page size.
6659 *
6660 * Returns the order of buffer sub page size, in system pages:
6661 * 0 means the sub buffer size is 1 system page and so forth.
6662 * In case of an error < 0 is returned.
6663 */
6664 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer)
6665 {
6666 if (!buffer)
6667 return -EINVAL;
6668
6669 return buffer->subbuf_order;
6670 }
6671 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get);
6672
6673 /**
6674 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page.
6675 * @buffer: The ring_buffer to set the new page size.
6676 * @order: Order of the system pages in one sub buffer page
6677 *
6678 * By default, one ring buffer page equals one system page. This API can be
6679 * used to set a new size of the ring buffer page. The size must be a
6680 * power-of-two multiple of the system page size, which is why the input
6681 * parameter @order is the order of system pages that are allocated for one ring buffer page:
6682 * 0 - 1 system page
6683 * 1 - 2 system pages
6684 * 2 - 4 system pages
6685 * ...
6686 *
6687 * Returns 0 on success or < 0 in case of an error.
6688 */
6689 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
6690 {
6691 struct ring_buffer_per_cpu *cpu_buffer;
6692 struct buffer_page *bpage, *tmp;
6693 int old_order, old_size;
6694 int nr_pages;
6695 int psize;
6696 int err;
6697 int cpu;
6698
6699 if (!buffer || order < 0)
6700 return -EINVAL;
6701
6702 if (buffer->subbuf_order == order)
6703 return 0;
6704
6705 psize = (1 << order) * PAGE_SIZE;
6706 if (psize <= BUF_PAGE_HDR_SIZE)
6707 return -EINVAL;
6708
6709 /* Size of a subbuf cannot be greater than the write counter */
6710 if (psize > RB_WRITE_MASK + 1)
6711 return -EINVAL;
6712
6713 old_order = buffer->subbuf_order;
6714 old_size = buffer->subbuf_size;
6715
6716 /* prevent another thread from changing buffer sizes */
6717 mutex_lock(&buffer->mutex);
6718 atomic_inc(&buffer->record_disabled);
6719
6720 /* Make sure all commits have finished */
6721 synchronize_rcu();
6722
6723 buffer->subbuf_order = order;
6724 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE;
6725
6726 /* Make sure all new buffers are allocated before deleting the old ones */
6727 for_each_buffer_cpu(buffer, cpu) {
6728
6729 if (!cpumask_test_cpu(cpu, buffer->cpumask))
6730 continue;
6731
6732 cpu_buffer = buffer->buffers[cpu];
6733
6734 if (cpu_buffer->mapped) {
6735 err = -EBUSY;
6736 goto error;
6737 }
6738
6739 /* Update the number of pages to match the new size */
6740 nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
6741 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
6742
6743 /* we need a minimum of two pages */
6744 if (nr_pages < 2)
6745 nr_pages = 2;
6746
6747 cpu_buffer->nr_pages_to_update = nr_pages;
6748
6749 /* Include the reader page */
6750 nr_pages++;
6751
6752 /* Allocate the new size buffer */
6753 INIT_LIST_HEAD(&cpu_buffer->new_pages);
6754 if (__rb_allocate_pages(cpu_buffer, nr_pages,
6755
&cpu_buffer->new_pages)) { 6756 /* not enough memory for new pages */ 6757 err = -ENOMEM; 6758 goto error; 6759 } 6760 } 6761 6762 for_each_buffer_cpu(buffer, cpu) { 6763 struct buffer_data_page *old_free_data_page; 6764 struct list_head old_pages; 6765 unsigned long flags; 6766 6767 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6768 continue; 6769 6770 cpu_buffer = buffer->buffers[cpu]; 6771 6772 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6773 6774 /* Clear the head bit to make the link list normal to read */ 6775 rb_head_page_deactivate(cpu_buffer); 6776 6777 /* 6778 * Collect buffers from the cpu_buffer pages list and the 6779 * reader_page on old_pages, so they can be freed later when not 6780 * under a spinlock. The pages list is a linked list with no 6781 * head, adding old_pages turns it into a regular list with 6782 * old_pages being the head. 6783 */ 6784 list_add(&old_pages, cpu_buffer->pages); 6785 list_add(&cpu_buffer->reader_page->list, &old_pages); 6786 6787 /* One page was allocated for the reader page */ 6788 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6789 struct buffer_page, list); 6790 list_del_init(&cpu_buffer->reader_page->list); 6791 6792 /* Install the new pages, remove the head from the list */ 6793 cpu_buffer->pages = cpu_buffer->new_pages.next; 6794 list_del_init(&cpu_buffer->new_pages); 6795 cpu_buffer->cnt++; 6796 6797 cpu_buffer->head_page 6798 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6799 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6800 6801 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6802 cpu_buffer->nr_pages_to_update = 0; 6803 6804 old_free_data_page = cpu_buffer->free_page; 6805 cpu_buffer->free_page = NULL; 6806 6807 rb_head_page_activate(cpu_buffer); 6808 6809 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6810 6811 /* Free old sub buffers */ 6812 list_for_each_entry_safe(bpage, tmp, &old_pages, list) { 6813 list_del_init(&bpage->list); 6814 free_buffer_page(bpage); 6815 } 6816 free_pages((unsigned long)old_free_data_page, old_order); 6817 6818 rb_check_pages(cpu_buffer); 6819 } 6820 6821 atomic_dec(&buffer->record_disabled); 6822 mutex_unlock(&buffer->mutex); 6823 6824 return 0; 6825 6826 error: 6827 buffer->subbuf_order = old_order; 6828 buffer->subbuf_size = old_size; 6829 6830 atomic_dec(&buffer->record_disabled); 6831 mutex_unlock(&buffer->mutex); 6832 6833 for_each_buffer_cpu(buffer, cpu) { 6834 cpu_buffer = buffer->buffers[cpu]; 6835 6836 if (!cpu_buffer->nr_pages_to_update) 6837 continue; 6838 6839 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6840 list_del_init(&bpage->list); 6841 free_buffer_page(bpage); 6842 } 6843 } 6844 6845 return err; 6846 } 6847 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6848 6849 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6850 { 6851 struct page *page; 6852 6853 if (cpu_buffer->meta_page) 6854 return 0; 6855 6856 page = alloc_page(GFP_USER | __GFP_ZERO); 6857 if (!page) 6858 return -ENOMEM; 6859 6860 cpu_buffer->meta_page = page_to_virt(page); 6861 6862 return 0; 6863 } 6864 6865 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6866 { 6867 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 6868 6869 free_page(addr); 6870 cpu_buffer->meta_page = NULL; 6871 } 6872 6873 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 6874 unsigned long *subbuf_ids) 6875 { 6876 struct trace_buffer_meta *meta = 
cpu_buffer->meta_page; 6877 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 6878 struct buffer_page *first_subbuf, *subbuf; 6879 int id = 0; 6880 6881 subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page; 6882 cpu_buffer->reader_page->id = id++; 6883 6884 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 6885 do { 6886 if (WARN_ON(id >= nr_subbufs)) 6887 break; 6888 6889 subbuf_ids[id] = (unsigned long)subbuf->page; 6890 subbuf->id = id; 6891 6892 rb_inc_page(&subbuf); 6893 id++; 6894 } while (subbuf != first_subbuf); 6895 6896 /* install subbuf ID to kern VA translation */ 6897 cpu_buffer->subbuf_ids = subbuf_ids; 6898 6899 meta->meta_struct_len = sizeof(*meta); 6900 meta->nr_subbufs = nr_subbufs; 6901 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6902 meta->meta_page_size = meta->subbuf_size; 6903 6904 rb_update_meta_page(cpu_buffer); 6905 } 6906 6907 static struct ring_buffer_per_cpu * 6908 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 6909 { 6910 struct ring_buffer_per_cpu *cpu_buffer; 6911 6912 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6913 return ERR_PTR(-EINVAL); 6914 6915 cpu_buffer = buffer->buffers[cpu]; 6916 6917 mutex_lock(&cpu_buffer->mapping_lock); 6918 6919 if (!cpu_buffer->user_mapped) { 6920 mutex_unlock(&cpu_buffer->mapping_lock); 6921 return ERR_PTR(-ENODEV); 6922 } 6923 6924 return cpu_buffer; 6925 } 6926 6927 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6928 { 6929 mutex_unlock(&cpu_buffer->mapping_lock); 6930 } 6931 6932 /* 6933 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 6934 * to be set-up or torn-down. 6935 */ 6936 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 6937 bool inc) 6938 { 6939 unsigned long flags; 6940 6941 lockdep_assert_held(&cpu_buffer->mapping_lock); 6942 6943 /* mapped is always greater or equal to user_mapped */ 6944 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 6945 return -EINVAL; 6946 6947 if (inc && cpu_buffer->mapped == UINT_MAX) 6948 return -EBUSY; 6949 6950 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 6951 return -EINVAL; 6952 6953 mutex_lock(&cpu_buffer->buffer->mutex); 6954 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6955 6956 if (inc) { 6957 cpu_buffer->user_mapped++; 6958 cpu_buffer->mapped++; 6959 } else { 6960 cpu_buffer->user_mapped--; 6961 cpu_buffer->mapped--; 6962 } 6963 6964 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6965 mutex_unlock(&cpu_buffer->buffer->mutex); 6966 6967 return 0; 6968 } 6969 6970 /* 6971 * +--------------+ pgoff == 0 6972 * | meta page | 6973 * +--------------+ pgoff == 1 6974 * | subbuffer 0 | 6975 * | | 6976 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 6977 * | subbuffer 1 | 6978 * | | 6979 * ... 
6980 */ 6981 #ifdef CONFIG_MMU 6982 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 6983 struct vm_area_struct *vma) 6984 { 6985 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 6986 unsigned int subbuf_pages, subbuf_order; 6987 struct page **pages; 6988 int p = 0, s = 0; 6989 int err; 6990 6991 /* Refuse MP_PRIVATE or writable mappings */ 6992 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 6993 !(vma->vm_flags & VM_MAYSHARE)) 6994 return -EPERM; 6995 6996 subbuf_order = cpu_buffer->buffer->subbuf_order; 6997 subbuf_pages = 1 << subbuf_order; 6998 6999 if (subbuf_order && pgoff % subbuf_pages) 7000 return -EINVAL; 7001 7002 /* 7003 * Make sure the mapping cannot become writable later. Also tell the VM 7004 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 7005 */ 7006 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 7007 VM_MAYWRITE); 7008 7009 lockdep_assert_held(&cpu_buffer->mapping_lock); 7010 7011 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 7012 nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */ 7013 if (nr_pages <= pgoff) 7014 return -EINVAL; 7015 7016 nr_pages -= pgoff; 7017 7018 nr_vma_pages = vma_pages(vma); 7019 if (!nr_vma_pages || nr_vma_pages > nr_pages) 7020 return -EINVAL; 7021 7022 nr_pages = nr_vma_pages; 7023 7024 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 7025 if (!pages) 7026 return -ENOMEM; 7027 7028 if (!pgoff) { 7029 unsigned long meta_page_padding; 7030 7031 pages[p++] = virt_to_page(cpu_buffer->meta_page); 7032 7033 /* 7034 * Pad with the zero-page to align the meta-page with the 7035 * sub-buffers. 7036 */ 7037 meta_page_padding = subbuf_pages - 1; 7038 while (meta_page_padding-- && p < nr_pages) { 7039 unsigned long __maybe_unused zero_addr = 7040 vma->vm_start + (PAGE_SIZE * p); 7041 7042 pages[p++] = ZERO_PAGE(zero_addr); 7043 } 7044 } else { 7045 /* Skip the meta-page */ 7046 pgoff -= subbuf_pages; 7047 7048 s += pgoff / subbuf_pages; 7049 } 7050 7051 while (p < nr_pages) { 7052 struct page *page; 7053 int off = 0; 7054 7055 if (WARN_ON_ONCE(s >= nr_subbufs)) { 7056 err = -EINVAL; 7057 goto out; 7058 } 7059 7060 page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 7061 7062 for (; off < (1 << (subbuf_order)); off++, page++) { 7063 if (p >= nr_pages) 7064 break; 7065 7066 pages[p++] = page; 7067 } 7068 s++; 7069 } 7070 7071 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7072 7073 out: 7074 kfree(pages); 7075 7076 return err; 7077 } 7078 #else 7079 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7080 struct vm_area_struct *vma) 7081 { 7082 return -EOPNOTSUPP; 7083 } 7084 #endif 7085 7086 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7087 struct vm_area_struct *vma) 7088 { 7089 struct ring_buffer_per_cpu *cpu_buffer; 7090 unsigned long flags, *subbuf_ids; 7091 int err = 0; 7092 7093 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7094 return -EINVAL; 7095 7096 cpu_buffer = buffer->buffers[cpu]; 7097 7098 mutex_lock(&cpu_buffer->mapping_lock); 7099 7100 if (cpu_buffer->user_mapped) { 7101 err = __rb_map_vma(cpu_buffer, vma); 7102 if (!err) 7103 err = __rb_inc_dec_mapped(cpu_buffer, true); 7104 mutex_unlock(&cpu_buffer->mapping_lock); 7105 return err; 7106 } 7107 7108 /* prevent another thread from changing buffer/sub-buffer sizes */ 7109 mutex_lock(&buffer->mutex); 7110 7111 err = rb_alloc_meta_page(cpu_buffer); 7112 if (err) 7113 goto unlock; 7114 7115 /* subbuf_ids include the reader while nr_pages does not 
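	 * (hence the nr_pages + 1 elements allocated just below)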
*/ 7116 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7117 if (!subbuf_ids) { 7118 rb_free_meta_page(cpu_buffer); 7119 err = -ENOMEM; 7120 goto unlock; 7121 } 7122 7123 atomic_inc(&cpu_buffer->resize_disabled); 7124 7125 /* 7126 * Lock all readers to block any subbuf swap until the subbuf IDs are 7127 * assigned. 7128 */ 7129 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7130 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 7131 7132 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7133 7134 err = __rb_map_vma(cpu_buffer, vma); 7135 if (!err) { 7136 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7137 /* This is the first time it is mapped by user */ 7138 cpu_buffer->mapped++; 7139 cpu_buffer->user_mapped = 1; 7140 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7141 } else { 7142 kfree(cpu_buffer->subbuf_ids); 7143 cpu_buffer->subbuf_ids = NULL; 7144 rb_free_meta_page(cpu_buffer); 7145 atomic_dec(&cpu_buffer->resize_disabled); 7146 } 7147 7148 unlock: 7149 mutex_unlock(&buffer->mutex); 7150 mutex_unlock(&cpu_buffer->mapping_lock); 7151 7152 return err; 7153 } 7154 7155 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 7156 { 7157 struct ring_buffer_per_cpu *cpu_buffer; 7158 unsigned long flags; 7159 int err = 0; 7160 7161 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7162 return -EINVAL; 7163 7164 cpu_buffer = buffer->buffers[cpu]; 7165 7166 mutex_lock(&cpu_buffer->mapping_lock); 7167 7168 if (!cpu_buffer->user_mapped) { 7169 err = -ENODEV; 7170 goto out; 7171 } else if (cpu_buffer->user_mapped > 1) { 7172 __rb_inc_dec_mapped(cpu_buffer, false); 7173 goto out; 7174 } 7175 7176 mutex_lock(&buffer->mutex); 7177 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7178 7179 /* This is the last user space mapping */ 7180 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7181 cpu_buffer->mapped--; 7182 cpu_buffer->user_mapped = 0; 7183 7184 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7185 7186 kfree(cpu_buffer->subbuf_ids); 7187 cpu_buffer->subbuf_ids = NULL; 7188 rb_free_meta_page(cpu_buffer); 7189 atomic_dec(&cpu_buffer->resize_disabled); 7190 7191 mutex_unlock(&buffer->mutex); 7192 7193 out: 7194 mutex_unlock(&cpu_buffer->mapping_lock); 7195 7196 return err; 7197 } 7198 7199 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 7200 { 7201 struct ring_buffer_per_cpu *cpu_buffer; 7202 struct buffer_page *reader; 7203 unsigned long missed_events; 7204 unsigned long reader_size; 7205 unsigned long flags; 7206 7207 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 7208 if (IS_ERR(cpu_buffer)) 7209 return (int)PTR_ERR(cpu_buffer); 7210 7211 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7212 7213 consume: 7214 if (rb_per_cpu_empty(cpu_buffer)) 7215 goto out; 7216 7217 reader_size = rb_page_size(cpu_buffer->reader_page); 7218 7219 /* 7220 * There are data to be read on the current reader page, we can 7221 * return to the caller. But before that, we assume the latter will read 7222 * everything. Let's update the kernel reader accordingly. 
7223 */ 7224 if (cpu_buffer->reader_page->read < reader_size) { 7225 while (cpu_buffer->reader_page->read < reader_size) 7226 rb_advance_reader(cpu_buffer); 7227 goto out; 7228 } 7229 7230 reader = rb_get_reader_page(cpu_buffer); 7231 if (WARN_ON(!reader)) 7232 goto out; 7233 7234 /* Check if any events were dropped */ 7235 missed_events = cpu_buffer->lost_events; 7236 7237 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 7238 if (missed_events) { 7239 struct buffer_data_page *bpage = reader->page; 7240 unsigned int commit; 7241 /* 7242 * Use the real_end for the data size, 7243 * This gives us a chance to store the lost events 7244 * on the page. 7245 */ 7246 if (reader->real_end) 7247 local_set(&bpage->commit, reader->real_end); 7248 /* 7249 * If there is room at the end of the page to save the 7250 * missed events, then record it there. 7251 */ 7252 commit = rb_page_size(reader); 7253 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7254 memcpy(&bpage->data[commit], &missed_events, 7255 sizeof(missed_events)); 7256 local_add(RB_MISSED_STORED, &bpage->commit); 7257 } 7258 local_add(RB_MISSED_EVENTS, &bpage->commit); 7259 } 7260 } else { 7261 /* 7262 * There really shouldn't be any missed events if the commit 7263 * is on the reader page. 7264 */ 7265 WARN_ON_ONCE(missed_events); 7266 } 7267 7268 cpu_buffer->lost_events = 0; 7269 7270 goto consume; 7271 7272 out: 7273 /* Some archs do not have data cache coherency between kernel and user-space */ 7274 flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page)); 7275 7276 rb_update_meta_page(cpu_buffer); 7277 7278 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7279 rb_put_mapped_buffer(cpu_buffer); 7280 7281 return 0; 7282 } 7283 7284 /* 7285 * We only allocate new buffers, never free them if the CPU goes down. 7286 * If we were to free the buffer, then the user would lose any trace that was in 7287 * the buffer. 7288 */ 7289 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7290 { 7291 struct trace_buffer *buffer; 7292 long nr_pages_same; 7293 int cpu_i; 7294 unsigned long nr_pages; 7295 7296 buffer = container_of(node, struct trace_buffer, node); 7297 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7298 return 0; 7299 7300 nr_pages = 0; 7301 nr_pages_same = 1; 7302 /* check if all cpu sizes are same */ 7303 for_each_buffer_cpu(buffer, cpu_i) { 7304 /* fill in the size from first enabled cpu */ 7305 if (nr_pages == 0) 7306 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7307 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7308 nr_pages_same = 0; 7309 break; 7310 } 7311 } 7312 /* allocate minimum pages, user can later expand it */ 7313 if (!nr_pages_same) 7314 nr_pages = 2; 7315 buffer->buffers[cpu] = 7316 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7317 if (!buffer->buffers[cpu]) { 7318 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7319 cpu); 7320 return -ENOMEM; 7321 } 7322 smp_wmb(); 7323 cpumask_set_cpu(cpu, buffer->cpumask); 7324 return 0; 7325 } 7326 7327 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7328 /* 7329 * This is a basic integrity check of the ring buffer. 7330 * Late in the boot cycle this test will run when configured in. 7331 * It will kick off a thread per CPU that will go into a loop 7332 * writing to the per cpu ring buffer various sizes of data. 7333 * Some of the data will be large items, some small. 7334 * 7335 * Another thread is created that goes into a spin, sending out 7336 * IPIs to the other CPUs to also write into the ring buffer. 
7337 * this is to test the nesting ability of the buffer. 7338 * 7339 * Basic stats are recorded and reported. If something in the 7340 * ring buffer should happen that's not expected, a big warning 7341 * is displayed and all ring buffers are disabled. 7342 */ 7343 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7344 7345 struct rb_test_data { 7346 struct trace_buffer *buffer; 7347 unsigned long events; 7348 unsigned long bytes_written; 7349 unsigned long bytes_alloc; 7350 unsigned long bytes_dropped; 7351 unsigned long events_nested; 7352 unsigned long bytes_written_nested; 7353 unsigned long bytes_alloc_nested; 7354 unsigned long bytes_dropped_nested; 7355 int min_size_nested; 7356 int max_size_nested; 7357 int max_size; 7358 int min_size; 7359 int cpu; 7360 int cnt; 7361 }; 7362 7363 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7364 7365 /* 1 meg per cpu */ 7366 #define RB_TEST_BUFFER_SIZE 1048576 7367 7368 static char rb_string[] __initdata = 7369 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7370 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7371 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7372 7373 static bool rb_test_started __initdata; 7374 7375 struct rb_item { 7376 int size; 7377 char str[]; 7378 }; 7379 7380 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7381 { 7382 struct ring_buffer_event *event; 7383 struct rb_item *item; 7384 bool started; 7385 int event_len; 7386 int size; 7387 int len; 7388 int cnt; 7389 7390 /* Have nested writes different that what is written */ 7391 cnt = data->cnt + (nested ? 27 : 0); 7392 7393 /* Multiply cnt by ~e, to make some unique increment */ 7394 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7395 7396 len = size + sizeof(struct rb_item); 7397 7398 started = rb_test_started; 7399 /* read rb_test_started before checking buffer enabled */ 7400 smp_rmb(); 7401 7402 event = ring_buffer_lock_reserve(data->buffer, len); 7403 if (!event) { 7404 /* Ignore dropped events before test starts. 
*/
7405 if (started) {
7406 if (nested)
7407 data->bytes_dropped_nested += len;
7408 else
7409 data->bytes_dropped += len;
7410 }
7411 return len;
7412 }
7413
7414 event_len = ring_buffer_event_length(event);
7415
7416 if (RB_WARN_ON(data->buffer, event_len < len))
7417 goto out;
7418
7419 item = ring_buffer_event_data(event);
7420 item->size = size;
7421 memcpy(item->str, rb_string, size);
7422
7423 if (nested) {
7424 data->bytes_alloc_nested += event_len;
7425 data->bytes_written_nested += len;
7426 data->events_nested++;
7427 if (!data->min_size_nested || len < data->min_size_nested)
7428 data->min_size_nested = len;
7429 if (len > data->max_size_nested)
7430 data->max_size_nested = len;
7431 } else {
7432 data->bytes_alloc += event_len;
7433 data->bytes_written += len;
7434 data->events++;
7435 if (!data->min_size || len < data->min_size)
7436 data->min_size = len;
7437 if (len > data->max_size)
7438 data->max_size = len;
7439 }
7440
7441 out:
7442 ring_buffer_unlock_commit(data->buffer);
7443
7444 return 0;
7445 }
7446
7447 static __init int rb_test(void *arg)
7448 {
7449 struct rb_test_data *data = arg;
7450
7451 while (!kthread_should_stop()) {
7452 rb_write_something(data, false);
7453 data->cnt++;
7454
7455 set_current_state(TASK_INTERRUPTIBLE);
7456 /* Now sleep between a min of 100-300us and a max of 1ms */
7457 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
7458 }
7459
7460 return 0;
7461 }
7462
7463 static __init void rb_ipi(void *ignore)
7464 {
7465 struct rb_test_data *data;
7466 int cpu = smp_processor_id();
7467
7468 data = &rb_data[cpu];
7469 rb_write_something(data, true);
7470 }
7471
7472 static __init int rb_hammer_test(void *arg)
7473 {
7474 while (!kthread_should_stop()) {
7475
7476 /* Send an IPI to all cpus to write data! */
7477 smp_call_function(rb_ipi, NULL, 1);
7478 /* No sleep, but for non-preempt, let others run */
7479 schedule();
7480 }
7481
7482 return 0;
7483 }
7484
7485 static __init int test_ringbuffer(void)
7486 {
7487 struct task_struct *rb_hammer;
7488 struct trace_buffer *buffer;
7489 int cpu;
7490 int ret = 0;
7491
7492 if (security_locked_down(LOCKDOWN_TRACEFS)) {
7493 pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
7494 return 0;
7495 }
7496
7497 pr_info("Running ring buffer tests...\n");
7498
7499 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
7500 if (WARN_ON(!buffer))
7501 return 0;
7502
7503 /* Disable buffer so that threads can't write to it yet */
7504 ring_buffer_record_off(buffer);
7505
7506 for_each_online_cpu(cpu) {
7507 rb_data[cpu].buffer = buffer;
7508 rb_data[cpu].cpu = cpu;
7509 rb_data[cpu].cnt = cpu;
7510 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
7511 cpu, "rbtester/%u");
7512 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
7513 pr_cont("FAILED\n");
7514 ret = PTR_ERR(rb_threads[cpu]);
7515 goto out_free;
7516 }
7517 }
7518
7519 /* Now create the rb hammer! */
7520 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
7521 if (WARN_ON(IS_ERR(rb_hammer))) {
7522 pr_cont("FAILED\n");
7523 ret = PTR_ERR(rb_hammer);
7524 goto out_free;
7525 }
7526
7527 ring_buffer_record_on(buffer);
7528 /*
7529 * Show buffer is enabled before setting rb_test_started.
7530 * Yes there's a small race window where events could be
7531 * dropped and the thread won't catch it. But when a ring
7532 * buffer gets enabled, there will always be some kind of
7533 * delay before other CPUs see it. Thus, we don't care about
7534 * those dropped events. We care about events dropped after
We care about events dropped after 7535 * the threads see that the buffer is active. 7536 */ 7537 smp_wmb(); 7538 rb_test_started = true; 7539 7540 set_current_state(TASK_INTERRUPTIBLE); 7541 /* Just run for 10 seconds */; 7542 schedule_timeout(10 * HZ); 7543 7544 kthread_stop(rb_hammer); 7545 7546 out_free: 7547 for_each_online_cpu(cpu) { 7548 if (!rb_threads[cpu]) 7549 break; 7550 kthread_stop(rb_threads[cpu]); 7551 } 7552 if (ret) { 7553 ring_buffer_free(buffer); 7554 return ret; 7555 } 7556 7557 /* Report! */ 7558 pr_info("finished\n"); 7559 for_each_online_cpu(cpu) { 7560 struct ring_buffer_event *event; 7561 struct rb_test_data *data = &rb_data[cpu]; 7562 struct rb_item *item; 7563 unsigned long total_events; 7564 unsigned long total_dropped; 7565 unsigned long total_written; 7566 unsigned long total_alloc; 7567 unsigned long total_read = 0; 7568 unsigned long total_size = 0; 7569 unsigned long total_len = 0; 7570 unsigned long total_lost = 0; 7571 unsigned long lost; 7572 int big_event_size; 7573 int small_event_size; 7574 7575 ret = -1; 7576 7577 total_events = data->events + data->events_nested; 7578 total_written = data->bytes_written + data->bytes_written_nested; 7579 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 7580 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 7581 7582 big_event_size = data->max_size + data->max_size_nested; 7583 small_event_size = data->min_size + data->min_size_nested; 7584 7585 pr_info("CPU %d:\n", cpu); 7586 pr_info(" events: %ld\n", total_events); 7587 pr_info(" dropped bytes: %ld\n", total_dropped); 7588 pr_info(" alloced bytes: %ld\n", total_alloc); 7589 pr_info(" written bytes: %ld\n", total_written); 7590 pr_info(" biggest event: %d\n", big_event_size); 7591 pr_info(" smallest event: %d\n", small_event_size); 7592 7593 if (RB_WARN_ON(buffer, total_dropped)) 7594 break; 7595 7596 ret = 0; 7597 7598 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 7599 total_lost += lost; 7600 item = ring_buffer_event_data(event); 7601 total_len += ring_buffer_event_length(event); 7602 total_size += item->size + sizeof(struct rb_item); 7603 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 7604 pr_info("FAILED!\n"); 7605 pr_info("buffer had: %.*s\n", item->size, item->str); 7606 pr_info("expected: %.*s\n", item->size, rb_string); 7607 RB_WARN_ON(buffer, 1); 7608 ret = -1; 7609 break; 7610 } 7611 total_read++; 7612 } 7613 if (ret) 7614 break; 7615 7616 ret = -1; 7617 7618 pr_info(" read events: %ld\n", total_read); 7619 pr_info(" lost events: %ld\n", total_lost); 7620 pr_info(" total events: %ld\n", total_lost + total_read); 7621 pr_info(" recorded len bytes: %ld\n", total_len); 7622 pr_info(" recorded size bytes: %ld\n", total_size); 7623 if (total_lost) { 7624 pr_info(" With dropped events, record len and size may not match\n" 7625 " alloced and written from above\n"); 7626 } else { 7627 if (RB_WARN_ON(buffer, total_len != total_alloc || 7628 total_size != total_written)) 7629 break; 7630 } 7631 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 7632 break; 7633 7634 ret = 0; 7635 } 7636 if (!ret) 7637 pr_info("Ring buffer PASSED!\n"); 7638 7639 ring_buffer_free(buffer); 7640 return 0; 7641 } 7642 7643 late_initcall(test_ringbuffer); 7644 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 7645