9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    A copy of the packet is rebuilt from its raw form with the IP checksum
    field removed, and the checksum of the rebuilt copy is compared with
    the one carried by the packet.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the stored value first; otherwise the rebuild below merely
    # copies it and the comparison can never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    A copy of the packet is rebuilt from its raw form with the TCP checksum
    field removed, and the checksum of the rebuilt copy is compared with
    the one carried by the packet.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the stored value first; otherwise the rebuild below merely
    # copies it and the comparison can never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    A copy of the packet is rebuilt from its raw form with the UDP checksum
    field removed, and the checksum of the rebuilt copy is compared with
    the one carried by the packet.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the stored value first; otherwise the rebuild below merely
    # copies it and the comparison can never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded (``*error``) layer present in the packet a fresh
    copy of the packet is rebuilt with that layer's checksum removed and
    the resulting checksum is compared with the one carried by the packet.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = (
        (IPerror, 'IPerror'),
        (TCPerror, 'TCPerror'),
        (UDPerror, 'UDPerror'),
        (ICMPerror, 'ICMPerror'),
    )
    for layer, name in embedded_layers:
        if not pkt.haslayer(layer):
            continue
        # An embedded UDP checksum of zero is not verified.
        if layer is UDPerror and pkt[name].chksum == 0:
            continue
        # Always start from a fresh copy: the ICMPerror case previously
        # reused whatever copy an earlier branch had left behind and
        # failed with an unbound name when no earlier branch had run.
        new = pkt.__class__(str(pkt))
        del new[name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].chksum, pkt[name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    When the packet also carries an embedded IP header (an ICMP error),
    the checksums of the embedded layers are checked as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    recomputed, carried = rebuilt['ICMP'].chksum, pkt['ICMP'].chksum
    self.assertEqual(recomputed, carried)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination unreachable, echo request and echo reply
    messages; for destination unreachable the embedded packet is
    checked too.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer class, layer name, also check the embedded packet?)
    handled = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    rebuilt = pkt.__class__(str(pkt))
    for layer, name, has_embedded in handled:
        if not pkt.haslayer(layer):
            continue
        del rebuilt[name].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[name].cksum,
                         pkt[name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo request packet
    """
    def header():
        # Common Ethernet/IPv4 part, built anew for every packet.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl))

    # Callers pass the result to add_stream() and take its len(), so the
    # packets have to be handed back as a list.
    return [
        header() / TCP(sport=self.tcp_port_in, dport=20),
        header() / UDP(sport=self.udp_port_in, dport=20),
        header() / ICMP(id=self.icmp_id_in, type='echo-request'),
    ]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the positions
    RFC 6052 defines for the given prefix length; octet 8 of the IPv6
    address is always skipped. For any other prefix length the prefix is
    returned without an embedded address.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # prefix length -> byte positions of the four IPv4 octets
    octet_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray keeps the byte manipulation valid for both str-based and
    # bytes-based results of inet_pton.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for position, octet in zip(octet_positions.get(plen, ()), ip4_n):
        pref_n[position] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix
    :param plen: NAT64 prefix length
    :returns: List with one TCP, one UDP and one ICMPv6 echo request
              packet
    """
    if pref is None:
        # No prefix given: fall back to 64:ff9b:: with the IPv4 address
        # appended in dotted form.
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    def header():
        # Common Ethernet/IPv6 part, built anew for every packet.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim))

    return [
        header() / TCP(sport=self.tcp_port_in, dport=20),
        header() / UDP(sport=self.udp_port_in, dport=20),
        header() / ICMPv6EchoRequest(id=self.icmp_id_in),
    ]
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo reply packet
    """
    # Only fall back to the NAT address when the caller gave none;
    # an explicit destination must not be overwritten.
    if dst_ip is None:
        dst_ip = self.nat_addr

    def header():
        # Common Ethernet/IPv4 part, built anew for every packet.
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl))

    return [
        header() / TCP(dport=self.tcp_port_out, sport=20),
        header() / UDP(dport=self.udp_port_out, sport=20),
        header() / ICMP(id=self.icmp_id_out, type='echo-reply'),
    ]
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    As a side effect the translated TCP/UDP source port and ICMP id seen
    in the capture are stored in ``tcp_port_out``, ``udp_port_out`` and
    ``icmp_id_out``.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    if nat_ip is None:
        nat_ip = self.nat_addr
    # With same_port the outside port/id must equal the inside one,
    # otherwise it must differ.
    check_port = self.assertEqual if same_port else self.assertNotEqual
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP].dst, dst_ip)
            if packet.haslayer(TCP):
                check_port(packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                check_port(packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                check_port(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
                self.check_icmp_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                # Anything that is neither TCP nor UDP is expected to be
                # ICMP; indexing a missing layer fails and is reported
                # below.
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.check_icmp_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.check_udp_checksum(packet)
            else:
                # The id is compared with the inside ICMP id, in line with
                # the inside ports checked for TCP and UDP above.
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
                self.check_icmpv6_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    Addresses, source ports and the ICMP id must be the untranslated
    inside values. The number of captured packets is not checked.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            # The embedded packet is compared with the outside
            # (translated) port/id values.
            # NOTE(review): the TCP/UDP operands mirror the ICMP case
            # (icmp_id_out) - confirm against the complete file.
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            # The embedded packet is compared with the inside
            # (untranslated) port/id values.
            # NOTE(review): the TCP/UDP operands mirror the ICMP case
            # (icmp_id_in) - confirm against the complete file.
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data):
    """
    Create fragmented packet stream

    The TCP segment is split by hand into three IPv4 fragments that share
    one randomly chosen IP id.

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :returns: List with the three fragments, in order
    """
    frag_id = random.randint(0, 65535)

    # Build the unfragmented segment once to obtain the TCP checksum
    # computed over the complete payload.
    whole = (IP(src=src_if.remote_ip4, dst=dst) /
             TCP(sport=sport, dport=dport) /
             Raw(data))
    whole = whole.__class__(str(whole))
    chksum = whole['TCP'].chksum

    def ether():
        return Ether(src=src_if.remote_mac, dst=src_if.local_mac)

    # Fragment offsets are in units of 8 bytes. The payload slices follow
    # from them, assuming the TCP header has no options (20 bytes):
    # offset 3 = 24 bytes = header + data[0:4]; offset 5 = 40 bytes =
    # 24 + data[4:20].
    first = (ether() /
             IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0,
                id=frag_id) /
             TCP(sport=sport, dport=dport, chksum=chksum) /
             Raw(data[0:4]))
    middle = (ether() /
              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3,
                 id=frag_id, proto=IP_PROTOS.tcp) /
              Raw(data[4:20]))
    last = (ether() /
            IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
               id=frag_id) /
            Raw(data[20:]))
    return [first, middle, last]
def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
                           pref=None, plen=0, frag_size=128):
    """
    Create fragmented packet stream

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :param pref: NAT64 prefix
    :param plen: NAT64 prefix length
    :param frag_size: size of fragments
    :returns: Fragments produced by fragment6()
    """
    if pref is None:
        # No prefix given: fall back to 64:ff9b:: with the IPv4 address
        # appended in dotted form.
        dst_ip6 = ''.join(['64:ff9b::', dst])
    else:
        dst_ip6 = self.compose_ip6(dst, pref, plen)

    p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
         IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
         IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
         TCP(sport=sport, dport=dport) /
         Raw(data))

    return fragment6(p, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    Every fragment has its addresses and IP checksum checked, and its
    payload is written into a buffer at the byte position given by the
    fragment offset. The buffer is then parsed as TCP or UDP according to
    the protocol of the first fragment.

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    payload = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.check_ip_checksum(p)
        # Fragment offsets are expressed in units of 8 bytes.
        payload.seek(p[IP].frag * 8)
        payload.write(p[IP].payload)
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
            proto=frags[0][IP].proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(payload.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(payload.getvalue()))
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    Every fragment has its addresses checked, and its payload is written
    into a buffer at the byte position given by the fragment offset. The
    buffer is then parsed as TCP or UDP according to the next-header value
    of the first fragment's fragment header.

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet
    """
    payload = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IPv6].src, src)
        self.assertEqual(p[IPv6].dst, dst)
        # Fragment offsets are expressed in units of 8 bytes.
        payload.seek(p[IPv6ExtHdrFragment].offset * 8)
        payload.write(p[IPv6ExtHdrFragment].payload)
    ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
              nh=frags[0][IPv6ExtHdrFragment].nh)
    if ip.nh == IP_PROTOS.tcp:
        p = (ip / TCP(payload.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.nh == IP_PROTOS.udp:
        p = (ip / UDP(payload.getvalue()))
    return p
585 def verify_ipfix_nat44_ses(self, data):
587 Verify IPFIX NAT44 session create/delete event
589 :param data: Decoded IPFIX data records
# Expects six records in total: three with record[230] == 4 and three
# counted as deletes (record[230] is asserted to be 4 or 5). Records are
# indexed by numeric field id; values are compared in packed/raw form.
# NOTE(review): the loop header binding `record` and the second operand of
# several assertEqual calls below are not present in this extract - the
# field ids they compare against must be taken from the complete file.
591 nat44_ses_create_num = 0
592 nat44_ses_delete_num = 0
593 self.assertEqual(6, len(data))
596 self.assertIn(ord(record[230]), [4, 5])
597 if ord(record[230]) == 4:
598 nat44_ses_create_num += 1
600 nat44_ses_delete_num += 1
# record[8] must equal the packed IPv4 address of pg0's remote host
602 self.assertEqual(self.pg0.remote_ip4n, record[8])
603 # postNATSourceIPv4Address
604 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
# record[234] must be a zero 32-bit value
607 self.assertEqual(struct.pack("!I", 0), record[234])
608 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
# record[4] selects the protocol; ports/ids are compared as packed 16-bit
# network-order values
609 if IP_PROTOS.icmp == ord(record[4]):
610 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
611 self.assertEqual(struct.pack("!H", self.icmp_id_out),
613 elif IP_PROTOS.tcp == ord(record[4]):
614 self.assertEqual(struct.pack("!H", self.tcp_port_in),
616 self.assertEqual(struct.pack("!H", self.tcp_port_out),
618 elif IP_PROTOS.udp == ord(record[4]):
619 self.assertEqual(struct.pack("!H", self.udp_port_in),
621 self.assertEqual(struct.pack("!H", self.udp_port_out),
624 self.fail("Invalid protocol")
625 self.assertEqual(3, nat44_ses_create_num)
626 self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Exactly one record is expected; its field 230 must hold the value 3
    and its field 283 a zero 32-bit value.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    # Exactly one record was asserted above, so inspect that one.
    record = data[0]
    # presumably natEvent in the IANA IPFIX registry - confirm
    self.assertEqual(ord(record[230]), 3)
    # presumably natPoolId in the IANA IPFIX registry - confirm
    self.assertEqual(struct.pack("!I", 0), record[283])
642 class TestNAT44(MethodHolder):
643 """ NAT44 Test Cases """
# Class-level fixture body (the enclosing `def` line is not present in this
# extract; the super() call below indicates it is setUpClass). Sets the
# default inside/outside ports, ICMP ids, NAT address and IPFIX settings,
# creates ten pg interfaces, puts pg4/pg5 into IP table 10 and pg6 into
# IP table 20, and prepares pg9 with two remote hosts sharing one address.
647 super(TestNAT44, cls).setUpClass()
650 cls.tcp_port_in = 6303
651 cls.tcp_port_out = 6303
652 cls.udp_port_in = 6304
653 cls.udp_port_out = 6304
654 cls.icmp_id_in = 6305
655 cls.icmp_id_out = 6305
656 cls.nat_addr = '10.0.0.3'
657 cls.ipfix_src_port = 4739
658 cls.ipfix_domain_id = 1
660 cls.create_pg_interfaces(range(10))
661 cls.interfaces = list(cls.pg_interfaces[0:4])
# NOTE(review): the body of this loop is not present in this extract
663 for i in cls.interfaces:
668 cls.pg0.generate_remote_hosts(3)
669 cls.pg0.configure_ipv4_neighbors()
671 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
672 cls.vapi.ip_table_add_del(10, is_add=1)
673 cls.vapi.ip_table_add_del(20, is_add=1)
# pg4 and pg6 get the same local/remote addresses but different IP tables
# NOTE(review): the three `_local_ip4n` assignments below pack
# `i.local_ip4`, where `i` looks like the variable left over from the
# `for i in cls.interfaces` loop rather than pg4/pg5/pg6 themselves, so
# the packed address may not match the `_local_ip4` set on the line before
# - confirm against the complete, indented file.
675 cls.pg4._local_ip4 = "172.16.255.1"
676 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
677 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
678 cls.pg4.set_table_ip4(10)
679 cls.pg5._local_ip4 = "172.17.255.3"
680 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
681 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
682 cls.pg5.set_table_ip4(10)
683 cls.pg6._local_ip4 = "172.16.255.1"
684 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
685 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
686 cls.pg6.set_table_ip4(20)
# NOTE(review): the body of this loop is not present in this extract
687 for i in cls.overlapping_interfaces:
695 cls.pg9.generate_remote_hosts(2)
697 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
# NOTE(review): the remaining arguments of this call are cut off here
698 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
702 cls.pg9.resolve_arp()
# both pg9 remote hosts end up with the address "10.0.0.2", which is also
# stored as pg4's remote address
703 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
704 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
705 cls.pg9.resolve_arp()
# presumably part of an error handler that tears the class down when the
# setup above fails - the surrounding lines are not visible; confirm
710 super(TestNAT44, cls).tearDownClass()
713 def clear_nat44(self):
715 Clear NAT44 configuration.
# Walks the dump APIs and issues the matching add/del call for each entry:
# routes and neighbours on pg7/pg8, NAT interface addresses, IPFIX,
# interface features, output features, static and load-balancing static
# mappings and the address pool; finally calls nat_set_reass() for IPv4
# and IPv6.
# NOTE(review): most vapi calls below are cut off after their first
# argument(s) in this extract (their closing arguments are missing), as
# are some loop/branch lines - consult the complete file before editing.
717 # I found no elegant way to do this
718 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
719 dst_address_length=32,
720 next_hop_address=self.pg7.remote_ip4n,
721 next_hop_sw_if_index=self.pg7.sw_if_index,
723 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
724 dst_address_length=32,
725 next_hop_address=self.pg8.remote_ip4n,
726 next_hop_sw_if_index=self.pg8.sw_if_index,
729 for intf in [self.pg7, self.pg8]:
730 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
732 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
737 if self.pg7.has_ip4_config:
738 self.pg7.unconfig_ip4()
740 interfaces = self.vapi.nat44_interface_addr_dump()
741 for intf in interfaces:
742 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
# disable IPFIX logging using the current settings, then restore the
# default port and domain id
744 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
745 domain_id=self.ipfix_domain_id)
746 self.ipfix_src_port = 4739
747 self.ipfix_domain_id = 1
749 interfaces = self.vapi.nat44_interface_dump()
750 for intf in interfaces:
# interfaces reporting is_inside > 1 get an additional add_del_feature call
# (arguments not visible here)
751 if intf.is_inside > 1:
752 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
755 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
759 interfaces = self.vapi.nat44_interface_output_feature_dump()
760 for intf in interfaces:
761 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
765 static_mappings = self.vapi.nat44_static_mapping_dump()
766 for sm in static_mappings:
767 self.vapi.nat44_add_del_static_mapping(
769 sm.external_ip_address,
770 local_port=sm.local_port,
771 external_port=sm.external_port,
772 addr_only=sm.addr_only,
774 protocol=sm.protocol,
777 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
778 for lb_sm in lb_static_mappings:
779 self.vapi.nat44_add_del_lb_static_mapping(
788 adresses = self.vapi.nat44_address_dump()
789 for addr in adresses:
790 self.vapi.nat44_add_del_address_range(addr.ip_address,
794 self.vapi.nat_set_reass()
795 self.vapi.nat_set_reass(is_ip6=1)
# Thin wrapper around vapi.nat44_add_del_static_mapping: converts the
# textual local/external IPv4 addresses to packed form before the call.
# NOTE(review): the end of the signature (the docstring mentions a `proto`
# parameter), the statement under `if local_port and external_port:` and
# most arguments of the vapi call are not present in this extract.
797 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
798 local_port=0, external_port=0, vrf_id=0,
799 is_add=1, external_sw_if_index=0xFFFFFFFF,
802 Add/delete NAT44 static mapping
804 :param local_ip: Local IP address
805 :param external_ip: External IP address
806 :param local_port: Local port number (Optional)
807 :param external_port: External port number (Optional)
808 :param vrf_id: VRF ID (Default 0)
809 :param is_add: 1 if add, 0 if delete (Default add)
810 :param external_sw_if_index: External interface instead of IP address
811 :param proto: IP protocol (Mandatory if port specified)
814 if local_port and external_port:
816 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
817 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
818 self.vapi.nat44_add_del_static_mapping(
821 external_sw_if_index,
# Adds or deletes a single NAT44 pool address: the packed address is passed
# as both the first and the last address of the range.
# NOTE(review): `vrf_id` is undocumented in the docstring and the trailing
# argument(s) of the vapi call are cut off in this extract - presumably
# vrf_id is forwarded there; confirm.
829 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
831 Add/delete NAT44 address
833 :param ip: IP address
834 :param is_add: 1 if add, 0 if delete (Default add)
836 nat_addr = socket.inet_pton(socket.AF_INET, ip)
837 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
840 def test_dynamic(self):
841 """ NAT44 dynamic translation test """
# Adds the NAT pool address and enables the NAT44 feature on pg0 and pg1,
# sends the inside stream pg0 -> pg1 and verifies it on pg1, then sends the
# outside stream back and verifies it on pg0.
# NOTE(review): the second add_del_feature call is cut off after its first
# argument, and no statement that starts the packet generator is visible
# between enabling capture and reading it - confirm in the complete file.
843 self.nat44_add_address(self.nat_addr)
844 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
845 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
849 pkts = self.create_stream_in(self.pg0, self.pg1)
850 self.pg0.add_stream(pkts)
851 self.pg_enable_capture(self.pg_interfaces)
853 capture = self.pg1.get_capture(len(pkts))
854 self.verify_capture_out(capture)
857 pkts = self.create_stream_out(self.pg1)
858 self.pg1.add_stream(pkts)
859 self.pg_enable_capture(self.pg_interfaces)
861 capture = self.pg0.get_capture(len(pkts))
862 self.verify_capture_in(capture, self.pg0)
864 def test_dynamic_icmp_errors_in2out_ttl_1(self):
865 """ NAT44 handling of client packets with TTL=1 """
# Sends the inside stream with ttl=1 from pg0 and expects one ICMP error
# (default type 11) per packet back on pg0.
# NOTE(review): the second add_del_feature call is cut off after its first
# argument and the statement starting the packet generator is not visible
# in this extract.
867 self.nat44_add_address(self.nat_addr)
868 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
869 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
872 # Client side - generate traffic
873 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
874 self.pg0.add_stream(pkts)
875 self.pg_enable_capture(self.pg_interfaces)
878 # Client side - verify ICMP type 11 packets
879 capture = self.pg0.get_capture(len(pkts))
880 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
882 def test_dynamic_icmp_errors_out2in_ttl_1(self):
883 """ NAT44 handling of server packets with TTL=1 """
# First sends inside traffic so verify_capture_out records the translated
# ports/ids, then sends the outside stream with ttl=1 and expects ICMP
# errors on pg1 whose source is pg1's local address.
# NOTE(review): the second add_del_feature call is cut off after its first
# argument and the statements starting the packet generator are not
# visible in this extract.
885 self.nat44_add_address(self.nat_addr)
886 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
887 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
890 # Client side - create sessions
891 pkts = self.create_stream_in(self.pg0, self.pg1)
892 self.pg0.add_stream(pkts)
893 self.pg_enable_capture(self.pg_interfaces)
896 # Server side - generate traffic
897 capture = self.pg1.get_capture(len(pkts))
898 self.verify_capture_out(capture)
899 pkts = self.create_stream_out(self.pg1, ttl=1)
900 self.pg1.add_stream(pkts)
901 self.pg_enable_capture(self.pg_interfaces)
904 # Server side - verify ICMP type 11 packets
905 capture = self.pg1.get_capture(len(pkts))
906 self.verify_capture_out_with_icmp_errors(capture,
907 src_ip=self.pg1.local_ip4)
909 def test_dynamic_icmp_errors_in2out_ttl_2(self):
910 """ NAT44 handling of error responses to client packets with TTL=2 """
# Sends the inside stream with ttl=2, wraps every packet captured on pg1
# into an ICMP type 11 message addressed to the NAT address, injects those
# on pg1 and verifies the resulting ICMP errors on pg0.
# NOTE(review): the second add_del_feature call is cut off after its first
# argument and the statements starting the packet generator are not
# visible in this extract.
912 self.nat44_add_address(self.nat_addr)
913 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
914 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
917 # Client side - generate traffic
918 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
919 self.pg0.add_stream(pkts)
920 self.pg_enable_capture(self.pg_interfaces)
923 # Server side - simulate ICMP type 11 response
924 capture = self.pg1.get_capture(len(pkts))
925 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
926 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
927 ICMP(type=11) / packet[IP] for packet in capture]
928 self.pg1.add_stream(pkts)
929 self.pg_enable_capture(self.pg_interfaces)
932 # Client side - verify ICMP type 11 packets
933 capture = self.pg0.get_capture(len(pkts))
934 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
936 def test_dynamic_icmp_errors_out2in_ttl_2(self):
937 """ NAT44 handling of error responses to server packets with TTL=2 """
# Creates sessions with inside traffic, sends the outside stream with
# ttl=2, wraps every packet captured on pg0 into an ICMP type 11 message
# addressed to pg1's remote host, injects those on pg0 and verifies the
# resulting ICMP errors on pg1.
# NOTE(review): the second add_del_feature call is cut off after its first
# argument and the statements starting the packet generator are not
# visible in this extract.
939 self.nat44_add_address(self.nat_addr)
940 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
941 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
944 # Client side - create sessions
945 pkts = self.create_stream_in(self.pg0, self.pg1)
946 self.pg0.add_stream(pkts)
947 self.pg_enable_capture(self.pg_interfaces)
950 # Server side - generate traffic
951 capture = self.pg1.get_capture(len(pkts))
952 self.verify_capture_out(capture)
953 pkts = self.create_stream_out(self.pg1, ttl=2)
954 self.pg1.add_stream(pkts)
955 self.pg_enable_capture(self.pg_interfaces)
958 # Client side - simulate ICMP type 11 response
959 capture = self.pg0.get_capture(len(pkts))
960 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
961 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
962 ICMP(type=11) / packet[IP] for packet in capture]
963 self.pg0.add_stream(pkts)
964 self.pg_enable_capture(self.pg_interfaces)
967 # Server side - verify ICMP type 11 packets
968 capture = self.pg1.get_capture(len(pkts))
969 self.verify_capture_out_with_icmp_errors(capture)
971 def test_ping_out_interface_from_outside(self):
972 """ Ping NAT44 out interface from outside network """
# Sends one ICMP echo request from pg1's remote host to pg1's own local
# address and expects a single echo reply (type 0) back on pg1 with
# source and destination swapped.
# NOTE(review): the statements binding `pkts` and `packet`, the statement
# starting the packet generator and the tail of the second add_del_feature
# call are not present in this extract.
# NOTE(review): the request is built with icmp_id_out while the reply is
# compared with icmp_id_in; both hold the same class default here -
# confirm this is intended.
974 self.nat44_add_address(self.nat_addr)
975 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
976 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
979 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
980 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
981 ICMP(id=self.icmp_id_out, type='echo-request'))
983 self.pg1.add_stream(pkts)
984 self.pg_enable_capture(self.pg_interfaces)
986 capture = self.pg1.get_capture(len(pkts))
987 self.assertEqual(1, len(capture))
990 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
991 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
992 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
993 self.assertEqual(packet[ICMP].type, 0) # echo reply
995 self.logger.error(ppp("Unexpected or invalid packet "
996 "(outside network):", packet))
999 def test_ping_internal_host_from_outside(self):
1000 """ Ping internal host from outside network """
# Maps pg0's remote host to the NAT address with a static mapping, sends
# an echo request from outside to the NAT address and verifies it on pg0,
# then sends the echo reply from inside and verifies it on pg1 with the
# id unchanged (same_port=True).
# NOTE(review): the second add_del_feature call is cut off after its first
# argument and the statements starting the packet generator are not
# visible in this extract.
1002 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1003 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1004 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1008 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1009 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1010 ICMP(id=self.icmp_id_out, type='echo-request'))
1011 self.pg1.add_stream(pkt)
1012 self.pg_enable_capture(self.pg_interfaces)
1014 capture = self.pg0.get_capture(1)
1015 self.verify_capture_in(capture, self.pg0, packet_num=1)
1016 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1019 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1020 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1021 ICMP(id=self.icmp_id_in, type='echo-reply'))
1022 self.pg0.add_stream(pkt)
1023 self.pg_enable_capture(self.pg_interfaces)
1025 capture = self.pg1.get_capture(1)
1026 self.verify_capture_out(capture, same_port=True, packet_num=1)
1027 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1029 def test_static_in(self):
1030 """ 1:1 NAT initialized from inside network """
# Maps pg0's remote host to 10.0.0.10 with an address-only static mapping,
# sends inside traffic first and expects it on pg1 with that source
# address and unchanged ports, then sends outside traffic to the mapped
# address and verifies it on pg0.
# NOTE(review): the second add_del_feature call is cut off after its first
# argument and the statements starting the packet generator are not
# visible in this extract.
1032 nat_ip = "10.0.0.10"
1033 self.tcp_port_out = 6303
1034 self.udp_port_out = 6304
1035 self.icmp_id_out = 6305
1037 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1038 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1039 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1043 pkts = self.create_stream_in(self.pg0, self.pg1)
1044 self.pg0.add_stream(pkts)
1045 self.pg_enable_capture(self.pg_interfaces)
1047 capture = self.pg1.get_capture(len(pkts))
1048 self.verify_capture_out(capture, nat_ip, True)
1051 pkts = self.create_stream_out(self.pg1, nat_ip)
1052 self.pg1.add_stream(pkts)
1053 self.pg_enable_capture(self.pg_interfaces)
1055 capture = self.pg0.get_capture(len(pkts))
1056 self.verify_capture_in(capture, self.pg0)
1058 def test_static_out(self):
1059 """ 1:1 NAT initialized from outside network """
# Maps pg0's remote host to 10.0.0.20 with an address-only static mapping,
# sends outside traffic to the mapped address first and verifies it on
# pg0, then sends inside traffic and expects it on pg1 with that source
# address and unchanged ports.
# NOTE(review): the second add_del_feature call is cut off after its first
# argument and the statements starting the packet generator are not
# visible in this extract.
1061 nat_ip = "10.0.0.20"
1062 self.tcp_port_out = 6303
1063 self.udp_port_out = 6304
1064 self.icmp_id_out = 6305
1066 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1067 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1068 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1072 pkts = self.create_stream_out(self.pg1, nat_ip)
1073 self.pg1.add_stream(pkts)
1074 self.pg_enable_capture(self.pg_interfaces)
1076 capture = self.pg0.get_capture(len(pkts))
1077 self.verify_capture_in(capture, self.pg0)
1080 pkts = self.create_stream_in(self.pg0, self.pg1)
1081 self.pg0.add_stream(pkts)
1082 self.pg_enable_capture(self.pg_interfaces)
1084 capture = self.pg1.get_capture(len(pkts))
1085 self.verify_capture_out(capture, nat_ip, True)
1087 def test_static_with_port_in(self):
1088 """ 1:1 NAPT initialized from inside network """
# Sets the outside ports/id to 3606/3607/3608, adds the NAT address plus
# one port-specific static mapping each for TCP, UDP and ICMP, then sends
# inside traffic first and outside traffic second, verifying each capture.
# NOTE(review): the second add_del_feature call is cut off after its first
# argument and the statements starting the packet generator are not
# visible in this extract.
1090 self.tcp_port_out = 3606
1091 self.udp_port_out = 3607
1092 self.icmp_id_out = 3608
1094 self.nat44_add_address(self.nat_addr)
1095 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1096 self.tcp_port_in, self.tcp_port_out,
1097 proto=IP_PROTOS.tcp)
1098 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1099 self.udp_port_in, self.udp_port_out,
1100 proto=IP_PROTOS.udp)
1101 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1102 self.icmp_id_in, self.icmp_id_out,
1103 proto=IP_PROTOS.icmp)
1104 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1105 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1109 pkts = self.create_stream_in(self.pg0, self.pg1)
1110 self.pg0.add_stream(pkts)
1111 self.pg_enable_capture(self.pg_interfaces)
1113 capture = self.pg1.get_capture(len(pkts))
1114 self.verify_capture_out(capture)
1117 pkts = self.create_stream_out(self.pg1)
1118 self.pg1.add_stream(pkts)
1119 self.pg_enable_capture(self.pg_interfaces)
1121 capture = self.pg0.get_capture(len(pkts))
1122 self.verify_capture_in(capture, self.pg0)
1124 def test_static_with_port_out(self):
1125 """ 1:1 NAPT initialized from outside network """
# Sets the outside ports/id to 30606/30607/30608, adds the NAT address plus
# one port-specific static mapping each for TCP, UDP and ICMP, then sends
# outside traffic first and inside traffic second, verifying each capture.
# NOTE(review): the second add_del_feature call is cut off after its first
# argument and the statements starting the packet generator are not
# visible in this extract.
1127 self.tcp_port_out = 30606
1128 self.udp_port_out = 30607
1129 self.icmp_id_out = 30608
1131 self.nat44_add_address(self.nat_addr)
1132 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1133 self.tcp_port_in, self.tcp_port_out,
1134 proto=IP_PROTOS.tcp)
1135 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1136 self.udp_port_in, self.udp_port_out,
1137 proto=IP_PROTOS.udp)
1138 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1139 self.icmp_id_in, self.icmp_id_out,
1140 proto=IP_PROTOS.icmp)
1141 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1142 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1146 pkts = self.create_stream_out(self.pg1)
1147 self.pg1.add_stream(pkts)
1148 self.pg_enable_capture(self.pg_interfaces)
1150 capture = self.pg0.get_capture(len(pkts))
1151 self.verify_capture_in(capture, self.pg0)
1154 pkts = self.create_stream_in(self.pg0, self.pg1)
1155 self.pg0.add_stream(pkts)
1156 self.pg_enable_capture(self.pg_interfaces)
1158 capture = self.pg1.get_capture(len(pkts))
1159 self.verify_capture_out(capture)
1161 def test_static_vrf_aware(self):
1162 """ 1:1 NAT VRF awareness """
# Two static mappings are created (pg4 host -> nat_ip1, pg0 host ->
# nat_ip2). Traffic from pg4 must come out of pg3 translated to nat_ip1;
# traffic from pg0 must not reach pg3 at all.
1164 nat_ip1 = "10.0.0.30"
1165 nat_ip2 = "10.0.0.40"
1166 self.tcp_port_out = 6303
1167 self.udp_port_out = 6304
1168 self.icmp_id_out = 6305
# NOTE(review): per the comments further down, the first mapping's VRF
# matches pg4's VRF and the second mapping's VRF does not match pg0's VRF
# -- confirm against the full argument lists of these two calls.
1170 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1172 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1174 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1176 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1177 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1179 # inside interface VRF match NAT44 static mapping VRF
1180 pkts = self.create_stream_in(self.pg4, self.pg3)
1181 self.pg4.add_stream(pkts)
1182 self.pg_enable_capture(self.pg_interfaces)
1184 capture = self.pg3.get_capture(len(pkts))
# Expect the static address nat_ip1 as the translated source.
1185 self.verify_capture_out(capture, nat_ip1, True)
1187 # inside interface VRF don't match NAT44 static mapping VRF (packets
1189 pkts = self.create_stream_in(self.pg0, self.pg3)
1190 self.pg0.add_stream(pkts)
1191 self.pg_enable_capture(self.pg_interfaces)
# Nothing may be forwarded to pg3 for the non-matching VRF.
1193 self.pg3.assert_nothing_captured()
1195 def test_static_lb(self):
1196 """ NAT44 local service load balancing """
1197 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1200 server1 = self.pg0.remote_hosts[0]
1201 server2 = self.pg0.remote_hosts[1]
1203 locals = [{'addr': server1.ip4n,
1206 {'addr': server2.ip4n,
1210 self.nat44_add_address(self.nat_addr)
1211 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1214 local_num=len(locals),
1216 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1217 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1220 # from client to service
1221 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1222 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1223 TCP(sport=12345, dport=external_port))
1224 self.pg1.add_stream(p)
1225 self.pg_enable_capture(self.pg_interfaces)
1227 capture = self.pg0.get_capture(1)
1233 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1234 if ip.dst == server1.ip4:
1238 self.assertEqual(tcp.dport, local_port)
1239 self.check_tcp_checksum(p)
1240 self.check_ip_checksum(p)
1242 self.logger.error(ppp("Unexpected or invalid packet:", p))
1245 # from service back to client
1246 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1247 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1248 TCP(sport=local_port, dport=12345))
1249 self.pg0.add_stream(p)
1250 self.pg_enable_capture(self.pg_interfaces)
1252 capture = self.pg1.get_capture(1)
1257 self.assertEqual(ip.src, self.nat_addr)
1258 self.assertEqual(tcp.sport, external_port)
1259 self.check_tcp_checksum(p)
1260 self.check_ip_checksum(p)
1262 self.logger.error(ppp("Unexpected or invalid packet:", p))
1268 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1270 for client in clients:
1271 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1272 IP(src=client, dst=self.nat_addr) /
1273 TCP(sport=12345, dport=external_port))
1275 self.pg1.add_stream(pkts)
1276 self.pg_enable_capture(self.pg_interfaces)
1278 capture = self.pg0.get_capture(len(pkts))
1280 if p[IP].dst == server1.ip4:
1284 self.assertTrue(server1_n > server2_n)
1286 def test_multiple_inside_interfaces(self):
1287 """ NAT44 multiple non-overlapping address space inside interfaces """
# pg0 and pg1 get the NAT44 feature with default arguments, pg3 gets it
# with an extra argument, and pg2 is left without the feature. Traffic
# between pg0/pg1/pg2 must pass untranslated; traffic to and from pg3 must
# be translated.
1289 self.nat44_add_address(self.nat_addr)
1290 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1291 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1292 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1295 # between two NAT44 inside interfaces (no translation)
1296 pkts = self.create_stream_in(self.pg0, self.pg1)
1297 self.pg0.add_stream(pkts)
1298 self.pg_enable_capture(self.pg_interfaces)
1300 capture = self.pg1.get_capture(len(pkts))
1301 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1303 # from NAT44 inside to interface without NAT44 feature (no translation)
1304 pkts = self.create_stream_in(self.pg0, self.pg2)
1305 self.pg0.add_stream(pkts)
1306 self.pg_enable_capture(self.pg_interfaces)
1308 capture = self.pg2.get_capture(len(pkts))
1309 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1311 # in2out 1st interface
1312 pkts = self.create_stream_in(self.pg0, self.pg3)
1313 self.pg0.add_stream(pkts)
1314 self.pg_enable_capture(self.pg_interfaces)
1316 capture = self.pg3.get_capture(len(pkts))
1317 self.verify_capture_out(capture)
1319 # out2in 1st interface
1320 pkts = self.create_stream_out(self.pg3)
1321 self.pg3.add_stream(pkts)
1322 self.pg_enable_capture(self.pg_interfaces)
1324 capture = self.pg0.get_capture(len(pkts))
1325 self.verify_capture_in(capture, self.pg0)
1327 # in2out 2nd interface
1328 pkts = self.create_stream_in(self.pg1, self.pg3)
1329 self.pg1.add_stream(pkts)
1330 self.pg_enable_capture(self.pg_interfaces)
1332 capture = self.pg3.get_capture(len(pkts))
1333 self.verify_capture_out(capture)
1335 # out2in 2nd interface
1336 pkts = self.create_stream_out(self.pg3)
1337 self.pg3.add_stream(pkts)
1338 self.pg_enable_capture(self.pg_interfaces)
1340 capture = self.pg1.get_capture(len(pkts))
1341 self.verify_capture_in(capture, self.pg1)
1343 def test_inside_overlapping_interfaces(self):
1344 """ NAT44 multiple inside interfaces with overlapping address space """
1346 static_nat_ip = "10.0.0.10"
1347 self.nat44_add_address(self.nat_addr)
1348 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1350 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1351 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1352 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1353 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1356 # between NAT44 inside interfaces with same VRF (no translation)
1357 pkts = self.create_stream_in(self.pg4, self.pg5)
1358 self.pg4.add_stream(pkts)
1359 self.pg_enable_capture(self.pg_interfaces)
1361 capture = self.pg5.get_capture(len(pkts))
1362 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1364 # between NAT44 inside interfaces with different VRF (hairpinning)
1365 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1366 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1367 TCP(sport=1234, dport=5678))
1368 self.pg4.add_stream(p)
1369 self.pg_enable_capture(self.pg_interfaces)
1371 capture = self.pg6.get_capture(1)
1376 self.assertEqual(ip.src, self.nat_addr)
1377 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1378 self.assertNotEqual(tcp.sport, 1234)
1379 self.assertEqual(tcp.dport, 5678)
1381 self.logger.error(ppp("Unexpected or invalid packet:", p))
1384 # in2out 1st interface
1385 pkts = self.create_stream_in(self.pg4, self.pg3)
1386 self.pg4.add_stream(pkts)
1387 self.pg_enable_capture(self.pg_interfaces)
1389 capture = self.pg3.get_capture(len(pkts))
1390 self.verify_capture_out(capture)
1392 # out2in 1st interface
1393 pkts = self.create_stream_out(self.pg3)
1394 self.pg3.add_stream(pkts)
1395 self.pg_enable_capture(self.pg_interfaces)
1397 capture = self.pg4.get_capture(len(pkts))
1398 self.verify_capture_in(capture, self.pg4)
1400 # in2out 2nd interface
1401 pkts = self.create_stream_in(self.pg5, self.pg3)
1402 self.pg5.add_stream(pkts)
1403 self.pg_enable_capture(self.pg_interfaces)
1405 capture = self.pg3.get_capture(len(pkts))
1406 self.verify_capture_out(capture)
1408 # out2in 2nd interface
1409 pkts = self.create_stream_out(self.pg3)
1410 self.pg3.add_stream(pkts)
1411 self.pg_enable_capture(self.pg_interfaces)
1413 capture = self.pg5.get_capture(len(pkts))
1414 self.verify_capture_in(capture, self.pg5)
1417 addresses = self.vapi.nat44_address_dump()
1418 self.assertEqual(len(addresses), 1)
1419 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1420 self.assertEqual(len(sessions), 3)
1421 for session in sessions:
1422 self.assertFalse(session.is_static)
1423 self.assertEqual(session.inside_ip_address[0:4],
1424 self.pg5.remote_ip4n)
1425 self.assertEqual(session.outside_ip_address,
1426 addresses[0].ip_address)
1427 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1428 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1429 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1430 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1431 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1432 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1433 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1434 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1435 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1437 # in2out 3rd interface
1438 pkts = self.create_stream_in(self.pg6, self.pg3)
1439 self.pg6.add_stream(pkts)
1440 self.pg_enable_capture(self.pg_interfaces)
1442 capture = self.pg3.get_capture(len(pkts))
1443 self.verify_capture_out(capture, static_nat_ip, True)
1445 # out2in 3rd interface
1446 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1447 self.pg3.add_stream(pkts)
1448 self.pg_enable_capture(self.pg_interfaces)
1450 capture = self.pg6.get_capture(len(pkts))
1451 self.verify_capture_in(capture, self.pg6)
1453 # general user and session dump verifications
1454 users = self.vapi.nat44_user_dump()
1455 self.assertTrue(len(users) >= 3)
1456 addresses = self.vapi.nat44_address_dump()
1457 self.assertEqual(len(addresses), 1)
1459 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1461 for session in sessions:
1462 self.assertEqual(user.ip_address, session.inside_ip_address)
1463 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1464 self.assertTrue(session.protocol in
1465 [IP_PROTOS.tcp, IP_PROTOS.udp,
1469 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1470 self.assertTrue(len(sessions) >= 4)
1471 for session in sessions:
1472 self.assertFalse(session.is_static)
1473 self.assertEqual(session.inside_ip_address[0:4],
1474 self.pg4.remote_ip4n)
1475 self.assertEqual(session.outside_ip_address,
1476 addresses[0].ip_address)
1479 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1480 self.assertTrue(len(sessions) >= 3)
1481 for session in sessions:
1482 self.assertTrue(session.is_static)
1483 self.assertEqual(session.inside_ip_address[0:4],
1484 self.pg6.remote_ip4n)
1485 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1486 map(int, static_nat_ip.split('.')))
1487 self.assertTrue(session.inside_port in
1488 [self.tcp_port_in, self.udp_port_in,
1491 def test_hairpinning(self):
1492 """ NAT44 hairpinning - 1:1 NAPT """
1494 host = self.pg0.remote_hosts[0]
1495 server = self.pg0.remote_hosts[1]
1498 server_in_port = 5678
1499 server_out_port = 8765
1501 self.nat44_add_address(self.nat_addr)
1502 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1503 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1505 # add static mapping for server
1506 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1507 server_in_port, server_out_port,
1508 proto=IP_PROTOS.tcp)
1510 # send packet from host to server
1511 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1512 IP(src=host.ip4, dst=self.nat_addr) /
1513 TCP(sport=host_in_port, dport=server_out_port))
1514 self.pg0.add_stream(p)
1515 self.pg_enable_capture(self.pg_interfaces)
1517 capture = self.pg0.get_capture(1)
1522 self.assertEqual(ip.src, self.nat_addr)
1523 self.assertEqual(ip.dst, server.ip4)
1524 self.assertNotEqual(tcp.sport, host_in_port)
1525 self.assertEqual(tcp.dport, server_in_port)
1526 self.check_tcp_checksum(p)
1527 host_out_port = tcp.sport
1529 self.logger.error(ppp("Unexpected or invalid packet:", p))
1532 # send reply from server to host
1533 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1534 IP(src=server.ip4, dst=self.nat_addr) /
1535 TCP(sport=server_in_port, dport=host_out_port))
1536 self.pg0.add_stream(p)
1537 self.pg_enable_capture(self.pg_interfaces)
1539 capture = self.pg0.get_capture(1)
1544 self.assertEqual(ip.src, self.nat_addr)
1545 self.assertEqual(ip.dst, host.ip4)
1546 self.assertEqual(tcp.sport, server_out_port)
1547 self.assertEqual(tcp.dport, host_in_port)
1548 self.check_tcp_checksum(p)
1550 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1553 def test_hairpinning2(self):
1554 """ NAT44 hairpinning - 1:1 NAT"""
# Three remote hosts on pg0 are used: a plain host and two servers, each
# server with an address-only static mapping. Four exchanges are checked,
# every one captured on pg0 itself: host -> server1, server1 -> host,
# server2 -> server1 and server1 -> server2, each with TCP, UDP and ICMP.
1556 server1_nat_ip = "10.0.0.10"
1557 server2_nat_ip = "10.0.0.11"
1558 host = self.pg0.remote_hosts[0]
1559 server1 = self.pg0.remote_hosts[1]
1560 server2 = self.pg0.remote_hosts[2]
1561 server_tcp_port = 22
1562 server_udp_port = 20
1564 self.nat44_add_address(self.nat_addr)
1565 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1566 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1569 # add static mapping for servers
1570 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1571 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# Requests from the host to server1's mapped address.
1575 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1576 IP(src=host.ip4, dst=server1_nat_ip) /
1577 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1579 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1580 IP(src=host.ip4, dst=server1_nat_ip) /
1581 UDP(sport=self.udp_port_in, dport=server_udp_port))
1583 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1584 IP(src=host.ip4, dst=server1_nat_ip) /
1585 ICMP(id=self.icmp_id_in, type='echo-request'))
1587 self.pg0.add_stream(pkts)
1588 self.pg_enable_capture(self.pg_interfaces)
1590 capture = self.pg0.get_capture(len(pkts))
# The host has no static mapping, so its source becomes the pool address
# with a new port / ICMP id; those are stored on self for the replies.
1591 for packet in capture:
1593 self.assertEqual(packet[IP].src, self.nat_addr)
1594 self.assertEqual(packet[IP].dst, server1.ip4)
1595 if packet.haslayer(TCP):
1596 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1597 self.assertEqual(packet[TCP].dport, server_tcp_port)
1598 self.tcp_port_out = packet[TCP].sport
1599 self.check_tcp_checksum(packet)
1600 elif packet.haslayer(UDP):
1601 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1602 self.assertEqual(packet[UDP].dport, server_udp_port)
1603 self.udp_port_out = packet[UDP].sport
1605 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1606 self.icmp_id_out = packet[ICMP].id
1608 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Replies from server1 to the pool address, using the ports / id learned
# from the capture above.
1613 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1614 IP(src=server1.ip4, dst=self.nat_addr) /
1615 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1617 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1618 IP(src=server1.ip4, dst=self.nat_addr) /
1619 UDP(sport=server_udp_port, dport=self.udp_port_out))
1621 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1622 IP(src=server1.ip4, dst=self.nat_addr) /
1623 ICMP(id=self.icmp_id_out, type='echo-reply'))
1625 self.pg0.add_stream(pkts)
1626 self.pg_enable_capture(self.pg_interfaces)
1628 capture = self.pg0.get_capture(len(pkts))
# The host must see server1's mapped address as source and its own
# original ports / ICMP id restored.
1629 for packet in capture:
1631 self.assertEqual(packet[IP].src, server1_nat_ip)
1632 self.assertEqual(packet[IP].dst, host.ip4)
1633 if packet.haslayer(TCP):
1634 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1635 self.assertEqual(packet[TCP].sport, server_tcp_port)
1636 self.check_tcp_checksum(packet)
1637 elif packet.haslayer(UDP):
1638 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1639 self.assertEqual(packet[UDP].sport, server_udp_port)
1641 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1643 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1646 # server2 to server1
1648 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1649 IP(src=server2.ip4, dst=server1_nat_ip) /
1650 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1652 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1653 IP(src=server2.ip4, dst=server1_nat_ip) /
1654 UDP(sport=self.udp_port_in, dport=server_udp_port))
1656 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1657 IP(src=server2.ip4, dst=server1_nat_ip) /
1658 ICMP(id=self.icmp_id_in, type='echo-request'))
1660 self.pg0.add_stream(pkts)
1661 self.pg_enable_capture(self.pg_interfaces)
1663 capture = self.pg0.get_capture(len(pkts))
# server2 has an address-only static mapping: its source becomes
# server2_nat_ip and the source ports / ICMP id are expected unchanged.
1664 for packet in capture:
1666 self.assertEqual(packet[IP].src, server2_nat_ip)
1667 self.assertEqual(packet[IP].dst, server1.ip4)
1668 if packet.haslayer(TCP):
1669 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1670 self.assertEqual(packet[TCP].dport, server_tcp_port)
1671 self.tcp_port_out = packet[TCP].sport
1672 self.check_tcp_checksum(packet)
1673 elif packet.haslayer(UDP):
1674 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1675 self.assertEqual(packet[UDP].dport, server_udp_port)
1676 self.udp_port_out = packet[UDP].sport
1678 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1679 self.icmp_id_out = packet[ICMP].id
1681 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1684 # server1 to server2
1686 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1687 IP(src=server1.ip4, dst=server2_nat_ip) /
1688 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1690 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1691 IP(src=server1.ip4, dst=server2_nat_ip) /
1692 UDP(sport=server_udp_port, dport=self.udp_port_out))
1694 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1695 IP(src=server1.ip4, dst=server2_nat_ip) /
1696 ICMP(id=self.icmp_id_out, type='echo-reply'))
1698 self.pg0.add_stream(pkts)
1699 self.pg_enable_capture(self.pg_interfaces)
1701 capture = self.pg0.get_capture(len(pkts))
# server2 must see server1's mapped address as source and receive the
# replies on its real address with its original ports / ICMP id.
1702 for packet in capture:
1704 self.assertEqual(packet[IP].src, server1_nat_ip)
1705 self.assertEqual(packet[IP].dst, server2.ip4)
1706 if packet.haslayer(TCP):
1707 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1708 self.assertEqual(packet[TCP].sport, server_tcp_port)
1709 self.check_tcp_checksum(packet)
1710 elif packet.haslayer(UDP):
1711 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1712 self.assertEqual(packet[UDP].sport, server_udp_port)
1714 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1716 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1719 def test_max_translations_per_user(self):
1720 """ MAX translations per user - recycle the least recently used """
# A single inside host opens more TCP flows than the configured per-user
# limit; every packet must still be forwarded to pg1.
1722 self.nat44_add_address(self.nat_addr)
1723 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1724 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1727 # get maximum number of translations per user
1728 nat44_config = self.vapi.nat_show_config()
1730 # send more than maximum number of translations per user packets
1731 pkts_num = nat44_config.max_translations_per_user + 5
# One packet per distinct TCP source port, starting at 1025.
1733 for port in range(0, pkts_num):
1734 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1735 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1736 TCP(sport=1025 + port))
1738 self.pg0.add_stream(pkts)
1739 self.pg_enable_capture(self.pg_interfaces)
1742 # verify number of translated packet
1743 self.pg1.get_capture(pkts_num)
1745 def test_interface_addr(self):
1746 """ Acquire NAT44 addresses from interface """
# pg7 is registered as the source of NAT44 pool addresses; the pool dump
# must then follow pg7's IPv4 configuration: empty, one entry equal to
# pg7's local address, empty again.
1747 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1749 # no address in NAT pool
1750 adresses = self.vapi.nat44_address_dump()
1751 self.assertEqual(0, len(adresses))
1753 # configure interface address and check NAT address pool
1754 self.pg7.config_ip4()
1755 adresses = self.vapi.nat44_address_dump()
1756 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address field are compared.
1757 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1759 # remove interface address and check NAT address pool
1760 self.pg7.unconfig_ip4()
1761 adresses = self.vapi.nat44_address_dump()
1762 self.assertEqual(0, len(adresses))
1764 def test_interface_addr_static_mapping(self):
1765 """ Static mapping with addresses from interface """
# A static mapping is created against pg7 as external interface rather
# than against a fixed external address; the static-mapping dump is then
# checked before, during and after pg7 has an IPv4 address.
1766 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1767 self.nat44_add_static_mapping(
1769 external_sw_if_index=self.pg7.sw_if_index)
1771 # static mappings with external interface
1772 static_mappings = self.vapi.nat44_static_mapping_dump()
1773 self.assertEqual(1, len(static_mappings))
1774 self.assertEqual(self.pg7.sw_if_index,
1775 static_mappings[0].external_sw_if_index)
1777 # configure interface address and check static mappings
1778 self.pg7.config_ip4()
1779 static_mappings = self.vapi.nat44_static_mapping_dump()
1780 self.assertEqual(1, len(static_mappings))
1781 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1782 self.pg7.local_ip4n)
# Once resolved to pg7's address, the entry is expected to report
# 0xFFFFFFFF as its external interface index (presumably the "no
# interface" value -- confirm against the API definition).
1783 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1785 # remove interface address and check static mappings
1786 self.pg7.unconfig_ip4()
1787 static_mappings = self.vapi.nat44_static_mapping_dump()
1788 self.assertEqual(0, len(static_mappings))
1790 def test_ipfix_nat44_sess(self):
1791 """ IPFIX logging NAT44 session created/delted """
1792 self.ipfix_domain_id = 10
1793 self.ipfix_src_port = 20202
1794 colector_port = 30303
1795 bind_layers(UDP, IPFIX, dport=30303)
1796 self.nat44_add_address(self.nat_addr)
1797 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1798 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1800 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1801 src_address=self.pg3.local_ip4n,
1803 template_interval=10,
1804 collector_port=colector_port)
1805 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1806 src_port=self.ipfix_src_port)
1808 pkts = self.create_stream_in(self.pg0, self.pg1)
1809 self.pg0.add_stream(pkts)
1810 self.pg_enable_capture(self.pg_interfaces)
1812 capture = self.pg1.get_capture(len(pkts))
1813 self.verify_capture_out(capture)
1814 self.nat44_add_address(self.nat_addr, is_add=0)
1815 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1816 capture = self.pg3.get_capture(3)
1817 ipfix = IPFIXDecoder()
1818 # first load template
1820 self.assertTrue(p.haslayer(IPFIX))
1821 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1822 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1823 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1824 self.assertEqual(p[UDP].dport, colector_port)
1825 self.assertEqual(p[IPFIX].observationDomainID,
1826 self.ipfix_domain_id)
1827 if p.haslayer(Template):
1828 ipfix.add_template(p.getlayer(Template))
1829 # verify events in data set
1831 if p.haslayer(Data):
1832 data = ipfix.decode_data_set(p.getlayer(Set))
1833 self.verify_ipfix_nat44_ses(data)
1835 def test_ipfix_addr_exhausted(self):
1836 """ IPFIX logging NAT addresses exhausted """
# No NAT pool address is added in this test, so the packet sent from pg0
# is expected to produce nothing on pg1; the IPFIX packets exported
# towards pg3's remote host are then decoded and verified.
1837 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1838 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exporter: source is pg3's local address, collector is pg3's remote host.
1840 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1841 src_address=self.pg3.local_ip4n,
1843 template_interval=10)
1844 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1845 src_port=self.ipfix_src_port)
1847 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1848 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1850 self.pg0.add_stream(p)
1851 self.pg_enable_capture(self.pg_interfaces)
# Expect zero packets on the outside interface.
1853 capture = self.pg1.get_capture(0)
1854 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1855 capture = self.pg3.get_capture(3)
1856 ipfix = IPFIXDecoder()
1857 # first load template
1859 self.assertTrue(p.haslayer(IPFIX))
1860 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1861 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1862 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# No collector port is passed to the exporter in the arguments visible
# above; 4739 is presumably the exporter's default -- confirm.
1863 self.assertEqual(p[UDP].dport, 4739)
1864 self.assertEqual(p[IPFIX].observationDomainID,
1865 self.ipfix_domain_id)
1866 if p.haslayer(Template):
1867 ipfix.add_template(p.getlayer(Template))
1868 # verify events in data set
1870 if p.haslayer(Data):
1871 data = ipfix.decode_data_set(p.getlayer(Set))
1872 self.verify_ipfix_addr_exhausted(data)
1874 def test_pool_addr_fib(self):
1875 """ NAT44 add pool addresses to FIB """
1876 static_addr = '10.0.0.10'
1877 self.nat44_add_address(self.nat_addr)
1878 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1879 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1881 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1884 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1885 ARP(op=ARP.who_has, pdst=self.nat_addr,
1886 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1887 self.pg1.add_stream(p)
1888 self.pg_enable_capture(self.pg_interfaces)
1890 capture = self.pg1.get_capture(1)
1891 self.assertTrue(capture[0].haslayer(ARP))
1892 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1895 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1896 ARP(op=ARP.who_has, pdst=static_addr,
1897 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1898 self.pg1.add_stream(p)
1899 self.pg_enable_capture(self.pg_interfaces)
1901 capture = self.pg1.get_capture(1)
1902 self.assertTrue(capture[0].haslayer(ARP))
1903 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1905 # send ARP to non-NAT44 interface
1906 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1907 ARP(op=ARP.who_has, pdst=self.nat_addr,
1908 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1909 self.pg2.add_stream(p)
1910 self.pg_enable_capture(self.pg_interfaces)
1912 capture = self.pg1.get_capture(0)
1914 # remove addresses and verify
1915 self.nat44_add_address(self.nat_addr, is_add=0)
1916 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1919 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1920 ARP(op=ARP.who_has, pdst=self.nat_addr,
1921 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1922 self.pg1.add_stream(p)
1923 self.pg_enable_capture(self.pg_interfaces)
1925 capture = self.pg1.get_capture(0)
1927 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1928 ARP(op=ARP.who_has, pdst=static_addr,
1929 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1930 self.pg1.add_stream(p)
1931 self.pg_enable_capture(self.pg_interfaces)
1933 capture = self.pg1.get_capture(0)
1935 def test_vrf_mode(self):
1936 """ NAT44 tenant VRF aware address pool mode """
# pg0 and pg1 are moved into two separate IPv4 tables and one pool
# address is added per table. Traffic from pg0 must leave pg2 translated
# to nat_ip1 and traffic from pg1 translated to nat_ip2.
1940 nat_ip1 = "10.0.0.10"
1941 nat_ip2 = "10.0.0.11"
# The IPv4 address is removed before the interface changes table and
# configured again afterwards.
1943 self.pg0.unconfig_ip4()
1944 self.pg1.unconfig_ip4()
1945 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1946 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1947 self.pg0.set_table_ip4(vrf_id1)
1948 self.pg1.set_table_ip4(vrf_id2)
1949 self.pg0.config_ip4()
1950 self.pg1.config_ip4()
# One pool address bound to each VRF.
1952 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1953 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1954 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1955 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1956 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# Traffic from pg0 (table vrf_id1) must use nat_ip1.
1960 pkts = self.create_stream_in(self.pg0, self.pg2)
1961 self.pg0.add_stream(pkts)
1962 self.pg_enable_capture(self.pg_interfaces)
1964 capture = self.pg2.get_capture(len(pkts))
1965 self.verify_capture_out(capture, nat_ip1)
# Traffic from pg1 (table vrf_id2) must use nat_ip2.
1968 pkts = self.create_stream_in(self.pg1, self.pg2)
1969 self.pg1.add_stream(pkts)
1970 self.pg_enable_capture(self.pg_interfaces)
1972 capture = self.pg2.get_capture(len(pkts))
1973 self.verify_capture_out(capture, nat_ip2)
# Cleanup: move both interfaces back to table 0 and delete the two tables.
# NOTE(review): the IPv4 addresses are not configured again here --
# confirm that tearDown or later tests do not depend on them.
1975 self.pg0.unconfig_ip4()
1976 self.pg1.unconfig_ip4()
1977 self.pg0.set_table_ip4(0)
1978 self.pg1.set_table_ip4(0)
1979 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1980 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1982 def test_vrf_feature_independent(self):
1983 """ NAT44 tenant VRF independent address pool mode """
# Two pool addresses are added without a VRF argument. Traffic from both
# pg0 and pg1 is expected to be translated to nat_ip1, the first address
# added; nat_ip2 is never expected in this test.
1985 nat_ip1 = "10.0.0.10"
1986 nat_ip2 = "10.0.0.11"
1988 self.nat44_add_address(nat_ip1)
1989 self.nat44_add_address(nat_ip2)
1990 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1991 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1992 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# Traffic from the first inside interface.
1996 pkts = self.create_stream_in(self.pg0, self.pg2)
1997 self.pg0.add_stream(pkts)
1998 self.pg_enable_capture(self.pg_interfaces)
2000 capture = self.pg2.get_capture(len(pkts))
2001 self.verify_capture_out(capture, nat_ip1)
# Traffic from the second inside interface: same pool address expected.
2004 pkts = self.create_stream_in(self.pg1, self.pg2)
2005 self.pg1.add_stream(pkts)
2006 self.pg_enable_capture(self.pg_interfaces)
2008 capture = self.pg2.get_capture(len(pkts))
2009 self.verify_capture_out(capture, nat_ip1)
2011 def test_dynamic_ipless_interfaces(self):
2012 """ NAT44 interfaces without configured IP address """
# Dynamic translation between pg7 and pg8. Neighbor entries and /32 routes
# for both remote hosts are installed explicitly before the traffic is
# sent (per the docstring, the interfaces have no IP address configured).
# Neighbor entries: remote MAC (converted with mactobinary) and remote
# IPv4 address of each interface.
2014 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2015 mactobinary(self.pg7.remote_mac),
2016 self.pg7.remote_ip4n,
2018 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2019 mactobinary(self.pg8.remote_mac),
2020 self.pg8.remote_ip4n,
# Host routes (prefix length 32) to each remote host via its own interface.
2023 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2024 dst_address_length=32,
2025 next_hop_address=self.pg7.remote_ip4n,
2026 next_hop_sw_if_index=self.pg7.sw_if_index)
2027 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2028 dst_address_length=32,
2029 next_hop_address=self.pg8.remote_ip4n,
2030 next_hop_sw_if_index=self.pg8.sw_if_index)
2032 self.nat44_add_address(self.nat_addr)
2033 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2034 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8: capture on pg8 checked with verify_capture_out.
2038 pkts = self.create_stream_in(self.pg7, self.pg8)
2039 self.pg7.add_stream(pkts)
2040 self.pg_enable_capture(self.pg_interfaces)
2042 capture = self.pg8.get_capture(len(pkts))
2043 self.verify_capture_out(capture)
# pg8 -> pg7: the stream is addressed to the pool address explicitly.
2046 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2047 self.pg8.add_stream(pkts)
2048 self.pg_enable_capture(self.pg_interfaces)
2050 capture = self.pg7.get_capture(len(pkts))
2051 self.verify_capture_in(capture, self.pg7)
2053 def test_static_ipless_interfaces(self):
2054 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Address-only static mapping between pg7's remote host and
# self.nat_addr. Neighbor entries and /32 routes for both remote hosts are
# installed explicitly; traffic is sent from pg8 first and from pg7 second.
# Neighbor entries: remote MAC (converted with mactobinary) and remote
# IPv4 address of each interface.
2056 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2057 mactobinary(self.pg7.remote_mac),
2058 self.pg7.remote_ip4n,
2060 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2061 mactobinary(self.pg8.remote_mac),
2062 self.pg8.remote_ip4n,
# Host routes (prefix length 32) to each remote host via its own interface.
2065 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2066 dst_address_length=32,
2067 next_hop_address=self.pg7.remote_ip4n,
2068 next_hop_sw_if_index=self.pg7.sw_if_index)
2069 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2070 dst_address_length=32,
2071 next_hop_address=self.pg8.remote_ip4n,
2072 next_hop_sw_if_index=self.pg8.sw_if_index)
# No pool address is added here; only the static mapping provides
# self.nat_addr.
2074 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2075 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2076 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7: capture on pg7 checked with verify_capture_in.
2080 pkts = self.create_stream_out(self.pg8)
2081 self.pg8.add_stream(pkts)
2082 self.pg_enable_capture(self.pg_interfaces)
2084 capture = self.pg7.get_capture(len(pkts))
2085 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8: the static address self.nat_addr is expected as the source.
2088 pkts = self.create_stream_in(self.pg7, self.pg8)
2089 self.pg7.add_stream(pkts)
2090 self.pg_enable_capture(self.pg_interfaces)
2092 capture = self.pg8.get_capture(len(pkts))
2093 self.verify_capture_out(capture, self.nat_addr, True)
2095 def test_static_with_port_ipless_interfaces(self):
2096 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Static address-and-port mappings (TCP, UDP, ICMP) between pg7's remote
# host and self.nat_addr. Neighbor entries and /32 routes for both remote
# hosts are installed explicitly; traffic is sent from pg8 first and from
# pg7 second.
# External ports / ICMP id used as the outside half of the mappings below.
2098 self.tcp_port_out = 30606
2099 self.udp_port_out = 30607
2100 self.icmp_id_out = 30608
# Neighbor entries: remote MAC (converted with mactobinary) and remote
# IPv4 address of each interface.
2102 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2103 mactobinary(self.pg7.remote_mac),
2104 self.pg7.remote_ip4n,
2106 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2107 mactobinary(self.pg8.remote_mac),
2108 self.pg8.remote_ip4n,
# Host routes (prefix length 32) to each remote host via its own interface.
2111 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2112 dst_address_length=32,
2113 next_hop_address=self.pg7.remote_ip4n,
2114 next_hop_sw_if_index=self.pg7.sw_if_index)
2115 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2116 dst_address_length=32,
2117 next_hop_address=self.pg8.remote_ip4n,
2118 next_hop_sw_if_index=self.pg8.sw_if_index)
# Pool address, then one static mapping per protocol.
2120 self.nat44_add_address(self.nat_addr)
2121 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2122 self.tcp_port_in, self.tcp_port_out,
2123 proto=IP_PROTOS.tcp)
2124 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2125 self.udp_port_in, self.udp_port_out,
2126 proto=IP_PROTOS.udp)
2127 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2128 self.icmp_id_in, self.icmp_id_out,
2129 proto=IP_PROTOS.icmp)
2130 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2131 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7: capture on pg7 checked with verify_capture_in.
2135 pkts = self.create_stream_out(self.pg8)
2136 self.pg8.add_stream(pkts)
2137 self.pg_enable_capture(self.pg_interfaces)
2139 capture = self.pg7.get_capture(len(pkts))
2140 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8: capture on pg8 checked with verify_capture_out.
2143 pkts = self.create_stream_in(self.pg7, self.pg8)
2144 self.pg7.add_stream(pkts)
2145 self.pg_enable_capture(self.pg_interfaces)
2147 capture = self.pg8.get_capture(len(pkts))
2148 self.verify_capture_out(capture)
# Check that an address-only static mapping (pg0.remote_ip4 -> 10.0.0.10) is
# applied to a packet that the assertions expect to carry a GRE layer.
# First direction: sent on pg0, captured on pg1, outer source must be nat_ip.
# Second direction: sent on pg1 towards nat_ip, captured on pg0, outer
# destination must be pg0.remote_ip4. The IP checksum is verified both times.
2150 def test_static_unknown_proto(self):
2151 """ 1:1 NAT translate packet with unknown protocol """
2152 nat_ip = "10.0.0.10"
2153 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2154 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2155 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2159 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2160 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2162 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2163 TCP(sport=1234, dport=1234))
2164 self.pg0.add_stream(p)
2165 self.pg_enable_capture(self.pg_interfaces)
2167 p = self.pg1.get_capture(1)
2170 self.assertEqual(packet[IP].src, nat_ip)
2171 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2172 self.assertTrue(packet.haslayer(GRE))
2173 self.check_ip_checksum(packet)
2175 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2179 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2180 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2182 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2183 TCP(sport=1234, dport=1234))
2184 self.pg1.add_stream(p)
2185 self.pg_enable_capture(self.pg_interfaces)
2187 p = self.pg0.get_capture(1)
2190 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2191 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2192 self.assertTrue(packet.haslayer(GRE))
2193 self.check_ip_checksum(packet)
2195 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the static unknown-protocol test: host and server are
# both remote hosts on pg0 and each has its own address-only static mapping.
# Packets are sent and captured on pg0 in both directions; the assertions
# expect the source rewritten to the sender's NAT address, the destination
# rewritten to the receiver's real address, a GRE layer, and a valid IP checksum.
2198 def test_hairpinning_static_unknown_proto(self):
2199 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2201 host = self.pg0.remote_hosts[0]
2202 server = self.pg0.remote_hosts[1]
2204 host_nat_ip = "10.0.0.10"
2205 server_nat_ip = "10.0.0.11"
2207 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2208 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2209 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2210 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2214 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2215 IP(src=host.ip4, dst=server_nat_ip) /
2217 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2218 TCP(sport=1234, dport=1234))
2219 self.pg0.add_stream(p)
2220 self.pg_enable_capture(self.pg_interfaces)
2222 p = self.pg0.get_capture(1)
2225 self.assertEqual(packet[IP].src, host_nat_ip)
2226 self.assertEqual(packet[IP].dst, server.ip4)
2227 self.assertTrue(packet.haslayer(GRE))
2228 self.check_ip_checksum(packet)
2230 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2234 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2235 IP(src=server.ip4, dst=host_nat_ip) /
2237 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2238 TCP(sport=1234, dport=1234))
2239 self.pg0.add_stream(p)
2240 self.pg_enable_capture(self.pg_interfaces)
2242 p = self.pg0.get_capture(1)
2245 self.assertEqual(packet[IP].src, server_nat_ip)
2246 self.assertEqual(packet[IP].dst, host.ip4)
2247 self.assertTrue(packet.haslayer(GRE))
2248 self.check_ip_checksum(packet)
2250 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Dynamic NAT44 (address pool = self.nat_addr) with a packet that the
# assertions expect to carry a GRE layer. A plain TCP packet is sent pg0 -> pg1
# first and its capture is consumed without checks; the GRE-bearing packet is
# then verified in both directions (source rewritten to self.nat_addr going
# out, destination rewritten back to pg0.remote_ip4 coming in).
2253 def test_unknown_proto(self):
2254 """ NAT44 translate packet with unknown protocol """
2255 self.nat44_add_address(self.nat_addr)
2256 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2257 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# NOTE(review): this initial TCP exchange presumably creates the session the
# unknown-protocol translation relies on - confirm against the NAT plugin.
2261 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2263 TCP(sport=self.tcp_port_in, dport=20))
2264 self.pg0.add_stream(p)
2265 self.pg_enable_capture(self.pg_interfaces)
2267 p = self.pg1.get_capture(1)
2269 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2272 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2273 TCP(sport=1234, dport=1234))
2274 self.pg0.add_stream(p)
2275 self.pg_enable_capture(self.pg_interfaces)
2277 p = self.pg1.get_capture(1)
2280 self.assertEqual(packet[IP].src, self.nat_addr)
2281 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2282 self.assertTrue(packet.haslayer(GRE))
2283 self.check_ip_checksum(packet)
2285 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2289 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2290 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2292 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2293 TCP(sport=1234, dport=1234))
2294 self.pg1.add_stream(p)
2295 self.pg_enable_capture(self.pg_interfaces)
2297 p = self.pg0.get_capture(1)
2300 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2301 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2302 self.assertTrue(packet.haslayer(GRE))
2303 self.check_ip_checksum(packet)
2305 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the dynamic unknown-protocol test. host and server
# are both remote hosts on pg0; the server has an address-only static mapping
# to 10.0.0.11 while the host is translated from the pool (self.nat_addr).
# A TCP packet host -> server_nat_ip is sent first and its capture consumed
# without checks, then a packet expected to carry GRE is verified host ->
# server and server -> host, all on pg0.
2308 def test_hairpinning_unknown_proto(self):
2309 """ NAT44 translate packet with unknown protocol - hairpinning """
2310 host = self.pg0.remote_hosts[0]
2311 server = self.pg0.remote_hosts[1]
2314 server_in_port = 5678
2315 server_out_port = 8765
2316 server_nat_ip = "10.0.0.11"
2318 self.nat44_add_address(self.nat_addr)
2319 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2320 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2323 # add static mapping for server
2324 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2327 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2328 IP(src=host.ip4, dst=server_nat_ip) /
2329 TCP(sport=host_in_port, dport=server_out_port))
2330 self.pg0.add_stream(p)
2331 self.pg_enable_capture(self.pg_interfaces)
2333 capture = self.pg0.get_capture(1)
2335 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2336 IP(src=host.ip4, dst=server_nat_ip) /
2338 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2339 TCP(sport=1234, dport=1234))
2340 self.pg0.add_stream(p)
2341 self.pg_enable_capture(self.pg_interfaces)
2343 p = self.pg0.get_capture(1)
2346 self.assertEqual(packet[IP].src, self.nat_addr)
2347 self.assertEqual(packet[IP].dst, server.ip4)
2348 self.assertTrue(packet.haslayer(GRE))
2349 self.check_ip_checksum(packet)
2351 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2355 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2356 IP(src=server.ip4, dst=self.nat_addr) /
2358 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2359 TCP(sport=1234, dport=1234))
2360 self.pg0.add_stream(p)
2361 self.pg_enable_capture(self.pg_interfaces)
2363 p = self.pg0.get_capture(1)
2366 self.assertEqual(packet[IP].src, server_nat_ip)
2367 self.assertEqual(packet[IP].dst, host.ip4)
2368 self.assertTrue(packet.haslayer(GRE))
2369 self.check_ip_checksum(packet)
2371 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exercise the NAT44 output-feature API on pg0, pg1 and pg3 with self.nat_addr
# in the pool. Three flows are verified: pg0 -> pg3 (translated), pg3 -> pg0
# (translated back), and pg2 -> pg0, which is checked with
# verify_capture_no_translation.
2374 def test_output_feature(self):
2375 """ NAT44 interface output feature (in2out postrouting) """
2376 self.nat44_add_address(self.nat_addr)
2377 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2378 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2379 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2383 pkts = self.create_stream_in(self.pg0, self.pg3)
2384 self.pg0.add_stream(pkts)
2385 self.pg_enable_capture(self.pg_interfaces)
2387 capture = self.pg3.get_capture(len(pkts))
2388 self.verify_capture_out(capture)
# Return traffic: sent on pg3, expected back on pg0.
2391 pkts = self.create_stream_out(self.pg3)
2392 self.pg3.add_stream(pkts)
2393 self.pg_enable_capture(self.pg_interfaces)
2395 capture = self.pg0.get_capture(len(pkts))
2396 self.verify_capture_in(capture, self.pg0)
2398 # from non-NAT interface to NAT inside interface
2399 pkts = self.create_stream_in(self.pg2, self.pg0)
2400 self.pg2.add_stream(pkts)
2401 self.pg_enable_capture(self.pg_interfaces)
2403 capture = self.pg0.get_capture(len(pkts))
2404 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
# Output-feature NAT44 with per-VRF address pools: 10.0.0.10 is added with
# vrf_id=10 and 10.0.0.20 with vrf_id=20. Traffic from pg4 must leave pg3 with
# the VRF 10 address and traffic from pg6 with the VRF 20 address; the return
# direction is verified for each.
# NOTE(review): presumably pg4 belongs to VRF 10 and pg6 to VRF 20 - that
# binding is set up elsewhere; confirm against the class setup.
2406 def test_output_feature_vrf_aware(self):
2407 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2408 nat_ip_vrf10 = "10.0.0.10"
2409 nat_ip_vrf20 = "10.0.0.20"
2411 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2412 dst_address_length=32,
2413 next_hop_address=self.pg3.remote_ip4n,
2414 next_hop_sw_if_index=self.pg3.sw_if_index,
2416 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2417 dst_address_length=32,
2418 next_hop_address=self.pg3.remote_ip4n,
2419 next_hop_sw_if_index=self.pg3.sw_if_index,
2422 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2423 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2424 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2425 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2426 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2430 pkts = self.create_stream_in(self.pg4, self.pg3)
2431 self.pg4.add_stream(pkts)
2432 self.pg_enable_capture(self.pg_interfaces)
2434 capture = self.pg3.get_capture(len(pkts))
2435 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# Return traffic for the first pool address, expected on pg4.
2438 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2439 self.pg3.add_stream(pkts)
2440 self.pg_enable_capture(self.pg_interfaces)
2442 capture = self.pg4.get_capture(len(pkts))
2443 self.verify_capture_in(capture, self.pg4)
# Same two checks for pg6 and the second pool address.
2446 pkts = self.create_stream_in(self.pg6, self.pg3)
2447 self.pg6.add_stream(pkts)
2448 self.pg_enable_capture(self.pg_interfaces)
2450 capture = self.pg3.get_capture(len(pkts))
2451 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2454 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2455 self.pg3.add_stream(pkts)
2456 self.pg_enable_capture(self.pg_interfaces)
2458 capture = self.pg6.get_capture(len(pkts))
2459 self.verify_capture_in(capture, self.pg6)
# Hairpinning through the NAT44 output feature. host and server are both
# remote hosts on pg0; the server is reachable via a TCP static mapping
# self.nat_addr:server_out_port -> server.ip4:server_in_port. A host -> server
# packet and the server's reply are each sent and captured on pg0, and the
# rewritten addresses, ports and TCP checksum are verified.
2461 def test_output_feature_hairpinning(self):
2462 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2463 host = self.pg0.remote_hosts[0]
2464 server = self.pg0.remote_hosts[1]
2467 server_in_port = 5678
2468 server_out_port = 8765
2470 self.nat44_add_address(self.nat_addr)
2471 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2472 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2475 # add static mapping for server
2476 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2477 server_in_port, server_out_port,
2478 proto=IP_PROTOS.tcp)
2480 # send packet from host to server
2481 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2482 IP(src=host.ip4, dst=self.nat_addr) /
2483 TCP(sport=host_in_port, dport=server_out_port))
2484 self.pg0.add_stream(p)
2485 self.pg_enable_capture(self.pg_interfaces)
2487 capture = self.pg0.get_capture(1)
2492 self.assertEqual(ip.src, self.nat_addr)
2493 self.assertEqual(ip.dst, server.ip4)
2494 self.assertNotEqual(tcp.sport, host_in_port)
2495 self.assertEqual(tcp.dport, server_in_port)
2496 self.check_tcp_checksum(p)
# Remember the translated source port; the server's reply targets it.
2497 host_out_port = tcp.sport
2499 self.logger.error(ppp("Unexpected or invalid packet:", p))
2502 # send reply from server to host
2503 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2504 IP(src=server.ip4, dst=self.nat_addr) /
2505 TCP(sport=server_in_port, dport=host_out_port))
2506 self.pg0.add_stream(p)
2507 self.pg_enable_capture(self.pg_interfaces)
2509 capture = self.pg0.get_capture(1)
2514 self.assertEqual(ip.src, self.nat_addr)
2515 self.assertEqual(ip.dst, host.ip4)
2516 self.assertEqual(tcp.sport, server_out_port)
2517 self.assertEqual(tcp.dport, host_in_port)
2518 self.check_tcp_checksum(p)
# NOTE(review): the closing parenthesis looks misplaced below - every other
# call in this file passes the packet as ppp()'s second argument, whereas here
# ppp() gets only the message and p is handed to logger.error() as a format
# argument. Likely intended: ppp("Unexpected or invalid packet:", p).
2520 self.logger.error(ppp("Unexpected or invalid packet:"), p)
# NAT44 where a single interface (pg9) is used for both directions: the NAT
# feature is configured twice on pg9, and both the outbound packet
# (local_host -> remote_host) and the reply are sent and captured on pg9.
# The translated source port of the first packet is reused as the destination
# port of the reply.
2523 def test_one_armed_nat44(self):
2524 """ One armed NAT44 """
2525 remote_host = self.pg9.remote_hosts[0]
2526 local_host = self.pg9.remote_hosts[1]
2529 self.nat44_add_address(self.nat_addr)
2530 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2531 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2535 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2536 IP(src=local_host.ip4, dst=remote_host.ip4) /
2537 TCP(sport=12345, dport=80))
2538 self.pg9.add_stream(p)
2539 self.pg_enable_capture(self.pg_interfaces)
2541 capture = self.pg9.get_capture(1)
2546 self.assertEqual(ip.src, self.nat_addr)
2547 self.assertEqual(ip.dst, remote_host.ip4)
2548 self.assertNotEqual(tcp.sport, 12345)
# Keep the NAT-assigned source port for the reply packet below.
2549 external_port = tcp.sport
2550 self.assertEqual(tcp.dport, 80)
2551 self.check_tcp_checksum(p)
2552 self.check_ip_checksum(p)
2554 self.logger.error(ppp("Unexpected or invalid packet:", p))
2558 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2559 IP(src=remote_host.ip4, dst=self.nat_addr) /
2560 TCP(sport=80, dport=external_port))
2561 self.pg9.add_stream(p)
2562 self.pg_enable_capture(self.pg_interfaces)
2564 capture = self.pg9.get_capture(1)
2569 self.assertEqual(ip.src, remote_host.ip4)
2570 self.assertEqual(ip.dst, local_host.ip4)
2571 self.assertEqual(tcp.sport, 80)
2572 self.assertEqual(tcp.dport, 12345)
2573 self.check_tcp_checksum(p)
2574 self.check_ip_checksum(p)
2576 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Create sessions by sending the standard inside stream pg0 -> pg1, then delete
# two of them through the API - one addressed by its inside address/port, the
# other by its outside address/port - and check that the session dump for
# pg0.remote_ip4n shrank by exactly two.
2579 def test_del_session(self):
2580 """ Delete NAT44 session """
2581 self.nat44_add_address(self.nat_addr)
2582 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2583 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2586 pkts = self.create_stream_in(self.pg0, self.pg1)
2587 self.pg0.add_stream(pkts)
2588 self.pg_enable_capture(self.pg_interfaces)
2590 capture = self.pg1.get_capture(len(pkts))
# Baseline session count before any deletion.
2592 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2593 nsessions = len(sessions)
2595 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2596 sessions[0].inside_port,
2597 sessions[0].protocol)
2598 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2599 sessions[1].outside_port,
2600 sessions[1].protocol,
2603 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2604 self.assertEqual(nsessions - len(sessions), 2)
# Round-trip the virtual-reassembly settings through the API: read the current
# values, write timeout + 5 and doubled max_reass / max_frag, read them back
# and compare. Finally set drop_frag=1 and check that it is reported as set.
2606 def test_set_get_reass(self):
2607 """ NAT44 set/get virtual fragmentation reassembly """
2608 reas_cfg1 = self.vapi.nat_get_reass()
2610 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2611 max_reass=reas_cfg1.ip4_max_reass * 2,
2612 max_frag=reas_cfg1.ip4_max_frag * 2)
2614 reas_cfg2 = self.vapi.nat_get_reass()
# The values read back must match what was written above.
2616 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2617 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2618 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2620 self.vapi.nat_set_reass(drop_frag=1)
2621 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
# Send a fragmented TCP payload pg0 -> pg1 and a fragmented reply pg1 -> pg0,
# reassemble each capture with reass_frags_and_verify and check ports and
# payload. The inside TCP port is randomised for this test, and the number of
# entries returned by nat_reass_dump() must grow by two over the run.
2623 def test_frag_in_order(self):
2624 """ NAT44 translate fragments arriving in order """
2625 self.nat44_add_address(self.nat_addr)
2626 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2627 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2630 data = "A" * 4 + "B" * 16 + "C" * 3
2631 self.tcp_port_in = random.randint(1025, 65535)
# Baseline count of reassembly entries, compared at the end of the test.
2633 reass = self.vapi.nat_reass_dump()
2634 reass_n_start = len(reass)
2637 pkts = self.create_stream_frag(self.pg0,
2638 self.pg1.remote_ip4,
2642 self.pg0.add_stream(pkts)
2643 self.pg_enable_capture(self.pg_interfaces)
2645 frags = self.pg1.get_capture(len(pkts))
2646 p = self.reass_frags_and_verify(frags,
2648 self.pg1.remote_ip4)
2649 self.assertEqual(p[TCP].dport, 20)
2650 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2651 self.tcp_port_out = p[TCP].sport
2652 self.assertEqual(data, p[Raw].load)
2655 pkts = self.create_stream_frag(self.pg1,
2660 self.pg1.add_stream(pkts)
2661 self.pg_enable_capture(self.pg_interfaces)
2663 frags = self.pg0.get_capture(len(pkts))
2664 p = self.reass_frags_and_verify(frags,
2665 self.pg1.remote_ip4,
2666 self.pg0.remote_ip4)
2667 self.assertEqual(p[TCP].sport, 20)
2668 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2669 self.assertEqual(data, p[Raw].load)
2671 reass = self.vapi.nat_reass_dump()
2672 reass_n_end = len(reass)
# One new entry is expected per direction - presumably; confirm with the plugin.
2674 self.assertEqual(reass_n_end - reass_n_start, 2)
# Fragmented hairpinning: host and server are both remote hosts on pg0 and the
# server is published through a TCP static mapping with randomly chosen ports.
# A fragmented stream is sent and captured on pg0, reassembled, and checked for
# a translated source port, the server's inside port as destination, and an
# intact payload.
2676 def test_reass_hairpinning(self):
2677 """ NAT44 fragments hairpinning """
2678 host = self.pg0.remote_hosts[0]
2679 server = self.pg0.remote_hosts[1]
2680 host_in_port = random.randint(1025, 65535)
2682 server_in_port = random.randint(1025, 65535)
2683 server_out_port = random.randint(1025, 65535)
2684 data = "A" * 4 + "B" * 16 + "C" * 3
2686 self.nat44_add_address(self.nat_addr)
2687 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2688 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2690 # add static mapping for server
2691 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2692 server_in_port, server_out_port,
2693 proto=IP_PROTOS.tcp)
2695 # send packet from host to server
2696 pkts = self.create_stream_frag(self.pg0,
2701 self.pg0.add_stream(pkts)
2702 self.pg_enable_capture(self.pg_interfaces)
2704 frags = self.pg0.get_capture(len(pkts))
2705 p = self.reass_frags_and_verify(frags,
2708 self.assertNotEqual(p[TCP].sport, host_in_port)
2709 self.assertEqual(p[TCP].dport, server_in_port)
2710 self.assertEqual(data, p[Raw].load)
# Same flow as the in-order fragment test (fragmented TCP pg0 -> pg1, then a
# fragmented reply pg1 -> pg0, each reassembled and checked for ports and
# payload), per the docstring with the fragments delivered out of order.
2712 def test_frag_out_of_order(self):
2713 """ NAT44 translate fragments arriving out of order """
2714 self.nat44_add_address(self.nat_addr)
2715 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2716 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2719 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of this call is discarded, so it has no effect on
# the test. test_frag_in_order assigns the same expression to self.tcp_port_in;
# that assignment was probably intended here as well.
2720 random.randint(1025, 65535)
2723 pkts = self.create_stream_frag(self.pg0,
2724 self.pg1.remote_ip4,
2729 self.pg0.add_stream(pkts)
2730 self.pg_enable_capture(self.pg_interfaces)
2732 frags = self.pg1.get_capture(len(pkts))
2733 p = self.reass_frags_and_verify(frags,
2735 self.pg1.remote_ip4)
2736 self.assertEqual(p[TCP].dport, 20)
2737 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2738 self.tcp_port_out = p[TCP].sport
2739 self.assertEqual(data, p[Raw].load)
2742 pkts = self.create_stream_frag(self.pg1,
2748 self.pg1.add_stream(pkts)
2749 self.pg_enable_capture(self.pg_interfaces)
2751 frags = self.pg0.get_capture(len(pkts))
2752 p = self.reass_frags_and_verify(frags,
2753 self.pg1.remote_ip4,
2754 self.pg0.remote_ip4)
2755 self.assertEqual(p[TCP].sport, 20)
2756 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2757 self.assertEqual(data, p[Raw].load)
# Switch the address/port assignment algorithm to MAP-E via the CLI (psid 10,
# psid-offset 6, psid-len 6), send one TCP packet pg0 -> pg1 and check that the
# translated source port encodes the configured PSID.
2759 def test_port_restricted(self):
2760 """ Port restricted NAT44 (MAP-E CE) """
2761 self.nat44_add_address(self.nat_addr)
2762 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2763 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2765 self.vapi.cli("nat44 addr-port-assignment-alg map-e psid 10 "
2766 "psid-offset 6 psid-len 6")
2768 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2769 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2770 TCP(sport=4567, dport=22))
2771 self.pg0.add_stream(p)
2772 self.pg_enable_capture(self.pg_interfaces)
2774 capture = self.pg1.get_capture(1)
2779 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2780 self.assertEqual(ip.src, self.nat_addr)
2781 self.assertEqual(tcp.dport, 22)
2782 self.assertNotEqual(tcp.sport, 4567)
# Extract the 6-bit field that starts 6 bits above the LSB (63 == 2**6 - 1)
# and compare it with the PSID value configured through the CLI above.
2783 self.assertEqual((tcp.sport >> 6) & 63, 10)
2784 self.check_tcp_checksum(p)
2785 self.check_ip_checksum(p)
2787 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Per-test cleanup for TestNAT44: run the parent tearDown, then - only while
# VPP is still alive - log the NAT44 and virtual-reassembly state and restore
# the default address/port assignment algorithm through the CLI.
2791 super(TestNAT44, self).tearDown()
2792 if not self.vpp_dead:
2793 self.logger.info(self.vapi.cli("show nat44 verbose"))
2794 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# Undoes the MAP-E setting that test_port_restricted applies.
2795 self.vapi.cli("nat44 addr-port-assignment-alg default")
2799 class TestDeterministicNAT(MethodHolder):
2800 """ Deterministic NAT Test Cases """
# Extend the inherited VPP command line with a `nat { deterministic }` stanza.
# NOTE(review): presumably this is what starts the NAT plugin in deterministic
# mode for this class - confirm against the plugin's startup options.
2803 def setUpConstants(cls):
2804 super(TestDeterministicNAT, cls).setUpConstants()
2805 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class-level fixture: set the default ports/ids and NAT address used by the
# stream helpers, create three packet-generator interfaces, and prepare two
# remote hosts with IPv4 neighbor entries on pg0.
2808 def setUpClass(cls):
2809 super(TestDeterministicNAT, cls).setUpClass()
# Inside and external ports are deliberately identical per protocol here.
2812 cls.tcp_port_in = 6303
2813 cls.tcp_external_port = 6303
2814 cls.udp_port_in = 6304
2815 cls.udp_external_port = 6304
2816 cls.icmp_id_in = 6305
2817 cls.nat_addr = '10.0.0.3'
2819 cls.create_pg_interfaces(range(3))
2820 cls.interfaces = list(cls.pg_interfaces)
2822 for i in cls.interfaces:
2827 cls.pg0.generate_remote_hosts(2)
2828 cls.pg0.configure_ipv4_neighbors()
# NOTE(review): tearDownClass() being called from setUpClass is presumably a
# cleanup path for a failed setup - confirm the guarding control flow.
2831 super(TestDeterministicNAT, cls).tearDownClass()
# Build the inside-network test packets: one TCP, one UDP and one ICMP
# echo-request, each addressed from in_if's peer to out_if's peer with the
# given TTL. Ports and the ICMP id come from the tcp_*/udp_*/icmp_id_in
# attributes.
2834 def create_stream_in(self, in_if, out_if, ttl=64):
2836 Create packet stream for inside network
2838 :param in_if: Inside interface
2839 :param out_if: Outside interface
2840 :param ttl: TTL of generated packets
2844 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2845 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2846 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2850 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2851 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2852 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2856 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2857 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2858 ICMP(id=self.icmp_id_in, type='echo-request'))
# Build the outside-network test packets: one TCP, one UDP and one ICMP
# echo-reply from out_if's peer towards dst_ip. The destination ports and ICMP
# id are read from self.tcp_port_out, self.udp_port_out and
# self.icmp_external_id, which verify_capture_out of this class records - so
# an inside -> outside exchange presumably has to be verified first.
2863 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2865 Create packet stream for outside network
2867 :param out_if: Outside interface
2868 :param dst_ip: Destination IP address (Default use global NAT address)
2869 :param ttl: TTL of generated packets
2872 dst_ip = self.nat_addr
2875 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2876 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2877 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2881 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2882 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2883 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2887 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2888 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2889 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Verify packets captured on the outside network: the capture must contain
# packet_num packets and each packet's IP source must equal nat_ip.
# Side effect: the translated TCP/UDP source port and the ICMP id of the
# captured packets are stored in self.tcp_port_out, self.udp_port_out and
# self.icmp_external_id.
# NOTE(review): the docstring below documents a `same_port` parameter that is
# not in this signature, and spells "Source" as "Sorce" - both look stale.
2894 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2896 Verify captured packets on outside network
2898 :param capture: Captured packets
2899 :param nat_ip: Translated IP address (Default use global NAT address)
2900 :param same_port: Sorce port number is not translated (Default False)
2901 :param packet_num: Expected number of packets (Default 3)
2904 nat_ip = self.nat_addr
2905 self.assertEqual(packet_num, len(capture))
2906 for packet in capture:
2908 self.assertEqual(packet[IP].src, nat_ip)
2909 if packet.haslayer(TCP):
2910 self.tcp_port_out = packet[TCP].sport
2911 elif packet.haslayer(UDP):
2912 self.udp_port_out = packet[UDP].sport
2914 self.icmp_external_id = packet[ICMP].id
2916 self.logger.error(ppp("Unexpected or invalid packet "
2917 "(outside network):", packet))
# Drive a TCP three-way handshake through the NAT: SYN in -> out, SYN+ACK
# out -> in (addressed to self.nat_addr), ACK in -> out. One packet is expected
# at the far side after each step. The source port taken from p after the SYN
# step is stored in self.tcp_port_out and used as the destination port of the
# SYN+ACK.
2920 def initiate_tcp_session(self, in_if, out_if):
2922 Initiates TCP session
2924 :param in_if: Inside interface
2925 :param out_if: Outside interface
2928 # SYN packet in->out
2929 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2930 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2931 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2934 self.pg_enable_capture(self.pg_interfaces)
2936 capture = out_if.get_capture(1)
2938 self.tcp_port_out = p[TCP].sport
2940 # SYN + ACK packet out->in
2941 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2942 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2943 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2945 out_if.add_stream(p)
2946 self.pg_enable_capture(self.pg_interfaces)
2948 in_if.get_capture(1)
2950 # ACK packet in->out
2951 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2952 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2953 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2956 self.pg_enable_capture(self.pg_interfaces)
2958 out_if.get_capture(1)
2961 self.logger.error("TCP 3 way handshake failed")
# Check a decoded IPFIX export for the "maximum entries per user exceeded"
# event: exactly one data record is expected, and its fields 230, 466 and 8
# are compared against fixed values and the pg0 peer address.
# NOTE(review): the numeric keys are presumably IANA IPFIX information element
# ids (230 natEvent, 466 natQuotaExceededEvent, 8 sourceIPv4Address) - confirm
# against the IANA registry.
2964 def verify_ipfix_max_entries_per_user(self, data):
2966 Verify IPFIX maximum entries per user exceeded event
2968 :param data: Decoded IPFIX data records
2970 self.assertEqual(1, len(data))
2973 self.assertEqual(ord(record[230]), 13)
2974 # natQuotaExceededEvent
2975 self.assertEqual('\x03\x00\x00\x00', record[466])
2977 self.assertEqual(self.pg0.remote_ip4n, record[8])
# API-only test of deterministic NAT: confirm the plugin reports deterministic
# mode, add one in -> out mapping, check that forward and reverse lookups are
# consistent with each other, check that the mapping dump returns what was
# configured, then clear the configuration and expect an empty dump.
2979 def test_deterministic_mode(self):
2980 """ NAT plugin run deterministic mode """
2981 in_addr = '172.16.255.0'
2982 out_addr = '172.17.255.50'
2983 in_addr_t = '172.16.255.20'
# Packed 4-byte forms of the addresses above, as passed to the API calls.
2984 in_addr_n = socket.inet_aton(in_addr)
2985 out_addr_n = socket.inet_aton(out_addr)
2986 in_addr_t_n = socket.inet_aton(in_addr_t)
2990 nat_config = self.vapi.nat_show_config()
2991 self.assertEqual(1, nat_config.deterministic)
2993 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup of an inside host, then reverse lookup of the result; only
# the first four bytes of the returned address fields are compared.
2995 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2996 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2997 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2998 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3000 deterministic_mappings = self.vapi.nat_det_map_dump()
3001 self.assertEqual(len(deterministic_mappings), 1)
3002 dsm = deterministic_mappings[0]
3003 self.assertEqual(in_addr_n, dsm.in_addr[:4])
3004 self.assertEqual(in_plen, dsm.in_plen)
3005 self.assertEqual(out_addr_n, dsm.out_addr[:4])
3006 self.assertEqual(out_plen, dsm.out_plen)
3008 self.clear_nat_det()
3009 deterministic_mappings = self.vapi.nat_det_map_dump()
3010 self.assertEqual(len(deterministic_mappings), 0)
# Read the deterministic NAT timeouts, write each one back increased by 10,
# and check that every value read afterwards differs from its original.
# NOTE(review): the assertions only prove the values changed, not that they
# equal the requested values.
3012 def test_set_timeouts(self):
3013 """ Set deterministic NAT timeouts """
3014 timeouts_before = self.vapi.nat_det_get_timeouts()
3016 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3017 timeouts_before.tcp_established + 10,
3018 timeouts_before.tcp_transitory + 10,
3019 timeouts_before.icmp + 10)
3021 timeouts_after = self.vapi.nat_det_get_timeouts()
3023 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3024 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3025 self.assertNotEqual(timeouts_before.tcp_established,
3026 timeouts_after.tcp_established)
3027 self.assertNotEqual(timeouts_before.tcp_transitory,
3028 timeouts_after.tcp_transitory)
# End-to-end deterministic NAT test: map pg0's peer to 10.0.0.10, verify the
# TCP/UDP/ICMP stream pg0 -> pg1 and the return stream pg1 -> pg0, then dump
# the sessions for the inside host and check that three sessions exist with
# the expected external address and inside/outside/external ports.
3030 def test_det_in(self):
3031 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3033 nat_ip = "10.0.0.10"
3035 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3037 socket.inet_aton(nat_ip),
3039 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3040 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3044 pkts = self.create_stream_in(self.pg0, self.pg1)
3045 self.pg0.add_stream(pkts)
3046 self.pg_enable_capture(self.pg_interfaces)
3048 capture = self.pg1.get_capture(len(pkts))
3049 self.verify_capture_out(capture, nat_ip)
# Return stream; create_stream_out uses the ports recorded by the check above.
3052 pkts = self.create_stream_out(self.pg1, nat_ip)
3053 self.pg1.add_stream(pkts)
3054 self.pg_enable_capture(self.pg_interfaces)
3056 capture = self.pg0.get_capture(len(pkts))
3057 self.verify_capture_in(capture, self.pg0)
# Three sessions are expected: the stream sent above carried one TCP, one UDP
# and one ICMP packet.
3060 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3061 self.assertEqual(len(sessions), 3)
3065 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3066 self.assertEqual(s.in_port, self.tcp_port_in)
3067 self.assertEqual(s.out_port, self.tcp_port_out)
3068 self.assertEqual(s.ext_port, self.tcp_external_port)
3072 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3073 self.assertEqual(s.in_port, self.udp_port_in)
3074 self.assertEqual(s.out_port, self.udp_port_out)
3075 self.assertEqual(s.ext_port, self.udp_external_port)
3079 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3080 self.assertEqual(s.in_port, self.icmp_id_in)
3081 self.assertEqual(s.out_port, self.icmp_external_id)
# Two inside hosts on pg0 share one deterministic mapping to 10.0.0.10. Each
# host sends a TCP packet with the same inside port; the outside source port
# assigned to each is recorded and then used to send a reply back, which must
# reach the matching host. The mapping's session counter is checked at 2, then
# at 1 and 0 after closing the sessions via the close_session_out and
# close_session_in APIs.
3083 def test_multiple_users(self):
3084 """ Deterministic NAT multiple users """
3086 nat_ip = "10.0.0.10"
3088 external_port = 6303
3090 host0 = self.pg0.remote_hosts[0]
3091 host1 = self.pg0.remote_hosts[1]
3093 self.vapi.nat_det_add_del_map(host0.ip4n,
3095 socket.inet_aton(nat_ip),
3097 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3098 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3102 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3103 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3104 TCP(sport=port_in, dport=external_port))
3105 self.pg0.add_stream(p)
3106 self.pg_enable_capture(self.pg_interfaces)
3108 capture = self.pg1.get_capture(1)
3113 self.assertEqual(ip.src, nat_ip)
3114 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3115 self.assertEqual(tcp.dport, external_port)
3116 port_out0 = tcp.sport
3118 self.logger.error(ppp("Unexpected or invalid packet:", p))
3122 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3123 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3124 TCP(sport=port_in, dport=external_port))
3125 self.pg0.add_stream(p)
3126 self.pg_enable_capture(self.pg_interfaces)
3128 capture = self.pg1.get_capture(1)
3133 self.assertEqual(ip.src, nat_ip)
3134 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3135 self.assertEqual(tcp.dport, external_port)
3136 port_out1 = tcp.sport
3138 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Both hosts should now have one session each under the single mapping.
3141 dms = self.vapi.nat_det_map_dump()
3142 self.assertEqual(1, len(dms))
3143 self.assertEqual(2, dms[0].ses_num)
3146 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3147 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3148 TCP(sport=external_port, dport=port_out0))
3149 self.pg1.add_stream(p)
3150 self.pg_enable_capture(self.pg_interfaces)
3152 capture = self.pg0.get_capture(1)
3157 self.assertEqual(ip.src, self.pg1.remote_ip4)
3158 self.assertEqual(ip.dst, host0.ip4)
3159 self.assertEqual(tcp.dport, port_in)
3160 self.assertEqual(tcp.sport, external_port)
3162 self.logger.error(ppp("Unexpected or invalid packet:", p))
3166 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3167 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3168 TCP(sport=external_port, dport=port_out1))
3169 self.pg1.add_stream(p)
3170 self.pg_enable_capture(self.pg_interfaces)
3172 capture = self.pg0.get_capture(1)
3177 self.assertEqual(ip.src, self.pg1.remote_ip4)
3178 self.assertEqual(ip.dst, host1.ip4)
3179 self.assertEqual(tcp.dport, port_in)
3180 self.assertEqual(tcp.sport, external_port)
# NOTE(review): this message lacks the trailing colon used by the sibling
# "Unexpected or invalid packet:" log calls in this method.
3182 self.logger.error(ppp("Unexpected or invalid packet", p))
3185 # session close api test
3186 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3188 self.pg1.remote_ip4n,
3190 dms = self.vapi.nat_det_map_dump()
3191 self.assertEqual(dms[0].ses_num, 1)
3193 self.vapi.nat_det_close_session_in(host0.ip4n,
3195 self.pg1.remote_ip4n,
3197 dms = self.vapi.nat_det_map_dump()
3198 self.assertEqual(dms[0].ses_num, 0)
3200 def test_tcp_session_close_detection_in(self):
3201 """ Deterministic NAT TCP session close from inside network """
3202 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3204 socket.inet_aton(self.nat_addr),
3206 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3207 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3210 self.initiate_tcp_session(self.pg0, self.pg1)
3212 # close the session from inside
3214 # FIN packet in -> out
3215 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3216 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3217 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3219 self.pg0.add_stream(p)
3220 self.pg_enable_capture(self.pg_interfaces)
3222 self.pg1.get_capture(1)
3226 # ACK packet out -> in
3227 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3228 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3229 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3233 # FIN packet out -> in
3234 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3235 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3236 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3240 self.pg1.add_stream(pkts)
3241 self.pg_enable_capture(self.pg_interfaces)
3243 self.pg0.get_capture(2)
3245 # ACK packet in -> out
3246 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3247 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3248 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3250 self.pg0.add_stream(p)
3251 self.pg_enable_capture(self.pg_interfaces)
3253 self.pg1.get_capture(1)
3255 # Check if deterministic NAT44 closed the session
3256 dms = self.vapi.nat_det_map_dump()
3257 self.assertEqual(0, dms[0].ses_num)
3259 self.logger.error("TCP session termination failed")
3262 def test_tcp_session_close_detection_out(self):
3263 """ Deterministic NAT TCP session close from outside network """
3264 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3266 socket.inet_aton(self.nat_addr),
3268 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3269 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3272 self.initiate_tcp_session(self.pg0, self.pg1)
3274 # close the session from outside
3276 # FIN packet out -> in
3277 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3278 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3279 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3281 self.pg1.add_stream(p)
3282 self.pg_enable_capture(self.pg_interfaces)
3284 self.pg0.get_capture(1)
3288 # ACK packet in -> out
3289 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3290 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3291 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3295 # ACK packet in -> out
3296 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3297 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3298 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3302 self.pg0.add_stream(pkts)
3303 self.pg_enable_capture(self.pg_interfaces)
3305 self.pg1.get_capture(2)
3307 # ACK packet out -> in
3308 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3309 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3310 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3312 self.pg1.add_stream(p)
3313 self.pg_enable_capture(self.pg_interfaces)
3315 self.pg0.get_capture(1)
3317 # Check if deterministic NAT44 closed the session
3318 dms = self.vapi.nat_det_map_dump()
3319 self.assertEqual(0, dms[0].ses_num)
3321 self.logger.error("TCP session termination failed")
3324 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3325 def test_session_timeout(self):
3326 """ Deterministic NAT session timeouts """
3327 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3329 socket.inet_aton(self.nat_addr),
3331 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3332 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3335 self.initiate_tcp_session(self.pg0, self.pg1)
3336 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3337 pkts = self.create_stream_in(self.pg0, self.pg1)
3338 self.pg0.add_stream(pkts)
3339 self.pg_enable_capture(self.pg_interfaces)
3341 capture = self.pg1.get_capture(len(pkts))
3344 dms = self.vapi.nat_det_map_dump()
3345 self.assertEqual(0, dms[0].ses_num)
3347 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3348 def test_session_limit_per_user(self):
3349 """ Deterministic NAT maximum sessions per user limit """
3350 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3352 socket.inet_aton(self.nat_addr),
3354 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3355 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3357 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3358 src_address=self.pg2.local_ip4n,
3360 template_interval=10)
3361 self.vapi.nat_ipfix()
3364 for port in range(1025, 2025):
3365 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3366 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3367 UDP(sport=port, dport=port))
3370 self.pg0.add_stream(pkts)
3371 self.pg_enable_capture(self.pg_interfaces)
3373 capture = self.pg1.get_capture(len(pkts))
3375 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3376 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3377 UDP(sport=3001, dport=3002))
3378 self.pg0.add_stream(p)
3379 self.pg_enable_capture(self.pg_interfaces)
3381 capture = self.pg1.assert_nothing_captured()
3383 # verify ICMP error packet
3384 capture = self.pg0.get_capture(1)
3386 self.assertTrue(p.haslayer(ICMP))
3388 self.assertEqual(icmp.type, 3)
3389 self.assertEqual(icmp.code, 1)
3390 self.assertTrue(icmp.haslayer(IPerror))
3391 inner_ip = icmp[IPerror]
3392 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3393 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3395 dms = self.vapi.nat_det_map_dump()
3397 self.assertEqual(1000, dms[0].ses_num)
3399 # verify IPFIX logging
3400 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3402 capture = self.pg2.get_capture(2)
3403 ipfix = IPFIXDecoder()
3404 # first load template
3406 self.assertTrue(p.haslayer(IPFIX))
3407 if p.haslayer(Template):
3408 ipfix.add_template(p.getlayer(Template))
3409 # verify events in data set
3411 if p.haslayer(Data):
3412 data = ipfix.decode_data_set(p.getlayer(Set))
3413 self.verify_ipfix_max_entries_per_user(data)
3415 def clear_nat_det(self):
3417 Clear deterministic NAT configuration.
3419 self.vapi.nat_ipfix(enable=0)
3420 self.vapi.nat_det_set_timeouts()
3421 deterministic_mappings = self.vapi.nat_det_map_dump()
3422 for dsm in deterministic_mappings:
3423 self.vapi.nat_det_add_del_map(dsm.in_addr,
3429 interfaces = self.vapi.nat44_interface_dump()
3430 for intf in interfaces:
3431 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3436 super(TestDeterministicNAT, self).tearDown()
3437 if not self.vpp_dead:
3438 self.logger.info(self.vapi.cli("show nat44 detail"))
3439 self.clear_nat_det()
3442 class TestNAT64(MethodHolder):
3443 """ NAT64 Test Cases """
3446 def setUpClass(cls):
3447 super(TestNAT64, cls).setUpClass()
3450 cls.tcp_port_in = 6303
3451 cls.tcp_port_out = 6303
3452 cls.udp_port_in = 6304
3453 cls.udp_port_out = 6304
3454 cls.icmp_id_in = 6305
3455 cls.icmp_id_out = 6305
3456 cls.nat_addr = '10.0.0.3'
3457 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3459 cls.vrf1_nat_addr = '10.0.10.3'
3460 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3463 cls.create_pg_interfaces(range(5))
3464 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3465 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3466 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3468 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3470 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3472 cls.pg0.generate_remote_hosts(2)
3474 for i in cls.ip6_interfaces:
3477 i.configure_ipv6_neighbors()
3479 for i in cls.ip4_interfaces:
3485 cls.pg3.config_ip4()
3486 cls.pg3.resolve_arp()
3487 cls.pg3.config_ip6()
3488 cls.pg3.configure_ipv6_neighbors()
3491 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool

    Adds a one-address range to the NAT64 pool, checks that the pool
    dump reports exactly that address, then removes the range and
    checks that the pool dump is empty again.
    """
    # Packed (network byte order) IPv4 address; used as both the start
    # and the end of the range, so the range holds a single address.
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
    vapi = self.vapi

    # Add the address and confirm it is the only pool entry.
    vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    dumped = vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Delete the same range and confirm the pool is empty.
    vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)
    dumped = vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
3509 def test_interface(self):
3510 """ Enable/disable NAT64 feature on the interface """
3511 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3512 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3514 interfaces = self.vapi.nat64_interface_dump()
3515 self.assertEqual(len(interfaces), 2)
3518 for intf in interfaces:
3519 if intf.sw_if_index == self.pg0.sw_if_index:
3520 self.assertEqual(intf.is_inside, 1)
3522 elif intf.sw_if_index == self.pg1.sw_if_index:
3523 self.assertEqual(intf.is_inside, 0)
3525 self.assertTrue(pg0_found)
3526 self.assertTrue(pg1_found)
3528 features = self.vapi.cli("show interface features pg0")
3529 self.assertNotEqual(features.find('nat64-in2out'), -1)
3530 features = self.vapi.cli("show interface features pg1")
3531 self.assertNotEqual(features.find('nat64-out2in'), -1)
3533 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3534 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3536 interfaces = self.vapi.nat64_interface_dump()
3537 self.assertEqual(len(interfaces), 0)
3539 def test_static_bib(self):
3540 """ Add/delete static BIB entry """
3541 in_addr = socket.inet_pton(socket.AF_INET6,
3542 '2001:db8:85a3::8a2e:370:7334')
3543 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3546 proto = IP_PROTOS.tcp
3548 self.vapi.nat64_add_del_static_bib(in_addr,
3553 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3558 self.assertEqual(bibe.i_addr, in_addr)
3559 self.assertEqual(bibe.o_addr, out_addr)
3560 self.assertEqual(bibe.i_port, in_port)
3561 self.assertEqual(bibe.o_port, out_port)
3562 self.assertEqual(static_bib_num, 1)
3564 self.vapi.nat64_add_del_static_bib(in_addr,
3570 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3575 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts

    Reads the NAT64 timeouts and checks them against the expected
    defaults, then sets custom values and checks that a second read
    returns exactly those values.
    """
    def expect_timeouts(expected):
        # Fetch the current timeouts once and compare field by field,
        # in the order the fields are listed.
        current = self.vapi.nat64_get_timeouts()
        for field, value in expected:
            self.assertEqual(getattr(current, field), value)

    # verify default values
    expect_timeouts((('udp', 300),
                     ('icmp', 60),
                     ('tcp_trans', 240),
                     ('tcp_est', 7440),
                     ('tcp_incoming_syn', 6)))

    # set and verify custom values
    custom = (('udp', 200),
              ('icmp', 30),
              ('tcp_trans', 250),
              ('tcp_est', 7450),
              ('tcp_incoming_syn', 10))
    self.vapi.nat64_set_timeouts(**dict(custom))
    expect_timeouts(custom)
3597 def test_dynamic(self):
3598 """ NAT64 dynamic translation test """
3599 self.tcp_port_in = 6303
3600 self.udp_port_in = 6304
3601 self.icmp_id_in = 6305
3603 ses_num_start = self.nat64_get_ses_num()
3605 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3607 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3608 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3611 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3612 self.pg0.add_stream(pkts)
3613 self.pg_enable_capture(self.pg_interfaces)
3615 capture = self.pg1.get_capture(len(pkts))
3616 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3617 dst_ip=self.pg1.remote_ip4)
3620 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3621 self.pg1.add_stream(pkts)
3622 self.pg_enable_capture(self.pg_interfaces)
3624 capture = self.pg0.get_capture(len(pkts))
3625 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3626 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3629 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3630 self.pg0.add_stream(pkts)
3631 self.pg_enable_capture(self.pg_interfaces)
3633 capture = self.pg1.get_capture(len(pkts))
3634 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3635 dst_ip=self.pg1.remote_ip4)
3638 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3639 self.pg1.add_stream(pkts)
3640 self.pg_enable_capture(self.pg_interfaces)
3642 capture = self.pg0.get_capture(len(pkts))
3643 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3645 ses_num_end = self.nat64_get_ses_num()
3647 self.assertEqual(ses_num_end - ses_num_start, 3)
3649 # tenant with specific VRF
3650 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3651 self.vrf1_nat_addr_n,
3652 vrf_id=self.vrf1_id)
3653 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3655 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3656 self.pg2.add_stream(pkts)
3657 self.pg_enable_capture(self.pg_interfaces)
3659 capture = self.pg1.get_capture(len(pkts))
3660 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3661 dst_ip=self.pg1.remote_ip4)
3663 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3664 self.pg1.add_stream(pkts)
3665 self.pg_enable_capture(self.pg_interfaces)
3667 capture = self.pg2.get_capture(len(pkts))
3668 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3670 def test_static(self):
3671 """ NAT64 static translation test """
3672 self.tcp_port_in = 60303
3673 self.udp_port_in = 60304
3674 self.icmp_id_in = 60305
3675 self.tcp_port_out = 60303
3676 self.udp_port_out = 60304
3677 self.icmp_id_out = 60305
3679 ses_num_start = self.nat64_get_ses_num()
3681 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3683 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3684 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3686 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3691 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3696 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3703 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3704 self.pg0.add_stream(pkts)
3705 self.pg_enable_capture(self.pg_interfaces)
3707 capture = self.pg1.get_capture(len(pkts))
3708 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3709 dst_ip=self.pg1.remote_ip4, same_port=True)
3712 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3713 self.pg1.add_stream(pkts)
3714 self.pg_enable_capture(self.pg_interfaces)
3716 capture = self.pg0.get_capture(len(pkts))
3717 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3718 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3720 ses_num_end = self.nat64_get_ses_num()
3722 self.assertEqual(ses_num_end - ses_num_start, 3)
3724 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3725 def test_session_timeout(self):
3726 """ NAT64 session timeout """
3727 self.icmp_id_in = 1234
3728 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3730 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3731 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3732 self.vapi.nat64_set_timeouts(icmp=5)
3734 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3735 self.pg0.add_stream(pkts)
3736 self.pg_enable_capture(self.pg_interfaces)
3738 capture = self.pg1.get_capture(len(pkts))
3740 ses_num_before_timeout = self.nat64_get_ses_num()
3744 # ICMP session after timeout
3745 ses_num_after_timeout = self.nat64_get_ses_num()
3746 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3748 def test_icmp_error(self):
3749 """ NAT64 ICMP Error message translation """
3750 self.tcp_port_in = 6303
3751 self.udp_port_in = 6304
3752 self.icmp_id_in = 6305
3754 ses_num_start = self.nat64_get_ses_num()
3756 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3758 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3759 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3761 # send some packets to create sessions
3762 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3763 self.pg0.add_stream(pkts)
3764 self.pg_enable_capture(self.pg_interfaces)
3766 capture_ip4 = self.pg1.get_capture(len(pkts))
3767 self.verify_capture_out(capture_ip4,
3768 nat_ip=self.nat_addr,
3769 dst_ip=self.pg1.remote_ip4)
3771 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3772 self.pg1.add_stream(pkts)
3773 self.pg_enable_capture(self.pg_interfaces)
3775 capture_ip6 = self.pg0.get_capture(len(pkts))
3776 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3777 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3778 self.pg0.remote_ip6)
3781 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3782 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3783 ICMPv6DestUnreach(code=1) /
3784 packet[IPv6] for packet in capture_ip6]
3785 self.pg0.add_stream(pkts)
3786 self.pg_enable_capture(self.pg_interfaces)
3788 capture = self.pg1.get_capture(len(pkts))
3789 for packet in capture:
3791 self.assertEqual(packet[IP].src, self.nat_addr)
3792 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3793 self.assertEqual(packet[ICMP].type, 3)
3794 self.assertEqual(packet[ICMP].code, 13)
3795 inner = packet[IPerror]
3796 self.assertEqual(inner.src, self.pg1.remote_ip4)
3797 self.assertEqual(inner.dst, self.nat_addr)
3798 self.check_icmp_checksum(packet)
3799 if inner.haslayer(TCPerror):
3800 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3801 elif inner.haslayer(UDPerror):
3802 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3804 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3806 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3810 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3811 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3812 ICMP(type=3, code=13) /
3813 packet[IP] for packet in capture_ip4]
3814 self.pg1.add_stream(pkts)
3815 self.pg_enable_capture(self.pg_interfaces)
3817 capture = self.pg0.get_capture(len(pkts))
3818 for packet in capture:
3820 self.assertEqual(packet[IPv6].src, ip.src)
3821 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3822 icmp = packet[ICMPv6DestUnreach]
3823 self.assertEqual(icmp.code, 1)
3824 inner = icmp[IPerror6]
3825 self.assertEqual(inner.src, self.pg0.remote_ip6)
3826 self.assertEqual(inner.dst, ip.src)
3827 self.check_icmpv6_checksum(packet)
3828 if inner.haslayer(TCPerror):
3829 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3830 elif inner.haslayer(UDPerror):
3831 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3833 self.assertEqual(inner[ICMPv6EchoRequest].id,
3836 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3839 def test_hairpinning(self):
3840 """ NAT64 hairpinning """
3842 client = self.pg0.remote_hosts[0]
3843 server = self.pg0.remote_hosts[1]
3844 server_tcp_in_port = 22
3845 server_tcp_out_port = 4022
3846 server_udp_in_port = 23
3847 server_udp_out_port = 4023
3848 client_tcp_in_port = 1234
3849 client_udp_in_port = 1235
3850 client_tcp_out_port = 0
3851 client_udp_out_port = 0
3852 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3853 nat_addr_ip6 = ip.src
3855 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3857 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3858 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3860 self.vapi.nat64_add_del_static_bib(server.ip6n,
3863 server_tcp_out_port,
3865 self.vapi.nat64_add_del_static_bib(server.ip6n,
3868 server_udp_out_port,
3873 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3874 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3875 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3877 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3878 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3879 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3881 self.pg0.add_stream(pkts)
3882 self.pg_enable_capture(self.pg_interfaces)
3884 capture = self.pg0.get_capture(len(pkts))
3885 for packet in capture:
3887 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3888 self.assertEqual(packet[IPv6].dst, server.ip6)
3889 if packet.haslayer(TCP):
3890 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3891 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3892 self.check_tcp_checksum(packet)
3893 client_tcp_out_port = packet[TCP].sport
3895 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3896 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3897 self.check_udp_checksum(packet)
3898 client_udp_out_port = packet[UDP].sport
3900 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3905 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3906 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3907 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3909 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3910 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3911 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3913 self.pg0.add_stream(pkts)
3914 self.pg_enable_capture(self.pg_interfaces)
3916 capture = self.pg0.get_capture(len(pkts))
3917 for packet in capture:
3919 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3920 self.assertEqual(packet[IPv6].dst, client.ip6)
3921 if packet.haslayer(TCP):
3922 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3923 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3924 self.check_tcp_checksum(packet)
3926 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3927 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3928 self.check_udp_checksum(packet)
3930 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3935 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3936 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3937 ICMPv6DestUnreach(code=1) /
3938 packet[IPv6] for packet in capture]
3939 self.pg0.add_stream(pkts)
3940 self.pg_enable_capture(self.pg_interfaces)
3942 capture = self.pg0.get_capture(len(pkts))
3943 for packet in capture:
3945 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3946 self.assertEqual(packet[IPv6].dst, server.ip6)
3947 icmp = packet[ICMPv6DestUnreach]
3948 self.assertEqual(icmp.code, 1)
3949 inner = icmp[IPerror6]
3950 self.assertEqual(inner.src, server.ip6)
3951 self.assertEqual(inner.dst, nat_addr_ip6)
3952 self.check_icmpv6_checksum(packet)
3953 if inner.haslayer(TCPerror):
3954 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3955 self.assertEqual(inner[TCPerror].dport,
3956 client_tcp_out_port)
3958 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3959 self.assertEqual(inner[UDPerror].dport,
3960 client_udp_out_port)
3962 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3965 def test_prefix(self):
3966 """ NAT64 Network-Specific Prefix """
3968 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3970 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3971 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3972 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3973 self.vrf1_nat_addr_n,
3974 vrf_id=self.vrf1_id)
3975 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3978 global_pref64 = "2001:db8::"
3979 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3980 global_pref64_len = 32
3981 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3983 prefix = self.vapi.nat64_prefix_dump()
3984 self.assertEqual(len(prefix), 1)
3985 self.assertEqual(prefix[0].prefix, global_pref64_n)
3986 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3987 self.assertEqual(prefix[0].vrf_id, 0)
3989 # Add tenant specific prefix
3990 vrf1_pref64 = "2001:db8:122:300::"
3991 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3992 vrf1_pref64_len = 56
3993 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3995 vrf_id=self.vrf1_id)
3996 prefix = self.vapi.nat64_prefix_dump()
3997 self.assertEqual(len(prefix), 2)
4000 pkts = self.create_stream_in_ip6(self.pg0,
4003 plen=global_pref64_len)
4004 self.pg0.add_stream(pkts)
4005 self.pg_enable_capture(self.pg_interfaces)
4007 capture = self.pg1.get_capture(len(pkts))
4008 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4009 dst_ip=self.pg1.remote_ip4)
4011 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4012 self.pg1.add_stream(pkts)
4013 self.pg_enable_capture(self.pg_interfaces)
4015 capture = self.pg0.get_capture(len(pkts))
4016 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4019 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4021 # Tenant specific prefix
4022 pkts = self.create_stream_in_ip6(self.pg2,
4025 plen=vrf1_pref64_len)
4026 self.pg2.add_stream(pkts)
4027 self.pg_enable_capture(self.pg_interfaces)
4029 capture = self.pg1.get_capture(len(pkts))
4030 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4031 dst_ip=self.pg1.remote_ip4)
4033 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4034 self.pg1.add_stream(pkts)
4035 self.pg_enable_capture(self.pg_interfaces)
4037 capture = self.pg2.get_capture(len(pkts))
4038 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4041 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4043 def test_unknown_proto(self):
4044 """ NAT64 translate packet with unknown protocol """
4046 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4048 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4049 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4050 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4053 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4054 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4055 TCP(sport=self.tcp_port_in, dport=20))
4056 self.pg0.add_stream(p)
4057 self.pg_enable_capture(self.pg_interfaces)
4059 p = self.pg1.get_capture(1)
4061 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4062 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4064 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4065 TCP(sport=1234, dport=1234))
4066 self.pg0.add_stream(p)
4067 self.pg_enable_capture(self.pg_interfaces)
4069 p = self.pg1.get_capture(1)
4072 self.assertEqual(packet[IP].src, self.nat_addr)
4073 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4074 self.assertTrue(packet.haslayer(GRE))
4075 self.check_ip_checksum(packet)
4077 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4081 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4082 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4084 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4085 TCP(sport=1234, dport=1234))
4086 self.pg1.add_stream(p)
4087 self.pg_enable_capture(self.pg_interfaces)
4089 p = self.pg0.get_capture(1)
4092 self.assertEqual(packet[IPv6].src, remote_ip6)
4093 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4094 self.assertEqual(packet[IPv6].nh, 47)
4096 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4099 def test_hairpinning_unknown_proto(self):
4100 """ NAT64 translate packet with unknown protocol - hairpinning """
4102 client = self.pg0.remote_hosts[0]
4103 server = self.pg0.remote_hosts[1]
4104 server_tcp_in_port = 22
4105 server_tcp_out_port = 4022
4106 client_tcp_in_port = 1234
4107 client_tcp_out_port = 1235
4108 server_nat_ip = "10.0.0.100"
4109 client_nat_ip = "10.0.0.110"
4110 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4111 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4112 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4113 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4115 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4117 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4118 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4120 self.vapi.nat64_add_del_static_bib(server.ip6n,
4123 server_tcp_out_port,
4126 self.vapi.nat64_add_del_static_bib(server.ip6n,
4132 self.vapi.nat64_add_del_static_bib(client.ip6n,
4135 client_tcp_out_port,
4139 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4140 IPv6(src=client.ip6, dst=server_nat_ip6) /
4141 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4142 self.pg0.add_stream(p)
4143 self.pg_enable_capture(self.pg_interfaces)
4145 p = self.pg0.get_capture(1)
4147 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4148 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4150 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4151 TCP(sport=1234, dport=1234))
4152 self.pg0.add_stream(p)
4153 self.pg_enable_capture(self.pg_interfaces)
4155 p = self.pg0.get_capture(1)
4158 self.assertEqual(packet[IPv6].src, client_nat_ip6)
4159 self.assertEqual(packet[IPv6].dst, server.ip6)
4160 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4162 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4166 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4167 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4169 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4170 TCP(sport=1234, dport=1234))
4171 self.pg0.add_stream(p)
4172 self.pg_enable_capture(self.pg_interfaces)
4174 p = self.pg0.get_capture(1)
4177 self.assertEqual(packet[IPv6].src, server_nat_ip6)
4178 self.assertEqual(packet[IPv6].dst, client.ip6)
4179 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4181 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4184 def test_one_armed_nat64(self):
4185 """ One armed NAT64 """
4187 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4191 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4193 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4194 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4197 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4198 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4199 TCP(sport=12345, dport=80))
4200 self.pg3.add_stream(p)
4201 self.pg_enable_capture(self.pg_interfaces)
4203 capture = self.pg3.get_capture(1)
4208 self.assertEqual(ip.src, self.nat_addr)
4209 self.assertEqual(ip.dst, self.pg3.remote_ip4)
4210 self.assertNotEqual(tcp.sport, 12345)
4211 external_port = tcp.sport
4212 self.assertEqual(tcp.dport, 80)
4213 self.check_tcp_checksum(p)
4214 self.check_ip_checksum(p)
4216 self.logger.error(ppp("Unexpected or invalid packet:", p))
4220 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4221 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4222 TCP(sport=80, dport=external_port))
4223 self.pg3.add_stream(p)
4224 self.pg_enable_capture(self.pg_interfaces)
4226 capture = self.pg3.get_capture(1)
4231 self.assertEqual(ip.src, remote_host_ip6)
4232 self.assertEqual(ip.dst, self.pg3.remote_ip6)
4233 self.assertEqual(tcp.sport, 80)
4234 self.assertEqual(tcp.dport, 12345)
4235 self.check_tcp_checksum(p)
4237 self.logger.error(ppp("Unexpected or invalid packet:", p))
4240 def test_frag_in_order(self):
4241 """ NAT64 translate fragments arriving in order """
4242 self.tcp_port_in = random.randint(1025, 65535)
4244 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4246 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4247 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4249 reass = self.vapi.nat_reass_dump()
4250 reass_n_start = len(reass)
4254 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4255 self.tcp_port_in, 20, data)
4256 self.pg0.add_stream(pkts)
4257 self.pg_enable_capture(self.pg_interfaces)
4259 frags = self.pg1.get_capture(len(pkts))
4260 p = self.reass_frags_and_verify(frags,
4262 self.pg1.remote_ip4)
4263 self.assertEqual(p[TCP].dport, 20)
4264 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4265 self.tcp_port_out = p[TCP].sport
4266 self.assertEqual(data, p[Raw].load)
4269 data = "A" * 4 + "b" * 16 + "C" * 3
4270 pkts = self.create_stream_frag(self.pg1,
4275 self.pg1.add_stream(pkts)
4276 self.pg_enable_capture(self.pg_interfaces)
4278 frags = self.pg0.get_capture(len(pkts))
4279 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4280 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4281 self.assertEqual(p[TCP].sport, 20)
4282 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4283 self.assertEqual(data, p[Raw].load)
4285 reass = self.vapi.nat_reass_dump()
4286 reass_n_end = len(reass)
4288 self.assertEqual(reass_n_end - reass_n_start, 2)
4290 def test_reass_hairpinning(self):
4291 """ NAT64 fragments hairpinning """
4293 client = self.pg0.remote_hosts[0]
4294 server = self.pg0.remote_hosts[1]
4295 server_in_port = random.randint(1025, 65535)
4296 server_out_port = random.randint(1025, 65535)
4297 client_in_port = random.randint(1025, 65535)
4298 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4299 nat_addr_ip6 = ip.src
4301 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4303 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4304 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4306 # add static BIB entry for server
4307 self.vapi.nat64_add_del_static_bib(server.ip6n,
4313 # send packet from host to server
4314 pkts = self.create_stream_frag_ip6(self.pg0,
4319 self.pg0.add_stream(pkts)
4320 self.pg_enable_capture(self.pg_interfaces)
4322 frags = self.pg0.get_capture(len(pkts))
4323 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4324 self.assertNotEqual(p[TCP].sport, client_in_port)
4325 self.assertEqual(p[TCP].dport, server_in_port)
4326 self.assertEqual(data, p[Raw].load)
4328 def test_frag_out_of_order(self):
4329 """ NAT64 translate fragments arriving out of order """
4330 self.tcp_port_in = random.randint(1025, 65535)
4332 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4334 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4335 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4339 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4340 self.tcp_port_in, 20, data)
4342 self.pg0.add_stream(pkts)
4343 self.pg_enable_capture(self.pg_interfaces)
4345 frags = self.pg1.get_capture(len(pkts))
4346 p = self.reass_frags_and_verify(frags,
4348 self.pg1.remote_ip4)
4349 self.assertEqual(p[TCP].dport, 20)
4350 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4351 self.tcp_port_out = p[TCP].sport
4352 self.assertEqual(data, p[Raw].load)
4355 data = "A" * 4 + "B" * 16 + "C" * 3
4356 pkts = self.create_stream_frag(self.pg1,
4362 self.pg1.add_stream(pkts)
4363 self.pg_enable_capture(self.pg_interfaces)
4365 frags = self.pg0.get_capture(len(pkts))
4366 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4367 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4368 self.assertEqual(p[TCP].sport, 20)
4369 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4370 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface

    Registers pg4 via nat64_add_interface_addr and then checks the NAT64
    address pool (nat64_pool_addr_dump) at three points: before the
    interface has an IPv4 address, after config_ip4() and after
    unconfig_ip4().
    """
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (dump the NAT64 pool here, not the NAT44 one, so the check matches
    # the feature under test)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    # assert on the fresh dump; checking a stale result from the first
    # dump would make this verification pass unconditionally
    self.assertEqual(0, len(addresses))
4391 def nat64_get_ses_num(self):
4393 Return number of active NAT64 sessions.
4395 st = self.vapi.nat64_st_dump()
4398 def clear_nat64(self):
4400 Clear NAT64 configuration.
4402 self.vapi.nat64_set_timeouts()
4404 interfaces = self.vapi.nat64_interface_dump()
4405 for intf in interfaces:
4406 if intf.is_inside > 1:
4407 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4410 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4414 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4417 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4425 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4428 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4436 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4439 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4447 adresses = self.vapi.nat64_pool_addr_dump()
4448 for addr in adresses:
4449 self.vapi.nat64_add_del_pool_addr_range(addr.address,
4454 prefixes = self.vapi.nat64_prefix_dump()
4455 for prefix in prefixes:
4456 self.vapi.nat64_add_del_prefix(prefix.prefix,
4458 vrf_id=prefix.vrf_id,
4462 super(TestNAT64, self).tearDown()
4463 if not self.vpp_dead:
4464 self.logger.info(self.vapi.cli("show nat64 pool"))
4465 self.logger.info(self.vapi.cli("show nat64 interfaces"))
4466 self.logger.info(self.vapi.cli("show nat64 prefix"))
4467 self.logger.info(self.vapi.cli("show nat64 bib all"))
4468 self.logger.info(self.vapi.cli("show nat64 session table all"))
4469 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4473 class TestDSlite(MethodHolder):
4474 """ DS-Lite Test Cases """
4477 def setUpClass(cls):
4478 super(TestDSlite, cls).setUpClass()
4481 cls.nat_addr = '10.0.0.3'
4482 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4484 cls.create_pg_interfaces(range(2))
4486 cls.pg0.config_ip4()
4487 cls.pg0.resolve_arp()
4489 cls.pg1.config_ip6()
4490 cls.pg1.generate_remote_hosts(2)
4491 cls.pg1.configure_ipv6_neighbors()
4494 super(TestDSlite, cls).tearDownClass()
4497 def test_dslite(self):
4498 """ Test DS-Lite """
4499 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4501 aftr_ip4 = '192.0.0.1'
4502 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4503 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4504 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4505 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4508 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4509 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4510 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4511 UDP(sport=20000, dport=10000))
4512 self.pg1.add_stream(p)
4513 self.pg_enable_capture(self.pg_interfaces)
4515 capture = self.pg0.get_capture(1)
4516 capture = capture[0]
4517 self.assertFalse(capture.haslayer(IPv6))
4518 self.assertEqual(capture[IP].src, self.nat_addr)
4519 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4520 self.assertNotEqual(capture[UDP].sport, 20000)
4521 self.assertEqual(capture[UDP].dport, 10000)
4522 self.check_ip_checksum(capture)
4523 out_port = capture[UDP].sport
4525 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4526 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4527 UDP(sport=10000, dport=out_port))
4528 self.pg0.add_stream(p)
4529 self.pg_enable_capture(self.pg_interfaces)
4531 capture = self.pg1.get_capture(1)
4532 capture = capture[0]
4533 self.assertEqual(capture[IPv6].src, aftr_ip6)
4534 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4535 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4536 self.assertEqual(capture[IP].dst, '192.168.1.1')
4537 self.assertEqual(capture[UDP].sport, 10000)
4538 self.assertEqual(capture[UDP].dport, 20000)
4539 self.check_ip_checksum(capture)
4542 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4543 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4544 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4545 TCP(sport=20001, dport=10001))
4546 self.pg1.add_stream(p)
4547 self.pg_enable_capture(self.pg_interfaces)
4549 capture = self.pg0.get_capture(1)
4550 capture = capture[0]
4551 self.assertFalse(capture.haslayer(IPv6))
4552 self.assertEqual(capture[IP].src, self.nat_addr)
4553 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4554 self.assertNotEqual(capture[TCP].sport, 20001)
4555 self.assertEqual(capture[TCP].dport, 10001)
4556 self.check_ip_checksum(capture)
4557 self.check_tcp_checksum(capture)
4558 out_port = capture[TCP].sport
4560 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4561 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4562 TCP(sport=10001, dport=out_port))
4563 self.pg0.add_stream(p)
4564 self.pg_enable_capture(self.pg_interfaces)
4566 capture = self.pg1.get_capture(1)
4567 capture = capture[0]
4568 self.assertEqual(capture[IPv6].src, aftr_ip6)
4569 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4570 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4571 self.assertEqual(capture[IP].dst, '192.168.1.1')
4572 self.assertEqual(capture[TCP].sport, 10001)
4573 self.assertEqual(capture[TCP].dport, 20001)
4574 self.check_ip_checksum(capture)
4575 self.check_tcp_checksum(capture)
4578 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4579 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4580 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4581 ICMP(id=4000, type='echo-request'))
4582 self.pg1.add_stream(p)
4583 self.pg_enable_capture(self.pg_interfaces)
4585 capture = self.pg0.get_capture(1)
4586 capture = capture[0]
4587 self.assertFalse(capture.haslayer(IPv6))
4588 self.assertEqual(capture[IP].src, self.nat_addr)
4589 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4590 self.assertNotEqual(capture[ICMP].id, 4000)
4591 self.check_ip_checksum(capture)
4592 self.check_icmp_checksum(capture)
4593 out_id = capture[ICMP].id
4595 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4596 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4597 ICMP(id=out_id, type='echo-reply'))
4598 self.pg0.add_stream(p)
4599 self.pg_enable_capture(self.pg_interfaces)
4601 capture = self.pg1.get_capture(1)
4602 capture = capture[0]
4603 self.assertEqual(capture[IPv6].src, aftr_ip6)
4604 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4605 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4606 self.assertEqual(capture[IP].dst, '192.168.1.1')
4607 self.assertEqual(capture[ICMP].id, 4000)
4608 self.check_ip_checksum(capture)
4609 self.check_icmp_checksum(capture)
4612 super(TestDSlite, self).tearDown()
4613 if not self.vpp_dead:
4614 self.logger.info(self.vapi.cli("show dslite pool"))
4616 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4617 self.logger.info(self.vapi.cli("show dslite sessions"))
# Script entry point: when this module is executed directly, run its test
# cases through unittest using VppTestRunner as the test runner.
4619 if __name__ == '__main__':
4620 unittest.main(testRunner=VppTestRunner)