FD.io VPP  v21.06-1-gbb7418cf9
Vector Packet Processing
client.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <assert.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <stddef.h>
19 #include <sys/types.h>
20 #include <sys/socket.h>
21 #include <sys/mman.h>
22 #include <sys/stat.h>
23 #include <netinet/in.h>
24 #include <netdb.h>
25 #include <signal.h>
26 #include <stdbool.h>
27 #include <vnet/vnet.h>
28 #include <vlib/vlib.h>
29 #include <vlib/unix/unix.h>
30 #include <vlibapi/api.h>
31 #include <vlibmemory/api.h>
32 
33 #include <vpp/api/vpe_msg_enum.h>
34 
35 #include "vppapiclient.h"
36 
40 
41 /*
42  * Asynchronous mode:
43  * Client registers a callback. All messages are sent to the callback.
44  * Synchronous mode:
45  * Client calls blocking read().
46  * Clients are expected to collate events on a queue.
47  * vac_write() -> suspends RX thread
48  * vac_read() -> resumes RX thread
49  */
50 
51 #define vl_typedefs /* define message structures */
52 #include <vpp/api/vpe_all_api_h.h>
53 #undef vl_typedefs
54 
55 #define vl_endianfun /* define message endian-conversion functions */
56 #include <vpp/api/vpe_all_api_h.h>
57 #undef vl_endianfun
58 
/*
 * State shared between the API caller, the RX thread and the read-timeout
 * thread: the RX thread handle plus the mutexes and condition variables the
 * three parties use to hand control to each other.
 */
59 typedef struct {
61  pthread_t rx_thread_handle; /* RX thread started by vac_connect() when a callback is given */
63  pthread_mutex_t queue_lock; /* held around suspend/resume/terminate signalling */
64  pthread_cond_t suspend_cv; /* RX thread -> vac_rx_suspend(): thread has parked */
65  pthread_cond_t resume_cv; /* vac_rx_resume() -> RX thread: continue reading */
66  pthread_mutex_t timeout_lock; /* guards the read-timeout flags and condvars below */
68  pthread_cond_t timeout_cv; /* set_timeout() -> timeout thread: start a timer */
69  pthread_cond_t timeout_cancel_cv; /* unset_timeout() -> timeout thread: stop the timer */
70  pthread_cond_t terminate_cv; /* RX thread -> vac_disconnect(): thread is exiting */
71 } vac_main_t;
72 
/*
 * True while the RX thread is considered to be running: set once the thread
 * is created and by vac_rx_resume(), cleared by vac_rx_suspend().
 */
76 bool rx_is_running = false;
78 
79 /* Only ever allocate one heap */
80 bool mem_initialized = false;
81 
82 static void
83 init (void)
84 {
85  vac_main_t *pm = &vac_main;
86  clib_memset(pm, 0, sizeof(*pm));
87  pthread_mutex_init(&pm->queue_lock, NULL);
88  pthread_cond_init(&pm->suspend_cv, NULL);
89  pthread_cond_init(&pm->resume_cv, NULL);
90  pthread_mutex_init(&pm->timeout_lock, NULL);
91  pm->timeout_loop = 1;
92  pthread_cond_init(&pm->timeout_cv, NULL);
93  pthread_cond_init(&pm->timeout_cancel_cv, NULL);
94  pthread_cond_init(&pm->terminate_cv, NULL);
95 }
96 
97 static void
98 cleanup (void)
99 {
100  vac_main_t *pm = &vac_main;
101  pthread_mutex_destroy(&pm->queue_lock);
102  pthread_cond_destroy(&pm->suspend_cv);
103  pthread_cond_destroy(&pm->resume_cv);
104  pthread_mutex_destroy(&pm->timeout_lock);
105  pthread_cond_destroy(&pm->timeout_cv);
106  pthread_cond_destroy(&pm->timeout_cancel_cv);
107  pthread_cond_destroy(&pm->terminate_cv);
108  clib_memset(pm, 0, sizeof(*pm));
109 }
110 
/*
 * Stand-in for vlib_cli_output() so external references resolve when
 * -lvlib is not available. Both arguments are ignored; a fixed warning is
 * logged instead.
 */
