7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
20 class MethodHolder(VppTestCase):
21 """ SNAT create capture and verify method holder """
25 super(MethodHolder, cls).setUpClass()
28 super(MethodHolder, self).tearDown()
30 def check_ip_checksum(self, pkt):
32 Check IP checksum of the packet
34 :param pkt: Packet to check IP checksum
36 new = pkt.__class__(str(pkt))
38 new = new.__class__(str(new))
39 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
41 def check_tcp_checksum(self, pkt):
43 Check TCP checksum in IP packet
45 :param pkt: Packet to check TCP checksum
47 new = pkt.__class__(str(pkt))
49 new = new.__class__(str(new))
50 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
52 def check_udp_checksum(self, pkt):
54 Check UDP checksum in IP packet
56 :param pkt: Packet to check UDP checksum
58 new = pkt.__class__(str(pkt))
60 new = new.__class__(str(new))
61 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
63 def check_icmp_errror_embedded(self, pkt):
65 Check ICMP error embeded packet checksum
67 :param pkt: Packet to check ICMP error embeded packet checksum
69 if pkt.haslayer(IPerror):
70 new = pkt.__class__(str(pkt))
71 del new['IPerror'].chksum
72 new = new.__class__(str(new))
73 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
75 if pkt.haslayer(TCPerror):
76 new = pkt.__class__(str(pkt))
77 del new['TCPerror'].chksum
78 new = new.__class__(str(new))
79 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
81 if pkt.haslayer(UDPerror):
82 if pkt['UDPerror'].chksum != 0:
83 new = pkt.__class__(str(pkt))
84 del new['UDPerror'].chksum
85 new = new.__class__(str(new))
86 self.assertEqual(new['UDPerror'].chksum,
87 pkt['UDPerror'].chksum)
89 if pkt.haslayer(ICMPerror):
90 del new['ICMPerror'].chksum
91 new = new.__class__(str(new))
92 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
94 def check_icmp_checksum(self, pkt):
96 Check ICMP checksum in IPv4 packet
98 :param pkt: Packet to check ICMP checksum
100 new = pkt.__class__(str(pkt))
101 del new['ICMP'].chksum
102 new = new.__class__(str(new))
103 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104 if pkt.haslayer(IPerror):
105 self.check_icmp_errror_embedded(pkt)
107 def check_icmpv6_checksum(self, pkt):
109 Check ICMPv6 checksum in IPv4 packet
111 :param pkt: Packet to check ICMPv6 checksum
113 new = pkt.__class__(str(pkt))
114 if pkt.haslayer(ICMPv6DestUnreach):
115 del new['ICMPv6DestUnreach'].cksum
116 new = new.__class__(str(new))
117 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118 pkt['ICMPv6DestUnreach'].cksum)
119 self.check_icmp_errror_embedded(pkt)
120 if pkt.haslayer(ICMPv6EchoRequest):
121 del new['ICMPv6EchoRequest'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124 pkt['ICMPv6EchoRequest'].cksum)
125 if pkt.haslayer(ICMPv6EchoReply):
126 del new['ICMPv6EchoReply'].cksum
127 new = new.__class__(str(new))
128 self.assertEqual(new['ICMPv6EchoReply'].cksum,
129 pkt['ICMPv6EchoReply'].cksum)
131 def create_stream_in(self, in_if, out_if, ttl=64):
133 Create packet stream for inside network
135 :param in_if: Inside interface
136 :param out_if: Outside interface
137 :param ttl: TTL of generated packets
141 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143 TCP(sport=self.tcp_port_in, dport=20))
147 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149 UDP(sport=self.udp_port_in, dport=20))
153 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155 ICMP(id=self.icmp_id_in, type='echo-request'))
160 def compose_ip6(self, ip4, pref, plen):
162 Compose IPv4-embedded IPv6 addresses
164 :param ip4: IPv4 address
165 :param pref: IPv6 prefix
166 :param plen: IPv6 prefix length
167 :returns: IPv4-embedded IPv6 addresses
169 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
170 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
185 pref_n[10] = ip4_n[3]
189 pref_n[10] = ip4_n[2]
190 pref_n[11] = ip4_n[3]
193 pref_n[10] = ip4_n[1]
194 pref_n[11] = ip4_n[2]
195 pref_n[12] = ip4_n[3]
197 pref_n[12] = ip4_n[0]
198 pref_n[13] = ip4_n[1]
199 pref_n[14] = ip4_n[2]
200 pref_n[15] = ip4_n[3]
201 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
203 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
205 Create IPv6 packet stream for inside network
207 :param in_if: Inside interface
208 :param out_if: Outside interface
209 :param hlim: Hop Limit of generated packets
210 :param pref: NAT64 prefix
211 :param plen: NAT64 prefix length
215 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
217 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
220 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
221 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
222 TCP(sport=self.tcp_port_in, dport=20))
226 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228 UDP(sport=self.udp_port_in, dport=20))
232 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234 ICMPv6EchoRequest(id=self.icmp_id_in))
239 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
241 Create packet stream for outside network
243 :param out_if: Outside interface
244 :param dst_ip: Destination IP address (Default use global SNAT address)
245 :param ttl: TTL of generated packets
248 dst_ip = self.snat_addr
251 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
252 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
253 TCP(dport=self.tcp_port_out, sport=20))
257 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259 UDP(dport=self.udp_port_out, sport=20))
263 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265 ICMP(id=self.icmp_id_out, type='echo-reply'))
270 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
271 packet_num=3, dst_ip=None):
273 Verify captured packets on outside network
275 :param capture: Captured packets
276 :param nat_ip: Translated IP address (Default use global SNAT address)
277 :param same_port: Source port number is not translated (Default False)
278 :param packet_num: Expected number of packets (Default 3)
279 :param dst_ip: Destination IP address (Default do not verify)
282 nat_ip = self.snat_addr
283 self.assertEqual(packet_num, len(capture))
284 for packet in capture:
286 self.check_ip_checksum(packet)
287 self.assertEqual(packet[IP].src, nat_ip)
288 if dst_ip is not None:
289 self.assertEqual(packet[IP].dst, dst_ip)
290 if packet.haslayer(TCP):
292 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
295 packet[TCP].sport, self.tcp_port_in)
296 self.tcp_port_out = packet[TCP].sport
297 self.check_tcp_checksum(packet)
298 elif packet.haslayer(UDP):
300 self.assertEqual(packet[UDP].sport, self.udp_port_in)
303 packet[UDP].sport, self.udp_port_in)
304 self.udp_port_out = packet[UDP].sport
307 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
309 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
310 self.icmp_id_out = packet[ICMP].id
311 self.check_icmp_checksum(packet)
313 self.logger.error(ppp("Unexpected or invalid packet "
314 "(outside network):", packet))
317 def verify_capture_in(self, capture, in_if, packet_num=3):
319 Verify captured packets on inside network
321 :param capture: Captured packets
322 :param in_if: Inside interface
323 :param packet_num: Expected number of packets (Default 3)
325 self.assertEqual(packet_num, len(capture))
326 for packet in capture:
328 self.check_ip_checksum(packet)
329 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
330 if packet.haslayer(TCP):
331 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
332 self.check_tcp_checksum(packet)
333 elif packet.haslayer(UDP):
334 self.assertEqual(packet[UDP].dport, self.udp_port_in)
336 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
337 self.check_icmp_checksum(packet)
339 self.logger.error(ppp("Unexpected or invalid packet "
340 "(inside network):", packet))
343 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
345 Verify captured IPv6 packets on inside network
347 :param capture: Captured packets
348 :param src_ip: Source IP
349 :param dst_ip: Destination IP address
350 :param packet_num: Expected number of packets (Default 3)
352 self.assertEqual(packet_num, len(capture))
353 for packet in capture:
355 self.assertEqual(packet[IPv6].src, src_ip)
356 self.assertEqual(packet[IPv6].dst, dst_ip)
357 if packet.haslayer(TCP):
358 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
359 self.check_tcp_checksum(packet)
360 elif packet.haslayer(UDP):
361 self.assertEqual(packet[UDP].dport, self.udp_port_in)
362 self.check_udp_checksum(packet)
364 self.assertEqual(packet[ICMPv6EchoReply].id,
366 self.check_icmpv6_checksum(packet)
368 self.logger.error(ppp("Unexpected or invalid packet "
369 "(inside network):", packet))
372 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
374 Verify captured packet that don't have to be translated
376 :param capture: Captured packets
377 :param ingress_if: Ingress interface
378 :param egress_if: Egress interface
380 for packet in capture:
382 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
383 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
384 if packet.haslayer(TCP):
385 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
386 elif packet.haslayer(UDP):
387 self.assertEqual(packet[UDP].sport, self.udp_port_in)
389 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
391 self.logger.error(ppp("Unexpected or invalid packet "
392 "(inside network):", packet))
395 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
396 packet_num=3, icmp_type=11):
398 Verify captured packets with ICMP errors on outside network
400 :param capture: Captured packets
401 :param src_ip: Translated IP address or IP address of VPP
402 (Default use global SNAT address)
403 :param packet_num: Expected number of packets (Default 3)
404 :param icmp_type: Type of error ICMP packet
405 we are expecting (Default 11)
408 src_ip = self.snat_addr
409 self.assertEqual(packet_num, len(capture))
410 for packet in capture:
412 self.assertEqual(packet[IP].src, src_ip)
413 self.assertTrue(packet.haslayer(ICMP))
415 self.assertEqual(icmp.type, icmp_type)
416 self.assertTrue(icmp.haslayer(IPerror))
417 inner_ip = icmp[IPerror]
418 if inner_ip.haslayer(TCPerror):
419 self.assertEqual(inner_ip[TCPerror].dport,
421 elif inner_ip.haslayer(UDPerror):
422 self.assertEqual(inner_ip[UDPerror].dport,
425 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
427 self.logger.error(ppp("Unexpected or invalid packet "
428 "(outside network):", packet))
431 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
434 Verify captured packets with ICMP errors on inside network
436 :param capture: Captured packets
437 :param in_if: Inside interface
438 :param packet_num: Expected number of packets (Default 3)
439 :param icmp_type: Type of error ICMP packet
440 we are expecting (Default 11)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
446 self.assertTrue(packet.haslayer(ICMP))
448 self.assertEqual(icmp.type, icmp_type)
449 self.assertTrue(icmp.haslayer(IPerror))
450 inner_ip = icmp[IPerror]
451 if inner_ip.haslayer(TCPerror):
452 self.assertEqual(inner_ip[TCPerror].sport,
454 elif inner_ip.haslayer(UDPerror):
455 self.assertEqual(inner_ip[UDPerror].sport,
458 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
460 self.logger.error(ppp("Unexpected or invalid packet "
461 "(inside network):", packet))
464 def verify_ipfix_nat44_ses(self, data):
466 Verify IPFIX NAT44 session create/delete event
468 :param data: Decoded IPFIX data records
470 nat44_ses_create_num = 0
471 nat44_ses_delete_num = 0
472 self.assertEqual(6, len(data))
475 self.assertIn(ord(record[230]), [4, 5])
476 if ord(record[230]) == 4:
477 nat44_ses_create_num += 1
479 nat44_ses_delete_num += 1
481 self.assertEqual(self.pg0.remote_ip4n, record[8])
482 # postNATSourceIPv4Address
483 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
486 self.assertEqual(struct.pack("!I", 0), record[234])
487 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
488 if IP_PROTOS.icmp == ord(record[4]):
489 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
490 self.assertEqual(struct.pack("!H", self.icmp_id_out),
492 elif IP_PROTOS.tcp == ord(record[4]):
493 self.assertEqual(struct.pack("!H", self.tcp_port_in),
495 self.assertEqual(struct.pack("!H", self.tcp_port_out),
497 elif IP_PROTOS.udp == ord(record[4]):
498 self.assertEqual(struct.pack("!H", self.udp_port_in),
500 self.assertEqual(struct.pack("!H", self.udp_port_out),
503 self.fail("Invalid protocol")
504 self.assertEqual(3, nat44_ses_create_num)
505 self.assertEqual(3, nat44_ses_delete_num)
507 def verify_ipfix_addr_exhausted(self, data):
509 Verify IPFIX NAT addresses event
511 :param data: Decoded IPFIX data records
513 self.assertEqual(1, len(data))
516 self.assertEqual(ord(record[230]), 3)
518 self.assertEqual(struct.pack("!I", 0), record[283])
521 class TestSNAT(MethodHolder):
522 """ SNAT Test Cases """
526 super(TestSNAT, cls).setUpClass()
529 cls.tcp_port_in = 6303
530 cls.tcp_port_out = 6303
531 cls.udp_port_in = 6304
532 cls.udp_port_out = 6304
533 cls.icmp_id_in = 6305
534 cls.icmp_id_out = 6305
535 cls.snat_addr = '10.0.0.3'
536 cls.ipfix_src_port = 4739
537 cls.ipfix_domain_id = 1
539 cls.create_pg_interfaces(range(9))
540 cls.interfaces = list(cls.pg_interfaces[0:4])
542 for i in cls.interfaces:
547 cls.pg0.generate_remote_hosts(3)
548 cls.pg0.configure_ipv4_neighbors()
550 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
552 cls.pg4._local_ip4 = "172.16.255.1"
553 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
554 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
555 cls.pg4.set_table_ip4(10)
556 cls.pg5._local_ip4 = "172.16.255.3"
557 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
559 cls.pg5.set_table_ip4(10)
560 cls.pg6._local_ip4 = "172.16.255.1"
561 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
563 cls.pg6.set_table_ip4(20)
564 for i in cls.overlapping_interfaces:
573 super(TestSNAT, cls).tearDownClass()
576 def clear_snat(self):
578 Clear SNAT configuration.
580 # I found no elegant way to do this
581 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
582 dst_address_length=32,
583 next_hop_address=self.pg7.remote_ip4n,
584 next_hop_sw_if_index=self.pg7.sw_if_index,
586 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
587 dst_address_length=32,
588 next_hop_address=self.pg8.remote_ip4n,
589 next_hop_sw_if_index=self.pg8.sw_if_index,
592 for intf in [self.pg7, self.pg8]:
593 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
595 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
600 if self.pg7.has_ip4_config:
601 self.pg7.unconfig_ip4()
603 interfaces = self.vapi.snat_interface_addr_dump()
604 for intf in interfaces:
605 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
607 self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
608 domain_id=self.ipfix_domain_id)
609 self.ipfix_src_port = 4739
610 self.ipfix_domain_id = 1
612 interfaces = self.vapi.snat_interface_dump()
613 for intf in interfaces:
614 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
618 static_mappings = self.vapi.snat_static_mapping_dump()
619 for sm in static_mappings:
620 self.vapi.snat_add_static_mapping(sm.local_ip_address,
621 sm.external_ip_address,
622 local_port=sm.local_port,
623 external_port=sm.external_port,
624 addr_only=sm.addr_only,
626 protocol=sm.protocol,
629 adresses = self.vapi.snat_address_dump()
630 for addr in adresses:
631 self.vapi.snat_add_address_range(addr.ip_address,
635 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
636 local_port=0, external_port=0, vrf_id=0,
637 is_add=1, external_sw_if_index=0xFFFFFFFF,
640 Add/delete S-NAT static mapping
642 :param local_ip: Local IP address
643 :param external_ip: External IP address
644 :param local_port: Local port number (Optional)
645 :param external_port: External port number (Optional)
646 :param vrf_id: VRF ID (Default 0)
647 :param is_add: 1 if add, 0 if delete (Default add)
648 :param external_sw_if_index: External interface instead of IP address
649 :param proto: IP protocol (Mandatory if port specified)
652 if local_port and external_port:
654 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
655 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
656 self.vapi.snat_add_static_mapping(
659 external_sw_if_index,
667 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
669 Add/delete S-NAT address
671 :param ip: IP address
672 :param is_add: 1 if add, 0 if delete (Default add)
674 snat_addr = socket.inet_pton(socket.AF_INET, ip)
675 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
678 def test_dynamic(self):
679 """ SNAT dynamic translation test """
681 self.snat_add_address(self.snat_addr)
682 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
683 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
687 pkts = self.create_stream_in(self.pg0, self.pg1)
688 self.pg0.add_stream(pkts)
689 self.pg_enable_capture(self.pg_interfaces)
691 capture = self.pg1.get_capture(len(pkts))
692 self.verify_capture_out(capture)
695 pkts = self.create_stream_out(self.pg1)
696 self.pg1.add_stream(pkts)
697 self.pg_enable_capture(self.pg_interfaces)
699 capture = self.pg0.get_capture(len(pkts))
700 self.verify_capture_in(capture, self.pg0)
702 def test_dynamic_icmp_errors_in2out_ttl_1(self):
703 """ SNAT handling of client packets with TTL=1 """
705 self.snat_add_address(self.snat_addr)
706 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
707 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
710 # Client side - generate traffic
711 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
712 self.pg0.add_stream(pkts)
713 self.pg_enable_capture(self.pg_interfaces)
716 # Client side - verify ICMP type 11 packets
717 capture = self.pg0.get_capture(len(pkts))
718 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
720 def test_dynamic_icmp_errors_out2in_ttl_1(self):
721 """ SNAT handling of server packets with TTL=1 """
723 self.snat_add_address(self.snat_addr)
724 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
725 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
728 # Client side - create sessions
729 pkts = self.create_stream_in(self.pg0, self.pg1)
730 self.pg0.add_stream(pkts)
731 self.pg_enable_capture(self.pg_interfaces)
734 # Server side - generate traffic
735 capture = self.pg1.get_capture(len(pkts))
736 self.verify_capture_out(capture)
737 pkts = self.create_stream_out(self.pg1, ttl=1)
738 self.pg1.add_stream(pkts)
739 self.pg_enable_capture(self.pg_interfaces)
742 # Server side - verify ICMP type 11 packets
743 capture = self.pg1.get_capture(len(pkts))
744 self.verify_capture_out_with_icmp_errors(capture,
745 src_ip=self.pg1.local_ip4)
747 def test_dynamic_icmp_errors_in2out_ttl_2(self):
748 """ SNAT handling of error responses to client packets with TTL=2 """
750 self.snat_add_address(self.snat_addr)
751 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
752 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
755 # Client side - generate traffic
756 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
757 self.pg0.add_stream(pkts)
758 self.pg_enable_capture(self.pg_interfaces)
761 # Server side - simulate ICMP type 11 response
762 capture = self.pg1.get_capture(len(pkts))
763 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
764 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
765 ICMP(type=11) / packet[IP] for packet in capture]
766 self.pg1.add_stream(pkts)
767 self.pg_enable_capture(self.pg_interfaces)
770 # Client side - verify ICMP type 11 packets
771 capture = self.pg0.get_capture(len(pkts))
772 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
774 def test_dynamic_icmp_errors_out2in_ttl_2(self):
775 """ SNAT handling of error responses to server packets with TTL=2 """
777 self.snat_add_address(self.snat_addr)
778 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
779 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
782 # Client side - create sessions
783 pkts = self.create_stream_in(self.pg0, self.pg1)
784 self.pg0.add_stream(pkts)
785 self.pg_enable_capture(self.pg_interfaces)
788 # Server side - generate traffic
789 capture = self.pg1.get_capture(len(pkts))
790 self.verify_capture_out(capture)
791 pkts = self.create_stream_out(self.pg1, ttl=2)
792 self.pg1.add_stream(pkts)
793 self.pg_enable_capture(self.pg_interfaces)
796 # Client side - simulate ICMP type 11 response
797 capture = self.pg0.get_capture(len(pkts))
798 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
799 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
800 ICMP(type=11) / packet[IP] for packet in capture]
801 self.pg0.add_stream(pkts)
802 self.pg_enable_capture(self.pg_interfaces)
805 # Server side - verify ICMP type 11 packets
806 capture = self.pg1.get_capture(len(pkts))
807 self.verify_capture_out_with_icmp_errors(capture)
809 def test_ping_out_interface_from_outside(self):
810 """ Ping SNAT out interface from outside network """
812 self.snat_add_address(self.snat_addr)
813 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
814 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
817 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
818 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
819 ICMP(id=self.icmp_id_out, type='echo-request'))
821 self.pg1.add_stream(pkts)
822 self.pg_enable_capture(self.pg_interfaces)
824 capture = self.pg1.get_capture(len(pkts))
825 self.assertEqual(1, len(capture))
828 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
829 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
830 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
831 self.assertEqual(packet[ICMP].type, 0) # echo reply
833 self.logger.error(ppp("Unexpected or invalid packet "
834 "(outside network):", packet))
837 def test_ping_internal_host_from_outside(self):
838 """ Ping internal host from outside network """
840 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
841 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
842 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
846 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
847 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
848 ICMP(id=self.icmp_id_out, type='echo-request'))
849 self.pg1.add_stream(pkt)
850 self.pg_enable_capture(self.pg_interfaces)
852 capture = self.pg0.get_capture(1)
853 self.verify_capture_in(capture, self.pg0, packet_num=1)
854 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
857 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
858 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
859 ICMP(id=self.icmp_id_in, type='echo-reply'))
860 self.pg0.add_stream(pkt)
861 self.pg_enable_capture(self.pg_interfaces)
863 capture = self.pg1.get_capture(1)
864 self.verify_capture_out(capture, same_port=True, packet_num=1)
865 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
867 def test_static_in(self):
868 """ SNAT 1:1 NAT initialized from inside network """
871 self.tcp_port_out = 6303
872 self.udp_port_out = 6304
873 self.icmp_id_out = 6305
875 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
876 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
877 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
881 pkts = self.create_stream_in(self.pg0, self.pg1)
882 self.pg0.add_stream(pkts)
883 self.pg_enable_capture(self.pg_interfaces)
885 capture = self.pg1.get_capture(len(pkts))
886 self.verify_capture_out(capture, nat_ip, True)
889 pkts = self.create_stream_out(self.pg1, nat_ip)
890 self.pg1.add_stream(pkts)
891 self.pg_enable_capture(self.pg_interfaces)
893 capture = self.pg0.get_capture(len(pkts))
894 self.verify_capture_in(capture, self.pg0)
896 def test_static_out(self):
897 """ SNAT 1:1 NAT initialized from outside network """
900 self.tcp_port_out = 6303
901 self.udp_port_out = 6304
902 self.icmp_id_out = 6305
904 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
905 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
906 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
910 pkts = self.create_stream_out(self.pg1, nat_ip)
911 self.pg1.add_stream(pkts)
912 self.pg_enable_capture(self.pg_interfaces)
914 capture = self.pg0.get_capture(len(pkts))
915 self.verify_capture_in(capture, self.pg0)
918 pkts = self.create_stream_in(self.pg0, self.pg1)
919 self.pg0.add_stream(pkts)
920 self.pg_enable_capture(self.pg_interfaces)
922 capture = self.pg1.get_capture(len(pkts))
923 self.verify_capture_out(capture, nat_ip, True)
925 def test_static_with_port_in(self):
926 """ SNAT 1:1 NAT with port initialized from inside network """
928 self.tcp_port_out = 3606
929 self.udp_port_out = 3607
930 self.icmp_id_out = 3608
932 self.snat_add_address(self.snat_addr)
933 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
934 self.tcp_port_in, self.tcp_port_out,
936 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
937 self.udp_port_in, self.udp_port_out,
939 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
940 self.icmp_id_in, self.icmp_id_out,
941 proto=IP_PROTOS.icmp)
942 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
943 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
947 pkts = self.create_stream_in(self.pg0, self.pg1)
948 self.pg0.add_stream(pkts)
949 self.pg_enable_capture(self.pg_interfaces)
951 capture = self.pg1.get_capture(len(pkts))
952 self.verify_capture_out(capture)
955 pkts = self.create_stream_out(self.pg1)
956 self.pg1.add_stream(pkts)
957 self.pg_enable_capture(self.pg_interfaces)
959 capture = self.pg0.get_capture(len(pkts))
960 self.verify_capture_in(capture, self.pg0)
962 def test_static_with_port_out(self):
963 """ SNAT 1:1 NAT with port initialized from outside network """
965 self.tcp_port_out = 30606
966 self.udp_port_out = 30607
967 self.icmp_id_out = 30608
969 self.snat_add_address(self.snat_addr)
970 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
971 self.tcp_port_in, self.tcp_port_out,
973 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
974 self.udp_port_in, self.udp_port_out,
976 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
977 self.icmp_id_in, self.icmp_id_out,
978 proto=IP_PROTOS.icmp)
979 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
980 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
984 pkts = self.create_stream_out(self.pg1)
985 self.pg1.add_stream(pkts)
986 self.pg_enable_capture(self.pg_interfaces)
988 capture = self.pg0.get_capture(len(pkts))
989 self.verify_capture_in(capture, self.pg0)
992 pkts = self.create_stream_in(self.pg0, self.pg1)
993 self.pg0.add_stream(pkts)
994 self.pg_enable_capture(self.pg_interfaces)
996 capture = self.pg1.get_capture(len(pkts))
997 self.verify_capture_out(capture)
999 def test_static_vrf_aware(self):
1000 """ SNAT 1:1 NAT VRF awareness """
1002 nat_ip1 = "10.0.0.30"
1003 nat_ip2 = "10.0.0.40"
1004 self.tcp_port_out = 6303
1005 self.udp_port_out = 6304
1006 self.icmp_id_out = 6305
1008 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1010 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1012 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1014 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1015 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1017 # inside interface VRF match SNAT static mapping VRF
1018 pkts = self.create_stream_in(self.pg4, self.pg3)
1019 self.pg4.add_stream(pkts)
1020 self.pg_enable_capture(self.pg_interfaces)
1022 capture = self.pg3.get_capture(len(pkts))
1023 self.verify_capture_out(capture, nat_ip1, True)
1025 # inside interface VRF don't match SNAT static mapping VRF (packets
1027 pkts = self.create_stream_in(self.pg0, self.pg3)
1028 self.pg0.add_stream(pkts)
1029 self.pg_enable_capture(self.pg_interfaces)
1031 self.pg3.assert_nothing_captured()
1033 def test_multiple_inside_interfaces(self):
1034 """ SNAT multiple inside interfaces (non-overlapping address space) """
1036 self.snat_add_address(self.snat_addr)
1037 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1038 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1039 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1042 # between two S-NAT inside interfaces (no translation)
1043 pkts = self.create_stream_in(self.pg0, self.pg1)
1044 self.pg0.add_stream(pkts)
1045 self.pg_enable_capture(self.pg_interfaces)
1047 capture = self.pg1.get_capture(len(pkts))
1048 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1050 # from S-NAT inside to interface without S-NAT feature (no translation)
1051 pkts = self.create_stream_in(self.pg0, self.pg2)
1052 self.pg0.add_stream(pkts)
1053 self.pg_enable_capture(self.pg_interfaces)
1055 capture = self.pg2.get_capture(len(pkts))
1056 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1058 # in2out 1st interface
1059 pkts = self.create_stream_in(self.pg0, self.pg3)
1060 self.pg0.add_stream(pkts)
1061 self.pg_enable_capture(self.pg_interfaces)
1063 capture = self.pg3.get_capture(len(pkts))
1064 self.verify_capture_out(capture)
1066 # out2in 1st interface
1067 pkts = self.create_stream_out(self.pg3)
1068 self.pg3.add_stream(pkts)
1069 self.pg_enable_capture(self.pg_interfaces)
1071 capture = self.pg0.get_capture(len(pkts))
1072 self.verify_capture_in(capture, self.pg0)
1074 # in2out 2nd interface
1075 pkts = self.create_stream_in(self.pg1, self.pg3)
1076 self.pg1.add_stream(pkts)
1077 self.pg_enable_capture(self.pg_interfaces)
1079 capture = self.pg3.get_capture(len(pkts))
1080 self.verify_capture_out(capture)
1082 # out2in 2nd interface
1083 pkts = self.create_stream_out(self.pg3)
1084 self.pg3.add_stream(pkts)
1085 self.pg_enable_capture(self.pg_interfaces)
1087 capture = self.pg1.get_capture(len(pkts))
1088 self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ SNAT multiple inside interfaces with overlapping address space """

    def replay(tx_if, stream, rx_if, expected):
        # Send `stream` on tx_if and return `expected` packets from rx_if.
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(expected)

    static_nat_ip = "10.0.0.10"
    self.snat_add_address(self.snat_addr)
    # pg3 is the outside interface; pg4, pg5 and pg6 are inside ones.
    self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                             is_inside=0)
    for inside_if in (self.pg4, self.pg5, self.pg6):
        self.vapi.snat_interface_add_del_feature(inside_if.sw_if_index)
    # NOTE(review): vrf_id=20 restored from an elided line; it matches the
    # VRF used for the pg6 session dump further down - confirm.
    self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                 vrf_id=20)

    # between S-NAT inside interfaces with same VRF (no translation)
    stream = self.create_stream_in(self.pg4, self.pg5)
    rx = replay(self.pg4, stream, self.pg5, len(stream))
    self.verify_capture_no_translation(rx, self.pg4, self.pg5)

    # between S-NAT inside interfaces with different VRF (hairpinning)
    request = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
               IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
               TCP(sport=1234, dport=5678))
    rx = replay(self.pg4, request, self.pg6, 1)
    p = rx[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    stream = self.create_stream_in(self.pg4, self.pg3)
    rx = replay(self.pg4, stream, self.pg3, len(stream))
    self.verify_capture_out(rx)

    # out2in 1st interface
    stream = self.create_stream_out(self.pg3)
    rx = replay(self.pg3, stream, self.pg4, len(stream))
    self.verify_capture_in(rx, self.pg4)

    # in2out 2nd interface
    stream = self.create_stream_in(self.pg5, self.pg3)
    rx = replay(self.pg5, stream, self.pg3, len(stream))
    self.verify_capture_out(rx)

    # out2in 2nd interface
    stream = self.create_stream_out(self.pg3)
    rx = replay(self.pg3, stream, self.pg5, len(stream))
    self.verify_capture_in(rx, self.pg5)

    # pg5 session dump
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    # Sessions are expected in TCP, UDP, ICMP order.
    expected_protocols = (IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp)
    for session, protocol in zip(sessions, expected_protocols):
        self.assertEqual(session.protocol, protocol)
    expected_inside = (self.tcp_port_in, self.udp_port_in,
                       self.icmp_id_in)
    for session, inside_port in zip(sessions, expected_inside):
        self.assertEqual(session.inside_port, inside_port)
    expected_outside = (self.tcp_port_out, self.udp_port_out,
                        self.icmp_id_out)
    for session, outside_port in zip(sessions, expected_outside):
        self.assertEqual(session.outside_port, outside_port)

    # in2out 3rd interface
    stream = self.create_stream_in(self.pg6, self.pg3)
    rx = replay(self.pg6, stream, self.pg3, len(stream))
    self.verify_capture_out(rx, static_nat_ip, True)

    # out2in 3rd interface
    stream = self.create_stream_out(self.pg3, static_nat_ip)
    rx = replay(self.pg3, stream, self.pg6, len(stream))
    self.verify_capture_in(rx, self.pg6)

    # general user and session dump verifications
    users = self.vapi.snat_user_dump()
    self.assertTrue(len(users) >= 3)
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        user_sessions = self.vapi.snat_user_session_dump(user.ip_address,
                                                         user.vrf_id)
        for session in user_sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertTrue(session.protocol in
                            [IP_PROTOS.tcp, IP_PROTOS.udp,
                             IP_PROTOS.icmp])

    # pg4 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertTrue(len(sessions) >= 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertTrue(len(sessions) >= 3)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        self.assertEqual(map(ord, session.outside_ip_address[0:4]),
                         map(int, static_nat_ip.split('.')))
        self.assertTrue(session.inside_port in
                        [self.tcp_port_in, self.udp_port_in,
                         self.icmp_id_in])
def test_hairpinning(self):
    """ SNAT hairpinning - 1:1 NAT with port"""

    # Host and server both sit behind the same inside interface (pg0).
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the two host port values below were restored from
    # elided lines - confirm against the original file.
    host_in_port = 1234
    host_out_port = 0
    server_in_port = 5678
    server_out_port = 8765

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.snat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # Remember the translated source port for the reply direction.
        host_out_port = tcp.sport
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except:
        # Fixed: the packet must be an argument of ppp(), not of
        # logger.error(); previously ppp() was called with one argument
        # and the dump of the offending packet was never produced.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ SNAT hairpinning - 1:1 NAT"""

    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    def l3(src_ip, dst_ip):
        # Ethernet/IP headers shared by every packet injected on pg0.
        return (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IP(src=src_ip, dst=dst_ip))

    def hairpin(stream):
        # Inject on pg0 and collect the hairpinned packets on pg0 again.
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(len(stream))

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # add static mapping for servers
    self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
    self.snat_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    stream = [
        l3(host.ip4, server1_nat_ip) /
        TCP(sport=self.tcp_port_in, dport=server_tcp_port),
        l3(host.ip4, server1_nat_ip) /
        UDP(sport=self.udp_port_in, dport=server_udp_port),
        l3(host.ip4, server1_nat_ip) /
        ICMP(id=self.icmp_id_in, type='echo-request'),
    ]
    for packet in hairpin(stream):
        try:
            self.assertEqual(packet[IP].src, self.snat_addr)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to host
    stream = [
        l3(server1.ip4, self.snat_addr) /
        TCP(sport=server_tcp_port, dport=self.tcp_port_out),
        l3(server1.ip4, self.snat_addr) /
        UDP(sport=server_udp_port, dport=self.udp_port_out),
        l3(server1.ip4, self.snat_addr) /
        ICMP(id=self.icmp_id_out, type='echo-reply'),
    ]
    for packet in hairpin(stream):
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, host.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server2 to server1
    stream = [
        l3(server2.ip4, server1_nat_ip) /
        TCP(sport=self.tcp_port_in, dport=server_tcp_port),
        l3(server2.ip4, server1_nat_ip) /
        UDP(sport=self.udp_port_in, dport=server_udp_port),
        l3(server2.ip4, server1_nat_ip) /
        ICMP(id=self.icmp_id_in, type='echo-request'),
    ]
    for packet in hairpin(stream):
        try:
            self.assertEqual(packet[IP].src, server2_nat_ip)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to server2
    stream = [
        l3(server1.ip4, server2_nat_ip) /
        TCP(sport=server_tcp_port, dport=self.tcp_port_out),
        l3(server1.ip4, server2_nat_ip) /
        UDP(sport=server_udp_port, dport=self.udp_port_out),
        l3(server1.ip4, server2_nat_ip) /
        ICMP(id=self.icmp_id_out, type='echo-reply'),
    ]
    for packet in hairpin(stream):
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, server2.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used """

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # get maximum number of translations per user
    limit = self.vapi.snat_show_config().max_translations_per_user

    # send more than maximum number of translations per user packets;
    # each packet uses a distinct TCP source port (1025, 1026, ...)
    pkts_num = limit + 5
    stream = [
        Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
        IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
        TCP(sport=1025 + offset)
        for offset in range(0, pkts_num)
    ]
    self.pg0.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface """

    def pool():
        # Current content of the NAT address pool.
        return self.vapi.snat_address_dump()

    self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)

    # no address in NAT pool
    self.assertEqual(0, len(pool()))

    # configure interface address and check NAT address pool
    self.pg7.config_ip4()
    addresses = pool()
    self.assertEqual(1, len(addresses))
    self.assertEqual(addresses[0].ip_address[0:4], self.pg7.local_ip4n)

    # remove interface address and check NAT address pool
    self.pg7.unconfig_ip4()
    self.assertEqual(0, len(pool()))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface """
    pg7 = self.pg7
    self.vapi.snat_add_interface_addr(pg7.sw_if_index)
    self.snat_add_static_mapping('1.2.3.4',
                                 external_sw_if_index=pg7.sw_if_index)

    # static mappings with external interface
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    self.assertEqual(pg7.sw_if_index, mappings[0].external_sw_if_index)

    # configure interface address and check static mappings
    pg7.config_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    resolved = mappings[0]
    self.assertEqual(resolved.external_ip_address[0:4], pg7.local_ip4n)
    # Once the address is known the dump reports 0xFFFFFFFF instead of
    # the interface index.
    self.assertEqual(0xFFFFFFFF, resolved.external_sw_if_index)

    # remove interface address and check static mappings
    pg7.unconfig_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(0, len(mappings))
def test_ipfix_nat44_sess(self):
    """ S-NAT IPFIX logging NAT44 session created/deleted """
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # Bind on the same variable handed to the exporter below so the scapy
    # dissector and the configured collector port cannot drift apart.
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): path_mtu=512 restored from an elided line - confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    # Remove the pool address again before flushing the exporter.
    self.snat_add_address(self.snat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set (second pass: templates are loaded now)
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ S-NAT IPFIX logging NAT addresses exhausted """
    # No address is added to the NAT pool in this test.
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): path_mtu=512 restored from an elided line - confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    # NOTE(review): TCP(sport=3025) restored from an elided line - confirm.
    syn = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
           IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
           TCP(sport=3025))
    self.pg0.add_stream(syn)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # Nothing is expected on the outside interface.
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    exported = self.pg3.get_capture(3)
    decoder = IPFIXDecoder()
    # first load template
    for record in exported:
        self.assertTrue(record.haslayer(IPFIX))
        self.assertEqual(record[IP].src, self.pg3.local_ip4)
        self.assertEqual(record[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(record[UDP].sport, self.ipfix_src_port)
        # No collector_port was configured above; 4739 is the value the
        # exporter is expected to use in that case.
        self.assertEqual(record[UDP].dport, 4739)
        self.assertEqual(record[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if record.haslayer(Template):
            decoder.add_template(record.getlayer(Template))
    # verify events in data set
    for record in exported:
        if record.haslayer(Data):
            data = decoder.decode_data_set(record.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB """

    def arp_probe(tx_if, target_ip, expected):
        # Broadcast an ARP who-has for target_ip from tx_if's remote peer
        # and return `expected` packets captured on pg1.
        request = (Ether(src=tx_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                   ARP(op=ARP.who_has, pdst=target_ip,
                       psrc=tx_if.remote_ip4, hwsrc=tx_if.remote_mac))
        tx_if.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg1.get_capture(expected)

    def assert_arp_reply(capture):
        # Fixed: assertTrue(op, ARP.is_at) treated ARP.is_at as the
        # failure message and never compared the opcode.
        self.assertTrue(capture[0].haslayer(ARP))
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    static_addr = '10.0.0.10'
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # SNAT address
    assert_arp_reply(arp_probe(self.pg1, self.snat_addr, 1))

    # 1:1 NAT address
    assert_arp_reply(arp_probe(self.pg1, static_addr, 1))

    # send ARP to non-SNAT interface
    arp_probe(self.pg2, self.snat_addr, 0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)

    # neither address may be answered once it has been removed
    arp_probe(self.pg1, self.snat_addr, 0)
    arp_probe(self.pg1, static_addr, 0)
def test_vrf_mode(self):
    """ S-NAT tenant VRF aware address pool mode """

    # NOTE(review): both VRF ids were restored from elided lines - confirm.
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    # Move pg0 and pg1 into their own IPv4 tables; addresses are removed
    # first and re-applied after the table change.
    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.pg0.set_table_ip4(vrf_id1)
    self.pg1.set_table_ip4(vrf_id2)
    self.pg0.config_ip4()
    self.pg1.config_ip4()

    self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
    self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF, then second VRF: each must be translated to its own
    # pool address
    for inside_if, expected_nat_ip in ((self.pg0, nat_ip1),
                                       (self.pg1, nat_ip2)):
        stream = self.create_stream_in(inside_if, self.pg2)
        inside_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg2.get_capture(len(stream))
        self.verify_capture_out(rx, expected_nat_ip)
def test_vrf_feature_independent(self):
    """ S-NAT tenant VRF independent address pool mode """

    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    # Both pool addresses are added without a VRF.
    for pool_ip in (nat_ip1, nat_ip2):
        self.snat_add_address(pool_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF, then second VRF: both are expected to use nat_ip1
    for inside_if in (self.pg0, self.pg1):
        stream = self.create_stream_in(inside_if, self.pg2)
        inside_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg2.get_capture(len(stream))
        self.verify_capture_out(rx, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ SNAT interfaces without configured ip dynamic map """

    ipless_ifs = (self.pg7, self.pg8)

    # Static neighbour entries first, then /32 host routes, for both
    # interfaces.
    # NOTE(review): is_static=1 restored from elided lines - confirm.
    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)
    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # in2out
    stream = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    rx = self.pg8.get_capture(len(stream))
    self.verify_capture_out(rx)

    # out2in
    stream = self.create_stream_out(self.pg8, self.snat_addr)
    self.pg8.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    rx = self.pg7.get_capture(len(stream))
    self.verify_capture_in(rx, self.pg7)
def test_static_ipless_interfaces(self):
    """ SNAT 1:1 NAT interfaces without configured ip """

    ipless_ifs = (self.pg7, self.pg8)

    # Static neighbour entries first, then /32 host routes, for both
    # interfaces.
    # NOTE(review): is_static=1 restored from elided lines - confirm.
    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)
    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    stream = self.create_stream_out(self.pg8)
    self.pg8.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    rx = self.pg7.get_capture(len(stream))
    self.verify_capture_in(rx, self.pg7)

    # in2out
    stream = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    rx = self.pg8.get_capture(len(stream))
    self.verify_capture_out(rx, self.snat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ SNAT 1:1 NAT with port interfaces without configured ip """

    # Fixed outside ports/id used by the static mappings below.
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    ipless_ifs = (self.pg7, self.pg8)

    # Static neighbour entries first, then /32 host routes, for both
    # interfaces.
    # NOTE(review): is_static=1 restored from elided lines - confirm.
    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)
    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_address(self.snat_addr)
    # One mapping per protocol: (inside port/id, outside port/id, proto).
    port_mappings = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for inside_port, outside_port, protocol in port_mappings:
        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                     inside_port, outside_port,
                                     proto=protocol)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    stream = self.create_stream_out(self.pg8)
    self.pg8.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    rx = self.pg7.get_capture(len(stream))
    self.verify_capture_in(rx, self.pg7)

    # in2out
    stream = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    rx = self.pg8.get_capture(len(stream))
    self.verify_capture_out(rx)
def test_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol """
    nat_ip = "10.0.0.10"
    self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    # The GRE layer was restored from an elided line; the assertions
    # below require it.
    gre_out = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
               GRE() /
               IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
               TCP(sport=1234, dport=1234))
    self.pg0.add_stream(gre_out)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg1.get_capture(1)
    packet = received[0]
    try:
        self.assertEqual(packet[IP].src, nat_ip)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    gre_in = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
              GRE() /
              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
              TCP(sport=1234, dport=1234))
    self.pg1.add_stream(gre_in)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg0.get_capture(1)
    packet = received[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol - hairpinning """

    # Host and server are both reached through pg0.
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]

    host_nat_ip = "10.0.0.10"
    server_nat_ip = "10.0.0.11"

    self.snat_add_static_mapping(host.ip4, host_nat_ip)
    self.snat_add_static_mapping(server.ip4, server_nat_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # host to server
    # The GRE layer was restored from an elided line; the assertions
    # below require it.
    to_server = (Ether(dst=self.pg0.local_mac, src=host.mac) /
                 IP(src=host.ip4, dst=server_nat_ip) /
                 GRE() /
                 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
                 TCP(sport=1234, dport=1234))
    self.pg0.add_stream(to_server)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg0.get_capture(1)
    packet = received[0]
    try:
        self.assertEqual(packet[IP].src, host_nat_ip)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    to_host = (Ether(dst=self.pg0.local_mac, src=server.mac) /
               IP(src=server.ip4, dst=host_nat_ip) /
               GRE() /
               IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
               TCP(sport=1234, dport=1234))
    self.pg0.add_stream(to_host)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg0.get_capture(1)
    packet = received[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def tearDown(self):
    # Per-test cleanup for TestSNAT: run the parent tearDown first, then
    # log the verbose SNAT state while VPP is still alive.
    super(TestSNAT, self).tearDown()
    if self.vpp_dead:
        return
    self.logger.info(self.vapi.cli("show snat verbose"))
    # NOTE(review): this call and the `def` line were restored from
    # elided lines - confirm against the original file.
    self.clear_snat()
1997 class TestDeterministicNAT(MethodHolder):
1998 """ Deterministic NAT Test Cases """
@classmethod
def setUpConstants(cls):
    # Extend the VPP command line built by the parent class with the
    # "snat { deterministic }" stanza.
    # NOTE(review): the @classmethod decorator was restored from an
    # elided line - confirm.
    super(TestDeterministicNAT, cls).setUpConstants()
    snat_stanza = ["snat", "{", "deterministic", "}"]
    cls.vpp_cmdline.extend(snat_stanza)
@classmethod
def setUpClass(cls):
    # Class-level fixture: fixed test ports/ids, the NAT address and
    # three packet-generator interfaces.
    # NOTE(review): the decorator, the try/except frame and the three
    # per-interface calls were restored from elided lines - confirm.
    super(TestDeterministicNAT, cls).setUpClass()

    try:
        cls.tcp_port_in = cls.tcp_external_port = 6303
        cls.udp_port_in = cls.udp_external_port = 6304
        cls.icmp_id_in = 6305
        cls.snat_addr = '10.0.0.3'

        cls.create_pg_interfaces(range(3))
        cls.interfaces = list(cls.pg_interfaces)

        for pg_if in cls.interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()

        cls.pg0.generate_remote_hosts(2)
        cls.pg0.configure_ipv4_neighbors()

    except Exception:
        # Release whatever was set up before re-raising the failure.
        super(TestDeterministicNAT, cls).tearDownClass()
        raise
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Build the inside-network packet stream (TCP, UDP, ICMP echo request)

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: List of the three packets, in TCP, UDP, ICMP order
    """
    def l3():
        # Fresh Ethernet/IP headers from in_if's peer to out_if's peer.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl))

    return [
        # TCP
        l3() / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port),
        # UDP
        l3() / UDP(sport=self.udp_port_in, dport=self.udp_external_port),
        # ICMP
        l3() / ICMP(id=self.icmp_id_in, type='echo-request'),
    ]
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Build the outside-network packet stream (TCP, UDP, ICMP echo reply)

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global SNAT address)
    :param ttl: TTL of generated packets
    :returns: List of the three packets, in TCP, UDP, ICMP order
    """
    target_ip = self.snat_addr if dst_ip is None else dst_ip

    def l3():
        # Fresh Ethernet/IP headers from out_if's peer to the NAT address.
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=target_ip, ttl=ttl))

    return [
        # TCP
        l3() / TCP(dport=self.tcp_port_out, sport=self.tcp_external_port),
        # UDP
        l3() / UDP(dport=self.udp_port_out, sport=self.udp_external_port),
        # ICMP
        l3() / ICMP(id=self.icmp_external_id, type='echo-reply'),
    ]
def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
    """
    Verify captured packets on outside network

    Besides checking the translated source address, this records the
    translated TCP/UDP source ports and the ICMP id on self
    (tcp_port_out, udp_port_out, icmp_external_id) for later use.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global SNAT address)
    :param packet_num: Expected number of packets (Default 3)
    """
    if nat_ip is None:
        nat_ip = self.snat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, nat_ip)
            if packet.haslayer(TCP):
                self.tcp_port_out = packet[TCP].sport
            elif packet.haslayer(UDP):
                self.udp_port_out = packet[UDP].sport
            else:
                self.icmp_external_id = packet[ICMP].id
        except Exception:
            # Dump the offending packet before failing the test.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def initiate_tcp_session(self, in_if, out_if):
    """
    Initiates TCP session

    Performs a three-way handshake through the NAT and stores the
    translated source port of the SYN in self.tcp_port_out.

    :param in_if: Inside interface
    :param out_if: Outside interface
    """
    try:
        # SYN packet in->out
        p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="S"))
        in_if.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = out_if.get_capture(1)
        # Read the port from the translated packet, not from the one sent.
        p = capture[0]
        self.tcp_port_out = p[TCP].sport

        # SYN + ACK packet out->in
        p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
             IP(src=out_if.remote_ip4, dst=self.snat_addr) /
             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                 flags="SA"))
        out_if.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        in_if.get_capture(1)

        # ACK packet in->out
        p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="A"))
        in_if.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        out_if.get_capture(1)

    except Exception:
        self.logger.error("TCP 3 way handshake failed")
        raise
def verify_ipfix_max_entries_per_user(self, data):
    """
    Verify IPFIX maximum entries per user exceeded event

    :param data: Decoded IPFIX data records (exactly one is expected)
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual('\x03\x00\x00\x00', record[466])
    # sourceIPv4Address
    self.assertEqual(self.pg0.remote_ip4n, record[8])
def test_deterministic_mode(self):
    """ S-NAT run deterministic mode """
    in_addr = '172.16.255.0'
    out_addr = '172.17.255.50'
    in_addr_t = '172.16.255.20'
    in_addr_n = socket.inet_aton(in_addr)
    out_addr_n = socket.inet_aton(out_addr)
    in_addr_t_n = socket.inet_aton(in_addr_t)
    # NOTE(review): prefix lengths are elided in the listing. /24 covers
    # both in_addr and in_addr_t; /32 matches the single out_addr that the
    # forward lookup is expected to return -- confirm.
    in_plen = 24
    out_plen = 32

    snat_config = self.vapi.snat_show_config()
    self.assertEqual(1, snat_config.deterministic)

    self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)

    # Forward then reverse lookup must round-trip to the inside host.
    rep1 = self.vapi.snat_det_forward(in_addr_t_n)
    self.assertEqual(rep1.out_addr[:4], out_addr_n)
    rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
    self.assertEqual(rep2.in_addr[:4], in_addr_t_n)

    deterministic_mappings = self.vapi.snat_det_map_dump()
    self.assertEqual(len(deterministic_mappings), 1)
    dsm = deterministic_mappings[0]
    self.assertEqual(in_addr_n, dsm.in_addr[:4])
    self.assertEqual(in_plen, dsm.in_plen)
    self.assertEqual(out_addr_n, dsm.out_addr[:4])
    self.assertEqual(out_plen, dsm.out_plen)

    # NOTE(review): the statement removing the mapping is elided in the
    # listing; clear_snat() is assumed -- confirm.
    self.clear_snat()
    deterministic_mappings = self.vapi.snat_det_map_dump()
    self.assertEqual(len(deterministic_mappings), 0)
