xref: /f-stack/dpdk/lib/librte_eal/common/malloc_mp.c (revision 4418919f)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */

#include <string.h>
#include <sys/time.h>

#include <rte_alarm.h>
#include <rte_errno.h>
#include <rte_string_fns.h>

#include "eal_memalloc.h"
#include "eal_memcfg.h"

#include "malloc_elem.h"
#include "malloc_mp.h"

#define MP_ACTION_SYNC "mp_malloc_sync"
/**< request sent by primary process to notify of changes in memory map */
#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
/**< request sent by primary process to notify of changes in memory map. This
 * is essentially a regular sync request, but we cannot send sync requests
 * while another one is in progress, and we might have to - therefore, we do
 * this as a separate callback.
 */
#define MP_ACTION_REQUEST "mp_malloc_request"
/**< request sent by secondary process to ask for allocation/deallocation */
#define MP_ACTION_RESPONSE "mp_malloc_response"
/**< response sent to secondary process to indicate result of request */

/* forward declarations */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);

#define MP_TIMEOUT_S 5 /**< 5 second timeout */

/* when we're allocating, we need to store some state to ensure that we can
 * roll back later
 */
struct primary_alloc_req_state {
	struct malloc_heap *heap;
	struct rte_memseg **ms;
	int ms_len;
	struct malloc_elem *elem;
	void *map_addr;
	size_t map_len;
};

enum req_state {
	REQ_STATE_INACTIVE = 0,
	REQ_STATE_ACTIVE,
	REQ_STATE_COMPLETE
};

struct mp_request {
	TAILQ_ENTRY(mp_request) next;
	struct malloc_mp_req user_req; /**< contents of request */
	pthread_cond_t cond; /**< variable we use to time out on this request */
	enum req_state state; /**< indicate status of this request */
	struct primary_alloc_req_state alloc_state;
};

/*
 * We could've used just a single request, but it may be possible for
 * secondaries to time out earlier than the primary, and send a new request
 * while the primary is still expecting replies to the old one. Therefore, each
 * new request will get assigned a new ID, which is how we will distinguish
 * between expected and unexpected messages.
 */
TAILQ_HEAD(mp_request_list, mp_request);
static struct {
	struct mp_request_list list;
	pthread_mutex_t lock;
} mp_request_list = {
	.list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
	.lock = PTHREAD_MUTEX_INITIALIZER
};

/**
 * General workflow is the following:
 *
 * Allocation:
 * S: send request to primary
 * P: attempt to allocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    if success, sendmsg success
 *    if failure, roll back allocation and send a rollback request
 * S: if received msg of success, quit
 *    if received rollback request, synchronize memory map and reply with result
 * P: if received rollback request result
 *    sendmsg rollback request result
 * S: if received msg, quit
 *
 * Aside from timeouts, there are three points where we can quit:
 *  - if allocation failed straight away
 *  - if allocation and sync request succeeded
 *  - if allocation succeeded, sync request failed, allocation rolled back and
 *    rollback request received (irrespective of whether it succeeded or failed)
 *
 * Deallocation:
 * S: send request to primary
 * P: attempt to deallocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * There is no "rollback" from deallocation, as it's safe to have some memory
 * mapped in some processes - it's absent from the heap, so it won't get used.
 */
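
/*
 * Illustrative sketch (not part of this file's control flow): a secondary
 * builds an allocation request roughly like this before handing it to
 * request_to_primary() below. Field names follow struct malloc_mp_req as
 * used throughout this file; the surrounding variables are hypothetical.
 *
 *	struct malloc_mp_req req;
 *
 *	memset(&req, 0, sizeof(req));
 *	req.t = REQ_TYPE_ALLOC;
 *	req.alloc_req.heap = heap;       (target heap)
 *	req.alloc_req.page_sz = pg_sz;   (page size to back the allocation)
 *	req.alloc_req.elt_size = size;   (requested element size)
 *	req.alloc_req.align = align;
 *	req.alloc_req.bound = bound;
 *	req.alloc_req.contig = contig;
 *	req.alloc_req.socket = socket;
 *	req.alloc_req.flags = flags;
 *
 *	if (request_to_primary(&req) < 0 ||
 *			req.result != REQ_RESULT_SUCCESS)
 *		(the request failed or timed out)
 */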

static struct mp_request *
find_request_by_id(uint64_t id)
{
	struct mp_request *req;
	TAILQ_FOREACH(req, &mp_request_list.list, next) {
		if (req->user_req.id == id)
			break;
	}
	return req;
}

/* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
static uint64_t
get_unique_id(void)
{
	uint64_t id;
	do {
		id = rte_rand();
	} while (find_request_by_id(id) != NULL);
	return id;
}

/* secondary will respond to sync requests thusly */
static int
handle_sync(const struct rte_mp_msg *msg, const void *peer)
{
	struct rte_mp_msg reply;
	const struct malloc_mp_req *req =
			(const struct malloc_mp_req *)msg->param;
	struct malloc_mp_req *resp =
			(struct malloc_mp_req *)reply.param;
	int ret;

	if (req->t != REQ_TYPE_SYNC) {
		RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
		return -1;
	}

	memset(&reply, 0, sizeof(reply));

	reply.num_fds = 0;
	strlcpy(reply.name, msg->name, sizeof(reply.name));
	reply.len_param = sizeof(*resp);

	ret = eal_memalloc_sync_with_primary();

	resp->t = REQ_TYPE_SYNC;
	resp->id = req->id;
	resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;

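	/* if this reply is lost, the primary will see fewer replies than
	 * messages sent and will fail the sync on its end
	 */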
	rte_mp_reply(&reply, peer);

	return 0;
}

static int
handle_alloc_request(const struct malloc_mp_req *m,
		struct mp_request *req)
{
	const struct malloc_req_alloc *ar = &m->alloc_req;
	struct malloc_heap *heap;
	struct malloc_elem *elem;
	struct rte_memseg **ms;
	size_t alloc_sz;
	int n_segs;
	void *map_addr;

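	/* worst-case footprint: requested size plus alignment slack and the
	 * element trailer, rounded up to a whole number of pages
	 */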
	alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
			MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
	n_segs = alloc_sz / ar->page_sz;

	heap = ar->heap;

	/* we can't know in advance how many pages we'll need, so we malloc */
	ms = malloc(sizeof(*ms) * n_segs);
	if (ms == NULL) {
		RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
		goto fail;
	}
	memset(ms, 0, sizeof(*ms) * n_segs);

	elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
			ar->flags, ar->align, ar->bound, ar->contig, ms,
			n_segs);

	if (elem == NULL)
		goto fail;

	map_addr = ms[0]->addr;

	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);

	/* we have succeeded in allocating memory, but we still need to sync
	 * with other processes. however, since DPDK IPC is single-threaded, we
	 * send an asynchronous request and exit this callback.
	 */

	req->alloc_state.ms = ms;
	req->alloc_state.ms_len = n_segs;
	req->alloc_state.map_addr = map_addr;
	req->alloc_state.map_len = alloc_sz;
	req->alloc_state.elem = elem;
	req->alloc_state.heap = heap;

	return 0;
fail:
	free(ms);
	return -1;
}

/* first stage of primary handling requests from secondary */
static int
handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;
	int ret;

	/* lock access to request */
	pthread_mutex_lock(&mp_request_list.lock);

	/* make sure it's not a dupe */
	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		RTE_LOG(ERR, EAL, "Duplicate request id\n");
		/* don't free the existing, still-queued entry on failure */
		entry = NULL;
		goto fail;
	}

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
		goto fail;
	}

	/* erase all data */
	memset(entry, 0, sizeof(*entry));

	if (m->t == REQ_TYPE_ALLOC) {
		ret = handle_alloc_request(m, entry);
	} else if (m->t == REQ_TYPE_FREE) {
		eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
				m->free_req.addr, m->free_req.len);

		ret = malloc_heap_free_pages(m->free_req.addr,
				m->free_req.len);
	} else {
		RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
		goto fail;
	}

	if (ret != 0) {
		struct rte_mp_msg resp_msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)resp_msg.param;

		/* send failure message straight away */
		resp_msg.num_fds = 0;
		resp_msg.len_param = sizeof(*resp);
		strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
				sizeof(resp_msg.name));

		resp->t = m->t;
		resp->result = REQ_RESULT_FAIL;
		resp->id = m->id;

		if (rte_mp_sendmsg(&resp_msg)) {
			RTE_LOG(ERR, EAL, "Couldn't send response\n");
			goto fail;
		}
		/* we did not modify the request */
		free(entry);
	} else {
		struct rte_mp_msg sr_msg;
		struct malloc_mp_req *sr =
				(struct malloc_mp_req *)sr_msg.param;
		struct timespec ts;

		memset(&sr_msg, 0, sizeof(sr_msg));

		/* we can do something, so send sync request asynchronously */
		sr_msg.num_fds = 0;
		sr_msg.len_param = sizeof(*sr);
		strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		sr->t = REQ_TYPE_SYNC;
		sr->id = m->id;

		/* there may be stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&sr_msg, &ts,
					handle_sync_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
			if (m->t == REQ_TYPE_ALLOC)
				free(entry->alloc_state.ms);
			goto fail;
		}

		/* mark request as in progress */
		memcpy(&entry->user_req, m, sizeof(*m));
		entry->state = REQ_STATE_ACTIVE;

		TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
	}
	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}

/* callback for asynchronous sync requests for primary. this will either do a
 * sendmsg with results, or trigger rollback request.
 */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply)
{
	enum malloc_req_result result;
	struct mp_request *entry;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	int i;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	result = REQ_RESULT_SUCCESS;

	if (reply->nb_received != reply->nb_sent)
		result = REQ_RESULT_FAIL;

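	/* every reply must be a sync response that carries our request ID and
	 * reports success - anything else fails the whole request
	 */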
	for (i = 0; i < reply->nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply->msgs[i].param;

		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->id != entry->user_req.id) {
			RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->result == REQ_RESULT_FAIL) {
			result = REQ_RESULT_FAIL;
			break;
		}
	}

	if (entry->user_req.t == REQ_TYPE_FREE) {
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		/* this is a free request, just sendmsg result */
		resp->t = REQ_TYPE_FREE;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_SUCCESS) {
		struct malloc_heap *heap = entry->alloc_state.heap;
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

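		/* the new memory is now visible in every process, so account
		 * for it in the heap
		 */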
		heap->total_size += entry->alloc_state.map_len;

		/* result is success, so just notify secondary about this */
		resp->t = REQ_TYPE_ALLOC;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry->alloc_state.ms);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_FAIL) {
		struct rte_mp_msg rb_msg;
		struct malloc_mp_req *rb =
				(struct malloc_mp_req *)rb_msg.param;
		struct timespec ts;
		struct primary_alloc_req_state *state =
				&entry->alloc_state;
		int ret;

		memset(&rb_msg, 0, sizeof(rb_msg));

		/* we've failed to sync, so do a rollback */
		eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
				state->map_addr, state->map_len);

		rollback_expand_heap(state->ms, state->ms_len, state->elem,
				state->map_addr, state->map_len);

		/* send rollback request */
		rb_msg.num_fds = 0;
		rb_msg.len_param = sizeof(*rb);
		strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		rb->t = REQ_TYPE_SYNC;
		rb->id = entry->user_req.id;

		/* there may be stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&rb_msg, &ts,
					handle_rollback_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");

			/* we couldn't send rollback request, but that's OK -
			 * secondary will time out, and memory has been removed
			 * from heap anyway.
			 */
			TAILQ_REMOVE(&mp_request_list.list, entry, next);
			free(state->ms);
			free(entry);
			goto fail;
		}
	} else {
		RTE_LOG(ERR, EAL, "Response to sync request of unknown type\n");
		goto fail;
	}

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply __rte_unused)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	struct mp_request *entry;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	memset(&msg, 0, sizeof(msg));

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	if (entry->user_req.t != REQ_TYPE_ALLOC) {
		RTE_LOG(ERR, EAL, "Unexpected active request\n");
		goto fail;
	}

	/* we don't care if rollback succeeded, request still failed */
	resp->t = REQ_TYPE_ALLOC;
	resp->result = REQ_RESULT_FAIL;
	resp->id = mpreq->id;
	msg.num_fds = 0;
	msg.len_param = sizeof(*resp);
	strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

	if (rte_mp_sendmsg(&msg))
		RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

	/* clean up */
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry->alloc_state.ms);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

/* final stage of the request from secondary */
static int
handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		/* update request status */
		entry->user_req.result = m->result;

		entry->state = REQ_STATE_COMPLETE;

		/* trigger thread wakeup */
		pthread_cond_signal(&entry->cond);
	}

	pthread_mutex_unlock(&mp_request_list.lock);

	return 0;
}

/* synchronously request memory map sync - this is only called whenever the
 * primary process initiates the allocation.
 */
int
request_sync(void)
{
	struct rte_mp_msg msg;
	struct rte_mp_reply reply;
	struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
	struct timespec ts;
	int i, ret = -1;

	memset(&msg, 0, sizeof(msg));
	memset(&reply, 0, sizeof(reply));

	/* no need to create tailq entries as this is entirely synchronous */

	msg.num_fds = 0;
	msg.len_param = sizeof(*req);
	strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));

	/* sync request carries no data */
	req->t = REQ_TYPE_SYNC;
	req->id = get_unique_id();

	ts.tv_nsec = 0;
	ts.tv_sec = MP_TIMEOUT_S;

	/* there may be stray timeout still waiting */
	do {
		ret = rte_mp_request_sync(&msg, &reply, &ts);
	} while (ret != 0 && rte_errno == EEXIST);
	if (ret != 0) {
		/* if IPC is unsupported, behave as if the call succeeded */
		if (rte_errno != ENOTSUP)
			RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
		else
			ret = 0;
		goto out;
	}

	if (reply.nb_received != reply.nb_sent) {
		RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
		goto out;
	}

	for (i = 0; i < reply.nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply.msgs[i].param;
		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
			goto out;
		}
		if (resp->id != req->id) {
			RTE_LOG(ERR, EAL, "Wrong request ID\n");
			goto out;
		}
		if (resp->result != REQ_RESULT_SUCCESS) {
			RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
			goto out;
		}
	}

	ret = 0;
out:
	free(reply.msgs);
	return ret;
}

/* this is a synchronous wrapper around a bunch of asynchronous requests to
 * primary process. this will initiate a request and wait until responses come.
 */
int
request_to_primary(struct malloc_mp_req *user_req)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
	struct mp_request *entry;
	struct timespec ts;
	struct timeval now;
	int ret;

	memset(&msg, 0, sizeof(msg));
	memset(&ts, 0, sizeof(ts));

	pthread_mutex_lock(&mp_request_list.lock);

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
		goto fail;
	}

	memset(entry, 0, sizeof(*entry));

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto fail;
	}

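	/* pthread_cond_timedwait() takes an absolute deadline, so compute
	 * now + MP_TIMEOUT_S, carrying microseconds over into seconds
	 */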
	ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
	ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
			(now.tv_usec * 1000) / 1000000000;

	/* initialize the request */
	pthread_cond_init(&entry->cond, NULL);

	msg.num_fds = 0;
	msg.len_param = sizeof(*msg_req);
	strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));

	/* (attempt to) get a unique id */
	user_req->id = get_unique_id();

	/* copy contents of user request into the message */
	memcpy(msg_req, user_req, sizeof(*msg_req));

	if (rte_mp_sendmsg(&msg)) {
		RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
		goto fail;
	}

	/* copy contents of user request into active request */
	memcpy(&entry->user_req, user_req, sizeof(*user_req));

	/* mark request as in progress */
	entry->state = REQ_STATE_ACTIVE;

	TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);

	/* finally, wait on timeout */
	do {
		ret = pthread_cond_timedwait(&entry->cond,
				&mp_request_list.lock, &ts);
	} while (ret != 0 && ret != ETIMEDOUT);

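	/* the wait may return ETIMEDOUT even if the response arrived just in
	 * time, so trust the request state rather than the return value
	 */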
	if (entry->state != REQ_STATE_COMPLETE) {
		RTE_LOG(ERR, EAL, "Request timed out\n");
		ret = -1;
	} else {
		ret = 0;
		user_req->result = entry->user_req.result;
	}
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return ret;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}

int
register_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		/* it's OK for primary to not support IPC */
		if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
				rte_errno != ENOTSUP) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_REQUEST);
			return -1;
		}
	} else {
		if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_SYNC);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_ROLLBACK);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_RESPONSE,
				handle_response)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_RESPONSE);
			return -1;
		}
	}
	return 0;
}