xref: /dpdk/drivers/event/dsw/dsw_event.c (revision 7be78d02)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Ericsson AB
3  */
4 
5 #include "dsw_evdev.h"
6 
7 #ifdef DSW_SORT_DEQUEUED
8 #include "dsw_sort.h"
9 #endif
10 
11 #include <stdbool.h>
12 #include <string.h>
13 
14 #include <rte_cycles.h>
15 #include <rte_memcpy.h>
16 #include <rte_random.h>
17 
18 static bool
19 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
20 			 int32_t credits)
21 {
22 	int32_t inflight_credits = port->inflight_credits;
23 	int32_t missing_credits = credits - inflight_credits;
24 	int32_t total_on_loan;
25 	int32_t available;
26 	int32_t acquired_credits;
27 	int32_t new_total_on_loan;
28 
29 	if (likely(missing_credits <= 0)) {
30 		port->inflight_credits -= credits;
31 		return true;
32 	}
33 
34 	total_on_loan =
35 		__atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED);
36 	available = dsw->max_inflight - total_on_loan;
37 	acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
38 
39 	if (available < acquired_credits)
40 		return false;
41 
42 	/* This is a race: no locks are involved, and thus some other
43 	 * thread can allocate tokens in between the check and the
44 	 * allocation.
45 	 */
46 	new_total_on_loan =
47 	    __atomic_add_fetch(&dsw->credits_on_loan, acquired_credits,
48 			       __ATOMIC_RELAXED);
49 
50 	if (unlikely(new_total_on_loan > dsw->max_inflight)) {
51 		/* Some other port took the last credits */
52 		__atomic_sub_fetch(&dsw->credits_on_loan, acquired_credits,
53 				   __ATOMIC_RELAXED);
54 		return false;
55 	}
56 
57 	DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
58 			acquired_credits);
59 
60 	port->inflight_credits += acquired_credits;
61 	port->inflight_credits -= credits;
62 
63 	return true;
64 }
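/* A worked example (editorial, not upstream code) of the credit scheme
 * above, assuming for illustration that DSW_PORT_MIN_CREDITS is 64: a
 * port holding 10 inflight_credits that needs 16 credits is 6 credits
 * short, so it attempts to take RTE_MAX(6, 64) = 64 credits from the
 * global credits_on_loan pool. If the pool (bounded by max_inflight)
 * covers this, the port ends up with 10 + 64 - 16 = 58 locally cached
 * credits for future enqueues; otherwise no credits are taken and the
 * enqueue is rejected.
 */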
65 
66 static void
67 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
68 			int32_t credits)
69 {
70 	port->inflight_credits += credits;
71 
72 	if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
73 		int32_t leave_credits = DSW_PORT_MIN_CREDITS;
74 		int32_t return_credits =
75 			port->inflight_credits - leave_credits;
76 
77 		port->inflight_credits = leave_credits;
78 
79 		__atomic_sub_fetch(&dsw->credits_on_loan, return_credits,
80 				   __ATOMIC_RELAXED);
81 
82 		DSW_LOG_DP_PORT(DEBUG, port->id,
83 				"Returned %d tokens to pool.\n",
84 				return_credits);
85 	}
86 }
87 
88 static void
89 dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
90 		       uint16_t num_forward, uint16_t num_release)
91 {
92 	port->new_enqueued += num_new;
93 	port->forward_enqueued += num_forward;
94 	port->release_enqueued += num_release;
95 }
96 
97 static void
98 dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
99 {
100 	source_port->queue_enqueued[queue_id]++;
101 }
102 
103 static void
104 dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
105 {
106 	port->dequeued += num;
107 }
108 
109 static void
110 dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
111 {
112 	source_port->queue_dequeued[queue_id]++;
113 }
114 
115 static void
116 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
117 {
118 	if (dequeued > 0 && port->busy_start == 0)
119 		/* work period begins */
120 		port->busy_start = rte_get_timer_cycles();
121 	else if (dequeued == 0 && port->busy_start > 0) {
122 		/* work period ends */
123 		uint64_t work_period =
124 			rte_get_timer_cycles() - port->busy_start;
125 		port->busy_cycles += work_period;
126 		port->busy_start = 0;
127 	}
128 }
129 
130 static int16_t
131 dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
132 {
133 	uint64_t passed = now - port->measurement_start;
134 	uint64_t busy_cycles = port->busy_cycles;
135 
136 	if (port->busy_start > 0) {
137 		busy_cycles += (now - port->busy_start);
138 		port->busy_start = now;
139 	}
140 
141 	int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
142 
143 	port->measurement_start = now;
144 	port->busy_cycles = 0;
145 
146 	port->total_busy_cycles += busy_cycles;
147 
148 	return load;
149 }
150 
151 static void
152 dsw_port_load_update(struct dsw_port *port, uint64_t now)
153 {
154 	int16_t old_load;
155 	int16_t period_load;
156 	int16_t new_load;
157 
158 	old_load = __atomic_load_n(&port->load, __ATOMIC_RELAXED);
159 
160 	period_load = dsw_port_load_close_period(port, now);
161 
162 	new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
163 		(DSW_OLD_LOAD_WEIGHT+1);
164 
165 	__atomic_store_n(&port->load, new_load, __ATOMIC_RELAXED);
166 
167 	/* The load of the recently immigrated flows should hopefully
168 	 * be reflected the load estimate by now.
169 	 */
170 	__atomic_store_n(&port->immigration_load, 0, __ATOMIC_RELAXED);
171 }
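/* Editorial note (not upstream code): port->load is maintained as an
 * exponentially weighted moving average. Assuming for illustration
 * that DSW_OLD_LOAD_WEIGHT is 4, an old load of 50% and a measured
 * period load of 100% yield new_load = (100 + 50*4) / (4+1) = 60%, so
 * a single busy period only gradually pulls the estimate upward.
 */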
172 
173 static void
174 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
175 {
176 	if (now < port->next_load_update)
177 		return;
178 
179 	port->next_load_update = now + port->load_update_interval;
180 
181 	dsw_port_load_update(port, now);
182 }
183 
184 static void
185 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
186 {
187 	/* there's always room on the ring */
188 	while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
189 		rte_pause();
190 }
191 
192 static int
193 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
194 {
195 	return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
196 }
197 
198 static void
199 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
200 		       uint8_t type, struct dsw_queue_flow *qfs,
201 		       uint8_t qfs_len)
202 {
203 	uint16_t port_id;
204 	struct dsw_ctl_msg msg = {
205 		.type = type,
206 		.originating_port_id = source_port->id,
207 		.qfs_len = qfs_len
208 	};
209 
210 	memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
211 
212 	for (port_id = 0; port_id < dsw->num_ports; port_id++)
213 		if (port_id != source_port->id)
214 			dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
215 }
216 
217 static __rte_always_inline bool
218 dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
219 			 uint8_t queue_id, uint16_t flow_hash)
220 {
221 	uint16_t i;
222 
223 	for (i = 0; i < qfs_len; i++)
224 		if (qfs[i].queue_id == queue_id &&
225 		    qfs[i].flow_hash == flow_hash)
226 			return true;
227 
228 	return false;
229 }
230 
231 static __rte_always_inline bool
232 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
233 			uint16_t flow_hash)
234 {
235 	return dsw_is_queue_flow_in_ary(port->paused_flows,
236 					port->paused_flows_len,
237 					queue_id, flow_hash);
238 }
239 
240 static void
241 dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
242 			  uint8_t qfs_len)
243 {
244 	uint8_t i;
245 
246 	for (i = 0; i < qfs_len; i++) {
247 		struct dsw_queue_flow *qf = &qfs[i];
248 
249 		DSW_LOG_DP_PORT(DEBUG, port->id,
250 				"Pausing queue_id %d flow_hash %d.\n",
251 				qf->queue_id, qf->flow_hash);
252 
253 		port->paused_flows[port->paused_flows_len] = *qf;
254 		port->paused_flows_len++;
255 	}
256 }
257 
258 static void
259 dsw_port_remove_paused_flow(struct dsw_port *port,
260 			    struct dsw_queue_flow *target_qf)
261 {
262 	uint16_t i;
263 
264 	for (i = 0; i < port->paused_flows_len; i++) {
265 		struct dsw_queue_flow *qf = &port->paused_flows[i];
266 
267 		if (qf->queue_id == target_qf->queue_id &&
268 		    qf->flow_hash == target_qf->flow_hash) {
269 			uint16_t last_idx = port->paused_flows_len-1;
270 			if (i != last_idx)
271 				port->paused_flows[i] =
272 					port->paused_flows[last_idx];
273 			port->paused_flows_len--;
274 			break;
275 		}
276 	}
277 }
278 
279 static void
280 dsw_port_remove_paused_flows(struct dsw_port *port,
281 			     struct dsw_queue_flow *qfs, uint8_t qfs_len)
282 {
283 	uint8_t i;
284 
285 	for (i = 0; i < qfs_len; i++)
286 		dsw_port_remove_paused_flow(port, &qfs[i]);
287 
288 }
289 
290 static void
291 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
292 
293 static void
294 dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
295 			    uint8_t originating_port_id,
296 			    struct dsw_queue_flow *paused_qfs,
297 			    uint8_t qfs_len)
298 {
299 	struct dsw_ctl_msg cfm = {
300 		.type = DSW_CTL_CFM,
301 		.originating_port_id = port->id
302 	};
303 
304 	/* There might be already-scheduled events belonging to the
305 	 * paused flow in the output buffers.
306 	 */
307 	dsw_port_flush_out_buffers(dsw, port);
308 
309 	dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
310 
311 	/* Make sure any stores to the original port's in_ring are seen
312 	 * before the ctl message.
313 	 */
314 	rte_smp_wmb();
315 
316 	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
317 }
318 
319 struct dsw_queue_flow_burst {
320 	struct dsw_queue_flow queue_flow;
321 	uint16_t count;
322 };
323 
324 #define DSW_QF_TO_INT(_qf)					\
325 	((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
326 
327 static inline int
328 dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
329 {
330 	const struct dsw_queue_flow *qf_a = v_qf_a;
331 	const struct dsw_queue_flow *qf_b = v_qf_b;
332 
333 	return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
334 }
335 
336 static uint16_t
337 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
338 		       struct dsw_queue_flow_burst *bursts)
339 {
340 	uint16_t i;
341 	struct dsw_queue_flow_burst *current_burst = NULL;
342 	uint16_t num_bursts = 0;
343 
344 	/* We don't need the stable property, and the list is likely
345 	 * large enough for qsort() to outperform dsw_stable_sort(),
346 	 * so we use qsort() here.
347 	 */
348 	qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
349 
350 	/* arrange the (now-consecutive) events into bursts */
351 	for (i = 0; i < qfs_len; i++) {
352 		if (i == 0 ||
353 		    dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
354 			current_burst = &bursts[num_bursts];
355 			current_burst->queue_flow = qfs[i];
356 			current_burst->count = 0;
357 			num_bursts++;
358 		}
359 		current_burst->count++;
360 	}
361 
362 	return num_bursts;
363 }
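/* Editorial sketch (not upstream code) of the grouping above: if the
 * seen-events array holds the (queue_id, flow_hash) pairs
 * {(0,5), (1,2), (0,5), (0,5), (1,2)}, sorting makes equal pairs
 * adjacent and the result is two bursts, {(0,5), count=3} and
 * {(1,2), count=2}. The per-burst counts later serve as the flow's
 * share of the port load.
 */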
364 
365 static bool
366 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
367 			int16_t load_limit)
368 {
369 	bool below_limit = false;
370 	uint16_t i;
371 
372 	for (i = 0; i < dsw->num_ports; i++) {
373 		int16_t measured_load =
374 			__atomic_load_n(&dsw->ports[i].load, __ATOMIC_RELAXED);
375 		int32_t immigration_load =
376 			__atomic_load_n(&dsw->ports[i].immigration_load,
377 					__ATOMIC_RELAXED);
378 		int32_t load = measured_load + immigration_load;
379 
380 		load = RTE_MIN(load, DSW_MAX_LOAD);
381 
382 		if (load < load_limit)
383 			below_limit = true;
384 		port_loads[i] = load;
385 	}
386 	return below_limit;
387 }
388 
389 static int16_t
390 dsw_flow_load(uint16_t num_events, int16_t port_load)
391 {
392 	return ((int32_t)port_load * (int32_t)num_events) /
393 		DSW_MAX_EVENTS_RECORDED;
394 }
395 
396 static int16_t
397 dsw_evaluate_migration(int16_t source_load, int16_t target_load,
398 		       int16_t flow_load)
399 {
400 	int32_t res_target_load;
401 	int32_t imbalance;
402 
403 	if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
404 		return -1;
405 
406 	imbalance = source_load - target_load;
407 
408 	if (imbalance < DSW_REBALANCE_THRESHOLD)
409 		return -1;
410 
411 	res_target_load = target_load + flow_load;
412 
413 	/* If the estimated load of the target port will be higher
414 	 * than the source port's load, it doesn't make sense to move
415 	 * the flow.
416 	 */
417 	if (res_target_load > source_load)
418 		return -1;
419 
420 	/* The more idle the target will be, the better. This will
421 	 * make migration prefer moving smaller flows, and flows to
422 	 * lightly loaded ports.
423 	 */
424 	return DSW_MAX_LOAD - res_target_load;
425 }
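/* A numeric sketch (editorial, not upstream code) of the weight
 * calculation above, with loads given as fractions of DSW_MAX_LOAD: a
 * source at 80% and a target at 30% with a candidate flow load of 10%
 * give an imbalance of 50%. Provided this clears
 * DSW_REBALANCE_THRESHOLD, the resulting target load would be 40%,
 * still below the source load, so the weight is DSW_MAX_LOAD - 40%;
 * the candidate leaving the most idle target wins.
 */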
426 
427 static bool
428 dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
429 {
430 	struct dsw_queue *queue = &dsw->queues[queue_id];
431 	uint16_t i;
432 
433 	for (i = 0; i < queue->num_serving_ports; i++)
434 		if (queue->serving_ports[i] == port_id)
435 			return true;
436 
437 	return false;
438 }
439 
440 static bool
441 dsw_select_emigration_target(struct dsw_evdev *dsw,
442 			    struct dsw_queue_flow_burst *bursts,
443 			    uint16_t num_bursts, uint8_t source_port_id,
444 			    int16_t *port_loads, uint16_t num_ports,
445 			    uint8_t *target_port_ids,
446 			    struct dsw_queue_flow *target_qfs,
447 			    uint8_t *targets_len)
448 {
449 	int16_t source_port_load = port_loads[source_port_id];
450 	struct dsw_queue_flow *candidate_qf = NULL;
451 	uint8_t candidate_port_id = 0;
452 	int16_t candidate_weight = -1;
453 	int16_t candidate_flow_load = -1;
454 	uint16_t i;
455 
456 	if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
457 		return false;
458 
459 	for (i = 0; i < num_bursts; i++) {
460 		struct dsw_queue_flow_burst *burst = &bursts[i];
461 		struct dsw_queue_flow *qf = &burst->queue_flow;
462 		int16_t flow_load;
463 		uint16_t port_id;
464 
465 		if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
466 					     qf->queue_id, qf->flow_hash))
467 			continue;
468 
469 		flow_load = dsw_flow_load(burst->count, source_port_load);
470 
471 		for (port_id = 0; port_id < num_ports; port_id++) {
472 			int16_t weight;
473 
474 			if (port_id == source_port_id)
475 				continue;
476 
477 			if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
478 				continue;
479 
480 			weight = dsw_evaluate_migration(source_port_load,
481 							port_loads[port_id],
482 							flow_load);
483 
484 			if (weight > candidate_weight) {
485 				candidate_qf = qf;
486 				candidate_port_id = port_id;
487 				candidate_weight = weight;
488 				candidate_flow_load = flow_load;
489 			}
490 		}
491 	}
492 
493 	if (candidate_weight < 0)
494 		return false;
495 
496 	DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
497 			"flow_hash %d (with flow load %d) for migration "
498 			"to port %d.\n", candidate_qf->queue_id,
499 			candidate_qf->flow_hash,
500 			DSW_LOAD_TO_PERCENT(candidate_flow_load),
501 			candidate_port_id);
502 
503 	port_loads[candidate_port_id] += candidate_flow_load;
504 	port_loads[source_port_id] -= candidate_flow_load;
505 
506 	target_port_ids[*targets_len] = candidate_port_id;
507 	target_qfs[*targets_len] = *candidate_qf;
508 	(*targets_len)++;
509 
510 	__atomic_add_fetch(&dsw->ports[candidate_port_id].immigration_load,
511 			   candidate_flow_load, __ATOMIC_RELAXED);
512 
513 	return true;
514 }
515 
516 static void
517 dsw_select_emigration_targets(struct dsw_evdev *dsw,
518 			      struct dsw_port *source_port,
519 			      struct dsw_queue_flow_burst *bursts,
520 			      uint16_t num_bursts, int16_t *port_loads)
521 {
522 	struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
523 	uint8_t *target_port_ids = source_port->emigration_target_port_ids;
524 	uint8_t *targets_len = &source_port->emigration_targets_len;
525 	uint16_t i;
526 
527 	for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
528 		bool found;
529 
530 		found = dsw_select_emigration_target(dsw, bursts, num_bursts,
531 						     source_port->id,
532 						     port_loads, dsw->num_ports,
533 						     target_port_ids,
534 						     target_qfs,
535 						     targets_len);
536 		if (!found)
537 			break;
538 	}
539 
540 	if (*targets_len == 0)
541 		DSW_LOG_DP_PORT(DEBUG, source_port->id,
542 				"For the %d flows considered, no target port "
543 				"was found.\n", num_bursts);
544 }
545 
546 static uint8_t
547 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
548 {
549 	struct dsw_queue *queue = &dsw->queues[queue_id];
550 	uint8_t port_id;
551 
552 	if (queue->num_serving_ports > 1)
553 		port_id = queue->flow_to_port_map[flow_hash];
554 	else
555 		/* A single-link queue, or atomic/ordered/parallel but
556 		 * with just a single serving port.
557 		 */
558 		port_id = queue->serving_ports[0];
559 
560 	DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
561 		   "to port %d.\n", queue_id, flow_hash, port_id);
562 
563 	return port_id;
564 }
565 
566 static void
567 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
568 			   uint8_t dest_port_id)
569 {
570 	struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
571 	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
572 	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
573 	uint16_t enqueued = 0;
574 
575 	if (*buffer_len == 0)
576 		return;
577 
578 	/* The rings are dimensioned to fit all in-flight events (even
579 	 * on a single ring), so looping will work.
580 	 */
581 	do {
582 		enqueued +=
583 			rte_event_ring_enqueue_burst(dest_port->in_ring,
584 						     buffer+enqueued,
585 						     *buffer_len-enqueued,
586 						     NULL);
587 	} while (unlikely(enqueued != *buffer_len));
588 
589 	(*buffer_len) = 0;
590 }
591 
592 static uint16_t
593 dsw_port_get_parallel_flow_id(struct dsw_port *port)
594 {
595 	uint16_t flow_id = port->next_parallel_flow_id;
596 
597 	port->next_parallel_flow_id =
598 		(port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
599 
600 	return flow_id;
601 }
602 
603 static void
604 dsw_port_buffer_paused(struct dsw_port *port,
605 		       const struct rte_event *paused_event)
606 {
607 	port->paused_events[port->paused_events_len] = *paused_event;
608 	port->paused_events_len++;
609 }
610 
611 static void
612 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
613 			   uint8_t dest_port_id, const struct rte_event *event)
614 {
615 	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
616 	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
617 
618 	if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
619 		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
620 
621 	buffer[*buffer_len] = *event;
622 
623 	(*buffer_len)++;
624 }
625 
626 #define DSW_FLOW_ID_BITS (24)
627 static uint16_t
628 dsw_flow_id_hash(uint32_t flow_id)
629 {
630 	uint16_t hash = 0;
631 	uint16_t offset = 0;
632 
633 	do {
634 		hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
635 		offset += DSW_MAX_FLOWS_BITS;
636 	} while (offset < DSW_FLOW_ID_BITS);
637 
638 	return hash;
639 }
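/* Editorial note (not upstream code): the hash above folds the lower
 * DSW_FLOW_ID_BITS (24) bits of the flow_id into a smaller flow_hash
 * by XOR:ing DSW_MAX_FLOWS_BITS-sized chunks. Assuming for
 * illustration a 10-bit chunk size, flow_id 0x123456 becomes
 * (0x123456 & 0x3ff) ^ ((0x123456 >> 10) & 0x3ff) ^
 * ((0x123456 >> 20) & 0x3ff) = 0x56 ^ 0x8d ^ 0x1 = 0xda.
 */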
640 
641 static void
642 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
643 			 struct rte_event event)
644 {
645 	uint8_t dest_port_id;
646 
647 	event.flow_id = dsw_port_get_parallel_flow_id(source_port);
648 
649 	dest_port_id = dsw_schedule(dsw, event.queue_id,
650 				    dsw_flow_id_hash(event.flow_id));
651 
652 	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
653 }
654 
655 static void
656 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
657 		      const struct rte_event *event)
658 {
659 	uint16_t flow_hash;
660 	uint8_t dest_port_id;
661 
662 	if (unlikely(dsw->queues[event->queue_id].schedule_type ==
663 		     RTE_SCHED_TYPE_PARALLEL)) {
664 		dsw_port_buffer_parallel(dsw, source_port, *event);
665 		return;
666 	}
667 
668 	flow_hash = dsw_flow_id_hash(event->flow_id);
669 
670 	if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
671 					     flow_hash))) {
672 		dsw_port_buffer_paused(source_port, event);
673 		return;
674 	}
675 
676 	dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
677 
678 	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
679 }
680 
681 static void
682 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
683 			     struct dsw_port *source_port,
684 			     const struct dsw_queue_flow *qf)
685 {
686 	uint16_t paused_events_len = source_port->paused_events_len;
687 	struct rte_event paused_events[paused_events_len];
688 	uint8_t dest_port_id;
689 	uint16_t i;
690 
691 	if (paused_events_len == 0)
692 		return;
693 
694 	if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
695 		return;
696 
697 	rte_memcpy(paused_events, source_port->paused_events,
698 		   paused_events_len * sizeof(struct rte_event));
699 
700 	source_port->paused_events_len = 0;
701 
702 	dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
703 
704 	for (i = 0; i < paused_events_len; i++) {
705 		struct rte_event *event = &paused_events[i];
706 		uint16_t flow_hash;
707 
708 		flow_hash = dsw_flow_id_hash(event->flow_id);
709 
710 		if (event->queue_id == qf->queue_id &&
711 		    flow_hash == qf->flow_hash)
712 			dsw_port_buffer_non_paused(dsw, source_port,
713 						   dest_port_id, event);
714 		else
715 			dsw_port_buffer_paused(source_port, event);
716 	}
717 }
718 
719 static void
720 dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
721 {
722 	uint64_t flow_migration_latency;
723 
724 	flow_migration_latency =
725 		(rte_get_timer_cycles() - port->emigration_start);
726 	port->emigration_latency += (flow_migration_latency * finished);
727 	port->emigrations += finished;
728 }
729 
730 static void
731 dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
732 			uint8_t schedule_type)
733 {
734 	uint8_t i;
735 	struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
736 	uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
737 	uint8_t left_qfs_len = 0;
738 	uint8_t finished;
739 
740 	for (i = 0; i < port->emigration_targets_len; i++) {
741 		struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
742 		uint8_t queue_id = qf->queue_id;
743 		uint8_t queue_schedule_type =
744 			dsw->queues[queue_id].schedule_type;
745 		uint16_t flow_hash = qf->flow_hash;
746 
747 		if (queue_schedule_type != schedule_type) {
748 			left_port_ids[left_qfs_len] =
749 				port->emigration_target_port_ids[i];
750 			left_qfs[left_qfs_len] = *qf;
751 			left_qfs_len++;
752 			continue;
753 		}
754 
755 		DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
756 				"queue_id %d flow_hash %d.\n", queue_id,
757 				flow_hash);
758 
759 		if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
760 			dsw_port_remove_paused_flow(port, qf);
761 			dsw_port_flush_paused_events(dsw, port, qf);
762 		}
763 	}
764 
765 	finished = port->emigration_targets_len - left_qfs_len;
766 
767 	if (finished > 0)
768 		dsw_port_emigration_stats(port, finished);
769 
770 	for (i = 0; i < left_qfs_len; i++) {
771 		port->emigration_target_port_ids[i] = left_port_ids[i];
772 		port->emigration_target_qfs[i] = left_qfs[i];
773 	}
774 	port->emigration_targets_len = left_qfs_len;
775 
776 	if (port->emigration_targets_len == 0) {
777 		port->migration_state = DSW_MIGRATION_STATE_IDLE;
778 		port->seen_events_len = 0;
779 	}
780 }
781 
782 static void
783 dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
784 			     struct dsw_port *source_port)
785 {
786 	uint8_t i;
787 
788 	for (i = 0; i < source_port->emigration_targets_len; i++) {
789 		struct dsw_queue_flow *qf =
790 			&source_port->emigration_target_qfs[i];
791 		uint8_t queue_id = qf->queue_id;
792 
793 		if (dsw->queues[queue_id].schedule_type ==
794 		    RTE_SCHED_TYPE_PARALLEL) {
795 			uint8_t dest_port_id =
796 				source_port->emigration_target_port_ids[i];
797 			uint16_t flow_hash = qf->flow_hash;
798 
799 			/* Single byte-sized stores are always atomic. */
800 			dsw->queues[queue_id].flow_to_port_map[flow_hash] =
801 				dest_port_id;
802 		}
803 	}
804 
805 	rte_smp_wmb();
806 
807 	dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
808 }
809 
810 static void
811 dsw_port_consider_emigration(struct dsw_evdev *dsw,
812 			     struct dsw_port *source_port,
813 			     uint64_t now)
814 {
815 	bool any_port_below_limit;
816 	struct dsw_queue_flow *seen_events = source_port->seen_events;
817 	uint16_t seen_events_len = source_port->seen_events_len;
818 	struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
819 	uint16_t num_bursts;
820 	int16_t source_port_load;
821 	int16_t port_loads[dsw->num_ports];
822 
823 	if (now < source_port->next_emigration)
824 		return;
825 
826 	if (dsw->num_ports == 1)
827 		return;
828 
829 	if (seen_events_len < DSW_MAX_EVENTS_RECORDED)
830 		return;
831 
832 	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
833 
834 	/* Randomize interval to avoid having all threads considering
835 	 * emigration at the same point in time, which might lead
836 	 * to all choosing the same target port.
837 	 */
838 	source_port->next_emigration = now +
839 		source_port->migration_interval / 2 +
840 		rte_rand() % source_port->migration_interval;
841 
842 	if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
843 		DSW_LOG_DP_PORT(DEBUG, source_port->id,
844 				"Emigration already in progress.\n");
845 		return;
846 	}
847 
848 	/* For simplicity, avoid migration in the unlikely case there
849 	 * are still events to consume in the in_buffer (from the last
850 	 * emigration).
851 	 */
852 	if (source_port->in_buffer_len > 0) {
853 		DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
854 				"events in the input buffer.\n");
855 		return;
856 	}
857 
858 	source_port_load =
859 		__atomic_load_n(&source_port->load, __ATOMIC_RELAXED);
860 	if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
861 		DSW_LOG_DP_PORT(DEBUG, source_port->id,
862 		      "Load %d is below threshold level %d.\n",
863 		      DSW_LOAD_TO_PERCENT(source_port_load),
864 		      DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
865 		return;
866 	}
867 
868 	/* Avoid starting any expensive operations (sorting, etc.) in
869 	 * case all ports are above the load limit.
870 	 */
871 	any_port_below_limit =
872 		dsw_retrieve_port_loads(dsw, port_loads,
873 					DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
874 	if (!any_port_below_limit) {
875 		DSW_LOG_DP_PORT(DEBUG, source_port->id,
876 				"Candidate target ports are all too highly "
877 				"loaded.\n");
878 		return;
879 	}
880 
881 	num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
882 					    bursts);
883 
884 	/* For non-big-little systems, there's no point in moving the
885 	 * only (known) flow.
886 	 */
887 	if (num_bursts < 2) {
888 		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
889 				"queue_id %d flow_hash %d has been seen.\n",
890 				bursts[0].queue_flow.queue_id,
891 				bursts[0].queue_flow.flow_hash);
892 		return;
893 	}
894 
895 	dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
896 				      port_loads);
897 
898 	if (source_port->emigration_targets_len == 0)
899 		return;
900 
901 	source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
902 	source_port->emigration_start = rte_get_timer_cycles();
903 
904 	/* No need to go through the whole pause procedure for
905 	 * parallel queues, since atomic/ordered semantics need not
906 	 * be maintained.
907 	 */
908 	dsw_port_move_parallel_flows(dsw, source_port);
909 
910 	/* All flows were on PARALLEL queues. */
911 	if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
912 		return;
913 
914 	/* There might be 'loopback' events already scheduled in the
915 	 * output buffers.
916 	 */
917 	dsw_port_flush_out_buffers(dsw, source_port);
918 
919 	dsw_port_add_paused_flows(source_port,
920 				  source_port->emigration_target_qfs,
921 				  source_port->emigration_targets_len);
922 
923 	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
924 			       source_port->emigration_target_qfs,
925 			       source_port->emigration_targets_len);
926 	source_port->cfm_cnt = 0;
927 }
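/* Editorial summary (not upstream code): the emigration decision above
 * is a series of early-out checks: rate limiting via next_emigration,
 * an idle migration state, an empty in_buffer, a sufficiently loaded
 * source port, at least one target below the load limit, and more
 * than one observed flow. Only then are targets selected, parallel
 * flows remapped immediately, and DSW_CTL_PAUS_REQ broadcast for the
 * remaining flows.
 */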
928 
929 static void
930 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
931 			     struct dsw_port *source_port,
932 			     const struct dsw_queue_flow *qf);
933 
934 static void
935 dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
936 			      uint8_t originating_port_id,
937 			      struct dsw_queue_flow *paused_qfs,
938 			      uint8_t qfs_len)
939 {
940 	uint16_t i;
941 	struct dsw_ctl_msg cfm = {
942 		.type = DSW_CTL_CFM,
943 		.originating_port_id = port->id
944 	};
945 
946 	dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
947 
948 	rte_smp_rmb();
949 
950 	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
951 
952 	for (i = 0; i < qfs_len; i++) {
953 		struct dsw_queue_flow *qf = &paused_qfs[i];
954 
955 		if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
956 			port->immigrations++;
957 
958 		dsw_port_flush_paused_events(dsw, port, qf);
959 	}
960 }
961 
962 #define FORWARD_BURST_SIZE (32)
963 
964 static void
965 dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
966 				struct rte_event_ring *dest_ring,
967 				uint8_t queue_id,
968 				uint16_t flow_hash)
969 {
970 	uint16_t events_left;
971 
972 	/* The control ring message should be seen before the ring count
973 	 * is read on the port's in_ring.
974 	 */
975 	rte_smp_rmb();
976 
977 	events_left = rte_event_ring_count(source_port->in_ring);
978 
979 	while (events_left > 0) {
980 		uint16_t in_burst_size =
981 			RTE_MIN(FORWARD_BURST_SIZE, events_left);
982 		struct rte_event in_burst[in_burst_size];
983 		uint16_t in_len;
984 		uint16_t i;
985 
986 		in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
987 						      in_burst,
988 						      in_burst_size, NULL);
989 		/* No need to care about bursting forwarded events (to
990 		 * the destination port's in_ring), since migration
991 		 * doesn't happen very often, and also the majority of
992 		 * the dequeued events will likely *not* be forwarded.
993 		 */
994 		for (i = 0; i < in_len; i++) {
995 			struct rte_event *e = &in_burst[i];
996 			if (e->queue_id == queue_id &&
997 			    dsw_flow_id_hash(e->flow_id) == flow_hash) {
998 				while (rte_event_ring_enqueue_burst(dest_ring,
999 								    e, 1,
1000 								    NULL) != 1)
1001 					rte_pause();
1002 			} else {
1003 				uint16_t last_idx = source_port->in_buffer_len;
1004 				source_port->in_buffer[last_idx] = *e;
1005 				source_port->in_buffer_len++;
1006 			}
1007 		}
1008 
1009 		events_left -= in_len;
1010 	}
1011 }
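/* Editorial note (not upstream code): the forwarding loop above splits
 * the drained in_ring two ways; events belonging to the migrating
 * (queue_id, flow_hash) are pushed to the destination port's ring,
 * while all other events are parked in the source port's in_buffer
 * and handed out by later dequeue calls.
 */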
1012 
1013 static void
1014 dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
1015 			       struct dsw_port *source_port)
1016 {
1017 	uint8_t i;
1018 
1019 	dsw_port_flush_out_buffers(dsw, source_port);
1020 
1021 	rte_smp_wmb();
1022 
1023 	for (i = 0; i < source_port->emigration_targets_len; i++) {
1024 		struct dsw_queue_flow *qf =
1025 			&source_port->emigration_target_qfs[i];
1026 		uint8_t dest_port_id =
1027 			source_port->emigration_target_port_ids[i];
1028 		struct dsw_port *dest_port = &dsw->ports[dest_port_id];
1029 
1030 		dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
1031 			dest_port_id;
1032 
1033 		dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
1034 						qf->queue_id, qf->flow_hash);
1035 	}
1036 
1037 	/* Flow table update and migration destination port's enqueues
1038 	 * must be seen before the control message.
1039 	 */
1040 	rte_smp_wmb();
1041 
1042 	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
1043 			       source_port->emigration_target_qfs,
1044 			       source_port->emigration_targets_len);
1045 	source_port->cfm_cnt = 0;
1046 	source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
1047 }
1048 
1049 static void
1050 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
1051 {
1052 	port->cfm_cnt++;
1053 
1054 	if (port->cfm_cnt == (dsw->num_ports-1)) {
1055 		switch (port->migration_state) {
1056 		case DSW_MIGRATION_STATE_PAUSING:
1057 			DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
1058 					"migration state.\n");
1059 			port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
1060 			break;
1061 		case DSW_MIGRATION_STATE_UNPAUSING:
1062 			dsw_port_end_emigration(dsw, port,
1063 						RTE_SCHED_TYPE_ATOMIC);
1064 			break;
1065 		default:
1066 			RTE_ASSERT(0);
1067 			break;
1068 		}
1069 	}
1070 }
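/* Editorial summary (not upstream code): the handshake for migrating
 * an atomic flow is thus: the source port broadcasts DSW_CTL_PAUS_REQ,
 * waits for a DSW_CTL_CFM from every other port, forwards the
 * emigrated flow's events, broadcasts DSW_CTL_UNPAUS_REQ, and ends the
 * emigration once the second round of confirmations has arrived.
 */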
1071 
1072 static void
1073 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
1074 {
1075 	struct dsw_ctl_msg msg;
1076 
1077 	if (dsw_port_ctl_dequeue(port, &msg) == 0) {
1078 		switch (msg.type) {
1079 		case DSW_CTL_PAUS_REQ:
1080 			dsw_port_handle_pause_flows(dsw, port,
1081 						    msg.originating_port_id,
1082 						    msg.qfs, msg.qfs_len);
1083 			break;
1084 		case DSW_CTL_UNPAUS_REQ:
1085 			dsw_port_handle_unpause_flows(dsw, port,
1086 						      msg.originating_port_id,
1087 						      msg.qfs, msg.qfs_len);
1088 			break;
1089 		case DSW_CTL_CFM:
1090 			dsw_port_handle_confirm(dsw, port);
1091 			break;
1092 		}
1093 	}
1094 }
1095 
1096 static void
1097 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
1098 {
1099 	/* To pull the control ring reasonably often on busy ports,
1100 	 * each dequeued/enqueued event is considered an 'op' too.
1101 	 */
1102 	port->ops_since_bg_task += (num_events+1);
1103 }
1104 
1105 static void
1106 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
1107 {
1108 	if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
1109 		     port->pending_releases == 0))
1110 		dsw_port_move_emigrating_flows(dsw, port);
1111 
1112 	/* Polling the control ring is relatively inexpensive, and
1113 	 * polling it often helps bring down migration latency, so
1114 	 * do this for every iteration.
1115 	 */
1116 	dsw_port_ctl_process(dsw, port);
1117 
1118 	/* To avoid considering migration and flushing output buffers
1119 	 * on every dequeue/enqueue call, the scheduler only performs
1120 	 * such 'background' tasks every nth
1121 	 * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
1122 	 */
1123 	if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
1124 		uint64_t now;
1125 
1126 		now = rte_get_timer_cycles();
1127 
1128 		port->last_bg = now;
1129 
1130 		/* Logic to avoid having events linger in the output
1131 		 * buffer too long.
1132 		 */
1133 		dsw_port_flush_out_buffers(dsw, port);
1134 
1135 		dsw_port_consider_load_update(port, now);
1136 
1137 		dsw_port_consider_emigration(dsw, port, now);
1138 
1139 		port->ops_since_bg_task = 0;
1140 	}
1141 }
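/* Editorial note (not upstream code): in other words, the control ring
 * is polled on every enqueue/dequeue call, while the heavier tasks
 * (flushing output buffers, updating the load estimate, considering
 * emigration) run only once ops_since_bg_task reaches
 * DSW_MAX_PORT_OPS_PER_BG_TASK.
 */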
1142 
1143 static void
1144 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1145 {
1146 	uint16_t dest_port_id;
1147 
1148 	for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
1149 		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1150 }
1151 
1152 uint16_t
1153 dsw_event_enqueue(void *port, const struct rte_event *ev)
1154 {
1155 	return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
1156 }
1157 
1158 static __rte_always_inline uint16_t
1159 dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1160 				const struct rte_event events[],
1161 				uint16_t events_len, bool op_types_known,
1162 				uint16_t num_new, uint16_t num_release,
1163 				uint16_t num_non_release)
1164 {
1165 	struct dsw_evdev *dsw = source_port->dsw;
1166 	bool enough_credits;
1167 	uint16_t i;
1168 
1169 	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1170 			"events to port %d.\n", events_len, source_port->id);
1171 
1172 	dsw_port_bg_process(dsw, source_port);
1173 
1174 	/* XXX: For performance (=ring efficiency) reasons, the
1175 	 * scheduler relies on internal non-ring buffers instead of
1176 	 * immediately sending the event to the destination ring. For
1177 	 * a producer that doesn't intend to produce or consume any
1178 	 * more events, the scheduler provides a way to flush the
1179 	 * buffer, by means of doing an enqueue of zero events. In
1180 	 * addition, a port cannot be left "unattended" (e.g. unused)
1181 	 * for long periods of time, since that would stall
1182 	 * migration. Eventdev API extensions to provide a cleaner way
1183 	 * to achieve both of these functions should be
1184 	 * considered.
1185 	 */
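	/* Editorial illustration (not upstream code): from the application
	 * side such a flush is a plain zero-sized enqueue on the event
	 * device, e.g.:
	 *
	 *   rte_event_enqueue_burst(dev_id, port_id, NULL, 0);
	 *
	 * where dev_id and port_id stand in for the application's own
	 * identifiers.
	 */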
1186 	if (unlikely(events_len == 0)) {
1187 		dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1188 		dsw_port_flush_out_buffers(dsw, source_port);
1189 		return 0;
1190 	}
1191 
1192 	dsw_port_note_op(source_port, events_len);
1193 
1194 	if (!op_types_known)
1195 		for (i = 0; i < events_len; i++) {
1196 			switch (events[i].op) {
1197 			case RTE_EVENT_OP_RELEASE:
1198 				num_release++;
1199 				break;
1200 			case RTE_EVENT_OP_NEW:
1201 				num_new++;
1202 				/* Falls through. */
1203 			default:
1204 				num_non_release++;
1205 				break;
1206 			}
1207 		}
1208 
1209 	/* Technically, we could allow the non-new events up to the
1210 	 * first new event in the array into the system, but for
1211 	 * simplicity reasons, we deny the whole burst if the port is
1212 	 * above the water mark.
1213 	 */
1214 	if (unlikely(num_new > 0 &&
1215 		     __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED) >
1216 		     source_port->new_event_threshold))
1217 		return 0;
1218 
1219 	enough_credits = dsw_port_acquire_credits(dsw, source_port,
1220 						  num_non_release);
1221 	if (unlikely(!enough_credits))
1222 		return 0;
1223 
1224 	source_port->pending_releases -= num_release;
1225 
1226 	dsw_port_enqueue_stats(source_port, num_new,
1227 			       num_non_release-num_new, num_release);
1228 
1229 	for (i = 0; i < events_len; i++) {
1230 		const struct rte_event *event = &events[i];
1231 
1232 		if (likely(num_release == 0 ||
1233 			   event->op != RTE_EVENT_OP_RELEASE))
1234 			dsw_port_buffer_event(dsw, source_port, event);
1235 		dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1236 	}
1237 
1238 	DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1239 			"accepted.\n", num_non_release);
1240 
1241 	return (num_non_release + num_release);
1242 }
1243 
1244 uint16_t
1245 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1246 			uint16_t events_len)
1247 {
1248 	struct dsw_port *source_port = port;
1249 
1250 	if (unlikely(events_len > source_port->enqueue_depth))
1251 		events_len = source_port->enqueue_depth;
1252 
1253 	return dsw_event_enqueue_burst_generic(source_port, events,
1254 					       events_len, false, 0, 0, 0);
1255 }
1256 
1257 uint16_t
1258 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1259 			    uint16_t events_len)
1260 {
1261 	struct dsw_port *source_port = port;
1262 
1263 	if (unlikely(events_len > source_port->enqueue_depth))
1264 		events_len = source_port->enqueue_depth;
1265 
1266 	return dsw_event_enqueue_burst_generic(source_port, events,
1267 					       events_len, true, events_len,
1268 					       0, events_len);
1269 }
1270 
1271 uint16_t
1272 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1273 				uint16_t events_len)
1274 {
1275 	struct dsw_port *source_port = port;
1276 
1277 	if (unlikely(events_len > source_port->enqueue_depth))
1278 		events_len = source_port->enqueue_depth;
1279 
1280 	return dsw_event_enqueue_burst_generic(source_port, events,
1281 					       events_len, true, 0, 0,
1282 					       events_len);
1283 }
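/* Editorial note (not upstream code): the three public enqueue
 * variants above differ only in what they promise about the events'
 * op fields. The plain burst call classifies each event at runtime,
 * while the "new" and "forward" variants pass op_types_known = true,
 * letting dsw_event_enqueue_burst_generic skip the per-event switch
 * and use precomputed new/release/non-release counts.
 */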
1284 
1285 uint16_t
1286 dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1287 {
1288 	return dsw_event_dequeue_burst(port, events, 1, wait);
1289 }
1290 
1291 static void
1292 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1293 			    uint16_t num)
1294 {
1295 	uint16_t i;
1296 
1297 	dsw_port_dequeue_stats(port, num);
1298 
1299 	for (i = 0; i < num; i++) {
1300 		uint16_t l_idx = port->seen_events_idx;
1301 		struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1302 		struct rte_event *event = &events[i];
1303 		qf->queue_id = event->queue_id;
1304 		qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1305 
1306 		port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1307 
1308 		dsw_port_queue_dequeued_stats(port, event->queue_id);
1309 	}
1310 
1311 	if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1312 		port->seen_events_len =
1313 			RTE_MIN(port->seen_events_len + num,
1314 				DSW_MAX_EVENTS_RECORDED);
1315 }
1316 
1317 #ifdef DSW_SORT_DEQUEUED
1318 
1319 #define DSW_EVENT_TO_INT(_event)				\
1320 	((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1321 
1322 static inline int
1323 dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1324 {
1325 	const struct rte_event *event_a = v_event_a;
1326 	const struct rte_event *event_b = v_event_b;
1327 
1328 	return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1329 }
1330 #endif
1331 
1332 static uint16_t
1333 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1334 		       uint16_t num)
1335 {
1336 	if (unlikely(port->in_buffer_len > 0)) {
1337 		uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1338 
1339 		rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1340 			   dequeued * sizeof(struct rte_event));
1341 
1342 		port->in_buffer_start += dequeued;
1343 		port->in_buffer_len -= dequeued;
1344 
1345 		if (port->in_buffer_len == 0)
1346 			port->in_buffer_start = 0;
1347 
1348 		return dequeued;
1349 	}
1350 
1351 	return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1352 }
1353 
1354 uint16_t
1355 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1356 			uint64_t wait __rte_unused)
1357 {
1358 	struct dsw_port *source_port = port;
1359 	struct dsw_evdev *dsw = source_port->dsw;
1360 	uint16_t dequeued;
1361 
1362 	source_port->pending_releases = 0;
1363 
1364 	dsw_port_bg_process(dsw, source_port);
1365 
1366 	if (unlikely(num > source_port->dequeue_depth))
1367 		num = source_port->dequeue_depth;
1368 
1369 	dequeued = dsw_port_dequeue_burst(source_port, events, num);
1370 
1371 	source_port->pending_releases = dequeued;
1372 
1373 	dsw_port_load_record(source_port, dequeued);
1374 
1375 	dsw_port_note_op(source_port, dequeued);
1376 
1377 	if (dequeued > 0) {
1378 		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1379 				dequeued);
1380 
1381 		dsw_port_return_credits(dsw, source_port, dequeued);
1382 
1383 		/* One potential optimization one might think of is to
1384 		 * add a migration state (prior to 'pausing'), and
1385 		 * only record seen events when the port is in this
1386 		 * state (and transition to 'pausing' when enough events
1387 		 * have been gathered). However, that scheme doesn't
1388 		 * seem to improve performance.
1389 		 */
1390 		dsw_port_record_seen_events(port, events, dequeued);
1391 	} else /* Zero-size dequeue means a likely idle port, and thus
1392 		* we can afford trading some efficiency for a slightly
1393 		* reduced event wall-time latency.
1394 		*/
1395 		dsw_port_flush_out_buffers(dsw, port);
1396 
1397 #ifdef DSW_SORT_DEQUEUED
1398 	dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1399 #endif
1400 
1401 	return dequeued;
1402 }
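/* Editorial note (not upstream code): on a non-empty dequeue the port
 * returns credits, records the seen (queue_id, flow_hash) pairs that
 * later feed the emigration logic, and remembers the burst size in
 * pending_releases; an empty dequeue is instead used as a cheap
 * opportunity to flush the output buffers.
 */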
1403 
1404 void dsw_event_maintain(void *port, int op)
1405 {
1406 	struct dsw_port *source_port = port;
1407 	struct dsw_evdev *dsw = source_port->dsw;
1408 
1409 	dsw_port_note_op(source_port, 0);
1410 	dsw_port_bg_process(dsw, source_port);
1411 
1412 	if (op & RTE_EVENT_DEV_MAINT_OP_FLUSH)
1413 		dsw_port_flush_out_buffers(dsw, source_port);
1414 }
1415