7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
20 class MethodHolder(VppTestCase):
21 """ SNAT create capture and verify method holder """
25 super(MethodHolder, cls).setUpClass()
28 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is rebuilt from its serialized form, the IP checksum field
    of the copy is cleared and the copy is serialized and parsed again.
    The checksum of the resulting packet is then compared with the one
    carried by the original packet.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the stored value first (same as check_icmp_checksum does for
    # ICMP); otherwise the round trip just echoes the received checksum
    # back and the assertion below could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is rebuilt from its serialized form, the TCP checksum field
    of the copy is cleared and the copy is serialized and parsed again.
    The checksum of the resulting packet is then compared with the one
    carried by the original packet.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the stored value first (same as check_icmp_checksum does for
    # ICMP); otherwise the round trip just echoes the received checksum
    # back and the assertion below could never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is rebuilt from its serialized form, the UDP checksum field
    of the copy is cleared and the copy is serialized and parsed again.
    The checksum of the resulting packet is then compared with the one
    carried by the original packet.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the stored value first (same as check_icmp_checksum does for
    # ICMP); otherwise the round trip just echoes the received checksum
    # back and the assertion below could never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded layer present in the packet (IPerror, TCPerror,
    UDPerror, ICMPerror) a copy of the packet is rebuilt from its
    serialized form, the checksum field of that layer is cleared, the
    copy is serialized and parsed again, and the resulting checksum is
    compared with the one carried by the original packet.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = (
        (IPerror, 'IPerror'),
        (TCPerror, 'TCPerror'),
        (UDPerror, 'UDPerror'),
        (ICMPerror, 'ICMPerror'),
    )
    for layer, name in embedded_layers:
        if not pkt.haslayer(layer):
            continue
        # An embedded UDP checksum of 0 is skipped, not verified.
        if layer is UDPerror and pkt[name].chksum == 0:
            continue
        # Start from a fresh copy for every layer so that no check
        # depends on a copy created (or not created) by an earlier one.
        new = pkt.__class__(str(pkt))
        del new[name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].chksum, pkt[name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Verify the ICMP checksum carried by an IPv4 packet

    A copy of the packet is rebuilt from its serialized form, the ICMP
    checksum field of the copy is cleared, and the copy is serialized and
    parsed once more. The checksum found after that round trip must equal
    the one in the original packet. If the packet also carries an IPerror
    layer, the embedded packet is verified as well.

    :param pkt: Packet whose ICMP checksum is verified
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    expected = rebuilt['ICMP'].chksum
    self.assertEqual(expected, pkt['ICMP'].chksum)
    # Nothing more to verify unless an offending packet is embedded.
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Verify the ICMPv6 checksum carried by the packet

    Handles ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply.
    For each of these layers present in the packet, the checksum field of
    a rebuilt copy is cleared, the copy is serialized and parsed again and
    the resulting checksum is compared with the one in the original
    packet. For ICMPv6DestUnreach the embedded packet is checked as well.

    :param pkt: Packet whose ICMPv6 checksum is verified
    """
    # (layer class, layer name used for lookup, check embedded packet)
    handled = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    # The copy is created once and carried over between layers.
    rebuilt = pkt.__class__(str(pkt))
    for layer, name, has_embedded in handled:
        if not pkt.haslayer(layer):
            continue
        del rebuilt[name].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[name].cksum, pkt[name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
131 def create_stream_in(self, in_if, out_if, ttl=64):
133 Create packet stream for inside network
135 :param in_if: Inside interface
136 :param out_if: Outside interface
137 :param ttl: TTL of generated packets
141 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143 TCP(sport=self.tcp_port_in, dport=20))
147 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149 UDP(sport=self.udp_port_in, dport=20))
153 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155 ICMP(id=self.icmp_id_in, type='echo-request'))
160 def compose_ip6(self, ip4, pref, plen):
162 Compose IPv4-embedded IPv6 addresses
164 :param ip4: IPv4 address
165 :param pref: IPv6 prefix
166 :param plen: IPv6 prefix length
167 :returns: IPv4-embedded IPv6 addresses
169 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
170 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
185 pref_n[10] = ip4_n[3]
189 pref_n[10] = ip4_n[2]
190 pref_n[11] = ip4_n[3]
193 pref_n[10] = ip4_n[1]
194 pref_n[11] = ip4_n[2]
195 pref_n[12] = ip4_n[3]
197 pref_n[12] = ip4_n[0]
198 pref_n[13] = ip4_n[1]
199 pref_n[14] = ip4_n[2]
200 pref_n[15] = ip4_n[3]
201 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
203 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
205 Create IPv6 packet stream for inside network
207 :param in_if: Inside interface
208 :param out_if: Outside interface
209 :param hlim: Hop Limit of generated packets
210 :param pref: NAT64 prefix
211 :param plen: NAT64 prefix length
215 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
217 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
220 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
221 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
222 TCP(sport=self.tcp_port_in, dport=20))
226 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228 UDP(sport=self.udp_port_in, dport=20))
232 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234 ICMPv6EchoRequest(id=self.icmp_id_in))
239 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
241 Create packet stream for outside network
243 :param out_if: Outside interface
244 :param dst_ip: Destination IP address (Default use global SNAT address)
245 :param ttl: TTL of generated packets
248 dst_ip = self.snat_addr
251 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
252 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
253 TCP(dport=self.tcp_port_out, sport=20))
257 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259 UDP(dport=self.udp_port_out, sport=20))
263 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265 ICMP(id=self.icmp_id_out, type='echo-reply'))
270 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
271 packet_num=3, dst_ip=None):
273 Verify captured packets on outside network
275 :param capture: Captured packets
276 :param nat_ip: Translated IP address (Default use global SNAT address)
277 :param same_port: Source port number is not translated (Default False)
278 :param packet_num: Expected number of packets (Default 3)
279 :param dst_ip: Destination IP address (Default do not verify)
282 nat_ip = self.snat_addr
283 self.assertEqual(packet_num, len(capture))
284 for packet in capture:
286 self.check_ip_checksum(packet)
287 self.assertEqual(packet[IP].src, nat_ip)
288 if dst_ip is not None:
289 self.assertEqual(packet[IP].dst, dst_ip)
290 if packet.haslayer(TCP):
292 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
295 packet[TCP].sport, self.tcp_port_in)
296 self.tcp_port_out = packet[TCP].sport
297 self.check_tcp_checksum(packet)
298 elif packet.haslayer(UDP):
300 self.assertEqual(packet[UDP].sport, self.udp_port_in)
303 packet[UDP].sport, self.udp_port_in)
304 self.udp_port_out = packet[UDP].sport
307 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
309 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
310 self.icmp_id_out = packet[ICMP].id
311 self.check_icmp_checksum(packet)
313 self.logger.error(ppp("Unexpected or invalid packet "
314 "(outside network):", packet))
317 def verify_capture_in(self, capture, in_if, packet_num=3):
319 Verify captured packets on inside network
321 :param capture: Captured packets
322 :param in_if: Inside interface
323 :param packet_num: Expected number of packets (Default 3)
325 self.assertEqual(packet_num, len(capture))
326 for packet in capture:
328 self.check_ip_checksum(packet)
329 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
330 if packet.haslayer(TCP):
331 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
332 self.check_tcp_checksum(packet)
333 elif packet.haslayer(UDP):
334 self.assertEqual(packet[UDP].dport, self.udp_port_in)
336 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
337 self.check_icmp_checksum(packet)
339 self.logger.error(ppp("Unexpected or invalid packet "
340 "(inside network):", packet))
343 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
345 Verify captured IPv6 packets on inside network
347 :param capture: Captured packets
348 :param src_ip: Source IP
349 :param dst_ip: Destination IP address
350 :param packet_num: Expected number of packets (Default 3)
352 self.assertEqual(packet_num, len(capture))
353 for packet in capture:
355 self.assertEqual(packet[IPv6].src, src_ip)
356 self.assertEqual(packet[IPv6].dst, dst_ip)
357 if packet.haslayer(TCP):
358 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
359 self.check_tcp_checksum(packet)
360 elif packet.haslayer(UDP):
361 self.assertEqual(packet[UDP].dport, self.udp_port_in)
362 self.check_udp_checksum(packet)
364 self.assertEqual(packet[ICMPv6EchoReply].id,
366 self.check_icmpv6_checksum(packet)
368 self.logger.error(ppp("Unexpected or invalid packet "
369 "(inside network):", packet))
372 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
374 Verify captured packets that don't have to be translated
376 :param capture: Captured packets
377 :param ingress_if: Ingress interface
378 :param egress_if: Egress interface
380 for packet in capture:
382 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
383 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
384 if packet.haslayer(TCP):
385 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
386 elif packet.haslayer(UDP):
387 self.assertEqual(packet[UDP].sport, self.udp_port_in)
389 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
391 self.logger.error(ppp("Unexpected or invalid packet "
392 "(inside network):", packet))
395 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
396 packet_num=3, icmp_type=11):
398 Verify captured packets with ICMP errors on outside network
400 :param capture: Captured packets
401 :param src_ip: Translated IP address or IP address of VPP
402 (Default use global SNAT address)
403 :param packet_num: Expected number of packets (Default 3)
404 :param icmp_type: Type of error ICMP packet
405 we are expecting (Default 11)
408 src_ip = self.snat_addr
409 self.assertEqual(packet_num, len(capture))
410 for packet in capture:
412 self.assertEqual(packet[IP].src, src_ip)
413 self.assertTrue(packet.haslayer(ICMP))
415 self.assertEqual(icmp.type, icmp_type)
416 self.assertTrue(icmp.haslayer(IPerror))
417 inner_ip = icmp[IPerror]
418 if inner_ip.haslayer(TCPerror):
419 self.assertEqual(inner_ip[TCPerror].dport,
421 elif inner_ip.haslayer(UDPerror):
422 self.assertEqual(inner_ip[UDPerror].dport,
425 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
427 self.logger.error(ppp("Unexpected or invalid packet "
428 "(outside network):", packet))
431 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
434 Verify captured packets with ICMP errors on inside network
436 :param capture: Captured packets
437 :param in_if: Inside interface
438 :param packet_num: Expected number of packets (Default 3)
439 :param icmp_type: Type of error ICMP packet
440 we are expecting (Default 11)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
446 self.assertTrue(packet.haslayer(ICMP))
448 self.assertEqual(icmp.type, icmp_type)
449 self.assertTrue(icmp.haslayer(IPerror))
450 inner_ip = icmp[IPerror]
451 if inner_ip.haslayer(TCPerror):
452 self.assertEqual(inner_ip[TCPerror].sport,
454 elif inner_ip.haslayer(UDPerror):
455 self.assertEqual(inner_ip[UDPerror].sport,
458 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
460 self.logger.error(ppp("Unexpected or invalid packet "
461 "(inside network):", packet))
464 def verify_ipfix_nat44_ses(self, data):
466 Verify IPFIX NAT44 session create/delete event
468 :param data: Decoded IPFIX data records
470 nat44_ses_create_num = 0
471 nat44_ses_delete_num = 0
472 self.assertEqual(6, len(data))
475 self.assertIn(ord(record[230]), [4, 5])
476 if ord(record[230]) == 4:
477 nat44_ses_create_num += 1
479 nat44_ses_delete_num += 1
481 self.assertEqual(self.pg0.remote_ip4n, record[8])
482 # postNATSourceIPv4Address
483 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
486 self.assertEqual(struct.pack("!I", 0), record[234])
487 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
488 if IP_PROTOS.icmp == ord(record[4]):
489 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
490 self.assertEqual(struct.pack("!H", self.icmp_id_out),
492 elif IP_PROTOS.tcp == ord(record[4]):
493 self.assertEqual(struct.pack("!H", self.tcp_port_in),
495 self.assertEqual(struct.pack("!H", self.tcp_port_out),
497 elif IP_PROTOS.udp == ord(record[4]):
498 self.assertEqual(struct.pack("!H", self.udp_port_in),
500 self.assertEqual(struct.pack("!H", self.udp_port_out),
503 self.fail("Invalid protocol")
504 self.assertEqual(3, nat44_ses_create_num)
505 self.assertEqual(3, nat44_ses_delete_num)
507 def verify_ipfix_addr_exhausted(self, data):
509 Verify IPFIX NAT addresses event
511 :param data: Decoded IPFIX data records
513 self.assertEqual(1, len(data))
516 self.assertEqual(ord(record[230]), 3)
518 self.assertEqual(struct.pack("!I", 0), record[283])
521 class TestSNAT(MethodHolder):
522 """ SNAT Test Cases """
526 super(TestSNAT, cls).setUpClass()
529 cls.tcp_port_in = 6303
530 cls.tcp_port_out = 6303
531 cls.udp_port_in = 6304
532 cls.udp_port_out = 6304
533 cls.icmp_id_in = 6305
534 cls.icmp_id_out = 6305
535 cls.snat_addr = '10.0.0.3'
536 cls.ipfix_src_port = 4739
537 cls.ipfix_domain_id = 1
539 cls.create_pg_interfaces(range(9))
540 cls.interfaces = list(cls.pg_interfaces[0:4])
542 for i in cls.interfaces:
547 cls.pg0.generate_remote_hosts(3)
548 cls.pg0.configure_ipv4_neighbors()
550 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
552 cls.pg4._local_ip4 = "172.16.255.1"
553 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
554 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
555 cls.pg4.set_table_ip4(10)
556 cls.pg5._local_ip4 = "172.17.255.3"
557 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
559 cls.pg5.set_table_ip4(10)
560 cls.pg6._local_ip4 = "172.16.255.1"
561 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
563 cls.pg6.set_table_ip4(20)
564 for i in cls.overlapping_interfaces:
573 super(TestSNAT, cls).tearDownClass()
576 def clear_snat(self):
578 Clear SNAT configuration.
580 # I found no elegant way to do this
581 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
582 dst_address_length=32,
583 next_hop_address=self.pg7.remote_ip4n,
584 next_hop_sw_if_index=self.pg7.sw_if_index,
586 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
587 dst_address_length=32,
588 next_hop_address=self.pg8.remote_ip4n,
589 next_hop_sw_if_index=self.pg8.sw_if_index,
592 for intf in [self.pg7, self.pg8]:
593 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
595 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
600 if self.pg7.has_ip4_config:
601 self.pg7.unconfig_ip4()
603 interfaces = self.vapi.snat_interface_addr_dump()
604 for intf in interfaces:
605 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
607 self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
608 domain_id=self.ipfix_domain_id)
609 self.ipfix_src_port = 4739
610 self.ipfix_domain_id = 1
612 interfaces = self.vapi.snat_interface_dump()
613 for intf in interfaces:
614 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
618 interfaces = self.vapi.snat_interface_output_feature_dump()
619 for intf in interfaces:
620 self.vapi.snat_interface_add_del_output_feature(intf.sw_if_index,
624 static_mappings = self.vapi.snat_static_mapping_dump()
625 for sm in static_mappings:
626 self.vapi.snat_add_static_mapping(sm.local_ip_address,
627 sm.external_ip_address,
628 local_port=sm.local_port,
629 external_port=sm.external_port,
630 addr_only=sm.addr_only,
632 protocol=sm.protocol,
635 adresses = self.vapi.snat_address_dump()
636 for addr in adresses:
637 self.vapi.snat_add_address_range(addr.ip_address,
641 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
642 local_port=0, external_port=0, vrf_id=0,
643 is_add=1, external_sw_if_index=0xFFFFFFFF,
646 Add/delete S-NAT static mapping
648 :param local_ip: Local IP address
649 :param external_ip: External IP address
650 :param local_port: Local port number (Optional)
651 :param external_port: External port number (Optional)
652 :param vrf_id: VRF ID (Default 0)
653 :param is_add: 1 if add, 0 if delete (Default add)
654 :param external_sw_if_index: External interface instead of IP address
655 :param proto: IP protocol (Mandatory if port specified)
658 if local_port and external_port:
660 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
661 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
662 self.vapi.snat_add_static_mapping(
665 external_sw_if_index,
673 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
675 Add/delete S-NAT address
677 :param ip: IP address
678 :param is_add: 1 if add, 0 if delete (Default add)
680 snat_addr = socket.inet_pton(socket.AF_INET, ip)
681 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
684 def test_dynamic(self):
685 """ SNAT dynamic translation test """
687 self.snat_add_address(self.snat_addr)
688 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
689 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
693 pkts = self.create_stream_in(self.pg0, self.pg1)
694 self.pg0.add_stream(pkts)
695 self.pg_enable_capture(self.pg_interfaces)
697 capture = self.pg1.get_capture(len(pkts))
698 self.verify_capture_out(capture)
701 pkts = self.create_stream_out(self.pg1)
702 self.pg1.add_stream(pkts)
703 self.pg_enable_capture(self.pg_interfaces)
705 capture = self.pg0.get_capture(len(pkts))
706 self.verify_capture_in(capture, self.pg0)
708 def test_dynamic_icmp_errors_in2out_ttl_1(self):
709 """ SNAT handling of client packets with TTL=1 """
711 self.snat_add_address(self.snat_addr)
712 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
713 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
716 # Client side - generate traffic
717 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
718 self.pg0.add_stream(pkts)
719 self.pg_enable_capture(self.pg_interfaces)
722 # Client side - verify ICMP type 11 packets
723 capture = self.pg0.get_capture(len(pkts))
724 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
726 def test_dynamic_icmp_errors_out2in_ttl_1(self):
727 """ SNAT handling of server packets with TTL=1 """
729 self.snat_add_address(self.snat_addr)
730 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
731 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
734 # Client side - create sessions
735 pkts = self.create_stream_in(self.pg0, self.pg1)
736 self.pg0.add_stream(pkts)
737 self.pg_enable_capture(self.pg_interfaces)
740 # Server side - generate traffic
741 capture = self.pg1.get_capture(len(pkts))
742 self.verify_capture_out(capture)
743 pkts = self.create_stream_out(self.pg1, ttl=1)
744 self.pg1.add_stream(pkts)
745 self.pg_enable_capture(self.pg_interfaces)
748 # Server side - verify ICMP type 11 packets
749 capture = self.pg1.get_capture(len(pkts))
750 self.verify_capture_out_with_icmp_errors(capture,
751 src_ip=self.pg1.local_ip4)
753 def test_dynamic_icmp_errors_in2out_ttl_2(self):
754 """ SNAT handling of error responses to client packets with TTL=2 """
756 self.snat_add_address(self.snat_addr)
757 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
758 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
761 # Client side - generate traffic
762 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
763 self.pg0.add_stream(pkts)
764 self.pg_enable_capture(self.pg_interfaces)
767 # Server side - simulate ICMP type 11 response
768 capture = self.pg1.get_capture(len(pkts))
769 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
770 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
771 ICMP(type=11) / packet[IP] for packet in capture]
772 self.pg1.add_stream(pkts)
773 self.pg_enable_capture(self.pg_interfaces)
776 # Client side - verify ICMP type 11 packets
777 capture = self.pg0.get_capture(len(pkts))
778 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
780 def test_dynamic_icmp_errors_out2in_ttl_2(self):
781 """ SNAT handling of error responses to server packets with TTL=2 """
783 self.snat_add_address(self.snat_addr)
784 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
785 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
788 # Client side - create sessions
789 pkts = self.create_stream_in(self.pg0, self.pg1)
790 self.pg0.add_stream(pkts)
791 self.pg_enable_capture(self.pg_interfaces)
794 # Server side - generate traffic
795 capture = self.pg1.get_capture(len(pkts))
796 self.verify_capture_out(capture)
797 pkts = self.create_stream_out(self.pg1, ttl=2)
798 self.pg1.add_stream(pkts)
799 self.pg_enable_capture(self.pg_interfaces)
802 # Client side - simulate ICMP type 11 response
803 capture = self.pg0.get_capture(len(pkts))
804 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
805 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
806 ICMP(type=11) / packet[IP] for packet in capture]
807 self.pg0.add_stream(pkts)
808 self.pg_enable_capture(self.pg_interfaces)
811 # Server side - verify ICMP type 11 packets
812 capture = self.pg1.get_capture(len(pkts))
813 self.verify_capture_out_with_icmp_errors(capture)
815 def test_ping_out_interface_from_outside(self):
816 """ Ping SNAT out interface from outside network """
818 self.snat_add_address(self.snat_addr)
819 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
820 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
823 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
824 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
825 ICMP(id=self.icmp_id_out, type='echo-request'))
827 self.pg1.add_stream(pkts)
828 self.pg_enable_capture(self.pg_interfaces)
830 capture = self.pg1.get_capture(len(pkts))
831 self.assertEqual(1, len(capture))
834 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
835 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
836 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
837 self.assertEqual(packet[ICMP].type, 0) # echo reply
839 self.logger.error(ppp("Unexpected or invalid packet "
840 "(outside network):", packet))
843 def test_ping_internal_host_from_outside(self):
844 """ Ping internal host from outside network """
846 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
847 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
848 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
852 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
853 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
854 ICMP(id=self.icmp_id_out, type='echo-request'))
855 self.pg1.add_stream(pkt)
856 self.pg_enable_capture(self.pg_interfaces)
858 capture = self.pg0.get_capture(1)
859 self.verify_capture_in(capture, self.pg0, packet_num=1)
860 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
863 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
864 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
865 ICMP(id=self.icmp_id_in, type='echo-reply'))
866 self.pg0.add_stream(pkt)
867 self.pg_enable_capture(self.pg_interfaces)
869 capture = self.pg1.get_capture(1)
870 self.verify_capture_out(capture, same_port=True, packet_num=1)
871 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
873 def test_static_in(self):
874 """ SNAT 1:1 NAT initialized from inside network """
877 self.tcp_port_out = 6303
878 self.udp_port_out = 6304
879 self.icmp_id_out = 6305
881 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
882 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
883 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
887 pkts = self.create_stream_in(self.pg0, self.pg1)
888 self.pg0.add_stream(pkts)
889 self.pg_enable_capture(self.pg_interfaces)
891 capture = self.pg1.get_capture(len(pkts))
892 self.verify_capture_out(capture, nat_ip, True)
895 pkts = self.create_stream_out(self.pg1, nat_ip)
896 self.pg1.add_stream(pkts)
897 self.pg_enable_capture(self.pg_interfaces)
899 capture = self.pg0.get_capture(len(pkts))
900 self.verify_capture_in(capture, self.pg0)
902 def test_static_out(self):
903 """ SNAT 1:1 NAT initialized from outside network """
906 self.tcp_port_out = 6303
907 self.udp_port_out = 6304
908 self.icmp_id_out = 6305
910 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
911 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
912 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
916 pkts = self.create_stream_out(self.pg1, nat_ip)
917 self.pg1.add_stream(pkts)
918 self.pg_enable_capture(self.pg_interfaces)
920 capture = self.pg0.get_capture(len(pkts))
921 self.verify_capture_in(capture, self.pg0)
924 pkts = self.create_stream_in(self.pg0, self.pg1)
925 self.pg0.add_stream(pkts)
926 self.pg_enable_capture(self.pg_interfaces)
928 capture = self.pg1.get_capture(len(pkts))
929 self.verify_capture_out(capture, nat_ip, True)
931 def test_static_with_port_in(self):
932 """ SNAT 1:1 NAT with port initialized from inside network """
934 self.tcp_port_out = 3606
935 self.udp_port_out = 3607
936 self.icmp_id_out = 3608
938 self.snat_add_address(self.snat_addr)
939 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
940 self.tcp_port_in, self.tcp_port_out,
942 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
943 self.udp_port_in, self.udp_port_out,
945 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
946 self.icmp_id_in, self.icmp_id_out,
947 proto=IP_PROTOS.icmp)
948 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
949 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
953 pkts = self.create_stream_in(self.pg0, self.pg1)
954 self.pg0.add_stream(pkts)
955 self.pg_enable_capture(self.pg_interfaces)
957 capture = self.pg1.get_capture(len(pkts))
958 self.verify_capture_out(capture)
961 pkts = self.create_stream_out(self.pg1)
962 self.pg1.add_stream(pkts)
963 self.pg_enable_capture(self.pg_interfaces)
965 capture = self.pg0.get_capture(len(pkts))
966 self.verify_capture_in(capture, self.pg0)
968 def test_static_with_port_out(self):
969 """ SNAT 1:1 NAT with port initialized from outside network """
971 self.tcp_port_out = 30606
972 self.udp_port_out = 30607
973 self.icmp_id_out = 30608
975 self.snat_add_address(self.snat_addr)
976 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
977 self.tcp_port_in, self.tcp_port_out,
979 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
980 self.udp_port_in, self.udp_port_out,
982 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
983 self.icmp_id_in, self.icmp_id_out,
984 proto=IP_PROTOS.icmp)
985 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
986 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
990 pkts = self.create_stream_out(self.pg1)
991 self.pg1.add_stream(pkts)
992 self.pg_enable_capture(self.pg_interfaces)
994 capture = self.pg0.get_capture(len(pkts))
995 self.verify_capture_in(capture, self.pg0)
998 pkts = self.create_stream_in(self.pg0, self.pg1)
999 self.pg0.add_stream(pkts)
1000 self.pg_enable_capture(self.pg_interfaces)
1002 capture = self.pg1.get_capture(len(pkts))
1003 self.verify_capture_out(capture)
1005 def test_static_vrf_aware(self):
1006 """ SNAT 1:1 NAT VRF awareness """
1008 nat_ip1 = "10.0.0.30"
1009 nat_ip2 = "10.0.0.40"
1010 self.tcp_port_out = 6303
1011 self.udp_port_out = 6304
1012 self.icmp_id_out = 6305
1014 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1016 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1018 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1020 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1021 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1023 # inside interface VRF match SNAT static mapping VRF
1024 pkts = self.create_stream_in(self.pg4, self.pg3)
1025 self.pg4.add_stream(pkts)
1026 self.pg_enable_capture(self.pg_interfaces)
1028 capture = self.pg3.get_capture(len(pkts))
1029 self.verify_capture_out(capture, nat_ip1, True)
1031 # inside interface VRF don't match SNAT static mapping VRF (packets
1033 pkts = self.create_stream_in(self.pg0, self.pg3)
1034 self.pg0.add_stream(pkts)
1035 self.pg_enable_capture(self.pg_interfaces)
1037 self.pg3.assert_nothing_captured()
1039 def test_multiple_inside_interfaces(self):
1040 """ SNAT multiple inside interfaces (non-overlapping address space) """
1042 self.snat_add_address(self.snat_addr)
1043 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1044 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1045 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1048 # between two S-NAT inside interfaces (no translation)
1049 pkts = self.create_stream_in(self.pg0, self.pg1)
1050 self.pg0.add_stream(pkts)
1051 self.pg_enable_capture(self.pg_interfaces)
1053 capture = self.pg1.get_capture(len(pkts))
1054 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1056 # from S-NAT inside to interface without S-NAT feature (no translation)
1057 pkts = self.create_stream_in(self.pg0, self.pg2)
1058 self.pg0.add_stream(pkts)
1059 self.pg_enable_capture(self.pg_interfaces)
1061 capture = self.pg2.get_capture(len(pkts))
1062 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1064 # in2out 1st interface
1065 pkts = self.create_stream_in(self.pg0, self.pg3)
1066 self.pg0.add_stream(pkts)
1067 self.pg_enable_capture(self.pg_interfaces)
1069 capture = self.pg3.get_capture(len(pkts))
1070 self.verify_capture_out(capture)
1072 # out2in 1st interface
1073 pkts = self.create_stream_out(self.pg3)
1074 self.pg3.add_stream(pkts)
1075 self.pg_enable_capture(self.pg_interfaces)
1077 capture = self.pg0.get_capture(len(pkts))
1078 self.verify_capture_in(capture, self.pg0)
1080 # in2out 2nd interface
1081 pkts = self.create_stream_in(self.pg1, self.pg3)
1082 self.pg1.add_stream(pkts)
1083 self.pg_enable_capture(self.pg_interfaces)
1085 capture = self.pg3.get_capture(len(pkts))
1086 self.verify_capture_out(capture)
1088 # out2in 2nd interface
1089 pkts = self.create_stream_out(self.pg3)
1090 self.pg3.add_stream(pkts)
1091 self.pg_enable_capture(self.pg_interfaces)
1093 capture = self.pg1.get_capture(len(pkts))
1094 self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ SNAT multiple inside interfaces with overlapping address space

    pg3 acts as the outside interface and pg4, pg5 and pg6 as inside
    interfaces; pg6 is additionally reachable through an address-only
    static mapping to 10.0.0.10.  The test exercises, in order:

    * pg4 -> pg5 traffic, which must not be translated,
    * pg4 -> pg6 hairpinning through the static NAT address,
    * in2out/out2in translation for pg4, pg5 (dynamic) and pg6 (static),
    * the address, user and per-user session dumps.
    """
    static_nat_ip = "10.0.0.10"
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                             is_inside=0)
    self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
    # NOTE(review): pg6 is assumed to live in VRF 20 (its sessions are
    # dumped with VRF 20 further down) - confirm
    self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                 vrf_id=20)

    # between S-NAT inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between S-NAT inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        # the source port must have been rewritten by the dynamic NAT
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    # the three sessions are expected in TCP, UDP, ICMP order
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.snat_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        sessions = self.vapi.snat_user_session_dump(user.ip_address,
                                                    user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertIn(session.protocol,
                          [IP_PROTOS.tcp, IP_PROTOS.udp,
                           IP_PROTOS.icmp])

    # pg4 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertGreaterEqual(len(sessions), 3)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        # compare the packed outside address with the dotted-quad string
        self.assertEqual(list(map(ord, session.outside_ip_address[0:4])),
                         list(map(int, static_nat_ip.split('.'))))
        self.assertIn(session.inside_port,
                      [self.tcp_port_in, self.udp_port_in,
                       self.icmp_id_in])
def test_hairpinning(self):
    """ SNAT hairpinning - 1:1 NAT with port

    Host and server both sit behind the inside interface pg0.  The
    server is published through a TCP static mapping with port on the
    NAT pool address; the host talks to that external address/port and
    the reply must come back translated, all via pg0.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.snat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # remember the port the NAT picked; the reply is addressed to it
        host_out_port = tcp.sport
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except:
        # the packet belongs inside ppp(); passing it to logger.error
        # instead made this handler itself fail and hid the real error
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ SNAT hairpinning - 1:1 NAT

    Host, server1 and server2 all sit behind the inside interface pg0;
    both servers have address-only static mappings.  Four exchanges are
    verified with TCP, UDP and ICMP each: host -> server1 and back
    (host is dynamically translated), then server2 -> server1 and back
    (both sides statically translated, ports/ids left untouched).
    """
    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    def build_stream(src_ip, dst_ip, *l4_headers):
        """ Wrap each L4 header in an Ether/IP frame entering pg0 """
        return [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IP(src=src_ip, dst=dst_ip) /
                l4
                for l4 in l4_headers]

    def send_and_capture(pkts):
        """ Send pkts into pg0 and return what comes back out of pg0 """
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(len(pkts))

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # add static mapping for servers
    self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
    self.snat_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    pkts = build_stream(
        host.ip4, server1_nat_ip,
        TCP(sport=self.tcp_port_in, dport=server_tcp_port),
        UDP(sport=self.udp_port_in, dport=server_udp_port),
        ICMP(id=self.icmp_id_in, type='echo-request'))
    for packet in send_and_capture(pkts):
        try:
            self.assertEqual(packet[IP].src, self.snat_addr)
            self.assertEqual(packet[IP].dst, server1.ip4)
            # record the translated ports/id; the replies below use them
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to host
    pkts = build_stream(
        server1.ip4, self.snat_addr,
        TCP(sport=server_tcp_port, dport=self.tcp_port_out),
        UDP(sport=server_udp_port, dport=self.udp_port_out),
        ICMP(id=self.icmp_id_out, type='echo-reply'))
    for packet in send_and_capture(pkts):
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, host.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server2 to server1
    pkts = build_stream(
        server2.ip4, server1_nat_ip,
        TCP(sport=self.tcp_port_in, dport=server_tcp_port),
        UDP(sport=self.udp_port_in, dport=server_udp_port),
        ICMP(id=self.icmp_id_in, type='echo-request'))
    for packet in send_and_capture(pkts):
        try:
            self.assertEqual(packet[IP].src, server2_nat_ip)
            self.assertEqual(packet[IP].dst, server1.ip4)
            # address-only static mapping: ports and ICMP id are kept
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to server2
    pkts = build_stream(
        server1.ip4, server2_nat_ip,
        TCP(sport=server_tcp_port, dport=self.tcp_port_out),
        UDP(sport=server_udp_port, dport=self.udp_port_out),
        ICMP(id=self.icmp_id_out, type='echo-reply'))
    for packet in send_and_capture(pkts):
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, server2.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used

    Sends five more TCP flows from a single inside host than the
    configured per-user translation limit and expects every packet to
    be forwarded to the outside interface anyway.
    """
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # get maximum number of translations per user
    snat_config = self.vapi.snat_show_config()

    # send more than maximum number of translations per user packets
    pkts_num = snat_config.max_translations_per_user + 5
    # one flow per packet: only the TCP source port differs
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
            TCP(sport=1025 + port)
            for port in range(pkts_num)]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface

    The NAT pool is bound to pg7: it must be empty while pg7 has no
    IPv4 address, hold exactly pg7's local address once one is
    configured, and be empty again after the address is removed.
    """
    nat_if = self.pg7
    dump_pool = self.vapi.snat_address_dump
    self.vapi.snat_add_interface_addr(nat_if.sw_if_index)

    # pool starts out empty - the interface has no address yet
    self.assertEqual(0, len(dump_pool()))

    # an address on the interface shows up as the single pool entry
    nat_if.config_ip4()
    pool = dump_pool()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], nat_if.local_ip4n)

    # taking the address away empties the pool again
    nat_if.unconfig_ip4()
    self.assertEqual(0, len(dump_pool()))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface

    A static mapping for 1.2.3.4 is created against pg7 instead of a
    fixed external address.  While pg7 has no address the mapping is
    reported with pg7's sw_if_index; once pg7 is configured it is
    reported with pg7's local address and sw_if_index 0xFFFFFFFF; after
    the address is removed no mapping is reported.
    """
    ext_if = self.pg7
    dump_mappings = self.vapi.snat_static_mapping_dump

    self.vapi.snat_add_interface_addr(ext_if.sw_if_index)
    self.snat_add_static_mapping(
        '1.2.3.4', external_sw_if_index=ext_if.sw_if_index)

    # unresolved mapping: still keyed by the external interface
    mappings = dump_mappings()
    self.assertEqual(1, len(mappings))
    self.assertEqual(ext_if.sw_if_index, mappings[0].external_sw_if_index)

    # interface gets an address: mapping now carries that address
    ext_if.config_ip4()
    mappings = dump_mappings()
    self.assertEqual(1, len(mappings))
    resolved = mappings[0]
    self.assertEqual(resolved.external_ip_address[0:4], ext_if.local_ip4n)
    self.assertEqual(0xFFFFFFFF, resolved.external_sw_if_index)

    # address removed: nothing is reported any more
    ext_if.unconfig_ip4()
    mappings = dump_mappings()
    self.assertEqual(0, len(mappings))
def test_ipfix_nat44_sess(self):
    """ S-NAT IPFIX logging NAT44 session created/deleted

    Enables IPFIX export towards pg3 on a non-default collector port,
    creates sessions with in2out traffic, removes the NAT address,
    flushes the exporter and then checks the three exported packets:
    headers first (collecting templates), then the data sets.
    """
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # teach scapy to dissect UDP to the custom port as IPFIX
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): path_mtu value assumed - confirm
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    self.snat_add_address(self.snat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ S-NAT IPFIX logging NAT addresses exhausted

    No NAT address is added in this test, so the in2out TCP packet
    cannot be translated and nothing may appear on pg1.  After flushing
    the exporter the three IPFIX packets on pg3 are checked: headers
    first (collecting templates), then the data sets.
    """
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): path_mtu value assumed - confirm
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    # NOTE(review): the TCP source port is assumed; any port should
    # trigger the event - confirm
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=3025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # nothing can be translated, so nothing is expected on the outside
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        # no collector_port was configured above; 4739 is expected
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB

    ARP requests for the NAT pool address and for a static-mapping
    address must be answered on the outside interface pg1 while those
    addresses are configured, and must go unanswered after they are
    removed.  A request arriving on pg2, which has no S-NAT feature,
    is also sent and pg1 is checked for silence.
    """
    static_addr = '10.0.0.10'

    def send_arp_request(pg_if, target_ip):
        """ Broadcast an ARP who-has for target_ip from pg_if's peer """
        p = (Ether(src=pg_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
             ARP(op=ARP.who_has, pdst=target_ip,
                 psrc=pg_if.remote_ip4, hwsrc=pg_if.remote_mac))
        pg_if.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def expect_arp_reply():
        """ Expect exactly one ARP is-at reply on pg1 """
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        # assertTrue(op, is_at) only checked that op was truthy
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # SNAT address
    send_arp_request(self.pg1, self.snat_addr)
    expect_arp_reply()

    # 1:1 NAT address
    send_arp_request(self.pg1, static_addr)
    expect_arp_reply()

    # send ARP to non-SNAT interface
    send_arp_request(self.pg2, self.snat_addr)
    # NOTE(review): the request went into pg2 but pg1 is inspected;
    # confirm this is the intended interface to check
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)

    send_arp_request(self.pg1, self.snat_addr)
    self.pg1.get_capture(0)

    send_arp_request(self.pg1, static_addr)
    self.pg1.get_capture(0)
