FD.io VPP  v17.04-9-g99c0734
Vector Packet Processing
session.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19 
20 #include <vnet/session/session.h>
21 #include <vlibmemory/api.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
25 #include <vnet/tcp/tcp.h>
27 
28 /**
29  * Per-type vector of transport protocol virtual function tables
30  */
32 
34 
35 /*
36  * Session lookup key; (src-ip, dst-ip, src-port, dst-port, session-type)
37  * Value: (owner thread index << 32 | session_index);
38  */
39 static void
41  u64 value)
42 {
44  session_kv4_t kv4;
45  session_kv6_t kv6;
46 
47  switch (sst)
48  {
49  case SESSION_TYPE_IP4_UDP:
50  case SESSION_TYPE_IP4_TCP:
51  make_v4_ss_kv_from_tc (&kv4, tc);
52  kv4.value = value;
53  clib_bihash_add_del_16_8 (&smm->v4_session_hash, &kv4, 1 /* is_add */ );
54  break;
55  case SESSION_TYPE_IP6_UDP:
56  case SESSION_TYPE_IP6_TCP:
57  make_v6_ss_kv_from_tc (&kv6, tc);
58  kv6.value = value;
59  clib_bihash_add_del_48_8 (&smm->v6_session_hash, &kv6, 1 /* is_add */ );
60  break;
61  default:
62  clib_warning ("Session type not supported");
63  ASSERT (0);
64  }
65 }
66 
67 void
69  u64 value)
70 {
72 
73  tc = tp_vfts[s->session_type].get_connection (s->connection_index,
74  s->thread_index);
75  stream_session_table_add_for_tc (s->session_type, tc, value);
76 }
77 
78 static void
80  u64 value)
81 {
83  session_kv4_t kv4;
84  session_kv6_t kv6;
85 
86  switch (sst)
87  {
88  case SESSION_TYPE_IP4_UDP:
89  case SESSION_TYPE_IP4_TCP:
90  make_v4_ss_kv_from_tc (&kv4, tc);
91  kv4.value = value;
92  clib_bihash_add_del_16_8 (&smm->v4_half_open_hash, &kv4,
93  1 /* is_add */ );
94  break;
95  case SESSION_TYPE_IP6_UDP:
96  case SESSION_TYPE_IP6_TCP:
97  make_v6_ss_kv_from_tc (&kv6, tc);
98  kv6.value = value;
99  clib_bihash_add_del_48_8 (&smm->v6_half_open_hash, &kv6,
100  1 /* is_add */ );
101  break;
102  default:
103  clib_warning ("Session type not supported");
104  ASSERT (0);
105  }
106 }
107 
108 static int
111 {
112  session_kv4_t kv4;
113  session_kv6_t kv6;
114 
115  switch (sst)
116  {
117  case SESSION_TYPE_IP4_UDP:
118  case SESSION_TYPE_IP4_TCP:
119  make_v4_ss_kv_from_tc (&kv4, tc);
120  return clib_bihash_add_del_16_8 (&smm->v4_session_hash, &kv4,
121  0 /* is_add */ );
122  break;
123  case SESSION_TYPE_IP6_UDP:
124  case SESSION_TYPE_IP6_TCP:
125  make_v6_ss_kv_from_tc (&kv6, tc);
126  return clib_bihash_add_del_48_8 (&smm->v6_session_hash, &kv6,
127  0 /* is_add */ );
128  break;
129  default:
130  clib_warning ("Session type not supported");
131  ASSERT (0);
132  }
133 
134  return 0;
135 }
136 
137 static int
139 {
141 
142  ts = tp_vfts[s->session_type].get_connection (s->connection_index,
143  s->thread_index);
144  return stream_session_table_del_for_tc (smm, s->session_type, ts);
145 }
146 
147 static void
150 {
151  session_kv4_t kv4;
152  session_kv6_t kv6;
153 
154  switch (sst)
155  {
156  case SESSION_TYPE_IP4_UDP:
157  case SESSION_TYPE_IP4_TCP:
158  make_v4_ss_kv_from_tc (&kv4, tc);
159  clib_bihash_add_del_16_8 (&smm->v4_half_open_hash, &kv4,
160  0 /* is_add */ );
161  break;
162  case SESSION_TYPE_IP6_UDP:
163  case SESSION_TYPE_IP6_TCP:
164  make_v6_ss_kv_from_tc (&kv6, tc);
165  clib_bihash_add_del_48_8 (&smm->v6_half_open_hash, &kv6,
166  0 /* is_add */ );
167  break;
168  default:
169  clib_warning ("Session type not supported");
170  ASSERT (0);
171  }
172 }
173 
176 {
178  session_kv4_t kv4;
179  int rv;
180 
181  make_v4_listener_kv (&kv4, lcl, lcl_port, proto);
182  rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
183  if (rv == 0)
184  return pool_elt_at_index (smm->listen_sessions[proto], (u32) kv4.value);
185 
186  /* Zero out the lcl ip */
187  kv4.key[0] = 0;
188  rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
189  if (rv == 0)
190  return pool_elt_at_index (smm->listen_sessions[proto], kv4.value);
191 
192  return 0;
193 }
194 
195 /** Looks up a session based on the 5-tuple passed as argument.
196  *
197  * First it tries to find an established session, if this fails, it tries
198  * finding a listener session if this fails, it tries a lookup with a
199  * wildcarded local source (listener bound to all interfaces)
200  */
203  u16 lcl_port, u16 rmt_port, u8 proto,
204  u32 my_thread_index)
205 {
207  session_kv4_t kv4;
208  int rv;
209 
210  /* Lookup session amongst established ones */
211  make_v4_ss_kv (&kv4, lcl, rmt, lcl_port, rmt_port, proto);
212  rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
213  if (rv == 0)
214  return stream_session_get_tsi (kv4.value, my_thread_index);
215 
216  /* If nothing is found, check if any listener is available */
217  return stream_session_lookup_listener4 (lcl, lcl_port, proto);
218 }
219 
222 {
224  session_kv6_t kv6;
225  int rv;
226 
227  make_v6_listener_kv (&kv6, lcl, lcl_port, proto);
228  rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
229  if (rv == 0)
230  return pool_elt_at_index (smm->listen_sessions[proto], kv6.value);
231 
232  /* Zero out the lcl ip */
233  kv6.key[0] = kv6.key[1] = 0;
234  rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
235  if (rv == 0)
236  return pool_elt_at_index (smm->listen_sessions[proto], kv6.value);
237 
238  return 0;
239 }
240 
241 /* Looks up a session based on the 5-tuple passed as argument.
242  * First it tries to find an established session, if this fails, it tries
243  * finding a listener session if this fails, it tries a lookup with a
244  * wildcarded local source (listener bound to all interfaces) */
247  u16 lcl_port, u16 rmt_port, u8 proto,
248  u32 my_thread_index)
249 {
251  session_kv6_t kv6;
252  int rv;
253 
254  make_v6_ss_kv (&kv6, lcl, rmt, lcl_port, rmt_port, proto);
255  rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
256  if (rv == 0)
257  return stream_session_get_tsi (kv6.value, my_thread_index);
258 
259  /* If nothing is found, check if any listener is available */
260  return stream_session_lookup_listener6 (lcl, lcl_port, proto);
261 }
262 
264 stream_session_lookup_listener (ip46_address_t * lcl, u16 lcl_port, u8 proto)
265 {
266  switch (proto)
267  {
268  case SESSION_TYPE_IP4_UDP:
269  case SESSION_TYPE_IP4_TCP:
270  return stream_session_lookup_listener4 (&lcl->ip4, lcl_port, proto);
271  break;
272  case SESSION_TYPE_IP6_UDP:
273  case SESSION_TYPE_IP6_TCP:
274  return stream_session_lookup_listener6 (&lcl->ip6, lcl_port, proto);
275  break;
276  }
277  return 0;
278 }
279 
280 static u64
282  ip46_address_t * lcl, ip46_address_t * rmt,
283  u16 lcl_port, u16 rmt_port, u8 proto)
284 {
285  session_kv4_t kv4;
286  session_kv6_t kv6;
287  int rv;
288 
289  switch (proto)
290  {
291  case SESSION_TYPE_IP4_UDP:
292  case SESSION_TYPE_IP4_TCP:
293  make_v4_ss_kv (&kv4, &lcl->ip4, &rmt->ip4, lcl_port, rmt_port, proto);
294  rv = clib_bihash_search_inline_16_8 (&smm->v4_half_open_hash, &kv4);
295 
296  if (rv == 0)
297  return kv4.value;
298 
299  return (u64) ~ 0;
300  break;
301  case SESSION_TYPE_IP6_UDP:
302  case SESSION_TYPE_IP6_TCP:
303  make_v6_ss_kv (&kv6, &lcl->ip6, &rmt->ip6, lcl_port, rmt_port, proto);
304  rv = clib_bihash_search_inline_48_8 (&smm->v6_half_open_hash, &kv6);
305 
306  if (rv == 0)
307  return kv6.value;
308 
309  return (u64) ~ 0;
310  break;
311  }
312  return 0;
313 }
314 
317  u16 lcl_port, u16 rmt_port, u8 proto,
318  u32 my_thread_index)
319 {
321  session_kv4_t kv4;
322  stream_session_t *s;
323  int rv;
324 
325  /* Lookup session amongst established ones */
326  make_v4_ss_kv (&kv4, lcl, rmt, lcl_port, rmt_port, proto);
327  rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
328  if (rv == 0)
329  {
330  s = stream_session_get_tsi (kv4.value, my_thread_index);
331 
332  return tp_vfts[s->session_type].get_connection (s->connection_index,
333  my_thread_index);
334  }
335 
336  /* If nothing is found, check if any listener is available */
337  s = stream_session_lookup_listener4 (lcl, lcl_port, proto);
338  if (s)
339  return tp_vfts[s->session_type].get_listener (s->connection_index);
340 
341  /* Finally, try half-open connections */
342  rv = clib_bihash_search_inline_16_8 (&smm->v4_half_open_hash, &kv4);
343  if (rv == 0)
344  return tp_vfts[proto].get_half_open (kv4.value & 0xFFFFFFFF);
345 
346  return 0;
347 }
348 
351  u16 lcl_port, u16 rmt_port, u8 proto,
352  u32 my_thread_index)
353 {
355  stream_session_t *s;
356  session_kv6_t kv6;
357  int rv;
358 
359  make_v6_ss_kv (&kv6, lcl, rmt, lcl_port, rmt_port, proto);
360  rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
361  if (rv == 0)
362  {
363  s = stream_session_get_tsi (kv6.value, my_thread_index);
364 
365  return tp_vfts[s->session_type].get_connection (s->connection_index,
366  my_thread_index);
367  }
368 
369  /* If nothing is found, check if any listener is available */
370  s = stream_session_lookup_listener6 (lcl, lcl_port, proto);
371  if (s)
372  return tp_vfts[s->session_type].get_listener (s->connection_index);
373 
374  /* Finally, try half-open connections */
375  rv = clib_bihash_search_inline_48_8 (&smm->v6_half_open_hash, &kv6);
376  if (rv == 0)
377  return tp_vfts[proto].get_half_open (kv6.value & 0xFFFFFFFF);
378 
379  return 0;
380 }
381 
382 /**
383  * Allocate vpp event queue (once) per worker thread
384  */
385 void
387  u32 thread_index)
388 {
389  api_main_t *am = &api_main;
390  void *oldheap;
391 
392  if (smm->vpp_event_queues[thread_index] == 0)
393  {
394  /* Allocate event fifo in the /vpe-api shared-memory segment */
395  oldheap = svm_push_data_heap (am->vlib_rp);
396 
397  smm->vpp_event_queues[thread_index] =
398  unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
399  sizeof (session_fifo_event_t),
400  0 /* consumer pid */ ,
401  0
402  /* (do not) send signal when queue non-empty */
403  );
404 
405  svm_pop_heap (oldheap);
406  }
407 }
408 
409 void
411 {
413  s = svm_fifo_get_segment (index);
414  *name = s->h->segment_name;
415  *size = s->ssvm.ssvm_size;
416 }
417 
418 always_inline int
420  session_manager_t * sm,
421  u32 segment_size, u8 * segment_name)
422 {
423  svm_fifo_segment_create_args_t _ca, *ca = &_ca;
424  int rv;
425 
426  memset (ca, 0, sizeof (*ca));
427 
428  ca->segment_name = (char *) segment_name;
429  ca->segment_size = segment_size;
430 
431  rv = svm_fifo_segment_create (ca);
432  if (rv)
433  {
434  clib_warning ("svm_fifo_segment_create ('%s', %d) failed",
435  ca->segment_name, ca->segment_size);
436  vec_free (segment_name);
437  return -1;
438  }
439 
440  vec_add1 (sm->segment_indices, ca->new_segment_index);
441 
442  return 0;
443 }
444 
445 static int
447  session_manager_t * sm)
448 {
449  u8 *segment_name;
450  svm_fifo_segment_create_args_t _ca, *ca = &_ca;
451  u32 add_segment_size;
452  u32 default_segment_size = 128 << 10;
453 
454  memset (ca, 0, sizeof (*ca));
455  segment_name = format (0, "%d-%d%c", getpid (),
456  smm->unique_segment_name_counter++, 0);
457  add_segment_size =
458  sm->add_segment_size ? sm->add_segment_size : default_segment_size;
459 
460  return session_manager_add_segment_i (smm, sm, add_segment_size,
461  segment_name);
462 }
463 
464 int
466  session_manager_t * sm, u32 segment_size,
467  u8 ** segment_name)
468 {
469  svm_fifo_segment_create_args_t _ca, *ca = &_ca;
470  memset (ca, 0, sizeof (*ca));
471  *segment_name = format (0, "%d-%d%c", getpid (),
472  smm->unique_segment_name_counter++, 0);
473  return session_manager_add_segment_i (smm, sm, segment_size, *segment_name);
474 }
475 
476 void
478 {
479  u32 *deleted_sessions = 0;
480  u32 *deleted_thread_indices = 0;
481  int i, j;
482 
483  /* Across all fifo segments used by the server */
484  for (j = 0; j < vec_len (sm->segment_indices); j++)
485  {
486  svm_fifo_segment_private_t *fifo_segment;
487  svm_fifo_t **fifos;
488  /* Vector of fifos allocated in the segment */
489  fifo_segment = svm_fifo_get_segment (sm->segment_indices[j]);
490  fifos = (svm_fifo_t **) fifo_segment->h->fifos;
491 
492  /*
493  * Remove any residual sessions from the session lookup table
494  * Don't bother deleting the individual fifos, we're going to
495  * throw away the fifo segment in a minute.
496  */
497  for (i = 0; i < vec_len (fifos); i++)
498  {
499  svm_fifo_t *fifo;
500  u32 session_index, thread_index;
501  stream_session_t *session;
502 
503  fifo = fifos[i];
504  session_index = fifo->server_session_index;
505  thread_index = fifo->server_thread_index;
506 
507  session = pool_elt_at_index (smm->sessions[thread_index],
508  session_index);
509 
510  /* Add to the deleted_sessions vector (once!) */
511  if (!session->is_deleted)
512  {
513  session->is_deleted = 1;
514  vec_add1 (deleted_sessions,
515  session - smm->sessions[thread_index]);
516  vec_add1 (deleted_thread_indices, thread_index);
517  }
518  }
519 
520  for (i = 0; i < vec_len (deleted_sessions); i++)
521  {
522  stream_session_t *session;
523 
524  session =
525  pool_elt_at_index (smm->sessions[deleted_thread_indices[i]],
526  deleted_sessions[i]);
527 
528  /* Instead of directly removing the session call disconnect */
529  stream_session_disconnect (session);
530 
531  /*
532  stream_session_table_del (smm, session);
533  pool_put(smm->sessions[deleted_thread_indices[i]], session);
534  */
535  }
536 
537  vec_reset_length (deleted_sessions);
538  vec_reset_length (deleted_thread_indices);
539 
540  /* Instead of removing the segment, test when removing the session if
541  * the segment can be removed
542  */
543  /* svm_fifo_segment_delete (fifo_segment); */
544  }
545 
546  vec_free (deleted_sessions);
547  vec_free (deleted_thread_indices);
548 }
549 
550 int
552  session_manager_t * sm,
553  svm_fifo_t ** server_rx_fifo,
554  svm_fifo_t ** server_tx_fifo,
555  u32 * fifo_segment_index,
556  u8 * added_a_segment)
557 {
558  svm_fifo_segment_private_t *fifo_segment;
559  u32 fifo_size, default_fifo_size = 128 << 10; /* TODO config */
560  int i;
561 
562  *added_a_segment = 0;
563 
564  /* Allocate svm fifos */
565  ASSERT (vec_len (sm->segment_indices));
566 
567 again:
568  for (i = 0; i < vec_len (sm->segment_indices); i++)
569  {
570  *fifo_segment_index = sm->segment_indices[i];
571  fifo_segment = svm_fifo_get_segment (*fifo_segment_index);
572 
573  fifo_size = sm->rx_fifo_size;
574  fifo_size = (fifo_size == 0) ? default_fifo_size : fifo_size;
575  *server_rx_fifo = svm_fifo_segment_alloc_fifo (fifo_segment, fifo_size);
576 
577  fifo_size = sm->tx_fifo_size;
578  fifo_size = (fifo_size == 0) ? default_fifo_size : fifo_size;
579  *server_tx_fifo = svm_fifo_segment_alloc_fifo (fifo_segment, fifo_size);
580 
581  if (*server_rx_fifo == 0)
582  {
583  /* This would be very odd, but handle it... */
584  if (*server_tx_fifo != 0)
585  {
586  svm_fifo_segment_free_fifo (fifo_segment, *server_tx_fifo);
587  *server_tx_fifo = 0;
588  }
589  continue;
590  }
591  if (*server_tx_fifo == 0)
592  {
593  if (*server_rx_fifo != 0)
594  {
595  svm_fifo_segment_free_fifo (fifo_segment, *server_rx_fifo);
596  *server_rx_fifo = 0;
597  }
598  continue;
599  }
600  break;
601  }
602 
603  /* See if we're supposed to create another segment */
604  if (*server_rx_fifo == 0)
605  {
606  if (sm->add_segment)
607  {
608  if (*added_a_segment)
609  {
610  clib_warning ("added a segment, still cant allocate a fifo");
611  return SESSION_ERROR_NEW_SEG_NO_SPACE;
612  }
613 
614  if (session_manager_add_segment (smm, sm))
615  return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
616 
617  *added_a_segment = 1;
618  goto again;
619  }
620  else
621  {
622  clib_warning ("No space to allocate fifos!");
623  return SESSION_ERROR_NO_SPACE;
624  }
625  }
626  return 0;
627 }
628 
629 int
632  stream_session_t ** ret_s)
633 {
634  int rv;
635  svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
636  u32 fifo_segment_index;
637  u32 pool_index, seg_size;
638  stream_session_t *s;
639  u64 value;
640  u32 thread_index = tc->thread_index;
641  session_manager_t *sm;
642  u8 segment_added;
643  u8 *seg_name;
644 
645  sm = session_manager_get (app->session_manager_index);
646 
647  /* Check the API queue */
648  if (app->mode == APP_SERVER && application_api_queue_is_full (app))
649  return SESSION_ERROR_API_QUEUE_FULL;
650 
651  if ((rv = session_manager_allocate_session_fifos (smm, sm, &server_rx_fifo,
652  &server_tx_fifo,
653  &fifo_segment_index,
654  &segment_added)))
655  return rv;
656 
657  if (segment_added && app->mode == APP_SERVER)
658  {
659  /* Send an API message to the external server, to map new segment */
660  ASSERT (app->cb_fns.add_segment_callback);
661 
662  session_manager_get_segment_info (fifo_segment_index, &seg_name,
663  &seg_size);
664  if (app->cb_fns.add_segment_callback (app->api_client_index, seg_name,
665  seg_size))
666  return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
667  }
668 
669  /* Create the session */
670  pool_get (smm->sessions[thread_index], s);
671  memset (s, 0, sizeof (*s));
672 
673  /* Initialize backpointers */
674  pool_index = s - smm->sessions[thread_index];
675  server_rx_fifo->server_session_index = pool_index;
676  server_rx_fifo->server_thread_index = thread_index;
677 
678  server_tx_fifo->server_session_index = pool_index;
679  server_tx_fifo->server_thread_index = thread_index;
680 
681  s->server_rx_fifo = server_rx_fifo;
682  s->server_tx_fifo = server_tx_fifo;
683 
684  /* Initialize state machine, such as it is... */
685  s->session_type = app->session_type;
686  s->session_state = SESSION_STATE_CONNECTING;
687  s->app_index = application_get_index (app);
688  s->server_segment_index = fifo_segment_index;
689  s->thread_index = thread_index;
690  s->session_index = pool_index;
691 
692  /* Attach transport to session */
693  s->connection_index = tc->c_index;
694 
695  /* Attach session to transport */
696  tc->s_index = s->session_index;
697 
698  /* Add to the main lookup table */
699  value = (((u64) thread_index) << 32) | (u64) s->session_index;
700  stream_session_table_add_for_tc (app->session_type, tc, value);
701 
702  *ret_s = s;
703 
704  return 0;
705 }
706 
707 /*
708  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
709  * event but on request can queue notification events for later delivery by
710  * calling stream_server_flush_enqueue_events().
711  *
712  * @param tc Transport connection which is to be enqueued data
713  * @param data Data to be enqueued
714  * @param len Length of data to be enqueued
715  * @param queue_event Flag to indicate if peer is to be notified or if event
716  * is to be queued. The former is useful when more data is
717  * enqueued and only one event is to be generated.
718  * @return Number of bytes enqueued or a negative value if enqueueing failed.
719  */
720 int
722  u8 queue_event)
723 {
724  stream_session_t *s;
725  int enqueued;
726 
727  s = stream_session_get (tc->s_index, tc->thread_index);
728 
729  /* Make sure there's enough space left. We might've filled the pipes */
730  if (PREDICT_FALSE (len > svm_fifo_max_enqueue (s->server_rx_fifo)))
731  return -1;
732 
733  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, s->pid, len, data);
734 
735  if (queue_event)
736  {
737  /* Queue RX event on this fifo. Eventually these will need to be flushed
738  * by calling stream_server_flush_enqueue_events () */
740  u32 thread_index = s->thread_index;
741  u32 my_enqueue_epoch = smm->current_enqueue_epoch[thread_index];
742 
743  if (s->enqueue_epoch != my_enqueue_epoch)
744  {
745  s->enqueue_epoch = my_enqueue_epoch;
746  vec_add1 (smm->session_indices_to_enqueue_by_thread[thread_index],
747  s - smm->sessions[thread_index]);
748  }
749  }
750 
751  return enqueued;
752 }
753 
754 /** Check if we have space in rx fifo to push more bytes */
755 u8
757  u16 data_len)
758 {
759  stream_session_t *s = stream_session_get (tc->c_index, thread_index);
760 
761  if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
762  return 1;
763 
764  if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo))
765  return 1;
766 
767  return 0;
768 }
769 
770 u32
772  u32 offset, u32 max_bytes)
773 {
774  stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
775  return svm_fifo_peek (s->server_tx_fifo, s->pid, offset, max_bytes, buffer);
776 }
777 
778 u32
780 {
781  stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
782  return svm_fifo_dequeue_drop (s->server_tx_fifo, s->pid, max_bytes);
783 }
784 
785 /**
786  * Notify session peer that new data has been enqueued.
787  *
788  * @param s Stream session for which the event is to be generated.
789  * @param block Flag to indicate if call should block if event queue is full.
790  *
791  * @return 0 on succes or negative number if failed to send notification.
792  */
793 static int
795 {
796  application_t *app;
797  session_fifo_event_t evt;
799  static u32 serial_number;
800 
801  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
802  return 0;
803 
804  /* Get session's server */
805  app = application_get (s->app_index);
806 
807  /* Fabricate event */
808  evt.fifo = s->server_rx_fifo;
809  evt.event_type = FIFO_EVENT_SERVER_RX;
810  evt.event_id = serial_number++;
811  evt.enqueue_length = svm_fifo_max_dequeue (s->server_rx_fifo);
812 
813  /* Built-in server? Hand event to the callback... */
814  if (app->cb_fns.builtin_server_rx_callback)
815  return app->cb_fns.builtin_server_rx_callback (s, &evt);
816 
817  /* Add event to server's event queue */
818  q = app->event_queue;
819 
820  /* Based on request block (or not) for lack of space */
821  if (block || PREDICT_TRUE (q->cursize < q->maxsize))
822  unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
823  0 /* do wait for mutex */ );
824  else
825  return -1;
826 
827  /* *INDENT-OFF* */
828  SESSION_EVT_DBG(s, SESSION_EVT_ENQ, ({
829  ed->data[0] = evt.event_id;
830  ed->data[1] = evt.enqueue_length;
831  }));
832  /* *INDENT-ON* */
833 
834  return 0;
835 }
836 
837 /**
838  * Flushes queue of sessions that are to be notified of new data
839  * enqueued events.
840  *
841  * @param thread_index Thread index for which the flush is to be performed.
842  * @return 0 on success or a positive number indicating the number of
843  * failures due to API queue being full.
844  */
845 int
847 {
849  u32 *session_indices_to_enqueue;
850  int i, errors = 0;
851 
852  session_indices_to_enqueue =
853  smm->session_indices_to_enqueue_by_thread[thread_index];
854 
855  for (i = 0; i < vec_len (session_indices_to_enqueue); i++)
856  {
857  stream_session_t *s0;
858 
859  /* Get session */
860  s0 = stream_session_get (session_indices_to_enqueue[i], thread_index);
861  if (stream_session_enqueue_notify (s0, 0 /* don't block */ ))
862  {
863  errors++;
864  }
865  }
866 
867  vec_reset_length (session_indices_to_enqueue);
868 
869  smm->session_indices_to_enqueue_by_thread[thread_index] =
870  session_indices_to_enqueue;
871 
872  /* Increment enqueue epoch for next round */
873  smm->current_enqueue_epoch[thread_index]++;
874 
875  return errors;
876 }
877 
878 /*
879  * Start listening on server's ip/port pair for requested transport.
880  *
881  * Creates a 'dummy' stream session with state LISTENING to be used in session
882  * lookups, prior to establishing connection. Requests transport to build
883  * it's own specific listening connection.
884  */
885 int
886 stream_session_start_listen (u32 server_index, ip46_address_t * ip, u16 port)
887 {
889  stream_session_t *s;
891  application_t *srv;
892  u32 tci;
893 
894  srv = application_get (server_index);
895 
896  pool_get (smm->listen_sessions[srv->session_type], s);
897  memset (s, 0, sizeof (*s));
898 
899  s->session_type = srv->session_type;
900  s->session_state = SESSION_STATE_LISTENING;
901  s->session_index = s - smm->listen_sessions[srv->session_type];
902  s->app_index = srv->index;
903 
904  /* Transport bind/listen */
905  tci = tp_vfts[srv->session_type].bind (s->session_index, ip, port);
906 
907  /* Attach transport to session */
908  s->connection_index = tci;
909  tc = tp_vfts[srv->session_type].get_listener (tci);
910 
911  srv->session_index = s->session_index;
912 
913  /* Add to the main lookup table */
914  stream_session_table_add_for_tc (s->session_type, tc, s->session_index);
915 
916  return 0;
917 }
918 
919 void
921 {
923  stream_session_t *listener;
925  application_t *srv;
926 
927  srv = application_get (server_index);
928  listener = pool_elt_at_index (smm->listen_sessions[srv->session_type],
929  srv->session_index);
930 
931  tc = tp_vfts[srv->session_type].get_listener (listener->connection_index);
932  stream_session_table_del_for_tc (smm, listener->session_type, tc);
933 
934  tp_vfts[srv->session_type].unbind (listener->connection_index);
935  pool_put (smm->listen_sessions[srv->session_type], listener);
936 }
937 
938 int
940  u32 segment_size)
941 {
942  /* Does exactly nothing, but die */
943  ASSERT (0);
944  return 0;
945 }
946 
947 void
949 {
950  session_manager_t *sm;
951  u32 connect_fifo_size = 256 << 10; /* Config? */
952  u32 default_segment_size = 1 << 20;
953 
954  pool_get (smm->session_managers, sm);
955  memset (sm, 0, sizeof (*sm));
956 
957  sm->add_segment_size = default_segment_size;
958  sm->rx_fifo_size = connect_fifo_size;
959  sm->tx_fifo_size = connect_fifo_size;
960  sm->add_segment = 1;
961 
962  session_manager_add_segment (smm, sm);
963  smm->connect_manager_index[session_type] = sm - smm->session_managers;
964 }
965 
966 void
968  u8 is_fail)
969 {
971  application_t *app;
972  stream_session_t *new_s = 0;
973  u64 value;
974 
975  value = stream_session_half_open_lookup (smm, &tc->lcl_ip, &tc->rmt_ip,
976  tc->lcl_port, tc->rmt_port,
977  tc->proto);
978  if (value == HALF_OPEN_LOOKUP_INVALID_VALUE)
979  {
980  clib_warning ("This can't be good!");
981  return;
982  }
983 
984  app = application_get (value >> 32);
985 
986  if (!is_fail)
987  {
988  /* Create new session (server segments are allocated if needed) */
989  if (stream_session_create_i (smm, app, tc, &new_s))
990  return;
991 
992  app->session_index = stream_session_get_index (new_s);
993  app->thread_index = new_s->thread_index;
994 
995  /* Allocate vpp event queue for this thread if needed */
996  vpp_session_event_queue_allocate (smm, tc->thread_index);
997  }
998 
999  /* Notify client */
1000  app->cb_fns.session_connected_callback (app->api_client_index, new_s,
1001  is_fail);
1002 
1003  /* Cleanup session lookup */
1004  stream_session_half_open_table_del (smm, sst, tc);
1005 }
1006 
1007 void
1009 {
1010  application_t *server;
1011  stream_session_t *s;
1012 
1013  s = stream_session_get (tc->s_index, tc->thread_index);
1014  server = application_get (s->app_index);
1015  server->cb_fns.session_accept_callback (s);
1016 }
1017 
1018 /**
1019  * Notification from transport that connection is being closed.
1020  *
1021  * A disconnect is sent to application but state is not removed. Once
1022  * disconnect is acknowledged by application, session disconnect is called.
1023  * Ultimately this leads to close being called on transport (passive close).
1024  */
1025 void
1027 {
1028  application_t *server;
1029  stream_session_t *s;
1030 
1031  s = stream_session_get (tc->s_index, tc->thread_index);
1032  server = application_get (s->app_index);
1033  server->cb_fns.session_disconnect_callback (s);
1034 }
1035 
1036 /**
1037  * Cleans up session and associated app if needed.
1038  */
1039 void
1041 {
1043  svm_fifo_segment_private_t *fifo_segment;
1044  application_t *app;
1045 
1046  /* Delete from the main lookup table. */
1047  stream_session_table_del (smm, s);
1048 
1049  /* Cleanup fifo segments */
1050  fifo_segment = svm_fifo_get_segment (s->server_segment_index);
1051  svm_fifo_segment_free_fifo (fifo_segment, s->server_rx_fifo);
1052  svm_fifo_segment_free_fifo (fifo_segment, s->server_tx_fifo);
1053 
1054  app = application_get_if_valid (s->app_index);
1055 
1056  /* No app. A possibility: after disconnect application called unbind */
1057  if (!app)
1058  return;
1059 
1060  if (app->mode == APP_CLIENT)
1061  {
1062  /* Cleanup app if client */
1063  application_del (app);
1064  }
1065  else if (app->mode == APP_SERVER)
1066  {
1067  session_manager_t *sm;
1068  svm_fifo_segment_private_t *fifo_segment;
1069  svm_fifo_t **fifos;
1070  u32 fifo_index;
1071 
1072  /* For server, see if any segments can be removed */
1073  sm = session_manager_get (app->session_manager_index);
1074 
1075  /* Delete fifo */
1076  fifo_segment = svm_fifo_get_segment (s->server_segment_index);
1077  fifos = (svm_fifo_t **) fifo_segment->h->fifos;
1078 
1079  fifo_index = svm_fifo_segment_index (fifo_segment);
1080 
1081  /* Remove segment only if it holds no fifos and not the first */
1082  if (sm->segment_indices[0] != fifo_index && vec_len (fifos) == 0)
1083  svm_fifo_segment_delete (fifo_segment);
1084  }
1085 
1086  pool_put (smm->sessions[s->thread_index], s);
1087 }
1088 
1089 /**
1090  * Notification from transport that connection is being deleted
1091  *
1092  * This should be called only on previously fully established sessions. For
1093  * instance failed connects should call stream_session_connect_notify and
1094  * indicate that the connect has failed.
1095  */
1096 void
1098 {
1099  stream_session_t *s;
1100 
1101  /* App might've been removed already */
1102  s = stream_session_get_if_valid (tc->s_index, tc->thread_index);
1103  if (!s)
1104  {
1105  return;
1106  }
1108 }
1109 
1110 /**
1111  * Notify application that connection has been reset.
1112  */
1113 void
1115 {
1116  stream_session_t *s;
1117  application_t *app;
1118  s = stream_session_get (tc->s_index, tc->thread_index);
1119 
1120  app = application_get (s->app_index);
1121  app->cb_fns.session_reset_callback (s);
1122 }
1123 
1124 /**
1125  * Accept a stream session. Optionally ping the server by callback.
1126  */
1127 int
1129  u8 sst, u8 notify)
1130 {
1132  application_t *server;
1133  stream_session_t *s, *listener;
1134 
1135  int rv;
1136 
1137  /* Find the server */
1138  listener = pool_elt_at_index (smm->listen_sessions[sst], listener_index);
1139  server = application_get (listener->app_index);
1140 
1141  if ((rv = stream_session_create_i (smm, server, tc, &s)))
1142  return rv;
1143 
1144  /* Allocate vpp event queue for this thread if needed */
1145  vpp_session_event_queue_allocate (smm, tc->thread_index);
1146 
1147  /* Shoulder-tap the server */
1148  if (notify)
1149  {
1150  server->cb_fns.session_accept_callback (s);
1151  }
1152 
1153  return 0;
1154 }
1155 
1156 int
1157 stream_session_open (u8 sst, ip46_address_t * addr, u16 port_host_byte_order,
1158  u32 app_index)
1159 {
1161  u32 tci;
1162  u64 value;
1163  int rv;
1164 
1165  /* Ask transport to open connection */
1166  rv = tp_vfts[sst].open (addr, port_host_byte_order);
1167  if (rv < 0)
1168  {
1169  clib_warning ("Transport failed to open connection.");
1170  return VNET_API_ERROR_SESSION_CONNECT_FAIL;
1171  }
1172 
1173  tci = rv;
1174 
1175  /* Get transport connection */
1176  tc = tp_vfts[sst].get_half_open (tci);
1177 
1178  /* Store api_client_index and transport connection index */
1179  value = (((u64) app_index) << 32) | (u64) tc->c_index;
1180 
1181  /* Add to the half-open lookup table */
1182  stream_session_half_open_table_add (sst, tc, value);
1183 
1184  return 0;
1185 }
1186 
1187 /**
1188  * Disconnect session and propagate to transport. This should eventually
1189  * result in a delete notification that allows us to cleanup session state.
1190  * Called for both active/passive disconnects.
1191  */
1192 void
1194 {
1195  s->session_state = SESSION_STATE_CLOSED;
1196  tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
1197 }
1198 
1199 /**
1200  * Cleanup transport and session state.
1201  *
1202  * Notify transport of the cleanup, wait for a delete notify to actually
1203  * remove the session state.
1204  */
1205 void
1207 {
1209  int rv;
1210 
1211  s->session_state = SESSION_STATE_CLOSED;
1212 
1213  /* Delete from the main lookup table to avoid more enqueues */
1214  rv = stream_session_table_del (smm, s);
1215  if (rv)
1216  clib_warning ("hash delete error, rv %d", rv);
1217 
1218  tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
1219 }
1220 
1221 void
1223 {
1225 
1226  vec_validate (tp_vfts, type);
1227  tp_vfts[type] = *vft;
1228 
1229  /* If an offset function is provided, then peek instead of dequeue */
1230  smm->session_tx_fns[type] =
1231  (vft->tx_fifo_offset) ? session_tx_fifo_peek_and_snd :
1233 }
1234 
1237 {
1238  if (type >= vec_len (tp_vfts))
1239  return 0;
1240  return &tp_vfts[type];
1241 }
1242 
1243 static clib_error_t *
1245 {
1248  u32 num_threads;
1249  int i;
1250 
1251  num_threads = 1 /* main thread */ + vtm->n_threads;
1252 
1253  if (num_threads < 1)
1254  return clib_error_return (0, "n_thread_stacks not set");
1255 
1256  /* $$$ config parameters */
1257  svm_fifo_segment_init (0x200000000ULL /* first segment base VA */ ,
1258  20 /* timeout in seconds */ );
1259 
1260  /* configure per-thread ** vectors */
1261  vec_validate (smm->sessions, num_threads - 1);
1262  vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
1263  vec_validate (smm->tx_buffers, num_threads - 1);
1264  vec_validate (smm->fifo_events, num_threads - 1);
1265  vec_validate (smm->evts_partially_read, num_threads - 1);
1266  vec_validate (smm->current_enqueue_epoch, num_threads - 1);
1267  vec_validate (smm->vpp_event_queues, num_threads - 1);
1268 
1269  /* $$$$ preallocate hack config parameter */
1270  for (i = 0; i < 200000; i++)
1271  {
1272  stream_session_t *ss;
1273  pool_get (smm->sessions[0], ss);
1274  memset (ss, 0, sizeof (*ss));
1275  }
1276 
1277  for (i = 0; i < 200000; i++)
1278  pool_put_index (smm->sessions[0], i);
1279 
1280  clib_bihash_init_16_8 (&smm->v4_session_hash, "v4 session table",
1281  200000 /* $$$$ config parameter nbuckets */ ,
1282  (64 << 20) /*$$$ config parameter table size */ );
1283  clib_bihash_init_48_8 (&smm->v6_session_hash, "v6 session table",
1284  200000 /* $$$$ config parameter nbuckets */ ,
1285  (64 << 20) /*$$$ config parameter table size */ );
1286 
1287  clib_bihash_init_16_8 (&smm->v4_half_open_hash, "v4 half-open table",
1288  200000 /* $$$$ config parameter nbuckets */ ,
1289  (64 << 20) /*$$$ config parameter table size */ );
1290  clib_bihash_init_48_8 (&smm->v6_half_open_hash, "v6 half-open table",
1291  200000 /* $$$$ config parameter nbuckets */ ,
1292  (64 << 20) /*$$$ config parameter table size */ );
1293 
1294  for (i = 0; i < SESSION_N_TYPES; i++)
1295  smm->connect_manager_index[i] = INVALID_INDEX;
1296 
1297  smm->is_enabled = 1;
1298 
1299  /* Enable TCP transport */
1300  vnet_tcp_enable_disable (vm, 1);
1301 
1302  return 0;
1303 }
1304 
1305 clib_error_t *
1307 {
1308  if (is_en)
1309  {
1310  if (session_manager_main.is_enabled)
1311  return 0;
1312 
1314  VLIB_NODE_STATE_POLLING);
1315 
1316  return session_manager_main_enable (vm);
1317  }
1318  else
1319  {
1320  session_manager_main.is_enabled = 0;
1322  VLIB_NODE_STATE_DISABLED);
1323  }
1324 
1325  return 0;
1326 }
1327 
1328 clib_error_t *
1330 {
1332 
1333  smm->vlib_main = vm;
1334  smm->vnet_main = vnet_get_main ();
1335  smm->is_enabled = 0;
1336 
1337  return 0;
1338 }
1339 
1341 /*
1342  * fd.io coding-style-patch-verification: ON
1343  *
1344  * Local Variables:
1345  * eval: (c-set-style "gnu")
1346  * End:
1347  */
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:436
u64 ssvm_size
Definition: ssvm.h:77
int session_manager_flush_enqueue_events(u32 thread_index)
Flushes queue of sessions that are to be notified of new data enqueued events.
Definition: session.c:846
void vpp_session_event_queue_allocate(session_manager_main_t *smm, u32 thread_index)
Allocate vpp event queue (once) per worker thread.
Definition: session.c:386
sll srl srl sll sra u16x4 i
Definition: vector_sse2.h:343
u32 stream_session_peek_bytes(transport_connection_t *tc, u8 *buffer, u32 offset, u32 max_bytes)
Definition: session.c:771
static void svm_pop_heap(void *oldheap)
Definition: svm.h:190
int svm_fifo_enqueue_nowait(svm_fifo_t *f, int pid, u32 max_bytes, u8 *copy_from_here)
Definition: svm_fifo.c:359
int stream_session_accept(transport_connection_t *tc, u32 listener_index, u8 sst, u8 notify)
Accept a stream session.
Definition: session.c:1128
void stream_session_table_add(session_manager_main_t *smm, stream_session_t *s, u64 value)
Definition: session.c:68
void stream_session_connect_notify(transport_connection_t *tc, u8 sst, u8 is_fail)
Definition: session.c:967
static transport_proto_vft_t * tp_vfts
Per-type vector of transport protocol virtual function tables.
Definition: session.c:31
stream_session_t * stream_session_lookup_listener4(ip4_address_t *lcl, u16 lcl_port, u8 proto)
Definition: session.c:175
vnet_main_t * vnet_get_main(void)
Definition: misc.c:46
struct _transport_connection transport_connection_t
int session_manager_allocate_session_fifos(session_manager_main_t *smm, session_manager_t *sm, svm_fifo_t **server_rx_fifo, svm_fifo_t **server_tx_fifo, u32 *fifo_segment_index, u8 *added_a_segment)
Definition: session.c:551
static void make_v4_ss_kv_from_tc(session_kv4_t *kv, transport_connection_t *t)
Definition: transport.h:173
#define PREDICT_TRUE(x)
Definition: clib.h:98
session_manager_main_t session_manager_main
Definition: session.c:33
void session_manager_del(session_manager_main_t *smm, session_manager_t *sm)
Definition: session.c:477
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:522
static void make_v6_ss_kv(session_kv6_t *kv, ip6_address_t *lcl, ip6_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto)
Definition: transport.h:180
static void make_v6_ss_kv_from_tc(session_kv6_t *kv, transport_connection_t *t)
Definition: transport.h:228
static u32 svm_fifo_max_enqueue(svm_fifo_t *f)
Definition: svm_fifo.h:104
u8 * format(u8 *s, const char *fmt,...)
Definition: format.c:418
int session_manager_add_first_segment(session_manager_main_t *smm, session_manager_t *sm, u32 segment_size, u8 **segment_name)
Definition: session.c:465
#define pool_get(P, E)
Allocate an object E from a pool P (unspecified alignment).
Definition: pool.h:200
int stream_session_open(u8 sst, ip46_address_t *addr, u16 port_host_byte_order, u32 app_index)
Definition: session.c:1157
session_fifo_rx_fn session_tx_fifo_peek_and_snd
static void make_v6_listener_kv(session_kv6_t *kv, ip6_address_t *lcl, u16 lcl_port, u8 proto)
Definition: transport.h:204
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
struct _stream_session_t stream_session_t
api_main_t api_main
Definition: api_shared.c:35
static void make_v4_listener_kv(session_kv4_t *kv, ip4_address_t *lcl, u16 lcl_port, u8 proto)
Definition: transport.h:155
static stream_session_t * stream_session_get(u64 si, u32 thread_index)
Definition: session.h:303
void session_manager_get_segment_info(u32 index, u8 **name, u32 *size)
Definition: session.c:410
static void stream_session_half_open_table_del(session_manager_main_t *smm, u8 sst, transport_connection_t *tc)
Definition: session.c:148
static int session_manager_add_segment_i(session_manager_main_t *smm, session_manager_t *sm, u32 segment_size, u8 *segment_name)
Definition: session.c:419
#define VLIB_INIT_FUNCTION(x)
Definition: init.h:111
static void * svm_push_data_heap(svm_region_t *rp)
Definition: svm.h:182
static void stream_session_table_add_for_tc(u8 sst, transport_connection_t *tc, u64 value)
Definition: session.c:40
void stream_session_accept_notify(transport_connection_t *tc)
Definition: session.c:1008
#define always_inline
Definition: clib.h:84
static u32 svm_fifo_max_dequeue(svm_fifo_t *f)
Definition: svm_fifo.h:98
static u64 stream_session_half_open_lookup(session_manager_main_t *smm, ip46_address_t *lcl, ip46_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto)
Definition: session.c:281
stream_session_t * stream_session_lookup_listener6(ip6_address_t *lcl, u16 lcl_port, u8 proto)
Definition: session.c:221
#define clib_error_return(e, args...)
Definition: error.h:111
svm_region_t * vlib_rp
Definition: api.h:142
static session_manager_t * session_manager_get(u32 index)
Definition: session.h:236
stream_session_t * stream_session_lookup4(ip4_address_t *lcl, ip4_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
Looks up a session based on the 5-tuple passed as argument.
Definition: session.c:202
unsigned long u64
Definition: types.h:89
void stream_session_delete_notify(transport_connection_t *tc)
Notification from transport that connection is being deleted.
Definition: session.c:1097
void stream_session_cleanup(stream_session_t *s)
Cleanup transport and session state.
Definition: session.c:1206
void stream_session_delete(stream_session_t *s)
Cleans up session and associated app if needed.
Definition: session.c:1040
transport_connection_t * stream_session_lookup_transport4(ip4_address_t *lcl, ip4_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
Definition: session.c:316
static int session_manager_add_segment(session_manager_main_t *smm, session_manager_t *sm)
Definition: session.c:446
static clib_error_t * session_manager_main_enable(vlib_main_t *vm)
Definition: session.c:1244
struct _transport_proto_vft transport_proto_vft_t
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
u32 stream_session_dequeue_drop(transport_connection_t *tc, u32 max_bytes)
Definition: session.c:779
static session_manager_main_t * vnet_get_session_manager_main()
Definition: session.h:230
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
Definition: pool.h:397
int stream_session_create_i(session_manager_main_t *smm, application_t *app, transport_connection_t *tc, stream_session_t **ret_s)
Definition: session.c:630
int svm_fifo_dequeue_drop(svm_fifo_t *f, int pid, u32 max_bytes)
Definition: svm_fifo.c:527
#define pool_put(P, E)
Free an object E in pool P.
Definition: pool.h:241
void svm_fifo_segment_init(u64 baseva, u32 timeout_in_seconds)
#define PREDICT_FALSE(x)
Definition: clib.h:97
clib_error_t * vnet_tcp_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: tcp.c:780
svm_fifo_segment_header_t * h
clib_error_t * session_manager_main_init(vlib_main_t *vm)
Definition: session.c:1329
struct _session_manager_main session_manager_main_t
Definition: session.h:157
clib_error_t * vnet_session_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: session.c:1306
u32 svm_fifo_segment_index(svm_fifo_segment_private_t *s)
session_fifo_rx_fn session_tx_fifo_dequeue_and_snd
svm_fifo_t * svm_fifo_segment_alloc_fifo(svm_fifo_segment_private_t *s, u32 data_size_in_bytes)
void connects_session_manager_init(session_manager_main_t *smm, u8 session_type)
Definition: session.c:948
vlib_main_t * vm
Definition: buffer.c:276
#define INVALID_INDEX
Definition: session.h:25
static u32 stream_session_get_index(stream_session_t *s)
Definition: session.h:327
int stream_session_start_listen(u32 server_index, ip46_address_t *ip, u16 port)
Definition: session.c:886
#define vec_free(V)
Free vector&#39;s memory (no header).
Definition: vec.h:340
#define clib_warning(format, args...)
Definition: error.h:59
struct _session_manager session_manager_t
struct _application application_t
static stream_session_t * stream_session_get_if_valid(u64 si, u32 thread_index)
Definition: session.h:309
static svm_fifo_segment_private_t * svm_fifo_get_segment(u32 segment_index)
void stream_session_disconnect_notify(transport_connection_t *tc)
Notification from transport that connection is being closed.
Definition: session.c:1026
static stream_session_t * stream_session_get_tsi(u64 ti_and_si, u32 thread_index)
Definition: session.h:295
#define pool_put_index(p, i)
Free pool element with given index.
Definition: pool.h:255
#define ASSERT(truth)
unsigned int u32
Definition: types.h:88
unix_shared_memory_queue_t * unix_shared_memory_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
u64 size
Definition: vhost-user.h:77
static void make_v4_ss_kv(session_kv4_t *kv, ip4_address_t *lcl, ip4_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto)
Definition: transport.h:137
void application_del(application_t *app)
Definition: application.c:71
void stream_session_disconnect(stream_session_t *s)
Disconnect session and propagate to transport.
Definition: session.c:1193
u32 application_get_index(application_t *app)
Definition: application.c:183
void stream_session_reset_notify(transport_connection_t *tc)
Notify application that connection has been reset.
Definition: session.c:1114
static void vlib_node_set_state(vlib_main_t *vm, u32 node_index, vlib_node_state_t new_state)
Set node dispatch state.
Definition: node_funcs.h:146
u32 server_session_index
Definition: svm_fifo.h:55
u8 server_thread_index
Definition: svm_fifo.h:57
stream_session_t * stream_session_lookup6(ip6_address_t *lcl, ip6_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
Definition: session.c:246
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
Definition: node.c:32
void session_register_transport(u8 type, const transport_proto_vft_t *vft)
Definition: session.c:1222
int svm_fifo_segment_create(svm_fifo_segment_create_args_t *a)
(master) create an svm fifo segment
#define HALF_OPEN_LOOKUP_INVALID_VALUE
Definition: session.h:24
transport_proto_vft_t * session_get_transport_vft(u8 type)
Definition: session.c:1236
template key/value backing page structure
Definition: bihash_doc.h:44
static int stream_session_table_del(session_manager_main_t *smm, stream_session_t *s)
Definition: session.c:138
static int stream_session_enqueue_notify(stream_session_t *s, u8 block)
Notify session peer that new data has been enqueued.
Definition: session.c:794
void svm_fifo_segment_free_fifo(svm_fifo_segment_private_t *s, svm_fifo_t *f)
int connect_server_add_segment_cb(application_t *ss, char *segment_name, u32 segment_size)
Definition: session.c:939
unsigned short u16
Definition: types.h:57
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
unsigned char u8
Definition: types.h:56
int svm_fifo_peek(svm_fifo_t *f, int pid, u32 offset, u32 max_bytes, u8 *copy_here)
Definition: svm_fifo.c:491
application_t * application_get(u32 index)
Definition: application.c:168
void stream_session_stop_listen(u32 server_index)
Definition: session.c:920
int stream_session_enqueue_data(transport_connection_t *tc, u8 *data, u16 len, u8 queue_event)
Definition: session.c:721
static vlib_thread_main_t * vlib_get_thread_main()
Definition: global_funcs.h:32
stream_session_t * stream_session_lookup_listener(ip46_address_t *lcl, u16 lcl_port, u8 proto)
Definition: session.c:264
vhost_vring_addr_t addr
Definition: vhost-user.h:84
static void stream_session_half_open_table_add(u8 sst, transport_connection_t *tc, u64 value)
Definition: session.c:79
transport_connection_t * stream_session_lookup_transport6(ip6_address_t *lcl, ip6_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
Definition: session.c:350
int application_api_queue_is_full(application_t *app)
Definition: application.c:30
#define SESSION_EVT_DBG(_s, _evt, _body)
Definition: session_debug.h:76
volatile svm_fifo_t ** fifos
application_t * application_get_if_valid(u32 index)
Definition: application.c:174
u8 stream_session_no_space(transport_connection_t *tc, u32 thread_index, u16 data_len)
Check if we have space in rx fifo to push more bytes.
Definition: session.c:756
void svm_fifo_segment_delete(svm_fifo_segment_private_t *s)
struct _unix_shared_memory_queue unix_shared_memory_queue_t
static int stream_session_table_del_for_tc(session_manager_main_t *smm, u8 sst, transport_connection_t *tc)
Definition: session.c:109