1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2018 Intel Corporation
3 */
4
5 #include <string.h>
6 #include <sys/time.h>
7
8 #include <rte_errno.h>
9 #include <rte_string_fns.h>
10
11 #include "eal_memalloc.h"
12 #include "eal_memcfg.h"
13 #include "eal_private.h"
14
15 #include "malloc_elem.h"
16 #include "malloc_mp.h"
17
18 #define MP_ACTION_SYNC "mp_malloc_sync"
19 /**< request sent by primary process to notify of changes in memory map */
20 #define MP_ACTION_ROLLBACK "mp_malloc_rollback"
21 /**< request sent by primary process to notify of changes in memory map. this is
22 * essentially a regular sync request, but we cannot send sync requests while
23 * another one is in progress, and we might have to - therefore, we do this as
24 * a separate callback.
25 */
26 #define MP_ACTION_REQUEST "mp_malloc_request"
27 /**< request sent by secondary process to ask for allocation/deallocation */
28 #define MP_ACTION_RESPONSE "mp_malloc_response"
29 /**< response sent to secondary process to indicate result of request */
30
31 /* forward declarations */
32 static int
33 handle_sync_response(const struct rte_mp_msg *request,
34 const struct rte_mp_reply *reply);
35 static int
36 handle_rollback_response(const struct rte_mp_msg *request,
37 const struct rte_mp_reply *reply);
38
39 #define MP_TIMEOUT_S 5 /**< 5 second timeout */
40
41 /* when we're allocating, we need to store some state to ensure that we can
42 * roll back later
43 */
44 struct primary_alloc_req_state {
45 struct malloc_heap *heap; /**< heap the new memory was added to */
46 struct rte_memseg **ms; /**< memsegs backing the allocation */
47 int ms_len; /**< number of memsegs in the array above */
48 struct malloc_elem *elem; /**< heap element created for the allocation */
49 void *map_addr; /**< start address of the newly mapped area */
50 size_t map_len; /**< length of the newly mapped area */
51 };
52
53 enum req_state {
54 REQ_STATE_INACTIVE = 0,
55 REQ_STATE_ACTIVE,
56 REQ_STATE_COMPLETE
57 };
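/* a request is INACTIVE until it has been sent, ACTIVE while replies are still
 * pending, and - on the secondary side - marked COMPLETE by handle_response()
 * just before the waiting thread is woken up. the primary never marks its
 * entries COMPLETE; it removes them once the final response has been sent.
 */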
58
59 struct mp_request {
60 TAILQ_ENTRY(mp_request) next;
61 struct malloc_mp_req user_req; /**< contents of request */
62 pthread_cond_t cond; /**< variable we use to time out on this request */
63 enum req_state state; /**< indicate status of this request */
64 struct primary_alloc_req_state alloc_state;
65 };
66
67 /*
68 * We could've used just a single request, but it may be possible for
69 * secondaries to time out earlier than the primary, and send a new request while
70 * primary is still expecting replies to the old one. Therefore, each new
71 * request will get assigned a new ID, which is how we will distinguish between
72 * expected and unexpected messages.
73 */
74 TAILQ_HEAD(mp_request_list, mp_request);
75 static struct {
76 struct mp_request_list list;
77 pthread_mutex_t lock;
78 } mp_request_list = {
79 .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
80 .lock = PTHREAD_MUTEX_INITIALIZER
81 };
82
83 /**
84 * General workflow is the following:
85 *
86 * Allocation:
87 * S: send request to primary
88 * P: attempt to allocate memory
89 * if failed, sendmsg failure
90 * if success, send sync request
91 * S: if received msg of failure, quit
92 * if received sync request, synchronize memory map and reply with result
93 * P: if received sync request result
94 * if success, sendmsg success
95 * if failure, roll back allocation and send a rollback request
96 * S: if received msg of success, quit
97 * if received rollback request, synchronize memory map and reply with result
98 * P: if received sync request result
99 * sendmsg sync request result
100 * S: if received msg, quit
101 *
102 * Aside from timeouts, there are three points where we can quit:
103 * - if allocation failed straight away
104 * - if allocation and sync request succeeded
105 * - if allocation succeeded, sync request failed, allocation rolled back and
106 * rollback request received (irrespective of whether it succeeded or failed)
107 *
108 * Deallocation:
109 * S: send request to primary
110 * P: attempt to deallocate memory
111 * if failed, sendmsg failure
112 * if success, send sync request
113 * S: if received msg of failure, quit
114 * if received sync request, synchronize memory map and reply with result
115 * P: if received sync request result
116 * sendmsg sync request result
117 * S: if received msg, quit
118 *
119 * There is no "rollback" from deallocation, as it's safe to have some memory
120 * mapped in some processes - it's absent from the heap, so it won't get used.
121 */
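/*
 * In terms of the action names defined at the top of this file: the
 * secondary's initial request goes out as MP_ACTION_REQUEST, the primary's
 * sync and rollback broadcasts use MP_ACTION_SYNC and MP_ACTION_ROLLBACK, and
 * the final result is always delivered back to the secondary as
 * MP_ACTION_RESPONSE.
 */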
122
123 static struct mp_request *
124 find_request_by_id(uint64_t id)
125 {
126 struct mp_request *req;
127 TAILQ_FOREACH(req, &mp_request_list.list, next) {
128 if (req->user_req.id == id)
129 break;
130 }
131 return req;
132 }
133
134 /* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
135 static uint64_t
136 get_unique_id(void)
137 {
138 uint64_t id;
139 do {
140 id = rte_rand();
141 } while (find_request_by_id(id) != NULL);
142 return id;
143 }
144
145 /* secondary will respond to sync requests thusly */
146 static int
147 handle_sync(const struct rte_mp_msg *msg, const void *peer)
148 {
149 struct rte_mp_msg reply;
150 const struct malloc_mp_req *req =
151 (const struct malloc_mp_req *)msg->param;
152 struct malloc_mp_req *resp =
153 (struct malloc_mp_req *)reply.param;
154 int ret;
155
156 if (req->t != REQ_TYPE_SYNC) {
157 RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
158 return -1;
159 }
160
161 memset(&reply, 0, sizeof(reply));
162
163 reply.num_fds = 0;
164 strlcpy(reply.name, msg->name, sizeof(reply.name));
165 reply.len_param = sizeof(*resp);
166
167 ret = eal_memalloc_sync_with_primary();
168
169 resp->t = REQ_TYPE_SYNC;
170 resp->id = req->id;
171 resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;
172
173 return rte_mp_reply(&reply, peer);
174 }
175
176 static int
177 handle_free_request(const struct malloc_mp_req *m)
178 {
179 const struct rte_memseg_list *msl;
180 void *start, *end;
181 size_t len;
182
183 len = m->free_req.len;
184 start = m->free_req.addr;
185 end = RTE_PTR_ADD(start, len - 1);
186
187 /* check if the requested memory actually exists */
188 msl = rte_mem_virt2memseg_list(start);
189 if (msl == NULL) {
190 RTE_LOG(ERR, EAL, "Requested to free unknown memory\n");
191 return -1;
192 }
193
194 /* check if end is within the same memory region */
195 if (rte_mem_virt2memseg_list(end) != msl) {
196 RTE_LOG(ERR, EAL, "Requested to free memory spanning multiple regions\n");
197 return -1;
198 }
199
200 /* we're supposed to only free memory that's not external */
201 if (msl->external) {
202 RTE_LOG(ERR, EAL, "Requested to free external memory\n");
203 return -1;
204 }
205
206 /* now that we've validated the request, announce it */
207 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
208 m->free_req.addr, m->free_req.len);
209
210 /* now, do the actual freeing */
211 return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
212 }
213
214 static int
215 handle_alloc_request(const struct malloc_mp_req *m,
216 struct mp_request *req)
217 {
218 struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
219 const struct malloc_req_alloc *ar = &m->alloc_req;
220 struct malloc_heap *heap;
221 struct malloc_elem *elem;
222 struct rte_memseg **ms;
223 size_t alloc_sz;
224 int n_segs;
225 void *map_addr;
226
227 /* this is checked by the API, but we need to prevent divide by zero */
228 if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
229 RTE_LOG(ERR, EAL, "Attempting to allocate with invalid page size\n");
230 return -1;
231 }
232
233 /* heap idx is index into the heap array, not socket ID */
234 if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
235 RTE_LOG(ERR, EAL, "Attempting to allocate from invalid heap\n");
236 return -1;
237 }
238
239 heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];
240
241 /*
242 * for allocations, we must only use internal heaps, but since
243 * rte_malloc_heap_socket_is_external() takes the lock itself (it is thread-safe)
244 * and we're already read-locked, we can't call it here - so we take advantage
245 * of the fact that internal socket ID's are always lower than RTE_MAX_NUMA_NODES.
246 */
247 if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
248 RTE_LOG(ERR, EAL, "Attempting to allocate from external heap\n");
249 return -1;
250 }
251
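/* worst case: alignment slack plus the element itself plus the trailer,
 * rounded up to a whole number of pages - this is how much we will try to map
 */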
252 alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
253 MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
254 n_segs = alloc_sz / ar->page_sz;
255
256 /* we can't know in advance how many pages we'll need, so we malloc */
257 ms = malloc(sizeof(*ms) * n_segs);
258 if (ms == NULL) {
259 RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
260 return -1;
261 }
262 memset(ms, 0, sizeof(*ms) * n_segs);
263
264 elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
265 ar->flags, ar->align, ar->bound, ar->contig, ms,
266 n_segs);
267
268 if (elem == NULL)
269 goto fail;
270
271 map_addr = ms[0]->addr;
272
273 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);
274
275 /* we have succeeded in allocating memory, but we still need to sync
276 * with other processes. however, since DPDK IPC is single-threaded, we
277 * send an asynchronous request and exit this callback.
278 */
279
280 req->alloc_state.ms = ms;
281 req->alloc_state.ms_len = n_segs;
282 req->alloc_state.map_addr = map_addr;
283 req->alloc_state.map_len = alloc_sz;
284 req->alloc_state.elem = elem;
285 req->alloc_state.heap = heap;
286
287 return 0;
288 fail:
289 free(ms);
290 return -1;
291 }
292
293 /* first stage of primary handling requests from secondary */
294 static int
295 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
296 {
297 const struct malloc_mp_req *m =
298 (const struct malloc_mp_req *)msg->param;
299 struct mp_request *entry;
300 int ret;
301
302 /* lock access to request */
303 pthread_mutex_lock(&mp_request_list.lock);
304
305 /* make sure it's not a dupe */
306 entry = find_request_by_id(m->id);
307 if (entry != NULL) {
308 RTE_LOG(ERR, EAL, "Duplicate request id\n");
309 goto fail;
310 }
311
312 entry = malloc(sizeof(*entry));
313 if (entry == NULL) {
314 RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
315 goto fail;
316 }
317
318 /* erase all data */
319 memset(entry, 0, sizeof(*entry));
320
321 if (m->t == REQ_TYPE_ALLOC) {
322 ret = handle_alloc_request(m, entry);
323 } else if (m->t == REQ_TYPE_FREE) {
324 ret = handle_free_request(m);
325 } else {
326 RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
327 goto fail;
328 }
329
330 if (ret != 0) {
331 struct rte_mp_msg resp_msg;
332 struct malloc_mp_req *resp =
333 (struct malloc_mp_req *)resp_msg.param;
334
335 /* send failure message straight away */
336 resp_msg.num_fds = 0;
337 resp_msg.len_param = sizeof(*resp);
338 strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
339 sizeof(resp_msg.name));
340
341 resp->t = m->t;
342 resp->result = REQ_RESULT_FAIL;
343 resp->id = m->id;
344
345 if (rte_mp_sendmsg(&resp_msg)) {
346 RTE_LOG(ERR, EAL, "Couldn't send response\n");
347 goto fail;
348 }
349 /* we did not modify the request */
350 free(entry);
351 } else {
352 struct rte_mp_msg sr_msg;
353 struct malloc_mp_req *sr =
354 (struct malloc_mp_req *)sr_msg.param;
355 struct timespec ts;
356
357 memset(&sr_msg, 0, sizeof(sr_msg));
358
359 /* we can do something, so send sync request asynchronously */
360 sr_msg.num_fds = 0;
361 sr_msg.len_param = sizeof(*sr);
362 strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));
363
364 ts.tv_nsec = 0;
365 ts.tv_sec = MP_TIMEOUT_S;
366
367 /* sync requests carry no data */
368 sr->t = REQ_TYPE_SYNC;
369 sr->id = m->id;
370
371 /* there may be stray timeout still waiting */
372 do {
373 ret = rte_mp_request_async(&sr_msg, &ts,
374 handle_sync_response);
375 } while (ret != 0 && rte_errno == EEXIST);
376 if (ret != 0) {
377 RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
378 if (m->t == REQ_TYPE_ALLOC)
379 free(entry->alloc_state.ms);
380 goto fail;
381 }
382
383 /* mark request as in progress */
384 memcpy(&entry->user_req, m, sizeof(*m));
385 entry->state = REQ_STATE_ACTIVE;
386
387 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
388 }
389 pthread_mutex_unlock(&mp_request_list.lock);
390 return 0;
391 fail:
392 pthread_mutex_unlock(&mp_request_list.lock);
393 free(entry);
394 return -1;
395 }
396
397 /* callback for asynchronous sync requests for primary. this will either do a
398 * sendmsg with results, or trigger a rollback request.
399 */
400 static int
401 handle_sync_response(const struct rte_mp_msg *request,
402 const struct rte_mp_reply *reply)
403 {
404 enum malloc_req_result result;
405 struct mp_request *entry;
406 const struct malloc_mp_req *mpreq =
407 (const struct malloc_mp_req *)request->param;
408 int i;
409
410 /* lock the request */
411 pthread_mutex_lock(&mp_request_list.lock);
412
413 entry = find_request_by_id(mpreq->id);
414 if (entry == NULL) {
415 RTE_LOG(ERR, EAL, "Wrong request ID\n");
416 goto fail;
417 }
418
419 result = REQ_RESULT_SUCCESS;
420
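/* if not every secondary replied (e.g. one of them timed out), consider the
 * sync to have failed
 */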
421 if (reply->nb_received != reply->nb_sent)
422 result = REQ_RESULT_FAIL;
423
424 for (i = 0; i < reply->nb_received; i++) {
425 struct malloc_mp_req *resp =
426 (struct malloc_mp_req *)reply->msgs[i].param;
427
428 if (resp->t != REQ_TYPE_SYNC) {
429 RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
430 result = REQ_RESULT_FAIL;
431 break;
432 }
433 if (resp->id != entry->user_req.id) {
434 RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
435 result = REQ_RESULT_FAIL;
436 break;
437 }
438 if (resp->result == REQ_RESULT_FAIL) {
439 result = REQ_RESULT_FAIL;
440 break;
441 }
442 }
443
444 if (entry->user_req.t == REQ_TYPE_FREE) {
445 struct rte_mp_msg msg;
446 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
447
448 memset(&msg, 0, sizeof(msg));
449
450 /* this is a free request, just sendmsg result */
451 resp->t = REQ_TYPE_FREE;
452 resp->result = result;
453 resp->id = entry->user_req.id;
454 msg.num_fds = 0;
455 msg.len_param = sizeof(*resp);
456 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
457
458 if (rte_mp_sendmsg(&msg))
459 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
460
461 TAILQ_REMOVE(&mp_request_list.list, entry, next);
462 free(entry);
463 } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
464 result == REQ_RESULT_SUCCESS) {
465 struct malloc_heap *heap = entry->alloc_state.heap;
466 struct rte_mp_msg msg;
467 struct malloc_mp_req *resp =
468 (struct malloc_mp_req *)msg.param;
469
470 memset(&msg, 0, sizeof(msg));
471
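/* all processes have now mapped the new pages, so it is safe to account for
 * them in the heap's total size
 */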
472 heap->total_size += entry->alloc_state.map_len;
473
474 /* result is success, so just notify secondary about this */
475 resp->t = REQ_TYPE_ALLOC;
476 resp->result = result;
477 resp->id = entry->user_req.id;
478 msg.num_fds = 0;
479 msg.len_param = sizeof(*resp);
480 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
481
482 if (rte_mp_sendmsg(&msg))
483 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
484
485 TAILQ_REMOVE(&mp_request_list.list, entry, next);
486 free(entry->alloc_state.ms);
487 free(entry);
488 } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
489 result == REQ_RESULT_FAIL) {
490 struct rte_mp_msg rb_msg;
491 struct malloc_mp_req *rb =
492 (struct malloc_mp_req *)rb_msg.param;
493 struct timespec ts;
494 struct primary_alloc_req_state *state =
495 &entry->alloc_state;
496 int ret;
497
498 memset(&rb_msg, 0, sizeof(rb_msg));
499
500 /* we've failed to sync, so do a rollback */
501 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
502 state->map_addr, state->map_len);
503
504 rollback_expand_heap(state->ms, state->ms_len, state->elem,
505 state->map_addr, state->map_len);
506
507 /* send rollback request */
508 rb_msg.num_fds = 0;
509 rb_msg.len_param = sizeof(*rb);
510 strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));
511
512 ts.tv_nsec = 0;
513 ts.tv_sec = MP_TIMEOUT_S;
514
515 /* sync requests carry no data */
516 rb->t = REQ_TYPE_SYNC;
517 rb->id = entry->user_req.id;
518
519 /* there may be stray timeout still waiting */
520 do {
521 ret = rte_mp_request_async(&rb_msg, &ts,
522 handle_rollback_response);
523 } while (ret != 0 && rte_errno == EEXIST);
524 if (ret != 0) {
525 RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");
526
527 /* we couldn't send rollback request, but that's OK -
528 * secondary will time out, and memory has been removed
529 * from heap anyway.
530 */
531 TAILQ_REMOVE(&mp_request_list.list, entry, next);
532 free(state->ms);
533 free(entry);
534 goto fail;
535 }
536 } else {
537 RTE_LOG(ERR, EAL, "Unexpected response to sync request of unknown type\n");
538 goto fail;
539 }
540
541 pthread_mutex_unlock(&mp_request_list.lock);
542 return 0;
543 fail:
544 pthread_mutex_unlock(&mp_request_list.lock);
545 return -1;
546 }
547
548 static int
549 handle_rollback_response(const struct rte_mp_msg *request,
550 const struct rte_mp_reply *reply __rte_unused)
551 {
552 struct rte_mp_msg msg;
553 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
554 const struct malloc_mp_req *mpreq =
555 (const struct malloc_mp_req *)request->param;
556 struct mp_request *entry;
557
558 /* lock the request */
559 pthread_mutex_lock(&mp_request_list.lock);
560
561 memset(&msg, 0, sizeof(msg));
562
563 entry = find_request_by_id(mpreq->id);
564 if (entry == NULL) {
565 RTE_LOG(ERR, EAL, "Wrong request ID\n");
566 goto fail;
567 }
568
569 if (entry->user_req.t != REQ_TYPE_ALLOC) {
570 RTE_LOG(ERR, EAL, "Unexpected active request\n");
571 goto fail;
572 }
573
574 /* we don't care if rollback succeeded, request still failed */
575 resp->t = REQ_TYPE_ALLOC;
576 resp->result = REQ_RESULT_FAIL;
577 resp->id = mpreq->id;
578 msg.num_fds = 0;
579 msg.len_param = sizeof(*resp);
580 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
581
582 if (rte_mp_sendmsg(&msg))
583 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
584
585 /* clean up */
586 TAILQ_REMOVE(&mp_request_list.list, entry, next);
587 free(entry->alloc_state.ms);
588 free(entry);
589
590 pthread_mutex_unlock(&mp_request_list.lock);
591 return 0;
592 fail:
593 pthread_mutex_unlock(&mp_request_list.lock);
594 return -1;
595 }
596
597 /* final stage of the request from secondary */
598 static int
599 handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
600 {
601 const struct malloc_mp_req *m =
602 (const struct malloc_mp_req *)msg->param;
603 struct mp_request *entry;
604
605 pthread_mutex_lock(&mp_request_list.lock);
606
607 entry = find_request_by_id(m->id);
608 if (entry != NULL) {
609 /* update request status */
610 entry->user_req.result = m->result;
611
612 entry->state = REQ_STATE_COMPLETE;
613
614 /* trigger thread wakeup */
615 pthread_cond_signal(&entry->cond);
616 }
617
618 pthread_mutex_unlock(&mp_request_list.lock);
619
620 return 0;
621 }
622
623 /* synchronously request a memory map sync; this is only called when the primary
624 * process initiates an allocation.
625 */
626 int
627 request_sync(void)
628 {
629 struct rte_mp_msg msg;
630 struct rte_mp_reply reply;
631 struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
632 struct timespec ts;
633 int i, ret = -1;
634
635 memset(&msg, 0, sizeof(msg));
636 memset(&reply, 0, sizeof(reply));
637
638 /* no need to create tailq entries as this is entirely synchronous */
639
640 msg.num_fds = 0;
641 msg.len_param = sizeof(*req);
642 strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));
643
644 /* sync request carries no data */
645 req->t = REQ_TYPE_SYNC;
646 req->id = get_unique_id();
647
648 ts.tv_nsec = 0;
649 ts.tv_sec = MP_TIMEOUT_S;
650
651 /* there may be stray timeout still waiting */
652 do {
653 ret = rte_mp_request_sync(&msg, &reply, &ts);
654 } while (ret != 0 && rte_errno == EEXIST);
655 if (ret != 0) {
656 /* if IPC is unsupported, behave as if the call succeeded */
657 if (rte_errno != ENOTSUP)
658 RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
659 else
660 ret = 0;
661 goto out;
662 }
663
664 if (reply.nb_received != reply.nb_sent) {
665 RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
666 goto out;
667 }
668
669 for (i = 0; i < reply.nb_received; i++) {
670 struct malloc_mp_req *resp =
671 (struct malloc_mp_req *)reply.msgs[i].param;
672 if (resp->t != REQ_TYPE_SYNC) {
673 RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
674 goto out;
675 }
676 if (resp->id != req->id) {
677 RTE_LOG(ERR, EAL, "Wrong request ID\n");
678 goto out;
679 }
680 if (resp->result != REQ_RESULT_SUCCESS) {
681 RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
682 goto out;
683 }
684 }
685
686 ret = 0;
687 out:
688 free(reply.msgs);
689 return ret;
690 }
691
692 /* this is a synchronous wrapper around a bunch of asynchronous requests to the
693 * primary process. it initiates a request and waits until the response arrives.
694 */
695 int
696 request_to_primary(struct malloc_mp_req *user_req)
697 {
698 struct rte_mp_msg msg;
699 struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
700 struct mp_request *entry;
701 struct timespec ts;
702 struct timeval now;
703 int ret;
704
705 memset(&msg, 0, sizeof(msg));
706 memset(&ts, 0, sizeof(ts));
707
708 pthread_mutex_lock(&mp_request_list.lock);
709
710 entry = malloc(sizeof(*entry));
711 if (entry == NULL) {
712 RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
713 goto fail;
714 }
715
716 memset(entry, 0, sizeof(*entry));
717
718 if (gettimeofday(&now, NULL) < 0) {
719 RTE_LOG(ERR, EAL, "Cannot get current time\n");
720 goto fail;
721 }
722
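/* pthread_cond_timedwait() takes an absolute deadline, so convert "now plus
 * MP_TIMEOUT_S" into a timespec, carrying any microsecond overflow over into
 * the seconds field
 */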
723 ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
724 ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
725 (now.tv_usec * 1000) / 1000000000;
726
727 /* initialize the request */
728 pthread_cond_init(&entry->cond, NULL);
729
730 msg.num_fds = 0;
731 msg.len_param = sizeof(*msg_req);
732 strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));
733
734 /* (attempt to) get a unique id */
735 user_req->id = get_unique_id();
736
737 /* copy contents of user request into the message */
738 memcpy(msg_req, user_req, sizeof(*msg_req));
739
740 if (rte_mp_sendmsg(&msg)) {
741 RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
742 goto fail;
743 }
744
745 /* copy contents of user request into active request */
746 memcpy(&entry->user_req, user_req, sizeof(*user_req));
747
748 /* mark request as in progress */
749 entry->state = REQ_STATE_ACTIVE;
750
751 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
752
753 /* finally, wait on timeout */
754 do {
755 ret = pthread_cond_timedwait(&entry->cond,
756 &mp_request_list.lock, &ts);
757 } while (ret != 0 && ret != ETIMEDOUT);
758
759 if (entry->state != REQ_STATE_COMPLETE) {
760 RTE_LOG(ERR, EAL, "Request timed out\n");
761 ret = -1;
762 } else {
763 ret = 0;
764 user_req->result = entry->user_req.result;
765 }
766 TAILQ_REMOVE(&mp_request_list.list, entry, next);
767 free(entry);
768
769 pthread_mutex_unlock(&mp_request_list.lock);
770 return ret;
771 fail:
772 pthread_mutex_unlock(&mp_request_list.lock);
773 free(entry);
774 return -1;
775 }
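/*
 * Illustrative sketch only - the real caller lives in the heap code, not in
 * this file. A secondary that wants memory fills in a request, hands it to
 * request_to_primary() and then checks both the return value and the result
 * field, roughly like this (heap_idx, page_sz etc. stand in for whatever the
 * caller has computed):
 *
 *	struct malloc_mp_req req;
 *
 *	memset(&req, 0, sizeof(req));
 *	req.t = REQ_TYPE_ALLOC;
 *	req.alloc_req.malloc_heap_idx = heap_idx;
 *	req.alloc_req.page_sz = page_sz;
 *	req.alloc_req.elt_size = elt_size;
 *	req.alloc_req.socket = socket;
 *	req.alloc_req.align = align;
 *
 *	if (request_to_primary(&req) != 0 ||
 *			req.result != REQ_RESULT_SUCCESS)
 *		return NULL;
 *
 * the field names follow the alloc_req usage in handle_alloc_request() above.
 */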
776
777 int
778 register_mp_requests(void)
779 {
780 if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
781 /* it's OK for primary to not support IPC */
782 if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
783 rte_errno != ENOTSUP) {
784 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
785 MP_ACTION_REQUEST);
786 return -1;
787 }
788 } else {
789 if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
790 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
791 MP_ACTION_SYNC);
792 return -1;
793 }
794 if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
795 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
796 MP_ACTION_ROLLBACK);
797 return -1;
798 }
799 if (rte_mp_action_register(MP_ACTION_RESPONSE,
800 handle_response)) {
801 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
802 MP_ACTION_RESPONSE);
803 return -1;
804 }
805 }
806 return 0;
807 }
808
809 void
810 unregister_mp_requests(void)
811 {
812 if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
813 rte_mp_action_unregister(MP_ACTION_REQUEST);
814 } else {
815 rte_mp_action_unregister(MP_ACTION_SYNC);
816 rte_mp_action_unregister(MP_ACTION_ROLLBACK);
817 rte_mp_action_unregister(MP_ACTION_RESPONSE);
818 }
819 }
820