FD.io VPP  v18.04-17-g3a0d853
Vector Packet Processing
client.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <stddef.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/mman.h>
21 #include <sys/stat.h>
22 #include <netinet/in.h>
23 #include <netdb.h>
24 #include <signal.h>
25 #include <stdbool.h>
26 #include <vnet/vnet.h>
27 #include <vlib/vlib.h>
28 #include <vlib/unix/unix.h>
29 #include <vlibapi/api.h>
30 #include <vlibmemory/api.h>
31 
32 #include <vpp/api/vpe_msg_enum.h>
33 
34 #include "vppapiclient.h"
35 
36 /*
37  * Asynchronous mode:
38  * Client registers a callback. All messages are sent to the callback.
39  * Synchronous mode:
40  * Client calls blocking read().
41  * Clients are expected to collate events on a queue.
42  * vac_write() -> suspends RX thread
43  * vac_read() -> resumes RX thread
44  */
45 
46 #define vl_typedefs /* define message structures */
47 #include <vpp/api/vpe_all_api_h.h>
48 #undef vl_typedefs
49 
50 #define vl_endianfun /* NOTE(review): presumably selects the endian-conversion helpers rather than the message structures -- confirm against vpe_all_api_h.h */
51 #include <vpp/api/vpe_all_api_h.h>
52 #undef vl_endianfun
53 
56 
/*
 * Global state of the API client: the RX thread handle plus the mutexes
 * and condition variables used to coordinate the RX thread, the read
 * timeout thread and the caller.
 *
 * NOTE(review): listing lines 58 and 60 are absent from this view; code in
 * this file also accesses pm->connected_to_vlib and
 * pm->timeout_thread_handle, so the structure shown here is incomplete.
 */
57 typedef struct {
59  pthread_t rx_thread_handle; /* created in vac_connect() when a callback is supplied */
61  pthread_mutex_t queue_lock; /* held around suspend_cv / resume_cv / terminate_cv */
62  pthread_cond_t suspend_cv; /* RX thread signals this once it has parked */
63  pthread_cond_t resume_cv; /* RX thread waits on this while parked */
64  pthread_mutex_t timeout_lock; /* held around timeout_cv / timeout_cancel_cv */
65  pthread_cond_t timeout_cv; /* signalled by set_timeout() to arm the timeout thread */
66  pthread_cond_t timeout_cancel_cv; /* signalled to cancel an armed timeout */
67  pthread_cond_t terminate_cv; /* RX thread signals this just before exiting */
68 } vac_main_t;
69 
/*
 * Set to true when vac_connect() starts the RX thread and when the thread
 * is resumed; set to false once a suspend request has been acknowledged.
 */
73 bool rx_is_running = false;
74 
75 static void
76 init (void)
77 {
78  vac_main_t *pm = &vac_main;
79  memset(pm, 0, sizeof(*pm));
80  pthread_mutex_init(&pm->queue_lock, NULL);
81  pthread_cond_init(&pm->suspend_cv, NULL);
82  pthread_cond_init(&pm->resume_cv, NULL);
83  pthread_mutex_init(&pm->timeout_lock, NULL);
84  pthread_cond_init(&pm->timeout_cv, NULL);
85  pthread_cond_init(&pm->timeout_cancel_cv, NULL);
86  pthread_cond_init(&pm->terminate_cv, NULL);
87 }
88 
89 static void
90 cleanup (void)
91 {
92  vac_main_t *pm = &vac_main;
93  pthread_cond_destroy(&pm->suspend_cv);
94  pthread_cond_destroy(&pm->resume_cv);
95  pthread_cond_destroy(&pm->timeout_cv);
96  pthread_cond_destroy(&pm->timeout_cancel_cv);
97  pthread_cond_destroy(&pm->terminate_cv);
98  pthread_mutex_destroy(&pm->queue_lock);
99  pthread_mutex_destroy(&pm->timeout_lock);
100  memset (pm, 0, sizeof (*pm));
101 }
102 
/*
 * Stand-in for vlib_cli_output() so that external references resolve when
 * -lvlib is not available. Both arguments are ignored; the only effect is
 * a warning that the stub was reached.
 */