void
vlib_cli_output (struct vlib_main_t *vm, char *fmt, ...)
{
  (void) vm;
  (void) fmt;
  clib_warning ("vlib_cli_output called...");
}
118 
/*
 * Release an API message buffer by passing it to vl_msg_api_free().
 *
 * msg: pointer to the message to release.
 */
119 void
120 vac_free (void * msg)
121 {
122  vl_msg_api_free (msg);
123 }
124 
125 static void
126 vac_api_handler (void *msg)
127 {
128  u16 id = ntohs(*((u16 *)msg));
129  msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
130  int l = ntohl(msgbuf->data_len);
131  if (l == 0)
132  clib_warning("Message ID %d has wrong length: %d\n", id, l);
133 
134  /* Call Python callback */
135  ASSERT(vac_callback);
136  (vac_callback)(msg, l);
137  vac_free(msg);
138 }
139 
/*
 * RX thread body: blocks on the client input queue and dispatches every
 * message by its id (the leading u16 of the message, network byte order).
 *
 *  - VL_API_RX_THREAD_EXIT: free the message, publish rx_thread_done under
 *    queue_lock, signal terminate_cv and exit the thread.
 *  - VL_API_MEMCLNT_RX_THREAD_SUSPEND: free the message, signal suspend_cv
 *    and sleep on resume_cv until resumed.
 *  - VL_API_MEMCLNT_READ_TIMEOUT: not expected here; warn and free.
 *  - VL_API_MEMCLNT_KEEPALIVE: answer with a KEEPALIVE_REPLY carrying the
 *    same context, then free the request.
 *  - anything else: pass to vac_api_handler(), which frees the message.
 *
 * arg is unused.
 * NOTE(review): `am` and `mp` are used below but their declarations are not
 * part of the visible block - confirm they are declared here.
 */
140 static void *
141 vac_rx_thread_fn (void *arg)
142 {
143  svm_queue_t *q;
145  vl_api_memclnt_keepalive_reply_t *rmp;
146  vac_main_t *pm = &vac_main;
148  vl_shmem_hdr_t *shmem_hdr;
149  uword msg;
150 
151  q = am->vl_input_queue;
152 
153  while (1)
154  while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0))
155  {
156  VL_MSG_API_UNPOISON((void *)msg);
157  u16 id = ntohs(*((u16 *)msg));
158  switch (id) {
159  case VL_API_RX_THREAD_EXIT:
160  vl_msg_api_free((void *) msg);
161  /* signal waiting threads that this thread is about to terminate */
162  pthread_mutex_lock(&pm->queue_lock);
163  rx_thread_done = true;
164  pthread_cond_signal(&pm->terminate_cv);
165  pthread_mutex_unlock(&pm->queue_lock);
166  pthread_exit(0);
/* the two statements below follow pthread_exit() and are never reached */
167  return 0;
168  break;
169 
170  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
171  vl_msg_api_free((void * )msg);
172  /* Suspend thread and signal reader */
173  pthread_mutex_lock(&pm->queue_lock);
174  pthread_cond_signal(&pm->suspend_cv);
175  /* Wait for the resume signal */
/* NOTE(review): single wait with no predicate loop, so a spurious wakeup
 * would resume the thread early - confirm this is acceptable */
176  pthread_cond_wait (&pm->resume_cv, &pm->queue_lock);
177  pthread_mutex_unlock(&pm->queue_lock);
178  break;
179 
180  case VL_API_MEMCLNT_READ_TIMEOUT:
181  clib_warning("Received read timeout in async thread\n");
182  vl_msg_api_free((void *) msg);
183  break;
184 
185  case VL_API_MEMCLNT_KEEPALIVE:
186  mp = (void *)msg;
187  rmp = vl_msg_api_alloc (sizeof (*rmp));
188  clib_memset (rmp, 0, sizeof (*rmp));
189  rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
190  rmp->context = mp->context;
191  shmem_hdr = am->shmem_hdr;
/* the reply goes to the queue in the shared-memory header, not to q */
192  vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
193  vl_msg_api_free((void *) msg);
194  break;
195 
196  default:
197  vac_api_handler((void *)msg);
198  }
199  }
200 }
201 
/*
 * Read-timeout thread body (vac_timeout_thread_fn).
 *
 * Loops while pm->timeout_loop is set. Each iteration takes timeout_lock,
 * sleeps on timeout_cv until timeout_in_progress is set, then - unless the
 * timer was already cancelled - waits on timeout_cancel_cv until the
 * deadline (now + read_timeout seconds). If that wait times out and the
 * thread itself has not been cancelled, a VL_API_MEMCLNT_READ_TIMEOUT
 * message is allocated.
 *
 * NOTE(review): the declaration of `ep` and the statement that delivers the
 * message are not part of the visible block - confirm how it is enqueued.
 */
