1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2020 Intel Corporation
3 */
4
5 #include <stdlib.h>
6
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17
18 #include "obj.h"
19 #include "thread.h"
20
/* Capacity of the per-thread pipeline arrays in struct thread_data. */
#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX 256
#endif

/* Size passed to rte_ring_create() for each request/response message queue. */
#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE 64
#endif

/* Period, in milliseconds, at which a data plane thread services its request
 * message queue.
 */
#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS 100
#endif

/* Pipeline instruction quantum: the instruction budget handed to each
 * rte_swx_pipeline_run() call. It needs to be big enough to do some meaningful
 * work, but not so big that it starves the other pipelines mapped to the same
 * thread. For a pipeline that executes 10 instructions per packet, a quantum
 * of 1000 instructions equates to processing 100 packets.
 */
#ifndef PIPELINE_INSTR_QUANTA
#define PIPELINE_INSTR_QUANTA 1000
#endif
41
/**
 * Control thread: data plane thread context
 *
 * One record per lcore, indexed by lcore ID. Records are filled in by
 * thread_init() for the worker lcores only.
 */
struct thread {
	/* Request queue: the control thread enqueues, the data plane thread
	 * dequeues.
	 */
	struct rte_ring *msgq_req;
	/* Response queue: the data plane thread enqueues, the control thread
	 * dequeues.
	 */
	struct rte_ring *msgq_rsp;

	/* Set to 1 by thread_init() once both message queues exist. */
	uint32_t enabled;
};

static struct thread thread[RTE_MAX_LCORE];
53
/**
 * Data plane threads: context
 */

/* Per-pipeline bookkeeping, stored at the same index as the matching entry of
 * thread_data.p[].
 */
struct pipeline_data {
	struct rte_swx_pipeline *p;
	uint64_t timer_period; /* Measured in CPU cycles. */
	/* Initialized to "now + timer_period" when the pipeline is enabled;
	 * not read anywhere in this file.
	 */
	uint64_t time_next;
};

/* Per-lcore state used by thread_main(), indexed by lcore ID. */
struct thread_data {
	/* Pipelines run by thread_main(); entries [0, n_pipelines) are valid. */
	struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
	uint32_t n_pipelines;

	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
	/* Same rings as the control thread record for this lcore. */
	struct rte_ring *msgq_req;
	struct rte_ring *msgq_rsp;
	/* Message queue servicing period. */
	uint64_t timer_period; /* Measured in CPU cycles. */
	/* TSC value at or after which the request queue is serviced next. */
	uint64_t time_next;
	/* Earliest pending deadline: thread_main() skips all control plane
	 * work while the current TSC value is below it.
	 */
	uint64_t time_next_min;
} __rte_cache_aligned;

static struct thread_data thread_data[RTE_MAX_LCORE];
76
77 /**
78 * Control thread: data plane thread init
79 */
80 static void
thread_free(void)81 thread_free(void)
82 {
83 uint32_t i;
84
85 for (i = 0; i < RTE_MAX_LCORE; i++) {
86 struct thread *t = &thread[i];
87
88 if (!rte_lcore_is_enabled(i))
89 continue;
90
91 /* MSGQs */
92 rte_ring_free(t->msgq_req);
93
94 rte_ring_free(t->msgq_rsp);
95 }
96 }
97
98 int
thread_init(void)99 thread_init(void)
100 {
101 uint32_t i;
102
103 RTE_LCORE_FOREACH_WORKER(i) {
104 char name[NAME_MAX];
105 struct rte_ring *msgq_req, *msgq_rsp;
106 struct thread *t = &thread[i];
107 struct thread_data *t_data = &thread_data[i];
108 uint32_t cpu_id = rte_lcore_to_socket_id(i);
109
110 /* MSGQs */
111 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
112
113 msgq_req = rte_ring_create(name,
114 THREAD_MSGQ_SIZE,
115 cpu_id,
116 RING_F_SP_ENQ | RING_F_SC_DEQ);
117
118 if (msgq_req == NULL) {
119 thread_free();
120 return -1;
121 }
122
123 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
124
125 msgq_rsp = rte_ring_create(name,
126 THREAD_MSGQ_SIZE,
127 cpu_id,
128 RING_F_SP_ENQ | RING_F_SC_DEQ);
129
130 if (msgq_rsp == NULL) {
131 thread_free();
132 return -1;
133 }
134
135 /* Control thread records */
136 t->msgq_req = msgq_req;
137 t->msgq_rsp = msgq_rsp;
138 t->enabled = 1;
139
140 /* Data plane thread records */
141 t_data->n_pipelines = 0;
142 t_data->msgq_req = msgq_req;
143 t_data->msgq_rsp = msgq_rsp;
144 t_data->timer_period =
145 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
146 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
147 t_data->time_next_min = t_data->time_next;
148 }
149
150 return 0;
151 }
152
153 static inline int
thread_is_running(uint32_t thread_id)154 thread_is_running(uint32_t thread_id)
155 {
156 enum rte_lcore_state_t thread_state;
157
158 thread_state = rte_eal_get_lcore_state(thread_id);
159 return (thread_state == RUNNING) ? 1 : 0;
160 }
161
/**
 * Control thread & data plane threads: message passing
 */

