1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Generic ring buffer 4 * 5 * Copyright (C) 2008 Steven Rostedt <[email protected]> 6 */ 7 #include <linux/trace_recursion.h> 8 #include <linux/trace_events.h> 9 #include <linux/ring_buffer.h> 10 #include <linux/trace_clock.h> 11 #include <linux/sched/clock.h> 12 #include <linux/cacheflush.h> 13 #include <linux/trace_seq.h> 14 #include <linux/spinlock.h> 15 #include <linux/irq_work.h> 16 #include <linux/security.h> 17 #include <linux/uaccess.h> 18 #include <linux/hardirq.h> 19 #include <linux/kthread.h> /* for self test */ 20 #include <linux/module.h> 21 #include <linux/percpu.h> 22 #include <linux/mutex.h> 23 #include <linux/delay.h> 24 #include <linux/slab.h> 25 #include <linux/init.h> 26 #include <linux/hash.h> 27 #include <linux/list.h> 28 #include <linux/cpu.h> 29 #include <linux/oom.h> 30 #include <linux/mm.h> 31 32 #include <asm/local64.h> 33 #include <asm/local.h> 34 35 #include "trace.h" 36 37 /* 38 * The "absolute" timestamp in the buffer is only 59 bits. 39 * If a clock has the 5 MSBs set, it needs to be saved and 40 * reinserted. 41 */ 42 #define TS_MSB (0xf8ULL << 56) 43 #define ABS_TS_MASK (~TS_MSB) 44 45 static void update_pages_handler(struct work_struct *work); 46 47 #define RING_BUFFER_META_MAGIC 0xBADFEED 48 49 struct ring_buffer_meta { 50 int magic; 51 int struct_size; 52 unsigned long text_addr; 53 unsigned long data_addr; 54 unsigned long first_buffer; 55 unsigned long head_buffer; 56 unsigned long commit_buffer; 57 __u32 subbuf_size; 58 __u32 nr_subbufs; 59 int buffers[]; 60 }; 61 62 /* 63 * The ring buffer header is special. We must manually up keep it. 64 */ 65 int ring_buffer_print_entry_header(struct trace_seq *s) 66 { 67 trace_seq_puts(s, "# compressed entry header\n"); 68 trace_seq_puts(s, "\ttype_len : 5 bits\n"); 69 trace_seq_puts(s, "\ttime_delta : 27 bits\n"); 70 trace_seq_puts(s, "\tarray : 32 bits\n"); 71 trace_seq_putc(s, '\n'); 72 trace_seq_printf(s, "\tpadding : type == %d\n", 73 RINGBUF_TYPE_PADDING); 74 trace_seq_printf(s, "\ttime_extend : type == %d\n", 75 RINGBUF_TYPE_TIME_EXTEND); 76 trace_seq_printf(s, "\ttime_stamp : type == %d\n", 77 RINGBUF_TYPE_TIME_STAMP); 78 trace_seq_printf(s, "\tdata max type_len == %d\n", 79 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 80 81 return !trace_seq_has_overflowed(s); 82 } 83 84 /* 85 * The ring buffer is made up of a list of pages. A separate list of pages is 86 * allocated for each CPU. A writer may only write to a buffer that is 87 * associated with the CPU it is currently executing on. A reader may read 88 * from any per cpu buffer. 89 * 90 * The reader is special. For each per cpu buffer, the reader has its own 91 * reader page. When a reader has read the entire reader page, this reader 92 * page is swapped with another page in the ring buffer. 93 * 94 * Now, as long as the writer is off the reader page, the reader can do what 95 * ever it wants with that page. The writer will never write to that page 96 * again (as long as it is out of the ring buffer). 97 * 98 * Here's some silly ASCII art. 
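 *
 * But first, the same swap in rough pseudo-code (a simplified sketch, not
 * the actual implementation; the real thing is done locklessly with
 * cmpxchg and the HEAD flag described further below):
 *
 *	new reader  = head page             (the old head leaves the ring)
 *	splice the old reader page into the ring where the head page was
 *	head page   = next page in the ring
 *	reader page = new reader            (the reader now owns the old head)
 *
 * Now the pictures: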
99 * 100 * +------+ 101 * |reader| RING BUFFER 102 * |page | 103 * +------+ +---+ +---+ +---+ 104 * | |-->| |-->| | 105 * +---+ +---+ +---+ 106 * ^ | 107 * | | 108 * +---------------+ 109 * 110 * 111 * +------+ 112 * |reader| RING BUFFER 113 * |page |------------------v 114 * +------+ +---+ +---+ +---+ 115 * | |-->| |-->| | 116 * +---+ +---+ +---+ 117 * ^ | 118 * | | 119 * +---------------+ 120 * 121 * 122 * +------+ 123 * |reader| RING BUFFER 124 * |page |------------------v 125 * +------+ +---+ +---+ +---+ 126 * ^ | |-->| |-->| | 127 * | +---+ +---+ +---+ 128 * | | 129 * | | 130 * +------------------------------+ 131 * 132 * 133 * +------+ 134 * |buffer| RING BUFFER 135 * |page |------------------v 136 * +------+ +---+ +---+ +---+ 137 * ^ | | | |-->| | 138 * | New +---+ +---+ +---+ 139 * | Reader------^ | 140 * | page | 141 * +------------------------------+ 142 * 143 * 144 * After we make this swap, the reader can hand this page off to the splice 145 * code and be done with it. It can even allocate a new page if it needs to 146 * and swap that into the ring buffer. 147 * 148 * We will be using cmpxchg soon to make all this lockless. 149 * 150 */ 151 152 /* Used for individual buffers (after the counter) */ 153 #define RB_BUFFER_OFF (1 << 20) 154 155 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 156 157 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 158 #define RB_ALIGNMENT 4U 159 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 160 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 161 162 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS 163 # define RB_FORCE_8BYTE_ALIGNMENT 0 164 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT 165 #else 166 # define RB_FORCE_8BYTE_ALIGNMENT 1 167 # define RB_ARCH_ALIGNMENT 8U 168 #endif 169 170 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT) 171 172 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 173 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 174 175 enum { 176 RB_LEN_TIME_EXTEND = 8, 177 RB_LEN_TIME_STAMP = 8, 178 }; 179 180 #define skip_time_extend(event) \ 181 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 182 183 #define extended_time(event) \ 184 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 185 186 static inline bool rb_null_event(struct ring_buffer_event *event) 187 { 188 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 189 } 190 191 static void rb_event_set_padding(struct ring_buffer_event *event) 192 { 193 /* padding has a NULL time_delta */ 194 event->type_len = RINGBUF_TYPE_PADDING; 195 event->time_delta = 0; 196 } 197 198 static unsigned 199 rb_event_data_length(struct ring_buffer_event *event) 200 { 201 unsigned length; 202 203 if (event->type_len) 204 length = event->type_len * RB_ALIGNMENT; 205 else 206 length = event->array[0]; 207 return length + RB_EVNT_HDR_SIZE; 208 } 209 210 /* 211 * Return the length of the given event. Will return 212 * the length of the time extend if the event is a 213 * time extend. 
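 *
 * As a worked example (sizes chosen for illustration): a data event with
 * a 12 byte payload is stored with type_len == 12 / RB_ALIGNMENT == 3 and
 * has a total length of 12 + RB_EVNT_HDR_SIZE bytes.  A payload larger
 * than RB_MAX_SMALL_DATA is stored with type_len == 0 and its length is
 * kept in array[0] instead.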
214 */ 215 static inline unsigned 216 rb_event_length(struct ring_buffer_event *event) 217 { 218 switch (event->type_len) { 219 case RINGBUF_TYPE_PADDING: 220 if (rb_null_event(event)) 221 /* undefined */ 222 return -1; 223 return event->array[0] + RB_EVNT_HDR_SIZE; 224 225 case RINGBUF_TYPE_TIME_EXTEND: 226 return RB_LEN_TIME_EXTEND; 227 228 case RINGBUF_TYPE_TIME_STAMP: 229 return RB_LEN_TIME_STAMP; 230 231 case RINGBUF_TYPE_DATA: 232 return rb_event_data_length(event); 233 default: 234 WARN_ON_ONCE(1); 235 } 236 /* not hit */ 237 return 0; 238 } 239 240 /* 241 * Return total length of time extend and data, 242 * or just the event length for all other events. 243 */ 244 static inline unsigned 245 rb_event_ts_length(struct ring_buffer_event *event) 246 { 247 unsigned len = 0; 248 249 if (extended_time(event)) { 250 /* time extends include the data event after it */ 251 len = RB_LEN_TIME_EXTEND; 252 event = skip_time_extend(event); 253 } 254 return len + rb_event_length(event); 255 } 256 257 /** 258 * ring_buffer_event_length - return the length of the event 259 * @event: the event to get the length of 260 * 261 * Returns the size of the data load of a data event. 262 * If the event is something other than a data event, it 263 * returns the size of the event itself. With the exception 264 * of a TIME EXTEND, where it still returns the size of the 265 * data load of the data event after it. 266 */ 267 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 268 { 269 unsigned length; 270 271 if (extended_time(event)) 272 event = skip_time_extend(event); 273 274 length = rb_event_length(event); 275 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 276 return length; 277 length -= RB_EVNT_HDR_SIZE; 278 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 279 length -= sizeof(event->array[0]); 280 return length; 281 } 282 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 283 284 /* inline for ring buffer fast paths */ 285 static __always_inline void * 286 rb_event_data(struct ring_buffer_event *event) 287 { 288 if (extended_time(event)) 289 event = skip_time_extend(event); 290 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 291 /* If length is in len field, then array[0] has the data */ 292 if (event->type_len) 293 return (void *)&event->array[0]; 294 /* Otherwise length is in array[0] and array[1] has the data */ 295 return (void *)&event->array[1]; 296 } 297 298 /** 299 * ring_buffer_event_data - return the data of the event 300 * @event: the event to get the data from 301 */ 302 void *ring_buffer_event_data(struct ring_buffer_event *event) 303 { 304 return rb_event_data(event); 305 } 306 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 307 308 #define for_each_buffer_cpu(buffer, cpu) \ 309 for_each_cpu(cpu, buffer->cpumask) 310 311 #define for_each_online_buffer_cpu(buffer, cpu) \ 312 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) 313 314 #define TS_SHIFT 27 315 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 316 #define TS_DELTA_TEST (~TS_MASK) 317 318 static u64 rb_event_time_stamp(struct ring_buffer_event *event) 319 { 320 u64 ts; 321 322 ts = event->array[0]; 323 ts <<= TS_SHIFT; 324 ts += event->time_delta; 325 326 return ts; 327 } 328 329 /* Flag when events were overwritten */ 330 #define RB_MISSED_EVENTS (1 << 31) 331 /* Missed count stored at end */ 332 #define RB_MISSED_STORED (1 << 30) 333 334 #define RB_MISSED_MASK (3 << 30) 335 336 struct buffer_data_page { 337 u64 time_stamp; /* page time stamp */ 338 local_t commit; /* write committed index */ 339 
unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */ 340 }; 341 342 struct buffer_data_read_page { 343 unsigned order; /* order of the page */ 344 struct buffer_data_page *data; /* actual data, stored in this page */ 345 }; 346 347 /* 348 * Note, the buffer_page list must be first. The buffer pages 349 * are allocated in cache lines, which means that each buffer 350 * page will be at the beginning of a cache line, and thus 351 * the least significant bits will be zero. We use this to 352 * add flags in the list struct pointers, to make the ring buffer 353 * lockless. 354 */ 355 struct buffer_page { 356 struct list_head list; /* list of buffer pages */ 357 local_t write; /* index for next write */ 358 unsigned read; /* index for next read */ 359 local_t entries; /* entries on this page */ 360 unsigned long real_end; /* real end of data */ 361 unsigned order; /* order of the page */ 362 u32 id:30; /* ID for external mapping */ 363 u32 range:1; /* Mapped via a range */ 364 struct buffer_data_page *page; /* Actual data page */ 365 }; 366 367 /* 368 * The buffer page counters, write and entries, must be reset 369 * atomically when crossing page boundaries. To synchronize this 370 * update, two counters are inserted into the number. One is 371 * the actual counter for the write position or count on the page. 372 * 373 * The other is a counter of updaters. Before an update happens 374 * the update partition of the counter is incremented. This will 375 * allow the updater to update the counter atomically. 376 * 377 * The counter is 20 bits, and the state data is 12. 378 */ 379 #define RB_WRITE_MASK 0xfffff 380 #define RB_WRITE_INTCNT (1 << 20) 381 382 static void rb_init_page(struct buffer_data_page *bpage) 383 { 384 local_set(&bpage->commit, 0); 385 } 386 387 static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage) 388 { 389 return local_read(&bpage->page->commit); 390 } 391 392 static void free_buffer_page(struct buffer_page *bpage) 393 { 394 /* Range pages are not to be freed */ 395 if (!bpage->range) 396 free_pages((unsigned long)bpage->page, bpage->order); 397 kfree(bpage); 398 } 399 400 /* 401 * We need to fit the time_stamp delta into 27 bits. 402 */ 403 static inline bool test_time_stamp(u64 delta) 404 { 405 return !!(delta & TS_DELTA_TEST); 406 } 407 408 struct rb_irq_work { 409 struct irq_work work; 410 wait_queue_head_t waiters; 411 wait_queue_head_t full_waiters; 412 atomic_t seq; 413 bool waiters_pending; 414 bool full_waiters_pending; 415 bool wakeup_full; 416 }; 417 418 /* 419 * Structure to hold event state and handle nested events. 420 */ 421 struct rb_event_info { 422 u64 ts; 423 u64 delta; 424 u64 before; 425 u64 after; 426 unsigned long length; 427 struct buffer_page *tail_page; 428 int add_timestamp; 429 }; 430 431 /* 432 * Used for the add_timestamp 433 * NONE 434 * EXTEND - wants a time extend 435 * ABSOLUTE - the buffer requests all events to have absolute time stamps 436 * FORCE - force a full time stamp. 437 */ 438 enum { 439 RB_ADD_STAMP_NONE = 0, 440 RB_ADD_STAMP_EXTEND = BIT(1), 441 RB_ADD_STAMP_ABSOLUTE = BIT(2), 442 RB_ADD_STAMP_FORCE = BIT(3) 443 }; 444 /* 445 * Used for which event context the event is in. 446 * TRANSITION = 0 447 * NMI = 1 448 * IRQ = 2 449 * SOFTIRQ = 3 450 * NORMAL = 4 451 * 452 * See trace_recursive_lock() comment below for more details. 
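 *
 * As an example of how these nest: a write in NORMAL context can be
 * interrupted by a SOFTIRQ, that by an IRQ, and that by an NMI, while the
 * TRANSITION level covers the brief window when the context is changing.
 * That bounds the number of writers nested on one CPU to RB_CTX_MAX (5),
 * which is also the value of MAX_NEST used to size the per-CPU
 * event_stamp[] array below.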
453 */ 454 enum { 455 RB_CTX_TRANSITION, 456 RB_CTX_NMI, 457 RB_CTX_IRQ, 458 RB_CTX_SOFTIRQ, 459 RB_CTX_NORMAL, 460 RB_CTX_MAX 461 }; 462 463 struct rb_time_struct { 464 local64_t time; 465 }; 466 typedef struct rb_time_struct rb_time_t; 467 468 #define MAX_NEST 5 469 470 /* 471 * head_page == tail_page && head == tail then buffer is empty. 472 */ 473 struct ring_buffer_per_cpu { 474 int cpu; 475 atomic_t record_disabled; 476 atomic_t resize_disabled; 477 struct trace_buffer *buffer; 478 raw_spinlock_t reader_lock; /* serialize readers */ 479 arch_spinlock_t lock; 480 struct lock_class_key lock_key; 481 struct buffer_data_page *free_page; 482 unsigned long nr_pages; 483 unsigned int current_context; 484 struct list_head *pages; 485 /* pages generation counter, incremented when the list changes */ 486 unsigned long cnt; 487 struct buffer_page *head_page; /* read from head */ 488 struct buffer_page *tail_page; /* write to tail */ 489 struct buffer_page *commit_page; /* committed pages */ 490 struct buffer_page *reader_page; 491 unsigned long lost_events; 492 unsigned long last_overrun; 493 unsigned long nest; 494 local_t entries_bytes; 495 local_t entries; 496 local_t overrun; 497 local_t commit_overrun; 498 local_t dropped_events; 499 local_t committing; 500 local_t commits; 501 local_t pages_touched; 502 local_t pages_lost; 503 local_t pages_read; 504 long last_pages_touch; 505 size_t shortest_full; 506 unsigned long read; 507 unsigned long read_bytes; 508 rb_time_t write_stamp; 509 rb_time_t before_stamp; 510 u64 event_stamp[MAX_NEST]; 511 u64 read_stamp; 512 /* pages removed since last reset */ 513 unsigned long pages_removed; 514 515 unsigned int mapped; 516 unsigned int user_mapped; /* user space mapping */ 517 struct mutex mapping_lock; 518 unsigned long *subbuf_ids; /* ID to subbuf VA */ 519 struct trace_buffer_meta *meta_page; 520 struct ring_buffer_meta *ring_meta; 521 522 /* ring buffer pages to update, > 0 to add, < 0 to remove */ 523 long nr_pages_to_update; 524 struct list_head new_pages; /* new pages to add */ 525 struct work_struct update_pages_work; 526 struct completion update_done; 527 528 struct rb_irq_work irq_work; 529 }; 530 531 struct trace_buffer { 532 unsigned flags; 533 int cpus; 534 atomic_t record_disabled; 535 atomic_t resizing; 536 cpumask_var_t cpumask; 537 538 struct lock_class_key *reader_lock_key; 539 540 struct mutex mutex; 541 542 struct ring_buffer_per_cpu **buffers; 543 544 struct hlist_node node; 545 u64 (*clock)(void); 546 547 struct rb_irq_work irq_work; 548 bool time_stamp_abs; 549 550 unsigned long range_addr_start; 551 unsigned long range_addr_end; 552 553 long last_text_delta; 554 long last_data_delta; 555 556 unsigned int subbuf_size; 557 unsigned int subbuf_order; 558 unsigned int max_data_size; 559 }; 560 561 struct ring_buffer_iter { 562 struct ring_buffer_per_cpu *cpu_buffer; 563 unsigned long head; 564 unsigned long next_event; 565 struct buffer_page *head_page; 566 struct buffer_page *cache_reader_page; 567 unsigned long cache_read; 568 unsigned long cache_pages_removed; 569 u64 read_stamp; 570 u64 page_stamp; 571 struct ring_buffer_event *event; 572 size_t event_size; 573 int missed_events; 574 }; 575 576 int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s) 577 { 578 struct buffer_data_page field; 579 580 trace_seq_printf(s, "\tfield: u64 timestamp;\t" 581 "offset:0;\tsize:%u;\tsigned:%u;\n", 582 (unsigned int)sizeof(field.time_stamp), 583 (unsigned int)is_signed_type(u64)); 584 585 trace_seq_printf(s, 
"\tfield: local_t commit;\t" 586 "offset:%u;\tsize:%u;\tsigned:%u;\n", 587 (unsigned int)offsetof(typeof(field), commit), 588 (unsigned int)sizeof(field.commit), 589 (unsigned int)is_signed_type(long)); 590 591 trace_seq_printf(s, "\tfield: int overwrite;\t" 592 "offset:%u;\tsize:%u;\tsigned:%u;\n", 593 (unsigned int)offsetof(typeof(field), commit), 594 1, 595 (unsigned int)is_signed_type(long)); 596 597 trace_seq_printf(s, "\tfield: char data;\t" 598 "offset:%u;\tsize:%u;\tsigned:%u;\n", 599 (unsigned int)offsetof(typeof(field), data), 600 (unsigned int)buffer->subbuf_size, 601 (unsigned int)is_signed_type(char)); 602 603 return !trace_seq_has_overflowed(s); 604 } 605 606 static inline void rb_time_read(rb_time_t *t, u64 *ret) 607 { 608 *ret = local64_read(&t->time); 609 } 610 static void rb_time_set(rb_time_t *t, u64 val) 611 { 612 local64_set(&t->time, val); 613 } 614 615 /* 616 * Enable this to make sure that the event passed to 617 * ring_buffer_event_time_stamp() is not committed and also 618 * is on the buffer that it passed in. 619 */ 620 //#define RB_VERIFY_EVENT 621 #ifdef RB_VERIFY_EVENT 622 static struct list_head *rb_list_head(struct list_head *list); 623 static void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 624 void *event) 625 { 626 struct buffer_page *page = cpu_buffer->commit_page; 627 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page); 628 struct list_head *next; 629 long commit, write; 630 unsigned long addr = (unsigned long)event; 631 bool done = false; 632 int stop = 0; 633 634 /* Make sure the event exists and is not committed yet */ 635 do { 636 if (page == tail_page || WARN_ON_ONCE(stop++ > 100)) 637 done = true; 638 commit = local_read(&page->page->commit); 639 write = local_read(&page->write); 640 if (addr >= (unsigned long)&page->page->data[commit] && 641 addr < (unsigned long)&page->page->data[write]) 642 return; 643 644 next = rb_list_head(page->list.next); 645 page = list_entry(next, struct buffer_page, list); 646 } while (!done); 647 WARN_ON_ONCE(1); 648 } 649 #else 650 static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 651 void *event) 652 { 653 } 654 #endif 655 656 /* 657 * The absolute time stamp drops the 5 MSBs and some clocks may 658 * require them. The rb_fix_abs_ts() will take a previous full 659 * time stamp, and add the 5 MSB of that time stamp on to the 660 * saved absolute time stamp. Then they are compared in case of 661 * the unlikely event that the latest time stamp incremented 662 * the 5 MSB. 663 */ 664 static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts) 665 { 666 if (save_ts & TS_MSB) { 667 abs |= save_ts & TS_MSB; 668 /* Check for overflow */ 669 if (unlikely(abs < save_ts)) 670 abs += 1ULL << 59; 671 } 672 return abs; 673 } 674 675 static inline u64 rb_time_stamp(struct trace_buffer *buffer); 676 677 /** 678 * ring_buffer_event_time_stamp - return the event's current time stamp 679 * @buffer: The buffer that the event is on 680 * @event: the event to get the time stamp of 681 * 682 * Note, this must be called after @event is reserved, and before it is 683 * committed to the ring buffer. And must be called from the same 684 * context where the event was reserved (normal, softirq, irq, etc). 685 * 686 * Returns the time stamp associated with the current event. 687 * If the event has an extended time stamp, then that is used as 688 * the time stamp to return. 
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned;
 * otherwise the current time is returned.  Neither of the last two
 * cases should ever happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
	rb_time_read(&cpu_buffer->write_stamp, &ts);

	return ts;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	/*
	 * Add one, as dirty will never equal nr_pages: the sub-buffer
	 * that the writer is on is not counted as dirty.
	 * This is needed if "buffer_percent" is set to 100.
	 */
	dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

	return (dirty * 100) >= (full * nr_pages);
}

/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
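 *
 * As an example of the watermark such waiters can sleep on (see
 * full_hit() above): with nr_pages == 10 and "buffer_percent" set to 70,
 * full_hit() returns true once (ring_buffer_nr_dirty_pages() + 1) * 100
 * >= 70 * 10, i.e. once at least six sub-buffers hold data.  (Numbers
 * chosen purely for illustration.)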
781 */ 782 static void rb_wake_up_waiters(struct irq_work *work) 783 { 784 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 785 786 /* For waiters waiting for the first wake up */ 787 (void)atomic_fetch_inc_release(&rbwork->seq); 788 789 wake_up_all(&rbwork->waiters); 790 if (rbwork->full_waiters_pending || rbwork->wakeup_full) { 791 /* Only cpu_buffer sets the above flags */ 792 struct ring_buffer_per_cpu *cpu_buffer = 793 container_of(rbwork, struct ring_buffer_per_cpu, irq_work); 794 795 /* Called from interrupt context */ 796 raw_spin_lock(&cpu_buffer->reader_lock); 797 rbwork->wakeup_full = false; 798 rbwork->full_waiters_pending = false; 799 800 /* Waking up all waiters, they will reset the shortest full */ 801 cpu_buffer->shortest_full = 0; 802 raw_spin_unlock(&cpu_buffer->reader_lock); 803 804 wake_up_all(&rbwork->full_waiters); 805 } 806 } 807 808 /** 809 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer 810 * @buffer: The ring buffer to wake waiters on 811 * @cpu: The CPU buffer to wake waiters on 812 * 813 * In the case of a file that represents a ring buffer is closing, 814 * it is prudent to wake up any waiters that are on this. 815 */ 816 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu) 817 { 818 struct ring_buffer_per_cpu *cpu_buffer; 819 struct rb_irq_work *rbwork; 820 821 if (!buffer) 822 return; 823 824 if (cpu == RING_BUFFER_ALL_CPUS) { 825 826 /* Wake up individual ones too. One level recursion */ 827 for_each_buffer_cpu(buffer, cpu) 828 ring_buffer_wake_waiters(buffer, cpu); 829 830 rbwork = &buffer->irq_work; 831 } else { 832 if (WARN_ON_ONCE(!buffer->buffers)) 833 return; 834 if (WARN_ON_ONCE(cpu >= nr_cpu_ids)) 835 return; 836 837 cpu_buffer = buffer->buffers[cpu]; 838 /* The CPU buffer may not have been initialized yet */ 839 if (!cpu_buffer) 840 return; 841 rbwork = &cpu_buffer->irq_work; 842 } 843 844 /* This can be called in any context */ 845 irq_work_queue(&rbwork->work); 846 } 847 848 static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full) 849 { 850 struct ring_buffer_per_cpu *cpu_buffer; 851 bool ret = false; 852 853 /* Reads of all CPUs always waits for any data */ 854 if (cpu == RING_BUFFER_ALL_CPUS) 855 return !ring_buffer_empty(buffer); 856 857 cpu_buffer = buffer->buffers[cpu]; 858 859 if (!ring_buffer_empty_cpu(buffer, cpu)) { 860 unsigned long flags; 861 bool pagebusy; 862 863 if (!full) 864 return true; 865 866 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 867 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 868 ret = !pagebusy && full_hit(buffer, cpu, full); 869 870 if (!ret && (!cpu_buffer->shortest_full || 871 cpu_buffer->shortest_full > full)) { 872 cpu_buffer->shortest_full = full; 873 } 874 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 875 } 876 return ret; 877 } 878 879 static inline bool 880 rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer, 881 int cpu, int full, ring_buffer_cond_fn cond, void *data) 882 { 883 if (rb_watermark_hit(buffer, cpu, full)) 884 return true; 885 886 if (cond(data)) 887 return true; 888 889 /* 890 * The events can happen in critical sections where 891 * checking a work queue can cause deadlocks. 892 * After adding a task to the queue, this flag is set 893 * only to notify events to try to wake up the queue 894 * using irq_work. 895 * 896 * We don't clear it even if the buffer is no longer 897 * empty. 
The flag only causes the next event to run 898 * irq_work to do the work queue wake up. The worse 899 * that can happen if we race with !trace_empty() is that 900 * an event will cause an irq_work to try to wake up 901 * an empty queue. 902 * 903 * There's no reason to protect this flag either, as 904 * the work queue and irq_work logic will do the necessary 905 * synchronization for the wake ups. The only thing 906 * that is necessary is that the wake up happens after 907 * a task has been queued. It's OK for spurious wake ups. 908 */ 909 if (full) 910 rbwork->full_waiters_pending = true; 911 else 912 rbwork->waiters_pending = true; 913 914 return false; 915 } 916 917 struct rb_wait_data { 918 struct rb_irq_work *irq_work; 919 int seq; 920 }; 921 922 /* 923 * The default wait condition for ring_buffer_wait() is to just to exit the 924 * wait loop the first time it is woken up. 925 */ 926 static bool rb_wait_once(void *data) 927 { 928 struct rb_wait_data *rdata = data; 929 struct rb_irq_work *rbwork = rdata->irq_work; 930 931 return atomic_read_acquire(&rbwork->seq) != rdata->seq; 932 } 933 934 /** 935 * ring_buffer_wait - wait for input to the ring buffer 936 * @buffer: buffer to wait on 937 * @cpu: the cpu buffer to wait on 938 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 939 * @cond: condition function to break out of wait (NULL to run once) 940 * @data: the data to pass to @cond. 941 * 942 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 943 * as data is added to any of the @buffer's cpu buffers. Otherwise 944 * it will wait for data to be added to a specific cpu buffer. 945 */ 946 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full, 947 ring_buffer_cond_fn cond, void *data) 948 { 949 struct ring_buffer_per_cpu *cpu_buffer; 950 struct wait_queue_head *waitq; 951 struct rb_irq_work *rbwork; 952 struct rb_wait_data rdata; 953 int ret = 0; 954 955 /* 956 * Depending on what the caller is waiting for, either any 957 * data in any cpu buffer, or a specific buffer, put the 958 * caller on the appropriate wait queue. 959 */ 960 if (cpu == RING_BUFFER_ALL_CPUS) { 961 rbwork = &buffer->irq_work; 962 /* Full only makes sense on per cpu reads */ 963 full = 0; 964 } else { 965 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 966 return -ENODEV; 967 cpu_buffer = buffer->buffers[cpu]; 968 rbwork = &cpu_buffer->irq_work; 969 } 970 971 if (full) 972 waitq = &rbwork->full_waiters; 973 else 974 waitq = &rbwork->waiters; 975 976 /* Set up to exit loop as soon as it is woken */ 977 if (!cond) { 978 cond = rb_wait_once; 979 rdata.irq_work = rbwork; 980 rdata.seq = atomic_read_acquire(&rbwork->seq); 981 data = &rdata; 982 } 983 984 ret = wait_event_interruptible((*waitq), 985 rb_wait_cond(rbwork, buffer, cpu, full, cond, data)); 986 987 return ret; 988 } 989 990 /** 991 * ring_buffer_poll_wait - poll on buffer input 992 * @buffer: buffer to wait on 993 * @cpu: the cpu buffer to wait on 994 * @filp: the file descriptor 995 * @poll_table: The poll descriptor 996 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 997 * 998 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 999 * as data is added to any of the @buffer's cpu buffers. Otherwise 1000 * it will wait for data to be added to a specific cpu buffer. 1001 * 1002 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, 1003 * zero otherwise. 
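 *
 * Illustrative sketch only (the names below are assumptions, not part of
 * this file): a tracing file's poll callback would typically just forward
 * to this helper:
 *
 *	static __poll_t example_trace_poll(struct file *filp, poll_table *pt)
 *	{
 *		return ring_buffer_poll_wait(example_buffer, example_cpu,
 *					     filp, pt, 0);
 *	}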
1004 */ 1005 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, 1006 struct file *filp, poll_table *poll_table, int full) 1007 { 1008 struct ring_buffer_per_cpu *cpu_buffer; 1009 struct rb_irq_work *rbwork; 1010 1011 if (cpu == RING_BUFFER_ALL_CPUS) { 1012 rbwork = &buffer->irq_work; 1013 full = 0; 1014 } else { 1015 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1016 return EPOLLERR; 1017 1018 cpu_buffer = buffer->buffers[cpu]; 1019 rbwork = &cpu_buffer->irq_work; 1020 } 1021 1022 if (full) { 1023 poll_wait(filp, &rbwork->full_waiters, poll_table); 1024 1025 if (rb_watermark_hit(buffer, cpu, full)) 1026 return EPOLLIN | EPOLLRDNORM; 1027 /* 1028 * Only allow full_waiters_pending update to be seen after 1029 * the shortest_full is set (in rb_watermark_hit). If the 1030 * writer sees the full_waiters_pending flag set, it will 1031 * compare the amount in the ring buffer to shortest_full. 1032 * If the amount in the ring buffer is greater than the 1033 * shortest_full percent, it will call the irq_work handler 1034 * to wake up this list. The irq_handler will reset shortest_full 1035 * back to zero. That's done under the reader_lock, but 1036 * the below smp_mb() makes sure that the update to 1037 * full_waiters_pending doesn't leak up into the above. 1038 */ 1039 smp_mb(); 1040 rbwork->full_waiters_pending = true; 1041 return 0; 1042 } 1043 1044 poll_wait(filp, &rbwork->waiters, poll_table); 1045 rbwork->waiters_pending = true; 1046 1047 /* 1048 * There's a tight race between setting the waiters_pending and 1049 * checking if the ring buffer is empty. Once the waiters_pending bit 1050 * is set, the next event will wake the task up, but we can get stuck 1051 * if there's only a single event in. 1052 * 1053 * FIXME: Ideally, we need a memory barrier on the writer side as well, 1054 * but adding a memory barrier to all events will cause too much of a 1055 * performance hit in the fast path. We only need a memory barrier when 1056 * the buffer goes from empty to having content. But as this race is 1057 * extremely small, and it's not a problem if another event comes in, we 1058 * will fix it later. 
1059 */ 1060 smp_mb(); 1061 1062 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 1063 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) 1064 return EPOLLIN | EPOLLRDNORM; 1065 return 0; 1066 } 1067 1068 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 1069 #define RB_WARN_ON(b, cond) \ 1070 ({ \ 1071 int _____ret = unlikely(cond); \ 1072 if (_____ret) { \ 1073 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 1074 struct ring_buffer_per_cpu *__b = \ 1075 (void *)b; \ 1076 atomic_inc(&__b->buffer->record_disabled); \ 1077 } else \ 1078 atomic_inc(&b->record_disabled); \ 1079 WARN_ON(1); \ 1080 } \ 1081 _____ret; \ 1082 }) 1083 1084 /* Up this if you want to test the TIME_EXTENTS and normalization */ 1085 #define DEBUG_SHIFT 0 1086 1087 static inline u64 rb_time_stamp(struct trace_buffer *buffer) 1088 { 1089 u64 ts; 1090 1091 /* Skip retpolines :-( */ 1092 if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local)) 1093 ts = trace_clock_local(); 1094 else 1095 ts = buffer->clock(); 1096 1097 /* shift to debug/test normalization and TIME_EXTENTS */ 1098 return ts << DEBUG_SHIFT; 1099 } 1100 1101 u64 ring_buffer_time_stamp(struct trace_buffer *buffer) 1102 { 1103 u64 time; 1104 1105 preempt_disable_notrace(); 1106 time = rb_time_stamp(buffer); 1107 preempt_enable_notrace(); 1108 1109 return time; 1110 } 1111 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 1112 1113 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer, 1114 int cpu, u64 *ts) 1115 { 1116 /* Just stupid testing the normalize function and deltas */ 1117 *ts >>= DEBUG_SHIFT; 1118 } 1119 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 1120 1121 /* 1122 * Making the ring buffer lockless makes things tricky. 1123 * Although writes only happen on the CPU that they are on, 1124 * and they only need to worry about interrupts. Reads can 1125 * happen on any CPU. 1126 * 1127 * The reader page is always off the ring buffer, but when the 1128 * reader finishes with a page, it needs to swap its page with 1129 * a new one from the buffer. The reader needs to take from 1130 * the head (writes go to the tail). But if a writer is in overwrite 1131 * mode and wraps, it must push the head page forward. 1132 * 1133 * Here lies the problem. 1134 * 1135 * The reader must be careful to replace only the head page, and 1136 * not another one. As described at the top of the file in the 1137 * ASCII art, the reader sets its old page to point to the next 1138 * page after head. It then sets the page after head to point to 1139 * the old reader page. But if the writer moves the head page 1140 * during this operation, the reader could end up with the tail. 1141 * 1142 * We use cmpxchg to help prevent this race. We also do something 1143 * special with the page before head. We set the LSB to 1. 1144 * 1145 * When the writer must push the page forward, it will clear the 1146 * bit that points to the head page, move the head, and then set 1147 * the bit that points to the new head page. 1148 * 1149 * We also don't want an interrupt coming in and moving the head 1150 * page on another writer. Thus we use the second LSB to catch 1151 * that too. 
Thus: 1152 * 1153 * head->list->prev->next bit 1 bit 0 1154 * ------- ------- 1155 * Normal page 0 0 1156 * Points to head page 0 1 1157 * New head page 1 0 1158 * 1159 * Note we can not trust the prev pointer of the head page, because: 1160 * 1161 * +----+ +-----+ +-----+ 1162 * | |------>| T |---X--->| N | 1163 * | |<------| | | | 1164 * +----+ +-----+ +-----+ 1165 * ^ ^ | 1166 * | +-----+ | | 1167 * +----------| R |----------+ | 1168 * | |<-----------+ 1169 * +-----+ 1170 * 1171 * Key: ---X--> HEAD flag set in pointer 1172 * T Tail page 1173 * R Reader page 1174 * N Next page 1175 * 1176 * (see __rb_reserve_next() to see where this happens) 1177 * 1178 * What the above shows is that the reader just swapped out 1179 * the reader page with a page in the buffer, but before it 1180 * could make the new header point back to the new page added 1181 * it was preempted by a writer. The writer moved forward onto 1182 * the new page added by the reader and is about to move forward 1183 * again. 1184 * 1185 * You can see, it is legitimate for the previous pointer of 1186 * the head (or any page) not to point back to itself. But only 1187 * temporarily. 1188 */ 1189 1190 #define RB_PAGE_NORMAL 0UL 1191 #define RB_PAGE_HEAD 1UL 1192 #define RB_PAGE_UPDATE 2UL 1193 1194 1195 #define RB_FLAG_MASK 3UL 1196 1197 /* PAGE_MOVED is not part of the mask */ 1198 #define RB_PAGE_MOVED 4UL 1199 1200 /* 1201 * rb_list_head - remove any bit 1202 */ 1203 static struct list_head *rb_list_head(struct list_head *list) 1204 { 1205 unsigned long val = (unsigned long)list; 1206 1207 return (struct list_head *)(val & ~RB_FLAG_MASK); 1208 } 1209 1210 /* 1211 * rb_is_head_page - test if the given page is the head page 1212 * 1213 * Because the reader may move the head_page pointer, we can 1214 * not trust what the head page is (it may be pointing to 1215 * the reader page). But if the next page is a header page, 1216 * its flags will be non zero. 1217 */ 1218 static inline int 1219 rb_is_head_page(struct buffer_page *page, struct list_head *list) 1220 { 1221 unsigned long val; 1222 1223 val = (unsigned long)list->next; 1224 1225 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list) 1226 return RB_PAGE_MOVED; 1227 1228 return val & RB_FLAG_MASK; 1229 } 1230 1231 /* 1232 * rb_is_reader_page 1233 * 1234 * The unique thing about the reader page, is that, if the 1235 * writer is ever on it, the previous pointer never points 1236 * back to the reader page. 1237 */ 1238 static bool rb_is_reader_page(struct buffer_page *page) 1239 { 1240 struct list_head *list = page->list.prev; 1241 1242 return rb_list_head(list->next) != &page->list; 1243 } 1244 1245 /* 1246 * rb_set_list_to_head - set a list_head to be pointing to head. 1247 */ 1248 static void rb_set_list_to_head(struct list_head *list) 1249 { 1250 unsigned long *ptr; 1251 1252 ptr = (unsigned long *)&list->next; 1253 *ptr |= RB_PAGE_HEAD; 1254 *ptr &= ~RB_PAGE_UPDATE; 1255 } 1256 1257 /* 1258 * rb_head_page_activate - sets up head page 1259 */ 1260 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) 1261 { 1262 struct buffer_page *head; 1263 1264 head = cpu_buffer->head_page; 1265 if (!head) 1266 return; 1267 1268 /* 1269 * Set the previous list pointer to have the HEAD flag. 
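 *
 * For example (address invented for illustration): if the head page's
 * list_head sits at 0xffff888012345e80, the previous page's ->next is
 * stored as 0xffff888012345e81 (bit 0 == RB_PAGE_HEAD), and rb_list_head()
 * masks the low two bits off again before the pointer is followed.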
1270 */ 1271 rb_set_list_to_head(head->list.prev); 1272 1273 if (cpu_buffer->ring_meta) { 1274 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 1275 meta->head_buffer = (unsigned long)head->page; 1276 } 1277 } 1278 1279 static void rb_list_head_clear(struct list_head *list) 1280 { 1281 unsigned long *ptr = (unsigned long *)&list->next; 1282 1283 *ptr &= ~RB_FLAG_MASK; 1284 } 1285 1286 /* 1287 * rb_head_page_deactivate - clears head page ptr (for free list) 1288 */ 1289 static void 1290 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) 1291 { 1292 struct list_head *hd; 1293 1294 /* Go through the whole list and clear any pointers found. */ 1295 rb_list_head_clear(cpu_buffer->pages); 1296 1297 list_for_each(hd, cpu_buffer->pages) 1298 rb_list_head_clear(hd); 1299 } 1300 1301 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, 1302 struct buffer_page *head, 1303 struct buffer_page *prev, 1304 int old_flag, int new_flag) 1305 { 1306 struct list_head *list; 1307 unsigned long val = (unsigned long)&head->list; 1308 unsigned long ret; 1309 1310 list = &prev->list; 1311 1312 val &= ~RB_FLAG_MASK; 1313 1314 ret = cmpxchg((unsigned long *)&list->next, 1315 val | old_flag, val | new_flag); 1316 1317 /* check if the reader took the page */ 1318 if ((ret & ~RB_FLAG_MASK) != val) 1319 return RB_PAGE_MOVED; 1320 1321 return ret & RB_FLAG_MASK; 1322 } 1323 1324 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, 1325 struct buffer_page *head, 1326 struct buffer_page *prev, 1327 int old_flag) 1328 { 1329 return rb_head_page_set(cpu_buffer, head, prev, 1330 old_flag, RB_PAGE_UPDATE); 1331 } 1332 1333 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, 1334 struct buffer_page *head, 1335 struct buffer_page *prev, 1336 int old_flag) 1337 { 1338 return rb_head_page_set(cpu_buffer, head, prev, 1339 old_flag, RB_PAGE_HEAD); 1340 } 1341 1342 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, 1343 struct buffer_page *head, 1344 struct buffer_page *prev, 1345 int old_flag) 1346 { 1347 return rb_head_page_set(cpu_buffer, head, prev, 1348 old_flag, RB_PAGE_NORMAL); 1349 } 1350 1351 static inline void rb_inc_page(struct buffer_page **bpage) 1352 { 1353 struct list_head *p = rb_list_head((*bpage)->list.next); 1354 1355 *bpage = list_entry(p, struct buffer_page, list); 1356 } 1357 1358 static struct buffer_page * 1359 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) 1360 { 1361 struct buffer_page *head; 1362 struct buffer_page *page; 1363 struct list_head *list; 1364 int i; 1365 1366 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) 1367 return NULL; 1368 1369 /* sanity check */ 1370 list = cpu_buffer->pages; 1371 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) 1372 return NULL; 1373 1374 page = head = cpu_buffer->head_page; 1375 /* 1376 * It is possible that the writer moves the header behind 1377 * where we started, and we miss in one loop. 1378 * A second loop should grab the header, but we'll do 1379 * three loops just because I'm paranoid. 
1380 */ 1381 for (i = 0; i < 3; i++) { 1382 do { 1383 if (rb_is_head_page(page, page->list.prev)) { 1384 cpu_buffer->head_page = page; 1385 return page; 1386 } 1387 rb_inc_page(&page); 1388 } while (page != head); 1389 } 1390 1391 RB_WARN_ON(cpu_buffer, 1); 1392 1393 return NULL; 1394 } 1395 1396 static bool rb_head_page_replace(struct buffer_page *old, 1397 struct buffer_page *new) 1398 { 1399 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 1400 unsigned long val; 1401 1402 val = *ptr & ~RB_FLAG_MASK; 1403 val |= RB_PAGE_HEAD; 1404 1405 return try_cmpxchg(ptr, &val, (unsigned long)&new->list); 1406 } 1407 1408 /* 1409 * rb_tail_page_update - move the tail page forward 1410 */ 1411 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 1412 struct buffer_page *tail_page, 1413 struct buffer_page *next_page) 1414 { 1415 unsigned long old_entries; 1416 unsigned long old_write; 1417 1418 /* 1419 * The tail page now needs to be moved forward. 1420 * 1421 * We need to reset the tail page, but without messing 1422 * with possible erasing of data brought in by interrupts 1423 * that have moved the tail page and are currently on it. 1424 * 1425 * We add a counter to the write field to denote this. 1426 */ 1427 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 1428 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 1429 1430 /* 1431 * Just make sure we have seen our old_write and synchronize 1432 * with any interrupts that come in. 1433 */ 1434 barrier(); 1435 1436 /* 1437 * If the tail page is still the same as what we think 1438 * it is, then it is up to us to update the tail 1439 * pointer. 1440 */ 1441 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) { 1442 /* Zero the write counter */ 1443 unsigned long val = old_write & ~RB_WRITE_MASK; 1444 unsigned long eval = old_entries & ~RB_WRITE_MASK; 1445 1446 /* 1447 * This will only succeed if an interrupt did 1448 * not come in and change it. In which case, we 1449 * do not want to modify it. 1450 * 1451 * We add (void) to let the compiler know that we do not care 1452 * about the return value of these functions. We use the 1453 * cmpxchg to only update if an interrupt did not already 1454 * do it for us. If the cmpxchg fails, we don't care. 1455 */ 1456 (void)local_cmpxchg(&next_page->write, old_write, val); 1457 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 1458 1459 /* 1460 * No need to worry about races with clearing out the commit. 1461 * it only can increment when a commit takes place. But that 1462 * only happens in the outer most nested commit. 
1463 */ 1464 local_set(&next_page->page->commit, 0); 1465 1466 /* Either we update tail_page or an interrupt does */ 1467 if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page)) 1468 local_inc(&cpu_buffer->pages_touched); 1469 } 1470 } 1471 1472 static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 1473 struct buffer_page *bpage) 1474 { 1475 unsigned long val = (unsigned long)bpage; 1476 1477 RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK); 1478 } 1479 1480 static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer, 1481 struct list_head *list) 1482 { 1483 if (RB_WARN_ON(cpu_buffer, 1484 rb_list_head(rb_list_head(list->next)->prev) != list)) 1485 return false; 1486 1487 if (RB_WARN_ON(cpu_buffer, 1488 rb_list_head(rb_list_head(list->prev)->next) != list)) 1489 return false; 1490 1491 return true; 1492 } 1493 1494 /** 1495 * rb_check_pages - integrity check of buffer pages 1496 * @cpu_buffer: CPU buffer with pages to test 1497 * 1498 * As a safety measure we check to make sure the data pages have not 1499 * been corrupted. 1500 */ 1501 static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 1502 { 1503 struct list_head *head, *tmp; 1504 unsigned long buffer_cnt; 1505 unsigned long flags; 1506 int nr_loops = 0; 1507 1508 /* 1509 * Walk the linked list underpinning the ring buffer and validate all 1510 * its next and prev links. 1511 * 1512 * The check acquires the reader_lock to avoid concurrent processing 1513 * with code that could be modifying the list. However, the lock cannot 1514 * be held for the entire duration of the walk, as this would make the 1515 * time when interrupts are disabled non-deterministic, dependent on the 1516 * ring buffer size. Therefore, the code releases and re-acquires the 1517 * lock after checking each page. The ring_buffer_per_cpu.cnt variable 1518 * is then used to detect if the list was modified while the lock was 1519 * not held, in which case the check needs to be restarted. 1520 * 1521 * The code attempts to perform the check at most three times before 1522 * giving up. This is acceptable because this is only a self-validation 1523 * to detect problems early on. In practice, the list modification 1524 * operations are fairly spaced, and so this check typically succeeds at 1525 * most on the second try. 1526 */ 1527 again: 1528 if (++nr_loops > 3) 1529 return; 1530 1531 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1532 head = rb_list_head(cpu_buffer->pages); 1533 if (!rb_check_links(cpu_buffer, head)) 1534 goto out_locked; 1535 buffer_cnt = cpu_buffer->cnt; 1536 tmp = head; 1537 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1538 1539 while (true) { 1540 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1541 1542 if (buffer_cnt != cpu_buffer->cnt) { 1543 /* The list was updated, try again. */ 1544 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1545 goto again; 1546 } 1547 1548 tmp = rb_list_head(tmp->next); 1549 if (tmp == head) 1550 /* The iteration circled back, all is done. */ 1551 goto out_locked; 1552 1553 if (!rb_check_links(cpu_buffer, tmp)) 1554 goto out_locked; 1555 1556 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1557 } 1558 1559 out_locked: 1560 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1561 } 1562 1563 /* 1564 * Take an address, add the meta data size as well as the array of 1565 * array subbuffer indexes, then align it to a subbuffer size. 1566 * 1567 * This is used to help find the next per cpu subbuffer within a mapped range. 
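 *
 * Worked example (numbers assumed for illustration): with 4K sub-buffers,
 * 8 sub-buffers per CPU and a 64 byte meta structure, the meta data plus
 * index array take 64 + 8 * sizeof(int) = 96 bytes, so an @addr of 0x1000
 * first becomes 0x1060 and is then rounded up to the next sub-buffer
 * boundary at 0x2000.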
1568 */ 1569 static unsigned long 1570 rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs) 1571 { 1572 addr += sizeof(struct ring_buffer_meta) + 1573 sizeof(int) * nr_subbufs; 1574 return ALIGN(addr, subbuf_size); 1575 } 1576 1577 /* 1578 * Return the ring_buffer_meta for a given @cpu. 1579 */ 1580 static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu) 1581 { 1582 int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 1583 unsigned long ptr = buffer->range_addr_start; 1584 struct ring_buffer_meta *meta; 1585 int nr_subbufs; 1586 1587 if (!ptr) 1588 return NULL; 1589 1590 /* When nr_pages passed in is zero, the first meta has already been initialized */ 1591 if (!nr_pages) { 1592 meta = (struct ring_buffer_meta *)ptr; 1593 nr_subbufs = meta->nr_subbufs; 1594 } else { 1595 meta = NULL; 1596 /* Include the reader page */ 1597 nr_subbufs = nr_pages + 1; 1598 } 1599 1600 /* 1601 * The first chunk may not be subbuffer aligned, where as 1602 * the rest of the chunks are. 1603 */ 1604 if (cpu) { 1605 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1606 ptr += subbuf_size * nr_subbufs; 1607 1608 /* We can use multiplication to find chunks greater than 1 */ 1609 if (cpu > 1) { 1610 unsigned long size; 1611 unsigned long p; 1612 1613 /* Save the beginning of this CPU chunk */ 1614 p = ptr; 1615 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1616 ptr += subbuf_size * nr_subbufs; 1617 1618 /* Now all chunks after this are the same size */ 1619 size = ptr - p; 1620 ptr += size * (cpu - 2); 1621 } 1622 } 1623 return (void *)ptr; 1624 } 1625 1626 /* Return the start of subbufs given the meta pointer */ 1627 static void *rb_subbufs_from_meta(struct ring_buffer_meta *meta) 1628 { 1629 int subbuf_size = meta->subbuf_size; 1630 unsigned long ptr; 1631 1632 ptr = (unsigned long)meta; 1633 ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs); 1634 1635 return (void *)ptr; 1636 } 1637 1638 /* 1639 * Return a specific sub-buffer for a given @cpu defined by @idx. 1640 */ 1641 static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx) 1642 { 1643 struct ring_buffer_meta *meta; 1644 unsigned long ptr; 1645 int subbuf_size; 1646 1647 meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu); 1648 if (!meta) 1649 return NULL; 1650 1651 if (WARN_ON_ONCE(idx >= meta->nr_subbufs)) 1652 return NULL; 1653 1654 subbuf_size = meta->subbuf_size; 1655 1656 /* Map this buffer to the order that's in meta->buffers[] */ 1657 idx = meta->buffers[idx]; 1658 1659 ptr = (unsigned long)rb_subbufs_from_meta(meta); 1660 1661 ptr += subbuf_size * idx; 1662 if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end) 1663 return NULL; 1664 1665 return (void *)ptr; 1666 } 1667 1668 /* 1669 * See if the existing memory contains valid ring buffer data. 1670 * As the previous kernel must be the same as this kernel, all 1671 * the calculations (size of buffers and number of buffers) 1672 * must be the same. 
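 *
 * For reference, the boot-mapped range being validated here is laid out
 * with one chunk per possible CPU, roughly as follows (sizes illustrative
 * only):
 *
 *	+--------+----------------+--------+----------------+--
 *	| meta 0 | cpu0 sub-bufs  | meta 1 | cpu1 sub-bufs  | ...
 *	+--------+----------------+--------+----------------+--
 *
 * Only the first chunk may start unaligned; rb_range_meta() relies on all
 * chunks after the first having the same size, so a CPU's chunk can be
 * found by multiplication.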
1673 */ 1674 static bool rb_meta_valid(struct ring_buffer_meta *meta, int cpu, 1675 struct trace_buffer *buffer, int nr_pages, 1676 unsigned long *subbuf_mask) 1677 { 1678 int subbuf_size = PAGE_SIZE; 1679 struct buffer_data_page *subbuf; 1680 unsigned long buffers_start; 1681 unsigned long buffers_end; 1682 int i; 1683 1684 if (!subbuf_mask) 1685 return false; 1686 1687 /* Check the meta magic and meta struct size */ 1688 if (meta->magic != RING_BUFFER_META_MAGIC || 1689 meta->struct_size != sizeof(*meta)) { 1690 pr_info("Ring buffer boot meta[%d] mismatch of magic or struct size\n", cpu); 1691 return false; 1692 } 1693 1694 /* The subbuffer's size and number of subbuffers must match */ 1695 if (meta->subbuf_size != subbuf_size || 1696 meta->nr_subbufs != nr_pages + 1) { 1697 pr_info("Ring buffer boot meta [%d] mismatch of subbuf_size/nr_pages\n", cpu); 1698 return false; 1699 } 1700 1701 buffers_start = meta->first_buffer; 1702 buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs); 1703 1704 /* Is the head and commit buffers within the range of buffers? */ 1705 if (meta->head_buffer < buffers_start || 1706 meta->head_buffer >= buffers_end) { 1707 pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu); 1708 return false; 1709 } 1710 1711 if (meta->commit_buffer < buffers_start || 1712 meta->commit_buffer >= buffers_end) { 1713 pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu); 1714 return false; 1715 } 1716 1717 subbuf = rb_subbufs_from_meta(meta); 1718 1719 bitmap_clear(subbuf_mask, 0, meta->nr_subbufs); 1720 1721 /* Is the meta buffers and the subbufs themselves have correct data? */ 1722 for (i = 0; i < meta->nr_subbufs; i++) { 1723 if (meta->buffers[i] < 0 || 1724 meta->buffers[i] >= meta->nr_subbufs) { 1725 pr_info("Ring buffer boot meta [%d] array out of range\n", cpu); 1726 return false; 1727 } 1728 1729 if ((unsigned)local_read(&subbuf->commit) > subbuf_size) { 1730 pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu); 1731 return false; 1732 } 1733 1734 if (test_bit(meta->buffers[i], subbuf_mask)) { 1735 pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu); 1736 return false; 1737 } 1738 1739 set_bit(meta->buffers[i], subbuf_mask); 1740 subbuf = (void *)subbuf + subbuf_size; 1741 } 1742 1743 return true; 1744 } 1745 1746 static int rb_meta_subbuf_idx(struct ring_buffer_meta *meta, void *subbuf); 1747 1748 static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu, 1749 unsigned long long *timestamp, u64 *delta_ptr) 1750 { 1751 struct ring_buffer_event *event; 1752 u64 ts, delta; 1753 int events = 0; 1754 int e; 1755 1756 *delta_ptr = 0; 1757 *timestamp = 0; 1758 1759 ts = dpage->time_stamp; 1760 1761 for (e = 0; e < tail; e += rb_event_length(event)) { 1762 1763 event = (struct ring_buffer_event *)(dpage->data + e); 1764 1765 switch (event->type_len) { 1766 1767 case RINGBUF_TYPE_TIME_EXTEND: 1768 delta = rb_event_time_stamp(event); 1769 ts += delta; 1770 break; 1771 1772 case RINGBUF_TYPE_TIME_STAMP: 1773 delta = rb_event_time_stamp(event); 1774 delta = rb_fix_abs_ts(delta, ts); 1775 if (delta < ts) { 1776 *delta_ptr = delta; 1777 *timestamp = ts; 1778 return -1; 1779 } 1780 ts = delta; 1781 break; 1782 1783 case RINGBUF_TYPE_PADDING: 1784 if (event->time_delta == 1) 1785 break; 1786 fallthrough; 1787 case RINGBUF_TYPE_DATA: 1788 events++; 1789 ts += event->time_delta; 1790 break; 1791 1792 default: 1793 return -1; 1794 } 1795 } 1796 *timestamp = ts; 1797 return events; 1798 } 
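
/*
 * Illustrative sketch only, not used by the ring buffer itself: the loop
 * in rb_read_data_buffer() above is the canonical way to walk the raw
 * events of a sub-buffer, starting at data[0] and advancing by
 * rb_event_length() until the commit offset is reached.  A stripped down
 * walker that merely counts the data events would look roughly like this:
 */
static __maybe_unused int rb_example_count_events(struct buffer_data_page *dpage)
{
	unsigned int tail = local_read(&dpage->commit);
	struct ring_buffer_event *event;
	unsigned int e = 0;
	int count = 0;

	while (e < tail) {
		event = (struct ring_buffer_event *)(dpage->data + e);

		/* Null padding marks the unused end of a sub-buffer */
		if (rb_null_event(event))
			break;

		/* type_len values up to the DATA maximum carry a payload */
		if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
			count++;

		e += rb_event_length(event);
	}

	return count;
}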
1799 1800 static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu) 1801 { 1802 unsigned long long ts; 1803 u64 delta; 1804 int tail; 1805 1806 tail = local_read(&dpage->commit); 1807 return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta); 1808 } 1809 1810 /* If the meta data has been validated, now validate the events */ 1811 static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer) 1812 { 1813 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 1814 struct buffer_page *head_page; 1815 unsigned long entry_bytes = 0; 1816 unsigned long entries = 0; 1817 int ret; 1818 int i; 1819 1820 if (!meta || !meta->head_buffer) 1821 return; 1822 1823 /* Do the reader page first */ 1824 ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu); 1825 if (ret < 0) { 1826 pr_info("Ring buffer reader page is invalid\n"); 1827 goto invalid; 1828 } 1829 entries += ret; 1830 entry_bytes += local_read(&cpu_buffer->reader_page->page->commit); 1831 local_set(&cpu_buffer->reader_page->entries, ret); 1832 1833 head_page = cpu_buffer->head_page; 1834 1835 /* If both the head and commit are on the reader_page then we are done. */ 1836 if (head_page == cpu_buffer->reader_page && 1837 head_page == cpu_buffer->commit_page) 1838 goto done; 1839 1840 /* Iterate until finding the commit page */ 1841 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) { 1842 1843 /* Reader page has already been done */ 1844 if (head_page == cpu_buffer->reader_page) 1845 continue; 1846 1847 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 1848 if (ret < 0) { 1849 pr_info("Ring buffer meta [%d] invalid buffer page\n", 1850 cpu_buffer->cpu); 1851 goto invalid; 1852 } 1853 entries += ret; 1854 entry_bytes += local_read(&head_page->page->commit); 1855 local_set(&cpu_buffer->head_page->entries, ret); 1856 1857 if (head_page == cpu_buffer->commit_page) 1858 break; 1859 } 1860 1861 if (head_page != cpu_buffer->commit_page) { 1862 pr_info("Ring buffer meta [%d] commit page not found\n", 1863 cpu_buffer->cpu); 1864 goto invalid; 1865 } 1866 done: 1867 local_set(&cpu_buffer->entries, entries); 1868 local_set(&cpu_buffer->entries_bytes, entry_bytes); 1869 1870 pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu); 1871 return; 1872 1873 invalid: 1874 /* The content of the buffers are invalid, reset the meta data */ 1875 meta->head_buffer = 0; 1876 meta->commit_buffer = 0; 1877 1878 /* Reset the reader page */ 1879 local_set(&cpu_buffer->reader_page->entries, 0); 1880 local_set(&cpu_buffer->reader_page->page->commit, 0); 1881 1882 /* Reset all the subbuffers */ 1883 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) { 1884 local_set(&head_page->entries, 0); 1885 local_set(&head_page->page->commit, 0); 1886 } 1887 } 1888 1889 /* Used to calculate data delta */ 1890 static char rb_data_ptr[] = ""; 1891 1892 #define THIS_TEXT_PTR ((unsigned long)rb_meta_init_text_addr) 1893 #define THIS_DATA_PTR ((unsigned long)rb_data_ptr) 1894 1895 static void rb_meta_init_text_addr(struct ring_buffer_meta *meta) 1896 { 1897 meta->text_addr = THIS_TEXT_PTR; 1898 meta->data_addr = THIS_DATA_PTR; 1899 } 1900 1901 static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages) 1902 { 1903 struct ring_buffer_meta *meta; 1904 unsigned long *subbuf_mask; 1905 unsigned long delta; 1906 void *subbuf; 1907 int cpu; 1908 int i; 1909 1910 /* Create a mask to test the subbuf array */ 1911 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL); 1912 /* If 
subbuf_mask fails to allocate, then rb_meta_valid() will return false */ 1913 1914 for (cpu = 0; cpu < nr_cpu_ids; cpu++) { 1915 void *next_meta; 1916 1917 meta = rb_range_meta(buffer, nr_pages, cpu); 1918 1919 if (rb_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { 1920 /* Make the mappings match the current address */ 1921 subbuf = rb_subbufs_from_meta(meta); 1922 delta = (unsigned long)subbuf - meta->first_buffer; 1923 meta->first_buffer += delta; 1924 meta->head_buffer += delta; 1925 meta->commit_buffer += delta; 1926 buffer->last_text_delta = THIS_TEXT_PTR - meta->text_addr; 1927 buffer->last_data_delta = THIS_DATA_PTR - meta->data_addr; 1928 continue; 1929 } 1930 1931 if (cpu < nr_cpu_ids - 1) 1932 next_meta = rb_range_meta(buffer, nr_pages, cpu + 1); 1933 else 1934 next_meta = (void *)buffer->range_addr_end; 1935 1936 memset(meta, 0, next_meta - (void *)meta); 1937 1938 meta->magic = RING_BUFFER_META_MAGIC; 1939 meta->struct_size = sizeof(*meta); 1940 1941 meta->nr_subbufs = nr_pages + 1; 1942 meta->subbuf_size = PAGE_SIZE; 1943 1944 subbuf = rb_subbufs_from_meta(meta); 1945 1946 meta->first_buffer = (unsigned long)subbuf; 1947 rb_meta_init_text_addr(meta); 1948 1949 /* 1950 * The buffers[] array holds the order of the sub-buffers 1951 * that are after the meta data. The sub-buffers may 1952 * be swapped out when read and inserted into a different 1953 * location of the ring buffer. Although their addresses 1954 * remain the same, the buffers[] array contains the 1955 * index into the sub-buffers holding their actual order. 1956 */ 1957 for (i = 0; i < meta->nr_subbufs; i++) { 1958 meta->buffers[i] = i; 1959 rb_init_page(subbuf); 1960 subbuf += meta->subbuf_size; 1961 } 1962 } 1963 bitmap_free(subbuf_mask); 1964 } 1965 1966 static void *rbm_start(struct seq_file *m, loff_t *pos) 1967 { 1968 struct ring_buffer_per_cpu *cpu_buffer = m->private; 1969 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 1970 unsigned long val; 1971 1972 if (!meta) 1973 return NULL; 1974 1975 if (*pos > meta->nr_subbufs) 1976 return NULL; 1977 1978 val = *pos; 1979 val++; 1980 1981 return (void *)val; 1982 } 1983 1984 static void *rbm_next(struct seq_file *m, void *v, loff_t *pos) 1985 { 1986 (*pos)++; 1987 1988 return rbm_start(m, pos); 1989 } 1990 1991 static int rbm_show(struct seq_file *m, void *v) 1992 { 1993 struct ring_buffer_per_cpu *cpu_buffer = m->private; 1994 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 1995 unsigned long val = (unsigned long)v; 1996 1997 if (val == 1) { 1998 seq_printf(m, "head_buffer: %d\n", 1999 rb_meta_subbuf_idx(meta, (void *)meta->head_buffer)); 2000 seq_printf(m, "commit_buffer: %d\n", 2001 rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer)); 2002 seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size); 2003 seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs); 2004 return 0; 2005 } 2006 2007 val -= 2; 2008 seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]); 2009 2010 return 0; 2011 } 2012 2013 static void rbm_stop(struct seq_file *m, void *p) 2014 { 2015 } 2016 2017 static const struct seq_operations rb_meta_seq_ops = { 2018 .start = rbm_start, 2019 .next = rbm_next, 2020 .show = rbm_show, 2021 .stop = rbm_stop, 2022 }; 2023 2024 int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu) 2025 { 2026 struct seq_file *m; 2027 int ret; 2028 2029 ret = seq_open(file, &rb_meta_seq_ops); 2030 if (ret) 2031 return ret; 2032 2033 m = file->private_data; 2034 m->private = buffer->buffers[cpu]; 2035 2036 return 0; 2037 } 
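/*
 * Sketch of how a per-cpu tracefs file could hook into the seq_file
 * interface set up above. The real hook-up lives in the tracing core,
 * not in this file; rb_meta_open and rb_meta_fops below are only
 * illustrative names:
 *
 *	static int rb_meta_open(struct inode *inode, struct file *file)
 *	{
 *		struct trace_buffer *buffer = inode->i_private;
 *		int cpu = 0;	// in practice taken from the per-cpu file
 *
 *		return ring_buffer_meta_seq_init(file, buffer, cpu);
 *	}
 *
 *	static const struct file_operations rb_meta_fops = {
 *		.open		= rb_meta_open,
 *		.read		= seq_read,
 *		.llseek		= seq_lseek,
 *		.release	= seq_release,
 *	};
 *
 * seq_read() then drives rbm_start()/rbm_next()/rbm_show(), printing the
 * head/commit indexes and sizes first, followed by one "buffer[i]" line
 * per sub-buffer.
 */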
2038 2039 /* Map the buffer_pages to the previous head and commit pages */ 2040 static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer, 2041 struct buffer_page *bpage) 2042 { 2043 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 2044 2045 if (meta->head_buffer == (unsigned long)bpage->page) 2046 cpu_buffer->head_page = bpage; 2047 2048 if (meta->commit_buffer == (unsigned long)bpage->page) { 2049 cpu_buffer->commit_page = bpage; 2050 cpu_buffer->tail_page = bpage; 2051 } 2052 } 2053 2054 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2055 long nr_pages, struct list_head *pages) 2056 { 2057 struct trace_buffer *buffer = cpu_buffer->buffer; 2058 struct ring_buffer_meta *meta = NULL; 2059 struct buffer_page *bpage, *tmp; 2060 bool user_thread = current->mm != NULL; 2061 gfp_t mflags; 2062 long i; 2063 2064 /* 2065 * Check if the available memory is there first. 2066 * Note, si_mem_available() only gives us a rough estimate of available 2067 * memory. It may not be accurate. But we don't care, we just want 2068 * to prevent doing any allocation when it is obvious that it is 2069 * not going to succeed. 2070 */ 2071 i = si_mem_available(); 2072 if (i < nr_pages) 2073 return -ENOMEM; 2074 2075 /* 2076 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 2077 * gracefully without invoking oom-killer and the system is not 2078 * destabilized. 2079 */ 2080 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 2081 2082 /* 2083 * If a user thread allocates too much, and si_mem_available() 2084 * reports there's enough memory, even though there is not. 2085 * Make sure the OOM killer kills this thread. This can happen 2086 * even with RETRY_MAYFAIL because another task may be doing 2087 * an allocation after this task has taken all memory. 2088 * This is the task the OOM killer needs to take out during this 2089 * loop, even if it was triggered by an allocation somewhere else. 2090 */ 2091 if (user_thread) 2092 set_current_oom_origin(); 2093 2094 if (buffer->range_addr_start) 2095 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2096 2097 for (i = 0; i < nr_pages; i++) { 2098 struct page *page; 2099 2100 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2101 mflags, cpu_to_node(cpu_buffer->cpu)); 2102 if (!bpage) 2103 goto free_pages; 2104 2105 rb_check_bpage(cpu_buffer, bpage); 2106 2107 /* 2108 * Append the pages as for mapped buffers we want to keep 2109 * the order 2110 */ 2111 list_add_tail(&bpage->list, pages); 2112 2113 if (meta) { 2114 /* A range was given. 
Use that for the buffer page */ 2115 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2116 if (!bpage->page) 2117 goto free_pages; 2118 /* If this is valid from a previous boot */ 2119 if (meta->head_buffer) 2120 rb_meta_buffer_update(cpu_buffer, bpage); 2121 bpage->range = 1; 2122 bpage->id = i + 1; 2123 } else { 2124 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), 2125 mflags | __GFP_COMP | __GFP_ZERO, 2126 cpu_buffer->buffer->subbuf_order); 2127 if (!page) 2128 goto free_pages; 2129 bpage->page = page_address(page); 2130 rb_init_page(bpage->page); 2131 } 2132 bpage->order = cpu_buffer->buffer->subbuf_order; 2133 2134 if (user_thread && fatal_signal_pending(current)) 2135 goto free_pages; 2136 } 2137 if (user_thread) 2138 clear_current_oom_origin(); 2139 2140 return 0; 2141 2142 free_pages: 2143 list_for_each_entry_safe(bpage, tmp, pages, list) { 2144 list_del_init(&bpage->list); 2145 free_buffer_page(bpage); 2146 } 2147 if (user_thread) 2148 clear_current_oom_origin(); 2149 2150 return -ENOMEM; 2151 } 2152 2153 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2154 unsigned long nr_pages) 2155 { 2156 LIST_HEAD(pages); 2157 2158 WARN_ON(!nr_pages); 2159 2160 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2161 return -ENOMEM; 2162 2163 /* 2164 * The ring buffer page list is a circular list that does not 2165 * start and end with a list head. All page list items point to 2166 * other pages. 2167 */ 2168 cpu_buffer->pages = pages.next; 2169 list_del(&pages); 2170 2171 cpu_buffer->nr_pages = nr_pages; 2172 2173 rb_check_pages(cpu_buffer); 2174 2175 return 0; 2176 } 2177 2178 static struct ring_buffer_per_cpu * 2179 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2180 { 2181 struct ring_buffer_per_cpu *cpu_buffer; 2182 struct ring_buffer_meta *meta; 2183 struct buffer_page *bpage; 2184 struct page *page; 2185 int ret; 2186 2187 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 2188 GFP_KERNEL, cpu_to_node(cpu)); 2189 if (!cpu_buffer) 2190 return NULL; 2191 2192 cpu_buffer->cpu = cpu; 2193 cpu_buffer->buffer = buffer; 2194 raw_spin_lock_init(&cpu_buffer->reader_lock); 2195 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2196 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2197 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2198 init_completion(&cpu_buffer->update_done); 2199 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2200 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2201 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2202 mutex_init(&cpu_buffer->mapping_lock); 2203 2204 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2205 GFP_KERNEL, cpu_to_node(cpu)); 2206 if (!bpage) 2207 goto fail_free_buffer; 2208 2209 rb_check_bpage(cpu_buffer, bpage); 2210 2211 cpu_buffer->reader_page = bpage; 2212 2213 if (buffer->range_addr_start) { 2214 /* 2215 * Range mapped buffers have the same restrictions as memory 2216 * mapped ones do. 
2217 */ 2218 cpu_buffer->mapped = 1; 2219 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2220 bpage->page = rb_range_buffer(cpu_buffer, 0); 2221 if (!bpage->page) 2222 goto fail_free_reader; 2223 if (cpu_buffer->ring_meta->head_buffer) 2224 rb_meta_buffer_update(cpu_buffer, bpage); 2225 bpage->range = 1; 2226 } else { 2227 page = alloc_pages_node(cpu_to_node(cpu), 2228 GFP_KERNEL | __GFP_COMP | __GFP_ZERO, 2229 cpu_buffer->buffer->subbuf_order); 2230 if (!page) 2231 goto fail_free_reader; 2232 bpage->page = page_address(page); 2233 rb_init_page(bpage->page); 2234 } 2235 2236 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2237 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2238 2239 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2240 if (ret < 0) 2241 goto fail_free_reader; 2242 2243 rb_meta_validate_events(cpu_buffer); 2244 2245 /* If the boot meta was valid then this has already been updated */ 2246 meta = cpu_buffer->ring_meta; 2247 if (!meta || !meta->head_buffer || 2248 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2249 if (meta && meta->head_buffer && 2250 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2251 pr_warn("Ring buffer meta buffers not all mapped\n"); 2252 if (!cpu_buffer->head_page) 2253 pr_warn(" Missing head_page\n"); 2254 if (!cpu_buffer->commit_page) 2255 pr_warn(" Missing commit_page\n"); 2256 if (!cpu_buffer->tail_page) 2257 pr_warn(" Missing tail_page\n"); 2258 } 2259 2260 cpu_buffer->head_page 2261 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2262 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2263 2264 rb_head_page_activate(cpu_buffer); 2265 2266 if (cpu_buffer->ring_meta) 2267 meta->commit_buffer = meta->head_buffer; 2268 } else { 2269 /* The valid meta buffer still needs to activate the head page */ 2270 rb_head_page_activate(cpu_buffer); 2271 } 2272 2273 return cpu_buffer; 2274 2275 fail_free_reader: 2276 free_buffer_page(cpu_buffer->reader_page); 2277 2278 fail_free_buffer: 2279 kfree(cpu_buffer); 2280 return NULL; 2281 } 2282 2283 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2284 { 2285 struct list_head *head = cpu_buffer->pages; 2286 struct buffer_page *bpage, *tmp; 2287 2288 irq_work_sync(&cpu_buffer->irq_work.work); 2289 2290 free_buffer_page(cpu_buffer->reader_page); 2291 2292 if (head) { 2293 rb_head_page_deactivate(cpu_buffer); 2294 2295 list_for_each_entry_safe(bpage, tmp, head, list) { 2296 list_del_init(&bpage->list); 2297 free_buffer_page(bpage); 2298 } 2299 bpage = list_entry(head, struct buffer_page, list); 2300 free_buffer_page(bpage); 2301 } 2302 2303 free_page((unsigned long)cpu_buffer->free_page); 2304 2305 kfree(cpu_buffer); 2306 } 2307 2308 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2309 int order, unsigned long start, 2310 unsigned long end, 2311 struct lock_class_key *key) 2312 { 2313 struct trace_buffer *buffer; 2314 long nr_pages; 2315 int subbuf_size; 2316 int bsize; 2317 int cpu; 2318 int ret; 2319 2320 /* keep it in its own cache line */ 2321 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2322 GFP_KERNEL); 2323 if (!buffer) 2324 return NULL; 2325 2326 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2327 goto fail_free_buffer; 2328 2329 buffer->subbuf_order = order; 2330 subbuf_size = (PAGE_SIZE << order); 2331 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2332 2333 /* Max payload is buffer page size - header (8bytes) */ 2334 
buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2335 2336 buffer->flags = flags; 2337 buffer->clock = trace_clock_local; 2338 buffer->reader_lock_key = key; 2339 2340 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2341 init_waitqueue_head(&buffer->irq_work.waiters); 2342 2343 buffer->cpus = nr_cpu_ids; 2344 2345 bsize = sizeof(void *) * nr_cpu_ids; 2346 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2347 GFP_KERNEL); 2348 if (!buffer->buffers) 2349 goto fail_free_cpumask; 2350 2351 /* If start/end are specified, then that overrides size */ 2352 if (start && end) { 2353 unsigned long ptr; 2354 int n; 2355 2356 size = end - start; 2357 size = size / nr_cpu_ids; 2358 2359 /* 2360 * The number of sub-buffers (nr_pages) is determined by the 2361 * total size allocated minus the meta data size. 2362 * Then that is divided by the number of per CPU buffers 2363 * needed, plus account for the integer array index that 2364 * will be appended to the meta data. 2365 */ 2366 nr_pages = (size - sizeof(struct ring_buffer_meta)) / 2367 (subbuf_size + sizeof(int)); 2368 /* Need at least two pages plus the reader page */ 2369 if (nr_pages < 3) 2370 goto fail_free_buffers; 2371 2372 again: 2373 /* Make sure that the size fits aligned */ 2374 for (n = 0, ptr = start; n < nr_cpu_ids; n++) { 2375 ptr += sizeof(struct ring_buffer_meta) + 2376 sizeof(int) * nr_pages; 2377 ptr = ALIGN(ptr, subbuf_size); 2378 ptr += subbuf_size * nr_pages; 2379 } 2380 if (ptr > end) { 2381 if (nr_pages <= 3) 2382 goto fail_free_buffers; 2383 nr_pages--; 2384 goto again; 2385 } 2386 2387 /* nr_pages should not count the reader page */ 2388 nr_pages--; 2389 buffer->range_addr_start = start; 2390 buffer->range_addr_end = end; 2391 2392 rb_range_meta_init(buffer, nr_pages); 2393 } else { 2394 2395 /* need at least two pages */ 2396 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2397 if (nr_pages < 2) 2398 nr_pages = 2; 2399 } 2400 2401 cpu = raw_smp_processor_id(); 2402 cpumask_set_cpu(cpu, buffer->cpumask); 2403 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2404 if (!buffer->buffers[cpu]) 2405 goto fail_free_buffers; 2406 2407 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2408 if (ret < 0) 2409 goto fail_free_buffers; 2410 2411 mutex_init(&buffer->mutex); 2412 2413 return buffer; 2414 2415 fail_free_buffers: 2416 for_each_buffer_cpu(buffer, cpu) { 2417 if (buffer->buffers[cpu]) 2418 rb_free_cpu_buffer(buffer->buffers[cpu]); 2419 } 2420 kfree(buffer->buffers); 2421 2422 fail_free_cpumask: 2423 free_cpumask_var(buffer->cpumask); 2424 2425 fail_free_buffer: 2426 kfree(buffer); 2427 return NULL; 2428 } 2429 2430 /** 2431 * __ring_buffer_alloc - allocate a new ring_buffer 2432 * @size: the size in bytes per cpu that is needed. 2433 * @flags: attributes to set for the ring buffer. 2434 * @key: ring buffer reader_lock_key. 2435 * 2436 * Currently the only flag that is available is the RB_FL_OVERWRITE 2437 * flag. This flag means that the buffer will overwrite old data 2438 * when the buffer wraps. If this flag is not set, the buffer will 2439 * drop data when the tail hits the head. 
2440 */ 2441 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 2442 struct lock_class_key *key) 2443 { 2444 /* Default buffer page size - one system page */ 2445 return alloc_buffer(size, flags, 0, 0, 0, key); 2446 2447 } 2448 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 2449 2450 /** 2451 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory 2452 * @size: the size in bytes per cpu that is needed. 2453 * @flags: attributes to set for the ring buffer. 2454 * @order: sub-buffer order 2455 * @start: start of allocated range 2456 * @range_size: size of allocated range 2457 * @key: ring buffer reader_lock_key. 2458 * 2459 * Currently the only flag that is available is the RB_FL_OVERWRITE 2460 * flag. This flag means that the buffer will overwrite old data 2461 * when the buffer wraps. If this flag is not set, the buffer will 2462 * drop data when the tail hits the head. 2463 */ 2464 struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags, 2465 int order, unsigned long start, 2466 unsigned long range_size, 2467 struct lock_class_key *key) 2468 { 2469 return alloc_buffer(size, flags, order, start, start + range_size, key); 2470 } 2471 2472 /** 2473 * ring_buffer_last_boot_delta - return the delta offset from last boot 2474 * @buffer: The buffer to return the delta from 2475 * @text: Return text delta 2476 * @data: Return data delta 2477 * 2478 * Returns: true if the delta is non-zero 2479 */ 2480 bool ring_buffer_last_boot_delta(struct trace_buffer *buffer, long *text, 2481 long *data) 2482 { 2483 if (!buffer) 2484 return false; 2485 2486 if (!buffer->last_text_delta) 2487 return false; 2488 2489 *text = buffer->last_text_delta; 2490 *data = buffer->last_data_delta; 2491 2492 return true; 2493 } 2494 2495 /** 2496 * ring_buffer_free - free a ring buffer. 2497 * @buffer: the buffer to free.
2498 */ 2499 void 2500 ring_buffer_free(struct trace_buffer *buffer) 2501 { 2502 int cpu; 2503 2504 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2505 2506 irq_work_sync(&buffer->irq_work.work); 2507 2508 for_each_buffer_cpu(buffer, cpu) 2509 rb_free_cpu_buffer(buffer->buffers[cpu]); 2510 2511 kfree(buffer->buffers); 2512 free_cpumask_var(buffer->cpumask); 2513 2514 kfree(buffer); 2515 } 2516 EXPORT_SYMBOL_GPL(ring_buffer_free); 2517 2518 void ring_buffer_set_clock(struct trace_buffer *buffer, 2519 u64 (*clock)(void)) 2520 { 2521 buffer->clock = clock; 2522 } 2523 2524 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2525 { 2526 buffer->time_stamp_abs = abs; 2527 } 2528 2529 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2530 { 2531 return buffer->time_stamp_abs; 2532 } 2533 2534 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2535 { 2536 return local_read(&bpage->entries) & RB_WRITE_MASK; 2537 } 2538 2539 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2540 { 2541 return local_read(&bpage->write) & RB_WRITE_MASK; 2542 } 2543 2544 static bool 2545 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 2546 { 2547 struct list_head *tail_page, *to_remove, *next_page; 2548 struct buffer_page *to_remove_page, *tmp_iter_page; 2549 struct buffer_page *last_page, *first_page; 2550 unsigned long nr_removed; 2551 unsigned long head_bit; 2552 int page_entries; 2553 2554 head_bit = 0; 2555 2556 raw_spin_lock_irq(&cpu_buffer->reader_lock); 2557 atomic_inc(&cpu_buffer->record_disabled); 2558 /* 2559 * We don't race with the readers since we have acquired the reader 2560 * lock. We also don't race with writers after disabling recording. 2561 * This makes it easy to figure out the first and the last page to be 2562 * removed from the list. We unlink all the pages in between including 2563 * the first and last pages. This is done in a busy loop so that we 2564 * lose the least number of traces. 2565 * The pages are freed after we restart recording and unlock readers. 2566 */ 2567 tail_page = &cpu_buffer->tail_page->list; 2568 2569 /* 2570 * tail page might be on reader page, we remove the next page 2571 * from the ring buffer 2572 */ 2573 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 2574 tail_page = rb_list_head(tail_page->next); 2575 to_remove = tail_page; 2576 2577 /* start of pages to remove */ 2578 first_page = list_entry(rb_list_head(to_remove->next), 2579 struct buffer_page, list); 2580 2581 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 2582 to_remove = rb_list_head(to_remove)->next; 2583 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 2584 } 2585 /* Read iterators need to reset themselves when some pages removed */ 2586 cpu_buffer->pages_removed += nr_removed; 2587 2588 next_page = rb_list_head(to_remove)->next; 2589 2590 /* 2591 * Now we remove all pages between tail_page and next_page. 
2592 * Make sure that we have head_bit value preserved for the 2593 * next page 2594 */ 2595 tail_page->next = (struct list_head *)((unsigned long)next_page | 2596 head_bit); 2597 next_page = rb_list_head(next_page); 2598 next_page->prev = tail_page; 2599 2600 /* make sure pages points to a valid page in the ring buffer */ 2601 cpu_buffer->pages = next_page; 2602 cpu_buffer->cnt++; 2603 2604 /* update head page */ 2605 if (head_bit) 2606 cpu_buffer->head_page = list_entry(next_page, 2607 struct buffer_page, list); 2608 2609 /* pages are removed, resume tracing and then free the pages */ 2610 atomic_dec(&cpu_buffer->record_disabled); 2611 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 2612 2613 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 2614 2615 /* last buffer page to remove */ 2616 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 2617 list); 2618 tmp_iter_page = first_page; 2619 2620 do { 2621 cond_resched(); 2622 2623 to_remove_page = tmp_iter_page; 2624 rb_inc_page(&tmp_iter_page); 2625 2626 /* update the counters */ 2627 page_entries = rb_page_entries(to_remove_page); 2628 if (page_entries) { 2629 /* 2630 * If something was added to this page, it was full 2631 * since it is not the tail page. So we deduct the 2632 * bytes consumed in ring buffer from here. 2633 * Increment overrun to account for the lost events. 2634 */ 2635 local_add(page_entries, &cpu_buffer->overrun); 2636 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 2637 local_inc(&cpu_buffer->pages_lost); 2638 } 2639 2640 /* 2641 * We have already removed references to this list item, just 2642 * free up the buffer_page and its page 2643 */ 2644 free_buffer_page(to_remove_page); 2645 nr_removed--; 2646 2647 } while (to_remove_page != last_page); 2648 2649 RB_WARN_ON(cpu_buffer, nr_removed); 2650 2651 return nr_removed == 0; 2652 } 2653 2654 static bool 2655 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2656 { 2657 struct list_head *pages = &cpu_buffer->new_pages; 2658 unsigned long flags; 2659 bool success; 2660 int retries; 2661 2662 /* Can be called at early boot up, where interrupts must not been enabled */ 2663 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2664 /* 2665 * We are holding the reader lock, so the reader page won't be swapped 2666 * in the ring buffer. Now we are racing with the writer trying to 2667 * move head page and the tail page. 2668 * We are going to adapt the reader page update process where: 2669 * 1. We first splice the start and end of list of new pages between 2670 * the head page and its previous page. 2671 * 2. We cmpxchg the prev_page->next to point from head page to the 2672 * start of new pages list. 2673 * 3. Finally, we update the head->prev to the end of new list. 2674 * 2675 * We will try this process 10 times, to make sure that we don't keep 2676 * spinning. 
2677 */ 2678 retries = 10; 2679 success = false; 2680 while (retries--) { 2681 struct list_head *head_page, *prev_page; 2682 struct list_head *last_page, *first_page; 2683 struct list_head *head_page_with_bit; 2684 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2685 2686 if (!hpage) 2687 break; 2688 head_page = &hpage->list; 2689 prev_page = head_page->prev; 2690 2691 first_page = pages->next; 2692 last_page = pages->prev; 2693 2694 head_page_with_bit = (struct list_head *) 2695 ((unsigned long)head_page | RB_PAGE_HEAD); 2696 2697 last_page->next = head_page_with_bit; 2698 first_page->prev = prev_page; 2699 2700 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 2701 if (try_cmpxchg(&prev_page->next, 2702 &head_page_with_bit, first_page)) { 2703 /* 2704 * yay, we replaced the page pointer to our new list, 2705 * now, we just have to update to head page's prev 2706 * pointer to point to end of list 2707 */ 2708 head_page->prev = last_page; 2709 cpu_buffer->cnt++; 2710 success = true; 2711 break; 2712 } 2713 } 2714 2715 if (success) 2716 INIT_LIST_HEAD(pages); 2717 /* 2718 * If we weren't successful in adding in new pages, warn and stop 2719 * tracing 2720 */ 2721 RB_WARN_ON(cpu_buffer, !success); 2722 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2723 2724 /* free pages if they weren't inserted */ 2725 if (!success) { 2726 struct buffer_page *bpage, *tmp; 2727 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2728 list) { 2729 list_del_init(&bpage->list); 2730 free_buffer_page(bpage); 2731 } 2732 } 2733 return success; 2734 } 2735 2736 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2737 { 2738 bool success; 2739 2740 if (cpu_buffer->nr_pages_to_update > 0) 2741 success = rb_insert_pages(cpu_buffer); 2742 else 2743 success = rb_remove_pages(cpu_buffer, 2744 -cpu_buffer->nr_pages_to_update); 2745 2746 if (success) 2747 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2748 } 2749 2750 static void update_pages_handler(struct work_struct *work) 2751 { 2752 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2753 struct ring_buffer_per_cpu, update_pages_work); 2754 rb_update_pages(cpu_buffer); 2755 complete(&cpu_buffer->update_done); 2756 } 2757 2758 /** 2759 * ring_buffer_resize - resize the ring buffer 2760 * @buffer: the buffer to resize. 2761 * @size: the new size. 2762 * @cpu_id: the cpu buffer to resize 2763 * 2764 * Minimum size is 2 * buffer->subbuf_size. 2765 * 2766 * Returns 0 on success and < 0 on failure. 2767 */ 2768 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2769 int cpu_id) 2770 { 2771 struct ring_buffer_per_cpu *cpu_buffer; 2772 unsigned long nr_pages; 2773 int cpu, err; 2774 2775 /* 2776 * Always succeed at resizing a non-existent buffer: 2777 */ 2778 if (!buffer) 2779 return 0; 2780 2781 /* Make sure the requested buffer exists */ 2782 if (cpu_id != RING_BUFFER_ALL_CPUS && 2783 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2784 return 0; 2785 2786 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2787 2788 /* we need a minimum of two pages */ 2789 if (nr_pages < 2) 2790 nr_pages = 2; 2791 2792 /* prevent another thread from changing buffer sizes */ 2793 mutex_lock(&buffer->mutex); 2794 atomic_inc(&buffer->resizing); 2795 2796 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2797 /* 2798 * Don't succeed if resizing is disabled, as a reader might be 2799 * manipulating the ring buffer and is expecting a sane state while 2800 * this is true. 
2801 */ 2802 for_each_buffer_cpu(buffer, cpu) { 2803 cpu_buffer = buffer->buffers[cpu]; 2804 if (atomic_read(&cpu_buffer->resize_disabled)) { 2805 err = -EBUSY; 2806 goto out_err_unlock; 2807 } 2808 } 2809 2810 /* calculate the pages to update */ 2811 for_each_buffer_cpu(buffer, cpu) { 2812 cpu_buffer = buffer->buffers[cpu]; 2813 2814 cpu_buffer->nr_pages_to_update = nr_pages - 2815 cpu_buffer->nr_pages; 2816 /* 2817 * nothing more to do for removing pages or no update 2818 */ 2819 if (cpu_buffer->nr_pages_to_update <= 0) 2820 continue; 2821 /* 2822 * to add pages, make sure all new pages can be 2823 * allocated without receiving ENOMEM 2824 */ 2825 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2826 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2827 &cpu_buffer->new_pages)) { 2828 /* not enough memory for new pages */ 2829 err = -ENOMEM; 2830 goto out_err; 2831 } 2832 2833 cond_resched(); 2834 } 2835 2836 cpus_read_lock(); 2837 /* 2838 * Fire off all the required work handlers 2839 * We can't schedule on offline CPUs, but it's not necessary 2840 * since we can change their buffer sizes without any race. 2841 */ 2842 for_each_buffer_cpu(buffer, cpu) { 2843 cpu_buffer = buffer->buffers[cpu]; 2844 if (!cpu_buffer->nr_pages_to_update) 2845 continue; 2846 2847 /* Can't run something on an offline CPU. */ 2848 if (!cpu_online(cpu)) { 2849 rb_update_pages(cpu_buffer); 2850 cpu_buffer->nr_pages_to_update = 0; 2851 } else { 2852 /* Run directly if possible. */ 2853 migrate_disable(); 2854 if (cpu != smp_processor_id()) { 2855 migrate_enable(); 2856 schedule_work_on(cpu, 2857 &cpu_buffer->update_pages_work); 2858 } else { 2859 update_pages_handler(&cpu_buffer->update_pages_work); 2860 migrate_enable(); 2861 } 2862 } 2863 } 2864 2865 /* wait for all the updates to complete */ 2866 for_each_buffer_cpu(buffer, cpu) { 2867 cpu_buffer = buffer->buffers[cpu]; 2868 if (!cpu_buffer->nr_pages_to_update) 2869 continue; 2870 2871 if (cpu_online(cpu)) 2872 wait_for_completion(&cpu_buffer->update_done); 2873 cpu_buffer->nr_pages_to_update = 0; 2874 } 2875 2876 cpus_read_unlock(); 2877 } else { 2878 cpu_buffer = buffer->buffers[cpu_id]; 2879 2880 if (nr_pages == cpu_buffer->nr_pages) 2881 goto out; 2882 2883 /* 2884 * Don't succeed if resizing is disabled, as a reader might be 2885 * manipulating the ring buffer and is expecting a sane state while 2886 * this is true. 2887 */ 2888 if (atomic_read(&cpu_buffer->resize_disabled)) { 2889 err = -EBUSY; 2890 goto out_err_unlock; 2891 } 2892 2893 cpu_buffer->nr_pages_to_update = nr_pages - 2894 cpu_buffer->nr_pages; 2895 2896 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2897 if (cpu_buffer->nr_pages_to_update > 0 && 2898 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2899 &cpu_buffer->new_pages)) { 2900 err = -ENOMEM; 2901 goto out_err; 2902 } 2903 2904 cpus_read_lock(); 2905 2906 /* Can't run something on an offline CPU. */ 2907 if (!cpu_online(cpu_id)) 2908 rb_update_pages(cpu_buffer); 2909 else { 2910 /* Run directly if possible. 
*/ 2911 migrate_disable(); 2912 if (cpu_id == smp_processor_id()) { 2913 rb_update_pages(cpu_buffer); 2914 migrate_enable(); 2915 } else { 2916 migrate_enable(); 2917 schedule_work_on(cpu_id, 2918 &cpu_buffer->update_pages_work); 2919 wait_for_completion(&cpu_buffer->update_done); 2920 } 2921 } 2922 2923 cpu_buffer->nr_pages_to_update = 0; 2924 cpus_read_unlock(); 2925 } 2926 2927 out: 2928 /* 2929 * The ring buffer resize can happen with the ring buffer 2930 * enabled, so that the update disturbs the tracing as little 2931 * as possible. But if the buffer is disabled, we do not need 2932 * to worry about that, and we can take the time to verify 2933 * that the buffer is not corrupt. 2934 */ 2935 if (atomic_read(&buffer->record_disabled)) { 2936 atomic_inc(&buffer->record_disabled); 2937 /* 2938 * Even though the buffer was disabled, we must make sure 2939 * that it is truly disabled before calling rb_check_pages. 2940 * There could have been a race between checking 2941 * record_disable and incrementing it. 2942 */ 2943 synchronize_rcu(); 2944 for_each_buffer_cpu(buffer, cpu) { 2945 cpu_buffer = buffer->buffers[cpu]; 2946 rb_check_pages(cpu_buffer); 2947 } 2948 atomic_dec(&buffer->record_disabled); 2949 } 2950 2951 atomic_dec(&buffer->resizing); 2952 mutex_unlock(&buffer->mutex); 2953 return 0; 2954 2955 out_err: 2956 for_each_buffer_cpu(buffer, cpu) { 2957 struct buffer_page *bpage, *tmp; 2958 2959 cpu_buffer = buffer->buffers[cpu]; 2960 cpu_buffer->nr_pages_to_update = 0; 2961 2962 if (list_empty(&cpu_buffer->new_pages)) 2963 continue; 2964 2965 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2966 list) { 2967 list_del_init(&bpage->list); 2968 free_buffer_page(bpage); 2969 } 2970 } 2971 out_err_unlock: 2972 atomic_dec(&buffer->resizing); 2973 mutex_unlock(&buffer->mutex); 2974 return err; 2975 } 2976 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2977 2978 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2979 { 2980 mutex_lock(&buffer->mutex); 2981 if (val) 2982 buffer->flags |= RB_FL_OVERWRITE; 2983 else 2984 buffer->flags &= ~RB_FL_OVERWRITE; 2985 mutex_unlock(&buffer->mutex); 2986 } 2987 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2988 2989 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2990 { 2991 return bpage->page->data + index; 2992 } 2993 2994 static __always_inline struct ring_buffer_event * 2995 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2996 { 2997 return __rb_page_index(cpu_buffer->reader_page, 2998 cpu_buffer->reader_page->read); 2999 } 3000 3001 static struct ring_buffer_event * 3002 rb_iter_head_event(struct ring_buffer_iter *iter) 3003 { 3004 struct ring_buffer_event *event; 3005 struct buffer_page *iter_head_page = iter->head_page; 3006 unsigned long commit; 3007 unsigned length; 3008 3009 if (iter->head != iter->next_event) 3010 return iter->event; 3011 3012 /* 3013 * When the writer goes across pages, it issues a cmpxchg which 3014 * is a mb(), which will synchronize with the rmb here. 3015 * (see rb_tail_page_update() and __rb_reserve_next()) 3016 */ 3017 commit = rb_page_commit(iter_head_page); 3018 smp_rmb(); 3019 3020 /* An event needs to be at least 8 bytes in size */ 3021 if (iter->head > commit - 8) 3022 goto reset; 3023 3024 event = __rb_page_index(iter_head_page, iter->head); 3025 length = rb_event_length(event); 3026 3027 /* 3028 * READ_ONCE() doesn't work on functions and we don't want the 3029 * compiler doing any crazy optimizations with length. 
3030 */ 3031 barrier(); 3032 3033 if ((iter->head + length) > commit || length > iter->event_size) 3034 /* Writer corrupted the read? */ 3035 goto reset; 3036 3037 memcpy(iter->event, event, length); 3038 /* 3039 * If the page stamp is still the same after this rmb() then the 3040 * event was safely copied without the writer entering the page. 3041 */ 3042 smp_rmb(); 3043 3044 /* Make sure the page didn't change since we read this */ 3045 if (iter->page_stamp != iter_head_page->page->time_stamp || 3046 commit > rb_page_commit(iter_head_page)) 3047 goto reset; 3048 3049 iter->next_event = iter->head + length; 3050 return iter->event; 3051 reset: 3052 /* Reset to the beginning */ 3053 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3054 iter->head = 0; 3055 iter->next_event = 0; 3056 iter->missed_events = 1; 3057 return NULL; 3058 } 3059 3060 /* Size is determined by what has been committed */ 3061 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 3062 { 3063 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 3064 } 3065 3066 static __always_inline unsigned 3067 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3068 { 3069 return rb_page_commit(cpu_buffer->commit_page); 3070 } 3071 3072 static __always_inline unsigned 3073 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3074 { 3075 unsigned long addr = (unsigned long)event; 3076 3077 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3078 3079 return addr - BUF_PAGE_HDR_SIZE; 3080 } 3081 3082 static void rb_inc_iter(struct ring_buffer_iter *iter) 3083 { 3084 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3085 3086 /* 3087 * The iterator could be on the reader page (it starts there). 3088 * But the head could have moved, since the reader was 3089 * found. Check for this case and assign the iterator 3090 * to the head page instead of next. 3091 */ 3092 if (iter->head_page == cpu_buffer->reader_page) 3093 iter->head_page = rb_set_head_page(cpu_buffer); 3094 else 3095 rb_inc_page(&iter->head_page); 3096 3097 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3098 iter->head = 0; 3099 iter->next_event = 0; 3100 } 3101 3102 /* Return the index into the sub-buffers for a given sub-buffer */ 3103 static int rb_meta_subbuf_idx(struct ring_buffer_meta *meta, void *subbuf) 3104 { 3105 void *subbuf_array; 3106 3107 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3108 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3109 return (subbuf - subbuf_array) / meta->subbuf_size; 3110 } 3111 3112 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3113 struct buffer_page *next_page) 3114 { 3115 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 3116 unsigned long old_head = (unsigned long)next_page->page; 3117 unsigned long new_head; 3118 3119 rb_inc_page(&next_page); 3120 new_head = (unsigned long)next_page->page; 3121 3122 /* 3123 * Only move it forward once, if something else came in and 3124 * moved it forward, then we don't want to touch it. 
3125 */ 3126 (void)cmpxchg(&meta->head_buffer, old_head, new_head); 3127 } 3128 3129 static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer, 3130 struct buffer_page *reader) 3131 { 3132 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 3133 void *old_reader = cpu_buffer->reader_page->page; 3134 void *new_reader = reader->page; 3135 int id; 3136 3137 id = reader->id; 3138 cpu_buffer->reader_page->id = id; 3139 reader->id = 0; 3140 3141 meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader); 3142 meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader); 3143 3144 /* The head pointer is the one after the reader */ 3145 rb_update_meta_head(cpu_buffer, reader); 3146 } 3147 3148 /* 3149 * rb_handle_head_page - writer hit the head page 3150 * 3151 * Returns: +1 to retry page 3152 * 0 to continue 3153 * -1 on error 3154 */ 3155 static int 3156 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 3157 struct buffer_page *tail_page, 3158 struct buffer_page *next_page) 3159 { 3160 struct buffer_page *new_head; 3161 int entries; 3162 int type; 3163 int ret; 3164 3165 entries = rb_page_entries(next_page); 3166 3167 /* 3168 * The hard part is here. We need to move the head 3169 * forward, and protect against both readers on 3170 * other CPUs and writers coming in via interrupts. 3171 */ 3172 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 3173 RB_PAGE_HEAD); 3174 3175 /* 3176 * type can be one of four: 3177 * NORMAL - an interrupt already moved it for us 3178 * HEAD - we are the first to get here. 3179 * UPDATE - we are the interrupt interrupting 3180 * a current move. 3181 * MOVED - a reader on another CPU moved the next 3182 * pointer to its reader page. Give up 3183 * and try again. 3184 */ 3185 3186 switch (type) { 3187 case RB_PAGE_HEAD: 3188 /* 3189 * We changed the head to UPDATE, thus 3190 * it is our responsibility to update 3191 * the counters. 3192 */ 3193 local_add(entries, &cpu_buffer->overrun); 3194 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 3195 local_inc(&cpu_buffer->pages_lost); 3196 3197 if (cpu_buffer->ring_meta) 3198 rb_update_meta_head(cpu_buffer, next_page); 3199 /* 3200 * The entries will be zeroed out when we move the 3201 * tail page. 3202 */ 3203 3204 /* still more to do */ 3205 break; 3206 3207 case RB_PAGE_UPDATE: 3208 /* 3209 * This is an interrupt that interrupt the 3210 * previous update. Still more to do. 3211 */ 3212 break; 3213 case RB_PAGE_NORMAL: 3214 /* 3215 * An interrupt came in before the update 3216 * and processed this for us. 3217 * Nothing left to do. 3218 */ 3219 return 1; 3220 case RB_PAGE_MOVED: 3221 /* 3222 * The reader is on another CPU and just did 3223 * a swap with our next_page. 3224 * Try again. 3225 */ 3226 return 1; 3227 default: 3228 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 3229 return -1; 3230 } 3231 3232 /* 3233 * Now that we are here, the old head pointer is 3234 * set to UPDATE. This will keep the reader from 3235 * swapping the head page with the reader page. 3236 * The reader (on another CPU) will spin till 3237 * we are finished. 3238 * 3239 * We just need to protect against interrupts 3240 * doing the job. We will set the next pointer 3241 * to HEAD. After that, we set the old pointer 3242 * to NORMAL, but only if it was HEAD before. 3243 * otherwise we are an interrupt, and only 3244 * want the outer most commit to reset it. 
3245 */ 3246 new_head = next_page; 3247 rb_inc_page(&new_head); 3248 3249 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 3250 RB_PAGE_NORMAL); 3251 3252 /* 3253 * Valid returns are: 3254 * HEAD - an interrupt came in and already set it. 3255 * NORMAL - One of two things: 3256 * 1) We really set it. 3257 * 2) A bunch of interrupts came in and moved 3258 * the page forward again. 3259 */ 3260 switch (ret) { 3261 case RB_PAGE_HEAD: 3262 case RB_PAGE_NORMAL: 3263 /* OK */ 3264 break; 3265 default: 3266 RB_WARN_ON(cpu_buffer, 1); 3267 return -1; 3268 } 3269 3270 /* 3271 * It is possible that an interrupt came in, 3272 * set the head up, then more interrupts came in 3273 * and moved it again. When we get back here, 3274 * the page would have been set to NORMAL but we 3275 * just set it back to HEAD. 3276 * 3277 * How do you detect this? Well, if that happened 3278 * the tail page would have moved. 3279 */ 3280 if (ret == RB_PAGE_NORMAL) { 3281 struct buffer_page *buffer_tail_page; 3282 3283 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 3284 /* 3285 * If the tail had moved passed next, then we need 3286 * to reset the pointer. 3287 */ 3288 if (buffer_tail_page != tail_page && 3289 buffer_tail_page != next_page) 3290 rb_head_page_set_normal(cpu_buffer, new_head, 3291 next_page, 3292 RB_PAGE_HEAD); 3293 } 3294 3295 /* 3296 * If this was the outer most commit (the one that 3297 * changed the original pointer from HEAD to UPDATE), 3298 * then it is up to us to reset it to NORMAL. 3299 */ 3300 if (type == RB_PAGE_HEAD) { 3301 ret = rb_head_page_set_normal(cpu_buffer, next_page, 3302 tail_page, 3303 RB_PAGE_UPDATE); 3304 if (RB_WARN_ON(cpu_buffer, 3305 ret != RB_PAGE_UPDATE)) 3306 return -1; 3307 } 3308 3309 return 0; 3310 } 3311 3312 static inline void 3313 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 3314 unsigned long tail, struct rb_event_info *info) 3315 { 3316 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 3317 struct buffer_page *tail_page = info->tail_page; 3318 struct ring_buffer_event *event; 3319 unsigned long length = info->length; 3320 3321 /* 3322 * Only the event that crossed the page boundary 3323 * must fill the old tail_page with padding. 3324 */ 3325 if (tail >= bsize) { 3326 /* 3327 * If the page was filled, then we still need 3328 * to update the real_end. Reset it to zero 3329 * and the reader will ignore it. 3330 */ 3331 if (tail == bsize) 3332 tail_page->real_end = 0; 3333 3334 local_sub(length, &tail_page->write); 3335 return; 3336 } 3337 3338 event = __rb_page_index(tail_page, tail); 3339 3340 /* 3341 * Save the original length to the meta data. 3342 * This will be used by the reader to add lost event 3343 * counter. 3344 */ 3345 tail_page->real_end = tail; 3346 3347 /* 3348 * If this event is bigger than the minimum size, then 3349 * we need to be careful that we don't subtract the 3350 * write counter enough to allow another writer to slip 3351 * in on this page. 3352 * We put in a discarded commit instead, to make sure 3353 * that this space is not used again, and this space will 3354 * not be accounted into 'entries_bytes'. 3355 * 3356 * If we are less than the minimum size, we don't need to 3357 * worry about it. 
3358 */ 3359 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 3360 /* No room for any events */ 3361 3362 /* Mark the rest of the page with padding */ 3363 rb_event_set_padding(event); 3364 3365 /* Make sure the padding is visible before the write update */ 3366 smp_wmb(); 3367 3368 /* Set the write back to the previous setting */ 3369 local_sub(length, &tail_page->write); 3370 return; 3371 } 3372 3373 /* Put in a discarded event */ 3374 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 3375 event->type_len = RINGBUF_TYPE_PADDING; 3376 /* time delta must be non zero */ 3377 event->time_delta = 1; 3378 3379 /* account for padding bytes */ 3380 local_add(bsize - tail, &cpu_buffer->entries_bytes); 3381 3382 /* Make sure the padding is visible before the tail_page->write update */ 3383 smp_wmb(); 3384 3385 /* Set write to end of buffer */ 3386 length = (tail + length) - bsize; 3387 local_sub(length, &tail_page->write); 3388 } 3389 3390 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 3391 3392 /* 3393 * This is the slow path, force gcc not to inline it. 3394 */ 3395 static noinline struct ring_buffer_event * 3396 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3397 unsigned long tail, struct rb_event_info *info) 3398 { 3399 struct buffer_page *tail_page = info->tail_page; 3400 struct buffer_page *commit_page = cpu_buffer->commit_page; 3401 struct trace_buffer *buffer = cpu_buffer->buffer; 3402 struct buffer_page *next_page; 3403 int ret; 3404 3405 next_page = tail_page; 3406 3407 rb_inc_page(&next_page); 3408 3409 /* 3410 * If for some reason, we had an interrupt storm that made 3411 * it all the way around the buffer, bail, and warn 3412 * about it. 3413 */ 3414 if (unlikely(next_page == commit_page)) { 3415 local_inc(&cpu_buffer->commit_overrun); 3416 goto out_reset; 3417 } 3418 3419 /* 3420 * This is where the fun begins! 3421 * 3422 * We are fighting against races between a reader that 3423 * could be on another CPU trying to swap its reader 3424 * page with the buffer head. 3425 * 3426 * We are also fighting against interrupts coming in and 3427 * moving the head or tail on us as well. 3428 * 3429 * If the next page is the head page then we have filled 3430 * the buffer, unless the commit page is still on the 3431 * reader page. 3432 */ 3433 if (rb_is_head_page(next_page, &tail_page->list)) { 3434 3435 /* 3436 * If the commit is not on the reader page, then 3437 * move the header page. 3438 */ 3439 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3440 /* 3441 * If we are not in overwrite mode, 3442 * this is easy, just stop here. 3443 */ 3444 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3445 local_inc(&cpu_buffer->dropped_events); 3446 goto out_reset; 3447 } 3448 3449 ret = rb_handle_head_page(cpu_buffer, 3450 tail_page, 3451 next_page); 3452 if (ret < 0) 3453 goto out_reset; 3454 if (ret) 3455 goto out_again; 3456 } else { 3457 /* 3458 * We need to be careful here too. The 3459 * commit page could still be on the reader 3460 * page. We could have a small buffer, and 3461 * have filled up the buffer with events 3462 * from interrupts and such, and wrapped. 3463 * 3464 * Note, if the tail page is also on the 3465 * reader_page, we let it move out. 
3466 */ 3467 if (unlikely((cpu_buffer->commit_page != 3468 cpu_buffer->tail_page) && 3469 (cpu_buffer->commit_page == 3470 cpu_buffer->reader_page))) { 3471 local_inc(&cpu_buffer->commit_overrun); 3472 goto out_reset; 3473 } 3474 } 3475 } 3476 3477 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3478 3479 out_again: 3480 3481 rb_reset_tail(cpu_buffer, tail, info); 3482 3483 /* Commit what we have for now. */ 3484 rb_end_commit(cpu_buffer); 3485 /* rb_end_commit() decs committing */ 3486 local_inc(&cpu_buffer->committing); 3487 3488 /* fail and let the caller try again */ 3489 return ERR_PTR(-EAGAIN); 3490 3491 out_reset: 3492 /* reset write */ 3493 rb_reset_tail(cpu_buffer, tail, info); 3494 3495 return NULL; 3496 } 3497 3498 /* Slow path */ 3499 static struct ring_buffer_event * 3500 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3501 struct ring_buffer_event *event, u64 delta, bool abs) 3502 { 3503 if (abs) 3504 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3505 else 3506 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3507 3508 /* Not the first event on the page, or not delta? */ 3509 if (abs || rb_event_index(cpu_buffer, event)) { 3510 event->time_delta = delta & TS_MASK; 3511 event->array[0] = delta >> TS_SHIFT; 3512 } else { 3513 /* nope, just zero it */ 3514 event->time_delta = 0; 3515 event->array[0] = 0; 3516 } 3517 3518 return skip_time_extend(event); 3519 } 3520 3521 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 3522 static inline bool sched_clock_stable(void) 3523 { 3524 return true; 3525 } 3526 #endif 3527 3528 static void 3529 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3530 struct rb_event_info *info) 3531 { 3532 u64 write_stamp; 3533 3534 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3535 (unsigned long long)info->delta, 3536 (unsigned long long)info->ts, 3537 (unsigned long long)info->before, 3538 (unsigned long long)info->after, 3539 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3540 sched_clock_stable() ? "" : 3541 "If you just came from a suspend/resume,\n" 3542 "please switch to the trace global clock:\n" 3543 " echo global > /sys/kernel/tracing/trace_clock\n" 3544 "or add trace_clock=global to the kernel command line\n"); 3545 } 3546 3547 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3548 struct ring_buffer_event **event, 3549 struct rb_event_info *info, 3550 u64 *delta, 3551 unsigned int *length) 3552 { 3553 bool abs = info->add_timestamp & 3554 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3555 3556 if (unlikely(info->delta > (1ULL << 59))) { 3557 /* 3558 * Some timers can use more than 59 bits, and when a timestamp 3559 * is added to the buffer, it will lose those bits. 3560 */ 3561 if (abs && (info->ts & TS_MSB)) { 3562 info->delta &= ABS_TS_MASK; 3563 3564 /* did the clock go backwards */ 3565 } else if (info->before == info->after && info->before > info->ts) { 3566 /* not interrupted */ 3567 static int once; 3568 3569 /* 3570 * This is possible with a recalibrating of the TSC. 3571 * Do not produce a call stack, but just report it. 
3572 */ 3573 if (!once) { 3574 once++; 3575 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 3576 info->before, info->ts); 3577 } 3578 } else 3579 rb_check_timestamp(cpu_buffer, info); 3580 if (!abs) 3581 info->delta = 0; 3582 } 3583 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 3584 *length -= RB_LEN_TIME_EXTEND; 3585 *delta = 0; 3586 } 3587 3588 /** 3589 * rb_update_event - update event type and data 3590 * @cpu_buffer: The per cpu buffer of the @event 3591 * @event: the event to update 3592 * @info: The info to update the @event with (contains length and delta) 3593 * 3594 * Update the type and data fields of the @event. The length 3595 * is the actual size that is written to the ring buffer, 3596 * and with this, we can determine what to place into the 3597 * data field. 3598 */ 3599 static void 3600 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 3601 struct ring_buffer_event *event, 3602 struct rb_event_info *info) 3603 { 3604 unsigned length = info->length; 3605 u64 delta = info->delta; 3606 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 3607 3608 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 3609 cpu_buffer->event_stamp[nest] = info->ts; 3610 3611 /* 3612 * If we need to add a timestamp, then we 3613 * add it to the start of the reserved space. 3614 */ 3615 if (unlikely(info->add_timestamp)) 3616 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 3617 3618 event->time_delta = delta; 3619 length -= RB_EVNT_HDR_SIZE; 3620 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 3621 event->type_len = 0; 3622 event->array[0] = length; 3623 } else 3624 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 3625 } 3626 3627 static unsigned rb_calculate_event_length(unsigned length) 3628 { 3629 struct ring_buffer_event event; /* Used only for sizeof array */ 3630 3631 /* zero length can cause confusions */ 3632 if (!length) 3633 length++; 3634 3635 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 3636 length += sizeof(event.array[0]); 3637 3638 length += RB_EVNT_HDR_SIZE; 3639 length = ALIGN(length, RB_ARCH_ALIGNMENT); 3640 3641 /* 3642 * In case the time delta is larger than the 27 bits for it 3643 * in the header, we need to add a timestamp. If another 3644 * event comes in when trying to discard this one to increase 3645 * the length, then the timestamp will be added in the allocated 3646 * space of this event. If length is bigger than the size needed 3647 * for the TIME_EXTEND, then padding has to be used. The events 3648 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 3649 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 3650 * As length is a multiple of 4, we only need to worry if it 3651 * is 12 (RB_LEN_TIME_EXTEND + 4). 
3652 */ 3653 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 3654 length += RB_ALIGNMENT; 3655 3656 return length; 3657 } 3658 3659 static inline bool 3660 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 3661 struct ring_buffer_event *event) 3662 { 3663 unsigned long new_index, old_index; 3664 struct buffer_page *bpage; 3665 unsigned long addr; 3666 3667 new_index = rb_event_index(cpu_buffer, event); 3668 old_index = new_index + rb_event_ts_length(event); 3669 addr = (unsigned long)event; 3670 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3671 3672 bpage = READ_ONCE(cpu_buffer->tail_page); 3673 3674 /* 3675 * Make sure the tail_page is still the same and 3676 * the next write location is the end of this event 3677 */ 3678 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 3679 unsigned long write_mask = 3680 local_read(&bpage->write) & ~RB_WRITE_MASK; 3681 unsigned long event_length = rb_event_length(event); 3682 3683 /* 3684 * Make the before_stamp different than the write_stamp 3685 * so that the next event adds an absolute 3686 * value and does not rely on the saved write stamp, which 3687 * is now going to be bogus. 3688 * 3689 * By setting the before_stamp to zero, the next event 3690 * is not going to use the write_stamp and will instead 3691 * create an absolute timestamp. This means there's no 3692 * reason to update the write_stamp! 3693 */ 3694 rb_time_set(&cpu_buffer->before_stamp, 0); 3695 3696 /* 3697 * If an event were to come in now, it would see that the 3698 * write_stamp and the before_stamp are different, and assume 3699 * that this event just added itself before updating 3700 * the write stamp. The interrupting event will fix the 3701 * write stamp for us, and use an absolute timestamp. 3702 */ 3703 3704 /* 3705 * This is on the tail page. It is possible that 3706 * a write could come in and move the tail page 3707 * and write to the next page. That is fine 3708 * because we just shorten what is on this page. 3709 */ 3710 old_index += write_mask; 3711 new_index += write_mask; 3712 3713 /* caution: old_index gets updated on cmpxchg failure */ 3714 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 3715 /* update counters */ 3716 local_sub(event_length, &cpu_buffer->entries_bytes); 3717 return true; 3718 } 3719 } 3720 3721 /* could not discard */ 3722 return false; 3723 } 3724 3725 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 3726 { 3727 local_inc(&cpu_buffer->committing); 3728 local_inc(&cpu_buffer->commits); 3729 } 3730 3731 static __always_inline void 3732 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 3733 { 3734 unsigned long max_count; 3735 3736 /* 3737 * We only race with interrupts and NMIs on this CPU. 3738 * If we own the commit event, then we can commit 3739 * all others that interrupted us, since the interruptions 3740 * are in stack format (they finish before they come 3741 * back to us). This allows us to do a simple loop to 3742 * assign the commit to the tail. 3743 */ 3744 again: 3745 max_count = cpu_buffer->nr_pages * 100; 3746 3747 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 3748 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 3749 return; 3750 if (RB_WARN_ON(cpu_buffer, 3751 rb_is_reader_page(cpu_buffer->tail_page))) 3752 return; 3753 /* 3754 * No need for a memory barrier here, as the update 3755 * of the tail_page did it for this page.
3756 */ 3757 local_set(&cpu_buffer->commit_page->page->commit, 3758 rb_page_write(cpu_buffer->commit_page)); 3759 rb_inc_page(&cpu_buffer->commit_page); 3760 if (cpu_buffer->ring_meta) { 3761 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 3762 meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page; 3763 } 3764 /* add barrier to keep gcc from optimizing too much */ 3765 barrier(); 3766 } 3767 while (rb_commit_index(cpu_buffer) != 3768 rb_page_write(cpu_buffer->commit_page)) { 3769 3770 /* Make sure the readers see the content of what is committed. */ 3771 smp_wmb(); 3772 local_set(&cpu_buffer->commit_page->page->commit, 3773 rb_page_write(cpu_buffer->commit_page)); 3774 RB_WARN_ON(cpu_buffer, 3775 local_read(&cpu_buffer->commit_page->page->commit) & 3776 ~RB_WRITE_MASK); 3777 barrier(); 3778 } 3779 3780 /* again, keep gcc from optimizing */ 3781 barrier(); 3782 3783 /* 3784 * If an interrupt came in just after the first while loop 3785 * and pushed the tail page forward, we will be left with 3786 * a dangling commit that will never go forward. 3787 */ 3788 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3789 goto again; 3790 } 3791 3792 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3793 { 3794 unsigned long commits; 3795 3796 if (RB_WARN_ON(cpu_buffer, 3797 !local_read(&cpu_buffer->committing))) 3798 return; 3799 3800 again: 3801 commits = local_read(&cpu_buffer->commits); 3802 /* synchronize with interrupts */ 3803 barrier(); 3804 if (local_read(&cpu_buffer->committing) == 1) 3805 rb_set_commit_to_write(cpu_buffer); 3806 3807 local_dec(&cpu_buffer->committing); 3808 3809 /* synchronize with interrupts */ 3810 barrier(); 3811 3812 /* 3813 * Need to account for interrupts coming in between the 3814 * updating of the commit page and the clearing of the 3815 * committing counter. 
3816 */ 3817 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3818 !local_read(&cpu_buffer->committing)) { 3819 local_inc(&cpu_buffer->committing); 3820 goto again; 3821 } 3822 } 3823 3824 static inline void rb_event_discard(struct ring_buffer_event *event) 3825 { 3826 if (extended_time(event)) 3827 event = skip_time_extend(event); 3828 3829 /* array[0] holds the actual length for the discarded event */ 3830 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3831 event->type_len = RINGBUF_TYPE_PADDING; 3832 /* time delta must be non zero */ 3833 if (!event->time_delta) 3834 event->time_delta = 1; 3835 } 3836 3837 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3838 { 3839 local_inc(&cpu_buffer->entries); 3840 rb_end_commit(cpu_buffer); 3841 } 3842 3843 static __always_inline void 3844 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 3845 { 3846 if (buffer->irq_work.waiters_pending) { 3847 buffer->irq_work.waiters_pending = false; 3848 /* irq_work_queue() supplies it's own memory barriers */ 3849 irq_work_queue(&buffer->irq_work.work); 3850 } 3851 3852 if (cpu_buffer->irq_work.waiters_pending) { 3853 cpu_buffer->irq_work.waiters_pending = false; 3854 /* irq_work_queue() supplies it's own memory barriers */ 3855 irq_work_queue(&cpu_buffer->irq_work.work); 3856 } 3857 3858 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3859 return; 3860 3861 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3862 return; 3863 3864 if (!cpu_buffer->irq_work.full_waiters_pending) 3865 return; 3866 3867 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3868 3869 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3870 return; 3871 3872 cpu_buffer->irq_work.wakeup_full = true; 3873 cpu_buffer->irq_work.full_waiters_pending = false; 3874 /* irq_work_queue() supplies it's own memory barriers */ 3875 irq_work_queue(&cpu_buffer->irq_work.work); 3876 } 3877 3878 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3879 # define do_ring_buffer_record_recursion() \ 3880 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3881 #else 3882 # define do_ring_buffer_record_recursion() do { } while (0) 3883 #endif 3884 3885 /* 3886 * The lock and unlock are done within a preempt disable section. 3887 * The current_context per_cpu variable can only be modified 3888 * by the current task between lock and unlock. But it can 3889 * be modified more than once via an interrupt. To pass this 3890 * information from the lock to the unlock without having to 3891 * access the 'in_interrupt()' functions again (which do show 3892 * a bit of overhead in something as critical as function tracing, 3893 * we use a bitmask trick. 3894 * 3895 * bit 1 = NMI context 3896 * bit 2 = IRQ context 3897 * bit 3 = SoftIRQ context 3898 * bit 4 = normal context. 3899 * 3900 * This works because this is the order of contexts that can 3901 * preempt other contexts. A SoftIRQ never preempts an IRQ 3902 * context. 3903 * 3904 * When the context is determined, the corresponding bit is 3905 * checked and set (if it was set, then a recursion of that context 3906 * happened). 3907 * 3908 * On unlock, we need to clear this bit. To do so, just subtract 3909 * 1 from the current_context and AND it to itself. 
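 *
 * An executable form of the two worked examples that follow
 * (standalone, illustrative C, not part of this file):
 *
 *	#include <assert.h>
 *
 *	int main(void)
 *	{
 *		unsigned int a = 0x5;		// binary  101
 *		unsigned int b = 0xa;		// binary 1010
 *
 *		assert((a & (a - 1)) == 0x4);	// binary  100
 *		assert((b & (b - 1)) == 0x8);	// binary 1000
 *		return 0;
 *	}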
3910 * 3911 * (binary) 3912 * 101 - 1 = 100 3913 * 101 & 100 = 100 (clearing bit zero) 3914 * 3915 * 1010 - 1 = 1001 3916 * 1010 & 1001 = 1000 (clearing bit 1) 3917 * 3918 * The least significant bit can be cleared this way, and it 3919 * just so happens that it is the same bit corresponding to 3920 * the current context. 3921 * 3922 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3923 * is set when a recursion is detected at the current context, and if 3924 * the TRANSITION bit is already set, it will fail the recursion. 3925 * This is needed because there's a lag between the changing of 3926 * interrupt context and updating the preempt count. In this case, 3927 * a false positive will be found. To handle this, one extra recursion 3928 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3929 * bit is already set, then it is considered a recursion and the function 3930 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3931 * 3932 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3933 * to be cleared. Even if it wasn't the context that set it. That is, 3934 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3935 * is called before preempt_count() is updated, since the check will 3936 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3937 * NMI then comes in, it will set the NMI bit, but when the NMI code 3938 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3939 * and leave the NMI bit set. But this is fine, because the interrupt 3940 * code that set the TRANSITION bit will then clear the NMI bit when it 3941 * calls trace_recursive_unlock(). If another NMI comes in, it will 3942 * set the TRANSITION bit and continue. 3943 * 3944 * Note: The TRANSITION bit only handles a single transition between context. 3945 */ 3946 3947 static __always_inline bool 3948 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3949 { 3950 unsigned int val = cpu_buffer->current_context; 3951 int bit = interrupt_context_level(); 3952 3953 bit = RB_CTX_NORMAL - bit; 3954 3955 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3956 /* 3957 * It is possible that this was called by transitioning 3958 * between interrupt context, and preempt_count() has not 3959 * been updated yet. In this case, use the TRANSITION bit. 3960 */ 3961 bit = RB_CTX_TRANSITION; 3962 if (val & (1 << (bit + cpu_buffer->nest))) { 3963 do_ring_buffer_record_recursion(); 3964 return true; 3965 } 3966 } 3967 3968 val |= (1 << (bit + cpu_buffer->nest)); 3969 cpu_buffer->current_context = val; 3970 3971 return false; 3972 } 3973 3974 static __always_inline void 3975 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3976 { 3977 cpu_buffer->current_context &= 3978 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3979 } 3980 3981 /* The recursive locking above uses 5 bits */ 3982 #define NESTED_BITS 5 3983 3984 /** 3985 * ring_buffer_nest_start - Allow to trace while nested 3986 * @buffer: The ring buffer to modify 3987 * 3988 * The ring buffer has a safety mechanism to prevent recursion. 3989 * But there may be a case where a trace needs to be done while 3990 * tracing something else. In this case, calling this function 3991 * will allow this function to nest within a currently active 3992 * ring_buffer_lock_reserve(). 
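 *
 * The expected pairing looks roughly like this (sketch only; the nested
 * reserve can still fail and must be checked, and "data" stands in for
 * whatever the caller records):
 *
 *	// already inside an active ring_buffer_lock_reserve() section
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*data));
 *	if (event) {
 *		data = ring_buffer_event_data(event);
 *		// ... fill in *data ...
 *		ring_buffer_unlock_commit(buffer);
 *	}
 *	ring_buffer_nest_end(buffer);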
3993 * 3994 * Call this function before calling another ring_buffer_lock_reserve() and 3995 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 3996 */ 3997 void ring_buffer_nest_start(struct trace_buffer *buffer) 3998 { 3999 struct ring_buffer_per_cpu *cpu_buffer; 4000 int cpu; 4001 4002 /* Enabled by ring_buffer_nest_end() */ 4003 preempt_disable_notrace(); 4004 cpu = raw_smp_processor_id(); 4005 cpu_buffer = buffer->buffers[cpu]; 4006 /* This is the shift value for the above recursive locking */ 4007 cpu_buffer->nest += NESTED_BITS; 4008 } 4009 4010 /** 4011 * ring_buffer_nest_end - Allow to trace while nested 4012 * @buffer: The ring buffer to modify 4013 * 4014 * Must be called after ring_buffer_nest_start() and after the 4015 * ring_buffer_unlock_commit(). 4016 */ 4017 void ring_buffer_nest_end(struct trace_buffer *buffer) 4018 { 4019 struct ring_buffer_per_cpu *cpu_buffer; 4020 int cpu; 4021 4022 /* disabled by ring_buffer_nest_start() */ 4023 cpu = raw_smp_processor_id(); 4024 cpu_buffer = buffer->buffers[cpu]; 4025 /* This is the shift value for the above recursive locking */ 4026 cpu_buffer->nest -= NESTED_BITS; 4027 preempt_enable_notrace(); 4028 } 4029 4030 /** 4031 * ring_buffer_unlock_commit - commit a reserved 4032 * @buffer: The buffer to commit to 4033 * 4034 * This commits the data to the ring buffer, and releases any locks held. 4035 * 4036 * Must be paired with ring_buffer_lock_reserve. 4037 */ 4038 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 4039 { 4040 struct ring_buffer_per_cpu *cpu_buffer; 4041 int cpu = raw_smp_processor_id(); 4042 4043 cpu_buffer = buffer->buffers[cpu]; 4044 4045 rb_commit(cpu_buffer); 4046 4047 rb_wakeups(buffer, cpu_buffer); 4048 4049 trace_recursive_unlock(cpu_buffer); 4050 4051 preempt_enable_notrace(); 4052 4053 return 0; 4054 } 4055 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 4056 4057 /* Special value to validate all deltas on a page. 
*/ 4058 #define CHECK_FULL_PAGE 1L 4059 4060 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4061 4062 static const char *show_irq_str(int bits) 4063 { 4064 const char *type[] = { 4065 ".", // 0 4066 "s", // 1 4067 "h", // 2 4068 "Hs", // 3 4069 "n", // 4 4070 "Ns", // 5 4071 "Nh", // 6 4072 "NHs", // 7 4073 }; 4074 4075 return type[bits]; 4076 } 4077 4078 /* Assume this is a trace event */ 4079 static const char *show_flags(struct ring_buffer_event *event) 4080 { 4081 struct trace_entry *entry; 4082 int bits = 0; 4083 4084 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4085 return "X"; 4086 4087 entry = ring_buffer_event_data(event); 4088 4089 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4090 bits |= 1; 4091 4092 if (entry->flags & TRACE_FLAG_HARDIRQ) 4093 bits |= 2; 4094 4095 if (entry->flags & TRACE_FLAG_NMI) 4096 bits |= 4; 4097 4098 return show_irq_str(bits); 4099 } 4100 4101 static const char *show_irq(struct ring_buffer_event *event) 4102 { 4103 struct trace_entry *entry; 4104 4105 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4106 return ""; 4107 4108 entry = ring_buffer_event_data(event); 4109 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4110 return "d"; 4111 return ""; 4112 } 4113 4114 static const char *show_interrupt_level(void) 4115 { 4116 unsigned long pc = preempt_count(); 4117 unsigned char level = 0; 4118 4119 if (pc & SOFTIRQ_OFFSET) 4120 level |= 1; 4121 4122 if (pc & HARDIRQ_MASK) 4123 level |= 2; 4124 4125 if (pc & NMI_MASK) 4126 level |= 4; 4127 4128 return show_irq_str(level); 4129 } 4130 4131 static void dump_buffer_page(struct buffer_data_page *bpage, 4132 struct rb_event_info *info, 4133 unsigned long tail) 4134 { 4135 struct ring_buffer_event *event; 4136 u64 ts, delta; 4137 int e; 4138 4139 ts = bpage->time_stamp; 4140 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4141 4142 for (e = 0; e < tail; e += rb_event_length(event)) { 4143 4144 event = (struct ring_buffer_event *)(bpage->data + e); 4145 4146 switch (event->type_len) { 4147 4148 case RINGBUF_TYPE_TIME_EXTEND: 4149 delta = rb_event_time_stamp(event); 4150 ts += delta; 4151 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4152 e, ts, delta); 4153 break; 4154 4155 case RINGBUF_TYPE_TIME_STAMP: 4156 delta = rb_event_time_stamp(event); 4157 ts = rb_fix_abs_ts(delta, ts); 4158 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4159 e, ts, delta); 4160 break; 4161 4162 case RINGBUF_TYPE_PADDING: 4163 ts += event->time_delta; 4164 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4165 e, ts, event->time_delta); 4166 break; 4167 4168 case RINGBUF_TYPE_DATA: 4169 ts += event->time_delta; 4170 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4171 e, ts, event->time_delta, 4172 show_flags(event), show_irq(event)); 4173 break; 4174 4175 default: 4176 break; 4177 } 4178 } 4179 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4180 } 4181 4182 static DEFINE_PER_CPU(atomic_t, checking); 4183 static atomic_t ts_dump; 4184 4185 #define buffer_warn_return(fmt, ...) 
\ 4186 do { \
4187 /* If another report is happening, ignore this one */ \
4188 if (atomic_inc_return(&ts_dump) != 1) { \
4189 atomic_dec(&ts_dump); \
4190 goto out; \
4191 } \
4192 atomic_inc(&cpu_buffer->record_disabled); \
4193 pr_warn(fmt, ##__VA_ARGS__); \
4194 dump_buffer_page(bpage, info, tail); \
4195 atomic_dec(&ts_dump); \
4196 /* There are some cases during boot up where this can happen */ \
4197 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \
4198 /* Do not re-enable checking */ \
4199 return; \
4200 } while (0)
4201
4202 /*
4203 * Check if the current event time stamp matches the deltas on
4204 * the buffer page.
4205 */
4206 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
4207 struct rb_event_info *info,
4208 unsigned long tail)
4209 {
4210 struct buffer_data_page *bpage;
4211 u64 ts, delta;
4212 bool full = false;
4213 int ret;
4214
4215 bpage = info->tail_page->page;
4216
4217 if (tail == CHECK_FULL_PAGE) {
4218 full = true;
4219 tail = local_read(&bpage->commit);
4220 } else if (info->add_timestamp &
4221 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
4222 /* Ignore events with absolute time stamps */
4223 return;
4224 }
4225
4226 /*
4227 * Do not check the first event (skip possible extends too).
4228 * Also do not check if previous events have not been committed.
4229 */
4230 if (tail <= 8 || tail > local_read(&bpage->commit))
4231 return;
4232
4233 /*
4234 * If this interrupted another event's check, do not check this one.
4235 */
4236 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
4237 goto out;
4238
4239 ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta);
4240 if (ret < 0) {
4241 if (delta < ts) {
4242 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n",
4243 cpu_buffer->cpu, ts, delta);
4244 goto out;
4245 }
4246 }
4247 if ((full && ts > info->ts) ||
4248 (!full && ts + info->delta != info->ts)) {
4249 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n",
4250 cpu_buffer->cpu,
4251 ts + info->delta, info->ts, info->delta,
4252 info->before, info->after,
4253 full ? " (full)" : "", show_interrupt_level());
4254 }
4255 out:
4256 atomic_dec(this_cpu_ptr(&checking));
4257 }
4258 #else
4259 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
4260 struct rb_event_info *info,
4261 unsigned long tail)
4262 {
4263 }
4264 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */
4265
4266 static struct ring_buffer_event *
4267 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
4268 struct rb_event_info *info)
4269 {
4270 struct ring_buffer_event *event;
4271 struct buffer_page *tail_page;
4272 unsigned long tail, write, w;
4273
4274 /* Don't let the compiler play games with cpu_buffer->tail_page */
4275 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
4276
4277 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK;
4278 barrier();
4279 rb_time_read(&cpu_buffer->before_stamp, &info->before);
4280 rb_time_read(&cpu_buffer->write_stamp, &info->after);
4281 barrier();
4282 info->ts = rb_time_stamp(cpu_buffer->buffer);
4283
4284 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
4285 info->delta = info->ts;
4286 } else {
4287 /*
4288 * If interrupting an event time update, we may need an
4289 * absolute timestamp.
4290 * Don't bother if this is the start of a new page (w == 0).
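 *
 * More generally, the block below picks between a small delta and a
 * larger time stamp: a delta is used when it fits in the 27 time_delta
 * bits of the compressed event header, otherwise the event is grown by
 * RB_LEN_TIME_EXTEND bytes. A minimal illustration of that size check
 * (test_time_stamp() is the real helper; this sketch is not part of
 * this file):
 *
 *	#define TS_DELTA_BITS	27
 *
 *	static int needs_time_extend(unsigned long long delta)
 *	{
 *		return delta >= (1ULL << TS_DELTA_BITS);
 *	}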
4291 */ 4292 if (!w) { 4293 /* Use the sub-buffer timestamp */ 4294 info->delta = 0; 4295 } else if (unlikely(info->before != info->after)) { 4296 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 4297 info->length += RB_LEN_TIME_EXTEND; 4298 } else { 4299 info->delta = info->ts - info->after; 4300 if (unlikely(test_time_stamp(info->delta))) { 4301 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 4302 info->length += RB_LEN_TIME_EXTEND; 4303 } 4304 } 4305 } 4306 4307 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 4308 4309 /*C*/ write = local_add_return(info->length, &tail_page->write); 4310 4311 /* set write to only the index of the write */ 4312 write &= RB_WRITE_MASK; 4313 4314 tail = write - info->length; 4315 4316 /* See if we shot pass the end of this buffer page */ 4317 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 4318 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 4319 return rb_move_tail(cpu_buffer, tail, info); 4320 } 4321 4322 if (likely(tail == w)) { 4323 /* Nothing interrupted us between A and C */ 4324 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 4325 /* 4326 * If something came in between C and D, the write stamp 4327 * may now not be in sync. But that's fine as the before_stamp 4328 * will be different and then next event will just be forced 4329 * to use an absolute timestamp. 4330 */ 4331 if (likely(!(info->add_timestamp & 4332 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4333 /* This did not interrupt any time update */ 4334 info->delta = info->ts - info->after; 4335 else 4336 /* Just use full timestamp for interrupting event */ 4337 info->delta = info->ts; 4338 check_buffer(cpu_buffer, info, tail); 4339 } else { 4340 u64 ts; 4341 /* SLOW PATH - Interrupted between A and C */ 4342 4343 /* Save the old before_stamp */ 4344 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4345 4346 /* 4347 * Read a new timestamp and update the before_stamp to make 4348 * the next event after this one force using an absolute 4349 * timestamp. This is in case an interrupt were to come in 4350 * between E and F. 4351 */ 4352 ts = rb_time_stamp(cpu_buffer->buffer); 4353 rb_time_set(&cpu_buffer->before_stamp, ts); 4354 4355 barrier(); 4356 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 4357 barrier(); 4358 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 4359 info->after == info->before && info->after < ts) { 4360 /* 4361 * Nothing came after this event between C and F, it is 4362 * safe to use info->after for the delta as it 4363 * matched info->before and is still valid. 4364 */ 4365 info->delta = ts - info->after; 4366 } else { 4367 /* 4368 * Interrupted between C and F: 4369 * Lost the previous events time stamp. Just set the 4370 * delta to zero, and this will be the same time as 4371 * the event this event interrupted. And the events that 4372 * came after this will still be correct (as they would 4373 * have built their delta on the previous event. 4374 */ 4375 info->delta = 0; 4376 } 4377 info->ts = ts; 4378 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 4379 } 4380 4381 /* 4382 * If this is the first commit on the page, then it has the same 4383 * timestamp as the page itself. 
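 *
 * That is also what lets a reader rebuild absolute times: start from the
 * sub-buffer's time_stamp and accumulate deltas, much as dump_buffer_page()
 * above does. A simplified sketch (time extends left out for brevity):
 *
 *	u64 ts = bpage->time_stamp;	// absolute time of the sub-buffer
 *	unsigned int off = 0;
 *
 *	while (off < tail) {
 *		struct ring_buffer_event *event = (void *)(bpage->data + off);
 *
 *		if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
 *			ts += event->time_delta;	// first event on the page adds 0
 *		off += rb_event_length(event);
 *	}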
4384 */ 4385 if (unlikely(!tail && !(info->add_timestamp & 4386 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4387 info->delta = 0; 4388 4389 /* We reserved something on the buffer */ 4390 4391 event = __rb_page_index(tail_page, tail); 4392 rb_update_event(cpu_buffer, event, info); 4393 4394 local_inc(&tail_page->entries); 4395 4396 /* 4397 * If this is the first commit on the page, then update 4398 * its timestamp. 4399 */ 4400 if (unlikely(!tail)) 4401 tail_page->page->time_stamp = info->ts; 4402 4403 /* account for these added bytes */ 4404 local_add(info->length, &cpu_buffer->entries_bytes); 4405 4406 return event; 4407 } 4408 4409 static __always_inline struct ring_buffer_event * 4410 rb_reserve_next_event(struct trace_buffer *buffer, 4411 struct ring_buffer_per_cpu *cpu_buffer, 4412 unsigned long length) 4413 { 4414 struct ring_buffer_event *event; 4415 struct rb_event_info info; 4416 int nr_loops = 0; 4417 int add_ts_default; 4418 4419 /* 4420 * ring buffer does cmpxchg as well as atomic64 operations 4421 * (which some archs use locking for atomic64), make sure this 4422 * is safe in NMI context 4423 */ 4424 if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) || 4425 IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) && 4426 (unlikely(in_nmi()))) { 4427 return NULL; 4428 } 4429 4430 rb_start_commit(cpu_buffer); 4431 /* The commit page can not change after this */ 4432 4433 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4434 /* 4435 * Due to the ability to swap a cpu buffer from a buffer 4436 * it is possible it was swapped before we committed. 4437 * (committing stops a swap). We check for it here and 4438 * if it happened, we have to fail the write. 4439 */ 4440 barrier(); 4441 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 4442 local_dec(&cpu_buffer->committing); 4443 local_dec(&cpu_buffer->commits); 4444 return NULL; 4445 } 4446 #endif 4447 4448 info.length = rb_calculate_event_length(length); 4449 4450 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 4451 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 4452 info.length += RB_LEN_TIME_EXTEND; 4453 if (info.length > cpu_buffer->buffer->max_data_size) 4454 goto out_fail; 4455 } else { 4456 add_ts_default = RB_ADD_STAMP_NONE; 4457 } 4458 4459 again: 4460 info.add_timestamp = add_ts_default; 4461 info.delta = 0; 4462 4463 /* 4464 * We allow for interrupts to reenter here and do a trace. 4465 * If one does, it will cause this original code to loop 4466 * back here. Even with heavy interrupts happening, this 4467 * should only happen a few times in a row. If this happens 4468 * 1000 times in a row, there must be either an interrupt 4469 * storm or we have something buggy. 4470 * Bail! 4471 */ 4472 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 4473 goto out_fail; 4474 4475 event = __rb_reserve_next(cpu_buffer, &info); 4476 4477 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 4478 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 4479 info.length -= RB_LEN_TIME_EXTEND; 4480 goto again; 4481 } 4482 4483 if (likely(event)) 4484 return event; 4485 out_fail: 4486 rb_end_commit(cpu_buffer); 4487 return NULL; 4488 } 4489 4490 /** 4491 * ring_buffer_lock_reserve - reserve a part of the buffer 4492 * @buffer: the ring buffer to reserve from 4493 * @length: the length of the data to reserve (excluding event header) 4494 * 4495 * Returns a reserved event on the ring buffer to copy directly to. 4496 * The user of this interface will need to get the body to write into 4497 * and can use the ring_buffer_event_data() interface. 
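 *
 * A typical (simplified) use looks like the following, where
 * "struct my_entry" stands in for whatever the caller wants to record:
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;				// disabled, full, or recursion
 *	entry = ring_buffer_event_data(event);
 *	entry->value = 42;			// fill in the payload
 *	ring_buffer_unlock_commit(buffer);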
4498 * 4499 * The length is the length of the data needed, not the event length 4500 * which also includes the event header. 4501 * 4502 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4503 * If NULL is returned, then nothing has been allocated or locked. 4504 */ 4505 struct ring_buffer_event * 4506 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4507 { 4508 struct ring_buffer_per_cpu *cpu_buffer; 4509 struct ring_buffer_event *event; 4510 int cpu; 4511 4512 /* If we are tracing schedule, we don't want to recurse */ 4513 preempt_disable_notrace(); 4514 4515 if (unlikely(atomic_read(&buffer->record_disabled))) 4516 goto out; 4517 4518 cpu = raw_smp_processor_id(); 4519 4520 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4521 goto out; 4522 4523 cpu_buffer = buffer->buffers[cpu]; 4524 4525 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4526 goto out; 4527 4528 if (unlikely(length > buffer->max_data_size)) 4529 goto out; 4530 4531 if (unlikely(trace_recursive_lock(cpu_buffer))) 4532 goto out; 4533 4534 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4535 if (!event) 4536 goto out_unlock; 4537 4538 return event; 4539 4540 out_unlock: 4541 trace_recursive_unlock(cpu_buffer); 4542 out: 4543 preempt_enable_notrace(); 4544 return NULL; 4545 } 4546 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4547 4548 /* 4549 * Decrement the entries to the page that an event is on. 4550 * The event does not even need to exist, only the pointer 4551 * to the page it is on. This may only be called before the commit 4552 * takes place. 4553 */ 4554 static inline void 4555 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4556 struct ring_buffer_event *event) 4557 { 4558 unsigned long addr = (unsigned long)event; 4559 struct buffer_page *bpage = cpu_buffer->commit_page; 4560 struct buffer_page *start; 4561 4562 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4563 4564 /* Do the likely case first */ 4565 if (likely(bpage->page == (void *)addr)) { 4566 local_dec(&bpage->entries); 4567 return; 4568 } 4569 4570 /* 4571 * Because the commit page may be on the reader page we 4572 * start with the next page and check the end loop there. 4573 */ 4574 rb_inc_page(&bpage); 4575 start = bpage; 4576 do { 4577 if (bpage->page == (void *)addr) { 4578 local_dec(&bpage->entries); 4579 return; 4580 } 4581 rb_inc_page(&bpage); 4582 } while (bpage != start); 4583 4584 /* commit not part of this buffer?? */ 4585 RB_WARN_ON(cpu_buffer, 1); 4586 } 4587 4588 /** 4589 * ring_buffer_discard_commit - discard an event that has not been committed 4590 * @buffer: the ring buffer 4591 * @event: non committed event to discard 4592 * 4593 * Sometimes an event that is in the ring buffer needs to be ignored. 4594 * This function lets the user discard an event in the ring buffer 4595 * and then that event will not be read later. 4596 * 4597 * This function only works if it is called before the item has been 4598 * committed. It will try to free the event from the ring buffer 4599 * if another event has not been added behind it. 4600 * 4601 * If another event has been added behind it, it will set the event 4602 * up as discarded, and perform the commit. 4603 * 4604 * If this function is called, do not call ring_buffer_unlock_commit on 4605 * the event. 
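 *
 * In other words the normal flow becomes (sketch; my_filter() is just a
 * stand-in for whatever decides the event is unwanted):
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	// ... fill in *entry ...
 *	if (my_filter(entry))
 *		ring_buffer_discard_commit(buffer, event);	// not unlock_commit
 *	else
 *		ring_buffer_unlock_commit(buffer);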
4606 */ 4607 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4608 struct ring_buffer_event *event) 4609 { 4610 struct ring_buffer_per_cpu *cpu_buffer; 4611 int cpu; 4612 4613 /* The event is discarded regardless */ 4614 rb_event_discard(event); 4615 4616 cpu = smp_processor_id(); 4617 cpu_buffer = buffer->buffers[cpu]; 4618 4619 /* 4620 * This must only be called if the event has not been 4621 * committed yet. Thus we can assume that preemption 4622 * is still disabled. 4623 */ 4624 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4625 4626 rb_decrement_entry(cpu_buffer, event); 4627 if (rb_try_to_discard(cpu_buffer, event)) 4628 goto out; 4629 4630 out: 4631 rb_end_commit(cpu_buffer); 4632 4633 trace_recursive_unlock(cpu_buffer); 4634 4635 preempt_enable_notrace(); 4636 4637 } 4638 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4639 4640 /** 4641 * ring_buffer_write - write data to the buffer without reserving 4642 * @buffer: The ring buffer to write to. 4643 * @length: The length of the data being written (excluding the event header) 4644 * @data: The data to write to the buffer. 4645 * 4646 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4647 * one function. If you already have the data to write to the buffer, it 4648 * may be easier to simply call this function. 4649 * 4650 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4651 * and not the length of the event which would hold the header. 4652 */ 4653 int ring_buffer_write(struct trace_buffer *buffer, 4654 unsigned long length, 4655 void *data) 4656 { 4657 struct ring_buffer_per_cpu *cpu_buffer; 4658 struct ring_buffer_event *event; 4659 void *body; 4660 int ret = -EBUSY; 4661 int cpu; 4662 4663 preempt_disable_notrace(); 4664 4665 if (atomic_read(&buffer->record_disabled)) 4666 goto out; 4667 4668 cpu = raw_smp_processor_id(); 4669 4670 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4671 goto out; 4672 4673 cpu_buffer = buffer->buffers[cpu]; 4674 4675 if (atomic_read(&cpu_buffer->record_disabled)) 4676 goto out; 4677 4678 if (length > buffer->max_data_size) 4679 goto out; 4680 4681 if (unlikely(trace_recursive_lock(cpu_buffer))) 4682 goto out; 4683 4684 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4685 if (!event) 4686 goto out_unlock; 4687 4688 body = rb_event_data(event); 4689 4690 memcpy(body, data, length); 4691 4692 rb_commit(cpu_buffer); 4693 4694 rb_wakeups(buffer, cpu_buffer); 4695 4696 ret = 0; 4697 4698 out_unlock: 4699 trace_recursive_unlock(cpu_buffer); 4700 4701 out: 4702 preempt_enable_notrace(); 4703 4704 return ret; 4705 } 4706 EXPORT_SYMBOL_GPL(ring_buffer_write); 4707 4708 /* 4709 * The total entries in the ring buffer is the running counter 4710 * of entries entered into the ring buffer, minus the sum of 4711 * the entries read from the ring buffer and the number of 4712 * entries that were overwritten. 4713 */ 4714 static inline unsigned long 4715 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4716 { 4717 return local_read(&cpu_buffer->entries) - 4718 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4719 } 4720 4721 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4722 { 4723 return !rb_num_of_entries(cpu_buffer); 4724 } 4725 4726 /** 4727 * ring_buffer_record_disable - stop all writes into the buffer 4728 * @buffer: The ring buffer to stop writes to. 4729 * 4730 * This prevents all writes to the buffer. Any attempt to write 4731 * to the buffer after this will fail and return NULL. 
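 *
 * Together with the synchronize_rcu() mentioned below, a typical
 * pattern is (sketch only):
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_rcu();		// wait for in-flight writers to finish
 *	// ... inspect or reset the buffer ...
 *	ring_buffer_record_enable(buffer);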
4732 * 4733 * The caller should call synchronize_rcu() after this. 4734 */ 4735 void ring_buffer_record_disable(struct trace_buffer *buffer) 4736 { 4737 atomic_inc(&buffer->record_disabled); 4738 } 4739 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4740 4741 /** 4742 * ring_buffer_record_enable - enable writes to the buffer 4743 * @buffer: The ring buffer to enable writes 4744 * 4745 * Note, multiple disables will need the same number of enables 4746 * to truly enable the writing (much like preempt_disable). 4747 */ 4748 void ring_buffer_record_enable(struct trace_buffer *buffer) 4749 { 4750 atomic_dec(&buffer->record_disabled); 4751 } 4752 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4753 4754 /** 4755 * ring_buffer_record_off - stop all writes into the buffer 4756 * @buffer: The ring buffer to stop writes to. 4757 * 4758 * This prevents all writes to the buffer. Any attempt to write 4759 * to the buffer after this will fail and return NULL. 4760 * 4761 * This is different than ring_buffer_record_disable() as 4762 * it works like an on/off switch, where as the disable() version 4763 * must be paired with a enable(). 4764 */ 4765 void ring_buffer_record_off(struct trace_buffer *buffer) 4766 { 4767 unsigned int rd; 4768 unsigned int new_rd; 4769 4770 rd = atomic_read(&buffer->record_disabled); 4771 do { 4772 new_rd = rd | RB_BUFFER_OFF; 4773 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4774 } 4775 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4776 4777 /** 4778 * ring_buffer_record_on - restart writes into the buffer 4779 * @buffer: The ring buffer to start writes to. 4780 * 4781 * This enables all writes to the buffer that was disabled by 4782 * ring_buffer_record_off(). 4783 * 4784 * This is different than ring_buffer_record_enable() as 4785 * it works like an on/off switch, where as the enable() version 4786 * must be paired with a disable(). 4787 */ 4788 void ring_buffer_record_on(struct trace_buffer *buffer) 4789 { 4790 unsigned int rd; 4791 unsigned int new_rd; 4792 4793 rd = atomic_read(&buffer->record_disabled); 4794 do { 4795 new_rd = rd & ~RB_BUFFER_OFF; 4796 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4797 } 4798 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4799 4800 /** 4801 * ring_buffer_record_is_on - return true if the ring buffer can write 4802 * @buffer: The ring buffer to see if write is enabled 4803 * 4804 * Returns true if the ring buffer is in a state that it accepts writes. 4805 */ 4806 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4807 { 4808 return !atomic_read(&buffer->record_disabled); 4809 } 4810 4811 /** 4812 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4813 * @buffer: The ring buffer to see if write is set enabled 4814 * 4815 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4816 * Note that this does NOT mean it is in a writable state. 4817 * 4818 * It may return true when the ring buffer has been disabled by 4819 * ring_buffer_record_disable(), as that is a temporary disabling of 4820 * the ring buffer. 4821 */ 4822 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4823 { 4824 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4825 } 4826 4827 /** 4828 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4829 * @buffer: The ring buffer to stop writes to. 4830 * @cpu: The CPU buffer to stop 4831 * 4832 * This prevents all writes to the buffer. 
Any attempt to write 4833 * to the buffer after this will fail and return NULL. 4834 * 4835 * The caller should call synchronize_rcu() after this. 4836 */ 4837 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4838 { 4839 struct ring_buffer_per_cpu *cpu_buffer; 4840 4841 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4842 return; 4843 4844 cpu_buffer = buffer->buffers[cpu]; 4845 atomic_inc(&cpu_buffer->record_disabled); 4846 } 4847 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4848 4849 /** 4850 * ring_buffer_record_enable_cpu - enable writes to the buffer 4851 * @buffer: The ring buffer to enable writes 4852 * @cpu: The CPU to enable. 4853 * 4854 * Note, multiple disables will need the same number of enables 4855 * to truly enable the writing (much like preempt_disable). 4856 */ 4857 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4858 { 4859 struct ring_buffer_per_cpu *cpu_buffer; 4860 4861 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4862 return; 4863 4864 cpu_buffer = buffer->buffers[cpu]; 4865 atomic_dec(&cpu_buffer->record_disabled); 4866 } 4867 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4868 4869 /** 4870 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4871 * @buffer: The ring buffer 4872 * @cpu: The per CPU buffer to read from. 4873 */ 4874 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4875 { 4876 unsigned long flags; 4877 struct ring_buffer_per_cpu *cpu_buffer; 4878 struct buffer_page *bpage; 4879 u64 ret = 0; 4880 4881 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4882 return 0; 4883 4884 cpu_buffer = buffer->buffers[cpu]; 4885 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4886 /* 4887 * if the tail is on reader_page, oldest time stamp is on the reader 4888 * page 4889 */ 4890 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4891 bpage = cpu_buffer->reader_page; 4892 else 4893 bpage = rb_set_head_page(cpu_buffer); 4894 if (bpage) 4895 ret = bpage->page->time_stamp; 4896 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4897 4898 return ret; 4899 } 4900 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4901 4902 /** 4903 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 4904 * @buffer: The ring buffer 4905 * @cpu: The per CPU buffer to read from. 4906 */ 4907 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4908 { 4909 struct ring_buffer_per_cpu *cpu_buffer; 4910 unsigned long ret; 4911 4912 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4913 return 0; 4914 4915 cpu_buffer = buffer->buffers[cpu]; 4916 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4917 4918 return ret; 4919 } 4920 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4921 4922 /** 4923 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4924 * @buffer: The ring buffer 4925 * @cpu: The per CPU buffer to get the entries from. 4926 */ 4927 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4928 { 4929 struct ring_buffer_per_cpu *cpu_buffer; 4930 4931 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4932 return 0; 4933 4934 cpu_buffer = buffer->buffers[cpu]; 4935 4936 return rb_num_of_entries(cpu_buffer); 4937 } 4938 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4939 4940 /** 4941 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4942 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 
4943 * @buffer: The ring buffer 4944 * @cpu: The per CPU buffer to get the number of overruns from 4945 */ 4946 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4947 { 4948 struct ring_buffer_per_cpu *cpu_buffer; 4949 unsigned long ret; 4950 4951 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4952 return 0; 4953 4954 cpu_buffer = buffer->buffers[cpu]; 4955 ret = local_read(&cpu_buffer->overrun); 4956 4957 return ret; 4958 } 4959 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4960 4961 /** 4962 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4963 * commits failing due to the buffer wrapping around while there are uncommitted 4964 * events, such as during an interrupt storm. 4965 * @buffer: The ring buffer 4966 * @cpu: The per CPU buffer to get the number of overruns from 4967 */ 4968 unsigned long 4969 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4970 { 4971 struct ring_buffer_per_cpu *cpu_buffer; 4972 unsigned long ret; 4973 4974 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4975 return 0; 4976 4977 cpu_buffer = buffer->buffers[cpu]; 4978 ret = local_read(&cpu_buffer->commit_overrun); 4979 4980 return ret; 4981 } 4982 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4983 4984 /** 4985 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4986 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 4987 * @buffer: The ring buffer 4988 * @cpu: The per CPU buffer to get the number of overruns from 4989 */ 4990 unsigned long 4991 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4992 { 4993 struct ring_buffer_per_cpu *cpu_buffer; 4994 unsigned long ret; 4995 4996 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4997 return 0; 4998 4999 cpu_buffer = buffer->buffers[cpu]; 5000 ret = local_read(&cpu_buffer->dropped_events); 5001 5002 return ret; 5003 } 5004 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 5005 5006 /** 5007 * ring_buffer_read_events_cpu - get the number of events successfully read 5008 * @buffer: The ring buffer 5009 * @cpu: The per CPU buffer to get the number of events read 5010 */ 5011 unsigned long 5012 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 5013 { 5014 struct ring_buffer_per_cpu *cpu_buffer; 5015 5016 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5017 return 0; 5018 5019 cpu_buffer = buffer->buffers[cpu]; 5020 return cpu_buffer->read; 5021 } 5022 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 5023 5024 /** 5025 * ring_buffer_entries - get the number of entries in a buffer 5026 * @buffer: The ring buffer 5027 * 5028 * Returns the total number of entries in the ring buffer 5029 * (all CPU entries) 5030 */ 5031 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 5032 { 5033 struct ring_buffer_per_cpu *cpu_buffer; 5034 unsigned long entries = 0; 5035 int cpu; 5036 5037 /* if you care about this being correct, lock the buffer */ 5038 for_each_buffer_cpu(buffer, cpu) { 5039 cpu_buffer = buffer->buffers[cpu]; 5040 entries += rb_num_of_entries(cpu_buffer); 5041 } 5042 5043 return entries; 5044 } 5045 EXPORT_SYMBOL_GPL(ring_buffer_entries); 5046 5047 /** 5048 * ring_buffer_overruns - get the number of overruns in buffer 5049 * @buffer: The ring buffer 5050 * 5051 * Returns the total number of overruns in the ring buffer 5052 * (all CPU entries) 5053 */ 5054 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 5055 { 5056 struct ring_buffer_per_cpu *cpu_buffer; 5057 unsigned long overruns = 0; 5058 int cpu; 5059 5060 /* 
if you care about this being correct, lock the buffer */ 5061 for_each_buffer_cpu(buffer, cpu) { 5062 cpu_buffer = buffer->buffers[cpu]; 5063 overruns += local_read(&cpu_buffer->overrun); 5064 } 5065 5066 return overruns; 5067 } 5068 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5069 5070 static void rb_iter_reset(struct ring_buffer_iter *iter) 5071 { 5072 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5073 5074 /* Iterator usage is expected to have record disabled */ 5075 iter->head_page = cpu_buffer->reader_page; 5076 iter->head = cpu_buffer->reader_page->read; 5077 iter->next_event = iter->head; 5078 5079 iter->cache_reader_page = iter->head_page; 5080 iter->cache_read = cpu_buffer->read; 5081 iter->cache_pages_removed = cpu_buffer->pages_removed; 5082 5083 if (iter->head) { 5084 iter->read_stamp = cpu_buffer->read_stamp; 5085 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5086 } else { 5087 iter->read_stamp = iter->head_page->page->time_stamp; 5088 iter->page_stamp = iter->read_stamp; 5089 } 5090 } 5091 5092 /** 5093 * ring_buffer_iter_reset - reset an iterator 5094 * @iter: The iterator to reset 5095 * 5096 * Resets the iterator, so that it will start from the beginning 5097 * again. 5098 */ 5099 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5100 { 5101 struct ring_buffer_per_cpu *cpu_buffer; 5102 unsigned long flags; 5103 5104 if (!iter) 5105 return; 5106 5107 cpu_buffer = iter->cpu_buffer; 5108 5109 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5110 rb_iter_reset(iter); 5111 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5112 } 5113 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5114 5115 /** 5116 * ring_buffer_iter_empty - check if an iterator has no more to read 5117 * @iter: The iterator to check 5118 */ 5119 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5120 { 5121 struct ring_buffer_per_cpu *cpu_buffer; 5122 struct buffer_page *reader; 5123 struct buffer_page *head_page; 5124 struct buffer_page *commit_page; 5125 struct buffer_page *curr_commit_page; 5126 unsigned commit; 5127 u64 curr_commit_ts; 5128 u64 commit_ts; 5129 5130 cpu_buffer = iter->cpu_buffer; 5131 reader = cpu_buffer->reader_page; 5132 head_page = cpu_buffer->head_page; 5133 commit_page = READ_ONCE(cpu_buffer->commit_page); 5134 commit_ts = commit_page->page->time_stamp; 5135 5136 /* 5137 * When the writer goes across pages, it issues a cmpxchg which 5138 * is a mb(), which will synchronize with the rmb here. 
5139 * (see rb_tail_page_update()) 5140 */ 5141 smp_rmb(); 5142 commit = rb_page_commit(commit_page); 5143 /* We want to make sure that the commit page doesn't change */ 5144 smp_rmb(); 5145 5146 /* Make sure commit page didn't change */ 5147 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5148 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5149 5150 /* If the commit page changed, then there's more data */ 5151 if (curr_commit_page != commit_page || 5152 curr_commit_ts != commit_ts) 5153 return 0; 5154 5155 /* Still racy, as it may return a false positive, but that's OK */ 5156 return ((iter->head_page == commit_page && iter->head >= commit) || 5157 (iter->head_page == reader && commit_page == head_page && 5158 head_page->read == commit && 5159 iter->head == rb_page_size(cpu_buffer->reader_page))); 5160 } 5161 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5162 5163 static void 5164 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5165 struct ring_buffer_event *event) 5166 { 5167 u64 delta; 5168 5169 switch (event->type_len) { 5170 case RINGBUF_TYPE_PADDING: 5171 return; 5172 5173 case RINGBUF_TYPE_TIME_EXTEND: 5174 delta = rb_event_time_stamp(event); 5175 cpu_buffer->read_stamp += delta; 5176 return; 5177 5178 case RINGBUF_TYPE_TIME_STAMP: 5179 delta = rb_event_time_stamp(event); 5180 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5181 cpu_buffer->read_stamp = delta; 5182 return; 5183 5184 case RINGBUF_TYPE_DATA: 5185 cpu_buffer->read_stamp += event->time_delta; 5186 return; 5187 5188 default: 5189 RB_WARN_ON(cpu_buffer, 1); 5190 } 5191 } 5192 5193 static void 5194 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5195 struct ring_buffer_event *event) 5196 { 5197 u64 delta; 5198 5199 switch (event->type_len) { 5200 case RINGBUF_TYPE_PADDING: 5201 return; 5202 5203 case RINGBUF_TYPE_TIME_EXTEND: 5204 delta = rb_event_time_stamp(event); 5205 iter->read_stamp += delta; 5206 return; 5207 5208 case RINGBUF_TYPE_TIME_STAMP: 5209 delta = rb_event_time_stamp(event); 5210 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5211 iter->read_stamp = delta; 5212 return; 5213 5214 case RINGBUF_TYPE_DATA: 5215 iter->read_stamp += event->time_delta; 5216 return; 5217 5218 default: 5219 RB_WARN_ON(iter->cpu_buffer, 1); 5220 } 5221 } 5222 5223 static struct buffer_page * 5224 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5225 { 5226 struct buffer_page *reader = NULL; 5227 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5228 unsigned long overwrite; 5229 unsigned long flags; 5230 int nr_loops = 0; 5231 bool ret; 5232 5233 local_irq_save(flags); 5234 arch_spin_lock(&cpu_buffer->lock); 5235 5236 again: 5237 /* 5238 * This should normally only loop twice. But because the 5239 * start of the reader inserts an empty page, it causes 5240 * a case where we will loop three times. There should be no 5241 * reason to loop four times (that I know of). 
5242 */ 5243 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 5244 reader = NULL; 5245 goto out; 5246 } 5247 5248 reader = cpu_buffer->reader_page; 5249 5250 /* If there's more to read, return this page */ 5251 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5252 goto out; 5253 5254 /* Never should we have an index greater than the size */ 5255 if (RB_WARN_ON(cpu_buffer, 5256 cpu_buffer->reader_page->read > rb_page_size(reader))) 5257 goto out; 5258 5259 /* check if we caught up to the tail */ 5260 reader = NULL; 5261 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5262 goto out; 5263 5264 /* Don't bother swapping if the ring buffer is empty */ 5265 if (rb_num_of_entries(cpu_buffer) == 0) 5266 goto out; 5267 5268 /* 5269 * Reset the reader page to size zero. 5270 */ 5271 local_set(&cpu_buffer->reader_page->write, 0); 5272 local_set(&cpu_buffer->reader_page->entries, 0); 5273 local_set(&cpu_buffer->reader_page->page->commit, 0); 5274 cpu_buffer->reader_page->real_end = 0; 5275 5276 spin: 5277 /* 5278 * Splice the empty reader page into the list around the head. 5279 */ 5280 reader = rb_set_head_page(cpu_buffer); 5281 if (!reader) 5282 goto out; 5283 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5284 cpu_buffer->reader_page->list.prev = reader->list.prev; 5285 5286 /* 5287 * cpu_buffer->pages just needs to point to the buffer, it 5288 * has no specific buffer page to point to. Lets move it out 5289 * of our way so we don't accidentally swap it. 5290 */ 5291 cpu_buffer->pages = reader->list.prev; 5292 5293 /* The reader page will be pointing to the new head */ 5294 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5295 5296 /* 5297 * We want to make sure we read the overruns after we set up our 5298 * pointers to the next object. The writer side does a 5299 * cmpxchg to cross pages which acts as the mb on the writer 5300 * side. Note, the reader will constantly fail the swap 5301 * while the writer is updating the pointers, so this 5302 * guarantees that the overwrite recorded here is the one we 5303 * want to compare with the last_overrun. 5304 */ 5305 smp_mb(); 5306 overwrite = local_read(&(cpu_buffer->overrun)); 5307 5308 /* 5309 * Here's the tricky part. 5310 * 5311 * We need to move the pointer past the header page. 5312 * But we can only do that if a writer is not currently 5313 * moving it. The page before the header page has the 5314 * flag bit '1' set if it is pointing to the page we want. 5315 * but if the writer is in the process of moving it 5316 * than it will be '2' or already moved '0'. 5317 */ 5318 5319 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5320 5321 /* 5322 * If we did not convert it, then we must try again. 5323 */ 5324 if (!ret) 5325 goto spin; 5326 5327 if (cpu_buffer->ring_meta) 5328 rb_update_meta_reader(cpu_buffer, reader); 5329 5330 /* 5331 * Yay! We succeeded in replacing the page. 5332 * 5333 * Now make the new head point back to the reader page. 
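 *
 * Stripped of the lockless flag bits, the swap is an ordinary doubly
 * linked list splice: the spare (reader) page takes the old head page's
 * slot and the old head page becomes the new spare. Illustrative,
 * non-kernel sketch:
 *
 *	struct node { struct node *next, *prev; };
 *
 *	// Put @spare where @victim sits in the ring; return the new spare.
 *	static struct node *swap_in(struct node *spare, struct node *victim)
 *	{
 *		spare->next = victim->next;
 *		spare->prev = victim->prev;
 *		victim->prev->next = spare;	// the cmpxchg above did this step
 *		victim->next->prev = spare;	// this is the line right below
 *		return victim;
 *	}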
5334 */ 5335 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5336 rb_inc_page(&cpu_buffer->head_page); 5337 5338 cpu_buffer->cnt++; 5339 local_inc(&cpu_buffer->pages_read); 5340 5341 /* Finally update the reader page to the new head */ 5342 cpu_buffer->reader_page = reader; 5343 cpu_buffer->reader_page->read = 0; 5344 5345 if (overwrite != cpu_buffer->last_overrun) { 5346 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5347 cpu_buffer->last_overrun = overwrite; 5348 } 5349 5350 goto again; 5351 5352 out: 5353 /* Update the read_stamp on the first event */ 5354 if (reader && reader->read == 0) 5355 cpu_buffer->read_stamp = reader->page->time_stamp; 5356 5357 arch_spin_unlock(&cpu_buffer->lock); 5358 local_irq_restore(flags); 5359 5360 /* 5361 * The writer has preempt disable, wait for it. But not forever 5362 * Although, 1 second is pretty much "forever" 5363 */ 5364 #define USECS_WAIT 1000000 5365 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5366 /* If the write is past the end of page, a writer is still updating it */ 5367 if (likely(!reader || rb_page_write(reader) <= bsize)) 5368 break; 5369 5370 udelay(1); 5371 5372 /* Get the latest version of the reader write value */ 5373 smp_rmb(); 5374 } 5375 5376 /* The writer is not moving forward? Something is wrong */ 5377 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5378 reader = NULL; 5379 5380 /* 5381 * Make sure we see any padding after the write update 5382 * (see rb_reset_tail()). 5383 * 5384 * In addition, a writer may be writing on the reader page 5385 * if the page has not been fully filled, so the read barrier 5386 * is also needed to make sure we see the content of what is 5387 * committed by the writer (see rb_set_commit_to_write()). 5388 */ 5389 smp_rmb(); 5390 5391 5392 return reader; 5393 } 5394 5395 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5396 { 5397 struct ring_buffer_event *event; 5398 struct buffer_page *reader; 5399 unsigned length; 5400 5401 reader = rb_get_reader_page(cpu_buffer); 5402 5403 /* This function should not be called when buffer is empty */ 5404 if (RB_WARN_ON(cpu_buffer, !reader)) 5405 return; 5406 5407 event = rb_reader_event(cpu_buffer); 5408 5409 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5410 cpu_buffer->read++; 5411 5412 rb_update_read_stamp(cpu_buffer, event); 5413 5414 length = rb_event_length(event); 5415 cpu_buffer->reader_page->read += length; 5416 cpu_buffer->read_bytes += length; 5417 } 5418 5419 static void rb_advance_iter(struct ring_buffer_iter *iter) 5420 { 5421 struct ring_buffer_per_cpu *cpu_buffer; 5422 5423 cpu_buffer = iter->cpu_buffer; 5424 5425 /* If head == next_event then we need to jump to the next event */ 5426 if (iter->head == iter->next_event) { 5427 /* If the event gets overwritten again, there's nothing to do */ 5428 if (rb_iter_head_event(iter) == NULL) 5429 return; 5430 } 5431 5432 iter->head = iter->next_event; 5433 5434 /* 5435 * Check if we are at the end of the buffer. 
5436 */ 5437 if (iter->next_event >= rb_page_size(iter->head_page)) { 5438 /* discarded commits can make the page empty */ 5439 if (iter->head_page == cpu_buffer->commit_page) 5440 return; 5441 rb_inc_iter(iter); 5442 return; 5443 } 5444 5445 rb_update_iter_read_stamp(iter, iter->event); 5446 } 5447 5448 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 5449 { 5450 return cpu_buffer->lost_events; 5451 } 5452 5453 static struct ring_buffer_event * 5454 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 5455 unsigned long *lost_events) 5456 { 5457 struct ring_buffer_event *event; 5458 struct buffer_page *reader; 5459 int nr_loops = 0; 5460 5461 if (ts) 5462 *ts = 0; 5463 again: 5464 /* 5465 * We repeat when a time extend is encountered. 5466 * Since the time extend is always attached to a data event, 5467 * we should never loop more than once. 5468 * (We never hit the following condition more than twice). 5469 */ 5470 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 5471 return NULL; 5472 5473 reader = rb_get_reader_page(cpu_buffer); 5474 if (!reader) 5475 return NULL; 5476 5477 event = rb_reader_event(cpu_buffer); 5478 5479 switch (event->type_len) { 5480 case RINGBUF_TYPE_PADDING: 5481 if (rb_null_event(event)) 5482 RB_WARN_ON(cpu_buffer, 1); 5483 /* 5484 * Because the writer could be discarding every 5485 * event it creates (which would probably be bad) 5486 * if we were to go back to "again" then we may never 5487 * catch up, and will trigger the warn on, or lock 5488 * the box. Return the padding, and we will release 5489 * the current locks, and try again. 5490 */ 5491 return event; 5492 5493 case RINGBUF_TYPE_TIME_EXTEND: 5494 /* Internal data, OK to advance */ 5495 rb_advance_reader(cpu_buffer); 5496 goto again; 5497 5498 case RINGBUF_TYPE_TIME_STAMP: 5499 if (ts) { 5500 *ts = rb_event_time_stamp(event); 5501 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 5502 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5503 cpu_buffer->cpu, ts); 5504 } 5505 /* Internal data, OK to advance */ 5506 rb_advance_reader(cpu_buffer); 5507 goto again; 5508 5509 case RINGBUF_TYPE_DATA: 5510 if (ts && !(*ts)) { 5511 *ts = cpu_buffer->read_stamp + event->time_delta; 5512 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5513 cpu_buffer->cpu, ts); 5514 } 5515 if (lost_events) 5516 *lost_events = rb_lost_events(cpu_buffer); 5517 return event; 5518 5519 default: 5520 RB_WARN_ON(cpu_buffer, 1); 5521 } 5522 5523 return NULL; 5524 } 5525 EXPORT_SYMBOL_GPL(ring_buffer_peek); 5526 5527 static struct ring_buffer_event * 5528 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5529 { 5530 struct trace_buffer *buffer; 5531 struct ring_buffer_per_cpu *cpu_buffer; 5532 struct ring_buffer_event *event; 5533 int nr_loops = 0; 5534 5535 if (ts) 5536 *ts = 0; 5537 5538 cpu_buffer = iter->cpu_buffer; 5539 buffer = cpu_buffer->buffer; 5540 5541 /* 5542 * Check if someone performed a consuming read to the buffer 5543 * or removed some pages from the buffer. In these cases, 5544 * iterator was invalidated and we need to reset it. 5545 */ 5546 if (unlikely(iter->cache_read != cpu_buffer->read || 5547 iter->cache_reader_page != cpu_buffer->reader_page || 5548 iter->cache_pages_removed != cpu_buffer->pages_removed)) 5549 rb_iter_reset(iter); 5550 5551 again: 5552 if (ring_buffer_iter_empty(iter)) 5553 return NULL; 5554 5555 /* 5556 * As the writer can mess with what the iterator is trying 5557 * to read, just give up if we fail to get an event after 5558 * three tries. 
The iterator is not as reliable when reading 5559 * the ring buffer with an active write as the consumer is. 5560 * Do not warn if the three failures is reached. 5561 */ 5562 if (++nr_loops > 3) 5563 return NULL; 5564 5565 if (rb_per_cpu_empty(cpu_buffer)) 5566 return NULL; 5567 5568 if (iter->head >= rb_page_size(iter->head_page)) { 5569 rb_inc_iter(iter); 5570 goto again; 5571 } 5572 5573 event = rb_iter_head_event(iter); 5574 if (!event) 5575 goto again; 5576 5577 switch (event->type_len) { 5578 case RINGBUF_TYPE_PADDING: 5579 if (rb_null_event(event)) { 5580 rb_inc_iter(iter); 5581 goto again; 5582 } 5583 rb_advance_iter(iter); 5584 return event; 5585 5586 case RINGBUF_TYPE_TIME_EXTEND: 5587 /* Internal data, OK to advance */ 5588 rb_advance_iter(iter); 5589 goto again; 5590 5591 case RINGBUF_TYPE_TIME_STAMP: 5592 if (ts) { 5593 *ts = rb_event_time_stamp(event); 5594 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 5595 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5596 cpu_buffer->cpu, ts); 5597 } 5598 /* Internal data, OK to advance */ 5599 rb_advance_iter(iter); 5600 goto again; 5601 5602 case RINGBUF_TYPE_DATA: 5603 if (ts && !(*ts)) { 5604 *ts = iter->read_stamp + event->time_delta; 5605 ring_buffer_normalize_time_stamp(buffer, 5606 cpu_buffer->cpu, ts); 5607 } 5608 return event; 5609 5610 default: 5611 RB_WARN_ON(cpu_buffer, 1); 5612 } 5613 5614 return NULL; 5615 } 5616 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 5617 5618 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 5619 { 5620 if (likely(!in_nmi())) { 5621 raw_spin_lock(&cpu_buffer->reader_lock); 5622 return true; 5623 } 5624 5625 /* 5626 * If an NMI die dumps out the content of the ring buffer 5627 * trylock must be used to prevent a deadlock if the NMI 5628 * preempted a task that holds the ring buffer locks. If 5629 * we get the lock then all is fine, if not, then continue 5630 * to do the read, but this can corrupt the ring buffer, 5631 * so it must be permanently disabled from future writes. 5632 * Reading from NMI is a oneshot deal. 5633 */ 5634 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 5635 return true; 5636 5637 /* Continue without locking, but disable the ring buffer */ 5638 atomic_inc(&cpu_buffer->record_disabled); 5639 return false; 5640 } 5641 5642 static inline void 5643 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 5644 { 5645 if (likely(locked)) 5646 raw_spin_unlock(&cpu_buffer->reader_lock); 5647 } 5648 5649 /** 5650 * ring_buffer_peek - peek at the next event to be read 5651 * @buffer: The ring buffer to read 5652 * @cpu: The cpu to peak at 5653 * @ts: The timestamp counter of this event. 5654 * @lost_events: a variable to store if events were lost (may be NULL) 5655 * 5656 * This will return the event that will be read next, but does 5657 * not consume the data. 
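 *
 * For example (sketch; a later ring_buffer_consume() will return the
 * same event, since peeking does not advance the reader):
 *
 *	u64 ts;
 *	unsigned long lost;
 *	struct ring_buffer_event *event;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, &lost);
 *	if (event)
 *		pr_info("next event at %llu, %lu lost\n",
 *			(unsigned long long)ts, lost);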
 */
struct ring_buffer_event *
ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
		 unsigned long *lost_events)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	unsigned long flags;
	bool dolock;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

 again:
	local_irq_save(flags);
	dolock = rb_reader_lock(cpu_buffer);
	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		rb_advance_reader(cpu_buffer);
	rb_reader_unlock(cpu_buffer, dolock);
	local_irq_restore(flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		goto again;

	return event;
}

/**
 * ring_buffer_iter_dropped - report if there are dropped events
 * @iter: The ring buffer iterator
 *
 * Returns true if there were dropped events since the last peek.
 */
bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
{
	bool ret = iter->missed_events != 0;

	iter->missed_events = 0;
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);

/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

 again:
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_iter_peek(iter, ts);
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		goto again;

	return event;
}

/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: the cpu to read the buffer from
 * @ts: a variable to store the timestamp (may be NULL)
 * @lost_events: a variable to store if events were lost (may be NULL)
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
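 *
 * A minimal drain loop, as an illustrative sketch (the "my_buffer" and
 * "process()" names are assumptions, not code from this file):
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(my_buffer, cpu, &ts, &lost)))
 *		process(ring_buffer_event_data(event), ts, lost);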
5736 */ 5737 struct ring_buffer_event * 5738 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5739 unsigned long *lost_events) 5740 { 5741 struct ring_buffer_per_cpu *cpu_buffer; 5742 struct ring_buffer_event *event = NULL; 5743 unsigned long flags; 5744 bool dolock; 5745 5746 again: 5747 /* might be called in atomic */ 5748 preempt_disable(); 5749 5750 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5751 goto out; 5752 5753 cpu_buffer = buffer->buffers[cpu]; 5754 local_irq_save(flags); 5755 dolock = rb_reader_lock(cpu_buffer); 5756 5757 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5758 if (event) { 5759 cpu_buffer->lost_events = 0; 5760 rb_advance_reader(cpu_buffer); 5761 } 5762 5763 rb_reader_unlock(cpu_buffer, dolock); 5764 local_irq_restore(flags); 5765 5766 out: 5767 preempt_enable(); 5768 5769 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5770 goto again; 5771 5772 return event; 5773 } 5774 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5775 5776 /** 5777 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5778 * @buffer: The ring buffer to read from 5779 * @cpu: The cpu buffer to iterate over 5780 * @flags: gfp flags to use for memory allocation 5781 * 5782 * This performs the initial preparations necessary to iterate 5783 * through the buffer. Memory is allocated, buffer resizing 5784 * is disabled, and the iterator pointer is returned to the caller. 5785 * 5786 * After a sequence of ring_buffer_read_prepare calls, the user is 5787 * expected to make at least one call to ring_buffer_read_prepare_sync. 5788 * Afterwards, ring_buffer_read_start is invoked to get things going 5789 * for real. 5790 * 5791 * This overall must be paired with ring_buffer_read_finish. 5792 */ 5793 struct ring_buffer_iter * 5794 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5795 { 5796 struct ring_buffer_per_cpu *cpu_buffer; 5797 struct ring_buffer_iter *iter; 5798 5799 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5800 return NULL; 5801 5802 iter = kzalloc(sizeof(*iter), flags); 5803 if (!iter) 5804 return NULL; 5805 5806 /* Holds the entire event: data and meta data */ 5807 iter->event_size = buffer->subbuf_size; 5808 iter->event = kmalloc(iter->event_size, flags); 5809 if (!iter->event) { 5810 kfree(iter); 5811 return NULL; 5812 } 5813 5814 cpu_buffer = buffer->buffers[cpu]; 5815 5816 iter->cpu_buffer = cpu_buffer; 5817 5818 atomic_inc(&cpu_buffer->resize_disabled); 5819 5820 return iter; 5821 } 5822 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5823 5824 /** 5825 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5826 * 5827 * All previously invoked ring_buffer_read_prepare calls to prepare 5828 * iterators will be synchronized. Afterwards, read_buffer_read_start 5829 * calls on those iterators are allowed. 5830 */ 5831 void 5832 ring_buffer_read_prepare_sync(void) 5833 { 5834 synchronize_rcu(); 5835 } 5836 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5837 5838 /** 5839 * ring_buffer_read_start - start a non consuming read of the buffer 5840 * @iter: The iterator returned by ring_buffer_read_prepare 5841 * 5842 * This finalizes the startup of an iteration through the buffer. 5843 * The iterator comes from a call to ring_buffer_read_prepare and 5844 * an intervening ring_buffer_read_prepare_sync must have been 5845 * performed. 5846 * 5847 * Must be paired with ring_buffer_read_finish. 
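 *
 * The whole sequence, as an illustrative sketch for one CPU of a
 * hypothetical "my_buffer" (error handling trimmed):
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(my_buffer, cpu, GFP_KERNEL);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts)))
 *		ring_buffer_iter_advance(iter);
 *	ring_buffer_read_finish(iter);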
5848 */ 5849 void 5850 ring_buffer_read_start(struct ring_buffer_iter *iter) 5851 { 5852 struct ring_buffer_per_cpu *cpu_buffer; 5853 unsigned long flags; 5854 5855 if (!iter) 5856 return; 5857 5858 cpu_buffer = iter->cpu_buffer; 5859 5860 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5861 arch_spin_lock(&cpu_buffer->lock); 5862 rb_iter_reset(iter); 5863 arch_spin_unlock(&cpu_buffer->lock); 5864 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5865 } 5866 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5867 5868 /** 5869 * ring_buffer_read_finish - finish reading the iterator of the buffer 5870 * @iter: The iterator retrieved by ring_buffer_start 5871 * 5872 * This re-enables resizing of the buffer, and frees the iterator. 5873 */ 5874 void 5875 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5876 { 5877 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5878 5879 /* Use this opportunity to check the integrity of the ring buffer. */ 5880 rb_check_pages(cpu_buffer); 5881 5882 atomic_dec(&cpu_buffer->resize_disabled); 5883 kfree(iter->event); 5884 kfree(iter); 5885 } 5886 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5887 5888 /** 5889 * ring_buffer_iter_advance - advance the iterator to the next location 5890 * @iter: The ring buffer iterator 5891 * 5892 * Move the location of the iterator such that the next read will 5893 * be the next location of the iterator. 5894 */ 5895 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5896 { 5897 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5898 unsigned long flags; 5899 5900 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5901 5902 rb_advance_iter(iter); 5903 5904 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5905 } 5906 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5907 5908 /** 5909 * ring_buffer_size - return the size of the ring buffer (in bytes) 5910 * @buffer: The ring buffer. 5911 * @cpu: The CPU to get ring buffer size from. 5912 */ 5913 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5914 { 5915 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5916 return 0; 5917 5918 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 5919 } 5920 EXPORT_SYMBOL_GPL(ring_buffer_size); 5921 5922 /** 5923 * ring_buffer_max_event_size - return the max data size of an event 5924 * @buffer: The ring buffer. 5925 * 5926 * Returns the maximum size an event can be. 
5927 */ 5928 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 5929 { 5930 /* If abs timestamp is requested, events have a timestamp too */ 5931 if (ring_buffer_time_stamp_abs(buffer)) 5932 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 5933 return buffer->max_data_size; 5934 } 5935 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 5936 5937 static void rb_clear_buffer_page(struct buffer_page *page) 5938 { 5939 local_set(&page->write, 0); 5940 local_set(&page->entries, 0); 5941 rb_init_page(page->page); 5942 page->read = 0; 5943 } 5944 5945 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 5946 { 5947 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 5948 5949 if (!meta) 5950 return; 5951 5952 meta->reader.read = cpu_buffer->reader_page->read; 5953 meta->reader.id = cpu_buffer->reader_page->id; 5954 meta->reader.lost_events = cpu_buffer->lost_events; 5955 5956 meta->entries = local_read(&cpu_buffer->entries); 5957 meta->overrun = local_read(&cpu_buffer->overrun); 5958 meta->read = cpu_buffer->read; 5959 5960 /* Some archs do not have data cache coherency between kernel and user-space */ 5961 flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page)); 5962 } 5963 5964 static void 5965 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5966 { 5967 struct buffer_page *page; 5968 5969 rb_head_page_deactivate(cpu_buffer); 5970 5971 cpu_buffer->head_page 5972 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5973 rb_clear_buffer_page(cpu_buffer->head_page); 5974 list_for_each_entry(page, cpu_buffer->pages, list) { 5975 rb_clear_buffer_page(page); 5976 } 5977 5978 cpu_buffer->tail_page = cpu_buffer->head_page; 5979 cpu_buffer->commit_page = cpu_buffer->head_page; 5980 5981 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5982 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5983 rb_clear_buffer_page(cpu_buffer->reader_page); 5984 5985 local_set(&cpu_buffer->entries_bytes, 0); 5986 local_set(&cpu_buffer->overrun, 0); 5987 local_set(&cpu_buffer->commit_overrun, 0); 5988 local_set(&cpu_buffer->dropped_events, 0); 5989 local_set(&cpu_buffer->entries, 0); 5990 local_set(&cpu_buffer->committing, 0); 5991 local_set(&cpu_buffer->commits, 0); 5992 local_set(&cpu_buffer->pages_touched, 0); 5993 local_set(&cpu_buffer->pages_lost, 0); 5994 local_set(&cpu_buffer->pages_read, 0); 5995 cpu_buffer->last_pages_touch = 0; 5996 cpu_buffer->shortest_full = 0; 5997 cpu_buffer->read = 0; 5998 cpu_buffer->read_bytes = 0; 5999 6000 rb_time_set(&cpu_buffer->write_stamp, 0); 6001 rb_time_set(&cpu_buffer->before_stamp, 0); 6002 6003 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 6004 6005 cpu_buffer->lost_events = 0; 6006 cpu_buffer->last_overrun = 0; 6007 6008 rb_head_page_activate(cpu_buffer); 6009 cpu_buffer->pages_removed = 0; 6010 6011 if (cpu_buffer->mapped) { 6012 rb_update_meta_page(cpu_buffer); 6013 if (cpu_buffer->ring_meta) { 6014 struct ring_buffer_meta *meta = cpu_buffer->ring_meta; 6015 meta->commit_buffer = meta->head_buffer; 6016 } 6017 } 6018 } 6019 6020 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 6021 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6022 { 6023 unsigned long flags; 6024 6025 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6026 6027 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 6028 goto out; 6029 6030 arch_spin_lock(&cpu_buffer->lock); 6031 6032 rb_reset_cpu(cpu_buffer); 6033 6034 arch_spin_unlock(&cpu_buffer->lock); 6035 6036 out: 6037 
raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6038 } 6039 6040 /** 6041 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 6042 * @buffer: The ring buffer to reset a per cpu buffer of 6043 * @cpu: The CPU buffer to be reset 6044 */ 6045 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 6046 { 6047 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6048 struct ring_buffer_meta *meta; 6049 6050 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6051 return; 6052 6053 /* prevent another thread from changing buffer sizes */ 6054 mutex_lock(&buffer->mutex); 6055 6056 atomic_inc(&cpu_buffer->resize_disabled); 6057 atomic_inc(&cpu_buffer->record_disabled); 6058 6059 /* Make sure all commits have finished */ 6060 synchronize_rcu(); 6061 6062 reset_disabled_cpu_buffer(cpu_buffer); 6063 6064 atomic_dec(&cpu_buffer->record_disabled); 6065 atomic_dec(&cpu_buffer->resize_disabled); 6066 6067 /* Make sure persistent meta now uses this buffer's addresses */ 6068 meta = rb_range_meta(buffer, 0, cpu_buffer->cpu); 6069 if (meta) 6070 rb_meta_init_text_addr(meta); 6071 6072 mutex_unlock(&buffer->mutex); 6073 } 6074 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6075 6076 /* Flag to ensure proper resetting of atomic variables */ 6077 #define RESET_BIT (1 << 30) 6078 6079 /** 6080 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6081 * @buffer: The ring buffer to reset a per cpu buffer of 6082 */ 6083 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6084 { 6085 struct ring_buffer_per_cpu *cpu_buffer; 6086 struct ring_buffer_meta *meta; 6087 int cpu; 6088 6089 /* prevent another thread from changing buffer sizes */ 6090 mutex_lock(&buffer->mutex); 6091 6092 for_each_online_buffer_cpu(buffer, cpu) { 6093 cpu_buffer = buffer->buffers[cpu]; 6094 6095 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6096 atomic_inc(&cpu_buffer->record_disabled); 6097 } 6098 6099 /* Make sure all commits have finished */ 6100 synchronize_rcu(); 6101 6102 for_each_buffer_cpu(buffer, cpu) { 6103 cpu_buffer = buffer->buffers[cpu]; 6104 6105 /* 6106 * If a CPU came online during the synchronize_rcu(), then 6107 * ignore it. 
6108 */ 6109 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6110 continue; 6111 6112 reset_disabled_cpu_buffer(cpu_buffer); 6113 6114 /* Make sure persistent meta now uses this buffer's addresses */ 6115 meta = rb_range_meta(buffer, 0, cpu_buffer->cpu); 6116 if (meta) 6117 rb_meta_init_text_addr(meta); 6118 6119 atomic_dec(&cpu_buffer->record_disabled); 6120 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6121 } 6122 6123 mutex_unlock(&buffer->mutex); 6124 } 6125 6126 /** 6127 * ring_buffer_reset - reset a ring buffer 6128 * @buffer: The ring buffer to reset all cpu buffers 6129 */ 6130 void ring_buffer_reset(struct trace_buffer *buffer) 6131 { 6132 struct ring_buffer_per_cpu *cpu_buffer; 6133 int cpu; 6134 6135 /* prevent another thread from changing buffer sizes */ 6136 mutex_lock(&buffer->mutex); 6137 6138 for_each_buffer_cpu(buffer, cpu) { 6139 cpu_buffer = buffer->buffers[cpu]; 6140 6141 atomic_inc(&cpu_buffer->resize_disabled); 6142 atomic_inc(&cpu_buffer->record_disabled); 6143 } 6144 6145 /* Make sure all commits have finished */ 6146 synchronize_rcu(); 6147 6148 for_each_buffer_cpu(buffer, cpu) { 6149 cpu_buffer = buffer->buffers[cpu]; 6150 6151 reset_disabled_cpu_buffer(cpu_buffer); 6152 6153 atomic_dec(&cpu_buffer->record_disabled); 6154 atomic_dec(&cpu_buffer->resize_disabled); 6155 } 6156 6157 mutex_unlock(&buffer->mutex); 6158 } 6159 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6160 6161 /** 6162 * ring_buffer_empty - is the ring buffer empty? 6163 * @buffer: The ring buffer to test 6164 */ 6165 bool ring_buffer_empty(struct trace_buffer *buffer) 6166 { 6167 struct ring_buffer_per_cpu *cpu_buffer; 6168 unsigned long flags; 6169 bool dolock; 6170 bool ret; 6171 int cpu; 6172 6173 /* yes this is racy, but if you don't like the race, lock the buffer */ 6174 for_each_buffer_cpu(buffer, cpu) { 6175 cpu_buffer = buffer->buffers[cpu]; 6176 local_irq_save(flags); 6177 dolock = rb_reader_lock(cpu_buffer); 6178 ret = rb_per_cpu_empty(cpu_buffer); 6179 rb_reader_unlock(cpu_buffer, dolock); 6180 local_irq_restore(flags); 6181 6182 if (!ret) 6183 return false; 6184 } 6185 6186 return true; 6187 } 6188 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6189 6190 /** 6191 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6192 * @buffer: The ring buffer 6193 * @cpu: The CPU buffer to test 6194 */ 6195 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6196 { 6197 struct ring_buffer_per_cpu *cpu_buffer; 6198 unsigned long flags; 6199 bool dolock; 6200 bool ret; 6201 6202 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6203 return true; 6204 6205 cpu_buffer = buffer->buffers[cpu]; 6206 local_irq_save(flags); 6207 dolock = rb_reader_lock(cpu_buffer); 6208 ret = rb_per_cpu_empty(cpu_buffer); 6209 rb_reader_unlock(cpu_buffer, dolock); 6210 local_irq_restore(flags); 6211 6212 return ret; 6213 } 6214 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6215 6216 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6217 /** 6218 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6219 * @buffer_a: One buffer to swap with 6220 * @buffer_b: The other buffer to swap with 6221 * @cpu: the CPU of the buffers to swap 6222 * 6223 * This function is useful for tracers that want to take a "snapshot" 6224 * of a CPU buffer and has another back up buffer lying around. 6225 * it is expected that the tracer handles the cpu buffer not being 6226 * used at the moment. 
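 *
 * Rough usage sketch; "main_buffer" and "snapshot_buffer" are illustrative
 * names, and the caller must keep writers off the swapped cpu buffer as
 * described above. On failure the call returns -EINVAL, -EAGAIN or -EBUSY;
 * on success the snapshot buffer holds what the main buffer had for @cpu:
 *
 *	err = ring_buffer_swap_cpu(main_buffer, snapshot_buffer, cpu);
 *	if (err)
 *		return err;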
6227 */ 6228 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6229 struct trace_buffer *buffer_b, int cpu) 6230 { 6231 struct ring_buffer_per_cpu *cpu_buffer_a; 6232 struct ring_buffer_per_cpu *cpu_buffer_b; 6233 int ret = -EINVAL; 6234 6235 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6236 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6237 goto out; 6238 6239 cpu_buffer_a = buffer_a->buffers[cpu]; 6240 cpu_buffer_b = buffer_b->buffers[cpu]; 6241 6242 /* It's up to the callers to not try to swap mapped buffers */ 6243 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) { 6244 ret = -EBUSY; 6245 goto out; 6246 } 6247 6248 /* At least make sure the two buffers are somewhat the same */ 6249 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6250 goto out; 6251 6252 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6253 goto out; 6254 6255 ret = -EAGAIN; 6256 6257 if (atomic_read(&buffer_a->record_disabled)) 6258 goto out; 6259 6260 if (atomic_read(&buffer_b->record_disabled)) 6261 goto out; 6262 6263 if (atomic_read(&cpu_buffer_a->record_disabled)) 6264 goto out; 6265 6266 if (atomic_read(&cpu_buffer_b->record_disabled)) 6267 goto out; 6268 6269 /* 6270 * We can't do a synchronize_rcu here because this 6271 * function can be called in atomic context. 6272 * Normally this will be called from the same CPU as cpu. 6273 * If not it's up to the caller to protect this. 6274 */ 6275 atomic_inc(&cpu_buffer_a->record_disabled); 6276 atomic_inc(&cpu_buffer_b->record_disabled); 6277 6278 ret = -EBUSY; 6279 if (local_read(&cpu_buffer_a->committing)) 6280 goto out_dec; 6281 if (local_read(&cpu_buffer_b->committing)) 6282 goto out_dec; 6283 6284 /* 6285 * When resize is in progress, we cannot swap it because 6286 * it will mess the state of the cpu buffer. 6287 */ 6288 if (atomic_read(&buffer_a->resizing)) 6289 goto out_dec; 6290 if (atomic_read(&buffer_b->resizing)) 6291 goto out_dec; 6292 6293 buffer_a->buffers[cpu] = cpu_buffer_b; 6294 buffer_b->buffers[cpu] = cpu_buffer_a; 6295 6296 cpu_buffer_b->buffer = buffer_a; 6297 cpu_buffer_a->buffer = buffer_b; 6298 6299 ret = 0; 6300 6301 out_dec: 6302 atomic_dec(&cpu_buffer_a->record_disabled); 6303 atomic_dec(&cpu_buffer_b->record_disabled); 6304 out: 6305 return ret; 6306 } 6307 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6308 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6309 6310 /** 6311 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6312 * @buffer: the buffer to allocate for. 6313 * @cpu: the cpu buffer to allocate. 6314 * 6315 * This function is used in conjunction with ring_buffer_read_page. 6316 * When reading a full page from the ring buffer, these functions 6317 * can be used to speed up the process. The calling function should 6318 * allocate a few pages first with this function. Then when it 6319 * needs to get pages from the ring buffer, it passes the result 6320 * of this function into ring_buffer_read_page, which will swap 6321 * the page that was allocated, with the read page of the buffer. 
6322 * 6323 * Returns: 6324 * The page allocated, or ERR_PTR 6325 */ 6326 struct buffer_data_read_page * 6327 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6328 { 6329 struct ring_buffer_per_cpu *cpu_buffer; 6330 struct buffer_data_read_page *bpage = NULL; 6331 unsigned long flags; 6332 struct page *page; 6333 6334 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6335 return ERR_PTR(-ENODEV); 6336 6337 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 6338 if (!bpage) 6339 return ERR_PTR(-ENOMEM); 6340 6341 bpage->order = buffer->subbuf_order; 6342 cpu_buffer = buffer->buffers[cpu]; 6343 local_irq_save(flags); 6344 arch_spin_lock(&cpu_buffer->lock); 6345 6346 if (cpu_buffer->free_page) { 6347 bpage->data = cpu_buffer->free_page; 6348 cpu_buffer->free_page = NULL; 6349 } 6350 6351 arch_spin_unlock(&cpu_buffer->lock); 6352 local_irq_restore(flags); 6353 6354 if (bpage->data) 6355 goto out; 6356 6357 page = alloc_pages_node(cpu_to_node(cpu), 6358 GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO, 6359 cpu_buffer->buffer->subbuf_order); 6360 if (!page) { 6361 kfree(bpage); 6362 return ERR_PTR(-ENOMEM); 6363 } 6364 6365 bpage->data = page_address(page); 6366 6367 out: 6368 rb_init_page(bpage->data); 6369 6370 return bpage; 6371 } 6372 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6373 6374 /** 6375 * ring_buffer_free_read_page - free an allocated read page 6376 * @buffer: the buffer the page was allocate for 6377 * @cpu: the cpu buffer the page came from 6378 * @data_page: the page to free 6379 * 6380 * Free a page allocated from ring_buffer_alloc_read_page. 6381 */ 6382 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6383 struct buffer_data_read_page *data_page) 6384 { 6385 struct ring_buffer_per_cpu *cpu_buffer; 6386 struct buffer_data_page *bpage = data_page->data; 6387 struct page *page = virt_to_page(bpage); 6388 unsigned long flags; 6389 6390 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6391 return; 6392 6393 cpu_buffer = buffer->buffers[cpu]; 6394 6395 /* 6396 * If the page is still in use someplace else, or order of the page 6397 * is different from the subbuffer order of the buffer - 6398 * we can't reuse it 6399 */ 6400 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6401 goto out; 6402 6403 local_irq_save(flags); 6404 arch_spin_lock(&cpu_buffer->lock); 6405 6406 if (!cpu_buffer->free_page) { 6407 cpu_buffer->free_page = bpage; 6408 bpage = NULL; 6409 } 6410 6411 arch_spin_unlock(&cpu_buffer->lock); 6412 local_irq_restore(flags); 6413 6414 out: 6415 free_pages((unsigned long)bpage, data_page->order); 6416 kfree(data_page); 6417 } 6418 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6419 6420 /** 6421 * ring_buffer_read_page - extract a page from the ring buffer 6422 * @buffer: buffer to extract from 6423 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6424 * @len: amount to extract 6425 * @cpu: the cpu of the buffer to extract 6426 * @full: should the extraction only happen when the page is full. 6427 * 6428 * This function will pull out a page from the ring buffer and consume it. 6429 * @data_page must be the address of the variable that was returned 6430 * from ring_buffer_alloc_read_page. This is because the page might be used 6431 * to swap with a page in the ring buffer. 
6432 * 6433 * for example: 6434 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 6435 * if (IS_ERR(rpage)) 6436 * return PTR_ERR(rpage); 6437 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 6438 * if (ret >= 0) 6439 * process_page(ring_buffer_read_page_data(rpage), ret); 6440 * ring_buffer_free_read_page(buffer, cpu, rpage); 6441 * 6442 * When @full is set, the function will not return true unless 6443 * the writer is off the reader page. 6444 * 6445 * Note: it is up to the calling functions to handle sleeps and wakeups. 6446 * The ring buffer can be used anywhere in the kernel and can not 6447 * blindly call wake_up. The layer that uses the ring buffer must be 6448 * responsible for that. 6449 * 6450 * Returns: 6451 * >=0 if data has been transferred, returns the offset of consumed data. 6452 * <0 if no data has been transferred. 6453 */ 6454 int ring_buffer_read_page(struct trace_buffer *buffer, 6455 struct buffer_data_read_page *data_page, 6456 size_t len, int cpu, int full) 6457 { 6458 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6459 struct ring_buffer_event *event; 6460 struct buffer_data_page *bpage; 6461 struct buffer_page *reader; 6462 unsigned long missed_events; 6463 unsigned long flags; 6464 unsigned int commit; 6465 unsigned int read; 6466 u64 save_timestamp; 6467 int ret = -1; 6468 6469 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6470 goto out; 6471 6472 /* 6473 * If len is not big enough to hold the page header, then 6474 * we can not copy anything. 6475 */ 6476 if (len <= BUF_PAGE_HDR_SIZE) 6477 goto out; 6478 6479 len -= BUF_PAGE_HDR_SIZE; 6480 6481 if (!data_page || !data_page->data) 6482 goto out; 6483 if (data_page->order != buffer->subbuf_order) 6484 goto out; 6485 6486 bpage = data_page->data; 6487 if (!bpage) 6488 goto out; 6489 6490 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6491 6492 reader = rb_get_reader_page(cpu_buffer); 6493 if (!reader) 6494 goto out_unlock; 6495 6496 event = rb_reader_event(cpu_buffer); 6497 6498 read = reader->read; 6499 commit = rb_page_size(reader); 6500 6501 /* Check if any events were dropped */ 6502 missed_events = cpu_buffer->lost_events; 6503 6504 /* 6505 * If this page has been partially read or 6506 * if len is not big enough to read the rest of the page or 6507 * a writer is still on the page, then 6508 * we must copy the data from the page to the buffer. 6509 * Otherwise, we can simply swap the page with the one passed in. 6510 */ 6511 if (read || (len < (commit - read)) || 6512 cpu_buffer->reader_page == cpu_buffer->commit_page || 6513 cpu_buffer->mapped) { 6514 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 6515 unsigned int rpos = read; 6516 unsigned int pos = 0; 6517 unsigned int size; 6518 6519 /* 6520 * If a full page is expected, this can still be returned 6521 * if there's been a previous partial read and the 6522 * rest of the page can be read and the commit page is off 6523 * the reader page. 
6524 */ 6525 if (full && 6526 (!read || (len < (commit - read)) || 6527 cpu_buffer->reader_page == cpu_buffer->commit_page)) 6528 goto out_unlock; 6529 6530 if (len > (commit - read)) 6531 len = (commit - read); 6532 6533 /* Always keep the time extend and data together */ 6534 size = rb_event_ts_length(event); 6535 6536 if (len < size) 6537 goto out_unlock; 6538 6539 /* save the current timestamp, since the user will need it */ 6540 save_timestamp = cpu_buffer->read_stamp; 6541 6542 /* Need to copy one event at a time */ 6543 do { 6544 /* We need the size of one event, because 6545 * rb_advance_reader only advances by one event, 6546 * whereas rb_event_ts_length may include the size of 6547 * one or two events. 6548 * We have already ensured there's enough space if this 6549 * is a time extend. */ 6550 size = rb_event_length(event); 6551 memcpy(bpage->data + pos, rpage->data + rpos, size); 6552 6553 len -= size; 6554 6555 rb_advance_reader(cpu_buffer); 6556 rpos = reader->read; 6557 pos += size; 6558 6559 if (rpos >= commit) 6560 break; 6561 6562 event = rb_reader_event(cpu_buffer); 6563 /* Always keep the time extend and data together */ 6564 size = rb_event_ts_length(event); 6565 } while (len >= size); 6566 6567 /* update bpage */ 6568 local_set(&bpage->commit, pos); 6569 bpage->time_stamp = save_timestamp; 6570 6571 /* we copied everything to the beginning */ 6572 read = 0; 6573 } else { 6574 /* update the entry counter */ 6575 cpu_buffer->read += rb_page_entries(reader); 6576 cpu_buffer->read_bytes += rb_page_size(reader); 6577 6578 /* swap the pages */ 6579 rb_init_page(bpage); 6580 bpage = reader->page; 6581 reader->page = data_page->data; 6582 local_set(&reader->write, 0); 6583 local_set(&reader->entries, 0); 6584 reader->read = 0; 6585 data_page->data = bpage; 6586 6587 /* 6588 * Use the real_end for the data size, 6589 * This gives us a chance to store the lost events 6590 * on the page. 6591 */ 6592 if (reader->real_end) 6593 local_set(&bpage->commit, reader->real_end); 6594 } 6595 ret = read; 6596 6597 cpu_buffer->lost_events = 0; 6598 6599 commit = local_read(&bpage->commit); 6600 /* 6601 * Set a flag in the commit field if we lost events 6602 */ 6603 if (missed_events) { 6604 /* If there is room at the end of the page to save the 6605 * missed events, then record it there. 6606 */ 6607 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6608 memcpy(&bpage->data[commit], &missed_events, 6609 sizeof(missed_events)); 6610 local_add(RB_MISSED_STORED, &bpage->commit); 6611 commit += sizeof(missed_events); 6612 } 6613 local_add(RB_MISSED_EVENTS, &bpage->commit); 6614 } 6615 6616 /* 6617 * This page may be off to user land. Zero it out here. 6618 */ 6619 if (commit < buffer->subbuf_size) 6620 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 6621 6622 out_unlock: 6623 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6624 6625 out: 6626 return ret; 6627 } 6628 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 6629 6630 /** 6631 * ring_buffer_read_page_data - get pointer to the data in the page. 6632 * @page: the page to get the data from 6633 * 6634 * Returns pointer to the actual data in this page. 6635 */ 6636 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 6637 { 6638 return page->data; 6639 } 6640 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 6641 6642 /** 6643 * ring_buffer_subbuf_size_get - get size of the sub buffer. 6644 * @buffer: the buffer to get the sub buffer size from 6645 * 6646 * Returns size of the sub buffer, in bytes. 
 */
int ring_buffer_subbuf_size_get(struct trace_buffer *buffer)
{
	return buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);

/**
 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page.
 * @buffer: The ring_buffer to get the system sub page order from
 *
 * By default, one ring buffer sub page equals one system page. This parameter
 * is configurable, per ring buffer. The size of the ring buffer sub page can be
 * extended, but must be a power-of-two multiple of the system page size.
 *
 * Returns the order of the buffer sub page size, in system pages:
 * 0 means the sub buffer size is 1 system page, and so forth.
 * In case of an error < 0 is returned.
 */
int ring_buffer_subbuf_order_get(struct trace_buffer *buffer)
{
	if (!buffer)
		return -EINVAL;

	return buffer->subbuf_order;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get);

/**
 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page.
 * @buffer: The ring_buffer to set the new page size.
 * @order: Order of the system pages in one sub buffer page
 *
 * By default, one ring buffer page equals one system page. This API can be
 * used to set a new size for a ring buffer page. The size must be a
 * power-of-two multiple of the system page size, which is why the input
 * parameter @order is the order of system pages that are allocated for one
 * ring buffer page:
 *  0 - 1 system page
 *  1 - 2 system pages
 *  2 - 4 system pages
 *  ...
 *
 * Returns 0 on success or < 0 in case of an error.
 */
int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage, *tmp;
	int old_order, old_size;
	int nr_pages;
	int psize;
	int err;
	int cpu;

	if (!buffer || order < 0)
		return -EINVAL;

	if (buffer->subbuf_order == order)
		return 0;

	psize = (1 << order) * PAGE_SIZE;
	if (psize <= BUF_PAGE_HDR_SIZE)
		return -EINVAL;

	/* Size of a subbuf cannot be greater than the write counter */
	if (psize > RB_WRITE_MASK + 1)
		return -EINVAL;

	old_order = buffer->subbuf_order;
	old_size = buffer->subbuf_size;

	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);
	atomic_inc(&buffer->record_disabled);

	/* Make sure all commits have finished */
	synchronize_rcu();

	buffer->subbuf_order = order;
	buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE;

	/* Make sure all new buffers are allocated, before deleting the old ones */
	for_each_buffer_cpu(buffer, cpu) {

		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			continue;

		cpu_buffer = buffer->buffers[cpu];

		if (cpu_buffer->mapped) {
			err = -EBUSY;
			goto error;
		}

		/* Update the number of pages to match the new size */
		nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
		nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);

		/* we need a minimum of two pages */
		if (nr_pages < 2)
			nr_pages = 2;

		cpu_buffer->nr_pages_to_update = nr_pages;

		/* Include the reader page */
		nr_pages++;

		/* Allocate the new size buffer */
		INIT_LIST_HEAD(&cpu_buffer->new_pages);
		if (__rb_allocate_pages(cpu_buffer, nr_pages,
&cpu_buffer->new_pages)) { 6758 /* not enough memory for new pages */ 6759 err = -ENOMEM; 6760 goto error; 6761 } 6762 } 6763 6764 for_each_buffer_cpu(buffer, cpu) { 6765 struct buffer_data_page *old_free_data_page; 6766 struct list_head old_pages; 6767 unsigned long flags; 6768 6769 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6770 continue; 6771 6772 cpu_buffer = buffer->buffers[cpu]; 6773 6774 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6775 6776 /* Clear the head bit to make the link list normal to read */ 6777 rb_head_page_deactivate(cpu_buffer); 6778 6779 /* 6780 * Collect buffers from the cpu_buffer pages list and the 6781 * reader_page on old_pages, so they can be freed later when not 6782 * under a spinlock. The pages list is a linked list with no 6783 * head, adding old_pages turns it into a regular list with 6784 * old_pages being the head. 6785 */ 6786 list_add(&old_pages, cpu_buffer->pages); 6787 list_add(&cpu_buffer->reader_page->list, &old_pages); 6788 6789 /* One page was allocated for the reader page */ 6790 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6791 struct buffer_page, list); 6792 list_del_init(&cpu_buffer->reader_page->list); 6793 6794 /* Install the new pages, remove the head from the list */ 6795 cpu_buffer->pages = cpu_buffer->new_pages.next; 6796 list_del_init(&cpu_buffer->new_pages); 6797 cpu_buffer->cnt++; 6798 6799 cpu_buffer->head_page 6800 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6801 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6802 6803 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6804 cpu_buffer->nr_pages_to_update = 0; 6805 6806 old_free_data_page = cpu_buffer->free_page; 6807 cpu_buffer->free_page = NULL; 6808 6809 rb_head_page_activate(cpu_buffer); 6810 6811 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6812 6813 /* Free old sub buffers */ 6814 list_for_each_entry_safe(bpage, tmp, &old_pages, list) { 6815 list_del_init(&bpage->list); 6816 free_buffer_page(bpage); 6817 } 6818 free_pages((unsigned long)old_free_data_page, old_order); 6819 6820 rb_check_pages(cpu_buffer); 6821 } 6822 6823 atomic_dec(&buffer->record_disabled); 6824 mutex_unlock(&buffer->mutex); 6825 6826 return 0; 6827 6828 error: 6829 buffer->subbuf_order = old_order; 6830 buffer->subbuf_size = old_size; 6831 6832 atomic_dec(&buffer->record_disabled); 6833 mutex_unlock(&buffer->mutex); 6834 6835 for_each_buffer_cpu(buffer, cpu) { 6836 cpu_buffer = buffer->buffers[cpu]; 6837 6838 if (!cpu_buffer->nr_pages_to_update) 6839 continue; 6840 6841 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6842 list_del_init(&bpage->list); 6843 free_buffer_page(bpage); 6844 } 6845 } 6846 6847 return err; 6848 } 6849 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6850 6851 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6852 { 6853 struct page *page; 6854 6855 if (cpu_buffer->meta_page) 6856 return 0; 6857 6858 page = alloc_page(GFP_USER | __GFP_ZERO); 6859 if (!page) 6860 return -ENOMEM; 6861 6862 cpu_buffer->meta_page = page_to_virt(page); 6863 6864 return 0; 6865 } 6866 6867 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6868 { 6869 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 6870 6871 free_page(addr); 6872 cpu_buffer->meta_page = NULL; 6873 } 6874 6875 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 6876 unsigned long *subbuf_ids) 6877 { 6878 struct trace_buffer_meta *meta = 
cpu_buffer->meta_page; 6879 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 6880 struct buffer_page *first_subbuf, *subbuf; 6881 int id = 0; 6882 6883 subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page; 6884 cpu_buffer->reader_page->id = id++; 6885 6886 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 6887 do { 6888 if (WARN_ON(id >= nr_subbufs)) 6889 break; 6890 6891 subbuf_ids[id] = (unsigned long)subbuf->page; 6892 subbuf->id = id; 6893 6894 rb_inc_page(&subbuf); 6895 id++; 6896 } while (subbuf != first_subbuf); 6897 6898 /* install subbuf ID to kern VA translation */ 6899 cpu_buffer->subbuf_ids = subbuf_ids; 6900 6901 meta->meta_struct_len = sizeof(*meta); 6902 meta->nr_subbufs = nr_subbufs; 6903 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6904 meta->meta_page_size = meta->subbuf_size; 6905 6906 rb_update_meta_page(cpu_buffer); 6907 } 6908 6909 static struct ring_buffer_per_cpu * 6910 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 6911 { 6912 struct ring_buffer_per_cpu *cpu_buffer; 6913 6914 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6915 return ERR_PTR(-EINVAL); 6916 6917 cpu_buffer = buffer->buffers[cpu]; 6918 6919 mutex_lock(&cpu_buffer->mapping_lock); 6920 6921 if (!cpu_buffer->user_mapped) { 6922 mutex_unlock(&cpu_buffer->mapping_lock); 6923 return ERR_PTR(-ENODEV); 6924 } 6925 6926 return cpu_buffer; 6927 } 6928 6929 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6930 { 6931 mutex_unlock(&cpu_buffer->mapping_lock); 6932 } 6933 6934 /* 6935 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 6936 * to be set-up or torn-down. 6937 */ 6938 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 6939 bool inc) 6940 { 6941 unsigned long flags; 6942 6943 lockdep_assert_held(&cpu_buffer->mapping_lock); 6944 6945 /* mapped is always greater or equal to user_mapped */ 6946 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 6947 return -EINVAL; 6948 6949 if (inc && cpu_buffer->mapped == UINT_MAX) 6950 return -EBUSY; 6951 6952 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 6953 return -EINVAL; 6954 6955 mutex_lock(&cpu_buffer->buffer->mutex); 6956 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6957 6958 if (inc) { 6959 cpu_buffer->user_mapped++; 6960 cpu_buffer->mapped++; 6961 } else { 6962 cpu_buffer->user_mapped--; 6963 cpu_buffer->mapped--; 6964 } 6965 6966 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6967 mutex_unlock(&cpu_buffer->buffer->mutex); 6968 6969 return 0; 6970 } 6971 6972 /* 6973 * +--------------+ pgoff == 0 6974 * | meta page | 6975 * +--------------+ pgoff == 1 6976 * | subbuffer 0 | 6977 * | | 6978 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 6979 * | subbuffer 1 | 6980 * | | 6981 * ... 
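 *
 * A worked example, assuming subbuf_order == 0 (one system page per
 * sub-buffer): the meta page sits at pgoff 0, sub-buffer 0 at pgoff 1,
 * sub-buffer 1 at pgoff 2, and in general sub-buffer N at pgoff N + 1.
 * For subbuf_order > 0, __rb_map_vma() below pads the meta page out to a
 * full sub-buffer's worth of pages, so sub-buffer N then starts at
 * pgoff == (N + 1) << subbuf_order.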
6982 */ 6983 #ifdef CONFIG_MMU 6984 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 6985 struct vm_area_struct *vma) 6986 { 6987 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 6988 unsigned int subbuf_pages, subbuf_order; 6989 struct page **pages; 6990 int p = 0, s = 0; 6991 int err; 6992 6993 /* Refuse MP_PRIVATE or writable mappings */ 6994 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 6995 !(vma->vm_flags & VM_MAYSHARE)) 6996 return -EPERM; 6997 6998 subbuf_order = cpu_buffer->buffer->subbuf_order; 6999 subbuf_pages = 1 << subbuf_order; 7000 7001 if (subbuf_order && pgoff % subbuf_pages) 7002 return -EINVAL; 7003 7004 /* 7005 * Make sure the mapping cannot become writable later. Also tell the VM 7006 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 7007 */ 7008 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 7009 VM_MAYWRITE); 7010 7011 lockdep_assert_held(&cpu_buffer->mapping_lock); 7012 7013 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 7014 nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */ 7015 if (nr_pages <= pgoff) 7016 return -EINVAL; 7017 7018 nr_pages -= pgoff; 7019 7020 nr_vma_pages = vma_pages(vma); 7021 if (!nr_vma_pages || nr_vma_pages > nr_pages) 7022 return -EINVAL; 7023 7024 nr_pages = nr_vma_pages; 7025 7026 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 7027 if (!pages) 7028 return -ENOMEM; 7029 7030 if (!pgoff) { 7031 unsigned long meta_page_padding; 7032 7033 pages[p++] = virt_to_page(cpu_buffer->meta_page); 7034 7035 /* 7036 * Pad with the zero-page to align the meta-page with the 7037 * sub-buffers. 7038 */ 7039 meta_page_padding = subbuf_pages - 1; 7040 while (meta_page_padding-- && p < nr_pages) { 7041 unsigned long __maybe_unused zero_addr = 7042 vma->vm_start + (PAGE_SIZE * p); 7043 7044 pages[p++] = ZERO_PAGE(zero_addr); 7045 } 7046 } else { 7047 /* Skip the meta-page */ 7048 pgoff -= subbuf_pages; 7049 7050 s += pgoff / subbuf_pages; 7051 } 7052 7053 while (p < nr_pages) { 7054 struct page *page; 7055 int off = 0; 7056 7057 if (WARN_ON_ONCE(s >= nr_subbufs)) { 7058 err = -EINVAL; 7059 goto out; 7060 } 7061 7062 page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 7063 7064 for (; off < (1 << (subbuf_order)); off++, page++) { 7065 if (p >= nr_pages) 7066 break; 7067 7068 pages[p++] = page; 7069 } 7070 s++; 7071 } 7072 7073 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7074 7075 out: 7076 kfree(pages); 7077 7078 return err; 7079 } 7080 #else 7081 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7082 struct vm_area_struct *vma) 7083 { 7084 return -EOPNOTSUPP; 7085 } 7086 #endif 7087 7088 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7089 struct vm_area_struct *vma) 7090 { 7091 struct ring_buffer_per_cpu *cpu_buffer; 7092 unsigned long flags, *subbuf_ids; 7093 int err = 0; 7094 7095 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7096 return -EINVAL; 7097 7098 cpu_buffer = buffer->buffers[cpu]; 7099 7100 mutex_lock(&cpu_buffer->mapping_lock); 7101 7102 if (cpu_buffer->user_mapped) { 7103 err = __rb_map_vma(cpu_buffer, vma); 7104 if (!err) 7105 err = __rb_inc_dec_mapped(cpu_buffer, true); 7106 mutex_unlock(&cpu_buffer->mapping_lock); 7107 return err; 7108 } 7109 7110 /* prevent another thread from changing buffer/sub-buffer sizes */ 7111 mutex_lock(&buffer->mutex); 7112 7113 err = rb_alloc_meta_page(cpu_buffer); 7114 if (err) 7115 goto unlock; 7116 7117 /* subbuf_ids include the reader while nr_pages does not 
*/ 7118 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7119 if (!subbuf_ids) { 7120 rb_free_meta_page(cpu_buffer); 7121 err = -ENOMEM; 7122 goto unlock; 7123 } 7124 7125 atomic_inc(&cpu_buffer->resize_disabled); 7126 7127 /* 7128 * Lock all readers to block any subbuf swap until the subbuf IDs are 7129 * assigned. 7130 */ 7131 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7132 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 7133 7134 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7135 7136 err = __rb_map_vma(cpu_buffer, vma); 7137 if (!err) { 7138 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7139 /* This is the first time it is mapped by user */ 7140 cpu_buffer->mapped++; 7141 cpu_buffer->user_mapped = 1; 7142 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7143 } else { 7144 kfree(cpu_buffer->subbuf_ids); 7145 cpu_buffer->subbuf_ids = NULL; 7146 rb_free_meta_page(cpu_buffer); 7147 atomic_dec(&cpu_buffer->resize_disabled); 7148 } 7149 7150 unlock: 7151 mutex_unlock(&buffer->mutex); 7152 mutex_unlock(&cpu_buffer->mapping_lock); 7153 7154 return err; 7155 } 7156 7157 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 7158 { 7159 struct ring_buffer_per_cpu *cpu_buffer; 7160 unsigned long flags; 7161 int err = 0; 7162 7163 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7164 return -EINVAL; 7165 7166 cpu_buffer = buffer->buffers[cpu]; 7167 7168 mutex_lock(&cpu_buffer->mapping_lock); 7169 7170 if (!cpu_buffer->user_mapped) { 7171 err = -ENODEV; 7172 goto out; 7173 } else if (cpu_buffer->user_mapped > 1) { 7174 __rb_inc_dec_mapped(cpu_buffer, false); 7175 goto out; 7176 } 7177 7178 mutex_lock(&buffer->mutex); 7179 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7180 7181 /* This is the last user space mapping */ 7182 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7183 cpu_buffer->mapped--; 7184 cpu_buffer->user_mapped = 0; 7185 7186 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7187 7188 kfree(cpu_buffer->subbuf_ids); 7189 cpu_buffer->subbuf_ids = NULL; 7190 rb_free_meta_page(cpu_buffer); 7191 atomic_dec(&cpu_buffer->resize_disabled); 7192 7193 mutex_unlock(&buffer->mutex); 7194 7195 out: 7196 mutex_unlock(&cpu_buffer->mapping_lock); 7197 7198 return err; 7199 } 7200 7201 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 7202 { 7203 struct ring_buffer_per_cpu *cpu_buffer; 7204 struct buffer_page *reader; 7205 unsigned long missed_events; 7206 unsigned long reader_size; 7207 unsigned long flags; 7208 7209 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 7210 if (IS_ERR(cpu_buffer)) 7211 return (int)PTR_ERR(cpu_buffer); 7212 7213 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7214 7215 consume: 7216 if (rb_per_cpu_empty(cpu_buffer)) 7217 goto out; 7218 7219 reader_size = rb_page_size(cpu_buffer->reader_page); 7220 7221 /* 7222 * There are data to be read on the current reader page, we can 7223 * return to the caller. But before that, we assume the latter will read 7224 * everything. Let's update the kernel reader accordingly. 
7225 */ 7226 if (cpu_buffer->reader_page->read < reader_size) { 7227 while (cpu_buffer->reader_page->read < reader_size) 7228 rb_advance_reader(cpu_buffer); 7229 goto out; 7230 } 7231 7232 reader = rb_get_reader_page(cpu_buffer); 7233 if (WARN_ON(!reader)) 7234 goto out; 7235 7236 /* Check if any events were dropped */ 7237 missed_events = cpu_buffer->lost_events; 7238 7239 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 7240 if (missed_events) { 7241 struct buffer_data_page *bpage = reader->page; 7242 unsigned int commit; 7243 /* 7244 * Use the real_end for the data size, 7245 * This gives us a chance to store the lost events 7246 * on the page. 7247 */ 7248 if (reader->real_end) 7249 local_set(&bpage->commit, reader->real_end); 7250 /* 7251 * If there is room at the end of the page to save the 7252 * missed events, then record it there. 7253 */ 7254 commit = rb_page_size(reader); 7255 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7256 memcpy(&bpage->data[commit], &missed_events, 7257 sizeof(missed_events)); 7258 local_add(RB_MISSED_STORED, &bpage->commit); 7259 } 7260 local_add(RB_MISSED_EVENTS, &bpage->commit); 7261 } 7262 } else { 7263 /* 7264 * There really shouldn't be any missed events if the commit 7265 * is on the reader page. 7266 */ 7267 WARN_ON_ONCE(missed_events); 7268 } 7269 7270 cpu_buffer->lost_events = 0; 7271 7272 goto consume; 7273 7274 out: 7275 /* Some archs do not have data cache coherency between kernel and user-space */ 7276 flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page)); 7277 7278 rb_update_meta_page(cpu_buffer); 7279 7280 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7281 rb_put_mapped_buffer(cpu_buffer); 7282 7283 return 0; 7284 } 7285 7286 /* 7287 * We only allocate new buffers, never free them if the CPU goes down. 7288 * If we were to free the buffer, then the user would lose any trace that was in 7289 * the buffer. 7290 */ 7291 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7292 { 7293 struct trace_buffer *buffer; 7294 long nr_pages_same; 7295 int cpu_i; 7296 unsigned long nr_pages; 7297 7298 buffer = container_of(node, struct trace_buffer, node); 7299 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7300 return 0; 7301 7302 nr_pages = 0; 7303 nr_pages_same = 1; 7304 /* check if all cpu sizes are same */ 7305 for_each_buffer_cpu(buffer, cpu_i) { 7306 /* fill in the size from first enabled cpu */ 7307 if (nr_pages == 0) 7308 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7309 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7310 nr_pages_same = 0; 7311 break; 7312 } 7313 } 7314 /* allocate minimum pages, user can later expand it */ 7315 if (!nr_pages_same) 7316 nr_pages = 2; 7317 buffer->buffers[cpu] = 7318 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7319 if (!buffer->buffers[cpu]) { 7320 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7321 cpu); 7322 return -ENOMEM; 7323 } 7324 smp_wmb(); 7325 cpumask_set_cpu(cpu, buffer->cpumask); 7326 return 0; 7327 } 7328 7329 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7330 /* 7331 * This is a basic integrity check of the ring buffer. 7332 * Late in the boot cycle this test will run when configured in. 7333 * It will kick off a thread per CPU that will go into a loop 7334 * writing to the per cpu ring buffer various sizes of data. 7335 * Some of the data will be large items, some small. 7336 * 7337 * Another thread is created that goes into a spin, sending out 7338 * IPIs to the other CPUs to also write into the ring buffer. 
7339 * this is to test the nesting ability of the buffer. 7340 * 7341 * Basic stats are recorded and reported. If something in the 7342 * ring buffer should happen that's not expected, a big warning 7343 * is displayed and all ring buffers are disabled. 7344 */ 7345 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7346 7347 struct rb_test_data { 7348 struct trace_buffer *buffer; 7349 unsigned long events; 7350 unsigned long bytes_written; 7351 unsigned long bytes_alloc; 7352 unsigned long bytes_dropped; 7353 unsigned long events_nested; 7354 unsigned long bytes_written_nested; 7355 unsigned long bytes_alloc_nested; 7356 unsigned long bytes_dropped_nested; 7357 int min_size_nested; 7358 int max_size_nested; 7359 int max_size; 7360 int min_size; 7361 int cpu; 7362 int cnt; 7363 }; 7364 7365 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7366 7367 /* 1 meg per cpu */ 7368 #define RB_TEST_BUFFER_SIZE 1048576 7369 7370 static char rb_string[] __initdata = 7371 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7372 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7373 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7374 7375 static bool rb_test_started __initdata; 7376 7377 struct rb_item { 7378 int size; 7379 char str[]; 7380 }; 7381 7382 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7383 { 7384 struct ring_buffer_event *event; 7385 struct rb_item *item; 7386 bool started; 7387 int event_len; 7388 int size; 7389 int len; 7390 int cnt; 7391 7392 /* Have nested writes different that what is written */ 7393 cnt = data->cnt + (nested ? 27 : 0); 7394 7395 /* Multiply cnt by ~e, to make some unique increment */ 7396 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7397 7398 len = size + sizeof(struct rb_item); 7399 7400 started = rb_test_started; 7401 /* read rb_test_started before checking buffer enabled */ 7402 smp_rmb(); 7403 7404 event = ring_buffer_lock_reserve(data->buffer, len); 7405 if (!event) { 7406 /* Ignore dropped events before test starts. 
 */
		if (started) {
			if (nested)
				data->bytes_dropped_nested += len;
			else
				data->bytes_dropped += len;
		}
		return len;
	}

	event_len = ring_buffer_event_length(event);

	if (RB_WARN_ON(data->buffer, event_len < len))
		goto out;

	item = ring_buffer_event_data(event);
	item->size = size;
	memcpy(item->str, rb_string, size);

	if (nested) {
		data->bytes_alloc_nested += event_len;
		data->bytes_written_nested += len;
		data->events_nested++;
		if (!data->min_size_nested || len < data->min_size_nested)
			data->min_size_nested = len;
		if (len > data->max_size_nested)
			data->max_size_nested = len;
	} else {
		data->bytes_alloc += event_len;
		data->bytes_written += len;
		data->events++;
		if (!data->min_size || len < data->min_size)
			data->min_size = len;
		if (len > data->max_size)
			data->max_size = len;
	}

 out:
	ring_buffer_unlock_commit(data->buffer);

	return 0;
}

static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Now sleep between a min of 100-300us and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {

		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but for non preempt, let others run */
		schedule();
	}

	return 0;
}

static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct trace_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable buffer so that threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
						     cpu, "rbtester/%u");
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}
	}

	/* Now create the rb hammer! */
	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
	if (WARN_ON(IS_ERR(rb_hammer))) {
		pr_cont("FAILED\n");
		ret = PTR_ERR(rb_hammer);
		goto out_free;
	}

	ring_buffer_record_on(buffer);
	/*
	 * Show buffer is enabled before setting rb_test_started.
	 * Yes there's a small race window where events could be
	 * dropped and the thread won't catch it. But when a ring
	 * buffer gets enabled, there will always be some kind of
	 * delay before other CPUs see it. Thus, we don't care about
	 * those dropped events.
We care about events dropped after 7537 * the threads see that the buffer is active. 7538 */ 7539 smp_wmb(); 7540 rb_test_started = true; 7541 7542 set_current_state(TASK_INTERRUPTIBLE); 7543 /* Just run for 10 seconds */; 7544 schedule_timeout(10 * HZ); 7545 7546 kthread_stop(rb_hammer); 7547 7548 out_free: 7549 for_each_online_cpu(cpu) { 7550 if (!rb_threads[cpu]) 7551 break; 7552 kthread_stop(rb_threads[cpu]); 7553 } 7554 if (ret) { 7555 ring_buffer_free(buffer); 7556 return ret; 7557 } 7558 7559 /* Report! */ 7560 pr_info("finished\n"); 7561 for_each_online_cpu(cpu) { 7562 struct ring_buffer_event *event; 7563 struct rb_test_data *data = &rb_data[cpu]; 7564 struct rb_item *item; 7565 unsigned long total_events; 7566 unsigned long total_dropped; 7567 unsigned long total_written; 7568 unsigned long total_alloc; 7569 unsigned long total_read = 0; 7570 unsigned long total_size = 0; 7571 unsigned long total_len = 0; 7572 unsigned long total_lost = 0; 7573 unsigned long lost; 7574 int big_event_size; 7575 int small_event_size; 7576 7577 ret = -1; 7578 7579 total_events = data->events + data->events_nested; 7580 total_written = data->bytes_written + data->bytes_written_nested; 7581 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 7582 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 7583 7584 big_event_size = data->max_size + data->max_size_nested; 7585 small_event_size = data->min_size + data->min_size_nested; 7586 7587 pr_info("CPU %d:\n", cpu); 7588 pr_info(" events: %ld\n", total_events); 7589 pr_info(" dropped bytes: %ld\n", total_dropped); 7590 pr_info(" alloced bytes: %ld\n", total_alloc); 7591 pr_info(" written bytes: %ld\n", total_written); 7592 pr_info(" biggest event: %d\n", big_event_size); 7593 pr_info(" smallest event: %d\n", small_event_size); 7594 7595 if (RB_WARN_ON(buffer, total_dropped)) 7596 break; 7597 7598 ret = 0; 7599 7600 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 7601 total_lost += lost; 7602 item = ring_buffer_event_data(event); 7603 total_len += ring_buffer_event_length(event); 7604 total_size += item->size + sizeof(struct rb_item); 7605 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 7606 pr_info("FAILED!\n"); 7607 pr_info("buffer had: %.*s\n", item->size, item->str); 7608 pr_info("expected: %.*s\n", item->size, rb_string); 7609 RB_WARN_ON(buffer, 1); 7610 ret = -1; 7611 break; 7612 } 7613 total_read++; 7614 } 7615 if (ret) 7616 break; 7617 7618 ret = -1; 7619 7620 pr_info(" read events: %ld\n", total_read); 7621 pr_info(" lost events: %ld\n", total_lost); 7622 pr_info(" total events: %ld\n", total_lost + total_read); 7623 pr_info(" recorded len bytes: %ld\n", total_len); 7624 pr_info(" recorded size bytes: %ld\n", total_size); 7625 if (total_lost) { 7626 pr_info(" With dropped events, record len and size may not match\n" 7627 " alloced and written from above\n"); 7628 } else { 7629 if (RB_WARN_ON(buffer, total_len != total_alloc || 7630 total_size != total_written)) 7631 break; 7632 } 7633 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 7634 break; 7635 7636 ret = 0; 7637 } 7638 if (!ret) 7639 pr_info("Ring buffer PASSED!\n"); 7640 7641 ring_buffer_free(buffer); 7642 return 0; 7643 } 7644 7645 late_initcall(test_ringbuffer); 7646 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 7647