7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
12 from scapy.packet import bind_layers
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from time import sleep
18 class MethodHolder(VppTestCase):
19 """ SNAT create capture and verify method holder """
23 super(MethodHolder, cls).setUpClass()
26 super(MethodHolder, self).tearDown()
28 def create_stream_in(self, in_if, out_if, ttl=64):
30 Create packet stream for inside network
32 :param in_if: Inside interface
33 :param out_if: Outside interface
34 :param ttl: TTL of generated packets
38 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
39 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
40 TCP(sport=self.tcp_port_in))
44 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
45 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
46 UDP(sport=self.udp_port_in))
50 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
51 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
52 ICMP(id=self.icmp_id_in, type='echo-request'))
57 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
59 Create packet stream for outside network
61 :param out_if: Outside interface
62 :param dst_ip: Destination IP address (Default use global SNAT address)
63 :param ttl: TTL of generated packets
66 dst_ip = self.snat_addr
69 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
70 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
71 TCP(dport=self.tcp_port_out))
75 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
76 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
77 UDP(dport=self.udp_port_out))
81 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
82 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
83 ICMP(id=self.icmp_id_out, type='echo-reply'))
88 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
91 Verify captured packets on outside network
93 :param capture: Captured packets
94 :param nat_ip: Translated IP address (Default use global SNAT address)
95 :param same_port: Source port number is not translated (Default False)
96 :param packet_num: Expected number of packets (Default 3)
99 nat_ip = self.snat_addr
100 self.assertEqual(packet_num, len(capture))
101 for packet in capture:
103 self.assertEqual(packet[IP].src, nat_ip)
104 if packet.haslayer(TCP):
106 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
109 packet[TCP].sport, self.tcp_port_in)
110 self.tcp_port_out = packet[TCP].sport
111 elif packet.haslayer(UDP):
113 self.assertEqual(packet[UDP].sport, self.udp_port_in)
116 packet[UDP].sport, self.udp_port_in)
117 self.udp_port_out = packet[UDP].sport
120 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
122 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
123 self.icmp_id_out = packet[ICMP].id
125 self.logger.error(ppp("Unexpected or invalid packet "
126 "(outside network):", packet))
129 def verify_capture_in(self, capture, in_if, packet_num=3):
131 Verify captured packets on inside network
133 :param capture: Captured packets
134 :param in_if: Inside interface
135 :param packet_num: Expected number of packets (Default 3)
137 self.assertEqual(packet_num, len(capture))
138 for packet in capture:
140 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
141 if packet.haslayer(TCP):
142 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
143 elif packet.haslayer(UDP):
144 self.assertEqual(packet[UDP].dport, self.udp_port_in)
146 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
148 self.logger.error(ppp("Unexpected or invalid packet "
149 "(inside network):", packet))
152 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
154 Verify captured packets that don't have to be translated
156 :param capture: Captured packets
157 :param ingress_if: Ingress interface
158 :param egress_if: Egress interface
160 for packet in capture:
162 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
163 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
164 if packet.haslayer(TCP):
165 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
166 elif packet.haslayer(UDP):
167 self.assertEqual(packet[UDP].sport, self.udp_port_in)
169 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
171 self.logger.error(ppp("Unexpected or invalid packet "
172 "(inside network):", packet))
175 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
176 packet_num=3, icmp_type=11):
178 Verify captured packets with ICMP errors on outside network
180 :param capture: Captured packets
181 :param src_ip: Translated IP address or IP address of VPP
182 (Default use global SNAT address)
183 :param packet_num: Expected number of packets (Default 3)
184 :param icmp_type: Type of error ICMP packet
185 we are expecting (Default 11)
188 src_ip = self.snat_addr
189 self.assertEqual(packet_num, len(capture))
190 for packet in capture:
192 self.assertEqual(packet[IP].src, src_ip)
193 self.assertTrue(packet.haslayer(ICMP))
195 self.assertEqual(icmp.type, icmp_type)
196 self.assertTrue(icmp.haslayer(IPerror))
197 inner_ip = icmp[IPerror]
198 if inner_ip.haslayer(TCPerror):
199 self.assertEqual(inner_ip[TCPerror].dport,
201 elif inner_ip.haslayer(UDPerror):
202 self.assertEqual(inner_ip[UDPerror].dport,
205 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
207 self.logger.error(ppp("Unexpected or invalid packet "
208 "(outside network):", packet))
211 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
214 Verify captured packets with ICMP errors on inside network
216 :param capture: Captured packets
217 :param in_if: Inside interface
218 :param packet_num: Expected number of packets (Default 3)
219 :param icmp_type: Type of error ICMP packet
220 we are expecting (Default 11)
222 self.assertEqual(packet_num, len(capture))
223 for packet in capture:
225 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
226 self.assertTrue(packet.haslayer(ICMP))
228 self.assertEqual(icmp.type, icmp_type)
229 self.assertTrue(icmp.haslayer(IPerror))
230 inner_ip = icmp[IPerror]
231 if inner_ip.haslayer(TCPerror):
232 self.assertEqual(inner_ip[TCPerror].sport,
234 elif inner_ip.haslayer(UDPerror):
235 self.assertEqual(inner_ip[UDPerror].sport,
238 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
240 self.logger.error(ppp("Unexpected or invalid packet "
241 "(inside network):", packet))
244 def verify_ipfix_nat44_ses(self, data):
246 Verify IPFIX NAT44 session create/delete event
248 :param data: Decoded IPFIX data records
250 nat44_ses_create_num = 0
251 nat44_ses_delete_num = 0
252 self.assertEqual(6, len(data))
255 self.assertIn(ord(record[230]), [4, 5])
256 if ord(record[230]) == 4:
257 nat44_ses_create_num += 1
259 nat44_ses_delete_num += 1
261 self.assertEqual(self.pg0.remote_ip4n, record[8])
262 # postNATSourceIPv4Address
263 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
266 self.assertEqual(struct.pack("!I", 0), record[234])
267 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
268 if IP_PROTOS.icmp == ord(record[4]):
269 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
270 self.assertEqual(struct.pack("!H", self.icmp_id_out),
272 elif IP_PROTOS.tcp == ord(record[4]):
273 self.assertEqual(struct.pack("!H", self.tcp_port_in),
275 self.assertEqual(struct.pack("!H", self.tcp_port_out),
277 elif IP_PROTOS.udp == ord(record[4]):
278 self.assertEqual(struct.pack("!H", self.udp_port_in),
280 self.assertEqual(struct.pack("!H", self.udp_port_out),
283 self.fail("Invalid protocol")
284 self.assertEqual(3, nat44_ses_create_num)
285 self.assertEqual(3, nat44_ses_delete_num)
287 def verify_ipfix_addr_exhausted(self, data):
289 Verify IPFIX NAT addresses event
291 :param data: Decoded IPFIX data records
293 self.assertEqual(1, len(data))
296 self.assertEqual(ord(record[230]), 3)
298 self.assertEqual(struct.pack("!I", 0), record[283])
301 class TestSNAT(MethodHolder):
302 """ SNAT Test Cases """
306 super(TestSNAT, cls).setUpClass()
309 cls.tcp_port_in = 6303
310 cls.tcp_port_out = 6303
311 cls.udp_port_in = 6304
312 cls.udp_port_out = 6304
313 cls.icmp_id_in = 6305
314 cls.icmp_id_out = 6305
315 cls.snat_addr = '10.0.0.3'
316 cls.ipfix_src_port = 4739
317 cls.ipfix_domain_id = 1
319 cls.create_pg_interfaces(range(9))
320 cls.interfaces = list(cls.pg_interfaces[0:4])
322 for i in cls.interfaces:
327 cls.pg0.generate_remote_hosts(2)
328 cls.pg0.configure_ipv4_neighbors()
330 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
332 cls.pg4._local_ip4 = "172.16.255.1"
333 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
334 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
335 cls.pg4.set_table_ip4(10)
336 cls.pg5._local_ip4 = "172.16.255.3"
337 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
338 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
339 cls.pg5.set_table_ip4(10)
340 cls.pg6._local_ip4 = "172.16.255.1"
341 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
342 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
343 cls.pg6.set_table_ip4(20)
344 for i in cls.overlapping_interfaces:
353 super(TestSNAT, cls).tearDownClass()
356 def clear_snat(self):
358 Clear SNAT configuration.
360 # I found no elegant way to do this
361 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
362 dst_address_length=32,
363 next_hop_address=self.pg7.remote_ip4n,
364 next_hop_sw_if_index=self.pg7.sw_if_index,
366 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
367 dst_address_length=32,
368 next_hop_address=self.pg8.remote_ip4n,
369 next_hop_sw_if_index=self.pg8.sw_if_index,
372 for intf in [self.pg7, self.pg8]:
373 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
375 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
380 if self.pg7.has_ip4_config:
381 self.pg7.unconfig_ip4()
383 interfaces = self.vapi.snat_interface_addr_dump()
384 for intf in interfaces:
385 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
387 self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
388 domain_id=self.ipfix_domain_id)
389 self.ipfix_src_port = 4739
390 self.ipfix_domain_id = 1
392 interfaces = self.vapi.snat_interface_dump()
393 for intf in interfaces:
394 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
398 static_mappings = self.vapi.snat_static_mapping_dump()
399 for sm in static_mappings:
400 self.vapi.snat_add_static_mapping(sm.local_ip_address,
401 sm.external_ip_address,
402 local_port=sm.local_port,
403 external_port=sm.external_port,
404 addr_only=sm.addr_only,
406 protocol=sm.protocol,
409 adresses = self.vapi.snat_address_dump()
410 for addr in adresses:
411 self.vapi.snat_add_address_range(addr.ip_address,
415 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
416 local_port=0, external_port=0, vrf_id=0,
417 is_add=1, external_sw_if_index=0xFFFFFFFF,
420 Add/delete S-NAT static mapping
422 :param local_ip: Local IP address
423 :param external_ip: External IP address
424 :param local_port: Local port number (Optional)
425 :param external_port: External port number (Optional)
426 :param vrf_id: VRF ID (Default 0)
427 :param is_add: 1 if add, 0 if delete (Default add)
428 :param external_sw_if_index: External interface instead of IP address
429 :param proto: IP protocol (Mandatory if port specified)
432 if local_port and external_port:
434 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
435 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
436 self.vapi.snat_add_static_mapping(
439 external_sw_if_index,
447 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
449 Add/delete S-NAT address
451 :param ip: IP address
452 :param is_add: 1 if add, 0 if delete (Default add)
454 snat_addr = socket.inet_pton(socket.AF_INET, ip)
455 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
458 def test_dynamic(self):
459 """ SNAT dynamic translation test """
461 self.snat_add_address(self.snat_addr)
462 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
463 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
467 pkts = self.create_stream_in(self.pg0, self.pg1)
468 self.pg0.add_stream(pkts)
469 self.pg_enable_capture(self.pg_interfaces)
471 capture = self.pg1.get_capture(len(pkts))
472 self.verify_capture_out(capture)
475 pkts = self.create_stream_out(self.pg1)
476 self.pg1.add_stream(pkts)
477 self.pg_enable_capture(self.pg_interfaces)
479 capture = self.pg0.get_capture(len(pkts))
480 self.verify_capture_in(capture, self.pg0)
482 def test_dynamic_icmp_errors_in2out_ttl_1(self):
483 """ SNAT handling of client packets with TTL=1 """
485 self.snat_add_address(self.snat_addr)
486 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
487 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
490 # Client side - generate traffic
491 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
492 self.pg0.add_stream(pkts)
493 self.pg_enable_capture(self.pg_interfaces)
496 # Client side - verify ICMP type 11 packets
497 capture = self.pg0.get_capture(len(pkts))
498 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
500 def test_dynamic_icmp_errors_out2in_ttl_1(self):
501 """ SNAT handling of server packets with TTL=1 """
503 self.snat_add_address(self.snat_addr)
504 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
505 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
508 # Client side - create sessions
509 pkts = self.create_stream_in(self.pg0, self.pg1)
510 self.pg0.add_stream(pkts)
511 self.pg_enable_capture(self.pg_interfaces)
514 # Server side - generate traffic
515 capture = self.pg1.get_capture(len(pkts))
516 self.verify_capture_out(capture)
517 pkts = self.create_stream_out(self.pg1, ttl=1)
518 self.pg1.add_stream(pkts)
519 self.pg_enable_capture(self.pg_interfaces)
522 # Server side - verify ICMP type 11 packets
523 capture = self.pg1.get_capture(len(pkts))
524 self.verify_capture_out_with_icmp_errors(capture,
525 src_ip=self.pg1.local_ip4)
527 def test_dynamic_icmp_errors_in2out_ttl_2(self):
528 """ SNAT handling of error responses to client packets with TTL=2 """
530 self.snat_add_address(self.snat_addr)
531 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
532 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
535 # Client side - generate traffic
536 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
537 self.pg0.add_stream(pkts)
538 self.pg_enable_capture(self.pg_interfaces)
541 # Server side - simulate ICMP type 11 response
542 capture = self.pg1.get_capture(len(pkts))
543 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
544 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
545 ICMP(type=11) / packet[IP] for packet in capture]
546 self.pg1.add_stream(pkts)
547 self.pg_enable_capture(self.pg_interfaces)
550 # Client side - verify ICMP type 11 packets
551 capture = self.pg0.get_capture(len(pkts))
552 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
554 def test_dynamic_icmp_errors_out2in_ttl_2(self):
555 """ SNAT handling of error responses to server packets with TTL=2 """
557 self.snat_add_address(self.snat_addr)
558 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
559 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
562 # Client side - create sessions
563 pkts = self.create_stream_in(self.pg0, self.pg1)
564 self.pg0.add_stream(pkts)
565 self.pg_enable_capture(self.pg_interfaces)
568 # Server side - generate traffic
569 capture = self.pg1.get_capture(len(pkts))
570 self.verify_capture_out(capture)
571 pkts = self.create_stream_out(self.pg1, ttl=2)
572 self.pg1.add_stream(pkts)
573 self.pg_enable_capture(self.pg_interfaces)
576 # Client side - simulate ICMP type 11 response
577 capture = self.pg0.get_capture(len(pkts))
578 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
579 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
580 ICMP(type=11) / packet[IP] for packet in capture]
581 self.pg0.add_stream(pkts)
582 self.pg_enable_capture(self.pg_interfaces)
585 # Server side - verify ICMP type 11 packets
586 capture = self.pg1.get_capture(len(pkts))
587 self.verify_capture_out_with_icmp_errors(capture)
589 def test_ping_out_interface_from_outside(self):
590 """ Ping SNAT out interface from outside network """
592 self.snat_add_address(self.snat_addr)
593 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
594 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
597 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
598 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
599 ICMP(id=self.icmp_id_out, type='echo-request'))
601 self.pg1.add_stream(pkts)
602 self.pg_enable_capture(self.pg_interfaces)
604 capture = self.pg1.get_capture(len(pkts))
605 self.assertEqual(1, len(capture))
608 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
609 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
610 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
611 self.assertEqual(packet[ICMP].type, 0) # echo reply
613 self.logger.error(ppp("Unexpected or invalid packet "
614 "(outside network):", packet))
617 def test_ping_internal_host_from_outside(self):
618 """ Ping internal host from outside network """
620 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
621 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
622 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
626 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
627 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
628 ICMP(id=self.icmp_id_out, type='echo-request'))
629 self.pg1.add_stream(pkt)
630 self.pg_enable_capture(self.pg_interfaces)
632 capture = self.pg0.get_capture(1)
633 self.verify_capture_in(capture, self.pg0, packet_num=1)
634 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
637 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
638 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
639 ICMP(id=self.icmp_id_in, type='echo-reply'))
640 self.pg0.add_stream(pkt)
641 self.pg_enable_capture(self.pg_interfaces)
643 capture = self.pg1.get_capture(1)
644 self.verify_capture_out(capture, same_port=True, packet_num=1)
645 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
647 def test_static_in(self):
648 """ SNAT 1:1 NAT initialized from inside network """
651 self.tcp_port_out = 6303
652 self.udp_port_out = 6304
653 self.icmp_id_out = 6305
655 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
656 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
657 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
661 pkts = self.create_stream_in(self.pg0, self.pg1)
662 self.pg0.add_stream(pkts)
663 self.pg_enable_capture(self.pg_interfaces)
665 capture = self.pg1.get_capture(len(pkts))
666 self.verify_capture_out(capture, nat_ip, True)
669 pkts = self.create_stream_out(self.pg1, nat_ip)
670 self.pg1.add_stream(pkts)
671 self.pg_enable_capture(self.pg_interfaces)
673 capture = self.pg0.get_capture(len(pkts))
674 self.verify_capture_in(capture, self.pg0)
676 def test_static_out(self):
677 """ SNAT 1:1 NAT initialized from outside network """
680 self.tcp_port_out = 6303
681 self.udp_port_out = 6304
682 self.icmp_id_out = 6305
684 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
685 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
686 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
690 pkts = self.create_stream_out(self.pg1, nat_ip)
691 self.pg1.add_stream(pkts)
692 self.pg_enable_capture(self.pg_interfaces)
694 capture = self.pg0.get_capture(len(pkts))
695 self.verify_capture_in(capture, self.pg0)
698 pkts = self.create_stream_in(self.pg0, self.pg1)
699 self.pg0.add_stream(pkts)
700 self.pg_enable_capture(self.pg_interfaces)
702 capture = self.pg1.get_capture(len(pkts))
703 self.verify_capture_out(capture, nat_ip, True)
705 def test_static_with_port_in(self):
706 """ SNAT 1:1 NAT with port initialized from inside network """
708 self.tcp_port_out = 3606
709 self.udp_port_out = 3607
710 self.icmp_id_out = 3608
712 self.snat_add_address(self.snat_addr)
713 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
714 self.tcp_port_in, self.tcp_port_out,
716 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
717 self.udp_port_in, self.udp_port_out,
719 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
720 self.icmp_id_in, self.icmp_id_out,
721 proto=IP_PROTOS.icmp)
722 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
723 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
727 pkts = self.create_stream_in(self.pg0, self.pg1)
728 self.pg0.add_stream(pkts)
729 self.pg_enable_capture(self.pg_interfaces)
731 capture = self.pg1.get_capture(len(pkts))
732 self.verify_capture_out(capture)
735 pkts = self.create_stream_out(self.pg1)
736 self.pg1.add_stream(pkts)
737 self.pg_enable_capture(self.pg_interfaces)
739 capture = self.pg0.get_capture(len(pkts))
740 self.verify_capture_in(capture, self.pg0)
742 def test_static_with_port_out(self):
743 """ SNAT 1:1 NAT with port initialized from outside network """
745 self.tcp_port_out = 30606
746 self.udp_port_out = 30607
747 self.icmp_id_out = 30608
749 self.snat_add_address(self.snat_addr)
750 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
751 self.tcp_port_in, self.tcp_port_out,
753 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
754 self.udp_port_in, self.udp_port_out,
756 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
757 self.icmp_id_in, self.icmp_id_out,
758 proto=IP_PROTOS.icmp)
759 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
760 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
764 pkts = self.create_stream_out(self.pg1)
765 self.pg1.add_stream(pkts)
766 self.pg_enable_capture(self.pg_interfaces)
768 capture = self.pg0.get_capture(len(pkts))
769 self.verify_capture_in(capture, self.pg0)
772 pkts = self.create_stream_in(self.pg0, self.pg1)
773 self.pg0.add_stream(pkts)
774 self.pg_enable_capture(self.pg_interfaces)
776 capture = self.pg1.get_capture(len(pkts))
777 self.verify_capture_out(capture)
779 def test_static_vrf_aware(self):
780 """ SNAT 1:1 NAT VRF awareness """
782 nat_ip1 = "10.0.0.30"
783 nat_ip2 = "10.0.0.40"
784 self.tcp_port_out = 6303
785 self.udp_port_out = 6304
786 self.icmp_id_out = 6305
788 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
790 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
792 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
794 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
795 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
797 # inside interface VRF matches SNAT static mapping VRF
798 pkts = self.create_stream_in(self.pg4, self.pg3)
799 self.pg4.add_stream(pkts)
800 self.pg_enable_capture(self.pg_interfaces)
802 capture = self.pg3.get_capture(len(pkts))
803 self.verify_capture_out(capture, nat_ip1, True)
805 # inside interface VRF doesn't match SNAT static mapping VRF (packets
807 pkts = self.create_stream_in(self.pg0, self.pg3)
808 self.pg0.add_stream(pkts)
809 self.pg_enable_capture(self.pg_interfaces)
811 self.pg3.assert_nothing_captured()
813 def test_multiple_inside_interfaces(self):
814 """ SNAT multiple inside interfaces (non-overlapping address space) """
816 self.snat_add_address(self.snat_addr)
817 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
818 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
819 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
822 # between two S-NAT inside interfaces (no translation)
823 pkts = self.create_stream_in(self.pg0, self.pg1)
824 self.pg0.add_stream(pkts)
825 self.pg_enable_capture(self.pg_interfaces)
827 capture = self.pg1.get_capture(len(pkts))
828 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
830 # from S-NAT inside to interface without S-NAT feature (no translation)
831 pkts = self.create_stream_in(self.pg0, self.pg2)
832 self.pg0.add_stream(pkts)
833 self.pg_enable_capture(self.pg_interfaces)
835 capture = self.pg2.get_capture(len(pkts))
836 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
838 # in2out 1st interface
839 pkts = self.create_stream_in(self.pg0, self.pg3)
840 self.pg0.add_stream(pkts)
841 self.pg_enable_capture(self.pg_interfaces)
843 capture = self.pg3.get_capture(len(pkts))
844 self.verify_capture_out(capture)
846 # out2in 1st interface
847 pkts = self.create_stream_out(self.pg3)
848 self.pg3.add_stream(pkts)
849 self.pg_enable_capture(self.pg_interfaces)
851 capture = self.pg0.get_capture(len(pkts))
852 self.verify_capture_in(capture, self.pg0)
854 # in2out 2nd interface
855 pkts = self.create_stream_in(self.pg1, self.pg3)
856 self.pg1.add_stream(pkts)
857 self.pg_enable_capture(self.pg_interfaces)
859 capture = self.pg3.get_capture(len(pkts))
860 self.verify_capture_out(capture)
862 # out2in 2nd interface
863 pkts = self.create_stream_out(self.pg3)
864 self.pg3.add_stream(pkts)
865 self.pg_enable_capture(self.pg_interfaces)
867 capture = self.pg1.get_capture(len(pkts))
868 self.verify_capture_in(capture, self.pg1)
870 def test_inside_overlapping_interfaces(self):
871 """ SNAT multiple inside interfaces with overlapping address space """
873 static_nat_ip = "10.0.0.10"
874 self.snat_add_address(self.snat_addr)
875 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
877 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
878 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
879 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
880 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
883 # between S-NAT inside interfaces with same VRF (no translation)
884 pkts = self.create_stream_in(self.pg4, self.pg5)
885 self.pg4.add_stream(pkts)
886 self.pg_enable_capture(self.pg_interfaces)
888 capture = self.pg5.get_capture(len(pkts))
889 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
891 # between S-NAT inside interfaces with different VRF (hairpinning)
892 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
893 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
894 TCP(sport=1234, dport=5678))
895 self.pg4.add_stream(p)
896 self.pg_enable_capture(self.pg_interfaces)
898 capture = self.pg6.get_capture(1)
903 self.assertEqual(ip.src, self.snat_addr)
904 self.assertEqual(ip.dst, self.pg6.remote_ip4)
905 self.assertNotEqual(tcp.sport, 1234)
906 self.assertEqual(tcp.dport, 5678)
908 self.logger.error(ppp("Unexpected or invalid packet:", p))
911 # in2out 1st interface
912 pkts = self.create_stream_in(self.pg4, self.pg3)
913 self.pg4.add_stream(pkts)
914 self.pg_enable_capture(self.pg_interfaces)
916 capture = self.pg3.get_capture(len(pkts))
917 self.verify_capture_out(capture)
919 # out2in 1st interface
920 pkts = self.create_stream_out(self.pg3)
921 self.pg3.add_stream(pkts)
922 self.pg_enable_capture(self.pg_interfaces)
924 capture = self.pg4.get_capture(len(pkts))
925 self.verify_capture_in(capture, self.pg4)
927 # in2out 2nd interface
928 pkts = self.create_stream_in(self.pg5, self.pg3)
929 self.pg5.add_stream(pkts)
930 self.pg_enable_capture(self.pg_interfaces)
932 capture = self.pg3.get_capture(len(pkts))
933 self.verify_capture_out(capture)
935 # out2in 2nd interface
936 pkts = self.create_stream_out(self.pg3)
937 self.pg3.add_stream(pkts)
938 self.pg_enable_capture(self.pg_interfaces)
940 capture = self.pg5.get_capture(len(pkts))
941 self.verify_capture_in(capture, self.pg5)
944 addresses = self.vapi.snat_address_dump()
945 self.assertEqual(len(addresses), 1)
946 sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
947 self.assertEqual(len(sessions), 3)
948 for session in sessions:
949 self.assertFalse(session.is_static)
950 self.assertEqual(session.inside_ip_address[0:4],
951 self.pg5.remote_ip4n)
952 self.assertEqual(session.outside_ip_address,
953 addresses[0].ip_address)
954 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
955 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
956 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
957 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
958 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
959 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
960 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
961 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
962 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
964 # in2out 3rd interface
965 pkts = self.create_stream_in(self.pg6, self.pg3)
966 self.pg6.add_stream(pkts)
967 self.pg_enable_capture(self.pg_interfaces)
969 capture = self.pg3.get_capture(len(pkts))
970 self.verify_capture_out(capture, static_nat_ip, True)
972 # out2in 3rd interface
973 pkts = self.create_stream_out(self.pg3, static_nat_ip)
974 self.pg3.add_stream(pkts)
975 self.pg_enable_capture(self.pg_interfaces)
977 capture = self.pg6.get_capture(len(pkts))
978 self.verify_capture_in(capture, self.pg6)
980 # general user and session dump verifications
981 users = self.vapi.snat_user_dump()
982 self.assertTrue(len(users) >= 3)
983 addresses = self.vapi.snat_address_dump()
984 self.assertEqual(len(addresses), 1)
986 sessions = self.vapi.snat_user_session_dump(user.ip_address,
988 for session in sessions:
989 self.assertEqual(user.ip_address, session.inside_ip_address)
990 self.assertTrue(session.total_bytes > session.total_pkts > 0)
991 self.assertTrue(session.protocol in
992 [IP_PROTOS.tcp, IP_PROTOS.udp,
996 sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
997 self.assertTrue(len(sessions) >= 4)
998 for session in sessions:
999 self.assertFalse(session.is_static)
1000 self.assertEqual(session.inside_ip_address[0:4],
1001 self.pg4.remote_ip4n)
1002 self.assertEqual(session.outside_ip_address,
1003 addresses[0].ip_address)
1006 sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1007 self.assertTrue(len(sessions) >= 3)
1008 for session in sessions:
1009 self.assertTrue(session.is_static)
1010 self.assertEqual(session.inside_ip_address[0:4],
1011 self.pg6.remote_ip4n)
1012 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1013 map(int, static_nat_ip.split('.')))
1014 self.assertTrue(session.inside_port in
1015 [self.tcp_port_in, self.udp_port_in,
def test_hairpinning(self):
    """ SNAT hairpinning """

    # Host and server both sit behind the inside interface pg0; the host
    # reaches the server through the server's external (NAT) address, so
    # both directions enter and leave on pg0.
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.snat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        # remember the translated source port for the reply direction
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
    except Exception:
        # the packet must be handed to ppp(), not to logger.error() as a
        # stray format argument
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
1078 def test_max_translations_per_user(self):
1079 """ MAX translations per user - recycle the least recently used """
# Sends more TCP flows from one inside host (pg0) than the per-user limit
# reported by snat_show_config() and expects every one of them to be
# captured on pg1.
1081 self.snat_add_address(self.snat_addr)
1082 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1083 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1086 # get maximum number of translations per user
1087 snat_config = self.vapi.snat_show_config()
1089 # send more than maximum number of translations per user packets
1090 pkts_num = snat_config.max_translations_per_user + 5
# one packet per distinct TCP source port, starting at 1025
1092 for port in range(0, pkts_num):
1093 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1094 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1095 TCP(sport=1025 + port))
# NOTE(review): `pkts` is not built in the lines visible here; presumably
# each `p` is appended to it inside the loop - confirm.
1097 self.pg0.add_stream(pkts)
1098 self.pg_enable_capture(self.pg_interfaces)
1101 # verify number of translated packet
1102 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface """
    # Register pg7 as the interface the NAT pool takes its address from.
    self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
    dump_pool = self.vapi.snat_address_dump

    # no address in NAT pool
    self.assertEqual(0, len(dump_pool()))

    # configure interface address and check NAT address pool
    self.pg7.config_ip4()
    pool = dump_pool()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], self.pg7.local_ip4n)

    # remove interface address and check NAT address pool
    self.pg7.unconfig_ip4()
    self.assertEqual(0, len(dump_pool()))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface """
    ext_if = self.pg7
    dump_mappings = self.vapi.snat_static_mapping_dump

    self.vapi.snat_add_interface_addr(ext_if.sw_if_index)
    self.snat_add_static_mapping('1.2.3.4',
                                 external_sw_if_index=ext_if.sw_if_index)

    # static mappings with external interface
    mappings = dump_mappings()
    self.assertEqual(1, len(mappings))
    self.assertEqual(ext_if.sw_if_index, mappings[0].external_sw_if_index)

    # configure interface address and check static mappings
    ext_if.config_ip4()
    mappings = dump_mappings()
    self.assertEqual(1, len(mappings))
    resolved = mappings[0]
    self.assertEqual(resolved.external_ip_address[0:4], ext_if.local_ip4n)
    # once resolved, the mapping carries the address and the interface
    # index field is expected to be 0xFFFFFFFF
    self.assertEqual(0xFFFFFFFF, resolved.external_sw_if_index)

    # remove interface address and check static mappings
    ext_if.unconfig_ip4()
    self.assertEqual(0, len(dump_mappings()))
def test_ipfix_nat44_sess(self):
    """ S-NAT IPFIX logging NAT44 session created/deleted """
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # Teach scapy to dissect UDP payloads sent to the collector port as
    # IPFIX; use the same variable that is handed to the exporter so the
    # two cannot drift apart.
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    # create sessions by pushing traffic inside -> outside
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    # removing the pool address happens before the flush so the exported
    # records can be checked by verify_ipfix_nat44_ses()
    self.snat_add_address(self.snat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
1193 def test_ipfix_addr_exhausted(self):
1194 """ S-NAT IPFIX logging NAT addresses exhausted """
# No snat_add_address() call appears in the visible setup, so the packet
# sent from pg0 is expected not to be forwarded (get_capture(0) on pg1);
# the resulting IPFIX export is then collected on pg3 and checked.
1195 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1196 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1198 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1199 src_address=self.pg3.local_ip4n,
1201 template_interval=10)
1202 self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1203 src_port=self.ipfix_src_port)
1205 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1206 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1208 self.pg0.add_stream(p)
1209 self.pg_enable_capture(self.pg_interfaces)
1211 capture = self.pg1.get_capture(0)
1212 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1213 capture = self.pg3.get_capture(3)
1214 ipfix = IPFIXDecoder()
1215 # first load template
1217 self.assertTrue(p.haslayer(IPFIX))
1218 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1219 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1220 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# NOTE(review): no collector_port is passed in the visible exporter call;
# 4739 is the IANA-assigned IPFIX port and presumably the exporter
# default - confirm.
1221 self.assertEqual(p[UDP].dport, 4739)
1222 self.assertEqual(p[IPFIX].observationDomainID,
1223 self.ipfix_domain_id)
1224 if p.haslayer(Template):
1225 ipfix.add_template(p.getlayer(Template))
1226 # verify events in data set
1228 if p.haslayer(Data):
1229 data = ipfix.decode_data_set(p.getlayer(Set))
1230 self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB """
    static_addr = '10.0.0.10'
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    def send_arp_request(pg_if, target_ip):
        # Broadcast an ARP who-has for target_ip from pg_if's remote host
        # and run the packet generator.
        p = (Ether(src=pg_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
             ARP(op=ARP.who_has, pdst=target_ip,
                 psrc=pg_if.remote_ip4, hwsrc=pg_if.remote_mac))
        pg_if.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def expect_arp_reply():
        # Exactly one packet on pg1 and it must be an ARP is-at reply.
        # assertEqual (not assertTrue) so the opcode is really compared;
        # assertTrue would treat ARP.is_at as the failure message.
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # SNAT address
    send_arp_request(self.pg1, self.snat_addr)
    expect_arp_reply()

    # 1:1 NAT address
    send_arp_request(self.pg1, static_addr)
    expect_arp_reply()

    # send ARP to non-SNAT interface
    send_arp_request(self.pg2, self.snat_addr)
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)

    send_arp_request(self.pg1, self.snat_addr)
    self.pg1.get_capture(0)

    send_arp_request(self.pg1, static_addr)
    self.pg1.get_capture(0)
1293 def test_vrf_mode(self):
1294 """ S-NAT tenant VRF aware address pool mode """
# pg0 and pg1 are moved into tables vrf_id1 / vrf_id2 and one NAT address
# is added per VRF; traffic from pg0 must leave pg2 translated to nat_ip1
# and traffic from pg1 translated to nat_ip2.
# NOTE(review): the assignments of vrf_id1 / vrf_id2 are not visible in
# this chunk.
1298 nat_ip1 = "10.0.0.10"
1299 nat_ip2 = "10.0.0.11"
# addresses are removed before the table change and configured again after
1301 self.pg0.unconfig_ip4()
1302 self.pg1.unconfig_ip4()
1303 self.pg0.set_table_ip4(vrf_id1)
1304 self.pg1.set_table_ip4(vrf_id2)
1305 self.pg0.config_ip4()
1306 self.pg1.config_ip4()
1308 self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1309 self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1310 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1311 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1312 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
# first tenant: pg0 -> pg2
1316 pkts = self.create_stream_in(self.pg0, self.pg2)
1317 self.pg0.add_stream(pkts)
1318 self.pg_enable_capture(self.pg_interfaces)
1320 capture = self.pg2.get_capture(len(pkts))
1321 self.verify_capture_out(capture, nat_ip1)
# second tenant: pg1 -> pg2
1324 pkts = self.create_stream_in(self.pg1, self.pg2)
1325 self.pg1.add_stream(pkts)
1326 self.pg_enable_capture(self.pg_interfaces)
1328 capture = self.pg2.get_capture(len(pkts))
1329 self.verify_capture_out(capture, nat_ip2)
1331 def test_vrf_feature_independent(self):
1332 """ S-NAT tenant VRF independent address pool mode """
# Both pool addresses are added without a vrf_id, and both inside
# interfaces (pg0, pg1) are expected to be translated to nat_ip1.
1334 nat_ip1 = "10.0.0.10"
1335 nat_ip2 = "10.0.0.11"
1337 self.snat_add_address(nat_ip1)
1338 self.snat_add_address(nat_ip2)
1339 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1340 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1341 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
1345 pkts = self.create_stream_in(self.pg0, self.pg2)
1346 self.pg0.add_stream(pkts)
1347 self.pg_enable_capture(self.pg_interfaces)
1349 capture = self.pg2.get_capture(len(pkts))
1350 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2; the check below is against nat_ip1 again, not nat_ip2
1353 pkts = self.create_stream_in(self.pg1, self.pg2)
1354 self.pg1.add_stream(pkts)
1355 self.pg_enable_capture(self.pg_interfaces)
1357 capture = self.pg2.get_capture(len(pkts))
1358 self.verify_capture_out(capture, nat_ip1)
1360 def test_dynamic_ipless_interfaces(self):
1361 """ SNAT interfaces without configured ip dynamic map """
# Reachability of the pg7 / pg8 remote hosts is set up by hand: a neighbor
# entry and a /32 route per interface, instead of an interface address.
1363 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1364 self.pg7.remote_mac,
1365 self.pg7.remote_ip4n,
1367 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1368 self.pg8.remote_mac,
1369 self.pg8.remote_ip4n,
1372 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1373 dst_address_length=32,
1374 next_hop_address=self.pg7.remote_ip4n,
1375 next_hop_sw_if_index=self.pg7.sw_if_index)
1376 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1377 dst_address_length=32,
1378 next_hop_address=self.pg8.remote_ip4n,
1379 next_hop_sw_if_index=self.pg8.sw_if_index)
1381 self.snat_add_address(self.snat_addr)
1382 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1383 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8: verified with verify_capture_out()
1387 pkts = self.create_stream_in(self.pg7, self.pg8)
1388 self.pg7.add_stream(pkts)
1389 self.pg_enable_capture(self.pg_interfaces)
1391 capture = self.pg8.get_capture(len(pkts))
1392 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to the NAT address: verified with verify_capture_in()
1395 pkts = self.create_stream_out(self.pg8, self.snat_addr)
1396 self.pg8.add_stream(pkts)
1397 self.pg_enable_capture(self.pg_interfaces)
1399 capture = self.pg7.get_capture(len(pkts))
1400 self.verify_capture_in(capture, self.pg7)
1402 def test_static_ipless_interfaces(self):
1403 """ SNAT 1:1 NAT interfaces without configured ip """
# Same hand-made neighbor + /32 route setup as the dynamic variant, but
# with a static mapping pg7.remote_ip4 <-> snat_addr instead of a pool
# address, and the outside -> inside direction is exercised first.
1405 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1406 self.pg7.remote_mac,
1407 self.pg7.remote_ip4n,
1409 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1410 self.pg8.remote_mac,
1411 self.pg8.remote_ip4n,
1414 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1415 dst_address_length=32,
1416 next_hop_address=self.pg7.remote_ip4n,
1417 next_hop_sw_if_index=self.pg7.sw_if_index)
1418 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1419 dst_address_length=32,
1420 next_hop_address=self.pg8.remote_ip4n,
1421 next_hop_sw_if_index=self.pg8.sw_if_index)
1423 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1424 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1425 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
1429 pkts = self.create_stream_out(self.pg8)
1430 self.pg8.add_stream(pkts)
1431 self.pg_enable_capture(self.pg_interfaces)
1433 capture = self.pg7.get_capture(len(pkts))
1434 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8; third argument True is verify_capture_out()'s same_port flag
1437 pkts = self.create_stream_in(self.pg7, self.pg8)
1438 self.pg7.add_stream(pkts)
1439 self.pg_enable_capture(self.pg_interfaces)
1441 capture = self.pg8.get_capture(len(pkts))
1442 self.verify_capture_out(capture, self.snat_addr, True)
1444 def test_static_with_port_ipless_interfaces(self):
1445 """ SNAT 1:1 NAT with port interfaces without configured ip """
# Fixed outside ports / ICMP id used by the three per-protocol static
# mappings added below.
1447 self.tcp_port_out = 30606
1448 self.udp_port_out = 30607
1449 self.icmp_id_out = 30608
# neighbor entries and /32 routes stand in for interface addresses
1451 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1452 self.pg7.remote_mac,
1453 self.pg7.remote_ip4n,
1455 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1456 self.pg8.remote_mac,
1457 self.pg8.remote_ip4n,
1460 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1461 dst_address_length=32,
1462 next_hop_address=self.pg7.remote_ip4n,
1463 next_hop_sw_if_index=self.pg7.sw_if_index)
1464 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1465 dst_address_length=32,
1466 next_hop_address=self.pg8.remote_ip4n,
1467 next_hop_sw_if_index=self.pg8.sw_if_index)
1469 self.snat_add_address(self.snat_addr)
# one static mapping per protocol: TCP, UDP, ICMP
1470 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1471 self.tcp_port_in, self.tcp_port_out,
1472 proto=IP_PROTOS.tcp)
1473 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1474 self.udp_port_in, self.udp_port_out,
1475 proto=IP_PROTOS.udp)
1476 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1477 self.icmp_id_in, self.icmp_id_out,
1478 proto=IP_PROTOS.icmp)
1479 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1480 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
1484 pkts = self.create_stream_out(self.pg8)
1485 self.pg8.add_stream(pkts)
1486 self.pg_enable_capture(self.pg_interfaces)
1488 capture = self.pg7.get_capture(len(pkts))
1489 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
1492 pkts = self.create_stream_in(self.pg7, self.pg8)
1493 self.pg7.add_stream(pkts)
1494 self.pg_enable_capture(self.pg_interfaces)
1496 capture = self.pg8.get_capture(len(pkts))
1497 self.verify_capture_out(capture)
# Per-test teardown of TestSNAT: run the parent teardown, then log the
# output of the "show snat verbose" CLI command unless VPP is dead.
1500 super(TestSNAT, self).tearDown()
1501 if not self.vpp_dead:
1502 self.logger.info(self.vapi.cli("show snat verbose"))
1506 class TestDeterministicNAT(MethodHolder):
1507 """ Deterministic NAT Test Cases """
1510 def setUpConstants(cls):
# Extend the VPP command line built by the parent class with the
# `snat { deterministic }` stanza.
1511 super(TestDeterministicNAT, cls).setUpConstants()
1512 cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1515 def setUpClass(cls):
# Class-wide fixture: fixed inside/external ports and ids, the NAT
# address, three pg interfaces, and two remote hosts on pg0.
1516 super(TestDeterministicNAT, cls).setUpClass()
1519 cls.tcp_port_in = 6303
1520 cls.tcp_external_port = 6303
1521 cls.udp_port_in = 6304
1522 cls.udp_external_port = 6304
1523 cls.icmp_id_in = 6305
1524 cls.snat_addr = '10.0.0.3'
1526 cls.create_pg_interfaces(range(3))
1527 cls.interfaces = list(cls.pg_interfaces)
# NOTE(review): the per-interface setup inside this loop is not visible
# in this chunk.
1529 for i in cls.interfaces:
1534 cls.pg0.generate_remote_hosts(2)
1535 cls.pg0.configure_ipv4_neighbors()
# NOTE(review): presumably runs in an error handler so a failed setup
# still tears the class down - the handler line is not visible; confirm.
1538 super(TestDeterministicNAT, cls).tearDownClass()
1541 def create_stream_in(self, in_if, out_if, ttl=64):
1543 Create packet stream for inside network
1545 :param in_if: Inside interface
1546 :param out_if: Outside interface
1547 :param ttl: TTL of generated packets
# Builds one TCP, one UDP and one ICMP echo-request packet from the inside
# host towards out_if's remote host; TCP and UDP also set the destination
# port to the fixed external port.
# NOTE(review): the lines collecting and returning the packets are not
# visible in this chunk.
1551 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1552 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1553 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1557 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1558 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1559 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1563 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1564 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1565 ICMP(id=self.icmp_id_in, type='echo-request'))
1570 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1572 Create packet stream for outside network
1574 :param out_if: Outside interface
1575 :param dst_ip: Destination IP address (Default use global SNAT address)
1576 :param ttl: TTL of generated packets
# Builds one TCP, one UDP and one ICMP echo-reply packet from the outside
# host towards dst_ip. The destination ports / ICMP id come from
# self.tcp_port_out, self.udp_port_out and self.icmp_external_id, which
# verify_capture_out() records from captured traffic.
1579 dst_ip = self.snat_addr
1582 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1583 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1584 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1588 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1589 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1590 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1594 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1595 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1596 ICMP(id=self.icmp_external_id, type='echo-reply'))
def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
    """
    Verify captured packets on outside network

    Besides checking the translated source address, this records the
    translated TCP/UDP source port and ICMP id on the test instance
    (tcp_port_out, udp_port_out, icmp_external_id).

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global SNAT address)
    :param packet_num: Expected number of packets (Default 3)
    """
    if nat_ip is None:
        nat_ip = self.snat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, nat_ip)
            if packet.haslayer(TCP):
                self.tcp_port_out = packet[TCP].sport
            elif packet.haslayer(UDP):
                self.udp_port_out = packet[UDP].sport
            else:
                # neither TCP nor UDP: treated as ICMP
                self.icmp_external_id = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
1627 def initiate_tcp_session(self, in_if, out_if):
1629 Initiates TCP session
1631 :param in_if: Inside interface
1632 :param out_if: Outside interface
# Three packets are exchanged through the NAT (SYN in->out, SYN+ACK
# out->in, ACK in->out); each step expects exactly one packet on the far
# side. On failure "TCP 3 way handshake failed" is logged.
# NOTE(review): the TCP flags arguments and the try/except skeleton are
# not visible in this chunk.
1635 # SYN packet in->out
1636 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1637 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1638 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1641 self.pg_enable_capture(self.pg_interfaces)
1643 capture = out_if.get_capture(1)
# remember the source port for the out->in packets below; presumably `p`
# has been rebound to the captured packet here - confirm
1645 self.tcp_port_out = p[TCP].sport
1647 # SYN + ACK packet out->in
1648 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1649 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1650 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1652 out_if.add_stream(p)
1653 self.pg_enable_capture(self.pg_interfaces)
1655 in_if.get_capture(1)
1657 # ACK packet in->out
1658 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1659 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1660 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1663 self.pg_enable_capture(self.pg_interfaces)
1665 out_if.get_capture(1)
1668 self.logger.error("TCP 3 way handshake failed")
1671 def verify_ipfix_max_entries_per_user(self, data):
1673 Verify IPFIX maximum entries per user exceeded event
1675 :param data: Decoded IPFIX data records
# Exactly one record is expected. The integer keys index fields of that
# record; NOTE(review): presumably they are IANA IPFIX information element
# ids (230 natEvent, 466 natQuotaExceededEvent, 8 sourceIPv4Address) and
# `record` is data[0] - confirm, the assignment is not visible here.
1677 self.assertEqual(1, len(data))
1680 self.assertEqual(ord(record[230]), 13)
1681 # natQuotaExceededEvent
1682 self.assertEqual('\x03\x00\x00\x00', record[466])
# the reported address must be the inside host's (pg0 remote) address
1684 self.assertEqual(self.pg0.remote_ip4n, record[8])
1686 def test_deterministic_mode(self):
1687 """ S-NAT run deterministic mode """
# API-only test: adds one deterministic mapping, checks the forward and
# reverse lookups agree with each other, checks the mapping dump, and
# finally expects the dump to be empty again.
# NOTE(review): in_plen / out_plen and whatever removes the mapping before
# the last dump are not visible in this chunk.
1688 in_addr = '172.16.255.0'
1689 out_addr = '172.17.255.50'
1690 in_addr_t = '172.16.255.20'
1691 in_addr_n = socket.inet_aton(in_addr)
1692 out_addr_n = socket.inet_aton(out_addr)
1693 in_addr_t_n = socket.inet_aton(in_addr_t)
1697 snat_config = self.vapi.snat_show_config()
1698 self.assertEqual(1, snat_config.deterministic)
1700 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
# forward lookup of an inside host, then reverse lookup of the result
1702 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1703 self.assertEqual(rep1.out_addr[:4], out_addr_n)
1704 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1705 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1707 deterministic_mappings = self.vapi.snat_det_map_dump()
1708 self.assertEqual(len(deterministic_mappings), 1)
1709 dsm = deterministic_mappings[0]
# only the first 4 bytes of the address fields are compared
1710 self.assertEqual(in_addr_n, dsm.in_addr[:4])
1711 self.assertEqual(in_plen, dsm.in_plen)
1712 self.assertEqual(out_addr_n, dsm.out_addr[:4])
1713 self.assertEqual(out_plen, dsm.out_plen)
1716 deterministic_mappings = self.vapi.snat_det_map_dump()
1717 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    before = self.vapi.snat_det_get_timeouts()

    # Raise every timeout by the same amount so each value must differ
    # from what was read back before the change.
    bump = 10
    self.vapi.snat_det_set_timeouts(before.udp + bump,
                                    before.tcp_established + bump,
                                    before.tcp_transitory + bump,
                                    before.icmp + bump)

    after = self.vapi.snat_det_get_timeouts()

    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
1737 def test_det_in(self):
1738 """ CGNAT translation test (TCP, UDP, ICMP) """
# Maps the pg0 host to nat_ip, pushes TCP/UDP/ICMP in both directions and
# then checks the three sessions reported for that host.
# NOTE(review): the prefix-length arguments of snat_add_det_map and the
# lines selecting each session `s` are not visible in this chunk.
1740 nat_ip = "10.0.0.10"
1742 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1744 socket.inet_aton(nat_ip),
1746 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1747 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1
1751 pkts = self.create_stream_in(self.pg0, self.pg1)
1752 self.pg0.add_stream(pkts)
1753 self.pg_enable_capture(self.pg_interfaces)
1755 capture = self.pg1.get_capture(len(pkts))
1756 self.verify_capture_out(capture, nat_ip)
# pg1 -> pg0
1759 pkts = self.create_stream_out(self.pg1, nat_ip)
1760 self.pg1.add_stream(pkts)
1761 self.pg_enable_capture(self.pg_interfaces)
1763 capture = self.pg0.get_capture(len(pkts))
1764 self.verify_capture_in(capture, self.pg0)
1767 sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
1768 self.assertEqual(len(sessions), 3)
# session checked against the TCP ports
1772 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1773 self.assertEqual(s.in_port, self.tcp_port_in)
1774 self.assertEqual(s.out_port, self.tcp_port_out)
1775 self.assertEqual(s.ext_port, self.tcp_external_port)
# session checked against the UDP ports
1779 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1780 self.assertEqual(s.in_port, self.udp_port_in)
1781 self.assertEqual(s.out_port, self.udp_port_out)
1782 self.assertEqual(s.ext_port, self.udp_external_port)
# session checked against the ICMP ids (no ext_port check for ICMP)
1786 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1787 self.assertEqual(s.in_port, self.icmp_id_in)
1788 self.assertEqual(s.out_port, self.icmp_external_id)
1790 def test_multiple_users(self):
1791 """ CGNAT multiple users """
# Two inside hosts on pg0 share one NAT address. Each opens a TCP flow
# from the same source port; the replies sent to the two translated ports
# must come back to the right host. The session counter of the mapping is
# then checked while sessions are closed through the API.
# NOTE(review): the assignment of port_in, the prefix lengths of the
# mapping and the try/except skeletons are not visible in this chunk.
1793 nat_ip = "10.0.0.10"
1795 external_port = 6303
1797 host0 = self.pg0.remote_hosts[0]
1798 host1 = self.pg0.remote_hosts[1]
1800 self.vapi.snat_add_det_map(host0.ip4n,
1802 socket.inet_aton(nat_ip),
1804 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1805 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside
1809 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
1810 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
1811 TCP(sport=port_in, dport=external_port))
1812 self.pg0.add_stream(p)
1813 self.pg_enable_capture(self.pg_interfaces)
1815 capture = self.pg1.get_capture(1)
1820 self.assertEqual(ip.src, nat_ip)
1821 self.assertEqual(ip.dst, self.pg1.remote_ip4)
1822 self.assertEqual(tcp.dport, external_port)
1823 port_out0 = tcp.sport
1825 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside
1829 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
1830 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
1831 TCP(sport=port_in, dport=external_port))
1832 self.pg0.add_stream(p)
1833 self.pg_enable_capture(self.pg_interfaces)
1835 capture = self.pg1.get_capture(1)
1840 self.assertEqual(ip.src, nat_ip)
1841 self.assertEqual(ip.dst, self.pg1.remote_ip4)
1842 self.assertEqual(tcp.dport, external_port)
1843 port_out1 = tcp.sport
1845 self.logger.error(ppp("Unexpected or invalid packet:", p))
# one mapping carrying both sessions
1848 dms = self.vapi.snat_det_map_dump()
1849 self.assertEqual(1, len(dms))
1850 self.assertEqual(2, dms[0].ses_num)
# outside -> host0 (via port_out0)
1853 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1854 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1855 TCP(sport=external_port, dport=port_out0))
1856 self.pg1.add_stream(p)
1857 self.pg_enable_capture(self.pg_interfaces)
1859 capture = self.pg0.get_capture(1)
1864 self.assertEqual(ip.src, self.pg1.remote_ip4)
1865 self.assertEqual(ip.dst, host0.ip4)
1866 self.assertEqual(tcp.dport, port_in)
1867 self.assertEqual(tcp.sport, external_port)
1869 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> host1 (via port_out1)
1873 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1874 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1875 TCP(sport=external_port, dport=port_out1))
1876 self.pg1.add_stream(p)
1877 self.pg_enable_capture(self.pg_interfaces)
1879 capture = self.pg0.get_capture(1)
1884 self.assertEqual(ip.src, self.pg1.remote_ip4)
1885 self.assertEqual(ip.dst, host1.ip4)
1886 self.assertEqual(tcp.dport, port_in)
1887 self.assertEqual(tcp.sport, external_port)
# NOTE(review): this message lacks the trailing colon used by the three
# sibling log calls above.
1889 self.logger.error(ppp("Unexpected or invalid packet", p))
1892 # session close api test
# closing by outside tuple leaves one session, closing by inside tuple
# leaves none
1893 self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
1895 self.pg1.remote_ip4n,
1897 dms = self.vapi.snat_det_map_dump()
1898 self.assertEqual(dms[0].ses_num, 1)
1900 self.vapi.snat_det_close_session_in(host0.ip4n,
1902 self.pg1.remote_ip4n,
1904 dms = self.vapi.snat_det_map_dump()
1905 self.assertEqual(dms[0].ses_num, 0)
1907 def test_tcp_session_close_detection_in(self):
1908 """ CGNAT TCP session close initiated from inside network """
# After a handshake via initiate_tcp_session(), the close sequence
# FIN (in->out), ACK + FIN (out->in), ACK (in->out) is replayed and the
# mapping's session counter is expected to drop to 0.
# NOTE(review): the TCP flags arguments, the construction of `pkts` and
# the try/except skeleton are not visible in this chunk.
1909 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1911 socket.inet_aton(self.snat_addr),
1913 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1914 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1917 self.initiate_tcp_session(self.pg0, self.pg1)
1919 # close the session from inside
1921 # FIN packet in -> out
1922 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1923 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1924 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1926 self.pg0.add_stream(p)
1927 self.pg_enable_capture(self.pg_interfaces)
1929 self.pg1.get_capture(1)
1933 # ACK packet out -> in
1934 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1935 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1936 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1940 # FIN packet out -> in
1941 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1942 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1943 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# the two out->in packets are sent together, hence get_capture(2)
1947 self.pg1.add_stream(pkts)
1948 self.pg_enable_capture(self.pg_interfaces)
1950 self.pg0.get_capture(2)
1952 # ACK packet in -> out
1953 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1954 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1955 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1957 self.pg0.add_stream(p)
1958 self.pg_enable_capture(self.pg_interfaces)
1960 self.pg1.get_capture(1)
1962 # Check if snat closed the session
1963 dms = self.vapi.snat_det_map_dump()
1964 self.assertEqual(0, dms[0].ses_num)
1966 self.logger.error("TCP session termination failed")
1969 def test_tcp_session_close_detection_out(self):
1970 """ CGNAT TCP session close initiated from outside network """
# Mirror image of the inside-initiated close test: the first FIN comes
# from pg1, two packets are sent back from pg0, and a final ACK from pg1
# ends the exchange; the session counter must then be 0.
# NOTE(review): the TCP flags arguments, the construction of `pkts` and
# the try/except skeleton are not visible in this chunk.
1971 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1973 socket.inet_aton(self.snat_addr),
1975 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1976 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1979 self.initiate_tcp_session(self.pg0, self.pg1)
1981 # close the session from outside
1983 # FIN packet out -> in
1984 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1985 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1986 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1988 self.pg1.add_stream(p)
1989 self.pg_enable_capture(self.pg_interfaces)
1991 self.pg0.get_capture(1)
1995 # ACK packet in -> out
1996 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1997 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1998 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): by symmetry with the inside-initiated test (ACK then FIN)
# this second packet is presumably the FIN, so the comment below looks
# like a copy/paste slip - confirm against the flags argument.
2002 # ACK packet in -> out
2003 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2004 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2005 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# the two in->out packets are sent together, hence get_capture(2)
2009 self.pg0.add_stream(pkts)
2010 self.pg_enable_capture(self.pg_interfaces)
2012 self.pg1.get_capture(2)
2014 # ACK packet out -> in
2015 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2016 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2017 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2019 self.pg1.add_stream(p)
2020 self.pg_enable_capture(self.pg_interfaces)
2022 self.pg0.get_capture(1)
2024 # Check if snat closed the session
2025 dms = self.vapi.snat_det_map_dump()
2026 self.assertEqual(0, dms[0].ses_num)
2028 self.logger.error("TCP session termination failed")
# Only runs when running_extended_tests() is true.
2031 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2032 def test_session_timeout(self):
2033 """ CGNAT session timeouts """
# Opens a TCP session plus the TCP/UDP/ICMP flows from create_stream_in(),
# with all four timeouts set to 5, and expects the mapping to report zero
# sessions afterwards.
# NOTE(review): presumably the test waits for the timeouts to expire
# before the final dump - the wait is not visible in this chunk.
2034 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2036 socket.inet_aton(self.snat_addr),
2038 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2039 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2042 self.initiate_tcp_session(self.pg0, self.pg1)
2043 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2044 pkts = self.create_stream_in(self.pg0, self.pg1)
2045 self.pg0.add_stream(pkts)
2046 self.pg_enable_capture(self.pg_interfaces)
2048 capture = self.pg1.get_capture(len(pkts))
2051 dms = self.vapi.snat_det_map_dump()
2052 self.assertEqual(0, dms[0].ses_num)
2054 def test_session_limit_per_user(self):
2055 """ CGNAT maximum 1000 sessions per user should be created """
# Fills the per-user quota with 1000 UDP flows (ports 1025..2024), then
# sends one more flow and expects: nothing forwarded to pg1, an ICMP
# error back on pg0, the session count still at 1000, and an IPFIX event
# on pg2.
# NOTE(review): the construction of `pkts` and several loop headers are
# not visible in this chunk.
2056 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2058 socket.inet_aton(self.snat_addr),
2060 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2061 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2063 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2064 src_address=self.pg2.local_ip4n,
2066 template_interval=10)
2067 self.vapi.snat_ipfix()
2070 for port in range(1025, 2025):
2071 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2072 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2073 UDP(sport=port, dport=port))
2076 self.pg0.add_stream(pkts)
2077 self.pg_enable_capture(self.pg_interfaces)
2079 capture = self.pg1.get_capture(len(pkts))
# the flow that exceeds the quota
2081 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2082 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2083 UDP(sport=3001, dport=3002))
2084 self.pg0.add_stream(p)
2085 self.pg_enable_capture(self.pg_interfaces)
2087 capture = self.pg1.assert_nothing_captured()
2089 # verify ICMP error packet
2090 capture = self.pg0.get_capture(1)
2092 self.assertTrue(p.haslayer(ICMP))
# type 3 / code 1 is ICMP destination unreachable / host unreachable;
# the quoted inner packet must be the rejected UDP flow
2094 self.assertEqual(icmp.type, 3)
2095 self.assertEqual(icmp.code, 1)
2096 self.assertTrue(icmp.haslayer(IPerror))
2097 inner_ip = icmp[IPerror]
2098 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2099 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2101 dms = self.vapi.snat_det_map_dump()
2103 self.assertEqual(1000, dms[0].ses_num)
2105 # verify IPFIX logging
2106 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2107 capture = self.pg2.get_capture(2)
2108 ipfix = IPFIXDecoder()
2109 # first load template
2111 self.assertTrue(p.haslayer(IPFIX))
2112 if p.haslayer(Template):
2113 ipfix.add_template(p.getlayer(Template))
2114 # verify events in data set
2116 if p.haslayer(Data):
2117 data = ipfix.decode_data_set(p.getlayer(Set))
2118 self.verify_ipfix_max_entries_per_user(data)
2120 def clear_snat(self):
2122 Clear SNAT configuration.
# Turns SNAT IPFIX logging off, calls snat_det_set_timeouts() with no
# arguments (presumably restoring the wrapper's defaults - confirm), then
# walks the deterministic map dump and the interface dump and calls the
# add/del API for every entry.
# NOTE(review): the remaining arguments of the two per-entry calls are not
# visible in this chunk.
2124 self.vapi.snat_ipfix(enable=0)
2125 self.vapi.snat_det_set_timeouts()
2126 deterministic_mappings = self.vapi.snat_det_map_dump()
2127 for dsm in deterministic_mappings:
2128 self.vapi.snat_add_det_map(dsm.in_addr,
2134 interfaces = self.vapi.snat_interface_dump()
2135 for intf in interfaces:
2136 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
# Per-test teardown of TestDeterministicNAT: run the parent teardown, then
# log the output of the "show snat detail" CLI command unless VPP is dead.
2141 super(TestDeterministicNAT, self).tearDown()
2142 if not self.vpp_dead:
2143 self.logger.info(self.vapi.cli("show snat detail"))
# Script entry point: hand the test cases in this module to unittest,
# using the VPP-specific runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)