/* Request types understood by thread_msg_handle(). */
enum thread_req_type {
	THREAD_REQ_PIPELINE_ENABLE = 0,
	THREAD_REQ_PIPELINE_DISABLE,
	THREAD_REQ_MAX
};

/* Request sent by the control thread; the union member in use is selected by
 * the type field.
 */
struct thread_msg_req {
	enum thread_req_type type;

	union {
		struct {
			struct rte_swx_pipeline *p;
			uint32_t timer_period_ms;
		} pipeline_enable;

		struct {
			struct rte_swx_pipeline *p;
		} pipeline_disable;
	};
};

/* Response returned by the data plane thread. The message handlers write it
 * in place over the request buffer, which thread_msg_alloc() sizes to fit the
 * larger of the two structures.
 */
struct thread_msg_rsp {
	int status;
};
189
190 /**
191 * Control thread
192 */
193 static struct thread_msg_req *
thread_msg_alloc(void)194 thread_msg_alloc(void)
195 {
196 size_t size = RTE_MAX(sizeof(struct thread_msg_req),
197 sizeof(struct thread_msg_rsp));
198
199 return calloc(1, size);
200 }
201
/* Release a message buffer with free(); callers in this file pass the
 * response returned by thread_msg_send_recv().
 */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	free(rsp);
}
207
208 static struct thread_msg_rsp *
thread_msg_send_recv(uint32_t thread_id,struct thread_msg_req * req)209 thread_msg_send_recv(uint32_t thread_id,
210 struct thread_msg_req *req)
211 {
212 struct thread *t = &thread[thread_id];
213 struct rte_ring *msgq_req = t->msgq_req;
214 struct rte_ring *msgq_rsp = t->msgq_rsp;
215 struct thread_msg_rsp *rsp;
216 int status;
217
218 /* send */
219 do {
220 status = rte_ring_sp_enqueue(msgq_req, req);
221 } while (status == -ENOBUFS);
222
223 /* recv */
224 do {
225 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
226 } while (status != 0);
227
228 return rsp;
229 }
230
231 int
thread_pipeline_enable(uint32_t thread_id,struct obj * obj,const char * pipeline_name)232 thread_pipeline_enable(uint32_t thread_id,
233 struct obj *obj,
234 const char *pipeline_name)
235 {
236 struct pipeline *p = pipeline_find(obj, pipeline_name);
237 struct thread *t;
238 struct thread_msg_req *req;
239 struct thread_msg_rsp *rsp;
240 int status;
241
242 /* Check input params */
243 if ((thread_id >= RTE_MAX_LCORE) ||
244 (p == NULL))
245 return -1;
246
247 t = &thread[thread_id];
248 if (t->enabled == 0)
249 return -1;
250
251 if (!thread_is_running(thread_id)) {
252 struct thread_data *td = &thread_data[thread_id];
253 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
254
255 if (td->n_pipelines >= THREAD_PIPELINES_MAX)
256 return -1;
257
258 /* Data plane thread */
259 td->p[td->n_pipelines] = p->p;
260
261 tdp->p = p->p;
262 tdp->timer_period =
263 (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
264 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
265
266 td->n_pipelines++;
267
268 /* Pipeline */
269 p->thread_id = thread_id;
270 p->enabled = 1;
271
272 return 0;
273 }
274
275 /* Allocate request */
276 req = thread_msg_alloc();
277 if (req == NULL)
278 return -1;
279
280 /* Write request */
281 req->type = THREAD_REQ_PIPELINE_ENABLE;
282 req->pipeline_enable.p = p->p;
283 req->pipeline_enable.timer_period_ms = p->timer_period_ms;
284
285 /* Send request and wait for response */
286 rsp = thread_msg_send_recv(thread_id, req);
287
288 /* Read response */
289 status = rsp->status;
290
291 /* Free response */
292 thread_msg_free(rsp);
293
294 /* Request completion */
295 if (status)
296 return status;
297
298 p->thread_id = thread_id;
299 p->enabled = 1;
300
301 return 0;
302 }
303
304 int
thread_pipeline_disable(uint32_t thread_id,struct obj * obj,const char * pipeline_name)305 thread_pipeline_disable(uint32_t thread_id,
306 struct obj *obj,
307 const char *pipeline_name)
308 {
309 struct pipeline *p = pipeline_find(obj, pipeline_name);
310 struct thread *t;
311 struct thread_msg_req *req;
312 struct thread_msg_rsp *rsp;
313 int status;
314
315 /* Check input params */
316 if ((thread_id >= RTE_MAX_LCORE) ||
317 (p == NULL))
318 return -1;
319
320 t = &thread[thread_id];
321 if (t->enabled == 0)
322 return -1;
323
324 if (p->enabled == 0)
325 return 0;
326
327 if (p->thread_id != thread_id)
328 return -1;
329
330 if (!thread_is_running(thread_id)) {
331 struct thread_data *td = &thread_data[thread_id];
332 uint32_t i;
333
334 for (i = 0; i < td->n_pipelines; i++) {
335 struct pipeline_data *tdp = &td->pipeline_data[i];
336
337 if (tdp->p != p->p)
338 continue;
339
340 /* Data plane thread */
341 if (i < td->n_pipelines - 1) {
342 struct rte_swx_pipeline *pipeline_last =
343 td->p[td->n_pipelines - 1];
344 struct pipeline_data *tdp_last =
345 &td->pipeline_data[td->n_pipelines - 1];
346
347 td->p[i] = pipeline_last;
348 memcpy(tdp, tdp_last, sizeof(*tdp));
349 }
350
351 td->n_pipelines--;
352
353 /* Pipeline */
354 p->enabled = 0;
355
356 break;
357 }
358
359 return 0;
360 }
361
362 /* Allocate request */
363 req = thread_msg_alloc();
364 if (req == NULL)
365 return -1;
366
367 /* Write request */
368 req->type = THREAD_REQ_PIPELINE_DISABLE;
369 req->pipeline_disable.p = p->p;
370
371 /* Send request and wait for response */
372 rsp = thread_msg_send_recv(thread_id, req);
373
374 /* Read response */
375 status = rsp->status;
376
377 /* Free response */
378 thread_msg_free(rsp);
379
380 /* Request completion */
381 if (status)
382 return status;
383
384 p->enabled = 0;
385
386 return 0;
387 }
388
389 /**
390 * Data plane threads: message handling
391 */
/* Make a single attempt to dequeue a request from msgq_req. Returns the
 * request, or NULL when the dequeue does not succeed.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	void *msg = NULL;

	if (rte_ring_sc_dequeue(msgq_req, &msg) != 0)
		return NULL;

	return msg;
}
404
/* Enqueue a response on msgq_rsp, retrying for as long as the ring reports
 * -ENOBUFS.
 */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
		;
}
415
416 static struct thread_msg_rsp *
thread_msg_handle_pipeline_enable(struct thread_data * t,struct thread_msg_req * req)417 thread_msg_handle_pipeline_enable(struct thread_data *t,
418 struct thread_msg_req *req)
419 {
420 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
421 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
422
423 /* Request */
424 if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
425 rsp->status = -1;
426 return rsp;
427 }
428
429 t->p[t->n_pipelines] = req->pipeline_enable.p;
430
431 p->p = req->pipeline_enable.p;
432 p->timer_period = (rte_get_tsc_hz() *
433 req->pipeline_enable.timer_period_ms) / 1000;
434 p->time_next = rte_get_tsc_cycles() + p->timer_period;
435
436 t->n_pipelines++;
437
438 /* Response */
439 rsp->status = 0;
440 return rsp;
441 }
442
443 static struct thread_msg_rsp *
thread_msg_handle_pipeline_disable(struct thread_data * t,struct thread_msg_req * req)444 thread_msg_handle_pipeline_disable(struct thread_data *t,
445 struct thread_msg_req *req)
446 {
447 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
448 uint32_t n_pipelines = t->n_pipelines;
449 struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
450 uint32_t i;
451
452 /* find pipeline */
453 for (i = 0; i < n_pipelines; i++) {
454 struct pipeline_data *p = &t->pipeline_data[i];
455
456 if (p->p != pipeline)
457 continue;
458
459 if (i < n_pipelines - 1) {
460 struct rte_swx_pipeline *pipeline_last =
461 t->p[n_pipelines - 1];
462 struct pipeline_data *p_last =
463 &t->pipeline_data[n_pipelines - 1];
464
465 t->p[i] = pipeline_last;
466 memcpy(p, p_last, sizeof(*p));
467 }
468
469 t->n_pipelines--;
470
471 rsp->status = 0;
472 return rsp;
473 }
474
475 /* should not get here */
476 rsp->status = 0;
477 return rsp;
478 }
479
480 static void
thread_msg_handle(struct thread_data * t)481 thread_msg_handle(struct thread_data *t)
482 {
483 for ( ; ; ) {
484 struct thread_msg_req *req;
485 struct thread_msg_rsp *rsp;
486
487 req = thread_msg_recv(t->msgq_req);
488 if (req == NULL)
489 break;
490
491 switch (req->type) {
492 case THREAD_REQ_PIPELINE_ENABLE:
493 rsp = thread_msg_handle_pipeline_enable(t, req);
494 break;
495
496 case THREAD_REQ_PIPELINE_DISABLE:
497 rsp = thread_msg_handle_pipeline_disable(t, req);
498 break;
499
500 default:
501 rsp = (struct thread_msg_rsp *) req;
502 rsp->status = -1;
503 }
504
505 thread_msg_send(t->msgq_rsp, rsp);
506 }
507 }
508
509 /**
510 * Data plane threads: main
511 */
512 int
thread_main(void * arg __rte_unused)513 thread_main(void *arg __rte_unused)
514 {
515 struct thread_data *t;
516 uint32_t thread_id, i;
517
518 thread_id = rte_lcore_id();
519 t = &thread_data[thread_id];
520
521 /* Dispatch loop */
522 for (i = 0; ; i++) {
523 uint32_t j;
524
525 /* Data Plane */
526 for (j = 0; j < t->n_pipelines; j++)
527 rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);
528
529 /* Control Plane */
530 if ((i & 0xF) == 0) {
531 uint64_t time = rte_get_tsc_cycles();
532 uint64_t time_next_min = UINT64_MAX;
533
534 if (time < t->time_next_min)
535 continue;
536
537 /* Thread message queues */
538 {
539 uint64_t time_next = t->time_next;
540
541 if (time_next <= time) {
542 thread_msg_handle(t);
543 time_next = time + t->timer_period;
544 t->time_next = time_next;
545 }
546
547 if (time_next < time_next_min)
548 time_next_min = time_next;
549 }
550
551 t->time_next_min = time_next_min;
552 }
553 }
554
555 return 0;
556 }
557