7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
class MethodHolder(VppTestCase):
    """ SNAT create capture and verify method holder """

    @classmethod
    def setUpClass(cls):
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        super(MethodHolder, self).tearDown()

    def _assert_checksum(self, pkt, layer, field='chksum'):
        """
        Assert that the checksum carried by a layer equals a recomputed one

        The packet is rebuilt from its serialized form, the stored checksum
        is deleted so that it is calculated again on the next build, and the
        recalculated value is compared with the one found in ``pkt``.

        :param pkt: Packet to check
        :param layer: Name of the layer holding the checksum
        :param field: Name of the checksum field (Default 'chksum')
        """
        rebuilt = pkt.__class__(str(pkt))
        # Without dropping the stored value the rebuild would simply copy it
        # and the comparison below could never fail.
        delattr(rebuilt[layer], field)
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(getattr(rebuilt[layer], field),
                         getattr(pkt[layer], field))

    def check_ip_checksum(self, pkt):
        """
        Check IP checksum of the packet

        :param pkt: Packet to check IP checksum
        """
        self._assert_checksum(pkt, 'IP')

    def check_tcp_checksum(self, pkt):
        """
        Check TCP checksum in IP packet

        :param pkt: Packet to check TCP checksum
        """
        self._assert_checksum(pkt, 'TCP')

    def check_udp_checksum(self, pkt):
        """
        Check UDP checksum in IP packet

        :param pkt: Packet to check UDP checksum
        """
        self._assert_checksum(pkt, 'UDP')

    def check_icmp_errror_embedded(self, pkt):
        """
        Check ICMP error embedded packet checksum

        Every embedded layer is verified against a fresh rebuild of ``pkt``,
        so no branch depends on a previous branch having run.

        :param pkt: Packet to check ICMP error embedded packet checksum
        """
        if pkt.haslayer(IPerror):
            self._assert_checksum(pkt, 'IPerror')

        if pkt.haslayer(TCPerror):
            self._assert_checksum(pkt, 'TCPerror')

        # An embedded UDP checksum of 0 is not verified.
        if pkt.haslayer(UDPerror) and pkt['UDPerror'].chksum != 0:
            self._assert_checksum(pkt, 'UDPerror')

        if pkt.haslayer(ICMPerror):
            self._assert_checksum(pkt, 'ICMPerror')

    def check_icmp_checksum(self, pkt):
        """
        Check ICMP checksum in IPv4 packet

        :param pkt: Packet to check ICMP checksum
        """
        self._assert_checksum(pkt, 'ICMP')
        if pkt.haslayer(IPerror):
            self.check_icmp_errror_embedded(pkt)

    def check_icmpv6_checksum(self, pkt):
        """
        Check ICMPv6 checksum in IPv6 packet

        :param pkt: Packet to check ICMPv6 checksum
        """
        if pkt.haslayer(ICMPv6DestUnreach):
            self._assert_checksum(pkt, 'ICMPv6DestUnreach', 'cksum')
            self.check_icmp_errror_embedded(pkt)
        if pkt.haslayer(ICMPv6EchoRequest):
            self._assert_checksum(pkt, 'ICMPv6EchoRequest', 'cksum')
        if pkt.haslayer(ICMPv6EchoReply):
            self._assert_checksum(pkt, 'ICMPv6EchoReply', 'cksum')

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        :returns: List with one TCP, one UDP and one ICMP packet
        """
        pkts = []
        # TCP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             TCP(sport=self.tcp_port_in, dport=20))
        pkts.append(p)

        # UDP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             UDP(sport=self.udp_port_in, dport=20))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)

        return pkts

    def create_stream_in_ip6(self, in_if, out_if, hlim=64):
        """
        Create IPv6 packet stream for inside network

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param hlim: Hop Limit of generated packets
        :returns: List with one TCP, one UDP and one ICMPv6 packet
        """
        pkts = []
        # Destination is the outside IPv4 address appended to 64:ff9b::
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
        # TCP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
             TCP(sport=self.tcp_port_in, dport=20))
        pkts.append(p)

        # UDP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
             UDP(sport=self.udp_port_in, dport=20))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
             ICMPv6EchoRequest(id=self.icmp_id_in))
        pkts.append(p)

        return pkts

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :param ttl: TTL of generated packets
        :returns: List with one TCP, one UDP and one ICMP packet
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        pkts = []
        # TCP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             TCP(dport=self.tcp_port_out, sport=20))
        pkts.append(p)

        # UDP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             UDP(dport=self.udp_port_out, sport=20))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             ICMP(id=self.icmp_id_out, type='echo-reply'))
        pkts.append(p)

        return pkts

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3, dst_ip=None):
        """
        Verify captured packets on outside network

        The translated TCP/UDP source port and ICMP id seen in the capture
        are remembered in ``tcp_port_out``, ``udp_port_out`` and
        ``icmp_id_out``, which ``create_stream_out`` then uses.

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Source port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        :param dst_ip: Destination IP address (Default do not verify)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.check_ip_checksum(packet)
                self.assertEqual(packet[IP].src, nat_ip)
                if dst_ip is not None:
                    self.assertEqual(packet[IP].dst, dst_ip)
                if packet.haslayer(TCP):
                    if same_port:
                        self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                    else:
                        self.assertNotEqual(
                            packet[TCP].sport, self.tcp_port_in)
                    self.tcp_port_out = packet[TCP].sport
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    if same_port:
                        self.assertEqual(packet[UDP].sport, self.udp_port_in)
                    else:
                        self.assertNotEqual(
                            packet[UDP].sport, self.udp_port_in)
                    self.udp_port_out = packet[UDP].sport
                else:
                    if same_port:
                        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                    else:
                        self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
                    self.check_icmp_checksum(packet)
            except Exception:
                # Log the offending packet, then let the failure propagate.
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Verify captured packets on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.check_ip_checksum(packet)
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                    self.check_icmp_checksum(packet)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
        """
        Verify captured IPv6 packets on inside network

        :param capture: Captured packets
        :param src_ip: Source IP
        :param dst_ip: Destination IP address
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, src_ip)
                self.assertEqual(packet[IPv6].dst, dst_ip)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                    self.check_udp_checksum(packet)
                else:
                    self.assertEqual(packet[ICMPv6EchoReply].id,
                                     self.icmp_id_in)
                    self.check_icmpv6_checksum(packet)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
        """
        Verify captured packets that don't have to be translated

        :param capture: Captured packets
        :param ingress_if: Ingress interface
        :param egress_if: Egress interface
        """
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
                self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                            packet_num=3, icmp_type=11):
        """
        Verify captured packets with ICMP errors on outside network

        :param capture: Captured packets
        :param src_ip: Translated IP address or IP address of VPP
                       (Default use global SNAT address)
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        if src_ip is None:
            src_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, src_ip)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                inner_ip = icmp[IPerror]
                # The embedded packet is the one that was sent towards the
                # inside, so its destination carries the outside port/id.
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].dport,
                                     self.tcp_port_out)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].dport,
                                     self.udp_port_out)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                           icmp_type=11):
        """
        Verify captured packets with ICMP errors on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                inner_ip = icmp[IPerror]
                # The embedded packet is the one the inside host sent, so
                # its source carries the inside port/id.
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].sport,
                                     self.tcp_port_in)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].sport,
                                     self.udp_port_in)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_ipfix_nat44_ses(self, data):
        """
        Verify IPFIX NAT44 session create/delete event

        Expects six records: three session-create (natEvent 4) and three
        session-delete (natEvent 5) events, one pair per protocol.
        Records are indexed by IPFIX information element id.

        :param data: Decoded IPFIX data records
        """
        nat44_ses_create_num = 0
        nat44_ses_delete_num = 0
        self.assertEqual(6, len(data))
        for record in data:
            # natEvent
            self.assertIn(ord(record[230]), [4, 5])
            if ord(record[230]) == 4:
                nat44_ses_create_num += 1
            else:
                nat44_ses_delete_num += 1
            # sourceIPv4Address
            self.assertEqual(self.pg0.remote_ip4n, record[8])
            # postNATSourceIPv4Address
            self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
                             record[225])
            # ingressVRFID
            self.assertEqual(struct.pack("!I", 0), record[234])
            # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
            if IP_PROTOS.icmp == ord(record[4]):
                self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
                self.assertEqual(struct.pack("!H", self.icmp_id_out),
                                 record[227])
            elif IP_PROTOS.tcp == ord(record[4]):
                self.assertEqual(struct.pack("!H", self.tcp_port_in),
                                 record[7])
                self.assertEqual(struct.pack("!H", self.tcp_port_out),
                                 record[227])
            elif IP_PROTOS.udp == ord(record[4]):
                self.assertEqual(struct.pack("!H", self.udp_port_in),
                                 record[7])
                self.assertEqual(struct.pack("!H", self.udp_port_out),
                                 record[227])
            else:
                self.fail("Invalid protocol")
        self.assertEqual(3, nat44_ses_create_num)
        self.assertEqual(3, nat44_ses_delete_num)

    def verify_ipfix_addr_exhausted(self, data):
        """
        Verify IPFIX NAT addresses event

        Expects exactly one record whose natEvent is 3 and whose pool id
        is 0.

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 3)
        # natPoolID
        self.assertEqual(struct.pack("!I", 0), record[283])
