FD.io VPP  v18.11-rc0-18-g2a3fb1a
Vector Packet Processing
session_node.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/session_debug.h>
25 #include <svm/queue.h>
26 
27 static void
28 session_mq_accepted_reply_handler (void *data)
29 {
30  session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
31  vnet_disconnect_args_t _a = { 0 }, *a = &_a;
32  local_session_t *ls;
33  stream_session_t *s;
34 
35  /* Server isn't interested, kill the session */
36  if (mp->retval)
37  {
38  a->app_index = mp->context;
39  a->handle = mp->handle;
40  vnet_disconnect_session (a);
41  return;
42  }
43 
44  if (session_handle_is_local (mp->handle))
45  {
46  ls = application_get_local_session_from_handle (mp->handle);
47  if (!ls || ls->app_index != mp->context)
48  {
49  clib_warning ("server %u doesn't own local handle %llu",
50  mp->context, mp->handle);
51  return;
52  }
53  if (application_local_session_connect_notify (ls))
54  return;
55  ls->session_state = SESSION_STATE_READY;
56  }
57  else
58  {
59  s = session_get_from_handle_if_valid (mp->handle);
60  if (!s)
61  {
62  clib_warning ("session doesn't exist");
63  return;
64  }
65  if (s->app_index != mp->context)
66  {
67  clib_warning ("app doesn't own session");
68  return;
69  }
70  s->session_state = SESSION_STATE_READY;
71  }
72 }
73 
74 static void
75 session_mq_reset_reply_handler (void *data)
76 {
77  session_reset_reply_msg_t *mp;
78  application_t *app;
79  stream_session_t *s;
80  u32 index, thread_index;
81 
82  mp = (session_reset_reply_msg_t *) data;
83  app = application_lookup (mp->client_index);
84  if (!app)
85  return;
86 
87  session_parse_handle (mp->handle, &index, &thread_index);
88  s = session_get_if_valid (index, thread_index);
89  if (s == 0 || app->index != s->app_index)
90  {
91  clib_warning ("Invalid session!");
92  return;
93  }
94 
95  /* Client objected to resetting the session, log and continue */
96  if (mp->retval)
97  {
98  clib_warning ("client retval %d", mp->retval);
99  return;
100  }
101 
102  /* This comes as a response to a reset, transport only waiting for
103  * confirmation to remove connection state, no need to disconnect */
104  stream_session_cleanup (s);
105 }
106 
107 static void
108 session_mq_disconnected_handler (void *data)
109 {
110  session_disconnected_reply_msg_t *rmp;
111  vnet_disconnect_args_t _a, *a = &_a;
112  svm_msg_q_msg_t _msg, *msg = &_msg;
113  session_disconnected_msg_t *mp;
114  session_event_t *evt;
115  application_t *app;
116  int rv = 0;
117 
118  mp = (session_disconnected_msg_t *) data;
119  app = application_lookup (mp->client_index);
120  if (app)
121  {
122  a->handle = mp->handle;
123  a->app_index = app->index;
124  rv = vnet_disconnect_session (a);
125  }
126  else
127  {
128  rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
129  }
130 
131  svm_msg_q_lock_and_alloc_msg_w_ring (app->event_queue,
132  SESSION_MQ_CTRL_EVT_RING,
133  SVM_Q_WAIT, msg);
134  svm_msg_q_unlock (app->event_queue);
135  evt = svm_msg_q_msg_data (app->event_queue, msg);
136  memset (evt, 0, sizeof (*evt));
137  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
138  rmp = (session_disconnected_reply_msg_t *) evt->data;
139  rmp->handle = mp->handle;
140  rmp->context = mp->context;
141  rmp->retval = rv;
142  svm_msg_q_add (app->event_queue, msg, SVM_Q_WAIT);
143 }
144 
145 static void
146 session_mq_disconnected_reply_handler (void *data)
147 {
148  session_disconnected_reply_msg_t *mp;
149  vnet_disconnect_args_t _a, *a = &_a;
150  application_t *app;
151 
152  mp = (session_disconnected_reply_msg_t *) data;
153 
154  /* Client objected to disconnecting the session, log and continue */
155  if (mp->retval)
156  {
157  clib_warning ("client retval %d", mp->retval);
158  return;
159  }
160 
161  /* Disconnect has been confirmed. Confirm close to transport */
162  app = application_lookup (mp->context);
163  if (app)
164  {
165  a->handle = mp->handle;
166  a->app_index = app->index;
167  vnet_disconnect_session (a);
168  }
169 }
170 
171 vlib_node_registration_t session_queue_node;
172 
173 typedef struct
174 {
175  u32 session_index;
176  u32 server_thread_index;
177 } session_queue_trace_t;
178 
179 /* packet trace format function */
180 static u8 *
181 format_session_queue_trace (u8 * s, va_list * args)
182 {
183  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
184  CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
185  session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
186 
187  s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
188  t->session_index, t->server_thread_index);
189  return s;
190 }
191 
192 #define foreach_session_queue_error \
193 _(TX, "Packets transmitted") \
194 _(TIMER, "Timer events") \
195 _(NO_BUFFER, "Out of buffers")
196 
197 typedef enum
198 {
199 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
200  foreach_session_queue_error
201 #undef _
202  SESSION_QUEUE_N_ERROR,
203 } session_queue_error_t;
204 
205 static char *session_queue_error_strings[] = {
206 #define _(sym,string) string,
207  foreach_session_queue_error
208 #undef _
209 };
210 
211 enum
212 {
213  SESSION_TX_NO_BUFFERS = -2,
214  SESSION_TX_NO_DATA,
215  SESSION_TX_OK
216 };
217 
218 static void
219 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
220  u32 next_index, u32 * to_next, u16 n_segs,
221  stream_session_t * s, u32 n_trace)
222 {
223  session_queue_trace_t *t;
224  vlib_buffer_t *b;
225  int i;
226 
227  for (i = 0; i < clib_min (n_trace, n_segs); i++)
228  {
229  b = vlib_get_buffer (vm, to_next[i - n_segs]);
230  vlib_trace_buffer (vm, node, next_index, b, 1 /* follow_chain */ );
231  t = vlib_add_trace (vm, node, b, sizeof (*t));
232  t->session_index = s->session_index;
233  t->server_thread_index = s->thread_index;
234  }
235  vlib_set_trace_count (vm, node, n_trace - i);
236 }
237 
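/*
 * Fill buffers 2..n_bufs_per_seg of a segment that does not fit in one
 * buffer: take additional buffers from the per-thread tx_buffers cache,
 * peek (retransmit path) or dequeue data from the session tx fifo into
 * them, and link each one to the previous buffer with next_buffer and
 * VLIB_BUFFER_NEXT_PRESENT until left_from_seg bytes are consumed.
 */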
238 always_inline void
239 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
240  vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
241 {
242  session_manager_main_t *smm = &session_manager_main;
243  vlib_buffer_t *chain_b, *prev_b;
244  u32 chain_bi0, to_deq, left_from_seg;
245  u16 len_to_deq, n_bytes_read;
246  u8 *data, j;
247 
248  b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
249  b->total_length_not_including_first_buffer = 0;
250 
251  chain_b = b;
252  left_from_seg = clib_min (ctx->snd_mss - b->current_length,
253  ctx->left_to_snd);
254  to_deq = left_from_seg;
255  for (j = 1; j < ctx->n_bufs_per_seg; j++)
256  {
257  prev_b = chain_b;
258  len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
259 
260  *n_bufs -= 1;
261  chain_bi0 = smm->tx_buffers[ctx->s->thread_index][*n_bufs];
262  _vec_len (smm->tx_buffers[ctx->s->thread_index]) = *n_bufs;
263 
264  chain_b = vlib_get_buffer (vm, chain_bi0);
265  chain_b->current_data = 0;
266  data = vlib_buffer_get_current (chain_b);
267  if (peek_data)
268  {
269  n_bytes_read = svm_fifo_peek (ctx->s->server_tx_fifo,
270  ctx->tx_offset, len_to_deq, data);
271  ctx->tx_offset += n_bytes_read;
272  }
273  else
274  {
275  if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
276  {
277  svm_fifo_t *f = ctx->s->server_tx_fifo;
278  session_dgram_hdr_t *hdr = &ctx->hdr;
279  u16 deq_now;
280  deq_now = clib_min (hdr->data_length - hdr->data_offset,
281  len_to_deq);
282  n_bytes_read = svm_fifo_peek (f, hdr->data_offset, deq_now,
283  data);
284  ASSERT (n_bytes_read > 0);
285 
286  hdr->data_offset += n_bytes_read;
287  if (hdr->data_offset == hdr->data_length)
288  svm_fifo_dequeue_drop (f, hdr->data_length);
289  }
290  else
291  n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->server_tx_fifo,
292  len_to_deq, data);
293  }
294  ASSERT (n_bytes_read == len_to_deq);
295  chain_b->current_length = n_bytes_read;
296  b->total_length_not_including_first_buffer += n_bytes_read;
297 
298  /* update previous buffer */
299  prev_b->next_buffer = chain_bi0;
300  prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
301 
302  /* update current buffer */
303  chain_b->next_buffer = 0;
304 
305  to_deq -= n_bytes_read;
306  if (to_deq == 0)
307  break;
308  }
309  ASSERT (to_deq == 0
310  && b->total_length_not_including_first_buffer == left_from_seg);
311  ctx->left_to_snd -= left_from_seg;
312 }
313 
314 always_inline int
315 session_output_try_get_buffers (vlib_main_t * vm,
316  session_manager_main_t * smm,
317  u32 thread_index, u16 * n_bufs, u32 wanted)
318 {
319  u32 n_alloc;
320  vec_validate_aligned (smm->tx_buffers[thread_index], wanted - 1,
321  CLIB_CACHE_LINE_BYTES);
322  n_alloc = vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][*n_bufs],
323  wanted - *n_bufs);
324  *n_bufs += n_alloc;
325  _vec_len (smm->tx_buffers[thread_index]) = *n_bufs;
326  return n_alloc;
327 }
328 
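/*
 * Fill the first buffer of a segment. With peek_data set the bytes are
 * peeked at tx_offset (they stay in the fifo until acknowledged);
 * otherwise they are dequeued. For datagram transports the per-datagram
 * header read from the fifo bounds the amount taken and, on listening
 * sessions, supplies the remote ip/port. Any remainder of the segment is
 * handed to session_tx_fifo_chain_tail.
 */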
329 always_inline void
330 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
331  vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
332 {
333  u32 len_to_deq;
334  u8 *data0;
335  int n_bytes_read;
336 
337  /*
338  * Start with the first buffer in chain
339  */
340  b->error = 0;
341  b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
342  b->current_data = 0;
343 
344  data0 = vlib_buffer_make_headroom (b, MAX_HDRS_LEN);
345  len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
346 
347  if (peek_data)
348  {
349  n_bytes_read = svm_fifo_peek (ctx->s->server_tx_fifo, ctx->tx_offset,
350  len_to_deq, data0);
351  ASSERT (n_bytes_read > 0);
352  /* Keep track of progress locally, transport is also supposed to
353  * increment it independently when pushing the header */
354  ctx->tx_offset += n_bytes_read;
355  }
356  else
357  {
358  if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
359  {
360  session_dgram_hdr_t *hdr = &ctx->hdr;
361  svm_fifo_t *f = ctx->s->server_tx_fifo;
362  u16 deq_now;
363  u32 offset;
364 
365  ASSERT (hdr->data_length > hdr->data_offset);
366  deq_now = clib_min (hdr->data_length - hdr->data_offset,
367  len_to_deq);
368  offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
369  n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
370  ASSERT (n_bytes_read > 0);
371 
372  if (ctx->s->session_state == SESSION_STATE_LISTENING)
373  {
374  ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
375  ctx->tc->rmt_port = hdr->rmt_port;
376  }
377  hdr->data_offset += n_bytes_read;
378  if (hdr->data_offset == hdr->data_length)
379  {
380  offset = hdr->data_length + SESSION_CONN_HDR_LEN;
381  svm_fifo_dequeue_drop (f, offset);
382  }
383  }
384  else
385  {
386  n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->server_tx_fifo,
387  len_to_deq, data0);
388  ASSERT (n_bytes_read > 0);
389  }
390  }
391  b->current_length = n_bytes_read;
392  ctx->left_to_snd -= n_bytes_read;
393 
394  /*
395  * Fill in the remaining buffers in the chain, if any
396  */
397  if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
398  session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
399 
400  /* *INDENT-OFF* */
401  SESSION_EVT_DBG(SESSION_EVT_DEQ, ctx->s, ({
402  ed->data[0] = FIFO_EVENT_APP_TX;
403  ed->data[1] = ctx->max_dequeue;
404  ed->data[2] = len_to_deq;
405  ed->data[3] = ctx->left_to_snd;
406  }));
407  /* *INDENT-ON* */
408 }
409 
410 always_inline u8
411 session_tx_not_ready (stream_session_t * s, u8 peek_data)
412 {
413  if (peek_data)
414  {
415  /* Can retransmit for closed sessions but can't send new data if
416  * session is not ready or closed */
417  if (s->session_state < SESSION_STATE_READY)
418  return 1;
419  if (s->session_state == SESSION_STATE_CLOSED)
420  return 2;
421  }
422  return 0;
423 }
424 
425 always_inline transport_connection_t *
426 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
427 {
428  if (peek_data)
429  {
430  return ctx->transport_vft->get_connection (ctx->s->connection_index,
431  ctx->s->thread_index);
432  }
433  else
434  {
435  if (ctx->s->session_state == SESSION_STATE_LISTENING)
436  return ctx->transport_vft->get_listener (ctx->s->connection_index);
437  else
438  {
439  return ctx->transport_vft->get_connection (ctx->s->connection_index,
440  ctx->s->thread_index);
441  }
442  }
443 }
444 
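/*
 * Work out how much this event is allowed to send: start from the bytes
 * available in the tx fifo, clamp by the transport send space, round to
 * full snd_mss segments where possible and cap by the number of segments
 * the frame can still accept. Also derives how many buffers a segment
 * needs and how many bytes go into the first and the chained buffers.
 */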
445 always_inline void
446 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
447  u32 max_segs, u8 peek_data)
448 {
449  u32 n_bytes_per_buf, n_bytes_per_seg;
450  ctx->max_dequeue = svm_fifo_max_dequeue (ctx->s->server_tx_fifo);
451  if (peek_data)
452  {
453  /* Offset in rx fifo from where to peek data */
454  ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc);
455  if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue))
456  {
457  ctx->max_len_to_snd = 0;
458  return;
459  }
460  ctx->max_dequeue -= ctx->tx_offset;
461  }
462  else
463  {
464  if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
465  {
466  if (ctx->max_dequeue <= sizeof (ctx->hdr))
467  {
468  ctx->max_len_to_snd = 0;
469  return;
470  }
471  svm_fifo_peek (ctx->s->server_tx_fifo, 0, sizeof (ctx->hdr),
472  (u8 *) & ctx->hdr);
473  ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
474  ctx->max_dequeue = ctx->hdr.data_length - ctx->hdr.data_offset;
475  }
476  }
477  ASSERT (ctx->max_dequeue > 0);
478 
479  /* Ensure we're not writing more than transport window allows */
480  if (ctx->max_dequeue < ctx->snd_space)
481  {
482  /* Constrained by tx queue. Try to send only fully formed segments */
483  ctx->max_len_to_snd =
484  (ctx->max_dequeue > ctx->snd_mss) ?
485  ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue;
486  /* TODO Nagle ? */
487  }
488  else
489  {
490  /* Expectation is that snd_space is already a multiple of snd_mss */
491  ctx->max_len_to_snd = ctx->snd_space;
492  }
493 
494  /* Check if we're tx constrained by the node */
495  ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss);
496  if (ctx->n_segs_per_evt > max_segs)
497  {
498  ctx->n_segs_per_evt = max_segs;
499  ctx->max_len_to_snd = max_segs * ctx->snd_mss;
500  }
501 
502  n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm,
503  VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
504  ASSERT (n_bytes_per_buf > MAX_HDRS_LEN);
505  n_bytes_per_seg = MAX_HDRS_LEN + ctx->snd_mss;
506  ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
507  ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf);
508  ctx->deq_per_first_buf = clib_min (ctx->snd_mss,
509  n_bytes_per_buf - MAX_HDRS_LEN);
510 }
511 
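/*
 * Common tx path for the peek (retransmit-capable) and dequeue flavors.
 * It bails out early if the session or transport is not ready, computes
 * the dequeue limits, tops up the per-thread buffer cache, fills and
 * chains buffers, lets the transport push its header on each one and
 * enqueues the frame. Events that could not be fully served are put back
 * on the pending event vector.
 */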
512 always_inline int
513 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
514  session_event_t * e,
515  stream_session_t * s, int *n_tx_packets,
516  u8 peek_data)
517 {
518  u32 next_index, next0, next1, *to_next, n_left_to_next;
519  u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0;
520  u32 thread_index = s->thread_index, n_left, pbi;
521  session_manager_main_t *smm = &session_manager_main;
522  session_tx_context_t *ctx = &smm->ctx[thread_index];
523  transport_proto_t tp;
524  vlib_buffer_t *pb;
525  u16 n_bufs, rv;
526 
527  if (PREDICT_FALSE ((rv = session_tx_not_ready (s, peek_data))))
528  {
529  if (rv < 2)
530  vec_add1 (smm->pending_event_vector[thread_index], *e);
531  return SESSION_TX_NO_DATA;
532  }
533 
534  next_index = smm->session_type_to_next[s->session_type];
535  next0 = next1 = next_index;
536 
537  tp = session_get_transport_proto (s);
538  ctx->s = s;
539  ctx->transport_vft = transport_protocol_get_vft (tp);
540  ctx->tc = session_tx_get_transport (ctx, peek_data);
541  ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
542  ctx->snd_space = ctx->transport_vft->send_space (ctx->tc);
543  if (ctx->snd_space == 0 || ctx->snd_mss == 0)
544  {
545  vec_add1 (smm->pending_event_vector[thread_index], *e);
546  return SESSION_TX_NO_DATA;
547  }
548 
549  /* Allow enqueuing of a new event */
550  svm_fifo_unset_event (s->server_tx_fifo);
551 
552  /* Check how much we can pull. */
553  session_tx_set_dequeue_params (vm, ctx, VLIB_FRAME_SIZE - *n_tx_packets,
554  peek_data);
555 
556  if (PREDICT_FALSE (!ctx->max_len_to_snd))
557  return SESSION_TX_NO_DATA;
558 
559  n_bufs = vec_len (smm->tx_buffers[thread_index]);
560  n_bufs_needed = ctx->n_segs_per_evt * ctx->n_bufs_per_seg;
561 
562  /*
563  * Make sure we have at least one full frame of buffers ready
564  */
565  if (n_bufs < n_bufs_needed)
566  {
567  session_output_try_get_buffers (vm, smm, thread_index, &n_bufs,
568  ctx->n_bufs_per_seg * VLIB_FRAME_SIZE);
569  if (PREDICT_FALSE (n_bufs < n_bufs_needed))
570  {
571  vec_add1 (smm->pending_event_vector[thread_index], *e);
572  return SESSION_TX_NO_BUFFERS;
573  }
574  }
575 
576  /*
577  * Write until we fill up a frame
578  */
579  vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
580  if (PREDICT_FALSE (ctx->n_segs_per_evt > n_left_to_next))
581  {
582  ctx->n_segs_per_evt = n_left_to_next;
583  ctx->max_len_to_snd = ctx->snd_mss * n_left_to_next;
584  }
585  ctx->left_to_snd = ctx->max_len_to_snd;
586  n_left = ctx->n_segs_per_evt;
587 
588  while (n_left >= 4)
589  {
590  vlib_buffer_t *b0, *b1;
591  u32 bi0, bi1;
592 
593  pbi = smm->tx_buffers[thread_index][n_bufs - 3];
594  pb = vlib_get_buffer (vm, pbi);
595  vlib_prefetch_buffer_header (pb, STORE);
596  pbi = smm->tx_buffers[thread_index][n_bufs - 4];
597  pb = vlib_get_buffer (vm, pbi);
598  vlib_prefetch_buffer_header (pb, STORE);
599 
600  to_next[0] = bi0 = smm->tx_buffers[thread_index][--n_bufs];
601  to_next[1] = bi1 = smm->tx_buffers[thread_index][--n_bufs];
602 
603  b0 = vlib_get_buffer (vm, bi0);
604  b1 = vlib_get_buffer (vm, bi1);
605 
606  session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
607  session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
608 
609  ctx->transport_vft->push_header (ctx->tc, b0);
610  ctx->transport_vft->push_header (ctx->tc, b1);
611 
612  to_next += 2;
613  n_left_to_next -= 2;
614  n_left -= 2;
615 
618 
619  vlib_validate_buffer_enqueue_x2 (vm, node, next_index, to_next,
620  n_left_to_next, bi0, bi1, next0,
621  next1);
622  }
623  while (n_left)
624  {
625  vlib_buffer_t *b0;
626  u32 bi0;
627 
628  if (n_left > 1)
629  {
630  pbi = smm->tx_buffers[thread_index][n_bufs - 2];
631  pb = vlib_get_buffer (vm, pbi);
632  vlib_prefetch_buffer_header (pb, STORE);
633  }
634 
635  to_next[0] = bi0 = smm->tx_buffers[thread_index][--n_bufs];
636  b0 = vlib_get_buffer (vm, bi0);
637  session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
638 
639  /* Ask transport to push header after current_length and
640  * total_length_not_including_first_buffer are updated */
641  ctx->transport_vft->push_header (ctx->tc, b0);
642 
643  to_next += 1;
644  n_left_to_next -= 1;
645  n_left -= 1;
646 
648 
649  vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
650  n_left_to_next, bi0, next0);
651  }
652 
653  if (PREDICT_FALSE (n_trace > 0))
654  session_tx_trace_frame (vm, node, next_index, to_next,
655  ctx->n_segs_per_evt, s, n_trace);
656 
657  _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
658  *n_tx_packets += ctx->n_segs_per_evt;
659  vlib_put_next_frame (vm, node, next_index, n_left_to_next);
660 
661  /* If we couldn't dequeue all bytes mark as partially read */
662  ASSERT (ctx->left_to_snd == 0);
663  if (ctx->max_len_to_snd < ctx->max_dequeue)
664  if (svm_fifo_set_event (s->server_tx_fifo))
665  vec_add1 (smm->pending_event_vector[thread_index], *e);
666 
667  if (!peek_data && ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
668  {
669  /* Fix dgram pre header */
670  if (ctx->max_len_to_snd < ctx->max_dequeue)
671  svm_fifo_overwrite_head (s->server_tx_fifo, (u8 *) & ctx->hdr,
672  sizeof (session_dgram_pre_hdr_t));
673  /* More data needs to be read */
674  else if (svm_fifo_max_dequeue (s->server_tx_fifo) > 0)
675  if (svm_fifo_set_event (s->server_tx_fifo))
676  vec_add1 (smm->pending_event_vector[thread_index], *e);
677  }
678  return SESSION_TX_OK;
679 }
680 
681 int
682 session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
683  session_event_t * e,
684  stream_session_t * s, int *n_tx_pkts)
685 {
686  return session_tx_fifo_read_and_snd_i (vm, node, e, s, n_tx_pkts, 1);
687 }
688 
689 int
690 session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
691  session_event_t * e,
692  stream_session_t * s, int *n_tx_pkts)
693 {
694  return session_tx_fifo_read_and_snd_i (vm, node, e, s, n_tx_pkts, 0);
695 }
696 
697 int
698 session_tx_fifo_dequeue_internal (vlib_main_t * vm,
699  vlib_node_runtime_t * node,
700  session_event_t * e,
701  stream_session_t * s, int *n_tx_pkts)
702 {
703  application_t *app;
704  app = application_get (s->opaque);
705  svm_fifo_unset_event (s->server_tx_fifo);
706  return app->cb_fns.builtin_app_tx_callback (s);
707 }
708 
709 always_inline stream_session_t *
710 session_event_get_session (session_event_t * e, u8 thread_index)
711 {
712  return session_get_if_valid (e->fifo->master_session_index, thread_index);
713 }
714 
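/*
 * Per-thread session queue input node. Each dispatch it updates the
 * transport time, drains the vpp event message queue without blocking,
 * appends previously postponed events and disconnects, then handles each
 * event by type: app tx, disconnect, built-in rx, rpc and the session
 * control replies. Tx events that cannot complete in this frame are
 * pushed back onto the pending event vector.
 */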
715 static uword
716 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
717  vlib_frame_t * frame)
718 {
719  session_manager_main_t *smm = vnet_get_session_manager_main ();
720  u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
721  session_event_t *pending_events, *e;
722  session_event_t *fifo_events;
723  svm_msg_q_msg_t _msg, *msg = &_msg;
724  f64 now = vlib_time_now (vm);
725  int n_tx_packets = 0, i, rv;
726  application_t *app;
727  svm_msg_q_t *mq;
728  void (*fp) (void *);
729 
730  SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
731 
732  /*
733  * Update transport time
734  */
735  transport_update_time (now, thread_index);
736 
737  /*
738  * Get vpp queue events that we can dequeue without blocking
739  */
740  mq = smm->vpp_event_queues[thread_index];
741  fifo_events = smm->free_event_vector[thread_index];
742  n_to_dequeue = svm_msg_q_size (mq);
743  pending_events = smm->pending_event_vector[thread_index];
744 
745  if (!n_to_dequeue && !vec_len (pending_events)
746  && !vec_len (smm->pending_disconnects[thread_index]))
747  return 0;
748 
749  SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
750 
751  /*
752  * If we didn't manage to process previous events try going
753  * over them again without dequeuing new ones.
754  * XXX: Handle senders to sessions that can't keep up
755  */
756  if (0 && vec_len (pending_events) >= 100)
757  {
758  clib_warning ("too many fifo events unsolved");
759  goto skip_dequeue;
760  }
761 
762  /* See you in the next life, don't be late
763  * XXX: we may need priorities here */
764  if (svm_msg_q_try_lock (mq))
765  return 0;
766 
767  for (i = 0; i < n_to_dequeue; i++)
768  {
769  vec_add2 (fifo_events, e, 1);
770  svm_msg_q_sub_w_lock (mq, msg);
771  clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
772  svm_msg_q_free_msg (mq, msg);
773  }
774 
775  svm_msg_q_unlock (mq);
776 
777  vec_append (fifo_events, pending_events);
778  vec_append (fifo_events, smm->pending_disconnects[thread_index]);
779 
780  _vec_len (pending_events) = 0;
781  smm->pending_event_vector[thread_index] = pending_events;
782  _vec_len (smm->pending_disconnects[thread_index]) = 0;
783 
784 skip_dequeue:
785  n_events = vec_len (fifo_events);
786  for (i = 0; i < n_events; i++)
787  {
788  stream_session_t *s; /* $$$ prefetch 1 ahead maybe */
789  session_event_t *e;
790  u32 to_dequeue;
791 
792  e = &fifo_events[i];
793  switch (e->event_type)
794  {
795  case FIFO_EVENT_APP_TX:
796  /* Don't try to send more than one frame per dispatch cycle */
797  if (n_tx_packets == VLIB_FRAME_SIZE)
798  {
799  vec_add1 (smm->pending_event_vector[thread_index], *e);
800  break;
801  }
802 
803  s = session_event_get_session (e, thread_index);
804  if (PREDICT_FALSE (!s))
805  {
806  clib_warning ("It's dead, Jim!");
807  continue;
808  }
809  to_dequeue = svm_fifo_max_dequeue (s->server_tx_fifo);
810 
811  /* Spray packets in per session type frames, since they go to
812  * different nodes */
813  rv = (smm->session_tx_fns[s->session_type]) (vm, node, e, s,
814  &n_tx_packets);
815  if (PREDICT_TRUE (rv == SESSION_TX_OK))
816  {
817  /* Notify app there's tx space if not polling */
818  if (PREDICT_FALSE (to_dequeue == s->server_tx_fifo->nitems
819  && !svm_fifo_has_event (s->server_tx_fifo)))
820  session_dequeue_notify (s);
821  }
822  else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
823  {
824  vlib_node_increment_counter (vm, node->node_index,
825  SESSION_QUEUE_ERROR_NO_BUFFER, 1);
826  continue;
827  }
828  break;
829  case FIFO_EVENT_DISCONNECT:
830  /* Make sure stream disconnects run after the pending list is
831  * drained */
832  s = session_get_from_handle (e->session_handle);
833  if (!e->postponed)
834  {
835  e->postponed = 1;
836  vec_add1 (smm->pending_disconnects[thread_index], *e);
837  continue;
838  }
839  /* If tx queue is still not empty, wait */
840  if (svm_fifo_max_dequeue (s->server_tx_fifo))
841  {
842  vec_add1 (smm->pending_disconnects[thread_index], *e);
843  continue;
844  }
845 
846  stream_session_disconnect_transport (s);
847  break;
848  case FIFO_EVENT_BUILTIN_RX:
849  s = session_event_get_session (e, thread_index);
850  if (PREDICT_FALSE (!s))
851  continue;
852  svm_fifo_unset_event (s->server_rx_fifo);
853  app = application_get (s->app_index);
854  app->cb_fns.builtin_app_rx_callback (s);
855  break;
856  case FIFO_EVENT_RPC:
857  fp = e->rpc_args.fp;
858  (*fp) (e->rpc_args.arg);
859  break;
860  case SESSION_CTRL_EVT_DISCONNECTED:
861  session_mq_disconnected_handler (e->data);
862  break;
863  case SESSION_CTRL_EVT_ACCEPTED_REPLY:
864  session_mq_accepted_reply_handler (e->data);
865  break;
866  case SESSION_CTRL_EVT_CONNECTED_REPLY:
867  break;
868  case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
869  session_mq_disconnected_reply_handler (e->data);
870  break;
871  case SESSION_CTRL_EVT_RESET_REPLY:
872  session_mq_reset_reply_handler (e->data);
873  break;
874  default:
875  clib_warning ("unhandled event type %d", e->event_type);
876  }
877  }
878 
879  _vec_len (fifo_events) = 0;
880  smm->free_event_vector[thread_index] = fifo_events;
881 
882  vlib_node_increment_counter (vm, node->node_index,
883  SESSION_QUEUE_ERROR_TX, n_tx_packets);
884 
885  SESSION_EVT_DBG (SESSION_EVT_DISPATCH_END, smm, thread_index);
886 
887  return n_tx_packets;
888 }
889 
890 /* *INDENT-OFF* */
891 VLIB_REGISTER_NODE (session_queue_node) =
892 {
893  .function = session_queue_node_fn,
894  .name = "session-queue",
895  .format_trace = format_session_queue_trace,
896  .type = VLIB_NODE_TYPE_INPUT,
897  .n_errors = ARRAY_LEN (session_queue_error_strings),
898  .error_strings = session_queue_error_strings,
899  .state = VLIB_NODE_STATE_DISABLED,
900 };
901 /* *INDENT-ON* */
902 
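/*
 * Debug helper: walk the vpp event queue in place, without consuming any
 * messages, and print a one-line summary of every queued event.
 */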
903 void
904 dump_thread_0_event_queue (void)
905 {
906  session_manager_main_t *smm = vnet_get_session_manager_main ();
907  vlib_main_t *vm = &vlib_global_main;
908  u32 my_thread_index = vm->thread_index;
909  session_event_t _e, *e = &_e;
910  svm_msg_q_ring_t *ring;
911  stream_session_t *s0;
912  svm_msg_q_msg_t *msg;
913  svm_msg_q_t *mq;
914  int i, index;
915 
916  mq = smm->vpp_event_queues[my_thread_index];
917  index = mq->q->head;
918 
919  for (i = 0; i < mq->q->cursize; i++)
920  {
921  msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
922  ring = svm_msg_q_ring (mq, msg->ring_index);
923  clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
924 
925  switch (e->event_type)
926  {
927  case FIFO_EVENT_APP_TX:
928  s0 = session_event_get_session (e, my_thread_index);
929  fformat (stdout, "[%04d] TX session %d\n", i, s0->session_index);
930  break;
931 
932  case FIFO_EVENT_DISCONNECT:
933  s0 = session_get_from_handle (e->session_handle);
934  fformat (stdout, "[%04d] disconnect session %d\n", i,
935  s0->session_index);
936  break;
937 
938  case FIFO_EVENT_BUILTIN_RX:
939  s0 = session_event_get_session (e, my_thread_index);
940  fformat (stdout, "[%04d] builtin_rx %d\n", i, s0->session_index);
941  break;
942 
943  case FIFO_EVENT_RPC:
944  fformat (stdout, "[%04d] RPC call %llx with %llx\n",
945  i, (u64) (e->rpc_args.fp), (u64) (e->rpc_args.arg));
946  break;
947 
948  default:
949  fformat (stdout, "[%04d] unhandled event type %d\n",
950  i, e->event_type);
951  break;
952  }
953 
954  index++;
955 
956  if (index == mq->q->maxsize)
957  index = 0;
958  }
959 }
960 
961 static u8
962 session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
963 {
964  stream_session_t *s;
965  switch (e->event_type)
966  {
967  case FIFO_EVENT_APP_RX:
968  case FIFO_EVENT_APP_TX:
969  case FIFO_EVENT_BUILTIN_RX:
970  if (e->fifo == f)
971  return 1;
972  break;
973  case FIFO_EVENT_DISCONNECT:
974  break;
975  case FIFO_EVENT_RPC:
976  s = session_get_from_handle (e->session_handle);
977  if (!s)
978  {
979  clib_warning ("session has event but doesn't exist!");
980  break;
981  }
982  if (s->server_rx_fifo == f || s->server_tx_fifo == f)
983  return 1;
984  break;
985  default:
986  break;
987  }
988  return 0;
989 }
990 
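/*
 * Check whether an event referencing fifo f is still queued: scan the
 * owning thread's vpp event queue first, then the pending event vector.
 * On a match the event is copied to *e and 1 is returned.
 */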
991 u8
992 session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
993 {
994  session_manager_main_t *smm = vnet_get_session_manager_main ();
995  svm_msg_q_t *mq;
996  session_event_t *pending_event_vector, *evt;
997  int i, index, found = 0;
998  svm_msg_q_msg_t *msg;
999  svm_msg_q_ring_t *ring;
1000  u8 thread_index;
1001 
1002  ASSERT (e);
1003  thread_index = f->master_thread_index;
1004  /*
1005  * Search evt queue
1006  */
1007  mq = smm->vpp_event_queues[thread_index];
1008  index = mq->q->head;
1009  for (i = 0; i < mq->q->cursize; i++)
1010  {
1011  msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
1012  ring = svm_msg_q_ring (mq, msg->ring_index);
1013  clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
1014  found = session_node_cmp_event (e, f);
1015  if (found)
1016  return 1;
1017  if (++index == mq->q->maxsize)
1018  index = 0;
1019  }
1020  /*
1021  * Search pending events vector
1022  */
1023  pending_event_vector = smm->pending_event_vector[thread_index];
1024  vec_foreach (evt, pending_event_vector)
1025  {
1026  found = session_node_cmp_event (evt, f);
1027  if (found)
1028  {
1029  clib_memcpy (e, evt, sizeof (*evt));
1030  break;
1031  }
1032  }
1033  return found;
1034 }
1035 
1036 static clib_error_t *
1037 session_queue_exit (vlib_main_t * vm)
1038 {
1039  if (vec_len (vlib_mains) < 2)
1040  return 0;
1041 
1042  /*
1043  * Shut off (especially) worker-thread session nodes.
1044  * Otherwise, vpp can crash as the main thread unmaps the
1045  * API segment.
1046  */
1047  vlib_worker_thread_barrier_sync (vm);
1048  session_node_enable_disable (0 /* is_enable */ );
1049  vlib_worker_thread_barrier_release (vm);
1050  return 0;
1051 }
1052 
1053 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1054 
1055 static uword
1056 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1057  vlib_frame_t * f)
1058 {
1059  f64 now, timeout = 1.0;
1059  f64 now, timeout = 1.0;
1060  uword *event_data = 0;
1061  uword event_type;
1062 
1063  while (1)
1064  {
1065  vlib_process_wait_for_event_or_clock (vm, timeout);
1066  now = vlib_time_now (vm);
1067  event_type = vlib_process_get_events (vm, (uword **) & event_data);
1068 
1069  switch (event_type)
1070  {
1071  case SESSION_Q_PROCESS_FLUSH_FRAMES:
1072  /* Flush the frames by updating all transports times */
1073  transport_update_time (now, 0);
1074  break;
1075  case SESSION_Q_PROCESS_STOP:
1076  timeout = 100000.0;
1077  break;
1078  case ~0:
1079  /* Timed out. Update time for all transports to trigger all
1080  * outstanding retransmits. */
1081  transport_update_time (now, 0);
1082  break;
1083  }
1084  vec_reset_length (event_data);
1085  }
1086  return 0;
1087 }
1088 
1089 /* *INDENT-OFF* */
1090 VLIB_REGISTER_NODE (session_queue_process_node) =
1091 {
1092  .function = session_queue_process,
1093  .type = VLIB_NODE_TYPE_PROCESS,
1094  .name = "session-queue-process",
1095  .state = VLIB_NODE_STATE_DISABLED,
1096 };
1097 /* *INDENT-ON* */
1098 
1099 
1100 /*
1101  * fd.io coding-style-patch-verification: ON
1102  *
1103  * Local Variables:
1104  * eval: (c-set-style "gnu")
1105  * End:
1106  */