2210 def test_set_timeouts(self):
2211 """ Set deterministic NAT timeouts """
2212 timeouts_before = self.vapi.snat_det_get_timeouts()
2214 self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2215 timeouts_before.tcp_established + 10,
2216 timeouts_before.tcp_transitory + 10,
2217 timeouts_before.icmp + 10)
2219 timeouts_after = self.vapi.snat_det_get_timeouts()
2221 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2222 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2223 self.assertNotEqual(timeouts_before.tcp_established,
2224 timeouts_after.tcp_established)
2225 self.assertNotEqual(timeouts_before.tcp_transitory,
2226 timeouts_after.tcp_transitory)
def test_det_in(self):
    """ CGNAT translation test (TCP, UDP, ICMP) """

    nat_ip = "10.0.0.10"

    # NOTE(review): both prefix lengths are elided in the listing; /32
    # (single inside host, single outside address) is assumed -- confirm.
    self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                               32,
                               socket.inet_aton(nat_ip),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # session dump test
    sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
    self.assertEqual(len(sessions), 3)

    # NOTE(review): the session indices are elided in the listing; the
    # TCP, UDP, ICMP order below is assumed -- confirm.
    # TCP session
    s = sessions[0]
    self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
    self.assertEqual(s.in_port, self.tcp_port_in)
    self.assertEqual(s.out_port, self.tcp_port_out)
    self.assertEqual(s.ext_port, self.tcp_external_port)

    # UDP session
    s = sessions[1]
    self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
    self.assertEqual(s.in_port, self.udp_port_in)
    self.assertEqual(s.out_port, self.udp_port_out)
    self.assertEqual(s.ext_port, self.udp_external_port)

    # ICMP session
    s = sessions[2]
    self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
    self.assertEqual(s.in_port, self.icmp_id_in)
    self.assertEqual(s.out_port, self.icmp_external_id)
