xref: /dpdk/examples/ip_pipeline/thread.c (revision d2cb41c2)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21 
/* Capacity of the per-thread pipeline arrays in struct thread_data. */
22 #ifndef THREAD_PIPELINES_MAX
23 #define THREAD_PIPELINES_MAX                               256
24 #endif
25 
/* Ring size passed to rte_ring_create() for each thread message queue. */
26 #ifndef THREAD_MSGQ_SIZE
27 #define THREAD_MSGQ_SIZE                                   64
28 #endif
29 
/* Thread timer period in milliseconds; thread_init() converts it to TSC cycles. */
30 #ifndef THREAD_TIMER_PERIOD_MS
31 #define THREAD_TIMER_PERIOD_MS                             100
32 #endif
33 
34 /**
35  * Master thread: data plane thread context
36  */
/* Master-side record for one lcore; thread[] is indexed by lcore ID. */
37 struct thread {
38 	struct rte_ring *msgq_req; /* Request ring created by thread_init(). */
39 	struct rte_ring *msgq_rsp; /* Response ring created by thread_init(). */
40 
41 	uint32_t enabled; /* Set to 1 by thread_init() once both rings exist. */
42 };
43 
44 static struct thread thread[RTE_MAX_LCORE];
45 
46 /**
47  * Data plane threads: context
48  */
/* Per-table state held by a data plane thread for one pipeline table. */
49 struct table_data {
50 	struct rte_table_action *a; /* Copied from the pipeline's table[i].a on enable. */
51 };
52 
/*
 * Data plane view of one enabled pipeline. It is filled in either directly by
 * thread_pipeline_enable() (thread not running) or by
 * thread_msg_handle_pipeline_enable() (thread running).
 */
53 struct pipeline_data {
54 	struct rte_pipeline *p;
55 	struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56 	uint32_t n_tables; /* Number of valid entries in table_data[]. */
57 
58 	struct rte_ring *msgq_req; /* Pipeline request ring (copied from the pipeline). */
59 	struct rte_ring *msgq_rsp; /* Pipeline response ring (copied from the pipeline). */
60 	uint64_t timer_period; /* Measured in CPU cycles. */
61 	uint64_t time_next; /* TSC value: cycle count at enable time plus timer_period. */
62 
63 	uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX]; /* NOTE(review): not used in the visible part of the file; presumably scratch space for rule actions. */
64 };
65 
/*
 * Data plane context of one lcore; thread_data[] is indexed by lcore ID.
 * p[] and pipeline_data[] are parallel arrays with n_pipelines valid entries.
 */
66 struct thread_data {
67 	struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68 	uint32_t n_pipelines;
69 
70 	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71 	struct rte_ring *msgq_req; /* Same ring as thread[i].msgq_req. */
72 	struct rte_ring *msgq_rsp; /* Same ring as thread[i].msgq_rsp. */
73 	uint64_t timer_period; /* Measured in CPU cycles. */
74 	uint64_t time_next; /* Initialized by thread_init() to "now" + timer_period. */
75 	uint64_t time_next_min; /* Initialized by thread_init() to time_next. */
76 } __rte_cache_aligned;
77 
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79 
80 /**
81  * Master thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86 	uint32_t i;
87 
88 	for (i = 0; i < RTE_MAX_LCORE; i++) {
89 		struct thread *t = &thread[i];
90 
91 		if (!rte_lcore_is_enabled(i))
92 			continue;
93 
94 		/* MSGQs */
95 		if (t->msgq_req)
96 			rte_ring_free(t->msgq_req);
97 
98 		if (t->msgq_rsp)
99 			rte_ring_free(t->msgq_rsp);
100 	}
101 }
102 
103 int
104 thread_init(void)
105 {
106 	uint32_t i;
107 
108 	RTE_LCORE_FOREACH_SLAVE(i) {
109 		char name[NAME_MAX];
110 		struct rte_ring *msgq_req, *msgq_rsp;
111 		struct thread *t = &thread[i];
112 		struct thread_data *t_data = &thread_data[i];
113 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
114 
115 		/* MSGQs */
116 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117 
118 		msgq_req = rte_ring_create(name,
119 			THREAD_MSGQ_SIZE,
120 			cpu_id,
121 			RING_F_SP_ENQ | RING_F_SC_DEQ);
122 
123 		if (msgq_req == NULL) {
124 			thread_free();
125 			return -1;
126 		}
127 
128 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129 
130 		msgq_rsp = rte_ring_create(name,
131 			THREAD_MSGQ_SIZE,
132 			cpu_id,
133 			RING_F_SP_ENQ | RING_F_SC_DEQ);
134 
135 		if (msgq_rsp == NULL) {
136 			thread_free();
137 			return -1;
138 		}
139 
140 		/* Master thread records */
141 		t->msgq_req = msgq_req;
142 		t->msgq_rsp = msgq_rsp;
143 		t->enabled = 1;
144 
145 		/* Data plane thread records */
146 		t_data->n_pipelines = 0;
147 		t_data->msgq_req = msgq_req;
148 		t_data->msgq_rsp = msgq_rsp;
149 		t_data->timer_period =
150 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152 		t_data->time_next_min = t_data->time_next;
153 	}
154 
155 	return 0;
156 }
157 
158 static inline int
159 thread_is_running(uint32_t thread_id)
160 {
161 	enum rte_lcore_state_t thread_state;
162 
163 	thread_state = rte_eal_get_lcore_state(thread_id);
164 	return (thread_state == RUNNING) ? 1 : 0;
165 }
166 
167 /**
168  * Pipeline is running when:
169  *    (A) Pipeline is mapped to a data plane thread AND
170  *    (B) Its data plane thread is in RUNNING state.
171  */
172 static inline int
173 pipeline_is_running(struct pipeline *p)
174 {
175 	if (p->enabled == 0)
176 		return 0;
177 
178 	return thread_is_running(p->thread_id);
179 }
180 
181 /**
182  * Master thread & data plane threads: message passing
183  */
/* Request types dispatched by thread_msg_handle() on the data plane thread. */
184 enum thread_req_type {
185 	THREAD_REQ_PIPELINE_ENABLE = 0, /* Handled by thread_msg_handle_pipeline_enable(). */
186 	THREAD_REQ_PIPELINE_DISABLE, /* Handled by thread_msg_handle_pipeline_disable(). */
187 	THREAD_REQ_MAX /* Not a request; presumably the number of request types. */
188 };
189 
/*
 * Request sent from the master thread to a data plane thread. The handlers
 * reuse the same memory for the response (they cast the request pointer to
 * struct thread_msg_rsp *), which is why thread_msg_alloc() sizes the buffer
 * for the larger of the two structures.
 */
190 struct thread_msg_req {
191 	enum thread_req_type type; /* Selects which union member is valid. */
192 
193 	union {
194 		struct {
195 			struct rte_pipeline *p;
196 			struct {
197 				struct rte_table_action *a;
198 			} table[RTE_PIPELINE_TABLE_MAX];
199 			struct rte_ring *msgq_req; /* Pipeline request ring. */
200 			struct rte_ring *msgq_rsp; /* Pipeline response ring. */
201 			uint32_t timer_period_ms; /* Converted to TSC cycles by the handler. */
202 			uint32_t n_tables; /* Number of valid entries in table[]. */
203 		} pipeline_enable;
204 
205 		struct {
206 			struct rte_pipeline *p; /* Pipeline to remove from the thread. */
207 		} pipeline_disable;
208 	};
209 };
210 
/* Response written by the data plane thread over the request's memory. */
211 struct thread_msg_rsp {
212 	int status; /* 0 when the request was carried out, -1 when it was rejected. */
213 };
214 
215 /**
216  * Master thread
217  */
218 static struct thread_msg_req *
219 thread_msg_alloc(void)
220 {
221 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
222 		sizeof(struct thread_msg_rsp));
223 
224 	return calloc(1, size);
225 }
226 
/* Release a message buffer once its response has been consumed. */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	void *msg = rsp;

	free(msg);
}
232 
233 static struct thread_msg_rsp *
234 thread_msg_send_recv(uint32_t thread_id,
235 	struct thread_msg_req *req)
236 {
237 	struct thread *t = &thread[thread_id];
238 	struct rte_ring *msgq_req = t->msgq_req;
239 	struct rte_ring *msgq_rsp = t->msgq_rsp;
240 	struct thread_msg_rsp *rsp;
241 	int status;
242 
243 	/* send */
244 	do {
245 		status = rte_ring_sp_enqueue(msgq_req, req);
246 	} while (status == -ENOBUFS);
247 
248 	/* recv */
249 	do {
250 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
251 	} while (status != 0);
252 
253 	return rsp;
254 }
255 
256 int
257 thread_pipeline_enable(uint32_t thread_id,
258 	const char *pipeline_name)
259 {
260 	struct pipeline *p = pipeline_find(pipeline_name);
261 	struct thread *t;
262 	struct thread_msg_req *req;
263 	struct thread_msg_rsp *rsp;
264 	uint32_t i;
265 	int status;
266 
267 	/* Check input params */
268 	if ((thread_id >= RTE_MAX_LCORE) ||
269 		(p == NULL) ||
270 		(p->n_ports_in == 0) ||
271 		(p->n_ports_out == 0) ||
272 		(p->n_tables == 0))
273 		return -1;
274 
275 	t = &thread[thread_id];
276 	if ((t->enabled == 0) ||
277 		p->enabled)
278 		return -1;
279 
280 	if (!thread_is_running(thread_id)) {
281 		struct thread_data *td = &thread_data[thread_id];
282 		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
283 
284 		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
285 			return -1;
286 
287 		/* Data plane thread */
288 		td->p[td->n_pipelines] = p->p;
289 
290 		tdp->p = p->p;
291 		for (i = 0; i < p->n_tables; i++)
292 			tdp->table_data[i].a = p->table[i].a;
293 
294 		tdp->n_tables = p->n_tables;
295 
296 		tdp->msgq_req = p->msgq_req;
297 		tdp->msgq_rsp = p->msgq_rsp;
298 		tdp->timer_period = (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
299 		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
300 
301 		td->n_pipelines++;
302 
303 		/* Pipeline */
304 		p->thread_id = thread_id;
305 		p->enabled = 1;
306 
307 		return 0;
308 	}
309 
310 	/* Allocate request */
311 	req = thread_msg_alloc();
312 	if (req == NULL)
313 		return -1;
314 
315 	/* Write request */
316 	req->type = THREAD_REQ_PIPELINE_ENABLE;
317 	req->pipeline_enable.p = p->p;
318 	for (i = 0; i < p->n_tables; i++)
319 		req->pipeline_enable.table[i].a =
320 			p->table[i].a;
321 	req->pipeline_enable.msgq_req = p->msgq_req;
322 	req->pipeline_enable.msgq_rsp = p->msgq_rsp;
323 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
324 	req->pipeline_enable.n_tables = p->n_tables;
325 
326 	/* Send request and wait for response */
327 	rsp = thread_msg_send_recv(thread_id, req);
328 	if (rsp == NULL)
329 		return -1;
330 
331 	/* Read response */
332 	status = rsp->status;
333 
334 	/* Free response */
335 	thread_msg_free(rsp);
336 
337 	/* Request completion */
338 	if (status)
339 		return status;
340 
341 	p->thread_id = thread_id;
342 	p->enabled = 1;
343 
344 	return 0;
345 }
346 
347 int
348 thread_pipeline_disable(uint32_t thread_id,
349 	const char *pipeline_name)
350 {
351 	struct pipeline *p = pipeline_find(pipeline_name);
352 	struct thread *t;
353 	struct thread_msg_req *req;
354 	struct thread_msg_rsp *rsp;
355 	int status;
356 
357 	/* Check input params */
358 	if ((thread_id >= RTE_MAX_LCORE) ||
359 		(p == NULL))
360 		return -1;
361 
362 	t = &thread[thread_id];
363 	if (t->enabled == 0)
364 		return -1;
365 
366 	if (p->enabled == 0)
367 		return 0;
368 
369 	if (p->thread_id != thread_id)
370 		return -1;
371 
372 	if (!thread_is_running(thread_id)) {
373 		struct thread_data *td = &thread_data[thread_id];
374 		uint32_t i;
375 
376 		for (i = 0; i < td->n_pipelines; i++) {
377 			struct pipeline_data *tdp = &td->pipeline_data[i];
378 
379 			if (tdp->p != p->p)
380 				continue;
381 
382 			/* Data plane thread */
383 			if (i < td->n_pipelines - 1) {
384 				struct rte_pipeline *pipeline_last =
385 					td->p[td->n_pipelines - 1];
386 				struct pipeline_data *tdp_last =
387 					&td->pipeline_data[td->n_pipelines - 1];
388 
389 				td->p[i] = pipeline_last;
390 				memcpy(tdp, tdp_last, sizeof(*tdp));
391 			}
392 
393 			td->n_pipelines--;
394 
395 			/* Pipeline */
396 			p->enabled = 0;
397 
398 			break;
399 		}
400 
401 		return 0;
402 	}
403 
404 	/* Allocate request */
405 	req = thread_msg_alloc();
406 	if (req == NULL)
407 		return -1;
408 
409 	/* Write request */
410 	req->type = THREAD_REQ_PIPELINE_DISABLE;
411 	req->pipeline_disable.p = p->p;
412 
413 	/* Send request and wait for response */
414 	rsp = thread_msg_send_recv(thread_id, req);
415 	if (rsp == NULL)
416 		return -1;
417 
418 	/* Read response */
419 	status = rsp->status;
420 
421 	/* Free response */
422 	thread_msg_free(rsp);
423 
424 	/* Request completion */
425 	if (status)
426 		return status;
427 
428 	p->enabled = 0;
429 
430 	return 0;
431 }
432 
433 /**
434  * Data plane threads: message handling
435  */
/* Try to dequeue one request; returns NULL when no request could be dequeued. */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	struct thread_msg_req *msg;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &msg) != 0)
		return NULL;

	return msg;
}
448 
/* Enqueue a response, retrying for as long as the ring reports -ENOBUFS. */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	for ( ; ; )
		if (rte_ring_sp_enqueue(msgq_rsp, rsp) != -ENOBUFS)
			break;
}
459 
460 static struct thread_msg_rsp *
461 thread_msg_handle_pipeline_enable(struct thread_data *t,
462 	struct thread_msg_req *req)
463 {
464 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
465 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
466 	uint32_t i;
467 
468 	/* Request */
469 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
470 		rsp->status = -1;
471 		return rsp;
472 	}
473 
474 	t->p[t->n_pipelines] = req->pipeline_enable.p;
475 
476 	p->p = req->pipeline_enable.p;
477 	for (i = 0; i < req->pipeline_enable.n_tables; i++)
478 		p->table_data[i].a =
479 			req->pipeline_enable.table[i].a;
480 
481 	p->n_tables = req->pipeline_enable.n_tables;
482 
483 	p->msgq_req = req->pipeline_enable.msgq_req;
484 	p->msgq_rsp = req->pipeline_enable.msgq_rsp;
485 	p->timer_period =
486 		(rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
487 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
488 
489 	t->n_pipelines++;
490 
491 	/* Response */
492 	rsp->status = 0;
493 	return rsp;
494 }
495 
496 static struct thread_msg_rsp *
497 thread_msg_handle_pipeline_disable(struct thread_data *t,
498 	struct thread_msg_req *req)
499 {
500 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
501 	uint32_t n_pipelines = t->n_pipelines;
502 	struct rte_pipeline *pipeline = req->pipeline_disable.p;
503 	uint32_t i;
504 
505 	/* find pipeline */
506 	for (i = 0; i < n_pipelines; i++) {
507 		struct pipeline_data *p = &t->pipeline_data[i];
508 
509 		if (p->p != pipeline)
510 			continue;
511 
512 		if (i < n_pipelines - 1) {
513 			struct rte_pipeline *pipeline_last =
514 				t->p[n_pipelines - 1];
515 			struct pipeline_data *p_last =
516 				&t->pipeline_data[n_pipelines - 1];
517 
518 			t->p[i] = pipeline_last;
519 			memcpy(p, p_last, sizeof(*p));
520 		}
521 
522 		t->n_pipelines--;
523 
524 		rsp->status = 0;
525 		return rsp;
526 	}
527 
528 	/* should not get here */
529 	rsp->status = 0;
530 	return rsp;
531 }
532 
533 static void
534 thread_msg_handle(struct thread_data *t)
535 {
536 	for ( ; ; ) {
537 		struct thread_msg_req *req;
538 		struct thread_msg_rsp *rsp;
539 
540 		req = thread_msg_recv(t->msgq_req);
541 		if (req == NULL)
542 			break;
543 
544 		switch (req->type) {
545 		case THREAD_REQ_PIPELINE_ENABLE:
546 			rsp = thread_msg_handle_pipeline_enable(t, req);
547 			break;
548 
549 		case THREAD_REQ_PIPELINE_DISABLE:
550 			rsp = thread_msg_handle_pipeline_disable(t, req);
551 			break;
552 
553 		default:
554 			rsp = (struct thread_msg_rsp *) req;
555 			rsp->status = -1;
556 		}
557 
558 		thread_msg_send(t->msgq_rsp, rsp);
559 	}
560 }
561 
562 /**
563  * Master thread & data plane threads: message passing
564  */
/*
 * Request types carried in struct pipeline_msg_req.type on a pipeline's
 * message queue. Each type has a like-named payload member in the request
 * union, except the port IN enable/disable requests, which only use the id.
 */
