xref: /f-stack/dpdk/drivers/event/dsw/dsw_event.c (revision 2d9fd380)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Ericsson AB
3  */
4 
5 #include "dsw_evdev.h"
6 
7 #ifdef DSW_SORT_DEQUEUED
8 #include "dsw_sort.h"
9 #endif
10 
11 #include <stdbool.h>
12 #include <string.h>
13 
14 #include <rte_atomic.h>
15 #include <rte_cycles.h>
16 #include <rte_memcpy.h>
17 #include <rte_random.h>
18 
19 static bool
dsw_port_acquire_credits(struct dsw_evdev * dsw,struct dsw_port * port,int32_t credits)20 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
21 			 int32_t credits)
22 {
23 	int32_t inflight_credits = port->inflight_credits;
24 	int32_t missing_credits = credits - inflight_credits;
25 	int32_t total_on_loan;
26 	int32_t available;
27 	int32_t acquired_credits;
28 	int32_t new_total_on_loan;
29 
30 	if (likely(missing_credits <= 0)) {
31 		port->inflight_credits -= credits;
32 		return true;
33 	}
34 
35 	total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
36 	available = dsw->max_inflight - total_on_loan;
37 	acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
38 
39 	if (available < acquired_credits)
40 		return false;
41 
42 	/* This is a race, no locks are involved, and thus some other
43 	 * thread can allocate tokens in between the check and the
44 	 * allocation.
45 	 */
46 	new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
47 						    acquired_credits);
48 
49 	if (unlikely(new_total_on_loan > dsw->max_inflight)) {
50 		/* Some other port took the last credits */
51 		rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
52 		return false;
53 	}
54 
55 	DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
56 			acquired_credits);
57 
58 	port->inflight_credits += acquired_credits;
59 	port->inflight_credits -= credits;
60 
61 	return true;
62 }
63 
64 static void
dsw_port_return_credits(struct dsw_evdev * dsw,struct dsw_port * port,int32_t credits)65 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
66 			int32_t credits)
67 {
68 	port->inflight_credits += credits;
69 
70 	if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
71 		int32_t leave_credits = DSW_PORT_MIN_CREDITS;
72 		int32_t return_credits =
73 			port->inflight_credits - leave_credits;
74 
75 		port->inflight_credits = leave_credits;
76 
77 		rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
78 
79 		DSW_LOG_DP_PORT(DEBUG, port->id,
80 				"Returned %d tokens to pool.\n",
81 				return_credits);
82 	}
83 }
84 
85 static void
dsw_port_enqueue_stats(struct dsw_port * port,uint16_t num_new,uint16_t num_forward,uint16_t num_release)86 dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
87 		       uint16_t num_forward, uint16_t num_release)
88 {
89 	port->new_enqueued += num_new;
90 	port->forward_enqueued += num_forward;
91 	port->release_enqueued += num_release;
92 }
93 
94 static void
dsw_port_queue_enqueue_stats(struct dsw_port * source_port,uint8_t queue_id)95 dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
96 {
97 	source_port->queue_enqueued[queue_id]++;
98 }
99 
100 static void
dsw_port_dequeue_stats(struct dsw_port * port,uint16_t num)101 dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
102 {
103 	port->dequeued += num;
104 }
105 
106 static void
dsw_port_queue_dequeued_stats(struct dsw_port * source_port,uint8_t queue_id)107 dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
108 {
109 	source_port->queue_dequeued[queue_id]++;
110 }
111 
112 static void
dsw_port_load_record(struct dsw_port * port,unsigned int dequeued)113 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
114 {
115 	if (dequeued > 0 && port->busy_start == 0)
116 		/* work period begins */
117 		port->busy_start = rte_get_timer_cycles();
118 	else if (dequeued == 0 && port->busy_start > 0) {
119 		/* work period ends */
120 		uint64_t work_period =
121 			rte_get_timer_cycles() - port->busy_start;
122 		port->busy_cycles += work_period;
123 		port->busy_start = 0;
124 	}
125 }
126 
127 static int16_t
dsw_port_load_close_period(struct dsw_port * port,uint64_t now)128 dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
129 {
130 	uint64_t passed = now - port->measurement_start;
131 	uint64_t busy_cycles = port->busy_cycles;
132 
133 	if (port->busy_start > 0) {
134 		busy_cycles += (now - port->busy_start);
135 		port->busy_start = now;
136 	}
137 
138 	int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
139 
140 	port->measurement_start = now;
141 	port->busy_cycles = 0;
142 
143 	port->total_busy_cycles += busy_cycles;
144 
145 	return load;
146 }
147 
148 static void
dsw_port_load_update(struct dsw_port * port,uint64_t now)149 dsw_port_load_update(struct dsw_port *port, uint64_t now)
150 {
151 	int16_t old_load;
152 	int16_t period_load;
153 	int16_t new_load;
154 
155 	old_load = rte_atomic16_read(&port->load);
156 
157 	period_load = dsw_port_load_close_period(port, now);
158 
159 	new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
160 		(DSW_OLD_LOAD_WEIGHT+1);
161 
162 	rte_atomic16_set(&port->load, new_load);
163 
164 	/* The load of the recently immigrated flows should hopefully
165 	 * be reflected the load estimate by now.
166 	 */
167 	rte_atomic32_set(&port->immigration_load, 0);
168 }
169 
170 static void
dsw_port_consider_load_update(struct dsw_port * port,uint64_t now)171 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
172 {
173 	if (now < port->next_load_update)
174 		return;
175 
176 	port->next_load_update = now + port->load_update_interval;
177 
178 	dsw_port_load_update(port, now);
179 }
180 
181 static void
dsw_port_ctl_enqueue(struct dsw_port * port,struct dsw_ctl_msg * msg)182 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
183 {
184 	/* there's always room on the ring */
185 	while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
186 		rte_pause();
187 }
188 
189 static int
dsw_port_ctl_dequeue(struct dsw_port * port,struct dsw_ctl_msg * msg)190 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
191 {
192 	return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
193 }
194 
195 static void
dsw_port_ctl_broadcast(struct dsw_evdev * dsw,struct dsw_port * source_port,uint8_t type,struct dsw_queue_flow * qfs,uint8_t qfs_len)196 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
197 		       uint8_t type, struct dsw_queue_flow *qfs,
198 		       uint8_t qfs_len)
199 {
200 	uint16_t port_id;
201 	struct dsw_ctl_msg msg = {
202 		.type = type,
203 		.originating_port_id = source_port->id,
204 		.qfs_len = qfs_len
205 	};
206 
207 	memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
208 
209 	for (port_id = 0; port_id < dsw->num_ports; port_id++)
210 		if (port_id != source_port->id)
211 			dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
212 }
213 
214 static __rte_always_inline bool
dsw_is_queue_flow_in_ary(const struct dsw_queue_flow * qfs,uint16_t qfs_len,uint8_t queue_id,uint16_t flow_hash)215 dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
216 			 uint8_t queue_id, uint16_t flow_hash)
217 {
218 	uint16_t i;
219 
220 	for (i = 0; i < qfs_len; i++)
221 		if (qfs[i].queue_id == queue_id &&
222 		    qfs[i].flow_hash == flow_hash)
223 			return true;
224 
225 	return false;
226 }
227 
228 static __rte_always_inline bool
dsw_port_is_flow_paused(struct dsw_port * port,uint8_t queue_id,uint16_t flow_hash)229 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
230 			uint16_t flow_hash)
231 {
232 	return dsw_is_queue_flow_in_ary(port->paused_flows,
233 					port->paused_flows_len,
234 					queue_id, flow_hash);
235 }
236 
237 static void
dsw_port_add_paused_flows(struct dsw_port * port,struct dsw_queue_flow * qfs,uint8_t qfs_len)238 dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
239 			  uint8_t qfs_len)
240 {
241 	uint8_t i;
242 
243 	for (i = 0; i < qfs_len; i++) {
244 		struct dsw_queue_flow *qf = &qfs[i];
245 
246 		DSW_LOG_DP_PORT(DEBUG, port->id,
247 				"Pausing queue_id %d flow_hash %d.\n",
248 				qf->queue_id, qf->flow_hash);
249 
250 		port->paused_flows[port->paused_flows_len] = *qf;
251 		port->paused_flows_len++;
252 	};
253 }
254 
255 static void
dsw_port_remove_paused_flow(struct dsw_port * port,struct dsw_queue_flow * target_qf)256 dsw_port_remove_paused_flow(struct dsw_port *port,
257 			    struct dsw_queue_flow *target_qf)
258 {
259 	uint16_t i;
260 
261 	for (i = 0; i < port->paused_flows_len; i++) {
262 		struct dsw_queue_flow *qf = &port->paused_flows[i];
263 
264 		if (qf->queue_id == target_qf->queue_id &&
265 		    qf->flow_hash == target_qf->flow_hash) {
266 			uint16_t last_idx = port->paused_flows_len-1;
267 			if (i != last_idx)
268 				port->paused_flows[i] =
269 					port->paused_flows[last_idx];
270 			port->paused_flows_len--;
271 			break;
272 		}
273 	}
274 }
275 
276 static void
dsw_port_remove_paused_flows(struct dsw_port * port,struct dsw_queue_flow * qfs,uint8_t qfs_len)277 dsw_port_remove_paused_flows(struct dsw_port *port,
278 			     struct dsw_queue_flow *qfs, uint8_t qfs_len)
279 {
280 	uint8_t i;
281 
282 	for (i = 0; i < qfs_len; i++)
283 		dsw_port_remove_paused_flow(port, &qfs[i]);
284 
285 }
286 
287 static void
288 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
289 
290 static void
dsw_port_handle_pause_flows(struct dsw_evdev * dsw,struct dsw_port * port,uint8_t originating_port_id,struct dsw_queue_flow * paused_qfs,uint8_t qfs_len)291 dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
292 			    uint8_t originating_port_id,
293 			    struct dsw_queue_flow *paused_qfs,
294 			    uint8_t qfs_len)
295 {
296 	struct dsw_ctl_msg cfm = {
297 		.type = DSW_CTL_CFM,
298 		.originating_port_id = port->id
299 	};
300 
301 	/* There might be already-scheduled events belonging to the
302 	 * paused flow in the output buffers.
303 	 */
304 	dsw_port_flush_out_buffers(dsw, port);
305 
306 	dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
307 
308 	/* Make sure any stores to the original port's in_ring is seen
309 	 * before the ctl message.
310 	 */
311 	rte_smp_wmb();
312 
313 	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
314 }
315 
316 struct dsw_queue_flow_burst {
317 	struct dsw_queue_flow queue_flow;
318 	uint16_t count;
319 };
320 
/* Pack a queue flow into one int usable as a sort key: queue_id in
 * the bits from 16 and up, flow_hash in the bits below.
 */