def test_multiple_users(self):
    """ CGNAT multiple users """

    nat_ip = "10.0.0.10"
    # NOTE(review): the inside port is elided in the listing; 80 is
    # assumed -- confirm.
    port_in = 80
    external_port = 6303

    host0 = self.pg0.remote_hosts[0]
    host1 = self.pg0.remote_hosts[1]
    hosts = (host0, host1)

    # NOTE(review): prefix lengths are elided in the listing; /24 (so the
    # single mapping covers both hosts) and /32 are assumed -- confirm.
    self.vapi.snat_add_det_map(host0.ip4n,
                               24,
                               socket.inet_aton(nat_ip),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out: each host opens one session from the same inside port
    ports_out = []
    for host in hosts:
        p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
             IP(src=host.ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=port_in, dport=external_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        try:
            p = capture[0]
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, nat_ip)
            self.assertEqual(ip.dst, self.pg1.remote_ip4)
            self.assertEqual(tcp.dport, external_port)
            ports_out.append(tcp.sport)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise
    port_out0, port_out1 = ports_out

    dms = self.vapi.snat_det_map_dump()
    self.assertEqual(1, len(dms))
    self.assertEqual(2, dms[0].ses_num)

    # out2in: replies to each translated port must reach the right host
    for host, port_out in zip(hosts, ports_out):
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
             TCP(sport=external_port, dport=port_out))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        try:
            p = capture[0]
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.pg1.remote_ip4)
            self.assertEqual(ip.dst, host.ip4)
            self.assertEqual(tcp.dport, port_in)
            self.assertEqual(tcp.sport, external_port)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

    # session close api test
    # NOTE(review): the port arguments of both close calls are elided in
    # the listing. The second call addresses host0, so the first is
    # assumed to close host1's session by its outside port -- confirm.
    self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
                                         port_out1,
                                         self.pg1.remote_ip4n,
                                         external_port)
    dms = self.vapi.snat_det_map_dump()
    self.assertEqual(dms[0].ses_num, 1)

    self.vapi.snat_det_close_session_in(host0.ip4n,
                                        port_in,
                                        self.pg1.remote_ip4n,
                                        external_port)
    dms = self.vapi.snat_det_map_dump()
    self.assertEqual(dms[0].ses_num, 0)
