FD.io VPP  v17.10-9-gd594711
Vector Packet Processing
builtin_proxy.c
Go to the documentation of this file.
1 /*
2 * Copyright (c) 2015-2017 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
20 #include <vnet/tcp/builtin_proxy.h>
21 
23 
/*
 * Release the proxy session associated with stream session `s` and set up
 * disconnect requests for both halves of the proxied connection.
 *
 * s              - the session being torn down
 * is_active_open - non-zero when `s` is the active-open (outbound) side,
 *                  zero when it is the server (accepted) side
 *
 * The proxy session is looked up by the handle of `s`; when none is found a
 * warning is logged and only `s` itself is handled.  A found proxy session
 * is poisoned with 0xFE in debug images and returned to the pool.
 *
 * NOTE(review): several statements of this function (the `bpm` declaration,
 * the active-open hash lookup and the calls that consume `a`) are absent from
 * this listing; they are left exactly as found.
 */
24 static void
25 delete_proxy_session (stream_session_t * s, int is_active_open)
26 {
28  proxy_session_t *ps = 0;
29  vnet_disconnect_args_t _a, *a = &_a;
30  stream_session_t *active_open_session = 0;
31  stream_session_t *server_session = 0;
32  uword *p;
33  u64 handle;
34 
35  handle = stream_session_handle (s);
36 
38  if (is_active_open)
39  {
40  active_open_session = s;
41 
43  if (p == 0)
44  {
45  clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
46  is_active_open ? "active open" : "server",
47  handle, handle);
48  }
49  else
50  {
51  ps = pool_elt_at_index (bpm->sessions, p[0]);
52  if (ps->vpp_server_handle != ~0)
53  server_session = stream_session_get_from_handle
54  (ps->vpp_server_handle);
55  else
56  server_session = 0;
57  }
58  }
59  else
60  {
61  server_session = s;
62 
63  p = hash_get (bpm->proxy_session_by_server_handle, handle);
64  if (p == 0)
65  {
66  clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
67  is_active_open ? "active open" : "server",
68  handle, handle);
69  }
70  else
71  {
72  ps = pool_elt_at_index (bpm->sessions, p[0]);
 /*
  * The peer of a server session is the active-open session, so it must
  * be resolved through vpp_active_open_handle.  Using vpp_server_handle
  * here would just hand back the server session itself.
  *
  * NOTE(review): server_rx_callback zero-fills new proxy sessions, so a
  * peer handle that was never set is 0 rather than ~0 - confirm whether
  * that case needs separate handling.
  */
73  if (ps->vpp_active_open_handle != ~0)
74  active_open_session = stream_session_get_from_handle
75  (ps->vpp_active_open_handle);
76  else
77  active_open_session = 0;
78  }
79  }
80 
81  if (ps)
82  {
 /* Poison the freed element in debug images to expose stale users. */
83  if (CLIB_DEBUG > 0)
84  memset (ps, 0xFE, sizeof (*ps));
85  pool_put (bpm->sessions, ps);
86  }
87 
89 
90  if (active_open_session)
91  {
92  a->handle = stream_session_handle (active_open_session);
93  a->app_index = bpm->active_open_app_index;
95  stream_session_handle (active_open_session));
97  }
98 
99  if (server_session)
100  {
101  a->handle = stream_session_handle (server_session);
102  a->app_index = bpm->server_app_index;
104  stream_session_handle (server_session));
106  }
107 }
108 
/*
 * Marks session `s` as SESSION_STATE_READY and returns 0.
 *
 * NOTE(review): the signature line is absent from this listing; the
 * cross-reference index suggests this is
 * server_accept_callback (stream_session_t * s) - confirm.
 */
109 static int
111 {
113 
114  s->session_state = SESSION_STATE_READY;
115 
117 
118  return 0;
119 }
120 
/*
 * Tears down the proxy session for `s`, treating `s` as the server side
 * (is_active_open = 0).
 *
 * NOTE(review): signature line absent from this listing; presumably
 * server_disconnect_callback (stream_session_t * s) - confirm.
 */
121 static void
123 {
124  delete_proxy_session (s, 0 /* is_active_open */ );
125 }
126 
/*
 * Logs the session being reset, then tears down its proxy session with
 * `s` treated as the server side (is_active_open = 0).
 *
 * NOTE(review): signature line absent from this listing; presumably
 * server_reset_callback (stream_session_t * s) - confirm.
 */