#define DSW_QF_TO_INT(_qf) \
	((int)(((_qf)->queue_id << 16) | (_qf)->flow_hash))
323 
/* qsort()-style comparator for struct dsw_queue_flow. Orders by the
 * DSW_QF_TO_INT() key; returns zero when both key values are equal.
 */
static inline int
dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
{
	const struct dsw_queue_flow *lhs = v_qf_a;
	const struct dsw_queue_flow *rhs = v_qf_b;
	int lhs_key = DSW_QF_TO_INT(lhs);
	int rhs_key = DSW_QF_TO_INT(rhs);

	return lhs_key - rhs_key;
}
332 
333 static uint16_t
dsw_sort_qfs_to_bursts(struct dsw_queue_flow * qfs,uint16_t qfs_len,struct dsw_queue_flow_burst * bursts)334 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
335 		       struct dsw_queue_flow_burst *bursts)
336 {
337 	uint16_t i;
338 	struct dsw_queue_flow_burst *current_burst = NULL;
339 	uint16_t num_bursts = 0;
340 
341 	/* We don't need the stable property, and the list is likely
342 	 * large enough for qsort() to outperform dsw_stable_sort(),
343 	 * so we use qsort() here.
344 	 */
345 	qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
346 
347 	/* arrange the (now-consecutive) events into bursts */
348 	for (i = 0; i < qfs_len; i++) {
349 		if (i == 0 ||
350 		    dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
351 			current_burst = &bursts[num_bursts];
352 			current_burst->queue_flow = qfs[i];
353 			current_burst->count = 0;
354 			num_bursts++;
355 		}
356 		current_burst->count++;
357 	}
358 
359 	return num_bursts;
360 }
361 
362 static bool
dsw_retrieve_port_loads(struct dsw_evdev * dsw,int16_t * port_loads,int16_t load_limit)363 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
364 			int16_t load_limit)
365 {
366 	bool below_limit = false;
367 	uint16_t i;
368 
369 	for (i = 0; i < dsw->num_ports; i++) {
370 		int16_t measured_load = rte_atomic16_read(&dsw->ports[i].load);
371 		int32_t immigration_load =
372 			rte_atomic32_read(&dsw->ports[i].immigration_load);
373 		int32_t load = measured_load + immigration_load;
374 
375 		load = RTE_MIN(load, DSW_MAX_LOAD);
376 
377 		if (load < load_limit)
378 			below_limit = true;
379 		port_loads[i] = load;
380 	}
381 	return below_limit;
382 }
383 
384 static int16_t
dsw_flow_load(uint16_t num_events,int16_t port_load)385 dsw_flow_load(uint16_t num_events, int16_t port_load)
386 {
387 	return ((int32_t)port_load * (int32_t)num_events) /
388 		DSW_MAX_EVENTS_RECORDED;
389 }
390 
391 static int16_t
dsw_evaluate_migration(int16_t source_load,int16_t target_load,int16_t flow_load)392 dsw_evaluate_migration(int16_t source_load, int16_t target_load,
393 		       int16_t flow_load)
394 {
395 	int32_t res_target_load;
396 	int32_t imbalance;
397 
398 	if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
399 		return -1;
400 
401 	imbalance = source_load - target_load;
402 
403 	if (imbalance < DSW_REBALANCE_THRESHOLD)
404 		return -1;
405 
406 	res_target_load = target_load + flow_load;
407 
408 	/* If the estimated load of the target port will be higher
409 	 * than the source port's load, it doesn't make sense to move
410 	 * the flow.
411 	 */
412 	if (res_target_load > source_load)
413 		return -1;
414 
415 	/* The more idle the target will be, the better. This will
416 	 * make migration prefer moving smaller flows, and flows to
417 	 * lightly loaded ports.
418 	 */
419 	return DSW_MAX_LOAD - res_target_load;
420 }
421 
422 static bool
dsw_is_serving_port(struct dsw_evdev * dsw,uint8_t port_id,uint8_t queue_id)423 dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
424 {
425 	struct dsw_queue *queue = &dsw->queues[queue_id];
426 	uint16_t i;
427 
428 	for (i = 0; i < queue->num_serving_ports; i++)
429 		if (queue->serving_ports[i] == port_id)
430 			return true;
431 
432 	return false;
433 }
434 
435 static bool
dsw_select_emigration_target(struct dsw_evdev * dsw,struct dsw_queue_flow_burst * bursts,uint16_t num_bursts,uint8_t source_port_id,int16_t * port_loads,uint16_t num_ports,uint8_t * target_port_ids,struct dsw_queue_flow * target_qfs,uint8_t * targets_len)436 dsw_select_emigration_target(struct dsw_evdev *dsw,
437 			    struct dsw_queue_flow_burst *bursts,
438 			    uint16_t num_bursts, uint8_t source_port_id,
439 			    int16_t *port_loads, uint16_t num_ports,
440 			    uint8_t *target_port_ids,
441 			    struct dsw_queue_flow *target_qfs,
442 			    uint8_t *targets_len)
443 {
444 	int16_t source_port_load = port_loads[source_port_id];
445 	struct dsw_queue_flow *candidate_qf = NULL;
446 	uint8_t candidate_port_id = 0;
447 	int16_t candidate_weight = -1;
448 	int16_t candidate_flow_load = -1;
449 	uint16_t i;
450 
451 	if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
452 		return false;
453 
454 	for (i = 0; i < num_bursts; i++) {
455 		struct dsw_queue_flow_burst *burst = &bursts[i];
456 		struct dsw_queue_flow *qf = &burst->queue_flow;
457 		int16_t flow_load;
458 		uint16_t port_id;
459 
460 		if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
461 					     qf->queue_id, qf->flow_hash))
462 			continue;
463 
464 		flow_load = dsw_flow_load(burst->count, source_port_load);
465 
466 		for (port_id = 0; port_id < num_ports; port_id++) {
467 			int16_t weight;
468 
469 			if (port_id == source_port_id)
470 				continue;
471 
472 			if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
473 				continue;
474 
475 			weight = dsw_evaluate_migration(source_port_load,
476 							port_loads[port_id],
477 							flow_load);
478 
479 			if (weight > candidate_weight) {
480 				candidate_qf = qf;
481 				candidate_port_id = port_id;
482 				candidate_weight = weight;
483 				candidate_flow_load = flow_load;
484 			}
485 		}
486 	}
487 
488 	if (candidate_weight < 0)
489 		return false;
490 
491 	DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
492 			"flow_hash %d (with flow load %d) for migration "
493 			"to port %d.\n", candidate_qf->queue_id,
494 			candidate_qf->flow_hash,
495 			DSW_LOAD_TO_PERCENT(candidate_flow_load),
496 			candidate_port_id);
497 
498 	port_loads[candidate_port_id] += candidate_flow_load;
499 	port_loads[source_port_id] -= candidate_flow_load;
500 
501 	target_port_ids[*targets_len] = candidate_port_id;
502 	target_qfs[*targets_len] = *candidate_qf;
503 	(*targets_len)++;
504 
505 	rte_atomic32_add(&dsw->ports[candidate_port_id].immigration_load,
506 			 candidate_flow_load);
507 
508 	return true;
509 }
510 
511 static void
dsw_select_emigration_targets(struct dsw_evdev * dsw,struct dsw_port * source_port,struct dsw_queue_flow_burst * bursts,uint16_t num_bursts,int16_t * port_loads)512 dsw_select_emigration_targets(struct dsw_evdev *dsw,
513 			      struct dsw_port *source_port,
514 			      struct dsw_queue_flow_burst *bursts,
515 			      uint16_t num_bursts, int16_t *port_loads)
516 {
517 	struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
518 	uint8_t *target_port_ids = source_port->emigration_target_port_ids;
519 	uint8_t *targets_len = &source_port->emigration_targets_len;
520 	uint16_t i;
521 
522 	for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
523 		bool found;
524 
525 		found = dsw_select_emigration_target(dsw, bursts, num_bursts,
526 						     source_port->id,
527 						     port_loads, dsw->num_ports,
528 						     target_port_ids,
529 						     target_qfs,
530 						     targets_len);
531 		if (!found)
532 			break;
533 	}
534 
535 	if (*targets_len == 0)
536 		DSW_LOG_DP_PORT(DEBUG, source_port->id,
537 				"For the %d flows considered, no target port "
538 				"was found.\n", num_bursts);
539 }
540 
541 static uint8_t
dsw_schedule(struct dsw_evdev * dsw,uint8_t queue_id,uint16_t flow_hash)542 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
543 {
544 	struct dsw_queue *queue = &dsw->queues[queue_id];
545 	uint8_t port_id;
546 
547 	if (queue->num_serving_ports > 1)
548 		port_id = queue->flow_to_port_map[flow_hash];
549 	else
550 		/* A single-link queue, or atomic/ordered/parallel but
551 		 * with just a single serving port.
552 		 */
553 		port_id = queue->serving_ports[0];
554 
555 	DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
556 		   "to port %d.\n", queue_id, flow_hash, port_id);
557 
558 	return port_id;
559 }
560 
561 static void
dsw_port_transmit_buffered(struct dsw_evdev * dsw,struct dsw_port * source_port,uint8_t dest_port_id)562 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
563 			   uint8_t dest_port_id)
564 {
565 	struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
566 	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
567 	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
568 	uint16_t enqueued = 0;
569 
570 	if (*buffer_len == 0)
571 		return;
572 
573 	/* The rings are dimensioned to fit all in-flight events (even
574 	 * on a single ring), so looping will work.
575 	 */
576 	do {
577 		enqueued +=
578 			rte_event_ring_enqueue_burst(dest_port->in_ring,
579 						     buffer+enqueued,
580 						     *buffer_len-enqueued,
581 						     NULL);
582 	} while (unlikely(enqueued != *buffer_len));
583 
584 	(*buffer_len) = 0;
585 }
586 
587 static uint16_t
dsw_port_get_parallel_flow_id(struct dsw_port * port)588 dsw_port_get_parallel_flow_id(struct dsw_port *port)
589 {
590 	uint16_t flow_id = port->next_parallel_flow_id;
591 
592 	port->next_parallel_flow_id =
593 		(port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
594 
595 	return flow_id;
596 }
597 
598 static void
dsw_port_buffer_paused(struct dsw_port * port,const struct rte_event * paused_event)599 dsw_port_buffer_paused(struct dsw_port *port,
600 		       const struct rte_event *paused_event)
601 {
602 	port->paused_events[port->paused_events_len] = *paused_event;
603 	port->paused_events_len++;
604 }
605 
606 static void
dsw_port_buffer_non_paused(struct dsw_evdev * dsw,struct dsw_port * source_port,uint8_t dest_port_id,const struct rte_event * event)607 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
608 			   uint8_t dest_port_id, const struct rte_event *event)
609 {
610 	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
611 	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
612 
613 	if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
614 		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
615 
616 	buffer[*buffer_len] = *event;
617 
618 	(*buffer_len)++;
619 }
620 
621 #define DSW_FLOW_ID_BITS (24)
622 static uint16_t
dsw_flow_id_hash(uint32_t flow_id)623 dsw_flow_id_hash(uint32_t flow_id)
624 {
625 	uint16_t hash = 0;
626 	uint16_t offset = 0;
627 
628 	do {
629 		hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
630 		offset += DSW_MAX_FLOWS_BITS;
631 	} while (offset < DSW_FLOW_ID_BITS);
632 
633 	return hash;
634 }
635 
636 static void
dsw_port_buffer_parallel(struct dsw_evdev * dsw,struct dsw_port * source_port,struct rte_event event)637 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
638 			 struct rte_event event)
639 {
640 	uint8_t dest_port_id;
641 
642 	event.flow_id = dsw_port_get_parallel_flow_id(source_port);
643 
644 	dest_port_id = dsw_schedule(dsw, event.queue_id,
645 				    dsw_flow_id_hash(event.flow_id));
646 
647 	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
648 }
649 
650 static void
dsw_port_buffer_event(struct dsw_evdev * dsw,struct dsw_port * source_port,const struct rte_event * event)651 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
652 		      const struct rte_event *event)
653 {
654 	uint16_t flow_hash;
655 	uint8_t dest_port_id;
656 
657 	if (unlikely(dsw->queues[event->queue_id].schedule_type ==
658 		     RTE_SCHED_TYPE_PARALLEL)) {
659 		dsw_port_buffer_parallel(dsw, source_port, *event);
660 		return;
661 	}
662 
663 	flow_hash = dsw_flow_id_hash(event->flow_id);
664 
665 	if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
666 					     flow_hash))) {
667 		dsw_port_buffer_paused(source_port, event);
668 		return;
669 	}
670 
671 	dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
672 
673 	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
674 }
675 
676 static void
dsw_port_flush_paused_events(struct dsw_evdev * dsw,struct dsw_port * source_port,const struct dsw_queue_flow * qf)677 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
678 			     struct dsw_port *source_port,
679 			     const struct dsw_queue_flow *qf)
680 {
681 	uint16_t paused_events_len = source_port->paused_events_len;
682 	struct rte_event paused_events[paused_events_len];
683 	uint8_t dest_port_id;
684 	uint16_t i;
685 
686 	if (paused_events_len == 0)
687 		return;
688 
689 	if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
690 		return;
691 
692 	rte_memcpy(paused_events, source_port->paused_events,
693 		   paused_events_len * sizeof(struct rte_event));
694 
695 	source_port->paused_events_len = 0;
696 
697 	dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
698 
699 	for (i = 0; i < paused_events_len; i++) {
700 		struct rte_event *event = &paused_events[i];
701 		uint16_t flow_hash;
702 
703 		flow_hash = dsw_flow_id_hash(event->flow_id);
704 
705 		if (event->queue_id == qf->queue_id &&
706 		    flow_hash == qf->flow_hash)
707 			dsw_port_buffer_non_paused(dsw, source_port,
708 						   dest_port_id, event);
709 		else
710 			dsw_port_buffer_paused(source_port, event);
711 	}
712 }
713 
714 static void
dsw_port_emigration_stats(struct dsw_port * port,uint8_t finished)715 dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
716 {
717 	uint64_t flow_migration_latency;
718 
719 	flow_migration_latency =
720 		(rte_get_timer_cycles() - port->emigration_start);
721 	port->emigration_latency += (flow_migration_latency * finished);
722 	port->emigrations += finished;
723 }
724 
725 static void
dsw_port_end_emigration(struct dsw_evdev * dsw,struct dsw_port * port,uint8_t schedule_type)726 dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
727 			uint8_t schedule_type)
728 {
729 	uint8_t i;
730 	struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
731 	uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
732 	uint8_t left_qfs_len = 0;
733 	uint8_t finished;
734 
735 	for (i = 0; i < port->emigration_targets_len; i++) {
736 		struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
737 		uint8_t queue_id = qf->queue_id;
738 		uint8_t queue_schedule_type =
739 			dsw->queues[queue_id].schedule_type;
740 		uint16_t flow_hash = qf->flow_hash;
741 
742 		if (queue_schedule_type != schedule_type) {
743 			left_port_ids[left_qfs_len] =
744 				port->emigration_target_port_ids[i];
745 			left_qfs[left_qfs_len] = *qf;
746 			left_qfs_len++;
747 			continue;
748 		}
749 
750 		DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
751 				"queue_id %d flow_hash %d.\n", queue_id,
752 				flow_hash);
753 
754 		if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
755 			dsw_port_remove_paused_flow(port, qf);
756 			dsw_port_flush_paused_events(dsw, port, qf);
757 		}
758 	}
759 
760 	finished = port->emigration_targets_len - left_qfs_len;
761 
762 	if (finished > 0)
763 		dsw_port_emigration_stats(port, finished);
764 
765 	for (i = 0; i < left_qfs_len; i++) {
766 		port->emigration_target_port_ids[i] = left_port_ids[i];
767 		port->emigration_target_qfs[i] = left_qfs[i];
768 	}
769 	port->emigration_targets_len = left_qfs_len;
770 
771 	if (port->emigration_targets_len == 0) {
772 		port->migration_state = DSW_MIGRATION_STATE_IDLE;
773 		port->seen_events_len = 0;
774 	}
775 }
776 
777 static void
dsw_port_move_parallel_flows(struct dsw_evdev * dsw,struct dsw_port * source_port)778 dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
779 			     struct dsw_port *source_port)
780 {
781 	uint8_t i;
782 
783 	for (i = 0; i < source_port->emigration_targets_len; i++) {
784 		struct dsw_queue_flow *qf =
785 			&source_port->emigration_target_qfs[i];
786 		uint8_t queue_id = qf->queue_id;
787 
788 		if (dsw->queues[queue_id].schedule_type ==
789 		    RTE_SCHED_TYPE_PARALLEL) {
790 			uint8_t dest_port_id =
791 				source_port->emigration_target_port_ids[i];
792 			uint16_t flow_hash = qf->flow_hash;
793 
794 			/* Single byte-sized stores are always atomic. */
795 			dsw->queues[queue_id].flow_to_port_map[flow_hash] =
796 				dest_port_id;
797 		}
798 	}
799 
800 	rte_smp_wmb();
801 
802 	dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
803 }
804 
805 static void
dsw_port_consider_emigration(struct dsw_evdev * dsw,struct dsw_port * source_port,uint64_t now)806 dsw_port_consider_emigration(struct dsw_evdev *dsw,
807 			     struct dsw_port *source_port,
808 			     uint64_t now)
809 {
810 	bool any_port_below_limit;
811 	struct dsw_queue_flow *seen_events = source_port->seen_events;
812 	uint16_t seen_events_len = source_port->seen_events_len;
813 	struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
814 	uint16_t num_bursts;
815 	int16_t source_port_load;
816 	int16_t port_loads[dsw->num_ports];
817 
818 	if (now < source_port->next_emigration)
819 		return;
820 
821 	if (dsw->num_ports == 1)
822 		return;
823 
824 	if (seen_events_len < DSW_MAX_EVENTS_RECORDED)
825 		return;
826 
827 	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
828 
829 	/* Randomize interval to avoid having all threads considering
830 	 * emigration at the same in point in time, which might lead
831 	 * to all choosing the same target port.
832 	 */
833 	source_port->next_emigration = now +
834 		source_port->migration_interval / 2 +
835 		rte_rand() % source_port->migration_interval;
836 
837 	if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
838 		DSW_LOG_DP_PORT(DEBUG, source_port->id,
839 				"Emigration already in progress.\n");
840 		return;
841 	}
842 
843 	/* For simplicity, avoid migration in the unlikely case there
844 	 * is still events to consume in the in_buffer (from the last
845 	 * emigration).
846 	 */
847 	if (source_port->in_buffer_len > 0) {
848 		DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
849 				"events in the input buffer.\n");
850 		return;
851 	}
852 
853 	source_port_load = rte_atomic16_read(&source_port->load);
854 	if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
855 		DSW_LOG_DP_PORT(DEBUG, source_port->id,
856 		      "Load %d is below threshold level %d.\n",
857 		      DSW_LOAD_TO_PERCENT(source_port_load),
858 		      DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
859 		return;
860 	}
861 
862 	/* Avoid starting any expensive operations (sorting etc), in
863 	 * case of a scenario with all ports above the load limit.
864 	 */
865 	any_port_below_limit =
866 		dsw_retrieve_port_loads(dsw, port_loads,
867 					DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
868 	if (!any_port_below_limit) {
869 		DSW_LOG_DP_PORT(DEBUG, source_port->id,
870 				"Candidate target ports are all too highly "
871 				"loaded.\n");
872 		return;
873 	}
874 
875 	num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
876 					    bursts);
877 
878 	/* For non-big-little systems, there's no point in moving the
879 	 * only (known) flow.
880 	 */
881 	if (num_bursts < 2) {
882 		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
883 				"queue_id %d flow_hash %d has been seen.\n",
884 				bursts[0].queue_flow.queue_id,
885 				bursts[0].queue_flow.flow_hash);
886 		return;
887 	}
888 
889 	dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
890 				      port_loads);
891 
892 	if (source_port->emigration_targets_len == 0)
893 		return;
894 
895 	source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
896 	source_port->emigration_start = rte_get_timer_cycles();
897 
898 	/* No need to go through the whole pause procedure for
899 	 * parallel queues, since atomic/ordered semantics need not to
900 	 * be maintained.
901 	 */
902 	dsw_port_move_parallel_flows(dsw, source_port);
903 
904 	/* All flows were on PARALLEL queues. */
905 	if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
906 		return;
907 
908 	/* There might be 'loopback' events already scheduled in the
909 	 * output buffers.
910 	 */
911 	dsw_port_flush_out_buffers(dsw, source_port);
912 
913 	dsw_port_add_paused_flows(source_port,
914 				  source_port->emigration_target_qfs,
915 				  source_port->emigration_targets_len);
916 
917 	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
918 			       source_port->emigration_target_qfs,
919 			       source_port->emigration_targets_len);
920 	source_port->cfm_cnt = 0;
921 }
922 
923 static void
924 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
925 			     struct dsw_port *source_port,
926 			     const struct dsw_queue_flow *qf);
927 
928 static void
dsw_port_handle_unpause_flows(struct dsw_evdev * dsw,struct dsw_port * port,uint8_t originating_port_id,struct dsw_queue_flow * paused_qfs,uint8_t qfs_len)929 dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
930 			      uint8_t originating_port_id,
931 			      struct dsw_queue_flow *paused_qfs,
932 			      uint8_t qfs_len)
933 {
934 	uint16_t i;
935 	struct dsw_ctl_msg cfm = {
936 		.type = DSW_CTL_CFM,
937 		.originating_port_id = port->id
938 	};
939 
940 	dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
941 
942 	rte_smp_rmb();
943 
944 	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
945 
946 	for (i = 0; i < qfs_len; i++) {
947 		struct dsw_queue_flow *qf = &paused_qfs[i];
948 
949 		if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
950 			port->immigrations++;
951 
952 		dsw_port_flush_paused_events(dsw, port, qf);
953 	}
954 }
955 
956 #define FORWARD_BURST_SIZE (32)
957 
958 static void
dsw_port_forward_emigrated_flow(struct dsw_port * source_port,struct rte_event_ring * dest_ring,uint8_t queue_id,uint16_t flow_hash)959 dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
960 				struct rte_event_ring *dest_ring,
961 				uint8_t queue_id,
962 				uint16_t flow_hash)
963 {
964 	uint16_t events_left;
965 
966 	/* Control ring message should been seen before the ring count
967 	 * is read on the port's in_ring.
968 	 */
969 	rte_smp_rmb();
970 
971 	events_left = rte_event_ring_count(source_port->in_ring);
972 
973 	while (events_left > 0) {
974 		uint16_t in_burst_size =
975 			RTE_MIN(FORWARD_BURST_SIZE, events_left);
976 		struct rte_event in_burst[in_burst_size];
977 		uint16_t in_len;
978 		uint16_t i;
979 
980 		in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
981 						      in_burst,
982 						      in_burst_size, NULL);
983 		/* No need to care about bursting forwarded events (to
984 		 * the destination port's in_ring), since migration
985 		 * doesn't happen very often, and also the majority of
986 		 * the dequeued events will likely *not* be forwarded.
987 		 */
988 		for (i = 0; i < in_len; i++) {
989 			struct rte_event *e = &in_burst[i];
990 			if (e->queue_id == queue_id &&
991 			    dsw_flow_id_hash(e->flow_id) == flow_hash) {
992 				while (rte_event_ring_enqueue_burst(dest_ring,
993 								    e, 1,
994 								    NULL) != 1)
995 					rte_pause();
996 			} else {
997 				uint16_t last_idx = source_port->in_buffer_len;
998 				source_port->in_buffer[last_idx] = *e;
999 				source_port->in_buffer_len++;
1000 			}
1001 		}
1002 
1003 		events_left -= in_len;
1004 	}
1005 }
1006 
1007 static void
dsw_port_move_emigrating_flows(struct dsw_evdev * dsw,struct dsw_port * source_port)1008 dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
1009 			       struct dsw_port *source_port)
1010 {
1011 	uint8_t i;
1012 
1013 	dsw_port_flush_out_buffers(dsw, source_port);
1014 
1015 	rte_smp_wmb();
1016 
1017 	for (i = 0; i < source_port->emigration_targets_len; i++) {
1018 		struct dsw_queue_flow *qf =
1019 			&source_port->emigration_target_qfs[i];
1020 		uint8_t dest_port_id =
1021 			source_port->emigration_target_port_ids[i];
1022 		struct dsw_port *dest_port = &dsw->ports[dest_port_id];
1023 
1024 		dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
1025 			dest_port_id;
1026 
1027 		dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
1028 						qf->queue_id, qf->flow_hash);
1029 	}
1030 
1031 	/* Flow table update and migration destination port's enqueues
1032 	 * must be seen before the control message.
1033 	 */
1034 	rte_smp_wmb();
1035 
1036 	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
1037 			       source_port->emigration_target_qfs,
1038 			       source_port->emigration_targets_len);
1039 	source_port->cfm_cnt = 0;
1040 	source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
1041 }
1042 
1043 static void
dsw_port_handle_confirm(struct dsw_evdev * dsw,struct dsw_port * port)1044 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
1045 {
1046 	port->cfm_cnt++;
1047 
1048 	if (port->cfm_cnt == (dsw->num_ports-1)) {
1049 		switch (port->migration_state) {
1050 		case DSW_MIGRATION_STATE_PAUSING:
1051 			DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
1052 					"migration state.\n");
1053 			port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
1054 			break;
1055 		case DSW_MIGRATION_STATE_UNPAUSING:
1056 			dsw_port_end_emigration(dsw, port,
1057 						RTE_SCHED_TYPE_ATOMIC);
1058 			break;
1059 		default:
1060 			RTE_ASSERT(0);
1061 			break;
1062 		}
1063 	}
1064 }
1065 
1066 static void
dsw_port_ctl_process(struct dsw_evdev * dsw,struct dsw_port * port)1067 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
1068 {
1069 	struct dsw_ctl_msg msg;
1070 
1071 	if (dsw_port_ctl_dequeue(port, &msg) == 0) {
1072 		switch (msg.type) {
1073 		case DSW_CTL_PAUS_REQ:
1074 			dsw_port_handle_pause_flows(dsw, port,
1075 						    msg.originating_port_id,
1076 						    msg.qfs, msg.qfs_len);
1077 			break;
1078 		case DSW_CTL_UNPAUS_REQ:
1079 			dsw_port_handle_unpause_flows(dsw, port,
1080 						      msg.originating_port_id,
1081 						      msg.qfs, msg.qfs_len);
1082 			break;
1083 		case DSW_CTL_CFM:
1084 			dsw_port_handle_confirm(dsw, port);
1085 			break;
1086 		}
1087 	}
1088 }
1089 
1090 static void
dsw_port_note_op(struct dsw_port * port,uint16_t num_events)1091 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
1092 {
1093 	/* To pull the control ring reasonbly often on busy ports,
1094 	 * each dequeued/enqueued event is considered an 'op' too.
1095 	 */
1096 	port->ops_since_bg_task += (num_events+1);
1097 }
1098 
1099 static void
dsw_port_bg_process(struct dsw_evdev * dsw,struct dsw_port * port)1100 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
1101 {
1102 	if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
1103 		     port->pending_releases == 0))
1104 		dsw_port_move_emigrating_flows(dsw, port);
1105 
1106 	/* Polling the control ring is relatively inexpensive, and
1107 	 * polling it often helps bringing down migration latency, so
1108 	 * do this for every iteration.
1109 	 */
1110 	dsw_port_ctl_process(dsw, port);
1111 
1112 	/* To avoid considering migration and flushing output buffers
1113 	 * on every dequeue/enqueue call, the scheduler only performs
1114 	 * such 'background' tasks every nth
1115 	 * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
1116 	 */
1117 	if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
1118 		uint64_t now;
1119 
1120 		now = rte_get_timer_cycles();
1121 
1122 		port->last_bg = now;
1123 
1124 		/* Logic to avoid having events linger in the output
1125 		 * buffer too long.
1126 		 */
1127 		dsw_port_flush_out_buffers(dsw, port);
1128 
1129 		dsw_port_consider_load_update(port, now);
1130 
1131 		dsw_port_consider_emigration(dsw, port, now);
1132 
1133 		port->ops_since_bg_task = 0;
1134 	}
1135 }
1136 
1137 static void
dsw_port_flush_out_buffers(struct dsw_evdev * dsw,struct dsw_port * source_port)1138 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1139 {
1140 	uint16_t dest_port_id;
1141 
1142 	for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
1143 		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1144 }
1145 
/* Enqueue a single event on @port.
 *
 * A NULL @ev is turned into a zero-length burst enqueue. Returns
 * whatever dsw_event_enqueue_burst() returns.
 */