def test_tcp_session_close_detection_in(self):
    """ CGNAT TCP session close initiated from inside network """
    # NOTE(review): prefix lengths are elided in the listing; /32 for both
    # is assumed -- confirm.
    self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                               32,
                               socket.inet_aton(self.snat_addr),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    self.initiate_tcp_session(self.pg0, self.pg1)

    # close the session from inside
    try:
        # FIN packet in -> out
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="F"))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(1)

        pkts = []

        # ACK packet out -> in
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                 flags="A"))
        pkts.append(p)

        # FIN packet out -> in
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                 flags="F"))
        pkts.append(p)

        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(2)

        # ACK packet in -> out
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="A"))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(1)

        # Check if snat closed the session
        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)
    except Exception:
        self.logger.error("TCP session termination failed")
        raise
def test_tcp_session_close_detection_out(self):
    """ CGNAT TCP session close initiated from outside network """
    # NOTE(review): prefix lengths are elided in the listing; /32 for both
    # is assumed -- confirm.
    self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                               32,
                               socket.inet_aton(self.snat_addr),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    self.initiate_tcp_session(self.pg0, self.pg1)

    # close the session from outside
    try:
        # FIN packet out -> in
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                 flags="F"))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        pkts = []

        # ACK packet in -> out
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="A"))
        pkts.append(p)

        # FIN packet in -> out
        # NOTE(review): the listing labels this second packet "ACK" as
        # well and its flags are elided; FIN is assumed, mirroring the
        # inside-initiated close test -- confirm.
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="F"))
        pkts.append(p)

        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(2)

        # ACK packet out -> in
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                 flags="A"))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        # Check if snat closed the session
        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)
    except Exception:
        self.logger.error("TCP session termination failed")
        raise
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_timeout(self):
    """ CGNAT session timeouts """
    # NOTE(review): prefix lengths are elided in the listing; /32 for both
    # is assumed -- confirm.
    self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                               32,
                               socket.inet_aton(self.snat_addr),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    self.initiate_tcp_session(self.pg0, self.pg1)
    # Shorten every timeout to 5 so the sessions expire quickly.
    self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(len(pkts))

    # NOTE(review): the wait is elided in the listing; it must exceed the
    # timeouts set above, 15 s is assumed -- confirm.
    sleep(15)

    dms = self.vapi.snat_det_map_dump()
    self.assertEqual(0, dms[0].ses_num)
