FD.io VPP  v19.01.3-6-g70449b9b9
Vector Packet Processing
session_node.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
25 #include <svm/queue.h>
26 
27 static void
29 {
31  vnet_disconnect_args_t _a = { 0 }, *a = &_a;
32  stream_session_state_t old_state;
33  app_worker_t *app_wrk;
34  local_session_t *ls;
36 
37  /* Server isn't interested, kill the session */
38  if (mp->retval)
39  {
40  a->app_index = mp->context;
41  a->handle = mp->handle;
43  return;
44  }
45 
47  {
49  if (!ls)
50  {
51  clib_warning ("unknown local handle 0x%lx", mp->handle);
52  return;
53  }
54  app_wrk = app_worker_get (ls->app_wrk_index);
55  if (app_wrk->app_index != mp->context)
56  {
57  clib_warning ("server %u doesn't own local handle 0x%lx",
58  mp->context, mp->handle);
59  return;
60  }
62  return;
64  }
65  else
66  {
68  if (!s)
69  return;
70 
71  app_wrk = app_worker_get (s->app_wrk_index);
72  if (app_wrk->app_index != mp->context)
73  {
74  clib_warning ("app doesn't own session");
75  return;
76  }
77 
78  old_state = s->session_state;
79  s->session_state = SESSION_STATE_READY;
80  if (!svm_fifo_is_empty (s->server_rx_fifo))
82 
83  /* Closed while waiting for app to reply. Resend disconnect */
84  if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
85  {
86  application_t *app = application_get (app_wrk->app_index);
87  app->cb_fns.session_disconnect_callback (s);
88  s->session_state = old_state;
89  return;
90  }
91  }
92 }
93 
94 static void
96 {
97  vnet_disconnect_args_t _a = { 0 }, *a = &_a;
99  app_worker_t *app_wrk;
100  stream_session_t *s;
101  application_t *app;
102  u32 index, thread_index;
103 
104  mp = (session_reset_reply_msg_t *) data;
105  app = application_lookup (mp->context);
106  if (!app)
107  return;
108 
109  session_parse_handle (mp->handle, &index, &thread_index);
110  s = session_get_if_valid (index, thread_index);
111 
112  /* Session was already closed or already cleaned up */
113  if (!s || s->session_state != SESSION_STATE_TRANSPORT_CLOSING)
114  return;
115 
116  app_wrk = app_worker_get (s->app_wrk_index);
117  if (!app_wrk || app_wrk->app_index != app->app_index)
118  {
119  clib_warning ("App % does not own handle 0x%lx!", app->app_index,
120  mp->handle);
121  return;
122  }
123 
124  /* Client objected to resetting the session, log and continue */
125  if (mp->retval)
126  {
127  clib_warning ("client retval %d", mp->retval);
128  return;
129  }
130 
131  /* This comes as a response to a reset, transport only waiting for
132  * confirmation to remove connection state, no need to disconnect */
133  a->handle = mp->handle;
134  a->app_index = app->app_index;
136 }
137 
138 static void
140 {
142  vnet_disconnect_args_t _a, *a = &_a;
143  svm_msg_q_msg_t _msg, *msg = &_msg;
145  app_worker_t *app_wrk;
146  session_event_t *evt;
147  stream_session_t *s;
148  application_t *app;
149  int rv = 0;
150 
151  mp = (session_disconnected_msg_t *) data;
152  if (!(s = session_get_from_handle_if_valid (mp->handle)))
153  {
154  clib_warning ("could not disconnect handle %llu", mp->handle);
155  return;
156  }
157  app_wrk = app_worker_get (s->app_wrk_index);
158  app = application_lookup (mp->client_index);
159  if (!(app_wrk && app && app->app_index == app_wrk->app_index))
160  {
161  clib_warning ("could not disconnect session: %llu app: %u",
162  mp->handle, mp->client_index);
163  return;
164  }
165 
166  a->handle = mp->handle;
167  a->app_index = app_wrk->wrk_index;
168  rv = vnet_disconnect_session (a);
169 
172  SVM_Q_WAIT, msg);
173  svm_msg_q_unlock (app_wrk->event_queue);
174  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
175  clib_memset (evt, 0, sizeof (*evt));
176  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
177  rmp = (session_disconnected_reply_msg_t *) evt->data;
178  rmp->handle = mp->handle;
179  rmp->context = mp->context;
180  rmp->retval = rv;
181  svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
182 }
183 
static void
/* NOTE(review): the line naming this handler was lost in extraction;
 * by analogy with the sibling handlers it takes the raw message as
 * 'void *data' -- confirm against the upstream file */
{
  vnet_disconnect_args_t _a, *a = &_a;
  /* NOTE(review): the declaration of message pointer 'mp' was lost in
   * extraction (type is evident from the cast below) */
  application_t *app;

  mp = (session_disconnected_reply_msg_t *) data;

  /* Client objected to disconnecting the session, log and continue */
  if (mp->retval)
    {
      clib_warning ("client retval %d", mp->retval);
      return;
    }

  /* Disconnect has been confirmed. Confirm close to transport */
  app = application_lookup (mp->context);
  if (app)
    {
      a->handle = mp->handle;
      a->app_index = app->app_index;
      /* NOTE(review): the line consuming 'a' (presumably the disconnect
       * call) was lost in extraction */
    }
}
209 
210 static void
212 {
215  svm_msg_q_msg_t _msg, *msg = &_msg;
216  app_worker_t *app_wrk;
217  u32 owner_app_wrk_map;
218  session_event_t *evt;
219  stream_session_t *s;
220  application_t *app;
221 
222  app = application_lookup (mp->client_index);
223  if (!app)
224  return;
225  if (!(s = session_get_from_handle_if_valid (mp->handle)))
226  {
227  clib_warning ("invalid handle %llu", mp->handle);
228  return;
229  }
230  app_wrk = app_worker_get (s->app_wrk_index);
231  if (app_wrk->app_index != app->app_index)
232  {
233  clib_warning ("app %u does not own session %llu", app->app_index,
234  mp->handle);
235  return;
236  }
237  owner_app_wrk_map = app_wrk->wrk_map_index;
238  app_wrk = application_get_worker (app, mp->wrk_index);
239 
240  /* This needs to come from the new owner */
241  if (mp->req_wrk_index == owner_app_wrk_map)
242  {
244 
247  SVM_Q_WAIT, msg);
248  svm_msg_q_unlock (app_wrk->event_queue);
249  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
250  clib_memset (evt, 0, sizeof (*evt));
251  evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
252  wump = (session_req_worker_update_msg_t *) evt->data;
253  wump->session_handle = mp->handle;
254  svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
255  return;
256  }
257 
258  app_worker_own_session (app_wrk, s);
259 
260  /*
261  * Send reply
262  */
265  SVM_Q_WAIT, msg);
266  svm_msg_q_unlock (app_wrk->event_queue);
267  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
268  clib_memset (evt, 0, sizeof (*evt));
269  evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
270  rmp = (session_worker_update_reply_msg_t *) evt->data;
271  rmp->handle = mp->handle;
272  rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo);
273  rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo);
275  svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
276 
277  /*
278  * Retransmit messages that may have been lost
279  */
280  if (!svm_fifo_is_empty (s->server_tx_fifo))
282 
283  if (!svm_fifo_is_empty (s->server_rx_fifo))
285 
286  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
287  app->cb_fns.session_disconnect_callback (s);
288 }
289 
291 
292 typedef struct
293 {
297 
/* packet trace format function */
static u8 *
format_session_queue_trace (u8 * s, va_list * args)
{
  /* Standard vlib trace formatter signature: vm, node, then the
   * per-packet trace record */
  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
  CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
  session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);

  /* NOTE(review): the argument continuation of this format call
   * (presumably t->session_index, t->server_thread_index, matching the
   * two %d specifiers) was lost in extraction */
  s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
  return s;
}
310 
311 #define foreach_session_queue_error \
312 _(TX, "Packets transmitted") \
313 _(TIMER, "Timer events") \
314 _(NO_BUFFER, "Out of buffers")
315 
316 typedef enum
317 {
318 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
320 #undef _
323 
324 static char *session_queue_error_strings[] = {
325 #define _(sym,string) string,
327 #undef _
328 };
329 
330 enum
331 {
335 };
336 
static void
/* NOTE(review): the signature line naming this helper
 * (session_tx_trace_frame, per the Doxygen cross-reference) and its
 * leading parameters were lost in extraction; trailing params follow */
			u32 next_index, u32 * to_next, u16 n_segs,
			stream_session_t * s, u32 n_trace)
{
  /* NOTE(review): a declaration line (presumably the trace-record
   * pointer 't' assigned below) was lost in extraction */
  vlib_buffer_t *b;
  int i;

  /* Trace at most n_trace of the n_segs buffers just enqueued.  to_next
   * has already been advanced past them, hence the negative indexing
   * to_next[i - n_segs] walks the buffers that were written. */
  for (i = 0; i < clib_min (n_trace, n_segs); i++)
    {
      b = vlib_get_buffer (vm, to_next[i - n_segs]);
      vlib_trace_buffer (vm, node, next_index, b, 1 /* follow_chain */ );
      t = vlib_add_trace (vm, node, b, sizeof (*t));
      t->session_index = s->session_index;
      t->server_thread_index = s->thread_index;
    }
  /* Decrement the node's outstanding trace budget by what we used */
  vlib_set_trace_count (vm, node, n_trace - i);
}
356 
357 always_inline void
359  vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
360 {
361  vlib_buffer_t *chain_b, *prev_b;
362  u32 chain_bi0, to_deq, left_from_seg;
364  u16 len_to_deq, n_bytes_read;
365  u8 *data, j;
366 
367  wrk = session_manager_get_worker (ctx->s->thread_index);
368  b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
370 
371  chain_b = b;
372  left_from_seg = clib_min (ctx->snd_mss - b->current_length,
373  ctx->left_to_snd);
374  to_deq = left_from_seg;
375  for (j = 1; j < ctx->n_bufs_per_seg; j++)
376  {
377  prev_b = chain_b;
378  len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
379 
380  *n_bufs -= 1;
381  chain_bi0 = wrk->tx_buffers[*n_bufs];
382  _vec_len (wrk->tx_buffers) = *n_bufs;
383 
384  chain_b = vlib_get_buffer (vm, chain_bi0);
385  chain_b->current_data = 0;
386  data = vlib_buffer_get_current (chain_b);
387  if (peek_data)
388  {
389  n_bytes_read = svm_fifo_peek (ctx->s->server_tx_fifo,
390  ctx->tx_offset, len_to_deq, data);
391  ctx->tx_offset += n_bytes_read;
392  }
393  else
394  {
395  if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
396  {
397  svm_fifo_t *f = ctx->s->server_tx_fifo;
398  session_dgram_hdr_t *hdr = &ctx->hdr;
399  u16 deq_now;
400  deq_now = clib_min (hdr->data_length - hdr->data_offset,
401  len_to_deq);
402  n_bytes_read = svm_fifo_peek (f, hdr->data_offset, deq_now,
403  data);
404  ASSERT (n_bytes_read > 0);
405 
406  hdr->data_offset += n_bytes_read;
407  if (hdr->data_offset == hdr->data_length)
408  {
409  u32 offset = hdr->data_length + SESSION_CONN_HDR_LEN;
410  svm_fifo_dequeue_drop (f, offset);
411  }
412  }
413  else
414  n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->server_tx_fifo,
415  len_to_deq, data);
416  }
417  ASSERT (n_bytes_read == len_to_deq);
418  chain_b->current_length = n_bytes_read;
420 
421  /* update previous buffer */
422  prev_b->next_buffer = chain_bi0;
423  prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
424 
425  /* update current buffer */
426  chain_b->next_buffer = 0;
427 
428  to_deq -= n_bytes_read;
429  if (to_deq == 0)
430  break;
431  }
432  ASSERT (to_deq == 0
433  && b->total_length_not_including_first_buffer == left_from_seg);
434  ctx->left_to_snd -= left_from_seg;
435 }
436 
437 always_inline int
440  u32 thread_index, u16 * n_bufs, u32 wanted)
441 {
442  u32 n_alloc;
444  n_alloc = vlib_buffer_alloc (vm, &wrk->tx_buffers[*n_bufs],
445  wanted - *n_bufs);
446  *n_bufs += n_alloc;
447  _vec_len (wrk->tx_buffers) = *n_bufs;
448  return n_alloc;
449 }
450 
451 always_inline void
453  vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
454 {
455  u32 len_to_deq;
456  u8 *data0;
457  int n_bytes_read;
458 
459  /*
460  * Start with the first buffer in chain
461  */
462  b->error = 0;
463  b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
464  b->current_data = 0;
465 
467  len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
468 
469  if (peek_data)
470  {
471  n_bytes_read = svm_fifo_peek (ctx->s->server_tx_fifo, ctx->tx_offset,
472  len_to_deq, data0);
473  ASSERT (n_bytes_read > 0);
474  /* Keep track of progress locally, transport is also supposed to
475  * increment it independently when pushing the header */
476  ctx->tx_offset += n_bytes_read;
477  }
478  else
479  {
480  if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
481  {
482  session_dgram_hdr_t *hdr = &ctx->hdr;
483  svm_fifo_t *f = ctx->s->server_tx_fifo;
484  u16 deq_now;
485  u32 offset;
486 
487  ASSERT (hdr->data_length > hdr->data_offset);
488  deq_now = clib_min (hdr->data_length - hdr->data_offset,
489  len_to_deq);
490  offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
491  n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
492  ASSERT (n_bytes_read > 0);
493 
494  if (ctx->s->session_state == SESSION_STATE_LISTENING)
495  {
496  ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
497  ctx->tc->rmt_port = hdr->rmt_port;
498  }
499  hdr->data_offset += n_bytes_read;
500  if (hdr->data_offset == hdr->data_length)
501  {
502  offset = hdr->data_length + SESSION_CONN_HDR_LEN;
503  svm_fifo_dequeue_drop (f, offset);
504  }
505  }
506  else
507  {
508  n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->server_tx_fifo,
509  len_to_deq, data0);
510  ASSERT (n_bytes_read > 0);
511  }
512  }
513  b->current_length = n_bytes_read;
514  ctx->left_to_snd -= n_bytes_read;
515 
516  /*
517  * Fill in the remaining buffers in the chain, if any
518  */
519  if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
520  session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
521 
522  /* *INDENT-OFF* */
523  SESSION_EVT_DBG(SESSION_EVT_DEQ, ctx->s, ({
524  ed->data[0] = FIFO_EVENT_APP_TX;
525  ed->data[1] = ctx->max_dequeue;
526  ed->data[2] = len_to_deq;
527  ed->data[3] = ctx->left_to_snd;
528  }));
529  /* *INDENT-ON* */
530 }
531 
/* NOTE(review): the signature lines of this helper were lost in
 * extraction (the body reads 's' and 'peek_data').  Returns 0 when tx
 * may proceed, 1 when the session is not yet ready (the caller requeues
 * the event -- see the 'rv < 2' test at the call site), and 2 when the
 * transport is already closed (event is dropped). */
{
  if (peek_data)
    {
      /* Can retransmit for closed sessions but can't send new data if
       * session is not ready or closed */
      if (s->session_state < SESSION_STATE_READY)
	return 1;
      if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
	return 2;
    }
  /* Non-peek (dequeue) transports may always attempt tx here */
  return 0;
}
546 
/* NOTE(review): the signature lines of this helper were lost in
 * extraction (the body reads 'ctx' and 'peek_data').  Resolves the
 * transport connection the tx path should use for session ctx->s. */
{
  if (peek_data)
    {
      /* Peek-mode transports always query the established connection */
      return ctx->transport_vft->get_connection (ctx->s->connection_index,
						 ctx->s->thread_index);
    }
  else
    {
      /* Dequeue-mode sessions may still be listeners (e.g. when
       * replying from a listening endpoint); use the listener then */
      if (ctx->s->session_state == SESSION_STATE_LISTENING)
	return ctx->transport_vft->get_listener (ctx->s->connection_index);
      else
	{
	  return ctx->transport_vft->get_connection (ctx->s->connection_index,
						     ctx->s->thread_index);
	}
    }
}
566 
always_inline void
/* NOTE(review): the first signature line of this helper
 * (session_tx_set_dequeue_params, per later call site) was lost in
 * extraction; trailing parameters follow */
			       u32 max_segs, u8 peek_data)
{
  u32 n_bytes_per_buf, n_bytes_per_seg;
  /* Bytes currently available in the session's tx fifo */
  ctx->max_dequeue = svm_fifo_max_dequeue (ctx->s->server_tx_fifo);
  if (peek_data)
    {
      /* Offset in rx fifo from where to peek data */
      ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc);
      if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue))
	{
	  /* Nothing new beyond the transport's current peek offset */
	  ctx->max_len_to_snd = 0;
	  return;
	}
      ctx->max_dequeue -= ctx->tx_offset;
    }
  else
    {
      if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
	{
	  /* A datagram must at least have its per-dgram header queued */
	  if (ctx->max_dequeue <= sizeof (ctx->hdr))
	    {
	      ctx->max_len_to_snd = 0;
	      return;
	    }
	  /* Peek (don't consume) the dgram header to size this send */
	  svm_fifo_peek (ctx->s->server_tx_fifo, 0, sizeof (ctx->hdr),
			 (u8 *) & ctx->hdr);
	  ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
	  ctx->max_dequeue = ctx->hdr.data_length - ctx->hdr.data_offset;
	}
    }
  ASSERT (ctx->max_dequeue > 0);

  /* Ensure we're not writing more than transport window allows */
  if (ctx->max_dequeue < ctx->snd_space)
    {
      /* Constrained by tx queue. Try to send only fully formed segments */
      ctx->max_len_to_snd =
	(ctx->max_dequeue > ctx->snd_mss) ?
	ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue;
      /* TODO Nagle ? */
    }
  else
    {
      /* Expectation is that snd_space0 is already a multiple of snd_mss */
      ctx->max_len_to_snd = ctx->snd_space;
    }

  /* Check if we're tx constrained by the node */
  ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss);
  if (ctx->n_segs_per_evt > max_segs)
    {
      /* Cap this dispatch to max_segs segments worth of data */
      ctx->n_segs_per_evt = max_segs;
      ctx->max_len_to_snd = max_segs * ctx->snd_mss;
    }

  /* Derive per-buffer dequeue sizes; the first buffer of each segment
   * reserves MAX_HDRS_LEN of headroom for the transport header */
  n_bytes_per_buf = VLIB_BUFFER_DATA_SIZE;
  ASSERT (n_bytes_per_buf > MAX_HDRS_LEN);
  n_bytes_per_seg = MAX_HDRS_LEN + ctx->snd_mss;
  ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
  ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf);
  ctx->deq_per_first_buf = clib_min (ctx->snd_mss,
				     n_bytes_per_buf - MAX_HDRS_LEN);
}
632 
633 always_inline int
636  session_event_t * e, int *n_tx_packets,
637  u8 peek_data)
638 {
639  u32 next_index, next0, next1, *to_next, n_left_to_next;
640  u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0;
641  u32 thread_index = vm->thread_index, n_left, pbi;
643  session_tx_context_t *ctx = &wrk->ctx;
645  vlib_buffer_t *pb;
646  u16 n_bufs, rv;
647 
648  if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
649  {
650  if (rv < 2)
651  vec_add1 (wrk->pending_event_vector, *e);
652  return SESSION_TX_NO_DATA;
653  }
654 
655  next_index = smm->session_type_to_next[ctx->s->session_type];
656  next0 = next1 = next_index;
657 
658  tp = session_get_transport_proto (ctx->s);
660  ctx->tc = session_tx_get_transport (ctx, peek_data);
661  ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
662 
663  if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
664  {
665  if (ctx->transport_vft->flush_data)
666  ctx->transport_vft->flush_data (ctx->tc);
667  }
668 
670  vm->clib_time.
671  last_cpu_time,
672  ctx->snd_mss);
673  if (ctx->snd_space == 0 || ctx->snd_mss == 0)
674  {
675  vec_add1 (wrk->pending_event_vector, *e);
676  return SESSION_TX_NO_DATA;
677  }
678 
679  /* Allow enqueuing of a new event */
680  svm_fifo_unset_event (ctx->s->server_tx_fifo);
681 
682  /* Check how much we can pull. */
683  session_tx_set_dequeue_params (vm, ctx, VLIB_FRAME_SIZE - *n_tx_packets,
684  peek_data);
685 
686  if (PREDICT_FALSE (!ctx->max_len_to_snd))
687  return SESSION_TX_NO_DATA;
688 
689  n_bufs = vec_len (wrk->tx_buffers);
690  n_bufs_needed = ctx->n_segs_per_evt * ctx->n_bufs_per_seg;
691 
692  /*
693  * Make sure we have at least one full frame of buffers ready
694  */
695  if (n_bufs < n_bufs_needed)
696  {
697  session_output_try_get_buffers (vm, wrk, thread_index, &n_bufs,
699  if (PREDICT_FALSE (n_bufs < n_bufs_needed))
700  {
701  vec_add1 (wrk->pending_event_vector, *e);
702  return SESSION_TX_NO_BUFFERS;
703  }
704  }
705 
706  /*
707  * Write until we fill up a frame
708  */
709  vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
710  if (PREDICT_FALSE (ctx->n_segs_per_evt > n_left_to_next))
711  {
712  ctx->n_segs_per_evt = n_left_to_next;
713  ctx->max_len_to_snd = ctx->snd_mss * n_left_to_next;
714  }
715  ctx->left_to_snd = ctx->max_len_to_snd;
716  n_left = ctx->n_segs_per_evt;
717 
718  while (n_left >= 4)
719  {
720  vlib_buffer_t *b0, *b1;
721  u32 bi0, bi1;
722 
723  pbi = wrk->tx_buffers[n_bufs - 3];
724  pb = vlib_get_buffer (vm, pbi);
725  vlib_prefetch_buffer_header (pb, STORE);
726  pbi = wrk->tx_buffers[n_bufs - 4];
727  pb = vlib_get_buffer (vm, pbi);
728  vlib_prefetch_buffer_header (pb, STORE);
729 
730  to_next[0] = bi0 = wrk->tx_buffers[--n_bufs];
731  to_next[1] = bi1 = wrk->tx_buffers[--n_bufs];
732 
733  b0 = vlib_get_buffer (vm, bi0);
734  b1 = vlib_get_buffer (vm, bi1);
735 
736  session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
737  session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
738 
739  ctx->transport_vft->push_header (ctx->tc, b0);
740  ctx->transport_vft->push_header (ctx->tc, b1);
741 
742  to_next += 2;
743  n_left_to_next -= 2;
744  n_left -= 2;
745 
748 
749  vlib_validate_buffer_enqueue_x2 (vm, node, next_index, to_next,
750  n_left_to_next, bi0, bi1, next0,
751  next1);
752  }
753  while (n_left)
754  {
755  vlib_buffer_t *b0;
756  u32 bi0;
757 
758  if (n_left > 1)
759  {
760  pbi = wrk->tx_buffers[n_bufs - 2];
761  pb = vlib_get_buffer (vm, pbi);
762  vlib_prefetch_buffer_header (pb, STORE);
763  }
764 
765  to_next[0] = bi0 = wrk->tx_buffers[--n_bufs];
766  b0 = vlib_get_buffer (vm, bi0);
767  session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
768 
769  /* Ask transport to push header after current_length and
770  * total_length_not_including_first_buffer are updated */
771  ctx->transport_vft->push_header (ctx->tc, b0);
772 
773  to_next += 1;
774  n_left_to_next -= 1;
775  n_left -= 1;
776 
778 
779  vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
780  n_left_to_next, bi0, next0);
781  }
782 
783  if (PREDICT_FALSE (n_trace > 0))
784  session_tx_trace_frame (vm, node, next_index, to_next,
785  ctx->n_segs_per_evt, ctx->s, n_trace);
786 
787  _vec_len (wrk->tx_buffers) = n_bufs;
788  *n_tx_packets += ctx->n_segs_per_evt;
790  vlib_put_next_frame (vm, node, next_index, n_left_to_next);
791 
792  /* If we couldn't dequeue all bytes mark as partially read */
793  ASSERT (ctx->left_to_snd == 0);
794  if (ctx->max_len_to_snd < ctx->max_dequeue)
795  if (svm_fifo_set_event (ctx->s->server_tx_fifo))
796  vec_add1 (wrk->pending_event_vector, *e);
797 
798  if (!peek_data && ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
799  {
800  /* Fix dgram pre header */
801  if (ctx->max_len_to_snd < ctx->max_dequeue)
802  svm_fifo_overwrite_head (ctx->s->server_tx_fifo, (u8 *) & ctx->hdr,
803  sizeof (session_dgram_pre_hdr_t));
804  /* More data needs to be read */
805  else if (svm_fifo_max_dequeue (ctx->s->server_tx_fifo) > 0)
806  if (svm_fifo_set_event (ctx->s->server_tx_fifo))
807  vec_add1 (wrk->pending_event_vector, *e);
808  }
809  return SESSION_TX_OK;
810 }
811 
812 int
815  session_event_t * e, int *n_tx_pkts)
816 {
817  return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 1);
818 }
819 
820 int
823  session_event_t * e, int *n_tx_pkts)
824 {
825  return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 0);
826 }
827 
828 int
830  vlib_node_runtime_t * node,
832  session_event_t * e, int *n_tx_pkts)
833 {
834  stream_session_t *s = wrk->ctx.s;
835  application_t *app;
836 
837  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
838  return 0;
839  app = application_get (s->t_app_index);
840  svm_fifo_unset_event (s->server_tx_fifo);
841  return app->cb_fns.builtin_app_tx_callback (s);
842 }
843 
/* NOTE(review): the return-type line was lost in extraction (returns a
 * session pointer or 0).  Resolve the session an io event refers to via
 * its fifo's owner index on the given thread; returns 0 if the session
 * was freed in the meantime. */
