flowprobe: add support for reporting on inbound packets 94/36094/4
authorAlexander Chernavin <achernavin@netgate.com>
Fri, 6 May 2022 11:35:59 +0000 (11:35 +0000)
committerOle Tr�an <otroan@employees.org>
Fri, 13 May 2022 07:34:41 +0000 (07:34 +0000)
Type: feature

Currently, the plugin supports only IPFIX flow record generation for
outbound packets.

With this change:
 - add a new API message for enabling the feature on an interface that
   accepts direction (rx, tx, both);
 - update existing debug command for feature enabling to accept
   direction;
 - update existing debug command for showing currently enabled feature
   on interfaces to display direction;
 - update templates to include a direction field;
 - generate flow records on the specified direction and data path;
 - report direction in flow data;
 - update tests to use the new API;
 - add tests for inbound flows.

Change-Id: I121fd904b38408641036ebeea848df7a4e5e0b30
Signed-off-by: Alexander Chernavin <achernavin@netgate.com>
src/plugins/flowprobe/FEATURE.yaml
src/plugins/flowprobe/flowprobe.api
src/plugins/flowprobe/flowprobe.c
src/plugins/flowprobe/flowprobe.h
src/plugins/flowprobe/flowprobe_plugin_doc.rst
src/plugins/flowprobe/flowprobe_test.c
src/plugins/flowprobe/node.c
test/test_flowprobe.py

index 6638243..9c80b12 100644 (file)
@@ -2,12 +2,11 @@
 name: IPFIX probe
 maintainer: Ole Troan <ot@cisco.com>
 features:
-  - L2 input feature
-  - IPv4 / IPv6 input feature
-  - Recording of L2, L3 and L4 information
-description: "IPFIX flow probe. Works in the L2, or IP input feature path."
+  - L2 input and output feature path
+  - IPv4 / IPv6 input and output feature path
+  - Recording of L2, L3, and L4 information
+description: "IPFIX flow probe. Works in the L2 or IP feature path both input and output."
 missing:
-  - Output path
   - Export over IPv6
   - Export over TCP/SCTP
 state: production
index 55dd51d..8702568 100644 (file)
@@ -5,7 +5,7 @@
     used to control the flowprobe plugin
 */
 
-option version = "1.0.0";
+option version = "2.0.0";
 
 import "vnet/interface_types.api";
 
@@ -16,6 +16,13 @@ enum flowprobe_which_flags : u8
   FLOWPROBE_WHICH_FLAG_IP6 = 0x4,
 };
 
+enum flowprobe_which : u8
+{
+  FLOWPROBE_WHICH_IP4 = 0,
+  FLOWPROBE_WHICH_IP6,
+  FLOWPROBE_WHICH_L2,
+};
+
 enum flowprobe_record_flags : u8
 {
   FLOWPROBE_RECORD_FLAG_L2 = 0x1,
@@ -23,6 +30,13 @@ enum flowprobe_record_flags : u8
   FLOWPROBE_RECORD_FLAG_L4 = 0x4,
 };
 
+enum flowprobe_direction : u8
+{
+  FLOWPROBE_DIRECTION_RX = 0,
+  FLOWPROBE_DIRECTION_TX,
+  FLOWPROBE_DIRECTION_BOTH,
+};
+
 /** \brief Enable / disable per-packet IPFIX recording on an interface
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
@@ -32,6 +46,8 @@ enum flowprobe_record_flags : u8
 */
 autoreply define flowprobe_tx_interface_add_del
 {
+  option replaced_by="flowprobe_interface_add_del";
+
   /* Client identifier, set from api_main.my_client_index */
   u32 client_index;
 
@@ -47,6 +63,26 @@ autoreply define flowprobe_tx_interface_add_del
   option vat_help = "<intfc> [disable]";
 };
 
