7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
20 class MethodHolder(VppTestCase):
21 """ SNAT create capture and verify method holder """
25 super(MethodHolder, cls).setUpClass()
28 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed, its IP checksum field is cleared and the
    packet is rebuilt; the rebuilt checksum must equal the one carried by
    the original packet.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the stored value first: without this the rebuild below would
    # simply echo the original field and the assertion could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed, its TCP checksum field is cleared and the
    packet is rebuilt; the rebuilt checksum must equal the one carried by
    the original packet.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the stored value first: without this the rebuild below would
    # simply echo the original field and the assertion could never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed, its UDP checksum field is cleared and the
    packet is rebuilt; the rebuilt checksum must equal the one carried by
    the original packet.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the stored value first: without this the rebuild below would
    # simply echo the original field and the assertion could never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    Every embedded layer present in the packet (IP, TCP, UDP, ICMP) is
    verified on its own: the packet is re-parsed, that layer's checksum is
    cleared, the packet is rebuilt and the rebuilt checksum is compared
    with the one carried by pkt.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = (
        (IPerror, 'IPerror'),
        (TCPerror, 'TCPerror'),
        (UDPerror, 'UDPerror'),
        (ICMPerror, 'ICMPerror'),
    )
    for layer_cls, layer_name in embedded_layers:
        if not pkt.haslayer(layer_cls):
            continue
        # An embedded UDP header with a zero checksum is not verified.
        if layer_cls is UDPerror and pkt[layer_name].chksum == 0:
            continue
        # Always start from a fresh copy of pkt so that no check depends
        # on a copy left behind (or never created) by a previous one.
        new = pkt.__class__(str(pkt))
        del new[layer_name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer_name].chksum, pkt[layer_name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    When the packet also carries an embedded IP header (ICMP error), the
    embedded checksums are verified as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    icmp_layer = rebuilt['ICMP']
    del icmp_layer.chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination-unreachable, echo-request and echo-reply messages.
    For destination-unreachable messages the embedded packet checksums are
    verified as well.

    :param pkt: Packet to check ICMPv6 checksum
    """
    icmpv6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach'),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    new = pkt.__class__(str(pkt))
    for layer_cls, layer_name in icmpv6_layers:
        if not pkt.haslayer(layer_cls):
            continue
        del new[layer_name].cksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer_name].cksum, pkt[layer_name].cksum)
        if layer_cls is ICMPv6DestUnreach:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: List of packets (one TCP, one UDP, one ICMP echo-request)
    """
    pkts = []
    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         TCP(sport=self.tcp_port_in, dport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         UDP(sport=self.udp_port_in, dport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)

    return pkts
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the positions
    defined for the given prefix length; octet 8 of the IPv6 address is
    never written. For a prefix length other than 32, 40, 48, 56, 64 or 96
    the prefix is returned unchanged.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # bytearray gives mutable integer octets on both Python 2 and 3.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    # Destination index in the IPv6 address for each of the 4 IPv4 octets.
    positions_by_plen = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    for position, octet in zip(positions_by_plen.get(plen, ()), ip4_n):
        pref_n[position] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    When no prefix is given the destination is built by appending the
    outside host's IPv4 address to '64:ff9b::'; otherwise it is composed
    from the given prefix and prefix length.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix
    :param plen: NAT64 prefix length
    :returns: List of packets (one TCP, one UDP, one ICMPv6 echo-request)
    """
    pkts = []
    if pref is None:
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         TCP(sport=self.tcp_port_in, dport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         UDP(sport=self.udp_port_in, dport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         ICMPv6EchoRequest(id=self.icmp_id_in))
    pkts.append(p)

    return pkts
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global SNAT address)
    :param ttl: TTL of generated packets
    :returns: List of packets (one TCP, one UDP, one ICMP echo-reply)
    """
    if dst_ip is None:
        dst_ip = self.snat_addr
    pkts = []
    # TCP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         TCP(dport=self.tcp_port_out, sport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         UDP(dport=self.udp_port_out, sport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)

    return pkts
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    As a side effect the translated TCP/UDP source port and ICMP id seen
    in the capture are stored in self.tcp_port_out, self.udp_port_out and
    self.icmp_id_out.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global SNAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    if nat_ip is None:
        nat_ip = self.snat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP].dst, dst_ip)
            if packet.haslayer(TCP):
                if same_port:
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                else:
                    self.assertNotEqual(
                        packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                if same_port:
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                else:
                    self.assertNotEqual(
                        packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                if same_port:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                else:
                    self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
                self.check_icmp_checksum(packet)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.check_icmp_checksum(packet)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.check_udp_checksum(packet)
            else:
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
                self.check_icmpv6_checksum(packet)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    Addresses, TCP/UDP source ports and the ICMP id must be exactly those
    used on the inside network.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    The packet embedded in each ICMP error is checked against the outside
    (translated) TCP/UDP destination port or ICMP id.

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global SNAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.snat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    The packet embedded in each ICMP error is checked against the inside
    TCP/UDP source port or ICMP id.

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Expects exactly six records: three with natEvent 4 and three with
    natEvent 5. Records are indexed by IPFIX information element id.

    :param data: Decoded IPFIX data records
    """
    nat44_ses_create_num = 0
    nat44_ses_delete_num = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        self.assertIn(ord(record[230]), [4, 5])
        if ord(record[230]) == 4:
            nat44_ses_create_num += 1
        else:
            nat44_ses_delete_num += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        if IP_PROTOS.icmp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
            self.assertEqual(struct.pack("!H", self.icmp_id_out),
                             record[227])
        elif IP_PROTOS.tcp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.tcp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.tcp_port_out),
                             record[227])
        elif IP_PROTOS.udp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.udp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.udp_port_out),
                             record[227])
        else:
            self.fail("Invalid protocol")
    self.assertEqual(3, nat44_ses_create_num)
    self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Expects exactly one record, with natEvent 3 and a zero natPoolID.
    The record is indexed by IPFIX information element id.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
521 class TestSNAT(MethodHolder):
522 """ SNAT Test Cases """
526 super(TestSNAT, cls).setUpClass()
529 cls.tcp_port_in = 6303
530 cls.tcp_port_out = 6303
531 cls.udp_port_in = 6304
532 cls.udp_port_out = 6304
533 cls.icmp_id_in = 6305
534 cls.icmp_id_out = 6305
535 cls.snat_addr = '10.0.0.3'
536 cls.ipfix_src_port = 4739
537 cls.ipfix_domain_id = 1
539 cls.create_pg_interfaces(range(9))
540 cls.interfaces = list(cls.pg_interfaces[0:4])
542 for i in cls.interfaces:
547 cls.pg0.generate_remote_hosts(3)
548 cls.pg0.configure_ipv4_neighbors()
550 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
552 cls.pg4._local_ip4 = "172.16.255.1"
553 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
554 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
555 cls.pg4.set_table_ip4(10)
556 cls.pg5._local_ip4 = "172.16.255.3"
557 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
559 cls.pg5.set_table_ip4(10)
560 cls.pg6._local_ip4 = "172.16.255.1"
561 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
563 cls.pg6.set_table_ip4(20)
564 for i in cls.overlapping_interfaces:
573 super(TestSNAT, cls).tearDownClass()
576 def clear_snat(self):
578 Clear SNAT configuration.
580 # I found no elegant way to do this
581 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
582 dst_address_length=32,
583 next_hop_address=self.pg7.remote_ip4n,
584 next_hop_sw_if_index=self.pg7.sw_if_index,
586 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
587 dst_address_length=32,
588 next_hop_address=self.pg8.remote_ip4n,
589 next_hop_sw_if_index=self.pg8.sw_if_index,
592 for intf in [self.pg7, self.pg8]:
593 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
595 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
600 if self.pg7.has_ip4_config:
601 self.pg7.unconfig_ip4()
603 interfaces = self.vapi.snat_interface_addr_dump()
604 for intf in interfaces:
605 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
607 self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
608 domain_id=self.ipfix_domain_id)
609 self.ipfix_src_port = 4739
610 self.ipfix_domain_id = 1
612 interfaces = self.vapi.snat_interface_dump()
613 for intf in interfaces:
614 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
618 static_mappings = self.vapi.snat_static_mapping_dump()
619 for sm in static_mappings:
620 self.vapi.snat_add_static_mapping(sm.local_ip_address,
621 sm.external_ip_address,
622 local_port=sm.local_port,
623 external_port=sm.external_port,
624 addr_only=sm.addr_only,
626 protocol=sm.protocol,
629 adresses = self.vapi.snat_address_dump()
630 for addr in adresses:
631 self.vapi.snat_add_address_range(addr.ip_address,
635 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
636 local_port=0, external_port=0, vrf_id=0,
637 is_add=1, external_sw_if_index=0xFFFFFFFF,
640 Add/delete S-NAT static mapping
642 :param local_ip: Local IP address
643 :param external_ip: External IP address
644 :param local_port: Local port number (Optional)
645 :param external_port: External port number (Optional)
646 :param vrf_id: VRF ID (Default 0)
647 :param is_add: 1 if add, 0 if delete (Default add)
648 :param external_sw_if_index: External interface instead of IP address
649 :param proto: IP protocol (Mandatory if port specified)
652 if local_port and external_port:
654 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
655 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
656 self.vapi.snat_add_static_mapping(
659 external_sw_if_index,
def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
    """
    Add/delete S-NAT address

    The address is passed as both ends of a one-address range.

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID forwarded to the API call (Default 0xFFFFFFFF)
    """
    snat_addr = socket.inet_pton(socket.AF_INET, ip)
    # NOTE(review): keyword name assumed to match the vapi wrapper's
    # parameter - confirm against vpp_papi_provider.
    self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
                                     vrf_id=vrf_id)
678 def test_dynamic(self):
679 """ SNAT dynamic translation test """
681 self.snat_add_address(self.snat_addr)
682 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
683 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
687 pkts = self.create_stream_in(self.pg0, self.pg1)
688 self.pg0.add_stream(pkts)
689 self.pg_enable_capture(self.pg_interfaces)
691 capture = self.pg1.get_capture(len(pkts))
692 self.verify_capture_out(capture)
695 pkts = self.create_stream_out(self.pg1)
696 self.pg1.add_stream(pkts)
697 self.pg_enable_capture(self.pg_interfaces)
699 capture = self.pg0.get_capture(len(pkts))
700 self.verify_capture_in(capture, self.pg0)
702 def test_dynamic_icmp_errors_in2out_ttl_1(self):
703 """ SNAT handling of client packets with TTL=1 """
705 self.snat_add_address(self.snat_addr)
706 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
707 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
710 # Client side - generate traffic
711 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
712 self.pg0.add_stream(pkts)
713 self.pg_enable_capture(self.pg_interfaces)
716 # Client side - verify ICMP type 11 packets
717 capture = self.pg0.get_capture(len(pkts))
718 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
720 def test_dynamic_icmp_errors_out2in_ttl_1(self):
721 """ SNAT handling of server packets with TTL=1 """
723 self.snat_add_address(self.snat_addr)
724 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
725 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
728 # Client side - create sessions
729 pkts = self.create_stream_in(self.pg0, self.pg1)
730 self.pg0.add_stream(pkts)
731 self.pg_enable_capture(self.pg_interfaces)
734 # Server side - generate traffic
735 capture = self.pg1.get_capture(len(pkts))
736 self.verify_capture_out(capture)
737 pkts = self.create_stream_out(self.pg1, ttl=1)
738 self.pg1.add_stream(pkts)
739 self.pg_enable_capture(self.pg_interfaces)
742 # Server side - verify ICMP type 11 packets
743 capture = self.pg1.get_capture(len(pkts))
744 self.verify_capture_out_with_icmp_errors(capture,
745 src_ip=self.pg1.local_ip4)
747 def test_dynamic_icmp_errors_in2out_ttl_2(self):
748 """ SNAT handling of error responses to client packets with TTL=2 """
750 self.snat_add_address(self.snat_addr)
751 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
752 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
755 # Client side - generate traffic
756 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
757 self.pg0.add_stream(pkts)
758 self.pg_enable_capture(self.pg_interfaces)
761 # Server side - simulate ICMP type 11 response
762 capture = self.pg1.get_capture(len(pkts))
763 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
764 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
765 ICMP(type=11) / packet[IP] for packet in capture]
766 self.pg1.add_stream(pkts)
767 self.pg_enable_capture(self.pg_interfaces)
770 # Client side - verify ICMP type 11 packets
771 capture = self.pg0.get_capture(len(pkts))
772 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
774 def test_dynamic_icmp_errors_out2in_ttl_2(self):
775 """ SNAT handling of error responses to server packets with TTL=2 """
777 self.snat_add_address(self.snat_addr)
778 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
779 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
782 # Client side - create sessions
783 pkts = self.create_stream_in(self.pg0, self.pg1)
784 self.pg0.add_stream(pkts)
785 self.pg_enable_capture(self.pg_interfaces)
788 # Server side - generate traffic
789 capture = self.pg1.get_capture(len(pkts))
790 self.verify_capture_out(capture)
791 pkts = self.create_stream_out(self.pg1, ttl=2)
792 self.pg1.add_stream(pkts)
793 self.pg_enable_capture(self.pg_interfaces)
796 # Client side - simulate ICMP type 11 response
797 capture = self.pg0.get_capture(len(pkts))
798 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
799 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
800 ICMP(type=11) / packet[IP] for packet in capture]
801 self.pg0.add_stream(pkts)
802 self.pg_enable_capture(self.pg_interfaces)
805 # Server side - verify ICMP type 11 packets
806 capture = self.pg1.get_capture(len(pkts))
807 self.verify_capture_out_with_icmp_errors(capture)
809 def test_ping_out_interface_from_outside(self):
810 """ Ping SNAT out interface from outside network """
812 self.snat_add_address(self.snat_addr)
813 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
814 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
817 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
818 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
819 ICMP(id=self.icmp_id_out, type='echo-request'))
821 self.pg1.add_stream(pkts)
822 self.pg_enable_capture(self.pg_interfaces)
824 capture = self.pg1.get_capture(len(pkts))
825 self.assertEqual(1, len(capture))
828 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
829 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
830 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
831 self.assertEqual(packet[ICMP].type, 0) # echo reply
833 self.logger.error(ppp("Unexpected or invalid packet "
834 "(outside network):", packet))
837 def test_ping_internal_host_from_outside(self):
838 """ Ping internal host from outside network """
840 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
841 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
842 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
846 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
847 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
848 ICMP(id=self.icmp_id_out, type='echo-request'))
849 self.pg1.add_stream(pkt)
850 self.pg_enable_capture(self.pg_interfaces)
852 capture = self.pg0.get_capture(1)
853 self.verify_capture_in(capture, self.pg0, packet_num=1)
854 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
857 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
858 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
859 ICMP(id=self.icmp_id_in, type='echo-reply'))
860 self.pg0.add_stream(pkt)
861 self.pg_enable_capture(self.pg_interfaces)
863 capture = self.pg1.get_capture(1)
864 self.verify_capture_out(capture, same_port=True, packet_num=1)
865 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
867 def test_static_in(self):
868 """ SNAT 1:1 NAT initialized from inside network """
871 self.tcp_port_out = 6303
872 self.udp_port_out = 6304
873 self.icmp_id_out = 6305
875 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
876 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
877 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
881 pkts = self.create_stream_in(self.pg0, self.pg1)
882 self.pg0.add_stream(pkts)
883 self.pg_enable_capture(self.pg_interfaces)
885 capture = self.pg1.get_capture(len(pkts))
886 self.verify_capture_out(capture, nat_ip, True)
889 pkts = self.create_stream_out(self.pg1, nat_ip)
890 self.pg1.add_stream(pkts)
891 self.pg_enable_capture(self.pg_interfaces)
893 capture = self.pg0.get_capture(len(pkts))
894 self.verify_capture_in(capture, self.pg0)
896 def test_static_out(self):
897 """ SNAT 1:1 NAT initialized from outside network """
900 self.tcp_port_out = 6303
901 self.udp_port_out = 6304
902 self.icmp_id_out = 6305
904 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
905 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
906 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
910 pkts = self.create_stream_out(self.pg1, nat_ip)
911 self.pg1.add_stream(pkts)
912 self.pg_enable_capture(self.pg_interfaces)
914 capture = self.pg0.get_capture(len(pkts))
915 self.verify_capture_in(capture, self.pg0)
918 pkts = self.create_stream_in(self.pg0, self.pg1)
919 self.pg0.add_stream(pkts)
920 self.pg_enable_capture(self.pg_interfaces)
922 capture = self.pg1.get_capture(len(pkts))
923 self.verify_capture_out(capture, nat_ip, True)
925 def test_static_with_port_in(self):
926 """ SNAT 1:1 NAT with port initialized from inside network """
928 self.tcp_port_out = 3606
929 self.udp_port_out = 3607
930 self.icmp_id_out = 3608
932 self.snat_add_address(self.snat_addr)
933 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
934 self.tcp_port_in, self.tcp_port_out,
936 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
937 self.udp_port_in, self.udp_port_out,
939 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
940 self.icmp_id_in, self.icmp_id_out,
941 proto=IP_PROTOS.icmp)
942 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
943 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
947 pkts = self.create_stream_in(self.pg0, self.pg1)
948 self.pg0.add_stream(pkts)
949 self.pg_enable_capture(self.pg_interfaces)
951 capture = self.pg1.get_capture(len(pkts))
952 self.verify_capture_out(capture)
955 pkts = self.create_stream_out(self.pg1)
956 self.pg1.add_stream(pkts)
957 self.pg_enable_capture(self.pg_interfaces)
959 capture = self.pg0.get_capture(len(pkts))
960 self.verify_capture_in(capture, self.pg0)
962 def test_static_with_port_out(self):
963 """ SNAT 1:1 NAT with port initialized from outside network """
965 self.tcp_port_out = 30606
966 self.udp_port_out = 30607
967 self.icmp_id_out = 30608
969 self.snat_add_address(self.snat_addr)
970 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
971 self.tcp_port_in, self.tcp_port_out,
973 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
974 self.udp_port_in, self.udp_port_out,
976 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
977 self.icmp_id_in, self.icmp_id_out,
978 proto=IP_PROTOS.icmp)
979 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
980 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
984 pkts = self.create_stream_out(self.pg1)
985 self.pg1.add_stream(pkts)
986 self.pg_enable_capture(self.pg_interfaces)
988 capture = self.pg0.get_capture(len(pkts))
989 self.verify_capture_in(capture, self.pg0)
992 pkts = self.create_stream_in(self.pg0, self.pg1)
993 self.pg0.add_stream(pkts)
994 self.pg_enable_capture(self.pg_interfaces)
996 capture = self.pg1.get_capture(len(pkts))
997 self.verify_capture_out(capture)
999 def test_static_vrf_aware(self):
1000 """ SNAT 1:1 NAT VRF awareness """
1002 nat_ip1 = "10.0.0.30"
1003 nat_ip2 = "10.0.0.40"
1004 self.tcp_port_out = 6303
1005 self.udp_port_out = 6304
1006 self.icmp_id_out = 6305
1008 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1010 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1012 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1014 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1015 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1017 # inside interface VRF match SNAT static mapping VRF
1018 pkts = self.create_stream_in(self.pg4, self.pg3)
1019 self.pg4.add_stream(pkts)
1020 self.pg_enable_capture(self.pg_interfaces)
1022 capture = self.pg3.get_capture(len(pkts))
1023 self.verify_capture_out(capture, nat_ip1, True)
1025 # inside interface VRF don't match SNAT static mapping VRF (packets
1027 pkts = self.create_stream_in(self.pg0, self.pg3)
1028 self.pg0.add_stream(pkts)
1029 self.pg_enable_capture(self.pg_interfaces)
1031 self.pg3.assert_nothing_captured()
1033 def test_multiple_inside_interfaces(self):
1034 """ SNAT multiple inside interfaces (non-overlapping address space) """
1036 self.snat_add_address(self.snat_addr)
1037 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1038 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1039 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1042 # between two S-NAT inside interfaces (no translation)
1043 pkts = self.create_stream_in(self.pg0, self.pg1)
1044 self.pg0.add_stream(pkts)
1045 self.pg_enable_capture(self.pg_interfaces)
1047 capture = self.pg1.get_capture(len(pkts))
1048 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1050 # from S-NAT inside to interface without S-NAT feature (no translation)
1051 pkts = self.create_stream_in(self.pg0, self.pg2)
1052 self.pg0.add_stream(pkts)
1053 self.pg_enable_capture(self.pg_interfaces)
1055 capture = self.pg2.get_capture(len(pkts))
1056 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1058 # in2out 1st interface
1059 pkts = self.create_stream_in(self.pg0, self.pg3)
1060 self.pg0.add_stream(pkts)
1061 self.pg_enable_capture(self.pg_interfaces)
1063 capture = self.pg3.get_capture(len(pkts))
1064 self.verify_capture_out(capture)
1066 # out2in 1st interface
1067 pkts = self.create_stream_out(self.pg3)
1068 self.pg3.add_stream(pkts)
1069 self.pg_enable_capture(self.pg_interfaces)
1071 capture = self.pg0.get_capture(len(pkts))
1072 self.verify_capture_in(capture, self.pg0)
1074 # in2out 2nd interface
1075 pkts = self.create_stream_in(self.pg1, self.pg3)
1076 self.pg1.add_stream(pkts)
1077 self.pg_enable_capture(self.pg_interfaces)
1079 capture = self.pg3.get_capture(len(pkts))
1080 self.verify_capture_out(capture)
1082 # out2in 2nd interface
1083 pkts = self.create_stream_out(self.pg3)
1084 self.pg3.add_stream(pkts)
1085 self.pg_enable_capture(self.pg_interfaces)
1087 capture = self.pg1.get_capture(len(pkts))
1088 self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ SNAT multiple inside interfaces with overlapping address space """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (pg_start() calls, try/except framing, trailing call arguments);
    # they are restored here from the surrounding pattern - confirm
    # against the full file.

    static_nat_ip = "10.0.0.10"
    self.snat_add_address(self.snat_addr)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                             is_inside=0)
    self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
    # NOTE(review): trailing argument assumed to be vrf_id=20 (pg6
    # sessions are dumped with VRF 20 further down) - confirm
    self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                 vrf_id=20)

    # between S-NAT inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between S-NAT inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        # dump the offending packet before re-raising the failure
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    # the three sessions are expected in TCP, UDP, ICMP order
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.snat_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    known_protocols = [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
    for user in users:
        # NOTE(review): second argument assumed to be user.vrf_id - confirm
        sessions = self.vapi.snat_user_session_dump(user.ip_address,
                                                    user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertIn(session.protocol, known_protocols)

    # pg4 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertGreaterEqual(len(sessions), 3)
    # explicit lists so the comparison does not depend on what map() returns
    expected_octets = [int(octet) for octet in static_nat_ip.split('.')]
    inside_ports = [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        self.assertEqual(
            [ord(c) for c in session.outside_ip_address[0:4]],
            expected_octets)
        self.assertIn(session.inside_port, inside_ports)
def test_hairpinning(self):
    """ SNAT hairpinning - 1:1 NAT with port"""
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (pg_start() calls, try/except framing, trailing call arguments);
    # they are restored here from the surrounding pattern - confirm
    # against the full file.

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): host_in_port's value is not visible in the excerpt;
    # 1234 is assumed - confirm
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.snat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # remember the translated source port for the reply below
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except Exception:
        # the packet belongs inside ppp(); previously the parenthesis was
        # closed early and the packet was handed to logger.error instead
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ SNAT hairpinning - 1:1 NAT"""
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (pkts list handling, pg_start() calls, try/else/except framing,
    # trailing call arguments); they are restored here from the
    # surrounding pattern - confirm against the full file.

    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # add static mapping for servers
    self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
    self.snat_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, self.snat_addr)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                # translated port is reused for the reply stream below
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to host
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.snat_addr) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.snat_addr) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.snat_addr) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, host.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server2 to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server2_nat_ip)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to server2
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, server2.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (packet list init/append, pg_start(), a trailing call argument);
    # they are restored here from the surrounding pattern - confirm
    # against the full file.

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # get maximum number of translations per user
    snat_config = self.vapi.snat_show_config()

    # send more than maximum number of translations per user packets
    pkts_num = snat_config.max_translations_per_user + 5
    # one TCP packet per distinct source port, starting at 1025
    pkts = [
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=1025 + port))
        for port in range(0, pkts_num)
    ]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface """
    iface = self.pg7
    self.vapi.snat_add_interface_addr(iface.sw_if_index)

    # no address in NAT pool
    pool = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool))

    # configure interface address and check NAT address pool
    iface.config_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(1, len(pool))
    first_entry = pool[0]
    self.assertEqual(first_entry.ip_address[0:4], iface.local_ip4n)

    # remove interface address and check NAT address pool
    iface.unconfig_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface """
    ext_if = self.pg7
    self.vapi.snat_add_interface_addr(ext_if.sw_if_index)
    self.snat_add_static_mapping('1.2.3.4',
                                 external_sw_if_index=ext_if.sw_if_index)

    # static mappings with external interface
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    self.assertEqual(ext_if.sw_if_index, mappings[0].external_sw_if_index)

    # configure interface address and check static mappings
    ext_if.config_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    resolved = mappings[0]
    self.assertEqual(resolved.external_ip_address[0:4], ext_if.local_ip4n)
    # presumably 0xFFFFFFFF is the "no interface" marker - verify
    self.assertEqual(0xFFFFFFFF, resolved.external_sw_if_index)

    # remove interface address and check static mappings
    ext_if.unconfig_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(0, len(mappings))
def test_ipfix_nat44_sess(self):
    """ S-NAT IPFIX logging NAT44 session created/deleted """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (pg_start(), the "for p in capture:" loop headers, trailing call
    # arguments); they are restored here from the surrounding pattern -
    # confirm against the full file.
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # teach scapy to dissect UDP payloads on the collector port as IPFIX
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): one argument between src_address and
    # template_interval is not visible; path_mtu=512 assumed - confirm
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    self.snat_add_address(self.snat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ S-NAT IPFIX logging NAT addresses exhausted """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (pg_start(), the "for p in capture:" loop headers, trailing call
    # arguments, the transport layer of the probe packet); they are
    # restored here from the surrounding pattern - confirm against the
    # full file.
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): one argument between src_address and
    # template_interval is not visible; path_mtu=512 assumed - confirm
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    # NOTE(review): the last layer of this packet is not visible;
    # TCP(sport=3025) assumed - confirm
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=3025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # no NAT address was added in this test, so nothing should leave pg1
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (pg_start() calls, trailing call arguments); they are restored
    # here from the surrounding pattern - confirm against the full file.
    static_addr = '10.0.0.10'
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # SNAT address
    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.snat_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    # assertEqual, not assertTrue: assertTrue would treat ARP.is_at as the
    # failure message and never compare the opcode
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # 1:1 NAT address
    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=static_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-SNAT interface
    p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.snat_addr,
             psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
    self.pg2.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # NOTE(review): the request is sent on pg2 but pg1 is checked for
    # silence - confirm this is the intended interface
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    # NOTE(review): trailing argument assumed to be is_add=0 - confirm
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)

    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.snat_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)

    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=static_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)
