FD.io VPP  v18.01.1-37-g7ea3975
Vector Packet Processing
session.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19 
20 #include <vnet/session/session.h>
23 #include <vlibmemory/api.h>
24 #include <vnet/dpo/load_balance.h>
25 #include <vnet/fib/ip4_fib.h>
26 
29 
30 static void
32  u32 thread_index, void *fp, void *rpc_args)
33 {
34  u32 tries = 0;
35  session_fifo_event_t evt = { {0}, };
37 
38  evt.event_type = evt_type;
39  if (evt_type == FIFO_EVENT_RPC)
40  {
41  evt.rpc_args.fp = fp;
42  evt.rpc_args.arg = rpc_args;
43  }
44  else
45  evt.session_handle = session_handle;
46 
47  q = session_manager_get_vpp_event_queue (thread_index);
48  while (unix_shared_memory_queue_add (q, (u8 *) & evt, 1))
49  {
50  if (tries++ == 3)
51  {
52  SESSION_DBG ("failed to enqueue evt");
53  break;
54  }
55  }
56 }
57 
58 void
60  fifo_event_type_t evt_type,
61  u32 thread_index)
62 {
63  session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0);
64 }
65 
/**
 * Run an RPC callback on a given thread.
 *
 * If the target thread is not the calling thread, a FIFO_EVENT_RPC event
 * carrying fp and rpc_args is handed to session_send_evt_to_thread ().
 * Otherwise the callback is invoked directly, with no event being sent.
 *
 * @param thread_index Index of the thread that should run the callback
 * @param fp Callback; it is called here as void (*) (void *)
 * @param rpc_args Argument passed to the callback
 */
66 void
67 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
68 {
69  if (thread_index != vlib_get_thread_index ())
70  session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp,
71  rpc_args);
72  else
73  {
    /* Already on the target thread: call the function in place */
74  void (*fnp) (void *) = fp;
75  fnp (rpc_args);
76  }
77 }
78 
/**
 * Allocate a session from the session pool of the given thread.
 *
 * The returned session is zeroed and has session_index (its position in the
 * thread's pool) and thread_index filled in.
 *
 * NOTE(review): the return type line and the declarations of smm and s are
 * missing from this listing -- confirm against the original source.
 *
 * @param thread_index Index of the thread whose pool is used
 * @return Pointer to the newly allocated session
 */
80 session_alloc (u32 thread_index)
81 {
84  u8 will_expand = 0;
85  pool_get_aligned_will_expand (smm->sessions[thread_index], will_expand,
87  /* If the pool is about to expand, allocate while holding the peekers
    * write lock (when it is initialized) so peekers can finish first */
88  if (PREDICT_FALSE (will_expand))
89  {
90  clib_spinlock_lock_if_init (&smm->peekers_write_locks[thread_index]);
91  pool_get_aligned (session_manager_main.sessions[thread_index], s,
93  clib_spinlock_unlock_if_init (&smm->peekers_write_locks[thread_index]);
94  }
95  else
96  {
97  pool_get_aligned (session_manager_main.sessions[thread_index], s,
99  }
100  memset (s, 0, sizeof (*s));
    /* Session index is the element's offset within this thread's pool */
101  s->session_index = s - session_manager_main.sessions[thread_index];
102  s->thread_index = thread_index;
103  return s;
104 }
105 
106 static void
108 {
109  pool_put (session_manager_main.sessions[s->thread_index], s);
110  if (CLIB_DEBUG)
111  memset (s, 0xFA, sizeof (*s));
112 }
113 
114 static int
116 {
117  svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
118  u32 fifo_segment_index;
119  int rv;
120 
121  if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
122  &server_tx_fifo,
123  &fifo_segment_index)))
124  return rv;
125  /* Initialize backpointers */
126  server_rx_fifo->master_session_index = s->session_index;
127  server_rx_fifo->master_thread_index = s->thread_index;
128 
129  server_tx_fifo->master_session_index = s->session_index;
130  server_tx_fifo->master_thread_index = s->thread_index;
131 
132  s->server_rx_fifo = server_rx_fifo;
133  s->server_tx_fifo = server_tx_fifo;
134  s->svm_segment_index = fifo_segment_index;
135  return 0;
136 }
137 
138 static stream_session_t *
140 {
141  stream_session_t *s;
142  u32 thread_index = tc->thread_index;
143 
144  ASSERT (thread_index == vlib_get_thread_index ());
145 
146  s = session_alloc (thread_index);
147  s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
148  s->session_state = SESSION_STATE_CONNECTING;
149  s->enqueue_epoch = ~0;
150 
151  /* Attach transport to session and vice versa */
152  s->connection_index = tc->c_index;
153  tc->s_index = s->session_index;
154  return s;
155 }
156 
157 static int
159  u8 alloc_fifos, stream_session_t ** ret_s)
160 {
161  stream_session_t *s;
162  int rv;
163 
165  if (alloc_fifos && (rv = session_alloc_fifos (sm, s)))
166  {
167  session_free (s);
168  *ret_s = 0;
169  return rv;
170  }
171 
172  /* Add to the main lookup table */
174 
175  *ret_s = s;
176  return 0;
177 }
178 
179 /**
180  * Discards bytes from buffer chain
181  *
182  * It discards n_bytes_to_drop starting at first buffer after chain_b
183  */
184 always_inline void
186  vlib_buffer_t ** chain_b,
187  u32 n_bytes_to_drop)
188 {
189  vlib_buffer_t *next = *chain_b;
190  u32 to_drop = n_bytes_to_drop;
192  while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
193  {
194  next = vlib_get_buffer (vm, next->next_buffer);
195  if (next->current_length > to_drop)
196  {
197  vlib_buffer_advance (next, to_drop);
198  to_drop = 0;
199  }
200  else
201  {
202  to_drop -= next->current_length;
203  next->current_length = 0;
204  }
205  }
206  *chain_b = next;
207 
208  if (to_drop == 0)
209  b->total_length_not_including_first_buffer -= n_bytes_to_drop;
210 }
211 
212 /**
213  * Enqueue buffer chain tail
214  */
215 always_inline int
217  u32 offset, u8 is_in_order)
218 {
219  vlib_buffer_t *chain_b;
220  u32 chain_bi, len, diff;
222  u8 *data;
223  u32 written = 0;
224  int rv = 0;
225 
226  if (is_in_order && offset)
227  {
228  diff = offset - b->current_length;
230  return 0;
231  chain_b = b;
232  session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
233  chain_bi = vlib_get_buffer_index (vm, chain_b);
234  }
235  else
236  chain_bi = b->next_buffer;
237 
238  do
239  {
240  chain_b = vlib_get_buffer (vm, chain_bi);
241  data = vlib_buffer_get_current (chain_b);
242  len = chain_b->current_length;
243  if (!len)
244  continue;
245  if (is_in_order)
246  {
247  rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
248  if (rv == len)
249  {
250  written += rv;
251  }
252  else if (rv < len)
253  {
254  return (rv > 0) ? (written + rv) : written;
255  }
256  else if (rv > len)
257  {
258  written += rv;
259 
260  /* written more than what was left in chain */
262  return written;
263 
264  /* drop the bytes that have already been delivered */
265  session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
266  }
267  }
268  else
269  {
270  rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
271  data);
272  if (rv)
273  {
274  clib_warning ("failed to enqueue multi-buffer seg");
275  return -1;
276  }
277  offset += len;
278  }
279  }
280  while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
281  ? chain_b->next_buffer : 0));
282 
283  if (is_in_order)
284  return written;
285 
286  return 0;
287 }
288 
289 /**
290  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
291  * event but on request can queue notification events for later delivery by
292  * calling session_manager_flush_enqueue_events().
293  *
294  * @param tc Transport connection which is to be enqueued data
295  * @param b Buffer to be enqueued
296  * @param offset Offset at which to start enqueueing if out-of-order
297  * @param queue_event If set (in-order data only), add the session, once per
298  * enqueue epoch, to the list of sessions awaiting an RX
299  * notification.
300  * @param is_in_order Flag to indicate if data is in order
301  * @return In order: number of bytes enqueued or a negative value if enqueueing
302  * failed. Out of order: result of svm_fifo_enqueue_with_offset (). */
303 int
305  vlib_buffer_t * b, u32 offset,
306  u8 queue_event, u8 is_in_order)
307 {
308  stream_session_t *s;
309  int enqueued = 0, rv, in_order_off;
310 
311  s = session_get (tc->s_index, tc->thread_index);
312 
313  if (is_in_order)
314  {
315  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
316  b->current_length,
319  && enqueued >= 0))
320  {
    /* If more than the first buffer's bytes were reported as enqueued,
     * pass that count to the chain enqueue as the in-order offset */
321  in_order_off = enqueued > b->current_length ? enqueued : 0;
322  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
323  if (rv > 0)
324  enqueued += rv;
325  }
326  }
327  else
328  {
329  rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
330  b->current_length,
    /* Only continue with the rest of the chain if the first buffer was
     * accepted (rv == 0). The chain's own return value is not checked. */
332  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
333  session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
334  /* if something was enqueued, report even this as success for ooo
335  * segment handling */
336  return rv;
337  }
338 
339  if (queue_event)
340  {
341  /* Queue RX event on this fifo. Eventually these will need to be flushed
342  * by calling session_manager_flush_enqueue_events () */
344  u32 thread_index = s->thread_index;
345  u32 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index];
346 
    /* The epoch check keeps a session from being listed more than once
     * between two flushes */