127 static void
129 {
130  clib_warning ("Reset session %U", format_stream_session, s, 2);
131  delete_proxy_session (s, 0 /* is_active_open */ );
132 }
133 
/*
 * Stub "connected" callback for the server application: ignores all of its
 * arguments, logs a warning and returns -1.
 */
134 static int
135 server_connected_callback (u32 app_index, u32 api_context,
136  stream_session_t * s, u8 is_fail)
137 {
138  clib_warning ("called...");
139  return -1;
140 }
141 
/*
 * Stub: logs a warning and returns -1 without using its arguments.
 *
 * NOTE(review): the first signature line is absent from this listing; the
 * cross-reference index suggests
 * server_add_segment_callback (u32 client_index, ...) - confirm.
 */
142 static int
144  const u8 * seg_name, u32 seg_size)
145 {
146  clib_warning ("called...");
147  return -1;
148 }
149 
/*
 * Stub redirect-connect callback for the server application: ignores
 * `client_index` and `mp`, logs a warning and returns -1.
 */
150 static int
151 server_redirect_connect_callback (u32 client_index, void *mp)
152 {
153  clib_warning ("called...");
154  return -1;
155 }
156 
/*
 * Receive callback for a server-side session `s`.
 *
 * When the lookup result `p` is non-null, the data already sits in the fifo
 * shared with the active-open session, so only a FIFO_EVENT_APP_TX event is
 * queued for that fifo.  Otherwise this is the first data on the session:
 * the pending bytes are peeked (not consumed) into the per-thread rx buffer,
 * a proxy session is allocated from the pool, and an outbound connect is
 * started with the proxy-session index as its api_context.
 *
 * Returns 0 in all visible paths.
 *
 * NOTE(review): the signature line, the `bpm` declaration and several
 * statements (including the lookup that assigns `p`) are absent from this
 * listing; they are left exactly as found.
 */
157 static int
159 {
160  u32 max_dequeue;
161  int actual_transfer __attribute__ ((unused));
162  svm_fifo_t *tx_fifo, *rx_fifo;
164  u32 thread_index = vlib_get_thread_index ();
165  vnet_connect_args_t _a, *a = &_a;
166  proxy_session_t *ps;
167  int proxy_index;
168  uword *p;
169  svm_fifo_t *active_open_tx_fifo;
170  session_fifo_event_t evt;
171 
172  ASSERT (s->thread_index == thread_index);
173 
175  p =
177 
178  if (PREDICT_TRUE (p != 0))
179  {
181  active_open_tx_fifo = s->server_rx_fifo;
182 
183  /*
184  * Send event for active open tx fifo
185  */
186  if (svm_fifo_set_event (active_open_tx_fifo))
187  {
188  evt.fifo = active_open_tx_fifo;
189  evt.event_type = FIFO_EVENT_APP_TX;
191  (bpm->active_open_event_queue[thread_index], (u8 *) & evt,
192  0 /* do wait for mutex */ ))
193  clib_warning ("failed to enqueue tx evt");
194  }
195  }
196  else
197  {
198  rx_fifo = s->server_rx_fifo;
199  tx_fifo = s->server_tx_fifo;
200 
201  ASSERT (rx_fifo->master_thread_index == thread_index);
202  ASSERT (tx_fifo->master_thread_index == thread_index);
203 
204  max_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
205 
206  if (PREDICT_FALSE (max_dequeue == 0))
207  return 0;
208 
 /*
  * rx_buf[thread_index] is sized from rcv_buffer_size in server_create,
  * while the fifo may hold far more than that.  Bound the peek so it
  * cannot write past the end of the buffer.
  */
 if (max_dequeue > (u32) bpm->rcv_buffer_size)
 max_dequeue = (u32) bpm->rcv_buffer_size;

209  actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
210  max_dequeue,
211  bpm->rx_buf[thread_index]);
212 
213  /* $$$ your message in this space: parse url, etc. */
214 
215  memset (a, 0, sizeof (*a));
216 
218  pool_get (bpm->sessions, ps);
219  memset (ps, 0, sizeof (*ps));
220  ps->server_rx_fifo = rx_fifo;
221  ps->server_tx_fifo = tx_fifo;
223 
224  proxy_index = ps - bpm->sessions;
225 
227  proxy_index);
228 
230 
 /* The connect target is hard-coded; api_context carries the proxy index
    so the connected callback can find this proxy session again. */