def test_session_limit_per_user(self):
    """ CGNAT maximum 1000 sessions per user should be created """
    # NOTE(review): prefix lengths are elided in the listing; /32 for both
    # is assumed -- confirm.
    self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                               32,
                               socket.inet_aton(self.snat_addr),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): one keyword argument is elided in the listing;
    # path_mtu=512 is assumed -- confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
                                 src_address=self.pg2.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.snat_ipfix()

    # 1000 distinct UDP flows (ports 1025..2024) from the same user
    pkts = []
    for port in range(1025, 2025):
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             UDP(sport=port, dport=port))
        pkts.append(p)

    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))

    # one more flow: must not be translated
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         UDP(sport=3001, dport=3002))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.assert_nothing_captured()

    # verify ICMP error packet
    capture = self.pg0.get_capture(1)
    p = capture[0]
    self.assertTrue(p.haslayer(ICMP))
    icmp = p[ICMP]
    self.assertEqual(icmp.type, 3)
    self.assertEqual(icmp.code, 1)
    self.assertTrue(icmp.haslayer(IPerror))
    inner_ip = icmp[IPerror]
    self.assertEqual(inner_ip[UDPerror].sport, 3001)
    self.assertEqual(inner_ip[UDPerror].dport, 3002)

    dms = self.vapi.snat_det_map_dump()

    self.assertEqual(1000, dms[0].ses_num)

    # verify IPFIX logging
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg2.get_capture(2)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_entries_per_user(data)
def clear_snat(self):
    """
    Clear SNAT configuration.

    Disables SNAT IPFIX logging, resets the deterministic NAT timeouts,
    then removes every deterministic mapping and every SNAT interface
    feature reported by the dump calls.
    """
    self.vapi.snat_ipfix(enable=0)
    # Called without arguments, i.e. with the API wrapper's own defaults.
    self.vapi.snat_det_set_timeouts()
    deterministic_mappings = self.vapi.snat_det_map_dump()
    for dsm in deterministic_mappings:
        # NOTE(review): every argument after dsm.in_addr is elided in the
        # listing; the mirrored fields plus is_add=0 are assumed -- confirm.
        self.vapi.snat_add_det_map(dsm.in_addr,
                                   dsm.in_plen,
                                   dsm.out_addr,
                                   dsm.out_plen,
                                   is_add=0)

    interfaces = self.vapi.snat_interface_dump()
    for intf in interfaces:
        # NOTE(review): trailing arguments are elided in the listing;
        # intf.is_inside and is_add=0 are assumed -- confirm.
        self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
                                                 intf.is_inside,
                                                 is_add=0)