565 enum pipeline_req_type {
566 	/* Port IN */
567 	PIPELINE_REQ_PORT_IN_STATS_READ,
568 	PIPELINE_REQ_PORT_IN_ENABLE,
569 	PIPELINE_REQ_PORT_IN_DISABLE,
570 
571 	/* Port OUT */
572 	PIPELINE_REQ_PORT_OUT_STATS_READ,
573 
574 	/* Table */
575 	PIPELINE_REQ_TABLE_STATS_READ,
576 	PIPELINE_REQ_TABLE_RULE_ADD,
577 	PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
578 	PIPELINE_REQ_TABLE_RULE_ADD_BULK,
579 	PIPELINE_REQ_TABLE_RULE_DELETE,
580 	PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
581 	PIPELINE_REQ_TABLE_RULE_STATS_READ,
582 	PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
583 	PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
584 	PIPELINE_REQ_TABLE_RULE_MTR_READ,
585 	PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
586 	PIPELINE_REQ_TABLE_RULE_TTL_READ,
587 	PIPELINE_REQ_MAX /* Not a request; presumably the number of request types. */
588 };
589 
/*
 * Per-request payloads; each one is a member of the union in
 * struct pipeline_msg_req. The port/table the request applies to is carried
 * separately in pipeline_msg_req.id.
 */

/* Payload of PIPELINE_REQ_PORT_IN_STATS_READ. */
590 struct pipeline_msg_req_port_in_stats_read {
591 	int clear; /* Caller's "clear" argument, forwarded as-is. */
592 };
593 
/* Payload of PIPELINE_REQ_PORT_OUT_STATS_READ. */
594 struct pipeline_msg_req_port_out_stats_read {
595 	int clear; /* Caller's "clear" argument, forwarded as-is. */
596 };
597 
/* Payload of PIPELINE_REQ_TABLE_STATS_READ. */
598 struct pipeline_msg_req_table_stats_read {
599 	int clear; /* Caller's "clear" argument, forwarded as-is. */
600 };
601 
/* Rule add payload: high-level match and action of the rule. */
602 struct pipeline_msg_req_table_rule_add {
603 	struct table_rule_match match;
604 	struct table_rule_action action;
605 };
606 
/* Default rule add payload: only an action, no match. */
607 struct pipeline_msg_req_table_rule_add_default {
608 	struct table_rule_action action;
609 };
610 
/* Bulk rule add payload. */
611 struct pipeline_msg_req_table_rule_add_bulk {
612 	struct table_rule_list *list; /* List of rules; passed by pointer, not copied. */
613 	int bulk; /* NOTE(review): presumably whether the table's bulk add is used; confirm against the handler. */
614 };
615 
/* Rule delete payload: match of the rule to delete. */
616 struct pipeline_msg_req_table_rule_delete {
617 	struct table_rule_match match;
618 };
619 
/* Rule stats read payload. */
620 struct pipeline_msg_req_table_rule_stats_read {
621 	void *data; /* NOTE(review): presumably the rule's table entry data; confirm against the handler. */
622 	int clear;
623 };
624 
/* Meter profile add payload. */
625 struct pipeline_msg_req_table_mtr_profile_add {
626 	uint32_t meter_profile_id;
627 	struct rte_table_action_meter_profile profile;
628 };
629 
/* Meter profile delete payload. */
630 struct pipeline_msg_req_table_mtr_profile_delete {
631 	uint32_t meter_profile_id;
632 };
633 
/* Rule meter read payload. */
634 struct pipeline_msg_req_table_rule_mtr_read {
635 	void *data; /* NOTE(review): presumably the rule's table entry data; confirm against the handler. */
636 	uint32_t tc_mask;
637 	int clear;
638 };
639 
/* DSCP table update payload. */
640 struct pipeline_msg_req_table_dscp_table_update {
641 	uint64_t dscp_mask;
642 	struct rte_table_action_dscp_table dscp_table;
643 };
644 
/* Rule TTL read payload. */
645 struct pipeline_msg_req_table_rule_ttl_read {
646 	void *data; /* NOTE(review): presumably the rule's table entry data; confirm against the handler. */
647 	int clear;
648 };
649 
/*
 * Request sent over a pipeline's request ring. The buffer is allocated by
 * pipeline_msg_alloc(), which sizes it for the larger of the request and the
 * response structures.
 */
650 struct pipeline_msg_req {
651 	enum pipeline_req_type type; /* Selects which union member is valid. */
652 	uint32_t id; /* Port IN, port OUT or table ID */
653 
654 	RTE_STD_C11
655 	union {
656 		struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
657 		struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
658 		struct pipeline_msg_req_table_stats_read table_stats_read;
659 		struct pipeline_msg_req_table_rule_add table_rule_add;
660 		struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
661 		struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
662 		struct pipeline_msg_req_table_rule_delete table_rule_delete;
663 		struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
664 		struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
665 		struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
666 		struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
667 		struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
668 		struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
669 	};
670 };
671 
/*
 * Per-request response payloads; each one is a member of the union in
 * struct pipeline_msg_rsp.
 */

/* Counters returned for a port IN stats read. */
672 struct pipeline_msg_rsp_port_in_stats_read {
673 	struct rte_pipeline_port_in_stats stats;
674 };
675 
/* Counters returned for a port OUT stats read. */
676 struct pipeline_msg_rsp_port_out_stats_read {
677 	struct rte_pipeline_port_out_stats stats;
678 };
679 
/* Counters returned for a table stats read. */
680 struct pipeline_msg_rsp_table_stats_read {
681 	struct rte_pipeline_table_stats stats;
682 };
683 
/* Result of a rule add. */
684 struct pipeline_msg_rsp_table_rule_add {
685 	void *data; /* NOTE(review): presumably the table entry created for the rule; confirm against the handler. */
686 };
687 
/* Result of a default rule add. */
688 struct pipeline_msg_rsp_table_rule_add_default {
689 	void *data; /* NOTE(review): presumably the default table entry; confirm against the handler. */
690 };
691 
/* Result of a bulk rule add. */
692 struct pipeline_msg_rsp_table_rule_add_bulk {
693 	uint32_t n_rules; /* NOTE(review): presumably the number of rules actually added; confirm against the handler. */
694 };
695 
/* Counters returned for a rule stats read. */
696 struct pipeline_msg_rsp_table_rule_stats_read {
697 	struct rte_table_action_stats_counters stats;
698 };
699 
/* Counters returned for a rule meter read. */
700 struct pipeline_msg_rsp_table_rule_mtr_read {
701 	struct rte_table_action_mtr_counters stats;
702 };
703 
/* Counters returned for a rule TTL read. */
704 struct pipeline_msg_rsp_table_rule_ttl_read {
705 	struct rte_table_action_ttl_counters stats;
706 };
707 
/*
 * Response received over a pipeline's response ring and released with
 * pipeline_msg_free(). Which union member is meaningful depends on the
 * request that was sent.
 */
