FD.io VPP  v18.07-34-g55fbdb9
Vector Packet Processing
client.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <stddef.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/mman.h>
21 #include <sys/stat.h>
22 #include <netinet/in.h>
23 #include <netdb.h>
24 #include <signal.h>
25 #include <stdbool.h>
26 #include <vnet/vnet.h>
27 #include <vlib/vlib.h>
28 #include <vlib/unix/unix.h>
29 #include <vlibapi/api.h>
30 #include <vlibmemory/api.h>
31 
32 #include <vpp/api/vpe_msg_enum.h>
33 
34 #include "vppapiclient.h"
35 
36 /*
37  * Asynchronous mode:
38  * Client registers a callback. All messages are sent to the callback.
39  * Synchronous mode:
40  * Client calls blocking read().
41  * Clients are expected to collate events on a queue.
42  * vac_write() -> suspends RX thread
43  * vac_read() -> resumes RX thread
44  */
45 
46 #define vl_typedefs /* define message structures */
47 #include <vpp/api/vpe_all_api_h.h>
48 #undef vl_typedefs
49 
50 #define vl_endianfun /* define message structures */
51 #include <vpp/api/vpe_all_api_h.h>
52 #undef vl_endianfun
53 
56 
57 typedef struct {
59  pthread_t rx_thread_handle;
61  pthread_mutex_t queue_lock;
62  pthread_cond_t suspend_cv;
63  pthread_cond_t resume_cv;
64  pthread_mutex_t timeout_lock;
65  pthread_cond_t timeout_cv;
66  pthread_cond_t timeout_cancel_cv;
67  pthread_cond_t terminate_cv;
68 } vac_main_t;
69 
73 bool rx_is_running = false;
74 
75 /* Set to true to enable memory tracing */
76 bool mem_trace = false;
77 
78 __attribute__((constructor))
79 static void
81 {
82  u8 *heap;
83  mheap_t *h;
84  clib_mem_init (0, 1 << 30);
85  heap = clib_mem_get_per_cpu_heap ();
86  h = mheap_header (heap);
87  /* make the main heap thread-safe */
89  if (mem_trace)
90  clib_mem_trace (1);
91 }
92 
93 __attribute__((destructor))
94 static void
96 {
97  if (mem_trace)
98  fformat(stderr, "TRACE: %s",
99  format (0, "%U\n",
101 }
102 
103 
104 static void
105 init (void)
106 {
107  vac_main_t *pm = &vac_main;
108  memset(pm, 0, sizeof(*pm));
109  pthread_mutex_init(&pm->queue_lock, NULL);
110  pthread_cond_init(&pm->suspend_cv, NULL);
111  pthread_cond_init(&pm->resume_cv, NULL);
112  pthread_mutex_init(&pm->timeout_lock, NULL);
113  pthread_cond_init(&pm->timeout_cv, NULL);
114  pthread_cond_init(&pm->timeout_cancel_cv, NULL);
115  pthread_cond_init(&pm->terminate_cv, NULL);
116 }
117 
118 static void
119 cleanup (void)
120 {
121  vac_main_t *pm = &vac_main;
122  pthread_mutex_destroy(&pm->queue_lock);
123  pthread_cond_destroy(&pm->suspend_cv);
124  pthread_cond_destroy(&pm->resume_cv);
125  pthread_mutex_destroy(&pm->timeout_lock);
126  pthread_cond_destroy(&pm->timeout_cv);
127  pthread_cond_destroy(&pm->timeout_cancel_cv);
128  pthread_cond_destroy(&pm->terminate_cv);
129  memset(pm, 0, sizeof(*pm));
130 }
131 
/*
 * Stub that satisfies external references when -lvlib is not available.
 * Both arguments are ignored; every call only logs a warning.
 */
void
vlib_cli_output (struct vlib_main_t *vm, char *fmt, ...)
{
  (void) vm;
  (void) fmt;
  clib_warning ("vlib_cli_output called...");
}
139 
/*
 * Release a message buffer.
 *
 * @param msg  message pointer; handed unchanged to vl_msg_api_free().
 */
void
vac_free (void *msg)
{
  vl_msg_api_free (msg);
}
145 
146 static void
147 vac_api_handler (void *msg)
148 {
149  u16 id = ntohs(*((u16 *)msg));
150  msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
151  int l = ntohl(msgbuf->data_len);
152  if (l == 0)
153  clib_warning("Message ID %d has wrong length: %d\n", id, l);
154 
155  /* Call Python callback */
156  ASSERT(vac_callback);
157  (vac_callback)(msg, l);
158  vac_free(msg);
159 }
160 
161 static void *
162 vac_rx_thread_fn (void *arg)
163 {
164  svm_queue_t *q;
166  vl_api_memclnt_keepalive_reply_t *rmp;
167  vac_main_t *pm = &vac_main;
168  api_main_t *am = &api_main;
170  uword msg;
171 
172  q = am->vl_input_queue;
173 
174  while (1)
175  while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0))
176  {
177  u16 id = ntohs(*((u16 *)msg));
178  switch (id) {
179  case VL_API_RX_THREAD_EXIT:
180  vl_msg_api_free((void *) msg);
181  /* signal waiting threads that this thread is about to terminate */
182  pthread_mutex_lock(&pm->queue_lock);
183  pthread_cond_signal(&pm->terminate_cv);
184  pthread_mutex_unlock(&pm->queue_lock);
185  pthread_exit(0);
186  return 0;
187  break;
188 
189  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
190  vl_msg_api_free((void * )msg);
191  /* Suspend thread and signal reader */
192  pthread_mutex_lock(&pm->queue_lock);
193  pthread_cond_signal(&pm->suspend_cv);
194  /* Wait for the resume signal */
195  pthread_cond_wait (&pm->resume_cv, &pm->queue_lock);
196  pthread_mutex_unlock(&pm->queue_lock);
197  break;
198 
199  case VL_API_MEMCLNT_READ_TIMEOUT:
200  clib_warning("Received read timeout in async thread\n");
201  vl_msg_api_free((void *) msg);
202  break;
203 
204  case VL_API_MEMCLNT_KEEPALIVE:
205  mp = (void *)msg;
206  rmp = vl_msg_api_alloc (sizeof (*rmp));
207  memset (rmp, 0, sizeof (*rmp));
208  rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
209  rmp->context = mp->context;
210  shmem_hdr = am->shmem_hdr;
211  vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
212  vl_msg_api_free((void *) msg);
213  break;
214 
215  default:
216  vac_api_handler((void *)msg);
217  }
218  }
219 }
220 
221 static void *
223 {
225  vac_main_t *pm = &vac_main;
226  api_main_t *am = &api_main;
227  struct timespec ts;
228  struct timeval tv;
229  u16 timeout;
230  int rv;
231 
232  while (1)
233  {
234  /* Wait for poke */
235  pthread_mutex_lock(&pm->timeout_lock);
236  pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock);
237  timeout = read_timeout;
238  gettimeofday(&tv, NULL);
239  ts.tv_sec = tv.tv_sec + timeout;
240  ts.tv_nsec = 0;
241  rv = pthread_cond_timedwait (&pm->timeout_cancel_cv,
242  &pm->timeout_lock, &ts);
243  pthread_mutex_unlock(&pm->timeout_lock);
244  if (rv == ETIMEDOUT)
245  {
246  ep = vl_msg_api_alloc (sizeof (*ep));
247  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT);
249  }
250  }
251  pthread_exit(0);
252 }
253 
254 void
256 {
257  api_main_t *am = &api_main;
258  vac_main_t *pm = &vac_main;
260 
261  if (!pm->rx_thread_handle) return;
262  pthread_mutex_lock(&pm->queue_lock);
263  if (rx_is_running)
264  {
265  ep = vl_msg_api_alloc (sizeof (*ep));
266  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND);
268  /* Wait for RX thread to tell us it has suspended */
269  pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock);
270  rx_is_running = false;
271  }
272  pthread_mutex_unlock(&pm->queue_lock);
273 }
274 
275 void
277 {
278  vac_main_t *pm = &vac_main;
279  if (!pm->rx_thread_handle) return;
280  pthread_mutex_lock(&pm->queue_lock);
281  if (rx_is_running) goto unlock;
282  pthread_cond_signal(&pm->resume_cv);
283  rx_is_running = true;
284  unlock:
285  pthread_mutex_unlock(&pm->queue_lock);
286 }
287 
288 static uword *
290 {
291  api_main_t *am = &api_main;
292  return (am->msg_index_by_name_and_crc);
293 }
294 
295 int
297 {
298  api_main_t *am = &api_main;
300 }
301 
302 int
303 vac_connect (char * name, char * chroot_prefix, vac_callback_t cb,
304  int rx_qlen)
305 {
306  int rv = 0;
307  vac_main_t *pm = &vac_main;
308 
309  init();
310  if (chroot_prefix != NULL)
311  vl_set_memory_root_path (chroot_prefix);
312 
313  if ((rv = vl_client_api_map("/vpe-api"))) {
314  clib_warning ("vl_client_api map rv %d", rv);
315  return rv;
316  }
317 
318  if (vl_client_connect(name, 0, rx_qlen) < 0) {
320  return (-1);
321  }
322 
323  if (cb) {
324  /* Start the rx queue thread */
325  rv = pthread_create(&pm->rx_thread_handle, NULL, vac_rx_thread_fn, 0);
326  if (rv) {
327  clib_warning("pthread_create returned %d", rv);
329  return (-1);
330  }
331  vac_callback = cb;
332  rx_is_running = true;
333  }
334 
335  /* Start read timeout thread */
336  rv = pthread_create(&pm->timeout_thread_handle, NULL,
338  if (rv) {
339  clib_warning("pthread_create returned %d", rv);
341  return (-1);
342  }
343 
344  pm->connected_to_vlib = 1;
345 
346  return (0);
347 }
348 
349 static void
350 set_timeout (unsigned short timeout)
351 {
352  vac_main_t *pm = &vac_main;
353  pthread_mutex_lock(&pm->timeout_lock);
354  read_timeout = timeout;
355  pthread_cond_signal(&pm->timeout_cv);
356  pthread_mutex_unlock(&pm->timeout_lock);
357 }
358 
359 static void
361 {
362  vac_main_t *pm = &vac_main;
363  pthread_mutex_lock(&pm->timeout_lock);
364  pthread_cond_signal(&pm->timeout_cancel_cv);
365  pthread_mutex_unlock(&pm->timeout_lock);
366 }
367 
368 int
370 {
371  api_main_t *am = &api_main;
372  vac_main_t *pm = &vac_main;
373  uword junk;
374 
375  if (!pm->connected_to_vlib) return 0;
376 
377  if (pm->rx_thread_handle) {
379  ep = vl_msg_api_alloc (sizeof (*ep));
380  ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT);
382 
383  /* wait (with timeout) until RX thread has finished */
384  struct timespec ts;
385  struct timeval tv;
386  gettimeofday(&tv, NULL);
387  ts.tv_sec = tv.tv_sec + 5;
388  ts.tv_nsec = 0;
389  pthread_mutex_lock(&pm->queue_lock);
390  int rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts);
391  pthread_mutex_unlock(&pm->queue_lock);
392  /* now join so we wait until thread has -really- finished */
393  if (rv == ETIMEDOUT)
394  pthread_cancel(pm->rx_thread_handle);
395  else
396  pthread_join(pm->rx_thread_handle, (void **) &junk);
397  }
398  if (pm->timeout_thread_handle) {
399  /* cancel, wake then join the timeout thread */
400  pthread_cancel(pm->timeout_thread_handle);
401  set_timeout(0);
402  pthread_join(pm->timeout_thread_handle, (void **) &junk);
403  }
404 
407  vac_callback = 0;
408 
409  cleanup();
410 
411  return (0);
412 }
413 
414 int
415 vac_read (char **p, int *l, u16 timeout)
416 {
417  svm_queue_t *q;
418  api_main_t *am = &api_main;
419  vac_main_t *pm = &vac_main;
421  vl_api_memclnt_keepalive_reply_t *rmp;
422  uword msg;
423  msgbuf_t *msgbuf;
424  int rv;
426 
427  if (!pm->connected_to_vlib) return -1;
428 
429  *l = 0;
430 
431  if (am->our_pid == 0) return (-1);
432 
433  /* Poke timeout thread */
434  if (timeout)
435  set_timeout(timeout);
436 
437  q = am->vl_input_queue;
438 
439  again:
440  rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0);
441 
442  if (rv == 0) {
443  u16 msg_id = ntohs(*((u16 *)msg));
444  switch (msg_id) {
445  case VL_API_RX_THREAD_EXIT:
446  printf("Received thread exit\n");
447  vl_msg_api_free((void *) msg);
448  return -1;
449  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
450  printf("Received thread suspend\n");
451  goto error;
452  case VL_API_MEMCLNT_READ_TIMEOUT:
453  printf("Received read timeout %ds\n", timeout);
454  goto error;
455  case VL_API_MEMCLNT_KEEPALIVE:
456  /* Handle an alive-check ping from vpp. */
457  mp = (void *)msg;
458  rmp = vl_msg_api_alloc (sizeof (*rmp));
459  memset (rmp, 0, sizeof (*rmp));
460  rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
461  rmp->context = mp->context;
462  shmem_hdr = am->shmem_hdr;
463  vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
464  vl_msg_api_free((void *) msg);
465  /*
466  * Python code is blissfully unaware of these pings, so
467  * act as if it never happened...
468  */
469  goto again;
470 
471  default:
472  msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
473  *l = ntohl(msgbuf->data_len);
474  if (*l == 0) {
475  printf("Unregistered API message: %d\n", msg_id);
476  goto error;
477  }
478  }
479  *p = (char *)msg;
480 
481  /* Let timeout notification thread know we're done */
482  unset_timeout();
483 
484  } else {
485  printf("Read failed with %d\n", rv);
486  }
487  return (rv);
488 
489  error:
490  vl_msg_api_free((void *) msg);
491  /* Client might forget to resume RX thread on failure */
492  vac_rx_resume ();
493  return -1;
494 }
495 
496 /*
497  * XXX: Assumes client_index directly follows _vl_msg_id in every message; vac_write() overwrites that field through this header.
498  */
499 typedef VL_API_PACKED(struct _vl_api_header {
500  u16 _vl_msg_id;
501  u32 client_index;
502 }) vl_api_header_t;
503 
504 static unsigned int
505 vac_client_index (void)
506 {
507  return (api_main.my_client_index);
508 }
509 
510 int
511 vac_write (char *p, int l)
512 {
513  int rv = -1;
514  api_main_t *am = &api_main;
515  vl_api_header_t *mp = vl_msg_api_alloc(l);
516  svm_queue_t *q;
517  vac_main_t *pm = &vac_main;
518 
519  if (!pm->connected_to_vlib) return -1;
520  if (!mp) return (-1);
521 
522  memcpy(mp, p, l);
523  mp->client_index = vac_client_index();
524  q = am->shmem_hdr->vl_input_queue;
525  rv = svm_queue_add(q, (u8 *)&mp, 0);
526  if (rv != 0) {
527  clib_warning("vpe_api_write fails: %d\n", rv);
528  /* Clear message */
529  vac_free(mp);
530  }
531  return (rv);
532 }
533 
/*
 * Look up a message index via vl_msg_api_get_msg_index().
 *
 * @param name  lookup key, passed through unchanged
 * @return      the lookup result converted to int
 */