uint16_t
dsw_event_enqueue(void *port, const struct rte_event *ev)
{
	uint16_t count = unlikely(ev == NULL) ? 0 : 1;

	return dsw_event_enqueue_burst(port, ev, count);
}
1151 
1152 static __rte_always_inline uint16_t
dsw_event_enqueue_burst_generic(struct dsw_port * source_port,const struct rte_event events[],uint16_t events_len,bool op_types_known,uint16_t num_new,uint16_t num_release,uint16_t num_non_release)1153 dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1154 				const struct rte_event events[],
1155 				uint16_t events_len, bool op_types_known,
1156 				uint16_t num_new, uint16_t num_release,
1157 				uint16_t num_non_release)
1158 {
1159 	struct dsw_evdev *dsw = source_port->dsw;
1160 	bool enough_credits;
1161 	uint16_t i;
1162 
1163 	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1164 			"events to port %d.\n", events_len, source_port->id);
1165 
1166 	dsw_port_bg_process(dsw, source_port);
1167 
1168 	/* XXX: For performance (=ring efficiency) reasons, the
1169 	 * scheduler relies on internal non-ring buffers instead of
1170 	 * immediately sending the event to the destination ring. For
1171 	 * a producer that doesn't intend to produce or consume any
1172 	 * more events, the scheduler provides a way to flush the
1173 	 * buffer, by means of doing an enqueue of zero events. In
1174 	 * addition, a port cannot be left "unattended" (e.g. unused)
1175 	 * for long periods of time, since that would stall
1176 	 * migration. Eventdev API extensions to provide a cleaner way
1177 	 * to archieve both of these functions should be
1178 	 * considered.
1179 	 */
1180 	if (unlikely(events_len == 0)) {
1181 		dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1182 		dsw_port_flush_out_buffers(dsw, source_port);
1183 		return 0;
1184 	}
1185 
1186 	dsw_port_note_op(source_port, events_len);
1187 
1188 	if (!op_types_known)
1189 		for (i = 0; i < events_len; i++) {
1190 			switch (events[i].op) {
1191 			case RTE_EVENT_OP_RELEASE:
1192 				num_release++;
1193 				break;
1194 			case RTE_EVENT_OP_NEW:
1195 				num_new++;
1196 				/* Falls through. */
1197 			default:
1198 				num_non_release++;
1199 				break;
1200 			}
1201 		}
1202 
1203 	/* Technically, we could allow the non-new events up to the
1204 	 * first new event in the array into the system, but for
1205 	 * simplicity reasons, we deny the whole burst if the port is
1206 	 * above the water mark.
1207 	 */
1208 	if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
1209 		     source_port->new_event_threshold))
1210 		return 0;
1211 
1212 	enough_credits = dsw_port_acquire_credits(dsw, source_port,
1213 						  num_non_release);
1214 	if (unlikely(!enough_credits))
1215 		return 0;
1216 
1217 	source_port->pending_releases -= num_release;
1218 
1219 	dsw_port_enqueue_stats(source_port, num_new,
1220 			       num_non_release-num_new, num_release);
1221 
1222 	for (i = 0; i < events_len; i++) {
1223 		const struct rte_event *event = &events[i];
1224 
1225 		if (likely(num_release == 0 ||
1226 			   event->op != RTE_EVENT_OP_RELEASE))
1227 			dsw_port_buffer_event(dsw, source_port, event);
1228 		dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1229 	}
1230 
1231 	DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1232 			"accepted.\n", num_non_release);
1233 
1234 	return (num_non_release + num_release);
1235 }
1236 
1237 uint16_t
dsw_event_enqueue_burst(void * port,const struct rte_event events[],uint16_t events_len)1238 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1239 			uint16_t events_len)
1240 {
1241 	struct dsw_port *source_port = port;
1242 
1243 	if (unlikely(events_len > source_port->enqueue_depth))
1244 		events_len = source_port->enqueue_depth;
1245 
1246 	return dsw_event_enqueue_burst_generic(source_port, events,
1247 					       events_len, false, 0, 0, 0);
1248 }
1249 
1250 uint16_t
dsw_event_enqueue_new_burst(void * port,const struct rte_event events[],uint16_t events_len)1251 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1252 			    uint16_t events_len)
1253 {
1254 	struct dsw_port *source_port = port;
1255 
1256 	if (unlikely(events_len > source_port->enqueue_depth))
1257 		events_len = source_port->enqueue_depth;
1258 
1259 	return dsw_event_enqueue_burst_generic(source_port, events,
1260 					       events_len, true, events_len,
1261 					       0, events_len);
1262 }
1263 
1264 uint16_t
dsw_event_enqueue_forward_burst(void * port,const struct rte_event events[],uint16_t events_len)1265 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1266 				uint16_t events_len)
1267 {
1268 	struct dsw_port *source_port = port;
1269 
1270 	if (unlikely(events_len > source_port->enqueue_depth))
1271 		events_len = source_port->enqueue_depth;
1272 
1273 	return dsw_event_enqueue_burst_generic(source_port, events,
1274 					       events_len, true, 0, 0,
1275 					       events_len);
1276 }
1277 
/* Dequeue at most one event from @port into @events, by means of a
 * burst dequeue of size one. @wait is passed on unchanged.
 */