session_event_get_session (session_event_t * e, u8 thread_index)
{
  return session_get_if_valid (e->fifo->master_session_index, thread_index);
}
849 
static void
/* NOTE(review): the first signature line (session_update_dispatch_period
 * per the call site, taking the worker and 'now') was lost in
 * extraction; trailing parameter follows */
			       u32 thread_index)
{
  /* Only fold in a sample if the previous dispatch transmitted packets,
   * so idle loops don't skew the estimate */
  if (wrk->last_tx_packets)
    {
      f64 sample = now - wrk->last_vlib_time;
      /* Exponential moving average (alpha = 0.5) of the loop period */
      wrk->dispatch_period = (wrk->dispatch_period + sample) * 0.5;
    }
  wrk->last_vlib_time = now;
}
861 
862 static uword
864  vlib_frame_t * frame)
865 {
867  u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
868  session_manager_worker_t *wrk = &smm->wrk[thread_index];
869  session_event_t *e, *fifo_events;
870  svm_msg_q_msg_t _msg, *msg = &_msg;
871  f64 now = vlib_time_now (vm);
872  int n_tx_packets = 0, i, rv;
873  app_worker_t *app_wrk;
874  application_t *app;
875  svm_msg_q_t *mq;
876  void (*fp) (void *);
877 
878  SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
879 
880  /*
881  * Update transport time
882  */
883  session_update_dispatch_period (wrk, now, thread_index);
884  transport_update_time (now, thread_index);
885 
886  SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
887 
888  /* Make sure postponed events are handled first */
889  fifo_events = wrk->free_event_vector;
890  vec_append (fifo_events, wrk->postponed_event_vector);
891  _vec_len (wrk->postponed_event_vector) = 0;
892 
893  /* Try to dequeue what is available. Don't wait for lock.
894  * XXX: we may need priorities here */
895  mq = wrk->vpp_event_queue;
896  n_to_dequeue = svm_msg_q_size (mq);
897  if (n_to_dequeue && svm_msg_q_try_lock (mq) == 0)
898  {
899  for (i = 0; i < n_to_dequeue; i++)
900  {
901  vec_add2 (fifo_events, e, 1);
902  svm_msg_q_sub_w_lock (mq, msg);
903  /* Works because reply messages are smaller than a session evt.
904  * If we ever need to support bigger messages this needs to be
905  * fixed */
906  clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
907  svm_msg_q_free_msg (mq, msg);
908  }
909  svm_msg_q_unlock (mq);
910  }
911 
912  vec_append (fifo_events, wrk->pending_event_vector);
913  vec_append (fifo_events, wrk->pending_disconnects);
914 
915  _vec_len (wrk->pending_event_vector) = 0;
916  _vec_len (wrk->pending_disconnects) = 0;
917 
918  n_events = vec_len (fifo_events);
919  if (PREDICT_FALSE (!n_events))
920  return 0;
921 
922  for (i = 0; i < n_events; i++)
923  {
924  stream_session_t *s; /* $$$ prefetch 1 ahead maybe */
925  session_event_t *e;
926  u8 need_tx_ntf;
927 
928  e = &fifo_events[i];
929  switch (e->event_type)
930  {
932  case FIFO_EVENT_APP_TX:
933  /* Don't try to send more that one frame per dispatch cycle */
934  if (n_tx_packets == VLIB_FRAME_SIZE)
935  {
936  vec_add1 (wrk->postponed_event_vector, *e);
937  break;
938  }
939 
940  s = session_event_get_session (e, thread_index);
941  if (PREDICT_FALSE (!s))
942  {
943  clib_warning ("session was freed!");
944  continue;
945  }
946  wrk->ctx.s = s;
947  /* Spray packets in per session type frames, since they go to
948  * different nodes */
949  rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, e,
950  &n_tx_packets);
951  if (PREDICT_TRUE (rv == SESSION_TX_OK))
952  {
953  need_tx_ntf = svm_fifo_needs_tx_ntf (s->server_tx_fifo,
954  wrk->ctx.max_len_to_snd);
955  if (PREDICT_FALSE (need_tx_ntf))
957  }
958  else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
959  {
961  SESSION_QUEUE_ERROR_NO_BUFFER, 1);
962  continue;
963  }
964  break;
966  s = session_get_from_handle_if_valid (e->session_handle);
967  if (PREDICT_FALSE (!s))
968  break;
969 
970  /* Make sure session disconnects run after the pending list is
971  * drained, i.e., postpone if the first time. If not the first
972  * and the tx queue is still not empty, try to wait for some
973  * dispatch cycles */
974  if (!e->postponed
975  || (e->postponed < 200
976  && svm_fifo_max_dequeue (s->server_tx_fifo)))
977  {
978  e->postponed += 1;
979  vec_add1 (wrk->pending_disconnects, *e);
980  continue;
981  }
982 
984  break;
986  s = session_event_get_session (e, thread_index);
987  if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
988  continue;
989  svm_fifo_unset_event (s->server_rx_fifo);
990  app_wrk = app_worker_get (s->app_wrk_index);
991  app = application_get (app_wrk->app_index);
992  app->cb_fns.builtin_app_rx_callback (s);
993  break;
995  s = session_get_from_handle_if_valid (e->session_handle);
996  wrk->ctx.s = s;
997  if (PREDICT_TRUE (s != 0))
998  session_tx_fifo_dequeue_internal (vm, node, wrk, e,
999  &n_tx_packets);
1000  break;
1001  case FIFO_EVENT_RPC:
1002  fp = e->rpc_args.fp;
1003  (*fp) (e->rpc_args.arg);
1004  break;
1007  break;
1010  break;
1012  break;
1015  break;
1018  break;
1021  break;
1022  default:
1023  clib_warning ("unhandled event type %d", e->event_type);
1024  }
1025  }
1026 
1027  _vec_len (fifo_events) = 0;
1028  wrk->free_event_vector = fifo_events;
1029  wrk->last_tx_packets = n_tx_packets;
1030 
1032  SESSION_QUEUE_ERROR_TX, n_tx_packets);
1033 
1034  SESSION_EVT_DBG (SESSION_EVT_DISPATCH_END, smm, thread_index);
1035 
1036  return n_tx_packets;
1037 }
1038 
1039 /* *INDENT-OFF* */
1041 {
1042  .function = session_queue_node_fn,
1043  .name = "session-queue",
1044  .format_trace = format_session_queue_trace,
1045  .type = VLIB_NODE_TYPE_INPUT,
1046  .n_errors = ARRAY_LEN (session_queue_error_strings),
1047  .error_strings = session_queue_error_strings,
1048  .state = VLIB_NODE_STATE_DISABLED,
1049 };
1050 /* *INDENT-ON* */
1051 
1052 void
1054 {
1057  u32 my_thread_index = vm->thread_index;
1058  session_event_t _e, *e = &_e;
1059  svm_msg_q_ring_t *ring;
1060  stream_session_t *s0;
1061  svm_msg_q_msg_t *msg;
1062  svm_msg_q_t *mq;
1063  int i, index;
1064 
1065  mq = smm->wrk[my_thread_index].vpp_event_queue;
1066  index = mq->q->head;
1067 
1068  for (i = 0; i < mq->q->cursize; i++)
1069  {
1070  msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
1071  ring = svm_msg_q_ring (mq, msg->ring_index);
1072  clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
1073 
1074  switch (e->event_type)
1075  {
1076  case FIFO_EVENT_APP_TX:
1077  s0 = session_event_get_session (e, my_thread_index);
1078  fformat (stdout, "[%04d] TX session %d\n", i, s0->session_index);
1079  break;
1080 
1081  case FIFO_EVENT_DISCONNECT:
1082  s0 = session_get_from_handle (e->session_handle);
1083  fformat (stdout, "[%04d] disconnect session %d\n", i,
1084  s0->session_index);
1085  break;
1086 
1087  case FIFO_EVENT_BUILTIN_RX:
1088  s0 = session_event_get_session (e, my_thread_index);
1089  fformat (stdout, "[%04d] builtin_rx %d\n", i, s0->session_index);
1090  break;
1091 
1092  case FIFO_EVENT_RPC:
1093  fformat (stdout, "[%04d] RPC call %llx with %llx\n",
1094  i, (u64) (uword) (e->rpc_args.fp),
1095  (u64) (uword) (e->rpc_args.arg));
1096  break;
1097 
1098  default:
1099  fformat (stdout, "[%04d] unhandled event type %d\n",
1100  i, e->event_type);
1101  break;
1102  }
1103 
1104  index++;
1105 
1106  if (index == mq->q->maxsize)
1107  index = 0;
1108  }
1109 }
1110 
1111 static u8
1112 session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
1113 {
1114  stream_session_t *s;
1115  switch (e->event_type)
1116  {
1117  case FIFO_EVENT_APP_RX:
1118  case FIFO_EVENT_APP_TX:
1119  case FIFO_EVENT_BUILTIN_RX:
1120  if (e->fifo == f)
1121  return 1;
1122  break;
1123  case FIFO_EVENT_DISCONNECT:
1124  break;
1125  case FIFO_EVENT_RPC:
1126  s = session_get_from_handle (e->session_handle);
1127  if (!s)
1128  {
1129  clib_warning ("session has event but doesn't exist!");
1130  break;
1131  }
1132  if (s->server_rx_fifo == f || s->server_tx_fifo == f)
1133  return 1;
1134  break;
1135  default:
1136  break;
1137  }
1138  return 0;
1139 }
1140 
/**
 * Search vpp's per-thread event queue and the worker's pending event
 * vector for an event that references fifo @a f.  On success the
 * matching event is copied into @a e.
 *
 * @return 1 if a matching event was found, 0 otherwise.
 */