708 struct pipeline_msg_rsp {
709 	int status; /* Returned to the caller of the pipeline_*() API as-is. */
710 
711 	RTE_STD_C11
712 	union {
713 		struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
714 		struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
715 		struct pipeline_msg_rsp_table_stats_read table_stats_read;
716 		struct pipeline_msg_rsp_table_rule_add table_rule_add;
717 		struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
718 		struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
719 		struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
720 		struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
721 		struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
722 	};
723 };
724 
725 /**
726  * Master thread
727  */
728 static struct pipeline_msg_req *
729 pipeline_msg_alloc(void)
730 {
731 	size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
732 		sizeof(struct pipeline_msg_rsp));
733 
734 	return calloc(1, size);
735 }
736 
/* Release a message buffer once its response has been consumed. */
static void
pipeline_msg_free(struct pipeline_msg_rsp *rsp)
{
	void *msg = rsp;

	free(msg);
}
742 
743 static struct pipeline_msg_rsp *
744 pipeline_msg_send_recv(struct pipeline *p,
745 	struct pipeline_msg_req *req)
746 {
747 	struct rte_ring *msgq_req = p->msgq_req;
748 	struct rte_ring *msgq_rsp = p->msgq_rsp;
749 	struct pipeline_msg_rsp *rsp;
750 	int status;
751 
752 	/* send */
753 	do {
754 		status = rte_ring_sp_enqueue(msgq_req, req);
755 	} while (status == -ENOBUFS);
756 
757 	/* recv */
758 	do {
759 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
760 	} while (status != 0);
761 
762 	return rsp;
763 }
764 
765 int
766 pipeline_port_in_stats_read(const char *pipeline_name,
767 	uint32_t port_id,
768 	struct rte_pipeline_port_in_stats *stats,
769 	int clear)
770 {
771 	struct pipeline *p;
772 	struct pipeline_msg_req *req;
773 	struct pipeline_msg_rsp *rsp;
774 	int status;
775 
776 	/* Check input params */
777 	if ((pipeline_name == NULL) ||
778 		(stats == NULL))
779 		return -1;
780 
781 	p = pipeline_find(pipeline_name);
782 	if ((p == NULL) ||
783 		(port_id >= p->n_ports_in))
784 		return -1;
785 
786 	if (!pipeline_is_running(p)) {
787 		status = rte_pipeline_port_in_stats_read(p->p,
788 			port_id,
789 			stats,
790 			clear);
791 
792 		return status;
793 	}
794 
795 	/* Allocate request */
796 	req = pipeline_msg_alloc();
797 	if (req == NULL)
798 		return -1;
799 
800 	/* Write request */
801 	req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
802 	req->id = port_id;
803 	req->port_in_stats_read.clear = clear;
804 
805 	/* Send request and wait for response */
806 	rsp = pipeline_msg_send_recv(p, req);
807 	if (rsp == NULL)
808 		return -1;
809 
810 	/* Read response */
811 	status = rsp->status;
812 	if (status)
813 		memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
814 
815 	/* Free response */
816 	pipeline_msg_free(rsp);
817 
818 	return status;
819 }
820 
821 int
822 pipeline_port_in_enable(const char *pipeline_name,
823 	uint32_t port_id)
824 {
825 	struct pipeline *p;
826 	struct pipeline_msg_req *req;
827 	struct pipeline_msg_rsp *rsp;
828 	int status;
829 
830 	/* Check input params */
831 	if (pipeline_name == NULL)
832 		return -1;
833 
834 	p = pipeline_find(pipeline_name);
835 	if ((p == NULL) ||
836 		(port_id >= p->n_ports_in))
837 		return -1;
838 
839 	if (!pipeline_is_running(p)) {
840 		status = rte_pipeline_port_in_enable(p->p, port_id);
841 		return status;
842 	}
843 
844 	/* Allocate request */
845 	req = pipeline_msg_alloc();
846 	if (req == NULL)
847 		return -1;
848 
849 	/* Write request */
850 	req->type = PIPELINE_REQ_PORT_IN_ENABLE;
851 	req->id = port_id;
852 
853 	/* Send request and wait for response */
854 	rsp = pipeline_msg_send_recv(p, req);
855 	if (rsp == NULL)
856 		return -1;
857 
858 	/* Read response */
859 	status = rsp->status;
860 
861 	/* Free response */
862 	pipeline_msg_free(rsp);
863 
864 	return status;
865 }
866 
867 int
868 pipeline_port_in_disable(const char *pipeline_name,
869 	uint32_t port_id)
870 {
871 	struct pipeline *p;
872 	struct pipeline_msg_req *req;
873 	struct pipeline_msg_rsp *rsp;
874 	int status;
875 
876 	/* Check input params */
877 	if (pipeline_name == NULL)
878 		return -1;
879 
880 	p = pipeline_find(pipeline_name);
881 	if ((p == NULL) ||
882 		(port_id >= p->n_ports_in))
883 		return -1;
884 
885 	if (!pipeline_is_running(p)) {
886 		status = rte_pipeline_port_in_disable(p->p, port_id);
887 		return status;
888 	}
889 
890 	/* Allocate request */
891 	req = pipeline_msg_alloc();
892 	if (req == NULL)
893 		return -1;
894 
895 	/* Write request */
896 	req->type = PIPELINE_REQ_PORT_IN_DISABLE;
897 	req->id = port_id;
898 
899 	/* Send request and wait for response */
900 	rsp = pipeline_msg_send_recv(p, req);
901 	if (rsp == NULL)
902 		return -1;
903 
904 	/* Read response */
905 	status = rsp->status;
906 
907 	/* Free response */
908 	pipeline_msg_free(rsp);
909 
910 	return status;
911 }
912 
913 int
914 pipeline_port_out_stats_read(const char *pipeline_name,
915 	uint32_t port_id,
916 	struct rte_pipeline_port_out_stats *stats,
917 	int clear)
918 {
919 	struct pipeline *p;
920 	struct pipeline_msg_req *req;
921 	struct pipeline_msg_rsp *rsp;
922 	int status;
923 
924 	/* Check input params */
925 	if ((pipeline_name == NULL) ||
926 		(stats == NULL))
927 		return -1;
928 
929 	p = pipeline_find(pipeline_name);
930 	if ((p == NULL) ||
931 		(port_id >= p->n_ports_out))
932 		return -1;
933 
934 	if (!pipeline_is_running(p)) {
935 		status = rte_pipeline_port_out_stats_read(p->p,
936 			port_id,
937 			stats,
938 			clear);
939 
940 		return status;
941 	}
942 
943 	/* Allocate request */
944 	req = pipeline_msg_alloc();
945 	if (req == NULL)
946 		return -1;
947 
948 	/* Write request */
949 	req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
950 	req->id = port_id;
951 	req->port_out_stats_read.clear = clear;
952 
953 	/* Send request and wait for response */
954 	rsp = pipeline_msg_send_recv(p, req);
955 	if (rsp == NULL)
956 		return -1;
957 
958 	/* Read response */
959 	status = rsp->status;
960 	if (status)
961 		memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
962 
963 	/* Free response */
964 	pipeline_msg_free(rsp);
965 
966 	return status;
967 }
968 
969 int
970 pipeline_table_stats_read(const char *pipeline_name,
971 	uint32_t table_id,
972 	struct rte_pipeline_table_stats *stats,
973 	int clear)
974 {
975 	struct pipeline *p;
976 	struct pipeline_msg_req *req;
977 	struct pipeline_msg_rsp *rsp;
978 	int status;
979 
980 	/* Check input params */
981 	if ((pipeline_name == NULL) ||
982 		(stats == NULL))
983 		return -1;
984 
985 	p = pipeline_find(pipeline_name);
986 	if ((p == NULL) ||
987 		(table_id >= p->n_tables))
988 		return -1;
989 
990 	if (!pipeline_is_running(p)) {
991 		status = rte_pipeline_table_stats_read(p->p,
992 			table_id,
993 			stats,
994 			clear);
995 
996 		return status;
997 	}
998 
999 	/* Allocate request */
1000 	req = pipeline_msg_alloc();
1001 	if (req == NULL)
1002 		return -1;
1003 
1004 	/* Write request */
1005 	req->type = PIPELINE_REQ_TABLE_STATS_READ;
1006 	req->id = table_id;
1007 	req->table_stats_read.clear = clear;
1008 
1009 	/* Send request and wait for response */
1010 	rsp = pipeline_msg_send_recv(p, req);
1011 	if (rsp == NULL)
1012 		return -1;
1013 
1014 	/* Read response */
1015 	status = rsp->status;
1016 	if (status)
1017 		memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
1018 
1019 	/* Free response */
1020 	pipeline_msg_free(rsp);
1021 
1022 	return status;
1023 }
1024 
1025 static int
1026 match_check(struct table_rule_match *match,
1027 	struct pipeline *p,
1028 	uint32_t table_id)
1029 {
1030 	struct table *table;
1031 
1032 	if ((match == NULL) ||
1033 		(p == NULL) ||
1034 		(table_id >= p->n_tables))
1035 		return -1;
1036 
1037 	table = &p->table[table_id];
1038 	if (match->match_type != table->params.match_type)
1039 		return -1;
1040 
1041 	switch (match->match_type) {
1042 	case TABLE_ACL:
1043 	{
1044 		struct table_acl_params *t = &table->params.match.acl;
1045 		struct table_rule_match_acl *r = &match->match.acl;
1046 
1047 		if ((r->ip_version && (t->ip_version == 0)) ||
1048 			((r->ip_version == 0) && t->ip_version))
1049 			return -1;
1050 
1051 		if (r->ip_version) {
1052 			if ((r->sa_depth > 32) ||
1053 				(r->da_depth > 32))
1054 				return -1;
1055 		} else {
1056 			if ((r->sa_depth > 128) ||
1057 				(r->da_depth > 128))
1058 				return -1;
1059 		}
1060 		return 0;
1061 	}
1062 
1063 	case TABLE_ARRAY:
1064 		return 0;
1065 
1066 	case TABLE_HASH:
1067 		return 0;
1068 
1069 	case TABLE_LPM:
1070 	{
1071 		struct table_lpm_params *t = &table->params.match.lpm;
1072 		struct table_rule_match_lpm *r = &match->match.lpm;
1073 
1074 		if ((r->ip_version && (t->key_size != 4)) ||
1075 			((r->ip_version == 0) && (t->key_size != 16)))
1076 			return -1;
1077 
1078 		if (r->ip_version) {
1079 			if (r->depth > 32)
1080 				return -1;
1081 		} else {
1082 			if (r->depth > 128)
1083 				return -1;
1084 		}
1085 		return 0;
1086 	}
1087 
1088 	case TABLE_STUB:
1089 		return -1;
1090 
1091 	default:
1092 		return -1;
1093 	}
1094 }
1095 
1096 static int
1097 action_check(struct table_rule_action *action,
1098 	struct pipeline *p,
1099 	uint32_t table_id)
1100 {
1101 	struct table_action_profile *ap;
1102 
1103 	if ((action == NULL) ||
1104 		(p == NULL) ||
1105 		(table_id >= p->n_tables))
1106 		return -1;
1107 
1108 	ap = p->table[table_id].ap;
1109 	if (action->action_mask != ap->params.action_mask)
1110 		return -1;
1111 
1112 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1113 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1114 			(action->fwd.id >= p->n_ports_out))
1115 			return -1;
1116 
1117 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1118 			(action->fwd.id >= p->n_tables))
1119 			return -1;
1120 	}
1121 
1122 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1123 		uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1124 		uint32_t tc_mask1 = action->mtr.tc_mask;
1125 
1126 		if (tc_mask1 != tc_mask0)
1127 			return -1;
1128 	}
1129 
1130 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1131 		uint32_t n_subports_per_port =
1132 			ap->params.tm.n_subports_per_port;
1133 		uint32_t n_pipes_per_subport =
1134 			ap->params.tm.n_pipes_per_subport;
1135 		uint32_t subport_id = action->tm.subport_id;
1136 		uint32_t pipe_id = action->tm.pipe_id;
1137 
1138 		if ((subport_id >= n_subports_per_port) ||
1139 			(pipe_id >= n_pipes_per_subport))
1140 			return -1;
1141 	}
1142 
1143 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1144 		uint64_t encap_mask = ap->params.encap.encap_mask;
1145 		enum rte_table_action_encap_type type = action->encap.type;
1146 
1147 		if ((encap_mask & (1LLU << type)) == 0)
1148 			return -1;
1149 	}
1150 
1151 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1152 		int ip_version0 = ap->params.common.ip_version;
1153 		int ip_version1 = action->nat.ip_version;
1154 
1155 		if ((ip_version1 && (ip_version0 == 0)) ||
1156 			((ip_version1 == 0) && ip_version0))
1157 			return -1;
1158 	}
1159 
1160 	return 0;
1161 }
1162 
1163 static int
1164 action_default_check(struct table_rule_action *action,
1165 	struct pipeline *p,
1166 	uint32_t table_id)
1167 {
1168 	if ((action == NULL) ||
1169 		(action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1170 		(p == NULL) ||
1171 		(table_id >= p->n_tables))
1172 		return -1;
1173 
1174 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1175 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1176 			(action->fwd.id >= p->n_ports_out))
1177 			return -1;
1178 
1179 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1180 			(action->fwd.id >= p->n_tables))
1181 			return -1;
1182 	}
1183 
1184 	return 0;
1185 }
1186 
/*
 * Storage large enough for the low-level key of any table type.
 * match_convert() fills one of these from a high-level table_rule_match.
 */
