49 case SESSION_TYPE_IP4_UDP:
50 case SESSION_TYPE_IP4_TCP:
53 clib_bihash_add_del_16_8 (&smm->v4_session_hash, &kv4, 1 );
55 case SESSION_TYPE_IP6_UDP:
56 case SESSION_TYPE_IP6_TCP:
59 clib_bihash_add_del_48_8 (&smm->v6_session_hash, &kv6, 1 );
73 tc =
tp_vfts[s->session_type].get_connection (s->connection_index,
88 case SESSION_TYPE_IP4_UDP:
89 case SESSION_TYPE_IP4_TCP:
92 clib_bihash_add_del_16_8 (&smm->v4_half_open_hash, &kv4,
95 case SESSION_TYPE_IP6_UDP:
96 case SESSION_TYPE_IP6_TCP:
99 clib_bihash_add_del_48_8 (&smm->v6_half_open_hash, &kv6,
117 case SESSION_TYPE_IP4_UDP:
118 case SESSION_TYPE_IP4_TCP:
120 return clib_bihash_add_del_16_8 (&smm->v4_session_hash, &kv4,
123 case SESSION_TYPE_IP6_UDP:
124 case SESSION_TYPE_IP6_TCP:
126 return clib_bihash_add_del_48_8 (&smm->v6_session_hash, &kv6,
142 ts =
tp_vfts[s->session_type].get_connection (s->connection_index,
156 case SESSION_TYPE_IP4_UDP:
157 case SESSION_TYPE_IP4_TCP:
159 clib_bihash_add_del_16_8 (&smm->v4_half_open_hash, &kv4,
162 case SESSION_TYPE_IP6_UDP:
163 case SESSION_TYPE_IP6_TCP:
165 clib_bihash_add_del_48_8 (&smm->v6_half_open_hash, &kv6,
182 rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
188 rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
203 u16 lcl_port,
u16 rmt_port,
u8 proto,
212 rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
228 rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
233 kv6.
key[0] = kv6.
key[1] = 0;
234 rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
247 u16 lcl_port,
u16 rmt_port,
u8 proto,
255 rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
268 case SESSION_TYPE_IP4_UDP:
269 case SESSION_TYPE_IP4_TCP:
272 case SESSION_TYPE_IP6_UDP:
273 case SESSION_TYPE_IP6_TCP:
282 ip46_address_t * lcl, ip46_address_t * rmt,
283 u16 lcl_port,
u16 rmt_port,
u8 proto)
291 case SESSION_TYPE_IP4_UDP:
292 case SESSION_TYPE_IP4_TCP:
293 make_v4_ss_kv (&kv4, &lcl->ip4, &rmt->ip4, lcl_port, rmt_port, proto);
294 rv = clib_bihash_search_inline_16_8 (&smm->v4_half_open_hash, &kv4);
301 case SESSION_TYPE_IP6_UDP:
302 case SESSION_TYPE_IP6_TCP:
303 make_v6_ss_kv (&kv6, &lcl->ip6, &rmt->ip6, lcl_port, rmt_port, proto);
304 rv = clib_bihash_search_inline_48_8 (&smm->v6_half_open_hash, &kv6);
317 u16 lcl_port,
u16 rmt_port,
u8 proto,
327 rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
332 return tp_vfts[s->session_type].get_connection (s->connection_index,
339 return tp_vfts[s->session_type].get_listener (s->connection_index);
342 rv = clib_bihash_search_inline_16_8 (&smm->v4_half_open_hash, &kv4);
344 return tp_vfts[proto].get_half_open (kv4.
value & 0xFFFFFFFF);
351 u16 lcl_port,
u16 rmt_port,
u8 proto,
360 rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
365 return tp_vfts[s->session_type].get_connection (s->connection_index,
372 return tp_vfts[s->session_type].get_listener (s->connection_index);
375 rv = clib_bihash_search_inline_48_8 (&smm->v6_half_open_hash, &kv6);
377 return tp_vfts[proto].get_half_open (kv6.
value & 0xFFFFFFFF);
392 if (smm->vpp_event_queues[thread_index] == 0)
397 smm->vpp_event_queues[thread_index] =
399 sizeof (session_fifo_event_t),
421 u32 segment_size,
u8 * segment_name)
426 memset (ca, 0,
sizeof (*ca));
434 clib_warning (
"svm_fifo_segment_create ('%s', %d) failed",
451 u32 add_segment_size;
452 u32 default_segment_size = 128 << 10;
454 memset (ca, 0,
sizeof (*ca));
455 segment_name =
format (0,
"%d-%d%c", getpid (),
456 smm->unique_segment_name_counter++, 0);
458 sm->add_segment_size ? sm->add_segment_size : default_segment_size;
470 memset (ca, 0,
sizeof (*ca));
471 *segment_name =
format (0,
"%d-%d%c", getpid (),
472 smm->unique_segment_name_counter++, 0);
479 u32 *deleted_sessions = 0;
480 u32 *deleted_thread_indices = 0;
484 for (j = 0; j <
vec_len (sm->segment_indices); j++)
497 for (i = 0; i <
vec_len (fifos); i++)
500 u32 session_index, thread_index;
511 if (!session->is_deleted)
513 session->is_deleted = 1;
515 session - smm->sessions[thread_index]);
516 vec_add1 (deleted_thread_indices, thread_index);
520 for (i = 0; i <
vec_len (deleted_sessions); i++)
526 deleted_sessions[i]);
555 u32 * fifo_segment_index,
556 u8 * added_a_segment)
559 u32 fifo_size, default_fifo_size = 128 << 10;
562 *added_a_segment = 0;
568 for (i = 0; i <
vec_len (sm->segment_indices); i++)
570 *fifo_segment_index = sm->segment_indices[
i];
573 fifo_size = sm->rx_fifo_size;
574 fifo_size = (fifo_size == 0) ? default_fifo_size : fifo_size;
577 fifo_size = sm->tx_fifo_size;
578 fifo_size = (fifo_size == 0) ? default_fifo_size : fifo_size;
581 if (*server_rx_fifo == 0)
584 if (*server_tx_fifo != 0)
591 if (*server_tx_fifo == 0)
593 if (*server_rx_fifo != 0)
604 if (*server_rx_fifo == 0)
608 if (*added_a_segment)
610 clib_warning (
"added a segment, still cant allocate a fifo");
611 return SESSION_ERROR_NEW_SEG_NO_SPACE;
615 return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
617 *added_a_segment = 1;
623 return SESSION_ERROR_NO_SPACE;
635 svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
636 u32 fifo_segment_index;
637 u32 pool_index, seg_size;
640 u32 thread_index = tc->thread_index;
649 return SESSION_ERROR_API_QUEUE_FULL;
660 ASSERT (app->cb_fns.add_segment_callback);
664 if (app->cb_fns.add_segment_callback (app->api_client_index, seg_name,
666 return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
670 pool_get (smm->sessions[thread_index], s);
671 memset (s, 0,
sizeof (*s));
674 pool_index = s - smm->sessions[thread_index];
678 server_tx_fifo->server_session_index = pool_index;
679 server_tx_fifo->server_thread_index = thread_index;
681 s->server_rx_fifo = server_rx_fifo;
682 s->server_tx_fifo = server_tx_fifo;
685 s->session_type = app->session_type;
688 s->server_segment_index = fifo_segment_index;
689 s->thread_index = thread_index;
690 s->session_index = pool_index;
693 s->connection_index = tc->c_index;
696 tc->s_index = s->session_index;
699 value = (((
u64) thread_index) << 32) | (
u64) s->session_index;
740 u32 thread_index = s->thread_index;
741 u32 my_enqueue_epoch = smm->current_enqueue_epoch[thread_index];
743 if (s->enqueue_epoch != my_enqueue_epoch)
745 s->enqueue_epoch = my_enqueue_epoch;
746 vec_add1 (smm->session_indices_to_enqueue_by_thread[thread_index],
747 s - smm->sessions[thread_index]);
775 return svm_fifo_peek (s->server_tx_fifo, s->pid, offset, max_bytes, buffer);
797 session_fifo_event_t evt;
799 static u32 serial_number;
808 evt.fifo = s->server_rx_fifo;
810 evt.event_id = serial_number++;
814 if (app->cb_fns.builtin_server_rx_callback)
815 return app->cb_fns.builtin_server_rx_callback (s, &evt);
818 q = app->event_queue;
829 ed->data[0] = evt.event_id;
830 ed->data[1] = evt.enqueue_length;
849 u32 *session_indices_to_enqueue;
852 session_indices_to_enqueue =
853 smm->session_indices_to_enqueue_by_thread[thread_index];
855 for (i = 0; i <
vec_len (session_indices_to_enqueue); i++)
869 smm->session_indices_to_enqueue_by_thread[thread_index] =
870 session_indices_to_enqueue;
873 smm->current_enqueue_epoch[thread_index]++;
896 pool_get (smm->listen_sessions[srv->session_type], s);
897 memset (s, 0,
sizeof (*s));
899 s->session_type = srv->session_type;
901 s->session_index = s - smm->listen_sessions[srv->session_type];
902 s->app_index = srv->index;
905 tci =
tp_vfts[srv->session_type].bind (s->session_index, ip, port);
908 s->connection_index = tci;
909 tc =
tp_vfts[srv->session_type].get_listener (tci);
911 srv->session_index = s->session_index;
931 tc =
tp_vfts[srv->session_type].get_listener (listener->connection_index);
934 tp_vfts[srv->session_type].unbind (listener->connection_index);
935 pool_put (smm->listen_sessions[srv->session_type], listener);
951 u32 connect_fifo_size = 256 << 10;
952 u32 default_segment_size = 1 << 20;
954 pool_get (smm->session_managers, sm);
955 memset (sm, 0,
sizeof (*sm));
957 sm->add_segment_size = default_segment_size;
958 sm->rx_fifo_size = connect_fifo_size;
959 sm->tx_fifo_size = connect_fifo_size;
963 smm->connect_manager_index[session_type] = sm - smm->session_managers;
976 tc->lcl_port, tc->rmt_port,
993 app->thread_index = new_s->thread_index;
1000 app->cb_fns.session_connected_callback (app->api_client_index, new_s,
1015 server->cb_fns.session_accept_callback (s);
1033 server->cb_fns.session_disconnect_callback (s);
1082 if (sm->segment_indices[0] != fifo_index &&
vec_len (fifos) == 0)
1086 pool_put (smm->sessions[s->thread_index], s);
1121 app->cb_fns.session_reset_callback (s);
1150 server->cb_fns.session_accept_callback (s);
1166 rv =
tp_vfts[sst].open (addr, port_host_byte_order);
1170 return VNET_API_ERROR_SESSION_CONNECT_FAIL;
1176 tc =
tp_vfts[sst].get_half_open (tci);
1179 value = (((
u64) app_index) << 32) | (
u64) tc->c_index;
1196 tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
1218 tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
1230 smm->session_tx_fns[type] =
1253 if (num_threads < 1)
1262 vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
1265 vec_validate (smm->evts_partially_read, num_threads - 1);
1266 vec_validate (smm->current_enqueue_epoch, num_threads - 1);
1270 for (i = 0; i < 200000; i++)
1274 memset (ss, 0,
sizeof (*ss));
1277 for (i = 0; i < 200000; i++)
1280 clib_bihash_init_16_8 (&smm->v4_session_hash,
"v4 session table",
1283 clib_bihash_init_48_8 (&smm->v6_session_hash,
"v6 session table",
1287 clib_bihash_init_16_8 (&smm->v4_half_open_hash,
"v4 half-open table",
1290 clib_bihash_init_48_8 (&smm->v6_half_open_hash,
"v6 half-open table",
1297 smm->is_enabled = 1;
1314 VLIB_NODE_STATE_POLLING);
1322 VLIB_NODE_STATE_DISABLED);
1333 smm->vlib_main =
vm;
1335 smm->is_enabled = 0;
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
int session_manager_flush_enqueue_events(u32 thread_index)
Flushes queue of sessions that are to be notified of new data enqueued events.
void vpp_session_event_queue_allocate(session_manager_main_t *smm, u32 thread_index)
Allocate vpp event queue (once) per worker thread.
sll srl srl sll sra u16x4 i
u32 stream_session_peek_bytes(transport_connection_t *tc, u8 *buffer, u32 offset, u32 max_bytes)
static void svm_pop_heap(void *oldheap)
int svm_fifo_enqueue_nowait(svm_fifo_t *f, int pid, u32 max_bytes, u8 *copy_from_here)
int stream_session_accept(transport_connection_t *tc, u32 listener_index, u8 sst, u8 notify)
Accept a stream session.
void stream_session_table_add(session_manager_main_t *smm, stream_session_t *s, u64 value)
void stream_session_connect_notify(transport_connection_t *tc, u8 sst, u8 is_fail)
static transport_proto_vft_t * tp_vfts
Per-type vector of transport protocol virtual function tables.
stream_session_t * stream_session_lookup_listener4(ip4_address_t *lcl, u16 lcl_port, u8 proto)
vnet_main_t * vnet_get_main(void)
struct _transport_connection transport_connection_t
int session_manager_allocate_session_fifos(session_manager_main_t *smm, session_manager_t *sm, svm_fifo_t **server_rx_fifo, svm_fifo_t **server_tx_fifo, u32 *fifo_segment_index, u8 *added_a_segment)
static void make_v4_ss_kv_from_tc(session_kv4_t *kv, transport_connection_t *t)
session_manager_main_t session_manager_main
void session_manager_del(session_manager_main_t *smm, session_manager_t *sm)
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
static void make_v6_ss_kv(session_kv6_t *kv, ip6_address_t *lcl, ip6_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto)
static void make_v6_ss_kv_from_tc(session_kv6_t *kv, transport_connection_t *t)
static u32 svm_fifo_max_enqueue(svm_fifo_t *f)
int session_manager_add_first_segment(session_manager_main_t *smm, session_manager_t *sm, u32 segment_size, u8 **segment_name)
#define pool_get(P, E)
Allocate an object E from a pool P (unspecified alignment).
int stream_session_open(u8 sst, ip46_address_t *addr, u16 port_host_byte_order, u32 app_index)
session_fifo_rx_fn session_tx_fifo_peek_and_snd
static void make_v6_listener_kv(session_kv6_t *kv, ip6_address_t *lcl, u16 lcl_port, u8 proto)
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
struct _stream_session_t stream_session_t
static void make_v4_listener_kv(session_kv4_t *kv, ip4_address_t *lcl, u16 lcl_port, u8 proto)
static stream_session_t * stream_session_get(u64 si, u32 thread_index)
void session_manager_get_segment_info(u32 index, u8 **name, u32 *size)
static void stream_session_half_open_table_del(session_manager_main_t *smm, u8 sst, transport_connection_t *tc)
static int session_manager_add_segment_i(session_manager_main_t *smm, session_manager_t *sm, u32 segment_size, u8 *segment_name)
#define VLIB_INIT_FUNCTION(x)
static void * svm_push_data_heap(svm_region_t *rp)
static void stream_session_table_add_for_tc(u8 sst, transport_connection_t *tc, u64 value)
void stream_session_accept_notify(transport_connection_t *tc)
static u32 svm_fifo_max_dequeue(svm_fifo_t *f)
static u64 stream_session_half_open_lookup(session_manager_main_t *smm, ip46_address_t *lcl, ip46_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto)
stream_session_t * stream_session_lookup_listener6(ip6_address_t *lcl, u16 lcl_port, u8 proto)
#define clib_error_return(e, args...)
static session_manager_t * session_manager_get(u32 index)
stream_session_t * stream_session_lookup4(ip4_address_t *lcl, ip4_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
Looks up a session based on the 5-tuple passed as argument.
void stream_session_delete_notify(transport_connection_t *tc)
Notification from transport that connection is being deleted.
void stream_session_cleanup(stream_session_t *s)
Cleanup transport and session state.
void stream_session_delete(stream_session_t *s)
Cleans up session and associated app if needed.
transport_connection_t * stream_session_lookup_transport4(ip4_address_t *lcl, ip4_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
static int session_manager_add_segment(session_manager_main_t *smm, session_manager_t *sm)
static clib_error_t * session_manager_main_enable(vlib_main_t *vm)
struct _transport_proto_vft transport_proto_vft_t
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
u32 stream_session_dequeue_drop(transport_connection_t *tc, u32 max_bytes)
static session_manager_main_t * vnet_get_session_manager_main()
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
int stream_session_create_i(session_manager_main_t *smm, application_t *app, transport_connection_t *tc, stream_session_t **ret_s)
int svm_fifo_dequeue_drop(svm_fifo_t *f, int pid, u32 max_bytes)
#define pool_put(P, E)
Free an object E in pool P.
void svm_fifo_segment_init(u64 baseva, u32 timeout_in_seconds)
clib_error_t * vnet_tcp_enable_disable(vlib_main_t *vm, u8 is_en)
svm_fifo_segment_header_t * h
clib_error_t * session_manager_main_init(vlib_main_t *vm)
struct _session_manager_main session_manager_main_t
clib_error_t * vnet_session_enable_disable(vlib_main_t *vm, u8 is_en)
u32 svm_fifo_segment_index(svm_fifo_segment_private_t *s)
session_fifo_rx_fn session_tx_fifo_dequeue_and_snd
svm_fifo_t * svm_fifo_segment_alloc_fifo(svm_fifo_segment_private_t *s, u32 data_size_in_bytes)
void connects_session_manager_init(session_manager_main_t *smm, u8 session_type)
static u32 stream_session_get_index(stream_session_t *s)
int stream_session_start_listen(u32 server_index, ip46_address_t *ip, u16 port)
#define vec_free(V)
Free vector's memory (no header).
#define clib_warning(format, args...)
struct _session_manager session_manager_t
struct _application application_t
static stream_session_t * stream_session_get_if_valid(u64 si, u32 thread_index)
static svm_fifo_segment_private_t * svm_fifo_get_segment(u32 segment_index)
void stream_session_disconnect_notify(transport_connection_t *tc)
Notification from transport that connection is being closed.
static stream_session_t * stream_session_get_tsi(u64 ti_and_si, u32 thread_index)
#define pool_put_index(p, i)
Free pool element with given index.
unix_shared_memory_queue_t * unix_shared_memory_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
static void make_v4_ss_kv(session_kv4_t *kv, ip4_address_t *lcl, ip4_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto)
void application_del(application_t *app)
void stream_session_disconnect(stream_session_t *s)
Disconnect session and propagate to transport.
u32 application_get_index(application_t *app)
void stream_session_reset_notify(transport_connection_t *tc)
Notify application that connection has been reset.
static void vlib_node_set_state(vlib_main_t *vm, u32 node_index, vlib_node_state_t new_state)
Set node dispatch state.
stream_session_t * stream_session_lookup6(ip6_address_t *lcl, ip6_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
void session_register_transport(u8 type, const transport_proto_vft_t *vft)
int svm_fifo_segment_create(svm_fifo_segment_create_args_t *a)
(master) create an svm fifo segment
#define HALF_OPEN_LOOKUP_INVALID_VALUE
transport_proto_vft_t * session_get_transport_vft(u8 type)
template key/value backing page structure
static int stream_session_table_del(session_manager_main_t *smm, stream_session_t *s)
static int stream_session_enqueue_notify(stream_session_t *s, u8 block)
Notify session peer that new data has been enqueued.
void svm_fifo_segment_free_fifo(svm_fifo_segment_private_t *s, svm_fifo_t *f)
int connect_server_add_segment_cb(application_t *ss, char *segment_name, u32 segment_size)
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
int svm_fifo_peek(svm_fifo_t *f, int pid, u32 offset, u32 max_bytes, u8 *copy_here)
application_t * application_get(u32 index)
void stream_session_stop_listen(u32 server_index)
int stream_session_enqueue_data(transport_connection_t *tc, u8 *data, u16 len, u8 queue_event)
static vlib_thread_main_t * vlib_get_thread_main()
stream_session_t * stream_session_lookup_listener(ip46_address_t *lcl, u16 lcl_port, u8 proto)
static void stream_session_half_open_table_add(u8 sst, transport_connection_t *tc, u64 value)
transport_connection_t * stream_session_lookup_transport6(ip6_address_t *lcl, ip6_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
int application_api_queue_is_full(application_t *app)
#define SESSION_EVT_DBG(_s, _evt, _body)
application_t * application_get_if_valid(u32 index)
u8 stream_session_no_space(transport_connection_t *tc, u32 thread_index, u16 data_len)
Check if we have space in rx fifo to push more bytes.
void svm_fifo_segment_delete(svm_fifo_segment_private_t *s)
struct _unix_shared_memory_queue unix_shared_memory_queue_t
static int stream_session_table_del_for_tc(session_manager_main_t *smm, u8 sst, transport_connection_t *tc)