6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether
8 from scapy.layers.ppp import PPPoE, PPPoED, PPP
9 from scapy.layers.inet import IP
11 from framework import VppTestCase, VppTestRunner
12 from vpp_ip_route import VppIpRoute, VppRoutePath
13 from vpp_pppoe_interface import VppPppoeInterface
14 from util import ppp, ppc
17 class TestPPPoE(VppTestCase):
22 super(TestPPPoE, cls).setUpClass()
25 cls.dst_ip = "100.1.1.100"
26 cls.dst_ipn = socket.inet_pton(socket.AF_INET, cls.dst_ip)
29 def tearDownClass(cls):
30 super(TestPPPoE, cls).tearDownClass()
33 super(TestPPPoE, self).setUp()
35 # create 3 pg interfaces
36 self.create_pg_interfaces(range(3))
38 for i in self.pg_interfaces:
44 super(TestPPPoE, self).tearDown()
46 for i in self.pg_interfaces:
50 def show_commands_at_teardown(self):  # log the output of five VPP CLI "show" commands at INFO level
51 self.logger.info(self.vapi.cli("show int"))  # each line runs one CLI command via self.vapi.cli and logs its return value
52 self.logger.info(self.vapi.cli("show pppoe fib"))
53 self.logger.info(self.vapi.cli("show pppoe session"))
54 self.logger.info(self.vapi.cli("show ip fib"))
55 self.logger.info(self.vapi.cli("show trace"))  # NOTE(review): presumably called by the test framework during teardown - confirm
57 def create_stream_pppoe_discovery(self, src_if, dst_if, client_mac, count=1):
59 for i in range(count):
60 # create packet info stored in the test case instance
61 info = self.create_packet_info(src_if, dst_if)
62 # convert the info into packet payload
63 payload = self.info_to_payload(info)
64 # create the packet itself
66 Ether(dst=src_if.local_mac, src=client_mac)
70 # store a copy of the packet in the packet info
72 # append the packet to the list
75 # return the created packet list
78 def create_stream_pppoe_lcp(self, src_if, dst_if, client_mac, session_id, count=1):
80 for i in range(count):
81 # create packet info stored in the test case instance
82 info = self.create_packet_info(src_if, dst_if)
83 # convert the info into packet payload
84 payload = self.info_to_payload(info)
85 # create the packet itself
87 Ether(dst=src_if.local_mac, src=client_mac)
88 / PPPoE(sessionid=session_id)
92 # store a copy of the packet in the packet info
94 # append the packet to the list
97 # return the created packet list
100 def create_stream_pppoe_ip4(
101 self, src_if, dst_if, client_mac, session_id, client_ip, count=1
104 for i in range(count):
105 # create packet info stored in the test case instance
106 info = self.create_packet_info(src_if, dst_if)
107 # convert the info into packet payload
108 payload = self.info_to_payload(info)
109 # create the packet itself
111 Ether(dst=src_if.local_mac, src=client_mac)
112 / PPPoE(sessionid=session_id)
114 / IP(src=client_ip, dst=self.dst_ip)
117 # store a copy of the packet in the packet info
119 # append the packet to the list
122 # return the created packet list
125 def create_stream_ip4(self, src_if, dst_if, client_ip, dst_ip, count=1):
127 for i in range(count):
128 # create packet info stored in the test case instance
129 info = self.create_packet_info(src_if, dst_if)
130 # convert the info into packet payload
131 payload = self.info_to_payload(info)
132 # create the packet itself
134 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
135 / IP(src=dst_ip, dst=client_ip)
138 # store a copy of the packet in the packet info
140 # append the packet to the list
143 # return the created packet list
146 def verify_decapped_pppoe(self, src_if, capture, sent):
147 self.assertEqual(len(capture), len(sent))
149 for i in range(len(capture)):
157 self.assertEqual(rx_ip.src, tx_ip.src)
158 self.assertEqual(rx_ip.dst, tx_ip.dst)
161 self.logger.error(ppp("Rx:", rx))
162 self.logger.error(ppp("Tx:", tx))
165 def verify_encaped_pppoe(self, src_if, capture, sent, session_id):
166 self.assertEqual(len(capture), len(sent))
168 for i in range(len(capture)):
176 self.assertEqual(rx_ip.src, tx_ip.src)
177 self.assertEqual(rx_ip.dst, tx_ip.dst)
181 self.assertEqual(rx_pppoe.sessionid, session_id)
184 self.logger.error(ppp("Rx:", rx))
185 self.logger.error(ppp("Tx:", tx))
188 def test_PPPoE_Decap(self):
189 """PPPoE Decap Test"""
191 self.vapi.cli("clear trace")
194 # Add a route that resolves the server's destination
196 route_sever_dst = VppIpRoute(
200 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
202 route_sever_dst.add_vpp_config()
204 # Send PPPoE Discovery
205 tx0 = self.create_stream_pppoe_discovery(
206 self.pg0, self.pg1, self.pg0.remote_mac
208 self.pg0.add_stream(tx0)
212 tx1 = self.create_stream_pppoe_lcp(
213 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
215 self.pg0.add_stream(tx1)
218 # Create PPPoE session
219 pppoe_if = VppPppoeInterface(
220 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
222 pppoe_if.add_vpp_config()
223 pppoe_if.set_unnumbered(self.pg0.sw_if_index)
226 # Send tunneled packets that match the created tunnel and
227 # are decapped and forwarded
229 tx2 = self.create_stream_pppoe_ip4(
236 self.pg0.add_stream(tx2)
238 self.pg_enable_capture(self.pg_interfaces)
241 rx2 = self.pg1.get_capture(len(tx2))
242 self.verify_decapped_pppoe(self.pg0, rx2, tx2)
244 self.logger.info(self.vapi.cli("show pppoe fib"))
245 self.logger.info(self.vapi.cli("show pppoe session"))
246 self.logger.info(self.vapi.cli("show ip fib"))
252 # Delete PPPoE session
253 pppoe_if.remove_vpp_config()
255 # Delete a route that resolves the server's destination
256 route_sever_dst.remove_vpp_config()
258 def test_PPPoE_Encap(self):
259 """PPPoE Encap Test"""
261 self.vapi.cli("clear trace")
264 # Add a route that resolves the server's destination
266 route_sever_dst = VppIpRoute(
270 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
272 route_sever_dst.add_vpp_config()
274 # Send PPPoE Discovery
275 tx0 = self.create_stream_pppoe_discovery(
276 self.pg0, self.pg1, self.pg0.remote_mac
278 self.pg0.add_stream(tx0)
282 tx1 = self.create_stream_pppoe_lcp(
283 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
285 self.pg0.add_stream(tx1)
288 # Create PPPoE session
289 pppoe_if = VppPppoeInterface(
290 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
292 pppoe_if.add_vpp_config()
293 pppoe_if.set_unnumbered(self.pg0.sw_if_index)
296 # Send a packet stream that is routed into the session
297 # - packets are PPPoE encapped
299 self.vapi.cli("clear trace")
300 tx2 = self.create_stream_ip4(
301 self.pg1, self.pg0, self.pg0.remote_ip4, self.dst_ip, 65
303 self.pg1.add_stream(tx2)
305 self.pg_enable_capture(self.pg_interfaces)
308 rx2 = self.pg0.get_capture(len(tx2))
309 self.verify_encaped_pppoe(self.pg1, rx2, tx2, self.session_id)
311 self.logger.info(self.vapi.cli("show pppoe fib"))
312 self.logger.info(self.vapi.cli("show pppoe session"))
313 self.logger.info(self.vapi.cli("show ip fib"))
314 self.logger.info(self.vapi.cli("show adj"))
320 # Delete PPPoE session
321 pppoe_if.remove_vpp_config()
323 # Delete a route that resolves the server's destination
324 route_sever_dst.remove_vpp_config()
326 def test_PPPoE_Add_Twice(self):
327 """PPPoE Add Same Session Twice Test"""
329 self.vapi.cli("clear trace")
332 # Add a route that resolves the server's destination
334 route_sever_dst = VppIpRoute(
338 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
340 route_sever_dst.add_vpp_config()
342 # Send PPPoE Discovery
343 tx0 = self.create_stream_pppoe_discovery(
344 self.pg0, self.pg1, self.pg0.remote_mac
346 self.pg0.add_stream(tx0)
350 tx1 = self.create_stream_pppoe_lcp(
351 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
353 self.pg0.add_stream(tx1)
356 # Create PPPoE session
357 pppoe_if = VppPppoeInterface(
358 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
360 pppoe_if.add_vpp_config()
361 pppoe_if.set_unnumbered(self.pg0.sw_if_index)
364 # The double create (create the same session twice) should fail,
365 # and we should still be able to use the original
368 pppoe_if.add_vpp_config()
372 self.fail("Double GRE tunnel add does not fail")
378 # Delete PPPoE session
379 pppoe_if.remove_vpp_config()
381 # Delete a route that resolves the server's destination
382 route_sever_dst.remove_vpp_config()
384 def test_PPPoE_Del_Twice(self):
385 """PPPoE Delete Same Session Twice Test"""
387 self.vapi.cli("clear trace")
390 # Add a route that resolves the server's destination
392 route_sever_dst = VppIpRoute(
396 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
398 route_sever_dst.add_vpp_config()
400 # Send PPPoE Discovery
401 tx0 = self.create_stream_pppoe_discovery(
402 self.pg0, self.pg1, self.pg0.remote_mac
404 self.pg0.add_stream(tx0)
408 tx1 = self.create_stream_pppoe_lcp(
409 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
411 self.pg0.add_stream(tx1)
414 # Create PPPoE session
415 pppoe_if = VppPppoeInterface(
416 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
418 pppoe_if.add_vpp_config()
420 # Delete PPPoE session
421 pppoe_if.remove_vpp_config()
424 # The double del (del the same session twice) should fail,
425 # and we should still be able to use the original
428 pppoe_if.remove_vpp_config()
432 self.fail("Double GRE tunnel del does not fail")
438 # Delete a route that resolves the server's destination
439 route_sever_dst.remove_vpp_config()
441 def test_PPPoE_Decap_Multiple(self):
442 """PPPoE Decap Multiple Sessions Test"""
444 self.vapi.cli("clear trace")
447 # Add a route that resolves the server's destination
449 route_sever_dst = VppIpRoute(
453 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
455 route_sever_dst.add_vpp_config()
457 # Send PPPoE Discovery 1
458 tx0 = self.create_stream_pppoe_discovery(
459 self.pg0, self.pg1, self.pg0.remote_mac
461 self.pg0.add_stream(tx0)
464 # Send PPPoE PPP LCP 1
465 tx1 = self.create_stream_pppoe_lcp(
466 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
468 self.pg0.add_stream(tx1)
471 # Create PPPoE session 1
472 pppoe_if1 = VppPppoeInterface(
473 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
475 pppoe_if1.add_vpp_config()
476 pppoe_if1.set_unnumbered(self.pg0.sw_if_index)
478 # Send PPPoE Discovery 2
479 tx3 = self.create_stream_pppoe_discovery(
480 self.pg2, self.pg1, self.pg2.remote_mac
482 self.pg2.add_stream(tx3)
485 # Send PPPoE PPP LCP 2
486 tx4 = self.create_stream_pppoe_lcp(
487 self.pg2, self.pg1, self.pg2.remote_mac, self.session_id + 1
489 self.pg2.add_stream(tx4)
492 # Create PPPoE session 2
493 pppoe_if2 = VppPppoeInterface(
494 self, self.pg2.remote_ip4, self.pg2.remote_mac, self.session_id + 1
496 pppoe_if2.add_vpp_config()
497 pppoe_if2.set_unnumbered(self.pg0.sw_if_index)
500 # Send tunneled packets that match the created tunnel and
501 # are decapped and forwarded
503 tx2 = self.create_stream_pppoe_ip4(
510 self.pg0.add_stream(tx2)
512 self.pg_enable_capture(self.pg_interfaces)
515 rx2 = self.pg1.get_capture(len(tx2))
516 self.verify_decapped_pppoe(self.pg0, rx2, tx2)
518 tx5 = self.create_stream_pppoe_ip4(
525 self.pg2.add_stream(tx5)
527 self.pg_enable_capture(self.pg_interfaces)
530 rx5 = self.pg1.get_capture(len(tx5))
531 self.verify_decapped_pppoe(self.pg2, rx5, tx5)
533 self.logger.info(self.vapi.cli("show pppoe fib"))
534 self.logger.info(self.vapi.cli("show pppoe session"))
535 self.logger.info(self.vapi.cli("show ip fib"))
541 # Delete both PPPoE sessions
542 pppoe_if1.remove_vpp_config()
543 pppoe_if2.remove_vpp_config()
545 # Delete a route that resolves the server's destination
546 route_sever_dst.remove_vpp_config()
548 def test_PPPoE_Encap_Multiple(self):
549 """PPPoE Encap Multiple Sessions Test"""
551 self.vapi.cli("clear trace")
554 # Add a route that resolves the server's destination
556 route_sever_dst = VppIpRoute(
560 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
562 route_sever_dst.add_vpp_config()
564 # Send PPPoE Discovery 1
565 tx0 = self.create_stream_pppoe_discovery(
566 self.pg0, self.pg1, self.pg0.remote_mac
568 self.pg0.add_stream(tx0)
571 # Send PPPoE PPP LCP 1
572 tx1 = self.create_stream_pppoe_lcp(
573 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
575 self.pg0.add_stream(tx1)
578 # Create PPPoE session 1
579 pppoe_if1 = VppPppoeInterface(
580 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
582 pppoe_if1.add_vpp_config()
584 # Send PPPoE Discovery 2
585 tx3 = self.create_stream_pppoe_discovery(
586 self.pg2, self.pg1, self.pg2.remote_mac
588 self.pg2.add_stream(tx3)
591 # Send PPPoE PPP LCP 2
592 tx4 = self.create_stream_pppoe_lcp(
593 self.pg2, self.pg1, self.pg2.remote_mac, self.session_id + 1
595 self.pg2.add_stream(tx4)
598 # Create PPPoE session 2
599 pppoe_if2 = VppPppoeInterface(
600 self, self.pg2.remote_ip4, self.pg2.remote_mac, self.session_id + 1
602 pppoe_if2.add_vpp_config()
605 # Send a packet stream that is routed into the session
606 # - packets are PPPoE encapped
608 self.vapi.cli("clear trace")
609 tx2 = self.create_stream_ip4(
610 self.pg1, self.pg0, self.pg0.remote_ip4, self.dst_ip
612 self.pg1.add_stream(tx2)
614 self.pg_enable_capture(self.pg_interfaces)
617 rx2 = self.pg0.get_capture(len(tx2))
618 self.verify_encaped_pppoe(self.pg1, rx2, tx2, self.session_id)
620 tx5 = self.create_stream_ip4(
621 self.pg1, self.pg2, self.pg2.remote_ip4, self.dst_ip
623 self.pg1.add_stream(tx5)
625 self.pg_enable_capture(self.pg_interfaces)
628 rx5 = self.pg2.get_capture(len(tx5))
629 self.verify_encaped_pppoe(self.pg1, rx5, tx5, self.session_id + 1)
631 self.logger.info(self.vapi.cli("show pppoe fib"))
632 self.logger.info(self.vapi.cli("show pppoe session"))
633 self.logger.info(self.vapi.cli("show ip fib"))
639 # Delete both PPPoE sessions
640 pppoe_if1.remove_vpp_config()
641 pppoe_if2.remove_vpp_config()
643 # Delete a route that resolves the server's destination
644 route_sever_dst.remove_vpp_config()
647 if __name__ == "__main__":
648 unittest.main(testRunner=VppTestRunner)