u8
session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
{
  session_event_t *pending_event_vector, *evt;
  /* NOTE(review): a declaration line (presumably the worker pointer
   * 'wrk' assigned below) was lost in extraction here */
  int i, index, found = 0;
  svm_msg_q_msg_t *msg;
  svm_msg_q_ring_t *ring;
  svm_msg_q_t *mq;
  u8 thread_index;

  ASSERT (e);
  /* Events for a fifo are queued on the fifo owner's thread */
  thread_index = f->master_thread_index;
  wrk = session_manager_get_worker (thread_index);

  /*
   * Search evt queue
   */
  mq = wrk->vpp_event_queue;
  index = mq->q->head;
  /* Walk the ring from head, wrapping at maxsize */
  for (i = 0; i < mq->q->cursize; i++)
    {
      msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
      ring = svm_msg_q_ring (mq, msg->ring_index);
      /* Copy out the candidate so the caller gets it on a match */
      clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
      found = session_node_cmp_event (e, f);
      if (found)
	return 1;
      if (++index == mq->q->maxsize)
	index = 0;
    }
  /*
   * Search pending events vector
   */
  pending_event_vector = wrk->pending_event_vector;
  vec_foreach (evt, pending_event_vector)
  {
    found = session_node_cmp_event (evt, f);
    if (found)
      {
	clib_memcpy_fast (e, evt, sizeof (*evt));
	break;
      }
  }
  return found;
}
1187 
1188 static clib_error_t *
1190 {
1191  if (vec_len (vlib_mains) < 2)
1192  return 0;
1193 
1194  /*
1195  * Shut off (especially) worker-thread session nodes.
1196  * Otherwise, vpp can crash as the main thread unmaps the
1197  * API segment.
1198  */
1200  session_node_enable_disable (0 /* is_enable */ );
1202  return 0;
1203 }
1204 
1206 
1207 static uword
1209  vlib_frame_t * f)
1210 {
1211  f64 now, timeout = 1.0;
1212  uword *event_data = 0;
1213  uword event_type;
1214 
1215  while (1)
1216  {
1218  now = vlib_time_now (vm);
1219  event_type = vlib_process_get_events (vm, (uword **) & event_data);
1220 
1221  switch (event_type)
1222  {
1224  /* Flush the frames by updating all transports times */
1225  transport_update_time (now, 0);
1226  break;
1228  timeout = 100000.0;
1229  break;
1230  case ~0:
1231  /* Timed out. Update time for all transports to trigger all
1232  * outstanding retransmits. */
1233  transport_update_time (now, 0);
1234  break;
1235  }
1236  vec_reset_length (event_data);
1237  }
1238  return 0;
1239 }
1240 
1241 /* *INDENT-OFF* */
1243 {
1244  .function = session_queue_process,
1245  .type = VLIB_NODE_TYPE_PROCESS,
1246  .name = "session-queue-process",
1247  .state = VLIB_NODE_STATE_DISABLED,
1248 };
1249 /* *INDENT-ON* */
1250 
1251 
1252 /*
1253  * fd.io coding-style-patch-verification: ON
1254  *
1255  * Local Variables:
1256  * eval: (c-set-style "gnu")
1257  * End:
1258  */
static int session_output_try_get_buffers(vlib_main_t *vm, session_manager_worker_t *wrk, u32 thread_index, u16 *n_bufs, u32 wanted)
Definition: session_node.c:438
f64 dispatch_period
Our approximation of a "complete" dispatch loop period.
Definition: session.h:190
app_worker_t * app_worker_get(u32 wrk_index)
Definition: application.c:517
static u64 session_segment_handle(stream_session_t *s)
Definition: session.h:446
vlib_main_t vlib_global_main
Definition: main.c:1850
static void session_tx_trace_frame(vlib_main_t *vm, vlib_node_runtime_t *node, u32 next_index, u32 *to_next, u16 n_segs, stream_session_t *s, u32 n_trace)
Definition: session_node.c:338
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
#define clib_min(x, y)
Definition: clib.h:295
#define CLIB_UNUSED(x)
Definition: clib.h:82
void ip_copy(ip46_address_t *dst, ip46_address_t *src, u8 is_ip4)
Definition: ip.c:81
#define SESSION_Q_PROCESS_FLUSH_FRAMES
Definition: session.h:303
static u32 vlib_get_trace_count(vlib_main_t *vm, vlib_node_runtime_t *rt)
Definition: trace_funcs.h:156
static f64 vlib_process_wait_for_event_or_clock(vlib_main_t *vm, f64 dt)
Suspend a cooperative multi-tasking thread Waits for an event, or for the indicated number of seconds...
Definition: node_funcs.h:673
a
Definition: bitmap.h:538
#define SESSION_CONN_HDR_LEN
Definition: session.h:153
struct _transport_connection transport_connection_t
#define PREDICT_TRUE(x)
Definition: clib.h:112
unsigned long u64
Definition: types.h:89
#define clib_memcpy_fast(a, b, c)
Definition: string.h:81
session_manager_main_t session_manager_main
Definition: session.c:27
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
static f64 vlib_time_now(vlib_main_t *vm)
Definition: main.h:232
u32 thread_index
Definition: main.h:179
int app_worker_own_session(app_worker_t *app_wrk, stream_session_t *s)
Definition: application.c:728
local_session_t * application_get_local_session_from_handle(session_handle_t handle)
Definition: application.c:1453
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:525
application_t * application_lookup(u32 api_client_index)
Definition: application.c:181
session_event_t * pending_event_vector
Vector of active event vectors.
Definition: session.h:211
void svm_fifo_overwrite_head(svm_fifo_t *f, u8 *data, u32 len)
Definition: svm_fifo.c:607
u32 wrk_map_index
Worker index in app's map pool.
Definition: application.h:68
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
Definition: vec.h:564
int i
session_event_t * postponed_event_vector
Vector of postponed events.
Definition: session.h:217
static clib_error_t * session_queue_exit(vlib_main_t *vm)
static uword session_queue_node_fn(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
Definition: session_node.c:863
static void session_parse_handle(session_handle_t handle, u32 *index, u32 *thread_index)
Definition: session.h:382
u8 * format(u8 *s, const char *fmt,...)
Definition: format.c:419
clib_time_t clib_time
Definition: main.h:65
#define vec_validate_aligned(V, I, A)
Make sure vector is long enough for given index (no header, specified alignment)
Definition: vec.h:450
static stream_session_t * session_get_from_handle(session_handle_t handle)
Definition: session.h:390
void session_node_enable_disable(u8 is_en)
Definition: session.c:1495
vlib_main_t ** vlib_mains
Definition: buffer.c:310
unsigned char u8
Definition: types.h:56
app_worker_t * application_get_worker(application_t *app, u32 wrk_map_index)
Definition: application.c:456
static void session_tx_fill_buffer(vlib_main_t *vm, session_tx_context_t *ctx, vlib_buffer_t *b, u16 *n_bufs, u8 peek_data)
Definition: session_node.c:452
void transport_update_time(f64 time_now, u8 thread_index)
Definition: transport.c:625
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
double f64
Definition: types.h:142
static stream_session_t * session_event_get_session(session_event_t *e, u8 thread_index)
Definition: session_node.c:845
session_event_t * pending_disconnects
Vector of postponed disconnects.
Definition: session.h:214
#define vlib_worker_thread_barrier_sync(X)
Definition: threads.h:204
struct _svm_fifo svm_fifo_t
int session_tx_fifo_peek_and_snd(vlib_main_t *vm, vlib_node_runtime_t *node, session_manager_worker_t *wrk, session_event_t *e, int *n_tx_pkts)
Definition: session_node.c:813
static void vlib_trace_buffer(vlib_main_t *vm, vlib_node_runtime_t *r, u32 next_index, vlib_buffer_t *b, int follow_chain)
Definition: trace_funcs.h:114
int svm_msg_q_lock_and_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index, u8 noblock, svm_msg_q_msg_t *msg)
Lock message queue and allocate message buffer on ring.
stream_session_state_t
static int svm_fifo_is_empty(svm_fifo_t *f)
Definition: svm_fifo.h:136
void dump_thread_0_event_queue(void)
i16 current_data
signed offset in data[], pre_data[] that we are currently processing.
Definition: buffer.h:110
static uword vlib_process_get_events(vlib_main_t *vm, uword **data_vector)
Return the first event type which has occurred and a vector of per-event data of that type...
Definition: node_funcs.h:516
struct _vnet_disconnect_args_t vnet_disconnect_args_t
u8 session_node_lookup_fifo_event(svm_fifo_t *f, session_event_t *e)
#define always_inline
Definition: clib.h:98
int application_local_session_connect_notify(local_session_t *ls)
Definition: application.c:1763
static u32 svm_fifo_max_dequeue(svm_fifo_t *f)
Definition: svm_fifo.h:124
int session_dequeue_notify(stream_session_t *s)
Definition: session.c:552
#define vlib_prefetch_buffer_header(b, type)
Prefetch buffer metadata.
Definition: buffer.h:188
stream_session_t * s
Definition: session.h:161
unsigned int u32
Definition: types.h:88
struct _stream_session_t stream_session_t
int session_send_io_evt_to_thread(svm_fifo_t *f, session_evt_type_t evt_type)
Definition: session.c:88
int session_tx_fifo_dequeue_and_snd(vlib_main_t *vm, vlib_node_runtime_t *node, session_manager_worker_t *wrk, session_event_t *e, int *n_tx_pkts)
Definition: session_node.c:821
#define VLIB_FRAME_SIZE
Definition: node.h:401
static char * session_queue_error_strings[]
Definition: session_node.c:324
transport_proto_vft_t * transport_protocol_get_vft(transport_proto_t transport_proto)
Get transport virtual function table.
Definition: transport.c:256
session_event_t * free_event_vector
Vector of partially read events.
Definition: session.h:208
static u8 svm_fifo_needs_tx_ntf(svm_fifo_t *f, u32 n_last_deq)
Definition: svm_fifo.h:279
static transport_proto_t session_get_transport_proto(stream_session_t *s)
Definition: session.h:427
f64 last_vlib_time
vlib_time_now last time around the track
Definition: session.h:193
static session_manager_main_t * vnet_get_session_manager_main()
Definition: session.h:310
u16 current_length
Nbytes between current data and the end of this buffer.
Definition: buffer.h:114
static void * vlib_buffer_make_headroom(vlib_buffer_t *b, u8 size)
Make head room, typically for packet headers.
Definition: buffer.h:335
#define MAX_HDRS_LEN
Definition: session.h:31
session_queue_error_t
Definition: session_node.c:316
transport_proto_vft_t * transport_vft
Definition: session.h:162
long ctx[MAX_CONNS]
Definition: main.c:144
static u8 session_handle_is_local(session_handle_t handle)
Definition: session.h:407
unsigned short u16
Definition: types.h:57
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
Definition: buffer.h:214
static uword session_queue_process(vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f)
#define PREDICT_FALSE(x)
Definition: clib.h:111
static u8 session_node_cmp_event(session_event_t *e, svm_fifo_t *f)
svm_msg_q_ring_t * svm_msg_q_ring(svm_msg_q_t *mq, u32 ring_index)
Get message queue ring.
Definition: message_queue.c:27
static void svm_fifo_unset_event(svm_fifo_t *f)
Unsets fifo event flag.
Definition: svm_fifo.h:180
u32 wrk_index
Worker index in global worker pool.
Definition: application.h:65
static stream_session_t * session_get_if_valid(u64 si, u32 thread_index)
Definition: session.h:349
u32 node_index
Node index.
Definition: node.h:518
#define vlib_validate_buffer_enqueue_x2(vm, node, next_index, to_next, n_left_to_next, bi0, bi1, next0, next1)
Finish enqueueing two buffers forward in the graph.
Definition: buffer_node.h:70
#define vlib_validate_buffer_enqueue_x1(vm, node, next_index, to_next, n_left_to_next, bi0, next0)
Finish enqueueing one buffer forward in the graph.
Definition: buffer_node.h:218
#define vlib_get_next_frame(vm, node, next_index, vectors, n_vectors_left)
Get pointer to next frame vector data by (vlib_node_runtime_t, next_index).
Definition: node_funcs.h:338
volatile u8 session_state
State.
vlib_error_t error
Error code for buffers to be enqueued to error handler.
Definition: buffer.h:139
word fformat(FILE *f, char *fmt,...)
Definition: format.c:453
static void vlib_node_increment_counter(vlib_main_t *vm, u32 node_index, u32 counter_index, u64 increment)
Definition: node_funcs.h:1150
static u8 svm_fifo_set_event(svm_fifo_t *f)
Sets fifo event flag.
Definition: svm_fifo.h:167
The fine-grained event logger allows lightweight, thread-safe event logging at minimum cost...
#define SESSION_EVT_DBG(_evt, _args...)
session_tx_context_t ctx
Context for session tx.
Definition: session.h:202
#define VLIB_REGISTER_NODE(x,...)
Definition: node.h:169
static void session_tx_fifo_chain_tail(vlib_main_t *vm, session_tx_context_t *ctx, vlib_buffer_t *b, u16 *n_bufs, u8 peek_data)
Definition: session_node.c:358
static session_manager_worker_t * session_manager_get_worker(u32 thread_index)
Definition: session.h:316
vlib_main_t * vm
Definition: buffer.c:301
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
#define VLIB_MAIN_LOOP_EXIT_FUNCTION(x)
Definition: init.h:168
#define clib_warning(format, args...)
Definition: error.h:59
svm_queue_t * q
queue for exchanging messages
Definition: message_queue.h:39
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, int nowait)
Producer enqueue one message to queue.
int vnet_disconnect_session(vnet_disconnect_args_t *a)
#define ARRAY_LEN(x)
Definition: clib.h:62
void vlib_put_next_frame(vlib_main_t *vm, vlib_node_runtime_t *r, u32 next_index, u32 n_vectors_left)
Release pointer to next frame vector data.
Definition: main.c:452
blocking call - best used in combination with condvars, for eventfds we don't yield the cpu ...
Definition: queue.h:42
u32 app_wrk_index
Server index.
void transport_connection_update_tx_stats(transport_connection_t *tc, u32 bytes)
Update tx byte stats for transport connection.
Definition: transport.c:603
void session_transport_close(stream_session_t *s)
Notify transport the session can be disconnected.
Definition: session.c:1165
static int svm_msg_q_try_lock(svm_msg_q_t *mq)
Try locking message queue.
#define VLIB_BUFFER_DATA_SIZE
Definition: buffer.h:51
application_t * application_get(u32 app_index)
Definition: application.c:213
int app_worker_lock_and_send_event(app_worker_t *app, stream_session_t *s, u8 evt_type)
Send event to application.
Definition: application.c:1418
static stream_session_t * session_get_from_handle_if_valid(session_handle_t handle)
Definition: session.h:399
#define SESSION_Q_PROCESS_STOP
Definition: session.h:304
u32 ring_index
ring index, could be u8
Definition: message_queue.h:62
session_fifo_rx_fn ** session_tx_fns
Per transport rx function that can either dequeue or peek.
Definition: session.h:249
#define foreach_session_queue_error
Definition: session_node.c:311
#define ASSERT(truth)
static void session_mq_disconnected_reply_handler(void *data)
Definition: session_node.c:185
u32 * tx_buffers
Vector of tx buffer free lists.
Definition: session.h:205
u32 next_buffer
Next buffer for this linked-list of buffers.
Definition: buffer.h:130
int session_tx_fifo_dequeue_internal(vlib_main_t *vm, vlib_node_runtime_t *node, session_manager_worker_t *wrk, session_event_t *e, int *n_tx_pkts)
Definition: session_node.c:829
session_cb_vft_t cb_fns
Callbacks: shoulder-taps for the server/client.
Definition: application.h:130
u32 * session_type_to_next
Per session type output nodes.
Definition: session.h:254
#define vec_append(v1, v2)
Append v2 after v1.
Definition: vec.h:822
u32 transport_connection_snd_space(transport_connection_t *tc, u64 time_now, u16 mss)
Get maximum tx burst allowed for transport connection.
Definition: transport.c:585
static uword pointer_to_uword(const void *p)
Definition: types.h:131
static u8 * format_session_queue_trace(u8 *s, va_list *args)
Definition: session_node.c:300
session_dgram_hdr_t hdr
Definition: session.h:176
static void * vlib_add_trace(vlib_main_t *vm, vlib_node_runtime_t *r, vlib_buffer_t *b, u32 n_data_bytes)
Definition: trace_funcs.h:57
static int session_tx_fifo_read_and_snd_i(vlib_main_t *vm, vlib_node_runtime_t *node, session_manager_worker_t *wrk, session_event_t *e, int *n_tx_packets, u8 peek_data)
Definition: session_node.c:634
u32 total_length_not_including_first_buffer
Only valid for first buffer in chain.
Definition: buffer.h:156
static void session_tx_set_dequeue_params(vlib_main_t *vm, session_tx_context_t *ctx, u32 max_segs, u8 peek_data)
Definition: session_node.c:568
u32 app_index
App index in app pool.
Definition: application.h:124
struct _vlib_node_registration vlib_node_registration_t
template key/value backing page structure
Definition: bihash_doc.h:44
int svm_fifo_dequeue_drop(svm_fifo_t *f, u32 max_bytes)
Definition: svm_fifo.c:734
static u8 session_tx_not_ready(stream_session_t *s, u8 peek_data)
Definition: session_node.c:533
session_manager_worker_t * wrk
Worker contexts.
Definition: session.h:240
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
enum _transport_proto transport_proto_t
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
svm_msg_q_t * vpp_event_queue
vpp event message queue for worker
Definition: session.h:187
#define VLIB_BUFFER_TRACE_TRAJECTORY_INIT(b)
Definition: buffer.h:495
u64 uword
Definition: types.h:112
static void session_mq_worker_update_handler(void *data)
Definition: session_node.c:211
static void session_mq_accepted_reply_handler(void *data)
Definition: session_node.c:28
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
Definition: session_node.c:290
u32 elsize
size of an element
Definition: message_queue.h:33
static void session_update_dispatch_period(session_manager_worker_t *wrk, f64 now, u32 thread_index)
Definition: session_node.c:851
struct clib_bihash_value offset
template key/value backing page structure
static void session_mq_disconnected_handler(void *data)
Definition: session_node.c:139
static transport_connection_t * session_tx_get_transport(session_tx_context_t *ctx, u8 peek_data)
Definition: session_node.c:548
u32 app_index
Index of owning app.
Definition: application.h:71
transport_connection_t * tc
Definition: session.h:163
void vlib_worker_thread_barrier_release(vlib_main_t *vm)
Definition: threads.c:1470
static u32 svm_msg_q_size(svm_msg_q_t *mq)
Check length of message queue.
#define vec_foreach(var, vec)
Vector iterator.
static void vlib_set_trace_count(vlib_main_t *vm, vlib_node_runtime_t *rt, u32 count)
Definition: trace_funcs.h:172
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:59
u32 flags
buffer flags: VLIB_BUFFER_FREE_LIST_INDEX_MASK: bits used to store free list index, VLIB_BUFFER_IS_TRACED: trace this buffer.
Definition: buffer.h:117
static u32 vlib_buffer_alloc(vlib_main_t *vm, u32 *buffers, u32 n_buffers)
Allocate buffers into supplied array.
Definition: buffer_funcs.h:485
vlib_node_registration_t session_queue_process_node
(constructor) VLIB_REGISTER_NODE (session_queue_process_node)
svm_msg_q_t * event_queue
Application listens for events on this svm queue.
Definition: application.h:74
int svm_fifo_peek(svm_fifo_t *f, u32 relative_offset, u32 max_bytes, u8 *copy_here)
Definition: svm_fifo.c:726
int svm_fifo_dequeue_nowait(svm_fifo_t *f, u32 max_bytes, u8 *copy_here)
Definition: svm_fifo.c:678
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
Definition: buffer_funcs.h:62
static void session_mq_reset_reply_handler(void *data)
Definition: session_node.c:95
void svm_msg_q_sub_w_lock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Consumer dequeue one message from queue with mutex held.