uint16_t
dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
{
	const uint16_t single_event = 1;

	return dsw_event_dequeue_burst(port, events, single_event, wait);
}
1283 
1284 static void
dsw_port_record_seen_events(struct dsw_port * port,struct rte_event * events,uint16_t num)1285 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1286 			    uint16_t num)
1287 {
1288 	uint16_t i;
1289 
1290 	dsw_port_dequeue_stats(port, num);
1291 
1292 	for (i = 0; i < num; i++) {
1293 		uint16_t l_idx = port->seen_events_idx;
1294 		struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1295 		struct rte_event *event = &events[i];
1296 		qf->queue_id = event->queue_id;
1297 		qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1298 
1299 		port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1300 
1301 		dsw_port_queue_dequeued_stats(port, event->queue_id);
1302 	}
1303 
1304 	if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1305 		port->seen_events_len =
1306 			RTE_MIN(port->seen_events_len + num,
1307 				DSW_MAX_EVENTS_RECORDED);
1308 }
1309 
#ifdef DSW_SORT_DEQUEUED

/* Width, in bits, of the flow_id field of struct rte_event. */
#define DSW_EVENT_FLOW_ID_BITS (20)

/* Pack an event's queue id and flow id into a single int sort key,
 * with the queue id in the bits above the flow id. The queue id must
 * be shifted past the full width of the flow id; a smaller shift
 * makes the two fields overlap, causing events of different
 * (queue, flow) pairs to end up with the same key. With an 8-bit
 * queue id and a 20-bit flow id, the key occupies 28 bits, and is
 * thus always a non-negative int.
 */
