1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2020 Intel Corporation
3 */
4
5 #include <stdlib.h>
6
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17
18 #include "obj.h"
19 #include "thread.h"
20
/* Capacity of the per-thread pipeline tables (thread_data.p[] and
 * thread_data.pipeline_data[]). May be overridden at build time.
 */
#if !defined(THREAD_PIPELINES_MAX)
#define THREAD_PIPELINES_MAX 256
#endif

/* Size handed to rte_ring_create() for each per-thread request/response
 * message ring. May be overridden at build time.
 */
#if !defined(THREAD_MSGQ_SIZE)
#define THREAD_MSGQ_SIZE 64
#endif

/* Period, in milliseconds, used to derive the per-thread message queue
 * polling timer. May be overridden at build time.
 */
#if !defined(THREAD_TIMER_PERIOD_MS)
#define THREAD_TIMER_PERIOD_MS 100
#endif
32
33 /**
34 * Control thread: data plane thread context
35 */
36 struct thread {
37 struct rte_ring *msgq_req;
38 struct rte_ring *msgq_rsp;
39
40 uint32_t enabled;
41 };
42
43 static struct thread thread[RTE_MAX_LCORE];
44
45 /**
46 * Data plane threads: context
47 */
struct pipeline_data {
	/* Pipeline handle this record belongs to. */
	struct rte_swx_pipeline *p;

	/* Per-pipeline timer period, measured in CPU (TSC) cycles. */
	uint64_t timer_period;

	/* TSC value at which the per-pipeline timer next expires. */
	uint64_t time_next;
};
53
54 struct thread_data {
55 struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
56 uint32_t n_pipelines;
57
58 struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
59 struct rte_ring *msgq_req;
60 struct rte_ring *msgq_rsp;
61 uint64_t timer_period; /* Measured in CPU cycles. */
62 uint64_t time_next;
63 uint64_t time_next_min;
64 } __rte_cache_aligned;
65
66 static struct thread_data thread_data[RTE_MAX_LCORE];
67
68 /**
69 * Control thread: data plane thread init
70 */
71 static void
thread_free(void)72 thread_free(void)
73 {
74 uint32_t i;
75
76 for (i = 0; i < RTE_MAX_LCORE; i++) {
77 struct thread *t = &thread[i];
78
79 if (!rte_lcore_is_enabled(i))
80 continue;
81
82 /* MSGQs */
83 if (t->msgq_req)
84 rte_ring_free(t->msgq_req);
85
86 if (t->msgq_rsp)
87 rte_ring_free(t->msgq_rsp);
88 }
89 }
90
91 int
thread_init(void)92 thread_init(void)
93 {
94 uint32_t i;
95
96 RTE_LCORE_FOREACH_WORKER(i) {
97 char name[NAME_MAX];
98 struct rte_ring *msgq_req, *msgq_rsp;
99 struct thread *t = &thread[i];
100 struct thread_data *t_data = &thread_data[i];
101 uint32_t cpu_id = rte_lcore_to_socket_id(i);
102
103 /* MSGQs */
104 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
105
106 msgq_req = rte_ring_create(name,
107 THREAD_MSGQ_SIZE,
108 cpu_id,
109 RING_F_SP_ENQ | RING_F_SC_DEQ);
110
111 if (msgq_req == NULL) {
112 thread_free();
113 return -1;
114 }
115
116 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
117
118 msgq_rsp = rte_ring_create(name,
119 THREAD_MSGQ_SIZE,
120 cpu_id,
121 RING_F_SP_ENQ | RING_F_SC_DEQ);
122
123 if (msgq_rsp == NULL) {
124 thread_free();
125 return -1;
126 }
127
128 /* Control thread records */
129 t->msgq_req = msgq_req;
130 t->msgq_rsp = msgq_rsp;
131 t->enabled = 1;
132
133 /* Data plane thread records */
134 t_data->n_pipelines = 0;
135 t_data->msgq_req = msgq_req;
136 t_data->msgq_rsp = msgq_rsp;
137 t_data->timer_period =
138 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
139 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
140 t_data->time_next_min = t_data->time_next;
141 }
142
143 return 0;
144 }
145
146 static inline int
thread_is_running(uint32_t thread_id)147 thread_is_running(uint32_t thread_id)
148 {
149 enum rte_lcore_state_t thread_state;
150
151 thread_state = rte_eal_get_lcore_state(thread_id);
152 return (thread_state == RUNNING) ? 1 : 0;
153 }
154
155 /**
156 * Control thread & data plane threads: message passing
157 */
/* Request kinds understood by the data plane threads. */
enum thread_req_type {
	THREAD_REQ_PIPELINE_ENABLE = 0,
	THREAD_REQ_PIPELINE_DISABLE,
	THREAD_REQ_MAX
};