1187 union table_rule_match_low_level {
1188 	struct rte_table_acl_rule_add_params acl_add;
1189 	struct rte_table_acl_rule_delete_params acl_delete;
1190 	struct rte_table_array_key array;
1191 	uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX];
1192 	struct rte_table_lpm_key lpm_ipv4;
1193 	struct rte_table_lpm_ipv6_key lpm_ipv6;
1194 };
1195 
/*
 * Forward declarations; both are defined outside the visible part of the file.
 * table_rule_add_bulk_ll() uses them to turn each rule's high-level match
 * and action into the low-level form it passes to the rte_pipeline entry-add
 * calls, and treats a non-zero return value as failure.
 */

/* NOTE(review): "add" is passed as 1 on the rule add path; presumably 0 selects the delete form - confirm. */
1196 static int
1197 match_convert(struct table_rule_match *mh,
1198 	union table_rule_match_low_level *ml,
1199 	int add);
1200 
1201 static int
1202 action_convert(struct rte_table_action *a,
1203 	struct table_rule_action *action,
1204 	struct rte_pipeline_table_entry *data);
1205 
/* Low-level handle of one pipeline table, as used by table_rule_add_bulk_ll(). */
1206 struct table_ll {
1207 	struct rte_pipeline *p; /* Pipeline that owns the table. */
1208 	int table_id; /* Table index passed to the rte_pipeline entry-add calls. */
1209 	struct rte_table_action *a; /* Table action handle passed to action_convert(). */
1210 	int bulk_supported; /* Non-zero: use the bulk add; zero: add rules one at a time. */
1211 };
1212 
1213 static int
1214 table_rule_add_bulk_ll(struct table_ll *table,
1215 	struct table_rule_list *list,
1216 	uint32_t *n_rules)
1217 {
1218 	union table_rule_match_low_level *match_ll = NULL;
1219 	uint8_t *action_ll = NULL;
1220 	void **match_ll_ptr = NULL;
1221 	struct rte_pipeline_table_entry **action_ll_ptr = NULL;
1222 	struct rte_pipeline_table_entry **entries_ptr = NULL;
1223 	int *found = NULL;
1224 	struct table_rule *rule;
1225 	uint32_t n, i;
1226 	int status = 0;
1227 
1228 	n = 0;
1229 	TAILQ_FOREACH(rule, list, node)
1230 		n++;
1231 
1232 	/* Memory allocation */
1233 	match_ll = calloc(n, sizeof(union table_rule_match_low_level));
1234 	action_ll = calloc(n, TABLE_RULE_ACTION_SIZE_MAX);
1235 
1236 	match_ll_ptr = calloc(n, sizeof(void *));
1237 	action_ll_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1238 
1239 	entries_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1240 	found = calloc(n, sizeof(int));
1241 
1242 	if (match_ll == NULL ||
1243 		action_ll == NULL ||
1244 		match_ll_ptr == NULL ||
1245 		action_ll_ptr == NULL ||
1246 		entries_ptr == NULL ||
1247 		found == NULL) {
1248 			status = -ENOMEM;
1249 			goto table_rule_add_bulk_ll_free;
1250 	}
1251 
1252 	/* Init */
1253 	for (i = 0; i < n; i++) {
1254 		match_ll_ptr[i] = (void *)&match_ll[i];
1255 		action_ll_ptr[i] = (struct rte_pipeline_table_entry *)
1256 			&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
1257 	}
1258 
1259 	/* Rule (match, action) conversion */
1260 	i = 0;
1261 	TAILQ_FOREACH(rule, list, node) {
1262 		status = match_convert(&rule->match, match_ll_ptr[i], 1);
1263 		if (status)
1264 			goto table_rule_add_bulk_ll_free;
1265 
1266 		status = action_convert(table->a, &rule->action, action_ll_ptr[i]);
1267 		if (status)
1268 			goto table_rule_add_bulk_ll_free;
1269 
1270 		i++;
1271 	}
1272 
1273 	/* Add rule (match, action) to table */
1274 	if (table->bulk_supported) {
1275 		status = rte_pipeline_table_entry_add_bulk(table->p,
1276 			table->table_id,
1277 			match_ll_ptr,
1278 			action_ll_ptr,
1279 			n,
1280 			found,
1281 			entries_ptr);
1282 		if (status)
1283 			goto table_rule_add_bulk_ll_free;
1284 	} else
1285 		for (i = 0; i < n; i++) {
1286 			status = rte_pipeline_table_entry_add(table->p,
1287 				table->table_id,
1288 				match_ll_ptr[i],
1289 				action_ll_ptr[i],
1290 				&found[i],
1291 				&entries_ptr[i]);
1292 			if (status) {
1293 				if (i == 0)
1294 					goto table_rule_add_bulk_ll_free;
1295 
1296 				/* No roll-back. */
1297 				status = 0;
1298 				n = i;
1299 				break;
1300 			}
1301 		}
1302 
1303 	/* Write back to the rule list. */
1304 	i = 0;
1305 	TAILQ_FOREACH(rule, list, node) {
1306 		if (i >= n)
1307 			break;
1308 
1309 		rule->data = entries_ptr[i];
1310 
1311 		i++;
1312 	}
1313 
1314 	*n_rules = n;
1315 
1316 	/* Free */
1317 table_rule_add_bulk_ll_free:
1318 	free(found);
1319 	free(entries_ptr);
1320 	free(action_ll_ptr);
1321 	free(match_ll_ptr);
1322 	free(action_ll);
1323 	free(match_ll);
1324 
1325 	return status;
1326 }
1327 
1328 int
1329 pipeline_table_rule_add(const char *pipeline_name,
1330 	uint32_t table_id,
1331 	struct table_rule_match *match,
1332 	struct table_rule_action *action)
1333 {
1334 	struct pipeline *p;
1335 	struct table *table;
1336 	struct pipeline_msg_req *req;
1337 	struct pipeline_msg_rsp *rsp;
1338 	struct table_rule *rule;
1339 	int status;
1340 
1341 	/* Check input params */
1342 	if ((pipeline_name == NULL) ||
1343 		(match == NULL) ||
1344 		(action == NULL))
1345 		return -1;
1346 
1347 	p = pipeline_find(pipeline_name);
1348 	if ((p == NULL) ||
1349 		(table_id >= p->n_tables) ||
1350 		match_check(match, p, table_id) ||
1351 		action_check(action, p, table_id))
1352 		return -1;
1353 
1354 	table = &p->table[table_id];
1355 
1356 	rule = calloc(1, sizeof(struct table_rule));
1357 	if (rule == NULL)
1358 		return -1;
1359 
1360 	memcpy(&rule->match, match, sizeof(*match));
1361 	memcpy(&rule->action, action, sizeof(*action));
1362 
1363 	if (!pipeline_is_running(p)) {
1364 		union table_rule_match_low_level match_ll;
1365 		struct rte_pipeline_table_entry *data_in, *data_out;
1366 		int key_found;
1367 		uint8_t *buffer;
1368 
1369 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1370 		if (buffer == NULL) {
1371 			free(rule);
1372 			return -1;
1373 		}
1374 
1375 		/* Table match-action rule conversion */
1376 		data_in = (struct rte_pipeline_table_entry *)buffer;
1377 
1378 		status = match_convert(match, &match_ll, 1);
1379 		if (status) {
1380 			free(buffer);
1381 			free(rule);
1382 			return -1;
1383 		}
1384 
1385 		status = action_convert(table->a, action, data_in);
1386 		if (status) {
1387 			free(buffer);
1388 			free(rule);
1389 			return -1;
1390 		}
1391 
1392 		/* Add rule (match, action) to table */
1393 		status = rte_pipeline_table_entry_add(p->p,
1394 				table_id,
1395 				&match_ll,
1396 				data_in,
1397 				&key_found,
1398 				&data_out);
1399 		if (status) {
1400 			free(buffer);
1401 			free(rule);
1402 			return -1;
1403 		}
1404 
1405 		/* Write Response */
1406 		rule->data = data_out;
1407 		table_rule_add(table, rule);
1408 
1409 		free(buffer);
1410 		return 0;
1411 	}
1412 
1413 	/* Allocate request */
1414 	req = pipeline_msg_alloc();
1415 	if (req == NULL) {
1416 		free(rule);
1417 		return -1;
1418 	}
1419 
1420 	/* Write request */
1421 	req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1422 	req->id = table_id;
1423 	memcpy(&req->table_rule_add.match, match, sizeof(*match));
1424 	memcpy(&req->table_rule_add.action, action, sizeof(*action));
1425 
1426 	/* Send request and wait for response */
1427 	rsp = pipeline_msg_send_recv(p, req);
1428 	if (rsp == NULL) {
1429 		free(rule);
1430 		return -1;
1431 	}
1432 
1433 	/* Read response */
1434 	status = rsp->status;
1435 	if (status == 0) {
1436 		rule->data = rsp->table_rule_add.data;
1437 		table_rule_add(table, rule);
1438 	} else
1439 		free(rule);
1440 
1441 	/* Free response */
1442 	pipeline_msg_free(rsp);
1443 
1444 	return status;
1445 }
1446 
1447 int
1448 pipeline_table_rule_add_default(const char *pipeline_name,
1449 	uint32_t table_id,
1450 	struct table_rule_action *action)
1451 {
1452 	struct pipeline *p;
1453 	struct table *table;
1454 	struct pipeline_msg_req *req;
1455 	struct pipeline_msg_rsp *rsp;
1456 	struct table_rule *rule;
1457 	int status;
1458 
1459 	/* Check input params */
1460 	if ((pipeline_name == NULL) ||
1461 		(action == NULL))
1462 		return -1;
1463 
1464 	p = pipeline_find(pipeline_name);
1465 	if ((p == NULL) ||
1466 		(table_id >= p->n_tables) ||
1467 		action_default_check(action, p, table_id))
1468 		return -1;
1469 
1470 	table = &p->table[table_id];
1471 
1472 	rule = calloc(1, sizeof(struct table_rule));
1473 	if (rule == NULL)
1474 		return -1;
1475 
1476 	memcpy(&rule->action, action, sizeof(*action));
1477 
1478 	if (!pipeline_is_running(p)) {
1479 		struct rte_pipeline_table_entry *data_in, *data_out;
1480 		uint8_t *buffer;
1481 
1482 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1483 		if (buffer == NULL) {
1484 			free(rule);
1485 			return -1;
1486 		}
1487 
1488 		/* Apply actions */
1489 		data_in = (struct rte_pipeline_table_entry *)buffer;
1490 
1491 		data_in->action = action->fwd.action;
1492 		if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
1493 			data_in->port_id = action->fwd.id;
1494 		if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
1495 			data_in->table_id = action->fwd.id;
1496 
1497 		/* Add default rule to table */
1498 		status = rte_pipeline_table_default_entry_add(p->p,
1499 				table_id,
1500 				data_in,
1501 				&data_out);
1502 		if (status) {
1503 			free(buffer);
1504 			free(rule);
1505 			return -1;
1506 		}
1507 
1508 		/* Write Response */
1509 		rule->data = data_out;
1510 		table_rule_default_add(table, rule);
1511 
1512 		free(buffer);
1513 		return 0;
1514 	}
1515 
1516 	/* Allocate request */
1517 	req = pipeline_msg_alloc();
1518 	if (req == NULL) {
1519 		free(rule);
1520 		return -1;
1521 	}
1522 
1523 	/* Write request */
1524 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1525 	req->id = table_id;
1526 	memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1527 
1528 	/* Send request and wait for response */
1529 	rsp = pipeline_msg_send_recv(p, req);
1530 	if (rsp == NULL) {
1531 		free(rule);
1532 		return -1;
1533 	}
1534 
1535 	/* Read response */
1536 	status = rsp->status;
1537 	if (status == 0) {
1538 		rule->data = rsp->table_rule_add_default.data;
1539 		table_rule_default_add(table, rule);
1540 	} else
1541 		free(rule);
1542 
1543 	/* Free response */
1544 	pipeline_msg_free(rsp);
1545 
1546 	return status;
1547 }
1548 
1549 static uint32_t
1550 table_rule_list_free(struct table_rule_list *list)
1551 {
1552 	uint32_t n = 0;
1553 
1554 	if (!list)
1555 		return 0;
1556 
1557 	for ( ; ; ) {
1558 		struct table_rule *rule;
1559 
1560 		rule = TAILQ_FIRST(list);
1561 		if (rule == NULL)
1562 			break;
1563 
1564 		TAILQ_REMOVE(list, rule, node);
1565 		free(rule);
1566 		n++;
1567 	}
1568 
1569 	free(list);
1570 	return n;
1571 }
1572 
1573 int
1574 pipeline_table_rule_add_bulk(const char *pipeline_name,
1575 	uint32_t table_id,
1576 	struct table_rule_list *list,
1577 	uint32_t *n_rules_added,
1578 	uint32_t *n_rules_not_added)
1579 {
1580 	struct pipeline *p;
1581 	struct table *table;
1582 	struct pipeline_msg_req *req;
1583 	struct pipeline_msg_rsp *rsp;
1584 	struct table_rule *rule;
1585 	int status = 0;
1586 
1587 	/* Check input params */
1588 	if ((pipeline_name == NULL) ||
1589 		(list == NULL) ||
1590 		TAILQ_EMPTY(list) ||
1591 		(n_rules_added == NULL) ||
1592 		(n_rules_not_added == NULL)) {
1593 		table_rule_list_free(list);
1594 		return -EINVAL;
1595 	}
1596 
1597 	p = pipeline_find(pipeline_name);
1598 	if ((p == NULL) ||
1599 		(table_id >= p->n_tables)) {
1600 		table_rule_list_free(list);
1601 		return -EINVAL;
1602 	}
1603 
1604 	table = &p->table[table_id];
1605 
1606 	TAILQ_FOREACH(rule, list, node)
1607 		if (match_check(&rule->match, p, table_id) ||
1608 			action_check(&rule->action, p, table_id)) {
1609 			table_rule_list_free(list);
1610 			return -EINVAL;
1611 		}
1612 
1613 	if (!pipeline_is_running(p)) {
1614 		struct table_ll table_ll = {
1615 			.p = p->p,
1616 			.table_id = table_id,
1617 			.a = table->a,
1618 			.bulk_supported = table->params.match_type == TABLE_ACL,
1619 		};
1620 
1621 		status = table_rule_add_bulk_ll(&table_ll, list, n_rules_added);
1622 		if (status) {
1623 			table_rule_list_free(list);
1624 			return status;
1625 		}
1626 
1627 		table_rule_add_bulk(table, list, *n_rules_added);
1628 		*n_rules_not_added = table_rule_list_free(list);
1629 		return 0;
1630 	}
1631 
1632 	/* Allocate request */
1633 	req = pipeline_msg_alloc();
1634 	if (req == NULL) {
1635 		table_rule_list_free(list);
1636 		return -ENOMEM;
1637 	}
1638 
1639 	/* Write request */
1640 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1641 	req->id = table_id;
1642 	req->table_rule_add_bulk.list = list;
1643 	req->table_rule_add_bulk.bulk = table->params.match_type == TABLE_ACL;
1644 
1645 	/* Send request and wait for response */
1646 	rsp = pipeline_msg_send_recv(p, req);
1647 	if (rsp == NULL) {
1648 		table_rule_list_free(list);
1649 		return -ENOMEM;
1650 	}
1651 
1652 	/* Read response */
1653 	status = rsp->status;
1654 	if (status == 0) {
1655 		*n_rules_added = rsp->table_rule_add_bulk.n_rules;
1656 
1657 		table_rule_add_bulk(table, list, *n_rules_added);
1658 		*n_rules_not_added = table_rule_list_free(list);
1659 	} else
1660 		table_rule_list_free(list);
1661 
1662 
1663 	/* Free response */
1664 	pipeline_msg_free(rsp);
1665 
1666 	return status;
1667 }
1668 
1669 int
1670 pipeline_table_rule_delete(const char *pipeline_name,
1671 	uint32_t table_id,
1672 	struct table_rule_match *match)
1673 {
1674 	struct pipeline *p;
1675 	struct table *table;
1676 	struct pipeline_msg_req *req;
1677 	struct pipeline_msg_rsp *rsp;
1678 	int status;
1679 
1680 	/* Check input params */
1681 	if ((pipeline_name == NULL) ||
1682 		(match == NULL))
1683 		return -1;
1684 
1685 	p = pipeline_find(pipeline_name);
1686 	if ((p == NULL) ||
1687 		(table_id >= p->n_tables) ||
1688 		match_check(match, p, table_id))
1689 		return -1;
1690 
1691 	table = &p->table[table_id];
1692 
1693 	if (!pipeline_is_running(p)) {
1694 		union table_rule_match_low_level match_ll;
1695 		int key_found;
1696 
1697 		status = match_convert(match, &match_ll, 0);
1698 		if (status)
1699 			return -1;
1700 
1701 		status = rte_pipeline_table_entry_delete(p->p,
1702 				table_id,
1703 				&match_ll,
1704 				&key_found,
1705 				NULL);
1706 
1707 		if (status == 0)
1708 			table_rule_delete(table, match);
1709 
1710 		return status;
1711 	}
1712 
1713 	/* Allocate request */
1714 	req = pipeline_msg_alloc();
1715 	if (req == NULL)
1716 		return -1;
1717 
1718 	/* Write request */
1719 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1720 	req->id = table_id;
1721 	memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1722 
1723 	/* Send request and wait for response */
1724 	rsp = pipeline_msg_send_recv(p, req);
1725 	if (rsp == NULL)
1726 		return -1;
1727 
1728 	/* Read response */
1729 	status = rsp->status;
1730 	if (status == 0)
1731 		table_rule_delete(table, match);
1732 
1733 	/* Free response */
1734 	pipeline_msg_free(rsp);
1735 
1736 	return status;
1737 }
1738 
1739 int
1740 pipeline_table_rule_delete_default(const char *pipeline_name,
1741 	uint32_t table_id)
1742 {
1743 	struct pipeline *p;
1744 	struct pipeline_msg_req *req;
1745 	struct pipeline_msg_rsp *rsp;
1746 	int status;
1747 
1748 	/* Check input params */
1749 	if (pipeline_name == NULL)
1750 		return -1;
1751 
1752 	p = pipeline_find(pipeline_name);
1753 	if ((p == NULL) ||
1754 		(table_id >= p->n_tables))
1755 		return -1;
1756 
1757 	if (!pipeline_is_running(p)) {
1758 		status = rte_pipeline_table_default_entry_delete(p->p,
1759 			table_id,
1760 			NULL);
1761 
1762 		return status;
1763 	}
1764 
1765 	/* Allocate request */
1766 	req = pipeline_msg_alloc();
1767 	if (req == NULL)
1768 		return -1;
1769 
1770 	/* Write request */
1771 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1772 	req->id = table_id;
1773 
1774 	/* Send request and wait for response */
1775 	rsp = pipeline_msg_send_recv(p, req);
1776 	if (rsp == NULL)
1777 		return -1;
1778 
1779 	/* Read response */
1780 	status = rsp->status;
1781 
1782 	/* Free response */
1783 	pipeline_msg_free(rsp);
1784 
1785 	return status;
1786 }
1787 
1788 int
1789 pipeline_table_rule_stats_read(const char *pipeline_name,
1790 	uint32_t table_id,
1791 	void *data,
1792 	struct rte_table_action_stats_counters *stats,
1793 	int clear)
1794 {
1795 	struct pipeline *p;
1796 	struct pipeline_msg_req *req;
1797 	struct pipeline_msg_rsp *rsp;
1798 	int status;
1799 
1800 	/* Check input params */
1801 	if ((pipeline_name == NULL) ||
1802 		(data == NULL) ||
1803 		(stats == NULL))
1804 		return -1;
1805 
1806 	p = pipeline_find(pipeline_name);
1807 	if ((p == NULL) ||
1808 		(table_id >= p->n_tables))
1809 		return -1;
1810 
1811 	if (!pipeline_is_running(p)) {
1812 		struct rte_table_action *a = p->table[table_id].a;
1813 
1814 		status = rte_table_action_stats_read(a,
1815 			data,
1816 			stats,
1817 			clear);
1818 
1819 		return status;
1820 	}
1821 
1822 	/* Allocate request */
1823 	req = pipeline_msg_alloc();
1824 	if (req == NULL)
1825 		return -1;
1826 
1827 	/* Write request */
1828 	req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1829 	req->id = table_id;
1830 	req->table_rule_stats_read.data = data;
1831 	req->table_rule_stats_read.clear = clear;
1832 
1833 	/* Send request and wait for response */
1834 	rsp = pipeline_msg_send_recv(p, req);
1835 	if (rsp == NULL)
1836 		return -1;
1837 
1838 	/* Read response */
1839 	status = rsp->status;
1840 	if (status)
1841 		memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1842 
1843 	/* Free response */
1844 	pipeline_msg_free(rsp);
1845 
1846 	return status;
1847 }
1848 
1849 int
1850 pipeline_table_mtr_profile_add(const char *pipeline_name,
1851 	uint32_t table_id,
1852 	uint32_t meter_profile_id,
1853 	struct rte_table_action_meter_profile *profile)
1854 {
1855 	struct pipeline *p;
1856 	struct pipeline_msg_req *req;
1857 	struct pipeline_msg_rsp *rsp;
1858 	int status;
1859 
1860 	/* Check input params */
1861 	if ((pipeline_name == NULL) ||
1862 		(profile == NULL))
1863 		return -1;
1864 
1865 	p = pipeline_find(pipeline_name);
1866 	if ((p == NULL) ||
1867 		(table_id >= p->n_tables))
1868 		return -1;
1869 
1870 	if (!pipeline_is_running(p)) {
1871 		struct rte_table_action *a = p->table[table_id].a;
1872 
1873 		status = rte_table_action_meter_profile_add(a,
1874 			meter_profile_id,
1875 			profile);
1876 
1877 		return status;
1878 	}
1879 
1880 	/* Allocate request */
1881 	req = pipeline_msg_alloc();
1882 	if (req == NULL)
1883 		return -1;
1884 
1885 	/* Write request */
1886 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1887 	req->id = table_id;
1888 	req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1889 	memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1890 
1891 	/* Send request and wait for response */
1892 	rsp = pipeline_msg_send_recv(p, req);
1893 	if (rsp == NULL)
1894 		return -1;
1895 
1896 	/* Read response */
1897 	status = rsp->status;
1898 
1899 	/* Free response */
1900 	pipeline_msg_free(rsp);
1901 
1902 	return status;
1903 }
1904 
1905 int
1906 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1907 	uint32_t table_id,
1908 	uint32_t meter_profile_id)
1909 {
1910 	struct pipeline *p;
1911 	struct pipeline_msg_req *req;
1912 	struct pipeline_msg_rsp *rsp;
1913 	int status;
1914 
1915 	/* Check input params */
1916 	if (pipeline_name == NULL)
1917 		return -1;
1918 
1919 	p = pipeline_find(pipeline_name);
1920 	if ((p == NULL) ||
1921 		(table_id >= p->n_tables))
1922 		return -1;
1923 
1924 	if (!pipeline_is_running(p)) {
1925 		struct rte_table_action *a = p->table[table_id].a;
1926 
1927 		status = rte_table_action_meter_profile_delete(a,
1928 				meter_profile_id);
1929 
1930 		return status;
1931 	}
1932 
1933 	/* Allocate request */
1934 	req = pipeline_msg_alloc();
1935 	if (req == NULL)
1936 		return -1;
1937 
1938 	/* Write request */
1939 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1940 	req->id = table_id;
1941 	req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1942 
1943 	/* Send request and wait for response */
1944 	rsp = pipeline_msg_send_recv(p, req);
1945 	if (rsp == NULL)
1946 		return -1;
1947 
1948 	/* Read response */
1949 	status = rsp->status;
1950 
1951 	/* Free response */
1952 	pipeline_msg_free(rsp);
1953 
1954 	return status;
1955 }
1956 
1957 int
1958 pipeline_table_rule_mtr_read(const char *pipeline_name,
1959 	uint32_t table_id,
1960 	void *data,
1961 	uint32_t tc_mask,
1962 	struct rte_table_action_mtr_counters *stats,
1963 	int clear)
1964 {
1965 	struct pipeline *p;
1966 	struct pipeline_msg_req *req;
1967 	struct pipeline_msg_rsp *rsp;
1968 	int status;
1969 
1970 	/* Check input params */
1971 	if ((pipeline_name == NULL) ||
1972 		(data == NULL) ||
1973 		(stats == NULL))
1974 		return -1;
1975 
1976 	p = pipeline_find(pipeline_name);
1977 	if ((p == NULL) ||
1978 		(table_id >= p->n_tables))
1979 		return -1;
1980 
1981 	if (!pipeline_is_running(p)) {
1982 		struct rte_table_action *a = p->table[table_id].a;
1983 
1984 		status = rte_table_action_meter_read(a,
1985 				data,
1986 				tc_mask,
1987 				stats,
1988 				clear);
1989 
1990 		return status;
1991 	}
1992 
1993 	/* Allocate request */
1994 	req = pipeline_msg_alloc();
1995 	if (req == NULL)
1996 		return -1;
1997 
1998 	/* Write request */
1999 	req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
2000 	req->id = table_id;
2001 	req->table_rule_mtr_read.data = data;
2002 	req->table_rule_mtr_read.tc_mask = tc_mask;
2003 	req->table_rule_mtr_read.clear = clear;
2004 
2005 	/* Send request and wait for response */
2006 	rsp = pipeline_msg_send_recv(p, req);
2007 	if (rsp == NULL)
2008 		return -1;
2009 
2010 	/* Read response */
2011 	status = rsp->status;
2012 	if (status)
2013 		memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
2014 
2015 	/* Free response */
2016 	pipeline_msg_free(rsp);
2017 
2018 	return status;
2019 }
2020 
2021 int
2022 pipeline_table_dscp_table_update(const char *pipeline_name,
2023 	uint32_t table_id,
2024 	uint64_t dscp_mask,
2025 	struct rte_table_action_dscp_table *dscp_table)
2026 {
2027 	struct pipeline *p;
2028 	struct pipeline_msg_req *req;
2029 	struct pipeline_msg_rsp *rsp;
2030 	int status;
2031 
2032 	/* Check input params */
2033 	if ((pipeline_name == NULL) ||
2034 		(dscp_table == NULL))
2035 		return -1;
2036 
2037 	p = pipeline_find(pipeline_name);
2038 	if ((p == NULL) ||
2039 		(table_id >= p->n_tables))
2040 		return -1;
2041 
2042 	if (!pipeline_is_running(p)) {
2043 		struct rte_table_action *a = p->table[table_id].a;
2044 
2045 		status = rte_table_action_dscp_table_update(a,
2046 				dscp_mask,
2047 				dscp_table);
2048 
2049 		return status;
2050 	}
2051 
2052 	/* Allocate request */
2053 	req = pipeline_msg_alloc();
2054 	if (req == NULL)
2055 		return -1;
2056 
2057 	/* Write request */
2058 	req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
2059 	req->id = table_id;
2060 	req->table_dscp_table_update.dscp_mask = dscp_mask;
2061 	memcpy(&req->table_dscp_table_update.dscp_table,
2062 		dscp_table, sizeof(*dscp_table));
2063 
2064 	/* Send request and wait for response */
2065 	rsp = pipeline_msg_send_recv(p, req);
2066 	if (rsp == NULL)
2067 		return -1;
2068 
2069 	/* Read response */
2070 	status = rsp->status;
2071 
2072 	/* Free response */
2073 	pipeline_msg_free(rsp);
2074 
2075 	return status;
2076 }
2077 
2078 int
2079 pipeline_table_rule_ttl_read(const char *pipeline_name,
2080 	uint32_t table_id,
2081 	void *data,
2082 	struct rte_table_action_ttl_counters *stats,
2083 	int clear)
2084 {
2085 	struct pipeline *p;
2086 	struct pipeline_msg_req *req;
2087 	struct pipeline_msg_rsp *rsp;
2088 	int status;
2089 
2090 	/* Check input params */
2091 	if ((pipeline_name == NULL) ||
2092 		(data == NULL) ||
2093 		(stats == NULL))
2094 		return -1;
2095 
2096 	p = pipeline_find(pipeline_name);
2097 	if ((p == NULL) ||
2098 		(table_id >= p->n_tables))
2099 		return -1;
2100 
2101 	if (!pipeline_is_running(p)) {
2102 		struct rte_table_action *a = p->table[table_id].a;
2103 
2104 		status = rte_table_action_ttl_read(a,
2105 				data,
2106 				stats,
2107 				clear);
2108 
2109 		return status;
2110 	}
2111 
2112 	/* Allocate request */
2113 	req = pipeline_msg_alloc();
2114 	if (req == NULL)
2115 		return -1;
2116 
2117 	/* Write request */
2118 	req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
2119 	req->id = table_id;
2120 	req->table_rule_ttl_read.data = data;
2121 	req->table_rule_ttl_read.clear = clear;
2122 
2123 	/* Send request and wait for response */
2124 	rsp = pipeline_msg_send_recv(p, req);
2125 	if (rsp == NULL)
2126 		return -1;
2127 
2128 	/* Read response */
2129 	status = rsp->status;
2130 	if (status)
2131 		memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
2132 
2133 	/* Free response */
2134 	pipeline_msg_free(rsp);
2135 
2136 	return status;
2137 }
2138 
2139 /**
2140  * Data plane threads: message handling
2141  */
/**
 * Try to dequeue one request from the request ring.
 *
 * Returns the dequeued request, or NULL when rte_ring_sc_dequeue() reports a
 * non-zero status.
 */