# SNAT test case class: header, docstring and the body of the class-level
# setup (every statement below operates on ``cls``).
# NOTE(review): this copy lacks the ``def`` line, the bodies of both ``for``
# loops and whatever surrounds the final tearDownClass() call; confirm
# against the real file before relying on the comments below.
472 class TestSNAT(MethodHolder):
473 """ SNAT Test Cases """
477 super(TestSNAT, cls).setUpClass()
# Default inside/outside ports and ICMP ids; inside and outside start equal.
480 cls.tcp_port_in = 6303
481 cls.tcp_port_out = 6303
482 cls.udp_port_in = 6304
483 cls.udp_port_out = 6304
484 cls.icmp_id_in = 6305
485 cls.icmp_id_out = 6305
# Default translated address used by the stream builders and verifiers.
486 cls.snat_addr = '10.0.0.3'
# IPFIX defaults; clear_snat() resets to these same values.
487 cls.ipfix_src_port = 4739
488 cls.ipfix_domain_id = 1
# Nine pg interfaces are created; the first four go into cls.interfaces.
490 cls.create_pg_interfaces(range(9))
491 cls.interfaces = list(cls.pg_interfaces[0:4])
493 for i in cls.interfaces:
498 cls.pg0.generate_remote_hosts(3)
499 cls.pg0.configure_ipv4_neighbors()
# pg4..pg6: pg4 and pg6 get identical addresses but tables 10 and 20;
# pg5 shares table 10 with pg4.
501 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
503 cls.pg4._local_ip4 = "172.16.255.1"
# NOTE(review): ``i`` is whatever the ``for i in cls.interfaces`` loop left
# bound, so _local_ip4n is packed from that interface's address rather than
# from the _local_ip4 just assigned to pg4 (same for pg5/pg6 below) --
# looks unintended; confirm.
504 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
505 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
506 cls.pg4.set_table_ip4(10)
507 cls.pg5._local_ip4 = "172.16.255.3"
508 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
509 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
510 cls.pg5.set_table_ip4(10)
511 cls.pg6._local_ip4 = "172.16.255.1"
512 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
513 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
514 cls.pg6.set_table_ip4(20)
515 for i in cls.overlapping_interfaces:
# presumably the failure path of setup -- surrounding lines not visible; confirm.
524 super(TestSNAT, cls).tearDownClass()
# Undo SNAT-related configuration: pg7/pg8 host routes and neighbors, pg7's
# IPv4 config, SNAT interface addresses, IPFIX, SNAT interface features,
# static mappings and address ranges.
# NOTE(review): most calls below are cut off before their final arguments in
# this copy, so the add/delete flag is only visible where stated.
527 def clear_snat(self):
529 Clear SNAT configuration.
531 # I found no elegant way to do this
# /32 routes to the pg7 and pg8 remote addresses via the same address.
532 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
533 dst_address_length=32,
534 next_hop_address=self.pg7.remote_ip4n,
535 next_hop_sw_if_index=self.pg7.sw_if_index,
537 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
538 dst_address_length=32,
539 next_hop_address=self.pg8.remote_ip4n,
540 next_hop_sw_if_index=self.pg8.sw_if_index,
# Neighbor entries of pg7 and pg8 are dumped; the per-entry loop header and
# the remaining ip_neighbor_add_del arguments are not visible.
543 for intf in [self.pg7, self.pg8]:
544 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
546 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
# Drop pg7's IPv4 configuration only if it currently has one.
551 if self.pg7.has_ip4_config:
552 self.pg7.unconfig_ip4()
# Remove (is_add=0) every interface address registered with SNAT.
554 interfaces = self.vapi.snat_interface_addr_dump()
555 for intf in interfaces:
556 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
# Disable IPFIX with the current settings, then reset them to 4739 / 1.
558 self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
559 domain_id=self.ipfix_domain_id)
560 self.ipfix_src_port = 4739
561 self.ipfix_domain_id = 1
# Every interface reported by snat_interface_dump() is passed back to the
# feature call (remaining arguments not visible).
563 interfaces = self.vapi.snat_interface_dump()
564 for intf in interfaces:
565 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
# Each dumped static mapping is replayed with its own fields.
569 static_mappings = self.vapi.snat_static_mapping_dump()
570 for sm in static_mappings:
571 self.vapi.snat_add_static_mapping(sm.local_ip_address,
572 sm.external_ip_address,
573 local_port=sm.local_port,
574 external_port=sm.external_port,
575 addr_only=sm.addr_only,
577 protocol=sm.protocol,
# Each dumped SNAT address is passed to snat_add_address_range.
580 adresses = self.vapi.snat_address_dump()
581 for addr in adresses:
582 self.vapi.snat_add_address_range(addr.ip_address,
# Test-side wrapper around self.vapi.snat_add_static_mapping that accepts
# dotted-quad strings and packs them with socket.inet_pton.
# NOTE(review): the end of the signature (the docstring mentions a ``proto``
# parameter) and most of the vapi call's arguments are not visible here.
586 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
587 local_port=0, external_port=0, vrf_id=0,
588 is_add=1, external_sw_if_index=0xFFFFFFFF,
591 Add/delete S-NAT static mapping
593 :param local_ip: Local IP address
594 :param external_ip: External IP address
595 :param local_port: Local port number (Optional)
596 :param external_port: External port number (Optional)
597 :param vrf_id: VRF ID (Default 0)
598 :param is_add: 1 if add, 0 if delete (Default add)
599 :param external_sw_if_index: External interface instead of IP address
600 :param proto: IP protocol (Mandatory if port specified)
# Branch taken only when both ports are non-zero; its body is not visible.
603 if local_port and external_port:
# Addresses are converted from text to packed 4-byte form for the API.
605 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
606 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
607 self.vapi.snat_add_static_mapping(
610 external_sw_if_index,
# Add or delete a single SNAT address: the packed address is passed as both
# the first and the last address of the range.
# NOTE(review): the call's trailing argument(s) are not visible in this copy;
# presumably vrf_id is forwarded -- confirm.
618 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
620 Add/delete S-NAT address
622 :param ip: IP address
623 :param is_add: 1 if add, 0 if delete (Default add)
625 snat_addr = socket.inet_pton(socket.AF_INET, ip)
626 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
# Dynamic translation round trip: a pg0 -> pg1 stream is checked with
# verify_capture_out, then a pg1 -> pg0 stream with verify_capture_in.
# NOTE(review): the second feature call is cut off after its first argument,
# and whatever runs between enabling capture and get_capture is not visible.
629 def test_dynamic(self):
630 """ SNAT dynamic translation test """
632 self.snat_add_address(self.snat_addr)
633 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
634 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# inside -> outside
638 pkts = self.create_stream_in(self.pg0, self.pg1)
639 self.pg0.add_stream(pkts)
640 self.pg_enable_capture(self.pg_interfaces)
642 capture = self.pg1.get_capture(len(pkts))
643 self.verify_capture_out(capture)
# outside -> inside, addressed to the ports/ids learned by verify_capture_out
646 pkts = self.create_stream_out(self.pg1)
647 self.pg1.add_stream(pkts)
648 self.pg_enable_capture(self.pg_interfaces)
650 capture = self.pg0.get_capture(len(pkts))
651 self.verify_capture_in(capture, self.pg0)
# Packets sent from pg0 with TTL=1 must come back to pg0 as ICMP errors
# (verify_capture_in_with_icmp_errors defaults to type 11).
# NOTE(review): the second feature call is cut off after its first argument.
653 def test_dynamic_icmp_errors_in2out_ttl_1(self):
654 """ SNAT handling of client packets with TTL=1 """
656 self.snat_add_address(self.snat_addr)
657 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
658 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
661 # Client side - generate traffic
662 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
663 self.pg0.add_stream(pkts)
664 self.pg_enable_capture(self.pg_interfaces)
667 # Client side - verify ICMP type 11 packets
668 capture = self.pg0.get_capture(len(pkts))
669 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Sessions are first created from pg0; packets then sent from pg1 with TTL=1
# must come back to pg1 as ICMP errors sourced from pg1's local address.
# NOTE(review): the second feature call is cut off after its first argument.
671 def test_dynamic_icmp_errors_out2in_ttl_1(self):
672 """ SNAT handling of server packets with TTL=1 """
674 self.snat_add_address(self.snat_addr)
675 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
676 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
679 # Client side - create sessions
680 pkts = self.create_stream_in(self.pg0, self.pg1)
681 self.pg0.add_stream(pkts)
682 self.pg_enable_capture(self.pg_interfaces)
685 # Server side - generate traffic
686 capture = self.pg1.get_capture(len(pkts))
# verify_capture_out also records the outside ports/ids for the next stream
687 self.verify_capture_out(capture)
688 pkts = self.create_stream_out(self.pg1, ttl=1)
689 self.pg1.add_stream(pkts)
690 self.pg_enable_capture(self.pg_interfaces)
693 # Server side - verify ICMP type 11 packets
694 capture = self.pg1.get_capture(len(pkts))
695 self.verify_capture_out_with_icmp_errors(capture,
696 src_ip=self.pg1.local_ip4)
# Packets sent from pg0 with TTL=2 are captured on pg1; each captured IP
# packet is wrapped in an ICMP type 11 message addressed to the SNAT address
# and sent back, and the resulting errors are verified on pg0.
# NOTE(review): the second feature call is cut off after its first argument.
698 def test_dynamic_icmp_errors_in2out_ttl_2(self):
699 """ SNAT handling of error responses to client packets with TTL=2 """
701 self.snat_add_address(self.snat_addr)
702 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
703 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
706 # Client side - generate traffic
707 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
708 self.pg0.add_stream(pkts)
709 self.pg_enable_capture(self.pg_interfaces)
712 # Server side - simulate ICMP type 11 response
713 capture = self.pg1.get_capture(len(pkts))
# one error per captured packet, embedding that packet from its IP layer down
714 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
715 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
716 ICMP(type=11) / packet[IP] for packet in capture]
717 self.pg1.add_stream(pkts)
718 self.pg_enable_capture(self.pg_interfaces)
721 # Client side - verify ICMP type 11 packets
722 capture = self.pg0.get_capture(len(pkts))
723 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Sessions are created from pg0, packets are sent from pg1 with TTL=2 and
# captured on pg0; each is wrapped in an ICMP type 11 message sent from pg0,
# and the resulting errors are verified on pg1.
# NOTE(review): the second feature call is cut off after its first argument.
725 def test_dynamic_icmp_errors_out2in_ttl_2(self):
726 """ SNAT handling of error responses to server packets with TTL=2 """
728 self.snat_add_address(self.snat_addr)
729 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
730 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
733 # Client side - create sessions
734 pkts = self.create_stream_in(self.pg0, self.pg1)
735 self.pg0.add_stream(pkts)
736 self.pg_enable_capture(self.pg_interfaces)
739 # Server side - generate traffic
740 capture = self.pg1.get_capture(len(pkts))
741 self.verify_capture_out(capture)
742 pkts = self.create_stream_out(self.pg1, ttl=2)
743 self.pg1.add_stream(pkts)
744 self.pg_enable_capture(self.pg_interfaces)
747 # Client side - simulate ICMP type 11 response
748 capture = self.pg0.get_capture(len(pkts))
# one error per captured packet, embedding that packet from its IP layer down
749 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
750 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
751 ICMP(type=11) / packet[IP] for packet in capture]
752 self.pg0.add_stream(pkts)
753 self.pg_enable_capture(self.pg_interfaces)
756 # Server side - verify ICMP type 11 packets
757 capture = self.pg1.get_capture(len(pkts))
# src_ip is left at its default, i.e. the global SNAT address
758 self.verify_capture_out_with_icmp_errors(capture)
# An echo request sent from pg1's remote host to pg1's own local address must
# produce exactly one echo reply on pg1.
# NOTE(review): the assignment of ``pkts`` and of ``packet`` are not visible
# in this copy, and the second feature call is cut off.
760 def test_ping_out_interface_from_outside(self):
761 """ Ping SNAT out interface from outside network """
763 self.snat_add_address(self.snat_addr)
764 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
765 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
768 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
769 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
770 ICMP(id=self.icmp_id_out, type='echo-request'))
772 self.pg1.add_stream(pkts)
773 self.pg_enable_capture(self.pg_interfaces)
775 capture = self.pg1.get_capture(len(pkts))
776 self.assertEqual(1, len(capture))
# the reply must be the request mirrored: source and destination swapped
779 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
780 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
# NOTE(review): the request above used icmp_id_out but the reply is compared
# with icmp_id_in; this only holds while both ids are equal -- confirm intent.
781 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
782 self.assertEqual(packet[ICMP].type, 0) # echo reply
784 self.logger.error(ppp("Unexpected or invalid packet "
785 "(outside network):", packet))
# With a static mapping between pg0's remote host and the SNAT address, an
# echo request sent from pg1 to the SNAT address must reach pg0, and the echo
# reply sent from pg0 must reach pg1 with its ICMP id unchanged.
# NOTE(review): the second feature call is cut off after its first argument.
788 def test_ping_internal_host_from_outside(self):
789 """ Ping internal host from outside network """
791 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
792 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
793 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# outside -> inside echo request
797 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
798 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
799 ICMP(id=self.icmp_id_out, type='echo-request'))
800 self.pg1.add_stream(pkt)
801 self.pg_enable_capture(self.pg_interfaces)
803 capture = self.pg0.get_capture(1)
804 self.verify_capture_in(capture, self.pg0, packet_num=1)
805 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# inside -> outside echo reply
808 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
809 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
810 ICMP(id=self.icmp_id_in, type='echo-reply'))
811 self.pg0.add_stream(pkt)
812 self.pg_enable_capture(self.pg_interfaces)
814 capture = self.pg1.get_capture(1)
815 self.verify_capture_out(capture, same_port=True, packet_num=1)
816 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# 1:1 static mapping of pg0's remote host, exercised inside -> outside first
# and outside -> inside second; ports/ids must stay unchanged (same_port=True).
# NOTE(review): the assignment of ``nat_ip`` is not visible in this copy and
# the second feature call is cut off after its first argument.
818 def test_static_in(self):
819 """ SNAT 1:1 NAT initialized from inside network """
# outside ports/ids equal the inside ones because nothing is translated
822 self.tcp_port_out = 6303
823 self.udp_port_out = 6304
824 self.icmp_id_out = 6305
826 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
827 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
828 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
832 pkts = self.create_stream_in(self.pg0, self.pg1)
833 self.pg0.add_stream(pkts)
834 self.pg_enable_capture(self.pg_interfaces)
836 capture = self.pg1.get_capture(len(pkts))
837 self.verify_capture_out(capture, nat_ip, True)
840 pkts = self.create_stream_out(self.pg1, nat_ip)
841 self.pg1.add_stream(pkts)
842 self.pg_enable_capture(self.pg_interfaces)
844 capture = self.pg0.get_capture(len(pkts))
845 self.verify_capture_in(capture, self.pg0)
# 1:1 static mapping of pg0's remote host, exercised outside -> inside first
# and inside -> outside second; ports/ids must stay unchanged (same_port=True).
# NOTE(review): the assignment of ``nat_ip`` is not visible in this copy and
# the second feature call is cut off after its first argument.
847 def test_static_out(self):
848 """ SNAT 1:1 NAT initialized from outside network """
# outside ports/ids equal the inside ones because nothing is translated
851 self.tcp_port_out = 6303
852 self.udp_port_out = 6304
853 self.icmp_id_out = 6305
855 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
856 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
857 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
861 pkts = self.create_stream_out(self.pg1, nat_ip)
862 self.pg1.add_stream(pkts)
863 self.pg_enable_capture(self.pg_interfaces)
865 capture = self.pg0.get_capture(len(pkts))
866 self.verify_capture_in(capture, self.pg0)
869 pkts = self.create_stream_in(self.pg0, self.pg1)
870 self.pg0.add_stream(pkts)
871 self.pg_enable_capture(self.pg_interfaces)
873 capture = self.pg1.get_capture(len(pkts))
874 self.verify_capture_out(capture, nat_ip, True)
# Static mappings with ports: one mapping per protocol from the inside
# port/id to a fixed outside port/id (3606/3607/3608), exercised
# inside -> outside first and outside -> inside second.
# NOTE(review): the TCP and UDP mapping calls and the second feature call are
# cut off before their last argument in this copy.
876 def test_static_with_port_in(self):
877 """ SNAT 1:1 NAT with port initialized from inside network """
879 self.tcp_port_out = 3606
880 self.udp_port_out = 3607
881 self.icmp_id_out = 3608
883 self.snat_add_address(self.snat_addr)
884 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
885 self.tcp_port_in, self.tcp_port_out,
887 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
888 self.udp_port_in, self.udp_port_out,
890 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
891 self.icmp_id_in, self.icmp_id_out,
892 proto=IP_PROTOS.icmp)
893 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
894 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
898 pkts = self.create_stream_in(self.pg0, self.pg1)
899 self.pg0.add_stream(pkts)
900 self.pg_enable_capture(self.pg_interfaces)
902 capture = self.pg1.get_capture(len(pkts))
# default same_port=False: the outside port/id must differ from the inside one
903 self.verify_capture_out(capture)
906 pkts = self.create_stream_out(self.pg1)
907 self.pg1.add_stream(pkts)
908 self.pg_enable_capture(self.pg_interfaces)
910 capture = self.pg0.get_capture(len(pkts))
911 self.verify_capture_in(capture, self.pg0)
# Static mappings with ports: one mapping per protocol from the inside
# port/id to a fixed outside port/id (30606/30607/30608), exercised
# outside -> inside first and inside -> outside second.
# NOTE(review): the TCP and UDP mapping calls and the second feature call are
# cut off before their last argument in this copy.
913 def test_static_with_port_out(self):
914 """ SNAT 1:1 NAT with port initialized from outside network """
916 self.tcp_port_out = 30606
917 self.udp_port_out = 30607
918 self.icmp_id_out = 30608
920 self.snat_add_address(self.snat_addr)
921 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
922 self.tcp_port_in, self.tcp_port_out,
924 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
925 self.udp_port_in, self.udp_port_out,
927 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
928 self.icmp_id_in, self.icmp_id_out,
929 proto=IP_PROTOS.icmp)
930 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
931 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# the outside stream targets the fixed outside ports/ids set above
935 pkts = self.create_stream_out(self.pg1)
936 self.pg1.add_stream(pkts)
937 self.pg_enable_capture(self.pg_interfaces)
939 capture = self.pg0.get_capture(len(pkts))
940 self.verify_capture_in(capture, self.pg0)
943 pkts = self.create_stream_in(self.pg0, self.pg1)
944 self.pg0.add_stream(pkts)
945 self.pg_enable_capture(self.pg_interfaces)
947 capture = self.pg1.get_capture(len(pkts))
948 self.verify_capture_out(capture)
# Two static mappings (pg4's host -> 10.0.0.30, pg0's host -> 10.0.0.40):
# traffic from pg4 must appear on pg3 translated to nat_ip1, while traffic
# from pg0 must not appear on pg3 at all.
# NOTE(review): both mapping calls and the pg3 feature call are cut off
# before their last argument(s), so the VRF ids involved are not visible.
950 def test_static_vrf_aware(self):
951 """ SNAT 1:1 NAT VRF awareness """
953 nat_ip1 = "10.0.0.30"
954 nat_ip2 = "10.0.0.40"
955 self.tcp_port_out = 6303
956 self.udp_port_out = 6304
957 self.icmp_id_out = 6305
959 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
961 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
963 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
965 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
966 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
968 # inside interface VRF match SNAT static mapping VRF
969 pkts = self.create_stream_in(self.pg4, self.pg3)
970 self.pg4.add_stream(pkts)
971 self.pg_enable_capture(self.pg_interfaces)
973 capture = self.pg3.get_capture(len(pkts))
974 self.verify_capture_out(capture, nat_ip1, True)
976 # inside interface VRF don't match SNAT static mapping VRF (packets
978 pkts = self.create_stream_in(self.pg0, self.pg3)
979 self.pg0.add_stream(pkts)
980 self.pg_enable_capture(self.pg_interfaces)
982 self.pg3.assert_nothing_captured()
# pg0 and pg1 get the SNAT feature with default arguments and pg3 with extra
# arguments that are not visible here. Traffic pg0 -> pg1 and pg0 -> pg2 must
# pass untranslated; traffic pg0 <-> pg3 and pg1 <-> pg3 must be translated
# in both directions.
# NOTE(review): the pg3 feature call is cut off after its first argument.
984 def test_multiple_inside_interfaces(self):
985 """ SNAT multiple inside interfaces (non-overlapping address space) """
987 self.snat_add_address(self.snat_addr)
988 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
989 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
990 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
993 # between two S-NAT inside interfaces (no translation)
994 pkts = self.create_stream_in(self.pg0, self.pg1)
995 self.pg0.add_stream(pkts)
996 self.pg_enable_capture(self.pg_interfaces)
998 capture = self.pg1.get_capture(len(pkts))
999 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1001 # from S-NAT inside to interface without S-NAT feature (no translation)
1002 pkts = self.create_stream_in(self.pg0, self.pg2)
1003 self.pg0.add_stream(pkts)
1004 self.pg_enable_capture(self.pg_interfaces)
1006 capture = self.pg2.get_capture(len(pkts))
1007 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1009 # in2out 1st interface
1010 pkts = self.create_stream_in(self.pg0, self.pg3)
1011 self.pg0.add_stream(pkts)
1012 self.pg_enable_capture(self.pg_interfaces)
1014 capture = self.pg3.get_capture(len(pkts))
1015 self.verify_capture_out(capture)
1017 # out2in 1st interface
1018 pkts = self.create_stream_out(self.pg3)
1019 self.pg3.add_stream(pkts)
1020 self.pg_enable_capture(self.pg_interfaces)
1022 capture = self.pg0.get_capture(len(pkts))
1023 self.verify_capture_in(capture, self.pg0)
1025 # in2out 2nd interface
1026 pkts = self.create_stream_in(self.pg1, self.pg3)
1027 self.pg1.add_stream(pkts)
1028 self.pg_enable_capture(self.pg_interfaces)
1030 capture = self.pg3.get_capture(len(pkts))
1031 self.verify_capture_out(capture)
1033 # out2in 2nd interface
1034 pkts = self.create_stream_out(self.pg3)
1035 self.pg3.add_stream(pkts)
1036 self.pg_enable_capture(self.pg_interfaces)
1038 capture = self.pg1.get_capture(len(pkts))
1039 self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ SNAT multiple inside interfaces with overlapping address space

    pg3 is the outside interface; pg4, pg5 and pg6 are inside interfaces.
    pg4/pg5 sessions are looked up in VRF 10 and pg6 sessions in VRF 20.
    pg6 is reachable through a 1:1 static mapping, the others use the
    dynamic pool address.
    """
    static_nat_ip = "10.0.0.10"
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                             is_inside=0)
    self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
    # NOTE(review): the trailing argument was missing from the listing;
    # vrf_id=20 matches the VRF used for the pg6 session dump below.
    self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                 vrf_id=20)

    # between S-NAT inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between S-NAT inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        # dump the offending packet before propagating the failure
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    # the checks below rely on the dump order being TCP, UDP, ICMP
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.snat_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        # NOTE(review): second argument was missing from the listing;
        # presumably the user's own VRF - confirm against the API.
        sessions = self.vapi.snat_user_session_dump(user.ip_address,
                                                    user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertIn(session.protocol,
                          [IP_PROTOS.tcp, IP_PROTOS.udp,
                           IP_PROTOS.icmp])

    # pg4 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertGreaterEqual(len(sessions), 3)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        # compare packed addresses directly; comparing two map() results
        # only works on Python 2 where map() returns lists
        self.assertEqual(session.outside_ip_address[0:4],
                         socket.inet_aton(static_nat_ip))
        self.assertIn(session.inside_port,
                      [self.tcp_port_in, self.udp_port_in,
                       self.icmp_id_in])
def test_hairpinning(self):
    """ SNAT hairpinning - 1:1 NAT with port

    A host and a server both sit behind pg0. The host reaches the server
    through the server's static (address + port) mapping, so both
    directions are sent and captured on pg0.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): this assignment was missing from the listing; any
    # free source port works, the test only needs it to be translated.
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.snat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # remember the translated port for the reply direction
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except Exception:
        # the packet belongs inside ppp(); passing it to logger.error()
        # made ppp() raise TypeError and hid the real failure
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ SNAT hairpinning - 1:1 NAT

    A host and two servers sit behind pg0; each server has a 1:1 static
    mapping. Four exchanges are verified, all sent and captured on pg0:
    host -> server1, server1 -> host, server2 -> server1 and
    server1 -> server2.
    """
    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # add static mapping for servers
    self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
    self.snat_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, self.snat_addr)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to host (replies use the ports learned above)
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.snat_addr) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.snat_addr) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.snat_addr) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, host.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server2 to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            # 1:1 mapping: only the address changes, ports/ids are kept
            self.assertEqual(packet[IP].src, server2_nat_ip)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to server2
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, server2.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used

    Sends five more TCP flows from a single inside host than the
    configured per-user limit and expects every one of them to be
    forwarded to the outside interface.
    """
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # get maximum number of translations per user
    snat_config = self.vapi.snat_show_config()

    # send more than maximum number of translations per user packets
    pkts_num = snat_config.max_translations_per_user + 5
    # one flow per packet: only the TCP source port differs
    pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=1025 + port))
            for port in range(pkts_num)]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface

    The NAT pool is expected to follow the IPv4 address configured on
    pg7: empty before configuration, one entry while configured, empty
    again after the address is removed.
    """
    iface = self.pg7
    self.vapi.snat_add_interface_addr(iface.sw_if_index)

    # nothing configured on the interface yet -> pool is empty
    pool = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool))

    # configuring the interface address must populate the pool
    iface.config_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], iface.local_ip4n)

    # removing the interface address must drain the pool again
    iface.unconfig_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface

    A static mapping bound to pg7 is dumped in three states: before pg7
    has an address (mapping refers to the interface index), while it has
    one (mapping carries the address and index 0xFFFFFFFF) and after the
    address is removed (no mapping is dumped).
    """
    iface = self.pg7
    self.vapi.snat_add_interface_addr(iface.sw_if_index)
    self.snat_add_static_mapping('1.2.3.4',
                                 external_sw_if_index=iface.sw_if_index)

    # mapping still refers to the external interface
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    self.assertEqual(iface.sw_if_index, mappings[0].external_sw_if_index)

    # once the interface has an address the mapping carries that address
    iface.config_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    self.assertEqual(mappings[0].external_ip_address[0:4],
                     iface.local_ip4n)
    self.assertEqual(0xFFFFFFFF, mappings[0].external_sw_if_index)

    # dropping the interface address removes the dumped mapping
    iface.unconfig_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(0, len(mappings))
