7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
# Base class that bundles the packet-building and capture-verification helpers
# shared by the SNAT test cases (TestSNAT derives from it).
# NOTE(review): the `def` lines that should enclose the two super() calls
# below are not visible in this extract; they look like plain
# setUpClass/tearDown pass-throughs — confirm against the full file.
20 class MethodHolder(VppTestCase):
21 """ SNAT create capture and verify method holder """
25 super(MethodHolder, cls).setUpClass()
28 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its serialized form, the IP checksum field
    of the copy is removed so that it is recomputed when the copy is
    serialized again, and the recomputed value is compared with the one
    carried by ``pkt``.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without dropping the field the rebuilt copy would simply echo the
    # captured checksum and the assertion below could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its serialized form, the TCP checksum field
    of the copy is removed so that it is recomputed when the copy is
    serialized again, and the recomputed value is compared with the one
    carried by ``pkt``.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without dropping the field the rebuilt copy would simply echo the
    # captured checksum and the assertion below could never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its serialized form, the UDP checksum field
    of the copy is removed so that it is recomputed when the copy is
    serialized again, and the recomputed value is compared with the one
    carried by ``pkt``.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without dropping the field the rebuilt copy would simply echo the
    # captured checksum and the assertion below could never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded layer present in ``pkt`` (IPerror, TCPerror,
    UDPerror, ICMPerror) a fresh copy of the packet is rebuilt with that
    layer's checksum removed, and the recomputed checksum is compared with
    the captured one.  An embedded UDP header whose checksum is 0 is
    skipped.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = ((IPerror, 'IPerror'),
                       (TCPerror, 'TCPerror'),
                       (UDPerror, 'UDPerror'),
                       (ICMPerror, 'ICMPerror'))
    for layer, name in embedded_layers:
        if not pkt.haslayer(layer):
            continue
        if layer is UDPerror and pkt[name].chksum == 0:
            # Zero is not verified for the embedded UDP header.
            continue
        # Always start from the captured packet: reusing a copy prepared for
        # another layer (or none at all) would make this check depend on
        # which other embedded layers happen to be present.
        new = pkt.__class__(str(pkt))
        del new[name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].chksum, pkt[name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    Rebuilds a copy of the packet with the ICMP checksum removed and
    compares the recomputed checksum with the captured one.  When the packet
    embeds an IP header (ICMP error), the embedded checksums are verified
    as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    icmp_layer = rebuilt['ICMP']
    del icmp_layer.chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    recomputed = rebuilt['ICMP'].chksum
    self.assertEqual(recomputed, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    For each supported ICMPv6 message present in the packet (destination
    unreachable, echo request, echo reply) the checksum field is removed
    from a working copy, the copy is rebuilt and the recomputed checksum is
    compared with the captured one.  Destination-unreachable messages
    additionally get their embedded packet verified.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer class, layer name, verify embedded packet afterwards)
    messages = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    # A single working copy is carried across the checks.
    work = pkt.__class__(str(pkt))
    for layer, name, has_embedded in messages:
        if not pkt.haslayer(layer):
            continue
        del work[name].cksum
        work = work.__class__(str(work))
        self.assertEqual(work[name].cksum,
                         pkt[name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo-request packet,
              each sent from the inside host to the outside host
    """
    pkts = []
    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         TCP(sport=self.tcp_port_in, dport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         UDP(sport=self.udp_port_in, dport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)

    # Callers pass the result to add_stream() and use len() on it.
    return pkts
def create_stream_in_ip6(self, in_if, out_if, hlim=64):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :returns: List with one TCP, one UDP and one ICMPv6 echo-request packet
    """
    pkts = []
    # Destination is the outside host's IPv4 address appended to the
    # 64:ff9b:: prefix.
    dst = ''.join(['64:ff9b::', out_if.remote_ip4])

    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         TCP(sport=self.tcp_port_in, dport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         UDP(sport=self.udp_port_in, dport=20))
    pkts.append(p)

    # ICMPv6
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         ICMPv6EchoRequest(id=self.icmp_id_in))
    pkts.append(p)

    return pkts
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global SNAT address)
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo-reply packet
              addressed to ``dst_ip``
    """
    # Only fall back to the global SNAT address when the caller did not
    # choose a destination; an explicit dst_ip must be honoured.
    if dst_ip is None:
        dst_ip = self.snat_addr

    pkts = []
    # TCP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         TCP(dport=self.tcp_port_out, sport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         UDP(dport=self.udp_port_out, sport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)

    return pkts
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    Besides verifying, the source port / ICMP id seen in each packet is
    stored in ``self.tcp_port_out``, ``self.udp_port_out`` and
    ``self.icmp_id_out`` respectively.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global SNAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    if nat_ip is None:
        nat_ip = self.snat_addr
    # With same_port the outside port must equal the inside one, otherwise
    # it must differ.
    check_port = self.assertEqual if same_port else self.assertNotEqual

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP].dst, dst_ip)
            if packet.haslayer(TCP):
                check_port(packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                check_port(packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                check_port(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
                self.check_icmp_checksum(packet)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    Every packet must be addressed to the inside host and carry the
    original (inside) TCP/UDP destination port or ICMP id.

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                # Anything that is neither TCP nor UDP is treated as ICMP.
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.check_icmp_checksum(packet)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
# Verify IPv6 packets captured on the inside network: the capture must hold
# packet_num packets, each with the given source/destination address; TCP and
# UDP packets must carry the inside destination port and a valid checksum,
# other packets are checked as ICMPv6 echo replies.
# NOTE(review): the second argument of the ICMPv6EchoReply id assertion and
# the statements framing the logger.error call are not visible in this
# extract — confirm against the full file.
294 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
296 Verify captured IPv6 packets on inside network
298 :param capture: Captured packets
299 :param src_ip: Source IP
300 :param dst_ip: Destination IP address
301 :param packet_num: Expected number of packets (Default 3)
303 self.assertEqual(packet_num, len(capture))
304 for packet in capture:
306 self.assertEqual(packet[IPv6].src, src_ip)
307 self.assertEqual(packet[IPv6].dst, dst_ip)
308 if packet.haslayer(TCP):
309 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
310 self.check_tcp_checksum(packet)
311 elif packet.haslayer(UDP):
312 self.assertEqual(packet[UDP].dport, self.udp_port_in)
313 self.check_udp_checksum(packet)
315 self.assertEqual(packet[ICMPv6EchoReply].id,
317 self.check_icmpv6_checksum(packet)
319 self.logger.error(ppp("Unexpected or invalid packet "
320 "(inside network):", packet))
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packets that don't have to be translated

    Source/destination addresses and the source port (or ICMP id) must be
    exactly what the ingress host sent.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                # Anything that is neither TCP nor UDP is treated as ICMP.
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
# Verify ICMP error packets captured on the outside network: the capture must
# hold packet_num packets, each sent from src_ip, carrying an ICMP header of
# type icmp_type that embeds the offending IP header (IPerror); the embedded
# TCP/UDP destination port or ICMP id is then checked.
# NOTE(review): several lines are not visible in this extract (the binding of
# `icmp`, the second argument of the TCPerror/UDPerror assertions, any guard
# before `src_ip = self.snat_addr`, and the statements framing the
# logger.error call) — confirm against the full file.
346 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
347 packet_num=3, icmp_type=11):
349 Verify captured packets with ICMP errors on outside network
351 :param capture: Captured packets
352 :param src_ip: Translated IP address or IP address of VPP
353 (Default use global SNAT address)
354 :param packet_num: Expected number of packets (Default 3)
355 :param icmp_type: Type of error ICMP packet
356 we are expecting (Default 11)
359 src_ip = self.snat_addr
360 self.assertEqual(packet_num, len(capture))
361 for packet in capture:
363 self.assertEqual(packet[IP].src, src_ip)
364 self.assertTrue(packet.haslayer(ICMP))
366 self.assertEqual(icmp.type, icmp_type)
367 self.assertTrue(icmp.haslayer(IPerror))
368 inner_ip = icmp[IPerror]
369 if inner_ip.haslayer(TCPerror):
370 self.assertEqual(inner_ip[TCPerror].dport,
372 elif inner_ip.haslayer(UDPerror):
373 self.assertEqual(inner_ip[UDPerror].dport,
376 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
378 self.logger.error(ppp("Unexpected or invalid packet "
379 "(outside network):", packet))
# Verify ICMP error packets captured on the inside network: the capture must
# hold packet_num packets, each addressed to the inside host, carrying an
# ICMP header of type icmp_type that embeds the offending IP header
# (IPerror); the embedded TCP/UDP source port or ICMP id is then checked.
# NOTE(review): several lines are not visible in this extract (the end of
# the signature, the binding of `icmp`, the second argument of the
# TCPerror/UDPerror assertions, and the statements framing the logger.error
# call) — confirm against the full file.
382 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
385 Verify captured packets with ICMP errors on inside network
387 :param capture: Captured packets
388 :param in_if: Inside interface
389 :param packet_num: Expected number of packets (Default 3)
390 :param icmp_type: Type of error ICMP packet
391 we are expecting (Default 11)
393 self.assertEqual(packet_num, len(capture))
394 for packet in capture:
396 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
397 self.assertTrue(packet.haslayer(ICMP))
399 self.assertEqual(icmp.type, icmp_type)
400 self.assertTrue(icmp.haslayer(IPerror))
401 inner_ip = icmp[IPerror]
402 if inner_ip.haslayer(TCPerror):
403 self.assertEqual(inner_ip[TCPerror].sport,
405 elif inner_ip.haslayer(UDPerror):
406 self.assertEqual(inner_ip[UDPerror].sport,
409 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
411 self.logger.error(ppp("Unexpected or invalid packet "
412 "(inside network):", packet))
# Verify decoded IPFIX NAT44 session records: six records are expected; the
# value at record[230] must be 4 or 5, a 4 is counted as a session create and
# anything else as a session delete, and three of each are required.  Source
# address, post-NAT address and the per-protocol inside/outside port (or ICMP
# id) are compared with the test's configured values.
# NOTE(review): the loop header binding `record`, the else lines and the
# record indices used as second arguments of several assertions are not
# visible in this extract — confirm against the full file.
415 def verify_ipfix_nat44_ses(self, data):
417 Verify IPFIX NAT44 session create/delete event
419 :param data: Decoded IPFIX data records
421 nat44_ses_create_num = 0
422 nat44_ses_delete_num = 0
423 self.assertEqual(6, len(data))
426 self.assertIn(ord(record[230]), [4, 5])
427 if ord(record[230]) == 4:
428 nat44_ses_create_num += 1
430 nat44_ses_delete_num += 1
432 self.assertEqual(self.pg0.remote_ip4n, record[8])
433 # postNATSourceIPv4Address
434 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
437 self.assertEqual(struct.pack("!I", 0), record[234])
438 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
439 if IP_PROTOS.icmp == ord(record[4]):
440 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
441 self.assertEqual(struct.pack("!H", self.icmp_id_out),
443 elif IP_PROTOS.tcp == ord(record[4]):
444 self.assertEqual(struct.pack("!H", self.tcp_port_in),
446 self.assertEqual(struct.pack("!H", self.tcp_port_out),
448 elif IP_PROTOS.udp == ord(record[4]):
449 self.assertEqual(struct.pack("!H", self.udp_port_in),
451 self.assertEqual(struct.pack("!H", self.udp_port_out),
454 self.fail("Invalid protocol")
455 self.assertEqual(3, nat44_ses_create_num)
456 self.assertEqual(3, nat44_ses_delete_num)
# Verify the decoded IPFIX data for the "addresses exhausted" case: exactly
# one record is expected, whose value at index 230 is 3 and whose value at
# index 283 is a packed 32-bit zero.
# NOTE(review): the statement binding `record` is not visible in this
# extract — presumably the single element of `data`; confirm.
458 def verify_ipfix_addr_exhausted(self, data):
460 Verify IPFIX NAT addresses event
462 :param data: Decoded IPFIX data records
464 self.assertEqual(1, len(data))
467 self.assertEqual(ord(record[230]), 3)
469 self.assertEqual(struct.pack("!I", 0), record[283])
# SNAT test-case class and its class-level setup: stores the default
# inside/outside ports, ICMP ids, SNAT address and IPFIX parameters on the
# class, creates nine pg interfaces, keeps the first four in cls.interfaces,
# gives pg0 three remote hosts, and re-addresses pg4/pg5/pg6 (kept in
# cls.overlapping_interfaces) with 172.16.255.x addresses in IPv4 tables
# 10, 10 and 20.
# NOTE(review): the `def` line, the bodies of both `for i in ...` loops and
# the statements framing the final tearDownClass() call are not visible in
# this extract — confirm against the full file.
472 class TestSNAT(MethodHolder):
473 """ SNAT Test Cases """
477 super(TestSNAT, cls).setUpClass()
480 cls.tcp_port_in = 6303
481 cls.tcp_port_out = 6303
482 cls.udp_port_in = 6304
483 cls.udp_port_out = 6304
484 cls.icmp_id_in = 6305
485 cls.icmp_id_out = 6305
486 cls.snat_addr = '10.0.0.3'
487 cls.ipfix_src_port = 4739
488 cls.ipfix_domain_id = 1
490 cls.create_pg_interfaces(range(9))
491 cls.interfaces = list(cls.pg_interfaces[0:4])
493 for i in cls.interfaces:
498 cls.pg0.generate_remote_hosts(3)
499 cls.pg0.configure_ipv4_neighbors()
501 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
# NOTE(review): the three `_local_ip4n` assignments below pack
# `i.local_ip4`, where `i` is the variable left over from the
# `for i in cls.interfaces` loop above rather than the interface being
# configured (pg4/pg5/pg6) — looks unintended; confirm.
503 cls.pg4._local_ip4 = "172.16.255.1"
504 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
505 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
506 cls.pg4.set_table_ip4(10)
507 cls.pg5._local_ip4 = "172.16.255.3"
508 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
509 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
510 cls.pg5.set_table_ip4(10)
511 cls.pg6._local_ip4 = "172.16.255.1"
512 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
513 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
514 cls.pg6.set_table_ip4(20)
515 for i in cls.overlapping_interfaces:
524 super(TestSNAT, cls).tearDownClass()
# Remove SNAT-related configuration through the VPP API: issues route calls
# for the pg7/pg8 remote hosts, walks the pg7/pg8 neighbor dumps, unconfigures
# pg7's IPv4 address if present, removes SNAT interface addresses, disables
# SNAT IPFIX and resets the stored IPFIX port/domain to 4739/1, then walks
# the dumps of SNAT interfaces, static mappings and addresses.
# NOTE(review): the tails of most multi-line calls (including the is_add
# style arguments that would make them deletions) and the inner neighbor
# loop header are not visible in this extract — confirm against the full
# file.
527 def clear_snat(self):
529 Clear SNAT configuration.
531 # I found no elegant way to do this
532 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
533 dst_address_length=32,
534 next_hop_address=self.pg7.remote_ip4n,
535 next_hop_sw_if_index=self.pg7.sw_if_index,
537 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
538 dst_address_length=32,
539 next_hop_address=self.pg8.remote_ip4n,
540 next_hop_sw_if_index=self.pg8.sw_if_index,
543 for intf in [self.pg7, self.pg8]:
544 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
546 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
551 if self.pg7.has_ip4_config:
552 self.pg7.unconfig_ip4()
554 interfaces = self.vapi.snat_interface_addr_dump()
555 for intf in interfaces:
556 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
558 self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
559 domain_id=self.ipfix_domain_id)
560 self.ipfix_src_port = 4739
561 self.ipfix_domain_id = 1
563 interfaces = self.vapi.snat_interface_dump()
564 for intf in interfaces:
565 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
569 static_mappings = self.vapi.snat_static_mapping_dump()
570 for sm in static_mappings:
571 self.vapi.snat_add_static_mapping(sm.local_ip_address,
572 sm.external_ip_address,
573 local_port=sm.local_port,
574 external_port=sm.external_port,
575 addr_only=sm.addr_only,
577 protocol=sm.protocol,
580 adresses = self.vapi.snat_address_dump()
581 for addr in adresses:
582 self.vapi.snat_add_address_range(addr.ip_address,
# Convenience wrapper around vapi.snat_add_static_mapping: converts the
# dotted-quad local/external addresses to packed form with inet_pton and
# forwards them together with the remaining options.
# NOTE(review): the end of the signature (the docstring mentions a `proto`
# parameter), the statement under `if local_port and external_port:` and most
# arguments of the vapi call are not visible in this extract — confirm
# against the full file.
586 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
587 local_port=0, external_port=0, vrf_id=0,
588 is_add=1, external_sw_if_index=0xFFFFFFFF,
591 Add/delete S-NAT static mapping
593 :param local_ip: Local IP address
594 :param external_ip: External IP address
595 :param local_port: Local port number (Optional)
596 :param external_port: External port number (Optional)
597 :param vrf_id: VRF ID (Default 0)
598 :param is_add: 1 if add, 0 if delete (Default add)
599 :param external_sw_if_index: External interface instead of IP address
600 :param proto: IP protocol (Mandatory if port specified)
603 if local_port and external_port:
605 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
606 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
607 self.vapi.snat_add_static_mapping(
610 external_sw_if_index,
# Add or delete a single S-NAT pool address: the dotted-quad `ip` is packed
# with inet_pton and passed as both the first and the last address of the
# range to vapi.snat_add_address_range.
# NOTE(review): the tail of the vapi call is not visible in this extract;
# presumably it forwards vrf_id — confirm.
618 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
620 Add/delete S-NAT address
622 :param ip: IP address
623 :param is_add: 1 if add, 0 if delete (Default add)
625 snat_addr = socket.inet_pton(socket.AF_INET, ip)
626 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
# Dynamic translation round trip: adds the SNAT address, enables the SNAT
# feature on pg0 and pg1, sends the inside stream from pg0 and verifies the
# capture on pg1 with verify_capture_out, then sends the outside stream from
# pg1 and verifies the capture on pg0 with verify_capture_in.
# NOTE(review): the remaining argument(s) of the pg1 feature call and any
# statement between enabling capture and reading it are not visible in this
# extract — confirm against the full file.
629 def test_dynamic(self):
630 """ SNAT dynamic translation test """
632 self.snat_add_address(self.snat_addr)
633 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
634 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
638 pkts = self.create_stream_in(self.pg0, self.pg1)
639 self.pg0.add_stream(pkts)
640 self.pg_enable_capture(self.pg_interfaces)
642 capture = self.pg1.get_capture(len(pkts))
643 self.verify_capture_out(capture)
646 pkts = self.create_stream_out(self.pg1)
647 self.pg1.add_stream(pkts)
648 self.pg_enable_capture(self.pg_interfaces)
650 capture = self.pg0.get_capture(len(pkts))
651 self.verify_capture_in(capture, self.pg0)
# Inside-to-outside packets with TTL=1: after SNAT is enabled on pg0/pg1 the
# inside stream is sent with ttl=1 and the ICMP errors returned on pg0 are
# verified with verify_capture_in_with_icmp_errors (default ICMP type 11).
# NOTE(review): the remaining argument(s) of the pg1 feature call and any
# statement between enabling capture and reading it are not visible in this
# extract — confirm against the full file.
653 def test_dynamic_icmp_errors_in2out_ttl_1(self):
654 """ SNAT handling of client packets with TTL=1 """
656 self.snat_add_address(self.snat_addr)
657 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
658 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
661 # Client side - generate traffic
662 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
663 self.pg0.add_stream(pkts)
664 self.pg_enable_capture(self.pg_interfaces)
667 # Client side - verify ICMP type 11 packets
668 capture = self.pg0.get_capture(len(pkts))
669 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Outside-to-inside packets with TTL=1: sessions are first created by an
# inside stream (verified on pg1), then the outside stream is sent with ttl=1
# and the ICMP errors returned on pg1 are verified with
# verify_capture_out_with_icmp_errors, expecting pg1's local address as the
# source.
# NOTE(review): the remaining argument(s) of the pg1 feature call and any
# statement between enabling capture and reading it are not visible in this
# extract — confirm against the full file.
671 def test_dynamic_icmp_errors_out2in_ttl_1(self):
672 """ SNAT handling of server packets with TTL=1 """
674 self.snat_add_address(self.snat_addr)
675 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
676 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
679 # Client side - create sessions
680 pkts = self.create_stream_in(self.pg0, self.pg1)
681 self.pg0.add_stream(pkts)
682 self.pg_enable_capture(self.pg_interfaces)
685 # Server side - generate traffic
686 capture = self.pg1.get_capture(len(pkts))
687 self.verify_capture_out(capture)
688 pkts = self.create_stream_out(self.pg1, ttl=1)
689 self.pg1.add_stream(pkts)
690 self.pg_enable_capture(self.pg_interfaces)
693 # Server side - verify ICMP type 11 packets
694 capture = self.pg1.get_capture(len(pkts))
695 self.verify_capture_out_with_icmp_errors(capture,
696 src_ip=self.pg1.local_ip4)
# ICMP errors answering inside-to-outside packets: the inside stream is sent
# with ttl=2, each packet captured on pg1 is wrapped in an ICMP type 11
# message addressed to the SNAT address and sent back from pg1, and the
# resulting errors are verified on pg0.
# NOTE(review): the remaining argument(s) of the pg1 feature call and any
# statement between enabling capture and reading it are not visible in this
# extract — confirm against the full file.
698 def test_dynamic_icmp_errors_in2out_ttl_2(self):
699 """ SNAT handling of error responses to client packets with TTL=2 """
701 self.snat_add_address(self.snat_addr)
702 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
703 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
706 # Client side - generate traffic
707 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
708 self.pg0.add_stream(pkts)
709 self.pg_enable_capture(self.pg_interfaces)
712 # Server side - simulate ICMP type 11 response
713 capture = self.pg1.get_capture(len(pkts))
714 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
715 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
716 ICMP(type=11) / packet[IP] for packet in capture]
717 self.pg1.add_stream(pkts)
718 self.pg_enable_capture(self.pg_interfaces)
721 # Client side - verify ICMP type 11 packets
722 capture = self.pg0.get_capture(len(pkts))
723 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# ICMP errors answering outside-to-inside packets: sessions are created by an
# inside stream, the outside stream is sent with ttl=2, each packet captured
# on pg0 is wrapped in an ICMP type 11 message sent from the inside host to
# the outside host, and the translated errors are verified on pg1.
# NOTE(review): the remaining argument(s) of the pg1 feature call and any
# statement between enabling capture and reading it are not visible in this
# extract — confirm against the full file.
725 def test_dynamic_icmp_errors_out2in_ttl_2(self):
726 """ SNAT handling of error responses to server packets with TTL=2 """
728 self.snat_add_address(self.snat_addr)
729 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
730 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
733 # Client side - create sessions
734 pkts = self.create_stream_in(self.pg0, self.pg1)
735 self.pg0.add_stream(pkts)
736 self.pg_enable_capture(self.pg_interfaces)
739 # Server side - generate traffic
740 capture = self.pg1.get_capture(len(pkts))
741 self.verify_capture_out(capture)
742 pkts = self.create_stream_out(self.pg1, ttl=2)
743 self.pg1.add_stream(pkts)
744 self.pg_enable_capture(self.pg_interfaces)
747 # Client side - simulate ICMP type 11 response
748 capture = self.pg0.get_capture(len(pkts))
749 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
750 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
751 ICMP(type=11) / packet[IP] for packet in capture]
752 self.pg0.add_stream(pkts)
753 self.pg_enable_capture(self.pg_interfaces)
756 # Server side - verify ICMP type 11 packets
757 capture = self.pg1.get_capture(len(pkts))
758 self.verify_capture_out_with_icmp_errors(capture)
# Ping the outside interface's own address from the outside host: an ICMP
# echo request is sent from pg1's remote host to pg1's local address and a
# single echo reply (type 0) with swapped addresses is expected on pg1.
# NOTE(review): the request is built with icmp_id_out while the reply is
# compared with icmp_id_in; the class defaults visible in this file are equal
# (6305) — confirm that the mix is intended.
# NOTE(review): the bindings of `pkts` and `packet`, the remaining
# argument(s) of the pg1 feature call and the statements framing the
# logger.error call are not visible in this extract — confirm against the
# full file.
760 def test_ping_out_interface_from_outside(self):
761 """ Ping SNAT out interface from outside network """
763 self.snat_add_address(self.snat_addr)
764 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
765 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
768 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
769 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
770 ICMP(id=self.icmp_id_out, type='echo-request'))
772 self.pg1.add_stream(pkts)
773 self.pg_enable_capture(self.pg_interfaces)
775 capture = self.pg1.get_capture(len(pkts))
776 self.assertEqual(1, len(capture))
779 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
780 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
781 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
782 self.assertEqual(packet[ICMP].type, 0) # echo reply
784 self.logger.error(ppp("Unexpected or invalid packet "
785 "(outside network):", packet))
# Ping an inside host through a static mapping: maps pg0's remote host to the
# SNAT address, sends an echo request from the outside host to the SNAT
# address and verifies one ICMP packet on pg0, then sends the echo reply from
# the inside host and verifies one ICMP packet on pg1 with same_port=True.
# NOTE(review): the remaining argument(s) of the pg1 feature call and any
# statement between enabling capture and reading it are not visible in this
# extract — confirm against the full file.
788 def test_ping_internal_host_from_outside(self):
789 """ Ping internal host from outside network """
791 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
792 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
793 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
797 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
798 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
799 ICMP(id=self.icmp_id_out, type='echo-request'))
800 self.pg1.add_stream(pkt)
801 self.pg_enable_capture(self.pg_interfaces)
803 capture = self.pg0.get_capture(1)
804 self.verify_capture_in(capture, self.pg0, packet_num=1)
805 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
808 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
809 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
810 ICMP(id=self.icmp_id_in, type='echo-reply'))
811 self.pg0.add_stream(pkt)
812 self.pg_enable_capture(self.pg_interfaces)
814 capture = self.pg1.get_capture(1)
815 self.verify_capture_out(capture, same_port=True, packet_num=1)
816 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# 1:1 static NAT, first packet from the inside: resets the outside
# ports/ICMP id to the inside values, maps pg0's remote host to nat_ip, then
# verifies inside-to-outside traffic on pg1 (ports unchanged) followed by
# outside-to-inside traffic on pg0.
# NOTE(review): the binding of `nat_ip`, the remaining argument(s) of the pg1
# feature call and any statement between enabling capture and reading it are
# not visible in this extract — confirm against the full file.
818 def test_static_in(self):
819 """ SNAT 1:1 NAT initialized from inside network """
822 self.tcp_port_out = 6303
823 self.udp_port_out = 6304
824 self.icmp_id_out = 6305
826 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
827 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
828 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
832 pkts = self.create_stream_in(self.pg0, self.pg1)
833 self.pg0.add_stream(pkts)
834 self.pg_enable_capture(self.pg_interfaces)
836 capture = self.pg1.get_capture(len(pkts))
837 self.verify_capture_out(capture, nat_ip, True)
840 pkts = self.create_stream_out(self.pg1, nat_ip)
841 self.pg1.add_stream(pkts)
842 self.pg_enable_capture(self.pg_interfaces)
844 capture = self.pg0.get_capture(len(pkts))
845 self.verify_capture_in(capture, self.pg0)
# 1:1 static NAT, first packet from the outside: same configuration as the
# inside-initiated variant, but the outside-to-inside stream is sent and
# verified on pg0 first, followed by the inside-to-outside stream verified on
# pg1 (ports unchanged).
# NOTE(review): the binding of `nat_ip`, the remaining argument(s) of the pg1
# feature call and any statement between enabling capture and reading it are
# not visible in this extract — confirm against the full file.
847 def test_static_out(self):
848 """ SNAT 1:1 NAT initialized from outside network """
851 self.tcp_port_out = 6303
852 self.udp_port_out = 6304
853 self.icmp_id_out = 6305
855 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
856 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
857 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
861 pkts = self.create_stream_out(self.pg1, nat_ip)
862 self.pg1.add_stream(pkts)
863 self.pg_enable_capture(self.pg_interfaces)
865 capture = self.pg0.get_capture(len(pkts))
866 self.verify_capture_in(capture, self.pg0)
869 pkts = self.create_stream_in(self.pg0, self.pg1)
870 self.pg0.add_stream(pkts)
871 self.pg_enable_capture(self.pg_interfaces)
873 capture = self.pg1.get_capture(len(pkts))
874 self.verify_capture_out(capture, nat_ip, True)
# 1:1 static NAT with port, first packet from the inside: sets the outside
# ports/ICMP id to 3606/3607/3608, adds the SNAT address and one static
# mapping per protocol from the inside port/id to the outside one, then
# verifies inside-to-outside traffic on pg1 and outside-to-inside traffic on
# pg0.
# NOTE(review): the last argument of the first two static-mapping calls, the
# remaining argument(s) of the pg1 feature call and any statement between
# enabling capture and reading it are not visible in this extract — confirm
# against the full file.
876 def test_static_with_port_in(self):
877 """ SNAT 1:1 NAT with port initialized from inside network """
879 self.tcp_port_out = 3606
880 self.udp_port_out = 3607
881 self.icmp_id_out = 3608
883 self.snat_add_address(self.snat_addr)
884 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
885 self.tcp_port_in, self.tcp_port_out,
887 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
888 self.udp_port_in, self.udp_port_out,
890 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
891 self.icmp_id_in, self.icmp_id_out,
892 proto=IP_PROTOS.icmp)
893 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
894 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
898 pkts = self.create_stream_in(self.pg0, self.pg1)
899 self.pg0.add_stream(pkts)
900 self.pg_enable_capture(self.pg_interfaces)
902 capture = self.pg1.get_capture(len(pkts))
903 self.verify_capture_out(capture)
906 pkts = self.create_stream_out(self.pg1)
907 self.pg1.add_stream(pkts)
908 self.pg_enable_capture(self.pg_interfaces)
910 capture = self.pg0.get_capture(len(pkts))
911 self.verify_capture_in(capture, self.pg0)
# 1:1 static NAT with port, first packet from the outside: sets the outside
# ports/ICMP id to 30606/30607/30608, adds the SNAT address and one static
# mapping per protocol, then verifies outside-to-inside traffic on pg0
# followed by inside-to-outside traffic on pg1.
# NOTE(review): the last argument of the first two static-mapping calls, the
# remaining argument(s) of the pg1 feature call and any statement between
# enabling capture and reading it are not visible in this extract — confirm
# against the full file.
913 def test_static_with_port_out(self):
914 """ SNAT 1:1 NAT with port initialized from outside network """
916 self.tcp_port_out = 30606
917 self.udp_port_out = 30607
918 self.icmp_id_out = 30608
920 self.snat_add_address(self.snat_addr)
921 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
922 self.tcp_port_in, self.tcp_port_out,
924 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
925 self.udp_port_in, self.udp_port_out,
927 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
928 self.icmp_id_in, self.icmp_id_out,
929 proto=IP_PROTOS.icmp)
930 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
931 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
935 pkts = self.create_stream_out(self.pg1)
936 self.pg1.add_stream(pkts)
937 self.pg_enable_capture(self.pg_interfaces)
939 capture = self.pg0.get_capture(len(pkts))
940 self.verify_capture_in(capture, self.pg0)
943 pkts = self.create_stream_in(self.pg0, self.pg1)
944 self.pg0.add_stream(pkts)
945 self.pg_enable_capture(self.pg_interfaces)
947 capture = self.pg1.get_capture(len(pkts))
948 self.verify_capture_out(capture)
# VRF awareness of 1:1 static NAT: two static mappings are added (pg4's host
# to 10.0.0.30 and pg0's host to 10.0.0.40), SNAT is enabled on pg3, pg0 and
# pg4; traffic from pg4 must leave pg3 translated to nat_ip1 with ports
# unchanged, while traffic from pg0 must not appear on pg3 at all.
# NOTE(review): the trailing argument(s) of both static-mapping calls and of
# the pg3 feature call (presumably the VRF ids and the outside flag) are not
# visible in this extract — confirm against the full file.
950 def test_static_vrf_aware(self):
951 """ SNAT 1:1 NAT VRF awareness """
953 nat_ip1 = "10.0.0.30"
954 nat_ip2 = "10.0.0.40"
955 self.tcp_port_out = 6303
956 self.udp_port_out = 6304
957 self.icmp_id_out = 6305
959 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
961 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
963 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
965 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
966 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
968 # inside interface VRF match SNAT static mapping VRF
969 pkts = self.create_stream_in(self.pg4, self.pg3)
970 self.pg4.add_stream(pkts)
971 self.pg_enable_capture(self.pg_interfaces)
973 capture = self.pg3.get_capture(len(pkts))
974 self.verify_capture_out(capture, nat_ip1, True)
976 # inside interface VRF don't match SNAT static mapping VRF (packets
978 pkts = self.create_stream_in(self.pg0, self.pg3)
979 self.pg0.add_stream(pkts)
980 self.pg_enable_capture(self.pg_interfaces)
982 self.pg3.assert_nothing_captured()
# Two inside interfaces (pg0, pg1) sharing one outside interface (pg3):
# traffic between the two inside interfaces and from pg0 to pg2 (no SNAT
# feature) must pass untranslated; traffic from each inside interface to pg3
# must be translated and the return traffic must reach the originating
# inside host.
# NOTE(review): the remaining argument(s) of the pg3 feature call and any
# statement between enabling capture and reading it are not visible in this
# extract — confirm against the full file.
984 def test_multiple_inside_interfaces(self):
985 """ SNAT multiple inside interfaces (non-overlapping address space) """
987 self.snat_add_address(self.snat_addr)
988 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
989 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
990 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
993 # between two S-NAT inside interfaces (no translation)
994 pkts = self.create_stream_in(self.pg0, self.pg1)
995 self.pg0.add_stream(pkts)
996 self.pg_enable_capture(self.pg_interfaces)
998 capture = self.pg1.get_capture(len(pkts))
999 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1001 # from S-NAT inside to interface without S-NAT feature (no translation)
1002 pkts = self.create_stream_in(self.pg0, self.pg2)
1003 self.pg0.add_stream(pkts)
1004 self.pg_enable_capture(self.pg_interfaces)
1006 capture = self.pg2.get_capture(len(pkts))
1007 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1009 # in2out 1st interface
1010 pkts = self.create_stream_in(self.pg0, self.pg3)
1011 self.pg0.add_stream(pkts)
1012 self.pg_enable_capture(self.pg_interfaces)
1014 capture = self.pg3.get_capture(len(pkts))
1015 self.verify_capture_out(capture)
1017 # out2in 1st interface
1018 pkts = self.create_stream_out(self.pg3)
1019 self.pg3.add_stream(pkts)
1020 self.pg_enable_capture(self.pg_interfaces)
1022 capture = self.pg0.get_capture(len(pkts))
1023 self.verify_capture_in(capture, self.pg0)
1025 # in2out 2nd interface
1026 pkts = self.create_stream_in(self.pg1, self.pg3)
1027 self.pg1.add_stream(pkts)
1028 self.pg_enable_capture(self.pg_interfaces)
1030 capture = self.pg3.get_capture(len(pkts))
1031 self.verify_capture_out(capture)
1033 # out2in 2nd interface
1034 pkts = self.create_stream_out(self.pg3)
1035 self.pg3.add_stream(pkts)
1036 self.pg_enable_capture(self.pg_interfaces)
1038 capture = self.pg1.get_capture(len(pkts))
1039 self.verify_capture_in(capture, self.pg1)
1041 def test_inside_overlapping_interfaces(self):
1042 """ SNAT multiple inside interfaces with overlapping address space """
# Enables the SNAT feature on pg3, pg4, pg5 and pg6, sends traffic between
# them and finally checks the SNAT user/session dumps.
# NOTE(review): this listing omits some original lines (for example the
# continuation of the calls started on 1046 and 1051, and the lines that
# bind `ip`, `tcp` and `user`); confirm those against the complete file.
1044 static_nat_ip = "10.0.0.10"
1045 self.snat_add_address(self.snat_addr)
1046 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1048 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1049 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
1050 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
# Static mapping for the pg6 host; remaining arguments are on a line that
# is not part of this listing.
1051 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1054 # between S-NAT inside interfaces with same VRF (no translation)
1055 pkts = self.create_stream_in(self.pg4, self.pg5)
1056 self.pg4.add_stream(pkts)
1057 self.pg_enable_capture(self.pg_interfaces)
1059 capture = self.pg5.get_capture(len(pkts))
1060 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1062 # between S-NAT inside interfaces with different VRF (hairpinning)
# A single TCP packet from the pg4 host to the static NAT address is expected
# on pg6 with source rewritten to the pool address and a new source port.
1063 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1064 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1065 TCP(sport=1234, dport=5678))
1066 self.pg4.add_stream(p)
1067 self.pg_enable_capture(self.pg_interfaces)
1069 capture = self.pg6.get_capture(1)
1074 self.assertEqual(ip.src, self.snat_addr)
1075 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1076 self.assertNotEqual(tcp.sport, 1234)
1077 self.assertEqual(tcp.dport, 5678)
1079 self.logger.error(ppp("Unexpected or invalid packet:", p))
1082 # in2out 1st interface
1083 pkts = self.create_stream_in(self.pg4, self.pg3)
1084 self.pg4.add_stream(pkts)
1085 self.pg_enable_capture(self.pg_interfaces)
1087 capture = self.pg3.get_capture(len(pkts))
1088 self.verify_capture_out(capture)
1090 # out2in 1st interface
1091 pkts = self.create_stream_out(self.pg3)
1092 self.pg3.add_stream(pkts)
1093 self.pg_enable_capture(self.pg_interfaces)
1095 capture = self.pg4.get_capture(len(pkts))
1096 self.verify_capture_in(capture, self.pg4)
1098 # in2out 2nd interface
1099 pkts = self.create_stream_in(self.pg5, self.pg3)
1100 self.pg5.add_stream(pkts)
1101 self.pg_enable_capture(self.pg_interfaces)
1103 capture = self.pg3.get_capture(len(pkts))
1104 self.verify_capture_out(capture)
1106 # out2in 2nd interface
1107 pkts = self.create_stream_out(self.pg3)
1108 self.pg3.add_stream(pkts)
1109 self.pg_enable_capture(self.pg_interfaces)
1111 capture = self.pg5.get_capture(len(pkts))
1112 self.verify_capture_in(capture, self.pg5)
# Sessions of the pg5 host: exactly three dynamic sessions are expected, in
# the fixed order TCP, UDP, ICMP, using the single pool address.
# The second argument (10) is presumably the VRF id - confirm.
1115 addresses = self.vapi.snat_address_dump()
1116 self.assertEqual(len(addresses), 1)
1117 sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1118 self.assertEqual(len(sessions), 3)
1119 for session in sessions:
1120 self.assertFalse(session.is_static)
1121 self.assertEqual(session.inside_ip_address[0:4],
1122 self.pg5.remote_ip4n)
1123 self.assertEqual(session.outside_ip_address,
1124 addresses[0].ip_address)
1125 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1126 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1127 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1128 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1129 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1130 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1131 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1132 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1133 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1135 # in2out 3rd interface
# pg6 traffic is checked against the static NAT address; the third argument
# to verify_capture_out is passed positionally as True.
1136 pkts = self.create_stream_in(self.pg6, self.pg3)
1137 self.pg6.add_stream(pkts)
1138 self.pg_enable_capture(self.pg_interfaces)
1140 capture = self.pg3.get_capture(len(pkts))
1141 self.verify_capture_out(capture, static_nat_ip, True)
1143 # out2in 3rd interface
1144 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1145 self.pg3.add_stream(pkts)
1146 self.pg_enable_capture(self.pg_interfaces)
1148 capture = self.pg6.get_capture(len(pkts))
1149 self.verify_capture_in(capture, self.pg6)
1151 # general user and session dump verifications
1152 users = self.vapi.snat_user_dump()
1153 self.assertTrue(len(users) >= 3)
1154 addresses = self.vapi.snat_address_dump()
1155 self.assertEqual(len(addresses), 1)
# `user` is bound on a line that is not part of this listing (presumably a
# loop over `users`).
1157 sessions = self.vapi.snat_user_session_dump(user.ip_address,
1159 for session in sessions:
1160 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_bytes > total_pkts and total_pkts > 0.
1161 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1162 self.assertTrue(session.protocol in
1163 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Sessions of the pg4 host: at least four, all dynamic.
1167 sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1168 self.assertTrue(len(sessions) >= 4)
1169 for session in sessions:
1170 self.assertFalse(session.is_static)
1171 self.assertEqual(session.inside_ip_address[0:4],
1172 self.pg4.remote_ip4n)
1173 self.assertEqual(session.outside_ip_address,
1174 addresses[0].ip_address)
# Sessions of the pg6 host: at least three, all static, mapped to
# static_nat_ip.
1177 sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1178 self.assertTrue(len(sessions) >= 3)
1179 for session in sessions:
1180 self.assertTrue(session.is_static)
1181 self.assertEqual(session.inside_ip_address[0:4],
1182 self.pg6.remote_ip4n)
# NOTE(review): comparing two map() results only works where map() returns
# lists (Python 2); Python 3 map objects never compare equal.
1183 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1184 map(int, static_nat_ip.split('.')))
1185 self.assertTrue(session.inside_port in
1186 [self.tcp_port_in, self.udp_port_in,
def test_hairpinning(self):
    """ SNAT hairpinning - 1:1 NAT with port"""
    # Scenario: a host and a server both sit behind inside interface pg0.
    # The host reaches the server through the server's external (static
    # mapping) address, so both directions are translated and sent back
    # out of pg0.
    #
    # NOTE(review): the listing this method was reviewed from omitted
    # several short source lines. Lines tagged "restored" were re-created
    # from context - confirm them against the complete file.

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234  # restored: used below, definition not in listing
    server_in_port = 5678
    server_out_port = 8765

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)  # restored
    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.snat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()  # restored
    capture = self.pg0.get_capture(1)
    p = capture[0]  # restored
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # remember the translated source port for the reply direction
        host_out_port = tcp.sport
    except:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()  # restored
    capture = self.pg0.get_capture(1)
    p = capture[0]  # restored
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except:
        # Fixed: the packet must be an argument of ppp(), not of
        # logger.error(); the old form called ppp() with one argument and
        # handed the packet to the logger as a stray format argument.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
1251 def test_hairpinning2(self):
1252 """ SNAT hairpinning - 1:1 NAT"""
# Host and two servers are all remote hosts of pg0; each server gets a
# 1:1 static mapping and traffic is hairpinned through pg0 in four phases.
# NOTE(review): `pkts` is used below but the lines that build it are not
# part of this listing; the same holds for the continuation of the call
# started on 1264. Confirm against the complete file.
1254 server1_nat_ip = "10.0.0.10"
1255 server2_nat_ip = "10.0.0.11"
1256 host = self.pg0.remote_hosts[0]
1257 server1 = self.pg0.remote_hosts[1]
1258 server2 = self.pg0.remote_hosts[2]
1259 server_tcp_port = 22
1260 server_udp_port = 20
1262 self.snat_add_address(self.snat_addr)
1263 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1264 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1267 # add static mapping for servers
1268 self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1269 self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
# Phase 1: host -> server1 (via server1_nat_ip), one TCP, UDP and ICMP
# packet each.
1273 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1274 IP(src=host.ip4, dst=server1_nat_ip) /
1275 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1278 IP(src=host.ip4, dst=server1_nat_ip) /
1279 UDP(sport=self.udp_port_in, dport=server_udp_port))
1281 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1282 IP(src=host.ip4, dst=server1_nat_ip) /
1283 ICMP(id=self.icmp_id_in, type='echo-request'))
1285 self.pg0.add_stream(pkts)
1286 self.pg_enable_capture(self.pg_interfaces)
1288 capture = self.pg0.get_capture(len(pkts))
# Expect source rewritten to the pool address and a changed source port /
# ICMP id; the translated values are stored on self for the reply phase.
1289 for packet in capture:
1291 self.assertEqual(packet[IP].src, self.snat_addr)
1292 self.assertEqual(packet[IP].dst, server1.ip4)
1293 if packet.haslayer(TCP):
1294 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1295 self.assertEqual(packet[TCP].dport, server_tcp_port)
1296 self.tcp_port_out = packet[TCP].sport
1297 self.check_tcp_checksum(packet)
1298 elif packet.haslayer(UDP):
1299 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1300 self.assertEqual(packet[UDP].dport, server_udp_port)
1301 self.udp_port_out = packet[UDP].sport
1303 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1304 self.icmp_id_out = packet[ICMP].id
1306 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Phase 2: server1 -> host, addressed to the pool address and the
# translated ports / ICMP id recorded in phase 1.
1311 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1312 IP(src=server1.ip4, dst=self.snat_addr) /
1313 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1315 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1316 IP(src=server1.ip4, dst=self.snat_addr) /
1317 UDP(sport=server_udp_port, dport=self.udp_port_out))
1319 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1320 IP(src=server1.ip4, dst=self.snat_addr) /
1321 ICMP(id=self.icmp_id_out, type='echo-reply'))
1323 self.pg0.add_stream(pkts)
1324 self.pg_enable_capture(self.pg_interfaces)
1326 capture = self.pg0.get_capture(len(pkts))
# Expect source server1_nat_ip and the host's original ports / ICMP id.
1327 for packet in capture:
1329 self.assertEqual(packet[IP].src, server1_nat_ip)
1330 self.assertEqual(packet[IP].dst, host.ip4)
1331 if packet.haslayer(TCP):
1332 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1333 self.assertEqual(packet[TCP].sport, server_tcp_port)
1334 self.check_tcp_checksum(packet)
1335 elif packet.haslayer(UDP):
1336 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1337 self.assertEqual(packet[UDP].sport, server_udp_port)
1339 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1341 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1344 # server2 to server1
1346 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1347 IP(src=server2.ip4, dst=server1_nat_ip) /
1348 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1350 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1351 IP(src=server2.ip4, dst=server1_nat_ip) /
1352 UDP(sport=self.udp_port_in, dport=server_udp_port))
1354 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1355 IP(src=server2.ip4, dst=server1_nat_ip) /
1356 ICMP(id=self.icmp_id_in, type='echo-request'))
1358 self.pg0.add_stream(pkts)
1359 self.pg_enable_capture(self.pg_interfaces)
1361 capture = self.pg0.get_capture(len(pkts))
# server2 has its own static mapping, so the source becomes server2_nat_ip
# and the source port / ICMP id are expected to stay unchanged.
1362 for packet in capture:
1364 self.assertEqual(packet[IP].src, server2_nat_ip)
1365 self.assertEqual(packet[IP].dst, server1.ip4)
1366 if packet.haslayer(TCP):
1367 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1368 self.assertEqual(packet[TCP].dport, server_tcp_port)
1369 self.tcp_port_out = packet[TCP].sport
1370 self.check_tcp_checksum(packet)
1371 elif packet.haslayer(UDP):
1372 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1373 self.assertEqual(packet[UDP].dport, server_udp_port)
1374 self.udp_port_out = packet[UDP].sport
1376 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1377 self.icmp_id_out = packet[ICMP].id
1379 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1382 # server1 to server2
1384 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1385 IP(src=server1.ip4, dst=server2_nat_ip) /
1386 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1388 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1389 IP(src=server1.ip4, dst=server2_nat_ip) /
1390 UDP(sport=server_udp_port, dport=self.udp_port_out))
1392 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1393 IP(src=server1.ip4, dst=server2_nat_ip) /
1394 ICMP(id=self.icmp_id_out, type='echo-reply'))
1396 self.pg0.add_stream(pkts)
1397 self.pg_enable_capture(self.pg_interfaces)
1399 capture = self.pg0.get_capture(len(pkts))
# Replies must arrive at server2 with source server1_nat_ip.
1400 for packet in capture:
1402 self.assertEqual(packet[IP].src, server1_nat_ip)
1403 self.assertEqual(packet[IP].dst, server2.ip4)
1404 if packet.haslayer(TCP):
1405 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1406 self.assertEqual(packet[TCP].sport, server_tcp_port)
1407 self.check_tcp_checksum(packet)
1408 elif packet.haslayer(UDP):
1409 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1410 self.assertEqual(packet[UDP].sport, server_udp_port)
1412 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1414 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1417 def test_max_translations_per_user(self):
1418 """ MAX translations per user - recycle the least recently used """
# Sends more TCP flows from one inside host than the configured per-user
# limit and expects every packet to still be forwarded to pg1.
# NOTE(review): the lines that create and fill `pkts` are not part of this
# listing; neither is the continuation of the call started on 1422.
1420 self.snat_add_address(self.snat_addr)
1421 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1422 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1425 # get maximum number of translations per user
1426 snat_config = self.vapi.snat_show_config()
1428 # send more than maximum number of translations per user packets
1429 pkts_num = snat_config.max_translations_per_user + 5
# One packet per distinct source port, starting at 1025.
1431 for port in range(0, pkts_num):
1432 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1433 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1434 TCP(sport=1025 + port))
1436 self.pg0.add_stream(pkts)
1437 self.pg_enable_capture(self.pg_interfaces)
1440 # verify number of translated packet
1441 self.pg1.get_capture(pkts_num)
1443 def test_interface_addr(self):
1444 """ Acquire SNAT addresses from interface """
# Registers pg7 as the source of SNAT pool addresses and checks that the
# pool follows the interface address as it is configured and removed.
# (`adresses` is the original spelling of the local variable.)
1445 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1447 # no address in NAT pool
1448 adresses = self.vapi.snat_address_dump()
1449 self.assertEqual(0, len(adresses))
1451 # configure interface address and check NAT address pool
1452 self.pg7.config_ip4()
1453 adresses = self.vapi.snat_address_dump()
1454 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address are compared.
1455 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1457 # remove interface address and check NAT address pool
1458 self.pg7.unconfig_ip4()
1459 adresses = self.vapi.snat_address_dump()
1460 self.assertEqual(0, len(adresses))
1462 def test_interface_addr_static_mapping(self):
1463 """ Static mapping with addresses from interface """
# Creates a static mapping whose external side is given as an interface
# (pg7) and checks how the dumped mapping changes when pg7 gains and loses
# its IPv4 address.
1464 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1465 self.snat_add_static_mapping('1.2.3.4',
1466 external_sw_if_index=self.pg7.sw_if_index)
1468 # static mappings with external interface
1469 static_mappings = self.vapi.snat_static_mapping_dump()
1470 self.assertEqual(1, len(static_mappings))
1471 self.assertEqual(self.pg7.sw_if_index,
1472 static_mappings[0].external_sw_if_index)
1474 # configure interface address and check static mappings
1475 self.pg7.config_ip4()
1476 static_mappings = self.vapi.snat_static_mapping_dump()
1477 self.assertEqual(1, len(static_mappings))
1478 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1479 self.pg7.local_ip4n)
# Once the address is known the mapping is expected to report 0xFFFFFFFF as
# its interface index (presumably the "no interface" marker - confirm).
1480 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1482 # remove interface address and check static mappings
1483 self.pg7.unconfig_ip4()
1484 static_mappings = self.vapi.snat_static_mapping_dump()
1485 self.assertEqual(0, len(static_mappings))
1487 def test_ipfix_nat44_sess(self):
1488 """ S-NAT IPFIX logging NAT44 session created/delted """
# Configures an IPFIX exporter towards pg3, creates sessions with in2out
# traffic, removes the pool address, flushes IPFIX and validates the three
# exported packets.
# NOTE(review): "delted" in the docstring above is a typo for "deleted".
# NOTE(review): the loop headers that bind `p` from `capture`, and the
# continuation lines of the calls on 1495/1498, are not in this listing.
1489 self.ipfix_domain_id = 10
1490 self.ipfix_src_port = 20202
1491 colector_port = 30303
# The literal 30303 duplicates colector_port; keep the two in sync.
1492 bind_layers(UDP, IPFIX, dport=30303)
1493 self.snat_add_address(self.snat_addr)
1494 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1495 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1497 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1498 src_address=self.pg3.local_ip4n,
1500 template_interval=10,
1501 collector_port=colector_port)
1502 self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1503 src_port=self.ipfix_src_port)
1505 pkts = self.create_stream_in(self.pg0, self.pg1)
1506 self.pg0.add_stream(pkts)
1507 self.pg_enable_capture(self.pg_interfaces)
1509 capture = self.pg1.get_capture(len(pkts))
1510 self.verify_capture_out(capture)
# Removing the pool address precedes the flush; presumably this is what
# produces the session-delete events - confirm.
1511 self.snat_add_address(self.snat_addr, is_add=0)
1512 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1513 capture = self.pg3.get_capture(3)
1514 ipfix = IPFIXDecoder()
1515 # first load template
1517 self.assertTrue(p.haslayer(IPFIX))
1518 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1519 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1520 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1521 self.assertEqual(p[UDP].dport, colector_port)
1522 self.assertEqual(p[IPFIX].observationDomainID,
1523 self.ipfix_domain_id)
1524 if p.haslayer(Template):
1525 ipfix.add_template(p.getlayer(Template))
1526 # verify events in data set
1528 if p.haslayer(Data):
1529 data = ipfix.decode_data_set(p.getlayer(Set))
1530 self.verify_ipfix_nat44_ses(data)
1532 def test_ipfix_addr_exhausted(self):
1533 """ S-NAT IPFIX logging NAT addresses exhausted """
# No pool address is added in the visible lines, so the in2out packet is
# expected to be dropped (zero packets on pg1) and reported through IPFIX.
# NOTE(review): the loop headers that bind `p` from `capture`, the last
# layer of the packet built on 1544-1545, and the continuation lines of the
# calls on 1535/1538 are not part of this listing.
1534 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1535 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1537 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1538 src_address=self.pg3.local_ip4n,
1540 template_interval=10)
# self.ipfix_domain_id / self.ipfix_src_port are read here but not assigned
# in this method; they must already exist on the instance.
1541 self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1542 src_port=self.ipfix_src_port)
1544 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1545 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1547 self.pg0.add_stream(p)
1548 self.pg_enable_capture(self.pg_interfaces)
1550 capture = self.pg1.get_capture(0)
1551 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1552 capture = self.pg3.get_capture(3)
1553 ipfix = IPFIXDecoder()
1554 # first load template
1556 self.assertTrue(p.haslayer(IPFIX))
1557 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1558 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1559 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# No collector_port is passed to set_ipfix_exporter above; 4739 is the
# destination port this test expects in that case.
1560 self.assertEqual(p[UDP].dport, 4739)
1561 self.assertEqual(p[IPFIX].observationDomainID,
1562 self.ipfix_domain_id)
1563 if p.haslayer(Template):
1564 ipfix.add_template(p.getlayer(Template))
1565 # verify events in data set
1567 if p.haslayer(Data):
1568 data = ipfix.decode_data_set(p.getlayer(Set))
1569 self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB """
    # Checks that VPP answers ARP requests for the SNAT pool address and
    # for a 1:1 static mapping address on pg1, and stops answering once
    # both have been removed.
    #
    # NOTE(review): the listing this method was reviewed from omitted
    # several short source lines. Lines tagged "restored" were re-created
    # from context - confirm them against the complete file.
    static_addr = '10.0.0.10'
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)  # restored
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # ARP request for the SNAT pool address
    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.snat_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()  # restored
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    # Fixed: was assertTrue(op, ARP.is_at), which used ARP.is_at as the
    # failure message and passed for any non-zero opcode.
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # ARP request for the 1:1 NAT address
    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=static_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()  # restored
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    self.assertEqual(capture[0][ARP].op, ARP.is_at)  # fixed, see above

    # send ARP to non-SNAT interface
    p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.snat_addr,
             psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
    self.pg2.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()  # restored
    # NOTE(review): the request is injected on pg2 but the empty capture
    # is checked on pg1, as in the original - confirm this is intended.
    capture = self.pg1.get_capture(0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)  # restored

    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.snat_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()  # restored
    capture = self.pg1.get_capture(0)

    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=static_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()  # restored
    capture = self.pg1.get_capture(0)