231  a->uri = "tcp://6.0.2.2/23";
232  a->api_context = proxy_index;
233  a->app_index = bpm->active_open_app_index;
234  a->mp = 0;
235  vnet_connect_uri (a);
236  }
237 
238  return 0;
239 }
240 
/*
 * Callback table wiring the server_* handlers above into the session layer.
 * server_attach passes the address of builtin_session_cb_vft as its
 * session_cb_vft.
 *
 * NOTE(review): the declaration line that opens this initializer is absent
 * from this listing; the cross-reference index lists
 * "static session_cb_vft_t builtin_session_cb_vft".
 */
242  .session_accept_callback = server_accept_callback,
243  .session_disconnect_callback = server_disconnect_callback,
244  .session_connected_callback = server_connected_callback,
245  .add_segment_callback = server_add_segment_callback,
246  .redirect_connect_callback = server_redirect_connect_callback,
247  .builtin_server_rx_callback = server_rx_callback,
248  .session_reset_callback = server_reset_callback
249 };
250 
/*
 * Called when the outbound (active-open) connect for a proxy session
 * completes.
 *
 * opaque  - index of the proxy session in bpm->sessions
 * s       - the newly connected active-open session
 * is_fail - non-zero when the connect failed
 *
 * On success the active-open session is cross-wired onto the server
 * session's fifos (server rx becomes active-open tx and vice versa), the
 * fifo reference counts are bumped, and a FIFO_EVENT_APP_TX event is queued
 * so data already received from the client gets sent.  Returns 0 in both
 * the success and failure paths.
 *
 * NOTE(review): on failure the proxy session at index `opaque` is only
 * logged, not released - confirm whether cleanup happens elsewhere.
 * NOTE(review): the first signature line, the `bpm` declaration and several
 * statements are absent from this listing; they are left exactly as found.
 */
251 static int
253  stream_session_t * s, u8 is_fail)
254 {
256  proxy_session_t *ps;
 /* u32, matching the other rx callbacks: the value indexes a per-thread
    vector and must not be truncated to 8 bits. */
257  u32 thread_index = vlib_get_thread_index ();
258  session_fifo_event_t evt;
259 
260  if (is_fail)
261  {
262  clib_warning ("connection %d failed!", opaque);
263  return 0;
264  }
265 
266  /*
267  * Setup proxy session handle.
268  */
270 
271  ps = pool_elt_at_index (bpm->sessions, opaque);
273 
274  s->server_tx_fifo = ps->server_rx_fifo;
275  s->server_rx_fifo = ps->server_tx_fifo;
276 
277  /*
278  * Reset the active-open tx-fifo master indices so the active-open session
279  * will receive data, etc.
280  */
281  s->server_tx_fifo->master_session_index = s->session_index;
282  s->server_tx_fifo->master_thread_index = s->thread_index;
283 
284  /*
285  * Account for the active-open session's use of the fifos
286  * so they won't disappear until the last session which uses
287  * them disappears
288  */
289  s->server_tx_fifo->refcnt++;
290  s->server_rx_fifo->refcnt++;
291 
293  ps->vpp_active_open_handle, opaque);
294 
296 
297  /*
298  * Send event for active open tx fifo
299  */
300  if (svm_fifo_set_event (s->server_tx_fifo))
301  {
302  evt.fifo = s->server_tx_fifo;
303  evt.event_type = FIFO_EVENT_APP_TX;
305  (bpm->active_open_event_queue[thread_index], (u8 *) & evt,
306  0 /* do wait for mutex */ ))
307  clib_warning ("failed to enqueue tx evt");
308  }
309 
310  return 0;
311 }
312 
/*
 * Tears down the proxy session for `s`, treating `s` as the active-open
 * side (is_active_open = 1).
 *
 * NOTE(review): signature line absent from this listing; presumably
 * active_open_disconnect_callback (stream_session_t * s) - confirm.
 */
