8 from config import config
9 from framework import VppTestCase
10 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, UDPerror
13 from scapy.layers.l2 import Ether
17 class TestDET44(VppTestCase):
18 """Deterministic NAT Test Cases"""
# Class-wide fixture: after the parent class setup, det44 logging is switched
# to debug level and the ports, NAT address and interfaces shared by the
# tests in this class are recorded on `cls`.
22 super(TestDET44, cls).setUpClass()
23 cls.vapi.cli("set log class det44 level debug")
# TCP uses 6303 and UDP uses 6304 on both the inside and the external side.
25 cls.tcp_port_in = 6303
26 cls.tcp_external_port = 6303
27 cls.udp_port_in = 6304
28 cls.udp_external_port = 6304
# Address the tests pass as the translated (outside) address of the mapping.
30 cls.nat_addr = "10.0.0.3"
# Three packet-generator interfaces; the tests refer to them as pg0, pg1, pg2.
32 cls.create_pg_interfaces(range(3))
33 cls.interfaces = list(cls.pg_interfaces)
35 for i in cls.interfaces:
# Two remote hosts on pg0: test_multiple_users reads remote_hosts[0] and [1].
40 cls.pg0.generate_remote_hosts(2)
41 cls.pg0.configure_ipv4_neighbors()
# Class-level teardown: nothing DET44-specific is undone here, the call is
# simply forwarded to the parent class.
44 def tearDownClass(cls):
45 super(TestDET44, cls).tearDownClass()
# Per-test setup: run the parent setUp first, then enable the det44 plugin so
# each test starts with the plugin switched on.
48 super(TestDET44, self).setUp()
49 self.vapi.det44_plugin_enable_disable(enable=1)
# Per-test teardown: run the parent tearDown, then disable the det44 plugin
# (the counterpart of the enable call issued during per-test setup).
52 super(TestDET44, self).tearDown()
54 self.vapi.det44_plugin_enable_disable(enable=0)
# Writes the output of four "show det44 ..." CLI commands (interfaces,
# timeouts, mappings, sessions) to the test log at info level.
56 def show_commands_at_teardown(self):
57 self.logger.info(self.vapi.cli("show det44 interfaces"))
58 self.logger.info(self.vapi.cli("show det44 timeouts"))
59 self.logger.info(self.vapi.cli("show det44 mappings"))
60 self.logger.info(self.vapi.cli("show det44 sessions"))
# Checks packets that arrived on the inside interface after reverse
# translation: each must be addressed to the inside host, and its TCP/UDP
# destination port (or ICMP identifier) must equal the original inside value.
62 def verify_capture_in(self, capture, in_if):
64 Verify captured packets on inside network
66 :param capture: Captured packets
67 :param in_if: Inside interface
# Each captured packet is validated independently.
70 for packet in capture:
72 self.assert_packet_checksums_valid(packet)
73 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
# TCP and UDP are matched on destination port; ICMP is matched on its id.
74 if packet.haslayer(TCP):
75 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
76 elif packet.haslayer(UDP):
77 self.assertEqual(packet[UDP].dport, self.udp_port_in)
79 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
83 ppp("Unexpected or invalid packet (inside network):", packet)
# Asserts that the decoded IPFIX data holds exactly one record and that its
# fields 230, 466, 473 and 8 carry the expected event code, quota event,
# limit and source address.
# NOTE(review): struct.pack("I", ...) packs in the machine's native byte
# order; IPFIX fields are normally network byte order, so "!I" may have been
# intended here - confirm against the exporter/decoder.
88 def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
90 Verify IPFIX maximum entries per user exceeded event
92 :param data: Decoded IPFIX data records
93 :param limit: Number of maximum entries per user
94 :param src_addr: IPv4 source address
# Exactly one event record is expected.
96 self.assertEqual(1, len(data))
# Field 230 is compared as a single byte value (presumably the natEvent
# information element - verify against the IANA IPFIX registry).
99 self.assertEqual(scapy.compat.orb(record[230]), 13)
100 # natQuotaExceededEvent
101 self.assertEqual(struct.pack("I", 3), record[466])
# Field 473 must carry the configured per-user limit.
103 self.assertEqual(struct.pack("I", limit), record[473])
# Field 8 must equal the packed 4-byte form of src_addr.
105 self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8])
# Drives a TCP three-way handshake through the NAT: SYN from the inside host,
# SYN+ACK from the outside host towards the NAT address, then ACK from the
# inside host. The translated source port seen on the outside interface is
# stored in self.tcp_port_out so later packets can target the session.
107 def initiate_tcp_session(self, in_if, out_if):
109 Initiates TCP session 3 WAY HAND SHAKE
111 :param in_if: Inside interface
112 :param out_if: Outside interface
# Step 1: SYN, inside -> outside.
117 Ether(src=in_if.remote_mac, dst=in_if.local_mac)
118 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
119 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
122 self.pg_enable_capture(self.pg_interfaces)
124 capture = out_if.get_capture(1)
# Remember the source port of the translated SYN; the reply below uses it as
# its destination port.
126 self.tcp_port_out = p[TCP].sport
128 # SYN + ACK packet out->in
130 Ether(src=out_if.remote_mac, dst=out_if.local_mac)
131 / IP(src=out_if.remote_ip4, dst=self.nat_addr)
132 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
135 self.pg_enable_capture(self.pg_interfaces)
# Step 3: ACK, inside -> outside, completing the handshake.
141 Ether(src=in_if.remote_mac, dst=in_if.local_mac)
142 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
143 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
146 self.pg_enable_capture(self.pg_interfaces)
# The final ACK must be forwarded: exactly one packet is expected outside.
148 out_if.get_capture(1)
# Builds the inside -> outside test traffic: a TCP, a UDP and an ICMP
# echo-request packet, all sent from the inside host to the outside host with
# the requested TTL and the class-wide inside/external ports and ICMP id.
150 def create_stream_in(self, in_if, out_if, ttl=64):
152 Create packet stream for inside network
154 :param in_if: Inside interface
155 :param out_if: Outside interface
156 :param ttl: TTL of generated packets
# TCP packet.
161 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
162 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl)
163 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
# UDP packet.
169 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
170 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl)
171 / UDP(sport=self.udp_port_in, dport=self.udp_external_port)
# ICMP echo request.
177 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
178 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl)
179 / ICMP(id=self.icmp_id_in, type="echo-request")
# Builds the outside -> inside reply traffic (TCP, UDP, ICMP echo-reply)
# addressed to dst_ip. The destination ports and ICMP id are the translated
# values self.tcp_port_out, self.udp_port_out and self.icmp_external_id,
# which verify_capture_out records - so that check has to run on the forward
# traffic before this stream is created.
185 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
187 Create packet stream for outside network
189 :param out_if: Outside interface
190 :param dst_ip: Destination IP address (Default use global NAT address)
191 :param ttl: TTL of generated packets
# Fallback destination: the class-wide NAT address.
194 dst_ip = self.nat_addr
# TCP reply.
198 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
199 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
200 / TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)
# UDP reply.
206 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
207 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
208 / UDP(dport=self.udp_port_out, sport=self.udp_external_port)
# ICMP echo reply.
214 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
215 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
216 / ICMP(id=self.icmp_external_id, type="echo-reply")
# Checks packets captured on the outside interface: the source address must
# be the translated address. As a side effect the translated TCP/UDP source
# port and ICMP id of the captured packets are stored on self
# (tcp_port_out, udp_port_out, icmp_external_id) for use by create_stream_out
# and by the session assertions in the tests.
# NOTE(review): the docstring below documents a `same_port` parameter, but
# the signature only accepts `capture` and `nat_ip`.
222 def verify_capture_out(self, capture, nat_ip=None):
224 Verify captured packets on outside network
226 :param capture: Captured packets
227 :param nat_ip: Translated IP address (Default use global NAT address)
228 :param same_port: Source port number is not translated (Default False)
# Fallback translated address: the class-wide NAT address.
231 nat_ip = self.nat_addr
232 for packet in capture:
234 self.assertEqual(packet[IP].src, nat_ip)
# Record the translated identifiers rather than asserting a specific value.
235 if packet.haslayer(TCP):
236 self.tcp_port_out = packet[TCP].sport
237 elif packet.haslayer(UDP):
238 self.udp_port_out = packet[UDP].sport
240 self.icmp_external_id = packet[ICMP].id
243 ppp("Unexpected or invalid packet (outside network):", packet)
# API-only test (no traffic): adds a deterministic mapping, then checks the
# forward lookup of an inside address, the reverse lookup of the resulting
# outside port/address, and the content of the mapping dump.
247 def test_deterministic_mode(self):
248 """NAT plugin run deterministic mode"""
249 in_addr = "172.16.255.0"
250 out_addr = "172.17.255.50"
251 in_addr_t = "172.16.255.20"
255 self.vapi.det44_add_del_map(
# Forward lookup: the inside test address must map to out_addr.
263 rep1 = self.vapi.det44_forward(in_addr_t)
264 self.assertEqual(str(rep1.out_addr), out_addr)
# Reverse lookup using the upper port returned by the forward lookup must
# lead back to the same inside address.
265 rep2 = self.vapi.det44_reverse(rep1.out_port_hi, out_addr)
267 self.assertEqual(str(rep2.in_addr), in_addr_t)
# Exactly one mapping is expected, with the addresses and prefix lengths
# that were configured.
269 deterministic_mappings = self.vapi.det44_map_dump()
270 self.assertEqual(len(deterministic_mappings), 1)
271 dsm = deterministic_mappings[0]
272 self.assertEqual(in_addr, str(dsm.in_addr))
273 self.assertEqual(in_plen, dsm.in_plen)
274 self.assertEqual(out_addr, str(dsm.out_addr))
275 self.assertEqual(out_plen, dsm.out_plen)
# Reads the current timeouts, writes each of the four values back increased
# by 10, reads them again and compares the before/after values.
277 def test_set_timeouts(self):
278 """Set deterministic NAT timeouts"""
279 timeouts_before = self.vapi.det44_get_timeouts()
# Every timeout is shifted by the same amount so each one must differ
# afterwards.
281 self.vapi.det44_set_timeouts(
282 udp=timeouts_before.udp + 10,
283 tcp_established=timeouts_before.tcp_established + 10,
284 tcp_transitory=timeouts_before.tcp_transitory + 10,
285 icmp=timeouts_before.icmp + 10,
288 timeouts_after = self.vapi.det44_get_timeouts()
290 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
291 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
293 timeouts_before.tcp_established, timeouts_after.tcp_established
296 timeouts_before.tcp_transitory, timeouts_after.tcp_transitory
# End-to-end translation test: configures a mapping for the pg0 host with pg0
# as inside and pg1 as outside, sends TCP/UDP/ICMP in both directions, and
# finally checks the three resulting sessions field by field.
300 """DET44 translation test (TCP, UDP, ICMP)"""
304 self.vapi.det44_add_del_map(
306 in_addr=self.pg0.remote_ip4,
308 out_addr=socket.inet_aton(nat_ip),
312 self.vapi.det44_interface_add_del_feature(
313 sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
315 self.vapi.det44_interface_add_del_feature(
316 sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
# Inside -> outside: every packet must leave pg1 with source nat_ip.
# verify_capture_out also records the translated ports / ICMP id.
320 pkts = self.create_stream_in(self.pg0, self.pg1)
321 self.pg0.add_stream(pkts)
322 self.pg_enable_capture(self.pg_interfaces)
324 capture = self.pg1.get_capture(len(pkts))
325 self.verify_capture_out(capture, nat_ip)
# Outside -> inside: replies aimed at the recorded translated ports must be
# delivered to the inside host with the original ports restored.
328 pkts = self.create_stream_out(self.pg1, nat_ip)
329 self.pg1.add_stream(pkts)
330 self.pg_enable_capture(self.pg_interfaces)
332 capture = self.pg0.get_capture(len(pkts))
333 self.verify_capture_in(capture, self.pg0)
# One session per protocol is expected for the inside host.
336 sessions = self.vapi.det44_session_dump(self.pg0.remote_ip4)
337 self.assertEqual(len(sessions), 3)
# Session checked against the TCP ports.
341 self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4)
342 self.assertEqual(s.in_port, self.tcp_port_in)
343 self.assertEqual(s.out_port, self.tcp_port_out)
344 self.assertEqual(s.ext_port, self.tcp_external_port)
# Session checked against the UDP ports.
348 self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4)
349 self.assertEqual(s.in_port, self.udp_port_in)
350 self.assertEqual(s.out_port, self.udp_port_out)
351 self.assertEqual(s.ext_port, self.udp_external_port)
# Session checked against the ICMP identifiers (stored in the port fields).
355 self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4)
356 self.assertEqual(s.in_port, self.icmp_id_in)
357 self.assertEqual(s.out_port, self.icmp_external_id)
# Two inside hosts behind the same mapping each open a TCP flow using the
# same inside port. The test checks that both are translated to nat_ip,
# that replies sent to each translated port reach the right host, and that
# the session counter goes 2 -> 1 -> 0 as the sessions are closed via the API.
359 def test_multiple_users(self):
360 """Deterministic NAT multiple users"""
# The two remote hosts generated on pg0 during class setup.
366 host0 = self.pg0.remote_hosts[0]
367 host1 = self.pg0.remote_hosts[1]
369 self.vapi.det44_add_del_map(
373 out_addr=socket.inet_aton(nat_ip),
376 self.vapi.det44_interface_add_del_feature(
377 sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
379 self.vapi.det44_interface_add_del_feature(
380 sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
# host0, inside -> outside.
385 Ether(src=host0.mac, dst=self.pg0.local_mac)
386 / IP(src=host0.ip4, dst=self.pg1.remote_ip4)
387 / TCP(sport=port_in, dport=external_port)
389 self.pg0.add_stream(p)
390 self.pg_enable_capture(self.pg_interfaces)
392 capture = self.pg1.get_capture(1)
397 self.assertEqual(ip.src, nat_ip)
398 self.assertEqual(ip.dst, self.pg1.remote_ip4)
399 self.assertEqual(tcp.dport, external_port)
# Translated source port assigned to host0; used for the reply below.
400 port_out0 = tcp.sport
402 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1, inside -> outside (same inside port as host0).
407 Ether(src=host1.mac, dst=self.pg0.local_mac)
408 / IP(src=host1.ip4, dst=self.pg1.remote_ip4)
409 / TCP(sport=port_in, dport=external_port)
411 self.pg0.add_stream(p)
412 self.pg_enable_capture(self.pg_interfaces)
414 capture = self.pg1.get_capture(1)
419 self.assertEqual(ip.src, nat_ip)
420 self.assertEqual(ip.dst, self.pg1.remote_ip4)
421 self.assertEqual(tcp.dport, external_port)
# Translated source port assigned to host1.
422 port_out1 = tcp.sport
424 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One mapping carrying two sessions (one per host).
427 dms = self.vapi.det44_map_dump()
428 self.assertEqual(1, len(dms))
429 self.assertEqual(2, dms[0].ses_num)
# Reply to host0's translated port must be delivered to host0.
433 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
434 / IP(src=self.pg1.remote_ip4, dst=nat_ip)
435 / TCP(sport=external_port, dport=port_out0)
437 self.pg1.add_stream(p)
438 self.pg_enable_capture(self.pg_interfaces)
440 capture = self.pg0.get_capture(1)
445 self.assertEqual(ip.src, self.pg1.remote_ip4)
446 self.assertEqual(ip.dst, host0.ip4)
447 self.assertEqual(tcp.dport, port_in)
448 self.assertEqual(tcp.sport, external_port)
450 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply to host1's translated port must be delivered to host1.
455 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
456 / IP(src=self.pg1.remote_ip4, dst=nat_ip)
457 / TCP(sport=external_port, dport=port_out1)
459 self.pg1.add_stream(p)
460 self.pg_enable_capture(self.pg_interfaces)
462 capture = self.pg0.get_capture(1)
467 self.assertEqual(ip.src, self.pg1.remote_ip4)
468 self.assertEqual(ip.dst, host1.ip4)
469 self.assertEqual(tcp.dport, port_in)
470 self.assertEqual(tcp.sport, external_port)
472 self.logger.error(ppp("Unexpected or invalid packet", p))
475 # session close api test
# Close host1's session by its outside tuple: one session must remain.
476 self.vapi.det44_close_session_out(
477 socket.inet_aton(nat_ip), port_out1, self.pg1.remote_ip4, external_port
479 dms = self.vapi.det44_map_dump()
480 self.assertEqual(dms[0].ses_num, 1)
# Close host0's session by its inside tuple: no session must remain.
482 self.vapi.det44_close_session_in(
483 host0.ip4, port_in, self.pg1.remote_ip4, external_port
485 dms = self.vapi.det44_map_dump()
486 self.assertEqual(dms[0].ses_num, 0)
# Establishes a TCP session and then closes it starting from the inside
# host: FIN in->out, ACK and FIN out->in, final ACK in->out. Afterwards the
# mapping must report zero sessions.
488 def test_tcp_session_close_detection_in(self):
489 """DET44 TCP session close from inside network"""
490 self.vapi.det44_add_del_map(
492 in_addr=self.pg0.remote_ip4,
494 out_addr=socket.inet_aton(self.nat_addr),
497 self.vapi.det44_interface_add_del_feature(
498 sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
500 self.vapi.det44_interface_add_del_feature(
501 sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
# The handshake also records self.tcp_port_out, used by the out->in packets.
504 self.initiate_tcp_session(self.pg0, self.pg1)
506 # close the session from inside
508 # FIN packet in -> out
510 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
511 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
512 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
514 self.pg0.add_stream(p)
515 self.pg_enable_capture(self.pg_interfaces)
517 self.pg1.get_capture(1)
521 # ACK packet out -> in
523 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
524 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
525 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="A")
529 # FIN packet out -> in
531 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
532 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
533 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
# The ACK and FIN are sent together, so two packets are expected inside.
537 self.pg1.add_stream(pkts)
538 self.pg_enable_capture(self.pg_interfaces)
540 self.pg0.get_capture(2)
542 # ACK packet in -> out
544 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
545 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
546 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
548 self.pg0.add_stream(p)
549 self.pg_enable_capture(self.pg_interfaces)
551 self.pg1.get_capture(1)
553 # Check if deterministic NAT44 closed the session
554 dms = self.vapi.det44_map_dump()
555 self.assertEqual(0, dms[0].ses_num)
557 self.logger.error("TCP session termination failed")
# Mirror image of the inside-initiated close test: after the handshake the
# outside host sends FIN, the inside host answers with ACK and FIN, and the
# outside host sends the final ACK. Afterwards the mapping must report zero
# sessions.
560 def test_tcp_session_close_detection_out(self):
561 """Deterministic NAT TCP session close from outside network"""
562 self.vapi.det44_add_del_map(
564 in_addr=self.pg0.remote_ip4,
566 out_addr=socket.inet_aton(self.nat_addr),
569 self.vapi.det44_interface_add_del_feature(
570 sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
572 self.vapi.det44_interface_add_del_feature(
573 sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
# The handshake also records self.tcp_port_out, used by the out->in packets.
576 self.initiate_tcp_session(self.pg0, self.pg1)
578 # close the session from outside
580 # FIN packet out -> in
582 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
583 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
584 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
586 self.pg1.add_stream(p)
587 self.pg_enable_capture(self.pg_interfaces)
589 self.pg0.get_capture(1)
593 # ACK packet in -> out
595 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
596 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
597 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
601 # FIN packet in -> out
603 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
604 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
605 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
# The ACK and FIN are sent together, so two packets are expected outside.
609 self.pg0.add_stream(pkts)
610 self.pg_enable_capture(self.pg_interfaces)
612 self.pg1.get_capture(2)
614 # ACK packet out -> in
616 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
617 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
618 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="A")
620 self.pg1.add_stream(p)
621 self.pg_enable_capture(self.pg_interfaces)
623 self.pg0.get_capture(1)
625 # Check if deterministic NAT44 closed the session
626 dms = self.vapi.det44_map_dump()
627 self.assertEqual(0, dms[0].ses_num)
629 self.logger.error("TCP session termination failed")
# Creates TCP, UDP and ICMP sessions, sets every timeout to 5, waits 15 via
# virtual_sleep, and then expects the mapping to report zero sessions.
632 def test_session_timeout(self):
633 """Deterministic NAT session timeouts"""
634 self.vapi.det44_add_del_map(
636 in_addr=self.pg0.remote_ip4,
638 out_addr=socket.inet_aton(self.nat_addr),
641 self.vapi.det44_interface_add_del_feature(
642 sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
644 self.vapi.det44_interface_add_del_feature(
645 sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
648 self.initiate_tcp_session(self.pg0, self.pg1)
# All four timeouts are set to the same short value so every session created
# here expires within the sleep below.
649 self.vapi.det44_set_timeouts(udp=5, tcp_established=5, tcp_transitory=5, icmp=5)
650 pkts = self.create_stream_in(self.pg0, self.pg1)
651 self.pg0.add_stream(pkts)
652 self.pg_enable_capture(self.pg_interfaces)
654 self.pg1.get_capture(len(pkts))
# Wait three times the configured timeout before checking.
655 self.virtual_sleep(15)
657 dms = self.vapi.det44_map_dump()
658 self.assertEqual(0, dms[0].ses_num)
# Extended-only test of the per-user session limit: 1000 UDP flows from one
# inside host are accepted, the next flow is dropped and answered with an
# ICMP error, and an IPFIX event reporting the limit is exported via pg2.
660 # TODO: ipfix needs to be separated from NAT base plugin
661 @unittest.skipUnless(config.extended, "part of extended tests")
662 def test_session_limit_per_user(self):
663 """Deterministic NAT maximum sessions per user limit"""
664 self.vapi.det44_add_del_map(
666 in_addr=self.pg0.remote_ip4,
668 out_addr=socket.inet_aton(self.nat_addr),
671 self.vapi.det44_interface_add_del_feature(
672 sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
674 self.vapi.det44_interface_add_del_feature(
675 sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
# IPFIX export goes to the host behind pg2, sourced from pg2's local address.
677 self.vapi.set_ipfix_exporter(
678 collector_address=self.pg2.remote_ip4,
679 src_address=self.pg2.local_ip4,
681 template_interval=10,
683 self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, enable=1)
# range(1025, 2025) yields 1000 distinct ports, i.e. 1000 UDP flows.
686 for port in range(1025, 2025):
688 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
689 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
690 / UDP(sport=port, dport=port)
694 self.pg0.add_stream(pkts)
695 self.pg_enable_capture(self.pg_interfaces)
# All of these flows are expected to be forwarded.
697 self.pg1.get_capture(len(pkts))
# One more flow from the same host, using ports outside the range above.
700 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
701 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
702 / UDP(sport=3001, dport=3002)
704 self.pg0.add_stream(p)
705 self.pg_enable_capture(self.pg_interfaces)
# The extra flow must not appear on the outside interface.
707 self.pg1.assert_nothing_captured()
709 # verify ICMP error packet
710 capture = self.pg0.get_capture(1)
712 self.assertTrue(p.haslayer(ICMP))
# ICMP type 3 / code 1, quoting the rejected UDP packet (ports 3001 -> 3002).
714 self.assertEqual(icmp.type, 3)
715 self.assertEqual(icmp.code, 1)
716 self.assertTrue(icmp.haslayer(IPerror))
717 inner_ip = icmp[IPerror]
718 self.assertEqual(inner_ip[UDPerror].sport, 3001)
719 self.assertEqual(inner_ip[UDPerror].dport, 3002)
# The session count must still be 1000: the rejected flow created nothing.
721 dms = self.vapi.det44_map_dump()
723 self.assertEqual(1000, dms[0].ses_num)
725 # verify IPFIX logging
# Force the exporter to send; two IPFIX packets are expected on pg2.
726 self.vapi.ipfix_flush()
727 capture = self.pg2.get_capture(2)
728 ipfix = IPFIXDecoder()
729 # first load template
731 self.assertTrue(p.haslayer(IPFIX))
732 if p.haslayer(Template):
733 ipfix.add_template(p.getlayer(Template))
734 # verify events in data set
737 data = ipfix.decode_data_set(p.getlayer(Set))
738 self.verify_ipfix_max_entries_per_user(data, 1000, self.pg0.remote_ip4)
# Switch NAT IPFIX logging back off (same domain id and source port as above).
739 self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, enable=0)