def tearDown(self):
    """
    Log the SNAT state after each test, unless VPP has died.
    """
    super(TestDeterministicNAT, self).tearDown()
    if not self.vpp_dead:
        self.logger.info(self.vapi.cli("show snat detail"))
        # NOTE(review): the line following the log call is elided in the
        # listing; per-test cleanup through clear_snat() is assumed --
        # confirm.
        self.clear_snat()
2638 class TestNAT64(MethodHolder):
2639 """ NAT64 Test Cases """
@classmethod
def setUpClass(cls):
    """
    Build the shared fixture for the NAT64 tests.

    pg0 and pg2 are the IPv6 (inside) interfaces, pg1 is the IPv4
    (outside) interface; pg2 is placed in a separate IPv6 table.
    If any step fails the class is torn down before the error is re-raised.
    """
    super(TestNAT64, cls).setUpClass()

    try:
        cls.tcp_port_in = 6303
        cls.tcp_port_out = 6303
        cls.udp_port_in = 6304
        cls.udp_port_out = 6304
        cls.icmp_id_in = 6305
        cls.icmp_id_out = 6305
        cls.nat_addr = '10.0.0.3'
        cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
        # NOTE(review): the vrf1_id assignment is elided in the listing;
        # 10 is assumed -- confirm.
        cls.vrf1_id = 10
        cls.vrf1_nat_addr = '10.0.10.3'
        cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
                                               cls.vrf1_nat_addr)

        cls.create_pg_interfaces(range(3))
        cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
        cls.ip6_interfaces.append(cls.pg_interfaces[2])
        cls.ip4_interfaces = list(cls.pg_interfaces[1:2])

        cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)

        cls.pg0.generate_remote_hosts(2)

        # NOTE(review): apart from configure_ipv6_neighbors() the two loop
        # bodies are elided in the listing; the admin_up/config/resolve
        # calls below are assumed -- confirm.
        for i in cls.ip6_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

        for i in cls.ip4_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    except Exception:
        super(TestNAT64, cls).tearDownClass()
        raise
