6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether
8 from scapy.layers.ppp import PPPoE, PPPoED, PPP
9 from scapy.layers.inet import IP
11 from framework import VppTestCase
12 from asfframework import VppTestRunner
13 from vpp_ip_route import VppIpRoute, VppRoutePath
14 from vpp_pppoe_interface import VppPppoeInterface
18 class TestPPPoE(VppTestCase):
23 super(TestPPPoE, cls).setUpClass()
26 cls.dst_ip = "100.1.1.100"
27 cls.dst_ipn = socket.inet_pton(socket.AF_INET, cls.dst_ip)
30 def tearDownClass(cls):
31 super(TestPPPoE, cls).tearDownClass()
34 super(TestPPPoE, self).setUp()
36 # create 3 pg interfaces (pg0, pg1, pg2)
37 self.create_pg_interfaces(range(3))
39 for i in self.pg_interfaces:
45 super(TestPPPoE, self).tearDown()
47 for i in self.pg_interfaces:
def show_commands_at_teardown(self):
    """Log the output of a fixed set of VPP "show" CLI commands.

    Each command is run through ``self.vapi.cli`` and the text it returns
    is written to ``self.logger`` at info level, one command at a time and
    in the order listed below.
    """
    diagnostic_commands = (
        "show int",
        "show pppoe fib",
        "show pppoe session",
        "show ip fib",
        "show trace",
    )
    for command in diagnostic_commands:
        self.logger.info(self.vapi.cli(command))
