xref: /dpdk/examples/ip_pipeline/thread.c (revision 87b36dcd)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21 
22 #ifndef THREAD_PIPELINES_MAX
23 #define THREAD_PIPELINES_MAX                               256
24 #endif
25 
26 #ifndef THREAD_MSGQ_SIZE
27 #define THREAD_MSGQ_SIZE                                   64
28 #endif
29 
30 #ifndef THREAD_TIMER_PERIOD_MS
31 #define THREAD_TIMER_PERIOD_MS                             100
32 #endif
33 
34 /**
35  * Master thead: data plane thread context
36  */
37 struct thread {
38 	struct rte_ring *msgq_req;
39 	struct rte_ring *msgq_rsp;
40 
41 	uint32_t enabled;
42 };
43 
44 static struct thread thread[RTE_MAX_LCORE];
45 
46 /**
47  * Data plane threads: context
48  */
49 struct table_data {
50 	struct rte_table_action *a;
51 };
52 
53 struct pipeline_data {
54 	struct rte_pipeline *p;
55 	struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56 	uint32_t n_tables;
57 
58 	struct rte_ring *msgq_req;
59 	struct rte_ring *msgq_rsp;
60 	uint64_t timer_period; /* Measured in CPU cycles. */
61 	uint64_t time_next;
62 
63 	uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65 
66 struct thread_data {
67 	struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68 	uint32_t n_pipelines;
69 
70 	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71 	struct rte_ring *msgq_req;
72 	struct rte_ring *msgq_rsp;
73 	uint64_t timer_period; /* Measured in CPU cycles. */
74 	uint64_t time_next;
75 	uint64_t time_next_min;
76 } __rte_cache_aligned;
77 
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79 
80 /**
81  * Master thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86 	uint32_t i;
87 
88 	for (i = 0; i < RTE_MAX_LCORE; i++) {
89 		struct thread *t = &thread[i];
90 
91 		if (!rte_lcore_is_enabled(i))
92 			continue;
93 
94 		/* MSGQs */
95 		if (t->msgq_req)
96 			rte_ring_free(t->msgq_req);
97 
98 		if (t->msgq_rsp)
99 			rte_ring_free(t->msgq_rsp);
100 	}
101 }
102 
103 int
104 thread_init(void)
105 {
106 	uint32_t i;
107 
108 	RTE_LCORE_FOREACH_SLAVE(i) {
109 		char name[NAME_MAX];
110 		struct rte_ring *msgq_req, *msgq_rsp;
111 		struct thread *t = &thread[i];
112 		struct thread_data *t_data = &thread_data[i];
113 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
114 
115 		/* MSGQs */
116 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117 
118 		msgq_req = rte_ring_create(name,
119 			THREAD_MSGQ_SIZE,
120 			cpu_id,
121 			RING_F_SP_ENQ | RING_F_SC_DEQ);
122 
123 		if (msgq_req == NULL) {
124 			thread_free();
125 			return -1;
126 		}
127 
128 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129 
130 		msgq_rsp = rte_ring_create(name,
131 			THREAD_MSGQ_SIZE,
132 			cpu_id,
133 			RING_F_SP_ENQ | RING_F_SC_DEQ);
134 
135 		if (msgq_rsp == NULL) {
136 			thread_free();
137 			return -1;
138 		}
139 
140 		/* Master thread records */
141 		t->msgq_req = msgq_req;
142 		t->msgq_rsp = msgq_rsp;
143 		t->enabled = 1;
144 
145 		/* Data plane thread records */
146 		t_data->n_pipelines = 0;
147 		t_data->msgq_req = msgq_req;
148 		t_data->msgq_rsp = msgq_rsp;
149 		t_data->timer_period =
150 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152 		t_data->time_next_min = t_data->time_next;
153 	}
154 
155 	return 0;
156 }
157 
158 static inline int
159 thread_is_running(uint32_t thread_id)
160 {
161 	enum rte_lcore_state_t thread_state;
162 
163 	thread_state = rte_eal_get_lcore_state(thread_id);
164 	return (thread_state == RUNNING) ? 1 : 0;
165 }
166 
167 /**
168  * Pipeline is running when:
169  *    (A) Pipeline is mapped to a data plane thread AND
170  *    (B) Its data plane thread is in RUNNING state.
171  */
172 static inline int
173 pipeline_is_running(struct pipeline *p)
174 {
175 	if (p->enabled == 0)
176 		return 0;
177 
178 	return thread_is_running(p->thread_id);
179 }
180 
181 /**
182  * Master thread & data plane threads: message passing
183  */
184 enum thread_req_type {
185 	THREAD_REQ_PIPELINE_ENABLE = 0,
186 	THREAD_REQ_PIPELINE_DISABLE,
187 	THREAD_REQ_MAX
188 };
189 
190 struct thread_msg_req {
191 	enum thread_req_type type;
192 
193 	union {
194 		struct {
195 			struct rte_pipeline *p;
196 			struct {
197 				struct rte_table_action *a;
198 			} table[RTE_PIPELINE_TABLE_MAX];
199 			struct rte_ring *msgq_req;
200 			struct rte_ring *msgq_rsp;
201 			uint32_t timer_period_ms;
202 			uint32_t n_tables;
203 		} pipeline_enable;
204 
205 		struct {
206 			struct rte_pipeline *p;
207 		} pipeline_disable;
208 	};
209 };
210 
211 struct thread_msg_rsp {
212 	int status;
213 };
214 
215 /**
216  * Master thread
217  */
218 static struct thread_msg_req *
219 thread_msg_alloc(void)
220 {
221 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
222 		sizeof(struct thread_msg_rsp));
223 
224 	return calloc(1, size);
225 }
226 
227 static void
228 thread_msg_free(struct thread_msg_rsp *rsp)
229 {
230 	free(rsp);
231 }
232 
233 static struct thread_msg_rsp *
234 thread_msg_send_recv(uint32_t thread_id,
235 	struct thread_msg_req *req)
236 {
237 	struct thread *t = &thread[thread_id];
238 	struct rte_ring *msgq_req = t->msgq_req;
239 	struct rte_ring *msgq_rsp = t->msgq_rsp;
240 	struct thread_msg_rsp *rsp;
241 	int status;
242 
243 	/* send */
244 	do {
245 		status = rte_ring_sp_enqueue(msgq_req, req);
246 	} while (status == -ENOBUFS);
247 
248 	/* recv */
249 	do {
250 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
251 	} while (status != 0);
252 
253 	return rsp;
254 }
255 
256 int
257 thread_pipeline_enable(uint32_t thread_id,
258 	const char *pipeline_name)
259 {
260 	struct pipeline *p = pipeline_find(pipeline_name);
261 	struct thread *t;
262 	struct thread_msg_req *req;
263 	struct thread_msg_rsp *rsp;
264 	uint32_t i;
265 	int status;
266 
267 	/* Check input params */
268 	if ((thread_id >= RTE_MAX_LCORE) ||
269 		(p == NULL) ||
270 		(p->n_ports_in == 0) ||
271 		(p->n_ports_out == 0) ||
272 		(p->n_tables == 0))
273 		return -1;
274 
275 	t = &thread[thread_id];
276 	if ((t->enabled == 0) ||
277 		p->enabled)
278 		return -1;
279 
280 	if (!thread_is_running(thread_id)) {
281 		struct thread_data *td = &thread_data[thread_id];
282 		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
283 
284 		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
285 			return -1;
286 
287 		/* Data plane thread */
288 		td->p[td->n_pipelines] = p->p;
289 
290 		tdp->p = p->p;
291 		for (i = 0; i < p->n_tables; i++)
292 			tdp->table_data[i].a = p->table[i].a;
293 
294 		tdp->n_tables = p->n_tables;
295 
296 		tdp->msgq_req = p->msgq_req;
297 		tdp->msgq_rsp = p->msgq_rsp;
298 		tdp->timer_period = (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
299 		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
300 
301 		td->n_pipelines++;
302 
303 		/* Pipeline */
304 		p->thread_id = thread_id;
305 		p->enabled = 1;
306 
307 		return 0;
308 	}
309 
310 	/* Allocate request */
311 	req = thread_msg_alloc();
312 	if (req == NULL)
313 		return -1;
314 
315 	/* Write request */
316 	req->type = THREAD_REQ_PIPELINE_ENABLE;
317 	req->pipeline_enable.p = p->p;
318 	for (i = 0; i < p->n_tables; i++)
319 		req->pipeline_enable.table[i].a =
320 			p->table[i].a;
321 	req->pipeline_enable.msgq_req = p->msgq_req;
322 	req->pipeline_enable.msgq_rsp = p->msgq_rsp;
323 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
324 	req->pipeline_enable.n_tables = p->n_tables;
325 
326 	/* Send request and wait for response */
327 	rsp = thread_msg_send_recv(thread_id, req);
328 	if (rsp == NULL)
329 		return -1;
330 
331 	/* Read response */
332 	status = rsp->status;
333 
334 	/* Free response */
335 	thread_msg_free(rsp);
336 
337 	/* Request completion */
338 	if (status)
339 		return status;
340 
341 	p->thread_id = thread_id;
342 	p->enabled = 1;
343 
344 	return 0;
345 }
346 
347 int
348 thread_pipeline_disable(uint32_t thread_id,
349 	const char *pipeline_name)
350 {
351 	struct pipeline *p = pipeline_find(pipeline_name);
352 	struct thread *t;
353 	struct thread_msg_req *req;
354 	struct thread_msg_rsp *rsp;
355 	int status;
356 
357 	/* Check input params */
358 	if ((thread_id >= RTE_MAX_LCORE) ||
359 		(p == NULL))
360 		return -1;
361 
362 	t = &thread[thread_id];
363 	if (t->enabled == 0)
364 		return -1;
365 
366 	if (p->enabled == 0)
367 		return 0;
368 
369 	if (p->thread_id != thread_id)
370 		return -1;
371 
372 	if (!thread_is_running(thread_id)) {
373 		struct thread_data *td = &thread_data[thread_id];
374 		uint32_t i;
375 
376 		for (i = 0; i < td->n_pipelines; i++) {
377 			struct pipeline_data *tdp = &td->pipeline_data[i];
378 
379 			if (tdp->p != p->p)
380 				continue;
381 
382 			/* Data plane thread */
383 			if (i < td->n_pipelines - 1) {
384 				struct rte_pipeline *pipeline_last =
385 					td->p[td->n_pipelines - 1];
386 				struct pipeline_data *tdp_last =
387 					&td->pipeline_data[td->n_pipelines - 1];
388 
389 				td->p[i] = pipeline_last;
390 				memcpy(tdp, tdp_last, sizeof(*tdp));
391 			}
392 
393 			td->n_pipelines--;
394 
395 			/* Pipeline */
396 			p->enabled = 0;
397 
398 			break;
399 		}
400 
401 		return 0;
402 	}
403 
404 	/* Allocate request */
405 	req = thread_msg_alloc();
406 	if (req == NULL)
407 		return -1;
408 
409 	/* Write request */
410 	req->type = THREAD_REQ_PIPELINE_DISABLE;
411 	req->pipeline_disable.p = p->p;
412 
413 	/* Send request and wait for response */
414 	rsp = thread_msg_send_recv(thread_id, req);
415 	if (rsp == NULL)
416 		return -1;
417 
418 	/* Read response */
419 	status = rsp->status;
420 
421 	/* Free response */
422 	thread_msg_free(rsp);
423 
424 	/* Request completion */
425 	if (status)
426 		return status;
427 
428 	p->enabled = 0;
429 
430 	return 0;
431 }
432 
433 /**
434  * Data plane threads: message handling
435  */
436 static inline struct thread_msg_req *
437 thread_msg_recv(struct rte_ring *msgq_req)
438 {
439 	struct thread_msg_req *req;
440 
441 	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
442 
443 	if (status != 0)
444 		return NULL;
445 
446 	return req;
447 }
448 
449 static inline void
450 thread_msg_send(struct rte_ring *msgq_rsp,
451 	struct thread_msg_rsp *rsp)
452 {
453 	int status;
454 
455 	do {
456 		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
457 	} while (status == -ENOBUFS);
458 }
459 
460 static struct thread_msg_rsp *
461 thread_msg_handle_pipeline_enable(struct thread_data *t,
462 	struct thread_msg_req *req)
463 {
464 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
465 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
466 	uint32_t i;
467 
468 	/* Request */
469 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
470 		rsp->status = -1;
471 		return rsp;
472 	}
473 
474 	t->p[t->n_pipelines] = req->pipeline_enable.p;
475 
476 	p->p = req->pipeline_enable.p;
477 	for (i = 0; i < req->pipeline_enable.n_tables; i++)
478 		p->table_data[i].a =
479 			req->pipeline_enable.table[i].a;
480 
481 	p->n_tables = req->pipeline_enable.n_tables;
482 
483 	p->msgq_req = req->pipeline_enable.msgq_req;
484 	p->msgq_rsp = req->pipeline_enable.msgq_rsp;
485 	p->timer_period =
486 		(rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
487 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
488 
489 	t->n_pipelines++;
490 
491 	/* Response */
492 	rsp->status = 0;
493 	return rsp;
494 }
495 
496 static struct thread_msg_rsp *
497 thread_msg_handle_pipeline_disable(struct thread_data *t,
498 	struct thread_msg_req *req)
499 {
500 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
501 	uint32_t n_pipelines = t->n_pipelines;
502 	struct rte_pipeline *pipeline = req->pipeline_disable.p;
503 	uint32_t i;
504 
505 	/* find pipeline */
506 	for (i = 0; i < n_pipelines; i++) {
507 		struct pipeline_data *p = &t->pipeline_data[i];
508 
509 		if (p->p != pipeline)
510 			continue;
511 
512 		if (i < n_pipelines - 1) {
513 			struct rte_pipeline *pipeline_last =
514 				t->p[n_pipelines - 1];
515 			struct pipeline_data *p_last =
516 				&t->pipeline_data[n_pipelines - 1];
517 
518 			t->p[i] = pipeline_last;
519 			memcpy(p, p_last, sizeof(*p));
520 		}
521 
522 		t->n_pipelines--;
523 
524 		rsp->status = 0;
525 		return rsp;
526 	}
527 
528 	/* should not get here */
529 	rsp->status = 0;
530 	return rsp;
531 }
532 
533 static void
534 thread_msg_handle(struct thread_data *t)
535 {
536 	for ( ; ; ) {
537 		struct thread_msg_req *req;
538 		struct thread_msg_rsp *rsp;
539 
540 		req = thread_msg_recv(t->msgq_req);
541 		if (req == NULL)
542 			break;
543 
544 		switch (req->type) {
545 		case THREAD_REQ_PIPELINE_ENABLE:
546 			rsp = thread_msg_handle_pipeline_enable(t, req);
547 			break;
548 
549 		case THREAD_REQ_PIPELINE_DISABLE:
550 			rsp = thread_msg_handle_pipeline_disable(t, req);
551 			break;
552 
553 		default:
554 			rsp = (struct thread_msg_rsp *) req;
555 			rsp->status = -1;
556 		}
557 
558 		thread_msg_send(t->msgq_rsp, rsp);
559 	}
560 }
561 
562 /**
563  * Master thread & data plane threads: message passing
564  */
565 enum pipeline_req_type {
566 	/* Port IN */
567 	PIPELINE_REQ_PORT_IN_STATS_READ,
568 	PIPELINE_REQ_PORT_IN_ENABLE,
569 	PIPELINE_REQ_PORT_IN_DISABLE,
570 
571 	/* Port OUT */
572 	PIPELINE_REQ_PORT_OUT_STATS_READ,
573 
574 	/* Table */
575 	PIPELINE_REQ_TABLE_STATS_READ,
576 	PIPELINE_REQ_TABLE_RULE_ADD,
577 	PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
578 	PIPELINE_REQ_TABLE_RULE_ADD_BULK,
579 	PIPELINE_REQ_TABLE_RULE_DELETE,
580 	PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
581 	PIPELINE_REQ_TABLE_RULE_STATS_READ,
582 	PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
583 	PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
584 	PIPELINE_REQ_TABLE_RULE_MTR_READ,
585 	PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
586 	PIPELINE_REQ_TABLE_RULE_TTL_READ,
587 	PIPELINE_REQ_MAX
588 };
589 
590 struct pipeline_msg_req_port_in_stats_read {
591 	int clear;
592 };
593 
594 struct pipeline_msg_req_port_out_stats_read {
595 	int clear;
596 };
597 
598 struct pipeline_msg_req_table_stats_read {
599 	int clear;
600 };
601 
602 struct pipeline_msg_req_table_rule_add {
603 	struct table_rule_match match;
604 	struct table_rule_action action;
605 };
606 
607 struct pipeline_msg_req_table_rule_add_default {
608 	struct table_rule_action action;
609 };
610 
611 struct pipeline_msg_req_table_rule_add_bulk {
612 	struct table_rule_list *list;
613 	int bulk;
614 };
615 
616 struct pipeline_msg_req_table_rule_delete {
617 	struct table_rule_match match;
618 };
619 
620 struct pipeline_msg_req_table_rule_stats_read {
621 	void *data;
622 	int clear;
623 };
624 
625 struct pipeline_msg_req_table_mtr_profile_add {
626 	uint32_t meter_profile_id;
627 	struct rte_table_action_meter_profile profile;
628 };
629 
630 struct pipeline_msg_req_table_mtr_profile_delete {
631 	uint32_t meter_profile_id;
632 };
633 
634 struct pipeline_msg_req_table_rule_mtr_read {
635 	void *data;
636 	uint32_t tc_mask;
637 	int clear;
638 };
639 
640 struct pipeline_msg_req_table_dscp_table_update {
641 	uint64_t dscp_mask;
642 	struct rte_table_action_dscp_table dscp_table;
643 };
644 
645 struct pipeline_msg_req_table_rule_ttl_read {
646 	void *data;
647 	int clear;
648 };
649 
650 struct pipeline_msg_req {
651 	enum pipeline_req_type type;
652 	uint32_t id; /* Port IN, port OUT or table ID */
653 
654 	RTE_STD_C11
655 	union {
656 		struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
657 		struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
658 		struct pipeline_msg_req_table_stats_read table_stats_read;
659 		struct pipeline_msg_req_table_rule_add table_rule_add;
660 		struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
661 		struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
662 		struct pipeline_msg_req_table_rule_delete table_rule_delete;
663 		struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
664 		struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
665 		struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
666 		struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
667 		struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
668 		struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
669 	};
670 };
671 
672 struct pipeline_msg_rsp_port_in_stats_read {
673 	struct rte_pipeline_port_in_stats stats;
674 };
675 
676 struct pipeline_msg_rsp_port_out_stats_read {
677 	struct rte_pipeline_port_out_stats stats;
678 };
679 
680 struct pipeline_msg_rsp_table_stats_read {
681 	struct rte_pipeline_table_stats stats;
682 };
683 
684 struct pipeline_msg_rsp_table_rule_add {
685 	void *data;
686 };
687 
688 struct pipeline_msg_rsp_table_rule_add_default {
689 	void *data;
690 };
691 
692 struct pipeline_msg_rsp_table_rule_add_bulk {
693 	uint32_t n_rules;
694 };
695 
696 struct pipeline_msg_rsp_table_rule_stats_read {
697 	struct rte_table_action_stats_counters stats;
698 };
699 
700 struct pipeline_msg_rsp_table_rule_mtr_read {
701 	struct rte_table_action_mtr_counters stats;
702 };
703 
704 struct pipeline_msg_rsp_table_rule_ttl_read {
705 	struct rte_table_action_ttl_counters stats;
706 };
707 
708 struct pipeline_msg_rsp {
709 	int status;
710 
711 	RTE_STD_C11
712 	union {
713 		struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
714 		struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
715 		struct pipeline_msg_rsp_table_stats_read table_stats_read;
716 		struct pipeline_msg_rsp_table_rule_add table_rule_add;
717 		struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
718 		struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
719 		struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
720 		struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
721 		struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
722 	};
723 };
724 
725 /**
726  * Master thread
727  */
728 static struct pipeline_msg_req *
729 pipeline_msg_alloc(void)
730 {
731 	size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
732 		sizeof(struct pipeline_msg_rsp));
733 
734 	return calloc(1, size);
735 }
736 
737 static void
738 pipeline_msg_free(struct pipeline_msg_rsp *rsp)
739 {
740 	free(rsp);
741 }
742 
743 static struct pipeline_msg_rsp *
744 pipeline_msg_send_recv(struct pipeline *p,
745 	struct pipeline_msg_req *req)
746 {
747 	struct rte_ring *msgq_req = p->msgq_req;
748 	struct rte_ring *msgq_rsp = p->msgq_rsp;
749 	struct pipeline_msg_rsp *rsp;
750 	int status;
751 
752 	/* send */
753 	do {
754 		status = rte_ring_sp_enqueue(msgq_req, req);
755 	} while (status == -ENOBUFS);
756 
757 	/* recv */
758 	do {
759 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
760 	} while (status != 0);
761 
762 	return rsp;
763 }
764 
765 int
766 pipeline_port_in_stats_read(const char *pipeline_name,
767 	uint32_t port_id,
768 	struct rte_pipeline_port_in_stats *stats,
769 	int clear)
770 {
771 	struct pipeline *p;
772 	struct pipeline_msg_req *req;
773 	struct pipeline_msg_rsp *rsp;
774 	int status;
775 
776 	/* Check input params */
777 	if ((pipeline_name == NULL) ||
778 		(stats == NULL))
779 		return -1;
780 
781 	p = pipeline_find(pipeline_name);
782 	if ((p == NULL) ||
783 		(port_id >= p->n_ports_in))
784 		return -1;
785 
786 	if (!pipeline_is_running(p)) {
787 		status = rte_pipeline_port_in_stats_read(p->p,
788 			port_id,
789 			stats,
790 			clear);
791 
792 		return status;
793 	}
794 
795 	/* Allocate request */
796 	req = pipeline_msg_alloc();
797 	if (req == NULL)
798 		return -1;
799 
800 	/* Write request */
801 	req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
802 	req->id = port_id;
803 	req->port_in_stats_read.clear = clear;
804 
805 	/* Send request and wait for response */
806 	rsp = pipeline_msg_send_recv(p, req);
807 	if (rsp == NULL)
808 		return -1;
809 
810 	/* Read response */
811 	status = rsp->status;
812 	if (status)
813 		memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
814 
815 	/* Free response */
816 	pipeline_msg_free(rsp);
817 
818 	return status;
819 }
820 
821 int
822 pipeline_port_in_enable(const char *pipeline_name,
823 	uint32_t port_id)
824 {
825 	struct pipeline *p;
826 	struct pipeline_msg_req *req;
827 	struct pipeline_msg_rsp *rsp;
828 	int status;
829 
830 	/* Check input params */
831 	if (pipeline_name == NULL)
832 		return -1;
833 
834 	p = pipeline_find(pipeline_name);
835 	if ((p == NULL) ||
836 		(port_id >= p->n_ports_in))
837 		return -1;
838 
839 	if (!pipeline_is_running(p)) {
840 		status = rte_pipeline_port_in_enable(p->p, port_id);
841 		return status;
842 	}
843 
844 	/* Allocate request */
845 	req = pipeline_msg_alloc();
846 	if (req == NULL)
847 		return -1;
848 
849 	/* Write request */
850 	req->type = PIPELINE_REQ_PORT_IN_ENABLE;
851 	req->id = port_id;
852 
853 	/* Send request and wait for response */
854 	rsp = pipeline_msg_send_recv(p, req);
855 	if (rsp == NULL)
856 		return -1;
857 
858 	/* Read response */
859 	status = rsp->status;
860 
861 	/* Free response */
862 	pipeline_msg_free(rsp);
863 
864 	return status;
865 }
866 
867 int
868 pipeline_port_in_disable(const char *pipeline_name,
869 	uint32_t port_id)
870 {
871 	struct pipeline *p;
872 	struct pipeline_msg_req *req;
873 	struct pipeline_msg_rsp *rsp;
874 	int status;
875 
876 	/* Check input params */
877 	if (pipeline_name == NULL)
878 		return -1;
879 
880 	p = pipeline_find(pipeline_name);
881 	if ((p == NULL) ||
882 		(port_id >= p->n_ports_in))
883 		return -1;
884 
885 	if (!pipeline_is_running(p)) {
886 		status = rte_pipeline_port_in_disable(p->p, port_id);
887 		return status;
888 	}
889 
890 	/* Allocate request */
891 	req = pipeline_msg_alloc();
892 	if (req == NULL)
893 		return -1;
894 
895 	/* Write request */
896 	req->type = PIPELINE_REQ_PORT_IN_DISABLE;
897 	req->id = port_id;
898 
899 	/* Send request and wait for response */
900 	rsp = pipeline_msg_send_recv(p, req);
901 	if (rsp == NULL)
902 		return -1;
903 
904 	/* Read response */
905 	status = rsp->status;
906 
907 	/* Free response */
908 	pipeline_msg_free(rsp);
909 
910 	return status;
911 }
912 
913 int
914 pipeline_port_out_stats_read(const char *pipeline_name,
915 	uint32_t port_id,
916 	struct rte_pipeline_port_out_stats *stats,
917 	int clear)
918 {
919 	struct pipeline *p;
920 	struct pipeline_msg_req *req;
921 	struct pipeline_msg_rsp *rsp;
922 	int status;
923 
924 	/* Check input params */
925 	if ((pipeline_name == NULL) ||
926 		(stats == NULL))
927 		return -1;
928 
929 	p = pipeline_find(pipeline_name);
930 	if ((p == NULL) ||
931 		(port_id >= p->n_ports_out))
932 		return -1;
933 
934 	if (!pipeline_is_running(p)) {
935 		status = rte_pipeline_port_out_stats_read(p->p,
936 			port_id,
937 			stats,
938 			clear);
939 
940 		return status;
941 	}
942 
943 	/* Allocate request */
944 	req = pipeline_msg_alloc();
945 	if (req == NULL)
946 		return -1;
947 
948 	/* Write request */
949 	req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
950 	req->id = port_id;
951 	req->port_out_stats_read.clear = clear;
952 
953 	/* Send request and wait for response */
954 	rsp = pipeline_msg_send_recv(p, req);
955 	if (rsp == NULL)
956 		return -1;
957 
958 	/* Read response */
959 	status = rsp->status;
960 	if (status)
961 		memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
962 
963 	/* Free response */
964 	pipeline_msg_free(rsp);
965 
966 	return status;
967 }
968 
969 int
970 pipeline_table_stats_read(const char *pipeline_name,
971 	uint32_t table_id,
972 	struct rte_pipeline_table_stats *stats,
973 	int clear)
974 {
975 	struct pipeline *p;
976 	struct pipeline_msg_req *req;
977 	struct pipeline_msg_rsp *rsp;
978 	int status;
979 
980 	/* Check input params */
981 	if ((pipeline_name == NULL) ||
982 		(stats == NULL))
983 		return -1;
984 
985 	p = pipeline_find(pipeline_name);
986 	if ((p == NULL) ||
987 		(table_id >= p->n_tables))
988 		return -1;
989 
990 	if (!pipeline_is_running(p)) {
991 		status = rte_pipeline_table_stats_read(p->p,
992 			table_id,
993 			stats,
994 			clear);
995 
996 		return status;
997 	}
998 
999 	/* Allocate request */
1000 	req = pipeline_msg_alloc();
1001 	if (req == NULL)
1002 		return -1;
1003 
1004 	/* Write request */
1005 	req->type = PIPELINE_REQ_TABLE_STATS_READ;
1006 	req->id = table_id;
1007 	req->table_stats_read.clear = clear;
1008 
1009 	/* Send request and wait for response */
1010 	rsp = pipeline_msg_send_recv(p, req);
1011 	if (rsp == NULL)
1012 		return -1;
1013 
1014 	/* Read response */
1015 	status = rsp->status;
1016 	if (status)
1017 		memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
1018 
1019 	/* Free response */
1020 	pipeline_msg_free(rsp);
1021 
1022 	return status;
1023 }
1024 
1025 static int
1026 match_check(struct table_rule_match *match,
1027 	struct pipeline *p,
1028 	uint32_t table_id)
1029 {
1030 	struct table *table;
1031 
1032 	if ((match == NULL) ||
1033 		(p == NULL) ||
1034 		(table_id >= p->n_tables))
1035 		return -1;
1036 
1037 	table = &p->table[table_id];
1038 	if (match->match_type != table->params.match_type)
1039 		return -1;
1040 
1041 	switch (match->match_type) {
1042 	case TABLE_ACL:
1043 	{
1044 		struct table_acl_params *t = &table->params.match.acl;
1045 		struct table_rule_match_acl *r = &match->match.acl;
1046 
1047 		if ((r->ip_version && (t->ip_version == 0)) ||
1048 			((r->ip_version == 0) && t->ip_version))
1049 			return -1;
1050 
1051 		if (r->ip_version) {
1052 			if ((r->sa_depth > 32) ||
1053 				(r->da_depth > 32))
1054 				return -1;
1055 		} else {
1056 			if ((r->sa_depth > 128) ||
1057 				(r->da_depth > 128))
1058 				return -1;
1059 		}
1060 		return 0;
1061 	}
1062 
1063 	case TABLE_ARRAY:
1064 		return 0;
1065 
1066 	case TABLE_HASH:
1067 		return 0;
1068 
1069 	case TABLE_LPM:
1070 	{
1071 		struct table_lpm_params *t = &table->params.match.lpm;
1072 		struct table_rule_match_lpm *r = &match->match.lpm;
1073 
1074 		if ((r->ip_version && (t->key_size != 4)) ||
1075 			((r->ip_version == 0) && (t->key_size != 16)))
1076 			return -1;
1077 
1078 		if (r->ip_version) {
1079 			if (r->depth > 32)
1080 				return -1;
1081 		} else {
1082 			if (r->depth > 128)
1083 				return -1;
1084 		}
1085 		return 0;
1086 	}
1087 
1088 	case TABLE_STUB:
1089 		return -1;
1090 
1091 	default:
1092 		return -1;
1093 	}
1094 }
1095 
1096 static int
1097 action_check(struct table_rule_action *action,
1098 	struct pipeline *p,
1099 	uint32_t table_id)
1100 {
1101 	struct table_action_profile *ap;
1102 
1103 	if ((action == NULL) ||
1104 		(p == NULL) ||
1105 		(table_id >= p->n_tables))
1106 		return -1;
1107 
1108 	ap = p->table[table_id].ap;
1109 	if (action->action_mask != ap->params.action_mask)
1110 		return -1;
1111 
1112 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1113 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1114 			(action->fwd.id >= p->n_ports_out))
1115 			return -1;
1116 
1117 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1118 			(action->fwd.id >= p->n_tables))
1119 			return -1;
1120 	}
1121 
1122 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1123 		uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1124 		uint32_t tc_mask1 = action->mtr.tc_mask;
1125 
1126 		if (tc_mask1 != tc_mask0)
1127 			return -1;
1128 	}
1129 
1130 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1131 		uint32_t n_subports_per_port =
1132 			ap->params.tm.n_subports_per_port;
1133 		uint32_t n_pipes_per_subport =
1134 			ap->params.tm.n_pipes_per_subport;
1135 		uint32_t subport_id = action->tm.subport_id;
1136 		uint32_t pipe_id = action->tm.pipe_id;
1137 
1138 		if ((subport_id >= n_subports_per_port) ||
1139 			(pipe_id >= n_pipes_per_subport))
1140 			return -1;
1141 	}
1142 
1143 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1144 		uint64_t encap_mask = ap->params.encap.encap_mask;
1145 		enum rte_table_action_encap_type type = action->encap.type;
1146 
1147 		if ((encap_mask & (1LLU << type)) == 0)
1148 			return -1;
1149 	}
1150 
1151 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1152 		int ip_version0 = ap->params.common.ip_version;
1153 		int ip_version1 = action->nat.ip_version;
1154 
1155 		if ((ip_version1 && (ip_version0 == 0)) ||
1156 			((ip_version1 == 0) && ip_version0))
1157 			return -1;
1158 	}
1159 
1160 	return 0;
1161 }
1162 
1163 static int
1164 action_default_check(struct table_rule_action *action,
1165 	struct pipeline *p,
1166 	uint32_t table_id)
1167 {
1168 	if ((action == NULL) ||
1169 		(action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1170 		(p == NULL) ||
1171 		(table_id >= p->n_tables))
1172 		return -1;
1173 
1174 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1175 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1176 			(action->fwd.id >= p->n_ports_out))
1177 			return -1;
1178 
1179 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1180 			(action->fwd.id >= p->n_tables))
1181 			return -1;
1182 	}
1183 
1184 	return 0;
1185 }
1186 
1187 union table_rule_match_low_level {
1188 	struct rte_table_acl_rule_add_params acl_add;
1189 	struct rte_table_acl_rule_delete_params acl_delete;
1190 	struct rte_table_array_key array;
1191 	uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX];
1192 	struct rte_table_lpm_key lpm_ipv4;
1193 	struct rte_table_lpm_ipv6_key lpm_ipv6;
1194 };
1195 
1196 static int
1197 match_convert(struct table_rule_match *mh,
1198 	union table_rule_match_low_level *ml,
1199 	int add);
1200 
1201 static int
1202 action_convert(struct rte_table_action *a,
1203 	struct table_rule_action *action,
1204 	struct rte_pipeline_table_entry *data);
1205 
1206 struct table_ll {
1207 	struct rte_pipeline *p;
1208 	int table_id;
1209 	struct rte_table_action *a;
1210 	int bulk_supported;
1211 };
1212 
1213 static int
1214 table_rule_add_bulk_ll(struct table_ll *table,
1215 	struct table_rule_list *list,
1216 	uint32_t *n_rules)
1217 {
1218 	union table_rule_match_low_level *match_ll = NULL;
1219 	uint8_t *action_ll = NULL;
1220 	void **match_ll_ptr = NULL;
1221 	struct rte_pipeline_table_entry **action_ll_ptr = NULL;
1222 	struct rte_pipeline_table_entry **entries_ptr = NULL;
1223 	int *found = NULL;
1224 	struct table_rule *rule;
1225 	uint32_t n, i;
1226 	int status = 0;
1227 
1228 	n = 0;
1229 	TAILQ_FOREACH(rule, list, node)
1230 		n++;
1231 
1232 	/* Memory allocation */
1233 	match_ll = calloc(n, sizeof(union table_rule_match_low_level));
1234 	action_ll = calloc(n, TABLE_RULE_ACTION_SIZE_MAX);
1235 
1236 	match_ll_ptr = calloc(n, sizeof(void *));
1237 	action_ll_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1238 
1239 	entries_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1240 	found = calloc(n, sizeof(int));
1241 
1242 	if (match_ll == NULL ||
1243 		action_ll == NULL ||
1244 		match_ll_ptr == NULL ||
1245 		action_ll_ptr == NULL ||
1246 		entries_ptr == NULL ||
1247 		found == NULL) {
1248 			status = -ENOMEM;
1249 			goto table_rule_add_bulk_ll_free;
1250 	}
1251 
1252 	/* Init */
1253 	for (i = 0; i < n; i++) {
1254 		match_ll_ptr[i] = (void *)&match_ll[i];
1255 		action_ll_ptr[i] = (struct rte_pipeline_table_entry *)
1256 			&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
1257 	}
1258 
1259 	/* Rule (match, action) conversion */
1260 	i = 0;
1261 	TAILQ_FOREACH(rule, list, node) {
1262 		status = match_convert(&rule->match, match_ll_ptr[i], 1);
1263 		if (status)
1264 			goto table_rule_add_bulk_ll_free;
1265 
1266 		status = action_convert(table->a, &rule->action, action_ll_ptr[i]);
1267 		if (status)
1268 			goto table_rule_add_bulk_ll_free;
1269 
1270 		i++;
1271 	}
1272 
1273 	/* Add rule (match, action) to table */
1274 	if (table->bulk_supported) {
1275 		status = rte_pipeline_table_entry_add_bulk(table->p,
1276 			table->table_id,
1277 			match_ll_ptr,
1278 			action_ll_ptr,
1279 			n,
1280 			found,
1281 			entries_ptr);
1282 		if (status)
1283 			goto table_rule_add_bulk_ll_free;
1284 	} else
1285 		for (i = 0; i < n; i++) {
1286 			status = rte_pipeline_table_entry_add(table->p,
1287 				table->table_id,
1288 				match_ll_ptr[i],
1289 				action_ll_ptr[i],
1290 				&found[i],
1291 				&entries_ptr[i]);
1292 			if (status) {
1293 				if (i == 0)
1294 					goto table_rule_add_bulk_ll_free;
1295 
1296 				/* No roll-back. */
1297 				status = 0;
1298 				n = i;
1299 				break;
1300 			}
1301 		}
1302 
1303 	/* Write back to the rule list. */
1304 	i = 0;
1305 	TAILQ_FOREACH(rule, list, node) {
1306 		if (i >= n)
1307 			break;
1308 
1309 		rule->data = entries_ptr[i];
1310 
1311 		i++;
1312 	}
1313 
1314 	*n_rules = n;
1315 
1316 	/* Free */
1317 table_rule_add_bulk_ll_free:
1318 	free(found);
1319 	free(entries_ptr);
1320 	free(action_ll_ptr);
1321 	free(match_ll_ptr);
1322 	free(action_ll);
1323 	free(match_ll);
1324 
1325 	return status;
1326 }
1327 
1328 int
1329 pipeline_table_rule_add(const char *pipeline_name,
1330 	uint32_t table_id,
1331 	struct table_rule_match *match,
1332 	struct table_rule_action *action)
1333 {
1334 	struct pipeline *p;
1335 	struct table *table;
1336 	struct pipeline_msg_req *req;
1337 	struct pipeline_msg_rsp *rsp;
1338 	struct table_rule *rule;
1339 	int status;
1340 
1341 	/* Check input params */
1342 	if ((pipeline_name == NULL) ||
1343 		(match == NULL) ||
1344 		(action == NULL))
1345 		return -1;
1346 
1347 	p = pipeline_find(pipeline_name);
1348 	if ((p == NULL) ||
1349 		(table_id >= p->n_tables) ||
1350 		match_check(match, p, table_id) ||
1351 		action_check(action, p, table_id))
1352 		return -1;
1353 
1354 	table = &p->table[table_id];
1355 
1356 	rule = calloc(1, sizeof(struct table_rule));
1357 	if (rule == NULL)
1358 		return -1;
1359 
1360 	memcpy(&rule->match, match, sizeof(*match));
1361 	memcpy(&rule->action, action, sizeof(*action));
1362 
1363 	if (!pipeline_is_running(p)) {
1364 		union table_rule_match_low_level match_ll;
1365 		struct rte_pipeline_table_entry *data_in, *data_out;
1366 		int key_found;
1367 		uint8_t *buffer;
1368 
1369 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1370 		if (buffer == NULL) {
1371 			free(rule);
1372 			return -1;
1373 		}
1374 
1375 		/* Table match-action rule conversion */
1376 		data_in = (struct rte_pipeline_table_entry *)buffer;
1377 
1378 		status = match_convert(match, &match_ll, 1);
1379 		if (status) {
1380 			free(buffer);
1381 			free(rule);
1382 			return -1;
1383 		}
1384 
1385 		status = action_convert(table->a, action, data_in);
1386 		if (status) {
1387 			free(buffer);
1388 			free(rule);
1389 			return -1;
1390 		}
1391 
1392 		/* Add rule (match, action) to table */
1393 		status = rte_pipeline_table_entry_add(p->p,
1394 				table_id,
1395 				&match_ll,
1396 				data_in,
1397 				&key_found,
1398 				&data_out);
1399 		if (status) {
1400 			free(buffer);
1401 			free(rule);
1402 			return -1;
1403 		}
1404 
1405 		/* Write Response */
1406 		rule->data = data_out;
1407 		table_rule_add(table, rule);
1408 
1409 		free(buffer);
1410 		return 0;
1411 	}
1412 
1413 	/* Allocate request */
1414 	req = pipeline_msg_alloc();
1415 	if (req == NULL) {
1416 		free(rule);
1417 		return -1;
1418 	}
1419 
1420 	/* Write request */
1421 	req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1422 	req->id = table_id;
1423 	memcpy(&req->table_rule_add.match, match, sizeof(*match));
1424 	memcpy(&req->table_rule_add.action, action, sizeof(*action));
1425 
1426 	/* Send request and wait for response */
1427 	rsp = pipeline_msg_send_recv(p, req);
1428 	if (rsp == NULL) {
1429 		free(rule);
1430 		return -1;
1431 	}
1432 
1433 	/* Read response */
1434 	status = rsp->status;
1435 	if (status == 0) {
1436 		rule->data = rsp->table_rule_add.data;
1437 		table_rule_add(table, rule);
1438 	} else
1439 		free(rule);
1440 
1441 	/* Free response */
1442 	pipeline_msg_free(rsp);
1443 
1444 	return status;
1445 }
1446 
1447 int
1448 pipeline_table_rule_add_default(const char *pipeline_name,
1449 	uint32_t table_id,
1450 	struct table_rule_action *action)
1451 {
1452 	struct pipeline *p;
1453 	struct table *table;
1454 	struct pipeline_msg_req *req;
1455 	struct pipeline_msg_rsp *rsp;
1456 	struct table_rule *rule;
1457 	int status;
1458 
1459 	/* Check input params */
1460 	if ((pipeline_name == NULL) ||
1461 		(action == NULL))
1462 		return -1;
1463 
1464 	p = pipeline_find(pipeline_name);
1465 	if ((p == NULL) ||
1466 		(table_id >= p->n_tables) ||
1467 		action_default_check(action, p, table_id))
1468 		return -1;
1469 
1470 	table = &p->table[table_id];
1471 
1472 	rule = calloc(1, sizeof(struct table_rule));
1473 	if (rule == NULL)
1474 		return -1;
1475 
1476 	memcpy(&rule->action, action, sizeof(*action));
1477 
1478 	if (!pipeline_is_running(p)) {
1479 		struct rte_pipeline_table_entry *data_in, *data_out;
1480 		uint8_t *buffer;
1481 
1482 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1483 		if (buffer == NULL) {
1484 			free(rule);
1485 			return -1;
1486 		}
1487 
1488 		/* Apply actions */
1489 		data_in = (struct rte_pipeline_table_entry *)buffer;
1490 
1491 		data_in->action = action->fwd.action;
1492 		if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
1493 			data_in->port_id = action->fwd.id;
1494 		if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
1495 			data_in->table_id = action->fwd.id;
1496 
1497 		/* Add default rule to table */
1498 		status = rte_pipeline_table_default_entry_add(p->p,
1499 				table_id,
1500 				data_in,
1501 				&data_out);
1502 		if (status) {
1503 			free(buffer);
1504 			free(rule);
1505 			return -1;
1506 		}
1507 
1508 		/* Write Response */
1509 		rule->data = data_out;
1510 		table_rule_default_add(table, rule);
1511 
1512 		free(buffer);
1513 		return 0;
1514 	}
1515 
1516 	/* Allocate request */
1517 	req = pipeline_msg_alloc();
1518 	if (req == NULL) {
1519 		free(rule);
1520 		return -1;
1521 	}
1522 
1523 	/* Write request */
1524 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1525 	req->id = table_id;
1526 	memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1527 
1528 	/* Send request and wait for response */
1529 	rsp = pipeline_msg_send_recv(p, req);
1530 	if (rsp == NULL) {
1531 		free(rule);
1532 		return -1;
1533 	}
1534 
1535 	/* Read response */
1536 	status = rsp->status;
1537 	if (status == 0) {
1538 		rule->data = rsp->table_rule_add_default.data;
1539 		table_rule_default_add(table, rule);
1540 	} else
1541 		free(rule);
1542 
1543 	/* Free response */
1544 	pipeline_msg_free(rsp);
1545 
1546 	return status;
1547 }
1548 
1549 static uint32_t
1550 table_rule_list_free(struct table_rule_list *list)
1551 {
1552 	uint32_t n = 0;
1553 
1554 	if (!list)
1555 		return 0;
1556 
1557 	for ( ; ; ) {
1558 		struct table_rule *rule;
1559 
1560 		rule = TAILQ_FIRST(list);
1561 		if (rule == NULL)
1562 			break;
1563 
1564 		TAILQ_REMOVE(list, rule, node);
1565 		free(rule);
1566 		n++;
1567 	}
1568 
1569 	free(list);
1570 	return n;
1571 }
1572 
1573 int
1574 pipeline_table_rule_add_bulk(const char *pipeline_name,
1575 	uint32_t table_id,
1576 	struct table_rule_list *list,
1577 	uint32_t *n_rules_added,
1578 	uint32_t *n_rules_not_added)
1579 {
1580 	struct pipeline *p;
1581 	struct table *table;
1582 	struct pipeline_msg_req *req;
1583 	struct pipeline_msg_rsp *rsp;
1584 	struct table_rule *rule;
1585 	int status = 0;
1586 
1587 	/* Check input params */
1588 	if ((pipeline_name == NULL) ||
1589 		(list == NULL) ||
1590 		TAILQ_EMPTY(list) ||
1591 		(n_rules_added == NULL) ||
1592 		(n_rules_not_added == NULL)) {
1593 		table_rule_list_free(list);
1594 		return -EINVAL;
1595 	}
1596 
1597 	p = pipeline_find(pipeline_name);
1598 	if ((p == NULL) ||
1599 		(table_id >= p->n_tables)) {
1600 		table_rule_list_free(list);
1601 		return -EINVAL;
1602 	}
1603 
1604 	table = &p->table[table_id];
1605 
1606 	TAILQ_FOREACH(rule, list, node)
1607 		if (match_check(&rule->match, p, table_id) ||
1608 			action_check(&rule->action, p, table_id)) {
1609 			table_rule_list_free(list);
1610 			return -EINVAL;
1611 		}
1612 
1613 	if (!pipeline_is_running(p)) {
1614 		struct table_ll table_ll = {
1615 			.p = p->p,
1616 			.table_id = table_id,
1617 			.a = table->a,
1618 			.bulk_supported = table->params.match_type == TABLE_ACL,
1619 		};
1620 
1621 		status = table_rule_add_bulk_ll(&table_ll, list, n_rules_added);
1622 		if (status) {
1623 			table_rule_list_free(list);
1624 			return status;
1625 		}
1626 
1627 		table_rule_add_bulk(table, list, *n_rules_added);
1628 		*n_rules_not_added = table_rule_list_free(list);
1629 		return 0;
1630 	}
1631 
1632 	/* Allocate request */
1633 	req = pipeline_msg_alloc();
1634 	if (req == NULL) {
1635 		table_rule_list_free(list);
1636 		return -ENOMEM;
1637 	}
1638 
1639 	/* Write request */
1640 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1641 	req->id = table_id;
1642 	req->table_rule_add_bulk.list = list;
1643 	req->table_rule_add_bulk.bulk = table->params.match_type == TABLE_ACL;
1644 
1645 	/* Send request and wait for response */
1646 	rsp = pipeline_msg_send_recv(p, req);
1647 	if (rsp == NULL) {
1648 		table_rule_list_free(list);
1649 		return -ENOMEM;
1650 	}
1651 
1652 	/* Read response */
1653 	status = rsp->status;
1654 	if (status == 0) {
1655 		*n_rules_added = rsp->table_rule_add_bulk.n_rules;
1656 
1657 		table_rule_add_bulk(table, list, *n_rules_added);
1658 		*n_rules_not_added = table_rule_list_free(list);
1659 	} else
1660 		table_rule_list_free(list);
1661 
1662 
1663 	/* Free response */
1664 	pipeline_msg_free(rsp);
1665 
1666 	return status;
1667 }
1668 
1669 int
1670 pipeline_table_rule_delete(const char *pipeline_name,
1671 	uint32_t table_id,
1672 	struct table_rule_match *match)
1673 {
1674 	struct pipeline *p;
1675 	struct table *table;
1676 	struct pipeline_msg_req *req;
1677 	struct pipeline_msg_rsp *rsp;
1678 	int status;
1679 
1680 	/* Check input params */
1681 	if ((pipeline_name == NULL) ||
1682 		(match == NULL))
1683 		return -1;
1684 
1685 	p = pipeline_find(pipeline_name);
1686 	if ((p == NULL) ||
1687 		(table_id >= p->n_tables) ||
1688 		match_check(match, p, table_id))
1689 		return -1;
1690 
1691 	table = &p->table[table_id];
1692 
1693 	if (!pipeline_is_running(p)) {
1694 		union table_rule_match_low_level match_ll;
1695 		int key_found;
1696 
1697 		status = match_convert(match, &match_ll, 0);
1698 		if (status)
1699 			return -1;
1700 
1701 		status = rte_pipeline_table_entry_delete(p->p,
1702 				table_id,
1703 				&match_ll,
1704 				&key_found,
1705 				NULL);
1706 
1707 		if (status == 0)
1708 			table_rule_delete(table, match);
1709 
1710 		return status;
1711 	}
1712 
1713 	/* Allocate request */
1714 	req = pipeline_msg_alloc();
1715 	if (req == NULL)
1716 		return -1;
1717 
1718 	/* Write request */
1719 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1720 	req->id = table_id;
1721 	memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1722 
1723 	/* Send request and wait for response */
1724 	rsp = pipeline_msg_send_recv(p, req);
1725 	if (rsp == NULL)
1726 		return -1;
1727 
1728 	/* Read response */
1729 	status = rsp->status;
1730 	if (status == 0)
1731 		table_rule_delete(table, match);
1732 
1733 	/* Free response */
1734 	pipeline_msg_free(rsp);
1735 
1736 	return status;
1737 }
1738 
1739 int
1740 pipeline_table_rule_delete_default(const char *pipeline_name,
1741 	uint32_t table_id)
1742 {
1743 	struct pipeline *p;
1744 	struct table *table;
1745 	struct pipeline_msg_req *req;
1746 	struct pipeline_msg_rsp *rsp;
1747 	int status;
1748 
1749 	/* Check input params */
1750 	if (pipeline_name == NULL)
1751 		return -1;
1752 
1753 	p = pipeline_find(pipeline_name);
1754 	if ((p == NULL) ||
1755 		(table_id >= p->n_tables))
1756 		return -1;
1757 
1758 	table = &p->table[table_id];
1759 
1760 	if (!pipeline_is_running(p)) {
1761 		status = rte_pipeline_table_default_entry_delete(p->p,
1762 			table_id,
1763 			NULL);
1764 
1765 		if (status == 0)
1766 			table_rule_default_delete(table);
1767 
1768 		return status;
1769 	}
1770 
1771 	/* Allocate request */
1772 	req = pipeline_msg_alloc();
1773 	if (req == NULL)
1774 		return -1;
1775 
1776 	/* Write request */
1777 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1778 	req->id = table_id;
1779 
1780 	/* Send request and wait for response */
1781 	rsp = pipeline_msg_send_recv(p, req);
1782 	if (rsp == NULL)
1783 		return -1;
1784 
1785 	/* Read response */
1786 	status = rsp->status;
1787 	if (status == 0)
1788 		table_rule_default_delete(table);
1789 
1790 	/* Free response */
1791 	pipeline_msg_free(rsp);
1792 
1793 	return status;
1794 }
1795 
1796 int
1797 pipeline_table_rule_stats_read(const char *pipeline_name,
1798 	uint32_t table_id,
1799 	struct table_rule_match *match,
1800 	struct rte_table_action_stats_counters *stats,
1801 	int clear)
1802 {
1803 	struct pipeline *p;
1804 	struct table *table;
1805 	struct pipeline_msg_req *req;
1806 	struct pipeline_msg_rsp *rsp;
1807 	struct table_rule *rule;
1808 	int status;
1809 
1810 	/* Check input params */
1811 	if ((pipeline_name == NULL) ||
1812 		(match == NULL) ||
1813 		(stats == NULL))
1814 		return -1;
1815 
1816 	p = pipeline_find(pipeline_name);
1817 	if ((p == NULL) ||
1818 		(table_id >= p->n_tables) ||
1819 		match_check(match, p, table_id))
1820 		return -1;
1821 
1822 	table = &p->table[table_id];
1823 	rule = table_rule_find(table, match);
1824 	if (rule == NULL)
1825 		return -1;
1826 
1827 	if (!pipeline_is_running(p)) {
1828 		status = rte_table_action_stats_read(table->a,
1829 			rule->data,
1830 			stats,
1831 			clear);
1832 
1833 		return status;
1834 	}
1835 
1836 	/* Allocate request */
1837 	req = pipeline_msg_alloc();
1838 	if (req == NULL)
1839 		return -1;
1840 
1841 	/* Write request */
1842 	req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1843 	req->id = table_id;
1844 	req->table_rule_stats_read.data = rule->data;
1845 	req->table_rule_stats_read.clear = clear;
1846 
1847 	/* Send request and wait for response */
1848 	rsp = pipeline_msg_send_recv(p, req);
1849 	if (rsp == NULL)
1850 		return -1;
1851 
1852 	/* Read response */
1853 	status = rsp->status;
1854 	if (status == 0)
1855 		memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1856 
1857 	/* Free response */
1858 	pipeline_msg_free(rsp);
1859 
1860 	return status;
1861 }
1862 
1863 int
1864 pipeline_table_mtr_profile_add(const char *pipeline_name,
1865 	uint32_t table_id,
1866 	uint32_t meter_profile_id,
1867 	struct rte_table_action_meter_profile *profile)
1868 {
1869 	struct pipeline *p;
1870 	struct pipeline_msg_req *req;
1871 	struct pipeline_msg_rsp *rsp;
1872 	int status;
1873 
1874 	/* Check input params */
1875 	if ((pipeline_name == NULL) ||
1876 		(profile == NULL))
1877 		return -1;
1878 
1879 	p = pipeline_find(pipeline_name);
1880 	if ((p == NULL) ||
1881 		(table_id >= p->n_tables))
1882 		return -1;
1883 
1884 	if (!pipeline_is_running(p)) {
1885 		struct rte_table_action *a = p->table[table_id].a;
1886 
1887 		status = rte_table_action_meter_profile_add(a,
1888 			meter_profile_id,
1889 			profile);
1890 
1891 		return status;
1892 	}
1893 
1894 	/* Allocate request */
1895 	req = pipeline_msg_alloc();
1896 	if (req == NULL)
1897 		return -1;
1898 
1899 	/* Write request */
1900 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1901 	req->id = table_id;
1902 	req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1903 	memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1904 
1905 	/* Send request and wait for response */
1906 	rsp = pipeline_msg_send_recv(p, req);
1907 	if (rsp == NULL)
1908 		return -1;
1909 
1910 	/* Read response */
1911 	status = rsp->status;
1912 
1913 	/* Free response */
1914 	pipeline_msg_free(rsp);
1915 
1916 	return status;
1917 }
1918 
1919 int
1920 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1921 	uint32_t table_id,
1922 	uint32_t meter_profile_id)
1923 {
1924 	struct pipeline *p;
1925 	struct pipeline_msg_req *req;
1926 	struct pipeline_msg_rsp *rsp;
1927 	int status;
1928 
1929 	/* Check input params */
1930 	if (pipeline_name == NULL)
1931 		return -1;
1932 
1933 	p = pipeline_find(pipeline_name);
1934 	if ((p == NULL) ||
1935 		(table_id >= p->n_tables))
1936 		return -1;
1937 
1938 	if (!pipeline_is_running(p)) {
1939 		struct rte_table_action *a = p->table[table_id].a;
1940 
1941 		status = rte_table_action_meter_profile_delete(a,
1942 				meter_profile_id);
1943 
1944 		return status;
1945 	}
1946 
1947 	/* Allocate request */
1948 	req = pipeline_msg_alloc();
1949 	if (req == NULL)
1950 		return -1;
1951 
1952 	/* Write request */
1953 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1954 	req->id = table_id;
1955 	req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1956 
1957 	/* Send request and wait for response */
1958 	rsp = pipeline_msg_send_recv(p, req);
1959 	if (rsp == NULL)
1960 		return -1;
1961 
1962 	/* Read response */
1963 	status = rsp->status;
1964 
1965 	/* Free response */
1966 	pipeline_msg_free(rsp);
1967 
1968 	return status;
1969 }
1970 
1971 int
1972 pipeline_table_rule_mtr_read(const char *pipeline_name,
1973 	uint32_t table_id,
1974 	void *data,
1975 	uint32_t tc_mask,
1976 	struct rte_table_action_mtr_counters *stats,
1977 	int clear)
1978 {
1979 	struct pipeline *p;
1980 	struct pipeline_msg_req *req;
1981 	struct pipeline_msg_rsp *rsp;
1982 	int status;
1983 
1984 	/* Check input params */
1985 	if ((pipeline_name == NULL) ||
1986 		(data == NULL) ||
1987 		(stats == NULL))
1988 		return -1;
1989 
1990 	p = pipeline_find(pipeline_name);
1991 	if ((p == NULL) ||
1992 		(table_id >= p->n_tables))
1993 		return -1;
1994 
1995 	if (!pipeline_is_running(p)) {
1996 		struct rte_table_action *a = p->table[table_id].a;
1997 
1998 		status = rte_table_action_meter_read(a,
1999 				data,
2000 				tc_mask,
2001 				stats,
2002 				clear);
2003 
2004 		return status;
2005 	}
2006 
2007 	/* Allocate request */
2008 	req = pipeline_msg_alloc();
2009 	if (req == NULL)
2010 		return -1;
2011 
2012 	/* Write request */
2013 	req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
2014 	req->id = table_id;
2015 	req->table_rule_mtr_read.data = data;
2016 	req->table_rule_mtr_read.tc_mask = tc_mask;
2017 	req->table_rule_mtr_read.clear = clear;
2018 
2019 	/* Send request and wait for response */
2020 	rsp = pipeline_msg_send_recv(p, req);
2021 	if (rsp == NULL)
2022 		return -1;
2023 
2024 	/* Read response */
2025 	status = rsp->status;
2026 	if (status)
2027 		memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
2028 
2029 	/* Free response */
2030 	pipeline_msg_free(rsp);
2031 
2032 	return status;
2033 }
2034 
2035 int
2036 pipeline_table_dscp_table_update(const char *pipeline_name,
2037 	uint32_t table_id,
2038 	uint64_t dscp_mask,
2039 	struct rte_table_action_dscp_table *dscp_table)
2040 {
2041 	struct pipeline *p;
2042 	struct pipeline_msg_req *req;
2043 	struct pipeline_msg_rsp *rsp;
2044 	int status;
2045 
2046 	/* Check input params */
2047 	if ((pipeline_name == NULL) ||
2048 		(dscp_table == NULL))
2049 		return -1;
2050 
2051 	p = pipeline_find(pipeline_name);
2052 	if ((p == NULL) ||
2053 		(table_id >= p->n_tables))
2054 		return -1;
2055 
2056 	if (!pipeline_is_running(p)) {
2057 		struct rte_table_action *a = p->table[table_id].a;
2058 
2059 		status = rte_table_action_dscp_table_update(a,
2060 				dscp_mask,
2061 				dscp_table);
2062 
2063 		return status;
2064 	}
2065 
2066 	/* Allocate request */
2067 	req = pipeline_msg_alloc();
2068 	if (req == NULL)
2069 		return -1;
2070 
2071 	/* Write request */
2072 	req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
2073 	req->id = table_id;
2074 	req->table_dscp_table_update.dscp_mask = dscp_mask;
2075 	memcpy(&req->table_dscp_table_update.dscp_table,
2076 		dscp_table, sizeof(*dscp_table));
2077 
2078 	/* Send request and wait for response */
2079 	rsp = pipeline_msg_send_recv(p, req);
2080 	if (rsp == NULL)
2081 		return -1;
2082 
2083 	/* Read response */
2084 	status = rsp->status;
2085 
2086 	/* Free response */
2087 	pipeline_msg_free(rsp);
2088 
2089 	return status;
2090 }
2091 
2092 int
2093 pipeline_table_rule_ttl_read(const char *pipeline_name,
2094 	uint32_t table_id,
2095 	void *data,
2096 	struct rte_table_action_ttl_counters *stats,
2097 	int clear)
2098 {
2099 	struct pipeline *p;
2100 	struct pipeline_msg_req *req;
2101 	struct pipeline_msg_rsp *rsp;
2102 	int status;
2103 
2104 	/* Check input params */
2105 	if ((pipeline_name == NULL) ||
2106 		(data == NULL) ||
2107 		(stats == NULL))
2108 		return -1;
2109 
2110 	p = pipeline_find(pipeline_name);
2111 	if ((p == NULL) ||
2112 		(table_id >= p->n_tables))
2113 		return -1;
2114 
2115 	if (!pipeline_is_running(p)) {
2116 		struct rte_table_action *a = p->table[table_id].a;
2117 
2118 		status = rte_table_action_ttl_read(a,
2119 				data,
2120 				stats,
2121 				clear);
2122 
2123 		return status;
2124 	}
2125 
2126 	/* Allocate request */
2127 	req = pipeline_msg_alloc();
2128 	if (req == NULL)
2129 		return -1;
2130 
2131 	/* Write request */
2132 	req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
2133 	req->id = table_id;
2134 	req->table_rule_ttl_read.data = data;
2135 	req->table_rule_ttl_read.clear = clear;
2136 
2137 	/* Send request and wait for response */
2138 	rsp = pipeline_msg_send_recv(p, req);
2139 	if (rsp == NULL)
2140 		return -1;
2141 
2142 	/* Read response */
2143 	status = rsp->status;
2144 	if (status)
2145 		memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
2146 
2147 	/* Free response */
2148 	pipeline_msg_free(rsp);
2149 
2150 	return status;
2151 }
2152 
2153 /**
2154  * Data plane threads: message handling
2155  */
2156 static inline struct pipeline_msg_req *
2157 pipeline_msg_recv(struct rte_ring *msgq_req)
2158 {
2159 	struct pipeline_msg_req *req;
2160 
2161 	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
2162 
2163 	if (status != 0)
2164 		return NULL;
2165 
2166 	return req;
2167 }
2168 
2169 static inline void
2170 pipeline_msg_send(struct rte_ring *msgq_rsp,
2171 	struct pipeline_msg_rsp *rsp)
2172 {
2173 	int status;
2174 
2175 	do {
2176 		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
2177 	} while (status == -ENOBUFS);
2178 }
2179 
2180 static struct pipeline_msg_rsp *
2181 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
2182 	struct pipeline_msg_req *req)
2183 {
2184 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2185 	uint32_t port_id = req->id;
2186 	int clear = req->port_in_stats_read.clear;
2187 
2188 	rsp->status = rte_pipeline_port_in_stats_read(p->p,
2189 		port_id,
2190 		&rsp->port_in_stats_read.stats,
2191 		clear);
2192 
2193 	return rsp;
2194 }
2195 
2196 static struct pipeline_msg_rsp *
2197 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
2198 	struct pipeline_msg_req *req)
2199 {
2200 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2201 	uint32_t port_id = req->id;
2202 
2203 	rsp->status = rte_pipeline_port_in_enable(p->p,
2204 		port_id);
2205 
2206 	return rsp;
2207 }
2208 
2209 static struct pipeline_msg_rsp *
2210 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
2211 	struct pipeline_msg_req *req)
2212 {
2213 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2214 	uint32_t port_id = req->id;
2215 
2216 	rsp->status = rte_pipeline_port_in_disable(p->p,
2217 		port_id);
2218 
2219 	return rsp;
2220 }
2221 
2222 static struct pipeline_msg_rsp *
2223 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
2224 	struct pipeline_msg_req *req)
2225 {
2226 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2227 	uint32_t port_id = req->id;
2228 	int clear = req->port_out_stats_read.clear;
2229 
2230 	rsp->status = rte_pipeline_port_out_stats_read(p->p,
2231 		port_id,
2232 		&rsp->port_out_stats_read.stats,
2233 		clear);
2234 
2235 	return rsp;
2236 }
2237 
2238 static struct pipeline_msg_rsp *
2239 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
2240 	struct pipeline_msg_req *req)
2241 {
2242 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2243 	uint32_t port_id = req->id;
2244 	int clear = req->table_stats_read.clear;
2245 
2246 	rsp->status = rte_pipeline_table_stats_read(p->p,
2247 		port_id,
2248 		&rsp->table_stats_read.stats,
2249 		clear);
2250 
2251 	return rsp;
2252 }
2253 
2254 static int
2255 match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
2256 {
2257 	if (depth > 128)
2258 		return -1;
2259 
2260 	switch (depth / 32) {
2261 	case 0:
2262 		depth32[0] = depth;
2263 		depth32[1] = 0;
2264 		depth32[2] = 0;
2265 		depth32[3] = 0;
2266 		return 0;
2267 
2268 	case 1:
2269 		depth32[0] = 32;
2270 		depth32[1] = depth - 32;
2271 		depth32[2] = 0;
2272 		depth32[3] = 0;
2273 		return 0;
2274 
2275 	case 2:
2276 		depth32[0] = 32;
2277 		depth32[1] = 32;
2278 		depth32[2] = depth - 64;
2279 		depth32[3] = 0;
2280 		return 0;
2281 
2282 	case 3:
2283 		depth32[0] = 32;
2284 		depth32[1] = 32;
2285 		depth32[2] = 32;
2286 		depth32[3] = depth - 96;
2287 		return 0;
2288 
2289 	case 4:
2290 		depth32[0] = 32;
2291 		depth32[1] = 32;
2292 		depth32[2] = 32;
2293 		depth32[3] = 32;
2294 		return 0;
2295 
2296 	default:
2297 		return -1;
2298 	}
2299 }
2300 
2301 static int
2302 match_convert(struct table_rule_match *mh,
2303 	union table_rule_match_low_level *ml,
2304 	int add)
2305 {
2306 	memset(ml, 0, sizeof(*ml));
2307 
2308 	switch (mh->match_type) {
2309 	case TABLE_ACL:
2310 		if (mh->match.acl.ip_version)
2311 			if (add) {
2312 				ml->acl_add.field_value[0].value.u8 =
2313 					mh->match.acl.proto;
2314 				ml->acl_add.field_value[0].mask_range.u8 =
2315 					mh->match.acl.proto_mask;
2316 
2317 				ml->acl_add.field_value[1].value.u32 =
2318 					mh->match.acl.ipv4.sa;
2319 				ml->acl_add.field_value[1].mask_range.u32 =
2320 					mh->match.acl.sa_depth;
2321 
2322 				ml->acl_add.field_value[2].value.u32 =
2323 					mh->match.acl.ipv4.da;
2324 				ml->acl_add.field_value[2].mask_range.u32 =
2325 					mh->match.acl.da_depth;
2326 
2327 				ml->acl_add.field_value[3].value.u16 =
2328 					mh->match.acl.sp0;
2329 				ml->acl_add.field_value[3].mask_range.u16 =
2330 					mh->match.acl.sp1;
2331 
2332 				ml->acl_add.field_value[4].value.u16 =
2333 					mh->match.acl.dp0;
2334 				ml->acl_add.field_value[4].mask_range.u16 =
2335 					mh->match.acl.dp1;
2336 
2337 				ml->acl_add.priority =
2338 					(int32_t) mh->match.acl.priority;
2339 			} else {
2340 				ml->acl_delete.field_value[0].value.u8 =
2341 					mh->match.acl.proto;
2342 				ml->acl_delete.field_value[0].mask_range.u8 =
2343 					mh->match.acl.proto_mask;
2344 
2345 				ml->acl_delete.field_value[1].value.u32 =
2346 					mh->match.acl.ipv4.sa;
2347 				ml->acl_delete.field_value[1].mask_range.u32 =
2348 					mh->match.acl.sa_depth;
2349 
2350 				ml->acl_delete.field_value[2].value.u32 =
2351 					mh->match.acl.ipv4.da;
2352 				ml->acl_delete.field_value[2].mask_range.u32 =
2353 					mh->match.acl.da_depth;
2354 
2355 				ml->acl_delete.field_value[3].value.u16 =
2356 					mh->match.acl.sp0;
2357 				ml->acl_delete.field_value[3].mask_range.u16 =
2358 					mh->match.acl.sp1;
2359 
2360 				ml->acl_delete.field_value[4].value.u16 =
2361 					mh->match.acl.dp0;
2362 				ml->acl_delete.field_value[4].mask_range.u16 =
2363 					mh->match.acl.dp1;
2364 			}
2365 		else
2366 			if (add) {
2367 				uint32_t *sa32 =
2368 					(uint32_t *) mh->match.acl.ipv6.sa;
2369 				uint32_t *da32 =
2370 					(uint32_t *) mh->match.acl.ipv6.da;
2371 				uint32_t sa32_depth[4], da32_depth[4];
2372 				int status;
2373 
2374 				status = match_convert_ipv6_depth(
2375 					mh->match.acl.sa_depth,
2376 					sa32_depth);
2377 				if (status)
2378 					return status;
2379 
2380 				status = match_convert_ipv6_depth(
2381 					mh->match.acl.da_depth,
2382 					da32_depth);
2383 				if (status)
2384 					return status;
2385 
2386 				ml->acl_add.field_value[0].value.u8 =
2387 					mh->match.acl.proto;
2388 				ml->acl_add.field_value[0].mask_range.u8 =
2389 					mh->match.acl.proto_mask;
2390 
2391 				ml->acl_add.field_value[1].value.u32 =
2392 					rte_be_to_cpu_32(sa32[0]);
2393 				ml->acl_add.field_value[1].mask_range.u32 =
2394 					sa32_depth[0];
2395 				ml->acl_add.field_value[2].value.u32 =
2396 					rte_be_to_cpu_32(sa32[1]);
2397 				ml->acl_add.field_value[2].mask_range.u32 =
2398 					sa32_depth[1];
2399 				ml->acl_add.field_value[3].value.u32 =
2400 					rte_be_to_cpu_32(sa32[2]);
2401 				ml->acl_add.field_value[3].mask_range.u32 =
2402 					sa32_depth[2];
2403 				ml->acl_add.field_value[4].value.u32 =
2404 					rte_be_to_cpu_32(sa32[3]);
2405 				ml->acl_add.field_value[4].mask_range.u32 =
2406 					sa32_depth[3];
2407 
2408 				ml->acl_add.field_value[5].value.u32 =
2409 					rte_be_to_cpu_32(da32[0]);
2410 				ml->acl_add.field_value[5].mask_range.u32 =
2411 					da32_depth[0];
2412 				ml->acl_add.field_value[6].value.u32 =
2413 					rte_be_to_cpu_32(da32[1]);
2414 				ml->acl_add.field_value[6].mask_range.u32 =
2415 					da32_depth[1];
2416 				ml->acl_add.field_value[7].value.u32 =
2417 					rte_be_to_cpu_32(da32[2]);
2418 				ml->acl_add.field_value[7].mask_range.u32 =
2419 					da32_depth[2];
2420 				ml->acl_add.field_value[8].value.u32 =
2421 					rte_be_to_cpu_32(da32[3]);
2422 				ml->acl_add.field_value[8].mask_range.u32 =
2423 					da32_depth[3];
2424 
2425 				ml->acl_add.field_value[9].value.u16 =
2426 					mh->match.acl.sp0;
2427 				ml->acl_add.field_value[9].mask_range.u16 =
2428 					mh->match.acl.sp1;
2429 
2430 				ml->acl_add.field_value[10].value.u16 =
2431 					mh->match.acl.dp0;
2432 				ml->acl_add.field_value[10].mask_range.u16 =
2433 					mh->match.acl.dp1;
2434 
2435 				ml->acl_add.priority =
2436 					(int32_t) mh->match.acl.priority;
2437 			} else {
2438 				uint32_t *sa32 =
2439 					(uint32_t *) mh->match.acl.ipv6.sa;
2440 				uint32_t *da32 =
2441 					(uint32_t *) mh->match.acl.ipv6.da;
2442 				uint32_t sa32_depth[4], da32_depth[4];
2443 				int status;
2444 
2445 				status = match_convert_ipv6_depth(
2446 					mh->match.acl.sa_depth,
2447 					sa32_depth);
2448 				if (status)
2449 					return status;
2450 
2451 				status = match_convert_ipv6_depth(
2452 					mh->match.acl.da_depth,
2453 					da32_depth);
2454 				if (status)
2455 					return status;
2456 
2457 				ml->acl_delete.field_value[0].value.u8 =
2458 					mh->match.acl.proto;
2459 				ml->acl_delete.field_value[0].mask_range.u8 =
2460 					mh->match.acl.proto_mask;
2461 
2462 				ml->acl_delete.field_value[1].value.u32 =
2463 					rte_be_to_cpu_32(sa32[0]);
2464 				ml->acl_delete.field_value[1].mask_range.u32 =
2465 					sa32_depth[0];
2466 				ml->acl_delete.field_value[2].value.u32 =
2467 					rte_be_to_cpu_32(sa32[1]);
2468 				ml->acl_delete.field_value[2].mask_range.u32 =
2469 					sa32_depth[1];
2470 				ml->acl_delete.field_value[3].value.u32 =
2471 					rte_be_to_cpu_32(sa32[2]);
2472 				ml->acl_delete.field_value[3].mask_range.u32 =
2473 					sa32_depth[2];
2474 				ml->acl_delete.field_value[4].value.u32 =
2475 					rte_be_to_cpu_32(sa32[3]);
2476 				ml->acl_delete.field_value[4].mask_range.u32 =
2477 					sa32_depth[3];
2478 
2479 				ml->acl_delete.field_value[5].value.u32 =
2480 					rte_be_to_cpu_32(da32[0]);
2481 				ml->acl_delete.field_value[5].mask_range.u32 =
2482 					da32_depth[0];
2483 				ml->acl_delete.field_value[6].value.u32 =
2484 					rte_be_to_cpu_32(da32[1]);
2485 				ml->acl_delete.field_value[6].mask_range.u32 =
2486 					da32_depth[1];
2487 				ml->acl_delete.field_value[7].value.u32 =
2488 					rte_be_to_cpu_32(da32[2]);
2489 				ml->acl_delete.field_value[7].mask_range.u32 =
2490 					da32_depth[2];
2491 				ml->acl_delete.field_value[8].value.u32 =
2492 					rte_be_to_cpu_32(da32[3]);
2493 				ml->acl_delete.field_value[8].mask_range.u32 =
2494 					da32_depth[3];
2495 
2496 				ml->acl_delete.field_value[9].value.u16 =
2497 					mh->match.acl.sp0;
2498 				ml->acl_delete.field_value[9].mask_range.u16 =
2499 					mh->match.acl.sp1;
2500 
2501 				ml->acl_delete.field_value[10].value.u16 =
2502 					mh->match.acl.dp0;
2503 				ml->acl_delete.field_value[10].mask_range.u16 =
2504 					mh->match.acl.dp1;
2505 			}
2506 		return 0;
2507 
2508 	case TABLE_ARRAY:
2509 		ml->array.pos = mh->match.array.pos;
2510 		return 0;
2511 
2512 	case TABLE_HASH:
2513 		memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
2514 		return 0;
2515 
2516 	case TABLE_LPM:
2517 		if (mh->match.lpm.ip_version) {
2518 			ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
2519 			ml->lpm_ipv4.depth = mh->match.lpm.depth;
2520 		} else {
2521 			memcpy(ml->lpm_ipv6.ip,
2522 				mh->match.lpm.ipv6, sizeof(ml->lpm_ipv6.ip));
2523 			ml->lpm_ipv6.depth = mh->match.lpm.depth;
2524 		}
2525 
2526 		return 0;
2527 
2528 	default:
2529 		return -1;
2530 	}
2531 }
2532 
2533 static int
2534 action_convert(struct rte_table_action *a,
2535 	struct table_rule_action *action,
2536 	struct rte_pipeline_table_entry *data)
2537 {
2538 	int status;
2539 
2540 	/* Apply actions */
2541 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2542 		status = rte_table_action_apply(a,
2543 			data,
2544 			RTE_TABLE_ACTION_FWD,
2545 			&action->fwd);
2546 
2547 		if (status)
2548 			return status;
2549 	}
2550 
2551 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2552 		status = rte_table_action_apply(a,
2553 			data,
2554 			RTE_TABLE_ACTION_LB,
2555 			&action->lb);
2556 
2557 		if (status)
2558 			return status;
2559 	}
2560 
2561 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2562 		status = rte_table_action_apply(a,
2563 			data,
2564 			RTE_TABLE_ACTION_MTR,
2565 			&action->mtr);
2566 
2567 		if (status)
2568 			return status;
2569 	}
2570 
2571 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2572 		status = rte_table_action_apply(a,
2573 			data,
2574 			RTE_TABLE_ACTION_TM,
2575 			&action->tm);
2576 
2577 		if (status)
2578 			return status;
2579 	}
2580 
2581 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2582 		status = rte_table_action_apply(a,
2583 			data,
2584 			RTE_TABLE_ACTION_ENCAP,
2585 			&action->encap);
2586 
2587 		if (status)
2588 			return status;
2589 	}
2590 
2591 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2592 		status = rte_table_action_apply(a,
2593 			data,
2594 			RTE_TABLE_ACTION_NAT,
2595 			&action->nat);
2596 
2597 		if (status)
2598 			return status;
2599 	}
2600 
2601 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2602 		status = rte_table_action_apply(a,
2603 			data,
2604 			RTE_TABLE_ACTION_TTL,
2605 			&action->ttl);
2606 
2607 		if (status)
2608 			return status;
2609 	}
2610 
2611 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2612 		status = rte_table_action_apply(a,
2613 			data,
2614 			RTE_TABLE_ACTION_STATS,
2615 			&action->stats);
2616 
2617 		if (status)
2618 			return status;
2619 	}
2620 
2621 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2622 		status = rte_table_action_apply(a,
2623 			data,
2624 			RTE_TABLE_ACTION_TIME,
2625 			&action->time);
2626 
2627 		if (status)
2628 			return status;
2629 	}
2630 
2631 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_SYM_CRYPTO)) {
2632 		status = rte_table_action_apply(a,
2633 			data,
2634 			RTE_TABLE_ACTION_SYM_CRYPTO,
2635 			&action->sym_crypto);
2636 
2637 		if (status)
2638 			return status;
2639 	}
2640 
2641 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TAG)) {
2642 		status = rte_table_action_apply(a,
2643 			data,
2644 			RTE_TABLE_ACTION_TAG,
2645 			&action->tag);
2646 
2647 		if (status)
2648 			return status;
2649 	}
2650 
2651 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_DECAP)) {
2652 		status = rte_table_action_apply(a,
2653 			data,
2654 			RTE_TABLE_ACTION_DECAP,
2655 			&action->decap);
2656 
2657 		if (status)
2658 			return status;
2659 	}
2660 
2661 	return 0;
2662 }
2663 
2664 static struct pipeline_msg_rsp *
2665 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2666 	struct pipeline_msg_req *req)
2667 {
2668 	union table_rule_match_low_level match_ll;
2669 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2670 	struct table_rule_match *match = &req->table_rule_add.match;
2671 	struct table_rule_action *action = &req->table_rule_add.action;
2672 	struct rte_pipeline_table_entry *data_in, *data_out;
2673 	uint32_t table_id = req->id;
2674 	int key_found, status;
2675 	struct rte_table_action *a = p->table_data[table_id].a;
2676 
2677 	/* Apply actions */
2678 	memset(p->buffer, 0, sizeof(p->buffer));
2679 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2680 
2681 	status = match_convert(match, &match_ll, 1);
2682 	if (status) {
2683 		rsp->status = -1;
2684 		return rsp;
2685 	}
2686 
2687 	status = action_convert(a, action, data_in);
2688 	if (status) {
2689 		rsp->status = -1;
2690 		return rsp;
2691 	}
2692 
2693 	status = rte_pipeline_table_entry_add(p->p,
2694 		table_id,
2695 		&match_ll,
2696 		data_in,
2697 		&key_found,
2698 		&data_out);
2699 	if (status) {
2700 		rsp->status = -1;
2701 		return rsp;
2702 	}
2703 
2704 	/* Write response */
2705 	rsp->status = 0;
2706 	rsp->table_rule_add.data = data_out;
2707 
2708 	return rsp;
2709 }
2710 
2711 static struct pipeline_msg_rsp *
2712 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2713 	struct pipeline_msg_req *req)
2714 {
2715 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2716 	struct table_rule_action *action = &req->table_rule_add_default.action;
2717 	struct rte_pipeline_table_entry *data_in, *data_out;
2718 	uint32_t table_id = req->id;
2719 	int status;
2720 
2721 	/* Apply actions */
2722 	memset(p->buffer, 0, sizeof(p->buffer));
2723 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2724 
2725 	data_in->action = action->fwd.action;
2726 	if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2727 		data_in->port_id = action->fwd.id;
2728 	if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2729 		data_in->table_id = action->fwd.id;
2730 
2731 	/* Add default rule to table */
2732 	status = rte_pipeline_table_default_entry_add(p->p,
2733 		table_id,
2734 		data_in,
2735 		&data_out);
2736 	if (status) {
2737 		rsp->status = -1;
2738 		return rsp;
2739 	}
2740 
2741 	/* Write response */
2742 	rsp->status = 0;
2743 	rsp->table_rule_add_default.data = data_out;
2744 
2745 	return rsp;
2746 }
2747 
2748 static struct pipeline_msg_rsp *
2749 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2750 	struct pipeline_msg_req *req)
2751 {
2752 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2753 
2754 	uint32_t table_id = req->id;
2755 	struct table_rule_list *list = req->table_rule_add_bulk.list;
2756 	uint32_t bulk = req->table_rule_add_bulk.bulk;
2757 
2758 	uint32_t n_rules_added;
2759 	int status;
2760 
2761 	struct table_ll table_ll = {
2762 		.p = p->p,
2763 		.table_id = table_id,
2764 		.a = p->table_data[table_id].a,
2765 		.bulk_supported = bulk,
2766 	};
2767 
2768 	status = table_rule_add_bulk_ll(&table_ll, list, &n_rules_added);
2769 	if (status) {
2770 		rsp->status = -1;
2771 		rsp->table_rule_add_bulk.n_rules = 0;
2772 		return rsp;
2773 	}
2774 
2775 	/* Write response */
2776 	rsp->status = 0;
2777 	rsp->table_rule_add_bulk.n_rules = n_rules_added;
2778 	return rsp;
2779 }
2780 
2781 static struct pipeline_msg_rsp *
2782 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2783 	struct pipeline_msg_req *req)
2784 {
2785 	union table_rule_match_low_level match_ll;
2786 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2787 	struct table_rule_match *match = &req->table_rule_delete.match;
2788 	uint32_t table_id = req->id;
2789 	int key_found, status;
2790 
2791 	status = match_convert(match, &match_ll, 0);
2792 	if (status) {
2793 		rsp->status = -1;
2794 		return rsp;
2795 	}
2796 
2797 	rsp->status = rte_pipeline_table_entry_delete(p->p,
2798 		table_id,
2799 		&match_ll,
2800 		&key_found,
2801 		NULL);
2802 
2803 	return rsp;
2804 }
2805 
2806 static struct pipeline_msg_rsp *
2807 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2808 	struct pipeline_msg_req *req)
2809 {
2810 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2811 	uint32_t table_id = req->id;
2812 
2813 	rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2814 		table_id,
2815 		NULL);
2816 
2817 	return rsp;
2818 }
2819 
2820 static struct pipeline_msg_rsp *
2821 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2822 	struct pipeline_msg_req *req)
2823 {
2824 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2825 	uint32_t table_id = req->id;
2826 	void *data = req->table_rule_stats_read.data;
2827 	int clear = req->table_rule_stats_read.clear;
2828 	struct rte_table_action *a = p->table_data[table_id].a;
2829 
2830 	rsp->status = rte_table_action_stats_read(a,
2831 		data,
2832 		&rsp->table_rule_stats_read.stats,
2833 		clear);
2834 
2835 	return rsp;
2836 }
2837 
2838 static struct pipeline_msg_rsp *
2839 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2840 	struct pipeline_msg_req *req)
2841 {
2842 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2843 	uint32_t table_id = req->id;
2844 	uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2845 	struct rte_table_action_meter_profile *profile =
2846 		&req->table_mtr_profile_add.profile;
2847 	struct rte_table_action *a = p->table_data[table_id].a;
2848 
2849 	rsp->status = rte_table_action_meter_profile_add(a,
2850 		meter_profile_id,
2851 		profile);
2852 
2853 	return rsp;
2854 }
2855 
2856 static struct pipeline_msg_rsp *
2857 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2858 	struct pipeline_msg_req *req)
2859 {
2860 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2861 	uint32_t table_id = req->id;
2862 	uint32_t meter_profile_id =
2863 		req->table_mtr_profile_delete.meter_profile_id;
2864 	struct rte_table_action *a = p->table_data[table_id].a;
2865 
2866 	rsp->status = rte_table_action_meter_profile_delete(a,
2867 		meter_profile_id);
2868 
2869 	return rsp;
2870 }
2871 
2872 static struct pipeline_msg_rsp *
2873 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2874 	struct pipeline_msg_req *req)
2875 {
2876 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2877 	uint32_t table_id = req->id;
2878 	void *data = req->table_rule_mtr_read.data;
2879 	uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2880 	int clear = req->table_rule_mtr_read.clear;
2881 	struct rte_table_action *a = p->table_data[table_id].a;
2882 
2883 	rsp->status = rte_table_action_meter_read(a,
2884 		data,
2885 		tc_mask,
2886 		&rsp->table_rule_mtr_read.stats,
2887 		clear);
2888 
2889 	return rsp;
2890 }
2891 
2892 static struct pipeline_msg_rsp *
2893 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2894 	struct pipeline_msg_req *req)
2895 {
2896 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2897 	uint32_t table_id = req->id;
2898 	uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2899 	struct rte_table_action_dscp_table *dscp_table =
2900 		&req->table_dscp_table_update.dscp_table;
2901 	struct rte_table_action *a = p->table_data[table_id].a;
2902 
2903 	rsp->status = rte_table_action_dscp_table_update(a,
2904 		dscp_mask,
2905 		dscp_table);
2906 
2907 	return rsp;
2908 }
2909 
2910 static struct pipeline_msg_rsp *
2911 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
2912 	struct pipeline_msg_req *req)
2913 {
2914 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2915 	uint32_t table_id = req->id;
2916 	void *data = req->table_rule_ttl_read.data;
2917 	int clear = req->table_rule_ttl_read.clear;
2918 	struct rte_table_action *a = p->table_data[table_id].a;
2919 
2920 	rsp->status = rte_table_action_ttl_read(a,
2921 		data,
2922 		&rsp->table_rule_ttl_read.stats,
2923 		clear);
2924 
2925 	return rsp;
2926 }
2927 
2928 static void
2929 pipeline_msg_handle(struct pipeline_data *p)
2930 {
2931 	for ( ; ; ) {
2932 		struct pipeline_msg_req *req;
2933 		struct pipeline_msg_rsp *rsp;
2934 
2935 		req = pipeline_msg_recv(p->msgq_req);
2936 		if (req == NULL)
2937 			break;
2938 
2939 		switch (req->type) {
2940 		case PIPELINE_REQ_PORT_IN_STATS_READ:
2941 			rsp = pipeline_msg_handle_port_in_stats_read(p, req);
2942 			break;
2943 
2944 		case PIPELINE_REQ_PORT_IN_ENABLE:
2945 			rsp = pipeline_msg_handle_port_in_enable(p, req);
2946 			break;
2947 
2948 		case PIPELINE_REQ_PORT_IN_DISABLE:
2949 			rsp = pipeline_msg_handle_port_in_disable(p, req);
2950 			break;
2951 
2952 		case PIPELINE_REQ_PORT_OUT_STATS_READ:
2953 			rsp = pipeline_msg_handle_port_out_stats_read(p, req);
2954 			break;
2955 
2956 		case PIPELINE_REQ_TABLE_STATS_READ:
2957 			rsp = pipeline_msg_handle_table_stats_read(p, req);
2958 			break;
2959 
2960 		case PIPELINE_REQ_TABLE_RULE_ADD:
2961 			rsp = pipeline_msg_handle_table_rule_add(p, req);
2962 			break;
2963 
2964 		case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
2965 			rsp = pipeline_msg_handle_table_rule_add_default(p,	req);
2966 			break;
2967 
2968 		case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
2969 			rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
2970 			break;
2971 
2972 		case PIPELINE_REQ_TABLE_RULE_DELETE:
2973 			rsp = pipeline_msg_handle_table_rule_delete(p, req);
2974 			break;
2975 
2976 		case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
2977 			rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
2978 			break;
2979 
2980 		case PIPELINE_REQ_TABLE_RULE_STATS_READ:
2981 			rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
2982 			break;
2983 
2984 		case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
2985 			rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
2986 			break;
2987 
2988 		case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
2989 			rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
2990 			break;
2991 
2992 		case PIPELINE_REQ_TABLE_RULE_MTR_READ:
2993 			rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
2994 			break;
2995 
2996 		case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
2997 			rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
2998 			break;
2999 
3000 		case PIPELINE_REQ_TABLE_RULE_TTL_READ:
3001 			rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
3002 			break;
3003 
3004 		default:
3005 			rsp = (struct pipeline_msg_rsp *) req;
3006 			rsp->status = -1;
3007 		}
3008 
3009 		pipeline_msg_send(p->msgq_rsp, rsp);
3010 	}
3011 }
3012 
3013 /**
3014  * Data plane threads: main
3015  */
3016 int
3017 thread_main(void *arg __rte_unused)
3018 {
3019 	struct thread_data *t;
3020 	uint32_t thread_id, i;
3021 
3022 	thread_id = rte_lcore_id();
3023 	t = &thread_data[thread_id];
3024 
3025 	/* Dispatch loop */
3026 	for (i = 0; ; i++) {
3027 		uint32_t j;
3028 
3029 		/* Data Plane */
3030 		for (j = 0; j < t->n_pipelines; j++)
3031 			rte_pipeline_run(t->p[j]);
3032 
3033 		/* Control Plane */
3034 		if ((i & 0xF) == 0) {
3035 			uint64_t time = rte_get_tsc_cycles();
3036 			uint64_t time_next_min = UINT64_MAX;
3037 
3038 			if (time < t->time_next_min)
3039 				continue;
3040 
3041 			/* Pipeline message queues */
3042 			for (j = 0; j < t->n_pipelines; j++) {
3043 				struct pipeline_data *p =
3044 					&t->pipeline_data[j];
3045 				uint64_t time_next = p->time_next;
3046 
3047 				if (time_next <= time) {
3048 					pipeline_msg_handle(p);
3049 					rte_pipeline_flush(p->p);
3050 					time_next = time + p->timer_period;
3051 					p->time_next = time_next;
3052 				}
3053 
3054 				if (time_next < time_next_min)
3055 					time_next_min = time_next;
3056 			}
3057 
3058 			/* Thread message queues */
3059 			{
3060 				uint64_t time_next = t->time_next;
3061 
3062 				if (time_next <= time) {
3063 					thread_msg_handle(t);
3064 					time_next = time + t->timer_period;
3065 					t->time_next = time_next;
3066 				}
3067 
3068 				if (time_next < time_next_min)
3069 					time_next_min = time_next;
3070 			}
3071 
3072 			t->time_next_min = time_next_min;
3073 		}
3074 	}
3075 
3076 	return 0;
3077 }
3078