313 static void
315 {
316  delete_proxy_session (s, 1 /* is_active_open */ );
317 }
318 
/*
 * No-op callback that always returns 0.
 *
 * NOTE(review): signature line absent from this listing; presumably
 * active_open_create_callback (stream_session_t * s), which builtin_clients
 * installs as its session_accept_callback - confirm.
 */
319 static int
321 {
322  return 0;
323 }
324 
/*
 * Tears down the proxy session for `s`, treating `s` as the active-open
 * side (is_active_open = 1).  Unlike the server-side reset handler, nothing
 * is logged here.
 *
 * NOTE(review): signature line absent from this listing; presumably
 * active_open_reset_callback (stream_session_t * s) - confirm.
 */
325 static void
327 {
328  delete_proxy_session (s, 1 /* is_active_open */ );
329 }
330 
/*
 * Receive callback for an active-open session `s`.
 *
 * Takes the session's rx fifo and, if svm_fifo_set_event reports that an
 * event should be sent, queues a FIFO_EVENT_APP_TX event for that fifo on
 * this thread's server event queue.  A failed enqueue is only logged.
 * Always returns 0.
 *
 * NOTE(review): the signature line, the `bpm` declaration and the head of
 * the enqueue call are absent from this listing; presumably
 * active_open_rx_callback (stream_session_t * s) - confirm.
 */
331 static int
333 {
335  session_fifo_event_t evt;
336  svm_fifo_t *server_rx_fifo;
337  u32 thread_index = vlib_get_thread_index ();
338 
339  server_rx_fifo = s->server_rx_fifo;
340 
341  /*
342  * Send event for server tx fifo
343  */
344  if (svm_fifo_set_event (server_rx_fifo))
345  {
346  evt.fifo = server_rx_fifo;
347  evt.event_type = FIFO_EVENT_APP_TX;
349  (bpm->server_event_queue[thread_index], (u8 *) & evt,
350  0 /* do wait for mutex */ ))
351  clib_warning ("failed to enqueue server rx evt");
352  }
353 
354  return 0;
355 }
356 
/*
 * Callback table wiring the active_open_* handlers above into the session
 * layer.  active_open_attach passes the address of builtin_clients as its
 * session_cb_vft.
 *
 * NOTE(review): the declaration line that opens this initializer is absent
 * from this listing; the cross-reference index lists
 * "static session_cb_vft_t builtin_clients".
 */
357 /* *INDENT-OFF* */
359  .session_reset_callback = active_open_reset_callback,
360  .session_connected_callback = active_open_connected_callback,
361  .session_accept_callback = active_open_create_callback,
362  .session_disconnect_callback = active_open_disconnect_callback,
363  .builtin_server_rx_callback = active_open_rx_callback
364 };
365 /* *INDENT-ON* */
366 
367 
/*
 * Registers internal binary-API clients for the proxy.
 *
 * Caches the shared-memory input queue from the API main structure in
 * bpm->vl_input_queue, then creates two internal API clients on that queue,
 * named "proxy_server" and "proxy_active_open".  The first client's index
 * is stored in bpm->server_client_index.
 *
 * NOTE(review): the signature line, the `bpm` and `shmem_hdr` declarations
 * and the assignment target of the second client index are absent from this
 * listing; presumably create_api_loopbacks (vlib_main_t * vm) - confirm.
 */
368 static void
370 {
372  api_main_t *am = &api_main;
374 
375  shmem_hdr = am->shmem_hdr;
376  bpm->vl_input_queue = shmem_hdr->vl_input_queue;
377  bpm->server_client_index =
378  vl_api_memclnt_create_internal ("proxy_server", bpm->vl_input_queue);
380  vl_api_memclnt_create_internal ("proxy_active_open", bpm->vl_input_queue);
381 }
382 
/*
 * Attaches the proxy's server-side application to the session layer.
 *
 * Fills a zeroed attach request with the server API client index, the
 * builtin_session_cb_vft callback table, a 512 MB segment size, the
 * configured fifo size for both directions, and a fifo-pair preallocation
 * count (at least 1).  The app is flagged as a builtin app.
 *
 * Returns 0 on success, after saving the assigned app index in
 * bpm->server_app_index.  Returns -1 (with a warning) if the attach fails.
 *
 * NOTE(review): the signature line and the `bpm` / `options` declarations
 * are absent from this listing; presumably server_attach () - confirm.
 */