def test_vrf_mode(self):
    """ S-NAT tenant VRF aware address pool mode

    pg0 and pg1 are moved into two different IPv4 tables and each table
    gets its own NAT pool address.  Traffic from either inside
    interface towards pg2 must be translated with the pool address
    registered for the sender's VRF.
    """
    # NOTE(review): concrete table ids assumed - confirm they match
    # tables that exist in the test environment
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    # re-home pg0/pg1: drop the address, switch table, configure again
    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.pg0.set_table_ip4(vrf_id1)
    self.pg1.set_table_ip4(vrf_id2)
    self.pg0.config_ip4()
    self.pg1.config_ip4()

    self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
    self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second VRF
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip2)
def test_vrf_feature_independent(self):
    """ S-NAT tenant VRF independent address pool mode

    Two pool addresses are added without a VRF.  Traffic from both
    inside interfaces (pg0 and pg1) towards pg2 is expected to be
    translated with the first pool address.
    """
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.snat_add_address(nat_ip1)
    self.snat_add_address(nat_ip2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second VRF
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    # still nat_ip1: the pool is not partitioned per VRF in this mode
    self.verify_capture_out(capture, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ SNAT interfaces without configured ip dynamic map

    pg7 (inside) and pg8 (outside) get no local address in this test;
    their peers are made reachable with a neighbor entry and a /32
    route each.  Dynamic translation is then verified in both
    directions.
    """
    ipless_ifs = (self.pg7, self.pg8)

    # NOTE(review): is_static=1 assumed for the neighbor entries -
    # confirm
    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    # host routes pointing straight at the peers
    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg8, self.snat_addr)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)
def test_static_ipless_interfaces(self):
    """ SNAT 1:1 NAT interfaces without configured ip

    pg7 (inside) and pg8 (outside) get no local address in this test;
    their peers are made reachable with a neighbor entry and a /32
    route each.  pg7's peer is published through an address-only
    static mapping, and translation is verified out2in first, then
    in2out.
    """
    ipless_ifs = (self.pg7, self.pg8)

    # NOTE(review): is_static=1 assumed for the neighbor entries -
    # confirm
    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    # host routes pointing straight at the peers
    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture, self.snat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ SNAT 1:1 NAT with port interfaces without configured ip

    pg7 (inside) and pg8 (outside) get no local address in this test;
    their peers are made reachable with a neighbor entry and a /32
    route each.  pg7's peer is published through per-protocol static
    mappings with fixed outside ports (TCP, UDP) and ICMP id, and
    translation is verified out2in first, then in2out.
    """
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    ipless_ifs = (self.pg7, self.pg8)

    # NOTE(review): is_static=1 assumed for the neighbor entries -
    # confirm
    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    # host routes pointing straight at the peers
    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_address(self.snat_addr)
    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                 self.tcp_port_in, self.tcp_port_out,
                                 proto=IP_PROTOS.tcp)
    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                 self.udp_port_in, self.udp_port_out,
                                 proto=IP_PROTOS.udp)
    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                 self.icmp_id_in, self.icmp_id_out,
                                 proto=IP_PROTOS.icmp)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol

    With an address-only static mapping for pg0's peer, a GRE packet
    (neither TCP, UDP nor ICMP at the outer layer) must have its outer
    source rewritten in2out and its outer destination rewritten out2in.
    """
    nat_ip = "10.0.0.10"

    def send_and_get_one(tx_if, rx_if, pkt):
        """ Send pkt into tx_if and return the single packet on rx_if """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def verify_gre(packet, src_ip, dst_ip):
        """ Check outer addresses, GRE presence and IP checksum """
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertEqual(packet[IP].dst, dst_ip)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    packet = send_and_get_one(self.pg0, self.pg1, p)
    verify_gre(packet, nat_ip, self.pg1.remote_ip4)

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    packet = send_and_get_one(self.pg1, self.pg0, p)
    verify_gre(packet, self.pg1.remote_ip4, self.pg0.remote_ip4)
def test_hairpinning_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol - hairpinning

    Host and server both sit behind pg0 and both have address-only
    static mappings.  A GRE packet from the host to the server's NAT
    address, and the GRE answer to the host's NAT address, must be
    hairpinned through pg0 with both outer addresses translated.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]

    host_nat_ip = "10.0.0.10"
    server_nat_ip = "10.0.0.11"

    def send_and_get_one(pkt):
        """ Send pkt into pg0 and return the single packet back on pg0 """
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(1)[0]

    def verify_gre(packet, src_ip, dst_ip):
        """ Check outer addresses, GRE presence and IP checksum """
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertEqual(packet[IP].dst, dst_ip)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    self.snat_add_static_mapping(host.ip4, host_nat_ip)
    self.snat_add_static_mapping(server.ip4, server_nat_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # host to server
    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    verify_gre(send_and_get_one(p), host_nat_ip, server.ip4)

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=host_nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    verify_gre(send_and_get_one(p), server_nat_ip, host.ip4)
def test_unknown_proto(self):
    """ SNAT translate packet with unknown protocol

    With only a dynamic NAT pool address configured, a GRE packet from
    the inside must leave with the pool address as outer source, and a
    GRE packet sent to the pool address from outside must reach the
    inside host.  A plain TCP packet is sent in2out first.
    """
    def send_and_get_one(tx_if, rx_if, pkt):
        """ Send pkt into tx_if and return the single packet on rx_if """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def verify_gre(packet, src_ip, dst_ip):
        """ Check outer addresses, GRE presence and IP checksum """
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertEqual(packet[IP].dst, dst_ip)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    # NOTE(review): presumably this TCP packet creates the session the
    # GRE packet below relies on - confirm
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=self.tcp_port_in, dport=20))
    send_and_get_one(self.pg0, self.pg1, p)

    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    packet = send_and_get_one(self.pg0, self.pg1, p)
    verify_gre(packet, self.snat_addr, self.pg1.remote_ip4)

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    packet = send_and_get_one(self.pg1, self.pg0, p)
    verify_gre(packet, self.pg1.remote_ip4, self.pg0.remote_ip4)
def test_hairpinning_unknown_proto(self):
    """ SNAT translate packet with unknown protocol - hairpinning

    Host and server both sit behind pg0; the server has an address-only
    static mapping, the host is translated dynamically.  After a plain
    TCP packet from host to server, a GRE packet in each direction must
    be hairpinned through pg0 with both outer addresses translated.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): host_in_port value assumed - confirm
    host_in_port = 1234
    server_out_port = 8765
    server_nat_ip = "10.0.0.11"

    def send_and_get_one(pkt):
        """ Send pkt into pg0 and return the single packet back on pg0 """
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(1)[0]

    def verify_gre(packet, src_ip, dst_ip):
        """ Check outer addresses, GRE presence and IP checksum """
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertEqual(packet[IP].dst, dst_ip)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, server_nat_ip)

    # host to server
    # NOTE(review): presumably this TCP packet creates the host's
    # dynamic session that the GRE packets below rely on - confirm
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         TCP(sport=host_in_port, dport=server_out_port))
    send_and_get_one(p)

    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    verify_gre(send_and_get_one(p), self.snat_addr, server.ip4)

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    verify_gre(send_and_get_one(p), server_nat_ip, host.ip4)
2117 def test_output_feature(self):
2118 """ S-NAT interface output feature (in2out postrouting) """
2119 self.snat_add_address(self.snat_addr)
2120 self.vapi.snat_interface_add_del_output_feature(self.pg0.sw_if_index)
2121 self.vapi.snat_interface_add_del_output_feature(self.pg1.sw_if_index,
2125 pkts = self.create_stream_in(self.pg0, self.pg1)
2126 self.pg0.add_stream(pkts)
2127 self.pg_enable_capture(self.pg_interfaces)
2129 capture = self.pg1.get_capture(len(pkts))
2130 self.verify_capture_out(capture)
2133 pkts = self.create_stream_out(self.pg1)
2134 self.pg1.add_stream(pkts)
2135 self.pg_enable_capture(self.pg_interfaces)
2137 capture = self.pg0.get_capture(len(pkts))
2138 self.verify_capture_in(capture, self.pg0)
2140 def test_output_feature_vrf_aware(self):
2141 """ S-NAT interface output feature VRF aware (in2out postrouting) """
2142 nat_ip_vrf10 = "10.0.0.10"
2143 nat_ip_vrf20 = "10.0.0.20"
2145 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2146 dst_address_length=32,
2147 next_hop_address=self.pg3.remote_ip4n,
2148 next_hop_sw_if_index=self.pg3.sw_if_index,
2150 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2151 dst_address_length=32,
2152 next_hop_address=self.pg3.remote_ip4n,
2153 next_hop_sw_if_index=self.pg3.sw_if_index,
2156 self.snat_add_address(nat_ip_vrf10, vrf_id=10)
2157 self.snat_add_address(nat_ip_vrf20, vrf_id=20)
2158 self.vapi.snat_interface_add_del_output_feature(self.pg4.sw_if_index)
2159 self.vapi.snat_interface_add_del_output_feature(self.pg6.sw_if_index)
2160 self.vapi.snat_interface_add_del_output_feature(self.pg3.sw_if_index,
2164 pkts = self.create_stream_in(self.pg4, self.pg3)
2165 self.pg4.add_stream(pkts)
2166 self.pg_enable_capture(self.pg_interfaces)
2168 capture = self.pg3.get_capture(len(pkts))
2169 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2172 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2173 self.pg3.add_stream(pkts)
2174 self.pg_enable_capture(self.pg_interfaces)
2176 capture = self.pg4.get_capture(len(pkts))
2177 self.verify_capture_in(capture, self.pg4)
2180 pkts = self.create_stream_in(self.pg6, self.pg3)
2181 self.pg6.add_stream(pkts)
2182 self.pg_enable_capture(self.pg_interfaces)
2184 capture = self.pg3.get_capture(len(pkts))
2185 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2188 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2189 self.pg3.add_stream(pkts)
2190 self.pg_enable_capture(self.pg_interfaces)
2192 capture = self.pg6.get_capture(len(pkts))
2193 self.verify_capture_in(capture, self.pg6)
def test_output_feature_hairpinning(self):
    """ S-NAT interface output feature hairpinning (in2out postrouting) """
    # Host and server are both remote hosts on pg0, so every packet in
    # this test is sent into pg0 and captured from pg0.
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the reviewed listing does not show where host_in_port
    # is defined; 1234 is an assumed value - confirm against the file.
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_output_feature(self.pg0.sw_if_index)
    # NOTE(review): the continuation of this call is not visible in the
    # reviewed listing; is_inside=0 is assumed - confirm.
    self.vapi.snat_interface_add_del_output_feature(self.pg1.sw_if_index,
                                                    is_inside=0)

    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.snat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): the pg_start() calls are not visible in the reviewed
    # listing; assumed to sit between enabling and reading the capture.
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        # Source must be the NAT address with a translated port; the
        # destination must be the server's own address and inside port.
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # The reply below is addressed to this translated port.
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        # The reply must come from the NAT address and the server's
        # outside port, and arrive at the host's original address/port.
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except Exception:
        # The packet is an argument of ppp(), exactly as in the first
        # handler; previously it was passed to logger.error() instead,
        # which broke the failure report.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
2258 super(TestSNAT, self).tearDown()
2259 if not self.vpp_dead:
2260 self.logger.info(self.vapi.cli("show snat verbose"))
2264 class TestDeterministicNAT(MethodHolder):
2265 """ Deterministic NAT Test Cases """
2268 def setUpConstants(cls):
2269 super(TestDeterministicNAT, cls).setUpConstants()
2270 cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
2273 def setUpClass(cls):
2274 super(TestDeterministicNAT, cls).setUpClass()
2277 cls.tcp_port_in = 6303
2278 cls.tcp_external_port = 6303
2279 cls.udp_port_in = 6304
2280 cls.udp_external_port = 6304
2281 cls.icmp_id_in = 6305
2282 cls.snat_addr = '10.0.0.3'
2284 cls.create_pg_interfaces(range(3))
2285 cls.interfaces = list(cls.pg_interfaces)
2287 for i in cls.interfaces:
2292 cls.pg0.generate_remote_hosts(2)
2293 cls.pg0.configure_ipv4_neighbors()
2296 super(TestDeterministicNAT, cls).tearDownClass()
2299 def create_stream_in(self, in_if, out_if, ttl=64):
2301 Create packet stream for inside network
2303 :param in_if: Inside interface
2304 :param out_if: Outside interface
2305 :param ttl: TTL of generated packets
2309 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2310 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2311 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2315 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2316 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2317 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2321 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2322 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2323 ICMP(id=self.icmp_id_in, type='echo-request'))
2328 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2330 Create packet stream for outside network
2332 :param out_if: Outside interface
2333 :param dst_ip: Destination IP address (Default use global SNAT address)
2334 :param ttl: TTL of generated packets
2337 dst_ip = self.snat_addr
2340 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2341 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2342 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2346 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2347 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2348 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2352 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2353 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2354 ICMP(id=self.icmp_external_id, type='echo-reply'))
2359 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2361 Verify captured packets on outside network
2363 :param capture: Captured packets
2364 :param nat_ip: Translated IP address (Default use global SNAT address)
2365 :param same_port: Source port number is not translated (Default False)
2366 :param packet_num: Expected number of packets (Default 3)
2369 nat_ip = self.snat_addr
2370 self.assertEqual(packet_num, len(capture))
2371 for packet in capture:
2373 self.assertEqual(packet[IP].src, nat_ip)
2374 if packet.haslayer(TCP):
2375 self.tcp_port_out = packet[TCP].sport
2376 elif packet.haslayer(UDP):
2377 self.udp_port_out = packet[UDP].sport
2379 self.icmp_external_id = packet[ICMP].id
2381 self.logger.error(ppp("Unexpected or invalid packet "
2382 "(outside network):", packet))
2385 def initiate_tcp_session(self, in_if, out_if):
2387 Initiates TCP session
2389 :param in_if: Inside interface
2390 :param out_if: Outside interface
2393 # SYN packet in->out
2394 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2395 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2396 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2399 self.pg_enable_capture(self.pg_interfaces)
2401 capture = out_if.get_capture(1)
2403 self.tcp_port_out = p[TCP].sport
2405 # SYN + ACK packet out->in
2406 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2407 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
2408 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2410 out_if.add_stream(p)
2411 self.pg_enable_capture(self.pg_interfaces)
2413 in_if.get_capture(1)
2415 # ACK packet in->out
2416 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2417 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2418 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2421 self.pg_enable_capture(self.pg_interfaces)
2423 out_if.get_capture(1)
2426 self.logger.error("TCP 3 way handshake failed")
2429 def verify_ipfix_max_entries_per_user(self, data):
2431 Verify IPFIX maximum entries per user exceeded event
2433 :param data: Decoded IPFIX data records
2435 self.assertEqual(1, len(data))
2438 self.assertEqual(ord(record[230]), 13)
2439 # natQuotaExceededEvent
2440 self.assertEqual('\x03\x00\x00\x00', record[466])
2442 self.assertEqual(self.pg0.remote_ip4n, record[8])
2444 def test_deterministic_mode(self):
2445 """ S-NAT run deterministic mode """
2446 in_addr = '172.16.255.0'
2447 out_addr = '172.17.255.50'
2448 in_addr_t = '172.16.255.20'
2449 in_addr_n = socket.inet_aton(in_addr)
2450 out_addr_n = socket.inet_aton(out_addr)
2451 in_addr_t_n = socket.inet_aton(in_addr_t)
2455 snat_config = self.vapi.snat_show_config()
2456 self.assertEqual(1, snat_config.deterministic)
2458 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
2460 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2461 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2462 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2463 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2465 deterministic_mappings = self.vapi.snat_det_map_dump()
2466 self.assertEqual(len(deterministic_mappings), 1)
2467 dsm = deterministic_mappings[0]
2468 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2469 self.assertEqual(in_plen, dsm.in_plen)
2470 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2471 self.assertEqual(out_plen, dsm.out_plen)
2474 deterministic_mappings = self.vapi.snat_det_map_dump()
2475 self.assertEqual(len(deterministic_mappings), 0)
2477 def test_set_timeouts(self):
2478 """ Set deterministic NAT timeouts """
2479 timeouts_before = self.vapi.snat_det_get_timeouts()
2481 self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2482 timeouts_before.tcp_established + 10,
2483 timeouts_before.tcp_transitory + 10,
2484 timeouts_before.icmp + 10)
2486 timeouts_after = self.vapi.snat_det_get_timeouts()
2488 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2489 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2490 self.assertNotEqual(timeouts_before.tcp_established,
2491 timeouts_after.tcp_established)
2492 self.assertNotEqual(timeouts_before.tcp_transitory,
2493 timeouts_after.tcp_transitory)
2495 def test_det_in(self):
2496 """ CGNAT translation test (TCP, UDP, ICMP) """
2498 nat_ip = "10.0.0.10"
2500 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2502 socket.inet_aton(nat_ip),
2504 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2505 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2509 pkts = self.create_stream_in(self.pg0, self.pg1)
2510 self.pg0.add_stream(pkts)
2511 self.pg_enable_capture(self.pg_interfaces)
2513 capture = self.pg1.get_capture(len(pkts))
2514 self.verify_capture_out(capture, nat_ip)
2517 pkts = self.create_stream_out(self.pg1, nat_ip)
2518 self.pg1.add_stream(pkts)
2519 self.pg_enable_capture(self.pg_interfaces)
2521 capture = self.pg0.get_capture(len(pkts))
2522 self.verify_capture_in(capture, self.pg0)
2525 sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2526 self.assertEqual(len(sessions), 3)
2530 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2531 self.assertEqual(s.in_port, self.tcp_port_in)
2532 self.assertEqual(s.out_port, self.tcp_port_out)
2533 self.assertEqual(s.ext_port, self.tcp_external_port)
2537 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2538 self.assertEqual(s.in_port, self.udp_port_in)
2539 self.assertEqual(s.out_port, self.udp_port_out)
2540 self.assertEqual(s.ext_port, self.udp_external_port)
2544 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2545 self.assertEqual(s.in_port, self.icmp_id_in)
2546 self.assertEqual(s.out_port, self.icmp_external_id)
2548 def test_multiple_users(self):
2549 """ CGNAT multiple users """
2551 nat_ip = "10.0.0.10"
2553 external_port = 6303
2555 host0 = self.pg0.remote_hosts[0]
2556 host1 = self.pg0.remote_hosts[1]
2558 self.vapi.snat_add_det_map(host0.ip4n,
2560 socket.inet_aton(nat_ip),
2562 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2563 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2567 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2568 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2569 TCP(sport=port_in, dport=external_port))
2570 self.pg0.add_stream(p)
2571 self.pg_enable_capture(self.pg_interfaces)
2573 capture = self.pg1.get_capture(1)
2578 self.assertEqual(ip.src, nat_ip)
2579 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2580 self.assertEqual(tcp.dport, external_port)
2581 port_out0 = tcp.sport
2583 self.logger.error(ppp("Unexpected or invalid packet:", p))
2587 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2588 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2589 TCP(sport=port_in, dport=external_port))
2590 self.pg0.add_stream(p)
2591 self.pg_enable_capture(self.pg_interfaces)
2593 capture = self.pg1.get_capture(1)
2598 self.assertEqual(ip.src, nat_ip)
2599 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2600 self.assertEqual(tcp.dport, external_port)
2601 port_out1 = tcp.sport
2603 self.logger.error(ppp("Unexpected or invalid packet:", p))
2606 dms = self.vapi.snat_det_map_dump()
2607 self.assertEqual(1, len(dms))
2608 self.assertEqual(2, dms[0].ses_num)
2611 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2612 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2613 TCP(sport=external_port, dport=port_out0))
2614 self.pg1.add_stream(p)
2615 self.pg_enable_capture(self.pg_interfaces)
2617 capture = self.pg0.get_capture(1)
2622 self.assertEqual(ip.src, self.pg1.remote_ip4)
2623 self.assertEqual(ip.dst, host0.ip4)
2624 self.assertEqual(tcp.dport, port_in)
2625 self.assertEqual(tcp.sport, external_port)
2627 self.logger.error(ppp("Unexpected or invalid packet:", p))
2631 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2632 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2633 TCP(sport=external_port, dport=port_out1))
2634 self.pg1.add_stream(p)
2635 self.pg_enable_capture(self.pg_interfaces)
2637 capture = self.pg0.get_capture(1)
2642 self.assertEqual(ip.src, self.pg1.remote_ip4)
2643 self.assertEqual(ip.dst, host1.ip4)
2644 self.assertEqual(tcp.dport, port_in)
2645 self.assertEqual(tcp.sport, external_port)
2647 self.logger.error(ppp("Unexpected or invalid packet", p))
2650 # session close api test
2651 self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2653 self.pg1.remote_ip4n,
2655 dms = self.vapi.snat_det_map_dump()
2656 self.assertEqual(dms[0].ses_num, 1)
2658 self.vapi.snat_det_close_session_in(host0.ip4n,
2660 self.pg1.remote_ip4n,
2662 dms = self.vapi.snat_det_map_dump()
2663 self.assertEqual(dms[0].ses_num, 0)
2665 def test_tcp_session_close_detection_in(self):
2666 """ CGNAT TCP session close initiated from inside network """
2667 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2669 socket.inet_aton(self.snat_addr),
2671 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2672 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2675 self.initiate_tcp_session(self.pg0, self.pg1)
2677 # close the session from inside
2679 # FIN packet in -> out
2680 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2681 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2682 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2684 self.pg0.add_stream(p)
2685 self.pg_enable_capture(self.pg_interfaces)
2687 self.pg1.get_capture(1)
2691 # ACK packet out -> in
2692 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2693 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2694 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2698 # FIN packet out -> in
2699 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2700 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2701 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2705 self.pg1.add_stream(pkts)
2706 self.pg_enable_capture(self.pg_interfaces)
2708 self.pg0.get_capture(2)
2710 # ACK packet in -> out
2711 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2712 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2713 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2715 self.pg0.add_stream(p)
2716 self.pg_enable_capture(self.pg_interfaces)
2718 self.pg1.get_capture(1)
2720 # Check if snat closed the session
2721 dms = self.vapi.snat_det_map_dump()
2722 self.assertEqual(0, dms[0].ses_num)
2724 self.logger.error("TCP session termination failed")
2727 def test_tcp_session_close_detection_out(self):
2728 """ CGNAT TCP session close initiated from outside network """
2729 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2731 socket.inet_aton(self.snat_addr),
2733 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2734 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2737 self.initiate_tcp_session(self.pg0, self.pg1)
2739 # close the session from outside
2741 # FIN packet out -> in
2742 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2743 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2744 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2746 self.pg1.add_stream(p)
2747 self.pg_enable_capture(self.pg_interfaces)
2749 self.pg0.get_capture(1)
2753 # ACK packet in -> out
2754 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2755 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2756 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2760 # FIN packet in -> out
2761 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2762 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2763 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2767 self.pg0.add_stream(pkts)
2768 self.pg_enable_capture(self.pg_interfaces)
2770 self.pg1.get_capture(2)
2772 # ACK packet out -> in
2773 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2774 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2775 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2777 self.pg1.add_stream(p)
2778 self.pg_enable_capture(self.pg_interfaces)
2780 self.pg0.get_capture(1)
2782 # Check if snat closed the session
2783 dms = self.vapi.snat_det_map_dump()
2784 self.assertEqual(0, dms[0].ses_num)
2786 self.logger.error("TCP session termination failed")
2789 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2790 def test_session_timeout(self):
2791 """ CGNAT session timeouts """
2792 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2794 socket.inet_aton(self.snat_addr),
2796 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2797 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2800 self.initiate_tcp_session(self.pg0, self.pg1)
2801 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2802 pkts = self.create_stream_in(self.pg0, self.pg1)
2803 self.pg0.add_stream(pkts)
2804 self.pg_enable_capture(self.pg_interfaces)
2806 capture = self.pg1.get_capture(len(pkts))
2809 dms = self.vapi.snat_det_map_dump()
2810 self.assertEqual(0, dms[0].ses_num)
2812 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2813 def test_session_limit_per_user(self):
2814 """ CGNAT maximum 1000 sessions per user should be created """
2815 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2817 socket.inet_aton(self.snat_addr),
2819 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2820 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2822 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2823 src_address=self.pg2.local_ip4n,
2825 template_interval=10)
2826 self.vapi.snat_ipfix()
2829 for port in range(1025, 2025):
2830 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2831 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2832 UDP(sport=port, dport=port))
2835 self.pg0.add_stream(pkts)
2836 self.pg_enable_capture(self.pg_interfaces)
2838 capture = self.pg1.get_capture(len(pkts))
2840 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2841 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2842 UDP(sport=3001, dport=3002))
2843 self.pg0.add_stream(p)
2844 self.pg_enable_capture(self.pg_interfaces)
2846 capture = self.pg1.assert_nothing_captured()
2848 # verify ICMP error packet
2849 capture = self.pg0.get_capture(1)
2851 self.assertTrue(p.haslayer(ICMP))
2853 self.assertEqual(icmp.type, 3)
2854 self.assertEqual(icmp.code, 1)
2855 self.assertTrue(icmp.haslayer(IPerror))
2856 inner_ip = icmp[IPerror]
2857 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2858 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2860 dms = self.vapi.snat_det_map_dump()
2862 self.assertEqual(1000, dms[0].ses_num)
2864 # verify IPFIX logging
2865 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2867 capture = self.pg2.get_capture(2)
2868 ipfix = IPFIXDecoder()
2869 # first load template
2871 self.assertTrue(p.haslayer(IPFIX))
2872 if p.haslayer(Template):
2873 ipfix.add_template(p.getlayer(Template))
2874 # verify events in data set
2876 if p.haslayer(Data):
2877 data = ipfix.decode_data_set(p.getlayer(Set))
2878 self.verify_ipfix_max_entries_per_user(data)
2880 def clear_snat(self):
2882 Clear SNAT configuration.
2884 self.vapi.snat_ipfix(enable=0)
2885 self.vapi.snat_det_set_timeouts()
2886 deterministic_mappings = self.vapi.snat_det_map_dump()
2887 for dsm in deterministic_mappings:
2888 self.vapi.snat_add_det_map(dsm.in_addr,
2894 interfaces = self.vapi.snat_interface_dump()
2895 for intf in interfaces:
2896 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2901 super(TestDeterministicNAT, self).tearDown()
2902 if not self.vpp_dead:
2903 self.logger.info(self.vapi.cli("show snat detail"))
2907 class TestNAT64(MethodHolder):
2908 """ NAT64 Test Cases """
2911 def setUpClass(cls):
2912 super(TestNAT64, cls).setUpClass()
2915 cls.tcp_port_in = 6303
2916 cls.tcp_port_out = 6303
2917 cls.udp_port_in = 6304
2918 cls.udp_port_out = 6304
2919 cls.icmp_id_in = 6305
2920 cls.icmp_id_out = 6305
2921 cls.nat_addr = '10.0.0.3'
2922 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2924 cls.vrf1_nat_addr = '10.0.10.3'
2925 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2928 cls.create_pg_interfaces(range(3))
2929 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2930 cls.ip6_interfaces.append(cls.pg_interfaces[2])
2931 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2933 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2935 cls.pg0.generate_remote_hosts(2)
2937 for i in cls.ip6_interfaces:
2940 i.configure_ipv6_neighbors()
2942 for i in cls.ip4_interfaces:
2948 super(TestNAT64, cls).tearDownClass()
2951 def test_pool(self):
2952 """ Add/delete address to NAT64 pool """
2953 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2955 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2957 addresses = self.vapi.nat64_pool_addr_dump()
2958 self.assertEqual(len(addresses), 1)
2959 self.assertEqual(addresses[0].address, nat_addr)
2961 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2963 addresses = self.vapi.nat64_pool_addr_dump()
2964 self.assertEqual(len(addresses), 0)
2966 def test_interface(self):
2967 """ Enable/disable NAT64 feature on the interface """
2968 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2969 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2971 interfaces = self.vapi.nat64_interface_dump()
2972 self.assertEqual(len(interfaces), 2)
2975 for intf in interfaces:
2976 if intf.sw_if_index == self.pg0.sw_if_index:
2977 self.assertEqual(intf.is_inside, 1)
2979 elif intf.sw_if_index == self.pg1.sw_if_index:
2980 self.assertEqual(intf.is_inside, 0)
2982 self.assertTrue(pg0_found)
2983 self.assertTrue(pg1_found)
2985 features = self.vapi.cli("show interface features pg0")
2986 self.assertNotEqual(features.find('nat64-in2out'), -1)
2987 features = self.vapi.cli("show interface features pg1")
2988 self.assertNotEqual(features.find('nat64-out2in'), -1)
2990 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2991 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2993 interfaces = self.vapi.nat64_interface_dump()
2994 self.assertEqual(len(interfaces), 0)
2996 def test_static_bib(self):
2997 """ Add/delete static BIB entry """
2998 in_addr = socket.inet_pton(socket.AF_INET6,
2999 '2001:db8:85a3::8a2e:370:7334')
3000 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3003 proto = IP_PROTOS.tcp
3005 self.vapi.nat64_add_del_static_bib(in_addr,
3010 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3015 self.assertEqual(bibe.i_addr, in_addr)
3016 self.assertEqual(bibe.o_addr, out_addr)
3017 self.assertEqual(bibe.i_port, in_port)
3018 self.assertEqual(bibe.o_port, out_port)
3019 self.assertEqual(static_bib_num, 1)
3021 self.vapi.nat64_add_del_static_bib(in_addr,
3027 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3032 self.assertEqual(static_bib_num, 0)
3034 def test_set_timeouts(self):
3035 """ Set NAT64 timeouts """
3036 # verify default values
3037 timeouts = self.vapi.nat64_get_timeouts()
3038 self.assertEqual(timeouts.udp, 300)
3039 self.assertEqual(timeouts.icmp, 60)
3040 self.assertEqual(timeouts.tcp_trans, 240)
3041 self.assertEqual(timeouts.tcp_est, 7440)
3042 self.assertEqual(timeouts.tcp_incoming_syn, 6)
3044 # set and verify custom values
3045 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3046 tcp_est=7450, tcp_incoming_syn=10)
3047 timeouts = self.vapi.nat64_get_timeouts()
3048 self.assertEqual(timeouts.udp, 200)
3049 self.assertEqual(timeouts.icmp, 30)
3050 self.assertEqual(timeouts.tcp_trans, 250)
3051 self.assertEqual(timeouts.tcp_est, 7450)
3052 self.assertEqual(timeouts.tcp_incoming_syn, 10)
3054 def test_dynamic(self):
3055 """ NAT64 dynamic translation test """
3056 self.tcp_port_in = 6303
3057 self.udp_port_in = 6304
3058 self.icmp_id_in = 6305
3060 ses_num_start = self.nat64_get_ses_num()
3062 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3064 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3065 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3068 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3069 self.pg0.add_stream(pkts)
3070 self.pg_enable_capture(self.pg_interfaces)
3072 capture = self.pg1.get_capture(len(pkts))
3073 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3074 dst_ip=self.pg1.remote_ip4)
3077 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3078 self.pg1.add_stream(pkts)
3079 self.pg_enable_capture(self.pg_interfaces)
3081 capture = self.pg0.get_capture(len(pkts))
3082 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3083 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3086 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3087 self.pg0.add_stream(pkts)
3088 self.pg_enable_capture(self.pg_interfaces)
3090 capture = self.pg1.get_capture(len(pkts))
3091 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3092 dst_ip=self.pg1.remote_ip4)
3095 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3096 self.pg1.add_stream(pkts)
3097 self.pg_enable_capture(self.pg_interfaces)
3099 capture = self.pg0.get_capture(len(pkts))
3100 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3102 ses_num_end = self.nat64_get_ses_num()
3104 self.assertEqual(ses_num_end - ses_num_start, 3)
3106 # tenant with specific VRF
3107 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3108 self.vrf1_nat_addr_n,
3109 vrf_id=self.vrf1_id)
3110 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3112 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3113 self.pg2.add_stream(pkts)
3114 self.pg_enable_capture(self.pg_interfaces)
3116 capture = self.pg1.get_capture(len(pkts))
3117 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3118 dst_ip=self.pg1.remote_ip4)
3120 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3121 self.pg1.add_stream(pkts)
3122 self.pg_enable_capture(self.pg_interfaces)
3124 capture = self.pg2.get_capture(len(pkts))
3125 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3127 def test_static(self):
3128 """ NAT64 static translation test """
3129 self.tcp_port_in = 60303
3130 self.udp_port_in = 60304
3131 self.icmp_id_in = 60305
3132 self.tcp_port_out = 60303
3133 self.udp_port_out = 60304
3134 self.icmp_id_out = 60305
3136 ses_num_start = self.nat64_get_ses_num()
3138 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3140 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3141 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3143 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3148 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3153 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3160 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3161 self.pg0.add_stream(pkts)
3162 self.pg_enable_capture(self.pg_interfaces)
3164 capture = self.pg1.get_capture(len(pkts))
3165 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3166 dst_ip=self.pg1.remote_ip4, same_port=True)
3169 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3170 self.pg1.add_stream(pkts)
3171 self.pg_enable_capture(self.pg_interfaces)
3173 capture = self.pg0.get_capture(len(pkts))
3174 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3175 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3177 ses_num_end = self.nat64_get_ses_num()
3179 self.assertEqual(ses_num_end - ses_num_start, 3)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_timeout(self):
    """ NAT64 session timeout """
    self.icmp_id_in = 1234
    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    self.vapi.nat64_set_timeouts(icmp=5)

    # Push traffic through the NAT so that sessions exist.
    stream = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # The capture is only awaited; its contents are not inspected here.
    self.pg1.get_capture(len(stream))

    sessions_before = self.nat64_get_ses_num()

    # NOTE(review): the wait must exceed the 5 second ICMP timeout set
    # above - confirm the exact value against the upstream test.
    sleep(15)

    # ICMP session after timeout
    self.assertNotEqual(sessions_before, self.nat64_get_ses_num())