void
vlib_cli_output (struct vlib_main_t * vm, char * fmt, ...)
{
  (void) vm;
  (void) fmt;

  clib_warning ("vlib_cli_output called...");
}
110 
/*
 * Release an API message buffer by handing it to vl_msg_api_free().
 *
 * @param msg  message pointer to release
 */
void
vac_free (void * msg)
{
  vl_msg_api_free (msg);
}
116 
117 static void
118 vac_api_handler (void *msg)
119 {
120  u16 id = ntohs(*((u16 *)msg));
121  msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
122  int l = ntohl(msgbuf->data_len);
123  if (l == 0)
124  clib_warning("Message ID %d has wrong length: %d\n", id, l);
125 
126  /* Call Python callback */
127  ASSERT(vac_callback);
128  (vac_callback)(msg, l);
129  vac_free(msg);
130 }
131 
/*
 * RX thread body: blocks on the client's input queue and dispatches each
 * dequeued message by its message id.
 *
 *  - VL_API_RX_THREAD_EXIT: free the message, signal terminate_cv and exit.
 *  - VL_API_MEMCLNT_RX_THREAD_SUSPEND: free the message, signal suspend_cv
 *    and block on resume_cv.
 *  - VL_API_MEMCLNT_READ_TIMEOUT: warn and free the message.
 *  - VL_API_MEMCLNT_KEEPALIVE: send a keepalive reply echoing the context.
 *  - anything else: hand the message to vac_api_handler().
 *
 * NOTE(review): listing lines 136 and 140 are absent from this view; the
 * locals `mp` and `shmem_hdr` used in the keepalive case are declared there.
 */
132 static void *
133 vac_rx_thread_fn (void *arg)
134 {
135  svm_queue_t *q;
137  vl_api_memclnt_keepalive_reply_t *rmp;
138  vac_main_t *pm = &vac_main;
139  api_main_t *am = &api_main;
141  uword msg;
142 
143  q = am->vl_input_queue;
144 
 /* Outer loop never terminates; the thread leaves via pthread_exit() below */
145  while (1)
146  while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0))
147  {
 /* The message id is the first 16 bits of every message */
148  u16 id = ntohs(*((u16 *)msg));
149  switch (id) {
150  case VL_API_RX_THREAD_EXIT:
151  vl_msg_api_free((void *) msg);
152  /* signal waiting threads that this thread is about to terminate */
153  pthread_mutex_lock(&pm->queue_lock);
154  pthread_cond_signal(&pm->terminate_cv);
155  pthread_mutex_unlock(&pm->queue_lock);
156  pthread_exit(0);
 /* Not reached: pthread_exit() does not return */
157  return 0;
158  break;
159 
160  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
161  vl_msg_api_free((void * )msg);
162  /* Suspend thread and signal reader */
163  pthread_mutex_lock(&pm->queue_lock);
164  pthread_cond_signal(&pm->suspend_cv);
165  /* Wait for the resume signal */
 /* NOTE(review): no predicate is re-checked after the wait, so a spurious
  * wakeup would resume the thread early -- confirm this is acceptable */
166  pthread_cond_wait (&pm->resume_cv, &pm->queue_lock);
167  pthread_mutex_unlock(&pm->queue_lock);
168  break;
169 
170  case VL_API_MEMCLNT_READ_TIMEOUT:
171  clib_warning("Received read timeout in async thread\n");
172  vl_msg_api_free((void *) msg);
173  break;
174 
175  case VL_API_MEMCLNT_KEEPALIVE:
 /* Build a zeroed reply carrying the same context and send it to the
  * queue referenced by the shared-memory header */
176  mp = (void *)msg;
177  rmp = vl_msg_api_alloc (sizeof (*rmp));
178  memset (rmp, 0, sizeof (*rmp));
179  rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
180  rmp->context = mp->context;
181  shmem_hdr = am->shmem_hdr;
182  vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
183  vl_msg_api_free((void *) msg);
184  break;
185 
186  default:
 /* vac_api_handler() frees the message after invoking the callback */
187  vac_api_handler((void *)msg);
188  }
189  }
190 }
191 
/*
 * Read-timeout thread body. Each iteration waits on timeout_cv to be armed,
 * then waits on timeout_cancel_cv with a deadline of read_timeout seconds.
 * If the deadline passes without a cancel, a VL_API_MEMCLNT_READ_TIMEOUT
 * message is allocated and its id filled in.
 *
 * NOTE(review): listing lines 193 (the function name and parameters),
 * 195 (declaration of `ep`) and 219 (the statement following the message id
 * assignment) are absent from this view.
 */
