FD.io VPP  v17.10-9-gd594711
Vector Packet Processing
session.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef __included_session_h__
16 #define __included_session_h__
17 
24 
25 #define HALF_OPEN_LOOKUP_INVALID_VALUE ((u64)~0) /* all-ones u64 sentinel */
26 #define INVALID_INDEX ((u32)~0) /* all-ones u32 sentinel */
27 
28 /* TODO: decide how much, since we have pre-data as well */
29 #define MAX_HDRS_LEN 100 /* Max number of bytes for headers */
30 
31 typedef enum
32 {
40 /** Return a constant printable name for fifo event type et; a value with no matching case yields "UNKNOWN FIFO EVENT". */
41 static inline const char *
42 fifo_event_type_str (fifo_event_type_t et)
43 {
44  switch (et)
45  {
46  case FIFO_EVENT_APP_RX:
47  return "FIFO_EVENT_APP_RX";
48  case FIFO_EVENT_APP_TX:
49  return "FIFO_EVENT_APP_TX";
50  case FIFO_EVENT_TIMEOUT:
51  return "FIFO_EVENT_TIMEOUT";
52  case FIFO_EVENT_DISCONNECT:
53  return "FIFO_EVENT_DISCONNECT";
54  case FIFO_EVENT_BUILTIN_RX:
55  return "FIFO_EVENT_BUILTIN_RX";
56  case FIFO_EVENT_RPC:
57  return "FIFO_EVENT_RPC";
58  default: /* keeps the function total for values outside the cases above */
59  return "UNKNOWN FIFO EVENT";
60  }
61 }
62 /* X-macro list of session input errors: each _(sym, str) entry pairs an error symbol with its description string; the user defines _ before expanding the list. */
63 #define foreach_session_input_error \
64 _(NO_SESSION, "No session drops") \
65 _(NO_LISTENER, "No listener for dst port drops") \
66 _(ENQUEUED, "Packets pushed into rx fifo") \
67 _(NOT_READY, "Session not ready packets") \
68 _(FIFO_FULL, "Packets dropped for lack of rx fifo space") \
69 _(EVENT_FIFO_FULL, "Events not sent for lack of event fifo space") \
70 _(API_QUEUE_FULL, "Sessions not created for lack of API queue space") \
71 _(NEW_SEG_NO_SPACE, "Created segment, couldn't allocate a fifo pair") \
72 _(NO_SPACE, "Couldn't allocate a fifo pair")
73 
74 typedef enum
75 {
76 #define _(sym,str) SESSION_ERROR_##sym,
78 #undef _
81 
82 /* Event queue input node static next indices */
83 typedef enum
84 {
92 /* Pair of opaque pointers; used as the rpc_args member of the session_fifo_event_t payload union. */
93 typedef struct
94 {
95  void *fp; /* presumably the function to invoke for the RPC - TODO confirm against the event handler */
96  void *arg; /* presumably the argument handed to fp - TODO confirm */
97 } rpc_args_t;
98 /* Session fifo event record, declared through CLIB_PACKED: an anonymous payload union followed by the event type and id. */
99 /* *INDENT-OFF* */
100 typedef CLIB_PACKED (struct {
101  union
102  {
103  svm_fifo_t * fifo; /* payload alternative: fifo pointer */
104  u64 session_handle; /* payload alternative: 64-bit session handle */
105  rpc_args_t rpc_args; /* payload alternative: RPC pointer pair */
106  };
107  u8 event_type; /* presumably a fifo_event_type_t value selecting the active payload - TODO confirm */
108  u16 event_id;
109 }) session_fifo_event_t;
110 /* *INDENT-ON* */
111 
112 /* Forward declaration */
113 typedef struct _session_manager_main session_manager_main_t;
114 
115 typedef int
118  session_fifo_event_t * e0, stream_session_t * s0,
119  u32 thread_index, int *n_tx_pkts);
120 
123 
124 u8 session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e);
125 /** Session layer state; one instance, session_manager_main, is referenced by the inline helpers in this header. */
126 struct _session_manager_main
127 {
128  /** Per worker thread session pools, indexed by thread index */
129  stream_session_t **sessions;
130 
131  /** Per session type pools of listen sessions. Same type as stream sessions to ease lookups */
132  stream_session_t *listen_sessions[SESSION_N_TYPES];
133 
134  /** Per session type sparse vector to map dst port to stream server */
135  u16 *stream_server_by_dst_port[SESSION_N_TYPES];
136 
137  /** per-worker enqueue epoch counters */
138  u8 *current_enqueue_epoch;
139 
140  /** Per-worker thread vector of sessions to enqueue */
141  u32 **session_indices_to_enqueue_by_thread;
142 
143  /** per-worker tx buffer free lists */
144  u32 **tx_buffers;
145 
146  /** Per worker-thread vector of partially read events */
147  session_fifo_event_t **free_event_vector;
148 
149  /** per-worker active event vectors */
150  session_fifo_event_t **pending_event_vector;
151 
152  /** Per-thread vpp fifo event queues, indexed by thread index */
153  unix_shared_memory_queue_t **vpp_event_queues;
154 
155  /** vpp fifo event queue configured length */
156  u32 configured_event_queue_length;
157 
158  /** session table size parameters */
159  u32 configured_v4_session_table_buckets;
160  u32 configured_v4_session_table_memory;
161  u32 configured_v4_halfopen_table_buckets;
162  u32 configured_v4_halfopen_table_memory;
163  u32 configured_v6_session_table_buckets;
164  u32 configured_v6_session_table_memory;
165  u32 configured_v6_halfopen_table_buckets;
166  u32 configured_v6_halfopen_table_memory;
167 
168  /** Unique segment name counter */
169  u32 unique_segment_name_counter;
170 
171  /** Per session type tx function: either session_tx_fifo_peek_and_snd or session_tx_fifo_dequeue_and_snd */
172  session_fifo_rx_fn *session_tx_fns[SESSION_N_TYPES];
173 
174  /** Set to 1 when the session manager is enabled */
175  u8 is_enabled;
176 
177  /** Preallocate session config parameter */
178  u32 preallocated_sessions;
179 
180 #if SESSION_DBG
181  /**
182  * last event poll time by thread
183  * Debug only. Will cause false cache-line sharing as-is
184  */
185  f64 *last_event_poll_by_thread;
186 #endif
187 
188 };
189 
192 
193 /*
194  * Session manager function
195  */
198 {
199  return &session_manager_main;
200 }
201 
203 stream_session_is_valid (u32 si, u8 thread_index)
204 {
205  stream_session_t *s;
206  s = pool_elt_at_index (session_manager_main.sessions[thread_index], si);
207  if (s->thread_index != thread_index || s->session_index != si
208  /* || s->server_rx_fifo->master_session_index != si
209  || s->server_tx_fifo->master_session_index != si
210  || s->server_rx_fifo->master_thread_index != thread_index
211  || s->server_tx_fifo->master_thread_index != thread_index */ )
212  return 0;
213  return 1;
214 }
215 
217 stream_session_get (u32 si, u32 thread_index)
218 {
219  ASSERT (stream_session_is_valid (si, thread_index));
220  return pool_elt_at_index (session_manager_main.sessions[thread_index], si);
221 }
222 
225 {
226  if (thread_index >= vec_len (session_manager_main.sessions))
227  return 0;
228 
229  if (pool_is_free_index (session_manager_main.sessions[thread_index], si))
230  return 0;
231 
232  ASSERT (stream_session_is_valid (si, thread_index));
233  return pool_elt_at_index (session_manager_main.sessions[thread_index], si);
234 }
235 
238 {
239  return ((u64) s->thread_index << 32) | (u64) s->session_index;
240 }
241 
244 {
245  return handle & 0xFFFFFFFF;
246 }
247 
250 {
251  return handle >> 32;
252 }
253 /** Split a 64-bit session handle into its session index and thread index, stored through the two output pointers. */
254 always_inline void
255 stream_session_parse_handle (u64 handle, u32 * index, u32 * thread_index)
256 {
257  *index = stream_session_index_from_handle (handle);
258  *thread_index = stream_session_thread_from_handle (handle);
259 }
260 
263 {
266  (handle)],
268 }
269 
272 {
273  return pool_elt_at_index (session_manager_main.listen_sessions[sst], si);
274 }
275 
278 {
279  if (s->session_state == SESSION_STATE_LISTENING)
280  return s - session_manager_main.listen_sessions[s->session_type];
281 
282  return s - session_manager_main.sessions[s->thread_index];
283 }
284 
287 {
288  stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
289  return svm_fifo_max_enqueue (s->server_rx_fifo);
290 }
291 
294 {
295  stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
296  return s->server_rx_fifo->nitems;
297 }
298 
300 
301 int
303  u32 offset, u8 queue_event, u8 is_in_order);
304 int
306  u32 offset, u32 max_bytes);
308 
311  u32 rx_pointer, u32 tx_pointer);
312 
317 int
318 stream_session_accept (transport_connection_t * tc, u32 listener_index,
319  u8 sst, u8 notify); /* Accept a stream session. */
320 int
321 stream_session_open (u32 app_index, session_type_t st,
322  transport_endpoint_t * tep,
323  transport_connection_t ** tc); /* Ask transport to open connection to remote transport endpoint. */
328 void session_send_session_evt_to_thread (u64 session_handle,
329  fifo_event_type_t evt_type,
330  u32 thread_index);
331 
332 u8 *format_stream_session (u8 * s, va_list * args); /* format function for a stream session */
333 uword unformat_stream_session (unformat_input_t * input, va_list * args);
335  va_list * args);
336 
337 int
338 send_session_connected_callback (u32 app_index, u32 api_context,
339  stream_session_t * s, u8 is_fail);
340 
341 
343 
346 {
347  return session_manager_main.vpp_event_queues[thread_index];
348 }
349 
350 int session_manager_flush_enqueue_events (u32 thread_index);
351 
354 {
355  ASSERT (s->session_state == SESSION_STATE_LISTENING);
356  return ((u64) s->session_type << 32) | s->session_index;
357 }
358 
361 {
363  stream_session_t *s;
364  u32 type, index;
365  type = handle >> 32;
366  index = handle & 0xFFFFFFFF;
367 
368  if (pool_is_free_index (smm->listen_sessions[type], index))
369  return 0;
370 
371  s = pool_elt_at_index (smm->listen_sessions[type], index);
372  ASSERT (s->session_state == SESSION_STATE_LISTENING);
373  return s;
374 }
375 
378 {
379  stream_session_t *s;
380  pool_get_aligned (session_manager_main.listen_sessions[type], s,
382  memset (s, 0, sizeof (*s));
383 
384  s->session_type = type;
385  s->session_state = SESSION_STATE_LISTENING;
386  s->session_index = s - session_manager_main.listen_sessions[type];
387 
388  return s;
389 }
390 
393 {
394  return pool_elt_at_index (session_manager_main.listen_sessions[type],
395  index);
396 }
397 
398 always_inline void
400 {
401  pool_put (session_manager_main.listen_sessions[s->session_type], s);
402 }
403 
406 {
407  return pool_elt_at_index (session_manager_main.listen_sessions[type],
408  index);
409 }
410 
411 always_inline void
413 {
414  /* If an offset function is provided, then peek instead of dequeue */
415  session_manager_main.session_tx_fns[type] = (is_peek) ?
416  session_tx_fifo_peek_and_snd : session_tx_fifo_dequeue_and_snd;
417 }
418 
421 
424 {
425  return session_manager_main.is_enabled == 1;
426 }
427 
428 #endif /* __included_session_h__ */
429 
430 /*
431  * fd.io coding-style-patch-verification: ON
432  *
433  * Local Variables:
434  * eval: (c-set-style "gnu")
435  * End:
436  */
int send_session_connected_callback(u32 app_index, u32 api_context, stream_session_t *s, u8 is_fail)
Definition: session_api.c:155
session_queue_next_t
Definition: session.h:83
static stream_session_t * listen_session_get_from_handle(u64 handle)
Definition: session.h:360
static u32 stream_session_index_from_handle(u64 handle)
Definition: session.h:243
session_type_t session_type_from_proto_and_ip(transport_proto_t proto, u8 is_ip4)
Definition: session.c:820
void * arg
Definition: session.h:96
struct _transport_connection transport_connection_t
uword unformat_transport_connection(unformat_input_t *input, va_list *args)
Definition: session_cli.c:166
uword unformat_stream_session(unformat_input_t *input, va_list *args)
Definition: session_cli.c:135
struct _vlib_node_registration vlib_node_registration_t
static stream_session_t * listen_session_new(session_type_t type)
Definition: session.h:377
static u32 svm_fifo_max_enqueue(svm_fifo_t *f)
Definition: svm_fifo.h:106
void * fp
Definition: session.h:95
static u8 session_manager_is_enabled()
Definition: session.h:423
static u32 stream_session_max_rx_enqueue(transport_connection_t *tc)
Definition: session.h:286
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
Definition: session_node.c:46
session_fifo_rx_fn session_tx_fifo_peek_and_snd
struct _svm_fifo svm_fifo_t
fifo_event_type_t
Definition: session.h:31
static stream_session_t * session_manager_get_listener(u8 type, u32 index)
Definition: session.h:405
#define always_inline
Definition: clib.h:84
unsigned long u64
Definition: types.h:89
void stream_session_accept_notify(transport_connection_t *tc)
Definition: session.c:506
typedef CLIB_PACKED(struct{union{svm_fifo_t *fifo;u64 session_handle;rpc_args_t rpc_args;};u8 event_type;u16 event_id;}) session_fifo_event_t
u32 stream_session_dequeue_drop(transport_connection_t *tc, u32 max_bytes)
Definition: session.c:307
struct _stream_session_t stream_session_t
u8 session_node_lookup_fifo_event(svm_fifo_t *f, session_fifo_event_t *e)
Definition: session_node.c:496
static void listen_session_del(stream_session_t *s)
Definition: session.h:399
static session_manager_main_t * vnet_get_session_manager_main()
Definition: session.h:197
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
Definition: pool.h:458
struct _unformat_input_t unformat_input_t
void stream_session_init_fifos_pointers(transport_connection_t *tc, u32 rx_pointer, u32 tx_pointer)
Init fifo tail and head pointers.
Definition: session.c:431
#define pool_put(P, E)
Free an object E in pool P.
Definition: pool.h:270
static void session_manager_set_transport_rx_fn(u8 type, u8 is_peek)
Definition: session.h:412
session_type_t
struct _session_manager_main session_manager_main_t
Definition: session.h:113
#define pool_get_aligned(P, E, A)
Allocate an object E from a pool P (general version).
Definition: pool.h:188
static unix_shared_memory_queue_t * session_manager_get_vpp_event_queue(u32 thread_index)
Definition: session.h:345
session_fifo_rx_fn session_tx_fifo_dequeue_and_snd
vlib_main_t * vm
Definition: buffer.c:283
static u32 stream_session_get_index(stream_session_t *s)
Definition: session.h:277
u8 * format_stream_session(u8 *s, va_list *args)
Format stream session as per the following format.
Definition: session_cli.c:52
void stream_session_disconnect(stream_session_t *s)
Disconnect session and propagate to transport.
Definition: session.c:763
int stream_session_stop_listen(stream_session_t *s)
Ask transport to stop listening on local transport endpoint.
Definition: session.c:707
int stream_session_enqueue_data(transport_connection_t *tc, vlib_buffer_t *b, u32 offset, u8 queue_event, u8 is_in_order)
Definition: session.c:220
int stream_session_connect_notify(transport_connection_t *tc, u8 is_fail)
Definition: session.c:441
#define pool_is_free_index(P, I)
Use free bitmap to query whether given index is free.
Definition: pool.h:267
session_manager_main_t session_manager_main
Definition: session.c:28
static stream_session_t * stream_session_get_if_valid(u64 si, u32 thread_index)
Definition: session.h:224
static u8 stream_session_is_valid(u32 si, u8 thread_index)
Definition: session.h:203
clib_error_t * vnet_session_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: session.c:928
void stream_session_delete_notify(transport_connection_t *tc)
Notification from transport that connection is being deleted.
Definition: session.c:565
static void stream_session_parse_handle(u64 handle, u32 *index, u32 *thread_index)
Definition: session.h:255
void stream_session_cleanup(stream_session_t *s)
Cleanup transport and session state.
Definition: session.c:776
#define ASSERT(truth)
unsigned int u32
Definition: types.h:88
void stream_session_reset_notify(transport_connection_t *tc)
Notify application that connection has been reset.
Definition: session.c:580
static u32 stream_session_thread_from_handle(u64 handle)
Definition: session.h:249
static const char * fifo_event_type_str(fifo_event_type_t et)
Definition: session.h:42
void session_send_session_evt_to_thread(u64 session_handle, fifo_event_type_t evt_type, u32 thread_index)
Definition: session.c:730
int stream_session_open(u32 app_index, session_type_t st, transport_endpoint_t *tep, transport_connection_t **tc)
Ask transport to open connection to remote transport endpoint.
Definition: session.c:637
static stream_session_t * stream_session_listener_get(u8 sst, u64 si)
Definition: session.h:271
u32 stream_session_tx_fifo_max_dequeue(transport_connection_t *tc)
Definition: session.c:290
int stream_session_listen(stream_session_t *s, transport_endpoint_t *tep)
Ask transport to listen on local transport endpoint.
Definition: session.c:676
int stream_session_accept(transport_connection_t *tc, u32 listener_index, u8 sst, u8 notify)
Accept a stream session.
Definition: session.c:594
u64 uword
Definition: types.h:112
static u64 stream_session_handle(stream_session_t *s)
Definition: session.h:237
unsigned short u16
Definition: types.h:57
enum _transport_proto transport_proto_t
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
double f64
Definition: types.h:142
static stream_session_t * listen_session_get(session_type_t type, u32 index)
Definition: session.h:392
unsigned char u8
Definition: types.h:56
struct _transport_endpoint transport_endpoint_t
session_error_t
Definition: session.h:74
#define foreach_session_input_error
Definition: session.h:63
static u32 stream_session_rx_fifo_size(transport_connection_t *tc)
Definition: session.h:293
int stream_session_peek_bytes(transport_connection_t *tc, u8 *buffer, u32 offset, u32 max_bytes)
Definition: session.c:299
int( session_fifo_rx_fn)(vlib_main_t *vm, vlib_node_runtime_t *node, session_manager_main_t *smm, session_fifo_event_t *e0, stream_session_t *s0, u32 thread_index, int *n_tx_pkts)
Definition: session.h:116
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:67
static u64 listen_session_get_handle(stream_session_t *s)
Definition: session.h:353
void stream_session_disconnect_notify(transport_connection_t *tc)
Notification from transport that connection is being closed.
Definition: session.c:524
static stream_session_t * stream_session_get_from_handle(u64 handle)
Definition: session.h:262
int session_manager_flush_enqueue_events(u32 thread_index)
Flushes queue of sessions that are to be notified of new data enqueued events.
Definition: session.c:392
struct _unix_shared_memory_queue unix_shared_memory_queue_t
static stream_session_t * stream_session_get(u32 si, u32 thread_index)
Definition: session.h:217