def test_icmp_error(self):
    """ NAT64 ICMP Error message translation """
    self.tcp_port_in = 6303
    self.udp_port_in = 6304
    self.icmp_id_in = 6305

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # send some packets to create sessions
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture_ip4 = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture_ip4,
                            nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture_ip6 = self.pg0.get_capture(len(pkts))
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
                               self.pg0.remote_ip6)

    # in2out
    # Wrap every packet received on the IPv6 side in an ICMPv6
    # Destination Unreachable and send it back through the NAT.
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
            ICMPv6DestUnreach(code=1) /
            packet[IPv6] for packet in capture_ip6]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
            self.assertEqual(packet[ICMP].type, 3)
            self.assertEqual(packet[ICMP].code, 13)
            # The embedded packet must be translated as well.
            inner = packet[IPerror]
            self.assertEqual(inner.src, self.pg1.remote_ip4)
            self.assertEqual(inner.dst, self.nat_addr)
            self.check_icmp_checksum(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
            elif inner.haslayer(UDPerror):
                self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
            else:
                self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # out2in
    # Same in the other direction: wrap the packets received on the
    # IPv4 side in an ICMP type 3 error addressed to the NAT address.
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
            ICMP(type=3, code=13) /
            packet[IP] for packet in capture_ip4]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, ip.src)
            self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
            icmp = packet[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, self.pg0.remote_ip6)
            self.assertEqual(inner.dst, ip.src)
            self.check_icmpv6_checksum(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
            elif inner.haslayer(UDPerror):
                self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
            else:
                self.assertEqual(inner[ICMPv6EchoRequest].id,
                                 self.icmp_id_in)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_hairpinning(self):
    """ NAT64 hairpinning """

    # Client and server are both hosts behind the inside interface pg0.
    hosts = self.pg0.remote_hosts
    client = hosts[0]
    server = hosts[1]
    server_tcp_in_port = 22
    server_tcp_out_port = 4022
    server_udp_in_port = 23
    server_udp_out_port = 4023
    client_tcp_in_port = 1234
    client_udp_in_port = 1235
    # Learned from the first capture; 0 until then.
    client_tcp_out_port = 0
    client_udp_out_port = 0
    nat_addr_ip6 = IPv6(src=''.join(['64:ff9b::', self.nat_addr])).src

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # Publish the server's TCP and UDP services on the NAT address.
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_tcp_in_port,
                                       server_tcp_out_port,
                                       IP_PROTOS.tcp)
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_udp_in_port,
                                       server_udp_out_port,
                                       IP_PROTOS.udp)

    # client to server
    requests = [
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)),
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         UDP(sport=client_udp_in_port, dport=server_udp_out_port)),
    ]
    self.pg0.add_stream(requests)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    for rx in self.pg0.get_capture(len(requests)):
        try:
            self.assertEqual(rx[IPv6].src, nat_addr_ip6)
            self.assertEqual(rx[IPv6].dst, server.ip6)
            if rx.haslayer(TCP):
                self.assertNotEqual(rx[TCP].sport, client_tcp_in_port)
                self.assertEqual(rx[TCP].dport, server_tcp_in_port)
                self.check_tcp_checksum(rx)
                # Remember the port the NAT allocated for the client.
                client_tcp_out_port = rx[TCP].sport
            else:
                self.assertNotEqual(rx[UDP].sport, client_udp_in_port)
                self.assertEqual(rx[UDP].dport, server_udp_in_port)
                self.check_udp_checksum(rx)
                client_udp_out_port = rx[UDP].sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", rx))
            raise

    # server to client
    replies = [
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         TCP(sport=server_tcp_in_port, dport=client_tcp_out_port)),
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         UDP(sport=server_udp_in_port, dport=client_udp_out_port)),
    ]
    self.pg0.add_stream(replies)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    reply_capture = self.pg0.get_capture(len(replies))
    for rx in reply_capture:
        try:
            self.assertEqual(rx[IPv6].src, nat_addr_ip6)
            self.assertEqual(rx[IPv6].dst, client.ip6)
            if rx.haslayer(TCP):
                self.assertEqual(rx[TCP].sport, server_tcp_out_port)
                self.assertEqual(rx[TCP].dport, client_tcp_in_port)
                self.check_tcp_checksum(rx)
            else:
                self.assertEqual(rx[UDP].sport, server_udp_out_port)
                self.assertEqual(rx[UDP].dport, client_udp_in_port)
                self.check_udp_checksum(rx)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", rx))
            raise

    # ICMP error
    # The client answers each reply it received with an ICMPv6
    # Destination Unreachable that embeds that reply.
    errors = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src=client.ip6, dst=nat_addr_ip6) /
              ICMPv6DestUnreach(code=1) /
              rx[IPv6] for rx in reply_capture]
    self.pg0.add_stream(errors)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    for rx in self.pg0.get_capture(len(errors)):
        try:
            self.assertEqual(rx[IPv6].src, nat_addr_ip6)
            self.assertEqual(rx[IPv6].dst, server.ip6)
            icmp = rx[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, server.ip6)
            self.assertEqual(inner.dst, nat_addr_ip6)
            self.check_icmpv6_checksum(rx)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
                self.assertEqual(inner[TCPerror].dport,
                                 client_tcp_out_port)
            else:
                self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
                self.assertEqual(inner[UDPerror].dport,
                                 client_udp_out_port)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", rx))
            raise