202 static void *
204 {
206  vac_main_t *pm = &vac_main;
208  struct timespec ts;
209  struct timeval tv;
210  int rv;
211 
212  while (pm->timeout_loop)
213  {
214  /* Wait for poke */
215  pthread_mutex_lock(&pm->timeout_lock);
216  while (!timeout_in_progress)
217  pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock);
218 
219  /* Starting timer */
/* absolute deadline with whole-second resolution */
220  gettimeofday(&tv, NULL);
221  ts.tv_sec = tv.tv_sec + read_timeout;
222  ts.tv_nsec = 0;
223 
224  if (!timeout_cancelled) {
225  rv = pthread_cond_timedwait (&pm->timeout_cancel_cv,
226  &pm->timeout_lock, &ts);
227  if (rv == ETIMEDOUT && !timeout_thread_cancelled) {
228  ep = vl_msg_api_alloc (sizeof (*ep));
229  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT);
231  }
232  }
233 
234  pthread_mutex_unlock(&pm->timeout_lock);
235  }
236  pthread_exit(0);
237 }
238 
/*
 * vac_rx_suspend: ask the RX thread to park itself and wait until it has.
 *
 * Does nothing when no RX thread exists. When the thread is marked running,
 * a VL_API_MEMCLNT_RX_THREAD_SUSPEND message is allocated, the caller waits
 * on suspend_cv for the thread's acknowledgement, and rx_is_running is
 * cleared. All of this happens under queue_lock.
 *
 * NOTE(review): the declaration of `ep` and the statement that enqueues the
 * message are not part of the visible block - confirm.
 */
239 void
241 {
243  vac_main_t *pm = &vac_main;
245 
246  if (!pm->rx_thread_handle) return;
247  pthread_mutex_lock(&pm->queue_lock);
248  if (rx_is_running)
249  {
250  ep = vl_msg_api_alloc (sizeof (*ep));
251  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND);
253  /* Wait for RX thread to tell us it has suspended */
254  pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock);
255  rx_is_running = false;
256  }
257  pthread_mutex_unlock(&pm->queue_lock);
258 }
259 
/*
 * vac_rx_resume: wake a suspended RX thread.
 *
 * Does nothing when no RX thread exists or when the thread is already
 * marked running; otherwise signals resume_cv and sets rx_is_running, all
 * under queue_lock.
 */
260 void
262 {
263  vac_main_t *pm = &vac_main;
264  if (!pm->rx_thread_handle) return;
265  pthread_mutex_lock(&pm->queue_lock);
266  if (rx_is_running) goto unlock;
267  pthread_cond_signal(&pm->resume_cv);
268  rx_is_running = true;
269  unlock:
270  pthread_mutex_unlock(&pm->queue_lock);
271 }
272 
/*
 * vac_msg_table_get_hash: return the msg_index_by_name_and_crc hash table
 * held by `am`.
 * NOTE(review): the declaration of `am` is not part of the visible block -
 * presumably the API main structure; confirm.
 */
273 static uword *
275 {
277  return (am->msg_index_by_name_and_crc);
278 }
279 
/*
 * vac_msg_table_size.
 * NOTE(review): the statements of this function are not part of the visible
 * block; presumably it reports the number of entries in the message table -
 * confirm against the full file before relying on this.
 */
280 int
282 {
285 }
286 
/*
 * Connect to VPP over the shared-memory API and start the helper threads.
 *
 * name:          client name passed to vl_client_connect().
 * chroot_prefix: optional memory root path; applied when non-NULL.
 * cb:            optional message callback. When given, an RX thread is
 *                started and every received message is delivered to cb.
 * rx_qlen:       input queue size passed to vl_client_connect().
 *
 * Returns 0 on success, the vl_client_api_map() error code if mapping
 * "/vpe-api" fails, or -1 if connecting or creating either thread fails.
 *
 * NOTE(review): some statements on the failure paths are not part of the
 * visible block, so what is undone before each early return cannot be
 * confirmed from here.
 */