2682 def test_pool(self):
2683 """ Add/delete address to NAT64 pool """
2684 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2686 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2688 addresses = self.vapi.nat64_pool_addr_dump()
2689 self.assertEqual(len(addresses), 1)
2690 self.assertEqual(addresses[0].address, nat_addr)
2692 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2694 addresses = self.vapi.nat64_pool_addr_dump()
2695 self.assertEqual(len(addresses), 0)
2697 def test_interface(self):
2698 """ Enable/disable NAT64 feature on the interface """
2699 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2700 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2702 interfaces = self.vapi.nat64_interface_dump()
2703 self.assertEqual(len(interfaces), 2)
2706 for intf in interfaces:
2707 if intf.sw_if_index == self.pg0.sw_if_index:
2708 self.assertEqual(intf.is_inside, 1)
2710 elif intf.sw_if_index == self.pg1.sw_if_index:
2711 self.assertEqual(intf.is_inside, 0)
2713 self.assertTrue(pg0_found)
2714 self.assertTrue(pg1_found)
2716 features = self.vapi.cli("show interface features pg0")
2717 self.assertNotEqual(features.find('nat64-in2out'), -1)
2718 features = self.vapi.cli("show interface features pg1")
2719 self.assertNotEqual(features.find('nat64-out2in'), -1)
2721 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2722 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2724 interfaces = self.vapi.nat64_interface_dump()
2725 self.assertEqual(len(interfaces), 0)
def test_static_bib(self):
    """ Add/delete static BIB entry """
    in_addr = socket.inet_pton(socket.AF_INET6,
                               '2001:db8:85a3::8a2e:370:7334')
    out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
    # NOTE(review): the port values are elided in the listing; 1234 and
    # 5678 are assumed -- confirm.
    in_port = 1234
    out_port = 5678
    proto = IP_PROTOS.tcp

    # NOTE(review): the argument lists of both add/del calls and the
    # counting loops are elided in the listing; the reconstruction below
    # (count entries flagged is_static) is assumed -- confirm.
    self.vapi.nat64_add_del_static_bib(in_addr,
                                       out_addr,
                                       in_port,
                                       out_port,
                                       proto)
    bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
    static_bib_num = 0
    for bibe in bib:
        if bibe.is_static:
            static_bib_num += 1
            self.assertEqual(bibe.i_addr, in_addr)
            self.assertEqual(bibe.o_addr, out_addr)
            self.assertEqual(bibe.i_port, in_port)
            self.assertEqual(bibe.o_port, out_port)
    self.assertEqual(static_bib_num, 1)

    self.vapi.nat64_add_del_static_bib(in_addr,
                                       out_addr,
                                       in_port,
                                       out_port,
                                       proto,
                                       is_add=0)
    bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
    static_bib_num = 0
    for bibe in bib:
        if bibe.is_static:
            static_bib_num += 1
    self.assertEqual(static_bib_num, 0)
2765 def test_set_timeouts(self):
2766 """ Set NAT64 timeouts """
2767 # verify default values
2768 timeouts = self.vapi.nat64_get_timeouts()
2769 self.assertEqual(timeouts.udp, 300)
2770 self.assertEqual(timeouts.icmp, 60)
2771 self.assertEqual(timeouts.tcp_trans, 240)
2772 self.assertEqual(timeouts.tcp_est, 7440)
2773 self.assertEqual(timeouts.tcp_incoming_syn, 6)
2775 # set and verify custom values
2776 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2777 tcp_est=7450, tcp_incoming_syn=10)
2778 timeouts = self.vapi.nat64_get_timeouts()
2779 self.assertEqual(timeouts.udp, 200)
2780 self.assertEqual(timeouts.icmp, 30)
2781 self.assertEqual(timeouts.tcp_trans, 250)
2782 self.assertEqual(timeouts.tcp_est, 7450)
2783 self.assertEqual(timeouts.tcp_incoming_syn, 10)
def test_dynamic(self):
    """ NAT64 dynamic translation test """
    self.tcp_port_in = 6303
    self.udp_port_in = 6304
    self.icmp_id_in = 6305

    ses_num_start = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # in2out
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    # out2in
    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    # Let scapy normalise the IPv4-embedded IPv6 address of the peer.
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

    # in2out (second round)
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    # out2in (second round)
    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

    ses_num_end = self.nat64_get_ses_num()

    # Both rounds together must account for exactly three sessions.
    self.assertEqual(ses_num_end - ses_num_start, 3)

    # tenant with specific VRF
    self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                            self.vrf1_nat_addr_n,
                                            vrf_id=self.vrf1_id)
    self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)

    pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
    self.pg2.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
def test_static(self):
    """ NAT64 static translation test """
    self.tcp_port_in = 60303
    self.udp_port_in = 60304
    self.icmp_id_in = 60305
    self.tcp_port_out = 60303
    self.udp_port_out = 60304
    self.icmp_id_out = 60305

    ses_num_start = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # NOTE(review): everything after the first argument of the three
    # static BIB calls is elided in the listing; one entry per protocol
    # built from the ports/ids set above is assumed -- confirm.
    self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                       self.nat_addr_n,
                                       self.tcp_port_in,
                                       self.tcp_port_out,
                                       IP_PROTOS.tcp)
    self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                       self.nat_addr_n,
                                       self.udp_port_in,
                                       self.udp_port_out,
                                       IP_PROTOS.udp)
    self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                       self.nat_addr_n,
                                       self.icmp_id_in,
                                       self.icmp_id_out,
                                       IP_PROTOS.icmp)

    # in2out
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4, same_port=True)

    # out2in
    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

    ses_num_end = self.nat64_get_ses_num()

    self.assertEqual(ses_num_end - ses_num_start, 3)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_timeout(self):
    """ NAT64 session timeout """
    self.icmp_id_in = 1234
    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    # Only the ICMP timeout is shortened.
    self.vapi.nat64_set_timeouts(icmp=5)

    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(len(pkts))

    ses_num_before_timeout = self.nat64_get_ses_num()

    # NOTE(review): the wait is elided in the listing; it must exceed the
    # ICMP timeout set above, 15 s is assumed -- confirm.
    sleep(15)

    # ICMP session after timeout
    ses_num_after_timeout = self.nat64_get_ses_num()
    self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
