lib/librte_eal/common/eal_common_proc.c (DPDK 18.11-rc1)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2018 Intel Corporation
 */

#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <fnmatch.h>
#include <inttypes.h>
#include <libgen.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <rte_alarm.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
#include <rte_tailq.h>

#include "eal_private.h"
#include "eal_filesystem.h"
#include "eal_internal_cfg.h"

static int mp_fd = -1;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;

struct action_entry {
        TAILQ_ENTRY(action_entry) next;
        char action_name[RTE_MP_MAX_NAME_LEN];
        rte_mp_t action;
};

/** Double linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
        TAILQ_HEAD_INITIALIZER(action_entry_list);

enum mp_type {
        MP_MSG, /* Share message with peers, will not block */
        MP_REQ, /* Request for information, will block for a reply */
        MP_REP, /* Response to previously-received request */
        MP_IGN, /* Response telling requester to ignore this response */
};

struct mp_msg_internal {
        int type;
        struct rte_mp_msg msg;
};

struct async_request_param {
        rte_mp_async_reply_t clb;
        struct rte_mp_reply user_reply;
        struct timespec end;
        int n_responses_processed;
};

struct pending_request {
        TAILQ_ENTRY(pending_request) next;
        enum {
                REQUEST_TYPE_SYNC,
                REQUEST_TYPE_ASYNC
        } type;
        char dst[PATH_MAX];
        struct rte_mp_msg *request;
        struct rte_mp_msg *reply;
        int reply_received;
        RTE_STD_C11
        union {
                struct {
                        struct async_request_param *param;
                } async;
                struct {
                        pthread_cond_t cond;
                } sync;
        };
};

TAILQ_HEAD(pending_request_list, pending_request);

static struct {
        struct pending_request_list requests;
        pthread_mutex_t lock;
} pending_requests = {
        .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
        .lock = PTHREAD_MUTEX_INITIALIZER,
        /**< used in async requests only */
};

/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);

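/* Walk the pending request list looking for an entry that matches both the
 * destination socket path and the request name. The caller must hold
 * pending_requests.lock.
 */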
static struct pending_request *
find_pending_request(const char *dst, const char *act_name)
{
        struct pending_request *r;

        TAILQ_FOREACH(r, &pending_requests.requests, next) {
                if (!strcmp(r->dst, dst) &&
                    !strcmp(r->request->name, act_name))
                        break;
        }

        return r;
}

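/* Build a socket path from the EAL socket path prefix and a peer name; an
 * empty name yields the primary process socket path itself.
 */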
static void
create_socket_path(const char *name, char *buf, int len)
{
        const char *prefix = eal_mp_socket_path();

        if (strlen(name) > 0)
                snprintf(buf, len, "%s_%s", prefix, name);
        else
                strlcpy(buf, prefix, len);
}

int
rte_eal_primary_proc_alive(const char *config_file_path)
{
        int config_fd;

        if (config_file_path)
                config_fd = open(config_file_path, O_RDONLY);
        else {
                const char *path;

                path = eal_runtime_config_path();
                config_fd = open(path, O_RDONLY);
        }
        if (config_fd < 0)
                return 0;

        int ret = lockf(config_fd, F_TEST, 0);
        close(config_fd);

        return !!ret;
}

static struct action_entry *
find_action_entry_by_name(const char *name)
{
        struct action_entry *entry;

        TAILQ_FOREACH(entry, &action_entry_list, next) {
                if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
                        break;
        }

        return entry;
}

static int
validate_action_name(const char *name)
{
        if (name == NULL) {
                RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
                rte_errno = EINVAL;
                return -1;
        }
        if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
                RTE_LOG(ERR, EAL, "Length of action name is zero\n");
                rte_errno = EINVAL;
                return -1;
        }
        if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
                rte_errno = E2BIG;
                return -1;
        }
        return 0;
}

int __rte_experimental
rte_mp_action_register(const char *name, rte_mp_t action)
{
        struct action_entry *entry;

        if (validate_action_name(name))
                return -1;

        entry = malloc(sizeof(struct action_entry));
        if (entry == NULL) {
                rte_errno = ENOMEM;
                return -1;
        }
        strlcpy(entry->action_name, name, sizeof(entry->action_name));
        entry->action = action;

        pthread_mutex_lock(&mp_mutex_action);
        if (find_action_entry_by_name(name) != NULL) {
                pthread_mutex_unlock(&mp_mutex_action);
                rte_errno = EEXIST;
                free(entry);
                return -1;
        }
        TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
        pthread_mutex_unlock(&mp_mutex_action);
        return 0;
}

void __rte_experimental
rte_mp_action_unregister(const char *name)
{
        struct action_entry *entry;

        if (validate_action_name(name))
                return;

        pthread_mutex_lock(&mp_mutex_action);
        entry = find_action_entry_by_name(name);
        if (entry == NULL) {
                pthread_mutex_unlock(&mp_mutex_action);
                return;
        }
        TAILQ_REMOVE(&action_entry_list, entry, next);
        pthread_mutex_unlock(&mp_mutex_action);
        free(entry);
}

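/* Receive one datagram from the mp socket into *m, recording the sender's
 * address in *s and extracting any file descriptors passed via SCM_RIGHTS.
 */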
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
        int msglen;
        struct iovec iov;
        struct msghdr msgh;
        char control[CMSG_SPACE(sizeof(m->msg.fds))];
        struct cmsghdr *cmsg;
        int buflen = sizeof(*m) - sizeof(m->msg.fds);

        memset(&msgh, 0, sizeof(msgh));
        iov.iov_base = m;
        iov.iov_len  = buflen;

        msgh.msg_name = s;
        msgh.msg_namelen = sizeof(*s);
        msgh.msg_iov = &iov;
        msgh.msg_iovlen = 1;
        msgh.msg_control = control;
        msgh.msg_controllen = sizeof(control);

        msglen = recvmsg(mp_fd, &msgh, 0);
        if (msglen < 0) {
                RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
                return -1;
        }

        if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
                RTE_LOG(ERR, EAL, "truncated msg\n");
                return -1;
        }

        /* read auxiliary FDs if any */
        for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
                cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
                if ((cmsg->cmsg_level == SOL_SOCKET) &&
                        (cmsg->cmsg_type == SCM_RIGHTS)) {
                        memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
                        break;
                }
        }

        return 0;
}

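/* Dispatch an incoming message: replies (MP_REP/MP_IGN) are matched against
 * pending requests, while other messages are handed to a registered action
 * callback, if one exists for the message name.
 */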
static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
        struct pending_request *pending_req;
        struct action_entry *entry;
        struct rte_mp_msg *msg = &m->msg;
        rte_mp_t action = NULL;

        RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

        if (m->type == MP_REP || m->type == MP_IGN) {
                struct pending_request *req = NULL;

                pthread_mutex_lock(&pending_requests.lock);
                pending_req = find_pending_request(s->sun_path, msg->name);
                if (pending_req) {
                        memcpy(pending_req->reply, msg, sizeof(*msg));
                        /* -1 indicates that we've been asked to ignore */
                        pending_req->reply_received =
                                m->type == MP_REP ? 1 : -1;

                        if (pending_req->type == REQUEST_TYPE_SYNC)
                                pthread_cond_signal(&pending_req->sync.cond);
                        else if (pending_req->type == REQUEST_TYPE_ASYNC)
                                req = async_reply_handle_thread_unsafe(
                                                pending_req);
                } else
                        RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
                pthread_mutex_unlock(&pending_requests.lock);

                if (req != NULL)
                        trigger_async_action(req);
                return;
        }

        pthread_mutex_lock(&mp_mutex_action);
        entry = find_action_entry_by_name(msg->name);
        if (entry != NULL)
                action = entry->action;
        pthread_mutex_unlock(&mp_mutex_action);

        if (!action) {
                if (m->type == MP_REQ && !internal_config.init_complete) {
                        /* if this is a request, and init is not yet complete,
                         * and callback wasn't registered, we should tell the
                         * requester to ignore our existence because we're not
                         * yet ready to process this request.
                         */
                        struct rte_mp_msg dummy;

                        memset(&dummy, 0, sizeof(dummy));
                        strlcpy(dummy.name, msg->name, sizeof(dummy.name));
                        mp_send(&dummy, s->sun_path, MP_IGN);
                } else {
                        RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
                                msg->name);
                }
        } else if (action(msg, s->sun_path) < 0) {
                RTE_LOG(ERR, EAL, "Failed to handle message: %s\n", msg->name);
        }
}

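/* Main loop of the IPC control thread: receive and dispatch messages until
 * the process exits.
 */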
static void *
mp_handle(void *arg __rte_unused)
{
        struct mp_msg_internal msg;
        struct sockaddr_un sa;

        while (1) {
                if (read_msg(&msg, &sa) == 0)
                        process_msg(&msg, &sa);
        }

        return NULL;
}

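/* Compare two timespec values; returns -1, 0 or 1 if a is earlier than,
 * equal to, or later than b.
 */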
static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
        if (a->tv_sec < b->tv_sec)
                return -1;
        if (a->tv_sec > b->tv_sec)
                return 1;
        if (a->tv_nsec < b->tv_nsec)
                return -1;
        if (a->tv_nsec > b->tv_nsec)
                return 1;
        return 0;
}

enum async_action {
        ACTION_FREE, /**< free the action entry, but don't trigger callback */
        ACTION_TRIGGER /**< trigger callback, then free action entry */
};

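/* Fold one reply (or timeout) into the async request's accumulated user
 * reply, and report whether this was the last expected response.
 */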
static enum async_action
process_async_request(struct pending_request *sr, const struct timespec *now)
{
        struct async_request_param *param;
        struct rte_mp_reply *reply;
        bool timeout, last_msg;

        param = sr->async.param;
        reply = &param->user_reply;

        /* did we time out? */
        timeout = timespec_cmp(&param->end, now) <= 0;

        /* if we received a response, adjust relevant data and copy the message. */
        if (sr->reply_received == 1 && sr->reply) {
                struct rte_mp_msg *msg, *user_msgs, *tmp;

                msg = sr->reply;
                user_msgs = reply->msgs;

                tmp = realloc(user_msgs, sizeof(*msg) *
                                (reply->nb_received + 1));
                if (!tmp) {
                        RTE_LOG(ERR, EAL, "Failed to alloc reply for request %s:%s\n",
                                sr->dst, sr->request->name);
                        /* this entry is going to be removed and its message
                         * dropped, but we don't want to leak memory, so
                         * continue.
                         */
                } else {
                        user_msgs = tmp;
                        reply->msgs = user_msgs;
                        memcpy(&user_msgs[reply->nb_received],
                                        msg, sizeof(*msg));
                        reply->nb_received++;
                }

                /* mark this request as processed */
                param->n_responses_processed++;
        } else if (sr->reply_received == -1) {
                /* we were asked to ignore this process */
                reply->nb_sent--;
        } else if (timeout) {
                /* count it as processed response, but don't increment
                 * nb_received.
                 */
                param->n_responses_processed++;
        }

        free(sr->reply);

        last_msg = param->n_responses_processed == reply->nb_sent;

        return last_msg ? ACTION_TRIGGER : ACTION_FREE;
}

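/* Invoke the user callback for a completed async request, then free all
 * memory associated with it.
 */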
static void
trigger_async_action(struct pending_request *sr)
{
        struct async_request_param *param;
        struct rte_mp_reply *reply;

        param = sr->async.param;
        reply = &param->user_reply;

        param->clb(sr->request, reply);

        /* clean up */
        free(sr->async.param->user_reply.msgs);
        free(sr->async.param);
        free(sr->request);
        free(sr);
}

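/* Process an async reply and dequeue the request; the caller must hold
 * pending_requests.lock. Returns the request if its callback still needs to
 * be triggered, NULL otherwise.
 */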
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg)
{
        struct pending_request *req = (struct pending_request *)arg;
        enum async_action action;
        struct timespec ts_now;
        struct timeval now;

        if (gettimeofday(&now, NULL) < 0) {
                RTE_LOG(ERR, EAL, "Cannot get current time\n");
                goto no_trigger;
        }
        ts_now.tv_nsec = now.tv_usec * 1000;
        ts_now.tv_sec = now.tv_sec;

        action = process_async_request(req, &ts_now);

        TAILQ_REMOVE(&pending_requests.requests, req, next);

        if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
                /* if we failed to cancel the alarm because it's already in
                 * progress, don't proceed because otherwise we will end up
                 * handling the same message twice.
                 */
                if (rte_errno == EINPROGRESS) {
                        RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
                        goto no_trigger;
                }
                RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
        }

        if (action == ACTION_TRIGGER)
                return req;
no_trigger:
        free(req);
        return NULL;
}

static void
async_reply_handle(void *arg)
{
        struct pending_request *req;

        pthread_mutex_lock(&pending_requests.lock);
        req = async_reply_handle_thread_unsafe(arg);
        pthread_mutex_unlock(&pending_requests.lock);

        if (req != NULL)
                trigger_async_action(req);
}

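/* Create and bind the datagram socket used for IPC. Secondary processes get
 * a unique socket name derived from their PID and the TSC.
 */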
static int
open_socket_fd(void)
{
        char peer_name[PATH_MAX] = {0};
        struct sockaddr_un un;

        if (rte_eal_process_type() == RTE_PROC_SECONDARY)
                snprintf(peer_name, sizeof(peer_name),
                                "%d_%"PRIx64, getpid(), rte_rdtsc());

        mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
        if (mp_fd < 0) {
                RTE_LOG(ERR, EAL, "failed to create unix socket\n");
                return -1;
        }

        memset(&un, 0, sizeof(un));
        un.sun_family = AF_UNIX;

        create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));

        unlink(un.sun_path); /* May still exist since last run */

        if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
                RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
                        un.sun_path, strerror(errno));
                close(mp_fd);
                return -1;
        }

        RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
        return mp_fd;
}

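/* Remove all socket files in the mp directory that match the given fnmatch
 * filter, typically stale sockets left over from previous runs.
 */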
static int
unlink_sockets(const char *filter)
{
        int dir_fd;
        DIR *mp_dir;
        struct dirent *ent;

        mp_dir = opendir(mp_dir_path);
        if (!mp_dir) {
                RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
                return -1;
        }
        dir_fd = dirfd(mp_dir);

        while ((ent = readdir(mp_dir))) {
                if (fnmatch(filter, ent->d_name, 0) == 0)
                        unlinkat(dir_fd, ent->d_name, 0);
        }

        closedir(mp_dir);
        return 0;
}

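/* Set up the IPC channel: derive the socket filter and directory paths,
 * clean up stale sockets (primary only), open our own socket and spawn the
 * handler thread. The directory is flock()ed during setup to avoid racing
 * with processes that are starting up concurrently.
 */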
int
rte_mp_channel_init(void)
{
        char path[PATH_MAX];
        int dir_fd;
        pthread_t mp_handle_tid;

        /* in no shared files mode, we do not support secondary processes,
         * so there is no need to initialize IPC.
         */
        if (internal_config.no_shconf) {
                RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n");
                return 0;
        }

        /* create filter path */
        create_socket_path("*", path, sizeof(path));
        strlcpy(mp_filter, basename(path), sizeof(mp_filter));

        /* path may have been modified, so recreate it */
        create_socket_path("*", path, sizeof(path));
        strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));

        /* lock the directory */
        dir_fd = open(mp_dir_path, O_RDONLY);
        if (dir_fd < 0) {
                RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
                        mp_dir_path, strerror(errno));
                return -1;
        }

        if (flock(dir_fd, LOCK_EX)) {
                RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
                        mp_dir_path, strerror(errno));
                close(dir_fd);
                return -1;
        }

        if (rte_eal_process_type() == RTE_PROC_PRIMARY &&
                        unlink_sockets(mp_filter)) {
                RTE_LOG(ERR, EAL, "failed to unlink mp sockets\n");
                close(dir_fd);
                return -1;
        }

        if (open_socket_fd() < 0) {
                close(dir_fd);
                return -1;
        }

        if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
                        NULL, mp_handle, NULL) < 0) {
                RTE_LOG(ERR, EAL, "failed to create mp thread: %s\n",
                        strerror(errno));
                close(mp_fd);
                close(dir_fd);
                mp_fd = -1;
                return -1;
        }

        /* unlock the directory */
        flock(dir_fd, LOCK_UN);
        close(dir_fd);

        return 0;
}

/**
 * Return -1 if the message failed to send due to the local side,
 * 0 if it failed due to the remote side, and 1 on success.
 */
static int
send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
{
        int snd;
        struct iovec iov;
        struct msghdr msgh;
        struct cmsghdr *cmsg;
        struct sockaddr_un dst;
        struct mp_msg_internal m;
        int fd_size = msg->num_fds * sizeof(int);
        char control[CMSG_SPACE(fd_size)];

        m.type = type;
        memcpy(&m.msg, msg, sizeof(*msg));

        memset(&dst, 0, sizeof(dst));
        dst.sun_family = AF_UNIX;
        strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));

        memset(&msgh, 0, sizeof(msgh));
        memset(control, 0, sizeof(control));

        iov.iov_base = &m;
        iov.iov_len = sizeof(m) - sizeof(msg->fds);

        msgh.msg_name = &dst;
        msgh.msg_namelen = sizeof(dst);
        msgh.msg_iov = &iov;
        msgh.msg_iovlen = 1;
        msgh.msg_control = control;
        msgh.msg_controllen = sizeof(control);

        cmsg = CMSG_FIRSTHDR(&msgh);
        cmsg->cmsg_len = CMSG_LEN(fd_size);
        cmsg->cmsg_level = SOL_SOCKET;
        cmsg->cmsg_type = SCM_RIGHTS;
        memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);

        do {
                snd = sendmsg(mp_fd, &msgh, 0);
        } while (snd < 0 && errno == EINTR);

        if (snd < 0) {
                rte_errno = errno;
                /* Check if it was caused by the peer process exiting */
                if (errno == ECONNREFUSED &&
                                rte_eal_process_type() == RTE_PROC_PRIMARY) {
                        unlink(dst_path);
                        return 0;
                }
                if (errno == ENOBUFS) {
                        RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
                                dst_path);
                        return 0;
                }
                RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
                        dst_path, strerror(errno));
                return -1;
        }

        return 1;
}

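/* Send a message to one peer, or, if peer is NULL in the primary process,
 * broadcast it to every secondary process socket in the mp directory.
 */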
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type)
{
        int dir_fd, ret = 0;
        DIR *mp_dir;
        struct dirent *ent;

        if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
                peer = eal_mp_socket_path();

        if (peer) {
                if (send_msg(peer, msg, type) < 0)
                        return -1;
                else
                        return 0;
        }

        /* broadcast to all secondary processes */
        mp_dir = opendir(mp_dir_path);
        if (!mp_dir) {
                RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
                                mp_dir_path);
                rte_errno = errno;
                return -1;
        }

        dir_fd = dirfd(mp_dir);
        /* lock the directory to prevent processes spinning up while we send */
        if (flock(dir_fd, LOCK_SH)) {
                RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
                        mp_dir_path);
                rte_errno = errno;
                closedir(mp_dir);
                return -1;
        }

        while ((ent = readdir(mp_dir))) {
                char path[PATH_MAX];

                if (fnmatch(mp_filter, ent->d_name, 0) != 0)
                        continue;

                snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
                         ent->d_name);
                if (send_msg(path, msg, type) < 0)
                        ret = -1;
        }
        /* unlock the dir */
        flock(dir_fd, LOCK_UN);

        /* dir_fd automatically closed on closedir */
        closedir(mp_dir);
        return ret;
}

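/* Validate a message before sending: non-NULL, valid name, and parameter
 * and fd counts within the protocol limits.
 */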
static bool
check_input(const struct rte_mp_msg *msg)
{
        if (msg == NULL) {
                RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
                rte_errno = EINVAL;
                return false;
        }

        if (validate_action_name(msg->name))
                return false;

        if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
                RTE_LOG(ERR, EAL, "Message data is too long\n");
                rte_errno = E2BIG;
                return false;
        }

        if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
                RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
                        RTE_MP_MAX_FD_NUM);
                rte_errno = E2BIG;
                return false;
        }

        return true;
}

int __rte_experimental
rte_mp_sendmsg(struct rte_mp_msg *msg)
{
        if (!check_input(msg))
                return -1;

        RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
        return mp_send(msg, NULL, MP_MSG);
}

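/* Send one async request to a single peer and queue it for reply tracking;
 * a timeout alarm is armed for the request. The caller must hold
 * pending_requests.lock.
 */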
static int
mp_request_async(const char *dst, struct rte_mp_msg *req,
                struct async_request_param *param, const struct timespec *ts)
{
        struct rte_mp_msg *reply_msg;
        struct pending_request *pending_req, *exist;
        int ret;

        pending_req = calloc(1, sizeof(*pending_req));
        reply_msg = calloc(1, sizeof(*reply_msg));
        if (pending_req == NULL || reply_msg == NULL) {
                RTE_LOG(ERR, EAL, "Could not allocate space for async request\n");
                rte_errno = ENOMEM;
                ret = -1;
                goto fail;
        }

        pending_req->type = REQUEST_TYPE_ASYNC;
        strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
        pending_req->request = req;
        pending_req->reply = reply_msg;
        pending_req->async.param = param;

        /* queue already locked by caller */

        exist = find_pending_request(dst, req->name);
        if (exist) {
                RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
                rte_errno = EEXIST;
                ret = -1;
                goto fail;
        }

        ret = send_msg(dst, req, MP_REQ);
        if (ret < 0) {
                RTE_LOG(ERR, EAL, "Failed to send request %s:%s\n",
                        dst, req->name);
                ret = -1;
                goto fail;
        } else if (ret == 0) {
                ret = 0;
                goto fail;
        }
        TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);

        param->user_reply.nb_sent++;

        if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
                              async_reply_handle, pending_req) < 0) {
                RTE_LOG(ERR, EAL, "Failed to set alarm for request %s:%s\n",
                        dst, req->name);
                rte_panic("Fix the above to properly free all memory\n");
        }

        return 0;
fail:
        free(pending_req);
        free(reply_msg);
        return ret;
}

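/* Send one request to a single peer and block on a condition variable until
 * a reply arrives or the timeout expires. The caller must hold
 * pending_requests.lock, which pthread_cond_timedwait releases while
 * waiting.
 */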
static int
mp_request_sync(const char *dst, struct rte_mp_msg *req,
               struct rte_mp_reply *reply, const struct timespec *ts)
{
        int ret;
        struct rte_mp_msg msg, *tmp;
        struct pending_request pending_req, *exist;

        pending_req.type = REQUEST_TYPE_SYNC;
        pending_req.reply_received = 0;
        strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
        pending_req.request = req;
        pending_req.reply = &msg;
        pthread_cond_init(&pending_req.sync.cond, NULL);

        exist = find_pending_request(dst, req->name);
        if (exist) {
                RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
                rte_errno = EEXIST;
                return -1;
        }

        ret = send_msg(dst, req, MP_REQ);
        if (ret < 0) {
                RTE_LOG(ERR, EAL, "Failed to send request %s:%s\n",
                        dst, req->name);
                return -1;
        } else if (ret == 0)
                return 0;

        TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);

        reply->nb_sent++;

        do {
                ret = pthread_cond_timedwait(&pending_req.sync.cond,
                                &pending_requests.lock, ts);
        } while (ret != 0 && ret != ETIMEDOUT);

        TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);

        if (pending_req.reply_received == 0) {
                RTE_LOG(ERR, EAL, "Failed to recv reply for request %s:%s\n",
                        dst, req->name);
                rte_errno = ETIMEDOUT;
                return -1;
        }
        if (pending_req.reply_received == -1) {
                RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
                /* not receiving this message is not an error, so decrement
                 * number of sent messages
                 */
                reply->nb_sent--;
                return 0;
        }

        tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
        if (!tmp) {
                RTE_LOG(ERR, EAL, "Failed to alloc reply for request %s:%s\n",
                        dst, req->name);
                rte_errno = ENOMEM;
                return -1;
        }
        memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
        reply->msgs = tmp;
        reply->nb_received++;
        return 0;
}

int __rte_experimental
rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
                const struct timespec *ts)
{
        int dir_fd, ret = 0;
        DIR *mp_dir;
        struct dirent *ent;
        struct timeval now;
        struct timespec end;

        RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

        if (check_input(req) == false)
                return -1;

        reply->nb_sent = 0;
        reply->nb_received = 0;
        reply->msgs = NULL;

        if (internal_config.no_shconf) {
                RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
                return 0;
        }

        if (gettimeofday(&now, NULL) < 0) {
                RTE_LOG(ERR, EAL, "Failed to get current time\n");
                rte_errno = errno;
                return -1;
        }

        end.tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
        end.tv_sec = now.tv_sec + ts->tv_sec +
                        (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;

        /* for secondary process, send request to the primary process only */
        if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
                pthread_mutex_lock(&pending_requests.lock);
                ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
                pthread_mutex_unlock(&pending_requests.lock);
                return ret;
        }

        /* for primary process, broadcast request and collect replies one by one */
        mp_dir = opendir(mp_dir_path);
        if (!mp_dir) {
                RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
                rte_errno = errno;
                return -1;
        }

        dir_fd = dirfd(mp_dir);
        /* lock the directory to prevent processes spinning up while we send */
        if (flock(dir_fd, LOCK_SH)) {
                RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
                        mp_dir_path);
                closedir(mp_dir);
                rte_errno = errno;
                return -1;
        }

        pthread_mutex_lock(&pending_requests.lock);
        while ((ent = readdir(mp_dir))) {
                char path[PATH_MAX];

                if (fnmatch(mp_filter, ent->d_name, 0) != 0)
                        continue;

                snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
                         ent->d_name);

                /* unlocks the mutex while waiting for response,
                 * locks on receive
                 */
                if (mp_request_sync(path, req, reply, &end))
                        ret = -1;
        }
        pthread_mutex_unlock(&pending_requests.lock);
        /* unlock the directory */
        flock(dir_fd, LOCK_UN);

        /* dir_fd automatically closed on closedir */
        closedir(mp_dir);
        return ret;
}

int __rte_experimental
rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
                rte_mp_async_reply_t clb)
{
        struct rte_mp_msg *copy;
        struct pending_request *dummy;
        struct async_request_param *param;
        struct rte_mp_reply *reply;
        int dir_fd, ret = 0;
        DIR *mp_dir;
        struct dirent *ent;
        struct timeval now;
        struct timespec *end;
        bool dummy_used = false;

        RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

        if (check_input(req) == false)
                return -1;

        if (internal_config.no_shconf) {
                RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
                return 0;
        }

        if (gettimeofday(&now, NULL) < 0) {
                RTE_LOG(ERR, EAL, "Failed to get current time\n");
                rte_errno = errno;
                return -1;
        }
        copy = calloc(1, sizeof(*copy));
        dummy = calloc(1, sizeof(*dummy));
        param = calloc(1, sizeof(*param));
        if (copy == NULL || dummy == NULL || param == NULL) {
                RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
                rte_errno = ENOMEM;
                goto fail;
        }

        /* copy message */
        memcpy(copy, req, sizeof(*copy));

        param->n_responses_processed = 0;
        param->clb = clb;
        end = &param->end;
        reply = &param->user_reply;

        end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
        end->tv_sec = now.tv_sec + ts->tv_sec +
                        (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
        reply->nb_sent = 0;
        reply->nb_received = 0;
        reply->msgs = NULL;

        /* we have to lock the request queue here, as we will be adding a bunch
         * of requests to the queue at once, and some of the replies may arrive
         * before we add all of the requests to the queue.
         */
        pthread_mutex_lock(&pending_requests.lock);

        /* we have to ensure that the callback gets triggered even if we don't
         * send anything, so we have allocated a dummy request earlier. Fill
         * it in, and put it on the queue if we don't send any requests.
         */
        dummy->type = REQUEST_TYPE_ASYNC;
        dummy->request = copy;
        dummy->reply = NULL;
        dummy->async.param = param;
        dummy->reply_received = 1; /* short-circuit the timeout */

        /* for secondary process, send request to the primary process only */
        if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
                ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);

                /* if we didn't send anything, put dummy request on the queue */
                if (ret == 0 && reply->nb_sent == 0) {
                        TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
                                        next);
                        dummy_used = true;
                }

                pthread_mutex_unlock(&pending_requests.lock);

                /* if we couldn't send anything, clean up */
                if (ret != 0)
                        goto fail;
                return 0;
        }

        /* for primary process, broadcast request */
        mp_dir = opendir(mp_dir_path);
        if (!mp_dir) {
                RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
                rte_errno = errno;
                goto unlock_fail;
        }
        dir_fd = dirfd(mp_dir);

        /* lock the directory to prevent processes spinning up while we send */
        if (flock(dir_fd, LOCK_SH)) {
                RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
                        mp_dir_path);
                rte_errno = errno;
                goto closedir_fail;
        }

        while ((ent = readdir(mp_dir))) {
                char path[PATH_MAX];

                if (fnmatch(mp_filter, ent->d_name, 0) != 0)
                        continue;

                snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
                         ent->d_name);

                if (mp_request_async(path, copy, param, ts))
                        ret = -1;
        }
        /* if we didn't send anything, put dummy request on the queue */
        if (ret == 0 && reply->nb_sent == 0) {
                TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
                dummy_used = true;
        }

        /* finally, unlock the queue */
        pthread_mutex_unlock(&pending_requests.lock);

        /* unlock the directory */
        flock(dir_fd, LOCK_UN);

        /* dir_fd automatically closed on closedir */
        closedir(mp_dir);

        /* if dummy was unused, free it */
        if (!dummy_used)
                free(dummy);

        return ret;
closedir_fail:
        closedir(mp_dir);
unlock_fail:
        pthread_mutex_unlock(&pending_requests.lock);
fail:
        free(dummy);
        free(param);
        free(copy);
        return -1;
}

int __rte_experimental
rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
{
        RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);

        if (check_input(msg) == false)
                return -1;

        if (peer == NULL) {
                RTE_LOG(ERR, EAL, "peer is not specified\n");
                rte_errno = EINVAL;
                return -1;
        }

        if (internal_config.no_shconf) {
                RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
                return 0;
        }

        return mp_send(msg, peer, MP_REP);
}