static inline struct pipeline_msg_req *
pipeline_msg_recv(struct rte_ring *msgq_req)
{
	struct pipeline_msg_req *msg;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &msg) != 0)
		return NULL;

	return msg;
}
2154 
/**
 * Enqueue a response on the response ring, retrying for as long as
 * rte_ring_sp_enqueue() returns -ENOBUFS.
 */
static inline void
pipeline_msg_send(struct rte_ring *msgq_rsp,
	struct pipeline_msg_rsp *rsp)
{
	/* Busy-wait until the enqueue stops reporting -ENOBUFS. */
	while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
		;
}
2165 
2166 static struct pipeline_msg_rsp *
2167 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
2168 	struct pipeline_msg_req *req)
2169 {
2170 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2171 	uint32_t port_id = req->id;
2172 	int clear = req->port_in_stats_read.clear;
2173 
2174 	rsp->status = rte_pipeline_port_in_stats_read(p->p,
2175 		port_id,
2176 		&rsp->port_in_stats_read.stats,
2177 		clear);
2178 
2179 	return rsp;
2180 }
2181 
2182 static struct pipeline_msg_rsp *
2183 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
2184 	struct pipeline_msg_req *req)
2185 {
2186 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2187 	uint32_t port_id = req->id;
2188 
2189 	rsp->status = rte_pipeline_port_in_enable(p->p,
2190 		port_id);
2191 
2192 	return rsp;
2193 }
2194 
2195 static struct pipeline_msg_rsp *
2196 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
2197 	struct pipeline_msg_req *req)
2198 {
2199 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2200 	uint32_t port_id = req->id;
2201 
2202 	rsp->status = rte_pipeline_port_in_disable(p->p,
2203 		port_id);
2204 
2205 	return rsp;
2206 }
2207 
2208 static struct pipeline_msg_rsp *
2209 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
2210 	struct pipeline_msg_req *req)
2211 {
2212 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2213 	uint32_t port_id = req->id;
2214 	int clear = req->port_out_stats_read.clear;
2215 
2216 	rsp->status = rte_pipeline_port_out_stats_read(p->p,
2217 		port_id,
2218 		&rsp->port_out_stats_read.stats,
2219 		clear);
2220 
2221 	return rsp;
2222 }
2223 
2224 static struct pipeline_msg_rsp *
2225 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
2226 	struct pipeline_msg_req *req)
2227 {
2228 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2229 	uint32_t port_id = req->id;
2230 	int clear = req->table_stats_read.clear;
2231 
2232 	rsp->status = rte_pipeline_table_stats_read(p->p,
2233 		port_id,
2234 		&rsp->table_stats_read.stats,
2235 		clear);
2236 
2237 	return rsp;
2238 }
2239 
/**
 * Split an IPv6 prefix depth (0 .. 128) into four per-word depths, one for
 * each consecutive 32-bit word of the address.
 *
 * The leading words receive 32 until the depth is used up, the next word
 * receives the remainder and any further words receive 0.
 *
 * Returns 0 on success. Returns -1, leaving depth32 untouched, when depth is
 * greater than 128.
 */