192 static void *
194 {
196  vac_main_t *pm = &vac_main;
197  api_main_t *am = &api_main;
198  struct timespec ts;
199  struct timeval tv;
200  u16 timeout;
201  int rv;
202 
203  while (1)
204  {
205  /* Wait for poke */
206  pthread_mutex_lock(&pm->timeout_lock);
 /* NOTE(review): no predicate guards this wait; a signal sent while the
  * thread is not waiting here is not remembered -- confirm intended */
207  pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock);
208  timeout = read_timeout;
 /* Absolute deadline in whole seconds; the sub-second part is dropped */
209  gettimeofday(&tv, NULL);
210  ts.tv_sec = tv.tv_sec + timeout;
211  ts.tv_nsec = 0;
212  rv = pthread_cond_timedwait (&pm->timeout_cancel_cv,
213  &pm->timeout_lock, &ts);
214  pthread_mutex_unlock(&pm->timeout_lock);
 /* Only a genuine timeout produces a message; a cancel returns 0 */
215  if (rv == ETIMEDOUT)
216  {
217  ep = vl_msg_api_alloc (sizeof (*ep));
218  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT);
220  }
221  }
 /* Not reached: the loop above has no exit */
222  pthread_exit(0);
223 }
224 
/*
 * Ask the RX thread to park itself and wait until it has done so.
 * Does nothing when no RX thread exists or when rx_is_running is already
 * false. On return with a previously running thread, rx_is_running is false.
 *
 * NOTE(review): listing lines 226 (function name and parameters),
 * 230 (declaration of `ep`) and 238 (the statement following the message id
 * assignment) are absent from this view.
 */
225 void
227 {
228  api_main_t *am = &api_main;
229  vac_main_t *pm = &vac_main;
231 
232  if (!pm->rx_thread_handle) return;
233  pthread_mutex_lock(&pm->queue_lock);
234  if (rx_is_running)
235  {
 /* Build the suspend request addressed to the RX thread */
236  ep = vl_msg_api_alloc (sizeof (*ep));
237  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND);
239  /* Wait for RX thread to tell us it has suspended */
240  pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock);
241  rx_is_running = false;
242  }
243  pthread_mutex_unlock(&pm->queue_lock);
244 }
245 
/*
 * Wake a parked RX thread. Does nothing when no RX thread exists or when
 * rx_is_running is already true; otherwise signals resume_cv and marks the
 * thread as running, all under queue_lock.
 *
 * NOTE(review): listing line 247 (function name and parameters) is absent
 * from this view.
 */
246 void
248 {
249  vac_main_t *pm = &vac_main;
250  if (!pm->rx_thread_handle) return;
251  pthread_mutex_lock(&pm->queue_lock);
 /* Already running: nothing to wake */
252  if (rx_is_running) goto unlock;
253  pthread_cond_signal(&pm->resume_cv);
254  rx_is_running = true;
255  unlock:
256  pthread_mutex_unlock(&pm->queue_lock);
257 }
258 
/*
 * Return api_main's msg_index_by_name_and_crc hash.
 *
 * NOTE(review): listing line 260 (function name and parameters) is absent
 * from this view.
 */