int
vac_get_msg_index (unsigned char * name)
{
  int msg_index = vl_msg_api_get_msg_index (name);

  return msg_index;
}
539 
540 int
542 {
543  int max = 0;
544  hash_pair_t *hp;
546  hash_foreach_pair (hp, h,
547  ({
548  if (hp->value[0] > max)
549  max = hp->value[0];
550  }));
551 
552  return max;
553 }
554 
555 void
557 {
558  if (cb) clib_error_register_handler (cb, 0);
559 }
void vac_rx_resume(void)
Definition: client.c:276
int vac_get_msg_index(unsigned char *name)
Definition: client.c:535
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:184
pthread_t timeout_thread_handle
Definition: client.c:60
int my_client_index
All VLIB-side message handlers use my_client_index to identify the queue / client.
Definition: api_common.h:309
#define NULL
Definition: clib.h:55
typedef VL_API_PACKED(struct _vl_api_header{u16 _vl_msg_id;u32 client_index;})
Definition: client.c:499
static void set_timeout(unsigned short timeout)
Definition: client.c:350
static mheap_t * mheap_header(u8 *v)
pthread_t rx_thread_handle
Definition: client.c:59
int vac_msg_table_max_index(void)
Definition: client.c:541
void vlib_cli_output(struct vlib_main_t *vm, char *fmt,...)
Definition: client.c:135
u8 * format(u8 *s, const char *fmt,...)
Definition: format.c:419
int vac_write(char *p, int l)
Definition: client.c:511
void clib_error_register_handler(clib_error_handler_func_t func, void *arg)
Definition: error.c:75
#define MHEAP_FLAG_THREAD_SAFE
svm_queue_t * vl_input_queue
Peer input queue pointer.
Definition: api_common.h:303
void * vl_msg_api_alloc(int nbytes)
bool rx_is_running
Definition: client.c:73
unsigned char u8
Definition: types.h:56
u8 * format_mheap(u8 *s, va_list *va)
Definition: mheap.c:1178
uword value[0]
Definition: hash.h:165
void vl_client_api_unmap(void)
int our_pid
Current process PID.
Definition: api_common.h:251
u8 connected_to_vlib
Definition: client.c:58
pthread_cond_t timeout_cancel_cv
Definition: client.c:66
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:303
pthread_mutex_t queue_lock
Definition: client.c:61
static void vac_client_constructor(void)
Definition: client.c:80
vlib_main_t ** vlib_mains
Definition: client.c:55
struct vl_shmem_hdr_ * shmem_hdr
Binary API shared-memory segment header pointer.
Definition: api_common.h:264
unsigned int u32
Definition: types.h:88
void vac_rx_suspend(void)
Definition: client.c:255
vac_callback_t vac_callback
Definition: client.c:71
static void unset_timeout(void)
Definition: client.c:360
vlib_main_t vlib_global_main
Definition: client.c:54
vl_shmem_hdr_t * shmem_hdr
int vac_connect(char *name, char *chroot_prefix, vac_callback_t cb, int rx_qlen)
Definition: client.c:303
void vl_set_memory_root_path(const char *name)
int vac_disconnect(void)
Definition: client.c:369
void(* vac_error_callback_t)(void *, unsigned char *, int)
Definition: vppapiclient.h:21
int vl_client_connect(const char *name, int ctx_quota, int input_queue_size)
bool mem_trace
Definition: client.c:76
static void vac_client_destructor(void)
Definition: client.c:95
unsigned short u16
Definition: types.h:57
static void * clib_mem_get_per_cpu_heap(void)
Definition: mem.h:58
static void cleanup(void)
Definition: client.c:119
vac_main_t vac_main
Definition: client.c:70
static void * vac_timeout_thread_fn(void *arg)
Definition: client.c:222
u16 read_timeout
Definition: client.c:72
word fformat(FILE *f, char *fmt,...)
Definition: format.c:453
API main structure, used by both vpp and binary API clients.
Definition: api_common.h:201
void * clib_mem_init(void *heap, uword size)
Definition: mem_mheap.c:60
void vl_msg_api_send_shmem(svm_queue_t *q, u8 *elem)
vlib_main_t * vm
Definition: buffer.c:294
#define clib_warning(format, args...)
Definition: error.h:59
u32 vl_msg_api_get_msg_index(u8 *name_and_crc)
Definition: api_shared.c:944
int vl_client_api_map(const char *region_name)
pthread_cond_t suspend_cv
Definition: client.c:62
static uword * vac_msg_table_get_hash(void)
Definition: client.c:289
blocking call
Definition: queue.h:44
svm_queue_t * vl_input_queue
Definition: memory_shared.h:84
static void * clib_mem_get_heap(void)
Definition: mem.h:220
static uword hash_elts(void *v)
Definition: hash.h:118
#define ASSERT(truth)
u32 data_len
message length not including header
Definition: api_common.h:138
Message header structure.
Definition: api_common.h:135
int vac_read(char **p, int *l, u16 timeout)
Definition: client.c:415
static void init(void)
Definition: client.c:105
pthread_cond_t terminate_cv
Definition: client.c:67
void vl_msg_api_free(void *)
pthread_cond_t timeout_cv
Definition: client.c:65
pthread_cond_t resume_cv
Definition: client.c:63
#define hash_foreach_pair(p, v, body)
Iterate over hash pairs.
Definition: hash.h:373
int vl_client_disconnect(void)
u64 uword
Definition: types.h:112
void vac_set_error_handler(vac_error_callback_t cb)
Definition: client.c:556
int vac_msg_table_size(void)
Definition: client.c:296
struct _svm_queue svm_queue_t
void(* vac_callback_t)(unsigned char *data, int len)
Definition: vppapiclient.h:20
pthread_mutex_t timeout_lock
Definition: client.c:64
void vac_free(void *msg)
Definition: client.c:141
static void vac_api_handler(void *msg)
Definition: client.c:147
void clib_mem_trace(int enable)
Definition: mem_mheap.c:154
uword * msg_index_by_name_and_crc
client message index hash table
Definition: api_common.h:324
api_main_t api_main
Definition: api_shared.c:35
static void * vac_rx_thread_fn(void *arg)
Definition: client.c:162