static int
match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
{
	uint32_t remaining = depth;
	uint32_t word;

	if (depth > 128)
		return -1;

	for (word = 0; word < 4; word++) {
		/* Each word takes at most 32 bits of the prefix. */
		uint32_t bits = (remaining < 32) ? remaining : 32;

		depth32[word] = bits;
		remaining -= bits;
	}

	return 0;
}
2286 
2287 static int
2288 match_convert(struct table_rule_match *mh,
2289 	union table_rule_match_low_level *ml,
2290 	int add)
2291 {
2292 	memset(ml, 0, sizeof(*ml));
2293 
2294 	switch (mh->match_type) {
2295 	case TABLE_ACL:
2296 		if (mh->match.acl.ip_version)
2297 			if (add) {
2298 				ml->acl_add.field_value[0].value.u8 =
2299 					mh->match.acl.proto;
2300 				ml->acl_add.field_value[0].mask_range.u8 =
2301 					mh->match.acl.proto_mask;
2302 
2303 				ml->acl_add.field_value[1].value.u32 =
2304 					mh->match.acl.ipv4.sa;
2305 				ml->acl_add.field_value[1].mask_range.u32 =
2306 					mh->match.acl.sa_depth;
2307 
2308 				ml->acl_add.field_value[2].value.u32 =
2309 					mh->match.acl.ipv4.da;
2310 				ml->acl_add.field_value[2].mask_range.u32 =
2311 					mh->match.acl.da_depth;
2312 
2313 				ml->acl_add.field_value[3].value.u16 =
2314 					mh->match.acl.sp0;
2315 				ml->acl_add.field_value[3].mask_range.u16 =
2316 					mh->match.acl.sp1;
2317 
2318 				ml->acl_add.field_value[4].value.u16 =
2319 					mh->match.acl.dp0;
2320 				ml->acl_add.field_value[4].mask_range.u16 =
2321 					mh->match.acl.dp1;
2322 
2323 				ml->acl_add.priority =
2324 					(int32_t) mh->match.acl.priority;
2325 			} else {
2326 				ml->acl_delete.field_value[0].value.u8 =
2327 					mh->match.acl.proto;
2328 				ml->acl_delete.field_value[0].mask_range.u8 =
2329 					mh->match.acl.proto_mask;
2330 
2331 				ml->acl_delete.field_value[1].value.u32 =
2332 					mh->match.acl.ipv4.sa;
2333 				ml->acl_delete.field_value[1].mask_range.u32 =
2334 					mh->match.acl.sa_depth;
2335 
2336 				ml->acl_delete.field_value[2].value.u32 =
2337 					mh->match.acl.ipv4.da;
2338 				ml->acl_delete.field_value[2].mask_range.u32 =
2339 					mh->match.acl.da_depth;
2340 
2341 				ml->acl_delete.field_value[3].value.u16 =
2342 					mh->match.acl.sp0;
2343 				ml->acl_delete.field_value[3].mask_range.u16 =
2344 					mh->match.acl.sp1;
2345 
2346 				ml->acl_delete.field_value[4].value.u16 =
2347 					mh->match.acl.dp0;
2348 				ml->acl_delete.field_value[4].mask_range.u16 =
2349 					mh->match.acl.dp1;
2350 			}
2351 		else
2352 			if (add) {
2353 				uint32_t *sa32 =
2354 					(uint32_t *) mh->match.acl.ipv6.sa;
2355 				uint32_t *da32 =
2356 					(uint32_t *) mh->match.acl.ipv6.da;
2357 				uint32_t sa32_depth[4], da32_depth[4];
2358 				int status;
2359 
2360 				status = match_convert_ipv6_depth(
2361 					mh->match.acl.sa_depth,
2362 					sa32_depth);
2363 				if (status)
2364 					return status;
2365 
2366 				status = match_convert_ipv6_depth(
2367 					mh->match.acl.da_depth,
2368 					da32_depth);
2369 				if (status)
2370 					return status;
2371 
2372 				ml->acl_add.field_value[0].value.u8 =
2373 					mh->match.acl.proto;
2374 				ml->acl_add.field_value[0].mask_range.u8 =
2375 					mh->match.acl.proto_mask;
2376 
2377 				ml->acl_add.field_value[1].value.u32 =
2378 					rte_be_to_cpu_32(sa32[0]);
2379 				ml->acl_add.field_value[1].mask_range.u32 =
2380 					sa32_depth[0];
2381 				ml->acl_add.field_value[2].value.u32 =
2382 					rte_be_to_cpu_32(sa32[1]);
2383 				ml->acl_add.field_value[2].mask_range.u32 =
2384 					sa32_depth[1];
2385 				ml->acl_add.field_value[3].value.u32 =
2386 					rte_be_to_cpu_32(sa32[2]);
2387 				ml->acl_add.field_value[3].mask_range.u32 =
2388 					sa32_depth[2];
2389 				ml->acl_add.field_value[4].value.u32 =
2390 					rte_be_to_cpu_32(sa32[3]);
2391 				ml->acl_add.field_value[4].mask_range.u32 =
2392 					sa32_depth[3];
2393 
2394 				ml->acl_add.field_value[5].value.u32 =
2395 					rte_be_to_cpu_32(da32[0]);
2396 				ml->acl_add.field_value[5].mask_range.u32 =
2397 					da32_depth[0];
2398 				ml->acl_add.field_value[6].value.u32 =
2399 					rte_be_to_cpu_32(da32[1]);
2400 				ml->acl_add.field_value[6].mask_range.u32 =
2401 					da32_depth[1];
2402 				ml->acl_add.field_value[7].value.u32 =
2403 					rte_be_to_cpu_32(da32[2]);
2404 				ml->acl_add.field_value[7].mask_range.u32 =
2405 					da32_depth[2];
2406 				ml->acl_add.field_value[8].value.u32 =
2407 					rte_be_to_cpu_32(da32[3]);
2408 				ml->acl_add.field_value[8].mask_range.u32 =
2409 					da32_depth[3];
2410 
2411 				ml->acl_add.field_value[9].value.u16 =
2412 					mh->match.acl.sp0;
2413 				ml->acl_add.field_value[9].mask_range.u16 =
2414 					mh->match.acl.sp1;
2415 
2416 				ml->acl_add.field_value[10].value.u16 =
2417 					mh->match.acl.dp0;
2418 				ml->acl_add.field_value[10].mask_range.u16 =
2419 					mh->match.acl.dp1;
2420 
2421 				ml->acl_add.priority =
2422 					(int32_t) mh->match.acl.priority;
2423 			} else {
2424 				uint32_t *sa32 =
2425 					(uint32_t *) mh->match.acl.ipv6.sa;
2426 				uint32_t *da32 =
2427 					(uint32_t *) mh->match.acl.ipv6.da;
2428 				uint32_t sa32_depth[4], da32_depth[4];
2429 				int status;
2430 
2431 				status = match_convert_ipv6_depth(
2432 					mh->match.acl.sa_depth,
2433 					sa32_depth);
2434 				if (status)
2435 					return status;
2436 
2437 				status = match_convert_ipv6_depth(
2438 					mh->match.acl.da_depth,
2439 					da32_depth);
2440 				if (status)
2441 					return status;
2442 
2443 				ml->acl_delete.field_value[0].value.u8 =
2444 					mh->match.acl.proto;
2445 				ml->acl_delete.field_value[0].mask_range.u8 =
2446 					mh->match.acl.proto_mask;
2447 
2448 				ml->acl_delete.field_value[1].value.u32 =
2449 					rte_be_to_cpu_32(sa32[0]);
2450 				ml->acl_delete.field_value[1].mask_range.u32 =
2451 					sa32_depth[0];
2452 				ml->acl_delete.field_value[2].value.u32 =
2453 					rte_be_to_cpu_32(sa32[1]);
2454 				ml->acl_delete.field_value[2].mask_range.u32 =
2455 					sa32_depth[1];
2456 				ml->acl_delete.field_value[3].value.u32 =
2457 					rte_be_to_cpu_32(sa32[2]);
2458 				ml->acl_delete.field_value[3].mask_range.u32 =
2459 					sa32_depth[2];
2460 				ml->acl_delete.field_value[4].value.u32 =
2461 					rte_be_to_cpu_32(sa32[3]);
2462 				ml->acl_delete.field_value[4].mask_range.u32 =
2463 					sa32_depth[3];
2464 
2465 				ml->acl_delete.field_value[5].value.u32 =
2466 					rte_be_to_cpu_32(da32[0]);
2467 				ml->acl_delete.field_value[5].mask_range.u32 =
2468 					da32_depth[0];
2469 				ml->acl_delete.field_value[6].value.u32 =
2470 					rte_be_to_cpu_32(da32[1]);
2471 				ml->acl_delete.field_value[6].mask_range.u32 =
2472 					da32_depth[1];
2473 				ml->acl_delete.field_value[7].value.u32 =
2474 					rte_be_to_cpu_32(da32[2]);
2475 				ml->acl_delete.field_value[7].mask_range.u32 =
2476 					da32_depth[2];
2477 				ml->acl_delete.field_value[8].value.u32 =
2478 					rte_be_to_cpu_32(da32[3]);
2479 				ml->acl_delete.field_value[8].mask_range.u32 =
2480 					da32_depth[3];
2481 
2482 				ml->acl_delete.field_value[9].value.u16 =
2483 					mh->match.acl.sp0;
2484 				ml->acl_delete.field_value[9].mask_range.u16 =
2485 					mh->match.acl.sp1;
2486 
2487 				ml->acl_delete.field_value[10].value.u16 =
2488 					mh->match.acl.dp0;
2489 				ml->acl_delete.field_value[10].mask_range.u16 =
2490 					mh->match.acl.dp1;
2491 			}
2492 		return 0;
2493 
2494 	case TABLE_ARRAY:
2495 		ml->array.pos = mh->match.array.pos;
2496 		return 0;
2497 
2498 	case TABLE_HASH:
2499 		memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
2500 		return 0;
2501 
2502 	case TABLE_LPM:
2503 		if (mh->match.lpm.ip_version) {
2504 			ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
2505 			ml->lpm_ipv4.depth = mh->match.lpm.depth;
2506 		} else {
2507 			memcpy(ml->lpm_ipv6.ip,
2508 				mh->match.lpm.ipv6, sizeof(ml->lpm_ipv6.ip));
2509 			ml->lpm_ipv6.depth = mh->match.lpm.depth;
2510 		}
2511 
2512 		return 0;
2513 
2514 	default:
2515 		return -1;
2516 	}
2517 }
2518 
2519 static int
2520 action_convert(struct rte_table_action *a,
2521 	struct table_rule_action *action,
2522 	struct rte_pipeline_table_entry *data)
2523 {
2524 	int status;
2525 
2526 	/* Apply actions */
2527 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2528 		status = rte_table_action_apply(a,
2529 			data,
2530 			RTE_TABLE_ACTION_FWD,
2531 			&action->fwd);
2532 
2533 		if (status)
2534 			return status;
2535 	}
2536 
2537 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2538 		status = rte_table_action_apply(a,
2539 			data,
2540 			RTE_TABLE_ACTION_LB,
2541 			&action->lb);
2542 
2543 		if (status)
2544 			return status;
2545 	}
2546 
2547 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2548 		status = rte_table_action_apply(a,
2549 			data,
2550 			RTE_TABLE_ACTION_MTR,
2551 			&action->mtr);
2552 
2553 		if (status)
2554 			return status;
2555 	}
2556 
2557 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2558 		status = rte_table_action_apply(a,
2559 			data,
2560 			RTE_TABLE_ACTION_TM,
2561 			&action->tm);
2562 
2563 		if (status)
2564 			return status;
2565 	}
2566 
2567 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2568 		status = rte_table_action_apply(a,
2569 			data,
2570 			RTE_TABLE_ACTION_ENCAP,
2571 			&action->encap);
2572 
2573 		if (status)
2574 			return status;
2575 	}
2576 
2577 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2578 		status = rte_table_action_apply(a,
2579 			data,
2580 			RTE_TABLE_ACTION_NAT,
2581 			&action->nat);
2582 
2583 		if (status)
2584 			return status;
2585 	}
2586 
2587 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2588 		status = rte_table_action_apply(a,
2589 			data,
2590 			RTE_TABLE_ACTION_TTL,
2591 			&action->ttl);
2592 
2593 		if (status)
2594 			return status;
2595 	}
2596 
2597 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2598 		status = rte_table_action_apply(a,
2599 			data,
2600 			RTE_TABLE_ACTION_STATS,
2601 			&action->stats);
2602 
2603 		if (status)
2604 			return status;
2605 	}
2606 
2607 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2608 		status = rte_table_action_apply(a,
2609 			data,
2610 			RTE_TABLE_ACTION_TIME,
2611 			&action->time);
2612 
2613 		if (status)
2614 			return status;
2615 	}
2616 
2617 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_SYM_CRYPTO)) {
2618 		status = rte_table_action_apply(a,
2619 			data,
2620 			RTE_TABLE_ACTION_SYM_CRYPTO,
2621 			&action->sym_crypto);
2622 
2623 		if (status)
2624 			return status;
2625 	}
2626 
2627 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TAG)) {
2628 		status = rte_table_action_apply(a,
2629 			data,
2630 			RTE_TABLE_ACTION_TAG,
2631 			&action->tag);
2632 
2633 		if (status)
2634 			return status;
2635 	}
2636 
2637 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_DECAP)) {
2638 		status = rte_table_action_apply(a,
2639 			data,
2640 			RTE_TABLE_ACTION_DECAP,
2641 			&action->decap);
2642 
2643 		if (status)
2644 			return status;
2645 	}
2646 
2647 	return 0;
2648 }
2649 
2650 static struct pipeline_msg_rsp *
2651 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2652 	struct pipeline_msg_req *req)
2653 {
2654 	union table_rule_match_low_level match_ll;
2655 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2656 	struct table_rule_match *match = &req->table_rule_add.match;
2657 	struct table_rule_action *action = &req->table_rule_add.action;
2658 	struct rte_pipeline_table_entry *data_in, *data_out;
2659 	uint32_t table_id = req->id;
2660 	int key_found, status;
2661 	struct rte_table_action *a = p->table_data[table_id].a;
2662 
2663 	/* Apply actions */
2664 	memset(p->buffer, 0, sizeof(p->buffer));
2665 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2666 
2667 	status = match_convert(match, &match_ll, 1);
2668 	if (status) {
2669 		rsp->status = -1;
2670 		return rsp;
2671 	}
2672 
2673 	status = action_convert(a, action, data_in);
2674 	if (status) {
2675 		rsp->status = -1;
2676 		return rsp;
2677 	}
2678 
2679 	status = rte_pipeline_table_entry_add(p->p,
2680 		table_id,
2681 		&match_ll,
2682 		data_in,
2683 		&key_found,
2684 		&data_out);
2685 	if (status) {
2686 		rsp->status = -1;
2687 		return rsp;
2688 	}
2689 
2690 	/* Write response */
2691 	rsp->status = 0;
2692 	rsp->table_rule_add.data = data_out;
2693 
2694 	return rsp;
2695 }
2696 
2697 static struct pipeline_msg_rsp *
2698 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2699 	struct pipeline_msg_req *req)
2700 {
2701 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2702 	struct table_rule_action *action = &req->table_rule_add_default.action;
2703 	struct rte_pipeline_table_entry *data_in, *data_out;
2704 	uint32_t table_id = req->id;
2705 	int status;
2706 
2707 	/* Apply actions */
2708 	memset(p->buffer, 0, sizeof(p->buffer));
2709 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2710 
2711 	data_in->action = action->fwd.action;
2712 	if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2713 		data_in->port_id = action->fwd.id;
2714 	if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2715 		data_in->table_id = action->fwd.id;
2716 
2717 	/* Add default rule to table */
2718 	status = rte_pipeline_table_default_entry_add(p->p,
2719 		table_id,
2720 		data_in,
2721 		&data_out);
2722 	if (status) {
2723 		rsp->status = -1;
2724 		return rsp;
2725 	}
2726 
2727 	/* Write response */
2728 	rsp->status = 0;
2729 	rsp->table_rule_add_default.data = data_out;
2730 
2731 	return rsp;
2732 }
2733 
2734 static struct pipeline_msg_rsp *
2735 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2736 	struct pipeline_msg_req *req)
2737 {
2738 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2739 
2740 	uint32_t table_id = req->id;
2741 	struct table_rule_list *list = req->table_rule_add_bulk.list;
2742 	uint32_t bulk = req->table_rule_add_bulk.bulk;
2743 
2744 	uint32_t n_rules_added;
2745 	int status;
2746 
2747 	struct table_ll table_ll = {
2748 		.p = p->p,
2749 		.table_id = table_id,
2750 		.a = p->table_data[table_id].a,
2751 		.bulk_supported = bulk,
2752 	};
2753 
2754 	status = table_rule_add_bulk_ll(&table_ll, list, &n_rules_added);
2755 	if (status) {
2756 		rsp->status = -1;
2757 		rsp->table_rule_add_bulk.n_rules = 0;
2758 		return rsp;
2759 	}
2760 
2761 	/* Write response */
2762 	rsp->status = 0;
2763 	rsp->table_rule_add_bulk.n_rules = n_rules_added;
2764 	return rsp;
2765 }
2766 
2767 static struct pipeline_msg_rsp *
2768 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2769 	struct pipeline_msg_req *req)
2770 {
2771 	union table_rule_match_low_level match_ll;
2772 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2773 	struct table_rule_match *match = &req->table_rule_delete.match;
2774 	uint32_t table_id = req->id;
2775 	int key_found, status;
2776 
2777 	status = match_convert(match, &match_ll, 0);
2778 	if (status) {
2779 		rsp->status = -1;
2780 		return rsp;
2781 	}
2782 
2783 	rsp->status = rte_pipeline_table_entry_delete(p->p,
2784 		table_id,
2785 		&match_ll,
2786 		&key_found,
2787 		NULL);
2788 
2789 	return rsp;
2790 }
2791 
2792 static struct pipeline_msg_rsp *
2793 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2794 	struct pipeline_msg_req *req)
2795 {
2796 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2797 	uint32_t table_id = req->id;
2798 
2799 	rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2800 		table_id,
2801 		NULL);
2802 
2803 	return rsp;
2804 }
2805 
2806 static struct pipeline_msg_rsp *
2807 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2808 	struct pipeline_msg_req *req)
2809 {
2810 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2811 	uint32_t table_id = req->id;
2812 	void *data = req->table_rule_stats_read.data;
2813 	int clear = req->table_rule_stats_read.clear;
2814 	struct rte_table_action *a = p->table_data[table_id].a;
2815 
2816 	rsp->status = rte_table_action_stats_read(a,
2817 		data,
2818 		&rsp->table_rule_stats_read.stats,
2819 		clear);
2820 
2821 	return rsp;
2822 }
2823 
2824 static struct pipeline_msg_rsp *
2825 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2826 	struct pipeline_msg_req *req)
2827 {
2828 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2829 	uint32_t table_id = req->id;
2830 	uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2831 	struct rte_table_action_meter_profile *profile =
2832 		&req->table_mtr_profile_add.profile;
2833 	struct rte_table_action *a = p->table_data[table_id].a;
2834 
2835 	rsp->status = rte_table_action_meter_profile_add(a,
2836 		meter_profile_id,
2837 		profile);
2838 
2839 	return rsp;
2840 }
2841 
2842 static struct pipeline_msg_rsp *
2843 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2844 	struct pipeline_msg_req *req)
2845 {
2846 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2847 	uint32_t table_id = req->id;
2848 	uint32_t meter_profile_id =
2849 		req->table_mtr_profile_delete.meter_profile_id;
2850 	struct rte_table_action *a = p->table_data[table_id].a;
2851 
2852 	rsp->status = rte_table_action_meter_profile_delete(a,
2853 		meter_profile_id);
2854 
2855 	return rsp;
2856 }
2857 
2858 static struct pipeline_msg_rsp *
2859 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2860 	struct pipeline_msg_req *req)
2861 {
2862 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2863 	uint32_t table_id = req->id;
2864 	void *data = req->table_rule_mtr_read.data;
2865 	uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2866 	int clear = req->table_rule_mtr_read.clear;
2867 	struct rte_table_action *a = p->table_data[table_id].a;
2868 
2869 	rsp->status = rte_table_action_meter_read(a,
2870 		data,
2871 		tc_mask,
2872 		&rsp->table_rule_mtr_read.stats,
2873 		clear);
2874 
2875 	return rsp;
2876 }
2877 
2878 static struct pipeline_msg_rsp *
2879 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2880 	struct pipeline_msg_req *req)
2881 {
2882 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2883 	uint32_t table_id = req->id;
2884 	uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2885 	struct rte_table_action_dscp_table *dscp_table =
2886 		&req->table_dscp_table_update.dscp_table;
2887 	struct rte_table_action *a = p->table_data[table_id].a;
2888 
2889 	rsp->status = rte_table_action_dscp_table_update(a,
2890 		dscp_mask,
2891 		dscp_table);
2892 
2893 	return rsp;
2894 }
2895 
2896 static struct pipeline_msg_rsp *
2897 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
2898 	struct pipeline_msg_req *req)
2899 {
2900 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2901 	uint32_t table_id = req->id;
2902 	void *data = req->table_rule_ttl_read.data;
2903 	int clear = req->table_rule_ttl_read.clear;
2904 	struct rte_table_action *a = p->table_data[table_id].a;
2905 
2906 	rsp->status = rte_table_action_ttl_read(a,
2907 		data,
2908 		&rsp->table_rule_ttl_read.stats,
2909 		clear);
2910 
2911 	return rsp;
2912 }
2913 
2914 static void
2915 pipeline_msg_handle(struct pipeline_data *p)
2916 {
2917 	for ( ; ; ) {
2918 		struct pipeline_msg_req *req;
2919 		struct pipeline_msg_rsp *rsp;
2920 
2921 		req = pipeline_msg_recv(p->msgq_req);
2922 		if (req == NULL)
2923 			break;
2924 
2925 		switch (req->type) {
2926 		case PIPELINE_REQ_PORT_IN_STATS_READ:
2927 			rsp = pipeline_msg_handle_port_in_stats_read(p, req);
2928 			break;
2929 
2930 		case PIPELINE_REQ_PORT_IN_ENABLE:
2931 			rsp = pipeline_msg_handle_port_in_enable(p, req);
2932 			break;
2933 
2934 		case PIPELINE_REQ_PORT_IN_DISABLE:
2935 			rsp = pipeline_msg_handle_port_in_disable(p, req);
2936 			break;
2937 
2938 		case PIPELINE_REQ_PORT_OUT_STATS_READ:
2939 			rsp = pipeline_msg_handle_port_out_stats_read(p, req);
2940 			break;
2941 
2942 		case PIPELINE_REQ_TABLE_STATS_READ:
2943 			rsp = pipeline_msg_handle_table_stats_read(p, req);
2944 			break;
2945 
2946 		case PIPELINE_REQ_TABLE_RULE_ADD:
2947 			rsp = pipeline_msg_handle_table_rule_add(p, req);
2948 			break;
2949 
2950 		case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
2951 			rsp = pipeline_msg_handle_table_rule_add_default(p,	req);
2952 			break;
2953 
2954 		case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
2955 			rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
2956 			break;
2957 
2958 		case PIPELINE_REQ_TABLE_RULE_DELETE:
2959 			rsp = pipeline_msg_handle_table_rule_delete(p, req);
2960 			break;
2961 
2962 		case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
2963 			rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
2964 			break;
2965 
2966 		case PIPELINE_REQ_TABLE_RULE_STATS_READ:
2967 			rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
2968 			break;
2969 
2970 		case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
2971 			rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
2972 			break;
2973 
2974 		case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
2975 			rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
2976 			break;
2977 
2978 		case PIPELINE_REQ_TABLE_RULE_MTR_READ:
2979 			rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
2980 			break;
2981 
2982 		case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
2983 			rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
2984 			break;
2985 
2986 		case PIPELINE_REQ_TABLE_RULE_TTL_READ:
2987 			rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
2988 			break;
2989 
2990 		default:
2991 			rsp = (struct pipeline_msg_rsp *) req;
2992 			rsp->status = -1;
2993 		}
2994 
2995 		pipeline_msg_send(p->msgq_rsp, rsp);
2996 	}
2997 }
2998 
2999 /**
3000  * Data plane threads: main
3001  */
3002 int
3003 thread_main(void *arg __rte_unused)
3004 {
3005 	struct thread_data *t;
3006 	uint32_t thread_id, i;
3007 
3008 	thread_id = rte_lcore_id();
3009 	t = &thread_data[thread_id];
3010 
3011 	/* Dispatch loop */
3012 	for (i = 0; ; i++) {
3013 		uint32_t j;
3014 
3015 		/* Data Plane */
3016 		for (j = 0; j < t->n_pipelines; j++)
3017 			rte_pipeline_run(t->p[j]);
3018 
3019 		/* Control Plane */
3020 		if ((i & 0xF) == 0) {
3021 			uint64_t time = rte_get_tsc_cycles();
3022 			uint64_t time_next_min = UINT64_MAX;
3023 
3024 			if (time < t->time_next_min)
3025 				continue;
3026 
3027 			/* Pipeline message queues */
3028 			for (j = 0; j < t->n_pipelines; j++) {
3029 				struct pipeline_data *p =
3030 					&t->pipeline_data[j];
3031 				uint64_t time_next = p->time_next;
3032 
3033 				if (time_next <= time) {
3034 					pipeline_msg_handle(p);
3035 					rte_pipeline_flush(p->p);
3036 					time_next = time + p->timer_period;
3037 					p->time_next = time_next;
3038 				}
3039 
3040 				if (time_next < time_next_min)
3041 					time_next_min = time_next;
3042 			}
3043 
3044 			/* Thread message queues */
3045 			{
3046 				uint64_t time_next = t->time_next;
3047 
3048 				if (time_next <= time) {
3049 					thread_msg_handle(t);
3050 					time_next = time + t->timer_period;
3051 					t->time_next = time_next;
3052 				}
3053 
3054 				if (time_next < time_next_min)
3055 					time_next_min = time_next;
3056 			}
3057 
3058 			t->time_next_min = time_next_min;
3059 		}
3060 	}
3061 
3062 	return 0;
3063 }
3064