383 static int
385 {
387  u8 segment_name[128];
389  vnet_app_attach_args_t _a, *a = &_a;
390 
391  memset (a, 0, sizeof (*a));
392  memset (options, 0, sizeof (options));
393 
394  a->api_client_index = bpm->server_client_index;
395  a->session_cb_vft = &builtin_session_cb_vft;
396  a->options = options;
397  a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 512 << 20;
398  a->options[SESSION_OPTIONS_RX_FIFO_SIZE] = bpm->fifo_size;
399  a->options[SESSION_OPTIONS_TX_FIFO_SIZE] = bpm->fifo_size;
402  a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
403  bpm->prealloc_fifos ? bpm->prealloc_fifos : 1;
404 
405  a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP;
406 
407  a->segment_name = segment_name;
408  a->segment_name_length = ARRAY_LEN (segment_name);
409 
410  if (vnet_application_attach (a))
411  {
412  clib_warning ("failed to attach server");
413  return -1;
414  }
415  bpm->server_app_index = a->app_index;
416 
417  return 0;
418 }
419 
/*
 * Attaches the proxy's active-open (outbound) application to the session
 * layer.
 *
 * Fills a zeroed attach request with the active-open API client index, the
 * builtin_clients callback table, an accept cookie, a 512 MB segment size
 * and the configured fifo size for both directions.  The app is flagged as
 * both a builtin app and a proxy.
 *
 * Returns 0 on success, after saving the assigned app index in
 * bpm->active_open_app_index.  Returns -1 if the attach fails; unlike
 * server_attach, no warning is logged here.
 *
 * NOTE(review): the signature line, the `bpm` declaration and the left-hand
 * side of the prealloc-fifos assignment are absent from this listing;
 * presumably active_open_attach (void) - confirm.
 */