+/** \brief Enable or disable IPFIX flow record generation on an interface
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+    @param is_add - add interface if non-zero, else delete
+    @param which - datapath on which to record flows
+    @param direction - direction of recorded flows
+    @param sw_if_index - index of the interface
+*/
+autoreply define flowprobe_interface_add_del
+{
+  option in_progress;
+  u32 client_index;
+  u32 context;
+  bool is_add;
+  vl_api_flowprobe_which_t which;
+  vl_api_flowprobe_direction_t direction;
+  vl_api_interface_index_t sw_if_index;
+  option vat_help = "(<intfc> | sw_if_index <if-idx>) [(ip4|ip6|l2)] [(rx|tx|both)] [disable]";
+};
+
 autoreply define flowprobe_params
 {
   u32 client_index;
index 27cb400..5a747a6 100644 (file)
@@ -46,23 +46,46 @@ uword flowprobe_walker_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
 
 /* Define the per-interface configurable features */
 /* *INDENT-OFF* */
-VNET_FEATURE_INIT (flow_perpacket_ip4, static) =
-{
+VNET_FEATURE_INIT (flowprobe_input_ip4_unicast, static) = {
+  .arc_name = "ip4-unicast",
+  .node_name = "flowprobe-input-ip4",
+  .runs_before = VNET_FEATURES ("ip4-lookup"),
+};
+VNET_FEATURE_INIT (flowprobe_input_ip4_multicast, static) = {
+  .arc_name = "ip4-multicast",
+  .node_name = "flowprobe-input-ip4",
+  .runs_before = VNET_FEATURES ("ip4-mfib-forward-lookup"),
+};
+VNET_FEATURE_INIT (flowprobe_input_ip6_unicast, static) = {
+  .arc_name = "ip6-unicast",
+  .node_name = "flowprobe-input-ip6",
+  .runs_before = VNET_FEATURES ("ip6-lookup"),
+};
+VNET_FEATURE_INIT (flowprobe_input_ip6_multicast, static) = {
+  .arc_name = "ip6-multicast",
+  .node_name = "flowprobe-input-ip6",
+  .runs_before = VNET_FEATURES ("ip6-mfib-forward-lookup"),
+};
+VNET_FEATURE_INIT (flowprobe_input_l2, static) = {
+  .arc_name = "device-input",
+  .node_name = "flowprobe-input-l2",
+  .runs_before = VNET_FEATURES ("ethernet-input"),
+};
+VNET_FEATURE_INIT (flowprobe_output_ip4, static) = {
   .arc_name = "ip4-output",
-  .node_name = "flowprobe-ip4",
+  .node_name = "flowprobe-output-ip4",
   .runs_before = VNET_FEATURES ("interface-output"),
 };
 
-VNET_FEATURE_INIT (flow_perpacket_ip6, static) =
-{
+VNET_FEATURE_INIT (flowprobe_output_ip6, static) = {
   .arc_name = "ip6-output",
-  .node_name = "flowprobe-ip6",
+  .node_name = "flowprobe-output-ip6",
   .runs_before = VNET_FEATURES ("interface-output"),
 };
 
-VNET_FEATURE_INIT (flow_perpacket_l2, static) = {
+VNET_FEATURE_INIT (flowprobe_output_l2, static) = {
   .arc_name = "interface-output",
-  .node_name = "flowprobe-l2",
+  .node_name = "flowprobe-output-l2",
   .runs_before = VNET_FEATURES ("interface-output-arc-end"),
 };
 /* *INDENT-ON* */
@@ -143,7 +166,7 @@ flowprobe_template_l2_fields (ipfix_field_specifier_t * f)
 static inline ipfix_field_specifier_t *
 flowprobe_template_common_fields (ipfix_field_specifier_t * f)
 {
-#define flowprobe_template_common_field_count() 5
+#define flowprobe_template_common_field_count() 6
   /* ingressInterface, TLV type 10, u32 */
   f->e_id_length = ipfix_e_id_length (0 /* enterprise */ ,
                                      ingressInterface, 4);
@@ -154,6 +177,10 @@ flowprobe_template_common_fields (ipfix_field_specifier_t * f)
                                      egressInterface, 4);
   f++;
 
+  /* flowDirection, TLV type 61, u8 */
+  f->e_id_length = ipfix_e_id_length (0 /* enterprise */, flowDirection, 1);
+  f++;
+
   /* packetDeltaCount, TLV type 2, u64 */
   f->e_id_length = ipfix_e_id_length (0 /* enterprise */ ,
                                      packetDeltaCount, 8);
@@ -483,6 +510,7 @@ validate_feature_on_interface (flowprobe_main_t * fm, u32 sw_if_index,
                               u8 which)
 {
   vec_validate_init_empty (fm->flow_per_interface, sw_if_index, ~0);
+  vec_validate_init_empty (fm->direction_per_interface, sw_if_index, ~0);
 
   if (fm->flow_per_interface[sw_if_index] == (u8) ~ 0)
     return -1;
@@ -496,13 +524,15 @@ validate_feature_on_interface (flowprobe_main_t * fm, u32 sw_if_index,
  * @brief configure / deconfigure the IPFIX flow-per-packet
  * @param fm flowprobe_main_t * fm
  * @param sw_if_index u32 the desired interface
+ * @param which u8 the desired datapath
+ * @param direction u8 the desired direction
  * @param is_add int 1 to enable the feature, 0 to disable it
  * @returns 0 if successful, non-zero otherwise
  */
 
 static int
-flowprobe_tx_interface_add_del_feature (flowprobe_main_t * fm,
-                                       u32 sw_if_index, u8 which, int is_add)
+flowprobe_interface_add_del_feature (flowprobe_main_t *fm, u32 sw_if_index,
+                                    u8 which, u8 direction, int is_add)
 {
   vlib_main_t *vm = vlib_get_main ();
   int rv = 0;
@@ -510,6 +540,7 @@ flowprobe_tx_interface_add_del_feature (flowprobe_main_t * fm,
   flowprobe_record_t flags = fm->record;
 
   fm->flow_per_interface[sw_if_index] = (is_add) ? which : (u8) ~ 0;
+  fm->direction_per_interface[sw_if_index] = (is_add) ? direction : (u8) ~0;
   fm->template_per_flow[which] += (is_add) ? 1 : -1;
   if (is_add && fm->template_per_flow[which] > 1)
     template_id = fm->template_reports[flags];
@@ -574,15 +605,39 @@ flowprobe_tx_interface_add_del_feature (flowprobe_main_t * fm,
       fm->template_reports[flags] = (is_add) ? template_id : 0;
     }
 
-  if (which == FLOW_VARIANT_IP4)
-    vnet_feature_enable_disable ("ip4-output", "flowprobe-ip4",
-                                sw_if_index, is_add, 0, 0);
-  else if (which == FLOW_VARIANT_IP6)
-    vnet_feature_enable_disable ("ip6-output", "flowprobe-ip6",
-                                sw_if_index, is_add, 0, 0);
-  else if (which == FLOW_VARIANT_L2)
-    vnet_feature_enable_disable ("interface-output", "flowprobe-l2",
-                                sw_if_index, is_add, 0, 0);
+  if (direction == FLOW_DIRECTION_RX || direction == FLOW_DIRECTION_BOTH)
+    {
+      if (which == FLOW_VARIANT_IP4)
+       {
+         vnet_feature_enable_disable ("ip4-unicast", "flowprobe-input-ip4",
+                                      sw_if_index, is_add, 0, 0);
+         vnet_feature_enable_disable ("ip4-multicast", "flowprobe-input-ip4",
+                                      sw_if_index, is_add, 0, 0);
+       }
+      else if (which == FLOW_VARIANT_IP6)
+       {
+         vnet_feature_enable_disable ("ip6-unicast", "flowprobe-input-ip6",
+                                      sw_if_index, is_add, 0, 0);
+         vnet_feature_enable_disable ("ip6-multicast", "flowprobe-input-ip6",
+                                      sw_if_index, is_add, 0, 0);
+       }
+      else if (which == FLOW_VARIANT_L2)
+       vnet_feature_enable_disable ("device-input", "flowprobe-input-l2",
+                                    sw_if_index, is_add, 0, 0);
+    }
+
+  if (direction == FLOW_DIRECTION_TX || direction == FLOW_DIRECTION_BOTH)
+    {
+      if (which == FLOW_VARIANT_IP4)
+       vnet_feature_enable_disable ("ip4-output", "flowprobe-output-ip4",
+                                    sw_if_index, is_add, 0, 0);
+      else if (which == FLOW_VARIANT_IP6)
+       vnet_feature_enable_disable ("ip6-output", "flowprobe-output-ip6",
+                                    sw_if_index, is_add, 0, 0);
+      else if (which == FLOW_VARIANT_L2)
+       vnet_feature_enable_disable ("interface-output", "flowprobe-output-l2",
+                                    sw_if_index, is_add, 0, 0);
+    }
 
   /* Stateful flow collection */
   if (is_add && !fm->initialized)
@@ -623,8 +678,8 @@ void vl_api_flowprobe_tx_interface_add_del_t_handler
       goto out;
     }
 
-  rv = flowprobe_tx_interface_add_del_feature
-    (fm, sw_if_index, mp->which, mp->is_add);
+  rv = flowprobe_interface_add_del_feature (fm, sw_if_index, mp->which,
+                                           FLOW_DIRECTION_TX, mp->is_add);
 
 out:
   BAD_SW_IF_INDEX_LABEL;
@@ -632,6 +687,91 @@ out:
   REPLY_MACRO (VL_API_FLOWPROBE_TX_INTERFACE_ADD_DEL_REPLY);
 }
 
+void
+vl_api_flowprobe_interface_add_del_t_handler (
+  vl_api_flowprobe_interface_add_del_t *mp)
+{
+  flowprobe_main_t *fm = &flowprobe_main;
+  vl_api_flowprobe_interface_add_del_reply_t *rmp;
+  u32 sw_if_index;
+  u8 which;
+  u8 direction;
+  bool is_add;
+  int rv = 0;
+
+  VALIDATE_SW_IF_INDEX (mp);
+
+  sw_if_index = ntohl (mp->sw_if_index);
+  is_add = mp->is_add;
+
+  if (mp->which == FLOWPROBE_WHICH_IP4)
+    which = FLOW_VARIANT_IP4;
+  else if (mp->which == FLOWPROBE_WHICH_IP6)
+    which = FLOW_VARIANT_IP6;
+  else if (mp->which == FLOWPROBE_WHICH_L2)
+    which = FLOW_VARIANT_L2;
+  else
+    {
+      clib_warning ("Invalid value of which");
+      rv = VNET_API_ERROR_INVALID_VALUE;
+      goto out;
+    }
+
+  if (mp->direction == FLOWPROBE_DIRECTION_RX)
+    direction = FLOW_DIRECTION_RX;
+  else if (mp->direction == FLOWPROBE_DIRECTION_TX)
+    direction = FLOW_DIRECTION_TX;
+  else if (mp->direction == FLOWPROBE_DIRECTION_BOTH)
+    direction = FLOW_DIRECTION_BOTH;
+  else
+    {
+      clib_warning ("Invalid value of direction");
+      rv = VNET_API_ERROR_INVALID_VALUE;
+      goto out;
+    }
+
+  if (fm->record == 0)
+    {
+      clib_warning ("Please specify flowprobe params record first");
+      rv = VNET_API_ERROR_CANNOT_ENABLE_DISABLE_FEATURE;
+      goto out;
+    }
+
+  rv = validate_feature_on_interface (fm, sw_if_index, which);
+  if (rv == 1)
+    {
+      if (is_add)
+       {
+         clib_warning ("Variant is already enabled for given interface");
+         rv = VNET_API_ERROR_ENTRY_ALREADY_EXISTS;
+         goto out;
+       }
+    }
+  else if (rv == 0)
+    {
+      clib_warning ("Interface has different variant enabled");
+      rv = VNET_API_ERROR_ENTRY_ALREADY_EXISTS;
+      goto out;
+    }
+  else if (rv == -1)
+    {
+      if (!is_add)
+       {
+         clib_warning ("Interface has no variant enabled");
+         rv = VNET_API_ERROR_NO_SUCH_ENTRY;
+         goto out;
+       }
+    }
+
+  rv = flowprobe_interface_add_del_feature (fm, sw_if_index, which, direction,
+                                           is_add);
+
+out:
+  BAD_SW_IF_INDEX_LABEL;
+
+  REPLY_MACRO (VL_API_FLOWPROBE_INTERFACE_ADD_DEL_REPLY);
+}
+
 #define vec_neg_search(v,E)         \
 ({              \
   word _v(i) = 0;         \
@@ -699,10 +839,25 @@ VLIB_PLUGIN_REGISTER () = {
 };
 /* *INDENT-ON* */
 
+u8 *
+format_flowprobe_direction (u8 *s, va_list *args)
+{
+  u8 *direction = va_arg (*args, u8 *);
+  if (*direction == FLOW_DIRECTION_RX)
+    s = format (s, "rx");
+  else if (*direction == FLOW_DIRECTION_TX)
+    s = format (s, "tx");
+  else if (*direction == FLOW_DIRECTION_BOTH)
+    s = format (s, "rx tx");
+
+  return s;
+}
+
 u8 *
 format_flowprobe_entry (u8 * s, va_list * args)
 {
   flowprobe_entry_t *e = va_arg (*args, flowprobe_entry_t *);
+  s = format (s, " %U", format_flowprobe_direction, &e->key.direction);
   s = format (s, " %d/%d", e->key.rx_sw_if_index, e->key.tx_sw_if_index);
 
   s = format (s, " %U %U", format_ethernet_address, &e->key.src_mac,
@@ -799,14 +954,15 @@ flowprobe_show_stats_fn (vlib_main_t * vm,
 }
 
 static clib_error_t *
-flowprobe_tx_interface_add_del_feature_command_fn (vlib_main_t * vm,
-                                                  unformat_input_t * input,
-                                                  vlib_cli_command_t * cmd)
+flowprobe_interface_add_del_feature_command_fn (vlib_main_t *vm,
+                                               unformat_input_t *input,
+                                               vlib_cli_command_t *cmd)
 {
   flowprobe_main_t *fm = &flowprobe_main;
   u32 sw_if_index = ~0;
   int is_add = 1;
   u8 which = FLOW_VARIANT_IP4;
+  flowprobe_direction_t direction = FLOW_DIRECTION_TX;
   int rv;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -821,6 +977,12 @@ flowprobe_tx_interface_add_del_feature_command_fn (vlib_main_t * vm,
        which = FLOW_VARIANT_IP6;
       else if (unformat (input, "l2"))
        which = FLOW_VARIANT_L2;
+      else if (unformat (input, "rx"))
+       direction = FLOW_DIRECTION_RX;
+      else if (unformat (input, "tx"))
+       direction = FLOW_DIRECTION_TX;
+      else if (unformat (input, "both"))
+       direction = FLOW_DIRECTION_BOTH;
       else
        break;
     }
@@ -842,9 +1004,16 @@ flowprobe_tx_interface_add_del_feature_command_fn (vlib_main_t * vm,
   else if (rv == 0)
     return clib_error_return (0,
                              "Interface has enable different datapath ...");
+  else if (rv == -1)
+    {
+      if (!is_add)
+       {
+         return clib_error_return (0, "Interface has no datapath enabled");
+       }
+    }
 
-  rv =
-    flowprobe_tx_interface_add_del_feature (fm, sw_if_index, which, is_add);
+  rv = flowprobe_interface_add_del_feature (fm, sw_if_index, which, direction,
+                                           is_add);
   switch (rv)
     {
     case 0:
@@ -881,9 +1050,10 @@ flowprobe_show_feature_command_fn (vlib_main_t * vm,
       continue;
 
     sw_if_index = which - fm->flow_per_interface;
-    vlib_cli_output (vm, " %U %U", format_vnet_sw_if_index_name,
+    vlib_cli_output (vm, " %U %U %U", format_vnet_sw_if_index_name,
                     vnet_get_main (), sw_if_index, format_flowprobe_feature,
-                    which);
+                    which, format_flowprobe_direction,
+                    &fm->direction_per_interface[sw_if_index]);
   }
   return 0;
 }
@@ -962,10 +1132,10 @@ flowprobe_show_params_command_fn (vlib_main_t * vm,
 ?*/
 /* *INDENT-OFF* */
 VLIB_CLI_COMMAND (flowprobe_enable_disable_command, static) = {
-    .path = "flowprobe feature add-del",
-    .short_help =
-    "flowprobe feature add-del <interface-name> <l2|ip4|ip6> disable",
-    .function = flowprobe_tx_interface_add_del_feature_command_fn,
+  .path = "flowprobe feature add-del",
+  .short_help = "flowprobe feature add-del <interface-name> [(l2|ip4|ip6)] "
+               "[(rx|tx|both)] [disable]",
+  .function = flowprobe_interface_add_del_feature_command_fn,
 };
 VLIB_CLI_COMMAND (flowprobe_params_command, static) = {
     .path = "flowprobe params",
index 2d28c81..3174a84 100644 (file)
@@ -45,13 +45,20 @@ typedef enum
 /* *INDENT-OFF* */
 typedef enum __attribute__ ((__packed__))
 {
-  FLOW_VARIANT_IP4,
+  FLOW_VARIANT_IP4 = 0,
   FLOW_VARIANT_IP6,
   FLOW_VARIANT_L2,
   FLOW_VARIANT_L2_IP4,
   FLOW_VARIANT_L2_IP6,
   FLOW_N_VARIANTS,
 } flowprobe_variant_t;
+
+typedef enum __attribute__ ((__packed__))
+{
+  FLOW_DIRECTION_RX = 0,
+  FLOW_DIRECTION_TX,
+  FLOW_DIRECTION_BOTH,
+} flowprobe_direction_t;
 /* *INDENT-ON* */
 
 STATIC_ASSERT (sizeof (flowprobe_variant_t) == 1,
@@ -85,6 +92,7 @@ typedef struct __attribute__ ((aligned (8))) {
   u16 src_port;
   u16 dst_port;
   flowprobe_variant_t which;
+  flowprobe_direction_t direction;
 } flowprobe_key_t;
 /* *INDENT-ON* */
 
@@ -149,6 +157,7 @@ typedef struct
 
   u16 template_per_flow[FLOW_N_VARIANTS];
   u8 *flow_per_interface;
+  u8 *direction_per_interface;
 
   /** convenience vlib_main_t pointer */
   vlib_main_t *vlib_main;
index 8ad9c88..4add41f 100644 (file)
@@ -10,8 +10,9 @@ feature enabled
 Sample configuration
 --------------------
 
-set ipfix exporter collector 192.168.6.2 src 192.168.6.1
-template-interval 20 port 4739 path-mtu 1500
+::
 
-flowprobe params record l3 active 20 passive 120 flowprobe feature
-add-del GigabitEthernet2/3/0 l2
+  set ipfix exporter collector 192.168.6.2 src 192.168.6.1 template-interval 20 port 4739 path-mtu 1450
+
+  flowprobe params record l3 active 20 passive 120
+  flowprobe feature add-del GigabitEthernet2/3/0 l2
index a694e45..ae2a3ed 100644 (file)
@@ -92,6 +92,63 @@ api_flowprobe_tx_interface_add_del (vat_main_t * vam)
   return ret;
 }
 
+static int
+api_flowprobe_interface_add_del (vat_main_t *vam)
+{
+  unformat_input_t *i = vam->input;
+  int enable_disable = 1;
+  u8 which = FLOWPROBE_WHICH_IP4;
+  u8 direction = FLOWPROBE_DIRECTION_TX;
+  u32 sw_if_index = ~0;
+  vl_api_flowprobe_interface_add_del_t *mp;
+  int ret;
+
+  /* Parse args required to build the message */
+  while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (i, "%U", unformat_sw_if_index, vam, &sw_if_index))
+       ;
+      else if (unformat (i, "sw_if_index %d", &sw_if_index))
+       ;
+      else if (unformat (i, "disable"))
+       enable_disable = 0;
+      else if (unformat (i, "ip4"))
+       which = FLOWPROBE_WHICH_IP4;
+      else if (unformat (i, "ip6"))
+       which = FLOWPROBE_WHICH_IP6;
+      else if (unformat (i, "l2"))
+       which = FLOWPROBE_WHICH_L2;
+      else if (unformat (i, "rx"))
+       direction = FLOWPROBE_DIRECTION_RX;
+      else if (unformat (i, "tx"))
+       direction = FLOWPROBE_DIRECTION_TX;
+      else if (unformat (i, "both"))
+       direction = FLOWPROBE_DIRECTION_BOTH;
+      else
+       break;
+    }
+
+  if (sw_if_index == ~0)
+    {
+      errmsg ("Missing interface name / explicit sw_if_index number\n");
+      return -99;
+    }
+
+  /* Construct the API message */
+  M (FLOWPROBE_INTERFACE_ADD_DEL, mp);
+  mp->sw_if_index = ntohl (sw_if_index);
+  mp->is_add = enable_disable;
+  mp->which = which;
+  mp->direction = direction;
+
+  /* Send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
 static int
 api_flowprobe_params (vat_main_t * vam)
 {
index 928d752..e9fa4b8 100644 (file)
@@ -99,9 +99,12 @@ format_flowprobe_trace (u8 * s, va_list * args)
   return s;
 }
 
-vlib_node_registration_t flowprobe_ip4_node;
-vlib_node_registration_t flowprobe_ip6_node;
-vlib_node_registration_t flowprobe_l2_node;
+vlib_node_registration_t flowprobe_input_ip4_node;
+vlib_node_registration_t flowprobe_input_ip6_node;
+vlib_node_registration_t flowprobe_input_l2_node;
+vlib_node_registration_t flowprobe_output_ip4_node;
+vlib_node_registration_t flowprobe_output_ip6_node;
+vlib_node_registration_t flowprobe_output_l2_node;
 
 /* No counters at the moment */
 #define foreach_flowprobe_error                        \
@@ -167,6 +170,11 @@ flowprobe_common_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset)
   clib_memcpy_fast (to_b->data + offset, &tx_if, sizeof (tx_if));
   offset += sizeof (tx_if);
 
+  /* Flow direction
+     0x00: ingress flow
+     0x01: egress flow */
+  to_b->data[offset++] = (e->key.direction == FLOW_DIRECTION_TX);
+
   /* packet delta count */
   u64 packetdelta = clib_host_to_net_u64 (e->packetcount);
   clib_memcpy_fast (to_b->data + offset, &packetdelta, sizeof (u64));
@@ -358,22 +366,27 @@ flowprobe_create (u32 my_cpu_number, flowprobe_key_t * k, u32 * poolindex)
 }
 
 static inline void
-add_to_flow_record_state (vlib_main_t * vm, vlib_node_runtime_t * node,
-                         flowprobe_main_t * fm, vlib_buffer_t * b,
+add_to_flow_record_state (vlib_main_t *vm, vlib_node_runtime_t *node,
+                         flowprobe_main_t *fm, vlib_buffer_t *b,
                          timestamp_nsec_t timestamp, u16 length,
-                         flowprobe_variant_t which, flowprobe_trace_t * t)
+                         flowprobe_variant_t which,
+                         flowprobe_direction_t direction,
+                         flowprobe_trace_t *t)
 {
   if (fm->disabled)
     return;
 
+  ASSERT (direction == FLOW_DIRECTION_RX || direction == FLOW_DIRECTION_TX);
+
   u32 my_cpu_number = vm->thread_index;
   u16 octets = 0;
 
   flowprobe_record_t flags = fm->context[which].flags;
   bool collect_ip4 = false, collect_ip6 = false;
   ASSERT (b);
-  ethernet_header_t *eth = vlib_buffer_get_current (b);
+  ethernet_header_t *eth = ethernet_buffer_get_header (b);
   u16 ethertype = clib_net_to_host_u16 (eth->type);
+  u16 l2_hdr_sz = sizeof (ethernet_header_t);
   /* *INDENT-OFF* */
   flowprobe_key_t k = {};
   /* *INDENT-ON* */
@@ -393,6 +406,7 @@ add_to_flow_record_state (vlib_main_t * vm, vlib_node_runtime_t * node,
   k.tx_sw_if_index = vnet_buffer (b)->sw_if_index[VLIB_TX];
 
   k.which = which;
+  k.direction = direction;
 
   if (flags & FLOW_RECORD_L2)
     {
@@ -409,12 +423,13 @@ add_to_flow_record_state (vlib_main_t * vm, vlib_node_runtime_t * node,
       while (clib_net_to_host_u16 (ethv->type) == ETHERNET_TYPE_VLAN)
        {
          ethv++;
+         l2_hdr_sz += sizeof (ethernet_vlan_header_tv_t);
        }
       k.ethertype = ethertype = clib_net_to_host_u16 ((ethv)->type);
     }
   if (collect_ip6 && ethertype == ETHERNET_TYPE_IP6)
     {
-      ip6 = (ip6_header_t *) (b->data + vnet_buffer (b)->l3_hdr_offset);
+      ip6 = (ip6_header_t *) (b->data + l2_hdr_sz);
       if (flags & FLOW_RECORD_L3)
        {
          k.src_address.as_u64[0] = ip6->src_address.as_u64[0];
@@ -433,7 +448,7 @@ add_to_flow_record_state (vlib_main_t * vm, vlib_node_runtime_t * node,
     }
   if (collect_ip4 && ethertype == ETHERNET_TYPE_IP4)
     {
-      ip4 = (ip4_header_t *) (b->data + vnet_buffer (b)->l3_hdr_offset);
+      ip4 = (ip4_header_t *) (b->data + l2_hdr_sz);
       if (flags & FLOW_RECORD_L3)
        {
          k.src_address.ip4.as_u32 = ip4->src_address.as_u32;
@@ -630,7 +645,7 @@ flowprobe_export_send (vlib_main_t * vm, vlib_buffer_t * b0,
     }
 
   vlib_put_frame_to_node (vm, ip4_lookup_node.index, f);
-  vlib_node_increment_counter (vm, flowprobe_l2_node.index,
+  vlib_node_increment_counter (vm, flowprobe_output_l2_node.index,
                               FLOWPROBE_ERROR_EXPORTED_PACKETS, 1);
 
   fm->context[which].frames_per_worker[my_cpu_number] = 0;
@@ -656,7 +671,7 @@ flowprobe_get_buffer (vlib_main_t * vm, flowprobe_variant_t which)
     {
       if (vlib_buffer_alloc (vm, &bi0, 1) != 1)
        {
-         vlib_node_increment_counter (vm, flowprobe_l2_node.index,
+         vlib_node_increment_counter (vm, flowprobe_output_l2_node.index,
                                       FLOWPROBE_ERROR_BUFFER, 1);
          return 0;
        }
@@ -730,9 +745,9 @@ flowprobe_export_entry (vlib_main_t * vm, flowprobe_entry_t * e)
 }
 
 uword
-flowprobe_node_fn (vlib_main_t * vm,
-                  vlib_node_runtime_t * node, vlib_frame_t * frame,
-                  flowprobe_variant_t which)
+flowprobe_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node,
+                  vlib_frame_t *frame, flowprobe_variant_t which,
+                  flowprobe_direction_t direction)
 {
   u32 n_left_from, *from, *to_next;
   flowprobe_next_t next_index;
@@ -792,20 +807,22 @@ flowprobe_node_fn (vlib_main_t * vm,
          u16 ethertype0 = clib_net_to_host_u16 (eh0->type);
 
          if (PREDICT_TRUE ((b0->flags & VNET_BUFFER_F_FLOW_REPORT) == 0))
-           add_to_flow_record_state (vm, node, fm, b0, timestamp, len0,
-                                     flowprobe_get_variant
-                                     (which, fm->context[which].flags,
-                                      ethertype0), 0);
+           add_to_flow_record_state (
+             vm, node, fm, b0, timestamp, len0,
+             flowprobe_get_variant (which, fm->context[which].flags,
+                                    ethertype0),
+             direction, 0);
 
          len1 = vlib_buffer_length_in_chain (vm, b1);
          ethernet_header_t *eh1 = vlib_buffer_get_current (b1);
          u16 ethertype1 = clib_net_to_host_u16 (eh1->type);
 
          if (PREDICT_TRUE ((b1->flags & VNET_BUFFER_F_FLOW_REPORT) == 0))
-           add_to_flow_record_state (vm, node, fm, b1, timestamp, len1,
-                                     flowprobe_get_variant
-                                     (which, fm->context[which].flags,
-                                      ethertype1), 0);
+           add_to_flow_record_state (
+             vm, node, fm, b1, timestamp, len1,
+             flowprobe_get_variant (which, fm->context[which].flags,
+                                    ethertype1),
+             direction, 0);
 
          /* verify speculative enqueues, maybe switch current next frame */
          vlib_validate_buffer_enqueue_x2 (vm, node, next_index,
@@ -843,10 +860,11 @@ flowprobe_node_fn (vlib_main_t * vm,
                                 && (b0->flags & VLIB_BUFFER_IS_TRACED)))
                t = vlib_add_trace (vm, node, b0, sizeof (*t));
 
-             add_to_flow_record_state (vm, node, fm, b0, timestamp, len0,
-                                       flowprobe_get_variant
-                                       (which, fm->context[which].flags,
-                                        ethertype0), t);
+             add_to_flow_record_state (
+               vm, node, fm, b0, timestamp, len0,
+               flowprobe_get_variant (which, fm->context[which].flags,
+                                      ethertype0),
+               direction, t);
            }
 
          /* verify speculative enqueue, maybe switch current next frame */
@@ -861,24 +879,51 @@ flowprobe_node_fn (vlib_main_t * vm,
 }
 
 static uword
-flowprobe_ip4_node_fn (vlib_main_t * vm,
-                      vlib_node_runtime_t * node, vlib_frame_t * frame)
+flowprobe_input_ip4_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node,
+                            vlib_frame_t *frame)
+{
+  return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_IP4,
+                           FLOW_DIRECTION_RX);
+}
+
+static uword
+flowprobe_input_ip6_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node,
+                            vlib_frame_t *frame)
+{
+  return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_IP6,
+                           FLOW_DIRECTION_RX);
+}
+
+static uword
+flowprobe_input_l2_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node,
+                           vlib_frame_t *frame)
+{
+  return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_L2,
+                           FLOW_DIRECTION_RX);
+}
+
+static uword
+flowprobe_output_ip4_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node,
+                             vlib_frame_t *frame)
 {
-  return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_IP4);
+  return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_IP4,
+                           FLOW_DIRECTION_TX);
 }
 
 static uword
-flowprobe_ip6_node_fn (vlib_main_t * vm,
-                      vlib_node_runtime_t * node, vlib_frame_t * frame)
+flowprobe_output_ip6_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node,
+                             vlib_frame_t *frame)
 {
-  return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_IP6);
+  return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_IP6,
+                           FLOW_DIRECTION_TX);
 }
 
 static uword
-flowprobe_l2_node_fn (vlib_main_t * vm,
-                     vlib_node_runtime_t * node, vlib_frame_t * frame)
+flowprobe_output_l2_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node,
+                            vlib_frame_t *frame)
 {
-  return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_L2);
+  return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_L2,
+                           FLOW_DIRECTION_TX);
 }
 
 static inline void
@@ -1012,35 +1057,68 @@ flowprobe_walker_process (vlib_main_t * vm,
 }
 
 /* *INDENT-OFF* */
-VLIB_REGISTER_NODE (flowprobe_ip4_node) = {
-  .function = flowprobe_ip4_node_fn,
-  .name = "flowprobe-ip4",
+VLIB_REGISTER_NODE (flowprobe_input_ip4_node) = {
+  .function = flowprobe_input_ip4_node_fn,
+  .name = "flowprobe-input-ip4",
+  .vector_size = sizeof (u32),
+  .format_trace = format_flowprobe_trace,
+  .type = VLIB_NODE_TYPE_INTERNAL,
+  .n_errors = ARRAY_LEN (flowprobe_error_strings),
+  .error_strings = flowprobe_error_strings,
+  .n_next_nodes = FLOWPROBE_N_NEXT,
+  .next_nodes = FLOWPROBE_NEXT_NODES,
+};
+VLIB_REGISTER_NODE (flowprobe_input_ip6_node) = {
+  .function = flowprobe_input_ip6_node_fn,
+  .name = "flowprobe-input-ip6",
+  .vector_size = sizeof (u32),
+  .format_trace = format_flowprobe_trace,
+  .type = VLIB_NODE_TYPE_INTERNAL,
+  .n_errors = ARRAY_LEN (flowprobe_error_strings),
+  .error_strings = flowprobe_error_strings,
+  .n_next_nodes = FLOWPROBE_N_NEXT,
+  .next_nodes = FLOWPROBE_NEXT_NODES,
+};
+VLIB_REGISTER_NODE (flowprobe_input_l2_node) = {
+  .function = flowprobe_input_l2_node_fn,
+  .name = "flowprobe-input-l2",
+  .vector_size = sizeof (u32),
+  .format_trace = format_flowprobe_trace,
+  .type = VLIB_NODE_TYPE_INTERNAL,
+  .n_errors = ARRAY_LEN (flowprobe_error_strings),
+  .error_strings = flowprobe_error_strings,
+  .n_next_nodes = FLOWPROBE_N_NEXT,
+  .next_nodes = FLOWPROBE_NEXT_NODES,
+};
+VLIB_REGISTER_NODE (flowprobe_output_ip4_node) = {
+  .function = flowprobe_output_ip4_node_fn,
+  .name = "flowprobe-output-ip4",
   .vector_size = sizeof (u32),
   .format_trace = format_flowprobe_trace,
   .type = VLIB_NODE_TYPE_INTERNAL,
-  .n_errors = ARRAY_LEN(flowprobe_error_strings),
+  .n_errors = ARRAY_LEN (flowprobe_error_strings),
   .error_strings = flowprobe_error_strings,
   .n_next_nodes = FLOWPROBE_N_NEXT,
   .next_nodes = FLOWPROBE_NEXT_NODES,
 };
-VLIB_REGISTER_NODE (flowprobe_ip6_node) = {
-  .function = flowprobe_ip6_node_fn,
-  .name = "flowprobe-ip6",
+VLIB_REGISTER_NODE (flowprobe_output_ip6_node) = {
+  .function = flowprobe_output_ip6_node_fn,
+  .name = "flowprobe-output-ip6",
   .vector_size = sizeof (u32),
   .format_trace = format_flowprobe_trace,
   .type = VLIB_NODE_TYPE_INTERNAL,
-  .n_errors = ARRAY_LEN(flowprobe_error_strings),
+  .n_errors = ARRAY_LEN (flowprobe_error_strings),
   .error_strings = flowprobe_error_strings,
   .n_next_nodes = FLOWPROBE_N_NEXT,
   .next_nodes = FLOWPROBE_NEXT_NODES,
 };
-VLIB_REGISTER_NODE (flowprobe_l2_node) = {
-  .function = flowprobe_l2_node_fn,
-  .name = "flowprobe-l2",
+VLIB_REGISTER_NODE (flowprobe_output_l2_node) = {
+  .function = flowprobe_output_l2_node_fn,
+  .name = "flowprobe-output-l2",
   .vector_size = sizeof (u32),
   .format_trace = format_flowprobe_trace,
   .type = VLIB_NODE_TYPE_INTERNAL,
-  .n_errors = ARRAY_LEN(flowprobe_error_strings),
+  .n_errors = ARRAY_LEN (flowprobe_error_strings),
   .error_strings = flowprobe_error_strings,
   .n_next_nodes = FLOWPROBE_N_NEXT,
   .next_nodes = FLOWPROBE_NEXT_NODES,
index 6b27179..1d86d19 100644 (file)
@@ -39,9 +39,11 @@ class VppCFLOW(VppObject):
         mtu=1024,
         datapath="l2",
         layer="l2 l3 l4",
+        direction="tx",
     ):
         self._test = test
         self._intf = intf
+        self._intf_obj = getattr(self._test, intf)
         self._active = active
         if passive == 0 or passive < active:
             self._passive = active + 1
@@ -49,6 +51,7 @@ class VppCFLOW(VppObject):
             self._passive = passive
         self._datapath = datapath  # l2 ip4 ip6
         self._collect = layer  # l2 l3 l4
+        self._direction = direction  # rx tx both
         self._timeout = timeout
         self._mtu = mtu
         self._configured = False
@@ -87,18 +90,32 @@ class VppCFLOW(VppObject):
             template_interval=self._timeout,
         )
 
-    def enable_flowprobe_feature(self):
-        self._test.vapi.ppcli(
-            "flowprobe feature add-del %s %s" % (self._intf, self._datapath)
+    def _enable_disable_flowprobe_feature(self, is_add):
+        which_map = {
+            "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
+            "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
+            "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
+        }
+        direction_map = {
+            "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
+            "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
+            "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
+        }
+        self._test.vapi.flowprobe_interface_add_del(
+            is_add=is_add,
+            which=which_map[self._datapath],
+            direction=direction_map[self._direction],
+            sw_if_index=self._intf_obj.sw_if_index,
         )
 
+    def enable_flowprobe_feature(self):
+        self._enable_disable_flowprobe_feature(is_add=True)
+
     def disable_exporter(self):
         self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
 
     def disable_flowprobe_feature(self):
-        self._test.vapi.cli(
-            "flowprobe feature add-del %s %s disable" % (self._intf, self._datapath)
-        )
+        self._enable_disable_flowprobe_feature(is_add=False)
 
     def object_id(self):
         return "ipfix-collector-%s-%s" % (self._src, self.dst)
@@ -261,8 +278,6 @@ class MethodHolder(VppTestCase):
                         continue
 
                     for field in data_set:
-                        if field not in record.keys():
-                            continue
                         value = data_set[field]
                         if value == "octets":
                             value = ip_layer.len
@@ -434,6 +449,8 @@ class Flowprobe(MethodHolder):
             self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
             # egress interface
             self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
+            # direction
+            self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
             # packets
             self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
             # src mac
@@ -465,24 +482,25 @@ class Flowprobe(MethodHolder):
         self.logger.info("FFP_TEST_FINISH_0000")
 
 
-@tag_fixme_vpp_workers
-class Datapath(MethodHolder):
+class DatapathTestsHolder(object):
     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
 
     @classmethod
     def setUpClass(cls):
-        super(Datapath, cls).setUpClass()
+        super(DatapathTestsHolder, cls).setUpClass()
 
     @classmethod
     def tearDownClass(cls):
-        super(Datapath, cls).tearDownClass()
+        super(DatapathTestsHolder, cls).tearDownClass()
 
     def test_templatesL2(self):
         """verify template on L2 datapath"""
         self.logger.info("FFP_TEST_START_0000")
         self.pg_enable_capture(self.pg_interfaces)
 
-        ipfix = VppCFLOW(test=self, layer="l2")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, layer="l2", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         # template packet should arrive immediately
@@ -499,7 +517,9 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, layer="l2")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, layer="l2", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -513,7 +533,10 @@ class Datapath(MethodHolder):
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
-            ipfix_decoder, capture, cflow, {2: "packets", 256: 8}
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 256: 8, 61: (self.direction == "tx")},
         )
         self.collector.get_capture(2)
 
@@ -526,7 +549,9 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, layer="l3")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, layer="l3", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -543,7 +568,13 @@ class Datapath(MethodHolder):
             ipfix_decoder,
             capture,
             cflow,
-            {2: "packets", 4: 17, 8: "src_ip", 12: "dst_ip"},
+            {
+                2: "packets",
+                4: 17,
+                8: "src_ip",
+                12: "dst_ip",
+                61: (self.direction == "tx"),
+            },
         )
 
         self.collector.get_capture(3)
@@ -557,7 +588,9 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, layer="l4")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, layer="l4", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -571,7 +604,10 @@ class Datapath(MethodHolder):
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
-            ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"}
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
         )
 
         self.collector.get_capture(3)
@@ -585,7 +621,9 @@ class Datapath(MethodHolder):
 
         self.pg_enable_capture(self.pg_interfaces)
 
-        ipfix = VppCFLOW(test=self, datapath="ip4")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, datapath="ip4", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         # template packet should arrive immediately
@@ -603,7 +641,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg4", layer="l2", datapath="ip4")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf2,
+            layer="l2",
+            datapath="ip4",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -617,7 +661,10 @@ class Datapath(MethodHolder):
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
-            ipfix_decoder, capture, cflow, {2: "packets", 256: 8}
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 256: 8, 61: (self.direction == "tx")},
         )
 
         # expected two templates and one cflow packet