/* Request message: the type selects which union member is meaningful. */
struct thread_msg_req {
	enum thread_req_type type;

	union {
		/* THREAD_REQ_PIPELINE_ENABLE */
		struct {
			struct rte_swx_pipeline *p;
			uint32_t timer_period_ms;
		} pipeline_enable;

		/* THREAD_REQ_PIPELINE_DISABLE */
		struct {
			struct rte_swx_pipeline *p;
		} pipeline_disable;
	};
};

/* Response message: written by the handlers into the same buffer that
 * carried the request.
 */
struct thread_msg_rsp {
	int status;
};
182
183 /**
184 * Control thread
185 */
186 static struct thread_msg_req *
thread_msg_alloc(void)187 thread_msg_alloc(void)
188 {
189 size_t size = RTE_MAX(sizeof(struct thread_msg_req),
190 sizeof(struct thread_msg_rsp));
191
192 return calloc(1, size);
193 }
194
/**
 * Release a message buffer once its response has been consumed.
 *
 * @param rsp  buffer to hand back to free().
 */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	free(rsp);
}
200
201 static struct thread_msg_rsp *
thread_msg_send_recv(uint32_t thread_id,struct thread_msg_req * req)202 thread_msg_send_recv(uint32_t thread_id,
203 struct thread_msg_req *req)
204 {
205 struct thread *t = &thread[thread_id];
206 struct rte_ring *msgq_req = t->msgq_req;
207 struct rte_ring *msgq_rsp = t->msgq_rsp;
208 struct thread_msg_rsp *rsp;
209 int status;
210
211 /* send */
212 do {
213 status = rte_ring_sp_enqueue(msgq_req, req);
214 } while (status == -ENOBUFS);
215
216 /* recv */
217 do {
218 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
219 } while (status != 0);
220
221 return rsp;
222 }
223
224 int
thread_pipeline_enable(uint32_t thread_id,struct obj * obj,const char * pipeline_name)225 thread_pipeline_enable(uint32_t thread_id,
226 struct obj *obj,
227 const char *pipeline_name)
228 {
229 struct pipeline *p = pipeline_find(obj, pipeline_name);
230 struct thread *t;
231 struct thread_msg_req *req;
232 struct thread_msg_rsp *rsp;
233 int status;
234
235 /* Check input params */
236 if ((thread_id >= RTE_MAX_LCORE) ||
237 (p == NULL))
238 return -1;
239
240 t = &thread[thread_id];
241 if (t->enabled == 0)
242 return -1;
243
244 if (!thread_is_running(thread_id)) {
245 struct thread_data *td = &thread_data[thread_id];
246 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
247
248 if (td->n_pipelines >= THREAD_PIPELINES_MAX)
249 return -1;
250
251 /* Data plane thread */
252 td->p[td->n_pipelines] = p->p;
253
254 tdp->p = p->p;
255 tdp->timer_period =
256 (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
257 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
258
259 td->n_pipelines++;
260
261 /* Pipeline */
262 p->thread_id = thread_id;
263 p->enabled = 1;
264
265 return 0;
266 }
267
268 /* Allocate request */
269 req = thread_msg_alloc();
270 if (req == NULL)
271 return -1;
272
273 /* Write request */
274 req->type = THREAD_REQ_PIPELINE_ENABLE;
275 req->pipeline_enable.p = p->p;
276 req->pipeline_enable.timer_period_ms = p->timer_period_ms;
277
278 /* Send request and wait for response */
279 rsp = thread_msg_send_recv(thread_id, req);
280
281 /* Read response */
282 status = rsp->status;
283
284 /* Free response */
285 thread_msg_free(rsp);
286
287 /* Request completion */
288 if (status)
289 return status;
290
291 p->thread_id = thread_id;
292 p->enabled = 1;
293
294 return 0;
295 }
296
297 int
thread_pipeline_disable(uint32_t thread_id,struct obj * obj,const char * pipeline_name)298 thread_pipeline_disable(uint32_t thread_id,
299 struct obj *obj,
300 const char *pipeline_name)
301 {
302 struct pipeline *p = pipeline_find(obj, pipeline_name);
303 struct thread *t;
304 struct thread_msg_req *req;
305 struct thread_msg_rsp *rsp;
306 int status;
307
308 /* Check input params */
309 if ((thread_id >= RTE_MAX_LCORE) ||
310 (p == NULL))
311 return -1;
312
313 t = &thread[thread_id];
314 if (t->enabled == 0)
315 return -1;
316
317 if (p->enabled == 0)
318 return 0;
319
320 if (p->thread_id != thread_id)
321 return -1;
322
323 if (!thread_is_running(thread_id)) {
324 struct thread_data *td = &thread_data[thread_id];
325 uint32_t i;
326
327 for (i = 0; i < td->n_pipelines; i++) {
328 struct pipeline_data *tdp = &td->pipeline_data[i];
329
330 if (tdp->p != p->p)
331 continue;
332
333 /* Data plane thread */
334 if (i < td->n_pipelines - 1) {
335 struct rte_swx_pipeline *pipeline_last =
336 td->p[td->n_pipelines - 1];
337 struct pipeline_data *tdp_last =
338 &td->pipeline_data[td->n_pipelines - 1];
339
340 td->p[i] = pipeline_last;
341 memcpy(tdp, tdp_last, sizeof(*tdp));
342 }
343
344 td->n_pipelines--;
345
346 /* Pipeline */
347 p->enabled = 0;
348
349 break;
350 }
351
352 return 0;
353 }
354
355 /* Allocate request */
356 req = thread_msg_alloc();
357 if (req == NULL)
358 return -1;
359
360 /* Write request */
361 req->type = THREAD_REQ_PIPELINE_DISABLE;
362 req->pipeline_disable.p = p->p;
363
364 /* Send request and wait for response */
365 rsp = thread_msg_send_recv(thread_id, req);
366
367 /* Read response */
368 status = rsp->status;
369
370 /* Free response */
371 thread_msg_free(rsp);
372
373 /* Request completion */
374 if (status)
375 return status;
376
377 p->enabled = 0;
378
379 return 0;
380 }
381
382 /**
383 * Data plane threads: message handling
384 */
/**
 * Poll the request ring once.
 *
 * @param msgq_req  ring to dequeue from.
 * @return the dequeued request, or NULL when the dequeue did not succeed.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	void *msg;

	if (rte_ring_sc_dequeue(msgq_req, &msg) != 0)
		return NULL;

	return msg;
}
397
/**
 * Enqueue a response, retrying for as long as the ring reports -ENOBUFS.
 *
 * @param msgq_rsp  ring to enqueue on.
 * @param rsp       response to send.
 */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
		;
}
408
409 static struct thread_msg_rsp *
thread_msg_handle_pipeline_enable(struct thread_data * t,struct thread_msg_req * req)410 thread_msg_handle_pipeline_enable(struct thread_data *t,
411 struct thread_msg_req *req)
412 {
413 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
414 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
415
416 /* Request */
417 if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
418 rsp->status = -1;
419 return rsp;
420 }
421
422 t->p[t->n_pipelines] = req->pipeline_enable.p;
423
424 p->p = req->pipeline_enable.p;
425 p->timer_period = (rte_get_tsc_hz() *
426 req->pipeline_enable.timer_period_ms) / 1000;
427 p->time_next = rte_get_tsc_cycles() + p->timer_period;
428
429 t->n_pipelines++;
430
431 /* Response */
432 rsp->status = 0;
433 return rsp;
434 }
435
436 static struct thread_msg_rsp *
thread_msg_handle_pipeline_disable(struct thread_data * t,struct thread_msg_req * req)437 thread_msg_handle_pipeline_disable(struct thread_data *t,
438 struct thread_msg_req *req)
439 {
440 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
441 uint32_t n_pipelines = t->n_pipelines;
442 struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
443 uint32_t i;
444
445 /* find pipeline */
446 for (i = 0; i < n_pipelines; i++) {
447 struct pipeline_data *p = &t->pipeline_data[i];
448
449 if (p->p != pipeline)
450 continue;
451
452 if (i < n_pipelines - 1) {
453 struct rte_swx_pipeline *pipeline_last =
454 t->p[n_pipelines - 1];
455 struct pipeline_data *p_last =
456 &t->pipeline_data[n_pipelines - 1];
457
458 t->p[i] = pipeline_last;
459 memcpy(p, p_last, sizeof(*p));
460 }
461
462 t->n_pipelines--;
463
464 rsp->status = 0;
465 return rsp;
466 }
467
468 /* should not get here */
469 rsp->status = 0;
470 return rsp;
471 }
472
473 static void
thread_msg_handle(struct thread_data * t)474 thread_msg_handle(struct thread_data *t)
475 {
476 for ( ; ; ) {
477 struct thread_msg_req *req;
478 struct thread_msg_rsp *rsp;
479
480 req = thread_msg_recv(t->msgq_req);
481 if (req == NULL)
482 break;
483
484 switch (req->type) {
485 case THREAD_REQ_PIPELINE_ENABLE:
486 rsp = thread_msg_handle_pipeline_enable(t, req);
487 break;
488
489 case THREAD_REQ_PIPELINE_DISABLE:
490 rsp = thread_msg_handle_pipeline_disable(t, req);
491 break;
492
493 default:
494 rsp = (struct thread_msg_rsp *) req;
495 rsp->status = -1;
496 }
497
498 thread_msg_send(t->msgq_rsp, rsp);
499 }
500 }
501
502 /**
503 * Data plane threads: main
504 */
505 int
thread_main(void * arg __rte_unused)506 thread_main(void *arg __rte_unused)
507 {
508 struct thread_data *t;
509 uint32_t thread_id, i;
510
511 thread_id = rte_lcore_id();
512 t = &thread_data[thread_id];
513
514 /* Dispatch loop */
515 for (i = 0; ; i++) {
516 uint32_t j;
517
518 /* Data Plane */
519 for (j = 0; j < t->n_pipelines; j++)
520 rte_swx_pipeline_run(t->p[j], 1000000);
521
522 /* Control Plane */
523 if ((i & 0xF) == 0) {
524 uint64_t time = rte_get_tsc_cycles();
525 uint64_t time_next_min = UINT64_MAX;
526
527 if (time < t->time_next_min)
528 continue;
529
530 /* Thread message queues */
531 {
532 uint64_t time_next = t->time_next;
533
534 if (time_next <= time) {
535 thread_msg_handle(t);
536 time_next = time + t->timer_period;
537 t->time_next = time_next;
538 }
539
540 if (time_next < time_next_min)
541 time_next_min = time_next;
542 }
543
544 t->time_next_min = time_next_min;
545 }
546 }
547
548 return 0;
549 }
550