def test_icmp_error(self):
    """ NAT64 ICMP Error message translation """
    self.tcp_port_in = 6303
    self.udp_port_in = 6304
    self.icmp_id_in = 6305

    ses_num_start = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # send some packets to create sessions
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture_ip4 = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture_ip4,
                            nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture_ip6 = self.pg0.get_capture(len(pkts))
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
                               self.pg0.remote_ip6)

    # in2out: wrap every packet received on the IPv6 side in an ICMPv6
    # destination-unreachable error and send it back
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
            ICMPv6DestUnreach(code=1) /
            packet[IPv6] for packet in capture_ip6]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
            self.assertEqual(packet[ICMP].type, 3)
            self.assertEqual(packet[ICMP].code, 13)
            inner = packet[IPerror]
            self.assertEqual(inner.src, self.pg1.remote_ip4)
            self.assertEqual(inner.dst, self.nat_addr)
            self.check_icmp_checksum(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
            elif inner.haslayer(UDPerror):
                self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
            else:
                self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # out2in: the same for the packets received on the IPv4 side
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
            ICMP(type=3, code=13) /
            packet[IP] for packet in capture_ip4]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, ip.src)
            self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
            icmp = packet[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, self.pg0.remote_ip6)
            self.assertEqual(inner.dst, ip.src)
            self.check_icmpv6_checksum(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
            elif inner.haslayer(UDPerror):
                self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
            else:
                # NOTE(review): the second argument is elided in the
                # listing; self.icmp_id_in is assumed -- confirm.
                self.assertEqual(inner[ICMPv6EchoRequest].id,
                                 self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_hairpinning(self):
    """ NAT64 hairpinning """

    client = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    server_tcp_in_port = 22
    server_tcp_out_port = 4022
    server_udp_in_port = 23
    server_udp_out_port = 4023
    client_tcp_in_port = 1234
    client_udp_in_port = 1235
    client_tcp_out_port = 0
    client_udp_out_port = 0
    ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
    nat_addr_ip6 = ip.src

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # NOTE(review): only the first argument and the *_out_port argument of
    # the two static BIB calls are visible in the listing; the remaining
    # arguments below are assumed -- confirm.
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_tcp_in_port,
                                       server_tcp_out_port,
                                       IP_PROTOS.tcp)
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_udp_in_port,
                                       server_udp_out_port,
                                       IP_PROTOS.udp)

    # client to server
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         UDP(sport=client_udp_in_port, dport=server_udp_out_port))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
                self.assertEqual(packet[TCP].dport, server_tcp_in_port)
                self.check_tcp_checksum(packet)
                client_tcp_out_port = packet[TCP].sport
            else:
                self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
                self.assertEqual(packet[UDP].dport, server_udp_in_port)
                self.check_udp_checksum(packet)
                client_udp_out_port = packet[UDP].sport
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server to client
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         UDP(sport=server_udp_in_port, dport=client_udp_out_port))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, client.ip6)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, server_tcp_out_port)
                self.assertEqual(packet[TCP].dport, client_tcp_in_port)
                self.check_tcp_checksum(packet)
            else:
                self.assertEqual(packet[UDP].sport, server_udp_out_port)
                self.assertEqual(packet[UDP].dport, client_udp_in_port)
                self.check_udp_checksum(packet)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # ICMP error: the client rejects what it just received from the server
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IPv6(src=client.ip6, dst=nat_addr_ip6) /
            ICMPv6DestUnreach(code=1) /
            packet[IPv6] for packet in capture]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            icmp = packet[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, server.ip6)
            self.assertEqual(inner.dst, nat_addr_ip6)
            self.check_icmpv6_checksum(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
                self.assertEqual(inner[TCPerror].dport,
                                 client_tcp_out_port)
            else:
                self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
                self.assertEqual(inner[UDPerror].dport,
                                 client_udp_out_port)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
3153 def test_prefix(self):
3154 """ NAT64 Network-Specific Prefix """
# Setup: add a NAT64 pool address, then enable NAT64 on pg0 (default
# arguments) and on pg1 with is_inside=0.
3156 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3158 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3159 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# A second pool address is registered with vrf_id=self.vrf1_id and pg2 is
# enabled as well; the tenant-specific traffic below is sent from pg2.
3160 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3161 self.vrf1_nat_addr_n,
3162 vrf_id=self.vrf1_id)
3163 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Global prefix 2001:db8::/32, added without a vrf_id argument.
3166 global_pref64 = "2001:db8::"
3167 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3168 global_pref64_len = 32
3169 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
# The dump must now contain exactly that prefix, reported with vrf_id 0.
3171 prefix = self.vapi.nat64_prefix_dump()
3172 self.assertEqual(len(prefix), 1)
3173 self.assertEqual(prefix[0].prefix, global_pref64_n)
3174 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3175 self.assertEqual(prefix[0].vrf_id, 0)
3177 # Add tenant specific prefix
3178 vrf1_pref64 = "2001:db8:122:300::"
3179 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3180 vrf1_pref64_len = 56
3181 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3183 vrf_id=self.vrf1_id)
# Both prefixes are expected in the dump after the second add.
3184 prefix = self.vapi.nat64_prefix_dump()
3185 self.assertEqual(len(prefix), 2)
# Global prefix, IPv6 side first: the stream is built with
# plen=global_pref64_len and queued on pg0; the capture is taken on pg1
# and verified with nat_ip=self.nat_addr.
3188 pkts = self.create_stream_in_ip6(self.pg0,
3191 plen=global_pref64_len)
3192 self.pg0.add_stream(pkts)
3193 self.pg_enable_capture(self.pg_interfaces)
3195 capture = self.pg1.get_capture(len(pkts))
3196 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3197 dst_ip=self.pg1.remote_ip4)
# Reverse direction: packets addressed to self.nat_addr are queued on pg1
# and captured on pg0; the address passed to the verifier is produced by
# compose_ip6 from pg1's remote IPv4 address.
3199 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3200 self.pg1.add_stream(pkts)
3201 self.pg_enable_capture(self.pg_interfaces)
3203 capture = self.pg0.get_capture(len(pkts))
3204 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3207 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3209 # Tenant specific prefix
# Same two-way exchange as above, but sourced from pg2 with
# plen=vrf1_pref64_len and verified against self.vrf1_nat_addr.
3210 pkts = self.create_stream_in_ip6(self.pg2,
3213 plen=vrf1_pref64_len)
3214 self.pg2.add_stream(pkts)
3215 self.pg_enable_capture(self.pg_interfaces)
3217 capture = self.pg1.get_capture(len(pkts))
3218 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3219 dst_ip=self.pg1.remote_ip4)
# Reverse direction for the tenant: pg1 sends to self.vrf1_nat_addr and the
# capture is expected on pg2.
3221 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3222 self.pg1.add_stream(pkts)
3223 self.pg_enable_capture(self.pg_interfaces)
3225 capture = self.pg2.get_capture(len(pkts))
3226 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3229 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3231 def nat64_get_ses_num(self):
3233 Return number of active NAT64 sessions.
# The session table is dumped once per protocol: TCP, then UDP, then ICMP.
# Each dump overwrites `st`, so every result has to be consumed before the
# next dump is issued.
3236 st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
3238 st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
3240 st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
3244 def clear_nat64(self):
3246 Clear NAT64 configuration.
# Timeouts are set with no arguments.
# NOTE(review): presumably this restores the API wrapper's default
# timeouts - confirm against nat64_set_timeouts.
3248 self.vapi.nat64_set_timeouts()
# Every interface reported by the dump is passed back to
# nat64_add_del_interface, keyed by its sw_if_index.
3250 interfaces = self.vapi.nat64_interface_dump()
3251 for intf in interfaces:
3252 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# BIB entries are dumped per protocol (TCP, UDP, ICMP) and handed to
# nat64_add_del_static_bib, starting from each entry's i_addr.
3256 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3259 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3267 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3270 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3278 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3281 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# Each dumped pool address is passed to nat64_add_del_pool_addr_range.
3289 adresses = self.vapi.nat64_pool_addr_dump()
3290 for addr in adresses:
3291 self.vapi.nat64_add_del_pool_addr_range(addr.address,
# Each dumped prefix is passed to nat64_add_del_prefix together with the
# vrf_id it was reported with.
3296 prefixes = self.vapi.nat64_prefix_dump()
3297 for prefix in prefixes:
3298 self.vapi.nat64_add_del_prefix(prefix.prefix,
3300 vrf_id=prefix.vrf_id,
3304 super(TestNAT64, self).tearDown()
# The CLI dumps below are only attempted when self.vpp_dead is false.
# They log the NAT64 pool, interfaces and prefixes, then the BIB and the
# session table for each of tcp, udp and icmp, all at info level.
3305 if not self.vpp_dead:
3306 self.logger.info(self.vapi.cli("show nat64 pool"))
3307 self.logger.info(self.vapi.cli("show nat64 interfaces"))
3308 self.logger.info(self.vapi.cli("show nat64 prefix"))
3309 self.logger.info(self.vapi.cli("show nat64 bib tcp"))
3310 self.logger.info(self.vapi.cli("show nat64 bib udp"))
3311 self.logger.info(self.vapi.cli("show nat64 bib icmp"))
3312 self.logger.info(self.vapi.cli("show nat64 session table tcp"))
3313 self.logger.info(self.vapi.cli("show nat64 session table udp"))
3314 self.logger.info(self.vapi.cli("show nat64 session table icmp"))
# Script entry point: when this module is executed directly, run its tests
# through unittest using VppTestRunner as the test runner.
3317 if __name__ == '__main__':
3318 unittest.main(testRunner=VppTestRunner)