def test_prefix(self):
    """ NAT64 Network-Specific Prefix """

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                            self.vrf1_nat_addr_n,
                                            vrf_id=self.vrf1_id)
    self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)

    # Add global prefix
    global_pref64 = "2001:db8::"
    global_pref64_len = 32
    global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
    self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)

    dumped = self.vapi.nat64_prefix_dump()
    self.assertEqual(len(dumped), 1)
    only_prefix = dumped[0]
    self.assertEqual(only_prefix.prefix, global_pref64_n)
    self.assertEqual(only_prefix.prefix_len, global_pref64_len)
    self.assertEqual(only_prefix.vrf_id, 0)

    # Add tenant specific prefix
    vrf1_pref64 = "2001:db8:122:300::"
    vrf1_pref64_len = 56
    vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
    self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
                                   vrf1_pref64_len,
                                   vrf_id=self.vrf1_id)
    self.assertEqual(len(self.vapi.nat64_prefix_dump()), 2)

    # Run the same in2out / out2in exchange for the global prefix first
    # and for the tenant specific prefix second.
    # (inside interface, NAT pool address, prefix, prefix length)
    scenarios = (
        (self.pg0, self.nat_addr, global_pref64, global_pref64_len),
        (self.pg2, self.vrf1_nat_addr, vrf1_pref64, vrf1_pref64_len),
    )
    for inside_if, nat_ip, pref, plen in scenarios:
        stream = self.create_stream_in_ip6(inside_if,
                                           self.pg1,
                                           pref=pref,
                                           plen=plen)
        inside_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = self.pg1.get_capture(len(stream))
        self.verify_capture_out(received, nat_ip=nat_ip,
                                dst_ip=self.pg1.remote_ip4)

        stream = self.create_stream_out(self.pg1, dst_ip=nat_ip)
        self.pg1.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = inside_if.get_capture(len(stream))
        # Address composed from the outside host's IPv4 address and the
        # prefix under test.
        outside_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                       pref,
                                       plen)
        self.verify_capture_in_ip6(received, outside_ip6,
                                   inside_if.remote_ip6)