@@ -632,7 +679,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg4", layer="l3", datapath="ip4")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf2,
+            layer="l3",
+            datapath="ip4",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -649,7 +702,13 @@ class Datapath(MethodHolder):
             ipfix_decoder,
             capture,
             cflow,
-            {1: "octets", 2: "packets", 8: "src_ip", 12: "dst_ip"},
+            {
+                1: "octets",
+                2: "packets",
+                8: "src_ip",
+                12: "dst_ip",
+                61: (self.direction == "tx"),
+            },
         )
 
         # expected two templates and one cflow packet
@@ -664,7 +723,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg4", layer="l4", datapath="ip4")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf2,
+            layer="l4",
+            datapath="ip4",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -678,7 +743,10 @@ class Datapath(MethodHolder):
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
-            ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"}
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
         )
 
         # expected two templates and one cflow packet
@@ -692,7 +760,9 @@ class Datapath(MethodHolder):
         self.logger.info("FFP_TEST_START_0000")
         self.pg_enable_capture(self.pg_interfaces)
 
-        ipfix = VppCFLOW(test=self, datapath="ip6")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, datapath="ip6", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         # template packet should arrive immediately
@@ -709,7 +779,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg6", layer="l2", datapath="ip6")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf3,
+            layer="l2",
+            datapath="ip6",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -723,7 +799,11 @@ class Datapath(MethodHolder):
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
-            ipfix_decoder, capture, cflow, {2: "packets", 256: 56710}, ip_ver="v6"
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 256: 56710, 61: (self.direction == "tx")},
+            ip_ver="v6",
         )
 
         # expected two templates and one cflow packet