def test_ipfix_nat44_sess(self):
    """ S-NAT IPFIX logging NAT44 session created/deleted

    Creates sessions with an in2out stream, deletes them by removing the
    pool address, flushes the exporter and checks the three IPFIX
    packets captured on pg3.
    """
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    colector_port = 30303
    # teach scapy to dissect IPFIX on the non-default collector port
    bind_layers(UDP, IPFIX, dport=colector_port)
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): one keyword argument was missing from the listing;
    # path_mtu=512 is presumed - confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=colector_port)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    # removing the pool address deletes the sessions just created
    self.snat_add_address(self.snat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, colector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ S-NAT IPFIX logging NAT addresses exhausted

    No pool address is configured, so an in2out packet cannot be
    translated: nothing is expected on pg1 and the exporter is expected
    to send three IPFIX packets to pg3.
    """
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): one keyword argument was missing from the listing;
    # path_mtu=512 is presumed - confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    # NOTE(review): the transport layer of this packet was missing from
    # the listing; a TCP segment from an arbitrary port is presumed.
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=3025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # the packet must not be forwarded
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        # no collector_port was passed to set_ipfix_exporter above
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB

    ARP requests for the pool address and for a 1:1 NAT address are
    expected to be answered on pg1 while the addresses are configured
    and to go unanswered after they are removed.
    """
    static_addr = '10.0.0.10'
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # SNAT address
    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.snat_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    # assertEqual, not assertTrue: assertTrue would treat ARP.is_at as
    # the failure message and never compare the opcode
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # 1:1 NAT address
    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=static_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-SNAT interface
    p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.snat_addr,
             psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
    self.pg2.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)

    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.snat_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)

    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=static_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)