def test_vrf_mode(self):
    """ S-NAT tenant VRF aware address pool mode """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (the VRF id assignments, pg_start() calls, a trailing call
    # argument); they are restored here from the surrounding pattern -
    # confirm against the full file.

    # NOTE(review): the vrf_id1/vrf_id2 values are not visible in the
    # excerpt; 1 and 2 are assumed - confirm
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    # addresses are removed before the table change and re-applied after
    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.pg0.set_table_ip4(vrf_id1)
    self.pg1.set_table_ip4(vrf_id2)
    self.pg0.config_ip4()
    self.pg1.config_ip4()

    self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
    self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second VRF
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip2)
def test_vrf_feature_independent(self):
    """ S-NAT tenant VRF independent address pool mode """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (pg_start() calls, a trailing call argument); they are restored
    # here from the surrounding pattern - confirm against the full file.

    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.snat_add_address(nat_ip1)
    self.snat_add_address(nat_ip2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second VRF - both inside interfaces are expected to use nat_ip1
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ SNAT interfaces without configured ip dynamic map """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (pg_start() calls, trailing call arguments); they are restored
    # here from the surrounding pattern - confirm against the full file.

    # NOTE(review): the last ip_neighbor_add_del argument is not
    # visible; is_static=1 assumed for both calls - confirm
    self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                  self.pg7.remote_mac,
                                  self.pg7.remote_ip4n,
                                  is_static=1)
    self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                  self.pg8.remote_mac,
                                  self.pg8.remote_ip4n,
                                  is_static=1)

    # host (/32) routes towards each remote peer
    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index)

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg8, self.snat_addr)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)
def test_static_ipless_interfaces(self):
    """ SNAT 1:1 NAT interfaces without configured ip """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (pg_start() calls, trailing call arguments); they are restored
    # here from the surrounding pattern - confirm against the full file.

    # NOTE(review): the last ip_neighbor_add_del argument is not
    # visible; is_static=1 assumed for both calls - confirm
    self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                  self.pg7.remote_mac,
                                  self.pg7.remote_ip4n,
                                  is_static=1)
    self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                  self.pg8.remote_mac,
                                  self.pg8.remote_ip4n,
                                  is_static=1)

    # host (/32) routes towards each remote peer
    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index)

    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture, self.snat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ SNAT 1:1 NAT with port interfaces without configured ip """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (pg_start() calls, trailing call arguments); they are restored
    # here from the surrounding pattern - confirm against the full file.

    # fixed outside ports/id used by the three static mappings below
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    # NOTE(review): the last ip_neighbor_add_del argument is not
    # visible; is_static=1 assumed for both calls - confirm
    self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                  self.pg7.remote_mac,
                                  self.pg7.remote_ip4n,
                                  is_static=1)
    self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                  self.pg8.remote_mac,
                                  self.pg8.remote_ip4n,
                                  is_static=1)

    # host (/32) routes towards each remote peer
    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index)

    self.snat_add_address(self.snat_addr)
    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                 self.tcp_port_in, self.tcp_port_out,
                                 proto=IP_PROTOS.tcp)
    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                 self.udp_port_in, self.udp_port_out,
                                 proto=IP_PROTOS.udp)
    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                 self.icmp_id_in, self.icmp_id_out,
                                 proto=IP_PROTOS.icmp)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (the layer between the two IP headers, pg_start() calls,
    # try/except framing, a trailing call argument); they are restored
    # here from the surrounding pattern - confirm against the full file.
    nat_ip = "10.0.0.10"
    self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    # NOTE(review): middle layer assumed to be GRE() (the capture is
    # asserted to carry a GRE layer) - confirm
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg1.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, nat_ip)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol - hairpinning """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (the layer between the two IP headers, pg_start() calls,
    # try/except framing, a trailing call argument); they are restored
    # here from the surrounding pattern - confirm against the full file.

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]

    host_nat_ip = "10.0.0.10"
    server_nat_ip = "10.0.0.11"

    self.snat_add_static_mapping(host.ip4, host_nat_ip)
    self.snat_add_static_mapping(server.ip4, server_nat_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # host to server
    # NOTE(review): middle layer assumed to be GRE() (the capture is
    # asserted to carry a GRE layer) - confirm
    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, host_nat_ip)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=host_nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_unknown_proto(self):
    """ SNAT translate packet with unknown protocol """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (the layer between the two IP headers, pg_start() calls,
    # try/except framing, a trailing call argument); they are restored
    # here from the surrounding pattern - confirm against the full file.
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    # a plain TCP packet goes first; presumably it sets up the session
    # the following packet relies on - verify
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=self.tcp_port_in, dport=20))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)

    # NOTE(review): middle layer assumed to be GRE() (the capture is
    # asserted to carry a GRE layer) - confirm
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg1.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, self.snat_addr)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_unknown_proto(self):
    """ SNAT translate packet with unknown protocol - hairpinning """
    # NOTE(review): the reviewed excerpt omitted several short lines
    # (a port assignment, the layer between the two IP headers,
    # pg_start() calls, try/except framing, a trailing call argument);
    # they are restored here from the surrounding pattern - confirm
    # against the full file.
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): host_in_port's value is not visible in the excerpt;
    # 1234 is assumed - confirm
    host_in_port = 1234
    server_out_port = 8765
    server_nat_ip = "10.0.0.11"

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): trailing argument assumed to be is_inside=0 - confirm
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, server_nat_ip)

    # host to server
    # a plain TCP packet goes first; presumably it sets up the session
    # the following packet relies on - verify
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg0.get_capture(1)

    # NOTE(review): middle layer assumed to be GRE() (the capture is
    # asserted to carry a GRE layer) - confirm
    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, self.snat_addr)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
2112 super(TestSNAT, self).tearDown()
2113 if not self.vpp_dead:
2114 self.logger.info(self.vapi.cli("show snat verbose"))
2118 class TestDeterministicNAT(MethodHolder):
2119 """ Deterministic NAT Test Cases """
# Extends the command line built by the base class with the
# "snat { deterministic }" stanza. test_deterministic_mode later asserts that
# snat_show_config() reports deterministic == 1, so this is presumably what
# switches the VPP instance of this class into deterministic mode.
2122 def setUpConstants(cls):
2123 super(TestDeterministicNAT, cls).setUpConstants()
2124 cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
# Class-level fixture for the deterministic NAT tests: sets the default
# inside/external ports, ICMP id and NAT address used by the stream helpers,
# creates three pg interfaces, and generates two remote hosts on pg0 with
# their IPv4 neighbors configured.
# NOTE(review): the body of the loop over cls.interfaces and the statements
# surrounding the tearDownClass() call are not visible in this excerpt; the
# tearDownClass() call is presumably the cleanup path for a failed setup --
# confirm against the full file.
2127 def setUpClass(cls):
2128 super(TestDeterministicNAT, cls).setUpClass()
# defaults read by create_stream_in / create_stream_out / initiate_tcp_session
2131 cls.tcp_port_in = 6303
2132 cls.tcp_external_port = 6303
2133 cls.udp_port_in = 6304
2134 cls.udp_external_port = 6304
2135 cls.icmp_id_in = 6305
2136 cls.snat_addr = '10.0.0.3'
2138 cls.create_pg_interfaces(range(3))
2139 cls.interfaces = list(cls.pg_interfaces)
2141 for i in cls.interfaces:
# two hosts on pg0 are needed by test_multiple_users (remote_hosts[0] and [1])
2146 cls.pg0.generate_remote_hosts(2)
2147 cls.pg0.configure_ipv4_neighbors()
2150 super(TestDeterministicNAT, cls).tearDownClass()
# Builds three packets travelling from in_if's remote host to out_if's remote
# host, all with the requested TTL: a TCP packet (tcp_port_in ->
# tcp_external_port), a UDP packet (udp_port_in -> udp_external_port) and an
# ICMP echo-request carrying icmp_id_in.
# NOTE(review): how the packets are collected and returned is on lines not
# visible here; callers use the result as a list (len(pkts), add_stream(pkts)).
2153 def create_stream_in(self, in_if, out_if, ttl=64):
2155 Create packet stream for inside network
2157 :param in_if: Inside interface
2158 :param out_if: Outside interface
2159 :param ttl: TTL of generated packets
2163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2164 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2165 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2169 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2170 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2171 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2175 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2176 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2177 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds the reply direction of create_stream_in: TCP, UDP and ICMP echo-reply
# packets from out_if's remote host towards dst_ip, with the requested TTL.
# dst_ip falls back to self.snat_addr (the guarding condition is on a line not
# visible here; the docstring describes it as the default).
# The destination ports / ICMP id come from self.tcp_port_out,
# self.udp_port_out and self.icmp_external_id. Those attributes are not set in
# setUpClass; within this class they are assigned by verify_capture_out (and
# tcp_port_out also by initiate_tcp_session), so one of those has to run first.
2182 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2184 Create packet stream for outside network
2186 :param out_if: Outside interface
2187 :param dst_ip: Destination IP address (Default use global SNAT address)
2188 :param ttl: TTL of generated packets
2191 dst_ip = self.snat_addr
2194 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2195 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2196 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2200 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2201 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2202 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2206 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2207 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2208 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks packets captured on the outside interface and, as a side effect,
# records the translated identifiers for later use: asserts that exactly
# packet_num packets were captured and that each has nat_ip as IP source
# (nat_ip falls back to self.snat_addr), then stores the TCP source port in
# self.tcp_port_out, the UDP source port in self.udp_port_out, or otherwise
# the ICMP id in self.icmp_external_id. A failure is logged with the packet.
# NOTE(review): the docstring below documents a `same_port` parameter (and
# misspells "Source"), but this signature has no such parameter -- the line
# looks like a leftover and should be dropped or the parameter added.
2213 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2215 Verify captured packets on outside network
2217 :param capture: Captured packets
2218 :param nat_ip: Translated IP address (Default use global SNAT address)
2219 :param same_port: Sorce port number is not translated (Default False)
2220 :param packet_num: Expected number of packets (Default 3)
2223 nat_ip = self.snat_addr
2224 self.assertEqual(packet_num, len(capture))
2225 for packet in capture:
2227 self.assertEqual(packet[IP].src, nat_ip)
2228 if packet.haslayer(TCP):
2229 self.tcp_port_out = packet[TCP].sport
2230 elif packet.haslayer(UDP):
2231 self.udp_port_out = packet[UDP].sport
2233 self.icmp_external_id = packet[ICMP].id
2235 self.logger.error(ppp("Unexpected or invalid packet "
2236 "(outside network):", packet))
# Drives a TCP three-way handshake through the NAT so that a session exists
# for the tests that follow: SYN in->out (one packet expected on out_if),
# SYN+ACK out->in addressed to self.snat_addr (one packet expected on in_if),
# then ACK in->out (one packet expected on out_if). The translated source
# port is stored in self.tcp_port_out for use by the out->in packets.
# On failure "TCP 3 way handshake failed" is logged.
# NOTE(review): the TCP flags, the add_stream/pg_start calls for the in->out
# packets and the statement that rebinds `p` before `p[TCP].sport` is read
# are on lines not visible in this excerpt -- confirm that the port is taken
# from the captured packet, not from the packet that was sent.
2239 def initiate_tcp_session(self, in_if, out_if):
2241 Initiates TCP session
2243 :param in_if: Inside interface
2244 :param out_if: Outside interface
2247 # SYN packet in->out
2248 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2249 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2250 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2253 self.pg_enable_capture(self.pg_interfaces)
2255 capture = out_if.get_capture(1)
2257 self.tcp_port_out = p[TCP].sport
2259 # SYN + ACK packet out->in
2260 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2261 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
2262 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2264 out_if.add_stream(p)
2265 self.pg_enable_capture(self.pg_interfaces)
2267 in_if.get_capture(1)
2269 # ACK packet in->out
2270 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2271 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2272 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2275 self.pg_enable_capture(self.pg_interfaces)
2277 out_if.get_capture(1)
2280 self.logger.error("TCP 3 way handshake failed")
# Validates the decoded IPFIX data for a "maximum entries per user exceeded"
# event: exactly one record is expected, and three of its fields (keyed by
# integer id) are checked -- field 230 must be 13, field 466 must be the
# 4-byte string '\x03\x00\x00\x00', and field 8 must equal pg0's remote IPv4
# address in packed form.
# NOTE(review): in the IANA IPFIX registry ids 230 / 466 / 8 are natEvent,
# natQuotaExceededEvent and sourceIPv4Address; `record` is bound on a line not
# visible here (presumably data[0]) -- confirm.
2283 def verify_ipfix_max_entries_per_user(self, data):
2285 Verify IPFIX maximum entries per user exceeded event
2287 :param data: Decoded IPFIX data records
2289 self.assertEqual(1, len(data))
2292 self.assertEqual(ord(record[230]), 13)
2293 # natQuotaExceededEvent
2294 self.assertEqual('\x03\x00\x00\x00', record[466])
2296 self.assertEqual(self.pg0.remote_ip4n, record[8])
# API-level test of deterministic mappings, no traffic involved:
#   1. confirm VPP reports deterministic mode,
#   2. add a mapping in_addr/in_plen -> out_addr/out_plen,
#   3. forward-translate in_addr_t and check the outside address, then
#      reverse-translate (out_addr, out_port_hi) and check it maps back,
#   4. dump the mappings and compare the single entry field by field,
#   5. expect the dump to be empty at the end.
# NOTE(review): in_plen / out_plen and the step that removes the mapping
# before the final dump are on lines not visible in this excerpt.
2298 def test_deterministic_mode(self):
2299 """ S-NAT run deterministic mode """
2300 in_addr = '172.16.255.0'
2301 out_addr = '172.17.255.50'
2302 in_addr_t = '172.16.255.20'
2303 in_addr_n = socket.inet_aton(in_addr)
2304 out_addr_n = socket.inet_aton(out_addr)
2305 in_addr_t_n = socket.inet_aton(in_addr_t)
2309 snat_config = self.vapi.snat_show_config()
2310 self.assertEqual(1, snat_config.deterministic)
2312 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
# only the first 4 bytes of the returned address fields are compared against
# the packed IPv4 addresses
2314 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2315 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2316 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2317 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2319 deterministic_mappings = self.vapi.snat_det_map_dump()
2320 self.assertEqual(len(deterministic_mappings), 1)
2321 dsm = deterministic_mappings[0]
2322 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2323 self.assertEqual(in_plen, dsm.in_plen)
2324 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2325 self.assertEqual(out_plen, dsm.out_plen)
2328 deterministic_mappings = self.vapi.snat_det_map_dump()
2329 self.assertEqual(len(deterministic_mappings), 0)
# Reads the current deterministic NAT timeouts, sets every one of them
# (udp, tcp_established, tcp_transitory, icmp) to its previous value + 10,
# reads them back and asserts that each value changed. Only "changed" is
# asserted, not the exact new value.
2331 def test_set_timeouts(self):
2332 """ Set deterministic NAT timeouts """
2333 timeouts_before = self.vapi.snat_det_get_timeouts()
2335 self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2336 timeouts_before.tcp_established + 10,
2337 timeouts_before.tcp_transitory + 10,
2338 timeouts_before.icmp + 10)
2340 timeouts_after = self.vapi.snat_det_get_timeouts()
2342 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2343 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2344 self.assertNotEqual(timeouts_before.tcp_established,
2345 timeouts_after.tcp_established)
2346 self.assertNotEqual(timeouts_before.tcp_transitory,
2347 timeouts_after.tcp_transitory)
# End-to-end translation test with a deterministic mapping from pg0's remote
# host to nat_ip:
#   - in->out: TCP/UDP/ICMP sent on pg0, captured on pg1 and checked to have
#     nat_ip as source (verify_capture_out also records the translated ports),
#   - out->in: replies sent on pg1 to nat_ip, captured on pg0 and verified,
#   - session dump for pg0's remote address must hold three sessions whose
#     external address, inside port, outside port and external port match the
#     values used/recorded above (for ICMP only the ids are compared).
# NOTE(review): the mapping prefix lengths, the second feature call's
# arguments and the lines binding `s` to each session are not visible here;
# verify_capture_in is defined outside this excerpt.
2349 def test_det_in(self):
2350 """ CGNAT translation test (TCP, UDP, ICMP) """
2352 nat_ip = "10.0.0.10"
2354 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2356 socket.inet_aton(nat_ip),
2358 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2359 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2363 pkts = self.create_stream_in(self.pg0, self.pg1)
2364 self.pg0.add_stream(pkts)
2365 self.pg_enable_capture(self.pg_interfaces)
2367 capture = self.pg1.get_capture(len(pkts))
2368 self.verify_capture_out(capture, nat_ip)
2371 pkts = self.create_stream_out(self.pg1, nat_ip)
2372 self.pg1.add_stream(pkts)
2373 self.pg_enable_capture(self.pg_interfaces)
2375 capture = self.pg0.get_capture(len(pkts))
2376 self.verify_capture_in(capture, self.pg0)
2379 sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2380 self.assertEqual(len(sessions), 3)
2384 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2385 self.assertEqual(s.in_port, self.tcp_port_in)
2386 self.assertEqual(s.out_port, self.tcp_port_out)
2387 self.assertEqual(s.ext_port, self.tcp_external_port)
2391 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2392 self.assertEqual(s.in_port, self.udp_port_in)
2393 self.assertEqual(s.out_port, self.udp_port_out)
2394 self.assertEqual(s.ext_port, self.udp_external_port)
2398 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2399 self.assertEqual(s.in_port, self.icmp_id_in)
2400 self.assertEqual(s.out_port, self.icmp_external_id)
# Two inside hosts on pg0 (host0, host1) share one outside address nat_ip:
#   - each host sends a TCP packet from the same inside port to the same
#     external port; both must leave pg1 with source nat_ip, and the
#     translated source ports are kept as port_out0 / port_out1,
#   - the single deterministic mapping must then report two sessions,
#   - a reply to port_out0 must reach host0 and a reply to port_out1 must
#     reach host1, each with the original inside port as destination port,
#   - finally the session-close APIs are exercised: closing via the "out"
#     call leaves one session, closing via the "in" call leaves none.
# NOTE(review): port_in, the mapping prefix lengths, the lines binding
# ip / tcp from the captured packet and the port arguments of the two close
# calls are on lines not visible in this excerpt.
2402 def test_multiple_users(self):
2403 """ CGNAT multiple users """
2405 nat_ip = "10.0.0.10"
2407 external_port = 6303
2409 host0 = self.pg0.remote_hosts[0]
2410 host1 = self.pg0.remote_hosts[1]
2412 self.vapi.snat_add_det_map(host0.ip4n,
2414 socket.inet_aton(nat_ip),
2416 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2417 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2421 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2422 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2423 TCP(sport=port_in, dport=external_port))
2424 self.pg0.add_stream(p)
2425 self.pg_enable_capture(self.pg_interfaces)
2427 capture = self.pg1.get_capture(1)
2432 self.assertEqual(ip.src, nat_ip)
2433 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2434 self.assertEqual(tcp.dport, external_port)
2435 port_out0 = tcp.sport
2437 self.logger.error(ppp("Unexpected or invalid packet:", p))
2441 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2442 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2443 TCP(sport=port_in, dport=external_port))
2444 self.pg0.add_stream(p)
2445 self.pg_enable_capture(self.pg_interfaces)
2447 capture = self.pg1.get_capture(1)
2452 self.assertEqual(ip.src, nat_ip)
2453 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2454 self.assertEqual(tcp.dport, external_port)
2455 port_out1 = tcp.sport
2457 self.logger.error(ppp("Unexpected or invalid packet:", p))
2460 dms = self.vapi.snat_det_map_dump()
2461 self.assertEqual(1, len(dms))
2462 self.assertEqual(2, dms[0].ses_num)
2465 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2466 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2467 TCP(sport=external_port, dport=port_out0))
2468 self.pg1.add_stream(p)
2469 self.pg_enable_capture(self.pg_interfaces)
2471 capture = self.pg0.get_capture(1)
2476 self.assertEqual(ip.src, self.pg1.remote_ip4)
2477 self.assertEqual(ip.dst, host0.ip4)
2478 self.assertEqual(tcp.dport, port_in)
2479 self.assertEqual(tcp.sport, external_port)
2481 self.logger.error(ppp("Unexpected or invalid packet:", p))
2485 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2486 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2487 TCP(sport=external_port, dport=port_out1))
2488 self.pg1.add_stream(p)
2489 self.pg_enable_capture(self.pg_interfaces)
2491 capture = self.pg0.get_capture(1)
2496 self.assertEqual(ip.src, self.pg1.remote_ip4)
2497 self.assertEqual(ip.dst, host1.ip4)
2498 self.assertEqual(tcp.dport, port_in)
2499 self.assertEqual(tcp.sport, external_port)
2501 self.logger.error(ppp("Unexpected or invalid packet", p))
2504 # session close api test
2505 self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2507 self.pg1.remote_ip4n,
2509 dms = self.vapi.snat_det_map_dump()
2510 self.assertEqual(dms[0].ses_num, 1)
2512 self.vapi.snat_det_close_session_in(host0.ip4n,
2514 self.pg1.remote_ip4n,
2516 dms = self.vapi.snat_det_map_dump()
2517 self.assertEqual(dms[0].ses_num, 0)
# Verifies that the NAT notices a TCP close started by the inside host.
# After a mapping pg0-remote -> self.snat_addr is added and a session is
# opened with initiate_tcp_session, the close sequence is replayed:
#   FIN in->out (1 packet expected on pg1),
#   ACK and FIN out->in sent as one stream (2 packets expected on pg0),
#   ACK in->out (1 packet expected on pg1),
# and the mapping must then report zero sessions. On failure
# "TCP session termination failed" is logged.
# NOTE(review): the TCP flags and the construction of `pkts` are on lines
# not visible in this excerpt.
2519 def test_tcp_session_close_detection_in(self):
2520 """ CGNAT TCP session close initiated from inside network """
2521 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2523 socket.inet_aton(self.snat_addr),
2525 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2526 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2529 self.initiate_tcp_session(self.pg0, self.pg1)
2531 # close the session from inside
2533 # FIN packet in -> out
2534 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2535 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2536 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2538 self.pg0.add_stream(p)
2539 self.pg_enable_capture(self.pg_interfaces)
2541 self.pg1.get_capture(1)
2545 # ACK packet out -> in
2546 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2547 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2548 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2552 # FIN packet out -> in
2553 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2554 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2555 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2559 self.pg1.add_stream(pkts)
2560 self.pg_enable_capture(self.pg_interfaces)
2562 self.pg0.get_capture(2)
2564 # ACK packet in -> out
2565 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2566 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2567 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2569 self.pg0.add_stream(p)
2570 self.pg_enable_capture(self.pg_interfaces)
2572 self.pg1.get_capture(1)
2574 # Check if snat closed the session
2575 dms = self.vapi.snat_det_map_dump()
2576 self.assertEqual(0, dms[0].ses_num)
2578 self.logger.error("TCP session termination failed")
# Mirror image of test_tcp_session_close_detection_in: the close is started
# by the outside host. After the mapping and handshake:
#   FIN out->in (1 packet expected on pg0),
#   two in->out packets sent as one stream (2 packets expected on pg1),
#   ACK out->in (1 packet expected on pg0),
# and the mapping must then report zero sessions. On failure
# "TCP session termination failed" is logged.
# NOTE(review): both in->out packets carry the comment "ACK packet in -> out";
# by symmetry with the inside-initiated test the second one is presumably the
# FIN and the comment a copy-paste slip -- the TCP flags are on lines not
# visible in this excerpt, so confirm before correcting it.
2581 def test_tcp_session_close_detection_out(self):
2582 """ CGNAT TCP session close initiated from outside network """
2583 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2585 socket.inet_aton(self.snat_addr),
2587 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2588 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2591 self.initiate_tcp_session(self.pg0, self.pg1)
2593 # close the session from outside
2595 # FIN packet out -> in
2596 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2597 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2598 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2600 self.pg1.add_stream(p)
2601 self.pg_enable_capture(self.pg_interfaces)
2603 self.pg0.get_capture(1)
2607 # ACK packet in -> out
2608 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2609 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2610 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2614 # ACK packet in -> out
2615 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2616 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2617 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2621 self.pg0.add_stream(pkts)
2622 self.pg_enable_capture(self.pg_interfaces)
2624 self.pg1.get_capture(2)
2626 # ACK packet out -> in
2627 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2628 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2629 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2631 self.pg1.add_stream(p)
2632 self.pg_enable_capture(self.pg_interfaces)
2634 self.pg0.get_capture(1)
2636 # Check if snat closed the session
2637 dms = self.vapi.snat_det_map_dump()
2638 self.assertEqual(0, dms[0].ses_num)
2640 self.logger.error("TCP session termination failed")
# Extended-only test (skipped unless running_extended_tests() is true).
# Opens a TCP session, sets all four deterministic NAT timeouts to 5, sends
# the TCP/UDP/ICMP inside stream so sessions exist, and finally expects the
# mapping to report zero sessions.
# NOTE(review): whatever happens between the capture and the final dump
# (presumably a wait longer than the 5-unit timeout) is on lines not visible
# in this excerpt.
2643 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2644 def test_session_timeout(self):
2645 """ CGNAT session timeouts """
2646 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2648 socket.inet_aton(self.snat_addr),
2650 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2651 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2654 self.initiate_tcp_session(self.pg0, self.pg1)
2655 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2656 pkts = self.create_stream_in(self.pg0, self.pg1)
2657 self.pg0.add_stream(pkts)
2658 self.pg_enable_capture(self.pg_interfaces)
2660 capture = self.pg1.get_capture(len(pkts))
2663 dms = self.vapi.snat_det_map_dump()
2664 self.assertEqual(0, dms[0].ses_num)
# Extended-only test of the per-user session limit and its IPFIX logging.
#   - IPFIX export is pointed at pg2's remote host and SNAT IPFIX logging is
#     enabled,
#   - one UDP packet is built for every port in range(1025, 2025), i.e. 1000
#     distinct source ports, and all must be forwarded to pg1,
#   - one further UDP packet (3001 -> 3002) must NOT appear on pg1; instead an
#     ICMP type 3 / code 1 error embedding the original UDP ports must come
#     back on pg0,
#   - the mapping must report exactly 1000 sessions,
#   - after "ipfix flush" two packets are expected on pg2; templates are fed
#     to the decoder and data sets are checked with
#     verify_ipfix_max_entries_per_user.
# NOTE(review): `capture = self.pg1.assert_nothing_captured()` binds a value
# that is overwritten two statements later without being read. The lines that
# collect `pkts`, bind `icmp`, and loop over the IPFIX capture are not visible
# in this excerpt.
2666 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2667 def test_session_limit_per_user(self):
2668 """ CGNAT maximum 1000 sessions per user should be created """
2669 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2671 socket.inet_aton(self.snat_addr),
2673 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2674 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2676 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2677 src_address=self.pg2.local_ip4n,
2679 template_interval=10)
2680 self.vapi.snat_ipfix()
2683 for port in range(1025, 2025):
2684 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2685 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2686 UDP(sport=port, dport=port))
2689 self.pg0.add_stream(pkts)
2690 self.pg_enable_capture(self.pg_interfaces)
2692 capture = self.pg1.get_capture(len(pkts))
2694 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2695 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2696 UDP(sport=3001, dport=3002))
2697 self.pg0.add_stream(p)
2698 self.pg_enable_capture(self.pg_interfaces)
2700 capture = self.pg1.assert_nothing_captured()
2702 # verify ICMP error packet
2703 capture = self.pg0.get_capture(1)
2705 self.assertTrue(p.haslayer(ICMP))
2707 self.assertEqual(icmp.type, 3)
2708 self.assertEqual(icmp.code, 1)
2709 self.assertTrue(icmp.haslayer(IPerror))
2710 inner_ip = icmp[IPerror]
2711 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2712 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2714 dms = self.vapi.snat_det_map_dump()
2716 self.assertEqual(1000, dms[0].ses_num)
2718 # verify IPFIX logging
2719 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2721 capture = self.pg2.get_capture(2)
2722 ipfix = IPFIXDecoder()
2723 # first load template
2725 self.assertTrue(p.haslayer(IPFIX))
2726 if p.haslayer(Template):
2727 ipfix.add_template(p.getlayer(Template))
2728 # verify events in data set
2730 if p.haslayer(Data):
2731 data = ipfix.decode_data_set(p.getlayer(Set))
2732 self.verify_ipfix_max_entries_per_user(data)
# Undoes the configuration the deterministic NAT tests create: turns SNAT
# IPFIX logging off (enable=0), calls snat_det_set_timeouts() with no
# arguments, then walks the dumped deterministic mappings and the dumped SNAT
# interfaces and issues one snat_add_det_map / snat_interface_add_del_feature
# call per entry.
# NOTE(review): the remaining arguments of those two per-entry calls are on
# lines not visible here (presumably the delete flag); the argument-less
# snat_det_set_timeouts() presumably restores default timeouts -- confirm
# against the vapi wrapper.
2734 def clear_snat(self):
2736 Clear SNAT configuration.
2738 self.vapi.snat_ipfix(enable=0)
2739 self.vapi.snat_det_set_timeouts()
2740 deterministic_mappings = self.vapi.snat_det_map_dump()
2741 for dsm in deterministic_mappings:
2742 self.vapi.snat_add_det_map(dsm.in_addr,
2748 interfaces = self.vapi.snat_interface_dump()
2749 for intf in interfaces:
2750 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2755 super(TestDeterministicNAT, self).tearDown()
2756 if not self.vpp_dead:
2757 self.logger.info(self.vapi.cli("show snat detail"))
2761 class TestNAT64(MethodHolder):
2762 """ NAT64 Test Cases """
# Class-level fixture for the NAT64 tests: sets default inside/outside ports
# and ICMP ids, the pool address nat_addr and a second pool address
# vrf1_nat_addr (both also kept in packed form), and creates three pg
# interfaces split by address family:
#   ip6_interfaces = [pg0, pg2], ip4_interfaces = [pg1].
# pg2 is placed in the IPv6 table cls.vrf1_id, and pg0 gets two remote hosts.
# NOTE(review): cls.vrf1_id, most of the per-interface configuration inside
# the two loops and the statements surrounding the tearDownClass() call are
# on lines not visible in this excerpt.
2765 def setUpClass(cls):
2766 super(TestNAT64, cls).setUpClass()
2769 cls.tcp_port_in = 6303
2770 cls.tcp_port_out = 6303
2771 cls.udp_port_in = 6304
2772 cls.udp_port_out = 6304
2773 cls.icmp_id_in = 6305
2774 cls.icmp_id_out = 6305
2775 cls.nat_addr = '10.0.0.3'
2776 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2778 cls.vrf1_nat_addr = '10.0.10.3'
2779 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2782 cls.create_pg_interfaces(range(3))
2783 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2784 cls.ip6_interfaces.append(cls.pg_interfaces[2])
2785 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2787 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2789 cls.pg0.generate_remote_hosts(2)
2791 for i in cls.ip6_interfaces:
2794 i.configure_ipv6_neighbors()
2796 for i in cls.ip4_interfaces:
2802 super(TestNAT64, cls).tearDownClass()
# Adds the single-address range 1.2.3.4-1.2.3.4 to the NAT64 pool, checks the
# pool dump holds exactly that address, removes the range again (is_add=0)
# and checks the dump is empty.
2805 def test_pool(self):
2806 """ Add/delete address to NAT64 pool """
2807 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2809 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2811 addresses = self.vapi.nat64_pool_addr_dump()
2812 self.assertEqual(len(addresses), 1)
2813 self.assertEqual(addresses[0].address, nat_addr)
2815 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2817 addresses = self.vapi.nat64_pool_addr_dump()
2818 self.assertEqual(len(addresses), 0)
# Enables NAT64 on pg0 (default arguments) and on pg1 with is_inside=0, then
# checks three views of the result: the interface dump has two entries with
# is_inside == 1 for pg0 and == 0 for pg1, and the CLI feature listing shows
# 'nat64-in2out' on pg0 and 'nat64-out2in' on pg1. Finally both interfaces
# are removed (is_add=0) and the dump must be empty.
# NOTE(review): the initialisation and setting of pg0_found / pg1_found are
# on lines not visible in this excerpt.
2820 def test_interface(self):
2821 """ Enable/disable NAT64 feature on the interface """
2822 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2823 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2825 interfaces = self.vapi.nat64_interface_dump()
2826 self.assertEqual(len(interfaces), 2)
2829 for intf in interfaces:
2830 if intf.sw_if_index == self.pg0.sw_if_index:
2831 self.assertEqual(intf.is_inside, 1)
2833 elif intf.sw_if_index == self.pg1.sw_if_index:
2834 self.assertEqual(intf.is_inside, 0)
2836 self.assertTrue(pg0_found)
2837 self.assertTrue(pg1_found)
# str.find returns -1 when the feature name is absent from the CLI output
2839 features = self.vapi.cli("show interface features pg0")
2840 self.assertNotEqual(features.find('nat64-in2out'), -1)
2841 features = self.vapi.cli("show interface features pg1")
2842 self.assertNotEqual(features.find('nat64-out2in'), -1)
2844 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2845 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2847 interfaces = self.vapi.nat64_interface_dump()
2848 self.assertEqual(len(interfaces), 0)
# Adds a static NAT64 BIB entry for TCP (inside IPv6 address -> outside IPv4
# address 10.1.1.3), dumps the TCP BIB and checks the entry's inside/outside
# address and port and that exactly one static entry is counted; then issues
# a second nat64_add_del_static_bib call and expects the static count to be
# zero.
# NOTE(review): in_port / out_port, the remaining call arguments (including
# the delete flag of the second call) and the loop that computes
# static_bib_num and binds `bibe` are on lines not visible in this excerpt.
2850 def test_static_bib(self):
2851 """ Add/delete static BIB entry """
2852 in_addr = socket.inet_pton(socket.AF_INET6,
2853 '2001:db8:85a3::8a2e:370:7334')
2854 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2857 proto = IP_PROTOS.tcp
2859 self.vapi.nat64_add_del_static_bib(in_addr,
2864 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2869 self.assertEqual(bibe.i_addr, in_addr)
2870 self.assertEqual(bibe.o_addr, out_addr)
2871 self.assertEqual(bibe.i_port, in_port)
2872 self.assertEqual(bibe.o_port, out_port)
2873 self.assertEqual(static_bib_num, 1)
2875 self.vapi.nat64_add_del_static_bib(in_addr,
2881 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2886 self.assertEqual(static_bib_num, 0)
# Checks the NAT64 timeout API in two steps: the values reported before any
# change must be udp=300, icmp=60, tcp_trans=240, tcp_est=7440 and
# tcp_incoming_syn=6; after setting custom values the getter must report
# exactly those custom values.
# NOTE(review): the first half only holds if no earlier test in the same VPP
# instance left non-default NAT64 timeouts behind.
2888 def test_set_timeouts(self):
2889 """ Set NAT64 timeouts """
2890 # verify default values
2891 timeouts = self.vapi.nat64_get_timeouts()
2892 self.assertEqual(timeouts.udp, 300)
2893 self.assertEqual(timeouts.icmp, 60)
2894 self.assertEqual(timeouts.tcp_trans, 240)
2895 self.assertEqual(timeouts.tcp_est, 7440)
2896 self.assertEqual(timeouts.tcp_incoming_syn, 6)
2898 # set and verify custom values
2899 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2900 tcp_est=7450, tcp_incoming_syn=10)
2901 timeouts = self.vapi.nat64_get_timeouts()
2902 self.assertEqual(timeouts.udp, 200)
2903 self.assertEqual(timeouts.icmp, 30)
2904 self.assertEqual(timeouts.tcp_trans, 250)
2905 self.assertEqual(timeouts.tcp_est, 7450)
2906 self.assertEqual(timeouts.tcp_incoming_syn, 10)
# NAT64 dynamic translation, IPv6 inside (pg0) to IPv4 outside (pg1):
#   - the pool gets nat_addr; pg0 is enabled with default arguments and pg1
#     with is_inside=0,
#   - an in->out / out->in round trip is run twice; the outside capture must
#     show nat_addr as source and pg1's remote IPv4 as destination, and the
#     inside capture must show the IPv6 address formed from '64:ff9b::' plus
#     pg1's remote IPv4 as source,
#   - the session count must have grown by exactly 3 over both round trips,
#   - the same round trip is then repeated for pg2 with a pool address that is
#     added with vrf_id=self.vrf1_id, expecting vrf1_nat_addr as source.
# `ip` is only used to let scapy produce the canonical text form of the
# IPv4-embedded IPv6 address; it is never sent.
# NOTE(review): nat64_get_ses_num, create_stream_in_ip6 and
# verify_capture_in_ip6 are defined outside this excerpt; the
# verify_capture_out/create_stream_out called here take dst_ip keyword
# arguments, so they are presumably not the TestDeterministicNAT versions.
2908 def test_dynamic(self):
2909 """ NAT64 dynamic translation test """
2910 self.tcp_port_in = 6303
2911 self.udp_port_in = 6304
2912 self.icmp_id_in = 6305
2914 ses_num_start = self.nat64_get_ses_num()
2916 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2918 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2919 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2922 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2923 self.pg0.add_stream(pkts)
2924 self.pg_enable_capture(self.pg_interfaces)
2926 capture = self.pg1.get_capture(len(pkts))
2927 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2928 dst_ip=self.pg1.remote_ip4)
2931 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2932 self.pg1.add_stream(pkts)
2933 self.pg_enable_capture(self.pg_interfaces)
2935 capture = self.pg0.get_capture(len(pkts))
2936 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2937 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2940 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2941 self.pg0.add_stream(pkts)
2942 self.pg_enable_capture(self.pg_interfaces)
2944 capture = self.pg1.get_capture(len(pkts))
2945 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2946 dst_ip=self.pg1.remote_ip4)
2949 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2950 self.pg1.add_stream(pkts)
2951 self.pg_enable_capture(self.pg_interfaces)
2953 capture = self.pg0.get_capture(len(pkts))
2954 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2956 ses_num_end = self.nat64_get_ses_num()
2958 self.assertEqual(ses_num_end - ses_num_start, 3)
2960 # tenant with specific VRF
2961 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
2962 self.vrf1_nat_addr_n,
2963 vrf_id=self.vrf1_id)
2964 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
2966 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
2967 self.pg2.add_stream(pkts)
2968 self.pg_enable_capture(self.pg_interfaces)
2970 capture = self.pg1.get_capture(len(pkts))
2971 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
2972 dst_ip=self.pg1.remote_ip4)
2974 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
2975 self.pg1.add_stream(pkts)
2976 self.pg_enable_capture(self.pg_interfaces)
2978 capture = self.pg2.get_capture(len(pkts))
2979 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# NAT64 translation through static BIB entries. Inside and outside ports / ICMP
# ids are set to the same values (60303/60304/60305), three static BIB
# entries are added for pg0's remote IPv6 address, and one in->out / out->in
# round trip is verified; the outside capture is checked with same_port=True.
# The session count must grow by exactly 3.
# NOTE(review): the remaining arguments of the three nat64_add_del_static_bib
# calls (presumably one each for TCP, UDP and ICMP) are on lines not visible
# in this excerpt.
2981 def test_static(self):
2982 """ NAT64 static translation test """
2983 self.tcp_port_in = 60303
2984 self.udp_port_in = 60304
2985 self.icmp_id_in = 60305
2986 self.tcp_port_out = 60303
2987 self.udp_port_out = 60304
2988 self.icmp_id_out = 60305
2990 ses_num_start = self.nat64_get_ses_num()
2992 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2994 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2995 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2997 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3002 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3007 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3014 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3015 self.pg0.add_stream(pkts)
3016 self.pg_enable_capture(self.pg_interfaces)
3018 capture = self.pg1.get_capture(len(pkts))
3019 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3020 dst_ip=self.pg1.remote_ip4, same_port=True)
3023 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3024 self.pg1.add_stream(pkts)
3025 self.pg_enable_capture(self.pg_interfaces)
3027 capture = self.pg0.get_capture(len(pkts))
3028 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3029 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3031 ses_num_end = self.nat64_get_ses_num()
3033 self.assertEqual(ses_num_end - ses_num_start, 3)
# Extended-only test (skipped unless running_extended_tests() is true).
# Sets the NAT64 ICMP timeout to 5, creates sessions with one inside stream,
# records the session count, and later asserts that the count has changed.
# NOTE(review): only inequality is asserted, so the test passes on any change
# of the count, not specifically on the ICMP session being removed. The step
# between the two counts (presumably a wait longer than the ICMP timeout) is
# on lines not visible in this excerpt.
3035 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3036 def test_session_timeout(self):
3037 """ NAT64 session timeout """
3038 self.icmp_id_in = 1234
3039 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3041 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3042 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3043 self.vapi.nat64_set_timeouts(icmp=5)
3045 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3046 self.pg0.add_stream(pkts)
3047 self.pg_enable_capture(self.pg_interfaces)
3049 capture = self.pg1.get_capture(len(pkts))
3051 ses_num_before_timeout = self.nat64_get_ses_num()
3055 # ICMP session after timeout
3056 ses_num_after_timeout = self.nat64_get_ses_num()
3057 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# Checks that ICMP error messages are translated in both directions.
# Sessions are first created with one verified in->out / out->in round trip,
# whose captures (capture_ip4 on pg1, capture_ip6 on pg0) are kept as the
# payload of the error messages:
#   - inside -> outside: every packet of capture_ip6 is wrapped in an
#     ICMPv6DestUnreach(code=1) and sent on pg0; on pg1 the test expects an
#     ICMP type 3 / code 13 message from nat_addr to pg1's remote host whose
#     embedded IPv4 header runs pg1-remote -> nat_addr with the recorded
#     outside port / ICMP id,
#   - outside -> inside: every packet of capture_ip4 is wrapped in an
#     ICMP type 3 / code 13 and sent on pg1; on pg0 the test expects an
#     ICMPv6DestUnreach with code 1 whose embedded IPv6 header runs
#     pg0-remote -> translated address with the original inside port.
# ICMP / ICMPv6 checksums are verified on the translated packets.
# NOTE(review): ses_num_start is assigned but not read in the visible lines;
# check_icmp_checksum / check_icmpv6_checksum are defined outside this
# excerpt.
3059 def test_icmp_error(self):
3060 """ NAT64 ICMP Error message translation """
3061 self.tcp_port_in = 6303
3062 self.udp_port_in = 6304
3063 self.icmp_id_in = 6305
3065 ses_num_start = self.nat64_get_ses_num()
3067 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3069 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3070 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3072 # send some packets to create sessions
3073 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3074 self.pg0.add_stream(pkts)
3075 self.pg_enable_capture(self.pg_interfaces)
3077 capture_ip4 = self.pg1.get_capture(len(pkts))
3078 self.verify_capture_out(capture_ip4,
3079 nat_ip=self.nat_addr,
3080 dst_ip=self.pg1.remote_ip4)
3082 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3083 self.pg1.add_stream(pkts)
3084 self.pg_enable_capture(self.pg_interfaces)
3086 capture_ip6 = self.pg0.get_capture(len(pkts))
3087 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3088 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3089 self.pg0.remote_ip6)
3092 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3093 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3094 ICMPv6DestUnreach(code=1) /
3095 packet[IPv6] for packet in capture_ip6]
3096 self.pg0.add_stream(pkts)
3097 self.pg_enable_capture(self.pg_interfaces)
3099 capture = self.pg1.get_capture(len(pkts))
3100 for packet in capture:
3102 self.assertEqual(packet[IP].src, self.nat_addr)
3103 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3104 self.assertEqual(packet[ICMP].type, 3)
3105 self.assertEqual(packet[ICMP].code, 13)
3106 inner = packet[IPerror]
3107 self.assertEqual(inner.src, self.pg1.remote_ip4)
3108 self.assertEqual(inner.dst, self.nat_addr)
3109 self.check_icmp_checksum(packet)
3110 if inner.haslayer(TCPerror):
3111 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3112 elif inner.haslayer(UDPerror):
3113 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3115 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3117 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3121 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3122 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3123 ICMP(type=3, code=13) /
3124 packet[IP] for packet in capture_ip4]
3125 self.pg1.add_stream(pkts)
3126 self.pg_enable_capture(self.pg_interfaces)
3128 capture = self.pg0.get_capture(len(pkts))
3129 for packet in capture:
3131 self.assertEqual(packet[IPv6].src, ip.src)
3132 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3133 icmp = packet[ICMPv6DestUnreach]
3134 self.assertEqual(icmp.code, 1)
3135 inner = icmp[IPerror6]
3136 self.assertEqual(inner.src, self.pg0.remote_ip6)
3137 self.assertEqual(inner.dst, ip.src)
3138 self.check_icmpv6_checksum(packet)
3139 if inner.haslayer(TCPerror):
3140 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3141 elif inner.haslayer(UDPerror):
3142 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3144 self.assertEqual(inner[ICMPv6EchoRequest].id,
3147 self.logger.error(ppp("Unexpected or invalid packet:", packet))
def test_hairpinning(self):
    """ NAT64 hairpinning

    Client and server are both hosts behind pg0.  The client reaches the
    server through the NAT address, the server answers the same way, and
    finally an ICMPv6 error quoting the server's reply is sent back.
    Every packet is injected on and captured from pg0.
    """

    hosts = self.pg0.remote_hosts
    client, server = hosts[0], hosts[1]
    server_tcp_in_port, server_tcp_out_port = 22, 4022
    server_udp_in_port, server_udp_out_port = 23, 4023
    client_tcp_in_port, client_udp_in_port = 1234, 1235
    # Overwritten with the source ports observed in the first capture.
    client_tcp_out_port = client_udp_out_port = 0
    nat_addr_ip6 = IPv6(src=''.join(['64:ff9b::', self.nat_addr])).src

    def frame(src_ip6, l4):
        # One frame entering pg0, addressed to the NAT address.
        return (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IPv6(src=src_ip6, dst=nat_addr_ip6) /
                l4)

    def send_on_pg0(stream):
        # Inject the stream on pg0 and return what comes back out of pg0.
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(len(stream))

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # Static BIB entries for the server's TCP and UDP ports.
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_tcp_in_port,
                                       server_tcp_out_port,
                                       IP_PROTOS.tcp)
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_udp_in_port,
                                       server_udp_out_port,
                                       IP_PROTOS.udp)

    # client to server
    pkts = [
        frame(client.ip6,
              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)),
        frame(client.ip6,
              UDP(sport=client_udp_in_port, dport=server_udp_out_port)),
    ]
    capture = send_on_pg0(pkts)
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            if packet.haslayer(TCP):
                tcp = packet[TCP]
                self.assertNotEqual(tcp.sport, client_tcp_in_port)
                self.assertEqual(tcp.dport, server_tcp_in_port)
                self.check_tcp_checksum(packet)
                client_tcp_out_port = packet[TCP].sport
            else:
                udp = packet[UDP]
                self.assertNotEqual(udp.sport, client_udp_in_port)
                self.assertEqual(udp.dport, server_udp_in_port)
                self.check_udp_checksum(packet)
                client_udp_out_port = packet[UDP].sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server to client
    pkts = [
        frame(server.ip6,
              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port)),
        frame(server.ip6,
              UDP(sport=server_udp_in_port, dport=client_udp_out_port)),
    ]
    capture = send_on_pg0(pkts)
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, client.ip6)
            if packet.haslayer(TCP):
                tcp = packet[TCP]
                self.assertEqual(tcp.sport, server_tcp_out_port)
                self.assertEqual(tcp.dport, client_tcp_in_port)
                self.check_tcp_checksum(packet)
            else:
                udp = packet[UDP]
                self.assertEqual(udp.sport, server_udp_out_port)
                self.assertEqual(udp.dport, client_udp_in_port)
                self.check_udp_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # ICMP error: one error per packet of the previous capture, each
    # quoting that packet's IPv6 layer.
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IPv6(src=client.ip6, dst=nat_addr_ip6) /
            ICMPv6DestUnreach(code=1) /
            reply[IPv6] for reply in capture]
    capture = send_on_pg0(pkts)
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            icmp = packet[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, server.ip6)
            self.assertEqual(inner.dst, nat_addr_ip6)
            self.check_icmpv6_checksum(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
                self.assertEqual(inner[TCPerror].dport,
                                 client_tcp_out_port)
            else:
                self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
                self.assertEqual(inner[UDPerror].dport,
                                 client_udp_out_port)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_prefix(self):
    """ NAT64 Network-Specific Prefix

    Configures one prefix without a vrf_id and one for vrf1, checks the
    prefix dump, then runs an in2out/out2in round trip for each prefix.
    """

    vapi = self.vapi
    vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                       self.nat_addr_n)
    vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                       self.vrf1_nat_addr_n,
                                       vrf_id=self.vrf1_id)
    vapi.nat64_add_del_interface(self.pg2.sw_if_index)

    # Add global prefix
    global_pref64 = "2001:db8::"
    global_pref64_len = 32
    global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
    vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)

    dumped = vapi.nat64_prefix_dump()
    self.assertEqual(len(dumped), 1)
    first = dumped[0]
    self.assertEqual(first.prefix, global_pref64_n)
    self.assertEqual(first.prefix_len, global_pref64_len)
    self.assertEqual(first.vrf_id, 0)

    # Add tenant specific prefix
    vrf1_pref64 = "2001:db8:122:300::"
    vrf1_pref64_len = 56
    vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
    vapi.nat64_add_del_prefix(vrf1_pref64_n,
                              vrf1_pref64_len,
                              vrf_id=self.vrf1_id)
    dumped = vapi.nat64_prefix_dump()
    self.assertEqual(len(dumped), 2)

    # (inside interface, prefix, prefix length, expected NAT address);
    # the global prefix is exercised first, then the tenant specific one.
    cases = (
        (self.pg0, global_pref64, global_pref64_len, self.nat_addr),
        (self.pg2, vrf1_pref64, vrf1_pref64_len, self.vrf1_nat_addr),
    )
    for in_if, pref64, pref64_len, nat_ip in cases:
        # inside -> outside
        pkts = self.create_stream_in_ip6(in_if,
                                         self.pg1,
                                         pref=pref64,
                                         plen=pref64_len)
        in_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=nat_ip,
                                dst_ip=self.pg1.remote_ip4)

        # outside -> inside
        pkts = self.create_stream_out(self.pg1, dst_ip=nat_ip)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = in_if.get_capture(len(pkts))
        dst_ip = self.compose_ip6(self.pg1.remote_ip4,
                                  pref64,
                                  pref64_len)
        self.verify_capture_in_ip6(capture, dst_ip, in_if.remote_ip6)
def _test_unknown_proto(self):
    """ NAT64 translate packet with unknown protocol

    The name does not start with "test", so unittest's default loader
    does not collect this method.

    Steps:
      1. Send a TCP packet from pg0 towards the IPv6 form of pg1's
         remote IPv4 address and consume the single packet captured on
         pg1.
      2. in2out: send a GRE packet the same way and check that the
         packet captured on pg1 is IPv4 from the NAT address, still
         carries GRE, and has a valid IP checksum.
      3. out2in: send a GRE packet from pg1 to the NAT address and check
         that the packet captured on pg0 is IPv6 addressed to pg0's
         remote host and still carries GRE.
    """

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)

    # in2out
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
         TCP(sport=self.tcp_port_in, dport=20))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # The TCP packet is not inspected; the capture is only consumed.
    p = self.pg1.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
         GRE() /
         IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg1.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except:
        # Log the offending packet, then let the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IPv6].src, remote_ip6)
        # Fixed: was "self.pgi0", a typo for the pg0 interface used
        # everywhere else in this test.
        self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
        self.assertTrue(packet.haslayer(GRE))
    except:
        # Log the offending packet, then let the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def _test_hairpinning_unknown_proto(self):
    """ NAT64 translate packet with unknown protocol - hairpinning

    Client and server are both hosts behind pg0; every packet is
    injected on and captured from pg0.
    """

    hosts = self.pg0.remote_hosts
    client, server = hosts[0], hosts[1]
    server_tcp_in_port, server_tcp_out_port = 22, 4022
    client_tcp_in_port, client_udp_in_port = 1234, 1235
    nat_addr_ip6 = self.compose_ip6(self.nat_addr, '64:ff9b::', 96)

    def send_one_on_pg0(pkt):
        # Inject a single packet on pg0 and return the one-packet capture.
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(1)

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_tcp_in_port,
                                       server_tcp_out_port,
                                       IP_PROTOS.tcp)

    # client to server
    tcp_pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IPv6(src=client.ip6, dst=nat_addr_ip6) /
               TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
    # The TCP packet is not inspected; the capture is only consumed.
    send_one_on_pg0(tcp_pkt)

    gre_pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IPv6(src=client.ip6, dst=nat_addr_ip6) /
               GRE() /
               IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
               TCP(sport=1234, dport=1234))
    packet = send_one_on_pg0(gre_pkt)[0]
    try:
        self.assertEqual(packet[IPv6].src, nat_addr_ip6)
        self.assertEqual(packet[IPv6].dst, server.ip6)
        self.assertTrue(packet.haslayer(GRE))
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to client
    gre_pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IPv6(src=server.ip6, dst=nat_addr_ip6) /
               GRE() /
               IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
               TCP(sport=1234, dport=1234))
    packet = send_one_on_pg0(gre_pkt)[0]
    try:
        self.assertEqual(packet[IPv6].src, nat_addr_ip6)
        self.assertEqual(packet[IPv6].dst, client.ip6)
        self.assertTrue(packet.haslayer(GRE))
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def nat64_get_ses_num(self):
    """
    Return number of active NAT64 sessions.

    The count is the combined length of the TCP, UDP and ICMP session
    table dumps, queried in that order.
    """
    protocols = (IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp)
    return sum(len(self.vapi.nat64_st_dump(proto)) for proto in protocols)
def clear_nat64(self):
    """
    Clear NAT64 configuration.

    Calls nat64_set_timeouts() with no arguments, then removes every
    dumped NAT64 interface, every static BIB entry (TCP, UDP, ICMP),
    every pool address and every prefix.
    """
    vapi = self.vapi
    vapi.nat64_set_timeouts()

    for intf in vapi.nat64_interface_dump():
        vapi.nat64_add_del_interface(intf.sw_if_index,
                                     intf.is_inside,
                                     is_add=0)

    # Only static BIB entries are deleted here.
    for proto in (IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp):
        for entry in vapi.nat64_bib_dump(proto):
            if not entry.is_static:
                continue
            vapi.nat64_add_del_static_bib(entry.i_addr,
                                          entry.o_addr,
                                          entry.i_port,
                                          entry.o_port,
                                          entry.proto,
                                          entry.vrf_id,
                                          is_add=0)

    for pool_addr in vapi.nat64_pool_addr_dump():
        vapi.nat64_add_del_pool_addr_range(pool_addr.address,
                                           pool_addr.address,
                                           vrf_id=pool_addr.vrf_id,
                                           is_add=0)

    for pref in vapi.nat64_prefix_dump():
        vapi.nat64_add_del_prefix(pref.prefix,
                                  pref.prefix_len,
                                  vrf_id=pref.vrf_id,
                                  is_add=0)
def tearDown(self):
    """
    Log the NAT64 state and clear the NAT64 configuration.

    Nothing beyond the parent tearDown runs when VPP is dead.
    """
    super(TestNAT64, self).tearDown()
    if self.vpp_dead:
        return
    show_commands = (
        "show nat64 pool",
        "show nat64 interfaces",
        "show nat64 prefix",
        "show nat64 bib tcp",
        "show nat64 bib udp",
        "show nat64 bib icmp",
        "show nat64 session table tcp",
        "show nat64 session table udp",
        "show nat64 session table icmp",
    )
    for command in show_commands:
        self.logger.info(self.vapi.cli(command))
    self.clear_nat64()
if __name__ == '__main__':
    # Executed as a script: run this module's tests with the VPP runner.
    unittest.main(testRunner=VppTestRunner)