287 int
288 vac_connect (char * name, char * chroot_prefix, vac_callback_t cb,
289  int rx_qlen)
290 {
291  rx_thread_done = false;
292  int rv = 0;
293  vac_main_t *pm = &vac_main;
294 
296  init();
297  if (chroot_prefix != NULL)
298  vl_set_memory_root_path (chroot_prefix);
299 
300  if ((rv = vl_client_api_map("/vpe-api"))) {
301  clib_warning ("vl_client_api_map returned %d", rv);
302  return rv;
303  }
304 
305  if (vl_client_connect(name, 0, rx_qlen) < 0) {
307  return (-1);
308  }
309 
310  if (cb) {
311  /* Start the rx queue thread */
312  rv = pthread_create(&pm->rx_thread_handle, NULL, vac_rx_thread_fn, 0);
313  if (rv) {
314  clib_warning("pthread_create returned %d", rv);
316  return (-1);
317  }
/* NOTE(review): the callback is stored after the RX thread has already been
 * started; a message dispatched before this assignment would reach
 * vac_api_handler() with vac_callback unset - confirm ordering is safe */
318  vac_callback = cb;
319  rx_is_running = true;
320  }
321 
322  /* Start read timeout thread */
323  rv = pthread_create(&pm->timeout_thread_handle, NULL,
325  if (rv) {
326  clib_warning("pthread_create returned %d", rv);
328  return (-1);
329  }
330 
331  pm->connected_to_vlib = 1;
332 
333  return (0);
334 }
335 static void
336 set_timeout (unsigned short timeout)
337 {
338  vac_main_t *pm = &vac_main;
339  pthread_mutex_lock(&pm->timeout_lock);
340  read_timeout = timeout;
341  timeout_in_progress = true;
342  timeout_cancelled = false;
343  pthread_cond_signal(&pm->timeout_cv);
344  pthread_mutex_unlock(&pm->timeout_lock);
345 }
346 
/*
 * unset_timeout: disarm the read timer. Under timeout_lock, clears
 * timeout_in_progress, sets timeout_cancelled and signals timeout_cancel_cv
 * so a timeout thread waiting for the deadline wakes up.
 */
347 static void
349 {
350  vac_main_t *pm = &vac_main;
351  pthread_mutex_lock(&pm->timeout_lock);
352  timeout_in_progress = false;
353  timeout_cancelled = true;
354  pthread_cond_signal(&pm->timeout_cancel_cv);
355  pthread_mutex_unlock(&pm->timeout_lock);
356 }
357 
/*
 * vac_disconnect: stop the helper threads and release the client state.
 *
 * Returns 0 immediately when not connected. Otherwise:
 *  1. If an RX thread exists, a VL_API_RX_THREAD_EXIT message is allocated
 *     and the caller waits up to 5 seconds on terminate_cv (skipped when
 *     rx_thread_done is already set). On timeout the thread is cancelled,
 *     otherwise it is joined.
 *  2. If a timeout thread exists, its loop flag is cleared, it is marked
 *     cancelled, woken through set_timeout(0) and joined.
 *  3. cleanup() destroys the synchronization objects and zeroes vac_main.
 * Always returns 0.
 *
 * NOTE(review): the declaration of `ep`, the statement that enqueues the
 * exit message, and some statements before cleanup() are not part of the
 * visible block - confirm against the full file.
 */
358 int
360 {
362  vac_main_t *pm = &vac_main;
363  uword junk;
364  int rv = 0;
365 
366  if (!pm->connected_to_vlib) return 0;
367 
368  if (pm->rx_thread_handle) {
370  ep = vl_msg_api_alloc (sizeof (*ep));
371  ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT);
373 
374  /* wait (with timeout) until RX thread has finished */
375  struct timespec ts;
376  struct timeval tv;
377  gettimeofday(&tv, NULL);
378  ts.tv_sec = tv.tv_sec + 5;
379  ts.tv_nsec = 0;
380 
381  pthread_mutex_lock(&pm->queue_lock);
382  if (rx_thread_done == false)
383  rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts);
384  pthread_mutex_unlock(&pm->queue_lock);
385 
386  /* now join so we wait until thread has -really- finished */
/* NOTE(review): the cancel branch does not join the thread afterwards -
 * confirm this is intended */
387  if (rv == ETIMEDOUT)
388  pthread_cancel(pm->rx_thread_handle);
389  else
390  pthread_join(pm->rx_thread_handle, (void **) &junk);
391  }
392  if (pm->timeout_thread_handle) {
393  /* cancel, wake then join the timeout thread */
394  pm->timeout_loop = 0;
395  timeout_thread_cancelled = true;
396  set_timeout(0);
397  pthread_join(pm->timeout_thread_handle, (void **) &junk);
398  }
399 
402  //vac_callback = 0;
403 
404  cleanup();
405 
406  return (0);
407 }
408 
409 int
410 vac_read (char **p, int *l, u16 timeout)
411 {
412  svm_queue_t *q;
414  vac_main_t *pm = &vac_main;
416  vl_api_memclnt_keepalive_reply_t *rmp;
417  uword msg;
418  msgbuf_t *msgbuf;
419  int rv;
420  vl_shmem_hdr_t *shmem_hdr;
421 
422  /* svm_queue_sub(below) returns {-1, -2} */
423  if (!pm->connected_to_vlib)
424  return VAC_NOT_CONNECTED;
425 
426  *l = 0;
427 
428  /* svm_queue_sub(below) returns {-1, -2} */
429  if (am->our_pid == 0)
430  return (VAC_SHM_NOT_READY);
431 
432  /* Poke timeout thread */
433  if (timeout)
434  set_timeout(timeout);
435 
436  q = am->vl_input_queue;
437 
438  again:
439  rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0);
440 
441  if (rv == 0) {
442  VL_MSG_API_UNPOISON((void *)msg);
443  u16 msg_id = ntohs(*((u16 *)msg));
444  switch (msg_id) {
445  case VL_API_RX_THREAD_EXIT:
446  vl_msg_api_free((void *) msg);
447  goto error;
448  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
449  goto error;
450  case VL_API_MEMCLNT_READ_TIMEOUT:
451  goto error;
452  case VL_API_MEMCLNT_KEEPALIVE:
453  /* Handle an alive-check ping from vpp. */
454  mp = (void *)msg;
455  rmp = vl_msg_api_alloc (sizeof (*rmp));
456  clib_memset (rmp, 0, sizeof (*rmp));
457  rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
458  rmp->context = mp->context;
459  shmem_hdr = am->shmem_hdr;
460  vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
461  vl_msg_api_free((void *) msg);
462  /*
463  * Python code is blissfully unaware of these pings, so
464  * act as if it never happened...
465  */
466  goto again;
467 
468  default:
469  msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
470  *l = ntohl(msgbuf->data_len);
471  if (*l == 0) {
472  fprintf(stderr, "Unregistered API message: %d\n", msg_id);
473  goto error;
474  }
475  }
476  *p = (char *)msg;
477 
478 
479  } else {
480  fprintf(stderr, "Read failed with %d\n", rv);
481  }
482  /* Let timeout notification thread know we're done */
483  if (timeout)
484  unset_timeout();
485 
486  return (rv);
487 
488  error:
489  if (timeout)
490  unset_timeout();
491  vl_msg_api_free((void *) msg);
492  /* Client might forget to resume RX thread on failure */
493  vac_rx_resume ();
494  return VAC_TIMEOUT;
495 }
496 
497 /*
 * Minimal packed view of the start of an API message: the message id
 * followed by the client index. vac_write() uses it to stamp the client
 * index into outgoing messages.
 * XXX: Makes the assumption that client_index is the first member after
 * _vl_msg_id in every message written through it.
 */