259 static uword *
261 {
262  api_main_t *am = &api_main;
263  return (am->msg_index_by_name_and_crc);
264 }
265 
/*
 * NOTE(review): listing lines 267 (function name and parameters) and
 * 270 (the statement that produces the int result) are absent from this
 * view, so only the api_main lookup is visible here.
 */
266 int
268 {
269  api_main_t *am = &api_main;
271 }
272 
/*
 * Connect this process to the VPP API.
 *
 * Steps: initialise the client state, optionally set the memory root path,
 * map the "/vpe-api" region, register as a client, start the RX thread when
 * a callback is supplied, start the read-timeout thread, and finally mark
 * the client as connected.
 *
 * @param name           client name passed to vl_client_connect()
 * @param chroot_prefix  memory root path, or NULL to leave it unchanged
 * @param cb             message callback; when NULL no RX thread is started
 * @param rx_qlen        input queue size passed to vl_client_connect()
 * @return 0 on success; the vl_client_api_map() result if mapping fails;
 *         -1 on any later failure
 *
 * NOTE(review): listing lines 290, 299, 308 and 311 are absent from this
 * view, so the failure branches and the second pthread_create() call are
 * shown truncated.
 */
273 int
274 vac_connect (char * name, char * chroot_prefix, vac_callback_t cb,
275  int rx_qlen)
276 {
277  int rv = 0;
278  vac_main_t *pm = &vac_main;
279 
280  init();
281  if (chroot_prefix != NULL)
282  vl_set_memory_root_path (chroot_prefix);
283 
284  if ((rv = vl_client_api_map("/vpe-api"))) {
285  clib_warning ("vl_client_api map rv %d", rv);
286  return rv;
287  }
288 
289  if (vl_client_connect(name, 0, rx_qlen) < 0) {
291  return (-1);
292  }
293 
294  if (cb) {
295  /* Start the rx queue thread */
296  rv = pthread_create(&pm->rx_thread_handle, NULL, vac_rx_thread_fn, 0);
297  if (rv) {
298  clib_warning("pthread_create returned %d", rv);
300  return (-1);
301  }
 /* NOTE(review): the callback is stored after the thread has been started,
  * so the thread can run before vac_callback is set -- confirm */
302  vac_callback = cb;
303  rx_is_running = true;
304  }
305 
306  /* Start read timeout thread */
307  rv = pthread_create(&pm->timeout_thread_handle, NULL,
309  if (rv) {
310  clib_warning("pthread_create returned %d", rv);
312  return (-1);
313  }
314 
315  pm->connected_to_vlib = 1;
316 
317  return (0);
318 }
319 
/*
 * Disconnect from the VPP API and release client state.
 *
 * Returns immediately when the client is not connected. When an RX thread
 * exists, an exit request is prepared and the caller waits on terminate_cv
 * with a deadline about five seconds ahead; on timeout the thread is
 * cancelled, otherwise it is joined. The timeout thread is cancelled, the
 * callback is cleared and cleanup() is called.
 *
 * @return 0 in every visible path
 *
 * NOTE(review): listing lines 321 (function name and parameters),
 * 329 (declaration of `ep`), 333 and 353-354 are absent from this view.
 */
320 int
322 {
323  api_main_t *am = &api_main;
324  vac_main_t *pm = &vac_main;
325 
326  if (!pm->connected_to_vlib) return 0;
327 
328  if (pm->rx_thread_handle) {
330  uword junk;
331  ep = vl_msg_api_alloc (sizeof (*ep));
332  ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT);
334 
335  /* wait (with timeout) until RX thread has finished */
336  struct timespec ts;
337  struct timeval tv;
338  gettimeofday(&tv, NULL);
 /* Deadline in whole seconds; the sub-second part of `tv` is dropped */
339  ts.tv_sec = tv.tv_sec + 5;
340  ts.tv_nsec = 0;
341  pthread_mutex_lock(&pm->queue_lock);
342  int rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts);
343  pthread_mutex_unlock(&pm->queue_lock);
344  /* now join so we wait until thread has -really- finished */
 /* NOTE(review): on timeout the thread is cancelled but not joined */
345  if (rv == ETIMEDOUT)
346  pthread_cancel(pm->rx_thread_handle);
347  else
348  pthread_join(pm->rx_thread_handle, (void **) &junk);
349  }
350  if (pm->timeout_thread_handle)
351  pthread_cancel(pm->timeout_thread_handle);
352 
355  vac_callback = 0;
356 
 /* cleanup() destroys the mutexes/condvars and zeroes vac_main */
