+ def test_interface_addr(self):
+ """ Acquire SNAT addresses from interface """
+ self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
+
+ # no address in NAT pool
+ adresses = self.vapi.snat_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ # configure interface address and check NAT address pool
+ self.pg7.config_ip4()
+ adresses = self.vapi.snat_address_dump()
+ self.assertEqual(1, len(adresses))
+ self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
+
+ # remove interface address and check NAT address pool
+ self.pg7.unconfig_ip4()
+ adresses = self.vapi.snat_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ def test_interface_addr_static_mapping(self):
+ """ Static mapping with addresses from interface """
+ self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
+ self.snat_add_static_mapping('1.2.3.4',
+ external_sw_if_index=self.pg7.sw_if_index)
+
+ # no static mappings
+ static_mappings = self.vapi.snat_static_mapping_dump()
+ self.assertEqual(0, len(static_mappings))
+
+ # configure interface address and check static mappings
+ self.pg7.config_ip4()
+ static_mappings = self.vapi.snat_static_mapping_dump()
+ self.assertEqual(1, len(static_mappings))
+ self.assertEqual(static_mappings[0].external_ip_address[0:4],
+ self.pg7.local_ip4n)
+
+ # remove interface address and check static mappings
+ self.pg7.unconfig_ip4()
+ static_mappings = self.vapi.snat_static_mapping_dump()
+ self.assertEqual(0, len(static_mappings))
+
+ def test_ipfix_nat44_sess(self):
+ """ S-NAT IPFIX logging NAT44 session created/delted """
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.snat_ipfix()
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+ self.snat_add_address(self.snat_addr, is_add=0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(3)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_nat44_ses(data)
+
+ def test_ipfix_addr_exhausted(self):
+ """ S-NAT IPFIX logging NAT addresses exhausted """
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.snat_ipfix()
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=3025))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(3)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_addr_exhausted(data)
+