7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
20 class MethodHolder(VppTestCase):
21 """ SNAT create capture and verify method holder """
25 super(MethodHolder, cls).setUpClass()
28 super(MethodHolder, self).tearDown()
30 def check_ip_checksum(self, pkt):
32 Check IP checksum of the packet
34 :param pkt: Packet to check IP checksum
36 new = pkt.__class__(str(pkt))
38 new = new.__class__(str(new))
39 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
41 def check_tcp_checksum(self, pkt):
43 Check TCP checksum in IP packet
45 :param pkt: Packet to check TCP checksum
47 new = pkt.__class__(str(pkt))
49 new = new.__class__(str(new))
50 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
52 def check_udp_checksum(self, pkt):
54 Check UDP checksum in IP packet
56 :param pkt: Packet to check UDP checksum
58 new = pkt.__class__(str(pkt))
60 new = new.__class__(str(new))
61 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
63 def check_icmp_errror_embedded(self, pkt):
65 Check ICMP error embeded packet checksum
67 :param pkt: Packet to check ICMP error embeded packet checksum
69 if pkt.haslayer(IPerror):
70 new = pkt.__class__(str(pkt))
71 del new['IPerror'].chksum
72 new = new.__class__(str(new))
73 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
75 if pkt.haslayer(TCPerror):
76 new = pkt.__class__(str(pkt))
77 del new['TCPerror'].chksum
78 new = new.__class__(str(new))
79 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
81 if pkt.haslayer(UDPerror):
82 if pkt['UDPerror'].chksum != 0:
83 new = pkt.__class__(str(pkt))
84 del new['UDPerror'].chksum
85 new = new.__class__(str(new))
86 self.assertEqual(new['UDPerror'].chksum,
87 pkt['UDPerror'].chksum)
89 if pkt.haslayer(ICMPerror):
90 del new['ICMPerror'].chksum
91 new = new.__class__(str(new))
92 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
94 def check_icmp_checksum(self, pkt):
96 Check ICMP checksum in IPv4 packet
98 :param pkt: Packet to check ICMP checksum
100 new = pkt.__class__(str(pkt))
101 del new['ICMP'].chksum
102 new = new.__class__(str(new))
103 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104 if pkt.haslayer(IPerror):
105 self.check_icmp_errror_embedded(pkt)
107 def check_icmpv6_checksum(self, pkt):
109 Check ICMPv6 checksum in IPv4 packet
111 :param pkt: Packet to check ICMPv6 checksum
113 new = pkt.__class__(str(pkt))
114 if pkt.haslayer(ICMPv6DestUnreach):
115 del new['ICMPv6DestUnreach'].cksum
116 new = new.__class__(str(new))
117 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118 pkt['ICMPv6DestUnreach'].cksum)
119 self.check_icmp_errror_embedded(pkt)
120 if pkt.haslayer(ICMPv6EchoRequest):
121 del new['ICMPv6EchoRequest'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124 pkt['ICMPv6EchoRequest'].cksum)
125 if pkt.haslayer(ICMPv6EchoReply):
126 del new['ICMPv6EchoReply'].cksum
127 new = new.__class__(str(new))
128 self.assertEqual(new['ICMPv6EchoReply'].cksum,
129 pkt['ICMPv6EchoReply'].cksum)
131 def create_stream_in(self, in_if, out_if, ttl=64):
133 Create packet stream for inside network
135 :param in_if: Inside interface
136 :param out_if: Outside interface
137 :param ttl: TTL of generated packets
141 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143 TCP(sport=self.tcp_port_in, dport=20))
147 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149 UDP(sport=self.udp_port_in, dport=20))
153 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155 ICMP(id=self.icmp_id_in, type='echo-request'))
160 def create_stream_in_ip6(self, in_if, out_if, hlim=64):
162 Create IPv6 packet stream for inside network
164 :param in_if: Inside interface
165 :param out_if: Outside interface
166 :param ttl: Hop Limit of generated packets
169 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
171 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
172 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
173 TCP(sport=self.tcp_port_in, dport=20))
177 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
178 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
179 UDP(sport=self.udp_port_in, dport=20))
183 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
184 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
185 ICMPv6EchoRequest(id=self.icmp_id_in))
190 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
192 Create packet stream for outside network
194 :param out_if: Outside interface
195 :param dst_ip: Destination IP address (Default use global SNAT address)
196 :param ttl: TTL of generated packets
199 dst_ip = self.snat_addr
202 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
203 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
204 TCP(dport=self.tcp_port_out, sport=20))
208 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
209 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
210 UDP(dport=self.udp_port_out, sport=20))
214 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
215 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
216 ICMP(id=self.icmp_id_out, type='echo-reply'))
221 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
222 packet_num=3, dst_ip=None):
224 Verify captured packets on outside network
226 :param capture: Captured packets
227 :param nat_ip: Translated IP address (Default use global SNAT address)
228 :param same_port: Sorce port number is not translated (Default False)
229 :param packet_num: Expected number of packets (Default 3)
230 :param dst_ip: Destination IP address (Default do not verify)
233 nat_ip = self.snat_addr
234 self.assertEqual(packet_num, len(capture))
235 for packet in capture:
237 self.check_ip_checksum(packet)
238 self.assertEqual(packet[IP].src, nat_ip)
239 if dst_ip is not None:
240 self.assertEqual(packet[IP].dst, dst_ip)
241 if packet.haslayer(TCP):
243 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
246 packet[TCP].sport, self.tcp_port_in)
247 self.tcp_port_out = packet[TCP].sport
248 self.check_tcp_checksum(packet)
249 elif packet.haslayer(UDP):
251 self.assertEqual(packet[UDP].sport, self.udp_port_in)
254 packet[UDP].sport, self.udp_port_in)
255 self.udp_port_out = packet[UDP].sport
258 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
260 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
261 self.icmp_id_out = packet[ICMP].id
262 self.check_icmp_checksum(packet)
264 self.logger.error(ppp("Unexpected or invalid packet "
265 "(outside network):", packet))
268 def verify_capture_in(self, capture, in_if, packet_num=3):
270 Verify captured packets on inside network
272 :param capture: Captured packets
273 :param in_if: Inside interface
274 :param packet_num: Expected number of packets (Default 3)
276 self.assertEqual(packet_num, len(capture))
277 for packet in capture:
279 self.check_ip_checksum(packet)
280 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
281 if packet.haslayer(TCP):
282 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
283 self.check_tcp_checksum(packet)
284 elif packet.haslayer(UDP):
285 self.assertEqual(packet[UDP].dport, self.udp_port_in)
287 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
288 self.check_icmp_checksum(packet)
290 self.logger.error(ppp("Unexpected or invalid packet "
291 "(inside network):", packet))
294 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
296 Verify captured IPv6 packets on inside network
298 :param capture: Captured packets
299 :param src_ip: Source IP
300 :param dst_ip: Destination IP address
301 :param packet_num: Expected number of packets (Default 3)
303 self.assertEqual(packet_num, len(capture))
304 for packet in capture:
306 self.assertEqual(packet[IPv6].src, src_ip)
307 self.assertEqual(packet[IPv6].dst, dst_ip)
308 if packet.haslayer(TCP):
309 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
310 self.check_tcp_checksum(packet)
311 elif packet.haslayer(UDP):
312 self.assertEqual(packet[UDP].dport, self.udp_port_in)
313 self.check_udp_checksum(packet)
315 self.assertEqual(packet[ICMPv6EchoReply].id,
317 self.check_icmpv6_checksum(packet)
319 self.logger.error(ppp("Unexpected or invalid packet "
320 "(inside network):", packet))
323 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
325 Verify captured packet that don't have to be translated
327 :param capture: Captured packets
328 :param ingress_if: Ingress interface
329 :param egress_if: Egress interface
331 for packet in capture:
333 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
334 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
335 if packet.haslayer(TCP):
336 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
337 elif packet.haslayer(UDP):
338 self.assertEqual(packet[UDP].sport, self.udp_port_in)
340 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
342 self.logger.error(ppp("Unexpected or invalid packet "
343 "(inside network):", packet))
346 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
347 packet_num=3, icmp_type=11):
349 Verify captured packets with ICMP errors on outside network
351 :param capture: Captured packets
352 :param src_ip: Translated IP address or IP address of VPP
353 (Default use global SNAT address)
354 :param packet_num: Expected number of packets (Default 3)
355 :param icmp_type: Type of error ICMP packet
356 we are expecting (Default 11)
359 src_ip = self.snat_addr
360 self.assertEqual(packet_num, len(capture))
361 for packet in capture:
363 self.assertEqual(packet[IP].src, src_ip)
364 self.assertTrue(packet.haslayer(ICMP))
366 self.assertEqual(icmp.type, icmp_type)
367 self.assertTrue(icmp.haslayer(IPerror))
368 inner_ip = icmp[IPerror]
369 if inner_ip.haslayer(TCPerror):
370 self.assertEqual(inner_ip[TCPerror].dport,
372 elif inner_ip.haslayer(UDPerror):
373 self.assertEqual(inner_ip[UDPerror].dport,
376 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
378 self.logger.error(ppp("Unexpected or invalid packet "
379 "(outside network):", packet))
382 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
385 Verify captured packets with ICMP errors on inside network
387 :param capture: Captured packets
388 :param in_if: Inside interface
389 :param packet_num: Expected number of packets (Default 3)
390 :param icmp_type: Type of error ICMP packet
391 we are expecting (Default 11)
393 self.assertEqual(packet_num, len(capture))
394 for packet in capture:
396 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
397 self.assertTrue(packet.haslayer(ICMP))
399 self.assertEqual(icmp.type, icmp_type)
400 self.assertTrue(icmp.haslayer(IPerror))
401 inner_ip = icmp[IPerror]
402 if inner_ip.haslayer(TCPerror):
403 self.assertEqual(inner_ip[TCPerror].sport,
405 elif inner_ip.haslayer(UDPerror):
406 self.assertEqual(inner_ip[UDPerror].sport,
409 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
411 self.logger.error(ppp("Unexpected or invalid packet "
412 "(inside network):", packet))
415 def verify_ipfix_nat44_ses(self, data):
417 Verify IPFIX NAT44 session create/delete event
419 :param data: Decoded IPFIX data records
421 nat44_ses_create_num = 0
422 nat44_ses_delete_num = 0
423 self.assertEqual(6, len(data))
426 self.assertIn(ord(record[230]), [4, 5])
427 if ord(record[230]) == 4:
428 nat44_ses_create_num += 1
430 nat44_ses_delete_num += 1
432 self.assertEqual(self.pg0.remote_ip4n, record[8])
433 # postNATSourceIPv4Address
434 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
437 self.assertEqual(struct.pack("!I", 0), record[234])
438 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
439 if IP_PROTOS.icmp == ord(record[4]):
440 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
441 self.assertEqual(struct.pack("!H", self.icmp_id_out),
443 elif IP_PROTOS.tcp == ord(record[4]):
444 self.assertEqual(struct.pack("!H", self.tcp_port_in),
446 self.assertEqual(struct.pack("!H", self.tcp_port_out),
448 elif IP_PROTOS.udp == ord(record[4]):
449 self.assertEqual(struct.pack("!H", self.udp_port_in),
451 self.assertEqual(struct.pack("!H", self.udp_port_out),
454 self.fail("Invalid protocol")
455 self.assertEqual(3, nat44_ses_create_num)
456 self.assertEqual(3, nat44_ses_delete_num)
458 def verify_ipfix_addr_exhausted(self, data):
460 Verify IPFIX NAT addresses event
462 :param data: Decoded IPFIX data records
464 self.assertEqual(1, len(data))
467 self.assertEqual(ord(record[230]), 3)
469 self.assertEqual(struct.pack("!I", 0), record[283])
472 class TestSNAT(MethodHolder):
473 """ SNAT Test Cases """
477 super(TestSNAT, cls).setUpClass()
480 cls.tcp_port_in = 6303
481 cls.tcp_port_out = 6303
482 cls.udp_port_in = 6304
483 cls.udp_port_out = 6304
484 cls.icmp_id_in = 6305
485 cls.icmp_id_out = 6305
486 cls.snat_addr = '10.0.0.3'
487 cls.ipfix_src_port = 4739
488 cls.ipfix_domain_id = 1
490 cls.create_pg_interfaces(range(9))
491 cls.interfaces = list(cls.pg_interfaces[0:4])
493 for i in cls.interfaces:
498 cls.pg0.generate_remote_hosts(3)
499 cls.pg0.configure_ipv4_neighbors()
501 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
503 cls.pg4._local_ip4 = "172.16.255.1"
504 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
505 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
506 cls.pg4.set_table_ip4(10)
507 cls.pg5._local_ip4 = "172.16.255.3"
508 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
509 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
510 cls.pg5.set_table_ip4(10)
511 cls.pg6._local_ip4 = "172.16.255.1"
512 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
513 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
514 cls.pg6.set_table_ip4(20)
515 for i in cls.overlapping_interfaces:
524 super(TestSNAT, cls).tearDownClass()
527 def clear_snat(self):
529 Clear SNAT configuration.
531 # I found no elegant way to do this
532 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
533 dst_address_length=32,
534 next_hop_address=self.pg7.remote_ip4n,
535 next_hop_sw_if_index=self.pg7.sw_if_index,
537 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
538 dst_address_length=32,
539 next_hop_address=self.pg8.remote_ip4n,
540 next_hop_sw_if_index=self.pg8.sw_if_index,
543 for intf in [self.pg7, self.pg8]:
544 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
546 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
551 if self.pg7.has_ip4_config:
552 self.pg7.unconfig_ip4()
554 interfaces = self.vapi.snat_interface_addr_dump()
555 for intf in interfaces:
556 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
558 self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
559 domain_id=self.ipfix_domain_id)
560 self.ipfix_src_port = 4739
561 self.ipfix_domain_id = 1
563 interfaces = self.vapi.snat_interface_dump()
564 for intf in interfaces:
565 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
569 static_mappings = self.vapi.snat_static_mapping_dump()
570 for sm in static_mappings:
571 self.vapi.snat_add_static_mapping(sm.local_ip_address,
572 sm.external_ip_address,
573 local_port=sm.local_port,
574 external_port=sm.external_port,
575 addr_only=sm.addr_only,
577 protocol=sm.protocol,
580 adresses = self.vapi.snat_address_dump()
581 for addr in adresses:
582 self.vapi.snat_add_address_range(addr.ip_address,
586 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
587 local_port=0, external_port=0, vrf_id=0,
588 is_add=1, external_sw_if_index=0xFFFFFFFF,
591 Add/delete S-NAT static mapping
593 :param local_ip: Local IP address
594 :param external_ip: External IP address
595 :param local_port: Local port number (Optional)
596 :param external_port: External port number (Optional)
597 :param vrf_id: VRF ID (Default 0)
598 :param is_add: 1 if add, 0 if delete (Default add)
599 :param external_sw_if_index: External interface instead of IP address
600 :param proto: IP protocol (Mandatory if port specified)
603 if local_port and external_port:
605 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
606 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
607 self.vapi.snat_add_static_mapping(
610 external_sw_if_index,
618 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
620 Add/delete S-NAT address
622 :param ip: IP address
623 :param is_add: 1 if add, 0 if delete (Default add)
625 snat_addr = socket.inet_pton(socket.AF_INET, ip)
626 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
629 def test_dynamic(self):
630 """ SNAT dynamic translation test """
632 self.snat_add_address(self.snat_addr)
633 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
634 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
638 pkts = self.create_stream_in(self.pg0, self.pg1)
639 self.pg0.add_stream(pkts)
640 self.pg_enable_capture(self.pg_interfaces)
642 capture = self.pg1.get_capture(len(pkts))
643 self.verify_capture_out(capture)
646 pkts = self.create_stream_out(self.pg1)
647 self.pg1.add_stream(pkts)
648 self.pg_enable_capture(self.pg_interfaces)
650 capture = self.pg0.get_capture(len(pkts))
651 self.verify_capture_in(capture, self.pg0)
653 def test_dynamic_icmp_errors_in2out_ttl_1(self):
654 """ SNAT handling of client packets with TTL=1 """
656 self.snat_add_address(self.snat_addr)
657 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
658 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
661 # Client side - generate traffic
662 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
663 self.pg0.add_stream(pkts)
664 self.pg_enable_capture(self.pg_interfaces)
667 # Client side - verify ICMP type 11 packets
668 capture = self.pg0.get_capture(len(pkts))
669 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
671 def test_dynamic_icmp_errors_out2in_ttl_1(self):
672 """ SNAT handling of server packets with TTL=1 """
674 self.snat_add_address(self.snat_addr)
675 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
676 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
679 # Client side - create sessions
680 pkts = self.create_stream_in(self.pg0, self.pg1)
681 self.pg0.add_stream(pkts)
682 self.pg_enable_capture(self.pg_interfaces)
685 # Server side - generate traffic
686 capture = self.pg1.get_capture(len(pkts))
687 self.verify_capture_out(capture)
688 pkts = self.create_stream_out(self.pg1, ttl=1)
689 self.pg1.add_stream(pkts)
690 self.pg_enable_capture(self.pg_interfaces)
693 # Server side - verify ICMP type 11 packets
694 capture = self.pg1.get_capture(len(pkts))
695 self.verify_capture_out_with_icmp_errors(capture,
696 src_ip=self.pg1.local_ip4)
698 def test_dynamic_icmp_errors_in2out_ttl_2(self):
699 """ SNAT handling of error responses to client packets with TTL=2 """
701 self.snat_add_address(self.snat_addr)
702 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
703 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
706 # Client side - generate traffic
707 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
708 self.pg0.add_stream(pkts)
709 self.pg_enable_capture(self.pg_interfaces)
712 # Server side - simulate ICMP type 11 response
713 capture = self.pg1.get_capture(len(pkts))
714 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
715 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
716 ICMP(type=11) / packet[IP] for packet in capture]
717 self.pg1.add_stream(pkts)
718 self.pg_enable_capture(self.pg_interfaces)
721 # Client side - verify ICMP type 11 packets
722 capture = self.pg0.get_capture(len(pkts))
723 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
725 def test_dynamic_icmp_errors_out2in_ttl_2(self):
726 """ SNAT handling of error responses to server packets with TTL=2 """
728 self.snat_add_address(self.snat_addr)
729 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
730 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
733 # Client side - create sessions
734 pkts = self.create_stream_in(self.pg0, self.pg1)
735 self.pg0.add_stream(pkts)
736 self.pg_enable_capture(self.pg_interfaces)
739 # Server side - generate traffic
740 capture = self.pg1.get_capture(len(pkts))
741 self.verify_capture_out(capture)
742 pkts = self.create_stream_out(self.pg1, ttl=2)
743 self.pg1.add_stream(pkts)
744 self.pg_enable_capture(self.pg_interfaces)
747 # Client side - simulate ICMP type 11 response
748 capture = self.pg0.get_capture(len(pkts))
749 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
750 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
751 ICMP(type=11) / packet[IP] for packet in capture]
752 self.pg0.add_stream(pkts)
753 self.pg_enable_capture(self.pg_interfaces)
756 # Server side - verify ICMP type 11 packets
757 capture = self.pg1.get_capture(len(pkts))
758 self.verify_capture_out_with_icmp_errors(capture)
760 def test_ping_out_interface_from_outside(self):
761 """ Ping SNAT out interface from outside network """
763 self.snat_add_address(self.snat_addr)
764 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
765 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
768 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
769 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
770 ICMP(id=self.icmp_id_out, type='echo-request'))
772 self.pg1.add_stream(pkts)
773 self.pg_enable_capture(self.pg_interfaces)
775 capture = self.pg1.get_capture(len(pkts))
776 self.assertEqual(1, len(capture))
779 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
780 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
781 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
782 self.assertEqual(packet[ICMP].type, 0) # echo reply
784 self.logger.error(ppp("Unexpected or invalid packet "
785 "(outside network):", packet))
788 def test_ping_internal_host_from_outside(self):
789 """ Ping internal host from outside network """
791 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
792 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
793 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
797 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
798 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
799 ICMP(id=self.icmp_id_out, type='echo-request'))
800 self.pg1.add_stream(pkt)
801 self.pg_enable_capture(self.pg_interfaces)
803 capture = self.pg0.get_capture(1)
804 self.verify_capture_in(capture, self.pg0, packet_num=1)
805 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
808 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
809 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
810 ICMP(id=self.icmp_id_in, type='echo-reply'))
811 self.pg0.add_stream(pkt)
812 self.pg_enable_capture(self.pg_interfaces)
814 capture = self.pg1.get_capture(1)
815 self.verify_capture_out(capture, same_port=True, packet_num=1)
816 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
818 def test_static_in(self):
819 """ SNAT 1:1 NAT initialized from inside network """
822 self.tcp_port_out = 6303
823 self.udp_port_out = 6304
824 self.icmp_id_out = 6305
826 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
827 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
828 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
832 pkts = self.create_stream_in(self.pg0, self.pg1)
833 self.pg0.add_stream(pkts)
834 self.pg_enable_capture(self.pg_interfaces)
836 capture = self.pg1.get_capture(len(pkts))
837 self.verify_capture_out(capture, nat_ip, True)
840 pkts = self.create_stream_out(self.pg1, nat_ip)
841 self.pg1.add_stream(pkts)
842 self.pg_enable_capture(self.pg_interfaces)
844 capture = self.pg0.get_capture(len(pkts))
845 self.verify_capture_in(capture, self.pg0)
847 def test_static_out(self):
848 """ SNAT 1:1 NAT initialized from outside network """
851 self.tcp_port_out = 6303
852 self.udp_port_out = 6304
853 self.icmp_id_out = 6305
855 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
856 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
857 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
861 pkts = self.create_stream_out(self.pg1, nat_ip)
862 self.pg1.add_stream(pkts)
863 self.pg_enable_capture(self.pg_interfaces)
865 capture = self.pg0.get_capture(len(pkts))
866 self.verify_capture_in(capture, self.pg0)
869 pkts = self.create_stream_in(self.pg0, self.pg1)
870 self.pg0.add_stream(pkts)
871 self.pg_enable_capture(self.pg_interfaces)
873 capture = self.pg1.get_capture(len(pkts))
874 self.verify_capture_out(capture, nat_ip, True)
876 def test_static_with_port_in(self):
877 """ SNAT 1:1 NAT with port initialized from inside network """
879 self.tcp_port_out = 3606
880 self.udp_port_out = 3607
881 self.icmp_id_out = 3608
883 self.snat_add_address(self.snat_addr)
884 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
885 self.tcp_port_in, self.tcp_port_out,
887 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
888 self.udp_port_in, self.udp_port_out,
890 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
891 self.icmp_id_in, self.icmp_id_out,
892 proto=IP_PROTOS.icmp)
893 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
894 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
898 pkts = self.create_stream_in(self.pg0, self.pg1)
899 self.pg0.add_stream(pkts)
900 self.pg_enable_capture(self.pg_interfaces)
902 capture = self.pg1.get_capture(len(pkts))
903 self.verify_capture_out(capture)
906 pkts = self.create_stream_out(self.pg1)
907 self.pg1.add_stream(pkts)
908 self.pg_enable_capture(self.pg_interfaces)
910 capture = self.pg0.get_capture(len(pkts))
911 self.verify_capture_in(capture, self.pg0)
913 def test_static_with_port_out(self):
914 """ SNAT 1:1 NAT with port initialized from outside network """
916 self.tcp_port_out = 30606
917 self.udp_port_out = 30607
918 self.icmp_id_out = 30608
920 self.snat_add_address(self.snat_addr)
921 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
922 self.tcp_port_in, self.tcp_port_out,
924 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
925 self.udp_port_in, self.udp_port_out,
927 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
928 self.icmp_id_in, self.icmp_id_out,
929 proto=IP_PROTOS.icmp)
930 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
931 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
935 pkts = self.create_stream_out(self.pg1)
936 self.pg1.add_stream(pkts)
937 self.pg_enable_capture(self.pg_interfaces)
939 capture = self.pg0.get_capture(len(pkts))
940 self.verify_capture_in(capture, self.pg0)
943 pkts = self.create_stream_in(self.pg0, self.pg1)
944 self.pg0.add_stream(pkts)
945 self.pg_enable_capture(self.pg_interfaces)
947 capture = self.pg1.get_capture(len(pkts))
948 self.verify_capture_out(capture)
950 def test_static_vrf_aware(self):
951 """ SNAT 1:1 NAT VRF awareness """
953 nat_ip1 = "10.0.0.30"
954 nat_ip2 = "10.0.0.40"
955 self.tcp_port_out = 6303
956 self.udp_port_out = 6304
957 self.icmp_id_out = 6305
959 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
961 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
963 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
965 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
966 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
968 # inside interface VRF match SNAT static mapping VRF
969 pkts = self.create_stream_in(self.pg4, self.pg3)
970 self.pg4.add_stream(pkts)
971 self.pg_enable_capture(self.pg_interfaces)
973 capture = self.pg3.get_capture(len(pkts))
974 self.verify_capture_out(capture, nat_ip1, True)
976 # inside interface VRF don't match SNAT static mapping VRF (packets
978 pkts = self.create_stream_in(self.pg0, self.pg3)
979 self.pg0.add_stream(pkts)
980 self.pg_enable_capture(self.pg_interfaces)
982 self.pg3.assert_nothing_captured()
984 def test_multiple_inside_interfaces(self):
985 """ SNAT multiple inside interfaces (non-overlapping address space) """
987 self.snat_add_address(self.snat_addr)
988 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
989 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
990 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
993 # between two S-NAT inside interfaces (no translation)
994 pkts = self.create_stream_in(self.pg0, self.pg1)
995 self.pg0.add_stream(pkts)
996 self.pg_enable_capture(self.pg_interfaces)
998 capture = self.pg1.get_capture(len(pkts))
999 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1001 # from S-NAT inside to interface without S-NAT feature (no translation)
1002 pkts = self.create_stream_in(self.pg0, self.pg2)
1003 self.pg0.add_stream(pkts)
1004 self.pg_enable_capture(self.pg_interfaces)
1006 capture = self.pg2.get_capture(len(pkts))
1007 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1009 # in2out 1st interface
1010 pkts = self.create_stream_in(self.pg0, self.pg3)
1011 self.pg0.add_stream(pkts)
1012 self.pg_enable_capture(self.pg_interfaces)
1014 capture = self.pg3.get_capture(len(pkts))
1015 self.verify_capture_out(capture)
1017 # out2in 1st interface
1018 pkts = self.create_stream_out(self.pg3)
1019 self.pg3.add_stream(pkts)
1020 self.pg_enable_capture(self.pg_interfaces)
1022 capture = self.pg0.get_capture(len(pkts))
1023 self.verify_capture_in(capture, self.pg0)
1025 # in2out 2nd interface
1026 pkts = self.create_stream_in(self.pg1, self.pg3)
1027 self.pg1.add_stream(pkts)
1028 self.pg_enable_capture(self.pg_interfaces)
1030 capture = self.pg3.get_capture(len(pkts))
1031 self.verify_capture_out(capture)
1033 # out2in 2nd interface
1034 pkts = self.create_stream_out(self.pg3)
1035 self.pg3.add_stream(pkts)
1036 self.pg_enable_capture(self.pg_interfaces)
1038 capture = self.pg1.get_capture(len(pkts))
1039 self.verify_capture_in(capture, self.pg1)
1041 def test_inside_overlapping_interfaces(self):
1042 """ SNAT multiple inside interfaces with overlapping address space """
1044 static_nat_ip = "10.0.0.10"
1045 self.snat_add_address(self.snat_addr)
1046 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1048 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1049 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
1050 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
1051 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1054 # between S-NAT inside interfaces with same VRF (no translation)
1055 pkts = self.create_stream_in(self.pg4, self.pg5)
1056 self.pg4.add_stream(pkts)
1057 self.pg_enable_capture(self.pg_interfaces)
1059 capture = self.pg5.get_capture(len(pkts))
1060 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1062 # between S-NAT inside interfaces with different VRF (hairpinning)
1063 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1064 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1065 TCP(sport=1234, dport=5678))
1066 self.pg4.add_stream(p)
1067 self.pg_enable_capture(self.pg_interfaces)
1069 capture = self.pg6.get_capture(1)
1074 self.assertEqual(ip.src, self.snat_addr)
1075 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1076 self.assertNotEqual(tcp.sport, 1234)
1077 self.assertEqual(tcp.dport, 5678)
1079 self.logger.error(ppp("Unexpected or invalid packet:", p))
1082 # in2out 1st interface
1083 pkts = self.create_stream_in(self.pg4, self.pg3)
1084 self.pg4.add_stream(pkts)
1085 self.pg_enable_capture(self.pg_interfaces)
1087 capture = self.pg3.get_capture(len(pkts))
1088 self.verify_capture_out(capture)
1090 # out2in 1st interface
1091 pkts = self.create_stream_out(self.pg3)
1092 self.pg3.add_stream(pkts)
1093 self.pg_enable_capture(self.pg_interfaces)
1095 capture = self.pg4.get_capture(len(pkts))
1096 self.verify_capture_in(capture, self.pg4)
1098 # in2out 2nd interface
1099 pkts = self.create_stream_in(self.pg5, self.pg3)
1100 self.pg5.add_stream(pkts)
1101 self.pg_enable_capture(self.pg_interfaces)
1103 capture = self.pg3.get_capture(len(pkts))
1104 self.verify_capture_out(capture)
1106 # out2in 2nd interface
1107 pkts = self.create_stream_out(self.pg3)
1108 self.pg3.add_stream(pkts)
1109 self.pg_enable_capture(self.pg_interfaces)
1111 capture = self.pg5.get_capture(len(pkts))
1112 self.verify_capture_in(capture, self.pg5)
1115 addresses = self.vapi.snat_address_dump()
1116 self.assertEqual(len(addresses), 1)
1117 sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1118 self.assertEqual(len(sessions), 3)
1119 for session in sessions:
1120 self.assertFalse(session.is_static)
1121 self.assertEqual(session.inside_ip_address[0:4],
1122 self.pg5.remote_ip4n)
1123 self.assertEqual(session.outside_ip_address,
1124 addresses[0].ip_address)
1125 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1126 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1127 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1128 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1129 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1130 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1131 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1132 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1133 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1135 # in2out 3rd interface
1136 pkts = self.create_stream_in(self.pg6, self.pg3)
1137 self.pg6.add_stream(pkts)
1138 self.pg_enable_capture(self.pg_interfaces)
1140 capture = self.pg3.get_capture(len(pkts))
1141 self.verify_capture_out(capture, static_nat_ip, True)
1143 # out2in 3rd interface
1144 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1145 self.pg3.add_stream(pkts)
1146 self.pg_enable_capture(self.pg_interfaces)
1148 capture = self.pg6.get_capture(len(pkts))
1149 self.verify_capture_in(capture, self.pg6)
1151 # general user and session dump verifications
1152 users = self.vapi.snat_user_dump()
1153 self.assertTrue(len(users) >= 3)
1154 addresses = self.vapi.snat_address_dump()
1155 self.assertEqual(len(addresses), 1)
1157 sessions = self.vapi.snat_user_session_dump(user.ip_address,
1159 for session in sessions:
1160 self.assertEqual(user.ip_address, session.inside_ip_address)
1161 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1162 self.assertTrue(session.protocol in
1163 [IP_PROTOS.tcp, IP_PROTOS.udp,
1167 sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1168 self.assertTrue(len(sessions) >= 4)
1169 for session in sessions:
1170 self.assertFalse(session.is_static)
1171 self.assertEqual(session.inside_ip_address[0:4],
1172 self.pg4.remote_ip4n)
1173 self.assertEqual(session.outside_ip_address,
1174 addresses[0].ip_address)
1177 sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1178 self.assertTrue(len(sessions) >= 3)
1179 for session in sessions:
1180 self.assertTrue(session.is_static)
1181 self.assertEqual(session.inside_ip_address[0:4],
1182 self.pg6.remote_ip4n)
1183 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1184 map(int, static_nat_ip.split('.')))
1185 self.assertTrue(session.inside_port in
1186 [self.tcp_port_in, self.udp_port_in,
1189 def test_hairpinning(self):
1190 """ SNAT hairpinning - 1:1 NAT with port"""
1192 host = self.pg0.remote_hosts[0]
1193 server = self.pg0.remote_hosts[1]
1196 server_in_port = 5678
1197 server_out_port = 8765
1199 self.snat_add_address(self.snat_addr)
1200 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1201 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1203 # add static mapping for server
1204 self.snat_add_static_mapping(server.ip4, self.snat_addr,
1205 server_in_port, server_out_port,
1206 proto=IP_PROTOS.tcp)
1208 # send packet from host to server
1209 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1210 IP(src=host.ip4, dst=self.snat_addr) /
1211 TCP(sport=host_in_port, dport=server_out_port))
1212 self.pg0.add_stream(p)
1213 self.pg_enable_capture(self.pg_interfaces)
1215 capture = self.pg0.get_capture(1)
1220 self.assertEqual(ip.src, self.snat_addr)
1221 self.assertEqual(ip.dst, server.ip4)
1222 self.assertNotEqual(tcp.sport, host_in_port)
1223 self.assertEqual(tcp.dport, server_in_port)
1224 self.check_tcp_checksum(p)
1225 host_out_port = tcp.sport
1227 self.logger.error(ppp("Unexpected or invalid packet:", p))
1230 # send reply from server to host
1231 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1232 IP(src=server.ip4, dst=self.snat_addr) /
1233 TCP(sport=server_in_port, dport=host_out_port))
1234 self.pg0.add_stream(p)
1235 self.pg_enable_capture(self.pg_interfaces)
1237 capture = self.pg0.get_capture(1)
1242 self.assertEqual(ip.src, self.snat_addr)
1243 self.assertEqual(ip.dst, host.ip4)
1244 self.assertEqual(tcp.sport, server_out_port)
1245 self.assertEqual(tcp.dport, host_in_port)
1246 self.check_tcp_checksum(p)
1248 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1251 def test_hairpinning2(self):
1252 """ SNAT hairpinning - 1:1 NAT"""
1254 server1_nat_ip = "10.0.0.10"
1255 server2_nat_ip = "10.0.0.11"
1256 host = self.pg0.remote_hosts[0]
1257 server1 = self.pg0.remote_hosts[1]
1258 server2 = self.pg0.remote_hosts[2]
1259 server_tcp_port = 22
1260 server_udp_port = 20
1262 self.snat_add_address(self.snat_addr)
1263 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1264 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1267 # add static mapping for servers
1268 self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1269 self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1273 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1274 IP(src=host.ip4, dst=server1_nat_ip) /
1275 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1278 IP(src=host.ip4, dst=server1_nat_ip) /
1279 UDP(sport=self.udp_port_in, dport=server_udp_port))
1281 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1282 IP(src=host.ip4, dst=server1_nat_ip) /
1283 ICMP(id=self.icmp_id_in, type='echo-request'))
1285 self.pg0.add_stream(pkts)
1286 self.pg_enable_capture(self.pg_interfaces)
1288 capture = self.pg0.get_capture(len(pkts))
1289 for packet in capture:
1291 self.assertEqual(packet[IP].src, self.snat_addr)
1292 self.assertEqual(packet[IP].dst, server1.ip4)
1293 if packet.haslayer(TCP):
1294 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1295 self.assertEqual(packet[TCP].dport, server_tcp_port)
1296 self.tcp_port_out = packet[TCP].sport
1297 self.check_tcp_checksum(packet)
1298 elif packet.haslayer(UDP):
1299 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1300 self.assertEqual(packet[UDP].dport, server_udp_port)
1301 self.udp_port_out = packet[UDP].sport
1303 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1304 self.icmp_id_out = packet[ICMP].id
1306 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1311 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1312 IP(src=server1.ip4, dst=self.snat_addr) /
1313 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1315 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1316 IP(src=server1.ip4, dst=self.snat_addr) /
1317 UDP(sport=server_udp_port, dport=self.udp_port_out))
1319 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1320 IP(src=server1.ip4, dst=self.snat_addr) /
1321 ICMP(id=self.icmp_id_out, type='echo-reply'))
1323 self.pg0.add_stream(pkts)
1324 self.pg_enable_capture(self.pg_interfaces)
1326 capture = self.pg0.get_capture(len(pkts))
1327 for packet in capture:
1329 self.assertEqual(packet[IP].src, server1_nat_ip)
1330 self.assertEqual(packet[IP].dst, host.ip4)
1331 if packet.haslayer(TCP):
1332 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1333 self.assertEqual(packet[TCP].sport, server_tcp_port)
1334 self.check_tcp_checksum(packet)
1335 elif packet.haslayer(UDP):
1336 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1337 self.assertEqual(packet[UDP].sport, server_udp_port)
1339 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1341 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1344 # server2 to server1
1346 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1347 IP(src=server2.ip4, dst=server1_nat_ip) /
1348 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1350 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1351 IP(src=server2.ip4, dst=server1_nat_ip) /
1352 UDP(sport=self.udp_port_in, dport=server_udp_port))
1354 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1355 IP(src=server2.ip4, dst=server1_nat_ip) /
1356 ICMP(id=self.icmp_id_in, type='echo-request'))
1358 self.pg0.add_stream(pkts)
1359 self.pg_enable_capture(self.pg_interfaces)
1361 capture = self.pg0.get_capture(len(pkts))
1362 for packet in capture:
1364 self.assertEqual(packet[IP].src, server2_nat_ip)
1365 self.assertEqual(packet[IP].dst, server1.ip4)
1366 if packet.haslayer(TCP):
1367 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1368 self.assertEqual(packet[TCP].dport, server_tcp_port)
1369 self.tcp_port_out = packet[TCP].sport
1370 self.check_tcp_checksum(packet)
1371 elif packet.haslayer(UDP):
1372 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1373 self.assertEqual(packet[UDP].dport, server_udp_port)
1374 self.udp_port_out = packet[UDP].sport
1376 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1377 self.icmp_id_out = packet[ICMP].id
1379 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1382 # server1 to server2
1384 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1385 IP(src=server1.ip4, dst=server2_nat_ip) /
1386 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1388 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1389 IP(src=server1.ip4, dst=server2_nat_ip) /
1390 UDP(sport=server_udp_port, dport=self.udp_port_out))
1392 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1393 IP(src=server1.ip4, dst=server2_nat_ip) /
1394 ICMP(id=self.icmp_id_out, type='echo-reply'))
1396 self.pg0.add_stream(pkts)
1397 self.pg_enable_capture(self.pg_interfaces)
1399 capture = self.pg0.get_capture(len(pkts))
1400 for packet in capture:
1402 self.assertEqual(packet[IP].src, server1_nat_ip)
1403 self.assertEqual(packet[IP].dst, server2.ip4)
1404 if packet.haslayer(TCP):
1405 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1406 self.assertEqual(packet[TCP].sport, server_tcp_port)
1407 self.check_tcp_checksum(packet)
1408 elif packet.haslayer(UDP):
1409 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1410 self.assertEqual(packet[UDP].sport, server_udp_port)
1412 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1414 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1417 def test_max_translations_per_user(self):
1418 """ MAX translations per user - recycle the least recently used """
1420 self.snat_add_address(self.snat_addr)
1421 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1422 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1425 # get maximum number of translations per user
1426 snat_config = self.vapi.snat_show_config()
1428 # send more than maximum number of translations per user packets
1429 pkts_num = snat_config.max_translations_per_user + 5
1431 for port in range(0, pkts_num):
1432 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1433 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1434 TCP(sport=1025 + port))
1436 self.pg0.add_stream(pkts)
1437 self.pg_enable_capture(self.pg_interfaces)
1440 # verify number of translated packet
1441 self.pg1.get_capture(pkts_num)
1443 def test_interface_addr(self):
1444 """ Acquire SNAT addresses from interface """
1445 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1447 # no address in NAT pool
1448 adresses = self.vapi.snat_address_dump()
1449 self.assertEqual(0, len(adresses))
1451 # configure interface address and check NAT address pool
1452 self.pg7.config_ip4()
1453 adresses = self.vapi.snat_address_dump()
1454 self.assertEqual(1, len(adresses))
1455 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1457 # remove interface address and check NAT address pool
1458 self.pg7.unconfig_ip4()
1459 adresses = self.vapi.snat_address_dump()
1460 self.assertEqual(0, len(adresses))
1462 def test_interface_addr_static_mapping(self):
1463 """ Static mapping with addresses from interface """
1464 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1465 self.snat_add_static_mapping('1.2.3.4',
1466 external_sw_if_index=self.pg7.sw_if_index)
1468 # static mappings with external interface
1469 static_mappings = self.vapi.snat_static_mapping_dump()
1470 self.assertEqual(1, len(static_mappings))
1471 self.assertEqual(self.pg7.sw_if_index,
1472 static_mappings[0].external_sw_if_index)
1474 # configure interface address and check static mappings
1475 self.pg7.config_ip4()
1476 static_mappings = self.vapi.snat_static_mapping_dump()
1477 self.assertEqual(1, len(static_mappings))
1478 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1479 self.pg7.local_ip4n)
1480 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1482 # remove interface address and check static mappings
1483 self.pg7.unconfig_ip4()
1484 static_mappings = self.vapi.snat_static_mapping_dump()
1485 self.assertEqual(0, len(static_mappings))
1487 def test_ipfix_nat44_sess(self):
1488 """ S-NAT IPFIX logging NAT44 session created/delted """
1489 self.ipfix_domain_id = 10
1490 self.ipfix_src_port = 20202
1491 colector_port = 30303
1492 bind_layers(UDP, IPFIX, dport=30303)
1493 self.snat_add_address(self.snat_addr)
1494 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1495 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1497 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1498 src_address=self.pg3.local_ip4n,
1500 template_interval=10,
1501 collector_port=colector_port)
1502 self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1503 src_port=self.ipfix_src_port)
1505 pkts = self.create_stream_in(self.pg0, self.pg1)
1506 self.pg0.add_stream(pkts)
1507 self.pg_enable_capture(self.pg_interfaces)
1509 capture = self.pg1.get_capture(len(pkts))
1510 self.verify_capture_out(capture)
1511 self.snat_add_address(self.snat_addr, is_add=0)
1512 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1513 capture = self.pg3.get_capture(3)
1514 ipfix = IPFIXDecoder()
1515 # first load template
1517 self.assertTrue(p.haslayer(IPFIX))
1518 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1519 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1520 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1521 self.assertEqual(p[UDP].dport, colector_port)
1522 self.assertEqual(p[IPFIX].observationDomainID,
1523 self.ipfix_domain_id)
1524 if p.haslayer(Template):
1525 ipfix.add_template(p.getlayer(Template))
1526 # verify events in data set
1528 if p.haslayer(Data):
1529 data = ipfix.decode_data_set(p.getlayer(Set))
1530 self.verify_ipfix_nat44_ses(data)
1532 def test_ipfix_addr_exhausted(self):
1533 """ S-NAT IPFIX logging NAT addresses exhausted """
1534 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1535 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1537 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1538 src_address=self.pg3.local_ip4n,
1540 template_interval=10)
1541 self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1542 src_port=self.ipfix_src_port)
1544 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1545 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1547 self.pg0.add_stream(p)
1548 self.pg_enable_capture(self.pg_interfaces)
1550 capture = self.pg1.get_capture(0)
1551 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1552 capture = self.pg3.get_capture(3)
1553 ipfix = IPFIXDecoder()
1554 # first load template
1556 self.assertTrue(p.haslayer(IPFIX))
1557 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1558 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1559 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1560 self.assertEqual(p[UDP].dport, 4739)
1561 self.assertEqual(p[IPFIX].observationDomainID,
1562 self.ipfix_domain_id)
1563 if p.haslayer(Template):
1564 ipfix.add_template(p.getlayer(Template))
1565 # verify events in data set
1567 if p.haslayer(Data):
1568 data = ipfix.decode_data_set(p.getlayer(Set))
1569 self.verify_ipfix_addr_exhausted(data)
1571 def test_pool_addr_fib(self):
1572 """ S-NAT add pool addresses to FIB """
1573 static_addr = '10.0.0.10'
1574 self.snat_add_address(self.snat_addr)
1575 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1576 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1578 self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1581 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1582 ARP(op=ARP.who_has, pdst=self.snat_addr,
1583 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1584 self.pg1.add_stream(p)
1585 self.pg_enable_capture(self.pg_interfaces)
1587 capture = self.pg1.get_capture(1)
1588 self.assertTrue(capture[0].haslayer(ARP))
1589 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1592 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1593 ARP(op=ARP.who_has, pdst=static_addr,
1594 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1595 self.pg1.add_stream(p)
1596 self.pg_enable_capture(self.pg_interfaces)
1598 capture = self.pg1.get_capture(1)
1599 self.assertTrue(capture[0].haslayer(ARP))
1600 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1602 # send ARP to non-SNAT interface
1603 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1604 ARP(op=ARP.who_has, pdst=self.snat_addr,
1605 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1606 self.pg2.add_stream(p)
1607 self.pg_enable_capture(self.pg_interfaces)
1609 capture = self.pg1.get_capture(0)
1611 # remove addresses and verify
1612 self.snat_add_address(self.snat_addr, is_add=0)
1613 self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1616 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1617 ARP(op=ARP.who_has, pdst=self.snat_addr,
1618 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1619 self.pg1.add_stream(p)
1620 self.pg_enable_capture(self.pg_interfaces)
1622 capture = self.pg1.get_capture(0)
1624 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1625 ARP(op=ARP.who_has, pdst=static_addr,
1626 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1627 self.pg1.add_stream(p)
1628 self.pg_enable_capture(self.pg_interfaces)
1630 capture = self.pg1.get_capture(0)
1632 def test_vrf_mode(self):
1633 """ S-NAT tenant VRF aware address pool mode """
1637 nat_ip1 = "10.0.0.10"
1638 nat_ip2 = "10.0.0.11"
1640 self.pg0.unconfig_ip4()
1641 self.pg1.unconfig_ip4()
1642 self.pg0.set_table_ip4(vrf_id1)
1643 self.pg1.set_table_ip4(vrf_id2)
1644 self.pg0.config_ip4()
1645 self.pg1.config_ip4()
1647 self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1648 self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1649 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1650 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1651 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1655 pkts = self.create_stream_in(self.pg0, self.pg2)
1656 self.pg0.add_stream(pkts)
1657 self.pg_enable_capture(self.pg_interfaces)
1659 capture = self.pg2.get_capture(len(pkts))
1660 self.verify_capture_out(capture, nat_ip1)
1663 pkts = self.create_stream_in(self.pg1, self.pg2)
1664 self.pg1.add_stream(pkts)
1665 self.pg_enable_capture(self.pg_interfaces)
1667 capture = self.pg2.get_capture(len(pkts))
1668 self.verify_capture_out(capture, nat_ip2)
1670 def test_vrf_feature_independent(self):
1671 """ S-NAT tenant VRF independent address pool mode """
1673 nat_ip1 = "10.0.0.10"
1674 nat_ip2 = "10.0.0.11"
1676 self.snat_add_address(nat_ip1)
1677 self.snat_add_address(nat_ip2)
1678 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1679 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1680 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1684 pkts = self.create_stream_in(self.pg0, self.pg2)
1685 self.pg0.add_stream(pkts)
1686 self.pg_enable_capture(self.pg_interfaces)
1688 capture = self.pg2.get_capture(len(pkts))
1689 self.verify_capture_out(capture, nat_ip1)
1692 pkts = self.create_stream_in(self.pg1, self.pg2)
1693 self.pg1.add_stream(pkts)
1694 self.pg_enable_capture(self.pg_interfaces)
1696 capture = self.pg2.get_capture(len(pkts))
1697 self.verify_capture_out(capture, nat_ip1)
1699 def test_dynamic_ipless_interfaces(self):
1700 """ SNAT interfaces without configured ip dynamic map """
1702 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1703 self.pg7.remote_mac,
1704 self.pg7.remote_ip4n,
1706 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1707 self.pg8.remote_mac,
1708 self.pg8.remote_ip4n,
1711 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1712 dst_address_length=32,
1713 next_hop_address=self.pg7.remote_ip4n,
1714 next_hop_sw_if_index=self.pg7.sw_if_index)
1715 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1716 dst_address_length=32,
1717 next_hop_address=self.pg8.remote_ip4n,
1718 next_hop_sw_if_index=self.pg8.sw_if_index)
1720 self.snat_add_address(self.snat_addr)
1721 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1722 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1726 pkts = self.create_stream_in(self.pg7, self.pg8)
1727 self.pg7.add_stream(pkts)
1728 self.pg_enable_capture(self.pg_interfaces)
1730 capture = self.pg8.get_capture(len(pkts))
1731 self.verify_capture_out(capture)
1734 pkts = self.create_stream_out(self.pg8, self.snat_addr)
1735 self.pg8.add_stream(pkts)
1736 self.pg_enable_capture(self.pg_interfaces)
1738 capture = self.pg7.get_capture(len(pkts))
1739 self.verify_capture_in(capture, self.pg7)
1741 def test_static_ipless_interfaces(self):
1742 """ SNAT 1:1 NAT interfaces without configured ip """
1744 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1745 self.pg7.remote_mac,
1746 self.pg7.remote_ip4n,
1748 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1749 self.pg8.remote_mac,
1750 self.pg8.remote_ip4n,
1753 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1754 dst_address_length=32,
1755 next_hop_address=self.pg7.remote_ip4n,
1756 next_hop_sw_if_index=self.pg7.sw_if_index)
1757 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1758 dst_address_length=32,
1759 next_hop_address=self.pg8.remote_ip4n,
1760 next_hop_sw_if_index=self.pg8.sw_if_index)
1762 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1763 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1764 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1768 pkts = self.create_stream_out(self.pg8)
1769 self.pg8.add_stream(pkts)
1770 self.pg_enable_capture(self.pg_interfaces)
1772 capture = self.pg7.get_capture(len(pkts))
1773 self.verify_capture_in(capture, self.pg7)
1776 pkts = self.create_stream_in(self.pg7, self.pg8)
1777 self.pg7.add_stream(pkts)
1778 self.pg_enable_capture(self.pg_interfaces)
1780 capture = self.pg8.get_capture(len(pkts))
1781 self.verify_capture_out(capture, self.snat_addr, True)
1783 def test_static_with_port_ipless_interfaces(self):
1784 """ SNAT 1:1 NAT with port interfaces without configured ip """
1786 self.tcp_port_out = 30606
1787 self.udp_port_out = 30607
1788 self.icmp_id_out = 30608
1790 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1791 self.pg7.remote_mac,
1792 self.pg7.remote_ip4n,
1794 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1795 self.pg8.remote_mac,
1796 self.pg8.remote_ip4n,
1799 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1800 dst_address_length=32,
1801 next_hop_address=self.pg7.remote_ip4n,
1802 next_hop_sw_if_index=self.pg7.sw_if_index)
1803 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1804 dst_address_length=32,
1805 next_hop_address=self.pg8.remote_ip4n,
1806 next_hop_sw_if_index=self.pg8.sw_if_index)
1808 self.snat_add_address(self.snat_addr)
1809 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1810 self.tcp_port_in, self.tcp_port_out,
1811 proto=IP_PROTOS.tcp)
1812 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1813 self.udp_port_in, self.udp_port_out,
1814 proto=IP_PROTOS.udp)
1815 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1816 self.icmp_id_in, self.icmp_id_out,
1817 proto=IP_PROTOS.icmp)
1818 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1819 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1823 pkts = self.create_stream_out(self.pg8)
1824 self.pg8.add_stream(pkts)
1825 self.pg_enable_capture(self.pg_interfaces)
1827 capture = self.pg7.get_capture(len(pkts))
1828 self.verify_capture_in(capture, self.pg7)
1831 pkts = self.create_stream_in(self.pg7, self.pg8)
1832 self.pg7.add_stream(pkts)
1833 self.pg_enable_capture(self.pg_interfaces)
1835 capture = self.pg8.get_capture(len(pkts))
1836 self.verify_capture_out(capture)
1838 def test_static_unknown_proto(self):
1839 """ 1:1 NAT translate packet with unknown protocol """
1840 nat_ip = "10.0.0.10"
1841 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1842 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1843 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1847 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1848 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1850 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
1851 TCP(sport=1234, dport=1234))
1852 self.pg0.add_stream(p)
1853 self.pg_enable_capture(self.pg_interfaces)
1855 p = self.pg1.get_capture(1)
1858 self.assertEqual(packet[IP].src, nat_ip)
1859 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1860 self.assertTrue(packet.haslayer(GRE))
1861 self.check_ip_checksum(packet)
1863 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1867 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1868 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1870 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
1871 TCP(sport=1234, dport=1234))
1872 self.pg1.add_stream(p)
1873 self.pg_enable_capture(self.pg_interfaces)
1875 p = self.pg0.get_capture(1)
1878 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
1879 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
1880 self.assertTrue(packet.haslayer(GRE))
1881 self.check_ip_checksum(packet)
1883 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1887 super(TestSNAT, self).tearDown()
1888 if not self.vpp_dead:
1889 self.logger.info(self.vapi.cli("show snat verbose"))
1893 class TestDeterministicNAT(MethodHolder):
1894 """ Deterministic NAT Test Cases """
1897 def setUpConstants(cls):
1898 super(TestDeterministicNAT, cls).setUpConstants()
1899 cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1902 def setUpClass(cls):
1903 super(TestDeterministicNAT, cls).setUpClass()
1906 cls.tcp_port_in = 6303
1907 cls.tcp_external_port = 6303
1908 cls.udp_port_in = 6304
1909 cls.udp_external_port = 6304
1910 cls.icmp_id_in = 6305
1911 cls.snat_addr = '10.0.0.3'
1913 cls.create_pg_interfaces(range(3))
1914 cls.interfaces = list(cls.pg_interfaces)
1916 for i in cls.interfaces:
1921 cls.pg0.generate_remote_hosts(2)
1922 cls.pg0.configure_ipv4_neighbors()
1925 super(TestDeterministicNAT, cls).tearDownClass()
1928 def create_stream_in(self, in_if, out_if, ttl=64):
1930 Create packet stream for inside network
1932 :param in_if: Inside interface
1933 :param out_if: Outside interface
1934 :param ttl: TTL of generated packets
1938 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1939 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1940 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1944 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1945 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1946 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1950 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1951 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1952 ICMP(id=self.icmp_id_in, type='echo-request'))
1957 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1959 Create packet stream for outside network
1961 :param out_if: Outside interface
1962 :param dst_ip: Destination IP address (Default use global SNAT address)
1963 :param ttl: TTL of generated packets
1966 dst_ip = self.snat_addr
1969 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1970 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1971 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1975 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1976 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1977 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1981 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1982 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1983 ICMP(id=self.icmp_external_id, type='echo-reply'))
1988 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1990 Verify captured packets on outside network
1992 :param capture: Captured packets
1993 :param nat_ip: Translated IP address (Default use global SNAT address)
1994 :param same_port: Sorce port number is not translated (Default False)
1995 :param packet_num: Expected number of packets (Default 3)
1998 nat_ip = self.snat_addr
1999 self.assertEqual(packet_num, len(capture))
2000 for packet in capture:
2002 self.assertEqual(packet[IP].src, nat_ip)
2003 if packet.haslayer(TCP):
2004 self.tcp_port_out = packet[TCP].sport
2005 elif packet.haslayer(UDP):
2006 self.udp_port_out = packet[UDP].sport
2008 self.icmp_external_id = packet[ICMP].id
2010 self.logger.error(ppp("Unexpected or invalid packet "
2011 "(outside network):", packet))
2014 def initiate_tcp_session(self, in_if, out_if):
2016 Initiates TCP session
2018 :param in_if: Inside interface
2019 :param out_if: Outside interface
2022 # SYN packet in->out
2023 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2024 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2025 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2028 self.pg_enable_capture(self.pg_interfaces)
2030 capture = out_if.get_capture(1)
2032 self.tcp_port_out = p[TCP].sport
2034 # SYN + ACK packet out->in
2035 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2036 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
2037 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2039 out_if.add_stream(p)
2040 self.pg_enable_capture(self.pg_interfaces)
2042 in_if.get_capture(1)
2044 # ACK packet in->out
2045 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2046 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2047 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2050 self.pg_enable_capture(self.pg_interfaces)
2052 out_if.get_capture(1)
2055 self.logger.error("TCP 3 way handshake failed")
2058 def verify_ipfix_max_entries_per_user(self, data):
2060 Verify IPFIX maximum entries per user exceeded event
2062 :param data: Decoded IPFIX data records
2064 self.assertEqual(1, len(data))
2067 self.assertEqual(ord(record[230]), 13)
2068 # natQuotaExceededEvent
2069 self.assertEqual('\x03\x00\x00\x00', record[466])
2071 self.assertEqual(self.pg0.remote_ip4n, record[8])
2073 def test_deterministic_mode(self):
2074 """ S-NAT run deterministic mode """
2075 in_addr = '172.16.255.0'
2076 out_addr = '172.17.255.50'
2077 in_addr_t = '172.16.255.20'
2078 in_addr_n = socket.inet_aton(in_addr)
2079 out_addr_n = socket.inet_aton(out_addr)
2080 in_addr_t_n = socket.inet_aton(in_addr_t)
2084 snat_config = self.vapi.snat_show_config()
2085 self.assertEqual(1, snat_config.deterministic)
2087 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
2089 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2090 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2091 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2092 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2094 deterministic_mappings = self.vapi.snat_det_map_dump()
2095 self.assertEqual(len(deterministic_mappings), 1)
2096 dsm = deterministic_mappings[0]
2097 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2098 self.assertEqual(in_plen, dsm.in_plen)
2099 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2100 self.assertEqual(out_plen, dsm.out_plen)
2103 deterministic_mappings = self.vapi.snat_det_map_dump()
2104 self.assertEqual(len(deterministic_mappings), 0)
2106 def test_set_timeouts(self):
2107 """ Set deterministic NAT timeouts """
2108 timeouts_before = self.vapi.snat_det_get_timeouts()
2110 self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2111 timeouts_before.tcp_established + 10,
2112 timeouts_before.tcp_transitory + 10,
2113 timeouts_before.icmp + 10)
2115 timeouts_after = self.vapi.snat_det_get_timeouts()
2117 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2118 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2119 self.assertNotEqual(timeouts_before.tcp_established,
2120 timeouts_after.tcp_established)
2121 self.assertNotEqual(timeouts_before.tcp_transitory,
2122 timeouts_after.tcp_transitory)
2124 def test_det_in(self):
2125 """ CGNAT translation test (TCP, UDP, ICMP) """
2127 nat_ip = "10.0.0.10"
2129 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2131 socket.inet_aton(nat_ip),
2133 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2134 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2138 pkts = self.create_stream_in(self.pg0, self.pg1)
2139 self.pg0.add_stream(pkts)
2140 self.pg_enable_capture(self.pg_interfaces)
2142 capture = self.pg1.get_capture(len(pkts))
2143 self.verify_capture_out(capture, nat_ip)
2146 pkts = self.create_stream_out(self.pg1, nat_ip)
2147 self.pg1.add_stream(pkts)
2148 self.pg_enable_capture(self.pg_interfaces)
2150 capture = self.pg0.get_capture(len(pkts))
2151 self.verify_capture_in(capture, self.pg0)
2154 sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2155 self.assertEqual(len(sessions), 3)
2159 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2160 self.assertEqual(s.in_port, self.tcp_port_in)
2161 self.assertEqual(s.out_port, self.tcp_port_out)
2162 self.assertEqual(s.ext_port, self.tcp_external_port)
2166 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2167 self.assertEqual(s.in_port, self.udp_port_in)
2168 self.assertEqual(s.out_port, self.udp_port_out)
2169 self.assertEqual(s.ext_port, self.udp_external_port)
2173 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2174 self.assertEqual(s.in_port, self.icmp_id_in)
2175 self.assertEqual(s.out_port, self.icmp_external_id)
2177 def test_multiple_users(self):
2178 """ CGNAT multiple users """
2180 nat_ip = "10.0.0.10"
2182 external_port = 6303
2184 host0 = self.pg0.remote_hosts[0]
2185 host1 = self.pg0.remote_hosts[1]
2187 self.vapi.snat_add_det_map(host0.ip4n,
2189 socket.inet_aton(nat_ip),
2191 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2192 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2196 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2197 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2198 TCP(sport=port_in, dport=external_port))
2199 self.pg0.add_stream(p)
2200 self.pg_enable_capture(self.pg_interfaces)
2202 capture = self.pg1.get_capture(1)
2207 self.assertEqual(ip.src, nat_ip)
2208 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2209 self.assertEqual(tcp.dport, external_port)
2210 port_out0 = tcp.sport
2212 self.logger.error(ppp("Unexpected or invalid packet:", p))
2216 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2217 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2218 TCP(sport=port_in, dport=external_port))
2219 self.pg0.add_stream(p)
2220 self.pg_enable_capture(self.pg_interfaces)
2222 capture = self.pg1.get_capture(1)
2227 self.assertEqual(ip.src, nat_ip)
2228 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2229 self.assertEqual(tcp.dport, external_port)
2230 port_out1 = tcp.sport
2232 self.logger.error(ppp("Unexpected or invalid packet:", p))
2235 dms = self.vapi.snat_det_map_dump()
2236 self.assertEqual(1, len(dms))
2237 self.assertEqual(2, dms[0].ses_num)
2240 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2241 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2242 TCP(sport=external_port, dport=port_out0))
2243 self.pg1.add_stream(p)
2244 self.pg_enable_capture(self.pg_interfaces)
2246 capture = self.pg0.get_capture(1)
2251 self.assertEqual(ip.src, self.pg1.remote_ip4)
2252 self.assertEqual(ip.dst, host0.ip4)
2253 self.assertEqual(tcp.dport, port_in)
2254 self.assertEqual(tcp.sport, external_port)
2256 self.logger.error(ppp("Unexpected or invalid packet:", p))
2260 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2261 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2262 TCP(sport=external_port, dport=port_out1))
2263 self.pg1.add_stream(p)
2264 self.pg_enable_capture(self.pg_interfaces)
2266 capture = self.pg0.get_capture(1)
2271 self.assertEqual(ip.src, self.pg1.remote_ip4)
2272 self.assertEqual(ip.dst, host1.ip4)
2273 self.assertEqual(tcp.dport, port_in)
2274 self.assertEqual(tcp.sport, external_port)
2276 self.logger.error(ppp("Unexpected or invalid packet", p))
2279 # session close api test
2280 self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2282 self.pg1.remote_ip4n,
2284 dms = self.vapi.snat_det_map_dump()
2285 self.assertEqual(dms[0].ses_num, 1)
2287 self.vapi.snat_det_close_session_in(host0.ip4n,
2289 self.pg1.remote_ip4n,
2291 dms = self.vapi.snat_det_map_dump()
2292 self.assertEqual(dms[0].ses_num, 0)
2294 def test_tcp_session_close_detection_in(self):
2295 """ CGNAT TCP session close initiated from inside network """
2296 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2298 socket.inet_aton(self.snat_addr),
2300 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2301 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2304 self.initiate_tcp_session(self.pg0, self.pg1)
2306 # close the session from inside
2308 # FIN packet in -> out
2309 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2310 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2311 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2313 self.pg0.add_stream(p)
2314 self.pg_enable_capture(self.pg_interfaces)
2316 self.pg1.get_capture(1)
2320 # ACK packet out -> in
2321 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2322 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2323 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2327 # FIN packet out -> in
2328 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2329 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2330 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2334 self.pg1.add_stream(pkts)
2335 self.pg_enable_capture(self.pg_interfaces)
2337 self.pg0.get_capture(2)
2339 # ACK packet in -> out
2340 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2341 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2342 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2344 self.pg0.add_stream(p)
2345 self.pg_enable_capture(self.pg_interfaces)
2347 self.pg1.get_capture(1)
2349 # Check if snat closed the session
2350 dms = self.vapi.snat_det_map_dump()
2351 self.assertEqual(0, dms[0].ses_num)
2353 self.logger.error("TCP session termination failed")
2356 def test_tcp_session_close_detection_out(self):
2357 """ CGNAT TCP session close initiated from outside network """
2358 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2360 socket.inet_aton(self.snat_addr),
2362 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2363 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2366 self.initiate_tcp_session(self.pg0, self.pg1)
2368 # close the session from outside
2370 # FIN packet out -> in
2371 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2372 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2373 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2375 self.pg1.add_stream(p)
2376 self.pg_enable_capture(self.pg_interfaces)
2378 self.pg0.get_capture(1)
2382 # ACK packet in -> out
2383 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2384 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2385 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2389 # ACK packet in -> out
2390 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2391 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2392 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2396 self.pg0.add_stream(pkts)
2397 self.pg_enable_capture(self.pg_interfaces)
2399 self.pg1.get_capture(2)
2401 # ACK packet out -> in
2402 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2403 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2404 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2406 self.pg1.add_stream(p)
2407 self.pg_enable_capture(self.pg_interfaces)
2409 self.pg0.get_capture(1)
2411 # Check if snat closed the session
2412 dms = self.vapi.snat_det_map_dump()
2413 self.assertEqual(0, dms[0].ses_num)
2415 self.logger.error("TCP session termination failed")
2418 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2419 def test_session_timeout(self):
2420 """ CGNAT session timeouts """
2421 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2423 socket.inet_aton(self.snat_addr),
2425 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2426 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2429 self.initiate_tcp_session(self.pg0, self.pg1)
2430 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2431 pkts = self.create_stream_in(self.pg0, self.pg1)
2432 self.pg0.add_stream(pkts)
2433 self.pg_enable_capture(self.pg_interfaces)
2435 capture = self.pg1.get_capture(len(pkts))
2438 dms = self.vapi.snat_det_map_dump()
2439 self.assertEqual(0, dms[0].ses_num)
2441 def test_session_limit_per_user(self):
2442 """ CGNAT maximum 1000 sessions per user should be created """
2443 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2445 socket.inet_aton(self.snat_addr),
2447 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2448 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2450 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2451 src_address=self.pg2.local_ip4n,
2453 template_interval=10)
2454 self.vapi.snat_ipfix()
2457 for port in range(1025, 2025):
2458 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2459 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2460 UDP(sport=port, dport=port))
2463 self.pg0.add_stream(pkts)
2464 self.pg_enable_capture(self.pg_interfaces)
2466 capture = self.pg1.get_capture(len(pkts))
2468 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2469 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2470 UDP(sport=3001, dport=3002))
2471 self.pg0.add_stream(p)
2472 self.pg_enable_capture(self.pg_interfaces)
2474 capture = self.pg1.assert_nothing_captured()
2476 # verify ICMP error packet
2477 capture = self.pg0.get_capture(1)
2479 self.assertTrue(p.haslayer(ICMP))
2481 self.assertEqual(icmp.type, 3)
2482 self.assertEqual(icmp.code, 1)
2483 self.assertTrue(icmp.haslayer(IPerror))
2484 inner_ip = icmp[IPerror]
2485 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2486 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2488 dms = self.vapi.snat_det_map_dump()
2490 self.assertEqual(1000, dms[0].ses_num)
2492 # verify IPFIX logging
2493 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2494 capture = self.pg2.get_capture(2)
2495 ipfix = IPFIXDecoder()
2496 # first load template
2498 self.assertTrue(p.haslayer(IPFIX))
2499 if p.haslayer(Template):
2500 ipfix.add_template(p.getlayer(Template))
2501 # verify events in data set
2503 if p.haslayer(Data):
2504 data = ipfix.decode_data_set(p.getlayer(Set))
2505 self.verify_ipfix_max_entries_per_user(data)
2507 def clear_snat(self):
2509 Clear SNAT configuration.
2511 self.vapi.snat_ipfix(enable=0)
2512 self.vapi.snat_det_set_timeouts()
2513 deterministic_mappings = self.vapi.snat_det_map_dump()
2514 for dsm in deterministic_mappings:
2515 self.vapi.snat_add_det_map(dsm.in_addr,
2521 interfaces = self.vapi.snat_interface_dump()
2522 for intf in interfaces:
2523 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2528 super(TestDeterministicNAT, self).tearDown()
2529 if not self.vpp_dead:
2530 self.logger.info(self.vapi.cli("show snat detail"))
2534 class TestNAT64(MethodHolder):
2535 """ NAT64 Test Cases """
2538 def setUpClass(cls):
2539 super(TestNAT64, cls).setUpClass()
2542 cls.tcp_port_in = 6303
2543 cls.tcp_port_out = 6303
2544 cls.udp_port_in = 6304
2545 cls.udp_port_out = 6304
2546 cls.icmp_id_in = 6305
2547 cls.icmp_id_out = 6305
2548 cls.nat_addr = '10.0.0.3'
2549 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2551 cls.vrf1_nat_addr = '10.0.10.3'
2552 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2555 cls.create_pg_interfaces(range(3))
2556 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2557 cls.ip6_interfaces.append(cls.pg_interfaces[2])
2558 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2560 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2562 cls.pg0.generate_remote_hosts(2)
2564 for i in cls.ip6_interfaces:
2567 i.configure_ipv6_neighbors()
2569 for i in cls.ip4_interfaces:
2575 super(TestNAT64, cls).tearDownClass()
2578 def test_pool(self):
2579 """ Add/delete address to NAT64 pool """
2580 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2582 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2584 addresses = self.vapi.nat64_pool_addr_dump()
2585 self.assertEqual(len(addresses), 1)
2586 self.assertEqual(addresses[0].address, nat_addr)
2588 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2590 addresses = self.vapi.nat64_pool_addr_dump()
2591 self.assertEqual(len(addresses), 0)
2593 def test_interface(self):
2594 """ Enable/disable NAT64 feature on the interface """
2595 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2596 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2598 interfaces = self.vapi.nat64_interface_dump()
2599 self.assertEqual(len(interfaces), 2)
2602 for intf in interfaces:
2603 if intf.sw_if_index == self.pg0.sw_if_index:
2604 self.assertEqual(intf.is_inside, 1)
2606 elif intf.sw_if_index == self.pg1.sw_if_index:
2607 self.assertEqual(intf.is_inside, 0)
2609 self.assertTrue(pg0_found)
2610 self.assertTrue(pg1_found)
2612 features = self.vapi.cli("show interface features pg0")
2613 self.assertNotEqual(features.find('nat64-in2out'), -1)
2614 features = self.vapi.cli("show interface features pg1")
2615 self.assertNotEqual(features.find('nat64-out2in'), -1)
2617 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2618 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2620 interfaces = self.vapi.nat64_interface_dump()
2621 self.assertEqual(len(interfaces), 0)
2623 def test_static_bib(self):
2624 """ Add/delete static BIB entry """
2625 in_addr = socket.inet_pton(socket.AF_INET6,
2626 '2001:db8:85a3::8a2e:370:7334')
2627 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2630 proto = IP_PROTOS.tcp
2632 self.vapi.nat64_add_del_static_bib(in_addr,
2637 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2642 self.assertEqual(bibe.i_addr, in_addr)
2643 self.assertEqual(bibe.o_addr, out_addr)
2644 self.assertEqual(bibe.i_port, in_port)
2645 self.assertEqual(bibe.o_port, out_port)
2646 self.assertEqual(static_bib_num, 1)
2648 self.vapi.nat64_add_del_static_bib(in_addr,
2654 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2659 self.assertEqual(static_bib_num, 0)
2661 def test_set_timeouts(self):
2662 """ Set NAT64 timeouts """
2663 # verify default values
2664 timeouts = self.vapi.nat64_get_timeouts()
2665 self.assertEqual(timeouts.udp, 300)
2666 self.assertEqual(timeouts.icmp, 60)
2667 self.assertEqual(timeouts.tcp_trans, 240)
2668 self.assertEqual(timeouts.tcp_est, 7440)
2669 self.assertEqual(timeouts.tcp_incoming_syn, 6)
2671 # set and verify custom values
2672 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2673 tcp_est=7450, tcp_incoming_syn=10)
2674 timeouts = self.vapi.nat64_get_timeouts()
2675 self.assertEqual(timeouts.udp, 200)
2676 self.assertEqual(timeouts.icmp, 30)
2677 self.assertEqual(timeouts.tcp_trans, 250)
2678 self.assertEqual(timeouts.tcp_est, 7450)
2679 self.assertEqual(timeouts.tcp_incoming_syn, 10)
2681 def test_dynamic(self):
2682 """ NAT64 dynamic translation test """
2683 self.tcp_port_in = 6303
2684 self.udp_port_in = 6304
2685 self.icmp_id_in = 6305
2687 ses_num_start = self.nat64_get_ses_num()
2689 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2691 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2692 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2695 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2696 self.pg0.add_stream(pkts)
2697 self.pg_enable_capture(self.pg_interfaces)
2699 capture = self.pg1.get_capture(len(pkts))
2700 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2701 dst_ip=self.pg1.remote_ip4)
2704 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2705 self.pg1.add_stream(pkts)
2706 self.pg_enable_capture(self.pg_interfaces)
2708 capture = self.pg0.get_capture(len(pkts))
2709 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2710 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2713 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2714 self.pg0.add_stream(pkts)
2715 self.pg_enable_capture(self.pg_interfaces)
2717 capture = self.pg1.get_capture(len(pkts))
2718 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2719 dst_ip=self.pg1.remote_ip4)
2722 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2723 self.pg1.add_stream(pkts)
2724 self.pg_enable_capture(self.pg_interfaces)
2726 capture = self.pg0.get_capture(len(pkts))
2727 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2729 ses_num_end = self.nat64_get_ses_num()
2731 self.assertEqual(ses_num_end - ses_num_start, 3)
2733 # tenant with specific VRF
2734 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
2735 self.vrf1_nat_addr_n,
2736 vrf_id=self.vrf1_id)
2737 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
2739 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
2740 self.pg2.add_stream(pkts)
2741 self.pg_enable_capture(self.pg_interfaces)
2743 capture = self.pg1.get_capture(len(pkts))
2744 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
2745 dst_ip=self.pg1.remote_ip4)
2747 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
2748 self.pg1.add_stream(pkts)
2749 self.pg_enable_capture(self.pg_interfaces)
2751 capture = self.pg2.get_capture(len(pkts))
2752 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
2754 def test_static(self):
2755 """ NAT64 static translation test """
2756 self.tcp_port_in = 60303
2757 self.udp_port_in = 60304
2758 self.icmp_id_in = 60305
2759 self.tcp_port_out = 60303
2760 self.udp_port_out = 60304
2761 self.icmp_id_out = 60305
2763 ses_num_start = self.nat64_get_ses_num()
2765 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2767 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2768 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2770 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2775 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2780 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2787 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2788 self.pg0.add_stream(pkts)
2789 self.pg_enable_capture(self.pg_interfaces)
2791 capture = self.pg1.get_capture(len(pkts))
2792 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2793 dst_ip=self.pg1.remote_ip4, same_port=True)
2796 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2797 self.pg1.add_stream(pkts)
2798 self.pg_enable_capture(self.pg_interfaces)
2800 capture = self.pg0.get_capture(len(pkts))
2801 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2802 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2804 ses_num_end = self.nat64_get_ses_num()
2806 self.assertEqual(ses_num_end - ses_num_start, 3)
2808 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2809 def test_session_timeout(self):
2810 """ NAT64 session timeout """
2811 self.icmp_id_in = 1234
2812 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2814 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2815 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2816 self.vapi.nat64_set_timeouts(icmp=5)
2818 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2819 self.pg0.add_stream(pkts)
2820 self.pg_enable_capture(self.pg_interfaces)
2822 capture = self.pg1.get_capture(len(pkts))
2824 ses_num_before_timeout = self.nat64_get_ses_num()
2828 # ICMP session after timeout
2829 ses_num_after_timeout = self.nat64_get_ses_num()
2830 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2832 def test_icmp_error(self):
2833 """ NAT64 ICMP Error message translation """
2834 self.tcp_port_in = 6303
2835 self.udp_port_in = 6304
2836 self.icmp_id_in = 6305
2838 ses_num_start = self.nat64_get_ses_num()
2840 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2842 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2843 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2845 # send some packets to create sessions
2846 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2847 self.pg0.add_stream(pkts)
2848 self.pg_enable_capture(self.pg_interfaces)
2850 capture_ip4 = self.pg1.get_capture(len(pkts))
2851 self.verify_capture_out(capture_ip4,
2852 nat_ip=self.nat_addr,
2853 dst_ip=self.pg1.remote_ip4)
2855 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2856 self.pg1.add_stream(pkts)
2857 self.pg_enable_capture(self.pg_interfaces)
2859 capture_ip6 = self.pg0.get_capture(len(pkts))
2860 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2861 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
2862 self.pg0.remote_ip6)
2865 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2866 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
2867 ICMPv6DestUnreach(code=1) /
2868 packet[IPv6] for packet in capture_ip6]
2869 self.pg0.add_stream(pkts)
2870 self.pg_enable_capture(self.pg_interfaces)
2872 capture = self.pg1.get_capture(len(pkts))
2873 for packet in capture:
2875 self.assertEqual(packet[IP].src, self.nat_addr)
2876 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2877 self.assertEqual(packet[ICMP].type, 3)
2878 self.assertEqual(packet[ICMP].code, 13)
2879 inner = packet[IPerror]
2880 self.assertEqual(inner.src, self.pg1.remote_ip4)
2881 self.assertEqual(inner.dst, self.nat_addr)
2882 self.check_icmp_checksum(packet)
2883 if inner.haslayer(TCPerror):
2884 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
2885 elif inner.haslayer(UDPerror):
2886 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
2888 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
2890 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2894 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2895 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2896 ICMP(type=3, code=13) /
2897 packet[IP] for packet in capture_ip4]
2898 self.pg1.add_stream(pkts)
2899 self.pg_enable_capture(self.pg_interfaces)
2901 capture = self.pg0.get_capture(len(pkts))
2902 for packet in capture:
2904 self.assertEqual(packet[IPv6].src, ip.src)
2905 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
2906 icmp = packet[ICMPv6DestUnreach]
2907 self.assertEqual(icmp.code, 1)
2908 inner = icmp[IPerror6]
2909 self.assertEqual(inner.src, self.pg0.remote_ip6)
2910 self.assertEqual(inner.dst, ip.src)
2911 self.check_icmpv6_checksum(packet)
2912 if inner.haslayer(TCPerror):
2913 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
2914 elif inner.haslayer(UDPerror):
2915 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
2917 self.assertEqual(inner[ICMPv6EchoRequest].id,
2920 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2923 def test_hairpinning(self):
2924 """ NAT64 hairpinning """
2926 client = self.pg0.remote_hosts[0]
2927 server = self.pg0.remote_hosts[1]
2928 server_tcp_in_port = 22
2929 server_tcp_out_port = 4022
2930 server_udp_in_port = 23
2931 server_udp_out_port = 4023
2932 client_tcp_in_port = 1234
2933 client_udp_in_port = 1235
2934 client_tcp_out_port = 0
2935 client_udp_out_port = 0
2936 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
2937 nat_addr_ip6 = ip.src
2939 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2941 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2942 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2944 self.vapi.nat64_add_del_static_bib(server.ip6n,
2947 server_tcp_out_port,
2949 self.vapi.nat64_add_del_static_bib(server.ip6n,
2952 server_udp_out_port,
2957 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2958 IPv6(src=client.ip6, dst=nat_addr_ip6) /
2959 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
2961 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2962 IPv6(src=client.ip6, dst=nat_addr_ip6) /
2963 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
2965 self.pg0.add_stream(pkts)
2966 self.pg_enable_capture(self.pg_interfaces)
2968 capture = self.pg0.get_capture(len(pkts))
2969 for packet in capture:
2971 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
2972 self.assertEqual(packet[IPv6].dst, server.ip6)
2973 if packet.haslayer(TCP):
2974 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
2975 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
2976 self.check_tcp_checksum(packet)
2977 client_tcp_out_port = packet[TCP].sport
2979 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
2980 self.assertEqual(packet[UDP].dport, server_udp_in_port)
2981 self.check_udp_checksum(packet)
2982 client_udp_out_port = packet[UDP].sport
2984 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2989 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2990 IPv6(src=server.ip6, dst=nat_addr_ip6) /
2991 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
2993 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2994 IPv6(src=server.ip6, dst=nat_addr_ip6) /
2995 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
2997 self.pg0.add_stream(pkts)
2998 self.pg_enable_capture(self.pg_interfaces)
3000 capture = self.pg0.get_capture(len(pkts))
3001 for packet in capture:
3003 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3004 self.assertEqual(packet[IPv6].dst, client.ip6)
3005 if packet.haslayer(TCP):
3006 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3007 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3008 self.check_tcp_checksum(packet)
3010 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3011 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3012 self.check_udp_checksum(packet)
3014 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3019 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3020 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3021 ICMPv6DestUnreach(code=1) /
3022 packet[IPv6] for packet in capture]
3023 self.pg0.add_stream(pkts)
3024 self.pg_enable_capture(self.pg_interfaces)
3026 capture = self.pg0.get_capture(len(pkts))
3027 for packet in capture:
3029 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3030 self.assertEqual(packet[IPv6].dst, server.ip6)
3031 icmp = packet[ICMPv6DestUnreach]
3032 self.assertEqual(icmp.code, 1)
3033 inner = icmp[IPerror6]
3034 self.assertEqual(inner.src, server.ip6)
3035 self.assertEqual(inner.dst, nat_addr_ip6)
3036 self.check_icmpv6_checksum(packet)
3037 if inner.haslayer(TCPerror):
3038 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3039 self.assertEqual(inner[TCPerror].dport,
3040 client_tcp_out_port)
3042 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3043 self.assertEqual(inner[UDPerror].dport,
3044 client_udp_out_port)
3046 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3049 def nat64_get_ses_num(self):
3051 Return number of active NAT64 sessions.
3054 st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
3056 st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
3058 st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
3062 def clear_nat64(self):
3064 Clear NAT64 configuration.
3066 self.vapi.nat64_set_timeouts()
3068 interfaces = self.vapi.nat64_interface_dump()
3069 for intf in interfaces:
3070 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3074 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3077 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3085 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3088 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3096 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3099 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3107 adresses = self.vapi.nat64_pool_addr_dump()
3108 for addr in adresses:
3109 self.vapi.nat64_add_del_pool_addr_range(addr.address,
3115 super(TestNAT64, self).tearDown()
3116 if not self.vpp_dead:
3117 self.logger.info(self.vapi.cli("show nat64 pool"))
3118 self.logger.info(self.vapi.cli("show nat64 interfaces"))
3119 self.logger.info(self.vapi.cli("show nat64 bib tcp"))
3120 self.logger.info(self.vapi.cli("show nat64 bib udp"))
3121 self.logger.info(self.vapi.cli("show nat64 bib icmp"))
3122 self.logger.info(self.vapi.cli("show nat64 session table tcp"))
3123 self.logger.info(self.vapi.cli("show nat64 session table udp"))
3124 self.logger.info(self.vapi.cli("show nat64 session table icmp"))
3127 if __name__ == '__main__':
3128 unittest.main(testRunner=VppTestRunner)