7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.data import IP_PROTOS
13 from scapy.packet import bind_layers
15 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
16 from time import sleep
19 class MethodHolder(VppTestCase):
20 """ SNAT create capture and verify method holder """
24 super(MethodHolder, cls).setUpClass()
27 super(MethodHolder, self).tearDown()
29 def create_stream_in(self, in_if, out_if, ttl=64):
31 Create packet stream for inside network
33 :param in_if: Inside interface
34 :param out_if: Outside interface
35 :param ttl: TTL of generated packets
39 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
40 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
41 TCP(sport=self.tcp_port_in, dport=20))
45 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
46 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
47 UDP(sport=self.udp_port_in, dport=20))
51 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
52 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
53 ICMP(id=self.icmp_id_in, type='echo-request'))
58 def create_stream_in_ip6(self, in_if, out_if, hlim=64):
60 Create IPv6 packet stream for inside network
62 :param in_if: Inside interface
63 :param out_if: Outside interface
64 :param ttl: Hop Limit of generated packets
67 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
69 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
70 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
71 TCP(sport=self.tcp_port_in, dport=20))
75 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
76 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
77 UDP(sport=self.udp_port_in, dport=20))
81 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
82 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
83 ICMPv6EchoRequest(id=self.icmp_id_in))
88 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
90 Create packet stream for outside network
92 :param out_if: Outside interface
93 :param dst_ip: Destination IP address (Default use global SNAT address)
94 :param ttl: TTL of generated packets
97 dst_ip = self.snat_addr
100 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
101 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
102 TCP(dport=self.tcp_port_out, sport=20))
106 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
107 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
108 UDP(dport=self.udp_port_out, sport=20))
112 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
113 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
114 ICMP(id=self.icmp_id_out, type='echo-reply'))
119 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
120 packet_num=3, dst_ip=None):
122 Verify captured packets on outside network
124 :param capture: Captured packets
125 :param nat_ip: Translated IP address (Default use global SNAT address)
126 :param same_port: Sorce port number is not translated (Default False)
127 :param packet_num: Expected number of packets (Default 3)
128 :param dst_ip: Destination IP address (Default do not verify)
131 nat_ip = self.snat_addr
132 self.assertEqual(packet_num, len(capture))
133 for packet in capture:
135 self.assertEqual(packet[IP].src, nat_ip)
136 if dst_ip is not None:
137 self.assertEqual(packet[IP].dst, dst_ip)
138 if packet.haslayer(TCP):
140 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
143 packet[TCP].sport, self.tcp_port_in)
144 self.tcp_port_out = packet[TCP].sport
145 elif packet.haslayer(UDP):
147 self.assertEqual(packet[UDP].sport, self.udp_port_in)
150 packet[UDP].sport, self.udp_port_in)
151 self.udp_port_out = packet[UDP].sport
154 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
156 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
157 self.icmp_id_out = packet[ICMP].id
159 self.logger.error(ppp("Unexpected or invalid packet "
160 "(outside network):", packet))
163 def verify_capture_in(self, capture, in_if, packet_num=3):
165 Verify captured packets on inside network
167 :param capture: Captured packets
168 :param in_if: Inside interface
169 :param packet_num: Expected number of packets (Default 3)
171 self.assertEqual(packet_num, len(capture))
172 for packet in capture:
174 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
175 if packet.haslayer(TCP):
176 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
177 elif packet.haslayer(UDP):
178 self.assertEqual(packet[UDP].dport, self.udp_port_in)
180 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
182 self.logger.error(ppp("Unexpected or invalid packet "
183 "(inside network):", packet))
186 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
188 Verify captured IPv6 packets on inside network
190 :param capture: Captured packets
191 :param src_ip: Source IP
192 :param dst_ip: Destination IP address
193 :param packet_num: Expected number of packets (Default 3)
195 self.assertEqual(packet_num, len(capture))
196 for packet in capture:
198 self.assertEqual(packet[IPv6].src, src_ip)
199 self.assertEqual(packet[IPv6].dst, dst_ip)
200 if packet.haslayer(TCP):
201 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
202 elif packet.haslayer(UDP):
203 self.assertEqual(packet[UDP].dport, self.udp_port_in)
205 self.assertEqual(packet[ICMPv6EchoReply].id,
208 self.logger.error(ppp("Unexpected or invalid packet "
209 "(inside network):", packet))
212 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
214 Verify captured packet that don't have to be translated
216 :param capture: Captured packets
217 :param ingress_if: Ingress interface
218 :param egress_if: Egress interface
220 for packet in capture:
222 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
223 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
224 if packet.haslayer(TCP):
225 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
226 elif packet.haslayer(UDP):
227 self.assertEqual(packet[UDP].sport, self.udp_port_in)
229 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
231 self.logger.error(ppp("Unexpected or invalid packet "
232 "(inside network):", packet))
235 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
236 packet_num=3, icmp_type=11):
238 Verify captured packets with ICMP errors on outside network
240 :param capture: Captured packets
241 :param src_ip: Translated IP address or IP address of VPP
242 (Default use global SNAT address)
243 :param packet_num: Expected number of packets (Default 3)
244 :param icmp_type: Type of error ICMP packet
245 we are expecting (Default 11)
248 src_ip = self.snat_addr
249 self.assertEqual(packet_num, len(capture))
250 for packet in capture:
252 self.assertEqual(packet[IP].src, src_ip)
253 self.assertTrue(packet.haslayer(ICMP))
255 self.assertEqual(icmp.type, icmp_type)
256 self.assertTrue(icmp.haslayer(IPerror))
257 inner_ip = icmp[IPerror]
258 if inner_ip.haslayer(TCPerror):
259 self.assertEqual(inner_ip[TCPerror].dport,
261 elif inner_ip.haslayer(UDPerror):
262 self.assertEqual(inner_ip[UDPerror].dport,
265 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
267 self.logger.error(ppp("Unexpected or invalid packet "
268 "(outside network):", packet))
271 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
274 Verify captured packets with ICMP errors on inside network
276 :param capture: Captured packets
277 :param in_if: Inside interface
278 :param packet_num: Expected number of packets (Default 3)
279 :param icmp_type: Type of error ICMP packet
280 we are expecting (Default 11)
282 self.assertEqual(packet_num, len(capture))
283 for packet in capture:
285 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
286 self.assertTrue(packet.haslayer(ICMP))
288 self.assertEqual(icmp.type, icmp_type)
289 self.assertTrue(icmp.haslayer(IPerror))
290 inner_ip = icmp[IPerror]
291 if inner_ip.haslayer(TCPerror):
292 self.assertEqual(inner_ip[TCPerror].sport,
294 elif inner_ip.haslayer(UDPerror):
295 self.assertEqual(inner_ip[UDPerror].sport,
298 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
300 self.logger.error(ppp("Unexpected or invalid packet "
301 "(inside network):", packet))
304 def verify_ipfix_nat44_ses(self, data):
306 Verify IPFIX NAT44 session create/delete event
308 :param data: Decoded IPFIX data records
310 nat44_ses_create_num = 0
311 nat44_ses_delete_num = 0
312 self.assertEqual(6, len(data))
315 self.assertIn(ord(record[230]), [4, 5])
316 if ord(record[230]) == 4:
317 nat44_ses_create_num += 1
319 nat44_ses_delete_num += 1
321 self.assertEqual(self.pg0.remote_ip4n, record[8])
322 # postNATSourceIPv4Address
323 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
326 self.assertEqual(struct.pack("!I", 0), record[234])
327 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
328 if IP_PROTOS.icmp == ord(record[4]):
329 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
330 self.assertEqual(struct.pack("!H", self.icmp_id_out),
332 elif IP_PROTOS.tcp == ord(record[4]):
333 self.assertEqual(struct.pack("!H", self.tcp_port_in),
335 self.assertEqual(struct.pack("!H", self.tcp_port_out),
337 elif IP_PROTOS.udp == ord(record[4]):
338 self.assertEqual(struct.pack("!H", self.udp_port_in),
340 self.assertEqual(struct.pack("!H", self.udp_port_out),
343 self.fail("Invalid protocol")
344 self.assertEqual(3, nat44_ses_create_num)
345 self.assertEqual(3, nat44_ses_delete_num)
347 def verify_ipfix_addr_exhausted(self, data):
349 Verify IPFIX NAT addresses event
351 :param data: Decoded IPFIX data records
353 self.assertEqual(1, len(data))
356 self.assertEqual(ord(record[230]), 3)
358 self.assertEqual(struct.pack("!I", 0), record[283])
361 class TestSNAT(MethodHolder):
362 """ SNAT Test Cases """
366 super(TestSNAT, cls).setUpClass()
369 cls.tcp_port_in = 6303
370 cls.tcp_port_out = 6303
371 cls.udp_port_in = 6304
372 cls.udp_port_out = 6304
373 cls.icmp_id_in = 6305
374 cls.icmp_id_out = 6305
375 cls.snat_addr = '10.0.0.3'
376 cls.ipfix_src_port = 4739
377 cls.ipfix_domain_id = 1
379 cls.create_pg_interfaces(range(9))
380 cls.interfaces = list(cls.pg_interfaces[0:4])
382 for i in cls.interfaces:
387 cls.pg0.generate_remote_hosts(3)
388 cls.pg0.configure_ipv4_neighbors()
390 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
392 cls.pg4._local_ip4 = "172.16.255.1"
393 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
394 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
395 cls.pg4.set_table_ip4(10)
396 cls.pg5._local_ip4 = "172.16.255.3"
397 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
398 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
399 cls.pg5.set_table_ip4(10)
400 cls.pg6._local_ip4 = "172.16.255.1"
401 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
402 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
403 cls.pg6.set_table_ip4(20)
404 for i in cls.overlapping_interfaces:
413 super(TestSNAT, cls).tearDownClass()
416 def clear_snat(self):
418 Clear SNAT configuration.
420 # I found no elegant way to do this
421 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
422 dst_address_length=32,
423 next_hop_address=self.pg7.remote_ip4n,
424 next_hop_sw_if_index=self.pg7.sw_if_index,
426 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
427 dst_address_length=32,
428 next_hop_address=self.pg8.remote_ip4n,
429 next_hop_sw_if_index=self.pg8.sw_if_index,
432 for intf in [self.pg7, self.pg8]:
433 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
435 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
440 if self.pg7.has_ip4_config:
441 self.pg7.unconfig_ip4()
443 interfaces = self.vapi.snat_interface_addr_dump()
444 for intf in interfaces:
445 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
447 self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
448 domain_id=self.ipfix_domain_id)
449 self.ipfix_src_port = 4739
450 self.ipfix_domain_id = 1
452 interfaces = self.vapi.snat_interface_dump()
453 for intf in interfaces:
454 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
458 static_mappings = self.vapi.snat_static_mapping_dump()
459 for sm in static_mappings:
460 self.vapi.snat_add_static_mapping(sm.local_ip_address,
461 sm.external_ip_address,
462 local_port=sm.local_port,
463 external_port=sm.external_port,
464 addr_only=sm.addr_only,
466 protocol=sm.protocol,
469 adresses = self.vapi.snat_address_dump()
470 for addr in adresses:
471 self.vapi.snat_add_address_range(addr.ip_address,
475 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
476 local_port=0, external_port=0, vrf_id=0,
477 is_add=1, external_sw_if_index=0xFFFFFFFF,
480 Add/delete S-NAT static mapping
482 :param local_ip: Local IP address
483 :param external_ip: External IP address
484 :param local_port: Local port number (Optional)
485 :param external_port: External port number (Optional)
486 :param vrf_id: VRF ID (Default 0)
487 :param is_add: 1 if add, 0 if delete (Default add)
488 :param external_sw_if_index: External interface instead of IP address
489 :param proto: IP protocol (Mandatory if port specified)
492 if local_port and external_port:
494 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
495 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
496 self.vapi.snat_add_static_mapping(
499 external_sw_if_index,
507 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
509 Add/delete S-NAT address
511 :param ip: IP address
512 :param is_add: 1 if add, 0 if delete (Default add)
514 snat_addr = socket.inet_pton(socket.AF_INET, ip)
515 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
518 def test_dynamic(self):
519 """ SNAT dynamic translation test """
521 self.snat_add_address(self.snat_addr)
522 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
523 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
527 pkts = self.create_stream_in(self.pg0, self.pg1)
528 self.pg0.add_stream(pkts)
529 self.pg_enable_capture(self.pg_interfaces)
531 capture = self.pg1.get_capture(len(pkts))
532 self.verify_capture_out(capture)
535 pkts = self.create_stream_out(self.pg1)
536 self.pg1.add_stream(pkts)
537 self.pg_enable_capture(self.pg_interfaces)
539 capture = self.pg0.get_capture(len(pkts))
540 self.verify_capture_in(capture, self.pg0)
542 def test_dynamic_icmp_errors_in2out_ttl_1(self):
543 """ SNAT handling of client packets with TTL=1 """
545 self.snat_add_address(self.snat_addr)
546 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
547 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
550 # Client side - generate traffic
551 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
552 self.pg0.add_stream(pkts)
553 self.pg_enable_capture(self.pg_interfaces)
556 # Client side - verify ICMP type 11 packets
557 capture = self.pg0.get_capture(len(pkts))
558 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
560 def test_dynamic_icmp_errors_out2in_ttl_1(self):
561 """ SNAT handling of server packets with TTL=1 """
563 self.snat_add_address(self.snat_addr)
564 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
565 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
568 # Client side - create sessions
569 pkts = self.create_stream_in(self.pg0, self.pg1)
570 self.pg0.add_stream(pkts)
571 self.pg_enable_capture(self.pg_interfaces)
574 # Server side - generate traffic
575 capture = self.pg1.get_capture(len(pkts))
576 self.verify_capture_out(capture)
577 pkts = self.create_stream_out(self.pg1, ttl=1)
578 self.pg1.add_stream(pkts)
579 self.pg_enable_capture(self.pg_interfaces)
582 # Server side - verify ICMP type 11 packets
583 capture = self.pg1.get_capture(len(pkts))
584 self.verify_capture_out_with_icmp_errors(capture,
585 src_ip=self.pg1.local_ip4)
587 def test_dynamic_icmp_errors_in2out_ttl_2(self):
588 """ SNAT handling of error responses to client packets with TTL=2 """
590 self.snat_add_address(self.snat_addr)
591 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
592 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
595 # Client side - generate traffic
596 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
597 self.pg0.add_stream(pkts)
598 self.pg_enable_capture(self.pg_interfaces)
601 # Server side - simulate ICMP type 11 response
602 capture = self.pg1.get_capture(len(pkts))
603 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
604 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
605 ICMP(type=11) / packet[IP] for packet in capture]
606 self.pg1.add_stream(pkts)
607 self.pg_enable_capture(self.pg_interfaces)
610 # Client side - verify ICMP type 11 packets
611 capture = self.pg0.get_capture(len(pkts))
612 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
614 def test_dynamic_icmp_errors_out2in_ttl_2(self):
615 """ SNAT handling of error responses to server packets with TTL=2 """
617 self.snat_add_address(self.snat_addr)
618 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
619 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
622 # Client side - create sessions
623 pkts = self.create_stream_in(self.pg0, self.pg1)
624 self.pg0.add_stream(pkts)
625 self.pg_enable_capture(self.pg_interfaces)
628 # Server side - generate traffic
629 capture = self.pg1.get_capture(len(pkts))
630 self.verify_capture_out(capture)
631 pkts = self.create_stream_out(self.pg1, ttl=2)
632 self.pg1.add_stream(pkts)
633 self.pg_enable_capture(self.pg_interfaces)
636 # Client side - simulate ICMP type 11 response
637 capture = self.pg0.get_capture(len(pkts))
638 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
639 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
640 ICMP(type=11) / packet[IP] for packet in capture]
641 self.pg0.add_stream(pkts)
642 self.pg_enable_capture(self.pg_interfaces)
645 # Server side - verify ICMP type 11 packets
646 capture = self.pg1.get_capture(len(pkts))
647 self.verify_capture_out_with_icmp_errors(capture)
649 def test_ping_out_interface_from_outside(self):
650 """ Ping SNAT out interface from outside network """
652 self.snat_add_address(self.snat_addr)
653 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
654 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
657 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
658 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
659 ICMP(id=self.icmp_id_out, type='echo-request'))
661 self.pg1.add_stream(pkts)
662 self.pg_enable_capture(self.pg_interfaces)
664 capture = self.pg1.get_capture(len(pkts))
665 self.assertEqual(1, len(capture))
668 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
669 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
670 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
671 self.assertEqual(packet[ICMP].type, 0) # echo reply
673 self.logger.error(ppp("Unexpected or invalid packet "
674 "(outside network):", packet))
677 def test_ping_internal_host_from_outside(self):
678 """ Ping internal host from outside network """
680 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
681 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
682 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
686 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
687 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
688 ICMP(id=self.icmp_id_out, type='echo-request'))
689 self.pg1.add_stream(pkt)
690 self.pg_enable_capture(self.pg_interfaces)
692 capture = self.pg0.get_capture(1)
693 self.verify_capture_in(capture, self.pg0, packet_num=1)
694 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
697 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
698 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
699 ICMP(id=self.icmp_id_in, type='echo-reply'))
700 self.pg0.add_stream(pkt)
701 self.pg_enable_capture(self.pg_interfaces)
703 capture = self.pg1.get_capture(1)
704 self.verify_capture_out(capture, same_port=True, packet_num=1)
705 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
707 def test_static_in(self):
708 """ SNAT 1:1 NAT initialized from inside network """
711 self.tcp_port_out = 6303
712 self.udp_port_out = 6304
713 self.icmp_id_out = 6305
715 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
716 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
717 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
721 pkts = self.create_stream_in(self.pg0, self.pg1)
722 self.pg0.add_stream(pkts)
723 self.pg_enable_capture(self.pg_interfaces)
725 capture = self.pg1.get_capture(len(pkts))
726 self.verify_capture_out(capture, nat_ip, True)
729 pkts = self.create_stream_out(self.pg1, nat_ip)
730 self.pg1.add_stream(pkts)
731 self.pg_enable_capture(self.pg_interfaces)
733 capture = self.pg0.get_capture(len(pkts))
734 self.verify_capture_in(capture, self.pg0)
736 def test_static_out(self):
737 """ SNAT 1:1 NAT initialized from outside network """
740 self.tcp_port_out = 6303
741 self.udp_port_out = 6304
742 self.icmp_id_out = 6305
744 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
745 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
746 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
750 pkts = self.create_stream_out(self.pg1, nat_ip)
751 self.pg1.add_stream(pkts)
752 self.pg_enable_capture(self.pg_interfaces)
754 capture = self.pg0.get_capture(len(pkts))
755 self.verify_capture_in(capture, self.pg0)
758 pkts = self.create_stream_in(self.pg0, self.pg1)
759 self.pg0.add_stream(pkts)
760 self.pg_enable_capture(self.pg_interfaces)
762 capture = self.pg1.get_capture(len(pkts))
763 self.verify_capture_out(capture, nat_ip, True)
765 def test_static_with_port_in(self):
766 """ SNAT 1:1 NAT with port initialized from inside network """
768 self.tcp_port_out = 3606
769 self.udp_port_out = 3607
770 self.icmp_id_out = 3608
772 self.snat_add_address(self.snat_addr)
773 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
774 self.tcp_port_in, self.tcp_port_out,
776 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
777 self.udp_port_in, self.udp_port_out,
779 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
780 self.icmp_id_in, self.icmp_id_out,
781 proto=IP_PROTOS.icmp)
782 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
783 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
787 pkts = self.create_stream_in(self.pg0, self.pg1)
788 self.pg0.add_stream(pkts)
789 self.pg_enable_capture(self.pg_interfaces)
791 capture = self.pg1.get_capture(len(pkts))
792 self.verify_capture_out(capture)
795 pkts = self.create_stream_out(self.pg1)
796 self.pg1.add_stream(pkts)
797 self.pg_enable_capture(self.pg_interfaces)
799 capture = self.pg0.get_capture(len(pkts))
800 self.verify_capture_in(capture, self.pg0)
802 def test_static_with_port_out(self):
803 """ SNAT 1:1 NAT with port initialized from outside network """
805 self.tcp_port_out = 30606
806 self.udp_port_out = 30607
807 self.icmp_id_out = 30608
809 self.snat_add_address(self.snat_addr)
810 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
811 self.tcp_port_in, self.tcp_port_out,
813 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
814 self.udp_port_in, self.udp_port_out,
816 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
817 self.icmp_id_in, self.icmp_id_out,
818 proto=IP_PROTOS.icmp)
819 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
820 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
824 pkts = self.create_stream_out(self.pg1)
825 self.pg1.add_stream(pkts)
826 self.pg_enable_capture(self.pg_interfaces)
828 capture = self.pg0.get_capture(len(pkts))
829 self.verify_capture_in(capture, self.pg0)
832 pkts = self.create_stream_in(self.pg0, self.pg1)
833 self.pg0.add_stream(pkts)
834 self.pg_enable_capture(self.pg_interfaces)
836 capture = self.pg1.get_capture(len(pkts))
837 self.verify_capture_out(capture)
839 def test_static_vrf_aware(self):
840 """ SNAT 1:1 NAT VRF awareness """
842 nat_ip1 = "10.0.0.30"
843 nat_ip2 = "10.0.0.40"
844 self.tcp_port_out = 6303
845 self.udp_port_out = 6304
846 self.icmp_id_out = 6305
848 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
850 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
852 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
854 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
855 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
857 # inside interface VRF match SNAT static mapping VRF
858 pkts = self.create_stream_in(self.pg4, self.pg3)
859 self.pg4.add_stream(pkts)
860 self.pg_enable_capture(self.pg_interfaces)
862 capture = self.pg3.get_capture(len(pkts))
863 self.verify_capture_out(capture, nat_ip1, True)
865 # inside interface VRF don't match SNAT static mapping VRF (packets
867 pkts = self.create_stream_in(self.pg0, self.pg3)
868 self.pg0.add_stream(pkts)
869 self.pg_enable_capture(self.pg_interfaces)
871 self.pg3.assert_nothing_captured()
873 def test_multiple_inside_interfaces(self):
874 """ SNAT multiple inside interfaces (non-overlapping address space) """
876 self.snat_add_address(self.snat_addr)
877 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
878 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
879 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
882 # between two S-NAT inside interfaces (no translation)
883 pkts = self.create_stream_in(self.pg0, self.pg1)
884 self.pg0.add_stream(pkts)
885 self.pg_enable_capture(self.pg_interfaces)
887 capture = self.pg1.get_capture(len(pkts))
888 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
890 # from S-NAT inside to interface without S-NAT feature (no translation)
891 pkts = self.create_stream_in(self.pg0, self.pg2)
892 self.pg0.add_stream(pkts)
893 self.pg_enable_capture(self.pg_interfaces)
895 capture = self.pg2.get_capture(len(pkts))
896 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
898 # in2out 1st interface
899 pkts = self.create_stream_in(self.pg0, self.pg3)
900 self.pg0.add_stream(pkts)
901 self.pg_enable_capture(self.pg_interfaces)
903 capture = self.pg3.get_capture(len(pkts))
904 self.verify_capture_out(capture)
906 # out2in 1st interface
907 pkts = self.create_stream_out(self.pg3)
908 self.pg3.add_stream(pkts)
909 self.pg_enable_capture(self.pg_interfaces)
911 capture = self.pg0.get_capture(len(pkts))
912 self.verify_capture_in(capture, self.pg0)
914 # in2out 2nd interface
915 pkts = self.create_stream_in(self.pg1, self.pg3)
916 self.pg1.add_stream(pkts)
917 self.pg_enable_capture(self.pg_interfaces)
919 capture = self.pg3.get_capture(len(pkts))
920 self.verify_capture_out(capture)
922 # out2in 2nd interface
923 pkts = self.create_stream_out(self.pg3)
924 self.pg3.add_stream(pkts)
925 self.pg_enable_capture(self.pg_interfaces)
927 capture = self.pg1.get_capture(len(pkts))
928 self.verify_capture_in(capture, self.pg1)
930 def test_inside_overlapping_interfaces(self):
931 """ SNAT multiple inside interfaces with overlapping address space """
933 static_nat_ip = "10.0.0.10"
934 self.snat_add_address(self.snat_addr)
935 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
937 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
938 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
939 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
940 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
943 # between S-NAT inside interfaces with same VRF (no translation)
944 pkts = self.create_stream_in(self.pg4, self.pg5)
945 self.pg4.add_stream(pkts)
946 self.pg_enable_capture(self.pg_interfaces)
948 capture = self.pg5.get_capture(len(pkts))
949 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
951 # between S-NAT inside interfaces with different VRF (hairpinning)
952 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
953 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
954 TCP(sport=1234, dport=5678))
955 self.pg4.add_stream(p)
956 self.pg_enable_capture(self.pg_interfaces)
958 capture = self.pg6.get_capture(1)
963 self.assertEqual(ip.src, self.snat_addr)
964 self.assertEqual(ip.dst, self.pg6.remote_ip4)
965 self.assertNotEqual(tcp.sport, 1234)
966 self.assertEqual(tcp.dport, 5678)
968 self.logger.error(ppp("Unexpected or invalid packet:", p))
971 # in2out 1st interface
972 pkts = self.create_stream_in(self.pg4, self.pg3)
973 self.pg4.add_stream(pkts)
974 self.pg_enable_capture(self.pg_interfaces)
976 capture = self.pg3.get_capture(len(pkts))
977 self.verify_capture_out(capture)
979 # out2in 1st interface
980 pkts = self.create_stream_out(self.pg3)
981 self.pg3.add_stream(pkts)
982 self.pg_enable_capture(self.pg_interfaces)
984 capture = self.pg4.get_capture(len(pkts))
985 self.verify_capture_in(capture, self.pg4)
987 # in2out 2nd interface
988 pkts = self.create_stream_in(self.pg5, self.pg3)
989 self.pg5.add_stream(pkts)
990 self.pg_enable_capture(self.pg_interfaces)
992 capture = self.pg3.get_capture(len(pkts))
993 self.verify_capture_out(capture)
995 # out2in 2nd interface
996 pkts = self.create_stream_out(self.pg3)
997 self.pg3.add_stream(pkts)
998 self.pg_enable_capture(self.pg_interfaces)
1000 capture = self.pg5.get_capture(len(pkts))
1001 self.verify_capture_in(capture, self.pg5)
1004 addresses = self.vapi.snat_address_dump()
1005 self.assertEqual(len(addresses), 1)
1006 sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1007 self.assertEqual(len(sessions), 3)
1008 for session in sessions:
1009 self.assertFalse(session.is_static)
1010 self.assertEqual(session.inside_ip_address[0:4],
1011 self.pg5.remote_ip4n)
1012 self.assertEqual(session.outside_ip_address,
1013 addresses[0].ip_address)
1014 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1015 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1016 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1017 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1018 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1019 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1020 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1021 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1022 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1024 # in2out 3rd interface
1025 pkts = self.create_stream_in(self.pg6, self.pg3)
1026 self.pg6.add_stream(pkts)
1027 self.pg_enable_capture(self.pg_interfaces)
1029 capture = self.pg3.get_capture(len(pkts))
1030 self.verify_capture_out(capture, static_nat_ip, True)
1032 # out2in 3rd interface
1033 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1034 self.pg3.add_stream(pkts)
1035 self.pg_enable_capture(self.pg_interfaces)
1037 capture = self.pg6.get_capture(len(pkts))
1038 self.verify_capture_in(capture, self.pg6)
1040 # general user and session dump verifications
1041 users = self.vapi.snat_user_dump()
1042 self.assertTrue(len(users) >= 3)
1043 addresses = self.vapi.snat_address_dump()
1044 self.assertEqual(len(addresses), 1)
1046 sessions = self.vapi.snat_user_session_dump(user.ip_address,
1048 for session in sessions:
1049 self.assertEqual(user.ip_address, session.inside_ip_address)
1050 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1051 self.assertTrue(session.protocol in
1052 [IP_PROTOS.tcp, IP_PROTOS.udp,
1056 sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1057 self.assertTrue(len(sessions) >= 4)
1058 for session in sessions:
1059 self.assertFalse(session.is_static)
1060 self.assertEqual(session.inside_ip_address[0:4],
1061 self.pg4.remote_ip4n)
1062 self.assertEqual(session.outside_ip_address,
1063 addresses[0].ip_address)
1066 sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1067 self.assertTrue(len(sessions) >= 3)
1068 for session in sessions:
1069 self.assertTrue(session.is_static)
1070 self.assertEqual(session.inside_ip_address[0:4],
1071 self.pg6.remote_ip4n)
1072 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1073 map(int, static_nat_ip.split('.')))
1074 self.assertTrue(session.inside_port in
1075 [self.tcp_port_in, self.udp_port_in,
1078 def test_hairpinning(self):
1079 """ SNAT hairpinning - 1:1 NAT with port"""
1081 host = self.pg0.remote_hosts[0]
1082 server = self.pg0.remote_hosts[1]
1085 server_in_port = 5678
1086 server_out_port = 8765
1088 self.snat_add_address(self.snat_addr)
1089 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1090 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1092 # add static mapping for server
1093 self.snat_add_static_mapping(server.ip4, self.snat_addr,
1094 server_in_port, server_out_port,
1095 proto=IP_PROTOS.tcp)
1097 # send packet from host to server
1098 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1099 IP(src=host.ip4, dst=self.snat_addr) /
1100 TCP(sport=host_in_port, dport=server_out_port))
1101 self.pg0.add_stream(p)
1102 self.pg_enable_capture(self.pg_interfaces)
1104 capture = self.pg0.get_capture(1)
1109 self.assertEqual(ip.src, self.snat_addr)
1110 self.assertEqual(ip.dst, server.ip4)
1111 self.assertNotEqual(tcp.sport, host_in_port)
1112 self.assertEqual(tcp.dport, server_in_port)
1113 host_out_port = tcp.sport
1115 self.logger.error(ppp("Unexpected or invalid packet:", p))
1118 # send reply from server to host
1119 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1120 IP(src=server.ip4, dst=self.snat_addr) /
1121 TCP(sport=server_in_port, dport=host_out_port))
1122 self.pg0.add_stream(p)
1123 self.pg_enable_capture(self.pg_interfaces)
1125 capture = self.pg0.get_capture(1)
1130 self.assertEqual(ip.src, self.snat_addr)
1131 self.assertEqual(ip.dst, host.ip4)
1132 self.assertEqual(tcp.sport, server_out_port)
1133 self.assertEqual(tcp.dport, host_in_port)
1135 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1138 def test_hairpinning2(self):
1139 """ SNAT hairpinning - 1:1 NAT"""
1141 server1_nat_ip = "10.0.0.10"
1142 server2_nat_ip = "10.0.0.11"
1143 host = self.pg0.remote_hosts[0]
1144 server1 = self.pg0.remote_hosts[1]
1145 server2 = self.pg0.remote_hosts[2]
1146 server_tcp_port = 22
1147 server_udp_port = 20
1149 self.snat_add_address(self.snat_addr)
1150 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1151 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1154 # add static mapping for servers
1155 self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1156 self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1160 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1161 IP(src=host.ip4, dst=server1_nat_ip) /
1162 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1164 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1165 IP(src=host.ip4, dst=server1_nat_ip) /
1166 UDP(sport=self.udp_port_in, dport=server_udp_port))
1168 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1169 IP(src=host.ip4, dst=server1_nat_ip) /
1170 ICMP(id=self.icmp_id_in, type='echo-request'))
1172 self.pg0.add_stream(pkts)
1173 self.pg_enable_capture(self.pg_interfaces)
1175 capture = self.pg0.get_capture(len(pkts))
1176 for packet in capture:
1178 self.assertEqual(packet[IP].src, self.snat_addr)
1179 self.assertEqual(packet[IP].dst, server1.ip4)
1180 if packet.haslayer(TCP):
1181 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1182 self.assertEqual(packet[TCP].dport, server_tcp_port)
1183 self.tcp_port_out = packet[TCP].sport
1184 elif packet.haslayer(UDP):
1185 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1186 self.assertEqual(packet[UDP].dport, server_udp_port)
1187 self.udp_port_out = packet[UDP].sport
1189 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1190 self.icmp_id_out = packet[ICMP].id
1192 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1197 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1198 IP(src=server1.ip4, dst=self.snat_addr) /
1199 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1201 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1202 IP(src=server1.ip4, dst=self.snat_addr) /
1203 UDP(sport=server_udp_port, dport=self.udp_port_out))
1205 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1206 IP(src=server1.ip4, dst=self.snat_addr) /
1207 ICMP(id=self.icmp_id_out, type='echo-reply'))
1209 self.pg0.add_stream(pkts)
1210 self.pg_enable_capture(self.pg_interfaces)
1212 capture = self.pg0.get_capture(len(pkts))
1213 for packet in capture:
1215 self.assertEqual(packet[IP].src, server1_nat_ip)
1216 self.assertEqual(packet[IP].dst, host.ip4)
1217 if packet.haslayer(TCP):
1218 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1219 self.assertEqual(packet[TCP].sport, server_tcp_port)
1220 elif packet.haslayer(UDP):
1221 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1222 self.assertEqual(packet[UDP].sport, server_udp_port)
1224 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1226 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1229 # server2 to server1
1231 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1232 IP(src=server2.ip4, dst=server1_nat_ip) /
1233 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1235 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1236 IP(src=server2.ip4, dst=server1_nat_ip) /
1237 UDP(sport=self.udp_port_in, dport=server_udp_port))
1239 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1240 IP(src=server2.ip4, dst=server1_nat_ip) /
1241 ICMP(id=self.icmp_id_in, type='echo-request'))
1243 self.pg0.add_stream(pkts)
1244 self.pg_enable_capture(self.pg_interfaces)
1246 capture = self.pg0.get_capture(len(pkts))
1247 for packet in capture:
1249 self.assertEqual(packet[IP].src, server2_nat_ip)
1250 self.assertEqual(packet[IP].dst, server1.ip4)
1251 if packet.haslayer(TCP):
1252 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1253 self.assertEqual(packet[TCP].dport, server_tcp_port)
1254 self.tcp_port_out = packet[TCP].sport
1255 elif packet.haslayer(UDP):
1256 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1257 self.assertEqual(packet[UDP].dport, server_udp_port)
1258 self.udp_port_out = packet[UDP].sport
1260 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1261 self.icmp_id_out = packet[ICMP].id
1263 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1266 # server1 to server2
1268 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1269 IP(src=server1.ip4, dst=server2_nat_ip) /
1270 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1272 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1273 IP(src=server1.ip4, dst=server2_nat_ip) /
1274 UDP(sport=server_udp_port, dport=self.udp_port_out))
1276 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1277 IP(src=server1.ip4, dst=server2_nat_ip) /
1278 ICMP(id=self.icmp_id_out, type='echo-reply'))
1280 self.pg0.add_stream(pkts)
1281 self.pg_enable_capture(self.pg_interfaces)
1283 capture = self.pg0.get_capture(len(pkts))
1284 for packet in capture:
1286 self.assertEqual(packet[IP].src, server1_nat_ip)
1287 self.assertEqual(packet[IP].dst, server2.ip4)
1288 if packet.haslayer(TCP):
1289 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1290 self.assertEqual(packet[TCP].sport, server_tcp_port)
1291 elif packet.haslayer(UDP):
1292 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1293 self.assertEqual(packet[UDP].sport, server_udp_port)
1295 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1297 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1300 def test_max_translations_per_user(self):
1301 """ MAX translations per user - recycle the least recently used """
1303 self.snat_add_address(self.snat_addr)
1304 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1305 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1308 # get maximum number of translations per user
1309 snat_config = self.vapi.snat_show_config()
1311 # send more than maximum number of translations per user packets
1312 pkts_num = snat_config.max_translations_per_user + 5
1314 for port in range(0, pkts_num):
1315 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1316 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1317 TCP(sport=1025 + port))
1319 self.pg0.add_stream(pkts)
1320 self.pg_enable_capture(self.pg_interfaces)
1323 # verify number of translated packet
1324 self.pg1.get_capture(pkts_num)
1326 def test_interface_addr(self):
1327 """ Acquire SNAT addresses from interface """
1328 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1330 # no address in NAT pool
1331 adresses = self.vapi.snat_address_dump()
1332 self.assertEqual(0, len(adresses))
1334 # configure interface address and check NAT address pool
1335 self.pg7.config_ip4()
1336 adresses = self.vapi.snat_address_dump()
1337 self.assertEqual(1, len(adresses))
1338 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1340 # remove interface address and check NAT address pool
1341 self.pg7.unconfig_ip4()
1342 adresses = self.vapi.snat_address_dump()
1343 self.assertEqual(0, len(adresses))
1345 def test_interface_addr_static_mapping(self):
1346 """ Static mapping with addresses from interface """
1347 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1348 self.snat_add_static_mapping('1.2.3.4',
1349 external_sw_if_index=self.pg7.sw_if_index)
1351 # static mappings with external interface
1352 static_mappings = self.vapi.snat_static_mapping_dump()
1353 self.assertEqual(1, len(static_mappings))
1354 self.assertEqual(self.pg7.sw_if_index,
1355 static_mappings[0].external_sw_if_index)
1357 # configure interface address and check static mappings
1358 self.pg7.config_ip4()
1359 static_mappings = self.vapi.snat_static_mapping_dump()
1360 self.assertEqual(1, len(static_mappings))
1361 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1362 self.pg7.local_ip4n)
1363 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1365 # remove interface address and check static mappings
1366 self.pg7.unconfig_ip4()
1367 static_mappings = self.vapi.snat_static_mapping_dump()
1368 self.assertEqual(0, len(static_mappings))
1370 def test_ipfix_nat44_sess(self):
1371 """ S-NAT IPFIX logging NAT44 session created/delted """
1372 self.ipfix_domain_id = 10
1373 self.ipfix_src_port = 20202
1374 colector_port = 30303
1375 bind_layers(UDP, IPFIX, dport=30303)
1376 self.snat_add_address(self.snat_addr)
1377 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1378 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1380 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1381 src_address=self.pg3.local_ip4n,
1383 template_interval=10,
1384 collector_port=colector_port)
1385 self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1386 src_port=self.ipfix_src_port)
1388 pkts = self.create_stream_in(self.pg0, self.pg1)
1389 self.pg0.add_stream(pkts)
1390 self.pg_enable_capture(self.pg_interfaces)
1392 capture = self.pg1.get_capture(len(pkts))
1393 self.verify_capture_out(capture)
1394 self.snat_add_address(self.snat_addr, is_add=0)
1395 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1396 capture = self.pg3.get_capture(3)
1397 ipfix = IPFIXDecoder()
1398 # first load template
1400 self.assertTrue(p.haslayer(IPFIX))
1401 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1402 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1403 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1404 self.assertEqual(p[UDP].dport, colector_port)
1405 self.assertEqual(p[IPFIX].observationDomainID,
1406 self.ipfix_domain_id)
1407 if p.haslayer(Template):
1408 ipfix.add_template(p.getlayer(Template))
1409 # verify events in data set
1411 if p.haslayer(Data):
1412 data = ipfix.decode_data_set(p.getlayer(Set))
1413 self.verify_ipfix_nat44_ses(data)
1415 def test_ipfix_addr_exhausted(self):
1416 """ S-NAT IPFIX logging NAT addresses exhausted """
1417 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1418 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1420 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1421 src_address=self.pg3.local_ip4n,
1423 template_interval=10)
1424 self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1425 src_port=self.ipfix_src_port)
1427 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1428 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1430 self.pg0.add_stream(p)
1431 self.pg_enable_capture(self.pg_interfaces)
1433 capture = self.pg1.get_capture(0)
1434 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1435 capture = self.pg3.get_capture(3)
1436 ipfix = IPFIXDecoder()
1437 # first load template
1439 self.assertTrue(p.haslayer(IPFIX))
1440 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1441 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1442 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1443 self.assertEqual(p[UDP].dport, 4739)
1444 self.assertEqual(p[IPFIX].observationDomainID,
1445 self.ipfix_domain_id)
1446 if p.haslayer(Template):
1447 ipfix.add_template(p.getlayer(Template))
1448 # verify events in data set
1450 if p.haslayer(Data):
1451 data = ipfix.decode_data_set(p.getlayer(Set))
1452 self.verify_ipfix_addr_exhausted(data)
1454 def test_pool_addr_fib(self):
1455 """ S-NAT add pool addresses to FIB """
1456 static_addr = '10.0.0.10'
1457 self.snat_add_address(self.snat_addr)
1458 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1459 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1461 self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1464 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1465 ARP(op=ARP.who_has, pdst=self.snat_addr,
1466 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1467 self.pg1.add_stream(p)
1468 self.pg_enable_capture(self.pg_interfaces)
1470 capture = self.pg1.get_capture(1)
1471 self.assertTrue(capture[0].haslayer(ARP))
1472 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1475 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1476 ARP(op=ARP.who_has, pdst=static_addr,
1477 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1478 self.pg1.add_stream(p)
1479 self.pg_enable_capture(self.pg_interfaces)
1481 capture = self.pg1.get_capture(1)
1482 self.assertTrue(capture[0].haslayer(ARP))
1483 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1485 # send ARP to non-SNAT interface
1486 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1487 ARP(op=ARP.who_has, pdst=self.snat_addr,
1488 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1489 self.pg2.add_stream(p)
1490 self.pg_enable_capture(self.pg_interfaces)
1492 capture = self.pg1.get_capture(0)
1494 # remove addresses and verify
1495 self.snat_add_address(self.snat_addr, is_add=0)
1496 self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1499 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1500 ARP(op=ARP.who_has, pdst=self.snat_addr,
1501 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1502 self.pg1.add_stream(p)
1503 self.pg_enable_capture(self.pg_interfaces)
1505 capture = self.pg1.get_capture(0)
1507 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1508 ARP(op=ARP.who_has, pdst=static_addr,
1509 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1510 self.pg1.add_stream(p)
1511 self.pg_enable_capture(self.pg_interfaces)
1513 capture = self.pg1.get_capture(0)
1515 def test_vrf_mode(self):
1516 """ S-NAT tenant VRF aware address pool mode """
1520 nat_ip1 = "10.0.0.10"
1521 nat_ip2 = "10.0.0.11"
1523 self.pg0.unconfig_ip4()
1524 self.pg1.unconfig_ip4()
1525 self.pg0.set_table_ip4(vrf_id1)
1526 self.pg1.set_table_ip4(vrf_id2)
1527 self.pg0.config_ip4()
1528 self.pg1.config_ip4()
1530 self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1531 self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1532 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1533 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1534 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1538 pkts = self.create_stream_in(self.pg0, self.pg2)
1539 self.pg0.add_stream(pkts)
1540 self.pg_enable_capture(self.pg_interfaces)
1542 capture = self.pg2.get_capture(len(pkts))
1543 self.verify_capture_out(capture, nat_ip1)
1546 pkts = self.create_stream_in(self.pg1, self.pg2)
1547 self.pg1.add_stream(pkts)
1548 self.pg_enable_capture(self.pg_interfaces)
1550 capture = self.pg2.get_capture(len(pkts))
1551 self.verify_capture_out(capture, nat_ip2)
1553 def test_vrf_feature_independent(self):
1554 """ S-NAT tenant VRF independent address pool mode """
1556 nat_ip1 = "10.0.0.10"
1557 nat_ip2 = "10.0.0.11"
1559 self.snat_add_address(nat_ip1)
1560 self.snat_add_address(nat_ip2)
1561 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1562 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1563 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1567 pkts = self.create_stream_in(self.pg0, self.pg2)
1568 self.pg0.add_stream(pkts)
1569 self.pg_enable_capture(self.pg_interfaces)
1571 capture = self.pg2.get_capture(len(pkts))
1572 self.verify_capture_out(capture, nat_ip1)
1575 pkts = self.create_stream_in(self.pg1, self.pg2)
1576 self.pg1.add_stream(pkts)
1577 self.pg_enable_capture(self.pg_interfaces)
1579 capture = self.pg2.get_capture(len(pkts))
1580 self.verify_capture_out(capture, nat_ip1)
1582 def test_dynamic_ipless_interfaces(self):
1583 """ SNAT interfaces without configured ip dynamic map """
1585 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1586 self.pg7.remote_mac,
1587 self.pg7.remote_ip4n,
1589 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1590 self.pg8.remote_mac,
1591 self.pg8.remote_ip4n,
1594 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1595 dst_address_length=32,
1596 next_hop_address=self.pg7.remote_ip4n,
1597 next_hop_sw_if_index=self.pg7.sw_if_index)
1598 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1599 dst_address_length=32,
1600 next_hop_address=self.pg8.remote_ip4n,
1601 next_hop_sw_if_index=self.pg8.sw_if_index)
1603 self.snat_add_address(self.snat_addr)
1604 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1605 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1609 pkts = self.create_stream_in(self.pg7, self.pg8)
1610 self.pg7.add_stream(pkts)
1611 self.pg_enable_capture(self.pg_interfaces)
1613 capture = self.pg8.get_capture(len(pkts))
1614 self.verify_capture_out(capture)
1617 pkts = self.create_stream_out(self.pg8, self.snat_addr)
1618 self.pg8.add_stream(pkts)
1619 self.pg_enable_capture(self.pg_interfaces)
1621 capture = self.pg7.get_capture(len(pkts))
1622 self.verify_capture_in(capture, self.pg7)
1624 def test_static_ipless_interfaces(self):
1625 """ SNAT 1:1 NAT interfaces without configured ip """
1627 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1628 self.pg7.remote_mac,
1629 self.pg7.remote_ip4n,
1631 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1632 self.pg8.remote_mac,
1633 self.pg8.remote_ip4n,
1636 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1637 dst_address_length=32,
1638 next_hop_address=self.pg7.remote_ip4n,
1639 next_hop_sw_if_index=self.pg7.sw_if_index)
1640 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1641 dst_address_length=32,
1642 next_hop_address=self.pg8.remote_ip4n,
1643 next_hop_sw_if_index=self.pg8.sw_if_index)
1645 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1646 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1647 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1651 pkts = self.create_stream_out(self.pg8)
1652 self.pg8.add_stream(pkts)
1653 self.pg_enable_capture(self.pg_interfaces)
1655 capture = self.pg7.get_capture(len(pkts))
1656 self.verify_capture_in(capture, self.pg7)
1659 pkts = self.create_stream_in(self.pg7, self.pg8)
1660 self.pg7.add_stream(pkts)
1661 self.pg_enable_capture(self.pg_interfaces)
1663 capture = self.pg8.get_capture(len(pkts))
1664 self.verify_capture_out(capture, self.snat_addr, True)
1666 def test_static_with_port_ipless_interfaces(self):
1667 """ SNAT 1:1 NAT with port interfaces without configured ip """
1669 self.tcp_port_out = 30606
1670 self.udp_port_out = 30607
1671 self.icmp_id_out = 30608
1673 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1674 self.pg7.remote_mac,
1675 self.pg7.remote_ip4n,
1677 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1678 self.pg8.remote_mac,
1679 self.pg8.remote_ip4n,
1682 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1683 dst_address_length=32,
1684 next_hop_address=self.pg7.remote_ip4n,
1685 next_hop_sw_if_index=self.pg7.sw_if_index)
1686 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1687 dst_address_length=32,
1688 next_hop_address=self.pg8.remote_ip4n,
1689 next_hop_sw_if_index=self.pg8.sw_if_index)
1691 self.snat_add_address(self.snat_addr)
1692 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1693 self.tcp_port_in, self.tcp_port_out,
1694 proto=IP_PROTOS.tcp)
1695 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1696 self.udp_port_in, self.udp_port_out,
1697 proto=IP_PROTOS.udp)
1698 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1699 self.icmp_id_in, self.icmp_id_out,
1700 proto=IP_PROTOS.icmp)
1701 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1702 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1706 pkts = self.create_stream_out(self.pg8)
1707 self.pg8.add_stream(pkts)
1708 self.pg_enable_capture(self.pg_interfaces)
1710 capture = self.pg7.get_capture(len(pkts))
1711 self.verify_capture_in(capture, self.pg7)
1714 pkts = self.create_stream_in(self.pg7, self.pg8)
1715 self.pg7.add_stream(pkts)
1716 self.pg_enable_capture(self.pg_interfaces)
1718 capture = self.pg8.get_capture(len(pkts))
1719 self.verify_capture_out(capture)
1722 super(TestSNAT, self).tearDown()
1723 if not self.vpp_dead:
1724 self.logger.info(self.vapi.cli("show snat verbose"))
1728 class TestDeterministicNAT(MethodHolder):
1729 """ Deterministic NAT Test Cases """
1732 def setUpConstants(cls):
1733 super(TestDeterministicNAT, cls).setUpConstants()
1734 cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1737 def setUpClass(cls):
1738 super(TestDeterministicNAT, cls).setUpClass()
1741 cls.tcp_port_in = 6303
1742 cls.tcp_external_port = 6303
1743 cls.udp_port_in = 6304
1744 cls.udp_external_port = 6304
1745 cls.icmp_id_in = 6305
1746 cls.snat_addr = '10.0.0.3'
1748 cls.create_pg_interfaces(range(3))
1749 cls.interfaces = list(cls.pg_interfaces)
1751 for i in cls.interfaces:
1756 cls.pg0.generate_remote_hosts(2)
1757 cls.pg0.configure_ipv4_neighbors()
1760 super(TestDeterministicNAT, cls).tearDownClass()
1763 def create_stream_in(self, in_if, out_if, ttl=64):
1765 Create packet stream for inside network
1767 :param in_if: Inside interface
1768 :param out_if: Outside interface
1769 :param ttl: TTL of generated packets
1773 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1774 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1775 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1779 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1780 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1781 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1785 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1786 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1787 ICMP(id=self.icmp_id_in, type='echo-request'))
1792 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1794 Create packet stream for outside network
1796 :param out_if: Outside interface
1797 :param dst_ip: Destination IP address (Default use global SNAT address)
1798 :param ttl: TTL of generated packets
1801 dst_ip = self.snat_addr
1804 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1805 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1806 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1810 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1811 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1812 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1816 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1817 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1818 ICMP(id=self.icmp_external_id, type='echo-reply'))
1823 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1825 Verify captured packets on outside network
1827 :param capture: Captured packets
1828 :param nat_ip: Translated IP address (Default use global SNAT address)
1829 :param same_port: Sorce port number is not translated (Default False)
1830 :param packet_num: Expected number of packets (Default 3)
1833 nat_ip = self.snat_addr
1834 self.assertEqual(packet_num, len(capture))
1835 for packet in capture:
1837 self.assertEqual(packet[IP].src, nat_ip)
1838 if packet.haslayer(TCP):
1839 self.tcp_port_out = packet[TCP].sport
1840 elif packet.haslayer(UDP):
1841 self.udp_port_out = packet[UDP].sport
1843 self.icmp_external_id = packet[ICMP].id
1845 self.logger.error(ppp("Unexpected or invalid packet "
1846 "(outside network):", packet))
1849 def initiate_tcp_session(self, in_if, out_if):
1851 Initiates TCP session
1853 :param in_if: Inside interface
1854 :param out_if: Outside interface
1857 # SYN packet in->out
1858 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1859 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1860 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1863 self.pg_enable_capture(self.pg_interfaces)
1865 capture = out_if.get_capture(1)
1867 self.tcp_port_out = p[TCP].sport
1869 # SYN + ACK packet out->in
1870 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1871 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1872 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1874 out_if.add_stream(p)
1875 self.pg_enable_capture(self.pg_interfaces)
1877 in_if.get_capture(1)
1879 # ACK packet in->out
1880 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1881 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1882 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1885 self.pg_enable_capture(self.pg_interfaces)
1887 out_if.get_capture(1)
1890 self.logger.error("TCP 3 way handshake failed")
1893 def verify_ipfix_max_entries_per_user(self, data):
1895 Verify IPFIX maximum entries per user exceeded event
1897 :param data: Decoded IPFIX data records
1899 self.assertEqual(1, len(data))
1902 self.assertEqual(ord(record[230]), 13)
1903 # natQuotaExceededEvent
1904 self.assertEqual('\x03\x00\x00\x00', record[466])
1906 self.assertEqual(self.pg0.remote_ip4n, record[8])
1908 def test_deterministic_mode(self):
1909 """ S-NAT run deterministic mode """
1910 in_addr = '172.16.255.0'
1911 out_addr = '172.17.255.50'
1912 in_addr_t = '172.16.255.20'
1913 in_addr_n = socket.inet_aton(in_addr)
1914 out_addr_n = socket.inet_aton(out_addr)
1915 in_addr_t_n = socket.inet_aton(in_addr_t)
1919 snat_config = self.vapi.snat_show_config()
1920 self.assertEqual(1, snat_config.deterministic)
1922 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1924 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1925 self.assertEqual(rep1.out_addr[:4], out_addr_n)
1926 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1927 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1929 deterministic_mappings = self.vapi.snat_det_map_dump()
1930 self.assertEqual(len(deterministic_mappings), 1)
1931 dsm = deterministic_mappings[0]
1932 self.assertEqual(in_addr_n, dsm.in_addr[:4])
1933 self.assertEqual(in_plen, dsm.in_plen)
1934 self.assertEqual(out_addr_n, dsm.out_addr[:4])
1935 self.assertEqual(out_plen, dsm.out_plen)
1938 deterministic_mappings = self.vapi.snat_det_map_dump()
1939 self.assertEqual(len(deterministic_mappings), 0)
1941 def test_set_timeouts(self):
1942 """ Set deterministic NAT timeouts """
1943 timeouts_before = self.vapi.snat_det_get_timeouts()
1945 self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
1946 timeouts_before.tcp_established + 10,
1947 timeouts_before.tcp_transitory + 10,
1948 timeouts_before.icmp + 10)
1950 timeouts_after = self.vapi.snat_det_get_timeouts()
1952 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
1953 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
1954 self.assertNotEqual(timeouts_before.tcp_established,
1955 timeouts_after.tcp_established)
1956 self.assertNotEqual(timeouts_before.tcp_transitory,
1957 timeouts_after.tcp_transitory)
1959 def test_det_in(self):
1960 """ CGNAT translation test (TCP, UDP, ICMP) """
1962 nat_ip = "10.0.0.10"
1964 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1966 socket.inet_aton(nat_ip),
1968 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1969 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1973 pkts = self.create_stream_in(self.pg0, self.pg1)
1974 self.pg0.add_stream(pkts)
1975 self.pg_enable_capture(self.pg_interfaces)
1977 capture = self.pg1.get_capture(len(pkts))
1978 self.verify_capture_out(capture, nat_ip)
1981 pkts = self.create_stream_out(self.pg1, nat_ip)
1982 self.pg1.add_stream(pkts)
1983 self.pg_enable_capture(self.pg_interfaces)
1985 capture = self.pg0.get_capture(len(pkts))
1986 self.verify_capture_in(capture, self.pg0)
1989 sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
1990 self.assertEqual(len(sessions), 3)
1994 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1995 self.assertEqual(s.in_port, self.tcp_port_in)
1996 self.assertEqual(s.out_port, self.tcp_port_out)
1997 self.assertEqual(s.ext_port, self.tcp_external_port)
2001 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2002 self.assertEqual(s.in_port, self.udp_port_in)
2003 self.assertEqual(s.out_port, self.udp_port_out)
2004 self.assertEqual(s.ext_port, self.udp_external_port)
2008 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2009 self.assertEqual(s.in_port, self.icmp_id_in)
2010 self.assertEqual(s.out_port, self.icmp_external_id)
2012 def test_multiple_users(self):
2013 """ CGNAT multiple users """
2015 nat_ip = "10.0.0.10"
2017 external_port = 6303
2019 host0 = self.pg0.remote_hosts[0]
2020 host1 = self.pg0.remote_hosts[1]
2022 self.vapi.snat_add_det_map(host0.ip4n,
2024 socket.inet_aton(nat_ip),
2026 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2027 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2031 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2032 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2033 TCP(sport=port_in, dport=external_port))
2034 self.pg0.add_stream(p)
2035 self.pg_enable_capture(self.pg_interfaces)
2037 capture = self.pg1.get_capture(1)
2042 self.assertEqual(ip.src, nat_ip)
2043 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2044 self.assertEqual(tcp.dport, external_port)
2045 port_out0 = tcp.sport
2047 self.logger.error(ppp("Unexpected or invalid packet:", p))
2051 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2052 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2053 TCP(sport=port_in, dport=external_port))
2054 self.pg0.add_stream(p)
2055 self.pg_enable_capture(self.pg_interfaces)
2057 capture = self.pg1.get_capture(1)
2062 self.assertEqual(ip.src, nat_ip)
2063 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2064 self.assertEqual(tcp.dport, external_port)
2065 port_out1 = tcp.sport
2067 self.logger.error(ppp("Unexpected or invalid packet:", p))
2070 dms = self.vapi.snat_det_map_dump()
2071 self.assertEqual(1, len(dms))
2072 self.assertEqual(2, dms[0].ses_num)
2075 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2076 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2077 TCP(sport=external_port, dport=port_out0))
2078 self.pg1.add_stream(p)
2079 self.pg_enable_capture(self.pg_interfaces)
2081 capture = self.pg0.get_capture(1)
2086 self.assertEqual(ip.src, self.pg1.remote_ip4)
2087 self.assertEqual(ip.dst, host0.ip4)
2088 self.assertEqual(tcp.dport, port_in)
2089 self.assertEqual(tcp.sport, external_port)
2091 self.logger.error(ppp("Unexpected or invalid packet:", p))
2095 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2096 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2097 TCP(sport=external_port, dport=port_out1))
2098 self.pg1.add_stream(p)
2099 self.pg_enable_capture(self.pg_interfaces)
2101 capture = self.pg0.get_capture(1)
2106 self.assertEqual(ip.src, self.pg1.remote_ip4)
2107 self.assertEqual(ip.dst, host1.ip4)
2108 self.assertEqual(tcp.dport, port_in)
2109 self.assertEqual(tcp.sport, external_port)
2111 self.logger.error(ppp("Unexpected or invalid packet", p))
2114 # session close api test
2115 self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2117 self.pg1.remote_ip4n,
2119 dms = self.vapi.snat_det_map_dump()
2120 self.assertEqual(dms[0].ses_num, 1)
2122 self.vapi.snat_det_close_session_in(host0.ip4n,
2124 self.pg1.remote_ip4n,
2126 dms = self.vapi.snat_det_map_dump()
2127 self.assertEqual(dms[0].ses_num, 0)
2129 def test_tcp_session_close_detection_in(self):
2130 """ CGNAT TCP session close initiated from inside network """
2131 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2133 socket.inet_aton(self.snat_addr),
2135 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2136 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2139 self.initiate_tcp_session(self.pg0, self.pg1)
2141 # close the session from inside
2143 # FIN packet in -> out
2144 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2145 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2146 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2148 self.pg0.add_stream(p)
2149 self.pg_enable_capture(self.pg_interfaces)
2151 self.pg1.get_capture(1)
2155 # ACK packet out -> in
2156 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2157 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2158 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2162 # FIN packet out -> in
2163 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2164 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2165 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2169 self.pg1.add_stream(pkts)
2170 self.pg_enable_capture(self.pg_interfaces)
2172 self.pg0.get_capture(2)
2174 # ACK packet in -> out
2175 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2176 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2177 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2179 self.pg0.add_stream(p)
2180 self.pg_enable_capture(self.pg_interfaces)
2182 self.pg1.get_capture(1)
2184 # Check if snat closed the session
2185 dms = self.vapi.snat_det_map_dump()
2186 self.assertEqual(0, dms[0].ses_num)
2188 self.logger.error("TCP session termination failed")
2191 def test_tcp_session_close_detection_out(self):
2192 """ CGNAT TCP session close initiated from outside network """
2193 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2195 socket.inet_aton(self.snat_addr),
2197 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2198 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2201 self.initiate_tcp_session(self.pg0, self.pg1)
2203 # close the session from outside
2205 # FIN packet out -> in
2206 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2207 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2208 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2210 self.pg1.add_stream(p)
2211 self.pg_enable_capture(self.pg_interfaces)
2213 self.pg0.get_capture(1)
2217 # ACK packet in -> out
2218 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2219 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2220 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2224 # ACK packet in -> out
2225 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2226 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2227 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2231 self.pg0.add_stream(pkts)
2232 self.pg_enable_capture(self.pg_interfaces)
2234 self.pg1.get_capture(2)
2236 # ACK packet out -> in
2237 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2238 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2239 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2241 self.pg1.add_stream(p)
2242 self.pg_enable_capture(self.pg_interfaces)
2244 self.pg0.get_capture(1)
2246 # Check if snat closed the session
2247 dms = self.vapi.snat_det_map_dump()
2248 self.assertEqual(0, dms[0].ses_num)
2250 self.logger.error("TCP session termination failed")
2253 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2254 def test_session_timeout(self):
2255 """ CGNAT session timeouts """
2256 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2258 socket.inet_aton(self.snat_addr),
2260 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2261 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2264 self.initiate_tcp_session(self.pg0, self.pg1)
2265 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2266 pkts = self.create_stream_in(self.pg0, self.pg1)
2267 self.pg0.add_stream(pkts)
2268 self.pg_enable_capture(self.pg_interfaces)
2270 capture = self.pg1.get_capture(len(pkts))
2273 dms = self.vapi.snat_det_map_dump()
2274 self.assertEqual(0, dms[0].ses_num)
2276 def test_session_limit_per_user(self):
2277 """ CGNAT maximum 1000 sessions per user should be created """
2278 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2280 socket.inet_aton(self.snat_addr),
2282 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2283 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2285 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2286 src_address=self.pg2.local_ip4n,
2288 template_interval=10)
2289 self.vapi.snat_ipfix()
2292 for port in range(1025, 2025):
2293 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2294 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2295 UDP(sport=port, dport=port))
2298 self.pg0.add_stream(pkts)
2299 self.pg_enable_capture(self.pg_interfaces)
2301 capture = self.pg1.get_capture(len(pkts))
2303 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2304 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2305 UDP(sport=3001, dport=3002))
2306 self.pg0.add_stream(p)
2307 self.pg_enable_capture(self.pg_interfaces)
2309 capture = self.pg1.assert_nothing_captured()
2311 # verify ICMP error packet
2312 capture = self.pg0.get_capture(1)
2314 self.assertTrue(p.haslayer(ICMP))
2316 self.assertEqual(icmp.type, 3)
2317 self.assertEqual(icmp.code, 1)
2318 self.assertTrue(icmp.haslayer(IPerror))
2319 inner_ip = icmp[IPerror]
2320 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2321 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2323 dms = self.vapi.snat_det_map_dump()
2325 self.assertEqual(1000, dms[0].ses_num)
2327 # verify IPFIX logging
2328 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2329 capture = self.pg2.get_capture(2)
2330 ipfix = IPFIXDecoder()
2331 # first load template
2333 self.assertTrue(p.haslayer(IPFIX))
2334 if p.haslayer(Template):
2335 ipfix.add_template(p.getlayer(Template))
2336 # verify events in data set
2338 if p.haslayer(Data):
2339 data = ipfix.decode_data_set(p.getlayer(Set))
2340 self.verify_ipfix_max_entries_per_user(data)
2342 def clear_snat(self):
2344 Clear SNAT configuration.
2346 self.vapi.snat_ipfix(enable=0)
2347 self.vapi.snat_det_set_timeouts()
2348 deterministic_mappings = self.vapi.snat_det_map_dump()
2349 for dsm in deterministic_mappings:
2350 self.vapi.snat_add_det_map(dsm.in_addr,
2356 interfaces = self.vapi.snat_interface_dump()
2357 for intf in interfaces:
2358 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2363 super(TestDeterministicNAT, self).tearDown()
2364 if not self.vpp_dead:
2365 self.logger.info(self.vapi.cli("show snat detail"))
2369 class TestNAT64(MethodHolder):
2370 """ NAT64 Test Cases """
2373 def setUpClass(cls):
2374 super(TestNAT64, cls).setUpClass()
2377 cls.tcp_port_in = 6303
2378 cls.tcp_port_out = 6303
2379 cls.udp_port_in = 6304
2380 cls.udp_port_out = 6304
2381 cls.icmp_id_in = 6305
2382 cls.icmp_id_out = 6305
2383 cls.nat_addr = '10.0.0.3'
2384 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2386 cls.create_pg_interfaces(range(2))
2387 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2388 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2390 for i in cls.ip6_interfaces:
2395 for i in cls.ip4_interfaces:
2401 super(TestNAT64, cls).tearDownClass()
2404 def test_pool(self):
2405 """ Add/delete address to NAT64 pool """
2406 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2408 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2410 addresses = self.vapi.nat64_pool_addr_dump()
2411 self.assertEqual(len(addresses), 1)
2412 self.assertEqual(addresses[0].address, nat_addr)
2414 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2416 addresses = self.vapi.nat64_pool_addr_dump()
2417 self.assertEqual(len(addresses), 0)
2419 def test_interface(self):
2420 """ Enable/disable NAT64 feature on the interface """
2421 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2422 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2424 interfaces = self.vapi.nat64_interface_dump()
2425 self.assertEqual(len(interfaces), 2)
2428 for intf in interfaces:
2429 if intf.sw_if_index == self.pg0.sw_if_index:
2430 self.assertEqual(intf.is_inside, 1)
2432 elif intf.sw_if_index == self.pg1.sw_if_index:
2433 self.assertEqual(intf.is_inside, 0)
2435 self.assertTrue(pg0_found)
2436 self.assertTrue(pg1_found)
2438 features = self.vapi.cli("show interface features pg0")
2439 self.assertNotEqual(features.find('nat64-in2out'), -1)
2440 features = self.vapi.cli("show interface features pg1")
2441 self.assertNotEqual(features.find('nat64-out2in'), -1)
2443 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2444 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2446 interfaces = self.vapi.nat64_interface_dump()
2447 self.assertEqual(len(interfaces), 0)
2449 def test_static_bib(self):
2450 """ Add/delete static BIB entry """
2451 in_addr = socket.inet_pton(socket.AF_INET6,
2452 '2001:db8:85a3::8a2e:370:7334')
2453 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2456 proto = IP_PROTOS.tcp
2458 self.vapi.nat64_add_del_static_bib(in_addr,
2463 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2468 self.assertEqual(bibe.i_addr, in_addr)
2469 self.assertEqual(bibe.o_addr, out_addr)
2470 self.assertEqual(bibe.i_port, in_port)
2471 self.assertEqual(bibe.o_port, out_port)
2472 self.assertEqual(static_bib_num, 1)
2474 self.vapi.nat64_add_del_static_bib(in_addr,
2480 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2485 self.assertEqual(static_bib_num, 0)
2487 def test_set_timeouts(self):
2488 """ Set NAT64 timeouts """
2489 # verify default values
2490 timeouts = self.vapi.nat64_get_timeouts()
2491 self.assertEqual(timeouts.udp, 300)
2492 self.assertEqual(timeouts.icmp, 60)
2493 self.assertEqual(timeouts.tcp_trans, 240)
2494 self.assertEqual(timeouts.tcp_est, 7440)
2495 self.assertEqual(timeouts.tcp_incoming_syn, 6)
2497 # set and verify custom values
2498 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2499 tcp_est=7450, tcp_incoming_syn=10)
2500 timeouts = self.vapi.nat64_get_timeouts()
2501 self.assertEqual(timeouts.udp, 200)
2502 self.assertEqual(timeouts.icmp, 30)
2503 self.assertEqual(timeouts.tcp_trans, 250)
2504 self.assertEqual(timeouts.tcp_est, 7450)
2505 self.assertEqual(timeouts.tcp_incoming_syn, 10)
2507 def test_dynamic(self):
2508 """ NAT64 dynamic translation test """
2509 self.tcp_port_in = 6303
2510 self.udp_port_in = 6304
2511 self.icmp_id_in = 6305
2513 ses_num_start = self.nat64_get_ses_num()
2515 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2517 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2518 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2521 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2522 self.pg0.add_stream(pkts)
2523 self.pg_enable_capture(self.pg_interfaces)
2525 capture = self.pg1.get_capture(3)
2526 self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2527 dst_ip=self.pg1.remote_ip4)
2530 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2531 self.pg1.add_stream(pkts)
2532 self.pg_enable_capture(self.pg_interfaces)
2534 capture = self.pg0.get_capture(3)
2535 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2536 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2539 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2540 self.pg0.add_stream(pkts)
2541 self.pg_enable_capture(self.pg_interfaces)
2543 capture = self.pg1.get_capture(3)
2544 self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2545 dst_ip=self.pg1.remote_ip4)
2548 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2549 self.pg1.add_stream(pkts)
2550 self.pg_enable_capture(self.pg_interfaces)
2552 capture = self.pg0.get_capture(3)
2553 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2554 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2556 ses_num_end = self.nat64_get_ses_num()
2558 self.assertEqual(ses_num_end - ses_num_start, 3)
2560 def test_static(self):
2561 """ NAT64 static translation test """
2562 self.tcp_port_in = 60303
2563 self.udp_port_in = 60304
2564 self.icmp_id_in = 60305
2565 self.tcp_port_out = 60303
2566 self.udp_port_out = 60304
2567 self.icmp_id_out = 60305
2569 ses_num_start = self.nat64_get_ses_num()
2571 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2573 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2574 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2576 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2581 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2586 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2593 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2594 self.pg0.add_stream(pkts)
2595 self.pg_enable_capture(self.pg_interfaces)
2597 capture = self.pg1.get_capture(3)
2598 self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2599 dst_ip=self.pg1.remote_ip4, same_port=True)
2602 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2603 self.pg1.add_stream(pkts)
2604 self.pg_enable_capture(self.pg_interfaces)
2606 capture = self.pg0.get_capture(3)
2607 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2608 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2610 ses_num_end = self.nat64_get_ses_num()
2612 self.assertEqual(ses_num_end - ses_num_start, 3)
2614 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2615 def test_session_timeout(self):
2616 """ NAT64 session timeout """
2617 self.icmp_id_in = 1234
2618 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2620 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2621 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2622 self.vapi.nat64_set_timeouts(icmp=5)
2624 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2625 self.pg0.add_stream(pkts)
2626 self.pg_enable_capture(self.pg_interfaces)
2628 capture = self.pg1.get_capture(3)
2630 ses_num_before_timeout = self.nat64_get_ses_num()
2634 # ICMP session after timeout
2635 ses_num_after_timeout = self.nat64_get_ses_num()
2636 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2638 def nat64_get_ses_num(self):
2640 Return number of active NAT64 sessions.
2643 st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
2645 st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
2647 st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
2651 def clear_nat64(self):
2653 Clear NAT64 configuration.
2655 self.vapi.nat64_set_timeouts()
2657 interfaces = self.vapi.nat64_interface_dump()
2658 for intf in interfaces:
2659 self.vapi.nat64_add_del_interface(intf.sw_if_index,
2663 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2666 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2674 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
2677 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2685 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
2688 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2696 adresses = self.vapi.nat64_pool_addr_dump()
2697 for addr in adresses:
2698 self.vapi.nat64_add_del_pool_addr_range(addr.address,
2703 super(TestNAT64, self).tearDown()
2704 if not self.vpp_dead:
2705 self.logger.info(self.vapi.cli("show nat64 pool"))
2706 self.logger.info(self.vapi.cli("show nat64 interfaces"))
2707 self.logger.info(self.vapi.cli("show nat64 bib tcp"))
2708 self.logger.info(self.vapi.cli("show nat64 bib udp"))
2709 self.logger.info(self.vapi.cli("show nat64 bib icmp"))
2710 self.logger.info(self.vapi.cli("show nat64 session table tcp"))
2711 self.logger.info(self.vapi.cli("show nat64 session table udp"))
2712 self.logger.info(self.vapi.cli("show nat64 session table icmp"))
2715 if __name__ == '__main__':
2716 unittest.main(testRunner=VppTestRunner)