347  if (s->enqueue_epoch != enqueue_epoch)
348  {
349  s->enqueue_epoch = enqueue_epoch;
350  vec_add1 (smm->session_to_enqueue[tc->proto][thread_index],
351  s - smm->sessions[thread_index]);
352  }
353  }
354 
355  return enqueued;
356 }
357 
358 int
360  u8 proto, u8 queue_event)
361 {
362  int enqueued = 0, rv, in_order_off;
363 
364  if (svm_fifo_max_enqueue (s->server_rx_fifo) < b->current_length)
365  return -1;
366  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
368  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
369  {
370  in_order_off = enqueued > b->current_length ? enqueued : 0;
371  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
372  if (rv > 0)
373  enqueued += rv;
374  }
375  if (queue_event)
376  {
377  /* Queue RX event on this fifo. Eventually these will need to be flushed
378  * by calling stream_server_flush_enqueue_events () */
380  u32 thread_index = s->thread_index;
381  u32 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index];
382 
383  if (s->enqueue_epoch != enqueue_epoch)
384  {
385  s->enqueue_epoch = enqueue_epoch;
386  vec_add1 (smm->session_to_enqueue[proto][thread_index],
387  s - smm->sessions[thread_index]);
388  }
389  }
390  return enqueued;
391 }
392 
393 /** Check if we have space in rx fifo to push more bytes */
394 u8
396  u16 data_len)
397 {
398  stream_session_t *s = session_get (tc->s_index, thread_index);
399 
400  if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
401  return 1;
402 
403  if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo))
404  return 1;
405 
406  return 0;
407 }
408 
409 u32
411 {
412  stream_session_t *s = session_get (tc->s_index, tc->thread_index);
413  if (!s->server_tx_fifo)
414  return 0;
415  return svm_fifo_max_dequeue (s->server_tx_fifo);
416 }
417 
418 int
420  u32 offset, u32 max_bytes)
421 {
422  stream_session_t *s = session_get (tc->s_index, tc->thread_index);
423  return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
424 }
425 
426 u32
428 {
429  stream_session_t *s = session_get (tc->s_index, tc->thread_index);
430  return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
431 }
432 
433 /**
434  * Notify session peer that new data has been enqueued.
435  *
436  * @param s Stream session for which the event is to be generated.
437  * @param block Flag to indicate if call should block if event queue is full.
438  *
439  * @return 0 on success, -1 if the app event queue is full and block is not
440  * set, or the value returned by the app's builtin rx callback. */
441 static int
443 {
444  application_t *app;
445  session_fifo_event_t evt;
447 
448  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
449  {
450  /* Session is closed so app will never clean up. Flush rx fifo */
451  u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
452  if (to_dequeue)
453  svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
454  return 0;
455  }
456 
457  /* Get session's server */
458  app = application_get_if_valid (s->app_index);
459 
460  if (PREDICT_FALSE (app == 0))
461  {
    /* Reported as success (0), so the caller does not count it as a failure */
462  clib_warning ("invalid s->app_index = %d", s->app_index);
463  return 0;
464  }
465 
466  /* Built-in server? Hand event to the callback... */
467  if (app->cb_fns.builtin_server_rx_callback)
468  return app->cb_fns.builtin_server_rx_callback (s);
469 
470  /* If no event, send one */
471  if (svm_fifo_set_event (s->server_rx_fifo))
472  {
473  /* Fabricate event */
474  evt.fifo = s->server_rx_fifo;
475  evt.event_type = FIFO_EVENT_APP_RX;
476 
477  /* Add event to server's event queue */
478  q = app->event_queue;
479 
480  /* Based on request block (or not) for lack of space */
481  if (block || PREDICT_TRUE (q->cursize < q->maxsize))
482  unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
483  0 /* do wait for mutex */ );
484  else
485  {
486  clib_warning ("fifo full");
487  return -1;
488  }
489  }
490 
    /* NOTE(review): evt is only filled in when svm_fifo_set_event () returned
     * non-zero, yet evt.event_type is read below regardless -- confirm whether
     * evt should be zero-initialized. */