def _test_unknown_proto(self):
    """ NAT64 translate packet with unknown protocol """
    # NOTE(review): the leading underscore keeps the unittest loader from
    # collecting this method as a test.

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)

    # in2out
    # A TCP packet goes first; only its arrival on pg1 is awaited.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
         TCP(sport=self.tcp_port_in, dport=20))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)

    # GRE between the same pair of addresses.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
         GRE() /
         IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg1.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IPv6].src, remote_ip6)
        # The reply must reach the inside host on pg0.
        self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
        self.assertTrue(packet.haslayer(GRE))
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def _test_hairpinning_unknown_proto(self):
    """ NAT64 translate packet with unknown protocol - hairpinning """
    # NOTE(review): the leading underscore keeps the unittest loader from
    # collecting this method as a test.

    # Client and server are both hosts behind the inside interface pg0.
    client = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    server_tcp_in_port = 22
    server_tcp_out_port = 4022
    client_tcp_in_port = 1234
    nat_addr_ip6 = self.compose_ip6(self.nat_addr, '64:ff9b::', 96)

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_tcp_in_port,
                                       server_tcp_out_port,
                                       IP_PROTOS.tcp)

    # client to server
    # A TCP packet goes first; only its arrival on pg0 is awaited.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg0.get_capture(1)

    # GRE between the same pair of addresses.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         GRE() /
         IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IPv6].src, nat_addr_ip6)
        self.assertEqual(packet[IPv6].dst, server.ip6)
        self.assertTrue(packet.haslayer(GRE))
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to client
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IPv6].src, nat_addr_ip6)
        self.assertEqual(packet[IPv6].dst, client.ip6)
        self.assertTrue(packet.haslayer(GRE))
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def nat64_get_ses_num(self):
    """
    Return number of active NAT64 sessions.

    :returns: Combined length of the TCP, UDP and ICMP session table dumps
    """
    protocols = (IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp)
    return sum(len(self.vapi.nat64_st_dump(proto)) for proto in protocols)
