/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <[email protected]>
 */
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/ftrace_irq.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kmemcheck.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/fs.h>

#include <asm/local.h>
#include "trace.h"

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	int ret;

	ret = trace_seq_printf(s, "# compressed entry header\n");
	ret = trace_seq_printf(s, "\ttype_len : 5 bits\n");
	ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n");
	ret = trace_seq_printf(s, "\tarray : 32 bits\n");
	ret = trace_seq_printf(s, "\n");
	ret = trace_seq_printf(s, "\tpadding : type == %d\n",
			       RINGBUF_TYPE_PADDING);
	ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
			       RINGBUF_TYPE_TIME_EXTEND);
	ret = trace_seq_printf(s, "\tdata max type_len == %d\n",
			       RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return ret;
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */
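
/*
 * Illustrative sketch (not compiled here): a write only reaches a page
 * when all three layers agree. The checks below mirror the ones made in
 * ring_buffer_lock_reserve(); the helper name rb_write_allowed() is
 * hypothetical and used only for illustration.
 *
 *	static int rb_write_allowed(struct ring_buffer *buffer,
 *				    struct ring_buffer_per_cpu *cpu_buffer)
 *	{
 *		if (ring_buffer_flags != RB_BUFFERS_ON)		// layer 1
 *			return 0;
 *		if (atomic_read(&buffer->record_disabled))	// layer 2
 *			return 0;
 *		if (atomic_read(&cpu_buffer->record_disabled))	// layer 3
 *			return 0;
 *		return 1;
 *	}
 */
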
/*
 * Global flag to disable all recording to ring buffers
 * This has two bits: ON, DISABLED
 *
 *   ON   DISABLED
 *  ----  ----------
 *    0       0      : ring buffers are off
 *    1       0      : ring buffers are on
 *    X       1      : ring buffers are permanently disabled
 */

enum {
	RB_BUFFERS_ON_BIT	= 0,
	RB_BUFFERS_DISABLED_BIT	= 1,
};

enum {
	RB_BUFFERS_ON		= 1 << RB_BUFFERS_ON_BIT,
	RB_BUFFERS_DISABLED	= 1 << RB_BUFFERS_DISABLED_BIT,
};

static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}

/**
 * tracing_is_on - show whether the ring buffers are enabled
 */
int tracing_is_on(void)
{
	return ring_buffer_flags == RB_BUFFERS_ON;
}
EXPORT_SYMBOL_GPL(tracing_is_on);

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
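
/*
 * Illustrative note: RINGBUF_TYPE_DATA expands to a GCC case range, so a
 * switch on event->type_len can treat every length-encoding value as one
 * case, exactly as rb_event_length() does below:
 *
 *	switch (event->type_len) {
 *	case RINGBUF_TYPE_DATA:		// matches 0 ... TYPE_LEN_MAX
 *		...
 *	}
 */
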
enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/* inline for ring buffer fast paths */
static unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static void *
rb_event_data(struct ring_buffer_event *event)
{
	BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

struct buffer_data_page {
	u64		time_stamp;	/* page time stamp */
	local_t		commit;		/* write committed index */
	unsigned char	data[];		/* data of buffer page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
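
/*
 * Illustrative sketch (assumes only that buffer pages are at least
 * 4-byte aligned, which the cache-line-sized allocation above gives us):
 * a flag is ORed into the low bits of a list pointer and masked back
 * off before the pointer is dereferenced. rb_set_list_to_head() and
 * rb_list_head() below do exactly this.
 *
 *	unsigned long val = (unsigned long)list->next;
 *
 *	val |= RB_PAGE_HEAD;					// tag as head
 *	next = (struct list_head *)(val & ~RB_FLAG_MASK);	// strip flags
 */
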
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are packed into the same word. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

/**
 * ring_buffer_page_len - the size of data on the page.
 * @page: The page to read
 *
 * Returns the amount of data on the page, including buffer page header.
 */
size_t ring_buffer_page_len(void *page)
{
	return local_read(&((struct buffer_data_page *)page)->commit)
		+ BUF_PAGE_HDR_SIZE;
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}

#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8 bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

/* Max number of timestamps that can fit on a page */
#define RB_TIMESTAMPS_PER_PAGE	(BUF_PAGE_SIZE / RB_LEN_TIME_STAMP)

int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;
	int ret;

	ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			       "offset:0;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)sizeof(field.time_stamp),
			       (unsigned int)is_signed_type(u64));

	ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)offsetof(typeof(field), commit),
			       (unsigned int)sizeof(field.commit),
			       (unsigned int)is_signed_type(long));

	ret = trace_seq_printf(s, "\tfield: char data;\t"
			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)offsetof(typeof(field), data),
			       (unsigned int)BUF_PAGE_SIZE,
			       (unsigned int)is_signed_type(char));

	return ret;
}

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int			cpu;
	struct ring_buffer	*buffer;
	spinlock_t		reader_lock;	/* serialize readers */
	arch_spinlock_t		lock;
	struct lock_class_key	lock_key;
	struct list_head	*pages;
	struct buffer_page	*head_page;	/* read from head */
	struct buffer_page	*tail_page;	/* write to tail */
	struct buffer_page	*commit_page;	/* committed pages */
	struct buffer_page	*reader_page;
	local_t			commit_overrun;
	local_t			overrun;
	local_t			entries;
	local_t			committing;
	local_t			commits;
	unsigned long		read;
	u64			write_stamp;
	u64			read_stamp;
	atomic_t		record_disabled;
};

struct ring_buffer {
	unsigned			pages;
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

#ifdef CONFIG_HOTPLUG_CPU
	struct notifier_block		cpu_notify;
#endif
	u64				(*clock)(void);
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	u64				read_stamp;
};

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct ring_buffer *buffer)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return buffer->clock() << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_no_resched_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing of the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that owns the per cpu buffer,
 * so writers only need to worry about interrupts. Reads,
 * however, can happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page out from under another writer. Thus we use the second LSB
 * to catch that too. Thus:
 *
 * head->list->prev->next	bit 1	bit 0
 *				-----	-----
 * Normal page			  0	  0
 * Points to head page		  0	  1
 * New head page		  1	  0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static int rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}
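
/*
 * Illustrative sketch (not compiled here) of how the reader claims the
 * head page with a single cmpxchg on the pointer that carries the HEAD
 * flag; rb_head_page_replace() below is the real implementation:
 *
 *	expect = ((unsigned long)&head->list & ~RB_FLAG_MASK) | RB_PAGE_HEAD;
 *	if (cmpxchg(ptr, expect, (unsigned long)&reader->list) == expect)
 *		// we now own the old head page
 *
 * If a writer moved the head first, the HEAD flag is no longer set in
 * *ptr, the cmpxchg fails, and the reader must retry against the new head.
 */
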
/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(cpu_buffer, head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss it in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(cpu_buffer, &page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static int rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;
	unsigned long ret;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	ret = cmpxchg(ptr, val, (unsigned long)&new->list);

	return ret == val;
}

/*
 * rb_tail_page_update - move the tail page forward
 *
 * Returns 1 if moved tail page, 0 if someone else did.
 */
static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	struct buffer_page *old_tail;
	unsigned long old_entries;
	unsigned long old_write;
	int ret = 0;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without erasing
	 * data written by interrupts that have moved the tail
	 * page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == cpu_buffer->tail_page) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it, in which case we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * It can only increment when a commit takes place. But that
		 * only happens in the outermost nested commit.
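		 *
		 * Aside, on the counter trick used just above (illustrative
		 * arithmetic only): with RB_WRITE_MASK = 0xfffff and
		 * RB_WRITE_INTCNT = 1 << 20, a write index of 0x123 that was
		 * bumped by two nested updaters reads back as 0x200123.
		 * Masking with ~RB_WRITE_MASK (as done for val/eval) keeps
		 * the updater count (0x200000) while zeroing the index.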
		 */
		local_set(&next_page->page->commit, 0);

		old_tail = cmpxchg(&cpu_buffer->tail_page,
				   tail_page, next_page);

		if (old_tail == tail_page)
			ret = 1;
	}

	return ret;
}

static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
		return 1;

	return 0;
}

/**
 * rb_check_list - make sure a pointer to a list has the flag bits zero
 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
			 struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
		return 1;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
		return 1;
	return 0;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	rb_head_page_deactivate(cpu_buffer);

	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
		return -1;
	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
		return -1;

	if (rb_check_list(cpu_buffer, head))
		return -1;

	list_for_each_entry_safe(bpage, tmp, head, list) {
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.next->prev != &bpage->list))
			return -1;
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.prev->next != &bpage->list))
			return -1;
		if (rb_check_list(cpu_buffer, &bpage->list))
			return -1;
	}

	rb_head_page_activate(cpu_buffer);

	return 0;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	struct buffer_page *bpage, *tmp;
	unsigned long addr;
	LIST_HEAD(pages);
	unsigned i;

	WARN_ON(!nr_pages);

	for (i = 0; i < nr_pages; i++) {
		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;

		rb_check_bpage(cpu_buffer, bpage);

		list_add(&bpage->list, &pages);

		addr = __get_free_page(GFP_KERNEL);
		if (!addr)
			goto free_pages;
		bpage->page = (void *)addr;
		rb_init_page(bpage->page);
	}

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	rb_check_pages(cpu_buffer);

	return 0;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	return -ENOMEM;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	unsigned long addr;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;
	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		goto fail_free_reader;
	bpage->page = (void *)addr;
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);

	ret = rb_allocate_pages(cpu_buffer, buffer->pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	free_buffer_page(cpu_buffer->reader_page);

	rb_head_page_deactivate(cpu_buffer);

	if (head) {
		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	kfree(cpu_buffer);
}

#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
			 unsigned long action, void *hcpu);
#endif

/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
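 *
 * Example (illustrative sketch; callers normally use the
 * ring_buffer_alloc() wrapper from linux/ring_buffer.h, which supplies
 * the lock class key, and the size shown here is arbitrary):
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer_alloc(64 * 1024, RB_FL_OVERWRITE);
 *	if (!rb)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(rb);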
 */
struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	/* need at least two pages */
	if (buffer->pages < 2)
		buffer->pages = 2;

	/*
	 * If CPU hotplug is not configured and the ring buffer is
	 * allocated in an early initcall, it will not be notified of
	 * secondary cpus. In that case, allocate for all possible cpus.
	 */
#ifdef CONFIG_HOTPLUG_CPU
	get_online_cpus();
	cpumask_copy(buffer->cpumask, cpu_online_mask);
#else
	cpumask_copy(buffer->cpumask, cpu_possible_mask);
#endif
	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	for_each_buffer_cpu(buffer, cpu) {
		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu])
			goto fail_free_buffers;
	}

#ifdef CONFIG_HOTPLUG_CPU
	buffer->cpu_notify.notifier_call = rb_cpu_notify;
	buffer->cpu_notify.priority = 0;
	register_cpu_notifier(&buffer->cpu_notify);
#endif

	put_online_cpus();
	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);
	put_online_cpus();

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	get_online_cpus();

#ifdef CONFIG_HOTPLUG_CPU
	unregister_cpu_notifier(&buffer->cpu_notify);
#endif

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	put_online_cpus();

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct ring_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	spin_lock_irq(&cpu_buffer->reader_lock);
	rb_head_page_deactivate(cpu_buffer);

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
			return;
		p = cpu_buffer->pages->next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
		return;

	rb_reset_cpu(cpu_buffer);
	rb_check_pages(cpu_buffer);

	spin_unlock_irq(&cpu_buffer->reader_lock);
}

static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
		struct list_head *pages, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	spin_lock_irq(&cpu_buffer->reader_lock);
	rb_head_page_deactivate(cpu_buffer);

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
			return;
		p = pages->next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		list_add_tail(&bpage->list, cpu_buffer->pages);
	}
	rb_reset_cpu(cpu_buffer);
	rb_check_pages(cpu_buffer);

	spin_unlock_irq(&cpu_buffer->reader_lock);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns -1 on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned nr_pages, rm_pages, new_pages;
	struct buffer_page *bpage, *tmp;
	unsigned long buffer_size;
	unsigned long addr;
	LIST_HEAD(pages);
	int i, cpu;

	/*
	 * Always succeed at resizing a non-existent buffer:
	 */
	if (!buffer)
		return size;

	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	size *= BUF_PAGE_SIZE;
	buffer_size = buffer->pages * BUF_PAGE_SIZE;

	/* we need a minimum of two pages */
	if (size < BUF_PAGE_SIZE * 2)
		size = BUF_PAGE_SIZE * 2;

	if (size == buffer_size)
		return size;

	atomic_inc(&buffer->record_disabled);

	/* Make sure all writers are done with this buffer. */
	synchronize_sched();

	mutex_lock(&buffer->mutex);
	get_online_cpus();

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
			goto out_fail;

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * linked list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM.
	 */
	if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
		goto out_fail;

	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			bpage = kzalloc_node(ALIGN(sizeof(*bpage),
						   cache_line_size()),
					     GFP_KERNEL, cpu_to_node(cpu));
			if (!bpage)
				goto free_pages;
			list_add(&bpage->list, &pages);
			addr = __get_free_page(GFP_KERNEL);
			if (!addr)
				goto free_pages;
			bpage->page = (void *)addr;
			rb_init_page(bpage->page);
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	if (RB_WARN_ON(buffer, !list_empty(&pages)))
		goto out_fail;

 out:
	buffer->pages = nr_pages;
	put_online_cpus();
	mutex_unlock(&buffer->mutex);

	atomic_dec(&buffer->record_disabled);

	return size;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	put_online_cpus();
	mutex_unlock(&buffer->mutex);
	atomic_dec(&buffer->record_disabled);
	return -ENOMEM;

	/*
	 * Something went totally wrong, and we are too paranoid
	 * to even clean up the mess.
	 */
 out_fail:
	put_online_cpus();
	mutex_unlock(&buffer->mutex);
	atomic_dec(&buffer->record_disabled);
	return -1;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
	return bpage->data + index;
}

static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
{
	return bpage->page->data + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
}

static inline int
rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
		   struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	return cpu_buffer->commit_page->page == (void *)addr &&
		rb_commit_index(cpu_buffer) == index;
}

static void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long max_count;

	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	max_count = cpu_buffer->buffer->pages * 100;

	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {

		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		RB_WARN_ON(cpu_buffer,
			   local_read(&cpu_buffer->commit_page->page->commit) &
			   ~RB_WRITE_MASK);
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
		goto again;
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = rb_set_head_page(cpu_buffer);
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
}

/**
 * rb_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
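 *
 * Worked example (illustrative, using the macros defined above): a
 * request for 10 bytes of data goes through rb_calculate_event_length(),
 * which yields ALIGN(10 + RB_EVNT_HDR_SIZE, RB_ALIGNMENT) = 16 bytes.
 * Here that 16-byte event gets type_len = DIV_ROUND_UP(12, 4) = 3 and no
 * length word, and ring_buffer_event_length() later reports 12 usable
 * bytes to the caller. A payload larger than RB_MAX_SMALL_DATA instead
 * gets type_len = 0 with the byte count stored in array[0].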
 */
static void
rb_update_event(struct ring_buffer_event *event,
		unsigned type, unsigned length)
{
	event->type_len = type;

	switch (type) {

	case RINGBUF_TYPE_PADDING:
	case RINGBUF_TYPE_TIME_EXTEND:
	case RINGBUF_TYPE_TIME_STAMP:
		break;

	case 0:
		length -= RB_EVNT_HDR_SIZE;
		if (length > RB_MAX_SMALL_DATA)
			event->array[0] = length;
		else
			event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
		break;
	default:
		BUG();
	}
}

/*
 * rb_handle_head_page - writer hit the head page
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
	 * forward, and protect against both readers on
	 * other CPUs and writers coming in via interrupts.
	 */
	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
				       RB_PAGE_HEAD);

	/*
	 * type can be one of four:
	 *  NORMAL - an interrupt already moved it for us
	 *  HEAD   - we are the first to get here.
	 *  UPDATE - we are the interrupt interrupting
	 *           a current move.
	 *  MOVED  - a reader on another CPU moved the next
	 *           pointer to its reader page. Give up
	 *           and try again.
	 */

	switch (type) {
	case RB_PAGE_HEAD:
		/*
		 * We changed the head to UPDATE, thus
		 * it is our responsibility to update
		 * the counters.
		 */
		local_add(entries, &cpu_buffer->overrun);

		/*
		 * The entries will be zeroed out when we move the
		 * tail page.
		 */

		/* still more to do */
		break;

	case RB_PAGE_UPDATE:
		/*
		 * This is an interrupt that interrupted the
		 * previous update. Still more to do.
		 */
		break;
	case RB_PAGE_NORMAL:
		/*
		 * An interrupt came in before the update
		 * and processed this for us.
		 * Nothing left to do.
		 */
		return 1;
	case RB_PAGE_MOVED:
		/*
		 * The reader is on another CPU and just did
		 * a swap with our next_page.
		 * Try again.
		 */
		return 1;
	default:
		RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
		return -1;
	}

	/*
	 * Now that we are here, the old head pointer is
	 * set to UPDATE. This will keep the reader from
	 * swapping the head page with the reader page.
	 * The reader (on another CPU) will spin till
	 * we are finished.
	 *
	 * We just need to protect against interrupts
	 * doing the job. We will set the next pointer
	 * to HEAD. After that, we set the old pointer
	 * to NORMAL, but only if it was HEAD before.
	 * Otherwise we are an interrupt, and only
	 * want the outermost commit to reset it.
	 */
	new_head = next_page;
	rb_inc_page(cpu_buffer, &new_head);

	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
				    RB_PAGE_NORMAL);

	/*
	 * Valid returns are:
	 *  HEAD   - an interrupt came in and already set it.
	 *  NORMAL - One of two things:
	 *            1) We really set it.
	 *            2) A bunch of interrupts came in and moved
	 *               the page forward again.
	 */
	switch (ret) {
	case RB_PAGE_HEAD:
	case RB_PAGE_NORMAL:
		/* OK */
		break;
	default:
		RB_WARN_ON(cpu_buffer, 1);
		return -1;
	}

	/*
	 * It is possible that an interrupt came in,
	 * set the head up, then more interrupts came in
	 * and moved it again. When we get back here,
	 * the page would have been set to NORMAL but we
	 * just set it back to HEAD.
	 *
	 * How do you detect this? Well, if that happened
	 * the tail page would have moved.
	 */
	if (ret == RB_PAGE_NORMAL) {
		/*
		 * If the tail had moved past next, then we need
		 * to reset the pointer.
		 */
		if (cpu_buffer->tail_page != tail_page &&
		    cpu_buffer->tail_page != next_page)
			rb_head_page_set_normal(cpu_buffer, new_head,
						next_page,
						RB_PAGE_HEAD);
	}

	/*
	 * If this was the outermost commit (the one that
	 * changed the original pointer from HEAD to UPDATE),
	 * then it is up to us to reset it to NORMAL.
	 */
	if (type == RB_PAGE_HEAD) {
		ret = rb_head_page_set_normal(cpu_buffer, next_page,
					      tail_page,
					      RB_PAGE_UPDATE);
		if (RB_WARN_ON(cpu_buffer,
			       ret != RB_PAGE_UPDATE))
			return -1;
	}

	return 0;
}

static unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusion */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ALIGNMENT);

	return length;
}

static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      struct buffer_page *tail_page,
	      unsigned long tail, unsigned long length)
{
	struct ring_buffer_event *event;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= BUF_PAGE_SIZE) {
		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);
	kmemcheck_annotate_bitfield(event, bitfield);

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
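	 *
	 * Illustrative numbers (with the 4-byte event header used here):
	 * if only 6 bytes remain on the page, that is below
	 * RB_EVNT_MIN_SIZE, so the leftover space is simply marked as
	 * padding and the write index is rewound. If 40 bytes remain, a
	 * discarded PADDING event is written whose array[0] is
	 * 40 - RB_EVNT_HDR_SIZE = 36, so readers can skip over it.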
	 */
	if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
		/* No room for any events */

		/* Mark the rest of the page with padding */
		rb_event_set_padding(event);

		/* Set the write back to the previous setting */
		local_sub(length, &tail_page->write);
		return;
	}

	/* Put in a discarded event */
	event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	event->time_delta = 1;

	/* Set write to end of buffer */
	length = (tail + length) - BUF_PAGE_SIZE;
	local_sub(length, &tail_page->write);
}

static struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
	     unsigned long length, unsigned long tail,
	     struct buffer_page *tail_page, u64 *ts)
{
	struct buffer_page *commit_page = cpu_buffer->commit_page;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct buffer_page *next_page;
	int ret;

	next_page = tail_page;

	rb_inc_page(cpu_buffer, &next_page);

	/*
	 * If for some reason, we had an interrupt storm that made
	 * it all the way around the buffer, bail, and warn
	 * about it.
	 */
	if (unlikely(next_page == commit_page)) {
		local_inc(&cpu_buffer->commit_overrun);
		goto out_reset;
	}

	/*
	 * This is where the fun begins!
	 *
	 * We are fighting against races between a reader that
	 * could be on another CPU trying to swap its reader
	 * page with the buffer head.
	 *
	 * We are also fighting against interrupts coming in and
	 * moving the head or tail on us as well.
	 *
	 * If the next page is the head page then we have filled
	 * the buffer, unless the commit page is still on the
	 * reader page.
	 */
	if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {

		/*
		 * If the commit is not on the reader page, then
		 * move the header page.
		 */
		if (!rb_is_reader_page(cpu_buffer->commit_page)) {
			/*
			 * If we are not in overwrite mode,
			 * this is easy, just stop here.
			 */
			if (!(buffer->flags & RB_FL_OVERWRITE))
				goto out_reset;

			ret = rb_handle_head_page(cpu_buffer,
						  tail_page,
						  next_page);
			if (ret < 0)
				goto out_reset;
			if (ret)
				goto out_again;
		} else {
			/*
			 * We need to be careful here too. The
			 * commit page could still be on the reader
			 * page. We could have a small buffer, and
			 * have filled up the buffer with events
			 * from interrupts and such, and wrapped.
			 *
			 * Note, if the tail page is also on the
			 * reader_page, we let it move out.
			 */
			if (unlikely((cpu_buffer->commit_page !=
				      cpu_buffer->tail_page) &&
				     (cpu_buffer->commit_page ==
				      cpu_buffer->reader_page))) {
				local_inc(&cpu_buffer->commit_overrun);
				goto out_reset;
			}
		}
	}

	ret = rb_tail_page_update(cpu_buffer, tail_page, next_page);
	if (ret) {
		/*
		 * Nested commits always have zero deltas, so
		 * just reread the time stamp
		 */
		*ts = rb_time_stamp(buffer);
		next_page->page->time_stamp = *ts;
	}

 out_again:

	rb_reset_tail(cpu_buffer, tail_page, tail, length);

	/* fail and let the caller try again */
	return ERR_PTR(-EAGAIN);

 out_reset:
	/* reset write */
	rb_reset_tail(cpu_buffer, tail_page, tail, length);

	return NULL;
}

static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned type, unsigned long length, u64 *ts)
{
	struct buffer_page *tail_page;
	struct ring_buffer_event *event;
	unsigned long tail, write;

	tail_page = cpu_buffer->tail_page;
	write = local_add_return(length, &tail_page->write);

	/* set write to only the index of the write */
	write &= RB_WRITE_MASK;
	tail = write - length;

	/* See if we shot past the end of this buffer page */
	if (write > BUF_PAGE_SIZE)
		return rb_move_tail(cpu_buffer, length, tail,
				    tail_page, ts);

	/* We reserved something on the buffer */

	event = __rb_page_index(tail_page, tail);
	kmemcheck_annotate_bitfield(event, bitfield);
	rb_update_event(event, type, length);

	/* The passed in type is zero for DATA */
	if (likely(!type))
		local_inc(&tail_page->entries);

	/*
	 * If this is the first commit on the page, then update
	 * its timestamp.
	 */
	if (!tail)
		tail_page->page->time_stamp = *ts;

	return event;
}

static inline int
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
		  struct ring_buffer_event *event)
{
	unsigned long new_index, old_index;
	struct buffer_page *bpage;
	unsigned long index;
	unsigned long addr;

	new_index = rb_event_index(event);
	old_index = new_index + rb_event_length(event);
	addr = (unsigned long)event;
	addr &= PAGE_MASK;

	bpage = cpu_buffer->tail_page;

	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
		unsigned long write_mask =
			local_read(&bpage->write) & ~RB_WRITE_MASK;
		/*
		 * This is on the tail page. It is possible that
		 * a write could come in and move the tail page
		 * and write to the next page. That is fine
		 * because we just shorten what is on this page.
		 */
		old_index += write_mask;
		new_index += write_mask;
		index = local_cmpxchg(&bpage->write, old_index, new_index);
		if (index == old_index)
			return 1;
	}

	/* could not discard */
	return 0;
}

static int
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  u64 *ts, u64 *delta)
{
	struct ring_buffer_event *event;
	static int once;
	int ret;

	if (unlikely(*delta > (1ULL << 59) && !once++)) {
		printk(KERN_WARNING "Delta way too big! %llu"
		       " ts=%llu write stamp = %llu\n",
		       (unsigned long long)*delta,
		       (unsigned long long)*ts,
		       (unsigned long long)cpu_buffer->write_stamp);
		WARN_ON(1);
	}

	/*
	 * The delta is too big, we need to add a
	 * new timestamp.
	 */
	event = __rb_reserve_next(cpu_buffer,
				  RINGBUF_TYPE_TIME_EXTEND,
				  RB_LEN_TIME_EXTEND,
				  ts);
	if (!event)
		return -EBUSY;

	if (PTR_ERR(event) == -EAGAIN)
		return -EAGAIN;

	/* Only a committed time event can update the write stamp */
	if (rb_event_is_commit(cpu_buffer, event)) {
		/*
		 * If this is the first on the page, then it was
		 * updated with the page itself. Try to discard it
		 * and if we can't just make it zero.
		 */
		if (rb_event_index(event)) {
			event->time_delta = *delta & TS_MASK;
			event->array[0] = *delta >> TS_SHIFT;
		} else {
			/* try to discard, since we do not need this */
			if (!rb_try_to_discard(cpu_buffer, event)) {
				/* nope, just zero it */
				event->time_delta = 0;
				event->array[0] = 0;
			}
		}
		cpu_buffer->write_stamp = *ts;
		/* let the caller know this was the commit */
		ret = 1;
	} else {
		/* Try to discard the event */
		if (!rb_try_to_discard(cpu_buffer, event)) {
			/* Darn, this is just wasted space */
			event->time_delta = 0;
			event->array[0] = 0;
		}
		ret = 0;
	}

	*delta = 0;

	return ret;
}

static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->committing);
	local_inc(&cpu_buffer->commits);
}

static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long commits;

	if (RB_WARN_ON(cpu_buffer,
		       !local_read(&cpu_buffer->committing)))
		return;

 again:
	commits = local_read(&cpu_buffer->commits);
	/* synchronize with interrupts */
	barrier();
	if (local_read(&cpu_buffer->committing) == 1)
		rb_set_commit_to_write(cpu_buffer);

	local_dec(&cpu_buffer->committing);

	/* synchronize with interrupts */
	barrier();

	/*
	 * Need to account for interrupts coming in between the
	 * updating of the commit page and the clearing of the
	 * committing counter.
	 */
	if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
	    !local_read(&cpu_buffer->committing)) {
		local_inc(&cpu_buffer->committing);
		goto again;
	}
}

static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer *buffer,
		      struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned long length)
{
	struct ring_buffer_event *event;
	u64 ts, delta = 0;
	int commit = 0;
	int nr_loops = 0;

	rb_start_commit(cpu_buffer);

#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
	/*
	 * Due to the ability to swap a cpu buffer from a buffer
	 * it is possible it was swapped before we committed.
	 * (committing stops a swap). We check for it here and
	 * if it happened, we have to fail the write.
	 */
	barrier();
	if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
		local_dec(&cpu_buffer->committing);
		local_dec(&cpu_buffer->commits);
		return NULL;
	}
#endif

	length = rb_calculate_event_length(length);
 again:
	/*
	 * We allow for interrupts to reenter here and do a trace.
	 * If one does, it will cause this original code to loop
	 * back here. Even with heavy interrupts happening, this
Even with heavy interrupts happening, this 2104 * should only happen a few times in a row. If this happens 2105 * 1000 times in a row, there must be either an interrupt 2106 * storm or we have something buggy. 2107 * Bail! 2108 */ 2109 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 2110 goto out_fail; 2111 2112 ts = rb_time_stamp(cpu_buffer->buffer); 2113 2114 /* 2115 * Only the first commit can update the timestamp. 2116 * Yes there is a race here. If an interrupt comes in 2117 * just after the conditional and it traces too, then it 2118 * will also check the deltas. More than one timestamp may 2119 * also be made. But only the entry that did the actual 2120 * commit will be something other than zero. 2121 */ 2122 if (likely(cpu_buffer->tail_page == cpu_buffer->commit_page && 2123 rb_page_write(cpu_buffer->tail_page) == 2124 rb_commit_index(cpu_buffer))) { 2125 u64 diff; 2126 2127 diff = ts - cpu_buffer->write_stamp; 2128 2129 /* make sure this diff is calculated here */ 2130 barrier(); 2131 2132 /* Did the write stamp get updated already? */ 2133 if (unlikely(ts < cpu_buffer->write_stamp)) 2134 goto get_event; 2135 2136 delta = diff; 2137 if (unlikely(test_time_stamp(delta))) { 2138 2139 commit = rb_add_time_stamp(cpu_buffer, &ts, &delta); 2140 if (commit == -EBUSY) 2141 goto out_fail; 2142 2143 if (commit == -EAGAIN) 2144 goto again; 2145 2146 RB_WARN_ON(cpu_buffer, commit < 0); 2147 } 2148 } 2149 2150 get_event: 2151 event = __rb_reserve_next(cpu_buffer, 0, length, &ts); 2152 if (unlikely(PTR_ERR(event) == -EAGAIN)) 2153 goto again; 2154 2155 if (!event) 2156 goto out_fail; 2157 2158 if (!rb_event_is_commit(cpu_buffer, event)) 2159 delta = 0; 2160 2161 event->time_delta = delta; 2162 2163 return event; 2164 2165 out_fail: 2166 rb_end_commit(cpu_buffer); 2167 return NULL; 2168 } 2169 2170 #ifdef CONFIG_TRACING 2171 2172 #define TRACE_RECURSIVE_DEPTH 16 2173 2174 static int trace_recursive_lock(void) 2175 { 2176 current->trace_recursion++; 2177 2178 if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH)) 2179 return 0; 2180 2181 /* Disable all tracing before we do anything else */ 2182 tracing_off_permanent(); 2183 2184 printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:" 2185 "HC[%lu]:SC[%lu]:NMI[%lu]\n", 2186 current->trace_recursion, 2187 hardirq_count() >> HARDIRQ_SHIFT, 2188 softirq_count() >> SOFTIRQ_SHIFT, 2189 in_nmi()); 2190 2191 WARN_ON_ONCE(1); 2192 return -1; 2193 } 2194 2195 static void trace_recursive_unlock(void) 2196 { 2197 WARN_ON_ONCE(!current->trace_recursion); 2198 2199 current->trace_recursion--; 2200 } 2201 2202 #else 2203 2204 #define trace_recursive_lock() (0) 2205 #define trace_recursive_unlock() do { } while (0) 2206 2207 #endif 2208 2209 static DEFINE_PER_CPU(int, rb_need_resched); 2210 2211 /** 2212 * ring_buffer_lock_reserve - reserve a part of the buffer 2213 * @buffer: the ring buffer to reserve from 2214 * @length: the length of the data to reserve (excluding event header) 2215 * 2216 * Returns a reserved event on the ring buffer to copy directly to. 2217 * The user of this interface will need to get the body to write into 2218 * and can use the ring_buffer_event_data() interface. 2219 * 2220 * The length is the length of the data needed, not the event length 2221 * which also includes the event header. 2222 * 2223 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 2224 * If NULL is returned, then nothing has been allocated or locked.
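 *
 * A minimal usage sketch (illustrative only; "struct foo" and "data" are
 * hypothetical, and the NULL return must be handled as described above):
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(struct foo));
 *	if (!event)
 *		return;
 *	memcpy(ring_buffer_event_data(event), &data, sizeof(struct foo));
 *	ring_buffer_unlock_commit(buffer, event);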
2225 */ 2226 struct ring_buffer_event * 2227 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) 2228 { 2229 struct ring_buffer_per_cpu *cpu_buffer; 2230 struct ring_buffer_event *event; 2231 int cpu, resched; 2232 2233 if (ring_buffer_flags != RB_BUFFERS_ON) 2234 return NULL; 2235 2236 if (atomic_read(&buffer->record_disabled)) 2237 return NULL; 2238 2239 /* If we are tracing schedule, we don't want to recurse */ 2240 resched = ftrace_preempt_disable(); 2241 2242 if (trace_recursive_lock()) 2243 goto out_nocheck; 2244 2245 cpu = raw_smp_processor_id(); 2246 2247 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2248 goto out; 2249 2250 cpu_buffer = buffer->buffers[cpu]; 2251 2252 if (atomic_read(&cpu_buffer->record_disabled)) 2253 goto out; 2254 2255 if (length > BUF_MAX_DATA_SIZE) 2256 goto out; 2257 2258 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2259 if (!event) 2260 goto out; 2261 2262 /* 2263 * Need to store resched state on this cpu. 2264 * Only the first needs to. 2265 */ 2266 2267 if (preempt_count() == 1) 2268 per_cpu(rb_need_resched, cpu) = resched; 2269 2270 return event; 2271 2272 out: 2273 trace_recursive_unlock(); 2274 2275 out_nocheck: 2276 ftrace_preempt_enable(resched); 2277 return NULL; 2278 } 2279 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 2280 2281 static void 2282 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2283 struct ring_buffer_event *event) 2284 { 2285 /* 2286 * The event first in the commit queue updates the 2287 * time stamp. 2288 */ 2289 if (rb_event_is_commit(cpu_buffer, event)) 2290 cpu_buffer->write_stamp += event->time_delta; 2291 } 2292 2293 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 2294 struct ring_buffer_event *event) 2295 { 2296 local_inc(&cpu_buffer->entries); 2297 rb_update_write_stamp(cpu_buffer, event); 2298 rb_end_commit(cpu_buffer); 2299 } 2300 2301 /** 2302 * ring_buffer_unlock_commit - commit a reserved 2303 * @buffer: The buffer to commit to 2304 * @event: The event pointer to commit. 2305 * 2306 * This commits the data to the ring buffer, and releases any locks held. 2307 * 2308 * Must be paired with ring_buffer_lock_reserve. 2309 */ 2310 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 2311 struct ring_buffer_event *event) 2312 { 2313 struct ring_buffer_per_cpu *cpu_buffer; 2314 int cpu = raw_smp_processor_id(); 2315 2316 cpu_buffer = buffer->buffers[cpu]; 2317 2318 rb_commit(cpu_buffer, event); 2319 2320 trace_recursive_unlock(); 2321 2322 /* 2323 * Only the last preempt count needs to restore preemption. 2324 */ 2325 if (preempt_count() == 1) 2326 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 2327 else 2328 preempt_enable_no_resched_notrace(); 2329 2330 return 0; 2331 } 2332 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 2333 2334 static inline void rb_event_discard(struct ring_buffer_event *event) 2335 { 2336 /* array[0] holds the actual length for the discarded event */ 2337 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2338 event->type_len = RINGBUF_TYPE_PADDING; 2339 /* time delta must be non zero */ 2340 if (!event->time_delta) 2341 event->time_delta = 1; 2342 } 2343 2344 /* 2345 * Decrement the entries to the page that an event is on. 2346 * The event does not even need to exist, only the pointer 2347 * to the page it is on. This may only be called before the commit 2348 * takes place. 
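 * (Within this file it is used by ring_buffer_discard_commit() below,
 * while the writer side still holds the commit.)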
2349 */ 2350 static inline void 2351 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 2352 struct ring_buffer_event *event) 2353 { 2354 unsigned long addr = (unsigned long)event; 2355 struct buffer_page *bpage = cpu_buffer->commit_page; 2356 struct buffer_page *start; 2357 2358 addr &= PAGE_MASK; 2359 2360 /* Do the likely case first */ 2361 if (likely(bpage->page == (void *)addr)) { 2362 local_dec(&bpage->entries); 2363 return; 2364 } 2365 2366 /* 2367 * Because the commit page may be on the reader page we 2368 * start with the next page and check the end loop there. 2369 */ 2370 rb_inc_page(cpu_buffer, &bpage); 2371 start = bpage; 2372 do { 2373 if (bpage->page == (void *)addr) { 2374 local_dec(&bpage->entries); 2375 return; 2376 } 2377 rb_inc_page(cpu_buffer, &bpage); 2378 } while (bpage != start); 2379 2380 /* commit not part of this buffer?? */ 2381 RB_WARN_ON(cpu_buffer, 1); 2382 } 2383 2384 /** 2385 * ring_buffer_commit_discard - discard an event that has not been committed 2386 * @buffer: the ring buffer 2387 * @event: non committed event to discard 2388 * 2389 * Sometimes an event that is in the ring buffer needs to be ignored. 2390 * This function lets the user discard an event in the ring buffer 2391 * and then that event will not be read later. 2392 * 2393 * This function only works if it is called before the the item has been 2394 * committed. It will try to free the event from the ring buffer 2395 * if another event has not been added behind it. 2396 * 2397 * If another event has been added behind it, it will set the event 2398 * up as discarded, and perform the commit. 2399 * 2400 * If this function is called, do not call ring_buffer_unlock_commit on 2401 * the event. 2402 */ 2403 void ring_buffer_discard_commit(struct ring_buffer *buffer, 2404 struct ring_buffer_event *event) 2405 { 2406 struct ring_buffer_per_cpu *cpu_buffer; 2407 int cpu; 2408 2409 /* The event is discarded regardless */ 2410 rb_event_discard(event); 2411 2412 cpu = smp_processor_id(); 2413 cpu_buffer = buffer->buffers[cpu]; 2414 2415 /* 2416 * This must only be called if the event has not been 2417 * committed yet. Thus we can assume that preemption 2418 * is still disabled. 2419 */ 2420 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 2421 2422 rb_decrement_entry(cpu_buffer, event); 2423 if (rb_try_to_discard(cpu_buffer, event)) 2424 goto out; 2425 2426 /* 2427 * The commit is still visible by the reader, so we 2428 * must still update the timestamp. 2429 */ 2430 rb_update_write_stamp(cpu_buffer, event); 2431 out: 2432 rb_end_commit(cpu_buffer); 2433 2434 trace_recursive_unlock(); 2435 2436 /* 2437 * Only the last preempt count needs to restore preemption. 2438 */ 2439 if (preempt_count() == 1) 2440 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 2441 else 2442 preempt_enable_no_resched_notrace(); 2443 2444 } 2445 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 2446 2447 /** 2448 * ring_buffer_write - write data to the buffer without reserving 2449 * @buffer: The ring buffer to write to. 2450 * @length: The length of the data being written (excluding the event header) 2451 * @data: The data to write to the buffer. 2452 * 2453 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 2454 * one function. If you already have the data to write to the buffer, it 2455 * may be easier to simply call this function. 2456 * 2457 * Note, like ring_buffer_lock_reserve, the length is the length of the data 2458 * and not the length of the event which would hold the header. 
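 *
 * A minimal sketch (illustrative only; "struct foo" and fill_data() are
 * hypothetical, and the error return is ignored here):
 *
 *	struct foo data;
 *
 *	fill_data(&data);
 *	ring_buffer_write(buffer, sizeof(data), &data);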
2459 */ 2460 int ring_buffer_write(struct ring_buffer *buffer, 2461 unsigned long length, 2462 void *data) 2463 { 2464 struct ring_buffer_per_cpu *cpu_buffer; 2465 struct ring_buffer_event *event; 2466 void *body; 2467 int ret = -EBUSY; 2468 int cpu, resched; 2469 2470 if (ring_buffer_flags != RB_BUFFERS_ON) 2471 return -EBUSY; 2472 2473 if (atomic_read(&buffer->record_disabled)) 2474 return -EBUSY; 2475 2476 resched = ftrace_preempt_disable(); 2477 2478 cpu = raw_smp_processor_id(); 2479 2480 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2481 goto out; 2482 2483 cpu_buffer = buffer->buffers[cpu]; 2484 2485 if (atomic_read(&cpu_buffer->record_disabled)) 2486 goto out; 2487 2488 if (length > BUF_MAX_DATA_SIZE) 2489 goto out; 2490 2491 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2492 if (!event) 2493 goto out; 2494 2495 body = rb_event_data(event); 2496 2497 memcpy(body, data, length); 2498 2499 rb_commit(cpu_buffer, event); 2500 2501 ret = 0; 2502 out: 2503 ftrace_preempt_enable(resched); 2504 2505 return ret; 2506 } 2507 EXPORT_SYMBOL_GPL(ring_buffer_write); 2508 2509 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 2510 { 2511 struct buffer_page *reader = cpu_buffer->reader_page; 2512 struct buffer_page *head = rb_set_head_page(cpu_buffer); 2513 struct buffer_page *commit = cpu_buffer->commit_page; 2514 2515 /* In case of error, head will be NULL */ 2516 if (unlikely(!head)) 2517 return 1; 2518 2519 return reader->read == rb_page_commit(reader) && 2520 (commit == reader || 2521 (commit == head && 2522 head->read == rb_page_commit(commit))); 2523 } 2524 2525 /** 2526 * ring_buffer_record_disable - stop all writes into the buffer 2527 * @buffer: The ring buffer to stop writes to. 2528 * 2529 * This prevents all writes to the buffer. Any attempt to write 2530 * to the buffer after this will fail and return NULL. 2531 * 2532 * The caller should call synchronize_sched() after this. 2533 */ 2534 void ring_buffer_record_disable(struct ring_buffer *buffer) 2535 { 2536 atomic_inc(&buffer->record_disabled); 2537 } 2538 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 2539 2540 /** 2541 * ring_buffer_record_enable - enable writes to the buffer 2542 * @buffer: The ring buffer to enable writes 2543 * 2544 * Note, multiple disables will need the same number of enables 2545 * to truely enable the writing (much like preempt_disable). 2546 */ 2547 void ring_buffer_record_enable(struct ring_buffer *buffer) 2548 { 2549 atomic_dec(&buffer->record_disabled); 2550 } 2551 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 2552 2553 /** 2554 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 2555 * @buffer: The ring buffer to stop writes to. 2556 * @cpu: The CPU buffer to stop 2557 * 2558 * This prevents all writes to the buffer. Any attempt to write 2559 * to the buffer after this will fail and return NULL. 2560 * 2561 * The caller should call synchronize_sched() after this. 2562 */ 2563 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 2564 { 2565 struct ring_buffer_per_cpu *cpu_buffer; 2566 2567 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2568 return; 2569 2570 cpu_buffer = buffer->buffers[cpu]; 2571 atomic_inc(&cpu_buffer->record_disabled); 2572 } 2573 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 2574 2575 /** 2576 * ring_buffer_record_enable_cpu - enable writes to the buffer 2577 * @buffer: The ring buffer to enable writes 2578 * @cpu: The CPU to enable. 
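 *
 * A typical pairing, as a sketch (the disable side should be followed by
 * synchronize_sched(), as noted for ring_buffer_record_disable_cpu()):
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	synchronize_sched();
 *	... read or reset the cpu buffer ...
 *	ring_buffer_record_enable_cpu(buffer, cpu);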
2579 * 2580 * Note, multiple disables will need the same number of enables 2581 * to truely enable the writing (much like preempt_disable). 2582 */ 2583 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 2584 { 2585 struct ring_buffer_per_cpu *cpu_buffer; 2586 2587 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2588 return; 2589 2590 cpu_buffer = buffer->buffers[cpu]; 2591 atomic_dec(&cpu_buffer->record_disabled); 2592 } 2593 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 2594 2595 /** 2596 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 2597 * @buffer: The ring buffer 2598 * @cpu: The per CPU buffer to get the entries from. 2599 */ 2600 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 2601 { 2602 struct ring_buffer_per_cpu *cpu_buffer; 2603 unsigned long ret; 2604 2605 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2606 return 0; 2607 2608 cpu_buffer = buffer->buffers[cpu]; 2609 ret = (local_read(&cpu_buffer->entries) - local_read(&cpu_buffer->overrun)) 2610 - cpu_buffer->read; 2611 2612 return ret; 2613 } 2614 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 2615 2616 /** 2617 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer 2618 * @buffer: The ring buffer 2619 * @cpu: The per CPU buffer to get the number of overruns from 2620 */ 2621 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 2622 { 2623 struct ring_buffer_per_cpu *cpu_buffer; 2624 unsigned long ret; 2625 2626 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2627 return 0; 2628 2629 cpu_buffer = buffer->buffers[cpu]; 2630 ret = local_read(&cpu_buffer->overrun); 2631 2632 return ret; 2633 } 2634 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 2635 2636 /** 2637 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits 2638 * @buffer: The ring buffer 2639 * @cpu: The per CPU buffer to get the number of overruns from 2640 */ 2641 unsigned long 2642 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) 2643 { 2644 struct ring_buffer_per_cpu *cpu_buffer; 2645 unsigned long ret; 2646 2647 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2648 return 0; 2649 2650 cpu_buffer = buffer->buffers[cpu]; 2651 ret = local_read(&cpu_buffer->commit_overrun); 2652 2653 return ret; 2654 } 2655 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 2656 2657 /** 2658 * ring_buffer_entries - get the number of entries in a buffer 2659 * @buffer: The ring buffer 2660 * 2661 * Returns the total number of entries in the ring buffer 2662 * (all CPU entries) 2663 */ 2664 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 2665 { 2666 struct ring_buffer_per_cpu *cpu_buffer; 2667 unsigned long entries = 0; 2668 int cpu; 2669 2670 /* if you care about this being correct, lock the buffer */ 2671 for_each_buffer_cpu(buffer, cpu) { 2672 cpu_buffer = buffer->buffers[cpu]; 2673 entries += (local_read(&cpu_buffer->entries) - 2674 local_read(&cpu_buffer->overrun)) - cpu_buffer->read; 2675 } 2676 2677 return entries; 2678 } 2679 EXPORT_SYMBOL_GPL(ring_buffer_entries); 2680 2681 /** 2682 * ring_buffer_overruns - get the number of overruns in buffer 2683 * @buffer: The ring buffer 2684 * 2685 * Returns the total number of overruns in the ring buffer 2686 * (all CPU entries) 2687 */ 2688 unsigned long ring_buffer_overruns(struct ring_buffer *buffer) 2689 { 2690 struct ring_buffer_per_cpu *cpu_buffer; 2691 unsigned long overruns = 0; 2692 int cpu; 2693 2694 /* if you care about this being correct, lock the buffer */ 2695 
for_each_buffer_cpu(buffer, cpu) { 2696 cpu_buffer = buffer->buffers[cpu]; 2697 overruns += local_read(&cpu_buffer->overrun); 2698 } 2699 2700 return overruns; 2701 } 2702 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 2703 2704 static void rb_iter_reset(struct ring_buffer_iter *iter) 2705 { 2706 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2707 2708 /* Iterator usage is expected to have record disabled */ 2709 if (list_empty(&cpu_buffer->reader_page->list)) { 2710 iter->head_page = rb_set_head_page(cpu_buffer); 2711 if (unlikely(!iter->head_page)) 2712 return; 2713 iter->head = iter->head_page->read; 2714 } else { 2715 iter->head_page = cpu_buffer->reader_page; 2716 iter->head = cpu_buffer->reader_page->read; 2717 } 2718 if (iter->head) 2719 iter->read_stamp = cpu_buffer->read_stamp; 2720 else 2721 iter->read_stamp = iter->head_page->page->time_stamp; 2722 iter->cache_reader_page = cpu_buffer->reader_page; 2723 iter->cache_read = cpu_buffer->read; 2724 } 2725 2726 /** 2727 * ring_buffer_iter_reset - reset an iterator 2728 * @iter: The iterator to reset 2729 * 2730 * Resets the iterator, so that it will start from the beginning 2731 * again. 2732 */ 2733 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 2734 { 2735 struct ring_buffer_per_cpu *cpu_buffer; 2736 unsigned long flags; 2737 2738 if (!iter) 2739 return; 2740 2741 cpu_buffer = iter->cpu_buffer; 2742 2743 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2744 rb_iter_reset(iter); 2745 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2746 } 2747 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 2748 2749 /** 2750 * ring_buffer_iter_empty - check if an iterator has no more to read 2751 * @iter: The iterator to check 2752 */ 2753 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 2754 { 2755 struct ring_buffer_per_cpu *cpu_buffer; 2756 2757 cpu_buffer = iter->cpu_buffer; 2758 2759 return iter->head_page == cpu_buffer->commit_page && 2760 iter->head == rb_commit_index(cpu_buffer); 2761 } 2762 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 2763 2764 static void 2765 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2766 struct ring_buffer_event *event) 2767 { 2768 u64 delta; 2769 2770 switch (event->type_len) { 2771 case RINGBUF_TYPE_PADDING: 2772 return; 2773 2774 case RINGBUF_TYPE_TIME_EXTEND: 2775 delta = event->array[0]; 2776 delta <<= TS_SHIFT; 2777 delta += event->time_delta; 2778 cpu_buffer->read_stamp += delta; 2779 return; 2780 2781 case RINGBUF_TYPE_TIME_STAMP: 2782 /* FIXME: not implemented */ 2783 return; 2784 2785 case RINGBUF_TYPE_DATA: 2786 cpu_buffer->read_stamp += event->time_delta; 2787 return; 2788 2789 default: 2790 BUG(); 2791 } 2792 return; 2793 } 2794 2795 static void 2796 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 2797 struct ring_buffer_event *event) 2798 { 2799 u64 delta; 2800 2801 switch (event->type_len) { 2802 case RINGBUF_TYPE_PADDING: 2803 return; 2804 2805 case RINGBUF_TYPE_TIME_EXTEND: 2806 delta = event->array[0]; 2807 delta <<= TS_SHIFT; 2808 delta += event->time_delta; 2809 iter->read_stamp += delta; 2810 return; 2811 2812 case RINGBUF_TYPE_TIME_STAMP: 2813 /* FIXME: not implemented */ 2814 return; 2815 2816 case RINGBUF_TYPE_DATA: 2817 iter->read_stamp += event->time_delta; 2818 return; 2819 2820 default: 2821 BUG(); 2822 } 2823 return; 2824 } 2825 2826 static struct buffer_page * 2827 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 2828 { 2829 struct buffer_page *reader = NULL; 2830 unsigned long flags; 2831 int nr_loops = 0; 2832 int ret; 
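	/*
	 * What follows swaps the (now empty) reader page into the ring in
	 * place of the head page, so the old head becomes a page the writer
	 * will no longer touch; see the splice and rb_head_page_replace()
	 * steps below.
	 */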
2833 2834 local_irq_save(flags); 2835 arch_spin_lock(&cpu_buffer->lock); 2836 2837 again: 2838 /* 2839 * This should normally only loop twice. But because the 2840 * start of the reader inserts an empty page, it causes 2841 * a case where we will loop three times. There should be no 2842 * reason to loop four times (that I know of). 2843 */ 2844 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 2845 reader = NULL; 2846 goto out; 2847 } 2848 2849 reader = cpu_buffer->reader_page; 2850 2851 /* If there's more to read, return this page */ 2852 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 2853 goto out; 2854 2855 /* Never should we have an index greater than the size */ 2856 if (RB_WARN_ON(cpu_buffer, 2857 cpu_buffer->reader_page->read > rb_page_size(reader))) 2858 goto out; 2859 2860 /* check if we caught up to the tail */ 2861 reader = NULL; 2862 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 2863 goto out; 2864 2865 /* 2866 * Reset the reader page to size zero. 2867 */ 2868 local_set(&cpu_buffer->reader_page->write, 0); 2869 local_set(&cpu_buffer->reader_page->entries, 0); 2870 local_set(&cpu_buffer->reader_page->page->commit, 0); 2871 2872 spin: 2873 /* 2874 * Splice the empty reader page into the list around the head. 2875 */ 2876 reader = rb_set_head_page(cpu_buffer); 2877 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 2878 cpu_buffer->reader_page->list.prev = reader->list.prev; 2879 2880 /* 2881 * cpu_buffer->pages just needs to point to the buffer, it 2882 * has no specific buffer page to point to. Let's move it out 2883 * of our way so we don't accidentally swap it. 2884 */ 2885 cpu_buffer->pages = reader->list.prev; 2886 2887 /* The reader page will be pointing to the new head */ 2888 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); 2889 2890 /* 2891 * Here's the tricky part. 2892 * 2893 * We need to move the pointer past the header page. 2894 * But we can only do that if a writer is not currently 2895 * moving it. The page before the header page has the 2896 * flag bit '1' set if it is pointing to the page we want. 2897 * But if the writer is in the process of moving it 2898 * then it will be '2' or already moved '0'. 2899 */ 2900 2901 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 2902 2903 /* 2904 * If we did not convert it, then we must try again. 2905 */ 2906 if (!ret) 2907 goto spin; 2908 2909 /* 2910 * Yeah! We succeeded in replacing the page. 2911 * 2912 * Now make the new head point back to the reader page.
2913 */ 2914 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 2915 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 2916 2917 /* Finally update the reader page to the new head */ 2918 cpu_buffer->reader_page = reader; 2919 rb_reset_reader_page(cpu_buffer); 2920 2921 goto again; 2922 2923 out: 2924 arch_spin_unlock(&cpu_buffer->lock); 2925 local_irq_restore(flags); 2926 2927 return reader; 2928 } 2929 2930 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 2931 { 2932 struct ring_buffer_event *event; 2933 struct buffer_page *reader; 2934 unsigned length; 2935 2936 reader = rb_get_reader_page(cpu_buffer); 2937 2938 /* This function should not be called when buffer is empty */ 2939 if (RB_WARN_ON(cpu_buffer, !reader)) 2940 return; 2941 2942 event = rb_reader_event(cpu_buffer); 2943 2944 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 2945 cpu_buffer->read++; 2946 2947 rb_update_read_stamp(cpu_buffer, event); 2948 2949 length = rb_event_length(event); 2950 cpu_buffer->reader_page->read += length; 2951 } 2952 2953 static void rb_advance_iter(struct ring_buffer_iter *iter) 2954 { 2955 struct ring_buffer *buffer; 2956 struct ring_buffer_per_cpu *cpu_buffer; 2957 struct ring_buffer_event *event; 2958 unsigned length; 2959 2960 cpu_buffer = iter->cpu_buffer; 2961 buffer = cpu_buffer->buffer; 2962 2963 /* 2964 * Check if we are at the end of the buffer. 2965 */ 2966 if (iter->head >= rb_page_size(iter->head_page)) { 2967 /* discarded commits can make the page empty */ 2968 if (iter->head_page == cpu_buffer->commit_page) 2969 return; 2970 rb_inc_iter(iter); 2971 return; 2972 } 2973 2974 event = rb_iter_head_event(iter); 2975 2976 length = rb_event_length(event); 2977 2978 /* 2979 * This should not be called to advance the header if we are 2980 * at the tail of the buffer. 2981 */ 2982 if (RB_WARN_ON(cpu_buffer, 2983 (iter->head_page == cpu_buffer->commit_page) && 2984 (iter->head + length > rb_commit_index(cpu_buffer)))) 2985 return; 2986 2987 rb_update_iter_read_stamp(iter, event); 2988 2989 iter->head += length; 2990 2991 /* check for end of page padding */ 2992 if ((iter->head >= rb_page_size(iter->head_page)) && 2993 (iter->head_page != cpu_buffer->commit_page)) 2994 rb_advance_iter(iter); 2995 } 2996 2997 static struct ring_buffer_event * 2998 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts) 2999 { 3000 struct ring_buffer_event *event; 3001 struct buffer_page *reader; 3002 int nr_loops = 0; 3003 3004 again: 3005 /* 3006 * We repeat when a timestamp is encountered. It is possible 3007 * to get multiple timestamps from an interrupt entering just 3008 * as one timestamp is about to be written, or from discarded 3009 * commits. The most that we can have is the number on a single page. 3010 */ 3011 if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE)) 3012 return NULL; 3013 3014 reader = rb_get_reader_page(cpu_buffer); 3015 if (!reader) 3016 return NULL; 3017 3018 event = rb_reader_event(cpu_buffer); 3019 3020 switch (event->type_len) { 3021 case RINGBUF_TYPE_PADDING: 3022 if (rb_null_event(event)) 3023 RB_WARN_ON(cpu_buffer, 1); 3024 /* 3025 * Because the writer could be discarding every 3026 * event it creates (which would probably be bad) 3027 * if we were to go back to "again" then we may never 3028 * catch up, and will trigger the warn on, or lock 3029 * the box. Return the padding, and we will release 3030 * the current locks, and try again. 
3031 */ 3032 return event; 3033 3034 case RINGBUF_TYPE_TIME_EXTEND: 3035 /* Internal data, OK to advance */ 3036 rb_advance_reader(cpu_buffer); 3037 goto again; 3038 3039 case RINGBUF_TYPE_TIME_STAMP: 3040 /* FIXME: not implemented */ 3041 rb_advance_reader(cpu_buffer); 3042 goto again; 3043 3044 case RINGBUF_TYPE_DATA: 3045 if (ts) { 3046 *ts = cpu_buffer->read_stamp + event->time_delta; 3047 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 3048 cpu_buffer->cpu, ts); 3049 } 3050 return event; 3051 3052 default: 3053 BUG(); 3054 } 3055 3056 return NULL; 3057 } 3058 EXPORT_SYMBOL_GPL(ring_buffer_peek); 3059 3060 static struct ring_buffer_event * 3061 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3062 { 3063 struct ring_buffer *buffer; 3064 struct ring_buffer_per_cpu *cpu_buffer; 3065 struct ring_buffer_event *event; 3066 int nr_loops = 0; 3067 3068 cpu_buffer = iter->cpu_buffer; 3069 buffer = cpu_buffer->buffer; 3070 3071 /* 3072 * Check if someone performed a consuming read to 3073 * the buffer. A consuming read invalidates the iterator 3074 * and we need to reset the iterator in this case. 3075 */ 3076 if (unlikely(iter->cache_read != cpu_buffer->read || 3077 iter->cache_reader_page != cpu_buffer->reader_page)) 3078 rb_iter_reset(iter); 3079 3080 again: 3081 if (ring_buffer_iter_empty(iter)) 3082 return NULL; 3083 3084 /* 3085 * We repeat when a timestamp is encountered. 3086 * We can get multiple timestamps by nested interrupts or also 3087 * if filtering is on (discarding commits). Since discarding 3088 * commits can be frequent we can get a lot of timestamps. 3089 * But we limit them by not adding timestamps if they begin 3090 * at the start of a page. 3091 */ 3092 if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE)) 3093 return NULL; 3094 3095 if (rb_per_cpu_empty(cpu_buffer)) 3096 return NULL; 3097 3098 if (iter->head >= local_read(&iter->head_page->page->commit)) { 3099 rb_inc_iter(iter); 3100 goto again; 3101 } 3102 3103 event = rb_iter_head_event(iter); 3104 3105 switch (event->type_len) { 3106 case RINGBUF_TYPE_PADDING: 3107 if (rb_null_event(event)) { 3108 rb_inc_iter(iter); 3109 goto again; 3110 } 3111 rb_advance_iter(iter); 3112 return event; 3113 3114 case RINGBUF_TYPE_TIME_EXTEND: 3115 /* Internal data, OK to advance */ 3116 rb_advance_iter(iter); 3117 goto again; 3118 3119 case RINGBUF_TYPE_TIME_STAMP: 3120 /* FIXME: not implemented */ 3121 rb_advance_iter(iter); 3122 goto again; 3123 3124 case RINGBUF_TYPE_DATA: 3125 if (ts) { 3126 *ts = iter->read_stamp + event->time_delta; 3127 ring_buffer_normalize_time_stamp(buffer, 3128 cpu_buffer->cpu, ts); 3129 } 3130 return event; 3131 3132 default: 3133 BUG(); 3134 } 3135 3136 return NULL; 3137 } 3138 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 3139 3140 static inline int rb_ok_to_lock(void) 3141 { 3142 /* 3143 * If an NMI die dumps out the content of the ring buffer 3144 * do not grab locks. We also permanently disable the ring 3145 * buffer too. A one time deal is all you get from reading 3146 * the ring buffer from an NMI. 3147 */ 3148 if (likely(!in_nmi())) 3149 return 1; 3150 3151 tracing_off_permanent(); 3152 return 0; 3153 } 3154 3155 /** 3156 * ring_buffer_peek - peek at the next event to be read 3157 * @buffer: The ring buffer to read 3158 * @cpu: The cpu to peak at 3159 * @ts: The timestamp counter of this event. 3160 * 3161 * This will return the event that will be read next, but does 3162 * not consume the data. 
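 *
 * A sketch of a non-consuming check (illustrative only; inspect() is a
 * placeholder):
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts);
 *	if (event)
 *		inspect(ring_buffer_event_data(event));
 *
 * The event stays in the buffer; ring_buffer_consume() below returns the
 * same event and removes it.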
3163 */ 3164 struct ring_buffer_event * 3165 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) 3166 { 3167 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3168 struct ring_buffer_event *event; 3169 unsigned long flags; 3170 int dolock; 3171 3172 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3173 return NULL; 3174 3175 dolock = rb_ok_to_lock(); 3176 again: 3177 local_irq_save(flags); 3178 if (dolock) 3179 spin_lock(&cpu_buffer->reader_lock); 3180 event = rb_buffer_peek(cpu_buffer, ts); 3181 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3182 rb_advance_reader(cpu_buffer); 3183 if (dolock) 3184 spin_unlock(&cpu_buffer->reader_lock); 3185 local_irq_restore(flags); 3186 3187 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3188 goto again; 3189 3190 return event; 3191 } 3192 3193 /** 3194 * ring_buffer_iter_peek - peek at the next event to be read 3195 * @iter: The ring buffer iterator 3196 * @ts: The timestamp counter of this event. 3197 * 3198 * This will return the event that will be read next, but does 3199 * not increment the iterator. 3200 */ 3201 struct ring_buffer_event * 3202 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3203 { 3204 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3205 struct ring_buffer_event *event; 3206 unsigned long flags; 3207 3208 again: 3209 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3210 event = rb_iter_peek(iter, ts); 3211 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3212 3213 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3214 goto again; 3215 3216 return event; 3217 } 3218 3219 /** 3220 * ring_buffer_consume - return an event and consume it 3221 * @buffer: The ring buffer to get the next event from 3222 * 3223 * Returns the next event in the ring buffer, and that event is consumed. 3224 * Meaning, that sequential reads will keep returning a different event, 3225 * and eventually empty the ring buffer if the producer is slower. 3226 */ 3227 struct ring_buffer_event * 3228 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts) 3229 { 3230 struct ring_buffer_per_cpu *cpu_buffer; 3231 struct ring_buffer_event *event = NULL; 3232 unsigned long flags; 3233 int dolock; 3234 3235 dolock = rb_ok_to_lock(); 3236 3237 again: 3238 /* might be called in atomic */ 3239 preempt_disable(); 3240 3241 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3242 goto out; 3243 3244 cpu_buffer = buffer->buffers[cpu]; 3245 local_irq_save(flags); 3246 if (dolock) 3247 spin_lock(&cpu_buffer->reader_lock); 3248 3249 event = rb_buffer_peek(cpu_buffer, ts); 3250 if (event) 3251 rb_advance_reader(cpu_buffer); 3252 3253 if (dolock) 3254 spin_unlock(&cpu_buffer->reader_lock); 3255 local_irq_restore(flags); 3256 3257 out: 3258 preempt_enable(); 3259 3260 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3261 goto again; 3262 3263 return event; 3264 } 3265 EXPORT_SYMBOL_GPL(ring_buffer_consume); 3266 3267 /** 3268 * ring_buffer_read_start - start a non consuming read of the buffer 3269 * @buffer: The ring buffer to read from 3270 * @cpu: The cpu buffer to iterate over 3271 * 3272 * This starts up an iteration through the buffer. It also disables 3273 * the recording to the buffer until the reading is finished. 3274 * This prevents the reading from being corrupted. This is not 3275 * a consuming read, so a producer is not expected. 3276 * 3277 * Must be paired with ring_buffer_finish. 
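 *
 * A non-consuming read loop might look like this (sketch only; process()
 * is a placeholder):
 *
 *	iter = ring_buffer_read_start(buffer, cpu);
 *	if (!iter)
 *		return;
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		process(event, ts);
 *	ring_buffer_read_finish(iter);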
3278 */ 3279 struct ring_buffer_iter * 3280 ring_buffer_read_start(struct ring_buffer *buffer, int cpu) 3281 { 3282 struct ring_buffer_per_cpu *cpu_buffer; 3283 struct ring_buffer_iter *iter; 3284 unsigned long flags; 3285 3286 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3287 return NULL; 3288 3289 iter = kmalloc(sizeof(*iter), GFP_KERNEL); 3290 if (!iter) 3291 return NULL; 3292 3293 cpu_buffer = buffer->buffers[cpu]; 3294 3295 iter->cpu_buffer = cpu_buffer; 3296 3297 atomic_inc(&cpu_buffer->record_disabled); 3298 synchronize_sched(); 3299 3300 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3301 arch_spin_lock(&cpu_buffer->lock); 3302 rb_iter_reset(iter); 3303 arch_spin_unlock(&cpu_buffer->lock); 3304 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3305 3306 return iter; 3307 } 3308 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 3309 3310 /** 3311 * ring_buffer_read_finish - finish reading the iterator of the buffer 3312 * @iter: The iterator retrieved by ring_buffer_read_start 3313 * 3314 * This re-enables the recording to the buffer, and frees the 3315 * iterator. 3316 */ 3317 void 3318 ring_buffer_read_finish(struct ring_buffer_iter *iter) 3319 { 3320 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3321 3322 atomic_dec(&cpu_buffer->record_disabled); 3323 kfree(iter); 3324 } 3325 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 3326 3327 /** 3328 * ring_buffer_read - read the next item in the ring buffer by the iterator 3329 * @iter: The ring buffer iterator 3330 * @ts: The time stamp of the event read. 3331 * 3332 * This reads the next event in the ring buffer and increments the iterator. 3333 */ 3334 struct ring_buffer_event * 3335 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) 3336 { 3337 struct ring_buffer_event *event; 3338 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3339 unsigned long flags; 3340 3341 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3342 again: 3343 event = rb_iter_peek(iter, ts); 3344 if (!event) 3345 goto out; 3346 3347 if (event->type_len == RINGBUF_TYPE_PADDING) 3348 goto again; 3349 3350 rb_advance_iter(iter); 3351 out: 3352 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3353 3354 return event; 3355 } 3356 EXPORT_SYMBOL_GPL(ring_buffer_read); 3357 3358 /** 3359 * ring_buffer_size - return the size of the ring buffer (in bytes) 3360 * @buffer: The ring buffer.
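 *
 * Note: the value returned is BUF_PAGE_SIZE times buffer->pages, i.e. the
 * size of one per-CPU buffer rather than the total across all CPUs.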
3361 */ 3362 unsigned long ring_buffer_size(struct ring_buffer *buffer) 3363 { 3364 return BUF_PAGE_SIZE * buffer->pages; 3365 } 3366 EXPORT_SYMBOL_GPL(ring_buffer_size); 3367 3368 static void 3369 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 3370 { 3371 rb_head_page_deactivate(cpu_buffer); 3372 3373 cpu_buffer->head_page 3374 = list_entry(cpu_buffer->pages, struct buffer_page, list); 3375 local_set(&cpu_buffer->head_page->write, 0); 3376 local_set(&cpu_buffer->head_page->entries, 0); 3377 local_set(&cpu_buffer->head_page->page->commit, 0); 3378 3379 cpu_buffer->head_page->read = 0; 3380 3381 cpu_buffer->tail_page = cpu_buffer->head_page; 3382 cpu_buffer->commit_page = cpu_buffer->head_page; 3383 3384 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 3385 local_set(&cpu_buffer->reader_page->write, 0); 3386 local_set(&cpu_buffer->reader_page->entries, 0); 3387 local_set(&cpu_buffer->reader_page->page->commit, 0); 3388 cpu_buffer->reader_page->read = 0; 3389 3390 local_set(&cpu_buffer->commit_overrun, 0); 3391 local_set(&cpu_buffer->overrun, 0); 3392 local_set(&cpu_buffer->entries, 0); 3393 local_set(&cpu_buffer->committing, 0); 3394 local_set(&cpu_buffer->commits, 0); 3395 cpu_buffer->read = 0; 3396 3397 cpu_buffer->write_stamp = 0; 3398 cpu_buffer->read_stamp = 0; 3399 3400 rb_head_page_activate(cpu_buffer); 3401 } 3402 3403 /** 3404 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 3405 * @buffer: The ring buffer to reset a per cpu buffer of 3406 * @cpu: The CPU buffer to be reset 3407 */ 3408 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu) 3409 { 3410 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3411 unsigned long flags; 3412 3413 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3414 return; 3415 3416 atomic_inc(&cpu_buffer->record_disabled); 3417 3418 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3419 3420 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 3421 goto out; 3422 3423 arch_spin_lock(&cpu_buffer->lock); 3424 3425 rb_reset_cpu(cpu_buffer); 3426 3427 arch_spin_unlock(&cpu_buffer->lock); 3428 3429 out: 3430 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3431 3432 atomic_dec(&cpu_buffer->record_disabled); 3433 } 3434 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 3435 3436 /** 3437 * ring_buffer_reset - reset a ring buffer 3438 * @buffer: The ring buffer to reset all cpu buffers 3439 */ 3440 void ring_buffer_reset(struct ring_buffer *buffer) 3441 { 3442 int cpu; 3443 3444 for_each_buffer_cpu(buffer, cpu) 3445 ring_buffer_reset_cpu(buffer, cpu); 3446 } 3447 EXPORT_SYMBOL_GPL(ring_buffer_reset); 3448 3449 /** 3450 * ring_buffer_empty - is the ring buffer empty? 3451 * @buffer: The ring buffer to test 3452 */ 3453 int ring_buffer_empty(struct ring_buffer *buffer) 3454 { 3455 struct ring_buffer_per_cpu *cpu_buffer; 3456 unsigned long flags; 3457 int dolock; 3458 int cpu; 3459 int ret; 3460 3461 dolock = rb_ok_to_lock(); 3462 3463 /* yes this is racy, but if you don't like the race, lock the buffer */ 3464 for_each_buffer_cpu(buffer, cpu) { 3465 cpu_buffer = buffer->buffers[cpu]; 3466 local_irq_save(flags); 3467 if (dolock) 3468 spin_lock(&cpu_buffer->reader_lock); 3469 ret = rb_per_cpu_empty(cpu_buffer); 3470 if (dolock) 3471 spin_unlock(&cpu_buffer->reader_lock); 3472 local_irq_restore(flags); 3473 3474 if (!ret) 3475 return 0; 3476 } 3477 3478 return 1; 3479 } 3480 EXPORT_SYMBOL_GPL(ring_buffer_empty); 3481 3482 /** 3483 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
3484 * @buffer: The ring buffer 3485 * @cpu: The CPU buffer to test 3486 */ 3487 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) 3488 { 3489 struct ring_buffer_per_cpu *cpu_buffer; 3490 unsigned long flags; 3491 int dolock; 3492 int ret; 3493 3494 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3495 return 1; 3496 3497 dolock = rb_ok_to_lock(); 3498 3499 cpu_buffer = buffer->buffers[cpu]; 3500 local_irq_save(flags); 3501 if (dolock) 3502 spin_lock(&cpu_buffer->reader_lock); 3503 ret = rb_per_cpu_empty(cpu_buffer); 3504 if (dolock) 3505 spin_unlock(&cpu_buffer->reader_lock); 3506 local_irq_restore(flags); 3507 3508 return ret; 3509 } 3510 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 3511 3512 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3513 /** 3514 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 3515 * @buffer_a: One buffer to swap with 3516 * @buffer_b: The other buffer to swap with 3517 * 3518 * This function is useful for tracers that want to take a "snapshot" 3519 * of a CPU buffer and has another back up buffer lying around. 3520 * it is expected that the tracer handles the cpu buffer not being 3521 * used at the moment. 3522 */ 3523 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, 3524 struct ring_buffer *buffer_b, int cpu) 3525 { 3526 struct ring_buffer_per_cpu *cpu_buffer_a; 3527 struct ring_buffer_per_cpu *cpu_buffer_b; 3528 int ret = -EINVAL; 3529 3530 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 3531 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 3532 goto out; 3533 3534 /* At least make sure the two buffers are somewhat the same */ 3535 if (buffer_a->pages != buffer_b->pages) 3536 goto out; 3537 3538 ret = -EAGAIN; 3539 3540 if (ring_buffer_flags != RB_BUFFERS_ON) 3541 goto out; 3542 3543 if (atomic_read(&buffer_a->record_disabled)) 3544 goto out; 3545 3546 if (atomic_read(&buffer_b->record_disabled)) 3547 goto out; 3548 3549 cpu_buffer_a = buffer_a->buffers[cpu]; 3550 cpu_buffer_b = buffer_b->buffers[cpu]; 3551 3552 if (atomic_read(&cpu_buffer_a->record_disabled)) 3553 goto out; 3554 3555 if (atomic_read(&cpu_buffer_b->record_disabled)) 3556 goto out; 3557 3558 /* 3559 * We can't do a synchronize_sched here because this 3560 * function can be called in atomic context. 3561 * Normally this will be called from the same CPU as cpu. 3562 * If not it's up to the caller to protect this. 3563 */ 3564 atomic_inc(&cpu_buffer_a->record_disabled); 3565 atomic_inc(&cpu_buffer_b->record_disabled); 3566 3567 ret = -EBUSY; 3568 if (local_read(&cpu_buffer_a->committing)) 3569 goto out_dec; 3570 if (local_read(&cpu_buffer_b->committing)) 3571 goto out_dec; 3572 3573 buffer_a->buffers[cpu] = cpu_buffer_b; 3574 buffer_b->buffers[cpu] = cpu_buffer_a; 3575 3576 cpu_buffer_b->buffer = buffer_a; 3577 cpu_buffer_a->buffer = buffer_b; 3578 3579 ret = 0; 3580 3581 out_dec: 3582 atomic_dec(&cpu_buffer_a->record_disabled); 3583 atomic_dec(&cpu_buffer_b->record_disabled); 3584 out: 3585 return ret; 3586 } 3587 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 3588 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 3589 3590 /** 3591 * ring_buffer_alloc_read_page - allocate a page to read from buffer 3592 * @buffer: the buffer to allocate for. 3593 * 3594 * This function is used in conjunction with ring_buffer_read_page. 3595 * When reading a full page from the ring buffer, these functions 3596 * can be used to speed up the process. The calling function should 3597 * allocate a few pages first with this function. 
Then when it 3598 * needs to get pages from the ring buffer, it passes the result 3599 * of this function into ring_buffer_read_page, which will swap 3600 * the page that was allocated, with the read page of the buffer. 3601 * 3602 * Returns: 3603 * The page allocated, or NULL on error. 3604 */ 3605 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer) 3606 { 3607 struct buffer_data_page *bpage; 3608 unsigned long addr; 3609 3610 addr = __get_free_page(GFP_KERNEL); 3611 if (!addr) 3612 return NULL; 3613 3614 bpage = (void *)addr; 3615 3616 rb_init_page(bpage); 3617 3618 return bpage; 3619 } 3620 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 3621 3622 /** 3623 * ring_buffer_free_read_page - free an allocated read page 3624 * @buffer: the buffer the page was allocate for 3625 * @data: the page to free 3626 * 3627 * Free a page allocated from ring_buffer_alloc_read_page. 3628 */ 3629 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data) 3630 { 3631 free_page((unsigned long)data); 3632 } 3633 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 3634 3635 /** 3636 * ring_buffer_read_page - extract a page from the ring buffer 3637 * @buffer: buffer to extract from 3638 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 3639 * @len: amount to extract 3640 * @cpu: the cpu of the buffer to extract 3641 * @full: should the extraction only happen when the page is full. 3642 * 3643 * This function will pull out a page from the ring buffer and consume it. 3644 * @data_page must be the address of the variable that was returned 3645 * from ring_buffer_alloc_read_page. This is because the page might be used 3646 * to swap with a page in the ring buffer. 3647 * 3648 * for example: 3649 * rpage = ring_buffer_alloc_read_page(buffer); 3650 * if (!rpage) 3651 * return error; 3652 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 3653 * if (ret >= 0) 3654 * process_page(rpage, ret); 3655 * 3656 * When @full is set, the function will not return true unless 3657 * the writer is off the reader page. 3658 * 3659 * Note: it is up to the calling functions to handle sleeps and wakeups. 3660 * The ring buffer can be used anywhere in the kernel and can not 3661 * blindly call wake_up. The layer that uses the ring buffer must be 3662 * responsible for that. 3663 * 3664 * Returns: 3665 * >=0 if data has been transferred, returns the offset of consumed data. 3666 * <0 if no data has been transferred. 3667 */ 3668 int ring_buffer_read_page(struct ring_buffer *buffer, 3669 void **data_page, size_t len, int cpu, int full) 3670 { 3671 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3672 struct ring_buffer_event *event; 3673 struct buffer_data_page *bpage; 3674 struct buffer_page *reader; 3675 unsigned long flags; 3676 unsigned int commit; 3677 unsigned int read; 3678 u64 save_timestamp; 3679 int ret = -1; 3680 3681 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3682 goto out; 3683 3684 /* 3685 * If len is not big enough to hold the page header, then 3686 * we can not copy anything. 
3687 */ 3688 if (len <= BUF_PAGE_HDR_SIZE) 3689 goto out; 3690 3691 len -= BUF_PAGE_HDR_SIZE; 3692 3693 if (!data_page) 3694 goto out; 3695 3696 bpage = *data_page; 3697 if (!bpage) 3698 goto out; 3699 3700 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3701 3702 reader = rb_get_reader_page(cpu_buffer); 3703 if (!reader) 3704 goto out_unlock; 3705 3706 event = rb_reader_event(cpu_buffer); 3707 3708 read = reader->read; 3709 commit = rb_page_commit(reader); 3710 3711 /* 3712 * If this page has been partially read or 3713 * if len is not big enough to read the rest of the page or 3714 * a writer is still on the page, then 3715 * we must copy the data from the page to the buffer. 3716 * Otherwise, we can simply swap the page with the one passed in. 3717 */ 3718 if (read || (len < (commit - read)) || 3719 cpu_buffer->reader_page == cpu_buffer->commit_page) { 3720 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 3721 unsigned int rpos = read; 3722 unsigned int pos = 0; 3723 unsigned int size; 3724 3725 if (full) 3726 goto out_unlock; 3727 3728 if (len > (commit - read)) 3729 len = (commit - read); 3730 3731 size = rb_event_length(event); 3732 3733 if (len < size) 3734 goto out_unlock; 3735 3736 /* save the current timestamp, since the user will need it */ 3737 save_timestamp = cpu_buffer->read_stamp; 3738 3739 /* Need to copy one event at a time */ 3740 do { 3741 memcpy(bpage->data + pos, rpage->data + rpos, size); 3742 3743 len -= size; 3744 3745 rb_advance_reader(cpu_buffer); 3746 rpos = reader->read; 3747 pos += size; 3748 3749 event = rb_reader_event(cpu_buffer); 3750 size = rb_event_length(event); 3751 } while (len > size); 3752 3753 /* update bpage */ 3754 local_set(&bpage->commit, pos); 3755 bpage->time_stamp = save_timestamp; 3756 3757 /* we copied everything to the beginning */ 3758 read = 0; 3759 } else { 3760 /* update the entry counter */ 3761 cpu_buffer->read += rb_page_entries(reader); 3762 3763 /* swap the pages */ 3764 rb_init_page(bpage); 3765 bpage = reader->page; 3766 reader->page = *data_page; 3767 local_set(&reader->write, 0); 3768 local_set(&reader->entries, 0); 3769 reader->read = 0; 3770 *data_page = bpage; 3771 } 3772 ret = read; 3773 3774 out_unlock: 3775 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3776 3777 out: 3778 return ret; 3779 } 3780 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 3781 3782 #ifdef CONFIG_TRACING 3783 static ssize_t 3784 rb_simple_read(struct file *filp, char __user *ubuf, 3785 size_t cnt, loff_t *ppos) 3786 { 3787 unsigned long *p = filp->private_data; 3788 char buf[64]; 3789 int r; 3790 3791 if (test_bit(RB_BUFFERS_DISABLED_BIT, p)) 3792 r = sprintf(buf, "permanently disabled\n"); 3793 else 3794 r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p)); 3795 3796 return simple_read_from_buffer(ubuf, cnt, ppos, buf, r); 3797 } 3798 3799 static ssize_t 3800 rb_simple_write(struct file *filp, const char __user *ubuf, 3801 size_t cnt, loff_t *ppos) 3802 { 3803 unsigned long *p = filp->private_data; 3804 char buf[64]; 3805 unsigned long val; 3806 int ret; 3807 3808 if (cnt >= sizeof(buf)) 3809 return -EINVAL; 3810 3811 if (copy_from_user(&buf, ubuf, cnt)) 3812 return -EFAULT; 3813 3814 buf[cnt] = 0; 3815 3816 ret = strict_strtoul(buf, 10, &val); 3817 if (ret < 0) 3818 return ret; 3819 3820 if (val) 3821 set_bit(RB_BUFFERS_ON_BIT, p); 3822 else 3823 clear_bit(RB_BUFFERS_ON_BIT, p); 3824 3825 (*ppos)++; 3826 3827 return cnt; 3828 } 3829 3830 static const struct file_operations rb_simple_fops = { 3831 .open = 
tracing_open_generic, 3832 .read = rb_simple_read, 3833 .write = rb_simple_write, 3834 }; 3835 3836 3837 static __init int rb_init_debugfs(void) 3838 { 3839 struct dentry *d_tracer; 3840 3841 d_tracer = tracing_init_dentry(); 3842 3843 trace_create_file("tracing_on", 0644, d_tracer, 3844 &ring_buffer_flags, &rb_simple_fops); 3845 3846 return 0; 3847 } 3848 3849 fs_initcall(rb_init_debugfs); 3850 #endif 3851 3852 #ifdef CONFIG_HOTPLUG_CPU 3853 static int rb_cpu_notify(struct notifier_block *self, 3854 unsigned long action, void *hcpu) 3855 { 3856 struct ring_buffer *buffer = 3857 container_of(self, struct ring_buffer, cpu_notify); 3858 long cpu = (long)hcpu; 3859 3860 switch (action) { 3861 case CPU_UP_PREPARE: 3862 case CPU_UP_PREPARE_FROZEN: 3863 if (cpumask_test_cpu(cpu, buffer->cpumask)) 3864 return NOTIFY_OK; 3865 3866 buffer->buffers[cpu] = 3867 rb_allocate_cpu_buffer(buffer, cpu); 3868 if (!buffer->buffers[cpu]) { 3869 WARN(1, "failed to allocate ring buffer on CPU %ld\n", 3870 cpu); 3871 return NOTIFY_OK; 3872 } 3873 smp_wmb(); 3874 cpumask_set_cpu(cpu, buffer->cpumask); 3875 break; 3876 case CPU_DOWN_PREPARE: 3877 case CPU_DOWN_PREPARE_FROZEN: 3878 /* 3879 * Do nothing. 3880 * If we were to free the buffer, then the user would 3881 * lose any trace that was in the buffer. 3882 */ 3883 break; 3884 default: 3885 break; 3886 } 3887 return NOTIFY_OK; 3888 } 3889 #endif 3890