def test_vrf_mode(self):
    """ S-NAT tenant VRF aware address pool mode

    pg0 and pg1 are moved into two different VRFs and each VRF gets its
    own pool address; traffic from each inside interface must leave pg2
    translated to the address of its own VRF.
    """
    # NOTE(review): these two assignments were missing from the listing;
    # any two distinct VRF ids should do - confirm the original values.
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    # addresses are removed before the table change and re-applied after
    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.pg0.set_table_ip4(vrf_id1)
    self.pg1.set_table_ip4(vrf_id2)
    self.pg0.config_ip4()
    self.pg1.config_ip4()

    self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
    self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second VRF
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip2)
def test_vrf_feature_independent(self):
    """ S-NAT tenant VRF independent address pool mode

    Two pool addresses are added without a VRF binding; traffic from
    both inside interfaces is expected to be translated to the first
    one.
    """
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.snat_add_address(nat_ip1)
    self.snat_add_address(nat_ip2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first inside interface
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second inside interface - still expects the first pool address
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ SNAT interfaces without configured ip dynamic map

    pg7 (inside) and pg8 (outside) get neighbor entries and /32 routes
    to their peers, then dynamic translation is verified in both
    directions.
    """
    # NOTE(review): the last argument of both neighbor calls was missing
    # from the listing; is_static=1 is presumed - confirm.
    self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                  self.pg7.remote_mac,
                                  self.pg7.remote_ip4n,
                                  is_static=1)
    self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                  self.pg8.remote_mac,
                                  self.pg8.remote_ip4n,
                                  is_static=1)

    # host routes towards the directly attached peers
    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index)

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg8, self.snat_addr)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)
def test_static_ipless_interfaces(self):
    """ SNAT 1:1 NAT interfaces without configured ip

    pg7 (inside) and pg8 (outside) get neighbor entries and /32 routes
    to their peers, then a 1:1 mapping for the pg7 peer is verified
    out2in first and in2out second.
    """
    # NOTE(review): the last argument of both neighbor calls was missing
    # from the listing; is_static=1 is presumed - confirm.
    self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                  self.pg7.remote_mac,
                                  self.pg7.remote_ip4n,
                                  is_static=1)
    self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                  self.pg8.remote_mac,
                                  self.pg8.remote_ip4n,
                                  is_static=1)

    # host routes towards the directly attached peers
    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index)

    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture, self.snat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ SNAT 1:1 NAT with port interfaces without configured ip

    Same topology as the other ip-less tests, but the pg7 peer is
    exposed through three per-protocol (TCP, UDP, ICMP) port mappings.
    """
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    # NOTE(review): the last argument of both neighbor calls was missing
    # from the listing; is_static=1 is presumed - confirm.
    self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                  self.pg7.remote_mac,
                                  self.pg7.remote_ip4n,
                                  is_static=1)
    self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                  self.pg8.remote_mac,
                                  self.pg8.remote_ip4n,
                                  is_static=1)

    # host routes towards the directly attached peers
    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index)

    self.snat_add_address(self.snat_addr)
    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                 self.tcp_port_in, self.tcp_port_out,
                                 proto=IP_PROTOS.tcp)
    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                 self.udp_port_in, self.udp_port_out,
                                 proto=IP_PROTOS.udp)
    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                 self.icmp_id_in, self.icmp_id_out,
                                 proto=IP_PROTOS.icmp)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol

    A GRE-encapsulated packet (neither TCP, UDP nor ICMP at the outer
    level) is sent through a 1:1 mapping in both directions; only the
    outer IP address is expected to change and the IP checksum must be
    valid.
    """
    nat_ip = "10.0.0.10"
    self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg1.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, nat_ip)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol - hairpinning

    Host and server both sit behind pg0 and each has a 1:1 mapping. A
    GRE-encapsulated packet is sent to the peer's NAT address in each
    direction; source and destination are both expected to be rewritten.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]

    host_nat_ip = "10.0.0.10"
    server_nat_ip = "10.0.0.11"

    self.snat_add_static_mapping(host.ip4, host_nat_ip)
    self.snat_add_static_mapping(server.ip4, server_nat_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # host to server
    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, host_nat_ip)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=host_nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def tearDown(self):
    """ Run the base-class teardown, then log the S-NAT state if VPP is
    still alive. """
    super(TestSNAT, self).tearDown()
    if not self.vpp_dead:
        self.logger.info(self.vapi.cli("show snat verbose"))
        # NOTE(review): the listing appears to have dropped a short
        # statement here; presumably the per-test configuration cleanup
        # (self.clear_snat()) - confirm the helper exists on this class.
        self.clear_snat()
1948 class TestDeterministicNAT(MethodHolder):
1949 """ Deterministic NAT Test Cases """
@classmethod
def setUpConstants(cls):
    """ Extend the VPP command line so S-NAT starts in deterministic
    mode. """
    super(TestDeterministicNAT, cls).setUpConstants()
    cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
@classmethod
def setUpClass(cls):
    """ Create the three packet-generator interfaces and the port/id
    constants shared by the deterministic NAT tests.

    If anything fails after the base-class setup, the base-class
    teardown is run before the exception is re-raised.
    """
    super(TestDeterministicNAT, cls).setUpClass()

    try:
        cls.tcp_port_in = 6303
        cls.tcp_external_port = 6303
        cls.udp_port_in = 6304
        cls.udp_external_port = 6304
        cls.icmp_id_in = 6305
        cls.snat_addr = '10.0.0.3'

        cls.create_pg_interfaces(range(3))
        cls.interfaces = list(cls.pg_interfaces)

        for i in cls.interfaces:
            # NOTE(review): the loop body was missing from the listing;
            # the usual bring-up sequence is presumed - confirm.
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        cls.pg0.generate_remote_hosts(2)
        cls.pg0.configure_ipv4_neighbors()

    except Exception:
        super(TestDeterministicNAT, cls).tearDownClass()
        raise
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: list with one TCP, one UDP and one ICMP echo-request packet
    """
    pkts = []
    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         UDP(sport=self.udp_port_in, dport=self.udp_external_port))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)

    return pkts
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    Uses ``self.tcp_port_out``, ``self.udp_port_out`` and
    ``self.icmp_external_id``, so those attributes must already be set
    when this is called.

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global SNAT address)
    :param ttl: TTL of generated packets
    :returns: list with one TCP, one UDP and one ICMP echo-reply packet
    """
    if dst_ip is None:
        dst_ip = self.snat_addr
    pkts = []
    # TCP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
    pkts.append(p)

    # UDP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         UDP(dport=self.udp_port_out, sport=self.udp_external_port))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         ICMP(id=self.icmp_external_id, type='echo-reply'))
    pkts.append(p)

    return pkts
def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
    """
    Verify captured packets on outside network

    Besides checking the translated source address, this records the
    translated TCP/UDP source ports and the ICMP id on ``self`` for use
    by later out2in streams.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global SNAT address)
    :param packet_num: Expected number of packets (Default 3)
    """
    if nat_ip is None:
        nat_ip = self.snat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, nat_ip)
            if packet.haslayer(TCP):
                self.tcp_port_out = packet[TCP].sport
            elif packet.haslayer(UDP):
                self.udp_port_out = packet[UDP].sport
            else:
                self.icmp_external_id = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
2069 def initiate_tcp_session(self, in_if, out_if):
2071 Initiates TCP session
2073 :param in_if: Inside interface
2074 :param out_if: Outside interface
2077 # SYN packet in->out
2078 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2079 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2080 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2083 self.pg_enable_capture(self.pg_interfaces)
2085 capture = out_if.get_capture(1)
2087 self.tcp_port_out = p[TCP].sport
2089 # SYN + ACK packet out->in
2090 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2091 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
2092 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2094 out_if.add_stream(p)
2095 self.pg_enable_capture(self.pg_interfaces)
2097 in_if.get_capture(1)
2099 # ACK packet in->out
2100 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2101 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2102 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2105 self.pg_enable_capture(self.pg_interfaces)
2107 out_if.get_capture(1)
2110 self.logger.error("TCP 3 way handshake failed")
2113 def verify_ipfix_max_entries_per_user(self, data):
2115 Verify IPFIX maximum entries per user exceeded event
2117 :param data: Decoded IPFIX data records
2119 self.assertEqual(1, len(data))
2122 self.assertEqual(ord(record[230]), 13)
2123 # natQuotaExceededEvent
2124 self.assertEqual('\x03\x00\x00\x00', record[466])
2126 self.assertEqual(self.pg0.remote_ip4n, record[8])
2128 def test_deterministic_mode(self):
2129 """ S-NAT run deterministic mode """
2130 in_addr = '172.16.255.0'
2131 out_addr = '172.17.255.50'
2132 in_addr_t = '172.16.255.20'
2133 in_addr_n = socket.inet_aton(in_addr)
2134 out_addr_n = socket.inet_aton(out_addr)
2135 in_addr_t_n = socket.inet_aton(in_addr_t)
2139 snat_config = self.vapi.snat_show_config()
2140 self.assertEqual(1, snat_config.deterministic)
2142 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
2144 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2145 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2146 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2147 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2149 deterministic_mappings = self.vapi.snat_det_map_dump()
2150 self.assertEqual(len(deterministic_mappings), 1)
2151 dsm = deterministic_mappings[0]
2152 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2153 self.assertEqual(in_plen, dsm.in_plen)
2154 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2155 self.assertEqual(out_plen, dsm.out_plen)
2158 deterministic_mappings = self.vapi.snat_det_map_dump()
2159 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    # Snapshot the timeouts reported before any change is made.
    before = self.vapi.snat_det_get_timeouts()

    # Bump every timeout by the same amount.  The values are handed over
    # positionally as: udp, tcp_established, tcp_transitory, icmp.
    bump = 10
    new_udp = before.udp + bump
    new_tcp_established = before.tcp_established + bump
    new_tcp_transitory = before.tcp_transitory + bump
    new_icmp = before.icmp + bump
    self.vapi.snat_det_set_timeouts(new_udp,
                                    new_tcp_established,
                                    new_tcp_transitory,
                                    new_icmp)

    after = self.vapi.snat_det_get_timeouts()

    # Each reported timeout must differ from its earlier value.
    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
2179 def test_det_in(self):
2180 """ CGNAT translation test (TCP, UDP, ICMP) """
2182 nat_ip = "10.0.0.10"
2184 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2186 socket.inet_aton(nat_ip),
2188 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2189 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2193 pkts = self.create_stream_in(self.pg0, self.pg1)
2194 self.pg0.add_stream(pkts)
2195 self.pg_enable_capture(self.pg_interfaces)
2197 capture = self.pg1.get_capture(len(pkts))
2198 self.verify_capture_out(capture, nat_ip)
2201 pkts = self.create_stream_out(self.pg1, nat_ip)
2202 self.pg1.add_stream(pkts)
2203 self.pg_enable_capture(self.pg_interfaces)
2205 capture = self.pg0.get_capture(len(pkts))
2206 self.verify_capture_in(capture, self.pg0)
2209 sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2210 self.assertEqual(len(sessions), 3)
2214 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2215 self.assertEqual(s.in_port, self.tcp_port_in)
2216 self.assertEqual(s.out_port, self.tcp_port_out)
2217 self.assertEqual(s.ext_port, self.tcp_external_port)
2221 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2222 self.assertEqual(s.in_port, self.udp_port_in)
2223 self.assertEqual(s.out_port, self.udp_port_out)
2224 self.assertEqual(s.ext_port, self.udp_external_port)
2228 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2229 self.assertEqual(s.in_port, self.icmp_id_in)
2230 self.assertEqual(s.out_port, self.icmp_external_id)
2232 def test_multiple_users(self):
2233 """ CGNAT multiple users """
2235 nat_ip = "10.0.0.10"
2237 external_port = 6303
2239 host0 = self.pg0.remote_hosts[0]
2240 host1 = self.pg0.remote_hosts[1]
2242 self.vapi.snat_add_det_map(host0.ip4n,
2244 socket.inet_aton(nat_ip),
2246 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2247 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2251 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2252 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2253 TCP(sport=port_in, dport=external_port))
2254 self.pg0.add_stream(p)
2255 self.pg_enable_capture(self.pg_interfaces)
2257 capture = self.pg1.get_capture(1)
2262 self.assertEqual(ip.src, nat_ip)
2263 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2264 self.assertEqual(tcp.dport, external_port)
2265 port_out0 = tcp.sport
2267 self.logger.error(ppp("Unexpected or invalid packet:", p))
2271 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2272 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2273 TCP(sport=port_in, dport=external_port))
2274 self.pg0.add_stream(p)
2275 self.pg_enable_capture(self.pg_interfaces)
2277 capture = self.pg1.get_capture(1)
2282 self.assertEqual(ip.src, nat_ip)
2283 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2284 self.assertEqual(tcp.dport, external_port)
2285 port_out1 = tcp.sport
2287 self.logger.error(ppp("Unexpected or invalid packet:", p))
2290 dms = self.vapi.snat_det_map_dump()
2291 self.assertEqual(1, len(dms))
2292 self.assertEqual(2, dms[0].ses_num)
2295 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2296 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2297 TCP(sport=external_port, dport=port_out0))
2298 self.pg1.add_stream(p)
2299 self.pg_enable_capture(self.pg_interfaces)
2301 capture = self.pg0.get_capture(1)
2306 self.assertEqual(ip.src, self.pg1.remote_ip4)
2307 self.assertEqual(ip.dst, host0.ip4)
2308 self.assertEqual(tcp.dport, port_in)
2309 self.assertEqual(tcp.sport, external_port)
2311 self.logger.error(ppp("Unexpected or invalid packet:", p))
2315 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2316 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2317 TCP(sport=external_port, dport=port_out1))
2318 self.pg1.add_stream(p)
2319 self.pg_enable_capture(self.pg_interfaces)
2321 capture = self.pg0.get_capture(1)
2326 self.assertEqual(ip.src, self.pg1.remote_ip4)
2327 self.assertEqual(ip.dst, host1.ip4)
2328 self.assertEqual(tcp.dport, port_in)
2329 self.assertEqual(tcp.sport, external_port)
2331 self.logger.error(ppp("Unexpected or invalid packet", p))
2334 # session close api test
2335 self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2337 self.pg1.remote_ip4n,
2339 dms = self.vapi.snat_det_map_dump()
2340 self.assertEqual(dms[0].ses_num, 1)
2342 self.vapi.snat_det_close_session_in(host0.ip4n,
2344 self.pg1.remote_ip4n,
2346 dms = self.vapi.snat_det_map_dump()
2347 self.assertEqual(dms[0].ses_num, 0)
2349 def test_tcp_session_close_detection_in(self):
2350 """ CGNAT TCP session close initiated from inside network """
2351 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2353 socket.inet_aton(self.snat_addr),
2355 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2356 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2359 self.initiate_tcp_session(self.pg0, self.pg1)
2361 # close the session from inside
2363 # FIN packet in -> out
2364 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2365 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2366 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2368 self.pg0.add_stream(p)
2369 self.pg_enable_capture(self.pg_interfaces)
2371 self.pg1.get_capture(1)
2375 # ACK packet out -> in
2376 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2377 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2378 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2382 # FIN packet out -> in
2383 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2384 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2385 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2389 self.pg1.add_stream(pkts)
2390 self.pg_enable_capture(self.pg_interfaces)
2392 self.pg0.get_capture(2)
2394 # ACK packet in -> out
2395 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2396 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2397 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2399 self.pg0.add_stream(p)
2400 self.pg_enable_capture(self.pg_interfaces)
2402 self.pg1.get_capture(1)
2404 # Check if snat closed the session
2405 dms = self.vapi.snat_det_map_dump()
2406 self.assertEqual(0, dms[0].ses_num)
2408 self.logger.error("TCP session termination failed")
2411 def test_tcp_session_close_detection_out(self):
2412 """ CGNAT TCP session close initiated from outside network """
2413 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2415 socket.inet_aton(self.snat_addr),
2417 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2418 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2421 self.initiate_tcp_session(self.pg0, self.pg1)
2423 # close the session from outside
2425 # FIN packet out -> in
2426 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2427 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2428 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2430 self.pg1.add_stream(p)
2431 self.pg_enable_capture(self.pg_interfaces)
2433 self.pg0.get_capture(1)
2437 # ACK packet in -> out
2438 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2439 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2440 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2444 # ACK packet in -> out
2445 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2446 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2447 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2451 self.pg0.add_stream(pkts)
2452 self.pg_enable_capture(self.pg_interfaces)
2454 self.pg1.get_capture(2)
2456 # ACK packet out -> in
2457 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2458 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2459 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2461 self.pg1.add_stream(p)
2462 self.pg_enable_capture(self.pg_interfaces)
2464 self.pg0.get_capture(1)
2466 # Check if snat closed the session
2467 dms = self.vapi.snat_det_map_dump()
2468 self.assertEqual(0, dms[0].ses_num)
2470 self.logger.error("TCP session termination failed")
2473 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2474 def test_session_timeout(self):
2475 """ CGNAT session timeouts """
2476 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2478 socket.inet_aton(self.snat_addr),
2480 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2481 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2484 self.initiate_tcp_session(self.pg0, self.pg1)
2485 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2486 pkts = self.create_stream_in(self.pg0, self.pg1)
2487 self.pg0.add_stream(pkts)
2488 self.pg_enable_capture(self.pg_interfaces)
2490 capture = self.pg1.get_capture(len(pkts))
2493 dms = self.vapi.snat_det_map_dump()
2494 self.assertEqual(0, dms[0].ses_num)
2496 def test_session_limit_per_user(self):
2497 """ CGNAT maximum 1000 sessions per user should be created """
2498 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2500 socket.inet_aton(self.snat_addr),
2502 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2503 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2505 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2506 src_address=self.pg2.local_ip4n,
2508 template_interval=10)
2509 self.vapi.snat_ipfix()
2512 for port in range(1025, 2025):
2513 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2514 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2515 UDP(sport=port, dport=port))
2518 self.pg0.add_stream(pkts)
2519 self.pg_enable_capture(self.pg_interfaces)
2521 capture = self.pg1.get_capture(len(pkts))
2523 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2524 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2525 UDP(sport=3001, dport=3002))
2526 self.pg0.add_stream(p)
2527 self.pg_enable_capture(self.pg_interfaces)
2529 capture = self.pg1.assert_nothing_captured()
2531 # verify ICMP error packet
2532 capture = self.pg0.get_capture(1)
2534 self.assertTrue(p.haslayer(ICMP))
2536 self.assertEqual(icmp.type, 3)
2537 self.assertEqual(icmp.code, 1)
2538 self.assertTrue(icmp.haslayer(IPerror))
2539 inner_ip = icmp[IPerror]
2540 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2541 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2543 dms = self.vapi.snat_det_map_dump()
2545 self.assertEqual(1000, dms[0].ses_num)
2547 # verify IPFIX logging
2548 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2549 capture = self.pg2.get_capture(2)
2550 ipfix = IPFIXDecoder()
2551 # first load template
2553 self.assertTrue(p.haslayer(IPFIX))
2554 if p.haslayer(Template):
2555 ipfix.add_template(p.getlayer(Template))
2556 # verify events in data set
2558 if p.haslayer(Data):
2559 data = ipfix.decode_data_set(p.getlayer(Set))
2560 self.verify_ipfix_max_entries_per_user(data)
2562 def clear_snat(self):
2564 Clear SNAT configuration.
2566 self.vapi.snat_ipfix(enable=0)
2567 self.vapi.snat_det_set_timeouts()
2568 deterministic_mappings = self.vapi.snat_det_map_dump()
2569 for dsm in deterministic_mappings:
2570 self.vapi.snat_add_det_map(dsm.in_addr,
2576 interfaces = self.vapi.snat_interface_dump()
2577 for intf in interfaces:
2578 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2583 super(TestDeterministicNAT, self).tearDown()
2584 if not self.vpp_dead:
2585 self.logger.info(self.vapi.cli("show snat detail"))
2589 class TestNAT64(MethodHolder):
2590 """ NAT64 Test Cases """
2593 def setUpClass(cls):
2594 super(TestNAT64, cls).setUpClass()
2597 cls.tcp_port_in = 6303
2598 cls.tcp_port_out = 6303
2599 cls.udp_port_in = 6304
2600 cls.udp_port_out = 6304
2601 cls.icmp_id_in = 6305
2602 cls.icmp_id_out = 6305
2603 cls.nat_addr = '10.0.0.3'
2604 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2606 cls.vrf1_nat_addr = '10.0.10.3'
2607 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2610 cls.create_pg_interfaces(range(3))
2611 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2612 cls.ip6_interfaces.append(cls.pg_interfaces[2])
2613 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2615 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2617 cls.pg0.generate_remote_hosts(2)
2619 for i in cls.ip6_interfaces:
2622 i.configure_ipv6_neighbors()
2624 for i in cls.ip4_interfaces:
2630 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # Add a range whose first and last address are the same.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)

    # The dump must now report exactly that one address.
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Remove the same range again (is_add=0) ...
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)

    # ... after which the dump must be empty.
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
2648 def test_interface(self):
2649 """ Enable/disable NAT64 feature on the interface """
2650 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2651 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2653 interfaces = self.vapi.nat64_interface_dump()
2654 self.assertEqual(len(interfaces), 2)
2657 for intf in interfaces:
2658 if intf.sw_if_index == self.pg0.sw_if_index:
2659 self.assertEqual(intf.is_inside, 1)
2661 elif intf.sw_if_index == self.pg1.sw_if_index:
2662 self.assertEqual(intf.is_inside, 0)
2664 self.assertTrue(pg0_found)
2665 self.assertTrue(pg1_found)
2667 features = self.vapi.cli("show interface features pg0")
2668 self.assertNotEqual(features.find('nat64-in2out'), -1)
2669 features = self.vapi.cli("show interface features pg1")
2670 self.assertNotEqual(features.find('nat64-out2in'), -1)
2672 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2673 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2675 interfaces = self.vapi.nat64_interface_dump()
2676 self.assertEqual(len(interfaces), 0)
2678 def test_static_bib(self):
2679 """ Add/delete static BIB entry """
2680 in_addr = socket.inet_pton(socket.AF_INET6,
2681 '2001:db8:85a3::8a2e:370:7334')
2682 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2685 proto = IP_PROTOS.tcp
2687 self.vapi.nat64_add_del_static_bib(in_addr,
2692 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2697 self.assertEqual(bibe.i_addr, in_addr)
2698 self.assertEqual(bibe.o_addr, out_addr)
2699 self.assertEqual(bibe.i_port, in_port)
2700 self.assertEqual(bibe.o_port, out_port)
2701 self.assertEqual(static_bib_num, 1)
2703 self.vapi.nat64_add_del_static_bib(in_addr,
2709 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2714 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Timeout fields, in the order in which they are checked.
    fields = ('udp', 'icmp', 'tcp_trans', 'tcp_est', 'tcp_incoming_syn')

    # verify default values
    defaults = (300, 60, 240, 7440, 6)
    reported = self.vapi.nat64_get_timeouts()
    for name, expected in zip(fields, defaults):
        self.assertEqual(getattr(reported, name), expected)

    # set and verify custom values
    custom = (200, 30, 250, 7450, 10)
    # Every value is passed as a keyword argument named after its field.
    self.vapi.nat64_set_timeouts(**dict(zip(fields, custom)))
    reported = self.vapi.nat64_get_timeouts()
    for name, expected in zip(fields, custom):
        self.assertEqual(getattr(reported, name), expected)
2736 def test_dynamic(self):
2737 """ NAT64 dynamic translation test """
2738 self.tcp_port_in = 6303
2739 self.udp_port_in = 6304
2740 self.icmp_id_in = 6305
2742 ses_num_start = self.nat64_get_ses_num()
2744 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2746 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2747 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2750 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2751 self.pg0.add_stream(pkts)
2752 self.pg_enable_capture(self.pg_interfaces)
2754 capture = self.pg1.get_capture(len(pkts))
2755 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2756 dst_ip=self.pg1.remote_ip4)
2759 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2760 self.pg1.add_stream(pkts)
2761 self.pg_enable_capture(self.pg_interfaces)
2763 capture = self.pg0.get_capture(len(pkts))
2764 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2765 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2768 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2769 self.pg0.add_stream(pkts)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 capture = self.pg1.get_capture(len(pkts))
2773 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2774 dst_ip=self.pg1.remote_ip4)
2777 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2778 self.pg1.add_stream(pkts)
2779 self.pg_enable_capture(self.pg_interfaces)
2781 capture = self.pg0.get_capture(len(pkts))
2782 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2784 ses_num_end = self.nat64_get_ses_num()
2786 self.assertEqual(ses_num_end - ses_num_start, 3)
2788 # tenant with specific VRF
2789 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
2790 self.vrf1_nat_addr_n,
2791 vrf_id=self.vrf1_id)
2792 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
2794 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
2795 self.pg2.add_stream(pkts)
2796 self.pg_enable_capture(self.pg_interfaces)
2798 capture = self.pg1.get_capture(len(pkts))
2799 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
2800 dst_ip=self.pg1.remote_ip4)
2802 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
2803 self.pg1.add_stream(pkts)
2804 self.pg_enable_capture(self.pg_interfaces)
2806 capture = self.pg2.get_capture(len(pkts))
2807 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
2809 def test_static(self):
2810 """ NAT64 static translation test """
2811 self.tcp_port_in = 60303
2812 self.udp_port_in = 60304
2813 self.icmp_id_in = 60305
2814 self.tcp_port_out = 60303
2815 self.udp_port_out = 60304
2816 self.icmp_id_out = 60305
2818 ses_num_start = self.nat64_get_ses_num()
2820 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2822 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2823 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2825 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2830 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2835 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2842 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2843 self.pg0.add_stream(pkts)
2844 self.pg_enable_capture(self.pg_interfaces)
2846 capture = self.pg1.get_capture(len(pkts))
2847 self.verify_capture_out(capture, nat_ip=self.nat_addr,
2848 dst_ip=self.pg1.remote_ip4, same_port=True)
2851 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2852 self.pg1.add_stream(pkts)
2853 self.pg_enable_capture(self.pg_interfaces)
2855 capture = self.pg0.get_capture(len(pkts))
2856 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2857 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2859 ses_num_end = self.nat64_get_ses_num()
2861 self.assertEqual(ses_num_end - ses_num_start, 3)
2863 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2864 def test_session_timeout(self):
2865 """ NAT64 session timeout """
2866 self.icmp_id_in = 1234
2867 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2869 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2870 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2871 self.vapi.nat64_set_timeouts(icmp=5)
2873 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2874 self.pg0.add_stream(pkts)
2875 self.pg_enable_capture(self.pg_interfaces)
2877 capture = self.pg1.get_capture(len(pkts))
2879 ses_num_before_timeout = self.nat64_get_ses_num()
2883 # ICMP session after timeout
2884 ses_num_after_timeout = self.nat64_get_ses_num()
2885 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2887 def test_icmp_error(self):
2888 """ NAT64 ICMP Error message translation """
2889 self.tcp_port_in = 6303
2890 self.udp_port_in = 6304
2891 self.icmp_id_in = 6305
2893 ses_num_start = self.nat64_get_ses_num()
2895 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2897 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2898 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2900 # send some packets to create sessions
2901 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2902 self.pg0.add_stream(pkts)
2903 self.pg_enable_capture(self.pg_interfaces)
2905 capture_ip4 = self.pg1.get_capture(len(pkts))
2906 self.verify_capture_out(capture_ip4,
2907 nat_ip=self.nat_addr,
2908 dst_ip=self.pg1.remote_ip4)
2910 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2911 self.pg1.add_stream(pkts)
2912 self.pg_enable_capture(self.pg_interfaces)
2914 capture_ip6 = self.pg0.get_capture(len(pkts))
2915 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2916 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
2917 self.pg0.remote_ip6)
2920 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2921 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
2922 ICMPv6DestUnreach(code=1) /
2923 packet[IPv6] for packet in capture_ip6]
2924 self.pg0.add_stream(pkts)
2925 self.pg_enable_capture(self.pg_interfaces)
2927 capture = self.pg1.get_capture(len(pkts))
2928 for packet in capture:
2930 self.assertEqual(packet[IP].src, self.nat_addr)
2931 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2932 self.assertEqual(packet[ICMP].type, 3)
2933 self.assertEqual(packet[ICMP].code, 13)
2934 inner = packet[IPerror]
2935 self.assertEqual(inner.src, self.pg1.remote_ip4)
2936 self.assertEqual(inner.dst, self.nat_addr)
2937 self.check_icmp_checksum(packet)
2938 if inner.haslayer(TCPerror):
2939 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
2940 elif inner.haslayer(UDPerror):
2941 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
2943 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
2945 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2949 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2950 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2951 ICMP(type=3, code=13) /
2952 packet[IP] for packet in capture_ip4]
2953 self.pg1.add_stream(pkts)
2954 self.pg_enable_capture(self.pg_interfaces)
2956 capture = self.pg0.get_capture(len(pkts))
2957 for packet in capture:
2959 self.assertEqual(packet[IPv6].src, ip.src)
2960 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
2961 icmp = packet[ICMPv6DestUnreach]
2962 self.assertEqual(icmp.code, 1)
2963 inner = icmp[IPerror6]
2964 self.assertEqual(inner.src, self.pg0.remote_ip6)
2965 self.assertEqual(inner.dst, ip.src)
2966 self.check_icmpv6_checksum(packet)
2967 if inner.haslayer(TCPerror):
2968 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
2969 elif inner.haslayer(UDPerror):
2970 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
2972 self.assertEqual(inner[ICMPv6EchoRequest].id,
2975 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2978 def test_hairpinning(self):
2979 """ NAT64 hairpinning """
2981 client = self.pg0.remote_hosts[0]
2982 server = self.pg0.remote_hosts[1]
2983 server_tcp_in_port = 22
2984 server_tcp_out_port = 4022
2985 server_udp_in_port = 23
2986 server_udp_out_port = 4023
2987 client_tcp_in_port = 1234
2988 client_udp_in_port = 1235
2989 client_tcp_out_port = 0
2990 client_udp_out_port = 0
2991 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
2992 nat_addr_ip6 = ip.src
2994 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2996 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2997 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2999 self.vapi.nat64_add_del_static_bib(server.ip6n,
3002 server_tcp_out_port,
3004 self.vapi.nat64_add_del_static_bib(server.ip6n,
3007 server_udp_out_port,
3012 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3013 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3014 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3016 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3017 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3018 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3020 self.pg0.add_stream(pkts)
3021 self.pg_enable_capture(self.pg_interfaces)
3023 capture = self.pg0.get_capture(len(pkts))
3024 for packet in capture:
3026 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3027 self.assertEqual(packet[IPv6].dst, server.ip6)
3028 if packet.haslayer(TCP):
3029 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3030 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3031 self.check_tcp_checksum(packet)
3032 client_tcp_out_port = packet[TCP].sport
3034 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3035 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3036 self.check_udp_checksum(packet)
3037 client_udp_out_port = packet[UDP].sport
3039 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3044 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3045 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3046 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3048 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3049 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3050 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3052 self.pg0.add_stream(pkts)
3053 self.pg_enable_capture(self.pg_interfaces)
3055 capture = self.pg0.get_capture(len(pkts))
3056 for packet in capture:
3058 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3059 self.assertEqual(packet[IPv6].dst, client.ip6)
3060 if packet.haslayer(TCP):
3061 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3062 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3063 self.check_tcp_checksum(packet)
3065 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3066 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3067 self.check_udp_checksum(packet)
3069 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3074 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3075 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3076 ICMPv6DestUnreach(code=1) /
3077 packet[IPv6] for packet in capture]
3078 self.pg0.add_stream(pkts)
3079 self.pg_enable_capture(self.pg_interfaces)
3081 capture = self.pg0.get_capture(len(pkts))
3082 for packet in capture:
3084 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3085 self.assertEqual(packet[IPv6].dst, server.ip6)
3086 icmp = packet[ICMPv6DestUnreach]
3087 self.assertEqual(icmp.code, 1)
3088 inner = icmp[IPerror6]
3089 self.assertEqual(inner.src, server.ip6)
3090 self.assertEqual(inner.dst, nat_addr_ip6)
3091 self.check_icmpv6_checksum(packet)
3092 if inner.haslayer(TCPerror):
3093 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3094 self.assertEqual(inner[TCPerror].dport,
3095 client_tcp_out_port)
3097 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3098 self.assertEqual(inner[UDPerror].dport,
3099 client_udp_out_port)
3101 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3104 def nat64_get_ses_num(self):
3106 Return number of active NAT64 sessions.
3109 st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
3111 st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
3113 st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
3117 def clear_nat64(self):
3119 Clear NAT64 configuration.
3121 self.vapi.nat64_set_timeouts()
3123 interfaces = self.vapi.nat64_interface_dump()
3124 for intf in interfaces:
3125 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3129 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3132 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3140 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3143 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3151 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3154 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3162 adresses = self.vapi.nat64_pool_addr_dump()
3163 for addr in adresses:
3164 self.vapi.nat64_add_del_pool_addr_range(addr.address,
3170 super(TestNAT64, self).tearDown()
3171 if not self.vpp_dead:
3172 self.logger.info(self.vapi.cli("show nat64 pool"))
3173 self.logger.info(self.vapi.cli("show nat64 interfaces"))
3174 self.logger.info(self.vapi.cli("show nat64 bib tcp"))
3175 self.logger.info(self.vapi.cli("show nat64 bib udp"))
3176 self.logger.info(self.vapi.cli("show nat64 bib icmp"))
3177 self.logger.info(self.vapi.cli("show nat64 session table tcp"))
3178 self.logger.info(self.vapi.cli("show nat64 session table udp"))
3179 self.logger.info(self.vapi.cli("show nat64 session table icmp"))
# When this module is executed directly (not imported), hand control to
# unittest and have it use VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)