def clear_nat64(self):
    """
    Clear NAT64 configuration.

    Removes NAT64 interfaces, static BIB entries, pool addresses and
    prefixes, in that order.
    """
    # NOTE(review): called without arguments, presumably to restore the
    # default timeouts - confirm against the API wrapper.
    self.vapi.nat64_set_timeouts()

    for intf in self.vapi.nat64_interface_dump():
        self.vapi.nat64_add_del_interface(intf.sw_if_index,
                                          intf.is_inside,
                                          is_add=0)

    # Only static BIB entries are deleted here.
    for proto in (IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp):
        for entry in self.vapi.nat64_bib_dump(proto):
            if not entry.is_static:
                continue
            self.vapi.nat64_add_del_static_bib(entry.i_addr,
                                               entry.o_addr,
                                               entry.i_port,
                                               entry.o_port,
                                               entry.proto,
                                               entry.vrf_id,
                                               is_add=0)

    for pool_addr in self.vapi.nat64_pool_addr_dump():
        self.vapi.nat64_add_del_pool_addr_range(pool_addr.address,
                                                pool_addr.address,
                                                vrf_id=pool_addr.vrf_id,
                                                is_add=0)

    for pref in self.vapi.nat64_prefix_dump():
        self.vapi.nat64_add_del_prefix(pref.prefix,
                                       pref.prefix_len,
                                       vrf_id=pref.vrf_id,
                                       is_add=0)
def tearDown(self):
    """ Log the NAT64 state and clear the NAT64 configuration """
    super(TestNAT64, self).tearDown()
    if self.vpp_dead:
        # VPP is not running, so it cannot be queried or cleaned up.
        return

    show_commands = (
        "show nat64 pool",
        "show nat64 interfaces",
        "show nat64 prefix",
        "show nat64 bib tcp",
        "show nat64 bib udp",
        "show nat64 bib icmp",
        "show nat64 session table tcp",
        "show nat64 session table udp",
        "show nat64 session table icmp",
    )
    for command in show_commands:
        self.logger.info(self.vapi.cli(command))
    self.clear_nat64()
# Run this module's tests with the VPP test runner when executed directly.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)