491  /* *INDENT-OFF* */
492  SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
493  ed->data[0] = evt.event_type;
494  ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
495  }));
496  /* *INDENT-ON* */
497 
498  return 0;
499 }
500 
501 /**
502  * Flushes queue of sessions that are to be notified of new data
503  * enqueued events.
504  *
505  * @param transport_proto Transport protocol whose pending list is flushed.
506  * @param thread_index Thread index for which the flush is to be performed.
507  * @return 0 on success, else the number of listed sessions that were no
508  * longer valid or whose notification failed. */
509 int
510 session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
511 {
513  u32 *indices;
514  stream_session_t *s;
515  int i, errors = 0;
516 
517  indices = smm->session_to_enqueue[transport_proto][thread_index];
518 
519  for (i = 0; i < vec_len (indices); i++)
520  {
    /* A listed session may no longer be valid; that counts as an error too */
521  s = session_get_if_valid (indices[i], thread_index);
522  if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ ))
523  errors++;
524  }
525 
    /* Empty the list and store the vector pointer back */
526  vec_reset_length (indices);
527  smm->session_to_enqueue[transport_proto][thread_index] = indices;
    /* New epoch: sessions whose stored epoch differs get listed again on
     * their next enqueue */
528  smm->current_enqueue_epoch[transport_proto][thread_index]++;
529 
530  return errors;
531 }
532 
533 /**
534  * Init fifo tail and head pointers
535  *
536  * Useful if transport uses absolute offsets for tracking ooo segments.
537  */
538 void
540  u32 rx_pointer, u32 tx_pointer)
541 {
542  stream_session_t *s;
543  s = session_get (tc->s_index, tc->thread_index);
544  svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer);
545  svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
546 }
547 
548 int
550 {
551  application_t *app;
552  stream_session_t *new_s = 0;
553  u64 handle;
554  u32 opaque = 0;
555  int error = 0;
556  segment_manager_t *sm;
557  u8 alloc_fifos;
558 
559  /*
560  * Find connection handle and cleanup half-open table
561  */
562  handle = session_lookup_half_open_handle (tc);
563  if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
564  {
565  SESSION_DBG ("half-open was removed!");
566  return -1;
567  }
569 
570  /* Get the app's index from the handle we stored when opening connection
571  * and the opaque (api_context for external apps) from transport session
572  * index */
573  app = application_get_if_valid (handle >> 32);
574  if (!app)
575  return -1;
576  opaque = tc->s_index;
577 
578  /*
579  * Allocate new session with fifos (svm segments are allocated if needed)
580  */
581  if (!is_fail)
582  {
584  alloc_fifos = !application_is_builtin_proxy (app);
585  if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s))
586  {
587  is_fail = 1;
588  error = -1;
589  }
590  else
591  new_s->app_index = app->index;
592  }
593 
594  /*
595  * Notify client application
596  */
597  if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
598  is_fail))
599  {
600  SESSION_DBG ("failed to notify app");
601  if (!is_fail)
603  }
604  else
605  {
606  if (!is_fail)
607  new_s->session_state = SESSION_STATE_READY;
608  }
609 
610  return error;
611 }
612 
613 typedef struct _session_switch_pool_args
614 {
615  u32 session_index;
616  u32 thread_index;
617  u32 new_thread_index;
618  u32 new_session_index;
620 
/**
 * RPC callback that retires a session after it has been cloned to another
 * thread.
 *
 * Must run on the thread named in the arguments (asserted). It points the
 * session's tx fifo at the new session and thread, asks the transport to
 * clean up the connection, frees the session and finally frees cb_args.
 *
 * NOTE(review): the declarations of args and tp, and the assignment of tp,
 * are missing from this listing -- presumably args is cb_args cast to the
 * switch-pool argument struct; confirm against the original source.
 *
 * @param cb_args Heap-allocated arguments; freed here with clib_mem_free ()
 */
621 static void
622 session_switch_pool (void *cb_args)
623 {
626  stream_session_t *s;
627  ASSERT (args->thread_index == vlib_get_thread_index ());
628  s = session_get (args->session_index, args->thread_index);
    /* Make the new session the owner of the tx fifo */
629  s->server_tx_fifo->master_session_index = args->new_session_index;
630  s->server_tx_fifo->master_thread_index = args->new_thread_index;
632  tp_vfts[tp].cleanup (s->connection_index, s->thread_index);
633  session_free (s);
634  clib_mem_free (cb_args);
635 }
636 
637 /**
638  * Move dgram session to the right thread
639  */
640 int
642  u32 old_thread_index,
643  stream_session_t ** new_session)
644 {
645  stream_session_t *new_s;
646  session_switch_pool_args_t *rpc_args;
647 
648  /*
649  * Clone half-open session to the right thread.
650  */
651  new_s = session_clone_safe (tc->s_index, old_thread_index);
652  new_s->connection_index = tc->c_index;
653  new_s->server_rx_fifo->master_session_index = new_s->session_index;
654  new_s->server_rx_fifo->master_thread_index = new_s->thread_index;
655  new_s->session_state = SESSION_STATE_READY;
657 
658  /*
659  * Ask thread owning the old session to clean it up and make us the tx
660  * fifo owner
661  */
662  rpc_args = clib_mem_alloc (sizeof (*rpc_args));
663  rpc_args->new_session_index = new_s->session_index;
664  rpc_args->new_thread_index = new_s->thread_index;
665  rpc_args->session_index = tc->s_index;
666  rpc_args->thread_index = old_thread_index;
667  session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
668  rpc_args);
669 
670  tc->s_index = new_s->session_index;
671  new_s->connection_index = tc->c_index;
672  *new_session = new_s;
673  return 0;
674 }
675 
676 void
678 {
679  application_t *server;
680  stream_session_t *s;
681 
682  s = session_get (tc->s_index, tc->thread_index);
683  server = application_get (s->app_index);
684  server->cb_fns.session_accept_callback (s);
685 }
686 
687 /**
688  * Notification from transport that connection is being closed.
689  *
690  * A disconnect is sent to application but state is not removed. Once
691  * disconnect is acknowledged by application, session disconnect is called.
692  * Ultimately this leads to close being called on transport (passive close).
693  */
694 void
696 {
697  application_t *server;
698  stream_session_t *s;
699 
700  s = session_get (tc->s_index, tc->thread_index);
701  server = application_get (s->app_index);
702  server->cb_fns.session_disconnect_callback (s);
703 }
704 
705 /**
706  * Cleans up session and lookup table.
707  */
708 void
710 {
711  int rv;
712 
713  /* Delete from the main lookup table. */
714  if ((rv = session_lookup_del_session (s)))
715  clib_warning ("hash delete error, rv %d", rv);
716 
717  /* Cleanup fifo segments */
718  segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
719  s->server_tx_fifo);
720  session_free (s);
721 }
722 
723 /**
724  * Notification from transport that connection is being deleted
725  *
726  * This removes the session if it is still valid. It should be called only on
727  * previously fully established sessions. For instance failed connects should
728  * call stream_session_connect_notify and indicate that the connect has
729  * failed.
730  */
731 void
733 {
734  stream_session_t *s;
735 
736  /* App might've been removed already */
737  s = session_get_if_valid (tc->s_index, tc->thread_index);
738  if (!s)
739  return;
741 }
742 
743 /**
744  * Notify application that connection has been reset.
745  */
746 void
748 {
749  stream_session_t *s;
750  application_t *app;
751  s = session_get (tc->s_index, tc->thread_index);
752 
753  app = application_get (s->app_index);
754  app->cb_fns.session_reset_callback (s);
755 }
756 
757 /**
758  * Accept a stream session. Optionally ping the server by callback.
759  */
760 int
762  u8 notify)
763 {
764  application_t *server;
765  stream_session_t *s, *listener;
766  segment_manager_t *sm;
767  session_type_t sst;
768  int rv;
769 
770  sst = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
771 
772  /* Find the server */
773  listener = listen_session_get (sst, listener_index);
774  server = application_get (listener->app_index);
775 
776  sm = application_get_listen_segment_manager (server, listener);
777  if ((rv = session_alloc_and_init (sm, tc, 1, &s)))
778  return rv;
779 
780  s->app_index = server->index;
781  s->listener_index = listener_index;
782  s->session_state = SESSION_STATE_ACCEPTING;
783 
784  /* Shoulder-tap the server */
785  if (notify)
786  {
787  server->cb_fns.session_accept_callback (s);
788  }
789 
790  return 0;
791 }
792 
793 /**
794  * Ask transport to open connection to remote transport endpoint.
795  *
796  * Stores handle for matching request with reply since the call can be
797  * asynchronous. For instance, for TCP the 3-way handshake must complete
798  * before reply comes. Session is only created once connection is established.
799  *
800  * @param app_index Index of the application requesting the connect
801  * @param rmt Remote session endpoint; its transport_proto selects the
802  * transport used to open the connection.
803  * @param opaque Opaque data (typically, api_context) the application expects
804  * on open completion.
805  */
806 int
808 {
811  segment_manager_t *sm;
812  stream_session_t *s;
813  application_t *app;
814  int rv;
815  u64 handle;
816 
817  tep = session_endpoint_to_transport (rmt);
818  rv = tp_vfts[rmt->transport_proto].open (tep);
819  if (rv < 0)
820  {
821  SESSION_DBG ("Transport failed to open connection.");
822  return VNET_API_ERROR_SESSION_CONNECT;
823  }
824 
825  tc = tp_vfts[rmt->transport_proto].get_half_open ((u32) rv);
826 
827  /* If transport offers a stream service, only allocate session once the
828  * connection has been established.
829  */
830  if (transport_is_stream (rmt->transport_proto))
831  {
832  /* Add connection to half-open table and save app and tc index. The
833  * latter is needed to help establish the connection while the former
834  * is needed when the connect notify comes and we have to notify the
835  * external app
836  */
837  handle = (((u64) app_index) << 32) | (u64) tc->c_index;
838  session_lookup_add_half_open (tc, handle);
839 
840  /* Store api_context (opaque) for when the reply comes. Not the nicest
841  * thing but better than allocating a separate half-open pool.
842  */
843  tc->s_index = opaque;
844  }
845  /* For dgram type of service, allocate session and fifos now.
846  */
847  else
848  {
849  app = application_get (app_index);
851 
852  if (session_alloc_and_init (sm, tc, 1, &s))
853  return -1;
854  s->app_index = app->index;
855  s->session_state = SESSION_STATE_CONNECTING_READY;
856 
857  /* Tell the app about the new event fifo for this session */
858  app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
859  }
860  return 0;
861 }
862 
863 /**
864  * Ask transport to listen on local transport endpoint.
865  *
866  * @param s Session for which listen will be called. Note that unlike
867  * established sessions, listen sessions are not associated to a
868  * thread.
869  * @param sep Local session endpoint to be listened on.
870  */
871 int
873 {
875  u32 tci;
876 
877  /* Transport bind/listen */
878  tci = tp_vfts[sep->transport_proto].bind (s->session_index,
880  (sep));
881 
882  if (tci == (u32) ~ 0)
883  return -1;
884 
885  /* Attach transport to session */
886  s->connection_index = tci;
887  tc = tp_vfts[sep->transport_proto].get_listener (tci);
888 
889  /* Weird but handle it ... */
890  if (tc == 0)
891  return -1;
892 
893  /* Add to the main lookup table */
894  session_lookup_add_connection (tc, s->session_index);
895  return 0;
896 }
897 
898 /**
899  * Ask transport to stop listening on local transport endpoint.
900  *
901  * @param s Session to stop listening on. It must be in state LISTENING.
902  */
903 int
905 {
908  if (s->session_state != SESSION_STATE_LISTENING)
909  {
910  clib_warning ("not a listening session");
911  return -1;
912  }
913 
914  tc = tp_vfts[tp].get_listener (s->connection_index);
915  if (!tc)
916  {
917  clib_warning ("no transport");
918  return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
919  }
920 
922  tp_vfts[tp].unbind (s->connection_index);
923  return 0;
924 }
925 
926 /**
927  * Disconnect session and propagate to transport. This should eventually
928  * result in a delete notification that allows us to cleanup session state.
929  * Called for both active/passive disconnects.
930  *
931  * Should be called from the session's thread.
932  */
933 void
935 {
936  s->session_state = SESSION_STATE_CLOSED;
937  tp_vfts[session_get_transport_proto (s)].close (s->connection_index,
938  s->thread_index);
939 }
940 
941 /**
942  * Cleanup transport and session state.
943  *
944  * Notify transport of the cleanup, wait for a delete notify to actually
945  * remove the session state.
946  */
947 void
949 {
950  int rv;
951 
952  s->session_state = SESSION_STATE_CLOSED;
953 
954  /* Delete from the main lookup table to avoid more enqueues */
956  if (rv)
957  clib_warning ("hash delete error, rv %d", rv);
958 
959  tp_vfts[session_get_transport_proto (s)].cleanup (s->connection_index,
960  s->thread_index);
961 }
962 
963 /**
964  * Allocate vpp event queue (once) per worker thread
965  */
966 void
968  u32 thread_index)
969 {
970  api_main_t *am = &api_main;
971  void *oldheap;
972  u32 event_queue_length = 2048;
973 
974  if (smm->vpp_event_queues[thread_index] == 0)
975  {
976  /* Allocate event fifo in the /vpe-api shared-memory segment */
977  oldheap = svm_push_data_heap (am->vlib_rp);
978 
979  if (smm->configured_event_queue_length)
980  event_queue_length = smm->configured_event_queue_length;
981 
982  smm->vpp_event_queues[thread_index] =
984  (event_queue_length,
985  sizeof (session_fifo_event_t), 0 /* consumer pid */ ,
986  0 /* (do not) send signal when queue non-empty */ );
987 
988  svm_pop_heap (oldheap);
989  }
990 }
991 
992 /**
993  * Initialize session layer for given transport proto and ip version
994  *
995  * Allocates per session type (transport proto + ip version) data structures
996  * and adds arc from session queue node to session type output node.
997  */
998 void
1000  const transport_proto_vft_t * vft, u8 is_ip4,
1001  u32 output_node)
1002 {
1004  session_type_t session_type;
1005  u32 next_index = ~0;
1006 
1007  session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1008 
1009  vec_validate (smm->session_type_to_next, session_type);
1010  vec_validate (smm->listen_sessions, session_type);
1011  vec_validate (smm->session_tx_fns, session_type);
1012 
1013  /* *INDENT-OFF* */
1014  foreach_vlib_main (({
1015  next_index = vlib_node_add_next (this_vlib_main, session_queue_node.index,
1016  output_node);
1017  }));
1018  /* *INDENT-ON* */
1019 
1020  smm->session_type_to_next[session_type] = next_index;
1022  vft->tx_fifo_offset != 0);
1023 }
1024 
1027 {
1028  transport_proto_t tp;
1029  if (s->session_state != SESSION_STATE_LISTENING)
1030  {
1031  tp = session_get_transport_proto (s);
1032  return tp_vfts[tp].get_connection (s->connection_index,
1033  s->thread_index);
1034  }
1035  return 0;
1036 }
1037 
1040 {
1042  return tp_vfts[tp].get_listener (s->connection_index);
1043 }
1044 
1045 int
1047  session_endpoint_t * sep)
1048 {
1051  tc = tp_vfts[tp].get_listener (listener->connection_index);
1052  if (!tc)
1053  {
1054  clib_warning ("no transport");
1055  return -1;
1056  }
1057 
1058  /* N.B. The ip should not be copied because this is the local endpoint */
1059  sep->port = tc->lcl_port;
1060  sep->transport_proto = tc->proto;
1061  sep->is_ip4 = tc->is_ip4;
1062  return 0;
1063 }
1064 
1065 static clib_error_t *
1067 {
1070  u32 num_threads;
1071  u32 preallocated_sessions_per_worker;
1072  int i, j;
1073 
1074  num_threads = 1 /* main thread */ + vtm->n_threads;
1075 
1076  if (num_threads < 1)
1077  return clib_error_return (0, "n_thread_stacks not set");
1078 
1079  /* $$$ config parameters */
1080  svm_fifo_segment_init (0x200000000ULL /* first segment base VA */ ,
1081  20 /* timeout in seconds */ );
1082 
1083  /* configure per-thread ** vectors */
1084  vec_validate (smm->sessions, num_threads - 1);
1085  vec_validate (smm->tx_buffers, num_threads - 1);
1086  vec_validate (smm->pending_event_vector, num_threads - 1);
1087  vec_validate (smm->pending_disconnects, num_threads - 1);
1088  vec_validate (smm->free_event_vector, num_threads - 1);
1089  vec_validate (smm->vpp_event_queues, num_threads - 1);
1090  vec_validate (smm->session_peekers, num_threads - 1);
1091  vec_validate (smm->peekers_readers_locks, num_threads - 1);
1092  vec_validate (smm->peekers_write_locks, num_threads - 1);
1093 
1094  for (i = 0; i < TRANSPORT_N_PROTO; i++)
1095  for (j = 0; j < num_threads; j++)
1096  {
1097  vec_validate (smm->session_to_enqueue[i], num_threads - 1);
1098  vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
1099  }
1100 
1101  for (i = 0; i < num_threads; i++)
1102  {
1103  vec_validate (smm->free_event_vector[i], 0);
1104  _vec_len (smm->free_event_vector[i]) = 0;
1105  vec_validate (smm->pending_event_vector[i], 0);
1106  _vec_len (smm->pending_event_vector[i]) = 0;
1107  vec_validate (smm->pending_disconnects[i], 0);
1108  _vec_len (smm->pending_disconnects[i]) = 0;
1109  if (num_threads > 1)
1110  {
1111  clib_spinlock_init (&smm->peekers_readers_locks[i]);
1112  clib_spinlock_init (&smm->peekers_write_locks[i]);
1113  }
1114  }
1115 
1116 #if SESSION_DBG
1117  vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
1118 #endif
1119 
1120  /* Allocate vpp event queues */
1121  for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
1123 
1124  /* Preallocate sessions */
1125  if (smm->preallocated_sessions)
1126  {
1127  if (num_threads == 1)
1128  {
1129  pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
1130  }
1131  else
1132  {
1133  int j;
1134  preallocated_sessions_per_worker =
1135  (1.1 * (f64) smm->preallocated_sessions /
1136  (f64) (num_threads - 1));
1137 
1138  for (j = 1; j < num_threads; j++)
1139  {
1140  pool_init_fixed (smm->sessions[j],
1141  preallocated_sessions_per_worker);
1142  }
1143  }
1144  }
1145 
1148  transport_init ();
1149 
1150  smm->is_enabled = 1;
1151 
1152  /* Enable transports */
1153  transport_enable_disable (vm, 1);
1154 
1155  return 0;
1156 }
1157 
1158 void
1160 {
1161  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1162  /* *INDENT-OFF* */
1163  foreach_vlib_main (({
1164  vlib_node_set_state (this_vlib_main, session_queue_node.index,
1165  state);
1166  }));
1167  /* *INDENT-ON* */
1168 }
1169 
1170 clib_error_t *
1172 {
1173  clib_error_t *error = 0;
1174  if (is_en)
1175  {
1176  if (session_manager_main.is_enabled)
1177  return 0;
1178 
1180  error = session_manager_main_enable (vm);
1181  }
1182  else
1183  {
1184  session_manager_main.is_enabled = 0;
1186  }
1187 
1188  return error;
1189 }
1190 
1191 clib_error_t *
1193 {
1195  smm->is_enabled = 0;
1196  return 0;
1197 }
1198 
1200 
1201 static clib_error_t *
1203 {
1205  u32 nitems;
1206  uword tmp;
1207 
1208  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1209  {
1210  if (unformat (input, "event-queue-length %d", &nitems))
1211  {
1212  if (nitems >= 2048)
1213  smm->configured_event_queue_length = nitems;
1214  else
1215  clib_warning ("event queue length %d too small, ignored", nitems);
1216  }
1217  else if (unformat (input, "preallocated-sessions %d",
1218  &smm->preallocated_sessions))
1219  ;
1220  else if (unformat (input, "v4-session-table-buckets %d",
1221  &smm->configured_v4_session_table_buckets))
1222  ;
1223  else if (unformat (input, "v4-halfopen-table-buckets %d",
1224  &smm->configured_v4_halfopen_table_buckets))
1225  ;
1226  else if (unformat (input, "v6-session-table-buckets %d",
1227  &smm->configured_v6_session_table_buckets))
1228  ;
1229  else if (unformat (input, "v6-halfopen-table-buckets %d",
1230  &smm->configured_v6_halfopen_table_buckets))
1231  ;
1232  else if (unformat (input, "v4-session-table-memory %U",
1233  unformat_memory_size, &tmp))
1234  {
1235  if (tmp >= 0x100000000)
1236  return clib_error_return (0, "memory size %llx (%lld) too large",
1237  tmp, tmp);
1238  smm->configured_v4_session_table_memory = tmp;
1239  }
1240  else if (unformat (input, "v4-halfopen-table-memory %U",
1241  unformat_memory_size, &tmp))
1242  {
1243  if (tmp >= 0x100000000)
1244  return clib_error_return (0, "memory size %llx (%lld) too large",
1245  tmp, tmp);
1246  smm->configured_v4_halfopen_table_memory = tmp;
1247  }
1248  else if (unformat (input, "v6-session-table-memory %U",
1249  unformat_memory_size, &tmp))
1250  {
1251  if (tmp >= 0x100000000)
1252  return clib_error_return (0, "memory size %llx (%lld) too large",
1253  tmp, tmp);
1254  smm->configured_v6_session_table_memory = tmp;
1255  }
1256  else if (unformat (input, "v6-halfopen-table-memory %U",
1257  unformat_memory_size, &tmp))
1258  {
1259  if (tmp >= 0x100000000)
1260  return clib_error_return (0, "memory size %llx (%lld) too large",
1261  tmp, tmp);
1262  smm->configured_v6_halfopen_table_memory = tmp;
1263  }
1264  else if (unformat (input, "local-endpoints-table-memory %U",
1265  unformat_memory_size, &tmp))
1266  {
1267  if (tmp >= 0x100000000)
1268  return clib_error_return (0, "memory size %llx (%lld) too large",
1269  tmp, tmp);
1270  smm->local_endpoints_table_memory = tmp;
1271  }
1272  else if (unformat (input, "local-endpoints-table-buckets %d",
1273  &smm->local_endpoints_table_buckets))
1274  ;
1275  else
1276  return clib_error_return (0, "unknown input `%U'",
1277  format_unformat_error, input);
1278  }
1279  return 0;
1280 }
1281 
1283 
1284 /*
1285  * fd.io coding-style-patch-verification: ON
1286  *
1287  * Local Variables:
1288  * eval: (c-set-style "gnu")
1289  * End:
1290  */
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:432
int session_lookup_del_connection(transport_connection_t *tc)
Delete transport connection from session table.
sll srl srl sll sra u16x4 i
Definition: vector_sse2.h:337
static void svm_pop_heap(void *oldheap)
Definition: svm.h:94
int segment_manager_alloc_session_fifos(segment_manager_t *sm, svm_fifo_t **server_rx_fifo, svm_fifo_t **server_tx_fifo, u32 *fifo_segment_index)
void svm_fifo_init_pointers(svm_fifo_t *f, u32 pointer)
Set fifo pointers to requested offset.
Definition: svm_fifo.c:827
transport_proto_vft_t * tp_vfts
Per-type vector of transport protocol virtual function tables.
Definition: transport.c:23
struct _transport_connection transport_connection_t
#define PREDICT_TRUE(x)
Definition: clib.h:106
session_manager_main_t session_manager_main
Definition: session.c:27
static_always_inline void clib_spinlock_unlock_if_init(clib_spinlock_t *p)
Definition: lock.h:85
static int session_enqueue_notify(stream_session_t *s, u8 block)
Notify session peer that new data has been enqueued.
Definition: session.c:442
#define pool_get_aligned_will_expand(P, YESNO, A)
See if pool_get will expand the pool or not.
Definition: pool.h:228
struct _transport_proto_vft transport_proto_vft_t
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:518
void session_send_rpc_evt_to_thread(u32 thread_index, void *fp, void *rpc_args)
Definition: session.c:67
transport_connection_t * session_get_transport(stream_session_t *s)
Definition: session.c:1026
static u32 svm_fifo_max_enqueue(svm_fifo_t *f)
Definition: svm_fifo.h:106
int session_lookup_del_session(stream_session_t *s)
int session_enqueue_stream_connection(transport_connection_t *tc, vlib_buffer_t *b, u32 offset, u8 queue_event, u8 is_in_order)
Definition: session.c:304
u64 session_lookup_half_open_handle(transport_connection_t *tc)
void session_node_enable_disable(u8 is_en)
Definition: session.c:1159
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
Definition: session_node.c:46
static uword vlib_node_add_next(vlib_main_t *vm, uword node, uword next_node)
Definition: node_funcs.h:1108
#define vec_reset_length(v)
Reset vector length to zero (NULL-pointer tolerant).
static void session_send_evt_to_thread(u64 session_handle, fifo_event_type_t evt_type, u32 thread_index, void *fp, void *rpc_args)
Definition: session.c:31
struct _svm_fifo svm_fifo_t
fifo_event_type_t
Definition: session.h:32
segment_manager_t * application_get_listen_segment_manager(application_t *app, stream_session_t *s)
Definition: application.c:450
#define VLIB_BUFFER_NEXT_PRESENT
Definition: buffer.h:95
void app_namespaces_init(void)
static clib_error_t * session_config_fn(vlib_main_t *vm, unformat_input_t *input)
Definition: session.c:1202
#define VLIB_INIT_FUNCTION(x)
Definition: init.h:111
static void * svm_push_data_heap(svm_region_t *rp)
Definition: svm.h:86
void stream_session_accept_notify(transport_connection_t *tc)
Definition: session.c:677
#define always_inline
Definition: clib.h:92
static u32 svm_fifo_max_dequeue(svm_fifo_t *f)
Definition: svm_fifo.h:100
static void session_free(stream_session_t *s)
Definition: session.c:107
static stream_session_t * session_alloc_for_connection(transport_connection_t *tc)
Definition: session.c:139
#define clib_error_return(e, args...)
Definition: error.h:99
svm_region_t * vlib_rp
Current binary api segment descriptor.
Definition: api_common.h:251
unsigned long u64
Definition: types.h:89
void stream_session_delete_notify(transport_connection_t *tc)
Notification from transport that connection is being deleted.
Definition: session.c:732
void stream_session_cleanup(stream_session_t *s)
Cleanup transport and session state.
Definition: session.c:948
void stream_session_delete(stream_session_t *s)
Cleans up session and lookup table.
Definition: session.c:709
struct _stream_session_t stream_session_t
void session_send_session_evt_to_thread(u64 session_handle, fifo_event_type_t evt_type, u32 thread_index)
Definition: session.c:59
int session_lookup_del_half_open(transport_connection_t *tc)
static u32 vlib_get_buffer_index(vlib_main_t *vm, void *p)
Translate buffer pointer into buffer index.
Definition: buffer_funcs.h:74
static clib_error_t * session_manager_main_enable(vlib_main_t *vm)
Definition: session.c:1066
static void clib_spinlock_init(clib_spinlock_t *p)
Definition: lock.h:33
#define session_endpoint_to_transport(_sep)
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
static transport_proto_t session_get_transport_proto(stream_session_t *s)
Definition: session.h:283
u32 stream_session_dequeue_drop(transport_connection_t *tc, u32 max_bytes)
Definition: session.c:427
int session_open(u32 app_index, session_endpoint_t *rmt, u32 opaque)
Ask transport to open connection to remote transport endpoint.
Definition: session.c:807
int session_dgram_connect_notify(transport_connection_t *tc, u32 old_thread_index, stream_session_t **new_session)
Move dgram session to the right thread.
Definition: session.c:641
static session_manager_main_t * vnet_get_session_manager_main()
Definition: session.h:207
u16 current_length
Nbytes between current data and the end of this buffer.
Definition: buffer.h:72
struct _session_endpoint session_endpoint_t
struct _unformat_input_t unformat_input_t
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
Definition: buffer.h:195
#define pool_put(P, E)
Free an object E in pool P.
Definition: pool.h:271
void svm_fifo_segment_init(u64 baseva, u32 timeout_in_seconds)
#define PREDICT_FALSE(x)
Definition: clib.h:105
#define VLIB_CONFIG_FUNCTION(x, n,...)
Definition: init.h:119
clib_error_t * session_manager_main_init(vlib_main_t *vm)
Definition: session.c:1192
static stream_session_t * session_get_if_valid(u64 si, u32 thread_index)
Definition: session.h:236
struct _session_manager_main session_manager_main_t
Definition: session.h:103
#define foreach_vlib_main(body)
Definition: threads.h:244
int stream_session_accept(transport_connection_t *tc, u32 listener_index, u8 notify)
Accept a stream session.
Definition: session.c:761
clib_error_t * vnet_session_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: session.c:1171
static stream_session_t * session_get(u32 si, u32 thread_index)
Definition: session.h:229
struct _session_switch_pool_args session_switch_pool_args_t
api_main_t: API main structure, used by both VPP and binary API clients.
Definition: api_common.h:198
#define pool_get_aligned(P, E, A)
Allocate an object E from a pool P (general version).
Definition: pool.h:188
#define SESSION_DBG(_fmt, _args...)
static u8 svm_fifo_set_event(svm_fifo_t *f)
Sets fifo event flag.
Definition: svm_fifo.h:123
#define SESSION_EVT_DBG(_evt, _args...)
static unix_shared_memory_queue_t * session_manager_get_vpp_event_queue(u32 thread_index)
Definition: session.h:459
void transport_init(void)
Definition: transport.c:353
api_main_t api_main
Definition: api_shared.c:35
#define UNFORMAT_END_OF_INPUT
Definition: format.h:143
static_always_inline uword vlib_get_thread_index(void)
Definition: threads.h:221
static session_type_t session_type_from_proto_and_ip(transport_proto_t proto, u8 is_ip4)
Definition: session.h:289
vlib_main_t * vm
Definition: buffer.c:283
u32 stream_session_tx_fifo_max_dequeue(transport_connection_t *tc)
Definition: session.c:410
segment_manager_t * application_get_connect_segment_manager(application_t *app)
Definition: application.c:443
#define clib_warning(format, args...)
Definition: error.h:59
void stream_session_init_fifos_pointers(transport_connection_t *tc, u32 rx_pointer, u32 tx_pointer)
Init fifo tail and head pointers.
Definition: session.c:539
struct _application application_t
static void session_manager_set_transport_rx_fn(session_type_t type, u8 is_peek)
Set peek or dequeue function for given session type.
Definition: session.h:538
int svm_fifo_enqueue_nowait(svm_fifo_t *f, u32 max_bytes, u8 *copy_from_here)
Definition: svm_fifo.c:533
void transport_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: transport.c:342
#define pool_init_fixed(pool, max_elts)
initialize a fixed-size, preallocated pool
Definition: pool.h:86
static u8 transport_is_stream(u8 proto)
Definition: transport.h:112
int session_enqueue_dgram_connection(stream_session_t *s, vlib_buffer_t *b, u8 proto, u8 queue_event)
Definition: session.c:359
void stream_session_disconnect_notify(transport_connection_t *tc)
Notification from transport that connection is being closed.
Definition: session.c:695
static int session_alloc_fifos(segment_manager_t *sm, stream_session_t *s)
Definition: session.c:115
#define ASSERT(truth)
unsigned int u32
Definition: types.h:88
u8 session_type_t
unix_shared_memory_queue_t * unix_shared_memory_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
vhost_vring_state_t state
Definition: vhost-user.h:82
u32 next_buffer
Next buffer for this linked-list of buffers.
Definition: buffer.h:109
static void clib_mem_free(void *p)
Definition: mem.h:179
int listen_session_get_local_session_endpoint(stream_session_t *listener, session_endpoint_t *sep)
Definition: session.c:1046
void stream_session_disconnect(stream_session_t *s)
Disconnect session and propagate to transport.
Definition: session.c:934
static void vlib_buffer_advance(vlib_buffer_t *b, word l)
Advance current data pointer by the supplied (signed!) amount.
Definition: buffer.h:208
void stream_session_reset_notify(transport_connection_t *tc)
Notify application that connection has been reset.
Definition: session.c:747
static void vlib_node_set_state(vlib_main_t *vm, u32 node_index, vlib_node_state_t new_state)
Set node dispatch state.
Definition: node_funcs.h:147
static int app_index
int svm_fifo_enqueue_with_offset(svm_fifo_t *f, u32 offset, u32 required_bytes, u8 *copy_from_here)
Definition: svm_fifo.c:608
int stream_session_stop_listen(stream_session_t *s)
Ask transport to stop listening on local transport endpoint.
Definition: session.c:904
static void * clib_mem_alloc(uword size)
Definition: mem.h:112
static vlib_main_t * vlib_get_main(void)
Definition: global_funcs.h:23
u64 uword
Definition: types.h:112
#define HALF_OPEN_LOOKUP_INVALID_VALUE
Definition: session.h:25
static void session_enqueue_discard_chain_bytes(vlib_main_t *vm, vlib_buffer_t *b, vlib_buffer_t **chain_b, u32 n_bytes_to_drop)
Discards bytes from buffer chain.
Definition: session.c:185
u32 total_length_not_including_first_buffer
Only valid for first buffer in chain.
Definition: buffer.h:142
template key/value backing page structure
Definition: bihash_doc.h:44
static u64 session_handle(stream_session_t *s)
Definition: session.h:249
static void session_switch_pool(void *cb_args)
Definition: session.c:622
int svm_fifo_dequeue_drop(svm_fifo_t *f, u32 max_bytes)
Definition: svm_fifo.c:772
unsigned short u16
Definition: types.h:57
void segment_manager_dealloc_fifos(u32 svm_segment_index, svm_fifo_t *rx_fifo, svm_fifo_t *tx_fifo)
void session_vpp_event_queue_allocate(session_manager_main_t *smm, u32 thread_index)
Allocate vpp event queue (once) per worker thread.
Definition: session.c:967
enum _transport_proto transport_proto_t
transport_connection_t * listen_session_get_transport(stream_session_t *s)
Definition: session.c:1039
void session_lookup_init(void)
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
double f64
Definition: types.h:142
static stream_session_t * listen_session_get(session_type_t type, u32 index)
Definition: session.h:506
unsigned char u8
Definition: types.h:56
static stream_session_t * session_clone_safe(u32 session_index, u32 thread_index)
Definition: session.h:390
application_t * application_get(u32 index)
Definition: application.c:297
struct _transport_endpoint transport_endpoint_t
struct _segment_manager segment_manager_t
unformat_function_t unformat_memory_size
Definition: format.h:294
int session_lookup_add_connection(transport_connection_t *tc, u64 value)
Add transport connection to a session table.
int session_manager_flush_enqueue_events(u8 transport_proto, u32 thread_index)
Flushes the queue of sessions that are to be notified that new data has been enqueued.
Definition: session.c:510
u8 * format_unformat_error(u8 *s, va_list *va)
Definition: unformat.c:91
int session_stream_connect_notify(transport_connection_t *tc, u8 is_fail)
Definition: session.c:549
static vlib_thread_main_t * vlib_get_thread_main()
Definition: global_funcs.h:32
static int session_enqueue_chain_tail(stream_session_t *s, vlib_buffer_t *b, u32 offset, u8 is_in_order)
Enqueue buffer chain tail.
Definition: session.c:216
int application_is_builtin_proxy(application_t *app)
Definition: application.c:472
stream_session_t * session_alloc(u32 thread_index)
Definition: session.c:80
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:67
u32 flags
Buffer flags: VLIB_BUFFER_FREE_LIST_INDEX_MASK = bits used to store the free list index; VLIB_BUFFER_IS_TRACED = trace this buffer.
Definition: buffer.h:75
static int session_alloc_and_init(segment_manager_t *sm, transport_connection_t *tc, u8 alloc_fifos, stream_session_t **ret_s)
Definition: session.c:158
int svm_fifo_peek(svm_fifo_t *f, u32 relative_offset, u32 max_bytes, u8 *copy_here)
Definition: svm_fifo.c:756
void session_register_transport(transport_proto_t transport_proto, const transport_proto_vft_t *vft, u8 is_ip4, u32 output_node)
Initialize session layer for given transport proto and ip version.
Definition: session.c:999
int session_lookup_add_half_open(transport_connection_t *tc, u64 value)
int stream_session_peek_bytes(transport_connection_t *tc, u8 *buffer, u32 offset, u32 max_bytes)
Definition: session.c:419
static_always_inline void clib_spinlock_lock_if_init(clib_spinlock_t *p)
Definition: lock.h:65
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
Definition: buffer_funcs.h:57
application_t * application_get_if_valid(u32 index)
Definition: application.c:305
u8 stream_session_no_space(transport_connection_t *tc, u32 thread_index, u16 data_len)
Check if we have space in rx fifo to push more bytes.
Definition: session.c:395
uword unformat(unformat_input_t *i, const char *fmt,...)
Definition: unformat.c:972
static uword unformat_check_input(unformat_input_t *i)
Definition: format.h:169
struct _unix_shared_memory_queue unix_shared_memory_queue_t
int stream_session_listen(stream_session_t *s, session_endpoint_t *sep)
Ask transport to listen on local transport endpoint.
Definition: session.c:872