357  cleanup();
358 
359  return (0);
360 }
361 
362 static void
363 set_timeout (unsigned short timeout)
364 {
365  vac_main_t *pm = &vac_main;
366  pthread_mutex_lock(&pm->timeout_lock);
367  read_timeout = timeout;
368  pthread_cond_signal(&pm->timeout_cv);
369  pthread_mutex_unlock(&pm->timeout_lock);
370 }
371 
/*
 * Cancel an armed read timeout by signalling timeout_cancel_cv while
 * holding timeout_lock.
 *
 * NOTE(review): listing line 373 (function name and parameters) is absent
 * from this view.
 */
372 static void
374 {
375  vac_main_t *pm = &vac_main;
376  pthread_mutex_lock(&pm->timeout_lock);
377  pthread_cond_signal(&pm->timeout_cancel_cv);
378  pthread_mutex_unlock(&pm->timeout_lock);
379 }
380 
/*
 * Blocking read of one API message from the client's input queue.
 *
 * Keepalive pings are answered internally and never returned to the
 * caller. Control messages (thread exit, thread suspend, read timeout) and
 * messages whose recorded length is zero cause a failure return.
 *
 * @param p        receives the message pointer on success
 * @param l        receives the message length on success; set to 0 on entry
 * @param timeout  when non-zero, arms the read-timeout thread with this value
 * @return 0 on success; -1 when not connected, when am->our_pid is 0, or on
 *         the control/zero-length cases above; otherwise the non-zero
 *         result of svm_queue_sub()
 *
 * NOTE(review): listing lines 387 and 392 are absent from this view; the
 * locals `mp` and `shmem_hdr` used in the keepalive case are declared there.
 */
381 int
382 vac_read (char **p, int *l, u16 timeout)
383 {
384  svm_queue_t *q;
385  api_main_t *am = &api_main;
386  vac_main_t *pm = &vac_main;
388  vl_api_memclnt_keepalive_reply_t *rmp;
389  uword msg;
390  msgbuf_t *msgbuf;
391  int rv;
393 
394  if (!pm->connected_to_vlib) return -1;
395 
396  *l = 0;
397 
398  if (am->our_pid == 0) return (-1);
399 
400  /* Poke timeout thread */
401  if (timeout)
402  set_timeout(timeout);
403 
404  q = am->vl_input_queue;
405 
406  again:
407  rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0);
408 
409  if (rv == 0) {
 /* The message id is the first 16 bits of every message */
410  u16 msg_id = ntohs(*((u16 *)msg));
411  switch (msg_id) {
412  case VL_API_RX_THREAD_EXIT:
413  printf("Received thread exit\n");
 /* NOTE(review): unlike the `error` path, this return does not free msg */
414  return -1;
415  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
416  printf("Received thread suspend\n");
417  goto error;
418  case VL_API_MEMCLNT_READ_TIMEOUT:
419  printf("Received read timeout %ds\n", timeout);
420  goto error;
421  case VL_API_MEMCLNT_KEEPALIVE:
422  /* Handle an alive-check ping from vpp. */
423  mp = (void *)msg;
424  rmp = vl_msg_api_alloc (sizeof (*rmp));
425  memset (rmp, 0, sizeof (*rmp));
426  rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
427  rmp->context = mp->context;
428  shmem_hdr = am->shmem_hdr;
429  vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
430  vl_msg_api_free((void *) msg);
431  /*
432  * Python code is blissfully unaware of these pings, so
433  * act as if it never happened...
434  */
435  goto again;
436 
437  default:
 /* The length lives in the msgbuf_t header that precedes the payload */
438  msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
439  *l = ntohl(msgbuf->data_len);
440  if (*l == 0) {
441  printf("Unregistered API message: %d\n", msg_id);
442  goto error;
443  }
444  }
 /* Success: the message is handed to the caller and not freed here */
445  *p = (char *)msg;
446 
447  /* Let timeout notification thread know we're done */
448  unset_timeout();
449 
450  } else {
451  printf("Read failed with %d\n", rv);
452  }
453  return (rv);
454 
455  error:
 /* NOTE(review): the timeout is not cancelled on this path */