500 typedef VL_API_PACKED(struct _vl_api_header {
501  u16 _vl_msg_id;
502  u32 client_index;
503 }) vl_api_header_t;
504 
505 static u32
506 vac_client_index (void)
507 {
508  return (vlibapi_get_main()->my_client_index);
509 }
510 
511 int
512 vac_write (char *p, int l)
513 {
514  int rv = -1;
516  vl_api_header_t *mp = vl_msg_api_alloc(l);
517  svm_queue_t *q;
518  vac_main_t *pm = &vac_main;
519 
520  if (!pm->connected_to_vlib)
521  return VAC_NOT_CONNECTED;
522  if (!mp) return (-1);
523 
524  memcpy(mp, p, l);
525  mp->client_index = vac_client_index();
526  q = am->shmem_hdr->vl_input_queue;
527  rv = svm_queue_add(q, (u8 *)&mp, 0);
528  if (rv != 0) {
529  fprintf(stderr, "vpe_api_write fails: %d\n", rv);
530  /* Clear message */
531  vac_free(mp);
532  }
533  return (rv);
534 }
535 
/*
 * vac_get_msg_index: look up a message index by name through
 * vl_msg_api_get_msg_index() and return the result as an int.
 * NOTE(review): the callee names its parameter name_and_crc, so `name`
 * presumably has to include the CRC suffix - confirm with callers.
 */