@@ -738,7 +818,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg6", layer="l3", datapath="ip6")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf3,
+            layer="l3",
+            datapath="ip6",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -755,7 +841,7 @@ class Datapath(MethodHolder):
             ipfix_decoder,
             capture,
             cflow,
-            {2: "packets", 27: "src_ip", 28: "dst_ip"},
+            {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
             ip_ver="v6",
         )
 
@@ -771,7 +857,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg6", layer="l4", datapath="ip6")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf3,
+            layer="l4",
+            datapath="ip6",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -788,7 +880,7 @@ class Datapath(MethodHolder):
             ipfix_decoder,
             capture,
             cflow,
-            {2: "packets", 7: "sport", 11: "dport"},
+            {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
             ip_ver="v6",
         )
 
@@ -804,7 +896,7 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self)
+        ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -824,12 +916,12 @@ class Datapath(MethodHolder):
         self.logger.info("FFP_TEST_FINISH_0001")
 
     def test_0002(self):
-        """no timers, two CFLOW packets (mtu=256), 3 Flows in each"""
+        """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
         self.logger.info("FFP_TEST_START_0002")
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, mtu=256)
+        ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -852,6 +944,26 @@ class Datapath(MethodHolder):
         self.logger.info("FFP_TEST_FINISH_0002")
 
 
+@tag_fixme_vpp_workers
+class DatapathTx(MethodHolder, DatapathTestsHolder):
+    """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
+
+    intf1 = "pg2"
+    intf2 = "pg4"
+    intf3 = "pg6"
+    direction = "tx"
+
+
+@tag_fixme_vpp_workers
+class DatapathRx(MethodHolder, DatapathTestsHolder):
+    """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
+
+    intf1 = "pg1"
+    intf2 = "pg3"
+    intf3 = "pg5"
+    direction = "rx"
+
+
 @unittest.skipUnless(config.extended, "part of extended tests")
 class DisableIPFIX(MethodHolder):
     """Disable IPFIX"""