420 static int
422 {
424  vnet_app_attach_args_t _a, *a = &_a;
425  u8 segment_name[128];
426  u32 segment_name_length;
427  u64 options[16];
428 
429  segment_name_length = ARRAY_LEN (segment_name);
430 
431  memset (a, 0, sizeof (*a));
432  memset (options, 0, sizeof (options));
433 
434  a->api_client_index = bpm->active_open_client_index;
435  a->segment_name = segment_name;
436  a->segment_name_length = segment_name_length;
437  a->session_cb_vft = &builtin_clients;
438 
439  options[SESSION_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
440  options[SESSION_OPTIONS_SEGMENT_SIZE] = 512 << 20;
441  options[SESSION_OPTIONS_RX_FIFO_SIZE] = bpm->fifo_size;
442  options[SESSION_OPTIONS_TX_FIFO_SIZE] = bpm->fifo_size;
446  bpm->prealloc_fifos ? bpm->prealloc_fifos : 1;
447 
448  options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP
449  | APP_OPTIONS_FLAGS_IS_PROXY;
450 
451  a->options = options;
452 
453  if (vnet_application_attach (a))
454  return -1;
455 
456  bpm->active_open_app_index = a->app_index;
457 
458  return 0;
459 }
460 
/*
 * Binds the server application to the hard-coded URI "tcp://0.0.0.0/23"
 * and returns whatever vnet_bind_uri returns.
 *
 * NOTE(review): the signature line and the `bpm` declaration are absent
 * from this listing; presumably server_listen () - confirm.
 */
461 static int
463 {
465  vnet_bind_args_t _a, *a = &_a;
466  memset (a, 0, sizeof (*a));
467  a->app_index = bpm->server_app_index;
468  a->uri = "tcp://0.0.0.0/23";
469  return vnet_bind_uri (a);
470 }
471 
/*
 * Brings the proxy up.
 *
 * Sizes the per-thread vectors (server event queues, active-open event
 * queues, rx buffers) for the main thread plus vtm->n_threads, and gives
 * every rx buffer valid indices up to rcv_buffer_size.  Then attaches the
 * server app, starts listening, attaches the active-open app, and fills in
 * the per-thread event queues.
 *
 * Returns 0 on success, or -1 (with a warning) as soon as any of the
 * attach / listen steps fails.
 *
 * NOTE(review): the signature line, the `bpm` / `vtm` declarations, the
 * body of the `server_client_index == ~0` check and most of the final loop
 * are absent from this listing; presumably server_create (vlib_main_t * vm)
 * - confirm.
 */
472 static int
474 {
477  u32 num_threads;
478  int i;
479 
480  if (bpm->server_client_index == (u32) ~ 0)
482 
483  num_threads = 1 /* main thread */ + vtm->n_threads;
484  vec_validate (builtin_proxy_main.server_event_queue, num_threads - 1);
485  vec_validate (builtin_proxy_main.active_open_event_queue, num_threads - 1);
486  vec_validate (bpm->rx_buf, num_threads - 1);
487 
488  for (i = 0; i < num_threads; i++)
489  vec_validate (bpm->rx_buf[i], bpm->rcv_buffer_size);
490 
491  if (server_attach ())
492  {
493  clib_warning ("failed to attach server app");
494  return -1;
495  }
496  if (server_listen ())
497  {
498  clib_warning ("failed to start listening");
499  return -1;
500  }
501  if (active_open_attach ())
502  {
503  clib_warning ("failed to attach active open app");
504  return -1;
505  }
506 
507  for (i = 0; i < num_threads; i++)
508  {
509  bpm->active_open_event_queue[i] =
511 
513 
515  }
516 
517  return 0;
518 }
519 
/*
 * CLI handler for "test proxy server".
 *
 * Resets the proxy configuration to its defaults (64 KB fifos, 1024-byte
 * receive buffer, no fifo preallocation, no private segments), applies any
 * of the following options, enables the session layer and creates the
 * server:
 *
 *   fifo-size <n>              fifo size, in KB
 *   rcv-buf-size <n>           receive buffer size
 *   prealloc-fifos <n>         number of fifo pairs to preallocate
 *   private-segment-count <n>  number of private fifo segments
 *   private-segment-size <sz>  private segment size, must be below 4 GB
 *
 * Returns 0 on success, or a clib error for unknown input, an out-of-range
 * value, or a non-zero result from server_create.
 *
 * NOTE(review): the first signature line, the `bpm` declaration and the
 * head of the parsing loop are absent from this listing; they are left
 * exactly as found.
 */
520 static clib_error_t *
522  vlib_cli_command_t * cmd)
523 {
525  int rv;
526  u64 tmp;
 u32 prealloc_fifos;
527 
528  bpm->fifo_size = 64 << 10;
529  bpm->rcv_buffer_size = 1024;
530  bpm->prealloc_fifos = 0;
531  bpm->private_segment_count = 0;
532  bpm->private_segment_size = 0;
533 
535  {
536  if (unformat (input, "fifo-size %d", &bpm->fifo_size))
537  bpm->fifo_size <<= 10;
538  else if (unformat (input, "rcv-buf-size %d", &bpm->rcv_buffer_size))
539  ;
 /*
  * "%d" stores an int-sized value, but bpm->prealloc_fifos is a u8.
  * Parse into a full-width local first, then reject values that do not
  * survive the narrowing store.
  */
540  else if (unformat (input, "prealloc-fifos %d", &prealloc_fifos))
 {
 bpm->prealloc_fifos = prealloc_fifos;
 if (bpm->prealloc_fifos != prealloc_fifos)
 return clib_error_return
 (0, "prealloc-fifos %u too large", prealloc_fifos);
 }
542  else if (unformat (input, "private-segment-count %d",
543  &bpm->private_segment_count))
544  ;
545  else if (unformat (input, "private-segment-size %U",
546  unformat_memory_size, &tmp))
547  {
 /* private_segment_size is 32 bits wide; refuse anything larger. */
548  if (tmp >= 0x100000000ULL)
549  return clib_error_return
550  (0, "private segment size %lld (%llu) too large", tmp, tmp);
551  bpm->private_segment_size = tmp;
552  }
553  else
554  return clib_error_return (0, "unknown input `%U'",
555  format_unformat_error, input);
556  }
557 
558  vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
559 
560  rv = server_create (vm);
561  switch (rv)
562  {
563  case 0:
564  break;
565  default:
566  return clib_error_return (0, "server_create returned %d", rv);
567  }
568 
569  return 0;
570 }
571 
/*
 * Registers the "test proxy server" CLI command and binds it to
 * proxy_server_create_command_fn.
 */