536 int
538 {
539  return vl_msg_api_get_msg_index ((u8 *)name);
540 }
541 
/*
 * vac_msg_table_max_index: walk every pair of hash `h` and return the
 * largest value[0] seen; returns 0 when no pair exceeds 0.
 * NOTE(review): the declaration of `h` is not part of the visible block -
 * presumably the message table hash; confirm.
 */
542 int
544 {
545  int max = 0;
546  hash_pair_t *hp;
548  hash_foreach_pair (hp, h,
549  ({
550  if (hp->value[0] > max)
551  max = hp->value[0];
552  }));
553 
554  return max;
555 }
556 
/*
 * vac_set_error_handler: when cb is non-NULL, register it through
 * clib_error_register_handler() with a zero argument. A NULL cb leaves the
 * registration untouched.
 * NOTE(review): one statement of this function is not part of the visible
 * block - confirm against the full file.
 */
557 void
559 {
561  if (cb) clib_error_register_handler (cb, 0);
562 }
563 
564 /*
 * vac_mem_init: one-time heap setup.
 * Required if application doesn't use a VPP heap.
 *
 * Calls clib_mem_init() with a zero base and the requested size, or with
 * 1 << 30 bytes when size is 0. The mem_initialized flag makes every call
 * after the first a no-op.
 */
