/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <[email protected]>
 */
#include <linux/ring_buffer.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/sched.h>	/* used for sched_clock() (for now) */
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/fs.h>

#include "trace.h"

/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */

/*
 * Global flag to disable all recording to ring buffers
 * This has two bits: ON, DISABLED
 *
 *  ON   DISABLED
 * ---- ----------
 *   0      0      : ring buffers are off
 *   1      0      : ring buffers are on
 *   X      1      : ring buffers are permanently disabled
 */

enum {
	RB_BUFFERS_ON_BIT	= 0,
	RB_BUFFERS_DISABLED_BIT	= 1,
};

enum {
	RB_BUFFERS_ON		= 1 << RB_BUFFERS_ON_BIT,
	RB_BUFFERS_DISABLED	= 1 << RB_BUFFERS_DISABLED_BIT,
};

static long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;

/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

/* FIXME!!!
*/ 105 u64 ring_buffer_time_stamp(int cpu) 106 { 107 u64 time; 108 109 preempt_disable_notrace(); 110 /* shift to debug/test normalization and TIME_EXTENTS */ 111 time = sched_clock() << DEBUG_SHIFT; 112 preempt_enable_no_resched_notrace(); 113 114 return time; 115 } 116 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 117 118 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts) 119 { 120 /* Just stupid testing the normalize function and deltas */ 121 *ts >>= DEBUG_SHIFT; 122 } 123 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 124 125 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event)) 126 #define RB_ALIGNMENT_SHIFT 2 127 #define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT) 128 #define RB_MAX_SMALL_DATA 28 129 130 enum { 131 RB_LEN_TIME_EXTEND = 8, 132 RB_LEN_TIME_STAMP = 16, 133 }; 134 135 /* inline for ring buffer fast paths */ 136 static inline unsigned 137 rb_event_length(struct ring_buffer_event *event) 138 { 139 unsigned length; 140 141 switch (event->type) { 142 case RINGBUF_TYPE_PADDING: 143 /* undefined */ 144 return -1; 145 146 case RINGBUF_TYPE_TIME_EXTEND: 147 return RB_LEN_TIME_EXTEND; 148 149 case RINGBUF_TYPE_TIME_STAMP: 150 return RB_LEN_TIME_STAMP; 151 152 case RINGBUF_TYPE_DATA: 153 if (event->len) 154 length = event->len << RB_ALIGNMENT_SHIFT; 155 else 156 length = event->array[0]; 157 return length + RB_EVNT_HDR_SIZE; 158 default: 159 BUG(); 160 } 161 /* not hit */ 162 return 0; 163 } 164 165 /** 166 * ring_buffer_event_length - return the length of the event 167 * @event: the event to get the length of 168 */ 169 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 170 { 171 return rb_event_length(event); 172 } 173 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 174 175 /* inline for ring buffer fast paths */ 176 static inline void * 177 rb_event_data(struct ring_buffer_event *event) 178 { 179 BUG_ON(event->type != RINGBUF_TYPE_DATA); 180 /* If length is in len field, then array[0] has the data */ 181 if (event->len) 182 return (void *)&event->array[0]; 183 /* Otherwise length is in array[0] and array[1] has the data */ 184 return (void *)&event->array[1]; 185 } 186 187 /** 188 * ring_buffer_event_data - return the data of the event 189 * @event: the event to get the data from 190 */ 191 void *ring_buffer_event_data(struct ring_buffer_event *event) 192 { 193 return rb_event_data(event); 194 } 195 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 196 197 #define for_each_buffer_cpu(buffer, cpu) \ 198 for_each_cpu(cpu, buffer->cpumask) 199 200 #define TS_SHIFT 27 201 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 202 #define TS_DELTA_TEST (~TS_MASK) 203 204 struct buffer_data_page { 205 u64 time_stamp; /* page time stamp */ 206 local_t commit; /* write commited index */ 207 unsigned char data[]; /* data of buffer page */ 208 }; 209 210 struct buffer_page { 211 local_t write; /* index for next write */ 212 unsigned read; /* index for next read */ 213 struct list_head list; /* list of free pages */ 214 struct buffer_data_page *page; /* Actual data page */ 215 }; 216 217 static void rb_init_page(struct buffer_data_page *bpage) 218 { 219 local_set(&bpage->commit, 0); 220 } 221 222 /* 223 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing 224 * this issue out. 225 */ 226 static inline void free_buffer_page(struct buffer_page *bpage) 227 { 228 if (bpage->page) 229 free_page((unsigned long)bpage->page); 230 kfree(bpage); 231 } 232 233 /* 234 * We need to fit the time_stamp delta into 27 bits. 
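 *
 * Illustrative arithmetic (not from the original source): with a
 * nanosecond-resolution sched_clock(), the 27-bit field covers deltas up
 * to (1 << TS_SHIFT) - 1 = 134,217,727 ns, roughly 134 ms.  Any two
 * events farther apart than that force a RINGBUF_TYPE_TIME_EXTEND event
 * to be inserted, which test_time_stamp() below detects by checking the
 * high bits selected by TS_DELTA_TEST.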
235 */ 236 static inline int test_time_stamp(u64 delta) 237 { 238 if (delta & TS_DELTA_TEST) 239 return 1; 240 return 0; 241 } 242 243 #define BUF_PAGE_SIZE (PAGE_SIZE - sizeof(struct buffer_data_page)) 244 245 /* 246 * head_page == tail_page && head == tail then buffer is empty. 247 */ 248 struct ring_buffer_per_cpu { 249 int cpu; 250 struct ring_buffer *buffer; 251 spinlock_t reader_lock; /* serialize readers */ 252 raw_spinlock_t lock; 253 struct lock_class_key lock_key; 254 struct list_head pages; 255 struct buffer_page *head_page; /* read from head */ 256 struct buffer_page *tail_page; /* write to tail */ 257 struct buffer_page *commit_page; /* commited pages */ 258 struct buffer_page *reader_page; 259 unsigned long overrun; 260 unsigned long entries; 261 u64 write_stamp; 262 u64 read_stamp; 263 atomic_t record_disabled; 264 }; 265 266 struct ring_buffer { 267 unsigned pages; 268 unsigned flags; 269 int cpus; 270 cpumask_var_t cpumask; 271 atomic_t record_disabled; 272 273 struct mutex mutex; 274 275 struct ring_buffer_per_cpu **buffers; 276 }; 277 278 struct ring_buffer_iter { 279 struct ring_buffer_per_cpu *cpu_buffer; 280 unsigned long head; 281 struct buffer_page *head_page; 282 u64 read_stamp; 283 }; 284 285 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 286 #define RB_WARN_ON(buffer, cond) \ 287 ({ \ 288 int _____ret = unlikely(cond); \ 289 if (_____ret) { \ 290 atomic_inc(&buffer->record_disabled); \ 291 WARN_ON(1); \ 292 } \ 293 _____ret; \ 294 }) 295 296 /** 297 * check_pages - integrity check of buffer pages 298 * @cpu_buffer: CPU buffer with pages to test 299 * 300 * As a safty measure we check to make sure the data pages have not 301 * been corrupted. 302 */ 303 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 304 { 305 struct list_head *head = &cpu_buffer->pages; 306 struct buffer_page *bpage, *tmp; 307 308 if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) 309 return -1; 310 if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) 311 return -1; 312 313 list_for_each_entry_safe(bpage, tmp, head, list) { 314 if (RB_WARN_ON(cpu_buffer, 315 bpage->list.next->prev != &bpage->list)) 316 return -1; 317 if (RB_WARN_ON(cpu_buffer, 318 bpage->list.prev->next != &bpage->list)) 319 return -1; 320 } 321 322 return 0; 323 } 324 325 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 326 unsigned nr_pages) 327 { 328 struct list_head *head = &cpu_buffer->pages; 329 struct buffer_page *bpage, *tmp; 330 unsigned long addr; 331 LIST_HEAD(pages); 332 unsigned i; 333 334 for (i = 0; i < nr_pages; i++) { 335 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 336 GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); 337 if (!bpage) 338 goto free_pages; 339 list_add(&bpage->list, &pages); 340 341 addr = __get_free_page(GFP_KERNEL); 342 if (!addr) 343 goto free_pages; 344 bpage->page = (void *)addr; 345 rb_init_page(bpage->page); 346 } 347 348 list_splice(&pages, head); 349 350 rb_check_pages(cpu_buffer); 351 352 return 0; 353 354 free_pages: 355 list_for_each_entry_safe(bpage, tmp, &pages, list) { 356 list_del_init(&bpage->list); 357 free_buffer_page(bpage); 358 } 359 return -ENOMEM; 360 } 361 362 static struct ring_buffer_per_cpu * 363 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu) 364 { 365 struct ring_buffer_per_cpu *cpu_buffer; 366 struct buffer_page *bpage; 367 unsigned long addr; 368 int ret; 369 370 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 371 GFP_KERNEL, cpu_to_node(cpu)); 372 if 
(!cpu_buffer) 373 return NULL; 374 375 cpu_buffer->cpu = cpu; 376 cpu_buffer->buffer = buffer; 377 spin_lock_init(&cpu_buffer->reader_lock); 378 cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED; 379 INIT_LIST_HEAD(&cpu_buffer->pages); 380 381 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 382 GFP_KERNEL, cpu_to_node(cpu)); 383 if (!bpage) 384 goto fail_free_buffer; 385 386 cpu_buffer->reader_page = bpage; 387 addr = __get_free_page(GFP_KERNEL); 388 if (!addr) 389 goto fail_free_reader; 390 bpage->page = (void *)addr; 391 rb_init_page(bpage->page); 392 393 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 394 395 ret = rb_allocate_pages(cpu_buffer, buffer->pages); 396 if (ret < 0) 397 goto fail_free_reader; 398 399 cpu_buffer->head_page 400 = list_entry(cpu_buffer->pages.next, struct buffer_page, list); 401 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 402 403 return cpu_buffer; 404 405 fail_free_reader: 406 free_buffer_page(cpu_buffer->reader_page); 407 408 fail_free_buffer: 409 kfree(cpu_buffer); 410 return NULL; 411 } 412 413 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 414 { 415 struct list_head *head = &cpu_buffer->pages; 416 struct buffer_page *bpage, *tmp; 417 418 list_del_init(&cpu_buffer->reader_page->list); 419 free_buffer_page(cpu_buffer->reader_page); 420 421 list_for_each_entry_safe(bpage, tmp, head, list) { 422 list_del_init(&bpage->list); 423 free_buffer_page(bpage); 424 } 425 kfree(cpu_buffer); 426 } 427 428 /* 429 * Causes compile errors if the struct buffer_page gets bigger 430 * than the struct page. 431 */ 432 extern int ring_buffer_page_too_big(void); 433 434 /** 435 * ring_buffer_alloc - allocate a new ring_buffer 436 * @size: the size in bytes per cpu that is needed. 437 * @flags: attributes to set for the ring buffer. 438 * 439 * Currently the only flag that is available is the RB_FL_OVERWRITE 440 * flag. This flag means that the buffer will overwrite old data 441 * when the buffer wraps. If this flag is not set, the buffer will 442 * drop data when the tail hits the head. 443 */ 444 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags) 445 { 446 struct ring_buffer *buffer; 447 int bsize; 448 int cpu; 449 450 /* Paranoid! 
Optimizes out when all is well */ 451 if (sizeof(struct buffer_page) > sizeof(struct page)) 452 ring_buffer_page_too_big(); 453 454 455 /* keep it in its own cache line */ 456 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 457 GFP_KERNEL); 458 if (!buffer) 459 return NULL; 460 461 if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 462 goto fail_free_buffer; 463 464 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 465 buffer->flags = flags; 466 467 /* need at least two pages */ 468 if (buffer->pages == 1) 469 buffer->pages++; 470 471 cpumask_copy(buffer->cpumask, cpu_possible_mask); 472 buffer->cpus = nr_cpu_ids; 473 474 bsize = sizeof(void *) * nr_cpu_ids; 475 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 476 GFP_KERNEL); 477 if (!buffer->buffers) 478 goto fail_free_cpumask; 479 480 for_each_buffer_cpu(buffer, cpu) { 481 buffer->buffers[cpu] = 482 rb_allocate_cpu_buffer(buffer, cpu); 483 if (!buffer->buffers[cpu]) 484 goto fail_free_buffers; 485 } 486 487 mutex_init(&buffer->mutex); 488 489 return buffer; 490 491 fail_free_buffers: 492 for_each_buffer_cpu(buffer, cpu) { 493 if (buffer->buffers[cpu]) 494 rb_free_cpu_buffer(buffer->buffers[cpu]); 495 } 496 kfree(buffer->buffers); 497 498 fail_free_cpumask: 499 free_cpumask_var(buffer->cpumask); 500 501 fail_free_buffer: 502 kfree(buffer); 503 return NULL; 504 } 505 EXPORT_SYMBOL_GPL(ring_buffer_alloc); 506 507 /** 508 * ring_buffer_free - free a ring buffer. 509 * @buffer: the buffer to free. 510 */ 511 void 512 ring_buffer_free(struct ring_buffer *buffer) 513 { 514 int cpu; 515 516 for_each_buffer_cpu(buffer, cpu) 517 rb_free_cpu_buffer(buffer->buffers[cpu]); 518 519 free_cpumask_var(buffer->cpumask); 520 521 kfree(buffer); 522 } 523 EXPORT_SYMBOL_GPL(ring_buffer_free); 524 525 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 526 527 static void 528 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages) 529 { 530 struct buffer_page *bpage; 531 struct list_head *p; 532 unsigned i; 533 534 atomic_inc(&cpu_buffer->record_disabled); 535 synchronize_sched(); 536 537 for (i = 0; i < nr_pages; i++) { 538 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages))) 539 return; 540 p = cpu_buffer->pages.next; 541 bpage = list_entry(p, struct buffer_page, list); 542 list_del_init(&bpage->list); 543 free_buffer_page(bpage); 544 } 545 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages))) 546 return; 547 548 rb_reset_cpu(cpu_buffer); 549 550 rb_check_pages(cpu_buffer); 551 552 atomic_dec(&cpu_buffer->record_disabled); 553 554 } 555 556 static void 557 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, 558 struct list_head *pages, unsigned nr_pages) 559 { 560 struct buffer_page *bpage; 561 struct list_head *p; 562 unsigned i; 563 564 atomic_inc(&cpu_buffer->record_disabled); 565 synchronize_sched(); 566 567 for (i = 0; i < nr_pages; i++) { 568 if (RB_WARN_ON(cpu_buffer, list_empty(pages))) 569 return; 570 p = pages->next; 571 bpage = list_entry(p, struct buffer_page, list); 572 list_del_init(&bpage->list); 573 list_add_tail(&bpage->list, &cpu_buffer->pages); 574 } 575 rb_reset_cpu(cpu_buffer); 576 577 rb_check_pages(cpu_buffer); 578 579 atomic_dec(&cpu_buffer->record_disabled); 580 } 581 582 /** 583 * ring_buffer_resize - resize the ring buffer 584 * @buffer: the buffer to resize. 585 * @size: the new size. 586 * 587 * The tracer is responsible for making sure that the buffer is 588 * not being used while changing the size. 
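 *
 * Illustrative (hypothetical) call, doubling an existing buffer and
 * keeping the old size if the page allocations fail, assuming the caller
 * has already ensured the buffer is idle as required above:
 *
 *	if (ring_buffer_resize(buffer, 2 * ring_buffer_size(buffer)) < 0)
 *		printk(KERN_WARNING "resize failed, size unchanged\n");
 *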
589 * Note: We may be able to change the above requirement by using 590 * RCU synchronizations. 591 * 592 * Minimum size is 2 * BUF_PAGE_SIZE. 593 * 594 * Returns -1 on failure. 595 */ 596 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) 597 { 598 struct ring_buffer_per_cpu *cpu_buffer; 599 unsigned nr_pages, rm_pages, new_pages; 600 struct buffer_page *bpage, *tmp; 601 unsigned long buffer_size; 602 unsigned long addr; 603 LIST_HEAD(pages); 604 int i, cpu; 605 606 /* 607 * Always succeed at resizing a non-existent buffer: 608 */ 609 if (!buffer) 610 return size; 611 612 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 613 size *= BUF_PAGE_SIZE; 614 buffer_size = buffer->pages * BUF_PAGE_SIZE; 615 616 /* we need a minimum of two pages */ 617 if (size < BUF_PAGE_SIZE * 2) 618 size = BUF_PAGE_SIZE * 2; 619 620 if (size == buffer_size) 621 return size; 622 623 mutex_lock(&buffer->mutex); 624 625 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 626 627 if (size < buffer_size) { 628 629 /* easy case, just free pages */ 630 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) { 631 mutex_unlock(&buffer->mutex); 632 return -1; 633 } 634 635 rm_pages = buffer->pages - nr_pages; 636 637 for_each_buffer_cpu(buffer, cpu) { 638 cpu_buffer = buffer->buffers[cpu]; 639 rb_remove_pages(cpu_buffer, rm_pages); 640 } 641 goto out; 642 } 643 644 /* 645 * This is a bit more difficult. We only want to add pages 646 * when we can allocate enough for all CPUs. We do this 647 * by allocating all the pages and storing them on a local 648 * link list. If we succeed in our allocation, then we 649 * add these pages to the cpu_buffers. Otherwise we just free 650 * them all and return -ENOMEM; 651 */ 652 if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) { 653 mutex_unlock(&buffer->mutex); 654 return -1; 655 } 656 657 new_pages = nr_pages - buffer->pages; 658 659 for_each_buffer_cpu(buffer, cpu) { 660 for (i = 0; i < new_pages; i++) { 661 bpage = kzalloc_node(ALIGN(sizeof(*bpage), 662 cache_line_size()), 663 GFP_KERNEL, cpu_to_node(cpu)); 664 if (!bpage) 665 goto free_pages; 666 list_add(&bpage->list, &pages); 667 addr = __get_free_page(GFP_KERNEL); 668 if (!addr) 669 goto free_pages; 670 bpage->page = (void *)addr; 671 rb_init_page(bpage->page); 672 } 673 } 674 675 for_each_buffer_cpu(buffer, cpu) { 676 cpu_buffer = buffer->buffers[cpu]; 677 rb_insert_pages(cpu_buffer, &pages, new_pages); 678 } 679 680 if (RB_WARN_ON(buffer, !list_empty(&pages))) { 681 mutex_unlock(&buffer->mutex); 682 return -1; 683 } 684 685 out: 686 buffer->pages = nr_pages; 687 mutex_unlock(&buffer->mutex); 688 689 return size; 690 691 free_pages: 692 list_for_each_entry_safe(bpage, tmp, &pages, list) { 693 list_del_init(&bpage->list); 694 free_buffer_page(bpage); 695 } 696 mutex_unlock(&buffer->mutex); 697 return -ENOMEM; 698 } 699 EXPORT_SYMBOL_GPL(ring_buffer_resize); 700 701 static inline int rb_null_event(struct ring_buffer_event *event) 702 { 703 return event->type == RINGBUF_TYPE_PADDING; 704 } 705 706 static inline void * 707 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index) 708 { 709 return bpage->data + index; 710 } 711 712 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 713 { 714 return bpage->page->data + index; 715 } 716 717 static inline struct ring_buffer_event * 718 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 719 { 720 return __rb_page_index(cpu_buffer->reader_page, 721 cpu_buffer->reader_page->read); 722 } 723 724 static inline struct ring_buffer_event * 725 
rb_head_event(struct ring_buffer_per_cpu *cpu_buffer) 726 { 727 return __rb_page_index(cpu_buffer->head_page, 728 cpu_buffer->head_page->read); 729 } 730 731 static inline struct ring_buffer_event * 732 rb_iter_head_event(struct ring_buffer_iter *iter) 733 { 734 return __rb_page_index(iter->head_page, iter->head); 735 } 736 737 static inline unsigned rb_page_write(struct buffer_page *bpage) 738 { 739 return local_read(&bpage->write); 740 } 741 742 static inline unsigned rb_page_commit(struct buffer_page *bpage) 743 { 744 return local_read(&bpage->page->commit); 745 } 746 747 /* Size is determined by what has been commited */ 748 static inline unsigned rb_page_size(struct buffer_page *bpage) 749 { 750 return rb_page_commit(bpage); 751 } 752 753 static inline unsigned 754 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 755 { 756 return rb_page_commit(cpu_buffer->commit_page); 757 } 758 759 static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer) 760 { 761 return rb_page_commit(cpu_buffer->head_page); 762 } 763 764 /* 765 * When the tail hits the head and the buffer is in overwrite mode, 766 * the head jumps to the next page and all content on the previous 767 * page is discarded. But before doing so, we update the overrun 768 * variable of the buffer. 769 */ 770 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer) 771 { 772 struct ring_buffer_event *event; 773 unsigned long head; 774 775 for (head = 0; head < rb_head_size(cpu_buffer); 776 head += rb_event_length(event)) { 777 778 event = __rb_page_index(cpu_buffer->head_page, head); 779 if (RB_WARN_ON(cpu_buffer, rb_null_event(event))) 780 return; 781 /* Only count data entries */ 782 if (event->type != RINGBUF_TYPE_DATA) 783 continue; 784 cpu_buffer->overrun++; 785 cpu_buffer->entries--; 786 } 787 } 788 789 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer, 790 struct buffer_page **bpage) 791 { 792 struct list_head *p = (*bpage)->list.next; 793 794 if (p == &cpu_buffer->pages) 795 p = p->next; 796 797 *bpage = list_entry(p, struct buffer_page, list); 798 } 799 800 static inline unsigned 801 rb_event_index(struct ring_buffer_event *event) 802 { 803 unsigned long addr = (unsigned long)event; 804 805 return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE); 806 } 807 808 static inline int 809 rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 810 struct ring_buffer_event *event) 811 { 812 unsigned long addr = (unsigned long)event; 813 unsigned long index; 814 815 index = rb_event_index(event); 816 addr &= PAGE_MASK; 817 818 return cpu_buffer->commit_page->page == (void *)addr && 819 rb_commit_index(cpu_buffer) == index; 820 } 821 822 static inline void 823 rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer, 824 struct ring_buffer_event *event) 825 { 826 unsigned long addr = (unsigned long)event; 827 unsigned long index; 828 829 index = rb_event_index(event); 830 addr &= PAGE_MASK; 831 832 while (cpu_buffer->commit_page->page != (void *)addr) { 833 if (RB_WARN_ON(cpu_buffer, 834 cpu_buffer->commit_page == cpu_buffer->tail_page)) 835 return; 836 cpu_buffer->commit_page->page->commit = 837 cpu_buffer->commit_page->write; 838 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); 839 cpu_buffer->write_stamp = 840 cpu_buffer->commit_page->page->time_stamp; 841 } 842 843 /* Now set the commit to the event's index */ 844 local_set(&cpu_buffer->commit_page->page->commit, index); 845 } 846 847 static inline void 848 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 849 
{ 850 /* 851 * We only race with interrupts and NMIs on this CPU. 852 * If we own the commit event, then we can commit 853 * all others that interrupted us, since the interruptions 854 * are in stack format (they finish before they come 855 * back to us). This allows us to do a simple loop to 856 * assign the commit to the tail. 857 */ 858 again: 859 while (cpu_buffer->commit_page != cpu_buffer->tail_page) { 860 cpu_buffer->commit_page->page->commit = 861 cpu_buffer->commit_page->write; 862 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); 863 cpu_buffer->write_stamp = 864 cpu_buffer->commit_page->page->time_stamp; 865 /* add barrier to keep gcc from optimizing too much */ 866 barrier(); 867 } 868 while (rb_commit_index(cpu_buffer) != 869 rb_page_write(cpu_buffer->commit_page)) { 870 cpu_buffer->commit_page->page->commit = 871 cpu_buffer->commit_page->write; 872 barrier(); 873 } 874 875 /* again, keep gcc from optimizing */ 876 barrier(); 877 878 /* 879 * If an interrupt came in just after the first while loop 880 * and pushed the tail page forward, we will be left with 881 * a dangling commit that will never go forward. 882 */ 883 if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page)) 884 goto again; 885 } 886 887 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 888 { 889 cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp; 890 cpu_buffer->reader_page->read = 0; 891 } 892 893 static inline void rb_inc_iter(struct ring_buffer_iter *iter) 894 { 895 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 896 897 /* 898 * The iterator could be on the reader page (it starts there). 899 * But the head could have moved, since the reader was 900 * found. Check for this case and assign the iterator 901 * to the head page instead of next. 902 */ 903 if (iter->head_page == cpu_buffer->reader_page) 904 iter->head_page = cpu_buffer->head_page; 905 else 906 rb_inc_page(cpu_buffer, &iter->head_page); 907 908 iter->read_stamp = iter->head_page->page->time_stamp; 909 iter->head = 0; 910 } 911 912 /** 913 * ring_buffer_update_event - update event type and data 914 * @event: the even to update 915 * @type: the type of event 916 * @length: the size of the event field in the ring buffer 917 * 918 * Update the type and data fields of the event. The length 919 * is the actual size that is written to the ring buffer, 920 * and with this, we can determine what to place into the 921 * data field. 
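 *
 * Worked example (illustration only, not from the original): assuming
 * the event header (RB_EVNT_HDR_SIZE) is 4 bytes, a request for 6 bytes
 * of data gives rb_calculate_event_length() = ALIGN(6 + 4, RB_ALIGNMENT)
 * = 12 bytes reserved.  This function is then called with length = 12
 * and stores len = (12 - 4 + 3) >> RB_ALIGNMENT_SHIFT = 2, from which
 * rb_event_length() later recovers (2 << RB_ALIGNMENT_SHIFT) + 4 = 12.
 * Payloads larger than RB_MAX_SMALL_DATA set len to 0 and keep the size
 * in array[0] instead.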
922 */ 923 static inline void 924 rb_update_event(struct ring_buffer_event *event, 925 unsigned type, unsigned length) 926 { 927 event->type = type; 928 929 switch (type) { 930 931 case RINGBUF_TYPE_PADDING: 932 break; 933 934 case RINGBUF_TYPE_TIME_EXTEND: 935 event->len = 936 (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1)) 937 >> RB_ALIGNMENT_SHIFT; 938 break; 939 940 case RINGBUF_TYPE_TIME_STAMP: 941 event->len = 942 (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1)) 943 >> RB_ALIGNMENT_SHIFT; 944 break; 945 946 case RINGBUF_TYPE_DATA: 947 length -= RB_EVNT_HDR_SIZE; 948 if (length > RB_MAX_SMALL_DATA) { 949 event->len = 0; 950 event->array[0] = length; 951 } else 952 event->len = 953 (length + (RB_ALIGNMENT-1)) 954 >> RB_ALIGNMENT_SHIFT; 955 break; 956 default: 957 BUG(); 958 } 959 } 960 961 static inline unsigned rb_calculate_event_length(unsigned length) 962 { 963 struct ring_buffer_event event; /* Used only for sizeof array */ 964 965 /* zero length can cause confusions */ 966 if (!length) 967 length = 1; 968 969 if (length > RB_MAX_SMALL_DATA) 970 length += sizeof(event.array[0]); 971 972 length += RB_EVNT_HDR_SIZE; 973 length = ALIGN(length, RB_ALIGNMENT); 974 975 return length; 976 } 977 978 static struct ring_buffer_event * 979 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 980 unsigned type, unsigned long length, u64 *ts) 981 { 982 struct buffer_page *tail_page, *head_page, *reader_page, *commit_page; 983 unsigned long tail, write; 984 struct ring_buffer *buffer = cpu_buffer->buffer; 985 struct ring_buffer_event *event; 986 unsigned long flags; 987 988 commit_page = cpu_buffer->commit_page; 989 /* we just need to protect against interrupts */ 990 barrier(); 991 tail_page = cpu_buffer->tail_page; 992 write = local_add_return(length, &tail_page->write); 993 tail = write - length; 994 995 /* See if we shot pass the end of this buffer page */ 996 if (write > BUF_PAGE_SIZE) { 997 struct buffer_page *next_page = tail_page; 998 999 local_irq_save(flags); 1000 __raw_spin_lock(&cpu_buffer->lock); 1001 1002 rb_inc_page(cpu_buffer, &next_page); 1003 1004 head_page = cpu_buffer->head_page; 1005 reader_page = cpu_buffer->reader_page; 1006 1007 /* we grabbed the lock before incrementing */ 1008 if (RB_WARN_ON(cpu_buffer, next_page == reader_page)) 1009 goto out_unlock; 1010 1011 /* 1012 * If for some reason, we had an interrupt storm that made 1013 * it all the way around the buffer, bail, and warn 1014 * about it. 1015 */ 1016 if (unlikely(next_page == commit_page)) { 1017 WARN_ON_ONCE(1); 1018 goto out_unlock; 1019 } 1020 1021 if (next_page == head_page) { 1022 if (!(buffer->flags & RB_FL_OVERWRITE)) { 1023 /* reset write */ 1024 if (tail <= BUF_PAGE_SIZE) 1025 local_set(&tail_page->write, tail); 1026 goto out_unlock; 1027 } 1028 1029 /* tail_page has not moved yet? */ 1030 if (tail_page == cpu_buffer->tail_page) { 1031 /* count overflows */ 1032 rb_update_overflow(cpu_buffer); 1033 1034 rb_inc_page(cpu_buffer, &head_page); 1035 cpu_buffer->head_page = head_page; 1036 cpu_buffer->head_page->read = 0; 1037 } 1038 } 1039 1040 /* 1041 * If the tail page is still the same as what we think 1042 * it is, then it is up to us to update the tail 1043 * pointer. 
1044 */ 1045 if (tail_page == cpu_buffer->tail_page) { 1046 local_set(&next_page->write, 0); 1047 local_set(&next_page->page->commit, 0); 1048 cpu_buffer->tail_page = next_page; 1049 1050 /* reread the time stamp */ 1051 *ts = ring_buffer_time_stamp(cpu_buffer->cpu); 1052 cpu_buffer->tail_page->page->time_stamp = *ts; 1053 } 1054 1055 /* 1056 * The actual tail page has moved forward. 1057 */ 1058 if (tail < BUF_PAGE_SIZE) { 1059 /* Mark the rest of the page with padding */ 1060 event = __rb_page_index(tail_page, tail); 1061 event->type = RINGBUF_TYPE_PADDING; 1062 } 1063 1064 if (tail <= BUF_PAGE_SIZE) 1065 /* Set the write back to the previous setting */ 1066 local_set(&tail_page->write, tail); 1067 1068 /* 1069 * If this was a commit entry that failed, 1070 * increment that too 1071 */ 1072 if (tail_page == cpu_buffer->commit_page && 1073 tail == rb_commit_index(cpu_buffer)) { 1074 rb_set_commit_to_write(cpu_buffer); 1075 } 1076 1077 __raw_spin_unlock(&cpu_buffer->lock); 1078 local_irq_restore(flags); 1079 1080 /* fail and let the caller try again */ 1081 return ERR_PTR(-EAGAIN); 1082 } 1083 1084 /* We reserved something on the buffer */ 1085 1086 if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE)) 1087 return NULL; 1088 1089 event = __rb_page_index(tail_page, tail); 1090 rb_update_event(event, type, length); 1091 1092 /* 1093 * If this is a commit and the tail is zero, then update 1094 * this page's time stamp. 1095 */ 1096 if (!tail && rb_is_commit(cpu_buffer, event)) 1097 cpu_buffer->commit_page->page->time_stamp = *ts; 1098 1099 return event; 1100 1101 out_unlock: 1102 __raw_spin_unlock(&cpu_buffer->lock); 1103 local_irq_restore(flags); 1104 return NULL; 1105 } 1106 1107 static int 1108 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 1109 u64 *ts, u64 *delta) 1110 { 1111 struct ring_buffer_event *event; 1112 static int once; 1113 int ret; 1114 1115 if (unlikely(*delta > (1ULL << 59) && !once++)) { 1116 printk(KERN_WARNING "Delta way too big! %llu" 1117 " ts=%llu write stamp = %llu\n", 1118 (unsigned long long)*delta, 1119 (unsigned long long)*ts, 1120 (unsigned long long)cpu_buffer->write_stamp); 1121 WARN_ON(1); 1122 } 1123 1124 /* 1125 * The delta is too big, we to add a 1126 * new timestamp. 1127 */ 1128 event = __rb_reserve_next(cpu_buffer, 1129 RINGBUF_TYPE_TIME_EXTEND, 1130 RB_LEN_TIME_EXTEND, 1131 ts); 1132 if (!event) 1133 return -EBUSY; 1134 1135 if (PTR_ERR(event) == -EAGAIN) 1136 return -EAGAIN; 1137 1138 /* Only a commited time event can update the write stamp */ 1139 if (rb_is_commit(cpu_buffer, event)) { 1140 /* 1141 * If this is the first on the page, then we need to 1142 * update the page itself, and just put in a zero. 
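		 *
		 * Otherwise the full delta is stored split across
		 * time_delta and array[0].  Worked example (illustration
		 * only, not from the original): with TS_SHIFT = 27, a
		 * delta of 0x12345678 (about 305 ms at one ns per clock
		 * tick) becomes
		 *
		 *	time_delta = 0x12345678 & TS_MASK   = 0x02345678
		 *	array[0]   = 0x12345678 >> TS_SHIFT = 0x2
		 *
		 * and the read side (rb_update_read_stamp) rebuilds the
		 * same value as (array[0] << TS_SHIFT) + time_delta.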
1143 */ 1144 if (rb_event_index(event)) { 1145 event->time_delta = *delta & TS_MASK; 1146 event->array[0] = *delta >> TS_SHIFT; 1147 } else { 1148 cpu_buffer->commit_page->page->time_stamp = *ts; 1149 event->time_delta = 0; 1150 event->array[0] = 0; 1151 } 1152 cpu_buffer->write_stamp = *ts; 1153 /* let the caller know this was the commit */ 1154 ret = 1; 1155 } else { 1156 /* Darn, this is just wasted space */ 1157 event->time_delta = 0; 1158 event->array[0] = 0; 1159 ret = 0; 1160 } 1161 1162 *delta = 0; 1163 1164 return ret; 1165 } 1166 1167 static struct ring_buffer_event * 1168 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, 1169 unsigned type, unsigned long length) 1170 { 1171 struct ring_buffer_event *event; 1172 u64 ts, delta; 1173 int commit = 0; 1174 int nr_loops = 0; 1175 1176 again: 1177 /* 1178 * We allow for interrupts to reenter here and do a trace. 1179 * If one does, it will cause this original code to loop 1180 * back here. Even with heavy interrupts happening, this 1181 * should only happen a few times in a row. If this happens 1182 * 1000 times in a row, there must be either an interrupt 1183 * storm or we have something buggy. 1184 * Bail! 1185 */ 1186 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 1187 return NULL; 1188 1189 ts = ring_buffer_time_stamp(cpu_buffer->cpu); 1190 1191 /* 1192 * Only the first commit can update the timestamp. 1193 * Yes there is a race here. If an interrupt comes in 1194 * just after the conditional and it traces too, then it 1195 * will also check the deltas. More than one timestamp may 1196 * also be made. But only the entry that did the actual 1197 * commit will be something other than zero. 1198 */ 1199 if (cpu_buffer->tail_page == cpu_buffer->commit_page && 1200 rb_page_write(cpu_buffer->tail_page) == 1201 rb_commit_index(cpu_buffer)) { 1202 1203 delta = ts - cpu_buffer->write_stamp; 1204 1205 /* make sure this delta is calculated here */ 1206 barrier(); 1207 1208 /* Did the write stamp get updated already? */ 1209 if (unlikely(ts < cpu_buffer->write_stamp)) 1210 delta = 0; 1211 1212 if (test_time_stamp(delta)) { 1213 1214 commit = rb_add_time_stamp(cpu_buffer, &ts, &delta); 1215 1216 if (commit == -EBUSY) 1217 return NULL; 1218 1219 if (commit == -EAGAIN) 1220 goto again; 1221 1222 RB_WARN_ON(cpu_buffer, commit < 0); 1223 } 1224 } else 1225 /* Non commits have zero deltas */ 1226 delta = 0; 1227 1228 event = __rb_reserve_next(cpu_buffer, type, length, &ts); 1229 if (PTR_ERR(event) == -EAGAIN) 1230 goto again; 1231 1232 if (!event) { 1233 if (unlikely(commit)) 1234 /* 1235 * Ouch! We needed a timestamp and it was commited. But 1236 * we didn't get our event reserved. 1237 */ 1238 rb_set_commit_to_write(cpu_buffer); 1239 return NULL; 1240 } 1241 1242 /* 1243 * If the timestamp was commited, make the commit our entry 1244 * now so that we will update it when needed. 1245 */ 1246 if (commit) 1247 rb_set_commit_event(cpu_buffer, event); 1248 else if (!rb_is_commit(cpu_buffer, event)) 1249 delta = 0; 1250 1251 event->time_delta = delta; 1252 1253 return event; 1254 } 1255 1256 static DEFINE_PER_CPU(int, rb_need_resched); 1257 1258 /** 1259 * ring_buffer_lock_reserve - reserve a part of the buffer 1260 * @buffer: the ring buffer to reserve from 1261 * @length: the length of the data to reserve (excluding event header) 1262 * @flags: a pointer to save the interrupt flags 1263 * 1264 * Returns a reseverd event on the ring buffer to copy directly to. 
1265 * The user of this interface will need to get the body to write into 1266 * and can use the ring_buffer_event_data() interface. 1267 * 1268 * The length is the length of the data needed, not the event length 1269 * which also includes the event header. 1270 * 1271 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 1272 * If NULL is returned, then nothing has been allocated or locked. 1273 */ 1274 struct ring_buffer_event * 1275 ring_buffer_lock_reserve(struct ring_buffer *buffer, 1276 unsigned long length, 1277 unsigned long *flags) 1278 { 1279 struct ring_buffer_per_cpu *cpu_buffer; 1280 struct ring_buffer_event *event; 1281 int cpu, resched; 1282 1283 if (ring_buffer_flags != RB_BUFFERS_ON) 1284 return NULL; 1285 1286 if (atomic_read(&buffer->record_disabled)) 1287 return NULL; 1288 1289 /* If we are tracing schedule, we don't want to recurse */ 1290 resched = ftrace_preempt_disable(); 1291 1292 cpu = raw_smp_processor_id(); 1293 1294 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1295 goto out; 1296 1297 cpu_buffer = buffer->buffers[cpu]; 1298 1299 if (atomic_read(&cpu_buffer->record_disabled)) 1300 goto out; 1301 1302 length = rb_calculate_event_length(length); 1303 if (length > BUF_PAGE_SIZE) 1304 goto out; 1305 1306 event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length); 1307 if (!event) 1308 goto out; 1309 1310 /* 1311 * Need to store resched state on this cpu. 1312 * Only the first needs to. 1313 */ 1314 1315 if (preempt_count() == 1) 1316 per_cpu(rb_need_resched, cpu) = resched; 1317 1318 return event; 1319 1320 out: 1321 ftrace_preempt_enable(resched); 1322 return NULL; 1323 } 1324 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 1325 1326 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 1327 struct ring_buffer_event *event) 1328 { 1329 cpu_buffer->entries++; 1330 1331 /* Only process further if we own the commit */ 1332 if (!rb_is_commit(cpu_buffer, event)) 1333 return; 1334 1335 cpu_buffer->write_stamp += event->time_delta; 1336 1337 rb_set_commit_to_write(cpu_buffer); 1338 } 1339 1340 /** 1341 * ring_buffer_unlock_commit - commit a reserved 1342 * @buffer: The buffer to commit to 1343 * @event: The event pointer to commit. 1344 * @flags: the interrupt flags received from ring_buffer_lock_reserve. 1345 * 1346 * This commits the data to the ring buffer, and releases any locks held. 1347 * 1348 * Must be paired with ring_buffer_lock_reserve. 1349 */ 1350 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 1351 struct ring_buffer_event *event, 1352 unsigned long flags) 1353 { 1354 struct ring_buffer_per_cpu *cpu_buffer; 1355 int cpu = raw_smp_processor_id(); 1356 1357 cpu_buffer = buffer->buffers[cpu]; 1358 1359 rb_commit(cpu_buffer, event); 1360 1361 /* 1362 * Only the last preempt count needs to restore preemption. 1363 */ 1364 if (preempt_count() == 1) 1365 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 1366 else 1367 preempt_enable_no_resched_notrace(); 1368 1369 return 0; 1370 } 1371 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 1372 1373 /** 1374 * ring_buffer_write - write data to the buffer without reserving 1375 * @buffer: The ring buffer to write to. 1376 * @length: The length of the data being written (excluding the event header) 1377 * @data: The data to write to the buffer. 1378 * 1379 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 1380 * one function. If you already have the data to write to the buffer, it 1381 * may be easier to simply call this function. 
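 *
 * For comparison, an illustrative (hypothetical) use of the
 * reserve/commit pair above, assuming a caller-defined struct my_entry:
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *	unsigned long irq_flags;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry), &irq_flags);
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	entry->value = 42;
 *	ring_buffer_unlock_commit(buffer, event, irq_flags);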
1382 * 1383 * Note, like ring_buffer_lock_reserve, the length is the length of the data 1384 * and not the length of the event which would hold the header. 1385 */ 1386 int ring_buffer_write(struct ring_buffer *buffer, 1387 unsigned long length, 1388 void *data) 1389 { 1390 struct ring_buffer_per_cpu *cpu_buffer; 1391 struct ring_buffer_event *event; 1392 unsigned long event_length; 1393 void *body; 1394 int ret = -EBUSY; 1395 int cpu, resched; 1396 1397 if (ring_buffer_flags != RB_BUFFERS_ON) 1398 return -EBUSY; 1399 1400 if (atomic_read(&buffer->record_disabled)) 1401 return -EBUSY; 1402 1403 resched = ftrace_preempt_disable(); 1404 1405 cpu = raw_smp_processor_id(); 1406 1407 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1408 goto out; 1409 1410 cpu_buffer = buffer->buffers[cpu]; 1411 1412 if (atomic_read(&cpu_buffer->record_disabled)) 1413 goto out; 1414 1415 event_length = rb_calculate_event_length(length); 1416 event = rb_reserve_next_event(cpu_buffer, 1417 RINGBUF_TYPE_DATA, event_length); 1418 if (!event) 1419 goto out; 1420 1421 body = rb_event_data(event); 1422 1423 memcpy(body, data, length); 1424 1425 rb_commit(cpu_buffer, event); 1426 1427 ret = 0; 1428 out: 1429 ftrace_preempt_enable(resched); 1430 1431 return ret; 1432 } 1433 EXPORT_SYMBOL_GPL(ring_buffer_write); 1434 1435 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 1436 { 1437 struct buffer_page *reader = cpu_buffer->reader_page; 1438 struct buffer_page *head = cpu_buffer->head_page; 1439 struct buffer_page *commit = cpu_buffer->commit_page; 1440 1441 return reader->read == rb_page_commit(reader) && 1442 (commit == reader || 1443 (commit == head && 1444 head->read == rb_page_commit(commit))); 1445 } 1446 1447 /** 1448 * ring_buffer_record_disable - stop all writes into the buffer 1449 * @buffer: The ring buffer to stop writes to. 1450 * 1451 * This prevents all writes to the buffer. Any attempt to write 1452 * to the buffer after this will fail and return NULL. 1453 * 1454 * The caller should call synchronize_sched() after this. 1455 */ 1456 void ring_buffer_record_disable(struct ring_buffer *buffer) 1457 { 1458 atomic_inc(&buffer->record_disabled); 1459 } 1460 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 1461 1462 /** 1463 * ring_buffer_record_enable - enable writes to the buffer 1464 * @buffer: The ring buffer to enable writes 1465 * 1466 * Note, multiple disables will need the same number of enables 1467 * to truely enable the writing (much like preempt_disable). 1468 */ 1469 void ring_buffer_record_enable(struct ring_buffer *buffer) 1470 { 1471 atomic_dec(&buffer->record_disabled); 1472 } 1473 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 1474 1475 /** 1476 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 1477 * @buffer: The ring buffer to stop writes to. 1478 * @cpu: The CPU buffer to stop 1479 * 1480 * This prevents all writes to the buffer. Any attempt to write 1481 * to the buffer after this will fail and return NULL. 1482 * 1483 * The caller should call synchronize_sched() after this. 
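 *
 * Illustrative (hypothetical) pattern for taking a stable snapshot of
 * one CPU's statistics without racing against concurrent writers:
 *
 *	unsigned long entries, overruns;
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	synchronize_sched();
 *	entries = ring_buffer_entries_cpu(buffer, cpu);
 *	overruns = ring_buffer_overrun_cpu(buffer, cpu);
 *	ring_buffer_record_enable_cpu(buffer, cpu);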
1484 */ 1485 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 1486 { 1487 struct ring_buffer_per_cpu *cpu_buffer; 1488 1489 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1490 return; 1491 1492 cpu_buffer = buffer->buffers[cpu]; 1493 atomic_inc(&cpu_buffer->record_disabled); 1494 } 1495 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 1496 1497 /** 1498 * ring_buffer_record_enable_cpu - enable writes to the buffer 1499 * @buffer: The ring buffer to enable writes 1500 * @cpu: The CPU to enable. 1501 * 1502 * Note, multiple disables will need the same number of enables 1503 * to truely enable the writing (much like preempt_disable). 1504 */ 1505 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 1506 { 1507 struct ring_buffer_per_cpu *cpu_buffer; 1508 1509 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1510 return; 1511 1512 cpu_buffer = buffer->buffers[cpu]; 1513 atomic_dec(&cpu_buffer->record_disabled); 1514 } 1515 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 1516 1517 /** 1518 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 1519 * @buffer: The ring buffer 1520 * @cpu: The per CPU buffer to get the entries from. 1521 */ 1522 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 1523 { 1524 struct ring_buffer_per_cpu *cpu_buffer; 1525 1526 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1527 return 0; 1528 1529 cpu_buffer = buffer->buffers[cpu]; 1530 return cpu_buffer->entries; 1531 } 1532 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 1533 1534 /** 1535 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer 1536 * @buffer: The ring buffer 1537 * @cpu: The per CPU buffer to get the number of overruns from 1538 */ 1539 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 1540 { 1541 struct ring_buffer_per_cpu *cpu_buffer; 1542 1543 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1544 return 0; 1545 1546 cpu_buffer = buffer->buffers[cpu]; 1547 return cpu_buffer->overrun; 1548 } 1549 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 1550 1551 /** 1552 * ring_buffer_entries - get the number of entries in a buffer 1553 * @buffer: The ring buffer 1554 * 1555 * Returns the total number of entries in the ring buffer 1556 * (all CPU entries) 1557 */ 1558 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 1559 { 1560 struct ring_buffer_per_cpu *cpu_buffer; 1561 unsigned long entries = 0; 1562 int cpu; 1563 1564 /* if you care about this being correct, lock the buffer */ 1565 for_each_buffer_cpu(buffer, cpu) { 1566 cpu_buffer = buffer->buffers[cpu]; 1567 entries += cpu_buffer->entries; 1568 } 1569 1570 return entries; 1571 } 1572 EXPORT_SYMBOL_GPL(ring_buffer_entries); 1573 1574 /** 1575 * ring_buffer_overrun_cpu - get the number of overruns in buffer 1576 * @buffer: The ring buffer 1577 * 1578 * Returns the total number of overruns in the ring buffer 1579 * (all CPU entries) 1580 */ 1581 unsigned long ring_buffer_overruns(struct ring_buffer *buffer) 1582 { 1583 struct ring_buffer_per_cpu *cpu_buffer; 1584 unsigned long overruns = 0; 1585 int cpu; 1586 1587 /* if you care about this being correct, lock the buffer */ 1588 for_each_buffer_cpu(buffer, cpu) { 1589 cpu_buffer = buffer->buffers[cpu]; 1590 overruns += cpu_buffer->overrun; 1591 } 1592 1593 return overruns; 1594 } 1595 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 1596 1597 static void rb_iter_reset(struct ring_buffer_iter *iter) 1598 { 1599 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 1600 1601 /* Iterator 
usage is expected to have record disabled */ 1602 if (list_empty(&cpu_buffer->reader_page->list)) { 1603 iter->head_page = cpu_buffer->head_page; 1604 iter->head = cpu_buffer->head_page->read; 1605 } else { 1606 iter->head_page = cpu_buffer->reader_page; 1607 iter->head = cpu_buffer->reader_page->read; 1608 } 1609 if (iter->head) 1610 iter->read_stamp = cpu_buffer->read_stamp; 1611 else 1612 iter->read_stamp = iter->head_page->page->time_stamp; 1613 } 1614 1615 /** 1616 * ring_buffer_iter_reset - reset an iterator 1617 * @iter: The iterator to reset 1618 * 1619 * Resets the iterator, so that it will start from the beginning 1620 * again. 1621 */ 1622 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 1623 { 1624 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 1625 unsigned long flags; 1626 1627 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1628 rb_iter_reset(iter); 1629 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1630 } 1631 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 1632 1633 /** 1634 * ring_buffer_iter_empty - check if an iterator has no more to read 1635 * @iter: The iterator to check 1636 */ 1637 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 1638 { 1639 struct ring_buffer_per_cpu *cpu_buffer; 1640 1641 cpu_buffer = iter->cpu_buffer; 1642 1643 return iter->head_page == cpu_buffer->commit_page && 1644 iter->head == rb_commit_index(cpu_buffer); 1645 } 1646 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 1647 1648 static void 1649 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 1650 struct ring_buffer_event *event) 1651 { 1652 u64 delta; 1653 1654 switch (event->type) { 1655 case RINGBUF_TYPE_PADDING: 1656 return; 1657 1658 case RINGBUF_TYPE_TIME_EXTEND: 1659 delta = event->array[0]; 1660 delta <<= TS_SHIFT; 1661 delta += event->time_delta; 1662 cpu_buffer->read_stamp += delta; 1663 return; 1664 1665 case RINGBUF_TYPE_TIME_STAMP: 1666 /* FIXME: not implemented */ 1667 return; 1668 1669 case RINGBUF_TYPE_DATA: 1670 cpu_buffer->read_stamp += event->time_delta; 1671 return; 1672 1673 default: 1674 BUG(); 1675 } 1676 return; 1677 } 1678 1679 static void 1680 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 1681 struct ring_buffer_event *event) 1682 { 1683 u64 delta; 1684 1685 switch (event->type) { 1686 case RINGBUF_TYPE_PADDING: 1687 return; 1688 1689 case RINGBUF_TYPE_TIME_EXTEND: 1690 delta = event->array[0]; 1691 delta <<= TS_SHIFT; 1692 delta += event->time_delta; 1693 iter->read_stamp += delta; 1694 return; 1695 1696 case RINGBUF_TYPE_TIME_STAMP: 1697 /* FIXME: not implemented */ 1698 return; 1699 1700 case RINGBUF_TYPE_DATA: 1701 iter->read_stamp += event->time_delta; 1702 return; 1703 1704 default: 1705 BUG(); 1706 } 1707 return; 1708 } 1709 1710 static struct buffer_page * 1711 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 1712 { 1713 struct buffer_page *reader = NULL; 1714 unsigned long flags; 1715 int nr_loops = 0; 1716 1717 local_irq_save(flags); 1718 __raw_spin_lock(&cpu_buffer->lock); 1719 1720 again: 1721 /* 1722 * This should normally only loop twice. But because the 1723 * start of the reader inserts an empty page, it causes 1724 * a case where we will loop three times. There should be no 1725 * reason to loop four times (that I know of). 
1726 */ 1727 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 1728 reader = NULL; 1729 goto out; 1730 } 1731 1732 reader = cpu_buffer->reader_page; 1733 1734 /* If there's more to read, return this page */ 1735 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 1736 goto out; 1737 1738 /* Never should we have an index greater than the size */ 1739 if (RB_WARN_ON(cpu_buffer, 1740 cpu_buffer->reader_page->read > rb_page_size(reader))) 1741 goto out; 1742 1743 /* check if we caught up to the tail */ 1744 reader = NULL; 1745 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 1746 goto out; 1747 1748 /* 1749 * Splice the empty reader page into the list around the head. 1750 * Reset the reader page to size zero. 1751 */ 1752 1753 reader = cpu_buffer->head_page; 1754 cpu_buffer->reader_page->list.next = reader->list.next; 1755 cpu_buffer->reader_page->list.prev = reader->list.prev; 1756 1757 local_set(&cpu_buffer->reader_page->write, 0); 1758 local_set(&cpu_buffer->reader_page->page->commit, 0); 1759 1760 /* Make the reader page now replace the head */ 1761 reader->list.prev->next = &cpu_buffer->reader_page->list; 1762 reader->list.next->prev = &cpu_buffer->reader_page->list; 1763 1764 /* 1765 * If the tail is on the reader, then we must set the head 1766 * to the inserted page, otherwise we set it one before. 1767 */ 1768 cpu_buffer->head_page = cpu_buffer->reader_page; 1769 1770 if (cpu_buffer->commit_page != reader) 1771 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 1772 1773 /* Finally update the reader page to the new head */ 1774 cpu_buffer->reader_page = reader; 1775 rb_reset_reader_page(cpu_buffer); 1776 1777 goto again; 1778 1779 out: 1780 __raw_spin_unlock(&cpu_buffer->lock); 1781 local_irq_restore(flags); 1782 1783 return reader; 1784 } 1785 1786 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 1787 { 1788 struct ring_buffer_event *event; 1789 struct buffer_page *reader; 1790 unsigned length; 1791 1792 reader = rb_get_reader_page(cpu_buffer); 1793 1794 /* This function should not be called when buffer is empty */ 1795 if (RB_WARN_ON(cpu_buffer, !reader)) 1796 return; 1797 1798 event = rb_reader_event(cpu_buffer); 1799 1800 if (event->type == RINGBUF_TYPE_DATA) 1801 cpu_buffer->entries--; 1802 1803 rb_update_read_stamp(cpu_buffer, event); 1804 1805 length = rb_event_length(event); 1806 cpu_buffer->reader_page->read += length; 1807 } 1808 1809 static void rb_advance_iter(struct ring_buffer_iter *iter) 1810 { 1811 struct ring_buffer *buffer; 1812 struct ring_buffer_per_cpu *cpu_buffer; 1813 struct ring_buffer_event *event; 1814 unsigned length; 1815 1816 cpu_buffer = iter->cpu_buffer; 1817 buffer = cpu_buffer->buffer; 1818 1819 /* 1820 * Check if we are at the end of the buffer. 1821 */ 1822 if (iter->head >= rb_page_size(iter->head_page)) { 1823 if (RB_WARN_ON(buffer, 1824 iter->head_page == cpu_buffer->commit_page)) 1825 return; 1826 rb_inc_iter(iter); 1827 return; 1828 } 1829 1830 event = rb_iter_head_event(iter); 1831 1832 length = rb_event_length(event); 1833 1834 /* 1835 * This should not be called to advance the header if we are 1836 * at the tail of the buffer. 
1837 */ 1838 if (RB_WARN_ON(cpu_buffer, 1839 (iter->head_page == cpu_buffer->commit_page) && 1840 (iter->head + length > rb_commit_index(cpu_buffer)))) 1841 return; 1842 1843 rb_update_iter_read_stamp(iter, event); 1844 1845 iter->head += length; 1846 1847 /* check for end of page padding */ 1848 if ((iter->head >= rb_page_size(iter->head_page)) && 1849 (iter->head_page != cpu_buffer->commit_page)) 1850 rb_advance_iter(iter); 1851 } 1852 1853 static struct ring_buffer_event * 1854 rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) 1855 { 1856 struct ring_buffer_per_cpu *cpu_buffer; 1857 struct ring_buffer_event *event; 1858 struct buffer_page *reader; 1859 int nr_loops = 0; 1860 1861 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1862 return NULL; 1863 1864 cpu_buffer = buffer->buffers[cpu]; 1865 1866 again: 1867 /* 1868 * We repeat when a timestamp is encountered. It is possible 1869 * to get multiple timestamps from an interrupt entering just 1870 * as one timestamp is about to be written. The max times 1871 * that this can happen is the number of nested interrupts we 1872 * can have. Nesting 10 deep of interrupts is clearly 1873 * an anomaly. 1874 */ 1875 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10)) 1876 return NULL; 1877 1878 reader = rb_get_reader_page(cpu_buffer); 1879 if (!reader) 1880 return NULL; 1881 1882 event = rb_reader_event(cpu_buffer); 1883 1884 switch (event->type) { 1885 case RINGBUF_TYPE_PADDING: 1886 RB_WARN_ON(cpu_buffer, 1); 1887 rb_advance_reader(cpu_buffer); 1888 return NULL; 1889 1890 case RINGBUF_TYPE_TIME_EXTEND: 1891 /* Internal data, OK to advance */ 1892 rb_advance_reader(cpu_buffer); 1893 goto again; 1894 1895 case RINGBUF_TYPE_TIME_STAMP: 1896 /* FIXME: not implemented */ 1897 rb_advance_reader(cpu_buffer); 1898 goto again; 1899 1900 case RINGBUF_TYPE_DATA: 1901 if (ts) { 1902 *ts = cpu_buffer->read_stamp + event->time_delta; 1903 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts); 1904 } 1905 return event; 1906 1907 default: 1908 BUG(); 1909 } 1910 1911 return NULL; 1912 } 1913 EXPORT_SYMBOL_GPL(ring_buffer_peek); 1914 1915 static struct ring_buffer_event * 1916 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 1917 { 1918 struct ring_buffer *buffer; 1919 struct ring_buffer_per_cpu *cpu_buffer; 1920 struct ring_buffer_event *event; 1921 int nr_loops = 0; 1922 1923 if (ring_buffer_iter_empty(iter)) 1924 return NULL; 1925 1926 cpu_buffer = iter->cpu_buffer; 1927 buffer = cpu_buffer->buffer; 1928 1929 again: 1930 /* 1931 * We repeat when a timestamp is encountered. It is possible 1932 * to get multiple timestamps from an interrupt entering just 1933 * as one timestamp is about to be written. The max times 1934 * that this can happen is the number of nested interrupts we 1935 * can have. Nesting 10 deep of interrupts is clearly 1936 * an anomaly. 
1937 */ 1938 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10)) 1939 return NULL; 1940 1941 if (rb_per_cpu_empty(cpu_buffer)) 1942 return NULL; 1943 1944 event = rb_iter_head_event(iter); 1945 1946 switch (event->type) { 1947 case RINGBUF_TYPE_PADDING: 1948 rb_inc_iter(iter); 1949 goto again; 1950 1951 case RINGBUF_TYPE_TIME_EXTEND: 1952 /* Internal data, OK to advance */ 1953 rb_advance_iter(iter); 1954 goto again; 1955 1956 case RINGBUF_TYPE_TIME_STAMP: 1957 /* FIXME: not implemented */ 1958 rb_advance_iter(iter); 1959 goto again; 1960 1961 case RINGBUF_TYPE_DATA: 1962 if (ts) { 1963 *ts = iter->read_stamp + event->time_delta; 1964 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts); 1965 } 1966 return event; 1967 1968 default: 1969 BUG(); 1970 } 1971 1972 return NULL; 1973 } 1974 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 1975 1976 /** 1977 * ring_buffer_peek - peek at the next event to be read 1978 * @buffer: The ring buffer to read 1979 * @cpu: The cpu to peak at 1980 * @ts: The timestamp counter of this event. 1981 * 1982 * This will return the event that will be read next, but does 1983 * not consume the data. 1984 */ 1985 struct ring_buffer_event * 1986 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) 1987 { 1988 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 1989 struct ring_buffer_event *event; 1990 unsigned long flags; 1991 1992 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1993 event = rb_buffer_peek(buffer, cpu, ts); 1994 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1995 1996 return event; 1997 } 1998 1999 /** 2000 * ring_buffer_iter_peek - peek at the next event to be read 2001 * @iter: The ring buffer iterator 2002 * @ts: The timestamp counter of this event. 2003 * 2004 * This will return the event that will be read next, but does 2005 * not increment the iterator. 2006 */ 2007 struct ring_buffer_event * 2008 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 2009 { 2010 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2011 struct ring_buffer_event *event; 2012 unsigned long flags; 2013 2014 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2015 event = rb_iter_peek(iter, ts); 2016 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2017 2018 return event; 2019 } 2020 2021 /** 2022 * ring_buffer_consume - return an event and consume it 2023 * @buffer: The ring buffer to get the next event from 2024 * 2025 * Returns the next event in the ring buffer, and that event is consumed. 2026 * Meaning, that sequential reads will keep returning a different event, 2027 * and eventually empty the ring buffer if the producer is slower. 2028 */ 2029 struct ring_buffer_event * 2030 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts) 2031 { 2032 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 2033 struct ring_buffer_event *event; 2034 unsigned long flags; 2035 2036 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2037 return NULL; 2038 2039 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2040 2041 event = rb_buffer_peek(buffer, cpu, ts); 2042 if (!event) 2043 goto out; 2044 2045 rb_advance_reader(cpu_buffer); 2046 2047 out: 2048 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2049 2050 return event; 2051 } 2052 EXPORT_SYMBOL_GPL(ring_buffer_consume); 2053 2054 /** 2055 * ring_buffer_read_start - start a non consuming read of the buffer 2056 * @buffer: The ring buffer to read from 2057 * @cpu: The cpu buffer to iterate over 2058 * 2059 * This starts up an iteration through the buffer. 
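 * A full (hypothetical) non-consuming read might look like the sketch
 * below, where process() stands in for the caller's handling of each
 * event:
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_start(buffer, cpu);
 *	if (!iter)
 *		return;
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		process(ring_buffer_event_data(event), ts);
 *	ring_buffer_read_finish(iter);
 *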

/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so a producer is not expected.
 *
 * Must be paired with ring_buffer_read_finish.
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_iter *iter;
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

	iter = kmalloc(sizeof(*iter), GFP_KERNEL);
	if (!iter)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

	iter->cpu_buffer = cpu_buffer;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	__raw_spin_lock(&cpu_buffer->lock);
	rb_iter_reset(iter);
	__raw_spin_unlock(&cpu_buffer->lock);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return iter;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_start);

/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	atomic_dec(&cpu_buffer->record_disabled);
	kfree(iter);
}
EXPORT_SYMBOL_GPL(ring_buffer_read_finish);

/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_event *event;
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	unsigned long flags;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_iter_peek(iter, ts);
	if (!event)
		goto out;

	rb_advance_iter(iter);
 out:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_read);
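
/*
 * Illustrative sketch, not part of the original file: a non consuming
 * walk of one CPU buffer using the iterator API above. The helper name
 * is made up. Recording on this CPU buffer stays disabled between
 * ring_buffer_read_start() and ring_buffer_read_finish(), so keep the
 * window short.
 */
static int __maybe_unused rb_example_iterate(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_iter *iter;
	struct ring_buffer_event *event;
	int count = 0;
	u64 ts;

	/* NULL covers both an invalid cpu and an allocation failure */
	iter = ring_buffer_read_start(buffer, cpu);
	if (!iter)
		return -ENOMEM;

	/* ring_buffer_read() returns the next event and advances the iterator */
	while ((event = ring_buffer_read(iter, &ts)))
		count++;

	ring_buffer_read_finish(iter);

	return count;
}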

/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
	return BUF_PAGE_SIZE * buffer->pages;
}
EXPORT_SYMBOL_GPL(ring_buffer_size);

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	local_set(&cpu_buffer->head_page->write, 0);
	local_set(&cpu_buffer->head_page->page->commit, 0);

	cpu_buffer->head_page->read = 0;

	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->commit_page = cpu_buffer->head_page;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);
	cpu_buffer->reader_page->read = 0;

	cpu_buffer->overrun = 0;
	cpu_buffer->entries = 0;
}

/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	__raw_spin_lock(&cpu_buffer->lock);

	rb_reset_cpu(cpu_buffer);

	__raw_spin_unlock(&cpu_buffer->lock);

	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);

/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		ring_buffer_reset_cpu(buffer, cpu);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset);

/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* yes this is racy, but if you don't like the race, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		if (!rb_per_cpu_empty(cpu_buffer))
			return 0;
	}
	return 1;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty);

/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 1;

	cpu_buffer = buffer->buffers[cpu];
	return rb_per_cpu_empty(cpu_buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
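
/*
 * Illustrative sketch, not part of the original file: discard a buffer's
 * contents only when it still holds data, using the empty/reset helpers
 * above. The helper name is made up. The check and the reset are not
 * atomic with respect to writers, so a caller that needs a stable result
 * should stop recording first.
 */
static void __maybe_unused rb_example_clear_if_used(struct ring_buffer *buffer)
{
	if (!ring_buffer_empty(buffer))
		ring_buffer_reset(buffer);
}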

/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 * @cpu: The CPU buffer to swap
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
			 struct ring_buffer *buffer_b, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer_a;
	struct ring_buffer_per_cpu *cpu_buffer_b;

	if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
	    !cpumask_test_cpu(cpu, buffer_b->cpumask))
		return -EINVAL;

	/* At least make sure the two buffers are somewhat the same */
	if (buffer_a->pages != buffer_b->pages)
		return -EINVAL;

	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	/*
	 * We can't do a synchronize_sched here because this
	 * function can be called in atomic context.
	 * Normally this will be called from the same CPU as cpu.
	 * If not it's up to the caller to protect this.
	 */
	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;

	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

	atomic_dec(&cpu_buffer_a->record_disabled);
	atomic_dec(&cpu_buffer_b->record_disabled);

	return 0;
}
EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);

static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer,
			      struct buffer_data_page *bpage)
{
	struct ring_buffer_event *event;
	unsigned long head;

	__raw_spin_lock(&cpu_buffer->lock);
	for (head = 0; head < local_read(&bpage->commit);
	     head += rb_event_length(event)) {

		event = __rb_data_page_index(bpage, head);
		/* stop on a bad event, but do not return with the lock held */
		if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
			break;
		/* Only count data entries */
		if (event->type != RINGBUF_TYPE_DATA)
			continue;
		cpu_buffer->entries--;
	}
	__raw_spin_unlock(&cpu_buffer->lock);
}

/**
 * ring_buffer_alloc_read_page - allocate a page to read from buffer
 * @buffer: the buffer to allocate for.
 *
 * This function is used in conjunction with ring_buffer_read_page.
 * When reading a full page from the ring buffer, these functions
 * can be used to speed up the process. The calling function should
 * allocate a few pages first with this function. Then when it
 * needs to get pages from the ring buffer, it passes the result
 * of this function into ring_buffer_read_page, which will swap
 * the page that was allocated, with the read page of the buffer.
 *
 * Returns:
 *  The page allocated, or NULL on error.
 */
void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
{
	unsigned long addr;
	struct buffer_data_page *bpage;

	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		return NULL;

	bpage = (void *)addr;

	return bpage;
}

/**
 * ring_buffer_free_read_page - free an allocated read page
 * @buffer: the buffer the page was allocated for
 * @data: the page to free
 *
 * Free a page allocated from ring_buffer_alloc_read_page.
 */
void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
{
	free_page((unsigned long)data);
}
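
/*
 * Illustrative sketch, not part of the original file: the "snapshot"
 * pattern described above ring_buffer_swap_cpu(). The helper name is
 * made up, and @snapshot is assumed to have been allocated with the
 * same number of pages as @live.
 */
static int __maybe_unused rb_example_snapshot_cpu(struct ring_buffer *live,
						  struct ring_buffer *snapshot,
						  int cpu)
{
	int ret;

	/* after a successful swap, @snapshot holds what @live had for @cpu */
	ret = ring_buffer_swap_cpu(live, snapshot, cpu);
	if (ret)
		return ret;

	/* writers keep logging into @live; @snapshot can be read at leisure */
	return 0;
}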

/**
 * ring_buffer_read_page - extract a page from the ring buffer
 * @buffer: buffer to extract from
 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
 * @cpu: the cpu of the buffer to extract
 * @full: should the extraction only happen when the page is full.
 *
 * This function will pull out a page from the ring buffer and consume it.
 * @data_page must be the address of the variable that was returned
 * from ring_buffer_alloc_read_page. This is because the page might be used
 * to swap with a page in the ring buffer.
 *
 * for example:
 *	rpage = ring_buffer_alloc_read_page(buffer);
 *	if (!rpage)
 *		return error;
 *	ret = ring_buffer_read_page(buffer, &rpage, cpu, 0);
 *	if (ret)
 *		process_page(rpage);
 *
 * When @full is set, the function will not return true unless
 * the writer is off the reader page.
 *
 * Note: it is up to the calling functions to handle sleeps and wakeups.
 *  The ring buffer can be used anywhere in the kernel and can not
 *  blindly call wake_up. The layer that uses the ring buffer must be
 *  responsible for that.
 *
 * Returns:
 *  1 if data has been transferred
 *  0 if no data has been transferred.
 */
int ring_buffer_read_page(struct ring_buffer *buffer,
			  void **data_page, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	struct buffer_data_page *bpage;
	unsigned long flags;
	int ret = 0;

	if (!data_page)
		return 0;

	bpage = *data_page;
	if (!bpage)
		return 0;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	/*
	 * rb_buffer_peek will get the next ring buffer if
	 * the current reader page is empty.
	 */
	event = rb_buffer_peek(buffer, cpu, NULL);
	if (!event)
		goto out;

	/* check for data */
	if (!local_read(&cpu_buffer->reader_page->page->commit))
		goto out;
	/*
	 * If the writer is already off of the read page, then simply
	 * switch the read page with the given page. Otherwise
	 * we need to copy the data from the reader to the writer.
	 */
	if (cpu_buffer->reader_page == cpu_buffer->commit_page) {
		unsigned int read = cpu_buffer->reader_page->read;

		if (full)
			goto out;
		/* The writer is still on the reader page, we must copy */
		bpage = cpu_buffer->reader_page->page;
		memcpy(bpage->data,
		       cpu_buffer->reader_page->page->data + read,
		       local_read(&bpage->commit) - read);

		/* consume what was read */
		cpu_buffer->reader_page->read += read;

	} else {
		/* swap the pages */
		rb_init_page(bpage);
		bpage = cpu_buffer->reader_page->page;
		cpu_buffer->reader_page->page = *data_page;
		cpu_buffer->reader_page->read = 0;
		*data_page = bpage;
	}
	ret = 1;

	/* update the entry counter */
	rb_remove_entries(cpu_buffer, bpage);
 out:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return ret;
}
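
/*
 * Illustrative sketch, not part of the original file: the alloc/read/free
 * cycle for the page based read interface above. The helper name is made
 * up and error handling is kept minimal.
 */
static int __maybe_unused rb_example_read_one_page(struct ring_buffer *buffer,
						   int cpu)
{
	void *page;
	int ret;

	page = ring_buffer_alloc_read_page(buffer);
	if (!page)
		return -ENOMEM;

	/* on success, @page may have been swapped with a buffer page */
	ret = ring_buffer_read_page(buffer, &page, cpu, 0);

	/* when ret == 1 the extracted events are now in @page; process here */

	ring_buffer_free_read_page(buffer, page);

	return ret;
}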

static ssize_t
rb_simple_read(struct file *filp, char __user *ubuf,
	       size_t cnt, loff_t *ppos)
{
	long *p = filp->private_data;
	char buf[64];
	int r;

	if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
		r = sprintf(buf, "permanently disabled\n");
	else
		r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));

	return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
}

static ssize_t
rb_simple_write(struct file *filp, const char __user *ubuf,
		size_t cnt, loff_t *ppos)
{
	long *p = filp->private_data;
	char buf[64];
	long val;
	int ret;

	if (cnt >= sizeof(buf))
		return -EINVAL;

	if (copy_from_user(&buf, ubuf, cnt))
		return -EFAULT;

	buf[cnt] = 0;

	ret = strict_strtoul(buf, 10, &val);
	if (ret < 0)
		return ret;

	if (val)
		set_bit(RB_BUFFERS_ON_BIT, p);
	else
		clear_bit(RB_BUFFERS_ON_BIT, p);

	(*ppos)++;

	return cnt;
}

static struct file_operations rb_simple_fops = {
	.open		= tracing_open_generic,
	.read		= rb_simple_read,
	.write		= rb_simple_write,
};

static __init int rb_init_debugfs(void)
{
	struct dentry *d_tracer;
	struct dentry *entry;

	d_tracer = tracing_init_dentry();

	entry = debugfs_create_file("tracing_on", 0644, d_tracer,
				    &ring_buffer_flags, &rb_simple_fops);
	if (!entry)
		pr_warning("Could not create debugfs 'tracing_on' entry\n");

	return 0;
}

fs_initcall(rb_init_debugfs);
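
/*
 * Illustrative note, not part of the original file: the "tracing_on"
 * file created above lives in the tracing debugfs directory, so with
 * debugfs mounted in the usual place the switch can be flipped from
 * userspace, e.g.:
 *
 *	echo 0 > /sys/kernel/debug/tracing/tracing_on	# block recording
 *	echo 1 > /sys/kernel/debug/tracing/tracing_on	# allow recording
 *
 * Writing to the file toggles RB_BUFFERS_ON_BIT via rb_simple_write();
 * it cannot clear RB_BUFFERS_DISABLED_BIT once tracing_off_permanent()
 * has been called. The exact debugfs mount point may differ per system.
 */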