#define DSW_EVENT_TO_INT(_event)				\
	((int)((((_event)->queue_id)<<DSW_EVENT_FLOW_ID_BITS)|	\
	       ((_event)->flow_id)))

/* Comparison function ordering events by queue id, and then by flow
 * id. Returns a negative, zero or positive value. The subtraction
 * cannot overflow, since both keys are non-negative.
 */
static inline int
dsw_cmp_event(const void *v_event_a, const void *v_event_b)
{
	const struct rte_event *event_a = v_event_a;
	const struct rte_event *event_b = v_event_b;

	return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
}
#endif
1324 
1325 static uint16_t
dsw_port_dequeue_burst(struct dsw_port * port,struct rte_event * events,uint16_t num)1326 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1327 		       uint16_t num)
1328 {
1329 	if (unlikely(port->in_buffer_len > 0)) {
1330 		uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1331 
1332 		rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1333 			   dequeued * sizeof(struct rte_event));
1334 
1335 		port->in_buffer_start += dequeued;
1336 		port->in_buffer_len -= dequeued;
1337 
1338 		if (port->in_buffer_len == 0)
1339 			port->in_buffer_start = 0;
1340 
1341 		return dequeued;
1342 	}
1343 
1344 	return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1345 }
1346 
1347 uint16_t
dsw_event_dequeue_burst(void * port,struct rte_event * events,uint16_t num,uint64_t wait __rte_unused)1348 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1349 			uint64_t wait __rte_unused)
1350 {
1351 	struct dsw_port *source_port = port;
1352 	struct dsw_evdev *dsw = source_port->dsw;
1353 	uint16_t dequeued;
1354 
1355 	source_port->pending_releases = 0;
1356 
1357 	dsw_port_bg_process(dsw, source_port);
1358 
1359 	if (unlikely(num > source_port->dequeue_depth))
1360 		num = source_port->dequeue_depth;
1361 
1362 	dequeued = dsw_port_dequeue_burst(source_port, events, num);
1363 
1364 	source_port->pending_releases = dequeued;
1365 
1366 	dsw_port_load_record(source_port, dequeued);
1367 
1368 	dsw_port_note_op(source_port, dequeued);
1369 
1370 	if (dequeued > 0) {
1371 		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1372 				dequeued);
1373 
1374 		dsw_port_return_credits(dsw, source_port, dequeued);
1375 
1376 		/* One potential optimization one might think of is to
1377 		 * add a migration state (prior to 'pausing'), and
1378 		 * only record seen events when the port is in this
1379 		 * state (and transit to 'pausing' when enough events
1380 		 * have been gathered). However, that schema doesn't
1381 		 * seem to improve performance.
1382 		 */
1383 		dsw_port_record_seen_events(port, events, dequeued);
1384 	} else /* Zero-size dequeue means a likely idle port, and thus
1385 		* we can afford trading some efficiency for a slightly
1386 		* reduced event wall-time latency.
1387 		*/
1388 		dsw_port_flush_out_buffers(dsw, port);
1389 
1390 #ifdef DSW_SORT_DEQUEUED
1391 	dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1392 #endif
1393 
1394 	return dequeued;
1395 }
1396