456  vl_msg_api_free((void *) msg);
457  /* Client might forget to resume RX thread on failure */
458  vac_rx_resume ();
459  return -1;
460 }
461 
462 /*
463  * XXX: Makes the assumption that client_index is the first member
464  */
465 typedef VL_API_PACKED(struct _vl_api_header {
466  u16 _vl_msg_id;
467  u32 client_index;
468 }) vl_api_header_t;
469 
470 static unsigned int
471 vac_client_index (void)
472 {
473  return (api_main.my_client_index);
474 }
475 
476 int
477 vac_write (char *p, int l)
478 {
479  int rv = -1;
480  api_main_t *am = &api_main;
481  vl_api_header_t *mp = vl_msg_api_alloc(l);
482  svm_queue_t *q;
483  vac_main_t *pm = &vac_main;
484 
485  if (!pm->connected_to_vlib) return -1;
486  if (!mp) return (-1);
487 
488  memcpy(mp, p, l);
489  mp->client_index = vac_client_index();
490  q = am->shmem_hdr->vl_input_queue;
491  rv = svm_queue_add(q, (u8 *)&mp, 0);
492  if (rv != 0) {
493  clib_warning("vpe_api_write fails: %d\n", rv);
494  /* Clear message */
495  vac_free(mp);
496  }
497  return (rv);
498 }
499 
/*
 * Look up a message index by name via vl_msg_api_get_msg_index().
 *
 * @param name  lookup key passed through unchanged
 * @return the looked-up value converted to int
 */
int
vac_get_msg_index (unsigned char * name)
{
  int index = vl_msg_api_get_msg_index (name);

  return index;
}
505 
/*
 * Return the largest value[0] found among the pairs of hash `h`, or 0 when
 * no pair exceeds 0.
 *
 * NOTE(review): listing lines 507 (function name and parameters) and
 * 511 (declaration of `h`) are absent from this view.
 */
506 int
508 {
509  int max = 0;
510  hash_pair_t *hp;
512  hash_foreach_pair (hp, h,
513  ({
 /* NOTE(review): value[0] is a uword compared against an int -- confirm
  * stored indices always fit in int */
514  if (hp->value[0] > max)
515  max = hp->value[0];
516  }));
517 
518  return max;
519 }
520 
/*
 * Register `cb` through clib_error_register_handler() with a zero argument.
 * A NULL `cb` is ignored.
 *
 * NOTE(review): listing line 522 (function name and parameters) is absent
 * from this view.
 */