572 /* *INDENT-OFF* */
573 VLIB_CLI_COMMAND (server_create_command, static) =
574 {
575  .path = "test proxy server",
576  .short_help = "test proxy server",
577  .function = proxy_server_create_command_fn,
578 };
579 /* *INDENT-ON* */
580 
/*
 * Initializes the proxy's main state.
 *
 * Marks both API client indices as unset (~0) and creates the hash keyed
 * by server session handle.  server_create tests server_client_index
 * against ~0.  Always returns 0.
 *
 * NOTE(review): the signature line, the `bpm` declaration and one further
 * statement are absent from this listing; the cross-reference index lists
 * builtin_tcp_proxy_main_init (vlib_main_t * vm).
 */
581 clib_error_t *
583 {
585  bpm->server_client_index = ~0;
586  bpm->active_open_client_index = ~0;
588  bpm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
589 
590  return 0;
591 }
592 
594 
595 /*
596 * fd.io coding-style-patch-verification: ON
597 *
598 * Local Variables:
599 * eval: (c-set-style "gnu")
600 * End:
601 */
u32 private_segment_size
size of private fifo segs
Definition: builtin_proxy.h:65
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:432
u32 server_app_index
server app index
Definition: builtin_proxy.h:51
u64 vpp_active_open_handle
Definition: builtin_proxy.h:38
#define hash_set(h, key, value)
Definition: hash.h:254
sll srl srl sll sra u16x4 i
Definition: vector_sse2.h:337
static clib_error_t * proxy_server_create_command_fn(vlib_main_t *vm, unformat_input_t *input, vlib_cli_command_t *cmd)
static void server_disconnect_callback(stream_session_t *s)
#define hash_unset(h, key)
Definition: hash.h:260
a
Definition: bitmap.h:516
struct _vnet_connect_args vnet_connect_args_t
static int server_create(vlib_main_t *vm)
int vnet_bind_uri(vnet_bind_args_t *a)
clib_spinlock_t sessions_lock
Definition: builtin_proxy.h:72
#define PREDICT_TRUE(x)
Definition: clib.h:98
unix_shared_memory_queue_t * vl_input_queue
Definition: api_common.h:68
static_always_inline void clib_spinlock_unlock_if_init(clib_spinlock_t *p)
Definition: lock.h:85
static int server_rx_callback(stream_session_t *s)
static int active_open_connected_callback(u32 app_index, u32 opaque, stream_session_t *s, u8 is_fail)
#define pool_get(P, E)
Allocate an object E from a pool P (unspecified alignment).
Definition: pool.h:225
uword * proxy_session_by_server_handle
Definition: builtin_proxy.h:55
proxy_session_t * sessions
Session pool, shared.
Definition: builtin_proxy.h:71
svm_fifo_t * server_rx_fifo
Definition: builtin_proxy.h:34
struct _svm_fifo svm_fifo_t
static void create_api_loopbacks(vlib_main_t *vm)
u32 active_open_client_index
active open API client handle
Definition: builtin_proxy.h:52
builtin_proxy_main_t builtin_proxy_main
Definition: builtin_proxy.c:22
#define VLIB_INIT_FUNCTION(x)
Definition: init.h:111
struct _vnet_disconnect_args_t vnet_disconnect_args_t
static int server_add_segment_callback(u32 client_index, const u8 *seg_name, u32 seg_size)
static u32 svm_fifo_max_dequeue(svm_fifo_t *f)
Definition: svm_fifo.h:100
static int active_open_attach(void)
struct _stream_session_cb_vft session_cb_vft_t
#define clib_error_return(e, args...)
Definition: error.h:99
static void server_reset_callback(stream_session_t *s)
unsigned long u64
Definition: types.h:89
struct vl_shmem_hdr_ * shmem_hdr
Binary API shared-memory segment header pointer.
Definition: api_common.h:241
struct _stream_session_t stream_session_t
u8 prealloc_fifos
Request fifo preallocation.
Definition: builtin_proxy.h:80
struct _vnet_app_attach_args_t vnet_app_attach_args_t
vl_shmem_hdr_t * shmem_hdr
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
#define hash_get(h, key)
Definition: hash.h:248
unix_shared_memory_queue_t ** server_event_queue
per-thread vectors
Definition: builtin_proxy.h:45
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
Definition: pool.h:458
struct _unformat_input_t unformat_input_t
static int server_connected_callback(u32 app_index, u32 api_context, stream_session_t *s, u8 is_fail)
static int server_attach()
#define pool_put(P, E)
Free an object E in pool P.
Definition: pool.h:270
u32 server_client_index
server API client handle
Definition: builtin_proxy.h:50
#define PREDICT_FALSE(x)
Definition: clib.h:97
clib_error_t * vnet_session_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: session.c:928
API main structure, used by both vpp and binary API clients.
Definition: api_common.h:182
static int server_redirect_connect_callback(u32 client_index, void *mp)
static u8 svm_fifo_set_event(svm_fifo_t *f)
Sets fifo event flag.
Definition: svm_fifo.h:123
u32 private_segment_count
Number of private fifo segs.
Definition: builtin_proxy.h:64
static unix_shared_memory_queue_t * session_manager_get_vpp_event_queue(u32 thread_index)
Definition: session.h:345
api_main_t api_main
Definition: api_shared.c:35
#define UNFORMAT_END_OF_INPUT
Definition: format.h:143
static_always_inline uword vlib_get_thread_index(void)
Definition: threads.h:221
static void delete_proxy_session(stream_session_t *s, int is_active_open)
Definition: builtin_proxy.c:25
static void active_open_disconnect_callback(stream_session_t *s)
vlib_main_t * vm
Definition: buffer.c:283
u8 * format_stream_session(u8 *s, va_list *args)
Format stream session as per the following format.
Definition: session_cli.c:52
static int active_open_create_callback(stream_session_t *s)
#define clib_warning(format, args...)
Definition: error.h:59
clib_error_t * builtin_tcp_proxy_main_init(vlib_main_t *vm)
u32 vl_api_memclnt_create_internal(char *, unix_shared_memory_queue_t *)
Definition: memory_vlib.c:152
int vnet_disconnect_session(vnet_disconnect_args_t *a)
#define ARRAY_LEN(x)
Definition: clib.h:59
#define VLIB_CLI_COMMAND(x,...)
Definition: cli.h:154
static int server_accept_callback(stream_session_t *s)
int vnet_application_attach(vnet_app_attach_args_t *a)
Attaches application.
#define hash_create(elts, value_bytes)
Definition: hash.h:658
#define ASSERT(truth)
unsigned int u32
Definition: types.h:88
uword * proxy_session_by_active_open_handle
Definition: builtin_proxy.h:56
static void active_open_reset_callback(stream_session_t *s)
u64 uword
Definition: types.h:112
static u64 stream_session_handle(stream_session_t *s)
Definition: session.h:237
svm_fifo_t * server_tx_fifo
Definition: builtin_proxy.h:35
unsigned char u8
Definition: types.h:56
static int active_open_rx_callback(stream_session_t *s)
unix_shared_memory_queue_t ** active_open_event_queue
Definition: builtin_proxy.h:46
u8 ** rx_buf
intermediate rx buffers
Definition: builtin_proxy.h:47
static int server_listen()
unformat_function_t unformat_memory_size
Definition: format.h:294
u8 * format_unformat_error(u8 *s, va_list *va)
Definition: unformat.c:91
static vlib_thread_main_t * vlib_get_thread_main()
Definition: global_funcs.h:32
int vnet_connect_uri(vnet_connect_args_t *a)
int svm_fifo_peek(svm_fifo_t *f, u32 relative_offset, u32 max_bytes, u8 *copy_here)
Definition: svm_fifo.c:756
static_always_inline void clib_spinlock_lock_if_init(clib_spinlock_t *p)
Definition: lock.h:65
struct _vnet_bind_args_t vnet_bind_args_t
unix_shared_memory_queue_t * vl_input_queue
vpe input queue
Definition: builtin_proxy.h:43
static stream_session_t * stream_session_get_from_handle(u64 handle)
Definition: session.h:262
uword unformat(unformat_input_t *i, const char *fmt,...)
Definition: unformat.c:972
u32 active_open_app_index
active open index after attach
Definition: builtin_proxy.h:53
static uword unformat_check_input(unformat_input_t *i)
Definition: format.h:169
static session_cb_vft_t builtin_clients
static session_cb_vft_t builtin_session_cb_vft