1632 def test_vrf_mode(self):
1633 """ S-NAT tenant VRF aware address pool mode """
# Puts pg0 and pg1 into different IPv4 tables, adds one pool address per
# VRF and expects traffic from each interface to be translated to the
# address of its own VRF.
# NOTE(review): `vrf_id1` / `vrf_id2` are assigned on lines that are not
# part of this listing, as is the continuation of the call on 1651.
1637 nat_ip1 = "10.0.0.10"
1638 nat_ip2 = "10.0.0.11"
# Addresses are removed before changing the table and configured again
# afterwards.
1640 self.pg0.unconfig_ip4()
1641 self.pg1.unconfig_ip4()
1642 self.pg0.set_table_ip4(vrf_id1)
1643 self.pg1.set_table_ip4(vrf_id2)
1644 self.pg0.config_ip4()
1645 self.pg1.config_ip4()
1647 self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1648 self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1649 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1650 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1651 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2 must use nat_ip1
1655 pkts = self.create_stream_in(self.pg0, self.pg2)
1656 self.pg0.add_stream(pkts)
1657 self.pg_enable_capture(self.pg_interfaces)
1659 capture = self.pg2.get_capture(len(pkts))
1660 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2 must use nat_ip2
1663 pkts = self.create_stream_in(self.pg1, self.pg2)
1664 self.pg1.add_stream(pkts)
1665 self.pg_enable_capture(self.pg_interfaces)
1667 capture = self.pg2.get_capture(len(pkts))
1668 self.verify_capture_out(capture, nat_ip2)
1670 def test_vrf_feature_independent(self):
1671 """ S-NAT tenant VRF independent address pool mode """
# Both pool addresses are added without a vrf_id; traffic from pg0 and from
# pg1 is then expected to be translated to the same address, nat_ip1.
# NOTE(review): the continuation of the call on 1680 is not in this listing.
1673 nat_ip1 = "10.0.0.10"
1674 nat_ip2 = "10.0.0.11"
1676 self.snat_add_address(nat_ip1)
1677 self.snat_add_address(nat_ip2)
1678 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1679 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1680 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
1684 pkts = self.create_stream_in(self.pg0, self.pg2)
1685 self.pg0.add_stream(pkts)
1686 self.pg_enable_capture(self.pg_interfaces)
1688 capture = self.pg2.get_capture(len(pkts))
1689 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2 (still nat_ip1)
1692 pkts = self.create_stream_in(self.pg1, self.pg2)
1693 self.pg1.add_stream(pkts)
1694 self.pg_enable_capture(self.pg_interfaces)
1696 capture = self.pg2.get_capture(len(pkts))
1697 self.verify_capture_out(capture, nat_ip1)
1699 def test_dynamic_ipless_interfaces(self):
1700 """ SNAT interfaces without configured ip dynamic map """
# pg7 and pg8 get a neighbor entry and a /32 route for their remote host
# instead of an interface address; dynamic SNAT is then exercised in both
# directions between them.
# NOTE(review): the trailing arguments of the ip_neighbor_add_del calls and
# of the call on 1722 are on lines that are not part of this listing.
1702 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1703 self.pg7.remote_mac,
1704 self.pg7.remote_ip4n,
1706 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1707 self.pg8.remote_mac,
1708 self.pg8.remote_ip4n,
# Host routes pointing at the remote hosts through their own interfaces.
1711 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1712 dst_address_length=32,
1713 next_hop_address=self.pg7.remote_ip4n,
1714 next_hop_sw_if_index=self.pg7.sw_if_index)
1715 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1716 dst_address_length=32,
1717 next_hop_address=self.pg8.remote_ip4n,
1718 next_hop_sw_if_index=self.pg8.sw_if_index)
1720 self.snat_add_address(self.snat_addr)
1721 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1722 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8
1726 pkts = self.create_stream_in(self.pg7, self.pg8)
1727 self.pg7.add_stream(pkts)
1728 self.pg_enable_capture(self.pg_interfaces)
1730 capture = self.pg8.get_capture(len(pkts))
1731 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to the pool address
1734 pkts = self.create_stream_out(self.pg8, self.snat_addr)
1735 self.pg8.add_stream(pkts)
1736 self.pg_enable_capture(self.pg_interfaces)
1738 capture = self.pg7.get_capture(len(pkts))
1739 self.verify_capture_in(capture, self.pg7)
1741 def test_static_ipless_interfaces(self):
1742 """ SNAT 1:1 NAT interfaces without configured ip """
# Same neighbor/route setup as the dynamic variant, but with a 1:1 static
# mapping of the pg7 host to self.snat_addr and no pool address. Traffic is
# sent from pg8 first, then from pg7.
# NOTE(review): the trailing arguments of the ip_neighbor_add_del calls and
# of the call on 1764 are on lines that are not part of this listing.
1744 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1745 self.pg7.remote_mac,
1746 self.pg7.remote_ip4n,
1748 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1749 self.pg8.remote_mac,
1750 self.pg8.remote_ip4n,
1753 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1754 dst_address_length=32,
1755 next_hop_address=self.pg7.remote_ip4n,
1756 next_hop_sw_if_index=self.pg7.sw_if_index)
1757 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1758 dst_address_length=32,
1759 next_hop_address=self.pg8.remote_ip4n,
1760 next_hop_sw_if_index=self.pg8.sw_if_index)
1762 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1763 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1764 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
1768 pkts = self.create_stream_out(self.pg8)
1769 self.pg8.add_stream(pkts)
1770 self.pg_enable_capture(self.pg_interfaces)
1772 capture = self.pg7.get_capture(len(pkts))
1773 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8; the third positional argument True is passed to
# verify_capture_out together with the expected address.
1776 pkts = self.create_stream_in(self.pg7, self.pg8)
1777 self.pg7.add_stream(pkts)
1778 self.pg_enable_capture(self.pg_interfaces)
1780 capture = self.pg8.get_capture(len(pkts))
1781 self.verify_capture_out(capture, self.snat_addr, True)
1783 def test_static_with_port_ipless_interfaces(self):
1784 """ SNAT 1:1 NAT with port interfaces without configured ip """
# Like the 1:1 variant, but with one static mapping per protocol that also
# fixes the outside port / ICMP id. The expected outside values are stored
# on self before the mappings are created.
# NOTE(review): the trailing arguments of the ip_neighbor_add_del calls and
# of the call on 1819 are on lines that are not part of this listing.
1786 self.tcp_port_out = 30606
1787 self.udp_port_out = 30607
1788 self.icmp_id_out = 30608
1790 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1791 self.pg7.remote_mac,
1792 self.pg7.remote_ip4n,
1794 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1795 self.pg8.remote_mac,
1796 self.pg8.remote_ip4n,
1799 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1800 dst_address_length=32,
1801 next_hop_address=self.pg7.remote_ip4n,
1802 next_hop_sw_if_index=self.pg7.sw_if_index)
1803 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1804 dst_address_length=32,
1805 next_hop_address=self.pg8.remote_ip4n,
1806 next_hop_sw_if_index=self.pg8.sw_if_index)
1808 self.snat_add_address(self.snat_addr)
# One mapping each for TCP, UDP and ICMP: (inside port/id -> outside port/id)
1809 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1810 self.tcp_port_in, self.tcp_port_out,
1811 proto=IP_PROTOS.tcp)
1812 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1813 self.udp_port_in, self.udp_port_out,
1814 proto=IP_PROTOS.udp)
1815 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1816 self.icmp_id_in, self.icmp_id_out,
1817 proto=IP_PROTOS.icmp)
1818 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1819 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
1823 pkts = self.create_stream_out(self.pg8)
1824 self.pg8.add_stream(pkts)
1825 self.pg_enable_capture(self.pg_interfaces)
1827 capture = self.pg7.get_capture(len(pkts))
1828 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
1831 pkts = self.create_stream_in(self.pg7, self.pg8)
1832 self.pg7.add_stream(pkts)
1833 self.pg_enable_capture(self.pg_interfaces)
1835 capture = self.pg8.get_capture(len(pkts))
1836 self.verify_capture_out(capture)
# Presumably the body of TestSNAT.tearDown; its `def` line is not part of
# this listing. Runs the parent tearDown and, unless self.vpp_dead is set,
# logs the output of the "show snat verbose" CLI command.
1839 super(TestSNAT, self).tearDown()
1840 if not self.vpp_dead:
1841 self.logger.info(self.vapi.cli("show snat verbose"))
1845 class TestDeterministicNAT(MethodHolder):
1846 """ Deterministic NAT Test Cases """
1849 def setUpConstants(cls):
# Takes `cls`, so presumably decorated as a classmethod on a line that is
# not part of this listing. Extends the VPP command line so the SNAT plugin
# starts with the "deterministic" option.
1850 super(TestDeterministicNAT, cls).setUpConstants()
1851 cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1854 def setUpClass(cls):
# Class-level fixture: sets the port / id constants and the NAT address used
# by the tests, creates three pg interfaces and prepares two remote hosts on
# pg0.
1855 super(TestDeterministicNAT, cls).setUpClass()
# Inside and external ports deliberately share the same values.
1858 cls.tcp_port_in = 6303
1859 cls.tcp_external_port = 6303
1860 cls.udp_port_in = 6304
1861 cls.udp_external_port = 6304
1862 cls.icmp_id_in = 6305
1863 cls.snat_addr = '10.0.0.3'
1865 cls.create_pg_interfaces(range(3))
1866 cls.interfaces = list(cls.pg_interfaces)
# The loop body (per-interface setup) is not part of this listing.
1868 for i in cls.interfaces:
1873 cls.pg0.generate_remote_hosts(2)
1874 cls.pg0.configure_ipv4_neighbors()
# Presumably inside an error handler whose header is not part of this
# listing: tears the class down again if the setup above failed - confirm.
1877 super(TestDeterministicNAT, cls).tearDownClass()
1880 def create_stream_in(self, in_if, out_if, ttl=64):
1882 Create packet stream for inside network
1884 :param in_if: Inside interface
1885 :param out_if: Outside interface
1886 :param ttl: TTL of generated packets
# Builds one TCP, one UDP and one ICMP echo-request packet from the inside
# host towards out_if's remote host, using the class-level inside ports and
# external ports.
# NOTE(review): the lines that collect the packets into a list and return it
# are not part of this listing.
1890 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1891 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1892 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1896 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1897 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1898 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1902 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1903 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1904 ICMP(id=self.icmp_id_in, type='echo-request'))
1909 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1911 Create packet stream for outside network
1913 :param out_if: Outside interface
1914 :param dst_ip: Destination IP address (Default use global SNAT address)
1915 :param ttl: TTL of generated packets
# Builds one TCP, one UDP and one ICMP echo-reply packet from the outside
# host towards dst_ip. Destination ports come from self.tcp_port_out /
# self.udp_port_out and the ICMP id from self.icmp_external_id, which are
# written by verify_capture_out, so that must have run first.
# NOTE(review): the condition guarding the default on 1918 and the lines
# that collect/return the packets are not part of this listing.
1918 dst_ip = self.snat_addr
1921 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1922 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1923 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1927 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1928 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1929 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1933 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1934 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1935 ICMP(id=self.icmp_external_id, type='echo-reply'))
1940 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1942 Verify captured packets on outside network
1944 :param capture: Captured packets
1945 :param nat_ip: Translated IP address (Default use global SNAT address)
# NOTE(review): the next docstring line documents `same_port`, which is not
# a parameter of this method's signature ("Sorce" is also a typo for
# "Source").
1946 :param same_port: Sorce port number is not translated (Default False)
1947 :param packet_num: Expected number of packets (Default 3)
# Checks the capture length and the translated source address of every
# packet, and records the translated TCP/UDP source port and ICMP id on
# self for later use.
# NOTE(review): the condition guarding the default on 1950 and the
# try/else/except lines around the checks are not part of this listing.
1950 nat_ip = self.snat_addr
1951 self.assertEqual(packet_num, len(capture))
1952 for packet in capture:
1954 self.assertEqual(packet[IP].src, nat_ip)
1955 if packet.haslayer(TCP):
1956 self.tcp_port_out = packet[TCP].sport
1957 elif packet.haslayer(UDP):
1958 self.udp_port_out = packet[UDP].sport
1960 self.icmp_external_id = packet[ICMP].id
1962 self.logger.error(ppp("Unexpected or invalid packet "
1963 "(outside network):", packet))
1966 def initiate_tcp_session(self, in_if, out_if):
1968 Initiates TCP session
1970 :param in_if: Inside interface
1971 :param out_if: Outside interface
# Drives a three-packet exchange (in->out, out->in, in->out) through the
# NAT and stores the translated source port of the first packet in
# self.tcp_port_out.
# NOTE(review): the TCP flags arguments, the add_stream calls for the
# in->out packets, the line rebinding `p` to the captured packet and the
# try/except lines are not part of this listing.
1974 # SYN packet in->out
1975 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1976 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1977 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1980 self.pg_enable_capture(self.pg_interfaces)
1982 capture = out_if.get_capture(1)
1984 self.tcp_port_out = p[TCP].sport
1986 # SYN + ACK packet out->in
1987 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1988 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1989 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1991 out_if.add_stream(p)
1992 self.pg_enable_capture(self.pg_interfaces)
1994 in_if.get_capture(1)
1996 # ACK packet in->out
1997 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1998 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1999 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2002 self.pg_enable_capture(self.pg_interfaces)
2004 out_if.get_capture(1)
2007 self.logger.error("TCP 3 way handshake failed")
2010 def verify_ipfix_max_entries_per_user(self, data):
2012 Verify IPFIX maximum entries per user exceeded event
2014 :param data: Decoded IPFIX data records
# Expects exactly one record. `record` is bound on a line that is not part
# of this listing (presumably data[0]).
2016 self.assertEqual(1, len(data))
# Field 230 must be 13; ord() implies the value is a one-character string.
2019 self.assertEqual(ord(record[230]), 13)
2020 # natQuotaExceededEvent
2021 self.assertEqual('\x03\x00\x00\x00', record[466])
# Field 8 must equal the packed IPv4 address of the pg0 remote host.
2023 self.assertEqual(self.pg0.remote_ip4n, record[8])
2025 def test_deterministic_mode(self):
2026 """ S-NAT run deterministic mode """
# Adds one deterministic mapping, checks forward and reverse lookups for a
# host inside the mapped range, verifies the mapping dump and finally
# expects the dump to be empty again.
# NOTE(review): `in_plen` / `out_plen` and the call that removes the mapping
# before the final dump are on lines that are not part of this listing.
2027 in_addr = '172.16.255.0'
2028 out_addr = '172.17.255.50'
2029 in_addr_t = '172.16.255.20'
# Packed (4-byte) forms of the addresses, as the API calls below take them.
2030 in_addr_n = socket.inet_aton(in_addr)
2031 out_addr_n = socket.inet_aton(out_addr)
2032 in_addr_t_n = socket.inet_aton(in_addr_t)
2036 snat_config = self.vapi.snat_show_config()
2037 self.assertEqual(1, snat_config.deterministic)
2039 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup of the test host, then reverse lookup using the upper end
# of the returned port range; it must lead back to the same inside host.
2041 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2042 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2043 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2044 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2046 deterministic_mappings = self.vapi.snat_det_map_dump()
2047 self.assertEqual(len(deterministic_mappings), 1)
2048 dsm = deterministic_mappings[0]
2049 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2050 self.assertEqual(in_plen, dsm.in_plen)
2051 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2052 self.assertEqual(out_plen, dsm.out_plen)
2055 deterministic_mappings = self.vapi.snat_det_map_dump()
2056 self.assertEqual(len(deterministic_mappings), 0)
2058 def test_set_timeouts(self):
2059 """ Set deterministic NAT timeouts """
# Reads the current timeouts, sets each of them to its old value + 10 and
# checks that every value read back differs from the old one.
# NOTE(review): only inequality is asserted, not that the new value equals
# old + 10.
2060 timeouts_before = self.vapi.snat_det_get_timeouts()
# Argument order: udp, tcp_established, tcp_transitory, icmp.
2062 self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2063 timeouts_before.tcp_established + 10,
2064 timeouts_before.tcp_transitory + 10,
2065 timeouts_before.icmp + 10)
2067 timeouts_after = self.vapi.snat_det_get_timeouts()
2069 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2070 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2071 self.assertNotEqual(timeouts_before.tcp_established,
2072 timeouts_after.tcp_established)
2073 self.assertNotEqual(timeouts_before.tcp_transitory,
2074 timeouts_after.tcp_transitory)
2076 def test_det_in(self):
2077 """ CGNAT translation test (TCP, UDP, ICMP) """
2079 nat_ip = "10.0.0.10"
2081 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2083 socket.inet_aton(nat_ip),
2085 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2086 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2090 pkts = self.create_stream_in(self.pg0, self.pg1)
2091 self.pg0.add_stream(pkts)
2092 self.pg_enable_capture(self.pg_interfaces)
2094 capture = self.pg1.get_capture(len(pkts))
2095 self.verify_capture_out(capture, nat_ip)
2098 pkts = self.create_stream_out(self.pg1, nat_ip)
2099 self.pg1.add_stream(pkts)
2100 self.pg_enable_capture(self.pg_interfaces)
2102 capture = self.pg0.get_capture(len(pkts))
2103 self.verify_capture_in(capture, self.pg0)
2106 sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2107 self.assertEqual(len(sessions), 3)
2111 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2112 self.assertEqual(s.in_port, self.tcp_port_in)
2113 self.assertEqual(s.out_port, self.tcp_port_out)
2114 self.assertEqual(s.ext_port, self.tcp_external_port)
2118 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2119 self.assertEqual(s.in_port, self.udp_port_in)
2120 self.assertEqual(s.out_port, self.udp_port_out)
2121 self.assertEqual(s.ext_port, self.udp_external_port)
2125 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2126 self.assertEqual(s.in_port, self.icmp_id_in)
2127 self.assertEqual(s.out_port, self.icmp_external_id)
2129 def test_multiple_users(self):
2130 """ CGNAT multiple users """
2132 nat_ip = "10.0.0.10"
2134 external_port = 6303
2136 host0 = self.pg0.remote_hosts[0]
2137 host1 = self.pg0.remote_hosts[1]
2139 self.vapi.snat_add_det_map(host0.ip4n,
2141 socket.inet_aton(nat_ip),
2143 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2144 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2148 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2149 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2150 TCP(sport=port_in, dport=external_port))
2151 self.pg0.add_stream(p)
2152 self.pg_enable_capture(self.pg_interfaces)
2154 capture = self.pg1.get_capture(1)
2159 self.assertEqual(ip.src, nat_ip)
2160 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2161 self.assertEqual(tcp.dport, external_port)
2162 port_out0 = tcp.sport
2164 self.logger.error(ppp("Unexpected or invalid packet:", p))
2168 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2169 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2170 TCP(sport=port_in, dport=external_port))
2171 self.pg0.add_stream(p)
2172 self.pg_enable_capture(self.pg_interfaces)
2174 capture = self.pg1.get_capture(1)
2179 self.assertEqual(ip.src, nat_ip)
2180 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2181 self.assertEqual(tcp.dport, external_port)
2182 port_out1 = tcp.sport
2184 self.logger.error(ppp("Unexpected or invalid packet:", p))
2187 dms = self.vapi.snat_det_map_dump()
2188 self.assertEqual(1, len(dms))
2189 self.assertEqual(2, dms[0].ses_num)
2192 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2193 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2194 TCP(sport=external_port, dport=port_out0))
2195 self.pg1.add_stream(p)
2196 self.pg_enable_capture(self.pg_interfaces)
2198 capture = self.pg0.get_capture(1)
2203 self.assertEqual(ip.src, self.pg1.remote_ip4)
2204 self.assertEqual(ip.dst, host0.ip4)
2205 self.assertEqual(tcp.dport, port_in)
2206 self.assertEqual(tcp.sport, external_port)
2208 self.logger.error(ppp("Unexpected or invalid packet:", p))
2212 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2213 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2214 TCP(sport=external_port, dport=port_out1))
2215 self.pg1.add_stream(p)
2216 self.pg_enable_capture(self.pg_interfaces)
2218 capture = self.pg0.get_capture(1)
2223 self.assertEqual(ip.src, self.pg1.remote_ip4)
2224 self.assertEqual(ip.dst, host1.ip4)
2225 self.assertEqual(tcp.dport, port_in)
2226 self.assertEqual(tcp.sport, external_port)
2228 self.logger.error(ppp("Unexpected or invalid packet", p))
2231 # session close api test
2232 self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2234 self.pg1.remote_ip4n,
2236 dms = self.vapi.snat_det_map_dump()
2237 self.assertEqual(dms[0].ses_num, 1)
2239 self.vapi.snat_det_close_session_in(host0.ip4n,
2241 self.pg1.remote_ip4n,
2243 dms = self.vapi.snat_det_map_dump()
2244 self.assertEqual(dms[0].ses_num, 0)
2246 def test_tcp_session_close_detection_in(self):
2247 """ CGNAT TCP session close initiated from inside network """
2248 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2250 socket.inet_aton(self.snat_addr),
2252 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2253 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2256 self.initiate_tcp_session(self.pg0, self.pg1)
2258 # close the session from inside
2260 # FIN packet in -> out
2261 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2262 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2263 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2265 self.pg0.add_stream(p)
2266 self.pg_enable_capture(self.pg_interfaces)
2268 self.pg1.get_capture(1)
2272 # ACK packet out -> in
2273 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2274 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2275 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2279 # FIN packet out -> in
2280 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2281 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2282 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2286 self.pg1.add_stream(pkts)
2287 self.pg_enable_capture(self.pg_interfaces)
2289 self.pg0.get_capture(2)
2291 # ACK packet in -> out
2292 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2293 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2294 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2296 self.pg0.add_stream(p)
2297 self.pg_enable_capture(self.pg_interfaces)
2299 self.pg1.get_capture(1)
2301 # Check if snat closed the session
2302 dms = self.vapi.snat_det_map_dump()
2303 self.assertEqual(0, dms[0].ses_num)
2305 self.logger.error("TCP session termination failed")
2308 def test_tcp_session_close_detection_out(self):
2309 """ CGNAT TCP session close initiated from outside network """
2310 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2312 socket.inet_aton(self.snat_addr),
2314 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2315 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2318 self.initiate_tcp_session(self.pg0, self.pg1)
2320 # close the session from outside
2322 # FIN packet out -> in
2323 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2324 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2325 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2327 self.pg1.add_stream(p)
2328 self.pg_enable_capture(self.pg_interfaces)
2330 self.pg0.get_capture(1)
2334 # ACK packet in -> out
2335 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2336 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2337 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2341 # ACK packet in -> out
2342 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2343 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2344 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2348 self.pg0.add_stream(pkts)
2349 self.pg_enable_capture(self.pg_interfaces)
2351 self.pg1.get_capture(2)
2353 # ACK packet out -> in
2354 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2355 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2356 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2358 self.pg1.add_stream(p)
2359 self.pg_enable_capture(self.pg_interfaces)
2361 self.pg0.get_capture(1)
2363 # Check if snat closed the session
2364 dms = self.vapi.snat_det_map_dump()
2365 self.assertEqual(0, dms[0].ses_num)
2367 self.logger.error("TCP session termination failed")
2370 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2371 def test_session_timeout(self):
2372 """ CGNAT session timeouts """
2373 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2375 socket.inet_aton(self.snat_addr),
2377 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2378 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2381 self.initiate_tcp_session(self.pg0, self.pg1)
2382 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2383 pkts = self.create_stream_in(self.pg0, self.pg1)
2384 self.pg0.add_stream(pkts)
2385 self.pg_enable_capture(self.pg_interfaces)
2387 capture = self.pg1.get_capture(len(pkts))
2390 dms = self.vapi.snat_det_map_dump()
2391 self.assertEqual(0, dms[0].ses_num)
2393 def test_session_limit_per_user(self):
2394 """ CGNAT maximum 1000 sessions per user should be created """
2395 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2397 socket.inet_aton(self.snat_addr),
2399 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2400 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2402 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2403 src_address=self.pg2.local_ip4n,
2405 template_interval=10)
2406 self.vapi.snat_ipfix()
2409 for port in range(1025, 2025):
2410 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2411 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2412 UDP(sport=port, dport=port))
2415 self.pg0.add_stream(pkts)
2416 self.pg_enable_capture(self.pg_interfaces)
2418 capture = self.pg1.get_capture(len(pkts))
2420 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2421 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2422 UDP(sport=3001, dport=3002))
2423 self.pg0.add_stream(p)
2424 self.pg_enable_capture(self.pg_interfaces)
2426 capture = self.pg1.assert_nothing_captured()
2428 # verify ICMP error packet
2429 capture = self.pg0.get_capture(1)
2431 self.assertTrue(p.haslayer(ICMP))
2433 self.assertEqual(icmp.type, 3)
2434 self.assertEqual(icmp.code, 1)
2435 self.assertTrue(icmp.haslayer(IPerror))
2436 inner_ip = icmp[IPerror]
2437 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2438 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2440 dms = self.vapi.snat_det_map_dump()
2442 self.assertEqual(1000, dms[0].ses_num)
2444 # verify IPFIX logging
2445 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2446 capture = self.pg2.get_capture(2)
2447 ipfix = IPFIXDecoder()
2448 # first load template
2450 self.assertTrue(p.haslayer(IPFIX))
2451 if p.haslayer(Template):
2452 ipfix.add_template(p.getlayer(Template))
2453 # verify events in data set
2455 if p.haslayer(Data):
2456 data = ipfix.decode_data_set(p.getlayer(Set))
2457 self.verify_ipfix_max_entries_per_user(data)
2459 def clear_snat(self):
2461 Clear SNAT configuration.
2463 self.vapi.snat_ipfix(enable=0)
2464 self.vapi.snat_det_set_timeouts()
2465 deterministic_mappings = self.vapi.snat_det_map_dump()
2466 for dsm in deterministic_mappings:
2467 self.vapi.snat_add_det_map(dsm.in_addr,
2473 interfaces = self.vapi.snat_interface_dump()
2474 for intf in interfaces:
2475 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2480 super(TestDeterministicNAT, self).tearDown()
2481 if not self.vpp_dead:
2482 self.logger.info(self.vapi.cli("show snat detail"))
2486 class TestNAT64(MethodHolder):
2487 """ NAT64 Test Cases """
2490 def setUpClass(cls):
2491 super(TestNAT64, cls).setUpClass()
2494 cls.tcp_port_in = 6303
2495 cls.tcp_port_out = 6303
2496 cls.udp_port_in = 6304
2497 cls.udp_port_out = 6304
2498 cls.icmp_id_in = 6305
2499 cls.icmp_id_out = 6305
2500 cls.nat_addr = '10.0.0.3'
2501 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2503 cls.vrf1_nat_addr = '10.0.10.3'
2504 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2507 cls.create_pg_interfaces(range(3))
2508 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2509 cls.ip6_interfaces.append(cls.pg_interfaces[2])
2510 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2512 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2514 cls.pg0.generate_remote_hosts(2)
2516 for i in cls.ip6_interfaces:
2519 i.configure_ipv6_neighbors()
2521 for i in cls.ip4_interfaces:
2527 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool

    Adds a single IPv4 address to the NAT64 pool, checks that the pool
    dump reports exactly that address, then deletes it again and checks
    that the dump is empty.
    """
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # Start and end of the range are the same address.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
def test_interface(self):
    """ Enable/disable NAT64 feature on the interface

    Enables NAT64 on pg0 (inside) and pg1 (outside), verifies both the
    interface dump and the per-interface feature listing, then disables
    the feature on both interfaces and checks the dump is empty.
    """
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 2)

    # Track which interfaces show up in the dump; both flags must start
    # out False so a missing interface is reported as a test failure
    # instead of a NameError.
    pg0_found = False
    pg1_found = False
    for intf in interfaces:
        if intf.sw_if_index == self.pg0.sw_if_index:
            self.assertEqual(intf.is_inside, 1)
            pg0_found = True
        elif intf.sw_if_index == self.pg1.sw_if_index:
            self.assertEqual(intf.is_inside, 0)
            pg1_found = True
    self.assertTrue(pg0_found)
    self.assertTrue(pg1_found)

    # The graph nodes must be listed among each interface's features.
    features = self.vapi.cli("show interface features pg0")
    self.assertIn('nat64-in2out', features)
    features = self.vapi.cli("show interface features pg1")
    self.assertIn('nat64-out2in', features)

    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 0)
2575 def test_static_bib(self):
2576 """ Add/delete static BIB entry """
2577 in_addr = socket.inet_pton(socket.AF_INET6,
2578 '2001:db8:85a3::8a2e:370:7334')
2579 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2582 proto = IP_PROTOS.tcp
2584 self.vapi.nat64_add_del_static_bib(in_addr,
2589 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2594 self.assertEqual(bibe.i_addr, in_addr)
2595 self.assertEqual(bibe.o_addr, out_addr)
2596 self.assertEqual(bibe.i_port, in_port)
2597 self.assertEqual(bibe.o_port, out_port)
2598 self.assertEqual(static_bib_num, 1)
2600 self.vapi.nat64_add_del_static_bib(in_addr,
2606 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2611 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts

    Checks the timeout values reported before any change, then sets a
    custom value for each timeout and checks that they are reported
    back unchanged.
    """
    def check_reported(expected):
        # Fetch the timeouts once and compare field by field, in order.
        reported = self.vapi.nat64_get_timeouts()
        for name, value in expected:
            self.assertEqual(getattr(reported, name), value)

    # verify default values
    check_reported((('udp', 300),
                    ('icmp', 60),
                    ('tcp_trans', 240),
                    ('tcp_est', 7440),
                    ('tcp_incoming_syn', 6)))

    # set and verify custom values
    custom = (('udp', 200),
              ('icmp', 30),
              ('tcp_trans', 250),
              ('tcp_est', 7450),
              ('tcp_incoming_syn', 10))
    self.vapi.nat64_set_timeouts(**dict(custom))
    check_reported(custom)
2633 def test_dynamic(self):
2634 """ NAT64 dynamic translation test """
2635 self.tcp_port_in = 6303
2636 self.udp_port_in = 6304
2637 self.icmp_id_in = 6305
2639 ses_num_start = self.nat64_get_ses_num()
2641 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2643 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2644 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2647 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2648 self.pg0.add_stream(pkts)
2649 self.pg_enable_capture(self.pg_interfaces)
2651 capture = self.pg1.get_capture(len(pkts))
2652 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2653 dst_ip=self.pg1.remote_ip4)
2656 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2657 self.pg1.add_stream(pkts)
2658 self.pg_enable_capture(self.pg_interfaces)
2660 capture = self.pg0.get_capture(len(pkts))
2661 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2662 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2665 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2666 self.pg0.add_stream(pkts)
2667 self.pg_enable_capture(self.pg_interfaces)
2669 capture = self.pg1.get_capture(len(pkts))
2670 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2671 dst_ip=self.pg1.remote_ip4)
2674 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2675 self.pg1.add_stream(pkts)
2676 self.pg_enable_capture(self.pg_interfaces)
2678 capture = self.pg0.get_capture(len(pkts))
2679 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2681 ses_num_end = self.nat64_get_ses_num()
2683 self.assertEqual(ses_num_end - ses_num_start, 3)
2685 # tenant with specific VRF
2686 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
2687 self.vrf1_nat_addr_n,
2688 vrf_id=self.vrf1_id)
2689 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
2691 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
2692 self.pg2.add_stream(pkts)
2693 self.pg_enable_capture(self.pg_interfaces)
2695 capture = self.pg1.get_capture(len(pkts))
2696 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
2697 dst_ip=self.pg1.remote_ip4)
2699 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
2700 self.pg1.add_stream(pkts)
2701 self.pg_enable_capture(self.pg_interfaces)
2703 capture = self.pg2.get_capture(len(pkts))
2704 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
2706 def test_static(self):
2707 """ NAT64 static translation test """
2708 self.tcp_port_in = 60303
2709 self.udp_port_in = 60304
2710 self.icmp_id_in = 60305
2711 self.tcp_port_out = 60303
2712 self.udp_port_out = 60304
2713 self.icmp_id_out = 60305
2715 ses_num_start = self.nat64_get_ses_num()
2717 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2719 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2720 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2722 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2727 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2732 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2739 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2740 self.pg0.add_stream(pkts)
2741 self.pg_enable_capture(self.pg_interfaces)
2743 capture = self.pg1.get_capture(len(pkts))
2744 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2745 dst_ip=self.pg1.remote_ip4, same_port=True)
2748 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2749 self.pg1.add_stream(pkts)
2750 self.pg_enable_capture(self.pg_interfaces)
2752 capture = self.pg0.get_capture(len(pkts))
2753 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2754 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2756 ses_num_end = self.nat64_get_ses_num()
2758 self.assertEqual(ses_num_end - ses_num_start, 3)
2760 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2761 def test_session_timeout(self):
2762 """ NAT64 session timeout """
2763 self.icmp_id_in = 1234
2764 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2766 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2767 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2768 self.vapi.nat64_set_timeouts(icmp=5)
2770 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2771 self.pg0.add_stream(pkts)
2772 self.pg_enable_capture(self.pg_interfaces)
2774 capture = self.pg1.get_capture(len(pkts))
2776 ses_num_before_timeout = self.nat64_get_ses_num()
2780 # ICMP session after timeout
2781 ses_num_after_timeout = self.nat64_get_ses_num()
2782 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2784 def test_icmp_error(self):
2785 """ NAT64 ICMP Error message translation """
2786 self.tcp_port_in = 6303
2787 self.udp_port_in = 6304
2788 self.icmp_id_in = 6305
2790 ses_num_start = self.nat64_get_ses_num()
2792 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2794 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2795 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2797 # send some packets to create sessions
2798 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2799 self.pg0.add_stream(pkts)
2800 self.pg_enable_capture(self.pg_interfaces)
2802 capture_ip4 = self.pg1.get_capture(len(pkts))
2803 self.verify_capture_out(capture_ip4,
2804 nat_ip=self.nat_addr,
2805 dst_ip=self.pg1.remote_ip4)
2807 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2808 self.pg1.add_stream(pkts)
2809 self.pg_enable_capture(self.pg_interfaces)
2811 capture_ip6 = self.pg0.get_capture(len(pkts))
2812 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2813 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
2814 self.pg0.remote_ip6)
2817 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2818 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
2819 ICMPv6DestUnreach(code=1) /
2820 packet[IPv6] for packet in capture_ip6]
2821 self.pg0.add_stream(pkts)
2822 self.pg_enable_capture(self.pg_interfaces)
2824 capture = self.pg1.get_capture(len(pkts))
2825 for packet in capture:
2827 self.assertEqual(packet[IP].src, self.nat_addr)
2828 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2829 self.assertEqual(packet[ICMP].type, 3)
2830 self.assertEqual(packet[ICMP].code, 13)
2831 inner = packet[IPerror]
2832 self.assertEqual(inner.src, self.pg1.remote_ip4)
2833 self.assertEqual(inner.dst, self.nat_addr)
2834 self.check_icmp_checksum(packet)
2835 if inner.haslayer(TCPerror):
2836 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
2837 elif inner.haslayer(UDPerror):
2838 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
2840 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
2842 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2846 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2847 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2848 ICMP(type=3, code=13) /
2849 packet[IP] for packet in capture_ip4]
2850 self.pg1.add_stream(pkts)
2851 self.pg_enable_capture(self.pg_interfaces)
2853 capture = self.pg0.get_capture(len(pkts))
2854 for packet in capture:
2856 self.assertEqual(packet[IPv6].src, ip.src)
2857 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
2858 icmp = packet[ICMPv6DestUnreach]
2859 self.assertEqual(icmp.code, 1)
2860 inner = icmp[IPerror6]
2861 self.assertEqual(inner.src, self.pg0.remote_ip6)
2862 self.assertEqual(inner.dst, ip.src)
2863 self.check_icmpv6_checksum(packet)
2864 if inner.haslayer(TCPerror):
2865 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
2866 elif inner.haslayer(UDPerror):
2867 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
2869 self.assertEqual(inner[ICMPv6EchoRequest].id,
2872 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2875 def test_hairpinning(self):
2876 """ NAT64 hairpinning """
2878 client = self.pg0.remote_hosts[0]
2879 server = self.pg0.remote_hosts[1]
2880 server_tcp_in_port = 22
2881 server_tcp_out_port = 4022
2882 server_udp_in_port = 23
2883 server_udp_out_port = 4023
2884 client_tcp_in_port = 1234
2885 client_udp_in_port = 1235
2886 client_tcp_out_port = 0
2887 client_udp_out_port = 0
2888 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
2889 nat_addr_ip6 = ip.src
2891 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2893 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2894 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2896 self.vapi.nat64_add_del_static_bib(server.ip6n,
2899 server_tcp_out_port,
2901 self.vapi.nat64_add_del_static_bib(server.ip6n,
2904 server_udp_out_port,
2909 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2910 IPv6(src=client.ip6, dst=nat_addr_ip6) /
2911 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
2913 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2914 IPv6(src=client.ip6, dst=nat_addr_ip6) /
2915 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
2917 self.pg0.add_stream(pkts)
2918 self.pg_enable_capture(self.pg_interfaces)
2920 capture = self.pg0.get_capture(len(pkts))
2921 for packet in capture:
2923 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
2924 self.assertEqual(packet[IPv6].dst, server.ip6)
2925 if packet.haslayer(TCP):
2926 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
2927 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
2928 self.check_tcp_checksum(packet)
2929 client_tcp_out_port = packet[TCP].sport
2931 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
2932 self.assertEqual(packet[UDP].dport, server_udp_in_port)
2933 self.check_udp_checksum(packet)
2934 client_udp_out_port = packet[UDP].sport
2936 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2941 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2942 IPv6(src=server.ip6, dst=nat_addr_ip6) /
2943 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
2945 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2946 IPv6(src=server.ip6, dst=nat_addr_ip6) /
2947 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
2949 self.pg0.add_stream(pkts)
2950 self.pg_enable_capture(self.pg_interfaces)
2952 capture = self.pg0.get_capture(len(pkts))
2953 for packet in capture:
2955 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
2956 self.assertEqual(packet[IPv6].dst, client.ip6)
2957 if packet.haslayer(TCP):
2958 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
2959 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
2960 self.check_tcp_checksum(packet)
2962 self.assertEqual(packet[UDP].sport, server_udp_out_port)
2963 self.assertEqual(packet[UDP].dport, client_udp_in_port)
2964 self.check_udp_checksum(packet)
2966 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2971 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2972 IPv6(src=client.ip6, dst=nat_addr_ip6) /
2973 ICMPv6DestUnreach(code=1) /
2974 packet[IPv6] for packet in capture]
2975 self.pg0.add_stream(pkts)
2976 self.pg_enable_capture(self.pg_interfaces)
2978 capture = self.pg0.get_capture(len(pkts))
2979 for packet in capture:
2981 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
2982 self.assertEqual(packet[IPv6].dst, server.ip6)
2983 icmp = packet[ICMPv6DestUnreach]
2984 self.assertEqual(icmp.code, 1)
2985 inner = icmp[IPerror6]
2986 self.assertEqual(inner.src, server.ip6)
2987 self.assertEqual(inner.dst, nat_addr_ip6)
2988 self.check_icmpv6_checksum(packet)
2989 if inner.haslayer(TCPerror):
2990 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
2991 self.assertEqual(inner[TCPerror].dport,
2992 client_tcp_out_port)
2994 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
2995 self.assertEqual(inner[UDPerror].dport,
2996 client_udp_out_port)
2998 self.logger.error(ppp("Unexpected or invalid packet:", packet))
def nat64_get_ses_num(self):
    """
    Return number of active NAT64 sessions.

    :returns: Combined number of entries in the TCP, UDP and ICMP NAT64
              session table dumps
    """
    # Callers subtract two results of this method, so it must return an
    # integer total across all three protocol tables.
    protocols = (IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp)
    return sum(len(self.vapi.nat64_st_dump(proto)) for proto in protocols)
3014 def clear_nat64(self):
3016 Clear NAT64 configuration.
3018 self.vapi.nat64_set_timeouts()
3020 interfaces = self.vapi.nat64_interface_dump()
3021 for intf in interfaces:
3022 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3026 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3029 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3037 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3040 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3048 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3051 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3059 adresses = self.vapi.nat64_pool_addr_dump()
3060 for addr in adresses:
3061 self.vapi.nat64_add_del_pool_addr_range(addr.address,
3067 super(TestNAT64, self).tearDown()
3068 if not self.vpp_dead:
3069 self.logger.info(self.vapi.cli("show nat64 pool"))
3070 self.logger.info(self.vapi.cli("show nat64 interfaces"))
3071 self.logger.info(self.vapi.cli("show nat64 bib tcp"))
3072 self.logger.info(self.vapi.cli("show nat64 bib udp"))
3073 self.logger.info(self.vapi.cli("show nat64 bib icmp"))
3074 self.logger.info(self.vapi.cli("show nat64 session table tcp"))
3075 self.logger.info(self.vapi.cli("show nat64 session table udp"))
3076 self.logger.info(self.vapi.cli("show nat64 session table icmp"))
if __name__ == '__main__':
    # Executed as a script: run this module's tests with VppTestRunner.
    unittest.main(testRunner=VppTestRunner)