567 void
569 {
570  if (mem_initialized)
571  return;
572  if (size == 0)
573  clib_mem_init (0, 1 << 30); // default
574  else
575  clib_mem_init (0, size);
576  mem_initialized = true;
577 }
bool timeout_thread_cancelled
Definition: client.c:77
void vac_rx_resume(void)
Definition: client.c:261
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:260
bool mem_initialized
Definition: client.c:80
void vac_mem_init(size_t size)
Definition: client.c:568
#define ntohs(x)
Definition: af_xdp.bpf.c:29
int vac_get_msg_index(char *name)
Definition: client.c:537
void * clib_mem_init(void *base, uword size)
Definition: mem_dlmalloc.c:266
pthread_t timeout_thread_handle
Definition: client.c:62
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
static void set_timeout(unsigned short timeout)
Definition: client.c:336
pthread_t rx_thread_handle
Definition: client.c:61
int vac_msg_table_max_index(void)
Definition: client.c:543
void vlib_cli_output(struct vlib_main_t *vm, char *fmt,...)
Definition: client.c:114
string name[64]
Definition: fib.api:25
int vac_write(char *p, int l)
Definition: client.c:512
svm_queue_t * vl_input_queue
Peer input queue pointer.
Definition: api_common.h:333
void * vl_msg_api_alloc(int nbytes)
bool rx_is_running
Definition: client.c:76
unsigned char u8
Definition: types.h:56
u8 data[128]
Definition: ipsec_types.api:92
uword value[0]
Definition: hash.h:165
void vl_client_api_unmap(void)
unsigned int u32
Definition: types.h:88
#define assert(x)
Definition: dlmalloc.c:31
int our_pid
Current process PID.
Definition: api_common.h:281
u8 connected_to_vlib
Definition: client.c:60
pthread_cond_t timeout_cancel_cv
Definition: client.c:69
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:369
pthread_mutex_t queue_lock
Definition: client.c:63
bool rx_thread_done
Definition: client.c:39
__clib_export void clib_error_register_handler(clib_error_handler_func_t func, void *arg)
Definition: error.c:75
bool timeout_cancelled
Definition: client.c:37
struct vl_shmem_hdr_ * shmem_hdr
Binary API shared-memory segment header pointer.
Definition: api_common.h:294
int __clib_unused rv
Definition: application.c:491
void vac_rx_suspend(void)
Definition: client.c:240
void vl_msg_api_send_shmem(svm_queue_t *q, u8 *elem)
bool timeout_in_progress
Definition: client.c:38
vac_callback_t vac_callback
Definition: client.c:74
static void unset_timeout(void)
Definition: client.c:348
int vac_connect(char *name, char *chroot_prefix, vac_callback_t cb, int rx_qlen)
Definition: client.c:288
void vl_set_memory_root_path(const char *name)
int vac_disconnect(void)
Definition: client.c:359
Definition: cJSON.c:88
void(* vac_error_callback_t)(void *, unsigned char *, int)
Definition: vppapiclient.h:31
u8 timeout_loop
Definition: client.c:67
int vl_client_connect(const char *name, int ctx_quota, int input_queue_size)
unsigned short u16
Definition: types.h:57
u32 size
Definition: vhost_user.h:125
static void cleanup(void)
Definition: client.c:98
vac_main_t vac_main
Definition: client.c:73
static void * vac_timeout_thread_fn(void *arg)
Definition: client.c:203
vlib_main_t * vm
X-connect all packets from the HOST to the PHY.
Definition: nat44_ei.c:3047
int cJSON_bool fmt
Definition: cJSON.h:160
u16 read_timeout
Definition: client.c:75
API main structure, used by both vpp and binary API clients.
Definition: api_common.h:228
#define clib_warning(format, args...)
Definition: error.h:59
u32 vl_msg_api_get_msg_index(u8 *name_and_crc)
Definition: api_shared.c:1119
int vl_client_api_map(const char *region_name)
pthread_cond_t suspend_cv
Definition: client.c:64
static uword * vac_msg_table_get_hash(void)
Definition: client.c:274
blocking call - best used in combination with condvars, for eventfds we don't yield the cpu ...
Definition: queue.h:42
svm_queue_t * vl_input_queue
Definition: memory_shared.h:84
static clib_mem_heap_t * clib_mem_get_heap(void)
Definition: mem.h:359
static uword hash_elts(void *v)
Definition: hash.h:118
#define ASSERT(truth)
u32 data_len
message length not including header
Definition: api_common.h:143
Message header structure.
Definition: api_common.h:140
int vac_read(char **p, int *l, u16 timeout)
Definition: client.c:410
static void init(void)
Definition: client.c:83
pthread_cond_t terminate_cv
Definition: client.c:70
void vl_msg_api_free(void *)
pthread_cond_t timeout_cv
Definition: client.c:68
pthread_cond_t resume_cv
Definition: client.c:65
#define hash_foreach_pair(p, v, body)
Iterate over hash pairs.
Definition: hash.h:373
int vl_client_disconnect(void)
u64 uword
Definition: types.h:112
void vac_set_error_handler(vac_error_callback_t cb)
Definition: client.c:558
int vac_msg_table_size(void)
Definition: client.c:281
struct _svm_queue svm_queue_t
void(* vac_callback_t)(unsigned char *data, int len)
Definition: vppapiclient.h:30
pthread_mutex_t timeout_lock
Definition: client.c:66
static api_main_t * vlibapi_get_main(void)
Definition: api_common.h:390
void vac_free(void *msg)
Definition: client.c:120
static void vac_api_handler(void *msg)
Definition: client.c:126
uword * msg_index_by_name_and_crc
client message index hash table
Definition: api_common.h:351
app_main_t * am
Definition: application.c:489
typedef VL_API_PACKED(struct _vl_api_header { u16 _vl_msg_id;u32 client_index;})
Definition: client.c:500
static CLIB_NOSANITIZE_ADDR void VL_MSG_API_UNPOISON(const void *a)
Definition: api_common.h:149
static void * vac_rx_thread_fn(void *arg)
Definition: client.c:141