1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2018 Ericsson AB 3 */ 4 5 #include "dsw_evdev.h" 6 7 #ifdef DSW_SORT_DEQUEUED 8 #include "dsw_sort.h" 9 #endif 10 11 #include <stdbool.h> 12 #include <string.h> 13 14 #include <rte_cycles.h> 15 #include <rte_memcpy.h> 16 #include <rte_random.h> 17 18 static bool 19 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port, 20 int32_t credits) 21 { 22 int32_t inflight_credits = port->inflight_credits; 23 int32_t missing_credits = credits - inflight_credits; 24 int32_t total_on_loan; 25 int32_t available; 26 int32_t acquired_credits; 27 int32_t new_total_on_loan; 28 29 if (likely(missing_credits <= 0)) { 30 port->inflight_credits -= credits; 31 return true; 32 } 33 34 total_on_loan = 35 __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED); 36 available = dsw->max_inflight - total_on_loan; 37 acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS); 38 39 if (available < acquired_credits) 40 return false; 41 42 /* This is a race, no locks are involved, and thus some other 43 * thread can allocate tokens in between the check and the 44 * allocation. 45 */ 46 new_total_on_loan = 47 __atomic_add_fetch(&dsw->credits_on_loan, acquired_credits, 48 __ATOMIC_RELAXED); 49 50 if (unlikely(new_total_on_loan > dsw->max_inflight)) { 51 /* Some other port took the last credits */ 52 __atomic_sub_fetch(&dsw->credits_on_loan, acquired_credits, 53 __ATOMIC_RELAXED); 54 return false; 55 } 56 57 DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n", 58 acquired_credits); 59 60 port->inflight_credits += acquired_credits; 61 port->inflight_credits -= credits; 62 63 return true; 64 } 65 66 static void 67 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port, 68 int32_t credits) 69 { 70 port->inflight_credits += credits; 71 72 if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) { 73 int32_t leave_credits = DSW_PORT_MIN_CREDITS; 74 int32_t return_credits = 75 port->inflight_credits - leave_credits; 76 77 port->inflight_credits = leave_credits; 78 79 __atomic_sub_fetch(&dsw->credits_on_loan, return_credits, 80 __ATOMIC_RELAXED); 81 82 DSW_LOG_DP_PORT(DEBUG, port->id, 83 "Returned %d tokens to pool.\n", 84 return_credits); 85 } 86 } 87 88 static void 89 dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new, 90 uint16_t num_forward, uint16_t num_release) 91 { 92 port->new_enqueued += num_new; 93 port->forward_enqueued += num_forward; 94 port->release_enqueued += num_release; 95 } 96 97 static void 98 dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id) 99 { 100 source_port->queue_enqueued[queue_id]++; 101 } 102 103 static void 104 dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num) 105 { 106 port->dequeued += num; 107 } 108 109 static void 110 dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id) 111 { 112 source_port->queue_dequeued[queue_id]++; 113 } 114 115 static void 116 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued) 117 { 118 if (dequeued > 0 && port->busy_start == 0) 119 /* work period begins */ 120 port->busy_start = rte_get_timer_cycles(); 121 else if (dequeued == 0 && port->busy_start > 0) { 122 /* work period ends */ 123 uint64_t work_period = 124 rte_get_timer_cycles() - port->busy_start; 125 port->busy_cycles += work_period; 126 port->busy_start = 0; 127 } 128 } 129 130 static int16_t 131 dsw_port_load_close_period(struct dsw_port *port, uint64_t now) 132 { 133 uint64_t passed = now - port->measurement_start; 134 uint64_t busy_cycles = port->busy_cycles; 135 136 if (port->busy_start > 0) { 137 busy_cycles += (now - port->busy_start); 138 port->busy_start = now; 139 } 140 141 int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed; 142 143 port->measurement_start = now; 144 port->busy_cycles = 0; 145 146 port->total_busy_cycles += busy_cycles; 147 148 return load; 149 } 150 151 static void 152 dsw_port_load_update(struct dsw_port *port, uint64_t now) 153 { 154 int16_t old_load; 155 int16_t period_load; 156 int16_t new_load; 157 158 old_load = __atomic_load_n(&port->load, __ATOMIC_RELAXED); 159 160 period_load = dsw_port_load_close_period(port, now); 161 162 new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) / 163 (DSW_OLD_LOAD_WEIGHT+1); 164 165 __atomic_store_n(&port->load, new_load, __ATOMIC_RELAXED); 166 167 /* The load of the recently immigrated flows should hopefully 168 * be reflected the load estimate by now. 169 */ 170 __atomic_store_n(&port->immigration_load, 0, __ATOMIC_RELAXED); 171 } 172 173 static void 174 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now) 175 { 176 if (now < port->next_load_update) 177 return; 178 179 port->next_load_update = now + port->load_update_interval; 180 181 dsw_port_load_update(port, now); 182 } 183 184 static void 185 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg) 186 { 187 /* there's always room on the ring */ 188 while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0) 189 rte_pause(); 190 } 191 192 static int 193 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg) 194 { 195 return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg)); 196 } 197 198 static void 199 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port, 200 uint8_t type, struct dsw_queue_flow *qfs, 201 uint8_t qfs_len) 202 { 203 uint16_t port_id; 204 struct dsw_ctl_msg msg = { 205 .type = type, 206 .originating_port_id = source_port->id, 207 .qfs_len = qfs_len 208 }; 209 210 memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len); 211 212 for (port_id = 0; port_id < dsw->num_ports; port_id++) 213 if (port_id != source_port->id) 214 dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg); 215 } 216 217 static __rte_always_inline bool 218 dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len, 219 uint8_t queue_id, uint16_t flow_hash) 220 { 221 uint16_t i; 222 223 for (i = 0; i < qfs_len; i++) 224 if (qfs[i].queue_id == queue_id && 225 qfs[i].flow_hash == flow_hash) 226 return true; 227 228 return false; 229 } 230 231 static __rte_always_inline bool 232 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id, 233 uint16_t flow_hash) 234 { 235 return dsw_is_queue_flow_in_ary(port->paused_flows, 236 port->paused_flows_len, 237 queue_id, flow_hash); 238 } 239 240 static void 241 dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs, 242 uint8_t qfs_len) 243 { 244 uint8_t i; 245 246 for (i = 0; i < qfs_len; i++) { 247 struct dsw_queue_flow *qf = &qfs[i]; 248 249 DSW_LOG_DP_PORT(DEBUG, port->id, 250 "Pausing queue_id %d flow_hash %d.\n", 251 qf->queue_id, qf->flow_hash); 252 253 port->paused_flows[port->paused_flows_len] = *qf; 254 port->paused_flows_len++; 255 }; 256 } 257 258 static void 259 dsw_port_remove_paused_flow(struct dsw_port *port, 260 struct dsw_queue_flow *target_qf) 261 { 262 uint16_t i; 263 264 for (i = 0; i < port->paused_flows_len; i++) { 265 struct dsw_queue_flow *qf = &port->paused_flows[i]; 266 267 if (qf->queue_id == target_qf->queue_id && 268 qf->flow_hash == target_qf->flow_hash) { 269 uint16_t last_idx = port->paused_flows_len-1; 270 if (i != last_idx) 271 port->paused_flows[i] = 272 port->paused_flows[last_idx]; 273 port->paused_flows_len--; 274 break; 275 } 276 } 277 } 278 279 static void 280 dsw_port_remove_paused_flows(struct dsw_port *port, 281 struct dsw_queue_flow *qfs, uint8_t qfs_len) 282 { 283 uint8_t i; 284 285 for (i = 0; i < qfs_len; i++) 286 dsw_port_remove_paused_flow(port, &qfs[i]); 287 288 } 289 290 static void 291 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port); 292 293 static void 294 dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port, 295 uint8_t originating_port_id, 296 struct dsw_queue_flow *paused_qfs, 297 uint8_t qfs_len) 298 { 299 struct dsw_ctl_msg cfm = { 300 .type = DSW_CTL_CFM, 301 .originating_port_id = port->id 302 }; 303 304 /* There might be already-scheduled events belonging to the 305 * paused flow in the output buffers. 306 */ 307 dsw_port_flush_out_buffers(dsw, port); 308 309 dsw_port_add_paused_flows(port, paused_qfs, qfs_len); 310 311 /* Make sure any stores to the original port's in_ring is seen 312 * before the ctl message. 313 */ 314 rte_smp_wmb(); 315 316 dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm); 317 } 318 319 struct dsw_queue_flow_burst { 320 struct dsw_queue_flow queue_flow; 321 uint16_t count; 322 }; 323 324 #define DSW_QF_TO_INT(_qf) \ 325 ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash))) 326 327 static inline int 328 dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b) 329 { 330 const struct dsw_queue_flow *qf_a = v_qf_a; 331 const struct dsw_queue_flow *qf_b = v_qf_b; 332 333 return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b); 334 } 335 336 static uint16_t 337 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len, 338 struct dsw_queue_flow_burst *bursts) 339 { 340 uint16_t i; 341 struct dsw_queue_flow_burst *current_burst = NULL; 342 uint16_t num_bursts = 0; 343 344 /* We don't need the stable property, and the list is likely 345 * large enough for qsort() to outperform dsw_stable_sort(), 346 * so we use qsort() here. 347 */ 348 qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf); 349 350 /* arrange the (now-consecutive) events into bursts */ 351 for (i = 0; i < qfs_len; i++) { 352 if (i == 0 || 353 dsw_cmp_qf(&qfs[i], ¤t_burst->queue_flow) != 0) { 354 current_burst = &bursts[num_bursts]; 355 current_burst->queue_flow = qfs[i]; 356 current_burst->count = 0; 357 num_bursts++; 358 } 359 current_burst->count++; 360 } 361 362 return num_bursts; 363 } 364 365 static bool 366 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads, 367 int16_t load_limit) 368 { 369 bool below_limit = false; 370 uint16_t i; 371 372 for (i = 0; i < dsw->num_ports; i++) { 373 int16_t measured_load = 374 __atomic_load_n(&dsw->ports[i].load, __ATOMIC_RELAXED); 375 int32_t immigration_load = 376 __atomic_load_n(&dsw->ports[i].immigration_load, 377 __ATOMIC_RELAXED); 378 int32_t load = measured_load + immigration_load; 379 380 load = RTE_MIN(load, DSW_MAX_LOAD); 381 382 if (load < load_limit) 383 below_limit = true; 384 port_loads[i] = load; 385 } 386 return below_limit; 387 } 388 389 static int16_t 390 dsw_flow_load(uint16_t num_events, int16_t port_load) 391 { 392 return ((int32_t)port_load * (int32_t)num_events) / 393 DSW_MAX_EVENTS_RECORDED; 394 } 395 396 static int16_t 397 dsw_evaluate_migration(int16_t source_load, int16_t target_load, 398 int16_t flow_load) 399 { 400 int32_t res_target_load; 401 int32_t imbalance; 402 403 if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION) 404 return -1; 405 406 imbalance = source_load - target_load; 407 408 if (imbalance < DSW_REBALANCE_THRESHOLD) 409 return -1; 410 411 res_target_load = target_load + flow_load; 412 413 /* If the estimated load of the target port will be higher 414 * than the source port's load, it doesn't make sense to move 415 * the flow. 416 */ 417 if (res_target_load > source_load) 418 return -1; 419 420 /* The more idle the target will be, the better. This will 421 * make migration prefer moving smaller flows, and flows to 422 * lightly loaded ports. 423 */ 424 return DSW_MAX_LOAD - res_target_load; 425 } 426 427 static bool 428 dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id) 429 { 430 struct dsw_queue *queue = &dsw->queues[queue_id]; 431 uint16_t i; 432 433 for (i = 0; i < queue->num_serving_ports; i++) 434 if (queue->serving_ports[i] == port_id) 435 return true; 436 437 return false; 438 } 439 440 static bool 441 dsw_select_emigration_target(struct dsw_evdev *dsw, 442 struct dsw_queue_flow_burst *bursts, 443 uint16_t num_bursts, uint8_t source_port_id, 444 int16_t *port_loads, uint16_t num_ports, 445 uint8_t *target_port_ids, 446 struct dsw_queue_flow *target_qfs, 447 uint8_t *targets_len) 448 { 449 int16_t source_port_load = port_loads[source_port_id]; 450 struct dsw_queue_flow *candidate_qf = NULL; 451 uint8_t candidate_port_id = 0; 452 int16_t candidate_weight = -1; 453 int16_t candidate_flow_load = -1; 454 uint16_t i; 455 456 if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) 457 return false; 458 459 for (i = 0; i < num_bursts; i++) { 460 struct dsw_queue_flow_burst *burst = &bursts[i]; 461 struct dsw_queue_flow *qf = &burst->queue_flow; 462 int16_t flow_load; 463 uint16_t port_id; 464 465 if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len, 466 qf->queue_id, qf->flow_hash)) 467 continue; 468 469 flow_load = dsw_flow_load(burst->count, source_port_load); 470 471 for (port_id = 0; port_id < num_ports; port_id++) { 472 int16_t weight; 473 474 if (port_id == source_port_id) 475 continue; 476 477 if (!dsw_is_serving_port(dsw, port_id, qf->queue_id)) 478 continue; 479 480 weight = dsw_evaluate_migration(source_port_load, 481 port_loads[port_id], 482 flow_load); 483 484 if (weight > candidate_weight) { 485 candidate_qf = qf; 486 candidate_port_id = port_id; 487 candidate_weight = weight; 488 candidate_flow_load = flow_load; 489 } 490 } 491 } 492 493 if (candidate_weight < 0) 494 return false; 495 496 DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d " 497 "flow_hash %d (with flow load %d) for migration " 498 "to port %d.\n", candidate_qf->queue_id, 499 candidate_qf->flow_hash, 500 DSW_LOAD_TO_PERCENT(candidate_flow_load), 501 candidate_port_id); 502 503 port_loads[candidate_port_id] += candidate_flow_load; 504 port_loads[source_port_id] -= candidate_flow_load; 505 506 target_port_ids[*targets_len] = candidate_port_id; 507 target_qfs[*targets_len] = *candidate_qf; 508 (*targets_len)++; 509 510 __atomic_add_fetch(&dsw->ports[candidate_port_id].immigration_load, 511 candidate_flow_load, __ATOMIC_RELAXED); 512 513 return true; 514 } 515 516 static void 517 dsw_select_emigration_targets(struct dsw_evdev *dsw, 518 struct dsw_port *source_port, 519 struct dsw_queue_flow_burst *bursts, 520 uint16_t num_bursts, int16_t *port_loads) 521 { 522 struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs; 523 uint8_t *target_port_ids = source_port->emigration_target_port_ids; 524 uint8_t *targets_len = &source_port->emigration_targets_len; 525 uint16_t i; 526 527 for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) { 528 bool found; 529 530 found = dsw_select_emigration_target(dsw, bursts, num_bursts, 531 source_port->id, 532 port_loads, dsw->num_ports, 533 target_port_ids, 534 target_qfs, 535 targets_len); 536 if (!found) 537 break; 538 } 539 540 if (*targets_len == 0) 541 DSW_LOG_DP_PORT(DEBUG, source_port->id, 542 "For the %d flows considered, no target port " 543 "was found.\n", num_bursts); 544 } 545 546 static uint8_t 547 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash) 548 { 549 struct dsw_queue *queue = &dsw->queues[queue_id]; 550 uint8_t port_id; 551 552 if (queue->num_serving_ports > 1) 553 port_id = queue->flow_to_port_map[flow_hash]; 554 else 555 /* A single-link queue, or atomic/ordered/parallel but 556 * with just a single serving port. 557 */ 558 port_id = queue->serving_ports[0]; 559 560 DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled " 561 "to port %d.\n", queue_id, flow_hash, port_id); 562 563 return port_id; 564 } 565 566 static void 567 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port, 568 uint8_t dest_port_id) 569 { 570 struct dsw_port *dest_port = &(dsw->ports[dest_port_id]); 571 uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id]; 572 struct rte_event *buffer = source_port->out_buffer[dest_port_id]; 573 uint16_t enqueued = 0; 574 575 if (*buffer_len == 0) 576 return; 577 578 /* The rings are dimensioned to fit all in-flight events (even 579 * on a single ring), so looping will work. 580 */ 581 do { 582 enqueued += 583 rte_event_ring_enqueue_burst(dest_port->in_ring, 584 buffer+enqueued, 585 *buffer_len-enqueued, 586 NULL); 587 } while (unlikely(enqueued != *buffer_len)); 588 589 (*buffer_len) = 0; 590 } 591 592 static uint16_t 593 dsw_port_get_parallel_flow_id(struct dsw_port *port) 594 { 595 uint16_t flow_id = port->next_parallel_flow_id; 596 597 port->next_parallel_flow_id = 598 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS; 599 600 return flow_id; 601 } 602 603 static void 604 dsw_port_buffer_paused(struct dsw_port *port, 605 const struct rte_event *paused_event) 606 { 607 port->paused_events[port->paused_events_len] = *paused_event; 608 port->paused_events_len++; 609 } 610 611 static void 612 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port, 613 uint8_t dest_port_id, const struct rte_event *event) 614 { 615 struct rte_event *buffer = source_port->out_buffer[dest_port_id]; 616 uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id]; 617 618 if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER) 619 dsw_port_transmit_buffered(dsw, source_port, dest_port_id); 620 621 buffer[*buffer_len] = *event; 622 623 (*buffer_len)++; 624 } 625 626 #define DSW_FLOW_ID_BITS (24) 627 static uint16_t 628 dsw_flow_id_hash(uint32_t flow_id) 629 { 630 uint16_t hash = 0; 631 uint16_t offset = 0; 632 633 do { 634 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK); 635 offset += DSW_MAX_FLOWS_BITS; 636 } while (offset < DSW_FLOW_ID_BITS); 637 638 return hash; 639 } 640 641 static void 642 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port, 643 struct rte_event event) 644 { 645 uint8_t dest_port_id; 646 647 event.flow_id = dsw_port_get_parallel_flow_id(source_port); 648 649 dest_port_id = dsw_schedule(dsw, event.queue_id, 650 dsw_flow_id_hash(event.flow_id)); 651 652 dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event); 653 } 654 655 static void 656 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port, 657 const struct rte_event *event) 658 { 659 uint16_t flow_hash; 660 uint8_t dest_port_id; 661 662 if (unlikely(dsw->queues[event->queue_id].schedule_type == 663 RTE_SCHED_TYPE_PARALLEL)) { 664 dsw_port_buffer_parallel(dsw, source_port, *event); 665 return; 666 } 667 668 flow_hash = dsw_flow_id_hash(event->flow_id); 669 670 if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id, 671 flow_hash))) { 672 dsw_port_buffer_paused(source_port, event); 673 return; 674 } 675 676 dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash); 677 678 dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event); 679 } 680 681 static void 682 dsw_port_flush_paused_events(struct dsw_evdev *dsw, 683 struct dsw_port *source_port, 684 const struct dsw_queue_flow *qf) 685 { 686 uint16_t paused_events_len = source_port->paused_events_len; 687 struct rte_event paused_events[paused_events_len]; 688 uint8_t dest_port_id; 689 uint16_t i; 690 691 if (paused_events_len == 0) 692 return; 693 694 if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash)) 695 return; 696 697 rte_memcpy(paused_events, source_port->paused_events, 698 paused_events_len * sizeof(struct rte_event)); 699 700 source_port->paused_events_len = 0; 701 702 dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash); 703 704 for (i = 0; i < paused_events_len; i++) { 705 struct rte_event *event = &paused_events[i]; 706 uint16_t flow_hash; 707 708 flow_hash = dsw_flow_id_hash(event->flow_id); 709 710 if (event->queue_id == qf->queue_id && 711 flow_hash == qf->flow_hash) 712 dsw_port_buffer_non_paused(dsw, source_port, 713 dest_port_id, event); 714 else 715 dsw_port_buffer_paused(source_port, event); 716 } 717 } 718 719 static void 720 dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished) 721 { 722 uint64_t flow_migration_latency; 723 724 flow_migration_latency = 725 (rte_get_timer_cycles() - port->emigration_start); 726 port->emigration_latency += (flow_migration_latency * finished); 727 port->emigrations += finished; 728 } 729 730 static void 731 dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port, 732 uint8_t schedule_type) 733 { 734 uint8_t i; 735 struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION]; 736 uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION]; 737 uint8_t left_qfs_len = 0; 738 uint8_t finished; 739 740 for (i = 0; i < port->emigration_targets_len; i++) { 741 struct dsw_queue_flow *qf = &port->emigration_target_qfs[i]; 742 uint8_t queue_id = qf->queue_id; 743 uint8_t queue_schedule_type = 744 dsw->queues[queue_id].schedule_type; 745 uint16_t flow_hash = qf->flow_hash; 746 747 if (queue_schedule_type != schedule_type) { 748 left_port_ids[left_qfs_len] = 749 port->emigration_target_port_ids[i]; 750 left_qfs[left_qfs_len] = *qf; 751 left_qfs_len++; 752 continue; 753 } 754 755 DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for " 756 "queue_id %d flow_hash %d.\n", queue_id, 757 flow_hash); 758 759 if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) { 760 dsw_port_remove_paused_flow(port, qf); 761 dsw_port_flush_paused_events(dsw, port, qf); 762 } 763 } 764 765 finished = port->emigration_targets_len - left_qfs_len; 766 767 if (finished > 0) 768 dsw_port_emigration_stats(port, finished); 769 770 for (i = 0; i < left_qfs_len; i++) { 771 port->emigration_target_port_ids[i] = left_port_ids[i]; 772 port->emigration_target_qfs[i] = left_qfs[i]; 773 } 774 port->emigration_targets_len = left_qfs_len; 775 776 if (port->emigration_targets_len == 0) { 777 port->migration_state = DSW_MIGRATION_STATE_IDLE; 778 port->seen_events_len = 0; 779 } 780 } 781 782 static void 783 dsw_port_move_parallel_flows(struct dsw_evdev *dsw, 784 struct dsw_port *source_port) 785 { 786 uint8_t i; 787 788 for (i = 0; i < source_port->emigration_targets_len; i++) { 789 struct dsw_queue_flow *qf = 790 &source_port->emigration_target_qfs[i]; 791 uint8_t queue_id = qf->queue_id; 792 793 if (dsw->queues[queue_id].schedule_type == 794 RTE_SCHED_TYPE_PARALLEL) { 795 uint8_t dest_port_id = 796 source_port->emigration_target_port_ids[i]; 797 uint16_t flow_hash = qf->flow_hash; 798 799 /* Single byte-sized stores are always atomic. */ 800 dsw->queues[queue_id].flow_to_port_map[flow_hash] = 801 dest_port_id; 802 } 803 } 804 805 rte_smp_wmb(); 806 807 dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL); 808 } 809 810 static void 811 dsw_port_consider_emigration(struct dsw_evdev *dsw, 812 struct dsw_port *source_port, 813 uint64_t now) 814 { 815 bool any_port_below_limit; 816 struct dsw_queue_flow *seen_events = source_port->seen_events; 817 uint16_t seen_events_len = source_port->seen_events_len; 818 struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED]; 819 uint16_t num_bursts; 820 int16_t source_port_load; 821 int16_t port_loads[dsw->num_ports]; 822 823 if (now < source_port->next_emigration) 824 return; 825 826 if (dsw->num_ports == 1) 827 return; 828 829 if (seen_events_len < DSW_MAX_EVENTS_RECORDED) 830 return; 831 832 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n"); 833 834 /* Randomize interval to avoid having all threads considering 835 * emigration at the same in point in time, which might lead 836 * to all choosing the same target port. 837 */ 838 source_port->next_emigration = now + 839 source_port->migration_interval / 2 + 840 rte_rand() % source_port->migration_interval; 841 842 if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) { 843 DSW_LOG_DP_PORT(DEBUG, source_port->id, 844 "Emigration already in progress.\n"); 845 return; 846 } 847 848 /* For simplicity, avoid migration in the unlikely case there 849 * is still events to consume in the in_buffer (from the last 850 * emigration). 851 */ 852 if (source_port->in_buffer_len > 0) { 853 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still " 854 "events in the input buffer.\n"); 855 return; 856 } 857 858 source_port_load = 859 __atomic_load_n(&source_port->load, __ATOMIC_RELAXED); 860 if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) { 861 DSW_LOG_DP_PORT(DEBUG, source_port->id, 862 "Load %d is below threshold level %d.\n", 863 DSW_LOAD_TO_PERCENT(source_port_load), 864 DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)); 865 return; 866 } 867 868 /* Avoid starting any expensive operations (sorting etc), in 869 * case of a scenario with all ports above the load limit. 870 */ 871 any_port_below_limit = 872 dsw_retrieve_port_loads(dsw, port_loads, 873 DSW_MAX_TARGET_LOAD_FOR_MIGRATION); 874 if (!any_port_below_limit) { 875 DSW_LOG_DP_PORT(DEBUG, source_port->id, 876 "Candidate target ports are all too highly " 877 "loaded.\n"); 878 return; 879 } 880 881 num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len, 882 bursts); 883 884 /* For non-big-little systems, there's no point in moving the 885 * only (known) flow. 886 */ 887 if (num_bursts < 2) { 888 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow " 889 "queue_id %d flow_hash %d has been seen.\n", 890 bursts[0].queue_flow.queue_id, 891 bursts[0].queue_flow.flow_hash); 892 return; 893 } 894 895 dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts, 896 port_loads); 897 898 if (source_port->emigration_targets_len == 0) 899 return; 900 901 source_port->migration_state = DSW_MIGRATION_STATE_PAUSING; 902 source_port->emigration_start = rte_get_timer_cycles(); 903 904 /* No need to go through the whole pause procedure for 905 * parallel queues, since atomic/ordered semantics need not to 906 * be maintained. 907 */ 908 dsw_port_move_parallel_flows(dsw, source_port); 909 910 /* All flows were on PARALLEL queues. */ 911 if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE) 912 return; 913 914 /* There might be 'loopback' events already scheduled in the 915 * output buffers. 916 */ 917 dsw_port_flush_out_buffers(dsw, source_port); 918 919 dsw_port_add_paused_flows(source_port, 920 source_port->emigration_target_qfs, 921 source_port->emigration_targets_len); 922 923 dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ, 924 source_port->emigration_target_qfs, 925 source_port->emigration_targets_len); 926 source_port->cfm_cnt = 0; 927 } 928 929 static void 930 dsw_port_flush_paused_events(struct dsw_evdev *dsw, 931 struct dsw_port *source_port, 932 const struct dsw_queue_flow *qf); 933 934 static void 935 dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port, 936 uint8_t originating_port_id, 937 struct dsw_queue_flow *paused_qfs, 938 uint8_t qfs_len) 939 { 940 uint16_t i; 941 struct dsw_ctl_msg cfm = { 942 .type = DSW_CTL_CFM, 943 .originating_port_id = port->id 944 }; 945 946 dsw_port_remove_paused_flows(port, paused_qfs, qfs_len); 947 948 rte_smp_rmb(); 949 950 dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm); 951 952 for (i = 0; i < qfs_len; i++) { 953 struct dsw_queue_flow *qf = &paused_qfs[i]; 954 955 if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id) 956 port->immigrations++; 957 958 dsw_port_flush_paused_events(dsw, port, qf); 959 } 960 } 961 962 #define FORWARD_BURST_SIZE (32) 963 964 static void 965 dsw_port_forward_emigrated_flow(struct dsw_port *source_port, 966 struct rte_event_ring *dest_ring, 967 uint8_t queue_id, 968 uint16_t flow_hash) 969 { 970 uint16_t events_left; 971 972 /* Control ring message should been seen before the ring count 973 * is read on the port's in_ring. 974 */ 975 rte_smp_rmb(); 976 977 events_left = rte_event_ring_count(source_port->in_ring); 978 979 while (events_left > 0) { 980 uint16_t in_burst_size = 981 RTE_MIN(FORWARD_BURST_SIZE, events_left); 982 struct rte_event in_burst[in_burst_size]; 983 uint16_t in_len; 984 uint16_t i; 985 986 in_len = rte_event_ring_dequeue_burst(source_port->in_ring, 987 in_burst, 988 in_burst_size, NULL); 989 /* No need to care about bursting forwarded events (to 990 * the destination port's in_ring), since migration 991 * doesn't happen very often, and also the majority of 992 * the dequeued events will likely *not* be forwarded. 993 */ 994 for (i = 0; i < in_len; i++) { 995 struct rte_event *e = &in_burst[i]; 996 if (e->queue_id == queue_id && 997 dsw_flow_id_hash(e->flow_id) == flow_hash) { 998 while (rte_event_ring_enqueue_burst(dest_ring, 999 e, 1, 1000 NULL) != 1) 1001 rte_pause(); 1002 } else { 1003 uint16_t last_idx = source_port->in_buffer_len; 1004 source_port->in_buffer[last_idx] = *e; 1005 source_port->in_buffer_len++; 1006 } 1007 } 1008 1009 events_left -= in_len; 1010 } 1011 } 1012 1013 static void 1014 dsw_port_move_emigrating_flows(struct dsw_evdev *dsw, 1015 struct dsw_port *source_port) 1016 { 1017 uint8_t i; 1018 1019 dsw_port_flush_out_buffers(dsw, source_port); 1020 1021 rte_smp_wmb(); 1022 1023 for (i = 0; i < source_port->emigration_targets_len; i++) { 1024 struct dsw_queue_flow *qf = 1025 &source_port->emigration_target_qfs[i]; 1026 uint8_t dest_port_id = 1027 source_port->emigration_target_port_ids[i]; 1028 struct dsw_port *dest_port = &dsw->ports[dest_port_id]; 1029 1030 dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] = 1031 dest_port_id; 1032 1033 dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring, 1034 qf->queue_id, qf->flow_hash); 1035 } 1036 1037 /* Flow table update and migration destination port's enqueues 1038 * must be seen before the control message. 1039 */ 1040 rte_smp_wmb(); 1041 1042 dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, 1043 source_port->emigration_target_qfs, 1044 source_port->emigration_targets_len); 1045 source_port->cfm_cnt = 0; 1046 source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING; 1047 } 1048 1049 static void 1050 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port) 1051 { 1052 port->cfm_cnt++; 1053 1054 if (port->cfm_cnt == (dsw->num_ports-1)) { 1055 switch (port->migration_state) { 1056 case DSW_MIGRATION_STATE_PAUSING: 1057 DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding " 1058 "migration state.\n"); 1059 port->migration_state = DSW_MIGRATION_STATE_FORWARDING; 1060 break; 1061 case DSW_MIGRATION_STATE_UNPAUSING: 1062 dsw_port_end_emigration(dsw, port, 1063 RTE_SCHED_TYPE_ATOMIC); 1064 break; 1065 default: 1066 RTE_ASSERT(0); 1067 break; 1068 } 1069 } 1070 } 1071 1072 static void 1073 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port) 1074 { 1075 struct dsw_ctl_msg msg; 1076 1077 if (dsw_port_ctl_dequeue(port, &msg) == 0) { 1078 switch (msg.type) { 1079 case DSW_CTL_PAUS_REQ: 1080 dsw_port_handle_pause_flows(dsw, port, 1081 msg.originating_port_id, 1082 msg.qfs, msg.qfs_len); 1083 break; 1084 case DSW_CTL_UNPAUS_REQ: 1085 dsw_port_handle_unpause_flows(dsw, port, 1086 msg.originating_port_id, 1087 msg.qfs, msg.qfs_len); 1088 break; 1089 case DSW_CTL_CFM: 1090 dsw_port_handle_confirm(dsw, port); 1091 break; 1092 } 1093 } 1094 } 1095 1096 static void 1097 dsw_port_note_op(struct dsw_port *port, uint16_t num_events) 1098 { 1099 /* To pull the control ring reasonably often on busy ports, 1100 * each dequeued/enqueued event is considered an 'op' too. 1101 */ 1102 port->ops_since_bg_task += (num_events+1); 1103 } 1104 1105 static void 1106 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port) 1107 { 1108 if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING && 1109 port->pending_releases == 0)) 1110 dsw_port_move_emigrating_flows(dsw, port); 1111 1112 /* Polling the control ring is relatively inexpensive, and 1113 * polling it often helps bringing down migration latency, so 1114 * do this for every iteration. 1115 */ 1116 dsw_port_ctl_process(dsw, port); 1117 1118 /* To avoid considering migration and flushing output buffers 1119 * on every dequeue/enqueue call, the scheduler only performs 1120 * such 'background' tasks every nth 1121 * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation. 1122 */ 1123 if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) { 1124 uint64_t now; 1125 1126 now = rte_get_timer_cycles(); 1127 1128 port->last_bg = now; 1129 1130 /* Logic to avoid having events linger in the output 1131 * buffer too long. 1132 */ 1133 dsw_port_flush_out_buffers(dsw, port); 1134 1135 dsw_port_consider_load_update(port, now); 1136 1137 dsw_port_consider_emigration(dsw, port, now); 1138 1139 port->ops_since_bg_task = 0; 1140 } 1141 } 1142 1143 static void 1144 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port) 1145 { 1146 uint16_t dest_port_id; 1147 1148 for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++) 1149 dsw_port_transmit_buffered(dsw, source_port, dest_port_id); 1150 } 1151 1152 uint16_t 1153 dsw_event_enqueue(void *port, const struct rte_event *ev) 1154 { 1155 return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1); 1156 } 1157 1158 static __rte_always_inline uint16_t 1159 dsw_event_enqueue_burst_generic(struct dsw_port *source_port, 1160 const struct rte_event events[], 1161 uint16_t events_len, bool op_types_known, 1162 uint16_t num_new, uint16_t num_release, 1163 uint16_t num_non_release) 1164 { 1165 struct dsw_evdev *dsw = source_port->dsw; 1166 bool enough_credits; 1167 uint16_t i; 1168 1169 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d " 1170 "events to port %d.\n", events_len, source_port->id); 1171 1172 dsw_port_bg_process(dsw, source_port); 1173 1174 /* XXX: For performance (=ring efficiency) reasons, the 1175 * scheduler relies on internal non-ring buffers instead of 1176 * immediately sending the event to the destination ring. For 1177 * a producer that doesn't intend to produce or consume any 1178 * more events, the scheduler provides a way to flush the 1179 * buffer, by means of doing an enqueue of zero events. In 1180 * addition, a port cannot be left "unattended" (e.g. unused) 1181 * for long periods of time, since that would stall 1182 * migration. Eventdev API extensions to provide a cleaner way 1183 * to archive both of these functions should be 1184 * considered. 1185 */ 1186 if (unlikely(events_len == 0)) { 1187 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK); 1188 dsw_port_flush_out_buffers(dsw, source_port); 1189 return 0; 1190 } 1191 1192 dsw_port_note_op(source_port, events_len); 1193 1194 if (!op_types_known) 1195 for (i = 0; i < events_len; i++) { 1196 switch (events[i].op) { 1197 case RTE_EVENT_OP_RELEASE: 1198 num_release++; 1199 break; 1200 case RTE_EVENT_OP_NEW: 1201 num_new++; 1202 /* Falls through. */ 1203 default: 1204 num_non_release++; 1205 break; 1206 } 1207 } 1208 1209 /* Technically, we could allow the non-new events up to the 1210 * first new event in the array into the system, but for 1211 * simplicity reasons, we deny the whole burst if the port is 1212 * above the water mark. 1213 */ 1214 if (unlikely(num_new > 0 && 1215 __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED) > 1216 source_port->new_event_threshold)) 1217 return 0; 1218 1219 enough_credits = dsw_port_acquire_credits(dsw, source_port, 1220 num_non_release); 1221 if (unlikely(!enough_credits)) 1222 return 0; 1223 1224 source_port->pending_releases -= num_release; 1225 1226 dsw_port_enqueue_stats(source_port, num_new, 1227 num_non_release-num_new, num_release); 1228 1229 for (i = 0; i < events_len; i++) { 1230 const struct rte_event *event = &events[i]; 1231 1232 if (likely(num_release == 0 || 1233 event->op != RTE_EVENT_OP_RELEASE)) 1234 dsw_port_buffer_event(dsw, source_port, event); 1235 dsw_port_queue_enqueue_stats(source_port, event->queue_id); 1236 } 1237 1238 DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events " 1239 "accepted.\n", num_non_release); 1240 1241 return (num_non_release + num_release); 1242 } 1243 1244 uint16_t 1245 dsw_event_enqueue_burst(void *port, const struct rte_event events[], 1246 uint16_t events_len) 1247 { 1248 struct dsw_port *source_port = port; 1249 1250 if (unlikely(events_len > source_port->enqueue_depth)) 1251 events_len = source_port->enqueue_depth; 1252 1253 return dsw_event_enqueue_burst_generic(source_port, events, 1254 events_len, false, 0, 0, 0); 1255 } 1256 1257 uint16_t 1258 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[], 1259 uint16_t events_len) 1260 { 1261 struct dsw_port *source_port = port; 1262 1263 if (unlikely(events_len > source_port->enqueue_depth)) 1264 events_len = source_port->enqueue_depth; 1265 1266 return dsw_event_enqueue_burst_generic(source_port, events, 1267 events_len, true, events_len, 1268 0, events_len); 1269 } 1270 1271 uint16_t 1272 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[], 1273 uint16_t events_len) 1274 { 1275 struct dsw_port *source_port = port; 1276 1277 if (unlikely(events_len > source_port->enqueue_depth)) 1278 events_len = source_port->enqueue_depth; 1279 1280 return dsw_event_enqueue_burst_generic(source_port, events, 1281 events_len, true, 0, 0, 1282 events_len); 1283 } 1284 1285 uint16_t 1286 dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait) 1287 { 1288 return dsw_event_dequeue_burst(port, events, 1, wait); 1289 } 1290 1291 static void 1292 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events, 1293 uint16_t num) 1294 { 1295 uint16_t i; 1296 1297 dsw_port_dequeue_stats(port, num); 1298 1299 for (i = 0; i < num; i++) { 1300 uint16_t l_idx = port->seen_events_idx; 1301 struct dsw_queue_flow *qf = &port->seen_events[l_idx]; 1302 struct rte_event *event = &events[i]; 1303 qf->queue_id = event->queue_id; 1304 qf->flow_hash = dsw_flow_id_hash(event->flow_id); 1305 1306 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED; 1307 1308 dsw_port_queue_dequeued_stats(port, event->queue_id); 1309 } 1310 1311 if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED)) 1312 port->seen_events_len = 1313 RTE_MIN(port->seen_events_len + num, 1314 DSW_MAX_EVENTS_RECORDED); 1315 } 1316 1317 #ifdef DSW_SORT_DEQUEUED 1318 1319 #define DSW_EVENT_TO_INT(_event) \ 1320 ((int)((((_event)->queue_id)<<16)|((_event)->flow_id))) 1321 1322 static inline int 1323 dsw_cmp_event(const void *v_event_a, const void *v_event_b) 1324 { 1325 const struct rte_event *event_a = v_event_a; 1326 const struct rte_event *event_b = v_event_b; 1327 1328 return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b); 1329 } 1330 #endif 1331 1332 static uint16_t 1333 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events, 1334 uint16_t num) 1335 { 1336 if (unlikely(port->in_buffer_len > 0)) { 1337 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len); 1338 1339 rte_memcpy(events, &port->in_buffer[port->in_buffer_start], 1340 dequeued * sizeof(struct rte_event)); 1341 1342 port->in_buffer_start += dequeued; 1343 port->in_buffer_len -= dequeued; 1344 1345 if (port->in_buffer_len == 0) 1346 port->in_buffer_start = 0; 1347 1348 return dequeued; 1349 } 1350 1351 return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL); 1352 } 1353 1354 uint16_t 1355 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num, 1356 uint64_t wait __rte_unused) 1357 { 1358 struct dsw_port *source_port = port; 1359 struct dsw_evdev *dsw = source_port->dsw; 1360 uint16_t dequeued; 1361 1362 source_port->pending_releases = 0; 1363 1364 dsw_port_bg_process(dsw, source_port); 1365 1366 if (unlikely(num > source_port->dequeue_depth)) 1367 num = source_port->dequeue_depth; 1368 1369 dequeued = dsw_port_dequeue_burst(source_port, events, num); 1370 1371 source_port->pending_releases = dequeued; 1372 1373 dsw_port_load_record(source_port, dequeued); 1374 1375 dsw_port_note_op(source_port, dequeued); 1376 1377 if (dequeued > 0) { 1378 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n", 1379 dequeued); 1380 1381 dsw_port_return_credits(dsw, source_port, dequeued); 1382 1383 /* One potential optimization one might think of is to 1384 * add a migration state (prior to 'pausing'), and 1385 * only record seen events when the port is in this 1386 * state (and transit to 'pausing' when enough events 1387 * have been gathered). However, that schema doesn't 1388 * seem to improve performance. 1389 */ 1390 dsw_port_record_seen_events(port, events, dequeued); 1391 } else /* Zero-size dequeue means a likely idle port, and thus 1392 * we can afford trading some efficiency for a slightly 1393 * reduced event wall-time latency. 1394 */ 1395 dsw_port_flush_out_buffers(dsw, port); 1396 1397 #ifdef DSW_SORT_DEQUEUED 1398 dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event); 1399 #endif 1400 1401 return dequeued; 1402 } 1403 1404 void dsw_event_maintain(void *port, int op) 1405 { 1406 struct dsw_port *source_port = port; 1407 struct dsw_evdev *dsw = source_port->dsw; 1408 1409 dsw_port_note_op(source_port, 0); 1410 dsw_port_bg_process(dsw, source_port); 1411 1412 if (op & RTE_EVENT_DEV_MAINT_OP_FLUSH) 1413 dsw_port_flush_out_buffers(dsw, source_port); 1414 } 1415