58 def create_stream_pppoe_discovery(self, src_if, dst_if, client_mac, count=1):
60 for i in range(count):
61 # create packet info stored in the test case instance
62 info = self.create_packet_info(src_if, dst_if)
63 # convert the info into packet payload
64 payload = self.info_to_payload(info)
65 # create the packet itself
67 Ether(dst=src_if.local_mac, src=client_mac)
71 # store a copy of the packet in the packet info
73 # append the packet to the list
76 # return the created packet list
79 def create_stream_pppoe_lcp(self, src_if, dst_if, client_mac, session_id, count=1):
81 for i in range(count):
82 # create packet info stored in the test case instance
83 info = self.create_packet_info(src_if, dst_if)
84 # convert the info into packet payload
85 payload = self.info_to_payload(info)
86 # create the packet itself
88 Ether(dst=src_if.local_mac, src=client_mac)
89 / PPPoE(sessionid=session_id)
93 # store a copy of the packet in the packet info
95 # append the packet to the list
98 # return the created packet list
101 def create_stream_pppoe_ip4(
102 self, src_if, dst_if, client_mac, session_id, client_ip, count=1
105 for i in range(count):
106 # create packet info stored in the test case instance
107 info = self.create_packet_info(src_if, dst_if)
108 # convert the info into packet payload
109 payload = self.info_to_payload(info)
110 # create the packet itself
112 Ether(dst=src_if.local_mac, src=client_mac)
113 / PPPoE(sessionid=session_id)
115 / IP(src=client_ip, dst=self.dst_ip)
118 # store a copy of the packet in the packet info
120 # append the packet to the list
123 # return the created packet list
126 def create_stream_ip4(self, src_if, dst_if, client_ip, dst_ip, count=1):
128 for i in range(count):
129 # create packet info stored in the test case instance
130 info = self.create_packet_info(src_if, dst_if)
131 # convert the info into packet payload
132 payload = self.info_to_payload(info)
133 # create the packet itself
135 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
136 / IP(src=dst_ip, dst=client_ip)
139 # store a copy of the packet in the packet info
141 # append the packet to the list
144 # return the created packet list
147 def verify_decapped_pppoe(self, src_if, capture, sent):
148 self.assertEqual(len(capture), len(sent))
150 for i in range(len(capture)):
158 self.assertEqual(rx_ip.src, tx_ip.src)
159 self.assertEqual(rx_ip.dst, tx_ip.dst)
162 self.logger.error(ppp("Rx:", rx))
163 self.logger.error(ppp("Tx:", tx))
166 def verify_encaped_pppoe(self, src_if, capture, sent, session_id):
167 self.assertEqual(len(capture), len(sent))
169 for i in range(len(capture)):
177 self.assertEqual(rx_ip.src, tx_ip.src)
178 self.assertEqual(rx_ip.dst, tx_ip.dst)
182 self.assertEqual(rx_pppoe.sessionid, session_id)
185 self.logger.error(ppp("Rx:", rx))
186 self.logger.error(ppp("Tx:", tx))
189 def test_PPPoE_Decap(self):
190 """PPPoE Decap Test"""
192 self.vapi.cli("clear trace")
195 # Add a route that resolves the server's destination
197 route_sever_dst = VppIpRoute(
201 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
203 route_sever_dst.add_vpp_config()
205 # Send PPPoE Discovery
206 tx0 = self.create_stream_pppoe_discovery(
207 self.pg0, self.pg1, self.pg0.remote_mac
209 self.pg0.add_stream(tx0)
213 tx1 = self.create_stream_pppoe_lcp(
214 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
216 self.pg0.add_stream(tx1)
219 # Create PPPoE session
220 pppoe_if = VppPppoeInterface(
221 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
223 pppoe_if.add_vpp_config()
224 pppoe_if.set_unnumbered(self.pg0.sw_if_index)
227 # Send tunneled packets that match the created tunnel and
228 # are decapped and forwarded
230 tx2 = self.create_stream_pppoe_ip4(
237 self.pg0.add_stream(tx2)
239 self.pg_enable_capture(self.pg_interfaces)
242 rx2 = self.pg1.get_capture(len(tx2))
243 self.verify_decapped_pppoe(self.pg0, rx2, tx2)
245 self.logger.info(self.vapi.cli("show pppoe fib"))
246 self.logger.info(self.vapi.cli("show pppoe session"))
247 self.logger.info(self.vapi.cli("show ip fib"))
253 # Delete PPPoE session
254 pppoe_if.remove_vpp_config()
256 # Delete a route that resolves the server's destination
257 route_sever_dst.remove_vpp_config()
259 def test_PPPoE_Encap(self):
260 """PPPoE Encap Test"""
262 self.vapi.cli("clear trace")
265 # Add a route that resolves the server's destination
267 route_sever_dst = VppIpRoute(
271 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
273 route_sever_dst.add_vpp_config()
275 # Send PPPoE Discovery
276 tx0 = self.create_stream_pppoe_discovery(
277 self.pg0, self.pg1, self.pg0.remote_mac
279 self.pg0.add_stream(tx0)
283 tx1 = self.create_stream_pppoe_lcp(
284 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
286 self.pg0.add_stream(tx1)
289 # Create PPPoE session
290 pppoe_if = VppPppoeInterface(
291 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
293 pppoe_if.add_vpp_config()
294 pppoe_if.set_unnumbered(self.pg0.sw_if_index)
297 # Send a packet stream that is routed into the session
298 # - packets are PPPoE encapped
300 self.vapi.cli("clear trace")
301 tx2 = self.create_stream_ip4(
302 self.pg1, self.pg0, self.pg0.remote_ip4, self.dst_ip, 65
304 self.pg1.add_stream(tx2)
306 self.pg_enable_capture(self.pg_interfaces)
309 rx2 = self.pg0.get_capture(len(tx2))
310 self.verify_encaped_pppoe(self.pg1, rx2, tx2, self.session_id)
312 self.logger.info(self.vapi.cli("show pppoe fib"))
313 self.logger.info(self.vapi.cli("show pppoe session"))
314 self.logger.info(self.vapi.cli("show ip fib"))
315 self.logger.info(self.vapi.cli("show adj"))
321 # Delete PPPoE session
322 pppoe_if.remove_vpp_config()
324 # Delete a route that resolves the server's destination
325 route_sever_dst.remove_vpp_config()
327 def test_PPPoE_Add_Twice(self):
328 """PPPoE Add Same Session Twice Test"""
330 self.vapi.cli("clear trace")
333 # Add a route that resolves the server's destination
335 route_sever_dst = VppIpRoute(
339 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
341 route_sever_dst.add_vpp_config()
343 # Send PPPoE Discovery
344 tx0 = self.create_stream_pppoe_discovery(
345 self.pg0, self.pg1, self.pg0.remote_mac
347 self.pg0.add_stream(tx0)
351 tx1 = self.create_stream_pppoe_lcp(
352 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
354 self.pg0.add_stream(tx1)
357 # Create PPPoE session
358 pppoe_if = VppPppoeInterface(
359 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
361 pppoe_if.add_vpp_config()
362 pppoe_if.set_unnumbered(self.pg0.sw_if_index)
365 # The double create (create the same session twice) should fail,
366 # and we should still be able to use the original
369 pppoe_if.add_vpp_config()
373 self.fail("Double GRE tunnel add does not fail")
379 # Delete PPPoE session
380 pppoe_if.remove_vpp_config()
382 # Delete a route that resolves the server's destination
383 route_sever_dst.remove_vpp_config()
385 def test_PPPoE_Del_Twice(self):
386 """PPPoE Delete Same Session Twice Test"""
388 self.vapi.cli("clear trace")
391 # Add a route that resolves the server's destination
393 route_sever_dst = VppIpRoute(
397 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
399 route_sever_dst.add_vpp_config()
401 # Send PPPoE Discovery
402 tx0 = self.create_stream_pppoe_discovery(
403 self.pg0, self.pg1, self.pg0.remote_mac
405 self.pg0.add_stream(tx0)
409 tx1 = self.create_stream_pppoe_lcp(
410 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
412 self.pg0.add_stream(tx1)
415 # Create PPPoE session
416 pppoe_if = VppPppoeInterface(
417 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
419 pppoe_if.add_vpp_config()
421 # Delete PPPoE session
422 pppoe_if.remove_vpp_config()
425 # The double del (deleting the same session twice) should fail;
426 # the session was already removed by the first delete above
429 pppoe_if.remove_vpp_config()
433 self.fail("Double GRE tunnel del does not fail")
439 # Delete a route that resolves the server's destination
440 route_sever_dst.remove_vpp_config()
442 def test_PPPoE_Decap_Multiple(self):
443 """PPPoE Decap Multiple Sessions Test"""
445 self.vapi.cli("clear trace")
448 # Add a route that resolves the server's destination
450 route_sever_dst = VppIpRoute(
454 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
456 route_sever_dst.add_vpp_config()
458 # Send PPPoE Discovery 1
459 tx0 = self.create_stream_pppoe_discovery(
460 self.pg0, self.pg1, self.pg0.remote_mac
462 self.pg0.add_stream(tx0)
465 # Send PPPoE PPP LCP 1
466 tx1 = self.create_stream_pppoe_lcp(
467 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
469 self.pg0.add_stream(tx1)
472 # Create PPPoE session 1
473 pppoe_if1 = VppPppoeInterface(
474 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
476 pppoe_if1.add_vpp_config()
477 pppoe_if1.set_unnumbered(self.pg0.sw_if_index)
479 # Send PPPoE Discovery 2
480 tx3 = self.create_stream_pppoe_discovery(
481 self.pg2, self.pg1, self.pg2.remote_mac
483 self.pg2.add_stream(tx3)
486 # Send PPPoE PPP LCP 2
487 tx4 = self.create_stream_pppoe_lcp(
488 self.pg2, self.pg1, self.pg2.remote_mac, self.session_id + 1
490 self.pg2.add_stream(tx4)
493 # Create PPPoE session 2
494 pppoe_if2 = VppPppoeInterface(
495 self, self.pg2.remote_ip4, self.pg2.remote_mac, self.session_id + 1
497 pppoe_if2.add_vpp_config()
498 pppoe_if2.set_unnumbered(self.pg0.sw_if_index)
501 # Send tunneled packets that match the created tunnel and
502 # are decapped and forwarded
504 tx2 = self.create_stream_pppoe_ip4(
511 self.pg0.add_stream(tx2)
513 self.pg_enable_capture(self.pg_interfaces)
516 rx2 = self.pg1.get_capture(len(tx2))
517 self.verify_decapped_pppoe(self.pg0, rx2, tx2)
519 tx5 = self.create_stream_pppoe_ip4(
526 self.pg2.add_stream(tx5)
528 self.pg_enable_capture(self.pg_interfaces)
531 rx5 = self.pg1.get_capture(len(tx5))
532 self.verify_decapped_pppoe(self.pg2, rx5, tx5)
534 self.logger.info(self.vapi.cli("show pppoe fib"))
535 self.logger.info(self.vapi.cli("show pppoe session"))
536 self.logger.info(self.vapi.cli("show ip fib"))
542 # Delete both PPPoE sessions
543 pppoe_if1.remove_vpp_config()
544 pppoe_if2.remove_vpp_config()
546 # Delete a route that resolves the server's destination
547 route_sever_dst.remove_vpp_config()
549 def test_PPPoE_Encap_Multiple(self):
550 """PPPoE Encap Multiple Sessions Test"""
552 self.vapi.cli("clear trace")
555 # Add a route that resolves the server's destination
557 route_sever_dst = VppIpRoute(
561 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
563 route_sever_dst.add_vpp_config()
565 # Send PPPoE Discovery 1
566 tx0 = self.create_stream_pppoe_discovery(
567 self.pg0, self.pg1, self.pg0.remote_mac
569 self.pg0.add_stream(tx0)
572 # Send PPPoE PPP LCP 1
573 tx1 = self.create_stream_pppoe_lcp(
574 self.pg0, self.pg1, self.pg0.remote_mac, self.session_id
576 self.pg0.add_stream(tx1)
579 # Create PPPoE session 1
580 pppoe_if1 = VppPppoeInterface(
581 self, self.pg0.remote_ip4, self.pg0.remote_mac, self.session_id
583 pppoe_if1.add_vpp_config()
585 # Send PPPoE Discovery 2
586 tx3 = self.create_stream_pppoe_discovery(
587 self.pg2, self.pg1, self.pg2.remote_mac
589 self.pg2.add_stream(tx3)
592 # Send PPPoE PPP LCP 2
593 tx4 = self.create_stream_pppoe_lcp(
594 self.pg2, self.pg1, self.pg2.remote_mac, self.session_id + 1
596 self.pg2.add_stream(tx4)
599 # Create PPPoE session 2
600 pppoe_if2 = VppPppoeInterface(
601 self, self.pg2.remote_ip4, self.pg2.remote_mac, self.session_id + 1
603 pppoe_if2.add_vpp_config()
606 # Send a packet stream that is routed into the session
607 # - packets are PPPoE encapped
609 self.vapi.cli("clear trace")
610 tx2 = self.create_stream_ip4(
611 self.pg1, self.pg0, self.pg0.remote_ip4, self.dst_ip
613 self.pg1.add_stream(tx2)
615 self.pg_enable_capture(self.pg_interfaces)
618 rx2 = self.pg0.get_capture(len(tx2))
619 self.verify_encaped_pppoe(self.pg1, rx2, tx2, self.session_id)
621 tx5 = self.create_stream_ip4(
622 self.pg1, self.pg2, self.pg2.remote_ip4, self.dst_ip
624 self.pg1.add_stream(tx5)
626 self.pg_enable_capture(self.pg_interfaces)
629 rx5 = self.pg2.get_capture(len(tx5))
630 self.verify_encaped_pppoe(self.pg1, rx5, tx5, self.session_id + 1)
632 self.logger.info(self.vapi.cli("show pppoe fib"))
633 self.logger.info(self.vapi.cli("show pppoe session"))
634 self.logger.info(self.vapi.cli("show ip fib"))
640 # Delete both PPPoE sessions
641 pppoe_if1.remove_vpp_config()
642 pppoe_if2.remove_vpp_config()
644 # Delete a route that resolves the server's destination
645 route_sever_dst.remove_vpp_config()
648 if __name__ == "__main__":
649 unittest.main(testRunner=VppTestRunner)