521 void
523 {
524  if (cb) clib_error_register_handler (cb, 0);
525 }
void vac_rx_resume(void)
Definition: client.c:247
int vac_get_msg_index(unsigned char *name)
Definition: client.c:501
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:184
pthread_t timeout_thread_handle
Definition: client.c:60
int my_client_index
All VLIB-side message handlers use my_client_index to identify the queue / client.
Definition: api_common.h:307
#define NULL
Definition: clib.h:55
typedef VL_API_PACKED(struct _vl_api_header{u16 _vl_msg_id;u32 client_index;})
Definition: client.c:465
static void set_timeout(unsigned short timeout)
Definition: client.c:363
pthread_t rx_thread_handle
Definition: client.c:59
int vac_msg_table_max_index(void)
Definition: client.c:507
void vlib_cli_output(struct vlib_main_t *vm, char *fmt,...)
Definition: client.c:106
int vac_write(char *p, int l)
Definition: client.c:477
void clib_error_register_handler(clib_error_handler_func_t func, void *arg)
Definition: error.c:75
svm_queue_t * vl_input_queue
Peer input queue pointer.
Definition: api_common.h:301
void * vl_msg_api_alloc(int nbytes)
bool rx_is_running
Definition: client.c:73
uword value[0]
Definition: hash.h:164
void vl_client_api_unmap(void)
int our_pid
Current process PID.
Definition: api_common.h:249
u8 connected_to_vlib
Definition: client.c:58
pthread_cond_t timeout_cancel_cv
Definition: client.c:66
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:303
pthread_mutex_t queue_lock
Definition: client.c:61
vlib_main_t ** vlib_mains
Definition: client.c:55
struct vl_shmem_hdr_ * shmem_hdr
Binary API shared-memory segment header pointer.
Definition: api_common.h:262
void vac_rx_suspend(void)
Definition: client.c:226
vac_callback_t vac_callback
Definition: client.c:71
static void unset_timeout(void)
Definition: client.c:373
vlib_main_t vlib_global_main
Definition: client.c:54
vl_shmem_hdr_t * shmem_hdr
int vac_connect(char *name, char *chroot_prefix, vac_callback_t cb, int rx_qlen)
Definition: client.c:274
void vl_set_memory_root_path(const char *name)
int vac_disconnect(void)
Definition: client.c:321
void(* vac_error_callback_t)(void *, unsigned char *, int)
Definition: vppapiclient.h:21
int vl_client_connect(const char *name, int ctx_quota, int input_queue_size)
static void cleanup(void)
Definition: client.c:90
vac_main_t vac_main
Definition: client.c:70
static void * vac_timeout_thread_fn(void *arg)
Definition: client.c:193
u16 read_timeout
Definition: client.c:72
API main structure, used by both vpp and binary API clients.
Definition: api_common.h:199
void vl_msg_api_send_shmem(svm_queue_t *q, u8 *elem)
vlib_main_t * vm
Definition: buffer.c:294
#define clib_warning(format, args...)
Definition: error.h:59
u32 vl_msg_api_get_msg_index(u8 *name_and_crc)
Definition: api_shared.c:944
int vl_client_api_map(const char *region_name)
pthread_cond_t suspend_cv
Definition: client.c:62
static uword * vac_msg_table_get_hash(void)
Definition: client.c:260
blocking call
Definition: queue.h:44
svm_queue_t * vl_input_queue
Definition: memory_shared.h:84
static uword hash_elts(void *v)
Definition: hash.h:117
#define ASSERT(truth)
unsigned int u32
Definition: types.h:88
u32 data_len
message length not including header
Definition: api_common.h:138
Message header structure.
Definition: api_common.h:135
int vac_read(char **p, int *l, u16 timeout)
Definition: client.c:382
static void init(void)
Definition: client.c:76
pthread_cond_t terminate_cv
Definition: client.c:67
void vl_msg_api_free(void *)
u64 uword
Definition: types.h:112
pthread_cond_t timeout_cv
Definition: client.c:65
unsigned short u16
Definition: types.h:57
unsigned char u8
Definition: types.h:56
pthread_cond_t resume_cv
Definition: client.c:63
#define hash_foreach_pair(p, v, body)
Iterate over hash pairs.
Definition: hash.h:372
int vl_client_disconnect(void)
void vac_set_error_handler(vac_error_callback_t cb)
Definition: client.c:522
int vac_msg_table_size(void)
Definition: client.c:267
struct _svm_queue svm_queue_t
void(* vac_callback_t)(unsigned char *data, int len)
Definition: vppapiclient.h:20
pthread_mutex_t timeout_lock
Definition: client.c:64
void vac_free(void *msg)
Definition: client.c:112
static void vac_api_handler(void *msg)
Definition: client.c:118
uword * msg_index_by_name_and_crc
client message index hash table
Definition: api_common.h:322
api_main_t api_main
Definition: api_shared.c:35
static void * vac_rx_thread_fn(void *arg)
Definition: client.c:133