FD.io VPP  v20.01-48-g3e0dafb74
Vector Packet Processing
session.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19 
20 #include <vnet/session/session.h>
23 #include <vnet/dpo/load_balance.h>
24 #include <vnet/fib/ip4_fib.h>
25 
27 
28 static inline int
29 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
30  session_evt_type_t evt_type)
31 {
32  session_event_t *evt;
33  svm_msg_q_msg_t msg;
34  svm_msg_q_t *mq;
35 
36  mq = session_main_get_vpp_event_queue (thread_index);
37  if (PREDICT_FALSE (svm_msg_q_lock (mq)))
38  return -1;
41  {
42  svm_msg_q_unlock (mq);
43  return -2;
44  }
45  switch (evt_type)
46  {
49  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
50  evt->rpc_args.fp = data;
51  evt->rpc_args.arg = args;
52  break;
53  case SESSION_IO_EVT_RX:
54  case SESSION_IO_EVT_TX:
58  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59  evt->session_index = *(u32 *) data;
60  break;
65  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
66  evt->session_handle = session_handle ((session_t *) data);
67  break;
68  default:
69  clib_warning ("evt unhandled!");
70  svm_msg_q_unlock (mq);
71  return -1;
72  }
73  evt->event_type = evt_type;
74 
75  svm_msg_q_add_and_unlock (mq, &msg);
76  return 0;
77 }
78 
79 int
81 {
82  return session_send_evt_to_thread (&f->master_session_index, 0,
83  f->master_thread_index, evt_type);
84 }
85 
86 int
88  session_evt_type_t evt_type)
89 {
90  return session_send_evt_to_thread (data, 0, thread_index, evt_type);
91 }
92 
93 int
95 {
96  /* only events supported are disconnect and reset */
97  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
98  || evt_type == SESSION_CTRL_EVT_RESET);
99  return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
100 }
101 
102 void
104  void *rpc_args)
105 {
106  session_send_evt_to_thread (fp, rpc_args, thread_index,
108 }
109 
/**
 * Ask for an RPC function to be run on a given thread.
 *
 * If thread_index differs from the calling thread's index, as reported by
 * vlib_get_thread_index (), the request is handed to
 * session_send_rpc_evt_to_thread_force (). Otherwise fp is called directly,
 * in place, with rpc_args as its only argument.
 *
 * @param thread_index Index of the thread the function is meant for.
 * @param fp Function to call; it is invoked as a void (*) (void *).
 * @param rpc_args Argument passed through to fp unchanged.
 */
110 void
111 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
112 {
113  if (thread_index != vlib_get_thread_index ())
114  session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
115  else
116  {
117  void (*fnp) (void *) = fp;
118  fnp (rpc_args);
119  }
120 }
121 
122 void
124 {
125  session_t *s;
126 
127  s = session_get (tc->s_index, tc->thread_index);
129  ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
130  if (!(s->flags & SESSION_F_CUSTOM_TX))
131  {
132  s->flags |= SESSION_F_CUSTOM_TX;
133  if (svm_fifo_set_event (s->tx_fifo))
134  {
135  session_worker_t *wrk;
136  session_evt_elt_t *elt;
137  wrk = session_main_get_worker (tc->thread_index);
138  if (has_prio)
139  elt = session_evt_alloc_new (wrk);
140  else
141  elt = session_evt_alloc_old (wrk);
142  elt->evt.session_index = tc->s_index;
143  elt->evt.event_type = SESSION_IO_EVT_TX;
144  }
145  }
146 }
147 
148 static void
150 {
151  u32 thread_index = vlib_get_thread_index ();
152  session_evt_elt_t *elt;
153  session_worker_t *wrk;
154 
155  /* If we are in the handler thread, or being called with the worker barrier
156  * held, just append a new event to pending disconnects vector. */
157  if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
158  {
160  elt = session_evt_alloc_ctrl (wrk);
161  clib_memset (&elt->evt, 0, sizeof (session_event_t));
162  elt->evt.session_handle = session_handle (s);
163  elt->evt.event_type = evt;
164  }
165  else
167 }
168 
169 session_t *
170 session_alloc (u32 thread_index)
171 {
172  session_worker_t *wrk = &session_main.wrk[thread_index];
173  session_t *s;
174  u8 will_expand = 0;
175  pool_get_aligned_will_expand (wrk->sessions, will_expand,
177  /* If we have peekers, let them finish */
178  if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
179  {
183  }
184  else
185  {
187  }
188  clib_memset (s, 0, sizeof (*s));
189  s->session_index = s - wrk->sessions;
190  s->thread_index = thread_index;
192  return s;
193 }
194 
195 void
197 {
198  if (CLIB_DEBUG)
199  {
200  u8 thread_index = s->thread_index;
201  clib_memset (s, 0xFA, sizeof (*s));
202  pool_put (session_main.wrk[thread_index].sessions, s);
203  return;
204  }
205  SESSION_EVT (SESSION_EVT_FREE, s);
206  pool_put (session_main.wrk[s->thread_index].sessions, s);
207 }
208 
209 u8
210 session_is_valid (u32 si, u8 thread_index)
211 {
212  session_t *s;
214 
215  s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
216 
217  if (!s)
218  return 1;
219 
220  if (s->thread_index != thread_index || s->session_index != si)
221  return 0;
222 
223  if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
224  || s->session_state <= SESSION_STATE_LISTENING)
225  return 1;
226 
227  tc = session_get_transport (s);
228  if (s->connection_index != tc->c_index
229  || s->thread_index != tc->thread_index || tc->s_index != si)
230  return 0;
231 
232  return 1;
233 }
234 
235 static void
237 {
238  app_worker_t *app_wrk;
239 
240  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
241  if (!app_wrk)
242  return;
243  app_worker_cleanup_notify (app_wrk, s, ntf);
244 }
245 
246 void
248 {
251  session_free (s);
252 }
253 
254 /**
255  * Cleans up session and lookup table.
256  *
257  * Transport connection must still be valid.
258  */
259 static void
261 {
262  int rv;
263 
264  /* Delete from the main lookup table. */
265  if ((rv = session_lookup_del_session (s)))
266  clib_warning ("session %u hash delete rv %d", s->session_index, rv);
267 
269 }
270 
271 static session_t *
273 {
274  session_t *s;
275  u32 thread_index = tc->thread_index;
276 
277  ASSERT (thread_index == vlib_get_thread_index ()
278  || transport_protocol_is_cl (tc->proto));
279 
280  s = session_alloc (thread_index);
281  s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
282  s->session_state = SESSION_STATE_CLOSED;
283 
284  /* Attach transport to session and vice versa */
285  s->connection_index = tc->c_index;
286  tc->s_index = s->session_index;
287  return s;
288 }
289 
290 /**
291  * Discards bytes from buffer chain
292  *
293  * It discards n_bytes_to_drop starting at first buffer after chain_b
294  */
295 always_inline void
297  vlib_buffer_t ** chain_b,
298  u32 n_bytes_to_drop)
299 {
300  vlib_buffer_t *next = *chain_b;
301  u32 to_drop = n_bytes_to_drop;
302  ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
303  while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
304  {
305  next = vlib_get_buffer (vm, next->next_buffer);
306  if (next->current_length > to_drop)
307  {
308  vlib_buffer_advance (next, to_drop);
309  to_drop = 0;
310  }
311  else
312  {
313  to_drop -= next->current_length;
314  next->current_length = 0;
315  }
316  }
317  *chain_b = next;
318 
319  if (to_drop == 0)
320  b->total_length_not_including_first_buffer -= n_bytes_to_drop;
321 }
322 
323 /**
324  * Enqueue buffer chain tail
325  */
326 always_inline int
328  u32 offset, u8 is_in_order)
329 {
330  vlib_buffer_t *chain_b;
331  u32 chain_bi, len, diff;
333  u8 *data;
334  u32 written = 0;
335  int rv = 0;
336 
337  if (is_in_order && offset)
338  {
339  diff = offset - b->current_length;
341  return 0;
342  chain_b = b;
343  session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
344  chain_bi = vlib_get_buffer_index (vm, chain_b);
345  }
346  else
347  chain_bi = b->next_buffer;
348 
349  do
350  {
351  chain_b = vlib_get_buffer (vm, chain_bi);
352  data = vlib_buffer_get_current (chain_b);
353  len = chain_b->current_length;
354  if (!len)
355  continue;
356  if (is_in_order)
357  {
358  rv = svm_fifo_enqueue (s->rx_fifo, len, data);
359  if (rv == len)
360  {
361  written += rv;
362  }
363  else if (rv < len)
364  {
365  return (rv > 0) ? (written + rv) : written;
366  }
367  else if (rv > len)
368  {
369  written += rv;
370 
371  /* written more than what was left in chain */
373  return written;
374 
375  /* drop the bytes that have already been delivered */
376  session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
377  }
378  }
379  else
380  {
381  rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
382  if (rv)
383  {
384  clib_warning ("failed to enqueue multi-buffer seg");
385  return -1;
386  }
387  offset += len;
388  }
389  }
390  while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
391  ? chain_b->next_buffer : 0));
392 
393  if (is_in_order)
394  return written;
395 
396  return 0;
397 }
398 
399 /*
400  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
401  * event but on request can queue notification events for later delivery by
402  * calling session_main_flush_enqueue_events().
403  *
404  * @param tc Transport connection whose session receives the enqueued data
405  * @param b Buffer to be enqueued
406  * @param offset Offset at which to start enqueueing if out-of-order
407  * @param queue_event Flag to indicate if an RX notification for the session
408  * is to be queued for later delivery. Queueing is useful when
409  * more data is enqueued and only one event is to be generated.
410  * @param is_in_order Flag to indicate if data is in order
411  * @return Number of bytes enqueued or a negative value if enqueueing failed.
412  */
413 int
415  vlib_buffer_t * b, u32 offset,
416  u8 queue_event, u8 is_in_order)
417 {
418  session_t *s;
419  int enqueued = 0, rv, in_order_off;
420 
421  s = session_get (tc->s_index, tc->thread_index);
422 
423  if (is_in_order)
424  {
425  enqueued = svm_fifo_enqueue (s->rx_fifo,
426  b->current_length,
428  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
429  && enqueued >= 0))
430  {
431  in_order_off = enqueued > b->current_length ? enqueued : 0;
432  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
433  if (rv > 0)
434  enqueued += rv;
435  }
436  }
437  else
438  {
439  rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
440  b->current_length,
442  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
443  session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
444  /* if something was enqueued, report even this as success for ooo
445  * segment handling */
446  return rv;
447  }
448 
449  if (queue_event)
450  {
451  /* Queue RX event on this fifo. Eventually these will need to be flushed
452  * by calling session_main_flush_enqueue_events () */
453  session_worker_t *wrk;
454 
456  if (!(s->flags & SESSION_F_RX_EVT))
457  {
458  s->flags |= SESSION_F_RX_EVT;
459  vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
460  }
461  }
462 
463  return enqueued;
464 }
465 
466 int
468  session_dgram_hdr_t * hdr,
469  vlib_buffer_t * b, u8 proto, u8 queue_event)
470 {
471  int enqueued = 0, rv, in_order_off;
472 
474  >= b->current_length + sizeof (*hdr));
475 
476  svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
477  enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
479  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
480  {
481  in_order_off = enqueued > b->current_length ? enqueued : 0;
482  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
483  if (rv > 0)
484  enqueued += rv;
485  }
486  if (queue_event)
487  {
488  /* Queue RX event on this fifo. Eventually these will need to be flushed
489  * by calling session_main_flush_enqueue_events () */
490  session_worker_t *wrk;
491 
493  if (!(s->flags & SESSION_F_RX_EVT))
494  {
495  s->flags |= SESSION_F_RX_EVT;
496  vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
497  }
498  }
499  return enqueued;
500 }
501 
502 int
504  u32 offset, u32 max_bytes)
505 {
506  session_t *s = session_get (tc->s_index, tc->thread_index);
507  return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
508 }
509 
510 u32
512 {
513  session_t *s = session_get (tc->s_index, tc->thread_index);
514  u32 rv;
515 
516  rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
517 
518  if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
520 
521  return rv;
522 }
523 
524 static inline int
526  svm_fifo_t * f, session_evt_type_t evt_type)
527 {
528  app_worker_t *app_wrk;
529  application_t *app;
530  int i;
531 
532  app = application_get (app_index);
533  if (!app)
534  return -1;
535 
536  for (i = 0; i < f->n_subscribers; i++)
537  {
538  app_wrk = application_get_worker (app, f->subscribers[i]);
539  if (!app_wrk)
540  continue;
541  if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
542  return -1;
543  }
544 
545  return 0;
546 }
547 
548 /**
549  * Notify session peer that new data has been enqueued.
550  *
551  * @param s Stream session for which the event is to be generated.
552  * @param lock Flag to indicate if call should lock message queue.
553  *
554  * @return 0 on success or negative number if failed to send notification.
555  */
556 static inline int
558 {
559  app_worker_t *app_wrk;
560  u32 session_index;
561  u8 n_subscribers;
562 
563  session_index = s->session_index;
564  n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
565 
566  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
567  if (PREDICT_FALSE (!app_wrk))
568  {
569  SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
570  return 0;
571  }
572 
573  SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
574 
575  s->flags &= ~SESSION_F_RX_EVT;
578  return -1;
579 
580  if (PREDICT_FALSE (n_subscribers))
581  {
582  s = session_get (session_index, vlib_get_thread_index ());
583  return session_notify_subscribers (app_wrk->app_index, s,
585  }
586 
587  return 0;
588 }
589 
590 int
592 {
594 }
595 
596 static void
598 {
599  u32 session_index = pointer_to_uword (arg);
600  session_t *s;
601 
602  s = session_get_if_valid (session_index, vlib_get_thread_index ());
603  if (!s)
604  return;
605 
607 }
608 
609 /**
610  * Like session_enqueue_notify, but can be called from a thread that does not
611  * own the session.
612  */
613 void
615 {
616  u32 thread_index = session_thread_from_handle (sh);
617  u32 session_index = session_index_from_handle (sh);
618 
619  /*
620  * Pass session index (u32) as opposed to handle (u64) in case pointers
621  * are not 64-bit.
622  */
623  session_send_rpc_evt_to_thread (thread_index,
625  uword_to_pointer (session_index, void *));
626 }
627 
628 int
630 {
631  app_worker_t *app_wrk;
632 
634 
635  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
636  if (PREDICT_FALSE (!app_wrk))
637  return -1;
638 
641  return -1;
642 
643  if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
644  return session_notify_subscribers (app_wrk->app_index, s,
646 
647  return 0;
648 }
649 
650 /**
651  * Flushes queue of sessions that are to be notified of new data
652  * enqueued events.
653  *
654  * @param thread_index Thread index for which the flush is to be performed.
655  * @return 0 on success or a positive number indicating the number of
656  * failures due to API queue being full.
657  */
658 int
659 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
660 {
661  session_worker_t *wrk = session_main_get_worker (thread_index);
662  session_t *s;
663  int i, errors = 0;
664  u32 *indices;
665 
666  indices = wrk->session_to_enqueue[transport_proto];
667 
668  for (i = 0; i < vec_len (indices); i++)
669  {
670  s = session_get_if_valid (indices[i], thread_index);
671  if (PREDICT_FALSE (!s))
672  {
673  errors++;
674  continue;
675  }
676 
678  errors++;
679  }
680 
681  vec_reset_length (indices);
682  wrk->session_to_enqueue[transport_proto] = indices;
683 
684  return errors;
685 }
686 
687 int
689 {
691  int i, errors = 0;
692  for (i = 0; i < 1 + vtm->n_threads; i++)
693  errors += session_main_flush_enqueue_events (transport_proto, i);
694  return errors;
695 }
696 
697 static inline int
699  session_state_t opened_state)
700 {
701  u32 opaque = 0, new_ti, new_si;
702  app_worker_t *app_wrk;
703  session_t *s = 0;
704  u64 ho_handle;
705 
706  /*
707  * Find connection handle and cleanup half-open table
708  */
709  ho_handle = session_lookup_half_open_handle (tc);
710  if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
711  {
712  SESSION_DBG ("half-open was removed!");
713  return -1;
714  }
716 
717  /* Get the app's index from the handle we stored when opening connection
718  * and the opaque (api_context for external apps) from transport session
719  * index */
720  app_wrk = app_worker_get_if_valid (ho_handle >> 32);
721  if (!app_wrk)
722  return -1;
723 
724  opaque = tc->s_index;
725 
726  if (is_fail)
727  return app_worker_connect_notify (app_wrk, s, opaque);
728 
730  s->session_state = SESSION_STATE_CONNECTING;
731  s->app_wrk_index = app_wrk->wrk_index;
732  new_si = s->session_index;
733  new_ti = s->thread_index;
734 
735  if (app_worker_init_connected (app_wrk, s))
736  {
737  session_free (s);
738  app_worker_connect_notify (app_wrk, 0, opaque);
739  return -1;
740  }
741 
742  s = session_get (new_si, new_ti);
743  s->session_state = opened_state;
745 
746  if (app_worker_connect_notify (app_wrk, s, opaque))
747  {
748  s = session_get (new_si, new_ti);
750  return -1;
751  }
752 
753  return 0;
754 }
755 
756 int
758 {
759  return session_stream_connect_notify_inline (tc, is_fail,
760  SESSION_STATE_READY);
761 }
762 
763 int
765 {
766  return session_stream_connect_notify_inline (tc, is_fail,
767  SESSION_STATE_OPENED);
768 }
769 
770 typedef struct _session_switch_pool_args
771 {
772  u32 session_index;
773  u32 thread_index;
774  u32 new_thread_index;
775  u32 new_session_index;
777 
778 /**
779  * Notify old thread of the session pool switch
780  */
781 static void
782 session_switch_pool (void *cb_args)
783 {
785  app_worker_t *app_wrk;
786  session_t *s;
787 
788  ASSERT (args->thread_index == vlib_get_thread_index ());
789  s = session_get (args->session_index, args->thread_index);
790  s->tx_fifo->master_session_index = args->new_session_index;
791  s->tx_fifo->master_thread_index = args->new_thread_index;
793  s->thread_index);
794 
795  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
796  if (app_wrk)
797  {
798  session_handle_t new_sh;
799  new_sh = session_make_handle (args->new_session_index,
800  args->new_thread_index);
801  app_worker_migrate_notify (app_wrk, s, new_sh);
802 
803  /* Trigger app read on the new thread */
805  }
806 
807  session_free (s);
808  clib_mem_free (cb_args);
809 }
810 
811 /**
812  * Move dgram session to the right thread
813  */
814 int
816  u32 old_thread_index, session_t ** new_session)
817 {
818  session_t *new_s;
819  session_switch_pool_args_t *rpc_args;
820 
821  /*
822  * Clone half-open session to the right thread.
823  */
824  new_s = session_clone_safe (tc->s_index, old_thread_index);
825  new_s->connection_index = tc->c_index;
826  new_s->rx_fifo->master_session_index = new_s->session_index;
827  new_s->rx_fifo->master_thread_index = new_s->thread_index;
828  new_s->session_state = SESSION_STATE_READY;
829  new_s->flags |= SESSION_F_IS_MIGRATING;
831 
832  /*
833  * Ask thread owning the old session to clean it up and make us the tx
834  * fifo owner
835  */
836  rpc_args = clib_mem_alloc (sizeof (*rpc_args));
837  rpc_args->new_session_index = new_s->session_index;
838  rpc_args->new_thread_index = new_s->thread_index;
839  rpc_args->session_index = tc->s_index;
840  rpc_args->thread_index = old_thread_index;
841  session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
842  rpc_args);
843 
844  tc->s_index = new_s->session_index;
845  new_s->connection_index = tc->c_index;
846  *new_session = new_s;
847  return 0;
848 }
849 
850 /**
851  * Notification from transport that connection is being closed.
852  *
853  * A disconnect is sent to application but state is not removed. Once
854  * disconnect is acknowledged by application, session disconnect is called.
855  * Ultimately this leads to close being called on transport (passive close).
856  */
857 void
859 {
860  app_worker_t *app_wrk;
861  session_t *s;
862 
863  s = session_get (tc->s_index, tc->thread_index);
864  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
865  return;
866  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
867  app_wrk = app_worker_get (s->app_wrk_index);
868  app_worker_close_notify (app_wrk, s);
869 }
870 
871 /**
872  * Notification from transport that connection is being deleted
873  *
874  * This removes the session if it is still valid. It should be called only on
875  * previously fully established sessions. For instance failed connects should
876  * call stream_session_connect_notify and indicate that the connect has
877  * failed.
878  */
879 void
881 {
882  session_t *s;
883 
884  /* App might've been removed already */
885  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
886  return;
887 
888  switch (s->session_state)
889  {
890  case SESSION_STATE_CREATED:
891  /* Session was created but accept notification was not yet sent to the
892  * app. Cleanup everything. */
895  session_free (s);
896  break;
897  case SESSION_STATE_ACCEPTING:
898  case SESSION_STATE_TRANSPORT_CLOSING:
899  case SESSION_STATE_CLOSING:
900  case SESSION_STATE_TRANSPORT_CLOSED:
901  /* If transport finishes or times out before we get a reply
902  * from the app, mark transport as closed and wait for reply
903  * before removing the session. Cleanup session table in advance
904  * because transport will soon be closed and closed sessions
905  * are assumed to have been removed from the lookup table */
907  s->session_state = SESSION_STATE_TRANSPORT_DELETED;
910  break;
911  case SESSION_STATE_APP_CLOSED:
912  /* Cleanup lookup table as transport needs to still be valid.
913  * Program transport close to ensure that all session events
914  * have been cleaned up. Once transport close is called, the
915  * session is just removed because both transport and app have
916  * confirmed the close. */
918  s->session_state = SESSION_STATE_TRANSPORT_DELETED;
922  break;
923  case SESSION_STATE_TRANSPORT_DELETED:
924  break;
925  case SESSION_STATE_CLOSED:
927  session_delete (s);
928  break;
929  default:
930  clib_warning ("session state %u", s->session_state);
932  session_delete (s);
933  break;
934  }
935 }
936 
937 /**
938  * Notification from transport that it is closed
939  *
940  * Should be called by transport, prior to calling delete notify, once it
941  * knows that no more data will be exchanged. This could serve as an
942  * early acknowledgment of an active close especially if transport delete
943  * can be delayed a long time, e.g., tcp time-wait.
944  */
945 void
947 {
948  app_worker_t *app_wrk;
949  session_t *s;
950 
951  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
952  return;
953 
954  /* Transport thinks that app requested close but it actually didn't.
955  * Can happen for tcp if fin and rst are received in close succession. */
956  if (s->session_state == SESSION_STATE_READY)
957  {
960  s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
961  }
962  /* If app close has not been received or has not yet resulted in
963  * a transport close, only mark the session transport as closed */
964  else if (s->session_state <= SESSION_STATE_CLOSING)
965  {
966  s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
967  }
968  /* If app also closed, switch to closed */
969  else if (s->session_state == SESSION_STATE_APP_CLOSED)
970  s->session_state = SESSION_STATE_CLOSED;
971 
972  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
973  if (app_wrk)
975 }
976 
977 /**
978  * Notify application that connection has been reset.
979  */
980 void
982 {
983  app_worker_t *app_wrk;
984  session_t *s;
985 
986  s = session_get (tc->s_index, tc->thread_index);
988  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
989  return;
990  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
991  app_wrk = app_worker_get (s->app_wrk_index);
992  app_worker_reset_notify (app_wrk, s);
993 }
994 
995 int
997 {
998  app_worker_t *app_wrk;
999  session_t *s;
1000 
1001  s = session_get (tc->s_index, tc->thread_index);
1002  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1003  if (!app_wrk)
1004  return -1;
1005  s->session_state = SESSION_STATE_ACCEPTING;
1006  return app_worker_accept_notify (app_wrk, s);
1007 }
1008 
1009 /**
1010  * Accept a stream session. Optionally ping the server by callback.
1011  */
1012 int
1014  u32 thread_index, u8 notify)
1015 {
1016  session_t *s;
1017  int rv;
1018 
1020  s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1021  s->session_state = SESSION_STATE_CREATED;
1022 
1023  if ((rv = app_worker_init_accepted (s)))
1024  return rv;
1025 
1027 
1028  /* Shoulder-tap the server */
1029  if (notify)
1030  {
1031  app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1032  return app_worker_accept_notify (app_wrk, s);
1033  }
1034 
1035  return 0;
1036 }
1037 
1038 int
1039 session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1040 {
1043  app_worker_t *app_wrk;
1044  session_handle_t sh;
1045  session_t *s;
1046  int rv;
1047 
1049  rv = transport_connect (rmt->transport_proto, tep);
1050  if (rv < 0)
1051  {
1052  SESSION_DBG ("Transport failed to open connection.");
1053  return VNET_API_ERROR_SESSION_CONNECT;
1054  }
1055 
1056  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1057 
1058  /* For dgram type of service, allocate session and fifos now */
1059  app_wrk = app_worker_get (app_wrk_index);
1061  s->app_wrk_index = app_wrk->wrk_index;
1062  s->session_state = SESSION_STATE_OPENED;
1063  if (app_worker_init_connected (app_wrk, s))
1064  {
1065  session_free (s);
1066  return -1;
1067  }
1068 
1069  sh = session_handle (s);
1071  return app_worker_connect_notify (app_wrk, s, opaque);
1072 }
1073 
1074 int
1075 session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1076 {
1079  u64 handle;
1080  int rv;
1081 
1083  rv = transport_connect (rmt->transport_proto, tep);
1084  if (rv < 0)
1085  {
1086  SESSION_DBG ("Transport failed to open connection.");
1087  return VNET_API_ERROR_SESSION_CONNECT;
1088  }
1089 
1090  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1091 
1092  /* If transport offers a stream service, only allocate session once the
1093  * connection has been established.
1094  * Add connection to half-open table and save app and tc index. The
1095  * latter is needed to help establish the connection while the former
1096  * is needed when the connect notify comes and we have to notify the
1097  * external app
1098  */
1099  handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
1100  session_lookup_add_half_open (tc, handle);
1101 
1102  /* Store api_context (opaque) for when the reply comes. Not the nicest
1103  * thing but better than allocating a separate half-open pool.
1104  */
1105  tc->s_index = opaque;
1106  if (transport_half_open_has_fifos (rmt->transport_proto))
1107  return session_ho_stream_connect_notify (tc, 0 /* is_fail */ );
1108  return 0;
1109 }
1110 
1111 int
1112 session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1113 {
1116 
1117  sep->app_wrk_index = app_wrk_index;
1118  sep->opaque = opaque;
1119 
1120  return transport_connect (rmt->transport_proto, tep_cfg);
1121 }
1122 
1124 
1125 /* *INDENT-OFF* */
1130 };
1131 /* *INDENT-ON* */
1132 
1133 /**
1134  * Ask transport to open connection to remote transport endpoint.
1135  *
1136  * Stores handle for matching request with reply since the call can be
1137  * asynchronous. For instance, for TCP the 3-way handshake must complete
1138  * before reply comes. Session is only created once connection is established.
1139  *
1140  * @param app_wrk_index Index of the application worker requesting the
1141  * connect.
1142  * @param rmt Remote session endpoint to connect to.
1143  * @param opaque Opaque data (typically, api_context) the application expects
1144  * on open completion.
1145  */
1146 int
1147 session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1148 {
1150  tst = transport_protocol_service_type (rmt->transport_proto);
1151  return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1152 }
1153 
1154 /**
1155  * Ask transport to listen on session endpoint.
1156  *
1157  * @param ls Listen session for which listen will be called. Note that unlike
1158  * established sessions, listen sessions are not associated to a
1159  * thread.
1160  * @param sep Local endpoint to be listened on.
1161  */
1162 int
1164 {
1165  transport_endpoint_t *tep;
1166  u32 tc_index, s_index;
1167 
1168  /* Transport bind/listen */
1169  tep = session_endpoint_to_transport (sep);
1170  s_index = ls->session_index;
1172  s_index, tep);
1173 
1174  if (tc_index == (u32) ~ 0)
1175  return -1;
1176 
1177  /* Attach transport to session. Lookup tables are populated by the app
1178  * worker because local tables (for ct sessions) are not backed by a fib */
1179  ls = listen_session_get (s_index);
1180  ls->connection_index = tc_index;
1181 
1182  return 0;
1183 }
1184 
1185 /**
1186  * Ask transport to stop listening on local transport endpoint.
1187  *
1188  * @param s Session to stop listening on. It must be in state LISTENING.
1189  */
1190 int
1192 {
1195 
1196  if (s->session_state != SESSION_STATE_LISTENING)
1197  return -1;
1198 
1200  if (!tc)
1201  return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1202 
1203  if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1206  return 0;
1207 }
1208 
1209 /**
1210  * Initialize session closing procedure.
1211  *
1212  * Request is always sent to session node to ensure that all outstanding
1213  * requests are served before transport is notified.
1214  */
1215 void
1217 {
1218  if (!s)
1219  return;
1220 
1221  if (s->session_state >= SESSION_STATE_CLOSING)
1222  {
1223  /* Session will only be removed once both app and transport
1224  * acknowledge the close */
1225  if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1226  || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1228  return;
1229  }
1230 
1231  s->session_state = SESSION_STATE_CLOSING;
1233 }
1234 
1235 /**
1236  * Force a close without waiting for data to be flushed
1237  */
1238 void
1240 {
1241  if (s->session_state >= SESSION_STATE_CLOSING)
1242  return;
1243  /* Drop all outstanding tx data */
1245  s->session_state = SESSION_STATE_CLOSING;
1247 }
1248 
1249 /**
1250  * Notify transport the session can be disconnected. This should eventually
1251  * result in a delete notification that allows us to cleanup session state.
1252  * Called for both active/passive disconnects.
1253  *
1254  * Must be called from the session's thread.
1255  */
1256 void
1258 {
1259  if (s->session_state >= SESSION_STATE_APP_CLOSED)
1260  {
1261  if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1262  s->session_state = SESSION_STATE_CLOSED;
1263  /* If transport is already deleted, just free the session */
1264  else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1266  return;
1267  }
1268 
1269  /* If the tx queue wasn't drained, the transport can continue to try
1270  * sending the outstanding data (in closed state it cannot). It MUST however
1271  * at one point, either after sending everything or after a timeout, call
1272  * delete notify. This will finally lead to the complete cleanup of the
1273  * session.
1274  */
1275  s->session_state = SESSION_STATE_APP_CLOSED;
1276 
1278  s->thread_index);
1279 }
1280 
1281 /**
1282  * Force transport close
1283  */
1284 void
1286 {
1287  if (s->session_state >= SESSION_STATE_APP_CLOSED)
1288  {
1289  if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1290  s->session_state = SESSION_STATE_CLOSED;
1291  else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1293  return;
1294  }
1295 
1296  s->session_state = SESSION_STATE_APP_CLOSED;
1298  s->thread_index);
1299 }
1300 
1301 /**
1302  * Cleanup transport and session state.
1303  *
1304  * Notify transport of the cleanup and free the session. This should
1305  * be called only if transport reported some error and is already
1306  * closed.
1307  */
1308 void
1310 {
1311  /* Delete from main lookup table before we axe the transport */
1313  if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1315  s->thread_index);
1316  /* Since we called cleanup, no delete notification will come. So, make
1317  * sure the session is properly freed. */
1319  session_free (s);
1320 }
1321 
1322 /**
1323  * Allocate event queues in the shared-memory segment
1324  *
1325  * That can either be a newly created memfd segment, that will need to be
1326  * mapped by all stack users, or the binary api's svm region. The latter is
1327  * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1328  * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1329  * vpp uses api svm region for event queues.
1330  */
1331 void
1333 {
1334  u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1335  ssvm_private_t *eqs = &smm->evt_qs_segment;
1336  uword eqs_size = 64 << 20;
1337  pid_t vpp_pid = getpid ();
1338  void *oldheap;
1339  int i;
1340 
1342  evt_q_length = smm->configured_event_queue_length;
1343 
1344  if (smm->evt_qs_use_memfd_seg)
1345  {
1346  if (smm->evt_qs_segment_size)
1347  eqs_size = smm->evt_qs_segment_size;
1348 
1349  eqs->ssvm_size = eqs_size;
1350  eqs->i_am_master = 1;
1351  eqs->my_pid = vpp_pid;
1352  eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1353  eqs->requested_va = smm->session_baseva;
1354 
1356  {
1357  clib_warning ("failed to initialize queue segment");
1358  return;
1359  }
1360  }
1361 
1362  if (smm->evt_qs_use_memfd_seg)
1363  oldheap = ssvm_push_heap (eqs->sh);
1364  else
1365  oldheap = vl_msg_push_heap ();
1366 
1367  for (i = 0; i < vec_len (smm->wrk); i++)
1368  {
1369  svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1371  {evt_q_length, evt_size, 0}
1372  ,
1373  {evt_q_length >> 1, 256, 0}
1374  };
1375  cfg->consumer_pid = 0;
1376  cfg->n_rings = 2;
1377  cfg->q_nitems = evt_q_length;
1378  cfg->ring_cfgs = rc;
1379  smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1380  if (smm->evt_qs_use_memfd_seg)
1381  {
1383  clib_warning ("eventfd returned");
1384  }
1385  }
1386 
1387  if (smm->evt_qs_use_memfd_seg)
1388  ssvm_pop_heap (oldheap);
1389  else
1390  vl_msg_pop_heap (oldheap);
1391 }
1392 
1395 {
1396  session_main_t *smm = &session_main;
1397  if (smm->evt_qs_use_memfd_seg)
1398  return &smm->evt_qs_segment;
1399  return 0;
1400 }
1401 
1402 u64
1404 {
1405  svm_fifo_t *f;
1406 
1407  if (!s->rx_fifo)
1408  return SESSION_INVALID_HANDLE;
1409 
1410  f = s->rx_fifo;
1411  return segment_manager_make_segment_handle (f->segment_manager,
1412  f->segment_index);
1413 }
1414 
1415 /* *INDENT-OFF* */
1420  session_tx_fifo_dequeue_and_snd
1421 };
1422 /* *INDENT-ON* */
1423 
1424 /**
1425  * Initialize session layer for given transport proto and ip version
1426  *
1427  * Allocates per session type (transport proto + ip version) data structures
1428  * and adds arc from session queue node to session type output node.
1429  */
1430 void
1432  const transport_proto_vft_t * vft, u8 is_ip4,
1433  u32 output_node)
1434 {
1435  session_main_t *smm = &session_main;
1436  session_type_t session_type;
1437  u32 next_index = ~0;
1438 
1439  session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1440 
1441  vec_validate (smm->session_type_to_next, session_type);
1442  vec_validate (smm->session_tx_fns, session_type);
1443 
1444  /* *INDENT-OFF* */
1445  if (output_node != ~0)
1446  {
1447  foreach_vlib_main (({
1448  next_index = vlib_node_add_next (this_vlib_main,
1449  session_queue_node.index,
1450  output_node);
1451  }));
1452  }
1453  /* *INDENT-ON* */
1454 
1455  smm->session_type_to_next[session_type] = next_index;
1456  smm->session_tx_fns[session_type] =
1457  session_tx_fns[vft->transport_options.tx_type];
1458 }
1459 
1462 {
1463  if (s->session_state != SESSION_STATE_LISTENING)
1466  else
1468  s->connection_index);
1469 }
1470 
1471 void
1473 {
1474  if (s->session_state != SESSION_STATE_LISTENING)
1476  s->connection_index, s->thread_index, tep,
1477  is_lcl);
1478  else
1480  s->connection_index, tep, is_lcl);
1481 }
1482 
1485 {
1487  s->connection_index);
1488 }
1489 
1490 void
1492 {
1493  ASSERT (vlib_get_thread_index () == 0);
1496 }
1497 
1498 static clib_error_t *
1500 {
1501  segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1502  session_main_t *smm = &session_main;
1504  u32 num_threads, preallocated_sessions_per_worker;
1505  session_worker_t *wrk;
1506  int i;
1507 
1508  num_threads = 1 /* main thread */ + vtm->n_threads;
1509 
1510  if (num_threads < 1)
1511  return clib_error_return (0, "n_thread_stacks not set");
1512 
1513  /* Allocate cache line aligned worker contexts */
1514  vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1515 
1516  for (i = 0; i < num_threads; i++)
1517  {
1518  wrk = &smm->wrk[i];
1519  wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1520  wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1521  wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1522  wrk->vm = vlib_mains[i];
1523  wrk->last_vlib_time = vlib_time_now (vm);
1525 
1526  if (num_threads > 1)
1528  }
1529 
1530  /* Allocate vpp event queues segment and queue */
1532 
1533  /* Initialize fifo segment main baseva and timeout */
1534  sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
1535  sm_args->size = smm->session_va_space_size;
1536  segment_manager_main_init (sm_args);
1537 
1538  /* Preallocate sessions */
1539  if (smm->preallocated_sessions)
1540  {
1541  if (num_threads == 1)
1542  {
1544  }
1545  else
1546  {
1547  int j;
1548  preallocated_sessions_per_worker =
1549  (1.1 * (f64) smm->preallocated_sessions /
1550  (f64) (num_threads - 1));
1551 
1552  for (j = 1; j < num_threads; j++)
1553  {
1554  pool_init_fixed (smm->wrk[j].sessions,
1555  preallocated_sessions_per_worker);
1556  }
1557  }
1558  }
1559 
1562  transport_init ();
1563 
1564  smm->is_enabled = 1;
1565 
1566  /* Enable transports */
1567  transport_enable_disable (vm, 1);
1568  return 0;
1569 }
1570 
1571 void
1573 {
1574  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1576  u8 have_workers = vtm->n_threads != 0;
1577 
1578  /* *INDENT-OFF* */
1579  foreach_vlib_main (({
1580  if (have_workers && ii == 0)
1581  {
1582  vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
1583  state);
1584  if (is_en)
1585  {
1586  vlib_node_t *n = vlib_get_node (this_vlib_main,
1588  vlib_start_process (this_vlib_main, n->runtime_index);
1589  }
1590  else
1591  {
1592  vlib_process_signal_event_mt (this_vlib_main,
1595  }
1596 
1597  continue;
1598  }
1599  vlib_node_set_state (this_vlib_main, session_queue_node.index,
1600  state);
1601  }));
1602  /* *INDENT-ON* */
1603 }
1604 
1605 clib_error_t *
1607 {
1608  clib_error_t *error = 0;
1609  if (is_en)
1610  {
1611  if (session_main.is_enabled)
1612  return 0;
1613 
1614  error = session_manager_main_enable (vm);
1616  }
1617  else
1618  {
1619  session_main.is_enabled = 0;
1621  }
1622 
1623  return error;
1624 }
1625 
1626 clib_error_t *
1628 {
1629  session_main_t *smm = &session_main;
1631 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1632  smm->session_va_space_size = 128ULL << 30;
1633  smm->evt_qs_segment_size = 64 << 20;
1634 #else
1635  smm->session_va_space_size = 128 << 20;
1636  smm->evt_qs_segment_size = 1 << 20;
1637 #endif
1638  smm->is_enabled = 0;
1639  smm->session_enable_asap = 0;
1640  return 0;
1641 }
1642 
1643 static clib_error_t *
1645 {
1646  session_main_t *smm = &session_main;
1647  if (smm->session_enable_asap)
1648  {
1650  vnet_session_enable_disable (vm, 1 /* is_en */ );
1652  }
1653  return 0;
1654 }
1655 
1658 
1659 static clib_error_t *
1661 {
1662  session_main_t *smm = &session_main;
1663  u32 nitems;
1664  uword tmp;
1665 
1666  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1667  {
1668  if (unformat (input, "event-queue-length %d", &nitems))
1669  {
1670  if (nitems >= 2048)
1671  smm->configured_event_queue_length = nitems;
1672  else
1673  clib_warning ("event queue length %d too small, ignored", nitems);
1674  }
1675  else if (unformat (input, "preallocated-sessions %d",
1676  &smm->preallocated_sessions))
1677  ;
1678  else if (unformat (input, "v4-session-table-buckets %d",
1680  ;
1681  else if (unformat (input, "v4-halfopen-table-buckets %d",
1683  ;
1684  else if (unformat (input, "v6-session-table-buckets %d",
1686  ;
1687  else if (unformat (input, "v6-halfopen-table-buckets %d",
1689  ;
1690  else if (unformat (input, "v4-session-table-memory %U",
1691  unformat_memory_size, &tmp))
1692  {
1693  if (tmp >= 0x100000000)
1694  return clib_error_return (0, "memory size %llx (%lld) too large",
1695  tmp, tmp);
1697  }
1698  else if (unformat (input, "v4-halfopen-table-memory %U",
1699  unformat_memory_size, &tmp))
1700  {
1701  if (tmp >= 0x100000000)
1702  return clib_error_return (0, "memory size %llx (%lld) too large",
1703  tmp, tmp);
1705  }
1706  else if (unformat (input, "v6-session-table-memory %U",
1707  unformat_memory_size, &tmp))
1708  {
1709  if (tmp >= 0x100000000)
1710  return clib_error_return (0, "memory size %llx (%lld) too large",
1711  tmp, tmp);
1713  }
1714  else if (unformat (input, "v6-halfopen-table-memory %U",
1715  unformat_memory_size, &tmp))
1716  {
1717  if (tmp >= 0x100000000)
1718  return clib_error_return (0, "memory size %llx (%lld) too large",
1719  tmp, tmp);
1721  }
1722  else if (unformat (input, "local-endpoints-table-memory %U",
1723  unformat_memory_size, &tmp))
1724  {
1725  if (tmp >= 0x100000000)
1726  return clib_error_return (0, "memory size %llx (%lld) too large",
1727  tmp, tmp);
1728  smm->local_endpoints_table_memory = tmp;
1729  }
1730  else if (unformat (input, "local-endpoints-table-buckets %d",
1732  ;
1733  else if (unformat (input, "evt_qs_memfd_seg"))
1734  smm->evt_qs_use_memfd_seg = 1;
1735  else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
1736  &smm->evt_qs_segment_size))
1737  ;
1738  else if (unformat (input, "enable"))
1739  smm->session_enable_asap = 1;
1740  else
1741  return clib_error_return (0, "unknown input `%U'",
1742  format_unformat_error, input);
1743  }
1744  return 0;
1745 }
1746 
1748 
1749 /*
1750  * fd.io coding-style-patch-verification: ON
1751  *
1752  * Local Variables:
1753  * eval: (c-set-style "gnu")
1754  * End:
1755  */
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:440
u32 flags
buffer flags: VLIB_BUFFER_FREE_LIST_INDEX_MASK: bits used to store free list index, VLIB_BUFFER_IS_TRACED: trace this buffer.
Definition: buffer.h:124
#define session_endpoint_to_transport_cfg(_sep)
Definition: session_types.h:95
u32 preallocated_sessions
Preallocate session config parameter.
Definition: session.h:196
int app_worker_lock_and_send_event(app_worker_t *app, session_t *s, u8 evt_type)
Send event to application.
void transport_close(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.c:295
static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES]
Definition: session.c:1126
u32 connection_index
Index of the transport connection associated to the session.
uword evt_qs_segment_size
Definition: session.h:178
void session_flush_frames_main_thread(vlib_main_t *vm)
Definition: session.c:1491
int app_worker_init_accepted(session_t *s)
void session_transport_reset(session_t *s)
Force transport close.
Definition: session.c:1285
int session_lookup_del_connection(transport_connection_t *tc)
Delete transport connection from session table.
int app_worker_reset_notify(app_worker_t *app_wrk, session_t *s)
u8 proto
Definition: acl_types.api:47
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
int session_open(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Ask transport to open connection to remote transport endpoint.
Definition: session.c:1147
session_type_t session_type
Type built from transport and network protocol types.
uword requested_va
Definition: ssvm.h:88
#define SESSION_Q_PROCESS_FLUSH_FRAMES
Definition: session.h:205
svm_msg_q_t * vpp_event_queue
vpp event message queue for worker
Definition: session.h:84
void transport_get_listener_endpoint(transport_proto_t tp, u32 conn_index, transport_endpoint_t *tep, u8 is_lcl)
Definition: transport.c:363
static u32 svm_fifo_max_enqueue_prod(svm_fifo_t *f)
Maximum number of bytes that can be enqueued into fifo.
Definition: svm_fifo.h:619
static void clib_rwlock_writer_lock(clib_rwlock_t *p)
Definition: lock.h:173
uword ssvm_size
Definition: ssvm.h:85
svm_fifo_t * tx_fifo
session_main_t session_main
Definition: session.c:26
int session_tx_fifo_peek_bytes(transport_connection_t *tc, u8 *buffer, u32 offset, u32 max_bytes)
Definition: session.c:503
#define VLIB_MAIN_LOOP_ENTER_FUNCTION(x)
Definition: init.h:176
u32 session_index
Index in thread pool where session was allocated.
int session_stop_listen(session_t *s)
Ask transport to stop listening on local transport endpoint.
Definition: session.c:1191
unsigned long u64
Definition: types.h:89
transport_connection_t * listen_session_get_transport(session_t *s)
Definition: session.c:1484
static svm_msg_q_t * session_main_get_vpp_event_queue(u32 thread_index)
Definition: session.h:607
u32 configured_v4_halfopen_table_buckets
Definition: session.h:184
u8 session_enable_asap
Enable session manager at startup.
Definition: session.h:170
int session_ho_stream_connect_notify(transport_connection_t *tc, u8 is_fail)
Definition: session.c:764
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
void session_transport_delete_notify(transport_connection_t *tc)
Notification from transport that connection is being deleted.
Definition: session.c:880
transport_connection_t * session_get_transport(session_t *s)
Definition: session.c:1461
static f64 vlib_time_now(vlib_main_t *vm)
Definition: main.h:279
svm_fifo_t * rx_fifo
Pointers to rx/tx buffers.
session_worker_t * wrk
Worker contexts.
Definition: session.h:147
static session_t * session_get_if_valid(u64 si, u32 thread_index)
Definition: session.h:302
void session_add_self_custom_tx_evt(transport_connection_t *tc, u8 has_prio)
Definition: session.c:123
#define pool_get_aligned_will_expand(P, YESNO, A)
See if pool_get will expand the pool or not.
Definition: pool.h:243
int session_enqueue_dgram_connection(session_t *s, session_dgram_hdr_t *hdr, vlib_buffer_t *b, u8 proto, u8 queue_event)
Definition: session.c:467
u16 current_length
Nbytes between current data and the end of this buffer.
Definition: buffer.h:113
static int session_enqueue_notify_inline(session_t *s)
Notify session peer that new data has been enqueued.
Definition: session.c:557
int session_main_flush_enqueue_events(u8 transport_proto, u32 thread_index)
Flushes queue of sessions that are to be notified of new data enqueued events.
Definition: session.c:659
session_evt_type_t
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:523
static transport_proto_t session_get_transport_proto(session_t *s)
void session_send_rpc_evt_to_thread(u32 thread_index, void *fp, void *rpc_args)
Definition: session.c:111
int svm_fifo_peek(svm_fifo_t *f, u32 offset, u32 len, u8 *dst)
Peek data from fifo.
Definition: svm_fifo.c:1007
void session_transport_reset_notify(transport_connection_t *tc)
Notify application that connection has been reset.
Definition: session.c:981
int i
int session_open_vc(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Definition: session.c:1075
void svm_fifo_dequeue_drop_all(svm_fifo_t *f)
Dequeue and drop all bytes from fifo.
Definition: svm_fifo.c:1061
ssvm_shared_header_t * sh
Definition: ssvm.h:84
u32 transport_start_listen(transport_proto_t tp, u32 session_index, transport_endpoint_t *tep)
Definition: transport.c:310
static session_t * session_get(u32 si, u32 thread_index)
Definition: session.h:295
u8 * format(u8 *s, const char *fmt,...)
Definition: format.c:424
session_evt_elt_t * event_elts
Pool of session event list elements.
Definition: session.h:105
#define vec_validate_aligned(V, I, A)
Make sure vector is long enough for given index (no header, specified alignment)
Definition: vec.h:451
u32 flags
Session flags.
int session_enqueue_stream_connection(transport_connection_t *tc, vlib_buffer_t *b, u32 offset, u8 queue_event, u8 is_in_order)
Definition: session.c:414
u64 session_lookup_half_open_handle(transport_connection_t *tc)
static session_t * session_clone_safe(u32 session_index, u32 thread_index)
Definition: session.h:392
u32 local_endpoints_table_memory
Transport table (preallocation) size parameters.
Definition: session.h:192
void session_node_enable_disable(u8 is_en)
Definition: session.c:1572
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
vlib_main_t ** vlib_mains
Definition: buffer.c:332
static uword vlib_node_add_next(vlib_main_t *vm, uword node, uword next_node)
Definition: node_funcs.h:1092
unsigned char u8
Definition: types.h:56
app_worker_t * application_get_worker(application_t *app, u32 wrk_map_index)
Definition: application.c:650
session_fifo_rx_fn session_tx_fifo_peek_and_snd
session_t * sessions
Worker session pool.
Definition: session.h:81
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
double f64
Definition: types.h:142
static session_fifo_rx_fn * session_tx_fns[TRANSPORT_TX_N_FNS]
Definition: session.c:1416
#define vlib_worker_thread_barrier_sync(X)
Definition: threads.h:204
static session_handle_t session_handle(session_t *s)
struct _svm_fifo svm_fifo_t
void session_get_endpoint(session_t *s, transport_endpoint_t *tep, u8 is_lcl)
Definition: session.c:1472
u8 session_type_t
void session_transport_closing_notify(transport_connection_t *tc)
Notification from transport that connection is being closed.
Definition: session.c:858
u64 segment_manager_make_segment_handle(u32 segment_manager_index, u32 segment_index)
int session_open_cl(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Definition: session.c:1039
#define clib_llist_make_head(LP, name)
Initialize llist head.
Definition: llist.h:106
static session_worker_t * session_main_get_worker(u32 thread_index)
Definition: session.h:593
void session_free_w_fifos(session_t *s)
Definition: session.c:247
#define session_endpoint_to_transport(_sep)
Definition: session_types.h:94
void app_namespaces_init(void)
vlib_main_t * vm
Convenience pointer to this worker's vlib_main.
Definition: session.h:93
static clib_error_t * session_config_fn(vlib_main_t *vm, unformat_input_t *input)
Definition: session.c:1660
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:41
u8 transport_half_open_has_fifos(transport_proto_t tp)
Definition: transport.c:265
int session_send_ctrl_evt_to_thread(session_t *s, session_evt_type_t evt_type)
Definition: session.c:94
#define VLIB_INIT_FUNCTION(x)
Definition: init.h:173
void segment_manager_dealloc_fifos(svm_fifo_t *rx_fifo, svm_fifo_t *tx_fifo)
clib_llist_index_t new_head
Head of list of elements.
Definition: session.h:114
u32 * session_type_to_next
Per session type output nodes.
Definition: session.h:161
static int session_stream_connect_notify_inline(transport_connection_t *tc, u8 is_fail, session_state_t opened_state)
Definition: session.c:698
static int session_enqueue_chain_tail(session_t *s, vlib_buffer_t *b, u32 offset, u8 is_in_order)
Enqueue buffer chain tail.
Definition: session.c:327
static void * ssvm_push_heap(ssvm_shared_header_t *sh)
Definition: ssvm.h:143
#define clib_error_return(e, args...)
Definition: error.h:99
uword session_baseva
Session ssvm segment configs.
Definition: session.h:176
unsigned int u32
Definition: types.h:88
int session_send_io_evt_to_thread(svm_fifo_t *f, session_evt_type_t evt_type)
Definition: session.c:80
int ssvm_master_init(ssvm_private_t *ssvm, ssvm_segment_type_t type)
Definition: ssvm.c:425
u8 session_is_valid(u32 si, u8 thread_index)
Definition: session.c:210
static void ssvm_pop_heap(void *oldheap)
Definition: ssvm.h:151
void transport_reset(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.c:301
#define HALF_OPEN_LOOKUP_INVALID_VALUE
#define SESSION_INVALID_HANDLE
Definition: session_types.h:23
int session_lookup_del_half_open(transport_connection_t *tc)
session_state_t
static u32 vlib_get_buffer_index(vlib_main_t *vm, void *p)
Translate buffer pointer into buffer index.
Definition: buffer_funcs.h:257
static clib_error_t * session_manager_main_enable(vlib_main_t *vm)
Definition: session.c:1499
struct _transport_proto_vft transport_proto_vft_t
int session_dequeue_notify(session_t *s)
Definition: session.c:629
struct _session_endpoint_cfg session_endpoint_cfg_t
u32 configured_v6_halfopen_table_memory
Definition: session.h:189
u32 configured_v6_session_table_buckets
Definition: session.h:186
static session_type_t session_type_from_proto_and_ip(transport_proto_t proto, u8 is_ip4)
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
Definition: pool.h:519
static void clib_rwlock_init(clib_rwlock_t *p)
Definition: lock.h:133
#define CLIB_US_TIME_FREQ
Definition: time.h:201
int svm_msg_q_alloc_consumer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
session_event_t evt
Definition: session.h:68
transport_service_type_t transport_protocol_service_type(transport_proto_t tp)
Definition: transport.c:271
void vl_msg_pop_heap(void *oldheap)
Definition: cli.c:720
uword session_va_space_size
Definition: session.h:177
void session_send_rpc_evt_to_thread_force(u32 thread_index, void *fp, void *rpc_args)
Definition: session.c:103
u32 configured_v4_session_table_buckets
Session table size parameters.
Definition: session.h:182
void transport_get_endpoint(transport_proto_t tp, u32 conn_index, u32 thread_index, transport_endpoint_t *tep, u8 is_lcl)
Definition: transport.c:347
int app_worker_accept_notify(app_worker_t *app_wrk, session_t *s)
u32 app_index
Index of application that owns the listener.
struct _unformat_input_t unformat_input_t
u32 configured_v6_halfopen_table_buckets
Definition: session.h:188
u32 configured_event_queue_length
vpp fifo event queue configured length
Definition: session.h:173
u8 is_enabled
Session manager is enabled.
Definition: session.h:168
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
Definition: buffer.h:229
#define pool_put(P, E)
Free an object E in pool P.
Definition: pool.h:287
#define APP_INVALID_INDEX
Definition: application.h:169
int svm_fifo_enqueue(svm_fifo_t *f, u32 len, const u8 *src)
Enqueue data to fifo.
Definition: svm_fifo.c:888
enum transport_service_type_ transport_service_type_t
#define PREDICT_FALSE(x)
Definition: clib.h:111
#define VLIB_CONFIG_FUNCTION(x, n,...)
Definition: init.h:182
#define always_inline
Definition: ipsec.h:28
int session_lookup_del_session(session_t *s)
u32 wrk_index
Worker index in global worker pool.
Definition: application.h:37
clib_error_t * session_manager_main_init(vlib_main_t *vm)
Definition: session.c:1627
app_worker_t * app_worker_get_if_valid(u32 wrk_index)
static u32 svm_fifo_max_dequeue_prod(svm_fifo_t *f)
Fifo max bytes to dequeue optimized for producer.
Definition: svm_fifo.h:513
session_fifo_rx_fn ** session_tx_fns
Per transport rx function that can either dequeue or peek.
Definition: session.h:156
u64 session_segment_handle(session_t *s)
Definition: session.c:1403
#define foreach_vlib_main(body)
Definition: threads.h:241
static session_evt_elt_t * session_evt_alloc_old(session_worker_t *wrk)
Definition: session.h:280
void transport_cleanup(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.c:283
vlib_main_t * vm
Definition: in2out_ed.c:1810
clib_error_t * vnet_session_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: session.c:1606
u32 transport_stop_listen(transport_proto_t tp, u32 conn_index)
Definition: transport.c:317
struct _session_switch_pool_args session_switch_pool_args_t
u8 len
Definition: ip_types.api:91
clib_rwlock_t peekers_rw_locks
Peekers rw lock.
Definition: session.h:120
#define pool_get_aligned(P, E, A)
Allocate an object E from a pool P with alignment A.
Definition: pool.h:231
#define SESSION_DBG(_fmt, _args...)
static void clib_rwlock_writer_unlock(clib_rwlock_t *p)
Definition: lock.h:187
static u8 svm_fifo_set_event(svm_fifo_t *f)
Set fifo event flag.
Definition: svm_fifo.h:749
u32 n_rings
number of msg rings
Definition: message_queue.h:54
u8 is_ip4
Definition: lisp_gpe.api:232
u8 evt_qs_use_memfd_seg
Definition: session.h:179
void transport_init(void)
Definition: transport.c:745
int app_worker_transport_closed_notify(app_worker_t *app_wrk, session_t *s)
session_fifo_rx_fn session_tx_fifo_dequeue_and_snd
ssvm_private_t evt_qs_segment
Event queues memfd segment initialized only if so configured.
Definition: session.h:150
static u8 svm_fifo_needs_deq_ntf(svm_fifo_t *f, u32 n_last_deq)
Check if fifo needs dequeue notification.
Definition: svm_fifo.h:840
#define UNFORMAT_END_OF_INPUT
Definition: format.h:145
u32 runtime_index
Definition: node.h:283
session_handle_t listener_handle
Parent listener session index if the result of an accept.
static_always_inline uword vlib_get_thread_index(void)
Definition: threads.h:218
int transport_connect(transport_proto_t tp, transport_endpoint_cfg_t *tep)
Definition: transport.c:289
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
static transport_connection_t * transport_get_connection(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.h:119
static void vlib_process_signal_event_mt(vlib_main_t *vm, uword node_index, uword type_opaque, uword data)
Signal event to process from any thread.
Definition: node_funcs.h:958
static void session_cleanup_notify(session_t *s, session_cleanup_ntf_t ntf)
Definition: session.c:236
void session_free(session_t *s)
Definition: session.c:196
int app_worker_cleanup_notify(app_worker_t *app_wrk, session_t *s, session_cleanup_ntf_t ntf)
#define clib_warning(format, args...)
Definition: error.h:59
static int session_send_evt_to_thread(void *data, void *args, u32 thread_index, session_evt_type_t evt_type)
Definition: session.c:29
int session_dgram_connect_notify(transport_connection_t *tc, u32 old_thread_index, session_t **new_session)
Move dgram session to the right thread.
Definition: session.c:815
Don't register connection in lookup. Does not apply to local apps and transports using the network lay...
struct _transport_connection transport_connection_t
#define HIGH_SEGMENT_BASEVA
Definition: svm_common.h:87
int app_worker_init_connected(app_worker_t *app_wrk, session_t *s)
static session_evt_elt_t * session_evt_alloc_new(session_worker_t *wrk)
Definition: session.h:270
u32 my_pid
Definition: ssvm.h:86
void transport_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: transport.c:734
static u8 vlib_thread_is_main_w_barrier(void)
Definition: threads.h:517
#define pool_init_fixed(pool, max_elts)
initialize a fixed-size, preallocated pool
Definition: pool.h:86
application_t * application_get(u32 app_index)
Definition: application.c:426
static u32 session_thread_from_handle(session_handle_t handle)
#define SESSION_Q_PROCESS_STOP
Definition: session.h:206
int session_listen(session_t *ls, session_endpoint_cfg_t *sep)
Ask transport to listen on session endpoint.
Definition: session.c:1163
static u32 session_index_from_handle(session_handle_t handle)
#define uword_to_pointer(u, type)
Definition: types.h:136
void session_vpp_event_queues_allocate(session_main_t *smm)
Allocate event queues in the shared-memory segment.
Definition: session.c:1332
static session_handle_t session_make_handle(u32 session_index, u32 thread_index)
#define ASSERT(truth)
static int session_notify_subscribers(u32 app_index, session_t *s, svm_fifo_t *f, session_evt_type_t evt_type)
Definition: session.c:525
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
u8 data[128]
Definition: ipsec_types.api:87
int session_main_flush_all_enqueue_events(u8 transport_proto)
Definition: session.c:688
void segment_manager_main_init(segment_manager_main_init_args_t *a)
static void clib_mem_free(void *p)
Definition: mem.h:226
enum _transport_proto transport_proto_t
static void svm_fifo_clear_deq_ntf(svm_fifo_t *f)
Clear the want notification flag and set has notification.
Definition: svm_fifo.h:807
int svm_fifo_enqueue_with_offset(svm_fifo_t *f, u32 offset, u32 len, u8 *src)
Enqueue a future segment.
Definition: svm_fifo.c:931
static void vlib_buffer_advance(vlib_buffer_t *b, word l)
Advance current data pointer by the supplied (signed!) amount.
Definition: buffer.h:248
static void vlib_node_set_state(vlib_main_t *vm, u32 node_index, vlib_node_state_t new_state)
Set node dispatch state.
Definition: node_funcs.h:148
clib_time_type_t last_vlib_time
vlib_time_now last time around the track
Definition: session.h:87
u32 configured_v4_halfopen_table_memory
Definition: session.h:185
static session_t * session_alloc_for_connection(transport_connection_t *tc)
Definition: session.c:272
int app_worker_migrate_notify(app_worker_t *app_wrk, session_t *s, session_handle_t new_sh)
u32 configured_v4_session_table_memory
Definition: session.h:183
int session_stream_accept_notify(transport_connection_t *tc)
Definition: session.c:996
static void * clib_mem_alloc(uword size)
Definition: mem.h:153
void session_enqueue_notify_thread(session_handle_t sh)
Like session_enqueue_notify, but can be called from a thread that does not own the session...
Definition: session.c:614
static uword pointer_to_uword(const void *p)
Definition: types.h:131
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:55
static vlib_main_t * vlib_get_main(void)
Definition: global_funcs.h:23
u8 * name
Definition: ssvm.h:87
int() session_fifo_rx_fn(session_worker_t *wrk, vlib_node_runtime_t *node, session_evt_elt_t *e, int *n_tx_packets)
Definition: session.h:134
u8 thread_index
Index of the thread that allocated the session.
session_t * session_alloc(u32 thread_index)
Definition: session.c:170
void session_transport_cleanup(session_t *s)
Cleanup transport and session state.
Definition: session.c:1309
static void session_enqueue_discard_chain_bytes(vlib_main_t *vm, vlib_buffer_t *b, vlib_buffer_t **chain_b, u32 n_bytes_to_drop)
Discards bytes from buffer chain.
Definition: session.c:296
template key/value backing page structure
Definition: bihash_doc.h:44
static void session_switch_pool(void *cb_args)
Notify old thread of the session pool switch.
Definition: session.c:782
static transport_connection_t * transport_get_half_open(transport_proto_t tp, u32 conn_index)
Definition: transport.h:132
u32 local_endpoints_table_buckets
Definition: session.h:193
app_worker_t * app_worker_get(u32 wrk_index)
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:53
u64 session_handle_t
void session_lookup_init(void)
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
session_fifo_rx_fn session_tx_fifo_dequeue_internal
clib_llist_index_t old_head
Head of list of pending events.
Definition: session.h:117
u32 next_buffer
Next buffer for this linked-list of buffers.
Definition: buffer.h:140
volatile u8 session_state
State in session layer state machine.
int session_enqueue_notify(session_t *s)
Definition: session.c:591
void session_transport_closed_notify(transport_connection_t *tc)
Notification from transport that it is closed.
Definition: session.c:946
u32 * session_to_enqueue[TRANSPORT_N_PROTO]
Per-proto vector of sessions to enqueue.
Definition: session.h:96
void session_close(session_t *s)
Initialize session closing procedure.
Definition: session.c:1216
VLIB buffer representation.
Definition: buffer.h:102
u64 uword
Definition: types.h:112
void session_reset(session_t *s)
Force a close without waiting for data to be flushed.
Definition: session.c:1239
ssvm_private_t * session_main_get_evt_q_segment(void)
Definition: session.c:1394
session_cleanup_ntf_t
int session_send_io_evt_to_thread_custom(void *data, u32 thread_index, session_evt_type_t evt_type)
Definition: session.c:87
unformat_function_t unformat_memory_size
Definition: format.h:296
static session_evt_elt_t * session_evt_alloc_ctrl(session_worker_t *wrk)
Definition: session.h:246
u32 app_index
Index of owning app.
Definition: application.h:43
static u8 svm_fifo_n_subscribers(svm_fifo_t *f)
Definition: svm_fifo.h:681
int session_lookup_add_connection(transport_connection_t *tc, u64 value)
Add transport connection to a session table.
u8 * format_unformat_error(u8 *s, va_list *va)
Definition: unformat.c:91
void vlib_worker_thread_barrier_release(vlib_main_t *vm)
Definition: threads.c:1496
u32 configured_v6_session_table_memory
Definition: session.h:187
clib_us_time_t last_vlib_us_time
vlib_time_now rounded to microsecond (us) precision, stored as a u64
Definition: session.h:90
int session_stream_connect_notify(transport_connection_t *tc, u8 is_fail)
Definition: session.c:757
static vlib_thread_main_t * vlib_get_thread_main()
Definition: global_funcs.h:32
vlib_node_registration_t session_queue_process_node
(constructor) VLIB_REGISTER_NODE (session_queue_process_node)
vl_api_dhcp_client_state_t state
Definition: dhcp.api:201
static u32 vlib_num_workers()
Definition: threads.h:372
static vlib_node_t * vlib_get_node(vlib_main_t *vm, u32 i)
Get vlib node by index.
Definition: node_funcs.h:59
void * vl_msg_push_heap(void)
Definition: cli.c:713
int(* session_open_service_fn)(u32, session_endpoint_t *, u32)
Definition: session.c:1123
u32 app_wrk_index
Index of the app worker that owns the session.
u32 session_tx_fifo_dequeue_drop(transport_connection_t *tc, u32 max_bytes)
Definition: session.c:511
static void session_delete(session_t *s)
Cleans up session and lookup table.
Definition: session.c:260
void session_transport_close(session_t *s)
Notify transport the session can be disconnected.
Definition: session.c:1257
static clib_error_t * session_main_init(vlib_main_t *vm)
Definition: session.c:1644
clib_llist_index_t ctrl_head
Head of control events list.
Definition: session.h:111
int app_worker_close_notify(app_worker_t *app_wrk, session_t *s)
struct _session_endpoint session_endpoint_t
int session_stream_accept(transport_connection_t *tc, u32 listener_index, u32 thread_index, u8 notify)
Accept a stream session.
Definition: session.c:1013
int consumer_pid
pid of msg consumer
Definition: message_queue.h:52
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:59
static transport_connection_t * transport_get_listener(transport_proto_t tp, u32 conn_index)
Definition: transport.h:126
u32 total_length_not_including_first_buffer
Only valid for first buffer in chain.
Definition: buffer.h:167
int svm_fifo_dequeue_drop(svm_fifo_t *f, u32 len)
Dequeue and drop bytes from fifo.
Definition: svm_fifo.c:1029
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
static void session_enqueue_notify_rpc(void *arg)
Definition: session.c:597
int session_open_app(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Definition: session.c:1112
void vlib_start_process(vlib_main_t *vm, uword process_index)
Definition: main.c:1590
void session_register_transport(transport_proto_t transport_proto, const transport_proto_vft_t *vft, u8 is_ip4, u32 output_node)
Initialize session layer for given transport proto and ip version.
Definition: session.c:1431
int session_lookup_add_half_open(transport_connection_t *tc, u64 value)
u8 transport_protocol_is_cl(transport_proto_t tp)
Definition: transport.c:323
static u8 svm_msg_q_is_full(svm_msg_q_t *mq)
Check if message queue is full.
int app_worker_connect_notify(app_worker_t *app_wrk, session_t *s, u32 opaque)
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
Definition: buffer_funcs.h:85
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
#define SESSION_EVT(_evt, _args...)
int i_am_master
Definition: ssvm.h:89
static void session_program_transport_ctrl_evt(session_t *s, session_evt_type_t evt)
Definition: session.c:149
static session_t * listen_session_get(u32 ls_index)
Definition: session.h:569
uword unformat(unformat_input_t *i, const char *fmt,...)
Definition: unformat.c:978
static uword unformat_check_input(unformat_input_t *i)
Definition: format.h:171